1'use strict'
2const { Minipass } = require('minipass')
3const EE = require('events').EventEmitter
4const fs = require('fs')
5
// Vectored write, used to push several queued buffers in a single call.
const { writev } = fs

// Private property/method keys. Symbols keep the streams' internal state
// from colliding with anything a consumer sets on the instance.

// Keys used by both the read and the write streams.
const _autoClose = Symbol('_autoClose')
const _close = Symbol('_close')
const _errored = Symbol('_errored')
const _fd = Symbol('_fd')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _open = Symbol('_open')
const _path = Symbol('_path')

// Keys used only by the read streams.
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _onread = Symbol('_onread')
const _read = Symbol('_read')
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')

// Keys used only by the write streams.
const _defaultFlag = Symbol('_defaultFlag')
const _ended = Symbol('_ended')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _flush = Symbol('_flush')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _onwrite = Symbol('_onwrite')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _write = Symbol('_write')
const _writing = Symbol('_writing')
36
// Readable file stream layered on Minipass: opens `path` (unless a numeric
// `opt.fd` is supplied), reads it in chunks with fs.read and feeds every
// chunk to the Minipass write side, ending the stream at EOF or once
// `opt.size` bytes have been read.
//
// Options read here (the whole `opt` object is also handed to Minipass):
//   fd        - already-open file descriptor; skips the fs.open step
//   readSize  - maximum bytes requested per read (default 16 MiB)
//   size      - total number of bytes to read (default Infinity)
//   autoClose - close the fd on end/error (default true)
class ReadStream extends Minipass {
  // Throws TypeError when `path` is not a string, even if `opt.fd` is given.
  constructor (path, opt) {
    opt = opt || {}
    super(opt)

    this.readable = true
    this.writable = false

    if (typeof path !== 'string') {
      throw new TypeError('path must be a string')
    }

    this[_errored] = false
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_path] = path
    this[_readSize] = opt.readSize || 16 * 1024 * 1024
    this[_reading] = false
    this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    // with a caller-supplied fd there is nothing to open: start reading
    if (typeof this[_fd] === 'number') {
      this[_read]()
    } else {
      this[_open]()
    }
  }

  // Current file descriptor, or null before open / after close.
  get fd () {
    return this[_fd]
  }

  // The path given to the constructor.
  get path () {
    return this[_path]
  }

  // The Minipass write side is reserved for internal use; external callers
  // always get a TypeError.
  write () {
    throw new TypeError('this is a readable stream')
  }

  end () {
    throw new TypeError('this is a readable stream')
  }

  // Open the file read-only; the result is handled by _onopen.
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // Record the fd, announce it with an 'open' event, and start reading.
  [_onopen] (er, fd) {
    if (er) {
      this[_onerror](er)
    } else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_read]()
    }
  }

  // Allocate the buffer for the next read, capped by the bytes remaining.
  [_makeBuf] () {
    return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
  }

  // Issue one read unless one is already in flight.
  [_read] () {
    if (!this[_reading]) {
      this[_reading] = true
      const buf = this[_makeBuf]()
      // nothing left to read: report a zero-byte read on the next tick so
      // the end-of-stream handling still runs asynchronously
      /* istanbul ignore if */
      if (buf.length === 0) {
        return process.nextTick(() => this[_onread](null, 0, buf))
      }
      fs.read(this[_fd], buf, 0, buf.length, null, (er, br, b) =>
        this[_onread](er, br, b))
    }
  }

  // Read completion: forward the chunk and keep reading while
  // _handleChunk asks for more.
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er) {
      this[_onerror](er)
    } else if (this[_handleChunk](br, buf)) {
      this[_read]()
    }
  }

  // Close the fd when autoClose is on; emits 'close', or 'error' if the
  // close itself fails.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      // clear the fd first so a second call is a no-op
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }

  [_onerror] (er) {
    // leave the reading flag set so no further read is started
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  // Push `br` bytes of `buf` downstream. Returns the result of the Minipass
  // write (truthy means keep reading), or false once the stream has ended.
  [_handleChunk] (br, buf) {
    let ret = false
    // no effect if infinite
    this[_remain] -= br
    if (br > 0) {
      // subarray() yields the same zero-copy view as the deprecated
      // Buffer#slice()
      ret = super.write(br < buf.length ? buf.subarray(0, br) : buf)
    }

    // a zero-byte read means EOF; otherwise stop once `size` is used up
    if (br === 0 || this[_remain] <= 0) {
      ret = false
      this[_close]()
      super.end()
    }

    return ret
  }

  // Event filter:
  //  - 'prefinish' / 'finish' are swallowed
  //  - 'drain' is not forwarded; it restarts reading while the fd is open
  //  - 'error' is forwarded only the first time
  //  - everything else is forwarded with all of its arguments
  emit (ev, ...args) {
    switch (ev) {
      case 'prefinish':
      case 'finish':
        break

      case 'drain':
        if (typeof this[_fd] === 'number') {
          this[_read]()
        }
        break

      case 'error':
        if (this[_errored]) {
          return
        }
        this[_errored] = true
        return super.emit(ev, ...args)

      default:
        return super.emit(ev, ...args)
    }
  }
}
177
// Synchronous flavour of ReadStream: the same chunk handling, driven by
// fs.openSync / fs.readSync / fs.closeSync instead of callbacks.
class ReadStreamSync extends ReadStream {
  // Open the file and hand the fd to _onopen. If anything in that chain
  // throws, attempt to close before the error propagates.
  [_open] () {
    let succeeded = false
    try {
      this[_onopen](null, fs.openSync(this[_path], 'r'))
      succeeded = true
    } finally {
      if (!succeeded) {
        this[_close]()
      }
    }
  }

