11cb0ef41Sopenharmony_ci'use strict'
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst DispatcherBase = require('./dispatcher-base')
41cb0ef41Sopenharmony_ciconst FixedQueue = require('./node/fixed-queue')
51cb0ef41Sopenharmony_ciconst { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols')
61cb0ef41Sopenharmony_ciconst PoolStats = require('./pool-stats')
71cb0ef41Sopenharmony_ci
81cb0ef41Sopenharmony_ciconst kClients = Symbol('clients')
91cb0ef41Sopenharmony_ciconst kNeedDrain = Symbol('needDrain')
101cb0ef41Sopenharmony_ciconst kQueue = Symbol('queue')
111cb0ef41Sopenharmony_ciconst kClosedResolve = Symbol('closed resolve')
121cb0ef41Sopenharmony_ciconst kOnDrain = Symbol('onDrain')
131cb0ef41Sopenharmony_ciconst kOnConnect = Symbol('onConnect')
141cb0ef41Sopenharmony_ciconst kOnDisconnect = Symbol('onDisconnect')
151cb0ef41Sopenharmony_ciconst kOnConnectionError = Symbol('onConnectionError')
161cb0ef41Sopenharmony_ciconst kGetDispatcher = Symbol('get dispatcher')
171cb0ef41Sopenharmony_ciconst kAddClient = Symbol('add client')
181cb0ef41Sopenharmony_ciconst kRemoveClient = Symbol('remove client')
191cb0ef41Sopenharmony_ciconst kStats = Symbol('stats')
201cb0ef41Sopenharmony_ci
211cb0ef41Sopenharmony_ciclass PoolBase extends DispatcherBase {
221cb0ef41Sopenharmony_ci  constructor () {
231cb0ef41Sopenharmony_ci    super()
241cb0ef41Sopenharmony_ci
251cb0ef41Sopenharmony_ci    this[kQueue] = new FixedQueue()
261cb0ef41Sopenharmony_ci    this[kClients] = []
271cb0ef41Sopenharmony_ci    this[kQueued] = 0
281cb0ef41Sopenharmony_ci
291cb0ef41Sopenharmony_ci    const pool = this
301cb0ef41Sopenharmony_ci
311cb0ef41Sopenharmony_ci    this[kOnDrain] = function onDrain (origin, targets) {
321cb0ef41Sopenharmony_ci      const queue = pool[kQueue]
331cb0ef41Sopenharmony_ci
341cb0ef41Sopenharmony_ci      let needDrain = false
351cb0ef41Sopenharmony_ci
361cb0ef41Sopenharmony_ci      while (!needDrain) {
371cb0ef41Sopenharmony_ci        const item = queue.shift()
381cb0ef41Sopenharmony_ci        if (!item) {
391cb0ef41Sopenharmony_ci          break
401cb0ef41Sopenharmony_ci        }
411cb0ef41Sopenharmony_ci        pool[kQueued]--
421cb0ef41Sopenharmony_ci        needDrain = !this.dispatch(item.opts, item.handler)
431cb0ef41Sopenharmony_ci      }
441cb0ef41Sopenharmony_ci
451cb0ef41Sopenharmony_ci      this[kNeedDrain] = needDrain
461cb0ef41Sopenharmony_ci
471cb0ef41Sopenharmony_ci      if (!this[kNeedDrain] && pool[kNeedDrain]) {
481cb0ef41Sopenharmony_ci        pool[kNeedDrain] = false
491cb0ef41Sopenharmony_ci        pool.emit('drain', origin, [pool, ...targets])
501cb0ef41Sopenharmony_ci      }
511cb0ef41Sopenharmony_ci
521cb0ef41Sopenharmony_ci      if (pool[kClosedResolve] && queue.isEmpty()) {
531cb0ef41Sopenharmony_ci        Promise
541cb0ef41Sopenharmony_ci          .all(pool[kClients].map(c => c.close()))
551cb0ef41Sopenharmony_ci          .then(pool[kClosedResolve])
561cb0ef41Sopenharmony_ci      }
571cb0ef41Sopenharmony_ci    }
581cb0ef41Sopenharmony_ci
591cb0ef41Sopenharmony_ci    this[kOnConnect] = (origin, targets) => {
601cb0ef41Sopenharmony_ci      pool.emit('connect', origin, [pool, ...targets])
611cb0ef41Sopenharmony_ci    }
621cb0ef41Sopenharmony_ci
631cb0ef41Sopenharmony_ci    this[kOnDisconnect] = (origin, targets, err) => {
641cb0ef41Sopenharmony_ci      pool.emit('disconnect', origin, [pool, ...targets], err)
651cb0ef41Sopenharmony_ci    }
661cb0ef41Sopenharmony_ci
671cb0ef41Sopenharmony_ci    this[kOnConnectionError] = (origin, targets, err) => {
681cb0ef41Sopenharmony_ci      pool.emit('connectionError', origin, [pool, ...targets], err)
691cb0ef41Sopenharmony_ci    }
701cb0ef41Sopenharmony_ci
711cb0ef41Sopenharmony_ci    this[kStats] = new PoolStats(this)
721cb0ef41Sopenharmony_ci  }
731cb0ef41Sopenharmony_ci
741cb0ef41Sopenharmony_ci  get [kBusy] () {
751cb0ef41Sopenharmony_ci    return this[kNeedDrain]
761cb0ef41Sopenharmony_ci  }
771cb0ef41Sopenharmony_ci
781cb0ef41Sopenharmony_ci  get [kConnected] () {
791cb0ef41Sopenharmony_ci    return this[kClients].filter(client => client[kConnected]).length
801cb0ef41Sopenharmony_ci  }
811cb0ef41Sopenharmony_ci
821cb0ef41Sopenharmony_ci  get [kFree] () {
831cb0ef41Sopenharmony_ci    return this[kClients].filter(client => client[kConnected] && !client[kNeedDrain]).length
841cb0ef41Sopenharmony_ci  }
851cb0ef41Sopenharmony_ci
861cb0ef41Sopenharmony_ci  get [kPending] () {
871cb0ef41Sopenharmony_ci    let ret = this[kQueued]
881cb0ef41Sopenharmony_ci    for (const { [kPending]: pending } of this[kClients]) {
891cb0ef41Sopenharmony_ci      ret += pending
901cb0ef41Sopenharmony_ci    }
911cb0ef41Sopenharmony_ci    return ret
921cb0ef41Sopenharmony_ci  }
931cb0ef41Sopenharmony_ci
941cb0ef41Sopenharmony_ci  get [kRunning] () {
951cb0ef41Sopenharmony_ci    let ret = 0
961cb0ef41Sopenharmony_ci    for (const { [kRunning]: running } of this[kClients]) {
971cb0ef41Sopenharmony_ci      ret += running
981cb0ef41Sopenharmony_ci    }
991cb0ef41Sopenharmony_ci    return ret
1001cb0ef41Sopenharmony_ci  }
1011cb0ef41Sopenharmony_ci
1021cb0ef41Sopenharmony_ci  get [kSize] () {
1031cb0ef41Sopenharmony_ci    let ret = this[kQueued]
1041cb0ef41Sopenharmony_ci    for (const { [kSize]: size } of this[kClients]) {
1051cb0ef41Sopenharmony_ci      ret += size
1061cb0ef41Sopenharmony_ci    }
1071cb0ef41Sopenharmony_ci    return ret
1081cb0ef41Sopenharmony_ci  }
1091cb0ef41Sopenharmony_ci
1101cb0ef41Sopenharmony_ci  get stats () {
1111cb0ef41Sopenharmony_ci    return this[kStats]
1121cb0ef41Sopenharmony_ci  }
1131cb0ef41Sopenharmony_ci
1141cb0ef41Sopenharmony_ci  async [kClose] () {
1151cb0ef41Sopenharmony_ci    if (this[kQueue].isEmpty()) {
1161cb0ef41Sopenharmony_ci      return Promise.all(this[kClients].map(c => c.close()))
1171cb0ef41Sopenharmony_ci    } else {
1181cb0ef41Sopenharmony_ci      return new Promise((resolve) => {
1191cb0ef41Sopenharmony_ci        this[kClosedResolve] = resolve
1201cb0ef41Sopenharmony_ci      })
1211cb0ef41Sopenharmony_ci    }
1221cb0ef41Sopenharmony_ci  }
1231cb0ef41Sopenharmony_ci
1241cb0ef41Sopenharmony_ci  async [kDestroy] (err) {
1251cb0ef41Sopenharmony_ci    while (true) {
1261cb0ef41Sopenharmony_ci      const item = this[kQueue].shift()
1271cb0ef41Sopenharmony_ci      if (!item) {
1281cb0ef41Sopenharmony_ci        break
1291cb0ef41Sopenharmony_ci      }
1301cb0ef41Sopenharmony_ci      item.handler.onError(err)
1311cb0ef41Sopenharmony_ci    }
1321cb0ef41Sopenharmony_ci
1331cb0ef41Sopenharmony_ci    return Promise.all(this[kClients].map(c => c.destroy(err)))
1341cb0ef41Sopenharmony_ci  }
1351cb0ef41Sopenharmony_ci
1361cb0ef41Sopenharmony_ci  [kDispatch] (opts, handler) {
1371cb0ef41Sopenharmony_ci    const dispatcher = this[kGetDispatcher]()
1381cb0ef41Sopenharmony_ci
1391cb0ef41Sopenharmony_ci    if (!dispatcher) {
1401cb0ef41Sopenharmony_ci      this[kNeedDrain] = true
1411cb0ef41Sopenharmony_ci      this[kQueue].push({ opts, handler })
1421cb0ef41Sopenharmony_ci      this[kQueued]++
1431cb0ef41Sopenharmony_ci    } else if (!dispatcher.dispatch(opts, handler)) {
1441cb0ef41Sopenharmony_ci      dispatcher[kNeedDrain] = true
1451cb0ef41Sopenharmony_ci      this[kNeedDrain] = !this[kGetDispatcher]()
1461cb0ef41Sopenharmony_ci    }
1471cb0ef41Sopenharmony_ci
1481cb0ef41Sopenharmony_ci    return !this[kNeedDrain]
1491cb0ef41Sopenharmony_ci  }
1501cb0ef41Sopenharmony_ci
1511cb0ef41Sopenharmony_ci  [kAddClient] (client) {
1521cb0ef41Sopenharmony_ci    client
1531cb0ef41Sopenharmony_ci      .on('drain', this[kOnDrain])
1541cb0ef41Sopenharmony_ci      .on('connect', this[kOnConnect])
1551cb0ef41Sopenharmony_ci      .on('disconnect', this[kOnDisconnect])
1561cb0ef41Sopenharmony_ci      .on('connectionError', this[kOnConnectionError])
1571cb0ef41Sopenharmony_ci
1581cb0ef41Sopenharmony_ci    this[kClients].push(client)
1591cb0ef41Sopenharmony_ci
1601cb0ef41Sopenharmony_ci    if (this[kNeedDrain]) {
1611cb0ef41Sopenharmony_ci      process.nextTick(() => {
1621cb0ef41Sopenharmony_ci        if (this[kNeedDrain]) {
1631cb0ef41Sopenharmony_ci          this[kOnDrain](client[kUrl], [this, client])
1641cb0ef41Sopenharmony_ci        }
1651cb0ef41Sopenharmony_ci      })
1661cb0ef41Sopenharmony_ci    }
1671cb0ef41Sopenharmony_ci
1681cb0ef41Sopenharmony_ci    return this
1691cb0ef41Sopenharmony_ci  }
1701cb0ef41Sopenharmony_ci
1711cb0ef41Sopenharmony_ci  [kRemoveClient] (client) {
1721cb0ef41Sopenharmony_ci    client.close(() => {
1731cb0ef41Sopenharmony_ci      const idx = this[kClients].indexOf(client)
1741cb0ef41Sopenharmony_ci      if (idx !== -1) {
1751cb0ef41Sopenharmony_ci        this[kClients].splice(idx, 1)
1761cb0ef41Sopenharmony_ci      }
1771cb0ef41Sopenharmony_ci    })
1781cb0ef41Sopenharmony_ci
1791cb0ef41Sopenharmony_ci    this[kNeedDrain] = this[kClients].some(dispatcher => (
1801cb0ef41Sopenharmony_ci      !dispatcher[kNeedDrain] &&
1811cb0ef41Sopenharmony_ci      dispatcher.closed !== true &&
1821cb0ef41Sopenharmony_ci      dispatcher.destroyed !== true
1831cb0ef41Sopenharmony_ci    ))
1841cb0ef41Sopenharmony_ci  }
1851cb0ef41Sopenharmony_ci}
1861cb0ef41Sopenharmony_ci
1871cb0ef41Sopenharmony_cimodule.exports = {
1881cb0ef41Sopenharmony_ci  PoolBase,
1891cb0ef41Sopenharmony_ci  kClients,
1901cb0ef41Sopenharmony_ci  kNeedDrain,
1911cb0ef41Sopenharmony_ci  kAddClient,
1921cb0ef41Sopenharmony_ci  kRemoveClient,
1931cb0ef41Sopenharmony_ci  kGetDispatcher
1941cb0ef41Sopenharmony_ci}
195