api-pipeline.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. 'use strict'
  2. const {
  3. Readable,
  4. Duplex,
  5. PassThrough
  6. } = require('node:stream')
  7. const assert = require('node:assert')
  8. const { AsyncResource } = require('node:async_hooks')
  9. const {
  10. InvalidArgumentError,
  11. InvalidReturnValueError,
  12. RequestAbortedError
  13. } = require('../core/errors')
  14. const util = require('../core/util')
  15. const { addSignal, removeSignal } = require('./abort-signal')
  16. function noop () {}
  17. const kResume = Symbol('resume')
  18. class PipelineRequest extends Readable {
  19. constructor () {
  20. super({ autoDestroy: true })
  21. this[kResume] = null
  22. }
  23. _read () {
  24. const { [kResume]: resume } = this
  25. if (resume) {
  26. this[kResume] = null
  27. resume()
  28. }
  29. }
  30. _destroy (err, callback) {
  31. this._read()
  32. callback(err)
  33. }
  34. }
  35. class PipelineResponse extends Readable {
  36. constructor (resume) {
  37. super({ autoDestroy: true })
  38. this[kResume] = resume
  39. }
  40. _read () {
  41. this[kResume]()
  42. }
  43. _destroy (err, callback) {
  44. if (!err && !this._readableState.endEmitted) {
  45. err = new RequestAbortedError()
  46. }
  47. callback(err)
  48. }
  49. }
  50. class PipelineHandler extends AsyncResource {
  51. constructor (opts, handler) {
  52. if (!opts || typeof opts !== 'object') {
  53. throw new InvalidArgumentError('invalid opts')
  54. }
  55. if (typeof handler !== 'function') {
  56. throw new InvalidArgumentError('invalid handler')
  57. }
  58. const { signal, method, opaque, onInfo, responseHeaders } = opts
  59. if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
  60. throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
  61. }
  62. if (method === 'CONNECT') {
  63. throw new InvalidArgumentError('invalid method')
  64. }
  65. if (onInfo && typeof onInfo !== 'function') {
  66. throw new InvalidArgumentError('invalid onInfo callback')
  67. }
  68. super('UNDICI_PIPELINE')
  69. this.opaque = opaque || null
  70. this.responseHeaders = responseHeaders || null
  71. this.handler = handler
  72. this.abort = null
  73. this.context = null
  74. this.onInfo = onInfo || null
  75. this.req = new PipelineRequest().on('error', noop)
  76. this.ret = new Duplex({
  77. readableObjectMode: opts.objectMode,
  78. autoDestroy: true,
  79. read: () => {
  80. const { body } = this
  81. if (body?.resume) {
  82. body.resume()
  83. }
  84. },
  85. write: (chunk, encoding, callback) => {
  86. const { req } = this
  87. if (req.push(chunk, encoding) || req._readableState.destroyed) {
  88. callback()
  89. } else {
  90. req[kResume] = callback
  91. }
  92. },
  93. destroy: (err, callback) => {
  94. const { body, req, res, ret, abort } = this
  95. if (!err && !ret._readableState.endEmitted) {
  96. err = new RequestAbortedError()
  97. }
  98. if (abort && err) {
  99. abort()
  100. }
  101. util.destroy(body, err)
  102. util.destroy(req, err)
  103. util.destroy(res, err)
  104. removeSignal(this)
  105. callback(err)
  106. }
  107. }).on('prefinish', () => {
  108. const { req } = this
  109. // Node < 15 does not call _final in same tick.
  110. req.push(null)
  111. })
  112. this.res = null
  113. addSignal(this, signal)
  114. }
  115. onConnect (abort, context) {
  116. const { res } = this
  117. if (this.reason) {
  118. abort(this.reason)
  119. return
  120. }
  121. assert(!res, 'pipeline cannot be retried')
  122. this.abort = abort
  123. this.context = context
  124. }
  125. onHeaders (statusCode, rawHeaders, resume) {
  126. const { opaque, handler, context } = this
  127. if (statusCode < 200) {
  128. if (this.onInfo) {
  129. const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  130. this.onInfo({ statusCode, headers })
  131. }
  132. return
  133. }
  134. this.res = new PipelineResponse(resume)
  135. let body
  136. try {
  137. this.handler = null
  138. const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  139. body = this.runInAsyncScope(handler, null, {
  140. statusCode,
  141. headers,
  142. opaque,
  143. body: this.res,
  144. context
  145. })
  146. } catch (err) {
  147. this.res.on('error', noop)
  148. throw err
  149. }
  150. if (!body || typeof body.on !== 'function') {
  151. throw new InvalidReturnValueError('expected Readable')
  152. }
  153. body
  154. .on('data', (chunk) => {
  155. const { ret, body } = this
  156. if (!ret.push(chunk) && body.pause) {
  157. body.pause()
  158. }
  159. })
  160. .on('error', (err) => {
  161. const { ret } = this
  162. util.destroy(ret, err)
  163. })
  164. .on('end', () => {
  165. const { ret } = this
  166. ret.push(null)
  167. })
  168. .on('close', () => {
  169. const { ret } = this
  170. if (!ret._readableState.ended) {
  171. util.destroy(ret, new RequestAbortedError())
  172. }
  173. })
  174. this.body = body
  175. }
  176. onData (chunk) {
  177. const { res } = this
  178. return res.push(chunk)
  179. }
  180. onComplete (trailers) {
  181. const { res } = this
  182. res.push(null)
  183. }
  184. onError (err) {
  185. const { ret } = this
  186. this.handler = null
  187. util.destroy(ret, err)
  188. }
  189. }
  190. function pipeline (opts, handler) {
  191. try {
  192. const pipelineHandler = new PipelineHandler(opts, handler)
  193. this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
  194. return pipelineHandler.ret
  195. } catch (err) {
  196. return new PassThrough().destroy(err)
  197. }
  198. }
  199. module.exports = pipeline