agent.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. 'use strict'
  2. const { InvalidArgumentError } = require('../core/errors')
  3. const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
  4. const DispatcherBase = require('./dispatcher-base')
  5. const Pool = require('./pool')
  6. const Client = require('./client')
  7. const util = require('../core/util')
  // Module-private symbols. They are created here and never exported, and are
  // used as property keys on Agent instances: the four kOn* keys hold the
  // event handlers attached to every per-origin dispatcher, kFactory holds the
  // dispatcher factory, and kOptions holds the options handed to that factory.
  8. const kOnConnect = Symbol('onConnect')
  9. const kOnDisconnect = Symbol('onDisconnect')
  10. const kOnConnectionError = Symbol('onConnectionError')
  11. const kOnDrain = Symbol('onDrain')
  12. const kFactory = Symbol('factory')
  13. const kOptions = Symbol('options')
  /**
   * Factory used by Agent when the caller does not supply one.
   *
   * @param {string|URL} origin Origin forwarded unchanged to the constructor.
   * @param {object} [opts] Options forwarded unchanged to the constructor.
   * @returns {Client|Pool} A Client when `opts.connections` is exactly 1,
   *   otherwise (including when `opts` is missing) a Pool.
   */
  function defaultFactory (origin, opts) {
    const wantsSingleConnection = Boolean(opts) && opts.connections === 1

    if (wantsSingleConnection) {
      return new Client(origin, opts)
    }

    return new Pool(origin, opts)
  }
  /**
   * Dispatcher that routes every request to a per-origin dispatcher, building
   * that dispatcher through the configured factory the first time an origin is
   * dispatched to.
   *
   * Dispatchers are cached in a Map keyed by `String(opts.origin)`. Each cache
   * entry also carries a `count` of 'connect' events not yet matched by a
   * 'disconnect'. When a 'disconnect' brings the count to zero or below, the
   * entry is evicted and its dispatcher is closed.
   *
   * The agent re-emits 'drain', 'connect', 'disconnect' and 'connectionError'
   * from its dispatchers, with itself prepended to the `targets` array.
   */
  class Agent extends DispatcherBase {
    /**
     * @param {object} [options] Remaining properties are deep-cloned and passed
     *   to the factory as its second argument.
     * @param {Function} [options.factory=defaultFactory] Called as
     *   `factory(origin, options)` to create the dispatcher for a new origin.
     * @param {Function|object} [options.connect] Passed through to the factory
     *   inside its options; an object value is shallow-copied first.
     * @throws {InvalidArgumentError} If `factory` is not a function, or if
     *   `connect` is neither nullish, a function, nor an object.
     */
    constructor ({ factory = defaultFactory, connect, ...options } = {}) {
      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('factory must be a function.')
      }

      if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
        throw new InvalidArgumentError('connect must be a function or an object')
      }

      super()

      // Copy object-style connect options so later caller mutations do not
      // leak into dispatchers created afterwards.
      if (connect && typeof connect !== 'function') {
        connect = { ...connect }
      }

      this[kOptions] = { ...util.deepClone(options), connect }
      this[kFactory] = factory
      this[kClients] = new Map()

      // Shared forwarders: re-emit a child dispatcher's event from the agent.
      // Connection accounting is NOT done here because these handlers only
      // receive the origin value chosen by the child dispatcher, which is not
      // guaranteed to equal the cache key. See [kDispatch].
      this[kOnDrain] = (origin, targets) => {
        this.emit('drain', origin, [this, ...targets])
      }

      this[kOnConnect] = (origin, targets) => {
        this.emit('connect', origin, [this, ...targets])
      }

      this[kOnDisconnect] = (origin, targets, err) => {
        this.emit('disconnect', origin, [this, ...targets], err)
      }

      this[kOnConnectionError] = (origin, targets, err) => {
        // TODO: should a connection error also adjust the per-origin
        // connection count (or evict a dispatcher that never connected)?
        this.emit('connectionError', origin, [this, ...targets], err)
      }
    }

    /**
     * Sum of the `kRunning` values reported by every cached dispatcher.
     *
     * @returns {number}
     */
    get [kRunning] () {
      let running = 0
      for (const { dispatcher } of this[kClients].values()) {
        running += dispatcher[kRunning]
      }
      return running
    }

    /**
     * Dispatches a request through the dispatcher cached for `opts.origin`,
     * creating and caching one when none exists yet.
     *
     * @param {object} opts Dispatch options; `opts.origin` must be a non-empty
     *   string or a URL instance.
     * @param {object} handler Forwarded unchanged to the dispatcher.
     * @returns {*} Whatever the selected dispatcher's `dispatch` returns.
     * @throws {InvalidArgumentError} If `opts.origin` is missing or of the
     *   wrong type.
     */
    [kDispatch] (opts, handler) {
      const { origin } = opts
      if (!origin || (typeof origin !== 'string' && !(origin instanceof URL))) {
        throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
      }
      const key = String(origin)

      const cached = this[kClients].get(key)
      let dispatcher = cached && cached.dispatcher

      if (!dispatcher) {
        // Resolve the cache entry through the key it was stored under, never
        // through the origin carried by the event: a Map matches keys by
        // identity/value, so a URL object (or a differently formatted string)
        // would silently miss and the count would never change.
        // The identity check keeps a dispatcher that was already evicted or
        // replaced from altering the entry of its successor.
        const ownEntry = () => {
          const entry = this[kClients].get(key)
          return entry && entry.dispatcher === dispatcher ? entry : undefined
        }

        dispatcher = this[kFactory](opts.origin, this[kOptions])
          .on('drain', this[kOnDrain])
          .on('connect', (eventOrigin, targets) => {
            const entry = ownEntry()
            if (entry) {
              entry.count += 1
            }
            this[kOnConnect](eventOrigin, targets)
          })
          .on('disconnect', (eventOrigin, targets, err) => {
            const entry = ownEntry()
            if (entry) {
              entry.count -= 1
              if (entry.count <= 0) {
                this[kClients].delete(key)
                // Graceful close rather than destroy, so requests that may
                // still be queued on the dispatcher are not aborted.
                // NOTE(review): relies on the dispatcher's close() draining
                // pending work - confirm against the dispatcher base class.
                entry.dispatcher.close()
              }
            }
            this[kOnDisconnect](eventOrigin, targets, err)
          })
          .on('connectionError', this[kOnConnectionError])

        this[kClients].set(key, { count: 0, dispatcher })
      }

      return dispatcher.dispatch(opts, handler)
    }

    /**
     * Closes every cached dispatcher and empties the cache.
     * The cache is cleared before awaiting, so events arriving while the
     * dispatchers shut down find no entry to update.
     *
     * @returns {Promise<void>} Settles once every `close()` has settled;
     *   rejects with the first rejection, per `Promise.all`.
     */
    async [kClose] () {
      const closePromises = []
      for (const { dispatcher } of this[kClients].values()) {
        closePromises.push(dispatcher.close())
      }
      this[kClients].clear()

      await Promise.all(closePromises)
    }

    /**
     * Destroys every cached dispatcher and empties the cache.
     *
     * @param {Error} [err] Forwarded to each dispatcher's `destroy`.
     * @returns {Promise<void>} Settles once every `destroy()` has settled;
     *   rejects with the first rejection, per `Promise.all`.
     */
    async [kDestroy] (err) {
      const destroyPromises = []
      for (const { dispatcher } of this[kClients].values()) {
        destroyPromises.push(dispatcher.destroy(err))
      }
      this[kClients].clear()

      await Promise.all(destroyPromises)
    }

    /**
     * Statistics of the cached dispatchers, keyed by `dispatcher[kUrl].origin`.
     * Dispatchers whose `stats` is falsy are omitted.
     *
     * @returns {object}
     */
    get stats () {
      const allClientStats = {}
      for (const { dispatcher } of this[kClients].values()) {
        if (dispatcher.stats) {
          allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
        }
      }
      return allClientStats
    }
  }
  112. module.exports = Agent