11cb0ef41Sopenharmony_ci'use strict' 21cb0ef41Sopenharmony_ci 31cb0ef41Sopenharmony_ciconst DispatcherBase = require('./dispatcher-base') 41cb0ef41Sopenharmony_ciconst FixedQueue = require('./node/fixed-queue') 51cb0ef41Sopenharmony_ciconst { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols') 61cb0ef41Sopenharmony_ciconst PoolStats = require('./pool-stats') 71cb0ef41Sopenharmony_ci 81cb0ef41Sopenharmony_ciconst kClients = Symbol('clients') 91cb0ef41Sopenharmony_ciconst kNeedDrain = Symbol('needDrain') 101cb0ef41Sopenharmony_ciconst kQueue = Symbol('queue') 111cb0ef41Sopenharmony_ciconst kClosedResolve = Symbol('closed resolve') 121cb0ef41Sopenharmony_ciconst kOnDrain = Symbol('onDrain') 131cb0ef41Sopenharmony_ciconst kOnConnect = Symbol('onConnect') 141cb0ef41Sopenharmony_ciconst kOnDisconnect = Symbol('onDisconnect') 151cb0ef41Sopenharmony_ciconst kOnConnectionError = Symbol('onConnectionError') 161cb0ef41Sopenharmony_ciconst kGetDispatcher = Symbol('get dispatcher') 171cb0ef41Sopenharmony_ciconst kAddClient = Symbol('add client') 181cb0ef41Sopenharmony_ciconst kRemoveClient = Symbol('remove client') 191cb0ef41Sopenharmony_ciconst kStats = Symbol('stats') 201cb0ef41Sopenharmony_ci 211cb0ef41Sopenharmony_ciclass PoolBase extends DispatcherBase { 221cb0ef41Sopenharmony_ci constructor () { 231cb0ef41Sopenharmony_ci super() 241cb0ef41Sopenharmony_ci 251cb0ef41Sopenharmony_ci this[kQueue] = new FixedQueue() 261cb0ef41Sopenharmony_ci this[kClients] = [] 271cb0ef41Sopenharmony_ci this[kQueued] = 0 281cb0ef41Sopenharmony_ci 291cb0ef41Sopenharmony_ci const pool = this 301cb0ef41Sopenharmony_ci 311cb0ef41Sopenharmony_ci this[kOnDrain] = function onDrain (origin, targets) { 321cb0ef41Sopenharmony_ci const queue = pool[kQueue] 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ci let needDrain = false 351cb0ef41Sopenharmony_ci 361cb0ef41Sopenharmony_ci while (!needDrain) { 371cb0ef41Sopenharmony_ci const item = queue.shift() 381cb0ef41Sopenharmony_ci if (!item) { 391cb0ef41Sopenharmony_ci break 401cb0ef41Sopenharmony_ci } 411cb0ef41Sopenharmony_ci pool[kQueued]-- 421cb0ef41Sopenharmony_ci needDrain = !this.dispatch(item.opts, item.handler) 431cb0ef41Sopenharmony_ci } 441cb0ef41Sopenharmony_ci 451cb0ef41Sopenharmony_ci this[kNeedDrain] = needDrain 461cb0ef41Sopenharmony_ci 471cb0ef41Sopenharmony_ci if (!this[kNeedDrain] && pool[kNeedDrain]) { 481cb0ef41Sopenharmony_ci pool[kNeedDrain] = false 491cb0ef41Sopenharmony_ci pool.emit('drain', origin, [pool, ...targets]) 501cb0ef41Sopenharmony_ci } 511cb0ef41Sopenharmony_ci 521cb0ef41Sopenharmony_ci if (pool[kClosedResolve] && queue.isEmpty()) { 531cb0ef41Sopenharmony_ci Promise 541cb0ef41Sopenharmony_ci .all(pool[kClients].map(c => c.close())) 551cb0ef41Sopenharmony_ci .then(pool[kClosedResolve]) 561cb0ef41Sopenharmony_ci } 571cb0ef41Sopenharmony_ci } 581cb0ef41Sopenharmony_ci 591cb0ef41Sopenharmony_ci this[kOnConnect] = (origin, targets) => { 601cb0ef41Sopenharmony_ci pool.emit('connect', origin, [pool, ...targets]) 611cb0ef41Sopenharmony_ci } 621cb0ef41Sopenharmony_ci 631cb0ef41Sopenharmony_ci this[kOnDisconnect] = (origin, targets, err) => { 641cb0ef41Sopenharmony_ci pool.emit('disconnect', origin, [pool, ...targets], err) 651cb0ef41Sopenharmony_ci } 661cb0ef41Sopenharmony_ci 671cb0ef41Sopenharmony_ci this[kOnConnectionError] = (origin, targets, err) => { 681cb0ef41Sopenharmony_ci pool.emit('connectionError', origin, [pool, ...targets], err) 691cb0ef41Sopenharmony_ci } 701cb0ef41Sopenharmony_ci 711cb0ef41Sopenharmony_ci this[kStats] = new PoolStats(this) 721cb0ef41Sopenharmony_ci } 731cb0ef41Sopenharmony_ci 741cb0ef41Sopenharmony_ci get [kBusy] () { 751cb0ef41Sopenharmony_ci return this[kNeedDrain] 761cb0ef41Sopenharmony_ci } 771cb0ef41Sopenharmony_ci 781cb0ef41Sopenharmony_ci get [kConnected] () { 791cb0ef41Sopenharmony_ci return this[kClients].filter(client => client[kConnected]).length 801cb0ef41Sopenharmony_ci } 811cb0ef41Sopenharmony_ci 821cb0ef41Sopenharmony_ci get [kFree] () { 831cb0ef41Sopenharmony_ci return this[kClients].filter(client => client[kConnected] && !client[kNeedDrain]).length 841cb0ef41Sopenharmony_ci } 851cb0ef41Sopenharmony_ci 861cb0ef41Sopenharmony_ci get [kPending] () { 871cb0ef41Sopenharmony_ci let ret = this[kQueued] 881cb0ef41Sopenharmony_ci for (const { [kPending]: pending } of this[kClients]) { 891cb0ef41Sopenharmony_ci ret += pending 901cb0ef41Sopenharmony_ci } 911cb0ef41Sopenharmony_ci return ret 921cb0ef41Sopenharmony_ci } 931cb0ef41Sopenharmony_ci 941cb0ef41Sopenharmony_ci get [kRunning] () { 951cb0ef41Sopenharmony_ci let ret = 0 961cb0ef41Sopenharmony_ci for (const { [kRunning]: running } of this[kClients]) { 971cb0ef41Sopenharmony_ci ret += running 981cb0ef41Sopenharmony_ci } 991cb0ef41Sopenharmony_ci return ret 1001cb0ef41Sopenharmony_ci } 1011cb0ef41Sopenharmony_ci 1021cb0ef41Sopenharmony_ci get [kSize] () { 1031cb0ef41Sopenharmony_ci let ret = this[kQueued] 1041cb0ef41Sopenharmony_ci for (const { [kSize]: size } of this[kClients]) { 1051cb0ef41Sopenharmony_ci ret += size 1061cb0ef41Sopenharmony_ci } 1071cb0ef41Sopenharmony_ci return ret 1081cb0ef41Sopenharmony_ci } 1091cb0ef41Sopenharmony_ci 1101cb0ef41Sopenharmony_ci get stats () { 1111cb0ef41Sopenharmony_ci return this[kStats] 1121cb0ef41Sopenharmony_ci } 1131cb0ef41Sopenharmony_ci 1141cb0ef41Sopenharmony_ci async [kClose] () { 1151cb0ef41Sopenharmony_ci if (this[kQueue].isEmpty()) { 1161cb0ef41Sopenharmony_ci return Promise.all(this[kClients].map(c => c.close())) 1171cb0ef41Sopenharmony_ci } else { 1181cb0ef41Sopenharmony_ci return new Promise((resolve) => { 1191cb0ef41Sopenharmony_ci this[kClosedResolve] = resolve 1201cb0ef41Sopenharmony_ci }) 1211cb0ef41Sopenharmony_ci } 1221cb0ef41Sopenharmony_ci } 1231cb0ef41Sopenharmony_ci 1241cb0ef41Sopenharmony_ci async [kDestroy] (err) { 1251cb0ef41Sopenharmony_ci while (true) { 1261cb0ef41Sopenharmony_ci const item = this[kQueue].shift() 1271cb0ef41Sopenharmony_ci if (!item) { 1281cb0ef41Sopenharmony_ci break 1291cb0ef41Sopenharmony_ci } 1301cb0ef41Sopenharmony_ci item.handler.onError(err) 1311cb0ef41Sopenharmony_ci } 1321cb0ef41Sopenharmony_ci 1331cb0ef41Sopenharmony_ci return Promise.all(this[kClients].map(c => c.destroy(err))) 1341cb0ef41Sopenharmony_ci } 1351cb0ef41Sopenharmony_ci 1361cb0ef41Sopenharmony_ci [kDispatch] (opts, handler) { 1371cb0ef41Sopenharmony_ci const dispatcher = this[kGetDispatcher]() 1381cb0ef41Sopenharmony_ci 1391cb0ef41Sopenharmony_ci if (!dispatcher) { 1401cb0ef41Sopenharmony_ci this[kNeedDrain] = true 1411cb0ef41Sopenharmony_ci this[kQueue].push({ opts, handler }) 1421cb0ef41Sopenharmony_ci this[kQueued]++ 1431cb0ef41Sopenharmony_ci } else if (!dispatcher.dispatch(opts, handler)) { 1441cb0ef41Sopenharmony_ci dispatcher[kNeedDrain] = true 1451cb0ef41Sopenharmony_ci this[kNeedDrain] = !this[kGetDispatcher]() 1461cb0ef41Sopenharmony_ci } 1471cb0ef41Sopenharmony_ci 1481cb0ef41Sopenharmony_ci return !this[kNeedDrain] 1491cb0ef41Sopenharmony_ci } 1501cb0ef41Sopenharmony_ci 1511cb0ef41Sopenharmony_ci [kAddClient] (client) { 1521cb0ef41Sopenharmony_ci client 1531cb0ef41Sopenharmony_ci .on('drain', this[kOnDrain]) 1541cb0ef41Sopenharmony_ci .on('connect', this[kOnConnect]) 1551cb0ef41Sopenharmony_ci .on('disconnect', this[kOnDisconnect]) 1561cb0ef41Sopenharmony_ci .on('connectionError', this[kOnConnectionError]) 1571cb0ef41Sopenharmony_ci 1581cb0ef41Sopenharmony_ci this[kClients].push(client) 1591cb0ef41Sopenharmony_ci 1601cb0ef41Sopenharmony_ci if (this[kNeedDrain]) { 1611cb0ef41Sopenharmony_ci process.nextTick(() => { 1621cb0ef41Sopenharmony_ci if (this[kNeedDrain]) { 1631cb0ef41Sopenharmony_ci this[kOnDrain](client[kUrl], [this, client]) 1641cb0ef41Sopenharmony_ci } 1651cb0ef41Sopenharmony_ci }) 1661cb0ef41Sopenharmony_ci } 1671cb0ef41Sopenharmony_ci 1681cb0ef41Sopenharmony_ci return this 1691cb0ef41Sopenharmony_ci } 1701cb0ef41Sopenharmony_ci 1711cb0ef41Sopenharmony_ci [kRemoveClient] (client) { 1721cb0ef41Sopenharmony_ci client.close(() => { 1731cb0ef41Sopenharmony_ci const idx = this[kClients].indexOf(client) 1741cb0ef41Sopenharmony_ci if (idx !== -1) { 1751cb0ef41Sopenharmony_ci this[kClients].splice(idx, 1) 1761cb0ef41Sopenharmony_ci } 1771cb0ef41Sopenharmony_ci }) 1781cb0ef41Sopenharmony_ci 1791cb0ef41Sopenharmony_ci this[kNeedDrain] = this[kClients].some(dispatcher => ( 1801cb0ef41Sopenharmony_ci !dispatcher[kNeedDrain] && 1811cb0ef41Sopenharmony_ci dispatcher.closed !== true && 1821cb0ef41Sopenharmony_ci dispatcher.destroyed !== true 1831cb0ef41Sopenharmony_ci )) 1841cb0ef41Sopenharmony_ci } 1851cb0ef41Sopenharmony_ci} 1861cb0ef41Sopenharmony_ci 1871cb0ef41Sopenharmony_cimodule.exports = { 1881cb0ef41Sopenharmony_ci PoolBase, 1891cb0ef41Sopenharmony_ci kClients, 1901cb0ef41Sopenharmony_ci kNeedDrain, 1911cb0ef41Sopenharmony_ci kAddClient, 1921cb0ef41Sopenharmony_ci kRemoveClient, 1931cb0ef41Sopenharmony_ci kGetDispatcher 1941cb0ef41Sopenharmony_ci} 195