1'use strict'; 2 3const { 4 ObjectDefineProperties, 5 PromiseResolve, 6 ReflectConstruct, 7} = primordials; 8 9const { 10 kState, 11 setPromiseHandled, 12} = require('internal/webstreams/util'); 13 14const { 15 DOMException, 16} = internalBinding('messaging'); 17 18const { 19 ReadableStream, 20 readableStreamDefaultControllerEnqueue, 21 readableStreamDefaultControllerClose, 22 readableStreamDefaultControllerError, 23 readableStreamPipeTo, 24} = require('internal/webstreams/readablestream'); 25 26const { 27 WritableStream, 28 writableStreamDefaultControllerErrorIfNeeded, 29} = require('internal/webstreams/writablestream'); 30 31const { 32 createDeferredPromise, 33} = require('internal/util'); 34 35const assert = require('internal/assert'); 36 37const { 38 makeTransferable, 39 kClone, 40 kDeserialize, 41} = require('internal/worker/js_transferable'); 42 43// This class is a bit of a hack. The Node.js implementation of 44// DOMException is not transferable/cloneable. This provides us 45// with a variant that is. Unfortunately, it means playing around 46// a bit with the message, name, and code properties and the 47// prototype. We can revisit this if DOMException is ever made 48// properly cloneable. 49class CloneableDOMException extends DOMException { 50 constructor(message, name) { 51 super(message, name); 52 this[kDeserialize]({ 53 message: this.message, 54 name: this.name, 55 code: this.code, 56 }); 57 // eslint-disable-next-line no-constructor-return 58 return makeTransferable(this); 59 } 60 61 [kClone]() { 62 return { 63 data: { 64 message: this.message, 65 name: this.name, 66 code: this.code, 67 }, 68 deserializeInfo: 69 'internal/webstreams/transfer:InternalCloneableDOMException', 70 }; 71 } 72 73 [kDeserialize]({ message, name, code }) { 74 ObjectDefineProperties(this, { 75 message: { 76 __proto__: null, 77 configurable: true, 78 enumerable: true, 79 get() { return message; }, 80 }, 81 name: { 82 __proto__: null, 83 configurable: true, 84 enumerable: true, 85 get() { return name; }, 86 }, 87 code: { 88 __proto__: null, 89 configurable: true, 90 enumerable: true, 91 get() { return code; }, 92 }, 93 }); 94 } 95} 96 97function InternalCloneableDOMException() { 98 return makeTransferable( 99 ReflectConstruct( 100 CloneableDOMException, 101 [], 102 DOMException)); 103} 104InternalCloneableDOMException[kDeserialize] = () => {}; 105 106class CrossRealmTransformReadableSource { 107 constructor(port) { 108 this[kState] = { 109 port, 110 controller: undefined, 111 }; 112 113 port.onmessage = ({ data }) => { 114 const { 115 controller, 116 } = this[kState]; 117 const { 118 type, 119 value, 120 } = data; 121 switch (type) { 122 case 'chunk': 123 readableStreamDefaultControllerEnqueue( 124 controller, 125 value); 126 break; 127 case 'close': 128 readableStreamDefaultControllerClose(controller); 129 port.close(); 130 break; 131 case 'error': 132 readableStreamDefaultControllerError(controller, value); 133 port.close(); 134 break; 135 } 136 }; 137 138 port.onmessageerror = () => { 139 const error = new CloneableDOMException( 140 'Internal transferred ReadableStream error', 141 'DataCloneError'); 142 port.postMessage({ type: 'error', value: error }); 143 readableStreamDefaultControllerError( 144 this[kState].controller, 145 error); 146 port.close(); 147 }; 148 } 149 150 start(controller) { 151 this[kState].controller = controller; 152 } 153 154 async pull() { 155 this[kState].port.postMessage({ type: 'pull' }); 156 } 157 158 async cancel(reason) { 159 try { 160 this[kState].port.postMessage({ type: 'error', value: reason }); 161 } catch (error) { 162 if (error instanceof DOMException) { 163 // eslint-disable-next-line no-ex-assign 164 error = new CloneableDOMException(error.message, error.name); 165 } 166 this[kState].port.postMessage({ type: 'error', value: error }); 167 throw error; 168 } finally { 169 this[kState].port.close(); 170 } 171 } 172} 173 174class CrossRealmTransformWritableSink { 175 constructor(port) { 176 this[kState] = { 177 port, 178 controller: undefined, 179 backpressurePromise: createDeferredPromise(), 180 }; 181 182 port.onmessage = ({ data }) => { 183 assert(typeof data === 'object'); 184 const { 185 type, 186 value, 187 } = { ...data }; 188 assert(typeof type === 'string'); 189 switch (type) { 190 case 'pull': 191 if (this[kState].backpressurePromise !== undefined) 192 this[kState].backpressurePromise.resolve?.(); 193 this[kState].backpressurePromise = undefined; 194 break; 195 case 'error': 196 writableStreamDefaultControllerErrorIfNeeded( 197 this[kState].controller, 198 value); 199 if (this[kState].backpressurePromise !== undefined) 200 this[kState].backpressurePromise.resolve?.(); 201 this[kState].backpressurePromise = undefined; 202 break; 203 } 204 }; 205 port.onmessageerror = () => { 206 const error = new CloneableDOMException( 207 'Internal transferred ReadableStream error', 208 'DataCloneError'); 209 port.postMessage({ type: 'error', value: error }); 210 writableStreamDefaultControllerErrorIfNeeded( 211 this[kState].controller, 212 error); 213 port.close(); 214 }; 215 216 } 217 218 start(controller) { 219 this[kState].controller = controller; 220 } 221 222 async write(chunk) { 223 if (this[kState].backpressurePromise === undefined) { 224 this[kState].backpressurePromise = { 225 promise: PromiseResolve(), 226 resolve: undefined, 227 reject: undefined, 228 }; 229 } 230 await this[kState].backpressurePromise.promise; 231 this[kState].backpressurePromise = createDeferredPromise(); 232 try { 233 this[kState].port.postMessage({ type: 'chunk', value: chunk }); 234 } catch (error) { 235 if (error instanceof DOMException) { 236 // eslint-disable-next-line no-ex-assign 237 error = new CloneableDOMException(error.message, error.name); 238 } 239 this[kState].port.postMessage({ type: 'error', value: error }); 240 this[kState].port.close(); 241 throw error; 242 } 243 } 244 245 close() { 246 this[kState].port.postMessage({ type: 'close' }); 247 this[kState].port.close(); 248 } 249 250 abort(reason) { 251 try { 252 this[kState].port.postMessage({ type: 'error', value: reason }); 253 } catch (error) { 254 if (error instanceof DOMException) { 255 // eslint-disable-next-line no-ex-assign 256 error = new CloneableDOMException(error.message, error.name); 257 } 258 this[kState].port.postMessage({ type: 'error', value: error }); 259 throw error; 260 } finally { 261 this[kState].port.close(); 262 } 263 } 264} 265 266function newCrossRealmReadableStream(writable, port) { 267 const readable = 268 new ReadableStream( 269 new CrossRealmTransformReadableSource(port)); 270 271 const promise = 272 readableStreamPipeTo(readable, writable, false, false, false); 273 274 setPromiseHandled(promise); 275 276 return { 277 readable, 278 promise, 279 }; 280} 281 282function newCrossRealmWritableSink(readable, port) { 283 const writable = 284 new WritableStream( 285 new CrossRealmTransformWritableSink(port)); 286 287 const promise = readableStreamPipeTo(readable, writable, false, false, false); 288 setPromiseHandled(promise); 289 return { 290 writable, 291 promise, 292 }; 293} 294 295module.exports = { 296 newCrossRealmReadableStream, 297 newCrossRealmWritableSink, 298 CrossRealmTransformWritableSink, 299 CrossRealmTransformReadableSource, 300 CloneableDOMException, 301 InternalCloneableDOMException, 302}; 303