1'use strict'; 2 3const { 4 ArrayPrototypeIndexOf, 5 ArrayPrototypePush, 6 ArrayPrototypeSplice, 7 SafeFinalizationRegistry, 8 ObjectGetPrototypeOf, 9 ObjectSetPrototypeOf, 10 Promise, 11 PromisePrototypeThen, 12 PromiseResolve, 13 PromiseReject, 14 ReflectApply, 15 SafeMap, 16 SymbolHasInstance, 17} = primordials; 18 19const { 20 codes: { 21 ERR_INVALID_ARG_TYPE, 22 }, 23} = require('internal/errors'); 24const { 25 validateFunction, 26} = require('internal/validators'); 27 28const { triggerUncaughtException } = internalBinding('errors'); 29 30const { WeakReference } = internalBinding('util'); 31 32// Can't delete when weakref count reaches 0 as it could increment again. 33// Only GC can be used as a valid time to clean up the channels map. 34class WeakRefMap extends SafeMap { 35 #finalizers = new SafeFinalizationRegistry((key) => { 36 this.delete(key); 37 }); 38 39 set(key, value) { 40 this.#finalizers.register(value, key); 41 return super.set(key, new WeakReference(value)); 42 } 43 44 get(key) { 45 return super.get(key)?.get(); 46 } 47 48 incRef(key) { 49 return super.get(key)?.incRef(); 50 } 51 52 decRef(key) { 53 return super.get(key)?.decRef(); 54 } 55} 56 57function markActive(channel) { 58 // eslint-disable-next-line no-use-before-define 59 ObjectSetPrototypeOf(channel, ActiveChannel.prototype); 60 channel._subscribers = []; 61 channel._stores = new SafeMap(); 62} 63 64function maybeMarkInactive(channel) { 65 // When there are no more active subscribers or bound, restore to fast prototype. 66 if (!channel._subscribers.length && !channel._stores.size) { 67 // eslint-disable-next-line no-use-before-define 68 ObjectSetPrototypeOf(channel, Channel.prototype); 69 channel._subscribers = undefined; 70 channel._stores = undefined; 71 } 72} 73 74function defaultTransform(data) { 75 return data; 76} 77 78function wrapStoreRun(store, data, next, transform = defaultTransform) { 79 return () => { 80 let context; 81 try { 82 context = transform(data); 83 } catch (err) { 84 process.nextTick(() => { 85 triggerUncaughtException(err, false); 86 }); 87 return next(); 88 } 89 90 return store.run(context, next); 91 }; 92} 93 94// TODO(qard): should there be a C++ channel interface? 95class ActiveChannel { 96 subscribe(subscription) { 97 validateFunction(subscription, 'subscription'); 98 ArrayPrototypePush(this._subscribers, subscription); 99 channels.incRef(this.name); 100 } 101 102 unsubscribe(subscription) { 103 const index = ArrayPrototypeIndexOf(this._subscribers, subscription); 104 if (index === -1) return false; 105 106 ArrayPrototypeSplice(this._subscribers, index, 1); 107 108 channels.decRef(this.name); 109 maybeMarkInactive(this); 110 111 return true; 112 } 113 114 bindStore(store, transform) { 115 const replacing = this._stores.has(store); 116 if (!replacing) channels.incRef(this.name); 117 this._stores.set(store, transform); 118 } 119 120 unbindStore(store) { 121 if (!this._stores.has(store)) { 122 return false; 123 } 124 125 this._stores.delete(store); 126 127 channels.decRef(this.name); 128 maybeMarkInactive(this); 129 130 return true; 131 } 132 133 get hasSubscribers() { 134 return true; 135 } 136 137 publish(data) { 138 for (let i = 0; i < this._subscribers.length; i++) { 139 try { 140 const onMessage = this._subscribers[i]; 141 onMessage(data, this.name); 142 } catch (err) { 143 process.nextTick(() => { 144 triggerUncaughtException(err, false); 145 }); 146 } 147 } 148 } 149 150 runStores(data, fn, thisArg, ...args) { 151 let run = () => { 152 this.publish(data); 153 return ReflectApply(fn, thisArg, args); 154 }; 155 156 for (const entry of this._stores.entries()) { 157 const store = entry[0]; 158 const transform = entry[1]; 159 run = wrapStoreRun(store, data, run, transform); 160 } 161 162 return run(); 163 } 164} 165 166class Channel { 167 constructor(name) { 168 this._subscribers = undefined; 169 this._stores = undefined; 170 this.name = name; 171 172 channels.set(name, this); 173 } 174 175 static [SymbolHasInstance](instance) { 176 const prototype = ObjectGetPrototypeOf(instance); 177 return prototype === Channel.prototype || 178 prototype === ActiveChannel.prototype; 179 } 180 181 subscribe(subscription) { 182 markActive(this); 183 this.subscribe(subscription); 184 } 185 186 unsubscribe() { 187 return false; 188 } 189 190 bindStore(store, transform) { 191 markActive(this); 192 this.bindStore(store, transform); 193 } 194 195 unbindStore() { 196 return false; 197 } 198 199 get hasSubscribers() { 200 return false; 201 } 202 203 publish() {} 204 205 runStores(data, fn, thisArg, ...args) { 206 return ReflectApply(fn, thisArg, args); 207 } 208} 209 210const channels = new WeakRefMap(); 211 212function channel(name) { 213 const channel = channels.get(name); 214 if (channel) return channel; 215 216 if (typeof name !== 'string' && typeof name !== 'symbol') { 217 throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); 218 } 219 220 return new Channel(name); 221} 222 223function subscribe(name, subscription) { 224 return channel(name).subscribe(subscription); 225} 226 227function unsubscribe(name, subscription) { 228 return channel(name).unsubscribe(subscription); 229} 230 231function hasSubscribers(name) { 232 const channel = channels.get(name); 233 if (!channel) return false; 234 235 return channel.hasSubscribers; 236} 237 238const traceEvents = [ 239 'start', 240 'end', 241 'asyncStart', 242 'asyncEnd', 243 'error', 244]; 245 246function assertChannel(value, name) { 247 if (!(value instanceof Channel)) { 248 throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value); 249 } 250} 251 252class TracingChannel { 253 constructor(nameOrChannels) { 254 if (typeof nameOrChannels === 'string') { 255 this.start = channel(`tracing:${nameOrChannels}:start`); 256 this.end = channel(`tracing:${nameOrChannels}:end`); 257 this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`); 258 this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`); 259 this.error = channel(`tracing:${nameOrChannels}:error`); 260 } else if (typeof nameOrChannels === 'object') { 261 const { start, end, asyncStart, asyncEnd, error } = nameOrChannels; 262 263 assertChannel(start, 'nameOrChannels.start'); 264 assertChannel(end, 'nameOrChannels.end'); 265 assertChannel(asyncStart, 'nameOrChannels.asyncStart'); 266 assertChannel(asyncEnd, 'nameOrChannels.asyncEnd'); 267 assertChannel(error, 'nameOrChannels.error'); 268 269 this.start = start; 270 this.end = end; 271 this.asyncStart = asyncStart; 272 this.asyncEnd = asyncEnd; 273 this.error = error; 274 } else { 275 throw new ERR_INVALID_ARG_TYPE('nameOrChannels', 276 ['string', 'object', 'Channel'], 277 nameOrChannels); 278 } 279 } 280 281 subscribe(handlers) { 282 for (const name of traceEvents) { 283 if (!handlers[name]) continue; 284 285 this[name]?.subscribe(handlers[name]); 286 } 287 } 288 289 unsubscribe(handlers) { 290 let done = true; 291 292 for (const name of traceEvents) { 293 if (!handlers[name]) continue; 294 295 if (!this[name]?.unsubscribe(handlers[name])) { 296 done = false; 297 } 298 } 299 300 return done; 301 } 302 303 traceSync(fn, context = {}, thisArg, ...args) { 304 const { start, end, error } = this; 305 306 return start.runStores(context, () => { 307 try { 308 const result = ReflectApply(fn, thisArg, args); 309 context.result = result; 310 return result; 311 } catch (err) { 312 context.error = err; 313 error.publish(context); 314 throw err; 315 } finally { 316 end.publish(context); 317 } 318 }); 319 } 320 321 tracePromise(fn, context = {}, thisArg, ...args) { 322 const { start, end, asyncStart, asyncEnd, error } = this; 323 324 function reject(err) { 325 context.error = err; 326 error.publish(context); 327 asyncStart.publish(context); 328 // TODO: Is there a way to have asyncEnd _after_ the continuation? 329 asyncEnd.publish(context); 330 return PromiseReject(err); 331 } 332 333 function resolve(result) { 334 context.result = result; 335 asyncStart.publish(context); 336 // TODO: Is there a way to have asyncEnd _after_ the continuation? 337 asyncEnd.publish(context); 338 return result; 339 } 340 341 return start.runStores(context, () => { 342 try { 343 let promise = ReflectApply(fn, thisArg, args); 344 // Convert thenables to native promises 345 if (!(promise instanceof Promise)) { 346 promise = PromiseResolve(promise); 347 } 348 return PromisePrototypeThen(promise, resolve, reject); 349 } catch (err) { 350 context.error = err; 351 error.publish(context); 352 throw err; 353 } finally { 354 end.publish(context); 355 } 356 }); 357 } 358 359 traceCallback(fn, position = -1, context = {}, thisArg, ...args) { 360 const { start, end, asyncStart, asyncEnd, error } = this; 361 362 function wrappedCallback(err, res) { 363 if (err) { 364 context.error = err; 365 error.publish(context); 366 } else { 367 context.result = res; 368 } 369 370 // Using runStores here enables manual context failure recovery 371 asyncStart.runStores(context, () => { 372 try { 373 if (callback) { 374 return ReflectApply(callback, this, arguments); 375 } 376 } finally { 377 asyncEnd.publish(context); 378 } 379 }); 380 } 381 382 const callback = args.at(position); 383 if (typeof callback !== 'function') { 384 throw new ERR_INVALID_ARG_TYPE('callback', ['function'], callback); 385 } 386 ArrayPrototypeSplice(args, position, 1, wrappedCallback); 387 388 return start.runStores(context, () => { 389 try { 390 return ReflectApply(fn, thisArg, args); 391 } catch (err) { 392 context.error = err; 393 error.publish(context); 394 throw err; 395 } finally { 396 end.publish(context); 397 } 398 }); 399 } 400} 401 402function tracingChannel(nameOrChannels) { 403 return new TracingChannel(nameOrChannels); 404} 405 406module.exports = { 407 channel, 408 hasSubscribers, 409 subscribe, 410 tracingChannel, 411 unsubscribe, 412 Channel, 413}; 414