// Ported from https://github.com/nodejs/undici/pull/907
'use strict'

const assert = require('node:assert')
const { Readable } = require('node:stream')
const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
const util = require('../core/util')
const { ReadableStreamFrom } = require('../core/util')
// Private state keys for BodyReadable instances. Symbols keep this state
// off the public property surface of the stream.
const kConsume = Symbol('kConsume') // active Consume record, or null
const kReading = Symbol('kReading') // true while 'data'/'readable' listeners are attached
const kBody = Symbol('kBody') // lazily created web ReadableStream, or null
const kAbort = Symbol('kAbort') // abort callback supplied to the constructor
const kContentType = Symbol('kContentType') // content type used when building a Blob
const kContentLength = Symbol('kContentLength') // finite content length, or null
const kUsed = Symbol('kUsed') // set once a 'data'/'readable' listener was added
const kBytesRead = Symbol('kBytesRead') // running total of bytes passed to push()

// Shared do-nothing callback (used as a placeholder 'error' listener).
const noop = () => {}
/**
 * Node.js Readable that additionally exposes the fetch `Body` mixin
 * helpers (`text()`, `json()`, `blob()`, `bytes()`, `arrayBuffer()`,
 * `body`, `bodyUsed`) plus a `dump()` helper for discarding the payload.
 *
 * @class
 * @extends {Readable}
 * @see https://fetch.spec.whatwg.org/#body
 */
class BodyReadable extends Readable {
  /**
   * @param {object} opts
   * @param {(this: Readable, size: number) => void} opts.resume Used as the stream's `read` implementation.
   * @param {() => (void | null)} opts.abort Invoked from `_destroy` when the stream is destroyed with an error.
   * @param {string} [opts.contentType = '']
   * @param {number} [opts.contentLength] Non-finite values are stored as `null`.
   * @param {number} [opts.highWaterMark = 64 * 1024]
   */
  constructor ({
    resume,
    abort,
    contentType = '',
    contentLength,
    highWaterMark = 64 * 1024 // Same as nodejs fs streams.
  }) {
    super({
      autoDestroy: true,
      read: resume,
      highWaterMark
    })

    this._readableState.dataEmitted = false

    this[kAbort] = abort

    /**
     * @type {Consume | null}
     */
    this[kConsume] = null
    this[kBytesRead] = 0
    /**
     * @type {ReadableStream|null}
     */
    this[kBody] = null
    this[kUsed] = false
    this[kContentType] = contentType
    this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null

    // Is stream being consumed through Readable API?
    // This is an optimization so that we avoid checking
    // for 'data' and 'readable' listeners in the hot path
    // inside push().
    this[kReading] = false
  }

  /**
   * Destroy hook. A destroy without an error before 'end' was emitted is
   * turned into a RequestAbortedError, and any error triggers the abort
   * callback.
   *
   * @param {Error|null} err
   * @param {(error:(Error|null)) => void} callback
   * @returns {void}
   */
  _destroy (err, callback) {
    if (!err && !this._readableState.endEmitted) {
      err = new RequestAbortedError()
    }

    if (err) {
      this[kAbort]()
    }

    // Workaround for Node "bug". If the stream is destroyed in same
    // tick as it is created, then a user who is waiting for a
    // promise (i.e micro tick) for installing an 'error' listener will
    // never get a chance and will always encounter an unhandled exception.
    if (!this[kUsed]) {
      setImmediate(callback, err)
    } else {
      callback(err)
    }
  }

  /**
   * Registers a listener; 'data' and 'readable' listeners additionally mark
   * the stream as being read through the Readable API.
   *
   * @param {string} event
   * @param {(...args: any[]) => void} listener
   * @returns {this}
   */
  on (event, listener) {
    if (event === 'data' || event === 'readable') {
      this[kReading] = true
      this[kUsed] = true
    }
    return super.on(event, listener)
  }

  /**
   * Alias of {@link BodyReadable#on}.
   *
   * @param {string} event
   * @param {(...args: any[]) => void} listener
   * @returns {this}
   */
  addListener (event, listener) {
    return this.on(event, listener)
  }

