1'use strict' 2 3// A readable tar stream creator 4// Technically, this is a transform stream that you write paths into, 5// and tar format comes out of. 6// The `add()` method is like `write()` but returns this, 7// and end() return `this` as well, so you can 8// do `new Pack(opt).add('files').add('dir').end().pipe(output) 9// You could also do something like: 10// streamOfPaths().pipe(new Pack()).pipe(new fs.WriteStream('out.tar')) 11 12class PackJob { 13 constructor (path, absolute) { 14 this.path = path || './' 15 this.absolute = absolute 16 this.entry = null 17 this.stat = null 18 this.readdir = null 19 this.pending = false 20 this.ignore = false 21 this.piped = false 22 } 23} 24 25const { Minipass } = require('minipass') 26const zlib = require('minizlib') 27const ReadEntry = require('./read-entry.js') 28const WriteEntry = require('./write-entry.js') 29const WriteEntrySync = WriteEntry.Sync 30const WriteEntryTar = WriteEntry.Tar 31const Yallist = require('yallist') 32const EOF = Buffer.alloc(1024) 33const ONSTAT = Symbol('onStat') 34const ENDED = Symbol('ended') 35const QUEUE = Symbol('queue') 36const CURRENT = Symbol('current') 37const PROCESS = Symbol('process') 38const PROCESSING = Symbol('processing') 39const PROCESSJOB = Symbol('processJob') 40const JOBS = Symbol('jobs') 41const JOBDONE = Symbol('jobDone') 42const ADDFSENTRY = Symbol('addFSEntry') 43const ADDTARENTRY = Symbol('addTarEntry') 44const STAT = Symbol('stat') 45const READDIR = Symbol('readdir') 46const ONREADDIR = Symbol('onreaddir') 47const PIPE = Symbol('pipe') 48const ENTRY = Symbol('entry') 49const ENTRYOPT = Symbol('entryOpt') 50const WRITEENTRYCLASS = Symbol('writeEntryClass') 51const WRITE = Symbol('write') 52const ONDRAIN = Symbol('ondrain') 53 54const fs = require('fs') 55const path = require('path') 56const warner = require('./warn-mixin.js') 57const normPath = require('./normalize-windows-path.js') 58 59const Pack = warner(class Pack extends Minipass { 60 constructor (opt) { 61 super(opt) 62 opt = opt || Object.create(null) 63 this.opt = opt 64 this.file = opt.file || '' 65 this.cwd = opt.cwd || process.cwd() 66 this.maxReadSize = opt.maxReadSize 67 this.preservePaths = !!opt.preservePaths 68 this.strict = !!opt.strict 69 this.noPax = !!opt.noPax 70 this.prefix = normPath(opt.prefix || '') 71 this.linkCache = opt.linkCache || new Map() 72 this.statCache = opt.statCache || new Map() 73 this.readdirCache = opt.readdirCache || new Map() 74 75 this[WRITEENTRYCLASS] = WriteEntry 76 if (typeof opt.onwarn === 'function') { 77 this.on('warn', opt.onwarn) 78 } 79 80 this.portable = !!opt.portable 81 this.zip = null 82 83 if (opt.gzip || opt.brotli) { 84 if (opt.gzip && opt.brotli) { 85 throw new TypeError('gzip and brotli are mutually exclusive') 86 } 87 if (opt.gzip) { 88 if (typeof opt.gzip !== 'object') { 89 opt.gzip = {} 90 } 91 if (this.portable) { 92 opt.gzip.portable = true 93 } 94 this.zip = new zlib.Gzip(opt.gzip) 95 } 96 if (opt.brotli) { 97 if (typeof opt.brotli !== 'object') { 98 opt.brotli = {} 99 } 100 this.zip = new zlib.BrotliCompress(opt.brotli) 101 } 102 this.zip.on('data', chunk => super.write(chunk)) 103 this.zip.on('end', _ => super.end()) 104 this.zip.on('drain', _ => this[ONDRAIN]()) 105 this.on('resume', _ => this.zip.resume()) 106 } else { 107 this.on('drain', this[ONDRAIN]) 108 } 109 110 this.noDirRecurse = !!opt.noDirRecurse 111 this.follow = !!opt.follow 112 this.noMtime = !!opt.noMtime 113 this.mtime = opt.mtime || null 114 115 this.filter = typeof opt.filter === 'function' ? opt.filter : _ => true 116 117 this[QUEUE] = new Yallist() 118 this[JOBS] = 0 119 this.jobs = +opt.jobs || 4 120 this[PROCESSING] = false 121 this[ENDED] = false 122 } 123 124 [WRITE] (chunk) { 125 return super.write(chunk) 126 } 127 128 add (path) { 129 this.write(path) 130 return this 131 } 132 133 end (path) { 134 if (path) { 135 this.write(path) 136 } 137 this[ENDED] = true 138 this[PROCESS]() 139 return this 140 } 141 142 write (path) { 143 if (this[ENDED]) { 144 throw new Error('write after end') 145 } 146 147 if (path instanceof ReadEntry) { 148 this[ADDTARENTRY](path) 149 } else { 150 this[ADDFSENTRY](path) 151 } 152 return this.flowing 153 } 154 155 [ADDTARENTRY] (p) { 156 const absolute = normPath(path.resolve(this.cwd, p.path)) 157 // in this case, we don't have to wait for the stat 158 if (!this.filter(p.path, p)) { 159 p.resume() 160 } else { 161 const job = new PackJob(p.path, absolute, false) 162 job.entry = new WriteEntryTar(p, this[ENTRYOPT](job)) 163 job.entry.on('end', _ => this[JOBDONE](job)) 164 this[JOBS] += 1 165 this[QUEUE].push(job) 166 } 167 168 this[PROCESS]() 169 } 170 171 [ADDFSENTRY] (p) { 172 const absolute = normPath(path.resolve(this.cwd, p)) 173 this[QUEUE].push(new PackJob(p, absolute)) 174 this[PROCESS]() 175 } 176 177 [STAT] (job) { 178 job.pending = true 179 this[JOBS] += 1 180 const stat = this.follow ? 'stat' : 'lstat' 181 fs[stat](job.absolute, (er, stat) => { 182 job.pending = false 183 this[JOBS] -= 1 184 if (er) { 185 this.emit('error', er) 186 } else { 187 this[ONSTAT](job, stat) 188 } 189 }) 190 } 191 192 [ONSTAT] (job, stat) { 193 this.statCache.set(job.absolute, stat) 194 job.stat = stat 195 196 // now we have the stat, we can filter it. 197 if (!this.filter(job.path, stat)) { 198 job.ignore = true 199 } 200 201 this[PROCESS]() 202 } 203 204 [READDIR] (job) { 205 job.pending = true 206 this[JOBS] += 1 207 fs.readdir(job.absolute, (er, entries) => { 208 job.pending = false 209 this[JOBS] -= 1 210 if (er) { 211 return this.emit('error', er) 212 } 213 this[ONREADDIR](job, entries) 214 }) 215 } 216 217 [ONREADDIR] (job, entries) { 218 this.readdirCache.set(job.absolute, entries) 219 job.readdir = entries 220 this[PROCESS]() 221 } 222 223 [PROCESS] () { 224 if (this[PROCESSING]) { 225 return 226 } 227 228 this[PROCESSING] = true 229 for (let w = this[QUEUE].head; 230 w !== null && this[JOBS] < this.jobs; 231 w = w.next) { 232 this[PROCESSJOB](w.value) 233 if (w.value.ignore) { 234 const p = w.next 235 this[QUEUE].removeNode(w) 236 w.next = p 237 } 238 } 239 240 this[PROCESSING] = false 241 242 if (this[ENDED] && !this[QUEUE].length && this[JOBS] === 0) { 243 if (this.zip) { 244 this.zip.end(EOF) 245 } else { 246 super.write(EOF) 247 super.end() 248 } 249 } 250 } 251 252 get [CURRENT] () { 253 return this[QUEUE] && this[QUEUE].head && this[QUEUE].head.value 254 } 255 256 [JOBDONE] (job) { 257 this[QUEUE].shift() 258 this[JOBS] -= 1 259 this[PROCESS]() 260 } 261 262 [PROCESSJOB] (job) { 263 if (job.pending) { 264 return 265 } 266 267 if (job.entry) { 268 if (job === this[CURRENT] && !job.piped) { 269 this[PIPE](job) 270 } 271 return 272 } 273 274 if (!job.stat) { 275 if (this.statCache.has(job.absolute)) { 276 this[ONSTAT](job, this.statCache.get(job.absolute)) 277 } else { 278 this[STAT](job) 279 } 280 } 281 if (!job.stat) { 282 return 283 } 284 285 // filtered out! 286 if (job.ignore) { 287 return 288 } 289 290 if (!this.noDirRecurse && job.stat.isDirectory() && !job.readdir) { 291 if (this.readdirCache.has(job.absolute)) { 292 this[ONREADDIR](job, this.readdirCache.get(job.absolute)) 293 } else { 294 this[READDIR](job) 295 } 296 if (!job.readdir) { 297 return 298 } 299 } 300 301 // we know it doesn't have an entry, because that got checked above 302 job.entry = this[ENTRY](job) 303 if (!job.entry) { 304 job.ignore = true 305 return 306 } 307 308 if (job === this[CURRENT] && !job.piped) { 309 this[PIPE](job) 310 } 311 } 312 313 [ENTRYOPT] (job) { 314 return { 315 onwarn: (code, msg, data) => this.warn(code, msg, data), 316 noPax: this.noPax, 317 cwd: this.cwd, 318 absolute: job.absolute, 319 preservePaths: this.preservePaths, 320 maxReadSize: this.maxReadSize, 321 strict: this.strict, 322 portable: this.portable, 323 linkCache: this.linkCache, 324 statCache: this.statCache, 325 noMtime: this.noMtime, 326 mtime: this.mtime, 327 prefix: this.prefix, 328 } 329 } 330 331 [ENTRY] (job) { 332 this[JOBS] += 1 333 try { 334 return new this[WRITEENTRYCLASS](job.path, this[ENTRYOPT](job)) 335 .on('end', () => this[JOBDONE](job)) 336 .on('error', er => this.emit('error', er)) 337 } catch (er) { 338 this.emit('error', er) 339 } 340 } 341 342 [ONDRAIN] () { 343 if (this[CURRENT] && this[CURRENT].entry) { 344 this[CURRENT].entry.resume() 345 } 346 } 347 348 // like .pipe() but using super, because our write() is special 349 [PIPE] (job) { 350 job.piped = true 351 352 if (job.readdir) { 353 job.readdir.forEach(entry => { 354 const p = job.path 355 const base = p === './' ? '' : p.replace(/\/*$/, '/') 356 this[ADDFSENTRY](base + entry) 357 }) 358 } 359 360 const source = job.entry 361 const zip = this.zip 362 363 if (zip) { 364 source.on('data', chunk => { 365 if (!zip.write(chunk)) { 366 source.pause() 367 } 368 }) 369 } else { 370 source.on('data', chunk => { 371 if (!super.write(chunk)) { 372 source.pause() 373 } 374 }) 375 } 376 } 377 378 pause () { 379 if (this.zip) { 380 this.zip.pause() 381 } 382 return super.pause() 383 } 384}) 385 386class PackSync extends Pack { 387 constructor (opt) { 388 super(opt) 389 this[WRITEENTRYCLASS] = WriteEntrySync 390 } 391 392 // pause/resume are no-ops in sync streams. 393 pause () {} 394 resume () {} 395 396 [STAT] (job) { 397 const stat = this.follow ? 'statSync' : 'lstatSync' 398 this[ONSTAT](job, fs[stat](job.absolute)) 399 } 400 401 [READDIR] (job, stat) { 402 this[ONREADDIR](job, fs.readdirSync(job.absolute)) 403 } 404 405 // gotta get it all in this tick 406 [PIPE] (job) { 407 const source = job.entry 408 const zip = this.zip 409 410 if (job.readdir) { 411 job.readdir.forEach(entry => { 412 const p = job.path 413 const base = p === './' ? '' : p.replace(/\/*$/, '/') 414 this[ADDFSENTRY](base + entry) 415 }) 416 } 417 418 if (zip) { 419 source.on('data', chunk => { 420 zip.write(chunk) 421 }) 422 } else { 423 source.on('data', chunk => { 424 super[WRITE](chunk) 425 }) 426 } 427 } 428} 429 430Pack.Sync = PackSync 431 432module.exports = Pack 433