'use strict'

const {
  BalancedPoolMissingUpstreamError,
  InvalidArgumentError
} = require('./core/errors')
const {
  PoolBase,
  kClients,
  kNeedDrain,
  kAddClient,
  kRemoveClient,
  kGetDispatcher
} = require('./pool-base')
const Pool = require('./pool')
const { kUrl, kInterceptors } = require('./core/symbols')
const { parseOrigin } = require('./core/util')
const kFactory = Symbol('factory')

const kOptions = Symbol('options')
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
const kCurrentWeight = Symbol('kCurrentWeight')
const kIndex = Symbol('kIndex')
const kWeight = Symbol('kWeight')
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
const kErrorPenalty = Symbol('kErrorPenalty')

/**
 * Computes the greatest common divisor of two numbers using the
 * recursive Euclidean algorithm.
 *
 * @param {number} a
 * @param {number} b
 * @returns {number} `a` when `b` is 0, otherwise gcd(b, a % b).
 */
function getGreatestCommonDivisor (a, b) {
  if (b === 0) return a
  return getGreatestCommonDivisor(b, a % b)
}

/**
 * Default upstream factory: creates a `Pool` for the given origin.
 *
 * @param {string} origin
 * @param {object} opts Options forwarded to the `Pool` constructor.
 * @returns {Pool}
 */
function defaultFactory (origin, opts) {
  return new Pool(origin, opts)
}

/**
 * A dispatcher that distributes work over several upstream pools using a
 * weighted round-robin selection. Every pool carries a weight that is
 * lowered by `errorPenalty` on connection errors / socket disconnects and
 * raised by the same amount on successful connects.
 */
class BalancedPool extends PoolBase {
  /**
   * @param {string|URL|Array<string|URL>} [upstreams=[]] One upstream or a
   *   list of upstreams; a non-array value is wrapped into an array.
   * @param {object} [options]
   * @param {Function} [options.factory=defaultFactory] Called as
   *   `factory(origin, opts)` to create the pool for each upstream.
   * @param {number} [options.maxWeightPerServer] Upper bound (and initial
   *   value) of a pool's weight; falsy values fall back to 100.
   * @param {number} [options.errorPenalty] Amount a weight is changed by on
   *   error / connect; falsy values fall back to 15.
   * @throws {InvalidArgumentError} When `factory` is not a function.
   */
  constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
    super()

    this[kOptions] = opts
    this[kIndex] = -1
    this[kCurrentWeight] = 0

    this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
    this[kErrorPenalty] = this[kOptions].errorPenalty || 15

    if (!Array.isArray(upstreams)) {
      upstreams = [upstreams]
    }

    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.')
    }

    this[kInterceptors] = opts.interceptors && opts.interceptors.BalancedPool && Array.isArray(opts.interceptors.BalancedPool)
      ? opts.interceptors.BalancedPool
      : []
    this[kFactory] = factory

    for (const upstream of upstreams) {
      this.addUpstream(upstream)
    }
    this._updateBalancedPoolStats()
  }

  /**
   * Adds an upstream unless a pool for the same origin that is neither
   * closed nor destroyed is already registered.
   *
   * Note: adding an upstream resets the weight of every registered pool to
   * `maxWeightPerServer`.
   *
   * @param {string|URL} upstream
   * @returns {this}
   */
  addUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    if (this[kClients].find((pool) => (
      pool[kUrl].origin === upstreamOrigin &&
      pool.closed !== true &&
      pool.destroyed !== true
    ))) {
      return this
    }
    // Each pool gets its own shallow copy of the options.
    const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

    this[kAddClient](pool)
    pool.on('connect', () => {
      // increase the weight of the pool, capped at the configured maximum.
      pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
      // Keep the cached GCD in sync with the weights, exactly as the
      // error handlers below do; a stale GCD skews the selection step.
      this._updateBalancedPoolStats()
    })

    pool.on('connectionError', () => {
      // decrease the weight of the pool, never below 1.
      pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
      this._updateBalancedPoolStats()
    })

    pool.on('disconnect', (...args) => {
      const err = args[2]
      if (err && err.code === 'UND_ERR_SOCKET') {
        // decrease the weight of the pool.
        pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
        this._updateBalancedPoolStats()
      }
    })

    for (const client of this[kClients]) {
      client[kWeight] = this[kMaxWeightPerServer]
    }

    this._updateBalancedPoolStats()

    return this
  }

  /**
   * Recomputes the greatest common divisor of all pool weights and caches
   * it; the dispatcher selection uses it as the step by which the current
   * weight threshold is lowered.
   */
  _updateBalancedPoolStats () {
    this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0)
  }

  /**
   * Removes the first registered pool matching the upstream's origin that
   * is neither closed nor destroyed. Does nothing when no pool matches.
   *
   * @param {string|URL} upstream
   * @returns {this}
   */
  removeUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const pool = this[kClients].find((pool) => (
      pool[kUrl].origin === upstreamOrigin &&
      pool.closed !== true &&
      pool.destroyed !== true
    ))

    if (pool) {
      this[kRemoveClient](pool)
    }

    return this
  }

  /**
   * Origins of all registered pools that are neither closed nor destroyed.
   *
   * @returns {string[]}
   */
  get upstreams () {
    return this[kClients]
      .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true)
      .map((p) => p[kUrl].origin)
  }

  /**
   * Picks the pool to use next using weighted round-robin.
   *
   * @returns {object|undefined} The selected pool, or `undefined` when no
   *   pool is currently usable (all need drain, are closed or destroyed).
   * @throws {BalancedPoolMissingUpstreamError} When no pool is registered.
   */
  [kGetDispatcher] () {
    // We validate that pools is greater than 0,
    // otherwise we would have to wait until an upstream
    // is added, which might never happen.
    if (this[kClients].length === 0) {
      throw new BalancedPoolMissingUpstreamError()
    }

    const dispatcher = this[kClients].find(dispatcher => (
      !dispatcher[kNeedDrain] &&
      dispatcher.closed !== true &&
      dispatcher.destroyed !== true
    ))

    if (!dispatcher) {
      return
    }

    const allClientsBusy = this[kClients].every(pool => pool[kNeedDrain])

    if (allClientsBusy) {
      return
    }

    let counter = 0

    // Fallback candidate: start from the first pool that does not need drain.
    let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain])

    // Visit every pool at most once, continuing after the last used index.
    while (counter++ < this[kClients].length) {
      this[kIndex] = (this[kIndex] + 1) % this[kClients].length
      const pool = this[kClients][this[kIndex]]

      // find pool index with the largest weight
      if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) {
        maxWeightIndex = this[kIndex]
      }

      // decrease the current weight every `this[kClients].length`.
      if (this[kIndex] === 0) {
        // Set the current weight to the next lower weight.
        this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

        if (this[kCurrentWeight] <= 0) {
          this[kCurrentWeight] = this[kMaxWeightPerServer]
        }
      }
      if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) {
        return pool
      }
    }

    // No pool met the current threshold in a full pass: fall back to the
    // heaviest available pool and continue the rotation from there.
    this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
    this[kIndex] = maxWeightIndex
    return this[kClients][maxWeightIndex]
  }
}

module.exports = BalancedPool