client-h1.js 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606
  1. 'use strict'
  2. /* global WebAssembly */
  3. const assert = require('node:assert')
  4. const util = require('../core/util.js')
  5. const { channels } = require('../core/diagnostics.js')
  6. const timers = require('../util/timers.js')
  7. const {
  8. RequestContentLengthMismatchError,
  9. ResponseContentLengthMismatchError,
  10. RequestAbortedError,
  11. HeadersTimeoutError,
  12. HeadersOverflowError,
  13. SocketError,
  14. InformationalError,
  15. BodyTimeoutError,
  16. HTTPParserError,
  17. ResponseExceededMaxSizeError
  18. } = require('../core/errors.js')
  19. const {
  20. kUrl,
  21. kReset,
  22. kClient,
  23. kParser,
  24. kBlocking,
  25. kRunning,
  26. kPending,
  27. kSize,
  28. kWriting,
  29. kQueue,
  30. kNoRef,
  31. kKeepAliveDefaultTimeout,
  32. kHostHeader,
  33. kPendingIdx,
  34. kRunningIdx,
  35. kError,
  36. kPipelining,
  37. kSocket,
  38. kKeepAliveTimeoutValue,
  39. kMaxHeadersSize,
  40. kKeepAliveMaxTimeout,
  41. kKeepAliveTimeoutThreshold,
  42. kHeadersTimeout,
  43. kBodyTimeout,
  44. kStrictContentLength,
  45. kMaxRequests,
  46. kCounter,
  47. kMaxResponseSize,
  48. kOnError,
  49. kResume,
  50. kHTTPContext,
  51. kClosed
  52. } = require('../core/symbols.js')
  53. const constants = require('../llhttp/constants.js')
  54. const EMPTY_BUF = Buffer.alloc(0)
  55. const FastBuffer = Buffer[Symbol.species]
  56. const removeAllListeners = util.removeAllListeners
  57. let extractBody
  58. function lazyllhttp () {
  59. const llhttpWasmData = process.env.JEST_WORKER_ID ? require('../llhttp/llhttp-wasm.js') : undefined
  60. let mod
  61. try {
  62. mod = new WebAssembly.Module(require('../llhttp/llhttp_simd-wasm.js'))
  63. } catch (e) {
  64. /* istanbul ignore next */
  65. // We could check if the error was caused by the simd option not
  66. // being enabled, but the occurring of this other error
  67. // * https://github.com/emscripten-core/emscripten/issues/11495
  68. // got me to remove that check to avoid breaking Node 12.
  69. mod = new WebAssembly.Module(llhttpWasmData || require('../llhttp/llhttp-wasm.js'))
  70. }
  71. return new WebAssembly.Instance(mod, {
  72. env: {
  73. /**
  74. * @param {number} p
  75. * @param {number} at
  76. * @param {number} len
  77. * @returns {number}
  78. */
  79. wasm_on_url: (p, at, len) => {
  80. /* istanbul ignore next */
  81. return 0
  82. },
  83. /**
  84. * @param {number} p
  85. * @param {number} at
  86. * @param {number} len
  87. * @returns {number}
  88. */
  89. wasm_on_status: (p, at, len) => {
  90. assert(currentParser.ptr === p)
  91. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  92. return currentParser.onStatus(new FastBuffer(currentBufferRef.buffer, start, len))
  93. },
  94. /**
  95. * @param {number} p
  96. * @returns {number}
  97. */
  98. wasm_on_message_begin: (p) => {
  99. assert(currentParser.ptr === p)
  100. return currentParser.onMessageBegin()
  101. },
  102. /**
  103. * @param {number} p
  104. * @param {number} at
  105. * @param {number} len
  106. * @returns {number}
  107. */
  108. wasm_on_header_field: (p, at, len) => {
  109. assert(currentParser.ptr === p)
  110. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  111. return currentParser.onHeaderField(new FastBuffer(currentBufferRef.buffer, start, len))
  112. },
  113. /**
  114. * @param {number} p
  115. * @param {number} at
  116. * @param {number} len
  117. * @returns {number}
  118. */
  119. wasm_on_header_value: (p, at, len) => {
  120. assert(currentParser.ptr === p)
  121. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  122. return currentParser.onHeaderValue(new FastBuffer(currentBufferRef.buffer, start, len))
  123. },
  124. /**
  125. * @param {number} p
  126. * @param {number} statusCode
  127. * @param {0|1} upgrade
  128. * @param {0|1} shouldKeepAlive
  129. * @returns {number}
  130. */
  131. wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
  132. assert(currentParser.ptr === p)
  133. return currentParser.onHeadersComplete(statusCode, upgrade === 1, shouldKeepAlive === 1)
  134. },
  135. /**
  136. * @param {number} p
  137. * @param {number} at
  138. * @param {number} len
  139. * @returns {number}
  140. */
  141. wasm_on_body: (p, at, len) => {
  142. assert(currentParser.ptr === p)
  143. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  144. return currentParser.onBody(new FastBuffer(currentBufferRef.buffer, start, len))
  145. },
  146. /**
  147. * @param {number} p
  148. * @returns {number}
  149. */
  150. wasm_on_message_complete: (p) => {
  151. assert(currentParser.ptr === p)
  152. return currentParser.onMessageComplete()
  153. }
  154. }
  155. })
  156. }
  157. let llhttpInstance = null
  158. /**
  159. * @type {Parser|null}
  160. */
  161. let currentParser = null
  162. let currentBufferRef = null
  163. /**
  164. * @type {number}
  165. */
  166. let currentBufferSize = 0
  167. let currentBufferPtr = null
  168. const USE_NATIVE_TIMER = 0
  169. const USE_FAST_TIMER = 1
  170. // Use fast timers for headers and body to take eventual event loop
  171. // latency into account.
  172. const TIMEOUT_HEADERS = 2 | USE_FAST_TIMER
  173. const TIMEOUT_BODY = 4 | USE_FAST_TIMER
  174. // Use native timers to ignore event loop latency for keep-alive
  175. // handling.
  176. const TIMEOUT_KEEP_ALIVE = 8 | USE_NATIVE_TIMER
  177. class Parser {
  178. /**
  179. * @param {import('./client.js')} client
  180. * @param {import('net').Socket} socket
  181. * @param {*} llhttp
  182. */
  183. constructor (client, socket, { exports }) {
  184. this.llhttp = exports
  185. this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
  186. this.client = client
  187. /**
  188. * @type {import('net').Socket}
  189. */
  190. this.socket = socket
  191. this.timeout = null
  192. this.timeoutValue = null
  193. this.timeoutType = null
  194. this.statusCode = 0
  195. this.statusText = ''
  196. this.upgrade = false
  197. this.headers = []
  198. this.headersSize = 0
  199. this.headersMaxSize = client[kMaxHeadersSize]
  200. this.shouldKeepAlive = false
  201. this.paused = false
  202. this.resume = this.resume.bind(this)
  203. this.bytesRead = 0
  204. this.keepAlive = ''
  205. this.contentLength = ''
  206. this.connection = ''
  207. this.maxResponseSize = client[kMaxResponseSize]
  208. }
  209. setTimeout (delay, type) {
  210. // If the existing timer and the new timer are of different timer type
  211. // (fast or native) or have different delay, we need to clear the existing
  212. // timer and set a new one.
  213. if (
  214. delay !== this.timeoutValue ||
  215. (type & USE_FAST_TIMER) ^ (this.timeoutType & USE_FAST_TIMER)
  216. ) {
  217. // If a timeout is already set, clear it with clearTimeout of the fast
  218. // timer implementation, as it can clear fast and native timers.
  219. if (this.timeout) {
  220. timers.clearTimeout(this.timeout)
  221. this.timeout = null
  222. }
  223. if (delay) {
  224. if (type & USE_FAST_TIMER) {
  225. this.timeout = timers.setFastTimeout(onParserTimeout, delay, new WeakRef(this))
  226. } else {
  227. this.timeout = setTimeout(onParserTimeout, delay, new WeakRef(this))
  228. this.timeout?.unref()
  229. }
  230. }
  231. this.timeoutValue = delay
  232. } else if (this.timeout) {
  233. // istanbul ignore else: only for jest
  234. if (this.timeout.refresh) {
  235. this.timeout.refresh()
  236. }
  237. }
  238. this.timeoutType = type
  239. }
  240. resume () {
  241. if (this.socket.destroyed || !this.paused) {
  242. return
  243. }
  244. assert(this.ptr != null)
  245. assert(currentParser === null)
  246. this.llhttp.llhttp_resume(this.ptr)
  247. assert(this.timeoutType === TIMEOUT_BODY)
  248. if (this.timeout) {
  249. // istanbul ignore else: only for jest
  250. if (this.timeout.refresh) {
  251. this.timeout.refresh()
  252. }
  253. }
  254. this.paused = false
  255. this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
  256. this.readMore()
  257. }
  258. readMore () {
  259. while (!this.paused && this.ptr) {
  260. const chunk = this.socket.read()
  261. if (chunk === null) {
  262. break
  263. }
  264. this.execute(chunk)
  265. }
  266. }
  267. /**
  268. * @param {Buffer} chunk
  269. */
  270. execute (chunk) {
  271. assert(currentParser === null)
  272. assert(this.ptr != null)
  273. assert(!this.paused)
  274. const { socket, llhttp } = this
  275. // Allocate a new buffer if the current buffer is too small.
  276. if (chunk.length > currentBufferSize) {
  277. if (currentBufferPtr) {
  278. llhttp.free(currentBufferPtr)
  279. }
  280. // Allocate a buffer that is a multiple of 4096 bytes.
  281. currentBufferSize = Math.ceil(chunk.length / 4096) * 4096
  282. currentBufferPtr = llhttp.malloc(currentBufferSize)
  283. }
  284. new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(chunk)
  285. // Call `execute` on the wasm parser.
  286. // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
  287. // and finally the length of bytes to parse.
  288. // The return value is an error code or `constants.ERROR.OK`.
  289. try {
  290. let ret
  291. try {
  292. currentBufferRef = chunk
  293. currentParser = this
  294. ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, chunk.length)
  295. /* eslint-disable-next-line no-useless-catch */
  296. } catch (err) {
  297. /* istanbul ignore next: difficult to make a test case for */
  298. throw err
  299. } finally {
  300. currentParser = null
  301. currentBufferRef = null
  302. }
  303. if (ret !== constants.ERROR.OK) {
  304. const data = chunk.subarray(llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr)
  305. if (ret === constants.ERROR.PAUSED_UPGRADE) {
  306. this.onUpgrade(data)
  307. } else if (ret === constants.ERROR.PAUSED) {
  308. this.paused = true
  309. socket.unshift(data)
  310. } else {
  311. const ptr = llhttp.llhttp_get_error_reason(this.ptr)
  312. let message = ''
  313. /* istanbul ignore else: difficult to make a test case for */
  314. if (ptr) {
  315. const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
  316. message =
  317. 'Response does not match the HTTP/1.1 protocol (' +
  318. Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
  319. ')'
  320. }
  321. throw new HTTPParserError(message, constants.ERROR[ret], data)
  322. }
  323. }
  324. } catch (err) {
  325. util.destroy(socket, err)
  326. }
  327. }
  328. destroy () {
  329. assert(currentParser === null)
  330. assert(this.ptr != null)
  331. this.llhttp.llhttp_free(this.ptr)
  332. this.ptr = null
  333. this.timeout && timers.clearTimeout(this.timeout)
  334. this.timeout = null
  335. this.timeoutValue = null
  336. this.timeoutType = null
  337. this.paused = false
  338. }
  339. /**
  340. * @param {Buffer} buf
  341. * @returns {0}
  342. */
  343. onStatus (buf) {
  344. this.statusText = buf.toString()
  345. return 0
  346. }
  347. /**
  348. * @returns {0|-1}
  349. */
  350. onMessageBegin () {
  351. const { socket, client } = this
  352. /* istanbul ignore next: difficult to make a test case for */
  353. if (socket.destroyed) {
  354. return -1
  355. }
  356. const request = client[kQueue][client[kRunningIdx]]
  357. if (!request) {
  358. return -1
  359. }
  360. request.onResponseStarted()
  361. return 0
  362. }
  363. /**
  364. * @param {Buffer} buf
  365. * @returns {number}
  366. */
  367. onHeaderField (buf) {
  368. const len = this.headers.length
  369. if ((len & 1) === 0) {
  370. this.headers.push(buf)
  371. } else {
  372. this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
  373. }
  374. this.trackHeader(buf.length)
  375. return 0
  376. }
  377. /**
  378. * @param {Buffer} buf
  379. * @returns {number}
  380. */
  381. onHeaderValue (buf) {
  382. let len = this.headers.length
  383. if ((len & 1) === 1) {
  384. this.headers.push(buf)
  385. len += 1
  386. } else {
  387. this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
  388. }
  389. const key = this.headers[len - 2]
  390. if (key.length === 10) {
  391. const headerName = util.bufferToLowerCasedHeaderName(key)
  392. if (headerName === 'keep-alive') {
  393. this.keepAlive += buf.toString()
  394. } else if (headerName === 'connection') {
  395. this.connection += buf.toString()
  396. }
  397. } else if (key.length === 14 && util.bufferToLowerCasedHeaderName(key) === 'content-length') {
  398. this.contentLength += buf.toString()
  399. }
  400. this.trackHeader(buf.length)
  401. return 0
  402. }
  403. /**
  404. * @param {number} len
  405. */
  406. trackHeader (len) {
  407. this.headersSize += len
  408. if (this.headersSize >= this.headersMaxSize) {
  409. util.destroy(this.socket, new HeadersOverflowError())
  410. }
  411. }
  412. /**
  413. * @param {Buffer} head
  414. */
  415. onUpgrade (head) {
  416. const { upgrade, client, socket, headers, statusCode } = this
  417. assert(upgrade)
  418. assert(client[kSocket] === socket)
  419. assert(!socket.destroyed)
  420. assert(!this.paused)
  421. assert((headers.length & 1) === 0)
  422. const request = client[kQueue][client[kRunningIdx]]
  423. assert(request)
  424. assert(request.upgrade || request.method === 'CONNECT')
  425. this.statusCode = 0
  426. this.statusText = ''
  427. this.shouldKeepAlive = false
  428. this.headers = []
  429. this.headersSize = 0
  430. socket.unshift(head)
  431. socket[kParser].destroy()
  432. socket[kParser] = null
  433. socket[kClient] = null
  434. socket[kError] = null
  435. removeAllListeners(socket)
  436. client[kSocket] = null
  437. client[kHTTPContext] = null // TODO (fix): This is hacky...
  438. client[kQueue][client[kRunningIdx]++] = null
  439. client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))
  440. try {
  441. request.onUpgrade(statusCode, headers, socket)
  442. } catch (err) {
  443. util.destroy(socket, err)
  444. }
  445. client[kResume]()
  446. }
  447. /**
  448. * @param {number} statusCode
  449. * @param {boolean} upgrade
  450. * @param {boolean} shouldKeepAlive
  451. * @returns {number}
  452. */
  453. onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
  454. const { client, socket, headers, statusText } = this
  455. /* istanbul ignore next: difficult to make a test case for */
  456. if (socket.destroyed) {
  457. return -1
  458. }
  459. const request = client[kQueue][client[kRunningIdx]]
  460. /* istanbul ignore next: difficult to make a test case for */
  461. if (!request) {
  462. return -1
  463. }
  464. assert(!this.upgrade)
  465. assert(this.statusCode < 200)
  466. if (statusCode === 100) {
  467. util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
  468. return -1
  469. }
  470. /* this can only happen if server is misbehaving */
  471. if (upgrade && !request.upgrade) {
  472. util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
  473. return -1
  474. }
  475. assert(this.timeoutType === TIMEOUT_HEADERS)
  476. this.statusCode = statusCode
  477. this.shouldKeepAlive = (
  478. shouldKeepAlive ||
  479. // Override llhttp value which does not allow keepAlive for HEAD.
  480. (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
  481. )
  482. if (this.statusCode >= 200) {
  483. const bodyTimeout = request.bodyTimeout != null
  484. ? request.bodyTimeout
  485. : client[kBodyTimeout]
  486. this.setTimeout(bodyTimeout, TIMEOUT_BODY)
  487. } else if (this.timeout) {
  488. // istanbul ignore else: only for jest
  489. if (this.timeout.refresh) {
  490. this.timeout.refresh()
  491. }
  492. }
  493. if (request.method === 'CONNECT') {
  494. assert(client[kRunning] === 1)
  495. this.upgrade = true
  496. return 2
  497. }
  498. if (upgrade) {
  499. assert(client[kRunning] === 1)
  500. this.upgrade = true
  501. return 2
  502. }
  503. assert((this.headers.length & 1) === 0)
  504. this.headers = []
  505. this.headersSize = 0
  506. if (this.shouldKeepAlive && client[kPipelining]) {
  507. const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null
  508. if (keepAliveTimeout != null) {
  509. const timeout = Math.min(
  510. keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
  511. client[kKeepAliveMaxTimeout]
  512. )
  513. if (timeout <= 0) {
  514. socket[kReset] = true
  515. } else {
  516. client[kKeepAliveTimeoutValue] = timeout
  517. }
  518. } else {
  519. client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
  520. }
  521. } else {
  522. // Stop more requests from being dispatched.
  523. socket[kReset] = true
  524. }
  525. const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
  526. if (request.aborted) {
  527. return -1
  528. }
  529. if (request.method === 'HEAD') {
  530. return 1
  531. }
  532. if (statusCode < 200) {
  533. return 1
  534. }
  535. if (socket[kBlocking]) {
  536. socket[kBlocking] = false
  537. client[kResume]()
  538. }
  539. return pause ? constants.ERROR.PAUSED : 0
  540. }
  541. /**
  542. * @param {Buffer} buf
  543. * @returns {number}
  544. */
  545. onBody (buf) {
  546. const { client, socket, statusCode, maxResponseSize } = this
  547. if (socket.destroyed) {
  548. return -1
  549. }
  550. const request = client[kQueue][client[kRunningIdx]]
  551. assert(request)
  552. assert(this.timeoutType === TIMEOUT_BODY)
  553. if (this.timeout) {
  554. // istanbul ignore else: only for jest
  555. if (this.timeout.refresh) {
  556. this.timeout.refresh()
  557. }
  558. }
  559. assert(statusCode >= 200)
  560. if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) {
  561. util.destroy(socket, new ResponseExceededMaxSizeError())
  562. return -1
  563. }
  564. this.bytesRead += buf.length
  565. if (request.onData(buf) === false) {
  566. return constants.ERROR.PAUSED
  567. }
  568. return 0
  569. }
  570. /**
  571. * @returns {number}
  572. */
  573. onMessageComplete () {
  574. const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this
  575. if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
  576. return -1
  577. }
  578. if (upgrade) {
  579. return 0
  580. }
  581. assert(statusCode >= 100)
  582. assert((this.headers.length & 1) === 0)
  583. const request = client[kQueue][client[kRunningIdx]]
  584. assert(request)
  585. this.statusCode = 0
  586. this.statusText = ''
  587. this.bytesRead = 0
  588. this.contentLength = ''
  589. this.keepAlive = ''
  590. this.connection = ''
  591. this.headers = []
  592. this.headersSize = 0
  593. if (statusCode < 200) {
  594. return 0
  595. }
  596. /* istanbul ignore next: should be handled by llhttp? */
  597. if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
  598. util.destroy(socket, new ResponseContentLengthMismatchError())
  599. return -1
  600. }
  601. request.onComplete(headers)
  602. client[kQueue][client[kRunningIdx]++] = null
  603. if (socket[kWriting]) {
  604. assert(client[kRunning] === 0)
  605. // Response completed before request.
  606. util.destroy(socket, new InformationalError('reset'))
  607. return constants.ERROR.PAUSED
  608. } else if (!shouldKeepAlive) {
  609. util.destroy(socket, new InformationalError('reset'))
  610. return constants.ERROR.PAUSED
  611. } else if (socket[kReset] && client[kRunning] === 0) {
  612. // Destroy socket once all requests have completed.
  613. // The request at the tail of the pipeline is the one
  614. // that requested reset and no further requests should
  615. // have been queued since then.
  616. util.destroy(socket, new InformationalError('reset'))
  617. return constants.ERROR.PAUSED
  618. } else if (client[kPipelining] == null || client[kPipelining] === 1) {
  619. // We must wait a full event loop cycle to reuse this socket to make sure
  620. // that non-spec compliant servers are not closing the connection even if they
  621. // said they won't.
  622. setImmediate(client[kResume])
  623. } else {
  624. client[kResume]()
  625. }
  626. return 0
  627. }
  628. }
  629. function onParserTimeout (parser) {
  630. const { socket, timeoutType, client, paused } = parser.deref()
  631. /* istanbul ignore else */
  632. if (timeoutType === TIMEOUT_HEADERS) {
  633. if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) {
  634. assert(!paused, 'cannot be paused while waiting for headers')
  635. util.destroy(socket, new HeadersTimeoutError())
  636. }
  637. } else if (timeoutType === TIMEOUT_BODY) {
  638. if (!paused) {
  639. util.destroy(socket, new BodyTimeoutError())
  640. }
  641. } else if (timeoutType === TIMEOUT_KEEP_ALIVE) {
  642. assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
  643. util.destroy(socket, new InformationalError('socket idle timeout'))
  644. }
  645. }
  646. /**
  647. * @param {import ('./client.js')} client
  648. * @param {import('net').Socket} socket
  649. * @returns
  650. */
  651. async function connectH1 (client, socket) {
  652. client[kSocket] = socket
  653. if (!llhttpInstance) {
  654. llhttpInstance = lazyllhttp()
  655. }
  656. if (socket.errored) {
  657. throw socket.errored
  658. }
  659. if (socket.destroyed) {
  660. throw new SocketError('destroyed')
  661. }
  662. socket[kNoRef] = false
  663. socket[kWriting] = false
  664. socket[kReset] = false
  665. socket[kBlocking] = false
  666. socket[kParser] = new Parser(client, socket, llhttpInstance)
  667. util.addListener(socket, 'error', onHttpSocketError)
  668. util.addListener(socket, 'readable', onHttpSocketReadable)
  669. util.addListener(socket, 'end', onHttpSocketEnd)
  670. util.addListener(socket, 'close', onHttpSocketClose)
  671. socket[kClosed] = false
  672. socket.on('close', onSocketClose)
  673. return {
  674. version: 'h1',
  675. defaultPipelining: 1,
  676. write (request) {
  677. return writeH1(client, request)
  678. },
  679. resume () {
  680. resumeH1(client)
  681. },
  682. /**
  683. * @param {Error|undefined} err
  684. * @param {() => void} callback
  685. */
  686. destroy (err, callback) {
  687. if (socket[kClosed]) {
  688. queueMicrotask(callback)
  689. } else {
  690. socket.on('close', callback)
  691. socket.destroy(err)
  692. }
  693. },
  694. /**
  695. * @returns {boolean}
  696. */
  697. get destroyed () {
  698. return socket.destroyed
  699. },
  700. /**
  701. * @param {import('../core/request.js')} request
  702. * @returns {boolean}
  703. */
  704. busy (request) {
  705. if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
  706. return true
  707. }
  708. if (request) {
  709. if (client[kRunning] > 0 && !request.idempotent) {
  710. // Non-idempotent request cannot be retried.
  711. // Ensure that no other requests are inflight and
  712. // could cause failure.
  713. return true
  714. }
  715. if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
  716. // Don't dispatch an upgrade until all preceding requests have completed.
  717. // A misbehaving server might upgrade the connection before all pipelined
  718. // request has completed.
  719. return true
  720. }
  721. if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
  722. (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
  723. // Request with stream or iterator body can error while other requests
  724. // are inflight and indirectly error those as well.
  725. // Ensure this doesn't happen by waiting for inflight
  726. // to complete before dispatching.
  727. // Request with stream or iterator body cannot be retried.
  728. // Ensure that no other requests are inflight and
  729. // could cause failure.
  730. return true
  731. }
  732. }
  733. return false
  734. }
  735. }
  736. }
  737. function onHttpSocketError (err) {
  738. assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
  739. const parser = this[kParser]
  740. // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
  741. // to the user.
  742. if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
  743. // We treat all incoming data so for as a valid response.
  744. parser.onMessageComplete()
  745. return
  746. }
  747. this[kError] = err
  748. this[kClient][kOnError](err)
  749. }
  750. function onHttpSocketReadable () {
  751. this[kParser]?.readMore()
  752. }
  753. function onHttpSocketEnd () {
  754. const parser = this[kParser]
  755. if (parser.statusCode && !parser.shouldKeepAlive) {
  756. // We treat all incoming data so far as a valid response.
  757. parser.onMessageComplete()
  758. return
  759. }
  760. util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
  761. }
  762. function onHttpSocketClose () {
  763. const parser = this[kParser]
  764. if (parser) {
  765. if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
  766. // We treat all incoming data so far as a valid response.
  767. parser.onMessageComplete()
  768. }
  769. this[kParser].destroy()
  770. this[kParser] = null
  771. }
  772. const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
  773. const client = this[kClient]
  774. client[kSocket] = null
  775. client[kHTTPContext] = null // TODO (fix): This is hacky...
  776. if (client.destroyed) {
  777. assert(client[kPending] === 0)
  778. // Fail entire queue.
  779. const requests = client[kQueue].splice(client[kRunningIdx])
  780. for (let i = 0; i < requests.length; i++) {
  781. const request = requests[i]
  782. util.errorRequest(client, request, err)
  783. }
  784. } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
  785. // Fail head of pipeline.
  786. const request = client[kQueue][client[kRunningIdx]]
  787. client[kQueue][client[kRunningIdx]++] = null
  788. util.errorRequest(client, request, err)
  789. }
  790. client[kPendingIdx] = client[kRunningIdx]
  791. assert(client[kRunning] === 0)
  792. client.emit('disconnect', client[kUrl], [client], err)
  793. client[kResume]()
  794. }
  795. function onSocketClose () {
  796. this[kClosed] = true
  797. }
  798. /**
  799. * @param {import('./client.js')} client
  800. */
  801. function resumeH1 (client) {
  802. const socket = client[kSocket]
  803. if (socket && !socket.destroyed) {
  804. if (client[kSize] === 0) {
  805. if (!socket[kNoRef] && socket.unref) {
  806. socket.unref()
  807. socket[kNoRef] = true
  808. }
  809. } else if (socket[kNoRef] && socket.ref) {
  810. socket.ref()
  811. socket[kNoRef] = false
  812. }
  813. if (client[kSize] === 0) {
  814. if (socket[kParser].timeoutType !== TIMEOUT_KEEP_ALIVE) {
  815. socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_KEEP_ALIVE)
  816. }
  817. } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
  818. if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
  819. const request = client[kQueue][client[kRunningIdx]]
  820. const headersTimeout = request.headersTimeout != null
  821. ? request.headersTimeout
  822. : client[kHeadersTimeout]
  823. socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
  824. }
  825. }
  826. }
  827. }
  828. // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
  829. function shouldSendContentLength (method) {
  830. return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
  831. }
  832. /**
  833. * @param {import('./client.js')} client
  834. * @param {import('../core/request.js')} request
  835. * @returns
  836. */
  837. function writeH1 (client, request) {
  838. const { method, path, host, upgrade, blocking, reset } = request
  839. let { body, headers, contentLength } = request
  840. // https://tools.ietf.org/html/rfc7231#section-4.3.1
  841. // https://tools.ietf.org/html/rfc7231#section-4.3.2
  842. // https://tools.ietf.org/html/rfc7231#section-4.3.5
  843. // Sending a payload body on a request that does not
  844. // expect it can cause undefined behavior on some
  845. // servers and corrupt connection state. Do not
  846. // re-use the connection for further requests.
  847. const expectsPayload = (
  848. method === 'PUT' ||
  849. method === 'POST' ||
  850. method === 'PATCH' ||
  851. method === 'QUERY' ||
  852. method === 'PROPFIND' ||
  853. method === 'PROPPATCH'
  854. )
  855. if (util.isFormDataLike(body)) {
  856. if (!extractBody) {
  857. extractBody = require('../web/fetch/body.js').extractBody
  858. }
  859. const [bodyStream, contentType] = extractBody(body)
  860. if (request.contentType == null) {
  861. headers.push('content-type', contentType)
  862. }
  863. body = bodyStream.stream
  864. contentLength = bodyStream.length
  865. } else if (util.isBlobLike(body) && request.contentType == null && body.type) {
  866. headers.push('content-type', body.type)
  867. }
  868. if (body && typeof body.read === 'function') {
  869. // Try to read EOF in order to get length.
  870. body.read(0)
  871. }
  872. const bodyLength = util.bodyLength(body)
  873. contentLength = bodyLength ?? contentLength
  874. if (contentLength === null) {
  875. contentLength = request.contentLength
  876. }
  877. if (contentLength === 0 && !expectsPayload) {
  878. // https://tools.ietf.org/html/rfc7230#section-3.3.2
  879. // A user agent SHOULD NOT send a Content-Length header field when
  880. // the request message does not contain a payload body and the method
  881. // semantics do not anticipate such a body.
  882. contentLength = null
  883. }
  884. // https://github.com/nodejs/undici/issues/2046
  885. // A user agent may send a Content-Length header with 0 value, this should be allowed.
  886. if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) {
  887. if (client[kStrictContentLength]) {
  888. util.errorRequest(client, request, new RequestContentLengthMismatchError())
  889. return false
  890. }
  891. process.emitWarning(new RequestContentLengthMismatchError())
  892. }
  893. const socket = client[kSocket]
  894. /**
  895. * @param {Error} [err]
  896. * @returns {void}
  897. */
  898. const abort = (err) => {
  899. if (request.aborted || request.completed) {
  900. return
  901. }
  902. util.errorRequest(client, request, err || new RequestAbortedError())
  903. util.destroy(body)
  904. util.destroy(socket, new InformationalError('aborted'))
  905. }
  906. try {
  907. request.onConnect(abort)
  908. } catch (err) {
  909. util.errorRequest(client, request, err)
  910. }
  911. if (request.aborted) {
  912. return false
  913. }
  914. if (method === 'HEAD') {
  915. // https://github.com/mcollina/undici/issues/258
  916. // Close after a HEAD request to interop with misbehaving servers
  917. // that may send a body in the response.
  918. socket[kReset] = true
  919. }
  920. if (upgrade || method === 'CONNECT') {
  921. // On CONNECT or upgrade, block pipeline from dispatching further
  922. // requests on this connection.
  923. socket[kReset] = true
  924. }
  925. if (reset != null) {
  926. socket[kReset] = reset
  927. }
  928. if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
  929. socket[kReset] = true
  930. }
  931. if (blocking) {
  932. socket[kBlocking] = true
  933. }
  934. let header = `${method} ${path} HTTP/1.1\r\n`
  935. if (typeof host === 'string') {
  936. header += `host: ${host}\r\n`
  937. } else {
  938. header += client[kHostHeader]
  939. }
  940. if (upgrade) {
  941. header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  942. } else if (client[kPipelining] && !socket[kReset]) {
  943. header += 'connection: keep-alive\r\n'
  944. } else {
  945. header += 'connection: close\r\n'
  946. }
  947. if (Array.isArray(headers)) {
  948. for (let n = 0; n < headers.length; n += 2) {
  949. const key = headers[n + 0]
  950. const val = headers[n + 1]
  951. if (Array.isArray(val)) {
  952. for (let i = 0; i < val.length; i++) {
  953. header += `${key}: ${val[i]}\r\n`
  954. }
  955. } else {
  956. header += `${key}: ${val}\r\n`
  957. }
  958. }
  959. }
  960. if (channels.sendHeaders.hasSubscribers) {
  961. channels.sendHeaders.publish({ request, headers: header, socket })
  962. }
  963. /* istanbul ignore else: assertion */
  964. if (!body || bodyLength === 0) {
  965. writeBuffer(abort, null, client, request, socket, contentLength, header, expectsPayload)
  966. } else if (util.isBuffer(body)) {
  967. writeBuffer(abort, body, client, request, socket, contentLength, header, expectsPayload)
  968. } else if (util.isBlobLike(body)) {
  969. if (typeof body.stream === 'function') {
  970. writeIterable(abort, body.stream(), client, request, socket, contentLength, header, expectsPayload)
  971. } else {
  972. writeBlob(abort, body, client, request, socket, contentLength, header, expectsPayload)
  973. }
  974. } else if (util.isStream(body)) {
  975. writeStream(abort, body, client, request, socket, contentLength, header, expectsPayload)
  976. } else if (util.isIterable(body)) {
  977. writeIterable(abort, body, client, request, socket, contentLength, header, expectsPayload)
  978. } else {
  979. assert(false)
  980. }
  981. return true
  982. }
  983. /**
  984. * @param {AbortCallback} abort
  985. * @param {import('stream').Stream} body
  986. * @param {import('./client.js')} client
  987. * @param {import('../core/request.js')} request
  988. * @param {import('net').Socket} socket
  989. * @param {number} contentLength
  990. * @param {string} header
  991. * @param {boolean} expectsPayload
  992. */
  993. function writeStream (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  994. assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
  995. let finished = false
  996. const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })
  997. /**
  998. * @param {Buffer} chunk
  999. * @returns {void}
  1000. */
  1001. const onData = function (chunk) {
  1002. if (finished) {
  1003. return
  1004. }
  1005. try {
  1006. if (!writer.write(chunk) && this.pause) {
  1007. this.pause()
  1008. }
  1009. } catch (err) {
  1010. util.destroy(this, err)
  1011. }
  1012. }
  1013. /**
  1014. * @returns {void}
  1015. */
  1016. const onDrain = function () {
  1017. if (finished) {
  1018. return
  1019. }
  1020. if (body.resume) {
  1021. body.resume()
  1022. }
  1023. }
  1024. /**
  1025. * @returns {void}
  1026. */
  1027. const onClose = function () {
  1028. // 'close' might be emitted *before* 'error' for
  1029. // broken streams. Wait a tick to avoid this case.
  1030. queueMicrotask(() => {
  1031. // It's only safe to remove 'error' listener after
  1032. // 'close'.
  1033. body.removeListener('error', onFinished)
  1034. })
  1035. if (!finished) {
  1036. const err = new RequestAbortedError()
  1037. queueMicrotask(() => onFinished(err))
  1038. }
  1039. }
  1040. /**
  1041. * @param {Error} [err]
  1042. * @returns
  1043. */
  1044. const onFinished = function (err) {
  1045. if (finished) {
  1046. return
  1047. }
  1048. finished = true
  1049. assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))
  1050. socket
  1051. .off('drain', onDrain)
  1052. .off('error', onFinished)
  1053. body
  1054. .removeListener('data', onData)
  1055. .removeListener('end', onFinished)
  1056. .removeListener('close', onClose)
  1057. if (!err) {
  1058. try {
  1059. writer.end()
  1060. } catch (er) {
  1061. err = er
  1062. }
  1063. }
  1064. writer.destroy(err)
  1065. if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) {
  1066. util.destroy(body, err)
  1067. } else {
  1068. util.destroy(body)
  1069. }
  1070. }
  1071. body
  1072. .on('data', onData)
  1073. .on('end', onFinished)
  1074. .on('error', onFinished)
  1075. .on('close', onClose)
  1076. if (body.resume) {
  1077. body.resume()
  1078. }
  1079. socket
  1080. .on('drain', onDrain)
  1081. .on('error', onFinished)
  1082. if (body.errorEmitted ?? body.errored) {
  1083. setImmediate(onFinished, body.errored)
  1084. } else if (body.endEmitted ?? body.readableEnded) {
  1085. setImmediate(onFinished, null)
  1086. }
  1087. if (body.closeEmitted ?? body.closed) {
  1088. setImmediate(onClose)
  1089. }
  1090. }
  1091. /**
  1092. * @typedef AbortCallback
  1093. * @type {Function}
  1094. * @param {Error} [err]
  1095. * @returns {void}
  1096. */
  1097. /**
  1098. * @param {AbortCallback} abort
  1099. * @param {Uint8Array|null} body
  1100. * @param {import('./client.js')} client
  1101. * @param {import('../core/request.js')} request
  1102. * @param {import('net').Socket} socket
  1103. * @param {number} contentLength
  1104. * @param {string} header
  1105. * @param {boolean} expectsPayload
  1106. * @returns {void}
  1107. */
  1108. function writeBuffer (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  1109. try {
  1110. if (!body) {
  1111. if (contentLength === 0) {
  1112. socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
  1113. } else {
  1114. assert(contentLength === null, 'no body must not have content length')
  1115. socket.write(`${header}\r\n`, 'latin1')
  1116. }
  1117. } else if (util.isBuffer(body)) {
  1118. assert(contentLength === body.byteLength, 'buffer body must have content length')
  1119. socket.cork()
  1120. socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
  1121. socket.write(body)
  1122. socket.uncork()
  1123. request.onBodySent(body)
  1124. if (!expectsPayload && request.reset !== false) {
  1125. socket[kReset] = true
  1126. }
  1127. }
  1128. request.onRequestSent()
  1129. client[kResume]()
  1130. } catch (err) {
  1131. abort(err)
  1132. }
  1133. }
  1134. /**
  1135. * @param {AbortCallback} abort
  1136. * @param {Blob} body
  1137. * @param {import('./client.js')} client
  1138. * @param {import('../core/request.js')} request
  1139. * @param {import('net').Socket} socket
  1140. * @param {number} contentLength
  1141. * @param {string} header
  1142. * @param {boolean} expectsPayload
  1143. * @returns {Promise<void>}
  1144. */
  1145. async function writeBlob (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  1146. assert(contentLength === body.size, 'blob body must have content length')
  1147. try {
  1148. if (contentLength != null && contentLength !== body.size) {
  1149. throw new RequestContentLengthMismatchError()
  1150. }
  1151. const buffer = Buffer.from(await body.arrayBuffer())
  1152. socket.cork()
  1153. socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
  1154. socket.write(buffer)
  1155. socket.uncork()
  1156. request.onBodySent(buffer)
  1157. request.onRequestSent()
  1158. if (!expectsPayload && request.reset !== false) {
  1159. socket[kReset] = true
  1160. }
  1161. client[kResume]()
  1162. } catch (err) {
  1163. abort(err)
  1164. }
  1165. }
  1166. /**
  1167. * @param {AbortCallback} abort
  1168. * @param {Iterable} body
  1169. * @param {import('./client.js')} client
  1170. * @param {import('../core/request.js')} request
  1171. * @param {import('net').Socket} socket
  1172. * @param {number} contentLength
  1173. * @param {string} header
  1174. * @param {boolean} expectsPayload
  1175. * @returns {Promise<void>}
  1176. */
  1177. async function writeIterable (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  1178. assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
  1179. let callback = null
  1180. function onDrain () {
  1181. if (callback) {
  1182. const cb = callback
  1183. callback = null
  1184. cb()
  1185. }
  1186. }
  1187. const waitForDrain = () => new Promise((resolve, reject) => {
  1188. assert(callback === null)
  1189. if (socket[kError]) {
  1190. reject(socket[kError])
  1191. } else {
  1192. callback = resolve
  1193. }
  1194. })
  1195. socket
  1196. .on('close', onDrain)
  1197. .on('drain', onDrain)
  1198. const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })
  1199. try {
  1200. // It's up to the user to somehow abort the async iterable.
  1201. for await (const chunk of body) {
  1202. if (socket[kError]) {
  1203. throw socket[kError]
  1204. }
  1205. if (!writer.write(chunk)) {
  1206. await waitForDrain()
  1207. }
  1208. }
  1209. writer.end()
  1210. } catch (err) {
  1211. writer.destroy(err)
  1212. } finally {
  1213. socket
  1214. .off('close', onDrain)
  1215. .off('drain', onDrain)
  1216. }
  1217. }
  1218. class AsyncWriter {
  1219. /**
  1220. *
  1221. * @param {object} arg
  1222. * @param {AbortCallback} arg.abort
  1223. * @param {import('net').Socket} arg.socket
  1224. * @param {import('../core/request.js')} arg.request
  1225. * @param {number} arg.contentLength
  1226. * @param {import('./client.js')} arg.client
  1227. * @param {boolean} arg.expectsPayload
  1228. * @param {string} arg.header
  1229. */
  1230. constructor ({ abort, socket, request, contentLength, client, expectsPayload, header }) {
  1231. this.socket = socket
  1232. this.request = request
  1233. this.contentLength = contentLength
  1234. this.client = client
  1235. this.bytesWritten = 0
  1236. this.expectsPayload = expectsPayload
  1237. this.header = header
  1238. this.abort = abort
  1239. socket[kWriting] = true
  1240. }
  1241. /**
  1242. * @param {Buffer} chunk
  1243. * @returns
  1244. */
  1245. write (chunk) {
  1246. const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this
  1247. if (socket[kError]) {
  1248. throw socket[kError]
  1249. }
  1250. if (socket.destroyed) {
  1251. return false
  1252. }
  1253. const len = Buffer.byteLength(chunk)
  1254. if (!len) {
  1255. return true
  1256. }
  1257. // We should defer writing chunks.
  1258. if (contentLength !== null && bytesWritten + len > contentLength) {
  1259. if (client[kStrictContentLength]) {
  1260. throw new RequestContentLengthMismatchError()
  1261. }
  1262. process.emitWarning(new RequestContentLengthMismatchError())
  1263. }
  1264. socket.cork()
  1265. if (bytesWritten === 0) {
  1266. if (!expectsPayload && request.reset !== false) {
  1267. socket[kReset] = true
  1268. }
  1269. if (contentLength === null) {
  1270. socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1')
  1271. } else {
  1272. socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
  1273. }
  1274. }
  1275. if (contentLength === null) {
  1276. socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
  1277. }
  1278. this.bytesWritten += len
  1279. const ret = socket.write(chunk)
  1280. socket.uncork()
  1281. request.onBodySent(chunk)
  1282. if (!ret) {
  1283. if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
  1284. // istanbul ignore else: only for jest
  1285. if (socket[kParser].timeout.refresh) {
  1286. socket[kParser].timeout.refresh()
  1287. }
  1288. }
  1289. }
  1290. return ret
  1291. }
  1292. /**
  1293. * @returns {void}
  1294. */
  1295. end () {
  1296. const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
  1297. request.onRequestSent()
  1298. socket[kWriting] = false
  1299. if (socket[kError]) {
  1300. throw socket[kError]
  1301. }
  1302. if (socket.destroyed) {
  1303. return
  1304. }
  1305. if (bytesWritten === 0) {
  1306. if (expectsPayload) {
  1307. // https://tools.ietf.org/html/rfc7230#section-3.3.2
  1308. // A user agent SHOULD send a Content-Length in a request message when
  1309. // no Transfer-Encoding is sent and the request method defines a meaning
  1310. // for an enclosed payload body.
  1311. socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
  1312. } else {
  1313. socket.write(`${header}\r\n`, 'latin1')
  1314. }
  1315. } else if (contentLength === null) {
  1316. socket.write('\r\n0\r\n\r\n', 'latin1')
  1317. }
  1318. if (contentLength !== null && bytesWritten !== contentLength) {
  1319. if (client[kStrictContentLength]) {
  1320. throw new RequestContentLengthMismatchError()
  1321. } else {
  1322. process.emitWarning(new RequestContentLengthMismatchError())
  1323. }
  1324. }
  1325. if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
  1326. // istanbul ignore else: only for jest
  1327. if (socket[kParser].timeout.refresh) {
  1328. socket[kParser].timeout.refresh()
  1329. }
  1330. }
  1331. client[kResume]()
  1332. }
  1333. /**
  1334. * @param {Error} [err]
  1335. * @returns {void}
  1336. */
  1337. destroy (err) {
  1338. const { socket, client, abort } = this
  1339. socket[kWriting] = false
  1340. if (err) {
  1341. assert(client[kRunning] <= 1, 'pipeline should only contain this request')
  1342. abort(err)
  1343. }
  1344. }
  1345. }
  1346. module.exports = connectH1