1'use strict';
2
3const {
4  ObjectDefineProperties,
5  PromiseResolve,
6  ReflectConstruct,
7} = primordials;
8
9const {
10  kState,
11  setPromiseHandled,
12} = require('internal/webstreams/util');
13
14const {
15  DOMException,
16} = internalBinding('messaging');
17
18const {
19  ReadableStream,
20  readableStreamDefaultControllerEnqueue,
21  readableStreamDefaultControllerClose,
22  readableStreamDefaultControllerError,
23  readableStreamPipeTo,
24} = require('internal/webstreams/readablestream');
25
26const {
27  WritableStream,
28  writableStreamDefaultControllerErrorIfNeeded,
29} = require('internal/webstreams/writablestream');
30
31const {
32  createDeferredPromise,
33} = require('internal/util');
34
35const assert = require('internal/assert');
36
37const {
38  makeTransferable,
39  kClone,
40  kDeserialize,
41} = require('internal/worker/js_transferable');
42
43// This class is a bit of a hack. The Node.js implementation of
44// DOMException is not transferable/cloneable. This provides us
45// with a variant that is. Unfortunately, it means playing around
46// a bit with the message, name, and code properties and the
47// prototype. We can revisit this if DOMException is ever made
48// properly cloneable.
class CloneableDOMException extends DOMException {
  /**
   * Builds the exception, pins `message`, `name` and `code` on the instance
   * as own accessors, and returns the result of `makeTransferable(this)`.
   * @param {any} message Forwarded unchanged to the DOMException constructor.
   * @param {any} name Forwarded unchanged to the DOMException constructor.
   */
  constructor(message, name) {
    super(message, name);
    // Capture the values as they are visible right after construction, then
    // reuse the deserialization path to install them on this instance.
    const snapshot = {
      message: this.message,
      name: this.name,
      code: this.code,
    };
    this[kDeserialize](snapshot);
    // eslint-disable-next-line no-constructor-return
    return makeTransferable(this);
  }

  /**
   * Produces the clone record for this instance: the three fields as plain
   * data plus a `deserializeInfo` string (presumably used by the transfer
   * machinery to locate `InternalCloneableDOMException` - confirm against
   * `internal/worker/js_transferable`).
   * @returns {{ data: object, deserializeInfo: string }}
   */
  [kClone]() {
    const { message, name, code } = this;
    return {
      data: { message, name, code },
      deserializeInfo:
        'internal/webstreams/transfer:InternalCloneableDOMException',
    };
  }

  /**
   * Installs `message`, `name` and `code` on this instance as configurable,
   * enumerable, getter-only own properties returning the given values.
   * @param {{ message: any, name: any, code: any }} fields
   */
  [kDeserialize]({ message, name, code }) {
    // One null-prototype accessor descriptor per field; there is no setter.
    const fixedGetter = (value) => ({
      __proto__: null,
      configurable: true,
      enumerable: true,
      get() { return value; },
    });
    ObjectDefineProperties(this, {
      message: fixedGetter(message),
      name: fixedGetter(name),
      code: fixedGetter(code),
    });
  }
}
96
/**
 * Factory named by the `deserializeInfo` string that
 * `CloneableDOMException` emits when cloned. It runs the
 * `CloneableDOMException` constructor with no arguments, passing
 * `DOMException` as the `newTarget` of the construct call, and returns the
 * result wrapped by `makeTransferable()`.
 * @returns {object}
 */
function InternalCloneableDOMException() {
  const noArguments = [];
  const instance =
    ReflectConstruct(CloneableDOMException, noArguments, DOMException);
  return makeTransferable(instance);
}
// A no-op deserialize hook attached to the function object itself.
InternalCloneableDOMException[kDeserialize] = () => {};
105
/**
 * Underlying source for the readable end of a stream whose data arrives
 * over a message port. Incoming `{ type, value }` messages are translated
 * into operations on the stream controller, and `pull` / `cancel` are
 * translated into outgoing messages on the same port.
 */
