11cb0ef41Sopenharmony_ci#include "stream_pipe.h" 21cb0ef41Sopenharmony_ci#include "stream_base-inl.h" 31cb0ef41Sopenharmony_ci#include "node_buffer.h" 41cb0ef41Sopenharmony_ci#include "util-inl.h" 51cb0ef41Sopenharmony_ci 61cb0ef41Sopenharmony_cinamespace node { 71cb0ef41Sopenharmony_ci 81cb0ef41Sopenharmony_ciusing v8::BackingStore; 91cb0ef41Sopenharmony_ciusing v8::Context; 101cb0ef41Sopenharmony_ciusing v8::Function; 111cb0ef41Sopenharmony_ciusing v8::FunctionCallbackInfo; 121cb0ef41Sopenharmony_ciusing v8::FunctionTemplate; 131cb0ef41Sopenharmony_ciusing v8::HandleScope; 141cb0ef41Sopenharmony_ciusing v8::Isolate; 151cb0ef41Sopenharmony_ciusing v8::Just; 161cb0ef41Sopenharmony_ciusing v8::Local; 171cb0ef41Sopenharmony_ciusing v8::Maybe; 181cb0ef41Sopenharmony_ciusing v8::Nothing; 191cb0ef41Sopenharmony_ciusing v8::Object; 201cb0ef41Sopenharmony_ciusing v8::Value; 211cb0ef41Sopenharmony_ci 221cb0ef41Sopenharmony_ciStreamPipe::StreamPipe(StreamBase* source, 231cb0ef41Sopenharmony_ci StreamBase* sink, 241cb0ef41Sopenharmony_ci Local<Object> obj) 251cb0ef41Sopenharmony_ci : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) { 261cb0ef41Sopenharmony_ci MakeWeak(); 271cb0ef41Sopenharmony_ci 281cb0ef41Sopenharmony_ci CHECK_NOT_NULL(sink); 291cb0ef41Sopenharmony_ci CHECK_NOT_NULL(source); 301cb0ef41Sopenharmony_ci 311cb0ef41Sopenharmony_ci source->PushStreamListener(&readable_listener_); 321cb0ef41Sopenharmony_ci sink->PushStreamListener(&writable_listener_); 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ci uses_wants_write_ = sink->HasWantsWrite(); 351cb0ef41Sopenharmony_ci} 361cb0ef41Sopenharmony_ci 371cb0ef41Sopenharmony_ciStreamPipe::~StreamPipe() { 381cb0ef41Sopenharmony_ci Unpipe(true); 391cb0ef41Sopenharmony_ci} 401cb0ef41Sopenharmony_ci 411cb0ef41Sopenharmony_ciStreamBase* StreamPipe::source() { 421cb0ef41Sopenharmony_ci return static_cast<StreamBase*>(readable_listener_.stream()); 431cb0ef41Sopenharmony_ci} 441cb0ef41Sopenharmony_ci 451cb0ef41Sopenharmony_ciStreamBase* StreamPipe::sink() { 461cb0ef41Sopenharmony_ci return static_cast<StreamBase*>(writable_listener_.stream()); 471cb0ef41Sopenharmony_ci} 481cb0ef41Sopenharmony_ci 491cb0ef41Sopenharmony_civoid StreamPipe::Unpipe(bool is_in_deletion) { 501cb0ef41Sopenharmony_ci if (is_closed_) 511cb0ef41Sopenharmony_ci return; 521cb0ef41Sopenharmony_ci 531cb0ef41Sopenharmony_ci // Note that we possibly cannot use virtual methods on `source` and `sink` 541cb0ef41Sopenharmony_ci // here, because this function can be called from their destructors via 551cb0ef41Sopenharmony_ci // `OnStreamDestroy()`. 561cb0ef41Sopenharmony_ci if (!source_destroyed_) 571cb0ef41Sopenharmony_ci source()->ReadStop(); 581cb0ef41Sopenharmony_ci 591cb0ef41Sopenharmony_ci is_closed_ = true; 601cb0ef41Sopenharmony_ci is_reading_ = false; 611cb0ef41Sopenharmony_ci source()->RemoveStreamListener(&readable_listener_); 621cb0ef41Sopenharmony_ci if (pending_writes_ == 0) 631cb0ef41Sopenharmony_ci sink()->RemoveStreamListener(&writable_listener_); 641cb0ef41Sopenharmony_ci 651cb0ef41Sopenharmony_ci if (is_in_deletion) return; 661cb0ef41Sopenharmony_ci 671cb0ef41Sopenharmony_ci // Delay the JS-facing part with SetImmediate, because this might be from 681cb0ef41Sopenharmony_ci // inside the garbage collector, so we can’t run JS here. 691cb0ef41Sopenharmony_ci HandleScope handle_scope(env()->isolate()); 701cb0ef41Sopenharmony_ci BaseObjectPtr<StreamPipe> strong_ref{this}; 711cb0ef41Sopenharmony_ci env()->SetImmediate([this, strong_ref](Environment* env) { 721cb0ef41Sopenharmony_ci HandleScope handle_scope(env->isolate()); 731cb0ef41Sopenharmony_ci Context::Scope context_scope(env->context()); 741cb0ef41Sopenharmony_ci Local<Object> object = this->object(); 751cb0ef41Sopenharmony_ci 761cb0ef41Sopenharmony_ci Local<Value> onunpipe; 771cb0ef41Sopenharmony_ci if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe)) 781cb0ef41Sopenharmony_ci return; 791cb0ef41Sopenharmony_ci if (onunpipe->IsFunction() && 801cb0ef41Sopenharmony_ci MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) { 811cb0ef41Sopenharmony_ci return; 821cb0ef41Sopenharmony_ci } 831cb0ef41Sopenharmony_ci 841cb0ef41Sopenharmony_ci // Set all the links established in the constructor to `null`. 851cb0ef41Sopenharmony_ci Local<Value> null = Null(env->isolate()); 861cb0ef41Sopenharmony_ci 871cb0ef41Sopenharmony_ci Local<Value> source_v; 881cb0ef41Sopenharmony_ci Local<Value> sink_v; 891cb0ef41Sopenharmony_ci if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) || 901cb0ef41Sopenharmony_ci !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) || 911cb0ef41Sopenharmony_ci !source_v->IsObject() || !sink_v->IsObject()) { 921cb0ef41Sopenharmony_ci return; 931cb0ef41Sopenharmony_ci } 941cb0ef41Sopenharmony_ci 951cb0ef41Sopenharmony_ci if (object->Set(env->context(), env->source_string(), null).IsNothing() || 961cb0ef41Sopenharmony_ci object->Set(env->context(), env->sink_string(), null).IsNothing() || 971cb0ef41Sopenharmony_ci source_v.As<Object>() 981cb0ef41Sopenharmony_ci ->Set(env->context(), env->pipe_target_string(), null) 991cb0ef41Sopenharmony_ci .IsNothing() || 1001cb0ef41Sopenharmony_ci sink_v.As<Object>() 1011cb0ef41Sopenharmony_ci ->Set(env->context(), env->pipe_source_string(), null) 1021cb0ef41Sopenharmony_ci .IsNothing()) { 1031cb0ef41Sopenharmony_ci return; 1041cb0ef41Sopenharmony_ci } 1051cb0ef41Sopenharmony_ci }); 1061cb0ef41Sopenharmony_ci} 1071cb0ef41Sopenharmony_ci 1081cb0ef41Sopenharmony_ciuv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { 1091cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); 1101cb0ef41Sopenharmony_ci size_t size = std::min(suggested_size, pipe->wanted_data_); 1111cb0ef41Sopenharmony_ci CHECK_GT(size, 0); 1121cb0ef41Sopenharmony_ci return pipe->env()->allocate_managed_buffer(size); 1131cb0ef41Sopenharmony_ci} 1141cb0ef41Sopenharmony_ci 1151cb0ef41Sopenharmony_civoid StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, 1161cb0ef41Sopenharmony_ci const uv_buf_t& buf_) { 1171cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); 1181cb0ef41Sopenharmony_ci std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_); 1191cb0ef41Sopenharmony_ci if (nread < 0) { 1201cb0ef41Sopenharmony_ci // EOF or error; stop reading and pass the error to the previous listener 1211cb0ef41Sopenharmony_ci // (which might end up in JS). 1221cb0ef41Sopenharmony_ci pipe->is_eof_ = true; 1231cb0ef41Sopenharmony_ci // Cache `sink()` here because the previous listener might do things 1241cb0ef41Sopenharmony_ci // that eventually lead to an `Unpipe()` call. 1251cb0ef41Sopenharmony_ci StreamBase* sink = pipe->sink(); 1261cb0ef41Sopenharmony_ci stream()->ReadStop(); 1271cb0ef41Sopenharmony_ci CHECK_NOT_NULL(previous_listener_); 1281cb0ef41Sopenharmony_ci previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); 1291cb0ef41Sopenharmony_ci // If we’re not writing, close now. Otherwise, we’ll do that in 1301cb0ef41Sopenharmony_ci // `OnStreamAfterWrite()`. 1311cb0ef41Sopenharmony_ci if (pipe->pending_writes_ == 0) { 1321cb0ef41Sopenharmony_ci sink->Shutdown(); 1331cb0ef41Sopenharmony_ci pipe->Unpipe(); 1341cb0ef41Sopenharmony_ci } 1351cb0ef41Sopenharmony_ci return; 1361cb0ef41Sopenharmony_ci } 1371cb0ef41Sopenharmony_ci 1381cb0ef41Sopenharmony_ci pipe->ProcessData(nread, std::move(bs)); 1391cb0ef41Sopenharmony_ci} 1401cb0ef41Sopenharmony_ci 1411cb0ef41Sopenharmony_civoid StreamPipe::ProcessData(size_t nread, 1421cb0ef41Sopenharmony_ci std::unique_ptr<BackingStore> bs) { 1431cb0ef41Sopenharmony_ci CHECK(uses_wants_write_ || pending_writes_ == 0); 1441cb0ef41Sopenharmony_ci uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread); 1451cb0ef41Sopenharmony_ci StreamWriteResult res = sink()->Write(&buffer, 1); 1461cb0ef41Sopenharmony_ci pending_writes_++; 1471cb0ef41Sopenharmony_ci if (!res.async) { 1481cb0ef41Sopenharmony_ci writable_listener_.OnStreamAfterWrite(nullptr, res.err); 1491cb0ef41Sopenharmony_ci } else { 1501cb0ef41Sopenharmony_ci is_reading_ = false; 1511cb0ef41Sopenharmony_ci res.wrap->SetBackingStore(std::move(bs)); 1521cb0ef41Sopenharmony_ci if (source() != nullptr) 1531cb0ef41Sopenharmony_ci source()->ReadStop(); 1541cb0ef41Sopenharmony_ci } 1551cb0ef41Sopenharmony_ci} 1561cb0ef41Sopenharmony_ci 1571cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, 1581cb0ef41Sopenharmony_ci int status) { 1591cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 1601cb0ef41Sopenharmony_ci pipe->pending_writes_--; 1611cb0ef41Sopenharmony_ci if (pipe->is_closed_) { 1621cb0ef41Sopenharmony_ci if (pipe->pending_writes_ == 0) { 1631cb0ef41Sopenharmony_ci Environment* env = pipe->env(); 1641cb0ef41Sopenharmony_ci HandleScope handle_scope(env->isolate()); 1651cb0ef41Sopenharmony_ci Context::Scope context_scope(env->context()); 1661cb0ef41Sopenharmony_ci if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty()) 1671cb0ef41Sopenharmony_ci return; 1681cb0ef41Sopenharmony_ci stream()->RemoveStreamListener(this); 1691cb0ef41Sopenharmony_ci } 1701cb0ef41Sopenharmony_ci return; 1711cb0ef41Sopenharmony_ci } 1721cb0ef41Sopenharmony_ci 1731cb0ef41Sopenharmony_ci if (pipe->is_eof_) { 1741cb0ef41Sopenharmony_ci HandleScope handle_scope(pipe->env()->isolate()); 1751cb0ef41Sopenharmony_ci InternalCallbackScope callback_scope(pipe, 1761cb0ef41Sopenharmony_ci InternalCallbackScope::kSkipTaskQueues); 1771cb0ef41Sopenharmony_ci pipe->sink()->Shutdown(); 1781cb0ef41Sopenharmony_ci pipe->Unpipe(); 1791cb0ef41Sopenharmony_ci return; 1801cb0ef41Sopenharmony_ci } 1811cb0ef41Sopenharmony_ci 1821cb0ef41Sopenharmony_ci if (status != 0) { 1831cb0ef41Sopenharmony_ci CHECK_NOT_NULL(previous_listener_); 1841cb0ef41Sopenharmony_ci StreamListener* prev = previous_listener_; 1851cb0ef41Sopenharmony_ci pipe->Unpipe(); 1861cb0ef41Sopenharmony_ci prev->OnStreamAfterWrite(w, status); 1871cb0ef41Sopenharmony_ci return; 1881cb0ef41Sopenharmony_ci } 1891cb0ef41Sopenharmony_ci 1901cb0ef41Sopenharmony_ci if (!pipe->uses_wants_write_) { 1911cb0ef41Sopenharmony_ci OnStreamWantsWrite(65536); 1921cb0ef41Sopenharmony_ci } 1931cb0ef41Sopenharmony_ci} 1941cb0ef41Sopenharmony_ci 1951cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w, 1961cb0ef41Sopenharmony_ci int status) { 1971cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 1981cb0ef41Sopenharmony_ci CHECK_NOT_NULL(previous_listener_); 1991cb0ef41Sopenharmony_ci StreamListener* prev = previous_listener_; 2001cb0ef41Sopenharmony_ci pipe->Unpipe(); 2011cb0ef41Sopenharmony_ci prev->OnStreamAfterShutdown(w, status); 2021cb0ef41Sopenharmony_ci} 2031cb0ef41Sopenharmony_ci 2041cb0ef41Sopenharmony_civoid StreamPipe::ReadableListener::OnStreamDestroy() { 2051cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); 2061cb0ef41Sopenharmony_ci pipe->source_destroyed_ = true; 2071cb0ef41Sopenharmony_ci if (!pipe->is_eof_) { 2081cb0ef41Sopenharmony_ci OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0)); 2091cb0ef41Sopenharmony_ci } 2101cb0ef41Sopenharmony_ci} 2111cb0ef41Sopenharmony_ci 2121cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamDestroy() { 2131cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 2141cb0ef41Sopenharmony_ci pipe->sink_destroyed_ = true; 2151cb0ef41Sopenharmony_ci pipe->is_eof_ = true; 2161cb0ef41Sopenharmony_ci pipe->pending_writes_ = 0; 2171cb0ef41Sopenharmony_ci pipe->Unpipe(); 2181cb0ef41Sopenharmony_ci} 2191cb0ef41Sopenharmony_ci 2201cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) { 2211cb0ef41Sopenharmony_ci StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 2221cb0ef41Sopenharmony_ci pipe->wanted_data_ = suggested_size; 2231cb0ef41Sopenharmony_ci if (pipe->is_reading_ || pipe->is_closed_) 2241cb0ef41Sopenharmony_ci return; 2251cb0ef41Sopenharmony_ci HandleScope handle_scope(pipe->env()->isolate()); 2261cb0ef41Sopenharmony_ci InternalCallbackScope callback_scope(pipe, 2271cb0ef41Sopenharmony_ci InternalCallbackScope::kSkipTaskQueues); 2281cb0ef41Sopenharmony_ci pipe->is_reading_ = true; 2291cb0ef41Sopenharmony_ci pipe->source()->ReadStart(); 2301cb0ef41Sopenharmony_ci} 2311cb0ef41Sopenharmony_ci 2321cb0ef41Sopenharmony_ciuv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) { 2331cb0ef41Sopenharmony_ci CHECK_NOT_NULL(previous_listener_); 2341cb0ef41Sopenharmony_ci return previous_listener_->OnStreamAlloc(suggested_size); 2351cb0ef41Sopenharmony_ci} 2361cb0ef41Sopenharmony_ci 2371cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamRead(ssize_t nread, 2381cb0ef41Sopenharmony_ci const uv_buf_t& buf) { 2391cb0ef41Sopenharmony_ci CHECK_NOT_NULL(previous_listener_); 2401cb0ef41Sopenharmony_ci return previous_listener_->OnStreamRead(nread, buf); 2411cb0ef41Sopenharmony_ci} 2421cb0ef41Sopenharmony_ci 2431cb0ef41Sopenharmony_ciMaybe<StreamPipe*> StreamPipe::New(StreamBase* source, 2441cb0ef41Sopenharmony_ci StreamBase* sink, 2451cb0ef41Sopenharmony_ci Local<Object> obj) { 2461cb0ef41Sopenharmony_ci std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj)); 2471cb0ef41Sopenharmony_ci 2481cb0ef41Sopenharmony_ci // Set up links between this object and the source/sink objects. 2491cb0ef41Sopenharmony_ci // In particular, this makes sure that they are garbage collected as a group, 2501cb0ef41Sopenharmony_ci // if that applies to the given streams (for example, Http2Streams use 2511cb0ef41Sopenharmony_ci // weak references). 2521cb0ef41Sopenharmony_ci Environment* env = source->stream_env(); 2531cb0ef41Sopenharmony_ci if (obj->Set(env->context(), env->source_string(), source->GetObject()) 2541cb0ef41Sopenharmony_ci .IsNothing()) { 2551cb0ef41Sopenharmony_ci return Nothing<StreamPipe*>(); 2561cb0ef41Sopenharmony_ci } 2571cb0ef41Sopenharmony_ci if (source->GetObject() 2581cb0ef41Sopenharmony_ci ->Set(env->context(), env->pipe_target_string(), obj) 2591cb0ef41Sopenharmony_ci .IsNothing()) { 2601cb0ef41Sopenharmony_ci return Nothing<StreamPipe*>(); 2611cb0ef41Sopenharmony_ci } 2621cb0ef41Sopenharmony_ci if (obj->Set(env->context(), env->sink_string(), sink->GetObject()) 2631cb0ef41Sopenharmony_ci .IsNothing()) { 2641cb0ef41Sopenharmony_ci return Nothing<StreamPipe*>(); 2651cb0ef41Sopenharmony_ci } 2661cb0ef41Sopenharmony_ci if (sink->GetObject() 2671cb0ef41Sopenharmony_ci ->Set(env->context(), env->pipe_source_string(), obj) 2681cb0ef41Sopenharmony_ci .IsNothing()) { 2691cb0ef41Sopenharmony_ci return Nothing<StreamPipe*>(); 2701cb0ef41Sopenharmony_ci } 2711cb0ef41Sopenharmony_ci 2721cb0ef41Sopenharmony_ci return Just(stream_pipe.release()); 2731cb0ef41Sopenharmony_ci} 2741cb0ef41Sopenharmony_ci 2751cb0ef41Sopenharmony_civoid StreamPipe::New(const FunctionCallbackInfo<Value>& args) { 2761cb0ef41Sopenharmony_ci CHECK(args.IsConstructCall()); 2771cb0ef41Sopenharmony_ci CHECK(args[0]->IsObject()); 2781cb0ef41Sopenharmony_ci CHECK(args[1]->IsObject()); 2791cb0ef41Sopenharmony_ci StreamBase* source = StreamBase::FromObject(args[0].As<Object>()); 2801cb0ef41Sopenharmony_ci StreamBase* sink = StreamBase::FromObject(args[1].As<Object>()); 2811cb0ef41Sopenharmony_ci 2821cb0ef41Sopenharmony_ci if (StreamPipe::New(source, sink, args.This()).IsNothing()) return; 2831cb0ef41Sopenharmony_ci} 2841cb0ef41Sopenharmony_ci 2851cb0ef41Sopenharmony_civoid StreamPipe::Start(const FunctionCallbackInfo<Value>& args) { 2861cb0ef41Sopenharmony_ci StreamPipe* pipe; 2871cb0ef41Sopenharmony_ci ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 2881cb0ef41Sopenharmony_ci pipe->is_closed_ = false; 2891cb0ef41Sopenharmony_ci pipe->writable_listener_.OnStreamWantsWrite(65536); 2901cb0ef41Sopenharmony_ci} 2911cb0ef41Sopenharmony_ci 2921cb0ef41Sopenharmony_civoid StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) { 2931cb0ef41Sopenharmony_ci StreamPipe* pipe; 2941cb0ef41Sopenharmony_ci ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 2951cb0ef41Sopenharmony_ci pipe->Unpipe(); 2961cb0ef41Sopenharmony_ci} 2971cb0ef41Sopenharmony_ci 2981cb0ef41Sopenharmony_civoid StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) { 2991cb0ef41Sopenharmony_ci StreamPipe* pipe; 3001cb0ef41Sopenharmony_ci ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 3011cb0ef41Sopenharmony_ci args.GetReturnValue().Set(pipe->is_closed_); 3021cb0ef41Sopenharmony_ci} 3031cb0ef41Sopenharmony_ci 3041cb0ef41Sopenharmony_civoid StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) { 3051cb0ef41Sopenharmony_ci StreamPipe* pipe; 3061cb0ef41Sopenharmony_ci ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 3071cb0ef41Sopenharmony_ci args.GetReturnValue().Set(pipe->pending_writes_); 3081cb0ef41Sopenharmony_ci} 3091cb0ef41Sopenharmony_ci 3101cb0ef41Sopenharmony_cinamespace { 3111cb0ef41Sopenharmony_ci 3121cb0ef41Sopenharmony_civoid InitializeStreamPipe(Local<Object> target, 3131cb0ef41Sopenharmony_ci Local<Value> unused, 3141cb0ef41Sopenharmony_ci Local<Context> context, 3151cb0ef41Sopenharmony_ci void* priv) { 3161cb0ef41Sopenharmony_ci Environment* env = Environment::GetCurrent(context); 3171cb0ef41Sopenharmony_ci Isolate* isolate = env->isolate(); 3181cb0ef41Sopenharmony_ci 3191cb0ef41Sopenharmony_ci // Create FunctionTemplate for FileHandle::CloseReq 3201cb0ef41Sopenharmony_ci Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New); 3211cb0ef41Sopenharmony_ci SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe); 3221cb0ef41Sopenharmony_ci SetProtoMethod(isolate, pipe, "start", StreamPipe::Start); 3231cb0ef41Sopenharmony_ci SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed); 3241cb0ef41Sopenharmony_ci SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites); 3251cb0ef41Sopenharmony_ci pipe->Inherit(AsyncWrap::GetConstructorTemplate(env)); 3261cb0ef41Sopenharmony_ci pipe->InstanceTemplate()->SetInternalFieldCount( 3271cb0ef41Sopenharmony_ci StreamPipe::kInternalFieldCount); 3281cb0ef41Sopenharmony_ci SetConstructorFunction(context, target, "StreamPipe", pipe); 3291cb0ef41Sopenharmony_ci} 3301cb0ef41Sopenharmony_ci 3311cb0ef41Sopenharmony_ci} // anonymous namespace 3321cb0ef41Sopenharmony_ci 3331cb0ef41Sopenharmony_ci} // namespace node 3341cb0ef41Sopenharmony_ci 3351cb0ef41Sopenharmony_ciNODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe) 336