balanced-pool.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. 'use strict'
  2. const {
  3. BalancedPoolMissingUpstreamError,
  4. InvalidArgumentError
  5. } = require('../core/errors')
  6. const {
  7. PoolBase,
  8. kClients,
  9. kNeedDrain,
  10. kAddClient,
  11. kRemoveClient,
  12. kGetDispatcher
  13. } = require('./pool-base')
  14. const Pool = require('./pool')
  15. const { kUrl } = require('../core/symbols')
  16. const { parseOrigin } = require('../core/util')
  // Configuration slots stored on each BalancedPool instance by its constructor.
  const kOptions = Symbol('options')
  const kFactory = Symbol('factory')
  const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
  const kErrorPenalty = Symbol('kErrorPenalty')

  // Scheduling state stored on each BalancedPool instance: the position of the
  // last visited pool, the current weight threshold, and the cached GCD of all
  // pool weights.
  const kIndex = Symbol('kIndex')
  const kCurrentWeight = Symbol('kCurrentWeight')
  const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')

  // Slot stored on every upstream pool: that pool's current weight.
  const kWeight = Symbol('kWeight')
  /**
   * Compute the greatest common divisor of two numbers with the
   * Euclidean algorithm (repeated remainder).
   *
   * When `a` is 0 the result is `b`; when `b` is 0 the result is `a`.
   *
   * @param {number} a
   * @param {number} b
   * @returns {number}
   */
  function getGreatestCommonDivisor (a, b) {
    if (a === 0) return b

    let dividend = a
    let divisor = b
    // Replace (dividend, divisor) with (divisor, dividend mod divisor)
    // until the remainder vanishes; the last non-zero value is the GCD.
    while (divisor !== 0) {
      const remainder = dividend % divisor
      dividend = divisor
      divisor = remainder
    }
    return dividend
  }
  /**
   * Default upstream factory: builds a `Pool` for the given origin.
   *
   * @param {*} origin - forwarded unchanged to the `Pool` constructor
   * @param {object} opts - forwarded unchanged to the `Pool` constructor
   * @returns {Pool}
   */
  function defaultFactory (origin, opts) {
    const pool = new Pool(origin, opts)
    return pool
  }
  /**
   * A dispatcher that spreads work over several upstream pools.
   *
   * Every upstream pool carries a weight (`kWeight`). Weights start at
   * `maxWeightPerServer`, are lowered by `errorPenalty` when a pool reports a
   * connection error or a socket-level disconnect, and are raised again (capped
   * at `maxWeightPerServer`) when the pool connects. `[kGetDispatcher]` walks the
   * pools round-robin and picks the next usable pool whose weight reaches the
   * current weight threshold.
   */
  class BalancedPool extends PoolBase {
    /**
     * @param {*|Array} [upstreams] - one upstream or an array of upstreams; each
     *   entry is handed to `addUpstream` (and therefore to `parseOrigin`).
     * @param {object} [options] - every option except `factory` is kept and
     *   passed (as a shallow copy) to the factory for each upstream.
     * @param {Function} [options.factory] - called as `factory(origin, opts)`
     *   and expected to return the pool for that origin. Defaults to creating
     *   a `Pool`.
     * @param {number} [options.maxWeightPerServer] - initial and maximum weight
     *   of a pool; falsy values fall back to 100.
     * @param {number} [options.errorPenalty] - weight delta applied on
     *   errors/connects; falsy values fall back to 15.
     * @throws {InvalidArgumentError} when `factory` is not a function.
     */
    constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('factory must be a function.')
      }

      super()

      this[kOptions] = opts
      this[kIndex] = -1
      this[kCurrentWeight] = 0

      this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
      this[kErrorPenalty] = this[kOptions].errorPenalty || 15

      if (!Array.isArray(upstreams)) {
        upstreams = [upstreams]
      }

      this[kFactory] = factory

      for (const upstream of upstreams) {
        this.addUpstream(upstream)
      }
      this._updateBalancedPoolStats()
    }

    /**
     * Register an upstream. Does nothing when a pool for the same origin that is
     * neither closed nor destroyed is already registered.
     *
     * Adding an upstream resets the weight of every registered pool to
     * `maxWeightPerServer`.
     *
     * @param {*} upstream - value understood by `parseOrigin`
     * @returns {this}
     */
    addUpstream (upstream) {
      const upstreamOrigin = parseOrigin(upstream).origin

      if (this[kClients].find((pool) => (
        pool[kUrl].origin === upstreamOrigin &&
        pool.closed !== true &&
        pool.destroyed !== true
      ))) {
        return this
      }

      const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

      this[kAddClient](pool)

      pool.on('connect', () => {
        // A successful connect earns weight back, capped at the maximum.
        pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
        // Keep the cached GCD in sync with the new weight, exactly as the
        // error handlers below do.
        this._updateBalancedPoolStats()
      })

      pool.on('connectionError', () => {
        // Penalise the pool, but never let its weight drop below 1.
        pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
        this._updateBalancedPoolStats()
      })

      pool.on('disconnect', (...args) => {
        // The third event argument is treated as the error that caused the
        // disconnect.
        const err = args[2]
        if (err && err.code === 'UND_ERR_SOCKET') {
          // decrease the weight of the pool.
          pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
          this._updateBalancedPoolStats()
        }
      })

      for (const client of this[kClients]) {
        client[kWeight] = this[kMaxWeightPerServer]
      }

      this._updateBalancedPoolStats()

      return this
    }

    /**
     * Recompute the greatest common divisor of all pool weights and cache it;
     * `[kGetDispatcher]` uses it as the step by which the current weight
     * threshold is lowered. The result is 0 when there are no pools.
     */
    _updateBalancedPoolStats () {
      let result = 0
      for (let i = 0; i < this[kClients].length; i++) {
        result = getGreatestCommonDivisor(this[kClients][i][kWeight], result)
      }

      this[kGreatestCommonDivisor] = result
    }

    /**
     * Remove the registered pool for the given upstream, if there is one that is
     * neither closed nor destroyed.
     *
     * @param {*} upstream - value understood by `parseOrigin`
     * @returns {this}
     */
    removeUpstream (upstream) {
      const upstreamOrigin = parseOrigin(upstream).origin

      const pool = this[kClients].find((pool) => (
        pool[kUrl].origin === upstreamOrigin &&
        pool.closed !== true &&
        pool.destroyed !== true
      ))

      if (pool) {
        this[kRemoveClient](pool)
      }

      return this
    }

    /**
     * Origins of all registered pools that are neither closed nor destroyed.
     *
     * @returns {string[]}
     */
    get upstreams () {
      return this[kClients]
        .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true)
        .map((p) => p[kUrl].origin)
    }

    /**
     * Pick the pool that should handle the next request.
     *
     * @returns {object|undefined} the selected pool, or `undefined` when no
     *   pool is currently usable (all need draining, are closed or destroyed).
     * @throws {BalancedPoolMissingUpstreamError} when no upstream is registered.
     */
    [kGetDispatcher] () {
      const clients = this[kClients]

      // We validate that pools is greater than 0,
      // otherwise we would have to wait until an upstream
      // is added, which might never happen.
      if (clients.length === 0) {
        throw new BalancedPoolMissingUpstreamError()
      }

      // A pool is only eligible when it can accept work right now. The same
      // predicate is used for the guard, the scan and the fallback so that a
      // closed or destroyed pool can never be handed out.
      const isAvailable = (pool) => (
        !pool[kNeedDrain] &&
        pool.closed !== true &&
        pool.destroyed !== true
      )

      let maxWeightIndex = clients.findIndex(isAvailable)
      if (maxWeightIndex === -1) {
        return
      }

      let counter = 0
      while (counter++ < clients.length) {
        this[kIndex] = (this[kIndex] + 1) % clients.length
        const pool = clients[this[kIndex]]
        const available = isAvailable(pool)

        // find pool index with the largest weight
        if (available && pool[kWeight] > clients[maxWeightIndex][kWeight]) {
          maxWeightIndex = this[kIndex]
        }

        // decrease the current weight every `this[kClients].length`.
        if (this[kIndex] === 0) {
          // Set the current weight to the next lower weight.
          this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

          if (this[kCurrentWeight] <= 0) {
            this[kCurrentWeight] = this[kMaxWeightPerServer]
          }
        }

        if (available && pool[kWeight] >= this[kCurrentWeight]) {
          return pool
        }
      }

      // No pool reached the threshold during a full pass: fall back to the
      // heaviest usable pool and continue the rotation from there.
      this[kCurrentWeight] = clients[maxWeightIndex][kWeight]
      this[kIndex] = maxWeightIndex
      return clients[maxWeightIndex]
    }
  }
  166. module.exports = BalancedPool