readable.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. // Ported from https://github.com/nodejs/undici/pull/907
  2. 'use strict'
  3. const assert = require('node:assert')
  4. const { Readable } = require('node:stream')
  5. const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
  6. const util = require('../core/util')
  7. const { ReadableStreamFrom } = require('../core/util')
// Private symbol keys under which BodyReadable keeps its per-instance state.
// In-progress consume state object; null until consume() installs one.
const kConsume = Symbol('kConsume')
// True while a 'data' or 'readable' listener is attached.
const kReading = Symbol('kReading')
// Lazily created ReadableStream returned by the `body` getter.
const kBody = Symbol('kBody')
// Abort callback supplied to the constructor; invoked when the stream is destroyed with an error.
const kAbort = Symbol('kAbort')
// Content type passed to the constructor; used as the Blob type by blob().
const kContentType = Symbol('kContentType')
// Content length passed to the constructor, or null when it is not a finite number.
const kContentLength = Symbol('kContentLength')
// Set once a 'data' or 'readable' listener has been attached.
const kUsed = Symbol('kUsed')
// Running total of chunk lengths passed to push().
const kBytesRead = Symbol('kBytesRead')
// No-op function; dump() installs it as an 'error' listener.
const noop = () => {}
  17. /**
  18. * @class
  19. * @extends {Readable}
  20. * @see https://fetch.spec.whatwg.org/#body
  21. */
  22. class BodyReadable extends Readable {
  23. /**
  24. * @param {object} opts
  25. * @param {(this: Readable, size: number) => void} opts.resume
  26. * @param {() => (void | null)} opts.abort
  27. * @param {string} [opts.contentType = '']
  28. * @param {number} [opts.contentLength]
  29. * @param {number} [opts.highWaterMark = 64 * 1024]
  30. */
  31. constructor ({
  32. resume,
  33. abort,
  34. contentType = '',
  35. contentLength,
  36. highWaterMark = 64 * 1024 // Same as nodejs fs streams.
  37. }) {
  38. super({
  39. autoDestroy: true,
  40. read: resume,
  41. highWaterMark
  42. })
  43. this._readableState.dataEmitted = false
  44. this[kAbort] = abort
  45. /**
  46. * @type {Consume | null}
  47. */
  48. this[kConsume] = null
  49. this[kBytesRead] = 0
  50. /**
  51. * @type {ReadableStream|null}
  52. */
  53. this[kBody] = null
  54. this[kUsed] = false
  55. this[kContentType] = contentType
  56. this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null
  57. // Is stream being consumed through Readable API?
  58. // This is an optimization so that we avoid checking
  59. // for 'data' and 'readable' listeners in the hot path
  60. // inside push().
  61. this[kReading] = false
  62. }
  63. /**
  64. * @param {Error|null} err
  65. * @param {(error:(Error|null)) => void} callback
  66. * @returns {void}
  67. */
  68. _destroy (err, callback) {
  69. if (!err && !this._readableState.endEmitted) {
  70. err = new RequestAbortedError()
  71. }
  72. if (err) {
  73. this[kAbort]()
  74. }
  75. // Workaround for Node "bug". If the stream is destroyed in same
  76. // tick as it is created, then a user who is waiting for a
  77. // promise (i.e micro tick) for installing an 'error' listener will
  78. // never get a chance and will always encounter an unhandled exception.
  79. if (!this[kUsed]) {
  80. setImmediate(callback, err)
  81. } else {
  82. callback(err)
  83. }
  84. }
  85. /**
  86. * @param {string} event
  87. * @param {(...args: any[]) => void} listener
  88. * @returns {this}
  89. */
  90. on (event, listener) {
  91. if (event === 'data' || event === 'readable') {
  92. this[kReading] = true
  93. this[kUsed] = true
  94. }
  95. return super.on(event, listener)
  96. }
  97. /**
  98. * @param {string} event
  99. * @param {(...args: any[]) => void} listener
  100. * @returns {this}
  101. */
  102. addListener (event, listener) {
  103. return this.on(event, listener)
  104. }
  105. /**
  106. * @param {string|symbol} event
  107. * @param {(...args: any[]) => void} listener
  108. * @returns {this}
  109. */
  110. off (event, listener) {
  111. const ret = super.off(event, listener)
  112. if (event === 'data' || event === 'readable') {
  113. this[kReading] = (
  114. this.listenerCount('data') > 0 ||
  115. this.listenerCount('readable') > 0
  116. )
  117. }
  118. return ret
  119. }
  120. /**
  121. * @param {string|symbol} event
  122. * @param {(...args: any[]) => void} listener
  123. * @returns {this}
  124. */
  125. removeListener (event, listener) {
  126. return this.off(event, listener)
  127. }
  128. /**
  129. * @param {Buffer|null} chunk
  130. * @returns {boolean}
  131. */
  132. push (chunk) {
  133. this[kBytesRead] += chunk ? chunk.length : 0
  134. if (this[kConsume] && chunk !== null) {
  135. consumePush(this[kConsume], chunk)
  136. return this[kReading] ? super.push(chunk) : true
  137. }
  138. return super.push(chunk)
  139. }
  140. /**
  141. * Consumes and returns the body as a string.
  142. *
  143. * @see https://fetch.spec.whatwg.org/#dom-body-text
  144. * @returns {Promise<string>}
  145. */
  146. text () {
  147. return consume(this, 'text')
  148. }
  149. /**
  150. * Consumes and returns the body as a JavaScript Object.
  151. *
  152. * @see https://fetch.spec.whatwg.org/#dom-body-json
  153. * @returns {Promise<unknown>}
  154. */
  155. json () {
  156. return consume(this, 'json')
  157. }
  158. /**
  159. * Consumes and returns the body as a Blob
  160. *
  161. * @see https://fetch.spec.whatwg.org/#dom-body-blob
  162. * @returns {Promise<Blob>}
  163. */
  164. blob () {
  165. return consume(this, 'blob')
  166. }
  167. /**
  168. * Consumes and returns the body as an Uint8Array.
  169. *
  170. * @see https://fetch.spec.whatwg.org/#dom-body-bytes
  171. * @returns {Promise<Uint8Array>}
  172. */
  173. bytes () {
  174. return consume(this, 'bytes')
  175. }
  176. /**
  177. * Consumes and returns the body as an ArrayBuffer.
  178. *
  179. * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer
  180. * @returns {Promise<ArrayBuffer>}
  181. */
  182. arrayBuffer () {
  183. return consume(this, 'arrayBuffer')
  184. }
  185. /**
  186. * Not implemented
  187. *
  188. * @see https://fetch.spec.whatwg.org/#dom-body-formdata
  189. * @throws {NotSupportedError}
  190. */
  191. async formData () {
  192. // TODO: Implement.
  193. throw new NotSupportedError()
  194. }
  195. /**
  196. * Returns true if the body is not null and the body has been consumed.
  197. * Otherwise, returns false.
  198. *
  199. * @see https://fetch.spec.whatwg.org/#dom-body-bodyused
  200. * @readonly
  201. * @returns {boolean}
  202. */
  203. get bodyUsed () {
  204. return util.isDisturbed(this)
  205. }
  206. /**
  207. * @see https://fetch.spec.whatwg.org/#dom-body-body
  208. * @readonly
  209. * @returns {ReadableStream}
  210. */
  211. get body () {
  212. if (!this[kBody]) {
  213. this[kBody] = ReadableStreamFrom(this)
  214. if (this[kConsume]) {
  215. // TODO: Is this the best way to force a lock?
  216. this[kBody].getReader() // Ensure stream is locked.
  217. assert(this[kBody].locked)
  218. }
  219. }
  220. return this[kBody]
  221. }
  222. /**
  223. * Dumps the response body by reading `limit` number of bytes.
  224. * @param {object} opts
  225. * @param {number} [opts.limit = 131072] Number of bytes to read.
  226. * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump.
  227. * @returns {Promise<null>}
  228. */
  229. async dump (opts) {
  230. const signal = opts?.signal
  231. if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) {
  232. throw new InvalidArgumentError('signal must be an AbortSignal')
  233. }
  234. const limit = opts?.limit && Number.isFinite(opts.limit)
  235. ? opts.limit
  236. : 128 * 1024
  237. signal?.throwIfAborted()
  238. if (this._readableState.closeEmitted) {
  239. return null
  240. }
  241. return await new Promise((resolve, reject) => {
  242. if (
  243. (this[kContentLength] && (this[kContentLength] > limit)) ||
  244. this[kBytesRead] > limit
  245. ) {
  246. this.destroy(new AbortError())
  247. }
  248. if (signal) {
  249. const onAbort = () => {
  250. this.destroy(signal.reason ?? new AbortError())
  251. }
  252. signal.addEventListener('abort', onAbort)
  253. this
  254. .on('close', function () {
  255. signal.removeEventListener('abort', onAbort)
  256. if (signal.aborted) {
  257. reject(signal.reason ?? new AbortError())
  258. } else {
  259. resolve(null)
  260. }
  261. })
  262. } else {
  263. this.on('close', resolve)
  264. }
  265. this
  266. .on('error', noop)
  267. .on('data', () => {
  268. if (this[kBytesRead] > limit) {
  269. this.destroy()
  270. }
  271. })
  272. .resume()
  273. })
  274. }
  275. /**
  276. * @param {BufferEncoding} encoding
  277. * @returns {this}
  278. */
  279. setEncoding (encoding) {
  280. if (Buffer.isEncoding(encoding)) {
  281. this._readableState.encoding = encoding
  282. }
  283. return this
  284. }
  285. }
  286. /**
  287. * @see https://streams.spec.whatwg.org/#readablestream-locked
  288. * @param {BodyReadable} bodyReadable
  289. * @returns {boolean}
  290. */
  291. function isLocked (bodyReadable) {
  292. // Consume is an implicit lock.
  293. return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null
  294. }
  295. /**
  296. * @see https://fetch.spec.whatwg.org/#body-unusable
  297. * @param {BodyReadable} bodyReadable
  298. * @returns {boolean}
  299. */
  300. function isUnusable (bodyReadable) {
  301. return util.isDisturbed(bodyReadable) || isLocked(bodyReadable)
  302. }
  303. /**
  304. * @typedef {object} Consume
  305. * @property {string} type
  306. * @property {BodyReadable} stream
  307. * @property {((value?: any) => void)} resolve
  308. * @property {((err: Error) => void)} reject
  309. * @property {number} length
  310. * @property {Buffer[]} body
  311. */
  312. /**
  313. * @param {BodyReadable} stream
  314. * @param {string} type
  315. * @returns {Promise<any>}
  316. */
  317. function consume (stream, type) {
  318. assert(!stream[kConsume])
  319. return new Promise((resolve, reject) => {
  320. if (isUnusable(stream)) {
  321. const rState = stream._readableState
  322. if (rState.destroyed && rState.closeEmitted === false) {
  323. stream
  324. .on('error', err => {
  325. reject(err)
  326. })
  327. .on('close', () => {
  328. reject(new TypeError('unusable'))
  329. })
  330. } else {
  331. reject(rState.errored ?? new TypeError('unusable'))
  332. }
  333. } else {
  334. queueMicrotask(() => {
  335. stream[kConsume] = {
  336. type,
  337. stream,
  338. resolve,
  339. reject,
  340. length: 0,
  341. body: []
  342. }
  343. stream
  344. .on('error', function (err) {
  345. consumeFinish(this[kConsume], err)
  346. })
  347. .on('close', function () {
  348. if (this[kConsume].body !== null) {
  349. consumeFinish(this[kConsume], new RequestAbortedError())
  350. }
  351. })
  352. consumeStart(stream[kConsume])
  353. })
  354. }
  355. })
  356. }
  357. /**
  358. * @param {Consume} consume
  359. * @returns {void}
  360. */
  361. function consumeStart (consume) {
  362. if (consume.body === null) {
  363. return
  364. }
  365. const { _readableState: state } = consume.stream
  366. if (state.bufferIndex) {
  367. const start = state.bufferIndex
  368. const end = state.buffer.length
  369. for (let n = start; n < end; n++) {
  370. consumePush(consume, state.buffer[n])
  371. }
  372. } else {
  373. for (const chunk of state.buffer) {
  374. consumePush(consume, chunk)
  375. }
  376. }
  377. if (state.endEmitted) {
  378. consumeEnd(this[kConsume], this._readableState.encoding)
  379. } else {
  380. consume.stream.on('end', function () {
  381. consumeEnd(this[kConsume], this._readableState.encoding)
  382. })
  383. }
  384. consume.stream.resume()
  385. while (consume.stream.read() != null) {
  386. // Loop
  387. }
  388. }
  389. /**
  390. * @param {Buffer[]} chunks
  391. * @param {number} length
  392. * @param {BufferEncoding} encoding
  393. * @returns {string}
  394. */
  395. function chunksDecode (chunks, length, encoding) {
  396. if (chunks.length === 0 || length === 0) {
  397. return ''
  398. }
  399. const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length)
  400. const bufferLength = buffer.length
  401. // Skip BOM.
  402. const start =
  403. bufferLength > 2 &&
  404. buffer[0] === 0xef &&
  405. buffer[1] === 0xbb &&
  406. buffer[2] === 0xbf
  407. ? 3
  408. : 0
  409. if (!encoding || encoding === 'utf8' || encoding === 'utf-8') {
  410. return buffer.utf8Slice(start, bufferLength)
  411. } else {
  412. return buffer.subarray(start, bufferLength).toString(encoding)
  413. }
  414. }
  415. /**
  416. * @param {Buffer[]} chunks
  417. * @param {number} length
  418. * @returns {Uint8Array}
  419. */
  420. function chunksConcat (chunks, length) {
  421. if (chunks.length === 0 || length === 0) {
  422. return new Uint8Array(0)
  423. }
  424. if (chunks.length === 1) {
  425. // fast-path
  426. return new Uint8Array(chunks[0])
  427. }
  428. const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer)
  429. let offset = 0
  430. for (let i = 0; i < chunks.length; ++i) {
  431. const chunk = chunks[i]
  432. buffer.set(chunk, offset)
  433. offset += chunk.length
  434. }
  435. return buffer
  436. }
  437. /**
  438. * @param {Consume} consume
  439. * @param {BufferEncoding} encoding
  440. * @returns {void}
  441. */
  442. function consumeEnd (consume, encoding) {
  443. const { type, body, resolve, stream, length } = consume
  444. try {
  445. if (type === 'text') {
  446. resolve(chunksDecode(body, length, encoding))
  447. } else if (type === 'json') {
  448. resolve(JSON.parse(chunksDecode(body, length, encoding)))
  449. } else if (type === 'arrayBuffer') {
  450. resolve(chunksConcat(body, length).buffer)
  451. } else if (type === 'blob') {
  452. resolve(new Blob(body, { type: stream[kContentType] }))
  453. } else if (type === 'bytes') {
  454. resolve(chunksConcat(body, length))
  455. }
  456. consumeFinish(consume)
  457. } catch (err) {
  458. stream.destroy(err)
  459. }
  460. }
  461. /**
  462. * @param {Consume} consume
  463. * @param {Buffer} chunk
  464. * @returns {void}
  465. */
  466. function consumePush (consume, chunk) {
  467. consume.length += chunk.length
  468. consume.body.push(chunk)
  469. }
  470. /**
  471. * @param {Consume} consume
  472. * @param {Error} [err]
  473. * @returns {void}
  474. */
  475. function consumeFinish (consume, err) {
  476. if (consume.body === null) {
  477. return
  478. }
  479. if (err) {
  480. consume.reject(err)
  481. } else {
  482. consume.resolve()
  483. }
  484. // Reset the consume object to allow for garbage collection.
  485. consume.type = null
  486. consume.stream = null
  487. consume.resolve = null
  488. consume.reject = null
  489. consume.length = 0
  490. consume.body = null
  491. }
// Public surface of this module: the BodyReadable class (exported under the
// name `Readable`) and the chunksDecode helper.
module.exports = {
  Readable: BodyReadable,
  chunksDecode
}