1'use strict'
2
3// A readable tar stream creator
4// Technically, this is a transform stream that you write paths into,
5// and tar format comes out of.
// The `add()` method is like `write()` but returns this,
// and end() returns `this` as well, so you can
// do `new Pack(opt).add('files').add('dir').end().pipe(output)`
9// You could also do something like:
10// streamOfPaths().pipe(new Pack()).pipe(new fs.WriteStream('out.tar'))
11
class PackJob {
  // A single unit of work for Pack: one path waiting to be statted,
  // read, and written into the archive.  `path` is the archive-relative
  // name, `absolute` the resolved filesystem location.
  constructor (path, absolute) {
    this.path = path || './'
    this.absolute = absolute
    // per-job state flags and cached results, filled in by Pack as the
    // job moves through stat -> readdir -> entry -> pipe
    Object.assign(this, {
      entry: null,
      stat: null,
      readdir: null,
      pending: false,
      ignore: false,
      piped: false,
    })
  }
}
24
const { Minipass } = require('minipass')
const zlib = require('minizlib')
const ReadEntry = require('./read-entry.js')
const WriteEntry = require('./write-entry.js')
const WriteEntrySync = WriteEntry.Sync
const WriteEntryTar = WriteEntry.Tar
const Yallist = require('yallist')
// A tar archive ends with two 512-byte blocks of zeroes; this is that
// terminator, written once all jobs are done.
const EOF = Buffer.alloc(1024)
// Symbols used as keys for "private" methods and state on the Pack
// class, so they don't collide with stream API names or user data.
const ONSTAT = Symbol('onStat')
const ENDED = Symbol('ended')
const QUEUE = Symbol('queue')
const CURRENT = Symbol('current')
const PROCESS = Symbol('process')
const PROCESSING = Symbol('processing')
const PROCESSJOB = Symbol('processJob')
const JOBS = Symbol('jobs')
const JOBDONE = Symbol('jobDone')
const ADDFSENTRY = Symbol('addFSEntry')
const ADDTARENTRY = Symbol('addTarEntry')
const STAT = Symbol('stat')
const READDIR = Symbol('readdir')
const ONREADDIR = Symbol('onreaddir')
const PIPE = Symbol('pipe')
const ENTRY = Symbol('entry')
const ENTRYOPT = Symbol('entryOpt')
const WRITEENTRYCLASS = Symbol('writeEntryClass')
const WRITE = Symbol('write')
const ONDRAIN = Symbol('ondrain')

