write.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. 'use strict'
  2. const events = require('events')
  3. const contentPath = require('./path')
  4. const fs = require('fs/promises')
  5. const moveFile = require('../util/move-file')
  6. const { Minipass } = require('minipass')
  7. const Pipeline = require('minipass-pipeline')
  8. const Flush = require('minipass-flush')
  9. const path = require('path')
  10. const ssri = require('ssri')
  11. const uniqueFilename = require('unique-filename')
  12. const fsm = require('fs-minipass')
  13. module.exports = write
  14. async function write (cache, data, opts = {}) {
  15. const { algorithms, size, integrity } = opts
  16. if (algorithms && algorithms.length > 1) {
  17. throw new Error('opts.algorithms only supports a single algorithm for now')
  18. }
  19. if (typeof size === 'number' && data.length !== size) {
  20. throw sizeError(size, data.length)
  21. }
  22. const sri = ssri.fromData(data, algorithms ? { algorithms } : {})
  23. if (integrity && !ssri.checkData(data, integrity, opts)) {
  24. throw checksumError(integrity, sri)
  25. }
  26. const tmp = await makeTmp(cache, opts)
  27. try {
  28. await fs.writeFile(tmp.target, data, { flag: 'wx' })
  29. await moveToDestination(tmp, cache, sri, opts)
  30. return { integrity: sri, size: data.length }
  31. } finally {
  32. if (!tmp.moved) {
  33. await fs.rm(tmp.target, { recursive: true, force: true })
  34. }
  35. }
  36. }
  37. module.exports.stream = writeStream
  38. // writes proxied to the 'inputStream' that is passed to the Promise
  39. // 'end' is deferred until content is handled.
  40. class CacacheWriteStream extends Flush {
  41. constructor (cache, opts) {
  42. super()
  43. this.opts = opts
  44. this.cache = cache
  45. this.inputStream = new Minipass()
  46. this.inputStream.on('error', er => this.emit('error', er))
  47. this.inputStream.on('drain', () => this.emit('drain'))
  48. this.handleContentP = null
  49. }
  50. write (chunk, encoding, cb) {
  51. if (!this.handleContentP) {
  52. this.handleContentP = handleContent(
  53. this.inputStream,
  54. this.cache,
  55. this.opts
  56. )
  57. }
  58. return this.inputStream.write(chunk, encoding, cb)
  59. }
  60. flush (cb) {
  61. this.inputStream.end(() => {
  62. if (!this.handleContentP) {
  63. const e = new Error('Cache input stream was empty')
  64. e.code = 'ENODATA'
  65. // empty streams are probably emitting end right away.
  66. // defer this one tick by rejecting a promise on it.
  67. return Promise.reject(e).catch(cb)
  68. }
  69. // eslint-disable-next-line promise/catch-or-return
  70. this.handleContentP.then(
  71. (res) => {
  72. res.integrity && this.emit('integrity', res.integrity)
  73. // eslint-disable-next-line promise/always-return
  74. res.size !== null && this.emit('size', res.size)
  75. cb()
  76. },
  77. (er) => cb(er)
  78. )
  79. })
  80. }
  81. }
  82. function writeStream (cache, opts = {}) {
  83. return new CacacheWriteStream(cache, opts)
  84. }
  85. async function handleContent (inputStream, cache, opts) {
  86. const tmp = await makeTmp(cache, opts)
  87. try {
  88. const res = await pipeToTmp(inputStream, cache, tmp.target, opts)
  89. await moveToDestination(
  90. tmp,
  91. cache,
  92. res.integrity,
  93. opts
  94. )
  95. return res
  96. } finally {
  97. if (!tmp.moved) {
  98. await fs.rm(tmp.target, { recursive: true, force: true })
  99. }
  100. }
  101. }
  102. async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
  103. const outStream = new fsm.WriteStream(tmpTarget, {
  104. flags: 'wx',
  105. })
  106. if (opts.integrityEmitter) {
  107. // we need to create these all simultaneously since they can fire in any order
  108. const [integrity, size] = await Promise.all([
  109. events.once(opts.integrityEmitter, 'integrity').then(res => res[0]),
  110. events.once(opts.integrityEmitter, 'size').then(res => res[0]),
  111. new Pipeline(inputStream, outStream).promise(),
  112. ])
  113. return { integrity, size }
  114. }
  115. let integrity
  116. let size
  117. const hashStream = ssri.integrityStream({
  118. integrity: opts.integrity,
  119. algorithms: opts.algorithms,
  120. size: opts.size,
  121. })
  122. hashStream.on('integrity', i => {
  123. integrity = i
  124. })
  125. hashStream.on('size', s => {
  126. size = s
  127. })
  128. const pipeline = new Pipeline(inputStream, hashStream, outStream)
  129. await pipeline.promise()
  130. return { integrity, size }
  131. }
  132. async function makeTmp (cache, opts) {
  133. const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
  134. await fs.mkdir(path.dirname(tmpTarget), { recursive: true })
  135. return {
  136. target: tmpTarget,
  137. moved: false,
  138. }
  139. }
  140. async function moveToDestination (tmp, cache, sri, opts) {
  141. const destination = contentPath(cache, sri)
  142. const destDir = path.dirname(destination)
  143. await fs.mkdir(destDir, { recursive: true })
  144. await moveFile(tmp.target, destination)
  145. tmp.moved = true
  146. }
  147. function sizeError (expected, found) {
  148. /* eslint-disable-next-line max-len */
  149. const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
  150. err.expected = expected
  151. err.found = found
  152. err.code = 'EBADSIZE'
  153. return err
  154. }
  155. function checksumError (expected, found) {
  156. const err = new Error(`Integrity check failed:
  157. Wanted: ${expected}
  158. Found: ${found}`)
  159. err.code = 'EINTEGRITY'
  160. err.expected = expected
  161. err.found = found
  162. return err
  163. }