1'use strict'
2
3const DispatcherBase = require('./dispatcher-base')
4const FixedQueue = require('./node/fixed-queue')
5const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols')
6const PoolStats = require('./pool-stats')
7
// Keys for state held on the pool instance itself.
const kQueue = Symbol('queue')
const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
const kClosedResolve = Symbol('closed resolve')
const kStats = Symbol('stats')

// Keys for the event listeners the pool attaches to each client it manages.
const kOnDrain = Symbol('onDrain')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')

// Keys for the client-management methods (selection, registration, removal).
const kGetDispatcher = Symbol('get dispatcher')
const kAddClient = Symbol('add client')
const kRemoveClient = Symbol('remove client')
20
/**
 * Base class for dispatchers that spread requests over a list of client
 * dispatchers.
 *
 * Requests that cannot be handed to a client right away are parked in an
 * internal queue and replayed when a client emits 'drain'. This class calls
 * `this[kGetDispatcher]()` to pick a client but does not define it; it has to
 * be supplied elsewhere (presumably by subclasses).
 */
class PoolBase extends DispatcherBase {
  constructor () {
    super()

    this[kQueue] = new FixedQueue()
    this[kClients] = []
    // Number of requests currently parked in kQueue.
    this[kQueued] = 0

    const pool = this

    // Deliberately a regular function rather than an arrow: `this` is whatever
    // the handler is invoked on. Registered as a client 'drain' listener in
    // kAddClient (so `this` is the emitting client, per EventEmitter
    // semantics), and also invoked directly on the pool from kAddClient, in
    // which case `this === pool`.
    this[kOnDrain] = function onDrain (origin, targets) {
      const queue = pool[kQueue]

      let needDrain = false

      // Replay queued requests on `this` until it pushes back or the queue
      // yields a falsy item (treated as "queue empty").
      while (!needDrain) {
        const item = queue.shift()
        if (!item) {
          break
        }
        pool[kQueued]--
        needDrain = !this.dispatch(item.opts, item.handler)
      }

      this[kNeedDrain] = needDrain

      // The drained dispatcher still has capacity and the pool was flagged as
      // needing drain: clear the flag and forward the event with the pool
      // prepended to the target chain.
      if (!this[kNeedDrain] && pool[kNeedDrain]) {
        pool[kNeedDrain] = false
        pool.emit('drain', origin, [pool, ...targets])
      }

      // A close was requested while requests were still queued (see kClose):
      // once the queue is empty, close every client and settle that promise.
      if (pool[kClosedResolve] && queue.isEmpty()) {
        Promise
          .all(pool[kClients].map(c => c.close()))
          .then(pool[kClosedResolve])
      }
    }

    // The remaining listeners simply re-emit client events on the pool with
    // the pool prepended to the target chain.
    this[kOnConnect] = (origin, targets) => {
      pool.emit('connect', origin, [pool, ...targets])
    }

    this[kOnDisconnect] = (origin, targets, err) => {
      pool.emit('disconnect', origin, [pool, ...targets], err)
    }

    this[kOnConnectionError] = (origin, targets, err) => {
      pool.emit('connectionError', origin, [pool, ...targets], err)
    }

    this[kStats] = new PoolStats(this)
  }

  /**
   * The pool's own need-drain flag. It is not initialised in the constructor,
   * so this is `undefined` until a dispatch or drain assigns it.
   */
  get [kBusy] () {
    return this[kNeedDrain]
  }

  /** Number of clients whose `kConnected` value is truthy. */
  get [kConnected] () {
    let ret = 0
    for (const { [kConnected]: connected } of this[kClients]) {
      if (connected) {
        ret++
      }
    }
    return ret
  }

  /** Number of clients that are connected and not flagged as needing drain. */
  get [kFree] () {
    let ret = 0
    for (const { [kConnected]: connected, [kNeedDrain]: needDrain } of this[kClients]) {
      if (connected && !needDrain) {
        ret++
      }
    }
    return ret
  }

  /** Requests parked in the pool queue plus every client's `kPending`. */
  get [kPending] () {
    let ret = this[kQueued]
    for (const { [kPending]: pending } of this[kClients]) {
      ret += pending
    }
    return ret
  }

  /** Sum of every client's `kRunning`. */
  get [kRunning] () {
    let ret = 0
    for (const { [kRunning]: running } of this[kClients]) {
      ret += running
    }
    return ret
  }