const fs = require('fs')
const path = require('path')
const warner = require('./warn-mixin.js')
const normPath = require('./normalize-windows-path.js')
58
// Pack: write() paths (or ReadEntry objects) in, tar-formatted bytes come
// out.  Paths become PackJobs on a FIFO queue; up to `this.jobs` jobs run
// stat/readdir concurrently, but entries are piped strictly in queue order
// (only the head job is ever piped).  warner() mixes in the warn() method
// used by entry option `onwarn`.
const Pack = warner(class Pack extends Minipass {
  constructor (opt) {
    super(opt)
    opt = opt || Object.create(null)
    this.opt = opt
    this.file = opt.file || ''
    this.cwd = opt.cwd || process.cwd()
    this.maxReadSize = opt.maxReadSize
    this.preservePaths = !!opt.preservePaths
    this.strict = !!opt.strict
    this.noPax = !!opt.noPax
    this.prefix = normPath(opt.prefix || '')
    // caches may be shared across Pack instances by passing them in
    this.linkCache = opt.linkCache || new Map()
    this.statCache = opt.statCache || new Map()
    this.readdirCache = opt.readdirCache || new Map()

    // PackSync overrides this with WriteEntry.Sync
    this[WRITEENTRYCLASS] = WriteEntry
    if (typeof opt.onwarn === 'function') {
      this.on('warn', opt.onwarn)
    }

    this.portable = !!opt.portable
    this.zip = null

    if (opt.gzip || opt.brotli) {
      if (opt.gzip && opt.brotli) {
        throw new TypeError('gzip and brotli are mutually exclusive')
      }
      if (opt.gzip) {
        if (typeof opt.gzip !== 'object') {
          opt.gzip = {}
        }
        if (this.portable) {
          opt.gzip.portable = true
        }
        this.zip = new zlib.Gzip(opt.gzip)
      }
      if (opt.brotli) {
        if (typeof opt.brotli !== 'object') {
          opt.brotli = {}
        }
        this.zip = new zlib.BrotliCompress(opt.brotli)
      }
      // when compressing, tar bytes are routed into this.zip, and the
      // compressor's output is forwarded to the underlying Minipass
      this.zip.on('data', chunk => super.write(chunk))
      this.zip.on('end', _ => super.end())
      // compressor drain means we can resume the paused source entry
      this.zip.on('drain', _ => this[ONDRAIN]())
      this.on('resume', _ => this.zip.resume())
    } else {
      this.on('drain', this[ONDRAIN])
    }

    this.noDirRecurse = !!opt.noDirRecurse
    this.follow = !!opt.follow
    this.noMtime = !!opt.noMtime
    this.mtime = opt.mtime || null

    // filter(path, stat) -> false excludes the entry; default keeps all
    this.filter = typeof opt.filter === 'function' ? opt.filter : _ => true

    this[QUEUE] = new Yallist()
    this[JOBS] = 0
    // max jobs allowed in-flight at once (stat/readdir/entry)
    this.jobs = +opt.jobs || 4
    this[PROCESSING] = false
    this[ENDED] = false
  }

  // raw passthrough to the underlying stream write; used by PackSync
  [WRITE] (chunk) {
    return super.write(chunk)
  }

  // chainable write(): new Pack(opt).add('a').add('b').end()
  add (path) {
    this.write(path)
    return this
  }

  // chainable end(); optionally adds one last path first
  end (path) {
    if (path) {
      this.write(path)
    }
    this[ENDED] = true
    this[PROCESS]()
    return this
  }

  // "chunks" written to a Pack are paths or ReadEntry objects, not bytes
  write (path) {
    if (this[ENDED]) {
      throw new Error('write after end')
    }

    if (path instanceof ReadEntry) {
      this[ADDTARENTRY](path)
    } else {
      this[ADDFSENTRY](path)
    }
    return this.flowing
  }

  // queue a job for an entry read from another tar stream (no fs access)
  [ADDTARENTRY] (p) {
    const absolute = normPath(path.resolve(this.cwd, p.path))
    // in this case, we don't have to wait for the stat
    if (!this.filter(p.path, p)) {
      // filtered out: drain the source entry so its stream completes
      p.resume()
    } else {
      // NOTE(review): PackJob's constructor takes (path, absolute); the
      // third argument here is ignored — presumably a leftover.
      const job = new PackJob(p.path, absolute, false)
      job.entry = new WriteEntryTar(p, this[ENTRYOPT](job))
      job.entry.on('end', _ => this[JOBDONE](job))
      this[JOBS] += 1
      this[QUEUE].push(job)
    }

    this[PROCESS]()
  }

  // queue a job for a path on the filesystem
  [ADDFSENTRY] (p) {
    const absolute = normPath(path.resolve(this.cwd, p))
    this[QUEUE].push(new PackJob(p, absolute))
    this[PROCESS]()
  }

  // async stat (or lstat unless follow:true); counts as an in-flight job
  [STAT] (job) {
    job.pending = true
    this[JOBS] += 1
    const stat = this.follow ? 'stat' : 'lstat'
    fs[stat](job.absolute, (er, stat) => {
      job.pending = false
      this[JOBS] -= 1
      if (er) {
        this.emit('error', er)
      } else {
        this[ONSTAT](job, stat)
      }
    })
  }

  [ONSTAT] (job, stat) {
    this.statCache.set(job.absolute, stat)
    job.stat = stat

    // now we have the stat, we can filter it.
    if (!this.filter(job.path, stat)) {
      job.ignore = true
    }

    this[PROCESS]()
  }

  // async readdir for directory jobs; counts as an in-flight job
  [READDIR] (job) {
    job.pending = true
    this[JOBS] += 1
    fs.readdir(job.absolute, (er, entries) => {
      job.pending = false
      this[JOBS] -= 1
      if (er) {
        return this.emit('error', er)
      }
      this[ONREADDIR](job, entries)
    })
  }

  [ONREADDIR] (job, entries) {
    this.readdirCache.set(job.absolute, entries)
    job.readdir = entries
    this[PROCESS]()
  }

  // Walk the queue, advancing each job until the concurrency limit is
  // hit.  Guarded against re-entry: the stat/readdir/entry callbacks all
  // call back into here.
  [PROCESS] () {
    if (this[PROCESSING]) {
      return
    }

    this[PROCESSING] = true
    for (let w = this[QUEUE].head;
      w !== null && this[JOBS] < this.jobs;
      w = w.next) {
      this[PROCESSJOB](w.value)
      if (w.value.ignore) {
        // removeNode clears the node's links, so stash and restore .next
        // to keep this iteration walking forward
        const p = w.next
        this[QUEUE].removeNode(w)
        w.next = p
      }
    }

    this[PROCESSING] = false

    // all input received and all jobs drained: write the tar EOF blocks
    if (this[ENDED] && !this[QUEUE].length && this[JOBS] === 0) {
      if (this.zip) {
        this.zip.end(EOF)
      } else {
        super.write(EOF)
        super.end()
      }
    }
  }

  // the head-of-queue job: the only one whose entry may be piped
  get [CURRENT] () {
    return this[QUEUE] && this[QUEUE].head && this[QUEUE].head.value
  }

  // entry finished: drop the head job and keep processing.
  // NOTE(review): this shifts the head unconditionally; relies on entries
  // completing in queue order, since only the current job is piped.
  [JOBDONE] (job) {
    this[QUEUE].shift()
    this[JOBS] -= 1
    this[PROCESS]()
  }

  // advance one job through its state machine:
  // pending -> stat -> (filter) -> readdir (dirs) -> entry -> pipe
  [PROCESSJOB] (job) {
    if (job.pending) {
      return
    }

    if (job.entry) {
      if (job === this[CURRENT] && !job.piped) {
        this[PIPE](job)
      }
      return
    }

    if (!job.stat) {
      if (this.statCache.has(job.absolute)) {
        this[ONSTAT](job, this.statCache.get(job.absolute))
      } else {
        this[STAT](job)
      }
    }
    // still no stat means the async stat is in flight; wait for callback
    if (!job.stat) {
      return
    }

    // filtered out!
    if (job.ignore) {
      return
    }

    if (!this.noDirRecurse && job.stat.isDirectory() && !job.readdir) {
      if (this.readdirCache.has(job.absolute)) {
        this[ONREADDIR](job, this.readdirCache.get(job.absolute))
      } else {
        this[READDIR](job)
      }
      // async readdir still in flight; wait for callback
      if (!job.readdir) {
        return
      }
    }

    // we know it doesn't have an entry, because that got checked above
    job.entry = this[ENTRY](job)
    if (!job.entry) {
      job.ignore = true
      return
    }

    if (job === this[CURRENT] && !job.piped) {
      this[PIPE](job)
    }
  }

  // options passed through to each WriteEntry constructor
  [ENTRYOPT] (job) {
    return {
      onwarn: (code, msg, data) => this.warn(code, msg, data),
      noPax: this.noPax,
      cwd: this.cwd,
      absolute: job.absolute,
      preservePaths: this.preservePaths,
      maxReadSize: this.maxReadSize,
      strict: this.strict,
      portable: this.portable,
      linkCache: this.linkCache,
      statCache: this.statCache,
      noMtime: this.noMtime,
      mtime: this.mtime,
      prefix: this.prefix,
    }
  }

  // create the WriteEntry for a job; returns undefined (and emits
  // 'error') if construction throws, which makes PROCESSJOB ignore it
  [ENTRY] (job) {
    this[JOBS] += 1
    try {
      return new this[WRITEENTRYCLASS](job.path, this[ENTRYOPT](job))
        .on('end', () => this[JOBDONE](job))
        .on('error', er => this.emit('error', er))
    } catch (er) {
      this.emit('error', er)
    }
  }

  // downstream drained: resume the entry we paused for backpressure
  [ONDRAIN] () {
    if (this[CURRENT] && this[CURRENT].entry) {
      this[CURRENT].entry.resume()
    }
  }

  // like .pipe() but using super, because our write() is special
  [PIPE] (job) {
    job.piped = true

    // a directory's children get queued as new fs entries before its
    // own bytes are forwarded
    if (job.readdir) {
      job.readdir.forEach(entry => {
        const p = job.path
        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })
    }

    const source = job.entry
    const zip = this.zip

    // forward entry bytes downstream; pause the source when the sink
    // reports backpressure (resumed again in ONDRAIN)
    if (zip) {
      source.on('data', chunk => {
        if (!zip.write(chunk)) {
          source.pause()
        }
      })
    } else {
      source.on('data', chunk => {
        if (!super.write(chunk)) {
          source.pause()
        }
      })
    }
  }

  pause () {
    if (this.zip) {
      this.zip.pause()
    }
    return super.pause()
  }
})
385
class PackSync extends Pack {
  // Synchronous variant of Pack: stat, readdir, and entry reads all
  // happen in the current tick, using the sync fs APIs and
  // WriteEntry.Sync.
  constructor (opt) {
    super(opt)
    this[WRITEENTRYCLASS] = WriteEntrySync
  }

  // pause/resume are no-ops in sync streams.
  pause () {}
  resume () {}

  [STAT] (job) {
    const method = this.follow ? 'statSync' : 'lstatSync'
    this[ONSTAT](job, fs[method](job.absolute))
  }

  [READDIR] (job, stat) {
    this[ONREADDIR](job, fs.readdirSync(job.absolute))
  }

  // gotta get it all in this tick
  [PIPE] (job) {
    // queue up a directory's children before draining its own entry
    if (job.readdir) {
      for (const child of job.readdir) {
        const base = job.path === './' ? '' : job.path.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + child)
      }
    }

    // no backpressure handling needed: everything is synchronous, so
    // just forward every chunk to the compressor or the raw stream
    const zip = this.zip
    const sink = zip
      ? chunk => {
        zip.write(chunk)
      }
      : chunk => {
        super[WRITE](chunk)
      }
    job.entry.on('data', sink)
  }
}
429
// expose the synchronous variant as a property of the main class
Pack.Sync = PackSync

module.exports = Pack
433