'use strict'

const {
  BalancedPoolMissingUpstreamError,
  InvalidArgumentError
} = require('./core/errors')
const {
  PoolBase,
  kClients,
  kNeedDrain,
  kAddClient,
  kRemoveClient,
  kGetDispatcher
} = require('./pool-base')
const Pool = require('./pool')
const { kUrl, kInterceptors } = require('./core/symbols')
const { parseOrigin } = require('./core/util')
const kFactory = Symbol('factory')

const kOptions = Symbol('options')
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
const kCurrentWeight = Symbol('kCurrentWeight')
const kIndex = Symbol('kIndex')
const kWeight = Symbol('kWeight')
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
const kErrorPenalty = Symbol('kErrorPenalty')

/**
 * Euclid's algorithm.
 *
 * Usable as a `reduce` callback with an initial value of 0, because
 * gcd(0, x) === x.
 *
 * @param {number} a
 * @param {number} b
 * @returns {number} The greatest common divisor of `a` and `b`.
 */
function getGreatestCommonDivisor (a, b) {
  if (b === 0) return a
  return getGreatestCommonDivisor(b, a % b)
}

/**
 * Default upstream factory: one `Pool` per upstream origin.
 *
 * @param {string} origin
 * @param {object} opts
 * @returns {Pool}
 */
function defaultFactory (origin, opts) {
  return new Pool(origin, opts)
}

/**
 * @param {object} pool
 * @returns {boolean} `true` unless the pool reports itself closed or destroyed.
 */
function isOpen (pool) {
  return pool.closed !== true && pool.destroyed !== true
}

/**
 * @param {object} pool
 * @returns {boolean} `true` when the pool is open and does not need to drain,
 *   i.e. it is a valid target for the next dispatch.
 */
function isAvailable (pool) {
  return !pool[kNeedDrain] && isOpen(pool)
}

/**
 * A pool of upstream pools that picks the upstream for each dispatch with a
 * weighted round-robin. Each upstream starts at `maxWeightPerServer`; its
 * weight is reduced by `errorPenalty` on connection errors and socket
 * disconnects, and raised again by `errorPenalty` on successful connects.
 */
class BalancedPool extends PoolBase {
  /**
   * @param {string|URL|Array<string|URL>} [upstreams] One upstream or a list
   *   of upstreams; each is passed through `parseOrigin`.
   * @param {object} [options]
   * @param {Function} [options.factory] `(origin, opts) => dispatcher`.
   *   Defaults to creating a `Pool`.
   * @param {number} [options.maxWeightPerServer] Upper bound (and initial
   *   value) of an upstream's weight. Falls back to 100 when falsy.
   * @param {number} [options.errorPenalty] Weight step applied on
   *   errors/recoveries. Falls back to 15 when falsy.
   * @throws {InvalidArgumentError} If `factory` is not a function.
   */
  constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
    super()

    this[kOptions] = opts
    this[kIndex] = -1
    this[kCurrentWeight] = 0

    this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
    this[kErrorPenalty] = this[kOptions].errorPenalty || 15

    if (!Array.isArray(upstreams)) {
      upstreams = [upstreams]
    }

    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.')
    }

    this[kInterceptors] = opts.interceptors && Array.isArray(opts.interceptors.BalancedPool)
      ? opts.interceptors.BalancedPool
      : []
    this[kFactory] = factory

    for (const upstream of upstreams) {
      this.addUpstream(upstream)
    }
    // Also initialises the cached GCD when no upstream was supplied.
    this._updateBalancedPoolStats()
  }

  /**
   * Registers an upstream. A no-op when an open pool for the same origin is
   * already registered.
   *
   * @param {string|URL} upstream
   * @returns {this}
   */
  addUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const alreadyRegistered = this[kClients].some((pool) => (
      pool[kUrl].origin === upstreamOrigin && isOpen(pool)
    ))
    if (alreadyRegistered) {
      return this
    }

    const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

    this[kAddClient](pool)

    // Lower the pool's weight (never below 1) and refresh the cached GCD.
    const penalize = () => {
      pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
      this._updateBalancedPoolStats()
    }

    pool.on('connect', () => {
      pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
      // Keep the cached GCD in sync with the weights, exactly as the
      // penalising handlers below do.
      this._updateBalancedPoolStats()
    })

    pool.on('connectionError', penalize)

    pool.on('disconnect', (...args) => {
      // The third event argument is the error that caused the disconnect.
      const err = args[2]
      if (err && err.code === 'UND_ERR_SOCKET') {
        penalize()
      }
    })

    // Adding an upstream resets every registered pool to the maximum weight.
    for (const client of this[kClients]) {
      client[kWeight] = this[kMaxWeightPerServer]
    }

    this._updateBalancedPoolStats()

    return this
  }

  /**
   * Recomputes the greatest common divisor of all pool weights. The selection
   * loop uses it as the step by which the current weight threshold decreases.
   */
  _updateBalancedPoolStats () {
    this[kGreatestCommonDivisor] = this[kClients]
      .map((pool) => pool[kWeight])
      .reduce(getGreatestCommonDivisor, 0)
  }

  /**
   * Removes the open pool registered for the given upstream, if any.
   *
   * @param {string|URL} upstream
   * @returns {this}
   */
  removeUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const pool = this[kClients].find((candidate) => (
      candidate[kUrl].origin === upstreamOrigin && isOpen(candidate)
    ))

    if (pool) {
      this[kRemoveClient](pool)
    }

    return this
  }

  /**
   * @returns {string[]} Origins of all registered pools that are neither
   *   closed nor destroyed.
   */
  get upstreams () {
    return this[kClients]
      .filter(isOpen)
      .map((pool) => pool[kUrl].origin)
  }

  /**
   * Picks the pool for the next dispatch using weighted round-robin.
   *
   * Only pools that are open and do not need to drain are eligible, both for
   * the regular selection and for the max-weight fallback.
   *
   * @returns {object|undefined} The selected pool, or `undefined` when no
   *   pool is currently available.
   * @throws {BalancedPoolMissingUpstreamError} If no upstream is registered.
   */
  [kGetDispatcher] () {
    const pools = this[kClients]

    // We validate that pools is greater than 0,
    // otherwise we would have to wait until an upstream
    // is added, which might never happen.
    if (pools.length === 0) {
      throw new BalancedPoolMissingUpstreamError()
    }

    // Seed the fallback with the first available pool; if there is none,
    // nothing can be dispatched right now.
    let maxWeightIndex = pools.findIndex(isAvailable)
    if (maxWeightIndex === -1) {
      return
    }

    for (let visited = 0; visited < pools.length; visited++) {
      this[kIndex] = (this[kIndex] + 1) % pools.length
      const pool = pools[this[kIndex]]
      const available = isAvailable(pool)

      // Track the available pool with the largest weight.
      if (available && pool[kWeight] > pools[maxWeightIndex][kWeight]) {
        maxWeightIndex = this[kIndex]
      }

      // Each time the index wraps around, step the weight threshold down by
      // the GCD, restarting from the maximum once it reaches zero.
      if (this[kIndex] === 0) {
        this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

        if (this[kCurrentWeight] <= 0) {
          this[kCurrentWeight] = this[kMaxWeightPerServer]
        }
      }

      if (available && pool[kWeight] >= this[kCurrentWeight]) {
        return pool
      }
    }

    // No available pool met the threshold: fall back to the heaviest
    // available pool and resume the rotation from there.
    this[kCurrentWeight] = pools[maxWeightIndex][kWeight]
    this[kIndex] = maxWeightIndex
    return pools[maxWeightIndex]
  }
}

module.exports = BalancedPool