'use strict'

const {
  BalancedPoolMissingUpstreamError,
  InvalidArgumentError
} = require('./core/errors')
const {
  PoolBase,
  kClients,
  kNeedDrain,
  kAddClient,
  kRemoveClient,
  kGetDispatcher
} = require('./pool-base')
const Pool = require('./pool')
const { kUrl, kInterceptors } = require('./core/symbols')
const { parseOrigin } = require('./core/util')

// Private state keys used by BalancedPool.
// Function that builds the pool for a newly added upstream.
const kFactory = Symbol('factory')
// Options object forwarded to every pool created by the factory.
const kOptions = Symbol('options')
// Greatest common divisor of all current pool weights.
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
// Weight threshold a pool must reach to be picked in the current round.
const kCurrentWeight = Symbol('kCurrentWeight')
// Index of the pool visited most recently by the round-robin cursor.
const kIndex = Symbol('kIndex')
// Per-pool weight (stored on the pool objects themselves).
const kWeight = Symbol('kWeight')
// Upper bound, and initial value, for a pool's weight.
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
// Amount by which a pool's weight is raised or lowered on connection events.
const kErrorPenalty = Symbol('kErrorPenalty')
/**
 * Computes the greatest common divisor of two numbers with the Euclidean
 * algorithm.
 *
 * Written as a loop that stops on any falsy divisor (0, -0, NaN, undefined)
 * so that a missing or non-numeric weight cannot cause unbounded recursion;
 * such a value is effectively treated like 0.
 *
 * Suitable as an `Array.prototype.reduce` callback with an initial value of 0
 * (extra reducer arguments are ignored).
 *
 * @param {number} a
 * @param {number} b
 * @returns {number} The GCD of `a` and `b`; `a` itself when `b` is 0.
 */
function getGreatestCommonDivisor (a, b) {
  let dividend = a
  let divisor = b

  while (divisor) {
    const remainder = dividend % divisor
    dividend = divisor
    divisor = remainder
  }

  return dividend
}
/**
 * Default upstream factory: creates a `Pool` for the given origin.
 *
 * @param {*} origin Origin passed straight through to the `Pool` constructor.
 * @param {object} opts Options passed straight through to the `Pool` constructor.
 * @returns {Pool}
 */
function defaultFactory (origin, opts) {
  return new Pool(origin, opts)
}
/**
 * Tells whether a pool can still be used, i.e. it is neither closed nor
 * destroyed.
 *
 * @param {object} pool
 * @returns {boolean}
 */
function isOpen (pool) {
  return pool.closed !== true && pool.destroyed !== true
}

/**
 * Dispatcher that distributes work over several upstream pools using a
 * weighted round-robin.
 *
 * Every pool starts at the maximum weight. A pool's weight is lowered by the
 * error penalty on connection errors and socket-error disconnects, and raised
 * again (capped at the maximum) on successful connects.
 */
class BalancedPool extends PoolBase {
  /**
   * @param {*|Array<*>} [upstreams=[]] One upstream, or an array of upstreams,
   *   each in a form accepted by `parseOrigin`.
   * @param {object} [options]
   * @param {Function} [options.factory=defaultFactory] Called as
   *   `factory(origin, opts)` to build the pool for an upstream.
   * @param {number} [options.maxWeightPerServer=100] Initial and maximum
   *   weight of a pool (falsy values fall back to 100).
   * @param {number} [options.errorPenalty=15] Weight step applied on
   *   connection events (falsy values fall back to 15).
   * @param {object} [options.interceptors] Only the `BalancedPool` array, if
   *   present, is used by this class.
   * @throws {InvalidArgumentError} If `factory` is not a function.
   */
  constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
    super()

    this[kOptions] = opts
    this[kIndex] = -1
    this[kCurrentWeight] = 0

    this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
    this[kErrorPenalty] = this[kOptions].errorPenalty || 15