class CrossRealmTransformReadableSource {
  /**
   * @param {object} port Message port used to talk to the other side. It
   *   must expose `postMessage()`, `close()`, and accept `onmessage` /
   *   `onmessageerror` handlers; both handlers are installed here.
   */
  constructor(port) {
    this[kState] = {
      port,
      // Filled in by start() once the stream hands over its controller.
      controller: undefined,
    };

    port.onmessage = ({ data }) => {
      const {
        controller,
      } = this[kState];
      const {
        type,
        value,
      } = data;
      // Message types other than the three below are ignored.
      switch (type) {
        case 'chunk':
          readableStreamDefaultControllerEnqueue(
            controller,
            value);
          break;
        case 'close':
          readableStreamDefaultControllerClose(controller);
          port.close();
          break;
        case 'error':
          readableStreamDefaultControllerError(controller, value);
          port.close();
          break;
      }
    };

    port.onmessageerror = () => {
      // Report the failure to the other side, error the local stream with
      // the same exception, then shut the port down.
      const error = new CloneableDOMException(
        'Internal transferred ReadableStream error',
        'DataCloneError');
      port.postMessage({ type: 'error', value: error });
      readableStreamDefaultControllerError(
        this[kState].controller,
        error);
      port.close();
    };
  }

  /**
   * Remembers the controller so the port handlers can drive the stream.
   * @param {object} controller
   */
  start(controller) {
    this[kState].controller = controller;
  }

  /**
   * Asks the other side for more data by posting a `pull` message.
   * @returns {Promise<void>}
   */
  async pull() {
    this[kState].port.postMessage({ type: 'pull' });
  }

  /**
   * Forwards the cancellation reason to the other side as an `error`
   * message and closes the port. The port is closed whether or not the
   * post succeeds.
   *
   * If posting `reason` throws, the thrown error (converted to a
   * `CloneableDOMException` when it is a `DOMException`) is reported to the
   * other side on a best-effort basis and then becomes the rejection
   * reason of the returned promise.
   * @param {any} reason
   * @returns {Promise<void>}
   */
  async cancel(reason) {
    const { port } = this[kState];
    try {
      port.postMessage({ type: 'error', value: reason });
    } catch (error) {
      const reported = error instanceof DOMException ?
        new CloneableDOMException(error.message, error.name) :
        error;
      try {
        port.postMessage({ type: 'error', value: reported });
      } catch {
        // Reporting is best effort: if the error itself cannot be posted,
        // the failure is discarded so that the original error, not this
        // secondary one, is what the caller observes.
      }
      throw reported;
    } finally {
      port.close();
    }
  }
}
173
/**
 * Underlying sink for the writable end of a stream whose data is sent over
 * a message port. Written chunks are posted as `chunk` messages; incoming
 * `pull` and `error` messages release the backpressure promise that gates
 * the next write.
 */
class CrossRealmTransformWritableSink {
  /**
   * @param {object} port Message port used to talk to the other side. It
   *   must expose `postMessage()`, `close()`, and accept `onmessage` /
   *   `onmessageerror` handlers; both handlers are installed here.
   */
  constructor(port) {
    this[kState] = {
      port,
      // Filled in by start() once the stream hands over its controller.
      controller: undefined,
      // Starts out pending, so the first write waits for a message from
      // the other side.
      backpressurePromise: createDeferredPromise(),
    };

    port.onmessage = ({ data }) => {
      assert(typeof data === 'object');
      const {
        type,
        value,
      } = { ...data };
      assert(typeof type === 'string');
      // Message types other than the two below are ignored.
      switch (type) {
        case 'pull':
          // The other side wants data: release any waiting write.
          if (this[kState].backpressurePromise !== undefined)
            this[kState].backpressurePromise.resolve?.();
          this[kState].backpressurePromise = undefined;
          break;
        case 'error':
          writableStreamDefaultControllerErrorIfNeeded(
            this[kState].controller,
            value);
          // Also release any waiting write so it does not hang.
          if (this[kState].backpressurePromise !== undefined)
            this[kState].backpressurePromise.resolve?.();
          this[kState].backpressurePromise = undefined;
          break;
      }
    };
    port.onmessageerror = () => {
      // Report the failure to the other side, error the local stream with
      // the same exception, then shut the port down.
      // NOTE(review): the message text says "ReadableStream" even though
      // this is the writable side; kept unchanged here.
      const error = new CloneableDOMException(
        'Internal transferred ReadableStream error',
        'DataCloneError');
      port.postMessage({ type: 'error', value: error });
      writableStreamDefaultControllerErrorIfNeeded(
        this[kState].controller,
        error);
      port.close();
    };

  }