  // Read chunk after chunk until _handleChunk reports that no more data
  // should be pushed. A thrown error triggers a close attempt and then
  // propagates to the caller unchanged.
  [_read] () {
    let succeeded = false
    try {
      if (!this[_reading]) {
        this[_reading] = true
        let wantMore = true
        while (wantMore) {
          const chunk = this[_makeBuf]()
          // an empty buffer means nothing remains: treat as a 0-byte read
          /* istanbul ignore next */
          const bytesRead = chunk.length === 0 ? 0
            : fs.readSync(this[_fd], chunk, 0, chunk.length, null)
          wantMore = this[_handleChunk](bytesRead, chunk)
        }
        this[_reading] = false
      }
      succeeded = true
    } finally {
      if (!succeeded) {
        this[_close]()
      }
    }
  }

  // Close the fd synchronously (only when autoClose is on and an fd is
  // held), then emit 'close'.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const fd = this[_fd]
    this[_fd] = null
    fs.closeSync(fd)
    this.emit('close')
  }
}
224
// Writable file stream: opens `path` (unless a numeric `opt.fd` is given)
// and writes buffers to it one operation at a time, queueing anything that
// arrives while the fd is not yet open or a write is in flight.
//
// Options read here (the whole `opt` object is also handed to EventEmitter):
//   fd        - already-open file descriptor; skips the fs.open step
//   mode      - file mode passed to fs.open (default 0o666)
//   start     - byte offset to write at; when absent, null is passed to
//               fs.write as the position
//   flags     - fs.open flags; default 'w', or 'r+' when `start` is given
//   autoClose - close the fd on finish/error (default true)
class WriteStream extends EE {
  constructor (path, opt) {
    opt = opt || {}
    super(opt)
    this.readable = false
    this.writable = true
    this[_errored] = false
    this[_writing] = false
    this[_ended] = false
    this[_finished] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
    this[_pos] = typeof opt.start === 'number' ? opt.start : null
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    // truncating makes no sense when writing into the middle
    const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
    this[_defaultFlag] = opt.flags === undefined
    this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

    if (this[_fd] === null) {
      this[_open]()
    }
  }

  // Forward events with all of their arguments, but let only the first
  // 'error' through.
  emit (ev, ...args) {
    if (ev === 'error') {
      if (this[_errored]) {
        return
      }
      this[_errored] = true
    }
    return super.emit(ev, ...args)
  }

  // Current file descriptor, or null before open / after close.
  get fd () {
    return this[_fd]
  }

  // The path given to the constructor.
  get path () {
    return this[_path]
  }

  [_onerror] (er) {
    this[_close]()
    // leave the writing flag set so later write() calls only queue
    this[_writing] = true
    this.emit('error', er)
  }