  /**
   * Removes a listener and recomputes the "reading" flag from the
   * remaining 'data'/'readable' listeners.
   *
   * @param {string|symbol} event
   * @param {(...args: any[]) => void} listener
   * @returns {this}
   */
  off (event, listener) {
    const ret = super.off(event, listener)
    if (event === 'data' || event === 'readable') {
      this[kReading] = (
        this.listenerCount('data') > 0 ||
        this.listenerCount('readable') > 0
      )
    }
    return ret
  }

  /**
   * Alias of {@link BodyReadable#off}.
   *
   * @param {string|symbol} event
   * @param {(...args: any[]) => void} listener
   * @returns {this}
   */
  removeListener (event, listener) {
    return this.off(event, listener)
  }

  /**
   * Accounts for the chunk's length and, while a body-mixin consumer is
   * active, hands the chunk to it directly. In that mode the chunk is only
   * forwarded to the Readable buffer when stream listeners are attached.
   *
   * @param {Buffer|null} chunk
   * @returns {boolean}
   */
  push (chunk) {
    this[kBytesRead] += chunk ? chunk.length : 0

    if (this[kConsume] && chunk !== null) {
      consumePush(this[kConsume], chunk)
      return this[kReading] ? super.push(chunk) : true
    }
    return super.push(chunk)
  }

  /**
   * Consumes and returns the body as a string.
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-text
   * @returns {Promise<string>}
   */
  text () {
    return consume(this, 'text')
  }

  /**
   * Consumes and returns the body as a JavaScript Object.
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-json
   * @returns {Promise<unknown>}
   */
  json () {
    return consume(this, 'json')
  }

  /**
   * Consumes and returns the body as a Blob
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-blob
   * @returns {Promise<Blob>}
   */
  blob () {
    return consume(this, 'blob')
  }

  /**
   * Consumes and returns the body as an Uint8Array.
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-bytes
   * @returns {Promise<Uint8Array>}
   */
  bytes () {
    return consume(this, 'bytes')
  }

  /**
   * Consumes and returns the body as an ArrayBuffer.
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer
   * @returns {Promise<ArrayBuffer>}
   */
  arrayBuffer () {
    return consume(this, 'arrayBuffer')
  }

  /**
   * Not implemented
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-formdata
   * @throws {NotSupportedError}
   */
  async formData () {
    // TODO: Implement.
    throw new NotSupportedError()
  }

  /**
   * Returns true if the body is not null and the body has been consumed.
   * Otherwise, returns false.
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-bodyused
   * @readonly
   * @returns {boolean}
   */
  get bodyUsed () {
    return util.isDisturbed(this)
  }

  /**
   * Lazily wraps this stream in a web ReadableStream. When a body-mixin
   * consumer is already active the wrapper is locked immediately.
   *
   * @see https://fetch.spec.whatwg.org/#dom-body-body
   * @readonly
   * @returns {ReadableStream}
   */
  get body () {
    if (!this[kBody]) {
      this[kBody] = ReadableStreamFrom(this)
      if (this[kConsume]) {
        // TODO: Is this the best way to force a lock?
        this[kBody].getReader() // Ensure stream is locked.
        assert(this[kBody].locked)
      }
    }
    return this[kBody]
  }

  /**
   * Dumps the response body by reading `limit` number of bytes.
   * @param {object} opts
   * @param {number} [opts.limit = 131072] Number of bytes to read.
   * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump.
   * @returns {Promise<null>}
   */
  async dump (opts) {
    const signal = opts?.signal

    if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) {
      throw new InvalidArgumentError('signal must be an AbortSignal')
    }

    // A falsy or non-finite limit falls back to the 128 KiB default.
    const limit = opts?.limit && Number.isFinite(opts.limit)
      ? opts.limit
      : 128 * 1024

    signal?.throwIfAborted()

    if (this._readableState.closeEmitted) {
      return null
    }

