1'use strict'
2
3const events = require('events')
4
5const contentPath = require('./path')
6const fs = require('fs/promises')
7const { moveFile } = require('@npmcli/fs')
8const { Minipass } = require('minipass')
9const Pipeline = require('minipass-pipeline')
10const Flush = require('minipass-flush')
11const path = require('path')
12const ssri = require('ssri')
13const uniqueFilename = require('unique-filename')
14const fsm = require('fs-minipass')
15
16module.exports = write
17
18// Cache of move operations in process so we don't duplicate
19const moveOperations = new Map()
20
21async function write (cache, data, opts = {}) {
22  const { algorithms, size, integrity } = opts
23
24  if (typeof size === 'number' && data.length !== size) {
25    throw sizeError(size, data.length)
26  }
27
28  const sri = ssri.fromData(data, algorithms ? { algorithms } : {})
29  if (integrity && !ssri.checkData(data, integrity, opts)) {
30    throw checksumError(integrity, sri)
31  }
32
33  for (const algo in sri) {
34    const tmp = await makeTmp(cache, opts)
35    const hash = sri[algo].toString()
36    try {
37      await fs.writeFile(tmp.target, data, { flag: 'wx' })
38      await moveToDestination(tmp, cache, hash, opts)
39    } finally {
40      if (!tmp.moved) {
41        await fs.rm(tmp.target, { recursive: true, force: true })
42      }
43    }
44  }
45  return { integrity: sri, size: data.length }
46}
47
48module.exports.stream = writeStream
49
50// writes proxied to the 'inputStream' that is passed to the Promise
51// 'end' is deferred until content is handled.
52class CacacheWriteStream extends Flush {
53  constructor (cache, opts) {
54    super()
55    this.opts = opts
56    this.cache = cache
57    this.inputStream = new Minipass()
58    this.inputStream.on('error', er => this.emit('error', er))
59    this.inputStream.on('drain', () => this.emit('drain'))
60    this.handleContentP = null
61  }
62
63  write (chunk, encoding, cb) {
64    if (!this.handleContentP) {
65      this.handleContentP = handleContent(
66        this.inputStream,
67        this.cache,
68        this.opts
69      )
70      this.handleContentP.catch(error => this.emit('error', error))
71    }
72    return this.inputStream.write(chunk, encoding, cb)
73  }
74
75  flush (cb) {
76    this.inputStream.end(() => {
77      if (!this.handleContentP) {
78        const e = new Error('Cache input stream was empty')
79        e.code = 'ENODATA'
80        // empty streams are probably emitting end right away.
81        // defer this one tick by rejecting a promise on it.
82        return Promise.reject(e).catch(cb)
83      }
84      // eslint-disable-next-line promise/catch-or-return
85      this.handleContentP.then(
86        (res) => {
87          res.integrity && this.emit('integrity', res.integrity)
88          // eslint-disable-next-line promise/always-return
89          res.size !== null && this.emit('size', res.size)
90          cb()
91        },
92        (er) => cb(er)
93      )
94    })
95  }
96}
97
98function writeStream (cache, opts = {}) {
99  return new CacacheWriteStream(cache, opts)
100}
101
102async function handleContent (inputStream, cache, opts) {
103  const tmp = await makeTmp(cache, opts)
104  try {
105    const res = await pipeToTmp(inputStream, cache, tmp.target, opts)
106    await moveToDestination(
107      tmp,
108      cache,
109      res.integrity,
110      opts
111    )
112    return res
113  } finally {
114    if (!tmp.moved) {
115      await fs.rm(tmp.target, { recursive: true, force: true })
116    }
117  }
118}
119
120async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
121  const outStream = new fsm.WriteStream(tmpTarget, {
122    flags: 'wx',
123  })
124
125  if (opts.integrityEmitter) {
126    // we need to create these all simultaneously since they can fire in any order
127    const [integrity, size] = await Promise.all([
128      events.once(opts.integrityEmitter, 'integrity').then(res => res[0]),
129      events.once(opts.integrityEmitter, 'size').then(res => res[0]),
130      new Pipeline(inputStream, outStream).promise(),
131    ])
132    return { integrity, size }
133  }
134
135  let integrity
136  let size
137  const hashStream = ssri.integrityStream({
138    integrity: opts.integrity,
139    algorithms: opts.algorithms,
140    size: opts.size,
141  })
142  hashStream.on('integrity', i => {
143    integrity = i
144  })
145  hashStream.on('size', s => {
146    size = s
147  })
148
149  const pipeline = new Pipeline(inputStream, hashStream, outStream)
150  await pipeline.promise()
151  return { integrity, size }
152}
153
154async function makeTmp (cache, opts) {
155  const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
156  await fs.mkdir(path.dirname(tmpTarget), { recursive: true })
157  return {
158    target: tmpTarget,
159    moved: false,
160  }
161}
162
163async function moveToDestination (tmp, cache, sri, opts) {
164  const destination = contentPath(cache, sri)
165  const destDir = path.dirname(destination)
166  if (moveOperations.has(destination)) {
167    return moveOperations.get(destination)
168  }
169  moveOperations.set(
170    destination,
171    fs.mkdir(destDir, { recursive: true })
172      .then(async () => {
173        await moveFile(tmp.target, destination, { overwrite: false })
174        tmp.moved = true
175        return tmp.moved
176      })
177      .catch(err => {
178        if (!err.message.startsWith('The destination file exists')) {
179          throw Object.assign(err, { code: 'EEXIST' })
180        }
181      }).finally(() => {
182        moveOperations.delete(destination)
183      })
184
185  )
186  return moveOperations.get(destination)
187}
188
189function sizeError (expected, found) {
190  /* eslint-disable-next-line max-len */
191  const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
192  err.expected = expected
193  err.found = found
194  err.code = 'EBADSIZE'
195  return err
196}
197
198function checksumError (expected, found) {
199  const err = new Error(`Integrity check failed:
200  Wanted: ${expected}
201   Found: ${found}`)
202  err.code = 'EINTEGRITY'
203  err.expected = expected
204  err.found = found
205  return err
206}
207