123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- 'use strict'
- const { InvalidArgumentError } = require('../core/errors')
- const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
- const DispatcherBase = require('./dispatcher-base')
- const Pool = require('./pool')
- const Client = require('./client')
- const util = require('../core/util')
- const kOnConnect = Symbol('onConnect')
- const kOnDisconnect = Symbol('onDisconnect')
- const kOnConnectionError = Symbol('onConnectionError')
- const kOnDrain = Symbol('onDrain')
- const kFactory = Symbol('factory')
- const kOptions = Symbol('options')
- function defaultFactory (origin, opts) {
- return opts && opts.connections === 1
- ? new Client(origin, opts)
- : new Pool(origin, opts)
- }
- class Agent extends DispatcherBase {
- constructor ({ factory = defaultFactory, connect, ...options } = {}) {
- if (typeof factory !== 'function') {
- throw new InvalidArgumentError('factory must be a function.')
- }
- if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
- throw new InvalidArgumentError('connect must be a function or an object')
- }
- super()
- if (connect && typeof connect !== 'function') {
- connect = { ...connect }
- }
- this[kOptions] = { ...util.deepClone(options), connect }
- this[kFactory] = factory
- this[kClients] = new Map()
- this[kOnDrain] = (origin, targets) => {
- this.emit('drain', origin, [this, ...targets])
- }
- this[kOnConnect] = (origin, targets) => {
- const result = this[kClients].get(origin)
- if (result) {
- result.count += 1
- }
- this.emit('connect', origin, [this, ...targets])
- }
- this[kOnDisconnect] = (origin, targets, err) => {
- const result = this[kClients].get(origin)
- if (result) {
- result.count -= 1
- if (result.count <= 0) {
- this[kClients].delete(origin)
- result.dispatcher.destroy()
- }
- }
- this.emit('disconnect', origin, [this, ...targets], err)
- }
- this[kOnConnectionError] = (origin, targets, err) => {
- // TODO: should this decrement result.count here?
- this.emit('connectionError', origin, [this, ...targets], err)
- }
- }
- get [kRunning] () {
- let ret = 0
- for (const { dispatcher } of this[kClients].values()) {
- ret += dispatcher[kRunning]
- }
- return ret
- }
- [kDispatch] (opts, handler) {
- let key
- if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
- key = String(opts.origin)
- } else {
- throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
- }
- const result = this[kClients].get(key)
- let dispatcher = result && result.dispatcher
- if (!dispatcher) {
- dispatcher = this[kFactory](opts.origin, this[kOptions])
- .on('drain', this[kOnDrain])
- .on('connect', this[kOnConnect])
- .on('disconnect', this[kOnDisconnect])
- .on('connectionError', this[kOnConnectionError])
- this[kClients].set(key, { count: 0, dispatcher })
- }
- return dispatcher.dispatch(opts, handler)
- }
- async [kClose] () {
- const closePromises = []
- for (const { dispatcher } of this[kClients].values()) {
- closePromises.push(dispatcher.close())
- }
- this[kClients].clear()
- await Promise.all(closePromises)
- }
- async [kDestroy] (err) {
- const destroyPromises = []
- for (const { dispatcher } of this[kClients].values()) {
- destroyPromises.push(dispatcher.destroy(err))
- }
- this[kClients].clear()
- await Promise.all(destroyPromises)
- }
- get stats () {
- const allClientStats = {}
- for (const { dispatcher } of this[kClients].values()) {
- if (dispatcher.stats) {
- allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
- }
- }
- return allClientStats
- }
- }
- module.exports = Agent
|