    return await new Promise((resolve, reject) => {
      // The body is already known to exceed the limit: tear the stream
      // down instead of reading it.
      if (
        (this[kContentLength] && (this[kContentLength] > limit)) ||
        this[kBytesRead] > limit
      ) {
        this.destroy(new AbortError())
      }

      if (signal) {
        const onAbort = () => {
          this.destroy(signal.reason ?? new AbortError())
        }
        signal.addEventListener('abort', onAbort)
        this
          .on('close', function () {
            signal.removeEventListener('abort', onAbort)
            if (signal.aborted) {
              reject(signal.reason ?? new AbortError())
            } else {
              resolve(null)
            }
          })
      } else {
        this.on('close', resolve)
      }

      this
        // Errors raised while dumping are intentionally ignored; the
        // promise settles from the 'close' handler above.
        .on('error', noop)
        .on('data', () => {
          if (this[kBytesRead] > limit) {
            this.destroy()
          }
        })
        .resume()
    })
  }

  /**
   * Records the encoding on the readable state when it is a valid Buffer
   * encoding; other values are ignored.
   *
   * @param {BufferEncoding} encoding
   * @returns {this}
   */
  setEncoding (encoding) {
    if (Buffer.isEncoding(encoding)) {
      this._readableState.encoding = encoding
    }
    return this
  }
}
/**
 * Reports whether the body is locked: either its web ReadableStream
 * wrapper is locked, or a body-mixin consumer has been attached.
 *
 * @see https://streams.spec.whatwg.org/#readablestream-locked
 * @param {BodyReadable} bodyReadable
 * @returns {boolean}
 */
function isLocked (bodyReadable) {
  // Consume is an implicit lock.
  return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null
}
/**
 * A body is unusable once it has been disturbed or is locked.
 *
 * @see https://fetch.spec.whatwg.org/#body-unusable
 * @param {BodyReadable} bodyReadable
 * @returns {boolean}
 */
function isUnusable (bodyReadable) {
  return util.isDisturbed(bodyReadable) || isLocked(bodyReadable)
}
/**
 * Bookkeeping record for one in-flight body-mixin read.
 *
 * @typedef {object} Consume
 * @property {string} type
 * @property {BodyReadable} stream
 * @property {((value?: any) => void)} resolve
 * @property {((err: Error) => void)} reject
 * @property {number} length
 * @property {Buffer[]} body
 */

/**
 * Starts reading the whole body of `stream` as `type`.
 *
 * An unusable stream yields a rejected promise: with the stream's error
 * when it has one, otherwise with `TypeError('unusable')`. If the stream is
 * destroyed but has not emitted 'close' yet, the rejection is deferred to
 * its 'error'/'close' events.
 *
 * @param {BodyReadable} stream
 * @param {string} type
 * @returns {Promise<any>}
 */
function consume (stream, type) {
  assert(!stream[kConsume])

  return new Promise((resolve, reject) => {
    if (isUnusable(stream)) {
      const rState = stream._readableState
      if (rState.destroyed && rState.closeEmitted === false) {
        stream
          .on('error', err => {
            reject(err)
          })
          .on('close', () => {
            reject(new TypeError('unusable'))
          })
      } else {
        reject(rState.errored ?? new TypeError('unusable'))
      }
    } else {
      // The Consume record is installed on the next microtask, not
      // synchronously inside the caller's stack.
      queueMicrotask(() => {
        stream[kConsume] = {
          type,
          stream,
          resolve,
          reject,
          length: 0,
          body: []
        }

        stream
          .on('error', function (err) {
            consumeFinish(this[kConsume], err)
          })
          .on('close', function () {
            // A non-null body means the read never finished: the stream
            // closed before the payload was complete.
            if (this[kConsume].body !== null) {
              consumeFinish(this[kConsume], new RequestAbortedError())
            }
          })

        consumeStart(stream[kConsume])
      })
    }
  })
}
/**
 * Seeds a Consume record with whatever the stream already has buffered,
 * arranges for it to be finalized on 'end', and then drains the stream.
 *
 * @param {Consume} consume
 * @returns {void}
 */