  /**
   * Remembers the controller so the port handlers can error the stream.
   * @param {object} controller
   */
  start(controller) {
    this[kState].controller = controller;
  }

  /**
   * Waits for the current backpressure promise, arms a new one for the
   * following write, and posts the chunk as a `chunk` message.
   *
   * If posting the chunk throws, the thrown error (converted to a
   * `CloneableDOMException` when it is a `DOMException`) is reported to the
   * other side on a best-effort basis, the port is closed, and the error
   * becomes the rejection reason of the returned promise.
   * @param {any} chunk
   * @returns {Promise<void>}
   */
  async write(chunk) {
    if (this[kState].backpressurePromise === undefined) {
      // Nothing to wait for: substitute an already-resolved promise with
      // the same shape as a deferred promise.
      this[kState].backpressurePromise = {
        promise: PromiseResolve(),
        resolve: undefined,
        reject: undefined,
      };
    }
    await this[kState].backpressurePromise.promise;
    this[kState].backpressurePromise = createDeferredPromise();
    const { port } = this[kState];
    try {
      port.postMessage({ type: 'chunk', value: chunk });
    } catch (error) {
      const reported = error instanceof DOMException ?
        new CloneableDOMException(error.message, error.name) :
        error;
      try {
        port.postMessage({ type: 'error', value: reported });
      } catch {
        // Reporting is best effort: if the error itself cannot be posted,
        // the failure is discarded so that the original error, not this
        // secondary one, is what the caller observes.
      } finally {
        // Always release the port, even when the report could not be sent.
        port.close();
      }
      throw reported;
    }
  }

  /**
   * Tells the other side the stream is finished and closes the port.
   */
  close() {
    this[kState].port.postMessage({ type: 'close' });
    this[kState].port.close();
  }

  /**
   * Forwards the abort reason to the other side as an `error` message and
   * closes the port. The port is closed whether or not the post succeeds.
   *
   * If posting `reason` throws, the thrown error (converted to a
   * `CloneableDOMException` when it is a `DOMException`) is reported to the
   * other side on a best-effort basis and then rethrown.
   * @param {any} reason
   */
  abort(reason) {
    const { port } = this[kState];
    try {
      port.postMessage({ type: 'error', value: reason });
    } catch (error) {
      const reported = error instanceof DOMException ?
        new CloneableDOMException(error.message, error.name) :
        error;
      try {
        port.postMessage({ type: 'error', value: reported });
      } catch {
        // Best effort only; see write() for the rationale.
      }
      throw reported;
    } finally {
      port.close();
    }
  }
}
265
/**
 * Creates a ReadableStream fed by messages arriving on `port` and starts
 * piping it into `writable`.
 * @param {object} writable Destination stream for the pipe.
 * @param {object} port Message port handed to the underlying source.
 * @returns {{ readable: object, promise: Promise<any> }} The new stream
 *   and the pipe promise, which has already been marked as handled.
 */
function newCrossRealmReadableStream(writable, port) {
  const source = new CrossRealmTransformReadableSource(port);
  const readable = new ReadableStream(source);

  // The three `false` flags are passed to readableStreamPipeTo unchanged.
  const promise =
    readableStreamPipeTo(readable, writable, false, false, false);
  setPromiseHandled(promise);

  return { readable, promise };
}
281
/**
 * Creates a WritableStream that forwards its chunks over `port` and starts
 * piping `readable` into it.
 * @param {object} readable Source stream for the pipe.
 * @param {object} port Message port handed to the underlying sink.
 * @returns {{ writable: object, promise: Promise<any> }} The new stream
 *   and the pipe promise, which has already been marked as handled.
 */
function newCrossRealmWritableSink(readable, port) {
  const sink = new CrossRealmTransformWritableSink(port);
  const writable = new WritableStream(sink);

  // The three `false` flags are passed to readableStreamPipeTo unchanged.
  const promise =
    readableStreamPipeTo(readable, writable, false, false, false);
  setPromiseHandled(promise);

  return { writable, promise };
}
294
295module.exports = {
296  newCrossRealmReadableStream,
297  newCrossRealmWritableSink,
298  CrossRealmTransformWritableSink,
299  CrossRealmTransformReadableSource,
300  CloneableDOMException,
301  InternalCloneableDOMException,
302};
303