  /** Requests parked in the pool queue plus every client's `kSize`. */
  get [kSize] () {
    let ret = this[kQueued]
    for (const { [kSize]: size } of this[kClients]) {
      ret += size
    }
    return ret
  }

  /** The `PoolStats` instance created for this pool in the constructor. */
  get stats () {
    return this[kStats]
  }

  /**
   * Closes every client.
   *
   * With an empty queue the clients are closed immediately. Otherwise the
   * returned promise is parked in `kClosedResolve` and settled by the drain
   * handler once the queue has been flushed.
   *
   * @returns {Promise} settles once all clients have closed
   */
  async [kClose] () {
    if (this[kQueue].isEmpty()) {
      return Promise.all(this[kClients].map(c => c.close()))
    } else {
      return new Promise((resolve) => {
        this[kClosedResolve] = resolve
      })
    }
  }

  /**
   * Fails every queued request with `err`, then destroys every client.
   *
   * @param {*} err passed to each queued handler's `onError` and to each
   *   client's `destroy`
   * @returns {Promise} settles once all clients have been destroyed
   */
  async [kDestroy] (err) {
    while (true) {
      const item = this[kQueue].shift()
      if (!item) {
        break
      }
      // Keep the counter in step with the queue, exactly as the drain handler
      // does; otherwise kPending and kSize keep counting requests that have
      // already been failed here.
      this[kQueued]--
      item.handler.onError(err)
    }

    return Promise.all(this[kClients].map(c => c.destroy(err)))
  }

  /**
   * Hands a request to a client, or queues it when `kGetDispatcher` returns
   * nothing.
   *
   * @param {object} opts request options, forwarded untouched
   * @param {object} handler request handler, forwarded untouched
   * @returns {boolean} `false` when the pool is flagged as needing drain
   */
  [kDispatch] (opts, handler) {
    const dispatcher = this[kGetDispatcher]()

    if (!dispatcher) {
      this[kNeedDrain] = true
      this[kQueue].push({ opts, handler })
      this[kQueued]++
    } else if (!dispatcher.dispatch(opts, handler)) {
      // That client is saturated; the pool itself only needs drain if no
      // other client can be selected.
      dispatcher[kNeedDrain] = true
      this[kNeedDrain] = !this[kGetDispatcher]()
    }

    return !this[kNeedDrain]
  }

  /**
   * Registers a client: attaches the pool's event listeners and appends it to
   * the client list.
   *
   * @param {object} client dispatcher to add
   * @returns {this}
   */
  [kAddClient] (client) {
    client
      .on('drain', this[kOnDrain])
      .on('connect', this[kOnConnect])
      .on('disconnect', this[kOnDisconnect])
      .on('connectionError', this[kOnConnectionError])

    this[kClients].push(client)

    // If the pool is waiting for capacity, run the drain handler on the next
    // tick (re-checking the flag, which may have been cleared in between).
    // Note the handler is invoked on the pool here, not on the new client.
    if (this[kNeedDrain]) {
      process.nextTick(() => {
        if (this[kNeedDrain]) {
          this[kOnDrain](client[kUrl], [this, client])
        }
      })
    }

    return this
  }

  /**
   * Closes a client and drops it from the client list once the close
   * callback fires, then recomputes the pool's need-drain flag.
   *
   * @param {object} client dispatcher to remove
   */
  [kRemoveClient] (client) {
    client.close(() => {
      const idx = this[kClients].indexOf(client)
      if (idx !== -1) {
        this[kClients].splice(idx, 1)
      }
    })

    // NOTE(review): as written, the flag becomes true when at least one
    // client is neither draining, closed nor destroyed. That reads as the
    // opposite of how kDispatch sets the flag (true when nothing can be
    // selected); confirm the intended polarity before changing it.
    this[kNeedDrain] = this[kClients].some(dispatcher => (
      !dispatcher[kNeedDrain] &&
      dispatcher.closed !== true &&
      dispatcher.destroyed !== true
    ))
  }
}
186
// Exports the base class together with the module-private symbols that code
// outside this file needs in order to reach the pool's client list,
// need-drain flag and client-management methods.
module.exports = {
  PoolBase,
  kClients,
  kNeedDrain,
  kAddClient,
  kRemoveClient,
  kGetDispatcher
}
195