1'use strict' 2 3const DispatcherBase = require('./dispatcher-base') 4const FixedQueue = require('./node/fixed-queue') 5const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols') 6const PoolStats = require('./pool-stats') 7 8const kClients = Symbol('clients') 9const kNeedDrain = Symbol('needDrain') 10const kQueue = Symbol('queue') 11const kClosedResolve = Symbol('closed resolve') 12const kOnDrain = Symbol('onDrain') 13const kOnConnect = Symbol('onConnect') 14const kOnDisconnect = Symbol('onDisconnect') 15const kOnConnectionError = Symbol('onConnectionError') 16const kGetDispatcher = Symbol('get dispatcher') 17const kAddClient = Symbol('add client') 18const kRemoveClient = Symbol('remove client') 19const kStats = Symbol('stats') 20 21class PoolBase extends DispatcherBase { 22 constructor () { 23 super() 24 25 this[kQueue] = new FixedQueue() 26 this[kClients] = [] 27 this[kQueued] = 0 28 29 const pool = this 30 31 this[kOnDrain] = function onDrain (origin, targets) { 32 const queue = pool[kQueue] 33 34 let needDrain = false 35 36 while (!needDrain) { 37 const item = queue.shift() 38 if (!item) { 39 break 40 } 41 pool[kQueued]-- 42 needDrain = !this.dispatch(item.opts, item.handler) 43 } 44 45 this[kNeedDrain] = needDrain 46 47 if (!this[kNeedDrain] && pool[kNeedDrain]) { 48 pool[kNeedDrain] = false 49 pool.emit('drain', origin, [pool, ...targets]) 50 } 51 52 if (pool[kClosedResolve] && queue.isEmpty()) { 53 Promise 54 .all(pool[kClients].map(c => c.close())) 55 .then(pool[kClosedResolve]) 56 } 57 } 58 59 this[kOnConnect] = (origin, targets) => { 60 pool.emit('connect', origin, [pool, ...targets]) 61 } 62 63 this[kOnDisconnect] = (origin, targets, err) => { 64 pool.emit('disconnect', origin, [pool, ...targets], err) 65 } 66 67 this[kOnConnectionError] = (origin, targets, err) => { 68 pool.emit('connectionError', origin, [pool, ...targets], err) 69 } 70 71 this[kStats] = new PoolStats(this) 72 } 73 74 get [kBusy] () { 75 return this[kNeedDrain] 76 } 77 78 get [kConnected] () { 79 return this[kClients].filter(client => client[kConnected]).length 80 } 81 82 get [kFree] () { 83 return this[kClients].filter(client => client[kConnected] && !client[kNeedDrain]).length 84 } 85 86 get [kPending] () { 87 let ret = this[kQueued] 88 for (const { [kPending]: pending } of this[kClients]) { 89 ret += pending 90 } 91 return ret 92 } 93 94 get [kRunning] () { 95 let ret = 0 96 for (const { [kRunning]: running } of this[kClients]) { 97 ret += running 98 } 99 return ret 100 } 101 102 get [kSize] () { 103 let ret = this[kQueued] 104 for (const { [kSize]: size } of this[kClients]) { 105 ret += size 106 } 107 return ret 108 } 109 110 get stats () { 111 return this[kStats] 112 } 113 114 async [kClose] () { 115 if (this[kQueue].isEmpty()) { 116 return Promise.all(this[kClients].map(c => c.close())) 117 } else { 118 return new Promise((resolve) => { 119 this[kClosedResolve] = resolve 120 }) 121 } 122 } 123 124 async [kDestroy] (err) { 125 while (true) { 126 const item = this[kQueue].shift() 127 if (!item) { 128 break 129 } 130 item.handler.onError(err) 131 } 132 133 return Promise.all(this[kClients].map(c => c.destroy(err))) 134 } 135 136 [kDispatch] (opts, handler) { 137 const dispatcher = this[kGetDispatcher]() 138 139 if (!dispatcher) { 140 this[kNeedDrain] = true 141 this[kQueue].push({ opts, handler }) 142 this[kQueued]++ 143 } else if (!dispatcher.dispatch(opts, handler)) { 144 dispatcher[kNeedDrain] = true 145 this[kNeedDrain] = !this[kGetDispatcher]() 146 } 147 148 return !this[kNeedDrain] 149 } 150 151 [kAddClient] (client) { 152 client 153 .on('drain', this[kOnDrain]) 154 .on('connect', this[kOnConnect]) 155 .on('disconnect', this[kOnDisconnect]) 156 .on('connectionError', this[kOnConnectionError]) 157 158 this[kClients].push(client) 159 160 if (this[kNeedDrain]) { 161 process.nextTick(() => { 162 if (this[kNeedDrain]) { 163 this[kOnDrain](client[kUrl], [this, client]) 164 } 165 }) 166 } 167 168 return this 169 } 170 171 [kRemoveClient] (client) { 172 client.close(() => { 173 const idx = this[kClients].indexOf(client) 174 if (idx !== -1) { 175 this[kClients].splice(idx, 1) 176 } 177 }) 178 179 this[kNeedDrain] = this[kClients].some(dispatcher => ( 180 !dispatcher[kNeedDrain] && 181 dispatcher.closed !== true && 182 dispatcher.destroyed !== true 183 )) 184 } 185} 186 187module.exports = { 188 PoolBase, 189 kClients, 190 kNeedDrain, 191 kAddClient, 192 kRemoveClient, 193 kGetDispatcher 194} 195