    const upstreamList = Array.isArray(upstreams) ? upstreams : [upstreams]

    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.')
    }

    this[kInterceptors] = opts.interceptors && Array.isArray(opts.interceptors.BalancedPool)
      ? opts.interceptors.BalancedPool
      : []
    this[kFactory] = factory

    for (const upstream of upstreamList) {
      this.addUpstream(upstream)
    }
    this._updateBalancedPoolStats()
  }

  /**
   * Registers an upstream unless an open pool for the same origin already
   * exists. Adding an upstream resets the weight of every pool to the
   * maximum.
   *
   * @param {*} upstream Upstream in a form accepted by `parseOrigin`.
   * @returns {this}
   */
  addUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const alreadyRegistered = this[kClients].some((pool) => (
      pool[kUrl].origin === upstreamOrigin && isOpen(pool)
    ))
    if (alreadyRegistered) {
      return this
    }

    // Each pool gets its own shallow copy of the options.
    const pool = this[kFactory](upstreamOrigin, { ...this[kOptions] })

    this[kAddClient](pool)

    // Lowers the pool's weight (never below 1) and refreshes the cached GCD.
    const penalize = () => {
      pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
      this._updateBalancedPoolStats()
    }

    pool.on('connect', () => {
      pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
      // Keep the cached GCD in sync with the new weight, as the penalty
      // handlers do.
      this._updateBalancedPoolStats()
    })

    pool.on('connectionError', () => {
      penalize()
    })

    pool.on('disconnect', (...args) => {
      // The error, if any, is the third event argument.
      const err = args[2]
      if (err && err.code === 'UND_ERR_SOCKET') {
        // decrease the weight of the pool.
        penalize()
      }
    })

    for (const client of this[kClients]) {
      client[kWeight] = this[kMaxWeightPerServer]
    }

    this._updateBalancedPoolStats()

    return this
  }

  /**
   * Recomputes the greatest common divisor of all pool weights; it is the
   * step by which the current weight threshold is lowered between rounds.
   */
  _updateBalancedPoolStats () {
    this[kGreatestCommonDivisor] = this[kClients]
      .map((pool) => pool[kWeight])
      .reduce(getGreatestCommonDivisor, 0)
  }

  /**
   * Removes the open pool registered for the given upstream, if any.
   *
   * @param {*} upstream Upstream in a form accepted by `parseOrigin`.
   * @returns {this}
   */
  removeUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const pool = this[kClients].find((candidate) => (
      candidate[kUrl].origin === upstreamOrigin && isOpen(candidate)
    ))

    if (pool) {
      this[kRemoveClient](pool)
    }

    return this
  }

  /**
   * Origins of all pools that are neither closed nor destroyed.
   *
   * @returns {string[]}
   */
  get upstreams () {
    return this[kClients]
      .filter((pool) => isOpen(pool))
      .map((pool) => pool[kUrl].origin)
  }

  /**
   * Picks the pool that should handle the next dispatch.
   *
   * @returns {object|undefined} The selected pool, or `undefined` when every
   *   pool is draining, closed or destroyed.
   * @throws {BalancedPoolMissingUpstreamError} If no upstream is registered.
   */
  [kGetDispatcher] () {
    const clients = this[kClients]

    // We validate that pools is greater than 0,
    // otherwise we would have to wait until an upstream
    // is added, which might never happen.
    if (clients.length === 0) {
      throw new BalancedPoolMissingUpstreamError()
    }

    // A pool is a candidate only if it is open and does not need draining.
    const isAvailable = (pool) => !pool[kNeedDrain] && isOpen(pool)

    let maxWeightIndex = clients.findIndex(isAvailable)
    if (maxWeightIndex === -1) {
      return
    }

    for (let visited = 0; visited < clients.length; visited++) {
      this[kIndex] = (this[kIndex] + 1) % clients.length
      const pool = clients[this[kIndex]]
      const available = isAvailable(pool)

      // find pool index with the largest weight
      if (available && pool[kWeight] > clients[maxWeightIndex][kWeight]) {
        maxWeightIndex = this[kIndex]
      }

      // decrease the current weight every time the cursor wraps to index 0.
      if (this[kIndex] === 0) {
        // Set the current weight to the next lower weight.
        this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

        if (this[kCurrentWeight] <= 0) {
          this[kCurrentWeight] = this[kMaxWeightPerServer]
        }
      }

      if (available && pool[kWeight] >= this[kCurrentWeight]) {
        return pool
      }
    }

    // No pool reached the threshold in a full pass: fall back to the
    // heaviest available pool and restart the rotation from there.
    this[kCurrentWeight] = clients[maxWeightIndex][kWeight]
    this[kIndex] = maxWeightIndex
    return clients[maxWeightIndex]
  }
}
// The class is the module's sole export.
module.exports = BalancedPool