client-h2.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { pipeline } = require('node:stream')
  4. const util = require('../core/util.js')
  5. const {
  6. RequestContentLengthMismatchError,
  7. RequestAbortedError,
  8. SocketError,
  9. InformationalError
  10. } = require('../core/errors.js')
  11. const {
  12. kUrl,
  13. kReset,
  14. kClient,
  15. kRunning,
  16. kPending,
  17. kQueue,
  18. kPendingIdx,
  19. kRunningIdx,
  20. kError,
  21. kSocket,
  22. kStrictContentLength,
  23. kOnError,
  24. kMaxConcurrentStreams,
  25. kHTTP2Session,
  26. kResume,
  27. kSize,
  28. kHTTPContext,
  29. kClosed,
  30. kBodyTimeout
  31. } = require('../core/symbols.js')
  32. const { channels } = require('../core/diagnostics.js')
  33. const kOpenStreams = Symbol('open streams')
  34. let extractBody
  35. /** @type {import('http2')} */
  36. let http2
  37. try {
  38. http2 = require('node:http2')
  39. } catch {
  40. // @ts-ignore
  41. http2 = { constants: {} }
  42. }
  43. const {
  44. constants: {
  45. HTTP2_HEADER_AUTHORITY,
  46. HTTP2_HEADER_METHOD,
  47. HTTP2_HEADER_PATH,
  48. HTTP2_HEADER_SCHEME,
  49. HTTP2_HEADER_CONTENT_LENGTH,
  50. HTTP2_HEADER_EXPECT,
  51. HTTP2_HEADER_STATUS
  52. }
  53. } = http2
  54. function parseH2Headers (headers) {
  55. const result = []
  56. for (const [name, value] of Object.entries(headers)) {
  57. // h2 may concat the header value by array
  58. // e.g. Set-Cookie
  59. if (Array.isArray(value)) {
  60. for (const subvalue of value) {
  61. // we need to provide each header value of header name
  62. // because the headers handler expect name-value pair
  63. result.push(Buffer.from(name), Buffer.from(subvalue))
  64. }
  65. } else {
  66. result.push(Buffer.from(name), Buffer.from(value))
  67. }
  68. }
  69. return result
  70. }
  71. async function connectH2 (client, socket) {
  72. client[kSocket] = socket
  73. const session = http2.connect(client[kUrl], {
  74. createConnection: () => socket,
  75. peerMaxConcurrentStreams: client[kMaxConcurrentStreams],
  76. settings: {
  77. // TODO(metcoder95): add support for PUSH
  78. enablePush: false
  79. }
  80. })
  81. session[kOpenStreams] = 0
  82. session[kClient] = client
  83. session[kSocket] = socket
  84. session[kHTTP2Session] = null
  85. util.addListener(session, 'error', onHttp2SessionError)
  86. util.addListener(session, 'frameError', onHttp2FrameError)
  87. util.addListener(session, 'end', onHttp2SessionEnd)
  88. util.addListener(session, 'goaway', onHttp2SessionGoAway)
  89. util.addListener(session, 'close', onHttp2SessionClose)
  90. session.unref()
  91. client[kHTTP2Session] = session
  92. socket[kHTTP2Session] = session
  93. util.addListener(socket, 'error', onHttp2SocketError)
  94. util.addListener(socket, 'end', onHttp2SocketEnd)
  95. util.addListener(socket, 'close', onHttp2SocketClose)
  96. socket[kClosed] = false
  97. socket.on('close', onSocketClose)
  98. return {
  99. version: 'h2',
  100. defaultPipelining: Infinity,
  101. write (request) {
  102. return writeH2(client, request)
  103. },
  104. resume () {
  105. resumeH2(client)
  106. },
  107. destroy (err, callback) {
  108. if (socket[kClosed]) {
  109. queueMicrotask(callback)
  110. } else {
  111. socket.destroy(err).on('close', callback)
  112. }
  113. },
  114. get destroyed () {
  115. return socket.destroyed
  116. },
  117. busy () {
  118. return false
  119. }
  120. }
  121. }
  122. function resumeH2 (client) {
  123. const socket = client[kSocket]
  124. if (socket?.destroyed === false) {
  125. if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) {
  126. socket.unref()
  127. client[kHTTP2Session].unref()
  128. } else {
  129. socket.ref()
  130. client[kHTTP2Session].ref()
  131. }
  132. }
  133. }
  134. function onHttp2SessionError (err) {
  135. assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
  136. this[kSocket][kError] = err
  137. this[kClient][kOnError](err)
  138. }
  139. function onHttp2FrameError (type, code, id) {
  140. if (id === 0) {
  141. const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
  142. this[kSocket][kError] = err
  143. this[kClient][kOnError](err)
  144. }
  145. }
  146. function onHttp2SessionEnd () {
  147. const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket]))
  148. this.destroy(err)
  149. util.destroy(this[kSocket], err)
  150. }
  151. /**
  152. * This is the root cause of #3011
  153. * We need to handle GOAWAY frames properly, and trigger the session close
  154. * along with the socket right away
  155. *
  156. * @this {import('http2').ClientHttp2Session}
  157. * @param {number} errorCode
  158. */
  159. function onHttp2SessionGoAway (errorCode) {
  160. // TODO(mcollina): Verify if GOAWAY implements the spec correctly:
  161. // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8
  162. // Specifically, we do not verify the "valid" stream id.
  163. const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
  164. const client = this[kClient]
  165. client[kSocket] = null
  166. client[kHTTPContext] = null
  167. // this is an HTTP2 session
  168. this.close()
  169. this[kHTTP2Session] = null
  170. util.destroy(this[kSocket], err)
  171. // Fail head of pipeline.
  172. if (client[kRunningIdx] < client[kQueue].length) {
  173. const request = client[kQueue][client[kRunningIdx]]
  174. client[kQueue][client[kRunningIdx]++] = null
  175. util.errorRequest(client, request, err)
  176. client[kPendingIdx] = client[kRunningIdx]
  177. }
  178. assert(client[kRunning] === 0)
  179. client.emit('disconnect', client[kUrl], [client], err)
  180. client.emit('connectionError', client[kUrl], [client], err)
  181. client[kResume]()
  182. }
  183. function onHttp2SessionClose () {
  184. const { [kClient]: client } = this
  185. const { [kSocket]: socket } = client
  186. const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))
  187. client[kSocket] = null
  188. client[kHTTPContext] = null
  189. if (client.destroyed) {
  190. assert(client[kPending] === 0)
  191. // Fail entire queue.
  192. const requests = client[kQueue].splice(client[kRunningIdx])
  193. for (let i = 0; i < requests.length; i++) {
  194. const request = requests[i]
  195. util.errorRequest(client, request, err)
  196. }
  197. }
  198. }
  199. function onHttp2SocketClose () {
  200. const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
  201. const client = this[kHTTP2Session][kClient]
  202. client[kSocket] = null
  203. client[kHTTPContext] = null
  204. if (this[kHTTP2Session] !== null) {
  205. this[kHTTP2Session].destroy(err)
  206. }
  207. client[kPendingIdx] = client[kRunningIdx]
  208. assert(client[kRunning] === 0)
  209. client.emit('disconnect', client[kUrl], [client], err)
  210. client[kResume]()
  211. }
  212. function onHttp2SocketError (err) {
  213. assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
  214. this[kError] = err
  215. this[kClient][kOnError](err)
  216. }
  217. function onHttp2SocketEnd () {
  218. util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
  219. }
  220. function onSocketClose () {
  221. this[kClosed] = true
  222. }
  223. // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
  224. function shouldSendContentLength (method) {
  225. return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
  226. }
  227. function writeH2 (client, request) {
  228. const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout]
  229. const session = client[kHTTP2Session]
  230. const { method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request
  231. let { body } = request
  232. if (upgrade) {
  233. util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
  234. return false
  235. }
  236. const headers = {}
  237. for (let n = 0; n < reqHeaders.length; n += 2) {
  238. const key = reqHeaders[n + 0]
  239. const val = reqHeaders[n + 1]
  240. if (Array.isArray(val)) {
  241. for (let i = 0; i < val.length; i++) {
  242. if (headers[key]) {
  243. headers[key] += `, ${val[i]}`
  244. } else {
  245. headers[key] = val[i]
  246. }
  247. }
  248. } else if (headers[key]) {
  249. headers[key] += `, ${val}`
  250. } else {
  251. headers[key] = val
  252. }
  253. }
  254. /** @type {import('node:http2').ClientHttp2Stream} */
  255. let stream = null
  256. const { hostname, port } = client[kUrl]
  257. headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
  258. headers[HTTP2_HEADER_METHOD] = method
  259. const abort = (err) => {
  260. if (request.aborted || request.completed) {
  261. return
  262. }
  263. err = err || new RequestAbortedError()
  264. util.errorRequest(client, request, err)
  265. if (stream != null) {
  266. // Some chunks might still come after abort,
  267. // let's ignore them
  268. stream.removeAllListeners('data')
  269. // On Abort, we close the stream to send RST_STREAM frame
  270. stream.close()
  271. // We move the running index to the next request
  272. client[kOnError](err)
  273. client[kResume]()
  274. }
  275. // We do not destroy the socket as we can continue using the session
  276. // the stream gets destroyed and the session remains to create new streams
  277. util.destroy(body, err)
  278. }
  279. try {
  280. // We are already connected, streams are pending.
  281. // We can call on connect, and wait for abort
  282. request.onConnect(abort)
  283. } catch (err) {
  284. util.errorRequest(client, request, err)
  285. }
  286. if (request.aborted) {
  287. return false
  288. }
  289. if (method === 'CONNECT') {
  290. session.ref()
  291. // We are already connected, streams are pending, first request
  292. // will create a new stream. We trigger a request to create the stream and wait until
  293. // `ready` event is triggered
  294. // We disabled endStream to allow the user to write to the stream
  295. stream = session.request(headers, { endStream: false, signal })
  296. if (!stream.pending) {
  297. request.onUpgrade(null, null, stream)
  298. ++session[kOpenStreams]
  299. client[kQueue][client[kRunningIdx]++] = null
  300. } else {
  301. stream.once('ready', () => {
  302. request.onUpgrade(null, null, stream)
  303. ++session[kOpenStreams]
  304. client[kQueue][client[kRunningIdx]++] = null
  305. })
  306. }
  307. stream.once('close', () => {
  308. session[kOpenStreams] -= 1
  309. if (session[kOpenStreams] === 0) session.unref()
  310. })
  311. stream.setTimeout(requestTimeout)
  312. return true
  313. }
  314. // https://tools.ietf.org/html/rfc7540#section-8.3
  315. // :path and :scheme headers must be omitted when sending CONNECT
  316. headers[HTTP2_HEADER_PATH] = path
  317. headers[HTTP2_HEADER_SCHEME] = 'https'
  318. // https://tools.ietf.org/html/rfc7231#section-4.3.1
  319. // https://tools.ietf.org/html/rfc7231#section-4.3.2
  320. // https://tools.ietf.org/html/rfc7231#section-4.3.5
  321. // Sending a payload body on a request that does not
  322. // expect it can cause undefined behavior on some
  323. // servers and corrupt connection state. Do not
  324. // re-use the connection for further requests.
  325. const expectsPayload = (
  326. method === 'PUT' ||
  327. method === 'POST' ||
  328. method === 'PATCH'
  329. )
  330. if (body && typeof body.read === 'function') {
  331. // Try to read EOF in order to get length.
  332. body.read(0)
  333. }
  334. let contentLength = util.bodyLength(body)
  335. if (util.isFormDataLike(body)) {
  336. extractBody ??= require('../web/fetch/body.js').extractBody
  337. const [bodyStream, contentType] = extractBody(body)
  338. headers['content-type'] = contentType
  339. body = bodyStream.stream
  340. contentLength = bodyStream.length
  341. }
  342. if (contentLength == null) {
  343. contentLength = request.contentLength
  344. }
  345. if (contentLength === 0 || !expectsPayload) {
  346. // https://tools.ietf.org/html/rfc7230#section-3.3.2
  347. // A user agent SHOULD NOT send a Content-Length header field when
  348. // the request message does not contain a payload body and the method
  349. // semantics do not anticipate such a body.
  350. contentLength = null
  351. }
  352. // https://github.com/nodejs/undici/issues/2046
  353. // A user agent may send a Content-Length header with 0 value, this should be allowed.
  354. if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
  355. if (client[kStrictContentLength]) {
  356. util.errorRequest(client, request, new RequestContentLengthMismatchError())
  357. return false
  358. }
  359. process.emitWarning(new RequestContentLengthMismatchError())
  360. }
  361. if (contentLength != null) {
  362. assert(body, 'no body must not have content length')
  363. headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  364. }
  365. session.ref()
  366. if (channels.sendHeaders.hasSubscribers) {
  367. let header = ''
  368. for (const key in headers) {
  369. header += `${key}: ${headers[key]}\r\n`
  370. }
  371. channels.sendHeaders.publish({ request, headers: header, socket: session[kSocket] })
  372. }
  373. // TODO(metcoder95): add support for sending trailers
  374. const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
  375. if (expectContinue) {
  376. headers[HTTP2_HEADER_EXPECT] = '100-continue'
  377. stream = session.request(headers, { endStream: shouldEndStream, signal })
  378. stream.once('continue', writeBodyH2)
  379. } else {
  380. stream = session.request(headers, {
  381. endStream: shouldEndStream,
  382. signal
  383. })
  384. writeBodyH2()
  385. }
  386. // Increment counter as we have new streams open
  387. ++session[kOpenStreams]
  388. stream.setTimeout(requestTimeout)
  389. stream.once('response', headers => {
  390. const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
  391. request.onResponseStarted()
  392. // Due to the stream nature, it is possible we face a race condition
  393. // where the stream has been assigned, but the request has been aborted
  394. // the request remains in-flight and headers hasn't been received yet
  395. // for those scenarios, best effort is to destroy the stream immediately
  396. // as there's no value to keep it open.
  397. if (request.aborted) {
  398. stream.removeAllListeners('data')
  399. return
  400. }
  401. if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
  402. stream.pause()
  403. }
  404. })
  405. stream.on('data', (chunk) => {
  406. if (request.onData(chunk) === false) {
  407. stream.pause()
  408. }
  409. })
  410. stream.once('end', (err) => {
  411. stream.removeAllListeners('data')
  412. // When state is null, it means we haven't consumed body and the stream still do not have
  413. // a state.
  414. // Present specially when using pipeline or stream
  415. if (stream.state?.state == null || stream.state.state < 6) {
  416. // Do not complete the request if it was aborted
  417. // Not prone to happen for as safety net to avoid race conditions with 'trailers'
  418. if (!request.aborted && !request.completed) {
  419. request.onComplete({})
  420. }
  421. client[kQueue][client[kRunningIdx]++] = null
  422. client[kResume]()
  423. } else {
  424. // Stream is closed or half-closed-remote (6), decrement counter and cleanup
  425. // It does not have sense to continue working with the stream as we do not
  426. // have yet RST_STREAM support on client-side
  427. --session[kOpenStreams]
  428. if (session[kOpenStreams] === 0) {
  429. session.unref()
  430. }
  431. abort(err ?? new InformationalError('HTTP/2: stream half-closed (remote)'))
  432. client[kQueue][client[kRunningIdx]++] = null
  433. client[kPendingIdx] = client[kRunningIdx]
  434. client[kResume]()
  435. }
  436. })
  437. stream.once('close', () => {
  438. stream.removeAllListeners('data')
  439. session[kOpenStreams] -= 1
  440. if (session[kOpenStreams] === 0) {
  441. session.unref()
  442. }
  443. })
  444. stream.once('error', function (err) {
  445. stream.removeAllListeners('data')
  446. abort(err)
  447. })
  448. stream.once('frameError', (type, code) => {
  449. stream.removeAllListeners('data')
  450. abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
  451. })
  452. stream.on('aborted', () => {
  453. stream.removeAllListeners('data')
  454. })
  455. stream.on('timeout', () => {
  456. const err = new InformationalError(`HTTP/2: "stream timeout after ${requestTimeout}"`)
  457. stream.removeAllListeners('data')
  458. session[kOpenStreams] -= 1
  459. if (session[kOpenStreams] === 0) {
  460. session.unref()
  461. }
  462. abort(err)
  463. })
  464. stream.once('trailers', trailers => {
  465. if (request.aborted || request.completed) {
  466. return
  467. }
  468. request.onComplete(trailers)
  469. })
  470. return true
  471. function writeBodyH2 () {
  472. /* istanbul ignore else: assertion */
  473. if (!body || contentLength === 0) {
  474. writeBuffer(
  475. abort,
  476. stream,
  477. null,
  478. client,
  479. request,
  480. client[kSocket],
  481. contentLength,
  482. expectsPayload
  483. )
  484. } else if (util.isBuffer(body)) {
  485. writeBuffer(
  486. abort,
  487. stream,
  488. body,
  489. client,
  490. request,
  491. client[kSocket],
  492. contentLength,
  493. expectsPayload
  494. )
  495. } else if (util.isBlobLike(body)) {
  496. if (typeof body.stream === 'function') {
  497. writeIterable(
  498. abort,
  499. stream,
  500. body.stream(),
  501. client,
  502. request,
  503. client[kSocket],
  504. contentLength,
  505. expectsPayload
  506. )
  507. } else {
  508. writeBlob(
  509. abort,
  510. stream,
  511. body,
  512. client,
  513. request,
  514. client[kSocket],
  515. contentLength,
  516. expectsPayload
  517. )
  518. }
  519. } else if (util.isStream(body)) {
  520. writeStream(
  521. abort,
  522. client[kSocket],
  523. expectsPayload,
  524. stream,
  525. body,
  526. client,
  527. request,
  528. contentLength
  529. )
  530. } else if (util.isIterable(body)) {
  531. writeIterable(
  532. abort,
  533. stream,
  534. body,
  535. client,
  536. request,
  537. client[kSocket],
  538. contentLength,
  539. expectsPayload
  540. )
  541. } else {
  542. assert(false)
  543. }
  544. }
  545. }
  546. function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  547. try {
  548. if (body != null && util.isBuffer(body)) {
  549. assert(contentLength === body.byteLength, 'buffer body must have content length')
  550. h2stream.cork()
  551. h2stream.write(body)
  552. h2stream.uncork()
  553. h2stream.end()
  554. request.onBodySent(body)
  555. }
  556. if (!expectsPayload) {
  557. socket[kReset] = true
  558. }
  559. request.onRequestSent()
  560. client[kResume]()
  561. } catch (error) {
  562. abort(error)
  563. }
  564. }
  565. function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
  566. assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
  567. // For HTTP/2, is enough to pipe the stream
  568. const pipe = pipeline(
  569. body,
  570. h2stream,
  571. (err) => {
  572. if (err) {
  573. util.destroy(pipe, err)
  574. abort(err)
  575. } else {
  576. util.removeAllListeners(pipe)
  577. request.onRequestSent()
  578. if (!expectsPayload) {
  579. socket[kReset] = true
  580. }
  581. client[kResume]()
  582. }
  583. }
  584. )
  585. util.addListener(pipe, 'data', onPipeData)
  586. function onPipeData (chunk) {
  587. request.onBodySent(chunk)
  588. }
  589. }
  590. async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  591. assert(contentLength === body.size, 'blob body must have content length')
  592. try {
  593. if (contentLength != null && contentLength !== body.size) {
  594. throw new RequestContentLengthMismatchError()
  595. }
  596. const buffer = Buffer.from(await body.arrayBuffer())
  597. h2stream.cork()
  598. h2stream.write(buffer)
  599. h2stream.uncork()
  600. h2stream.end()
  601. request.onBodySent(buffer)
  602. request.onRequestSent()
  603. if (!expectsPayload) {
  604. socket[kReset] = true
  605. }
  606. client[kResume]()
  607. } catch (err) {
  608. abort(err)
  609. }
  610. }
  611. async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  612. assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
  613. let callback = null
  614. function onDrain () {
  615. if (callback) {
  616. const cb = callback
  617. callback = null
  618. cb()
  619. }
  620. }
  621. const waitForDrain = () => new Promise((resolve, reject) => {
  622. assert(callback === null)
  623. if (socket[kError]) {
  624. reject(socket[kError])
  625. } else {
  626. callback = resolve
  627. }
  628. })
  629. h2stream
  630. .on('close', onDrain)
  631. .on('drain', onDrain)
  632. try {
  633. // It's up to the user to somehow abort the async iterable.
  634. for await (const chunk of body) {
  635. if (socket[kError]) {
  636. throw socket[kError]
  637. }
  638. const res = h2stream.write(chunk)
  639. request.onBodySent(chunk)
  640. if (!res) {
  641. await waitForDrain()
  642. }
  643. }
  644. h2stream.end()
  645. request.onRequestSent()
  646. if (!expectsPayload) {
  647. socket[kReset] = true
  648. }
  649. client[kResume]()
  650. } catch (err) {
  651. abort(err)
  652. } finally {
  653. h2stream
  654. .off('close', onDrain)
  655. .off('drain', onDrain)
  656. }
  657. }
  658. module.exports = connectH2