client.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const net = require('node:net')
  4. const http = require('node:http')
  5. const util = require('../core/util.js')
  6. const { ClientStats } = require('../util/stats.js')
  7. const { channels } = require('../core/diagnostics.js')
  8. const Request = require('../core/request.js')
  9. const DispatcherBase = require('./dispatcher-base')
  10. const {
  11. InvalidArgumentError,
  12. InformationalError,
  13. ClientDestroyedError
  14. } = require('../core/errors.js')
  15. const buildConnector = require('../core/connect.js')
  16. const {
  17. kUrl,
  18. kServerName,
  19. kClient,
  20. kBusy,
  21. kConnect,
  22. kResuming,
  23. kRunning,
  24. kPending,
  25. kSize,
  26. kQueue,
  27. kConnected,
  28. kConnecting,
  29. kNeedDrain,
  30. kKeepAliveDefaultTimeout,
  31. kHostHeader,
  32. kPendingIdx,
  33. kRunningIdx,
  34. kError,
  35. kPipelining,
  36. kKeepAliveTimeoutValue,
  37. kMaxHeadersSize,
  38. kKeepAliveMaxTimeout,
  39. kKeepAliveTimeoutThreshold,
  40. kHeadersTimeout,
  41. kBodyTimeout,
  42. kStrictContentLength,
  43. kConnector,
  44. kMaxRequests,
  45. kCounter,
  46. kClose,
  47. kDestroy,
  48. kDispatch,
  49. kLocalAddress,
  50. kMaxResponseSize,
  51. kOnError,
  52. kHTTPContext,
  53. kMaxConcurrentStreams,
  54. kResume
  55. } = require('../core/symbols.js')
  56. const connectH1 = require('./client-h1.js')
  57. const connectH2 = require('./client-h2.js')
  58. const kClosedResolve = Symbol('kClosedResolve')
  59. const getDefaultNodeMaxHeaderSize = http &&
  60. http.maxHeaderSize &&
  61. Number.isInteger(http.maxHeaderSize) &&
  62. http.maxHeaderSize > 0
  63. ? () => http.maxHeaderSize
  64. : () => { throw new InvalidArgumentError('http module not available or http.maxHeaderSize invalid') }
  65. const noop = () => {}
  66. function getPipelining (client) {
  67. return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
  68. }
  69. /**
  70. * @type {import('../../types/client.js').default}
  71. */
  72. class Client extends DispatcherBase {
  73. /**
  74. *
  75. * @param {string|URL} url
  76. * @param {import('../../types/client.js').Client.Options} options
  77. */
  78. constructor (url, {
  79. maxHeaderSize,
  80. headersTimeout,
  81. socketTimeout,
  82. requestTimeout,
  83. connectTimeout,
  84. bodyTimeout,
  85. idleTimeout,
  86. keepAlive,
  87. keepAliveTimeout,
  88. maxKeepAliveTimeout,
  89. keepAliveMaxTimeout,
  90. keepAliveTimeoutThreshold,
  91. socketPath,
  92. pipelining,
  93. tls,
  94. strictContentLength,
  95. maxCachedSessions,
  96. connect,
  97. maxRequestsPerClient,
  98. localAddress,
  99. maxResponseSize,
  100. autoSelectFamily,
  101. autoSelectFamilyAttemptTimeout,
  102. // h2
  103. maxConcurrentStreams,
  104. allowH2
  105. } = {}) {
  106. if (keepAlive !== undefined) {
  107. throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
  108. }
  109. if (socketTimeout !== undefined) {
  110. throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
  111. }
  112. if (requestTimeout !== undefined) {
  113. throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
  114. }
  115. if (idleTimeout !== undefined) {
  116. throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
  117. }
  118. if (maxKeepAliveTimeout !== undefined) {
  119. throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
  120. }
  121. if (maxHeaderSize != null) {
  122. if (!Number.isInteger(maxHeaderSize) || maxHeaderSize < 1) {
  123. throw new InvalidArgumentError('invalid maxHeaderSize')
  124. }
  125. } else {
  126. // If maxHeaderSize is not provided, use the default value from the http module
  127. // or if that is not available, throw an error.
  128. maxHeaderSize = getDefaultNodeMaxHeaderSize()
  129. }
  130. if (socketPath != null && typeof socketPath !== 'string') {
  131. throw new InvalidArgumentError('invalid socketPath')
  132. }
  133. if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
  134. throw new InvalidArgumentError('invalid connectTimeout')
  135. }
  136. if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
  137. throw new InvalidArgumentError('invalid keepAliveTimeout')
  138. }
  139. if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
  140. throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
  141. }
  142. if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
  143. throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
  144. }
  145. if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
  146. throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
  147. }
  148. if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
  149. throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
  150. }
  151. if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
  152. throw new InvalidArgumentError('connect must be a function or an object')
  153. }
  154. if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
  155. throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
  156. }
  157. if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
  158. throw new InvalidArgumentError('localAddress must be valid string IP address')
  159. }
  160. if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
  161. throw new InvalidArgumentError('maxResponseSize must be a positive number')
  162. }
  163. if (
  164. autoSelectFamilyAttemptTimeout != null &&
  165. (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
  166. ) {
  167. throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
  168. }
  169. // h2
  170. if (allowH2 != null && typeof allowH2 !== 'boolean') {
  171. throw new InvalidArgumentError('allowH2 must be a valid boolean value')
  172. }
  173. if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
  174. throw new InvalidArgumentError('maxConcurrentStreams must be a positive integer, greater than 0')
  175. }
  176. super()
  177. if (typeof connect !== 'function') {
  178. connect = buildConnector({
  179. ...tls,
  180. maxCachedSessions,
  181. allowH2,
  182. socketPath,
  183. timeout: connectTimeout,
  184. ...(typeof autoSelectFamily === 'boolean' ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
  185. ...connect
  186. })
  187. }
  188. this[kUrl] = util.parseOrigin(url)
  189. this[kConnector] = connect
  190. this[kPipelining] = pipelining != null ? pipelining : 1
  191. this[kMaxHeadersSize] = maxHeaderSize
  192. this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
  193. this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
  194. this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 2e3 : keepAliveTimeoutThreshold
  195. this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
  196. this[kServerName] = null
  197. this[kLocalAddress] = localAddress != null ? localAddress : null
  198. this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
  199. this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
  200. this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
  201. this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
  202. this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
  203. this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
  204. this[kMaxRequests] = maxRequestsPerClient
  205. this[kClosedResolve] = null
  206. this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
  207. this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
  208. this[kHTTPContext] = null
  209. // kQueue is built up of 3 sections separated by
  210. // the kRunningIdx and kPendingIdx indices.
  211. // | complete | running | pending |
  212. // ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
  213. // kRunningIdx points to the first running element.
  214. // kPendingIdx points to the first pending element.
  215. // This implements a fast queue with an amortized
  216. // time of O(1).
  217. this[kQueue] = []
  218. this[kRunningIdx] = 0
  219. this[kPendingIdx] = 0
  220. this[kResume] = (sync) => resume(this, sync)
  221. this[kOnError] = (err) => onError(this, err)
  222. }
  223. get pipelining () {
  224. return this[kPipelining]
  225. }
  226. set pipelining (value) {
  227. this[kPipelining] = value
  228. this[kResume](true)
  229. }
  230. get stats () {
  231. return new ClientStats(this)
  232. }
  233. get [kPending] () {
  234. return this[kQueue].length - this[kPendingIdx]
  235. }
  236. get [kRunning] () {
  237. return this[kPendingIdx] - this[kRunningIdx]
  238. }
  239. get [kSize] () {
  240. return this[kQueue].length - this[kRunningIdx]
  241. }
  242. get [kConnected] () {
  243. return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
  244. }
  245. get [kBusy] () {
  246. return Boolean(
  247. this[kHTTPContext]?.busy(null) ||
  248. (this[kSize] >= (getPipelining(this) || 1)) ||
  249. this[kPending] > 0
  250. )
  251. }
  252. /* istanbul ignore: only used for test */
  253. [kConnect] (cb) {
  254. connect(this)
  255. this.once('connect', cb)
  256. }
  257. [kDispatch] (opts, handler) {
  258. const origin = opts.origin || this[kUrl].origin
  259. const request = new Request(origin, opts, handler)
  260. this[kQueue].push(request)
  261. if (this[kResuming]) {
  262. // Do nothing.
  263. } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
  264. // Wait a tick in case stream/iterator is ended in the same tick.
  265. this[kResuming] = 1
  266. queueMicrotask(() => resume(this))
  267. } else {
  268. this[kResume](true)
  269. }
  270. if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
  271. this[kNeedDrain] = 2
  272. }
  273. return this[kNeedDrain] < 2
  274. }
  275. async [kClose] () {
  276. // TODO: for H2 we need to gracefully flush the remaining enqueued
  277. // request and close each stream.
  278. return new Promise((resolve) => {
  279. if (this[kSize]) {
  280. this[kClosedResolve] = resolve
  281. } else {
  282. resolve(null)
  283. }
  284. })
  285. }
  286. async [kDestroy] (err) {
  287. return new Promise((resolve) => {
  288. const requests = this[kQueue].splice(this[kPendingIdx])
  289. for (let i = 0; i < requests.length; i++) {
  290. const request = requests[i]
  291. util.errorRequest(this, request, err)
  292. }
  293. const callback = () => {
  294. if (this[kClosedResolve]) {
  295. // TODO (fix): Should we error here with ClientDestroyedError?
  296. this[kClosedResolve]()
  297. this[kClosedResolve] = null
  298. }
  299. resolve(null)
  300. }
  301. if (this[kHTTPContext]) {
  302. this[kHTTPContext].destroy(err, callback)
  303. this[kHTTPContext] = null
  304. } else {
  305. queueMicrotask(callback)
  306. }
  307. this[kResume]()
  308. })
  309. }
  310. }
  311. function onError (client, err) {
  312. if (
  313. client[kRunning] === 0 &&
  314. err.code !== 'UND_ERR_INFO' &&
  315. err.code !== 'UND_ERR_SOCKET'
  316. ) {
  317. // Error is not caused by running request and not a recoverable
  318. // socket error.
  319. assert(client[kPendingIdx] === client[kRunningIdx])
  320. const requests = client[kQueue].splice(client[kRunningIdx])
  321. for (let i = 0; i < requests.length; i++) {
  322. const request = requests[i]
  323. util.errorRequest(client, request, err)
  324. }
  325. assert(client[kSize] === 0)
  326. }
  327. }
  328. /**
  329. * @param {Client} client
  330. * @returns
  331. */
  332. async function connect (client) {
  333. assert(!client[kConnecting])
  334. assert(!client[kHTTPContext])
  335. let { host, hostname, protocol, port } = client[kUrl]
  336. // Resolve ipv6
  337. if (hostname[0] === '[') {
  338. const idx = hostname.indexOf(']')
  339. assert(idx !== -1)
  340. const ip = hostname.substring(1, idx)
  341. assert(net.isIPv6(ip))
  342. hostname = ip
  343. }
  344. client[kConnecting] = true
  345. if (channels.beforeConnect.hasSubscribers) {
  346. channels.beforeConnect.publish({
  347. connectParams: {
  348. host,
  349. hostname,
  350. protocol,
  351. port,
  352. version: client[kHTTPContext]?.version,
  353. servername: client[kServerName],
  354. localAddress: client[kLocalAddress]
  355. },
  356. connector: client[kConnector]
  357. })
  358. }
  359. try {
  360. const socket = await new Promise((resolve, reject) => {
  361. client[kConnector]({
  362. host,
  363. hostname,
  364. protocol,
  365. port,
  366. servername: client[kServerName],
  367. localAddress: client[kLocalAddress]
  368. }, (err, socket) => {
  369. if (err) {
  370. reject(err)
  371. } else {
  372. resolve(socket)
  373. }
  374. })
  375. })
  376. if (client.destroyed) {
  377. util.destroy(socket.on('error', noop), new ClientDestroyedError())
  378. return
  379. }
  380. assert(socket)
  381. try {
  382. client[kHTTPContext] = socket.alpnProtocol === 'h2'
  383. ? await connectH2(client, socket)
  384. : await connectH1(client, socket)
  385. } catch (err) {
  386. socket.destroy().on('error', noop)
  387. throw err
  388. }
  389. client[kConnecting] = false
  390. socket[kCounter] = 0
  391. socket[kMaxRequests] = client[kMaxRequests]
  392. socket[kClient] = client
  393. socket[kError] = null
  394. if (channels.connected.hasSubscribers) {
  395. channels.connected.publish({
  396. connectParams: {
  397. host,
  398. hostname,
  399. protocol,
  400. port,
  401. version: client[kHTTPContext]?.version,
  402. servername: client[kServerName],
  403. localAddress: client[kLocalAddress]
  404. },
  405. connector: client[kConnector],
  406. socket
  407. })
  408. }
  409. client.emit('connect', client[kUrl], [client])
  410. } catch (err) {
  411. if (client.destroyed) {
  412. return
  413. }
  414. client[kConnecting] = false
  415. if (channels.connectError.hasSubscribers) {
  416. channels.connectError.publish({
  417. connectParams: {
  418. host,
  419. hostname,
  420. protocol,
  421. port,
  422. version: client[kHTTPContext]?.version,
  423. servername: client[kServerName],
  424. localAddress: client[kLocalAddress]
  425. },
  426. connector: client[kConnector],
  427. error: err
  428. })
  429. }
  430. if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
  431. assert(client[kRunning] === 0)
  432. while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
  433. const request = client[kQueue][client[kPendingIdx]++]
  434. util.errorRequest(client, request, err)
  435. }
  436. } else {
  437. onError(client, err)
  438. }
  439. client.emit('connectionError', client[kUrl], [client], err)
  440. }
  441. client[kResume]()
  442. }
  443. function emitDrain (client) {
  444. client[kNeedDrain] = 0
  445. client.emit('drain', client[kUrl], [client])
  446. }
  447. function resume (client, sync) {
  448. if (client[kResuming] === 2) {
  449. return
  450. }
  451. client[kResuming] = 2
  452. _resume(client, sync)
  453. client[kResuming] = 0
  454. if (client[kRunningIdx] > 256) {
  455. client[kQueue].splice(0, client[kRunningIdx])
  456. client[kPendingIdx] -= client[kRunningIdx]
  457. client[kRunningIdx] = 0
  458. }
  459. }
  460. function _resume (client, sync) {
  461. while (true) {
  462. if (client.destroyed) {
  463. assert(client[kPending] === 0)
  464. return
  465. }
  466. if (client[kClosedResolve] && !client[kSize]) {
  467. client[kClosedResolve]()
  468. client[kClosedResolve] = null
  469. return
  470. }
  471. if (client[kHTTPContext]) {
  472. client[kHTTPContext].resume()
  473. }
  474. if (client[kBusy]) {
  475. client[kNeedDrain] = 2
  476. } else if (client[kNeedDrain] === 2) {
  477. if (sync) {
  478. client[kNeedDrain] = 1
  479. queueMicrotask(() => emitDrain(client))
  480. } else {
  481. emitDrain(client)
  482. }
  483. continue
  484. }
  485. if (client[kPending] === 0) {
  486. return
  487. }
  488. if (client[kRunning] >= (getPipelining(client) || 1)) {
  489. return
  490. }
  491. const request = client[kQueue][client[kPendingIdx]]
  492. if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
  493. if (client[kRunning] > 0) {
  494. return
  495. }
  496. client[kServerName] = request.servername
  497. client[kHTTPContext]?.destroy(new InformationalError('servername changed'), () => {
  498. client[kHTTPContext] = null
  499. resume(client)
  500. })
  501. }
  502. if (client[kConnecting]) {
  503. return
  504. }
  505. if (!client[kHTTPContext]) {
  506. connect(client)
  507. return
  508. }
  509. if (client[kHTTPContext].destroyed) {
  510. return
  511. }
  512. if (client[kHTTPContext].busy(request)) {
  513. return
  514. }
  515. if (!request.aborted && client[kHTTPContext].write(request)) {
  516. client[kPendingIdx]++
  517. } else {
  518. client[kQueue].splice(client[kPendingIdx], 1)
  519. }
  520. }
  521. }
  522. module.exports = Client