1'use strict';
2
3const {
4  ArrayPrototypeIndexOf,
5  ArrayPrototypePush,
6  ArrayPrototypeSplice,
7  SafeFinalizationRegistry,
8  ObjectGetPrototypeOf,
9  ObjectSetPrototypeOf,
10  Promise,
11  PromisePrototypeThen,
12  PromiseResolve,
13  PromiseReject,
14  ReflectApply,
15  SafeMap,
16  SymbolHasInstance,
17} = primordials;
18
19const {
20  codes: {
21    ERR_INVALID_ARG_TYPE,
22  },
23} = require('internal/errors');
24const {
25  validateFunction,
26} = require('internal/validators');
27
28const { triggerUncaughtException } = internalBinding('errors');
29
30const { WeakReference } = internalBinding('util');
31
32// Can't delete when weakref count reaches 0 as it could increment again.
33// Only GC can be used as a valid time to clean up the channels map.
34class WeakRefMap extends SafeMap {
35  #finalizers = new SafeFinalizationRegistry((key) => {
36    this.delete(key);
37  });
38
39  set(key, value) {
40    this.#finalizers.register(value, key);
41    return super.set(key, new WeakReference(value));
42  }
43
44  get(key) {
45    return super.get(key)?.get();
46  }
47
48  incRef(key) {
49    return super.get(key)?.incRef();
50  }
51
52  decRef(key) {
53    return super.get(key)?.decRef();
54  }
55}
56
57function markActive(channel) {
58  // eslint-disable-next-line no-use-before-define
59  ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
60  channel._subscribers = [];
61  channel._stores = new SafeMap();
62}
63
64function maybeMarkInactive(channel) {
65  // When there are no more active subscribers or bound, restore to fast prototype.
66  if (!channel._subscribers.length && !channel._stores.size) {
67    // eslint-disable-next-line no-use-before-define
68    ObjectSetPrototypeOf(channel, Channel.prototype);
69    channel._subscribers = undefined;
70    channel._stores = undefined;
71  }
72}
73
74function defaultTransform(data) {
75  return data;
76}
77
78function wrapStoreRun(store, data, next, transform = defaultTransform) {
79  return () => {
80    let context;
81    try {
82      context = transform(data);
83    } catch (err) {
84      process.nextTick(() => {
85        triggerUncaughtException(err, false);
86      });
87      return next();
88    }
89
90    return store.run(context, next);
91  };
92}
93
94// TODO(qard): should there be a C++ channel interface?
95class ActiveChannel {
96  subscribe(subscription) {
97    validateFunction(subscription, 'subscription');
98    ArrayPrototypePush(this._subscribers, subscription);
99    channels.incRef(this.name);
100  }
101
102  unsubscribe(subscription) {
103    const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
104    if (index === -1) return false;
105
106    ArrayPrototypeSplice(this._subscribers, index, 1);
107
108    channels.decRef(this.name);
109    maybeMarkInactive(this);
110
111    return true;
112  }
113
114  bindStore(store, transform) {
115    const replacing = this._stores.has(store);
116    if (!replacing) channels.incRef(this.name);
117    this._stores.set(store, transform);
118  }
119
120  unbindStore(store) {
121    if (!this._stores.has(store)) {
122      return false;
123    }
124
125    this._stores.delete(store);
126
127    channels.decRef(this.name);
128    maybeMarkInactive(this);
129
130    return true;
131  }
132
133  get hasSubscribers() {
134    return true;
135  }
136
137  publish(data) {
138    for (let i = 0; i < this._subscribers.length; i++) {
139      try {
140        const onMessage = this._subscribers[i];
141        onMessage(data, this.name);
142      } catch (err) {
143        process.nextTick(() => {
144          triggerUncaughtException(err, false);
145        });
146      }
147    }
148  }
149
150  runStores(data, fn, thisArg, ...args) {
151    let run = () => {
152      this.publish(data);
153      return ReflectApply(fn, thisArg, args);
154    };
155
156    for (const entry of this._stores.entries()) {
157      const store = entry[0];
158      const transform = entry[1];
159      run = wrapStoreRun(store, data, run, transform);
160    }
161
162    return run();
163  }
164}
165
166class Channel {
167  constructor(name) {
168    this._subscribers = undefined;
169    this._stores = undefined;
170    this.name = name;
171
172    channels.set(name, this);
173  }
174
175  static [SymbolHasInstance](instance) {
176    const prototype = ObjectGetPrototypeOf(instance);
177    return prototype === Channel.prototype ||
178           prototype === ActiveChannel.prototype;
179  }
180
181  subscribe(subscription) {
182    markActive(this);
183    this.subscribe(subscription);
184  }
185
186  unsubscribe() {
187    return false;
188  }
189
190  bindStore(store, transform) {
191    markActive(this);
192    this.bindStore(store, transform);
193  }
194
195  unbindStore() {
196    return false;
197  }
198
199  get hasSubscribers() {
200    return false;
201  }
202
203  publish() {}
204
205  runStores(data, fn, thisArg, ...args) {
206    return ReflectApply(fn, thisArg, args);
207  }
208}
209
210const channels = new WeakRefMap();
211
212function channel(name) {
213  const channel = channels.get(name);
214  if (channel) return channel;
215
216  if (typeof name !== 'string' && typeof name !== 'symbol') {
217    throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
218  }
219
220  return new Channel(name);
221}
222
223function subscribe(name, subscription) {
224  return channel(name).subscribe(subscription);
225}
226
227function unsubscribe(name, subscription) {
228  return channel(name).unsubscribe(subscription);
229}
230
231function hasSubscribers(name) {
232  const channel = channels.get(name);
233  if (!channel) return false;
234
235  return channel.hasSubscribers;
236}
237
238const traceEvents = [
239  'start',
240  'end',
241  'asyncStart',
242  'asyncEnd',
243  'error',
244];
245
246function assertChannel(value, name) {
247  if (!(value instanceof Channel)) {
248    throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
249  }
250}
251
252class TracingChannel {
253  constructor(nameOrChannels) {
254    if (typeof nameOrChannels === 'string') {
255      this.start = channel(`tracing:${nameOrChannels}:start`);
256      this.end = channel(`tracing:${nameOrChannels}:end`);
257      this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
258      this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
259      this.error = channel(`tracing:${nameOrChannels}:error`);
260    } else if (typeof nameOrChannels === 'object') {
261      const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
262
263      assertChannel(start, 'nameOrChannels.start');
264      assertChannel(end, 'nameOrChannels.end');
265      assertChannel(asyncStart, 'nameOrChannels.asyncStart');
266      assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
267      assertChannel(error, 'nameOrChannels.error');
268
269      this.start = start;
270      this.end = end;
271      this.asyncStart = asyncStart;
272      this.asyncEnd = asyncEnd;
273      this.error = error;
274    } else {
275      throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
276                                     ['string', 'object', 'Channel'],
277                                     nameOrChannels);
278    }
279  }
280
281  subscribe(handlers) {
282    for (const name of traceEvents) {
283      if (!handlers[name]) continue;
284
285      this[name]?.subscribe(handlers[name]);
286    }
287  }
288
289  unsubscribe(handlers) {
290    let done = true;
291
292    for (const name of traceEvents) {
293      if (!handlers[name]) continue;
294
295      if (!this[name]?.unsubscribe(handlers[name])) {
296        done = false;
297      }
298    }
299
300    return done;
301  }
302
303  traceSync(fn, context = {}, thisArg, ...args) {
304    const { start, end, error } = this;
305
306    return start.runStores(context, () => {
307      try {
308        const result = ReflectApply(fn, thisArg, args);
309        context.result = result;
310        return result;
311      } catch (err) {
312        context.error = err;
313        error.publish(context);
314        throw err;
315      } finally {
316        end.publish(context);
317      }
318    });
319  }
320
321  tracePromise(fn, context = {}, thisArg, ...args) {
322    const { start, end, asyncStart, asyncEnd, error } = this;
323
324    function reject(err) {
325      context.error = err;
326      error.publish(context);
327      asyncStart.publish(context);
328      // TODO: Is there a way to have asyncEnd _after_ the continuation?
329      asyncEnd.publish(context);
330      return PromiseReject(err);
331    }
332
333    function resolve(result) {
334      context.result = result;
335      asyncStart.publish(context);
336      // TODO: Is there a way to have asyncEnd _after_ the continuation?
337      asyncEnd.publish(context);
338      return result;
339    }
340
341    return start.runStores(context, () => {
342      try {
343        let promise = ReflectApply(fn, thisArg, args);
344        // Convert thenables to native promises
345        if (!(promise instanceof Promise)) {
346          promise = PromiseResolve(promise);
347        }
348        return PromisePrototypeThen(promise, resolve, reject);
349      } catch (err) {
350        context.error = err;
351        error.publish(context);
352        throw err;
353      } finally {
354        end.publish(context);
355      }
356    });
357  }
358
359  traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
360    const { start, end, asyncStart, asyncEnd, error } = this;
361
362    function wrappedCallback(err, res) {
363      if (err) {
364        context.error = err;
365        error.publish(context);
366      } else {
367        context.result = res;
368      }
369
370      // Using runStores here enables manual context failure recovery
371      asyncStart.runStores(context, () => {
372        try {
373          if (callback) {
374            return ReflectApply(callback, this, arguments);
375          }
376        } finally {
377          asyncEnd.publish(context);
378        }
379      });
380    }
381
382    const callback = args.at(position);
383    if (typeof callback !== 'function') {
384      throw new ERR_INVALID_ARG_TYPE('callback', ['function'], callback);
385    }
386    ArrayPrototypeSplice(args, position, 1, wrappedCallback);
387
388    return start.runStores(context, () => {
389      try {
390        return ReflectApply(fn, thisArg, args);
391      } catch (err) {
392        context.error = err;
393        error.publish(context);
394        throw err;
395      } finally {
396        end.publish(context);
397      }
398    });
399  }
400}
401
402function tracingChannel(nameOrChannels) {
403  return new TracingChannel(nameOrChannels);
404}
405
406module.exports = {
407  channel,
408  hasSubscribers,
409  subscribe,
410  tracingChannel,
411  unsubscribe,
412  Channel,
413};
414