1'use strict'
2
3const {
4  BalancedPoolMissingUpstreamError,
5  InvalidArgumentError
6} = require('./core/errors')
7const {
8  PoolBase,
9  kClients,
10  kNeedDrain,
11  kAddClient,
12  kRemoveClient,
13  kGetDispatcher
14} = require('./pool-base')
15const Pool = require('./pool')
16const { kUrl, kInterceptors } = require('./core/symbols')
17const { parseOrigin } = require('./core/util')
// Private property keys used by BalancedPool.

// The function called by addUpstream to build the pool for a new upstream.
const kFactory = Symbol('factory')

// Constructor options (without `factory`); a copy is handed to every pool the factory builds.
const kOptions = Symbol('options')
// Cached greatest common divisor of all pool weights; the step by which the current weight is lowered.
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
// Weight threshold a pool must reach to be picked in the current round.
const kCurrentWeight = Symbol('kCurrentWeight')
// Index (into the client list) of the pool visited last; starts at -1.
const kIndex = Symbol('kIndex')
// Per-pool weight, stored on each pool object rather than on the BalancedPool.
const kWeight = Symbol('kWeight')
// Upper bound for a pool's weight (option `maxWeightPerServer`, falls back to 100).
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
// Amount added to / subtracted from a pool's weight on connect / error (option `errorPenalty`, falls back to 15).
const kErrorPenalty = Symbol('kErrorPenalty')
27
/**
 * Euclid's algorithm: computes the greatest common divisor of two numbers.
 *
 * A zero second operand ends the recursion, so `gcd(x, 0)` is `x` and
 * `gcd(0, x)` is `x` as well. That makes 0 a valid seed when folding the
 * function over a list of weights.
 *
 * @param {number} a
 * @param {number} b
 * @returns {number} the greatest common divisor of `a` and `b`
 */
function getGreatestCommonDivisor (a, b) {
  return b === 0
    ? a
    : getGreatestCommonDivisor(b, a % b)
}
32
/**
 * Factory used when the caller does not supply one: builds a `Pool` for
 * the given upstream.
 *
 * @param {*} origin - forwarded unchanged as the first `Pool` argument
 * @param {object} opts - forwarded unchanged as the second `Pool` argument
 * @returns {Pool} the newly constructed pool
 */
function defaultFactory (origin, opts) {
  const upstreamPool = new Pool(origin, opts)
  return upstreamPool
}
36
/**
 * Tells whether a pool may be handed a new request right now: it must not
 * be waiting for a drain and must be neither closed nor destroyed.
 *
 * @param {object} pool - one entry of the balanced pool's client list
 * @returns {boolean}
 */
function isPoolAvailable (pool) {
  return !pool[kNeedDrain] && pool.closed !== true && pool.destroyed !== true
}

/**
 * Dispatcher that spreads requests over several upstream pools with a
 * weighted round-robin.
 *
 * Every pool carries a weight between 1 and `maxWeightPerServer`. The
 * weight goes up by `errorPenalty` on each 'connect' event and down by the
 * same amount on a 'connectionError' event or on a 'disconnect' whose error
 * has code 'UND_ERR_SOCKET'. Pools with a higher weight are picked more
 * often.
 */
class BalancedPool extends PoolBase {
  /**
   * @param {*|Array} [upstreams=[]] - a single upstream or an array of
   *   them; each one is passed to `addUpstream`.
   * @param {object} [options]
   * @param {Function} [options.factory=defaultFactory] - called as
   *   `factory(origin, opts)` to create the pool for an upstream.
   * @param {number} [options.maxWeightPerServer] - weight ceiling; a falsy
   *   value falls back to 100.
   * @param {number} [options.errorPenalty] - weight step; a falsy value
   *   falls back to 15.
   * @param {object} [options.interceptors] - only the `BalancedPool` array,
   *   if present, is used.
   * @throws {InvalidArgumentError} if `factory` is not a function.
   */
  constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
    super()

    this[kOptions] = opts
    this[kIndex] = -1
    this[kCurrentWeight] = 0

    this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
    this[kErrorPenalty] = this[kOptions].errorPenalty || 15