function consumeStart (consume) {
  // A null body means the record was already finished.
  if (consume.body === null) {
    return
  }

  // Hold the stream locally: finalizing the record clears `consume.stream`.
  const stream = consume.stream
  const { _readableState: state } = stream

  if (state.bufferIndex) {
    // Entries before `bufferIndex` are skipped.
    const start = state.bufferIndex
    const end = state.buffer.length
    for (let n = start; n < end; n++) {
      consumePush(consume, state.buffer[n])
    }
  } else {
    for (const chunk of state.buffer) {
      consumePush(consume, chunk)
    }
  }

  if (state.endEmitted) {
    // This is a plain function call, so `this` is undefined here; use the
    // record and state that are already in hand.
    consumeEnd(consume, state.encoding)
  } else {
    stream.on('end', function () {
      consumeEnd(this[kConsume], this._readableState.encoding)
    })
  }

  stream.resume()

  while (stream.read() != null) {
    // Loop
  }
}
/**
 * Joins the chunks and decodes them to a string, dropping a leading
 * UTF-8 byte order mark (EF BB BF) when present.
 *
 * @param {Buffer[]} chunks
 * @param {number} length Total byte length of `chunks`.
 * @param {BufferEncoding} encoding Falsy values are treated as UTF-8.
 * @returns {string}
 */
function chunksDecode (chunks, length, encoding) {
  if (chunks.length === 0 || length === 0) {
    return ''
  }

  // A single chunk is used as-is to avoid a copy.
  const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length)
  const bufferLength = buffer.length

  // Skip BOM.
  const start =
    bufferLength > 2 &&
    buffer[0] === 0xef &&
    buffer[1] === 0xbb &&
    buffer[2] === 0xbf
      ? 3
      : 0

  if (!encoding || encoding === 'utf8' || encoding === 'utf-8') {
    return buffer.utf8Slice(start, bufferLength)
  } else {
    return buffer.subarray(start, bufferLength).toString(encoding)
  }
}
/**
 * Copies all chunks into one freshly allocated Uint8Array.
 *
 * @param {Buffer[]} chunks
 * @param {number} length Total byte length of `chunks`.
 * @returns {Uint8Array}
 */
function chunksConcat (chunks, length) {
  if (chunks.length === 0 || length === 0) {
    return new Uint8Array(0)
  }
  if (chunks.length === 1) {
    // fast-path
    return new Uint8Array(chunks[0])
  }

  // View over the ArrayBuffer that backs a new, unpooled allocation of
  // `length` bytes.
  const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer)

  let offset = 0
  for (let i = 0; i < chunks.length; ++i) {
    const chunk = chunks[i]
    buffer.set(chunk, offset)
    offset += chunk.length
  }

  return buffer
}
/**
 * Converts the collected chunks into the requested representation,
 * resolves the pending promise with it and finalizes the record. If the
 * conversion throws (for instance invalid JSON), the stream is destroyed
 * with that error instead.
 *
 * @param {Consume} consume
 * @param {BufferEncoding} encoding
 * @returns {void}
 */
function consumeEnd (consume, encoding) {
  const { type, body, resolve, stream, length } = consume

  try {
    if (type === 'text') {
      resolve(chunksDecode(body, length, encoding))
    } else if (type === 'json') {
      resolve(JSON.parse(chunksDecode(body, length, encoding)))
    } else if (type === 'arrayBuffer') {
      resolve(chunksConcat(body, length).buffer)
    } else if (type === 'blob') {
      resolve(new Blob(body, { type: stream[kContentType] }))
    } else if (type === 'bytes') {
      resolve(chunksConcat(body, length))
    }

    consumeFinish(consume)
  } catch (err) {
    stream.destroy(err)
  }
}
/**
 * Appends a chunk to the Consume record and updates its byte count.
 *
 * @param {Consume} consume
 * @param {Buffer} chunk
 * @returns {void}
 */
function consumePush (consume, chunk) {
  consume.length += chunk.length
  consume.body.push(chunk)
}
/**
 * Settles the pending promise (rejecting with `err` when given, otherwise
 * resolving) and clears the record. A record whose body is already null
 * has been finished before and is left untouched.
 *
 * @param {Consume} consume
 * @param {Error} [err]
 * @returns {void}
 */
function consumeFinish (consume, err) {
  if (consume.body === null) {
    return
  }

  if (err) {
    consume.reject(err)
  } else {
    consume.resolve()
  }

  // Reset the consume object to allow for garbage collection.
  consume.type = null
  consume.stream = null
  consume.resolve = null
  consume.reject = null
  consume.length = 0
  consume.body = null
}
- module.exports = {
- Readable: BodyReadable,
- chunksDecode
- }
|