'use strict'

const events = require('events')

const contentPath = require('./path')
const fs = require('fs/promises')
const { moveFile } = require('@npmcli/fs')
const { Minipass } = require('minipass')
const Pipeline = require('minipass-pipeline')
const Flush = require('minipass-flush')
const path = require('path')
const ssri = require('ssri')
const uniqueFilename = require('unique-filename')
const fsm = require('fs-minipass')

module.exports = write

// Cache of move operations in process so we don't duplicate
const moveOperations = new Map()

async function write (cache, data, opts = {}) {
  const { algorithms, size, integrity } = opts

  if (typeof size === 'number' && data.length !== size) {
    throw sizeError(size, data.length)
  }

  const sri = ssri.fromData(data, algorithms ? { algorithms } : {})
  if (integrity && !ssri.checkData(data, integrity, opts)) {
    throw checksumError(integrity, sri)
  }

  for (const algo in sri) {
    const tmp = await makeTmp(cache, opts)
    const hash = sri[algo].toString()
    try {
      await fs.writeFile(tmp.target, data, { flag: 'wx' })
      await moveToDestination(tmp, cache, hash, opts)
    } finally {
      if (!tmp.moved) {
        await fs.rm(tmp.target, { recursive: true, force: true })
      }
    }
  }
  return { integrity: sri, size: data.length }
}

module.exports.stream = writeStream

// writes proxied to the 'inputStream' that is passed to the Promise
// 'end' is deferred until content is handled.
class CacacheWriteStream extends Flush {
  constructor (cache, opts) {
    super()
    this.opts = opts
    this.cache = cache
    this.inputStream = new Minipass()
    this.inputStream.on('error', er => this.emit('error', er))
    this.inputStream.on('drain', () => this.emit('drain'))
    this.handleContentP = null
  }

  write (chunk, encoding, cb) {
    if (!this.handleContentP) {
      this.handleContentP = handleContent(
        this.inputStream,
        this.cache,
        this.opts
      )
      this.handleContentP.catch(error => this.emit('error', error))
    }
    return this.inputStream.write(chunk, encoding, cb)
  }

  flush (cb) {
    this.inputStream.end(() => {
      if (!this.handleContentP) {
        const e = new Error('Cache input stream was empty')
        e.code = 'ENODATA'
        // empty streams are probably emitting end right away.
        // defer this one tick by rejecting a promise on it.
        return Promise.reject(e).catch(cb)
      }
      // eslint-disable-next-line promise/catch-or-return
      this.handleContentP.then(
        (res) => {
          res.integrity && this.emit('integrity', res.integrity)
          // eslint-disable-next-line promise/always-return
          res.size !== null && this.emit('size', res.size)
          cb()
        },
        (er) => cb(er)
      )
    })
  }
}

function writeStream (cache, opts = {}) {
  return new CacacheWriteStream(cache, opts)
}

async function handleContent (inputStream, cache, opts) {
  const tmp = await makeTmp(cache, opts)
  try {
    const res = await pipeToTmp(inputStream, cache, tmp.target, opts)
    await moveToDestination(
      tmp,
      cache,
      res.integrity,
      opts
    )
    return res
  } finally {
    if (!tmp.moved) {
      await fs.rm(tmp.target, { recursive: true, force: true })
    }
  }
}

async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
  const outStream = new fsm.WriteStream(tmpTarget, {
    flags: 'wx',
  })

  if (opts.integrityEmitter) {
    // we need to create these all simultaneously since they can fire in any order
    const [integrity, size] = await Promise.all([
      events.once(opts.integrityEmitter, 'integrity').then(res => res[0]),
      events.once(opts.integrityEmitter, 'size').then(res => res[0]),
      new Pipeline(inputStream, outStream).promise(),
    ])
    return { integrity, size }
  }

  let integrity
  let size
  const hashStream = ssri.integrityStream({
    integrity: opts.integrity,
    algorithms: opts.algorithms,
    size: opts.size,
  })
  hashStream.on('integrity', i => {
    integrity = i
  })
  hashStream.on('size', s => {
    size = s
  })

  const pipeline = new Pipeline(inputStream, hashStream, outStream)
  await pipeline.promise()
  return { integrity, size }
}

async function makeTmp (cache, opts) {
  const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
  await fs.mkdir(path.dirname(tmpTarget), { recursive: true })
  return {
    target: tmpTarget,
    moved: false,
  }
}

async function moveToDestination (tmp, cache, sri, opts) {
  const destination = contentPath(cache, sri)
  const destDir = path.dirname(destination)
  if (moveOperations.has(destination)) {
    return moveOperations.get(destination)
  }
  moveOperations.set(
    destination,
    fs.mkdir(destDir, { recursive: true })
      .then(async () => {
        await moveFile(tmp.target, destination, { overwrite: false })
        tmp.moved = true
        return tmp.moved
      })
      .catch(err => {
        if (!err.message.startsWith('The destination file exists')) {
          throw Object.assign(err, { code: 'EEXIST' })
        }
      })
      .finally(() => {
        moveOperations.delete(destination)
      })
  )
  return moveOperations.get(destination)
}

function sizeError (expected, found) {
  /* eslint-disable-next-line max-len */
  const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
  err.expected = expected
  err.found = found
  err.code = 'EBADSIZE'
  return err
}

function checksumError (expected, found) {
  const err = new Error(`Integrity check failed:
  Wanted: ${expected}
   Found: ${found}`)
  err.code = 'EINTEGRITY'
  err.expected = expected
  err.found = found
  return err
}
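
// Example usage (an illustrative sketch only, not part of the shipped module;
// `cacheDir` and `someReadableStream` are hypothetical names, and within
// cacache this module is typically consumed by the higher-level put API
// rather than required directly):
//
//   const write = require('./write')
//   const cacheDir = '/path/to/cache'
//
//   // buffer mode: hash the data, optionally verify it against opts.integrity,
//   // and move it into the content-addressed store
//   const { integrity, size } = await write(cacheDir, Buffer.from('hello'), {
//     algorithms: ['sha512'],
//   })
//
//   // stream mode: pipe data in and listen for the computed integrity/size
//   const stream = write.stream(cacheDir, { algorithms: ['sha512'] })
//   stream.on('integrity', i => console.log('stored as', i.toString()))
//   stream.on('error', err => console.error(err))
//   someReadableStream.pipe(stream)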