    if (!Array.isArray(upstreams)) {
      upstreams = [upstreams]
    }

    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.')
    }

    // Anything other than an array of interceptors is replaced by an empty list.
    const ownInterceptors = opts.interceptors && opts.interceptors.BalancedPool
    this[kInterceptors] = Array.isArray(ownInterceptors) ? ownInterceptors : []
    this[kFactory] = factory

    for (const upstream of upstreams) {
      this.addUpstream(upstream)
    }
    this._updateBalancedPoolStats()
  }

  /**
   * Adds an upstream, creating its pool through the configured factory.
   *
   * Nothing happens if a pool for the same origin already exists and is
   * neither closed nor destroyed. Otherwise the new pool is registered and
   * the weight of every pool, old and new, is reset to the maximum.
   *
   * @param {*} upstream - resolved to an origin through `parseOrigin`.
   * @returns {BalancedPool} this instance, for chaining.
   */
  addUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const alreadyPresent = this[kClients].some((pool) => (
      pool[kUrl].origin === upstreamOrigin &&
      pool.closed !== true &&
      pool.destroyed !== true
    ))
    if (alreadyPresent) {
      return this
    }

    const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

    this[kAddClient](pool)

    // Lowers the pool's weight (never below 1) and refreshes the cached divisor.
    const penalize = () => {
      pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
      this._updateBalancedPoolStats()
    }

    pool.on('connect', () => {
      // A successful connection earns weight back, capped at the maximum.
      pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
      // Keep the cached divisor in step with the new weight, exactly as
      // the penalty path does.
      this._updateBalancedPoolStats()
    })

    pool.on('connectionError', () => {
      penalize()
    })

    pool.on('disconnect', (...args) => {
      // The third event argument is treated as the error that caused the disconnect.
      const err = args[2]
      if (err && err.code === 'UND_ERR_SOCKET') {
        penalize()
      }
    })

    // Adding an upstream puts every pool back at full weight.
    for (const client of this[kClients]) {
      client[kWeight] = this[kMaxWeightPerServer]
    }

    this._updateBalancedPoolStats()

    return this
  }

  /**
   * Recomputes the cached greatest common divisor of all pool weights.
   * The result is 0 when there are no pools.
   */
  _updateBalancedPoolStats () {
    let divisor = 0
    for (const pool of this[kClients]) {
      divisor = getGreatestCommonDivisor(divisor, pool[kWeight])
    }
    this[kGreatestCommonDivisor] = divisor
  }

  /**
   * Removes the first pool that matches the upstream's origin and is
   * neither closed nor destroyed. Does nothing if there is no such pool.
   *
   * @param {*} upstream - resolved to an origin through `parseOrigin`.
   * @returns {BalancedPool} this instance, for chaining.
   */
  removeUpstream (upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin

    const pool = this[kClients].find((pool) => (
      pool[kUrl].origin === upstreamOrigin &&
      pool.closed !== true &&
      pool.destroyed !== true
    ))

    if (pool) {
      this[kRemoveClient](pool)
    }

    return this
  }

  /**
   * Origins of all pools that are neither closed nor destroyed.
   *
   * @returns {string[]}
   */
  get upstreams () {
    return this[kClients]
      .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true)
      .map((p) => p[kUrl].origin)
  }

  /**
   * Picks the pool that should receive the next request.
   *
   * Starting after the pool visited last, the pools are walked at most
   * once. The first available pool whose weight reaches the current weight
   * threshold is returned. Each time the walk wraps to index 0 the
   * threshold drops by the cached divisor, and is reset to the maximum
   * weight once it reaches 0 or below. If the walk finds no match, the
   * available pool with the largest weight is returned and the threshold
   * and index are set to it.
   *
   * @returns {object|undefined} the chosen pool, or undefined when no pool
   *   is currently available.
   * @throws {BalancedPoolMissingUpstreamError} if there are no pools at all.
   */
  [kGetDispatcher] () {
    const clients = this[kClients]

    // We validate that pools is greater than 0,
    // otherwise we would have to wait until an upstream
    // is added, which might never happen.
    if (clients.length === 0) {
      throw new BalancedPoolMissingUpstreamError()
    }

    // Seed the "heaviest pool" candidate with the first available pool;
    // if there is none, every pool is busy, closed or destroyed.
    let maxWeightIndex = clients.findIndex(isPoolAvailable)
    if (maxWeightIndex === -1) {
      return
    }

    for (let visited = 0; visited < clients.length; visited++) {
      this[kIndex] = (this[kIndex] + 1) % clients.length
      const pool = clients[this[kIndex]]
      const available = isPoolAvailable(pool)

      // find pool index with the largest weight
      if (available && pool[kWeight] > clients[maxWeightIndex][kWeight]) {
        maxWeightIndex = this[kIndex]
      }

      // decrease the current weight every `this[kClients].length`.
      if (this[kIndex] === 0) {
        // Set the current weight to the next lower weight.
        this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

        if (this[kCurrentWeight] <= 0) {
          this[kCurrentWeight] = this[kMaxWeightPerServer]
        }
      }

      if (available && pool[kWeight] >= this[kCurrentWeight]) {
        return pool
      }
    }

    // No pool reached the threshold: fall back to the heaviest available one.
    this[kCurrentWeight] = clients[maxWeightIndex][kWeight]
    this[kIndex] = maxWeightIndex
    return clients[maxWeightIndex]
  }
}
189
190module.exports = BalancedPool
191