  // Open the file with the current flags/mode; result goes to _onopen.
  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  [_onopen] (er, fd) {
    // the default 'r+' needs an existing file: on ENOENT retry with 'w'
    if (this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT') {
      this[_flags] = 'w'
      this[_open]()
    } else if (er) {
      this[_onerror](er)
    } else {
      this[_fd] = fd
      this.emit('open', fd)
      // write out whatever was queued while the fd was unavailable
      if (!this[_writing]) {
        this[_flush]()
      }
    }
  }

  // Optionally write a final chunk, then mark the stream ended. Returns
  // `this`.
  end (buf, enc) {
    if (buf) {
      this.write(buf, enc)
    }

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    if (!this[_writing] && !this[_queue].length &&
        typeof this[_fd] === 'number') {
      this[_onwrite](null, 0)
    }
    return this
  }

  // Write a chunk (strings are converted with Buffer.from(buf, enc)).
  // Returns true when the chunk was handed straight to the fd, false when
  // it was queued or the stream has already ended (which also emits
  // 'error').
  write (buf, enc) {
    if (typeof buf === 'string') {
      buf = Buffer.from(buf, enc)
    }

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(buf)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](buf)
    return true
  }

  // Issue a single fs.write at the current position.
  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // Write completion: advance the position, then flush more queued data,
  // finish (close + 'finish'), or emit 'drain' as appropriate.
  [_onwrite] (er, bw) {
    if (er) {
      this[_onerror](er)
    } else {
      if (this[_pos] !== null) {
        this[_pos] += bw
      }
      if (this[_queue].length) {
        this[_flush]()
      } else {
        this[_writing] = false

        if (this[_ended] && !this[_finished]) {
          this[_finished] = true
          this[_close]()
          this.emit('finish')
        } else if (this[_needDrain]) {
          this[_needDrain] = false
          this.emit('drain')
        }
      }
    }
  }

  // Write out the queue: one buffer goes through _write, several go through
  // a single writev. An empty queue on an ended stream runs the after-write
  // logic so 'finish' can fire.
  [_flush] () {
    if (this[_queue].length === 0) {
      if (this[_ended]) {
        this[_onwrite](null, 0)
      }
    } else {
      // A flush started from _onopen arrives here with the writing flag
      // clear. Mark the operation as in flight so a write()/end() made
      // before its callback queues behind it instead of starting an
      // overlapping write at the same position or finishing early.
      this[_writing] = true
      if (this[_queue].length === 1) {
        this[_write](this[_queue].pop())
      } else {
        const iovec = this[_queue]
        this[_queue] = []
        writev(this[_fd], iovec, this[_pos],
          (er, bw) => this[_onwrite](er, bw))
      }
    }
  }

  // Close the fd when autoClose is on; emits 'close', or 'error' if the
  // close itself fails.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      // clear the fd first so a second call is a no-op
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }
}
387
// Synchronous flavour of WriteStream: the same queueing and finish logic,
// driven by fs.openSync / fs.writeSync / fs.closeSync.
class WriteStreamSync extends WriteStream {
  // Open the file synchronously and pass the fd to _onopen. With the
  // default 'r+' flag, a missing file (ENOENT) triggers one retry with 'w'.
  [_open] () {
    const canRetry = this[_defaultFlag] && this[_flags] === 'r+'

    // no retry possible: open outside any try{} so a failure surfaces
    // directly from its original frame
    if (!canRetry) {
      this[_onopen](null, fs.openSync(this[_path], this[_flags], this[_mode]))
      return
    }

    let fd
    try {
      fd = fs.openSync(this[_path], this[_flags], this[_mode])
    } catch (er) {
      if (er.code !== 'ENOENT') {
        throw er
      }
      this[_flags] = 'w'
      return this[_open]()
    }

    this[_onopen](null, fd)
  }

  // Close the fd synchronously (only when autoClose is on and an fd is
  // held), then emit 'close'.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const fd = this[_fd]
    this[_fd] = null
    fs.closeSync(fd)
    this.emit('close')
  }

  // Write synchronously and run the after-write logic. On failure the
  // original error propagates; the close attempt is best-effort.
  [_write] (buf) {
    let completed = false
    try {
      const written = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      this[_onwrite](null, written)
      completed = true
    } finally {
      if (!completed) {
        try {
          this[_close]()
        } catch {
          // a failed close must not mask the original error
        }
      }
    }
  }
}
438
// Public API: async and sync variants of the read and write streams.
Object.assign(exports, {
  ReadStream,
  ReadStreamSync,
  WriteStream,
  WriteStreamSync,
})
444