11cb0ef41Sopenharmony_ci#include "stream_pipe.h"
21cb0ef41Sopenharmony_ci#include "stream_base-inl.h"
31cb0ef41Sopenharmony_ci#include "node_buffer.h"
41cb0ef41Sopenharmony_ci#include "util-inl.h"
51cb0ef41Sopenharmony_ci
61cb0ef41Sopenharmony_cinamespace node {
71cb0ef41Sopenharmony_ci
81cb0ef41Sopenharmony_ciusing v8::BackingStore;
91cb0ef41Sopenharmony_ciusing v8::Context;
101cb0ef41Sopenharmony_ciusing v8::Function;
111cb0ef41Sopenharmony_ciusing v8::FunctionCallbackInfo;
121cb0ef41Sopenharmony_ciusing v8::FunctionTemplate;
131cb0ef41Sopenharmony_ciusing v8::HandleScope;
141cb0ef41Sopenharmony_ciusing v8::Isolate;
151cb0ef41Sopenharmony_ciusing v8::Just;
161cb0ef41Sopenharmony_ciusing v8::Local;
171cb0ef41Sopenharmony_ciusing v8::Maybe;
181cb0ef41Sopenharmony_ciusing v8::Nothing;
191cb0ef41Sopenharmony_ciusing v8::Object;
201cb0ef41Sopenharmony_ciusing v8::Value;
211cb0ef41Sopenharmony_ci
221cb0ef41Sopenharmony_ciStreamPipe::StreamPipe(StreamBase* source,
231cb0ef41Sopenharmony_ci                       StreamBase* sink,
241cb0ef41Sopenharmony_ci                       Local<Object> obj)
251cb0ef41Sopenharmony_ci    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
261cb0ef41Sopenharmony_ci  MakeWeak();
271cb0ef41Sopenharmony_ci
281cb0ef41Sopenharmony_ci  CHECK_NOT_NULL(sink);
291cb0ef41Sopenharmony_ci  CHECK_NOT_NULL(source);
301cb0ef41Sopenharmony_ci
311cb0ef41Sopenharmony_ci  source->PushStreamListener(&readable_listener_);
321cb0ef41Sopenharmony_ci  sink->PushStreamListener(&writable_listener_);
331cb0ef41Sopenharmony_ci
341cb0ef41Sopenharmony_ci  uses_wants_write_ = sink->HasWantsWrite();
351cb0ef41Sopenharmony_ci}
361cb0ef41Sopenharmony_ci
371cb0ef41Sopenharmony_ciStreamPipe::~StreamPipe() {
381cb0ef41Sopenharmony_ci  Unpipe(true);
391cb0ef41Sopenharmony_ci}
401cb0ef41Sopenharmony_ci
411cb0ef41Sopenharmony_ciStreamBase* StreamPipe::source() {
421cb0ef41Sopenharmony_ci  return static_cast<StreamBase*>(readable_listener_.stream());
431cb0ef41Sopenharmony_ci}
441cb0ef41Sopenharmony_ci
451cb0ef41Sopenharmony_ciStreamBase* StreamPipe::sink() {
461cb0ef41Sopenharmony_ci  return static_cast<StreamBase*>(writable_listener_.stream());
471cb0ef41Sopenharmony_ci}
481cb0ef41Sopenharmony_ci
491cb0ef41Sopenharmony_civoid StreamPipe::Unpipe(bool is_in_deletion) {
501cb0ef41Sopenharmony_ci  if (is_closed_)
511cb0ef41Sopenharmony_ci    return;
521cb0ef41Sopenharmony_ci
531cb0ef41Sopenharmony_ci  // Note that we possibly cannot use virtual methods on `source` and `sink`
541cb0ef41Sopenharmony_ci  // here, because this function can be called from their destructors via
551cb0ef41Sopenharmony_ci  // `OnStreamDestroy()`.
561cb0ef41Sopenharmony_ci  if (!source_destroyed_)
571cb0ef41Sopenharmony_ci    source()->ReadStop();
581cb0ef41Sopenharmony_ci
591cb0ef41Sopenharmony_ci  is_closed_ = true;
601cb0ef41Sopenharmony_ci  is_reading_ = false;
611cb0ef41Sopenharmony_ci  source()->RemoveStreamListener(&readable_listener_);
621cb0ef41Sopenharmony_ci  if (pending_writes_ == 0)
631cb0ef41Sopenharmony_ci    sink()->RemoveStreamListener(&writable_listener_);
641cb0ef41Sopenharmony_ci
651cb0ef41Sopenharmony_ci  if (is_in_deletion) return;
661cb0ef41Sopenharmony_ci
671cb0ef41Sopenharmony_ci  // Delay the JS-facing part with SetImmediate, because this might be from
681cb0ef41Sopenharmony_ci  // inside the garbage collector, so we can’t run JS here.
691cb0ef41Sopenharmony_ci  HandleScope handle_scope(env()->isolate());
701cb0ef41Sopenharmony_ci  BaseObjectPtr<StreamPipe> strong_ref{this};
711cb0ef41Sopenharmony_ci  env()->SetImmediate([this, strong_ref](Environment* env) {
721cb0ef41Sopenharmony_ci    HandleScope handle_scope(env->isolate());
731cb0ef41Sopenharmony_ci    Context::Scope context_scope(env->context());
741cb0ef41Sopenharmony_ci    Local<Object> object = this->object();
751cb0ef41Sopenharmony_ci
761cb0ef41Sopenharmony_ci    Local<Value> onunpipe;
771cb0ef41Sopenharmony_ci    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
781cb0ef41Sopenharmony_ci      return;
791cb0ef41Sopenharmony_ci    if (onunpipe->IsFunction() &&
801cb0ef41Sopenharmony_ci        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
811cb0ef41Sopenharmony_ci      return;
821cb0ef41Sopenharmony_ci    }
831cb0ef41Sopenharmony_ci
841cb0ef41Sopenharmony_ci    // Set all the links established in the constructor to `null`.
851cb0ef41Sopenharmony_ci    Local<Value> null = Null(env->isolate());
861cb0ef41Sopenharmony_ci
871cb0ef41Sopenharmony_ci    Local<Value> source_v;
881cb0ef41Sopenharmony_ci    Local<Value> sink_v;
891cb0ef41Sopenharmony_ci    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
901cb0ef41Sopenharmony_ci        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
911cb0ef41Sopenharmony_ci        !source_v->IsObject() || !sink_v->IsObject()) {
921cb0ef41Sopenharmony_ci      return;
931cb0ef41Sopenharmony_ci    }
941cb0ef41Sopenharmony_ci
951cb0ef41Sopenharmony_ci    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
961cb0ef41Sopenharmony_ci        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
971cb0ef41Sopenharmony_ci        source_v.As<Object>()
981cb0ef41Sopenharmony_ci            ->Set(env->context(), env->pipe_target_string(), null)
991cb0ef41Sopenharmony_ci            .IsNothing() ||
1001cb0ef41Sopenharmony_ci        sink_v.As<Object>()
1011cb0ef41Sopenharmony_ci            ->Set(env->context(), env->pipe_source_string(), null)
1021cb0ef41Sopenharmony_ci            .IsNothing()) {
1031cb0ef41Sopenharmony_ci      return;
1041cb0ef41Sopenharmony_ci    }
1051cb0ef41Sopenharmony_ci  });
1061cb0ef41Sopenharmony_ci}
1071cb0ef41Sopenharmony_ci
1081cb0ef41Sopenharmony_ciuv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
1091cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
1101cb0ef41Sopenharmony_ci  size_t size = std::min(suggested_size, pipe->wanted_data_);
1111cb0ef41Sopenharmony_ci  CHECK_GT(size, 0);
1121cb0ef41Sopenharmony_ci  return pipe->env()->allocate_managed_buffer(size);
1131cb0ef41Sopenharmony_ci}
1141cb0ef41Sopenharmony_ci
1151cb0ef41Sopenharmony_civoid StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
1161cb0ef41Sopenharmony_ci                                                const uv_buf_t& buf_) {
1171cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
1181cb0ef41Sopenharmony_ci  std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
1191cb0ef41Sopenharmony_ci  if (nread < 0) {
1201cb0ef41Sopenharmony_ci    // EOF or error; stop reading and pass the error to the previous listener
1211cb0ef41Sopenharmony_ci    // (which might end up in JS).
1221cb0ef41Sopenharmony_ci    pipe->is_eof_ = true;
1231cb0ef41Sopenharmony_ci    // Cache `sink()` here because the previous listener might do things
1241cb0ef41Sopenharmony_ci    // that eventually lead to an `Unpipe()` call.
1251cb0ef41Sopenharmony_ci    StreamBase* sink = pipe->sink();
1261cb0ef41Sopenharmony_ci    stream()->ReadStop();
1271cb0ef41Sopenharmony_ci    CHECK_NOT_NULL(previous_listener_);
1281cb0ef41Sopenharmony_ci    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
1291cb0ef41Sopenharmony_ci    // If we’re not writing, close now. Otherwise, we’ll do that in
1301cb0ef41Sopenharmony_ci    // `OnStreamAfterWrite()`.
1311cb0ef41Sopenharmony_ci    if (pipe->pending_writes_ == 0) {
1321cb0ef41Sopenharmony_ci      sink->Shutdown();
1331cb0ef41Sopenharmony_ci      pipe->Unpipe();
1341cb0ef41Sopenharmony_ci    }
1351cb0ef41Sopenharmony_ci    return;
1361cb0ef41Sopenharmony_ci  }
1371cb0ef41Sopenharmony_ci
1381cb0ef41Sopenharmony_ci  pipe->ProcessData(nread, std::move(bs));
1391cb0ef41Sopenharmony_ci}
1401cb0ef41Sopenharmony_ci
1411cb0ef41Sopenharmony_civoid StreamPipe::ProcessData(size_t nread,
1421cb0ef41Sopenharmony_ci                             std::unique_ptr<BackingStore> bs) {
1431cb0ef41Sopenharmony_ci  CHECK(uses_wants_write_ || pending_writes_ == 0);
1441cb0ef41Sopenharmony_ci  uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
1451cb0ef41Sopenharmony_ci  StreamWriteResult res = sink()->Write(&buffer, 1);
1461cb0ef41Sopenharmony_ci  pending_writes_++;
1471cb0ef41Sopenharmony_ci  if (!res.async) {
1481cb0ef41Sopenharmony_ci    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
1491cb0ef41Sopenharmony_ci  } else {
1501cb0ef41Sopenharmony_ci    is_reading_ = false;
1511cb0ef41Sopenharmony_ci    res.wrap->SetBackingStore(std::move(bs));
1521cb0ef41Sopenharmony_ci    if (source() != nullptr)
1531cb0ef41Sopenharmony_ci      source()->ReadStop();
1541cb0ef41Sopenharmony_ci  }
1551cb0ef41Sopenharmony_ci}
1561cb0ef41Sopenharmony_ci
1571cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
1581cb0ef41Sopenharmony_ci                                                      int status) {
1591cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
1601cb0ef41Sopenharmony_ci  pipe->pending_writes_--;
1611cb0ef41Sopenharmony_ci  if (pipe->is_closed_) {
1621cb0ef41Sopenharmony_ci    if (pipe->pending_writes_ == 0) {
1631cb0ef41Sopenharmony_ci      Environment* env = pipe->env();
1641cb0ef41Sopenharmony_ci      HandleScope handle_scope(env->isolate());
1651cb0ef41Sopenharmony_ci      Context::Scope context_scope(env->context());
1661cb0ef41Sopenharmony_ci      if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
1671cb0ef41Sopenharmony_ci        return;
1681cb0ef41Sopenharmony_ci      stream()->RemoveStreamListener(this);
1691cb0ef41Sopenharmony_ci    }
1701cb0ef41Sopenharmony_ci    return;
1711cb0ef41Sopenharmony_ci  }
1721cb0ef41Sopenharmony_ci
1731cb0ef41Sopenharmony_ci  if (pipe->is_eof_) {
1741cb0ef41Sopenharmony_ci    HandleScope handle_scope(pipe->env()->isolate());
1751cb0ef41Sopenharmony_ci    InternalCallbackScope callback_scope(pipe,
1761cb0ef41Sopenharmony_ci        InternalCallbackScope::kSkipTaskQueues);
1771cb0ef41Sopenharmony_ci    pipe->sink()->Shutdown();
1781cb0ef41Sopenharmony_ci    pipe->Unpipe();
1791cb0ef41Sopenharmony_ci    return;
1801cb0ef41Sopenharmony_ci  }
1811cb0ef41Sopenharmony_ci
1821cb0ef41Sopenharmony_ci  if (status != 0) {
1831cb0ef41Sopenharmony_ci    CHECK_NOT_NULL(previous_listener_);
1841cb0ef41Sopenharmony_ci    StreamListener* prev = previous_listener_;
1851cb0ef41Sopenharmony_ci    pipe->Unpipe();
1861cb0ef41Sopenharmony_ci    prev->OnStreamAfterWrite(w, status);
1871cb0ef41Sopenharmony_ci    return;
1881cb0ef41Sopenharmony_ci  }
1891cb0ef41Sopenharmony_ci
1901cb0ef41Sopenharmony_ci  if (!pipe->uses_wants_write_) {
1911cb0ef41Sopenharmony_ci    OnStreamWantsWrite(65536);
1921cb0ef41Sopenharmony_ci  }
1931cb0ef41Sopenharmony_ci}
1941cb0ef41Sopenharmony_ci
1951cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
1961cb0ef41Sopenharmony_ci                                                         int status) {
1971cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
1981cb0ef41Sopenharmony_ci  CHECK_NOT_NULL(previous_listener_);
1991cb0ef41Sopenharmony_ci  StreamListener* prev = previous_listener_;
2001cb0ef41Sopenharmony_ci  pipe->Unpipe();
2011cb0ef41Sopenharmony_ci  prev->OnStreamAfterShutdown(w, status);
2021cb0ef41Sopenharmony_ci}
2031cb0ef41Sopenharmony_ci
2041cb0ef41Sopenharmony_civoid StreamPipe::ReadableListener::OnStreamDestroy() {
2051cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
2061cb0ef41Sopenharmony_ci  pipe->source_destroyed_ = true;
2071cb0ef41Sopenharmony_ci  if (!pipe->is_eof_) {
2081cb0ef41Sopenharmony_ci    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
2091cb0ef41Sopenharmony_ci  }
2101cb0ef41Sopenharmony_ci}
2111cb0ef41Sopenharmony_ci
2121cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamDestroy() {
2131cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
2141cb0ef41Sopenharmony_ci  pipe->sink_destroyed_ = true;
2151cb0ef41Sopenharmony_ci  pipe->is_eof_ = true;
2161cb0ef41Sopenharmony_ci  pipe->pending_writes_ = 0;
2171cb0ef41Sopenharmony_ci  pipe->Unpipe();
2181cb0ef41Sopenharmony_ci}
2191cb0ef41Sopenharmony_ci
2201cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
2211cb0ef41Sopenharmony_ci  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
2221cb0ef41Sopenharmony_ci  pipe->wanted_data_ = suggested_size;
2231cb0ef41Sopenharmony_ci  if (pipe->is_reading_ || pipe->is_closed_)
2241cb0ef41Sopenharmony_ci    return;
2251cb0ef41Sopenharmony_ci  HandleScope handle_scope(pipe->env()->isolate());
2261cb0ef41Sopenharmony_ci  InternalCallbackScope callback_scope(pipe,
2271cb0ef41Sopenharmony_ci      InternalCallbackScope::kSkipTaskQueues);
2281cb0ef41Sopenharmony_ci  pipe->is_reading_ = true;
2291cb0ef41Sopenharmony_ci  pipe->source()->ReadStart();
2301cb0ef41Sopenharmony_ci}
2311cb0ef41Sopenharmony_ci
2321cb0ef41Sopenharmony_ciuv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
2331cb0ef41Sopenharmony_ci  CHECK_NOT_NULL(previous_listener_);
2341cb0ef41Sopenharmony_ci  return previous_listener_->OnStreamAlloc(suggested_size);
2351cb0ef41Sopenharmony_ci}
2361cb0ef41Sopenharmony_ci
2371cb0ef41Sopenharmony_civoid StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
2381cb0ef41Sopenharmony_ci                                                const uv_buf_t& buf) {
2391cb0ef41Sopenharmony_ci  CHECK_NOT_NULL(previous_listener_);
2401cb0ef41Sopenharmony_ci  return previous_listener_->OnStreamRead(nread, buf);
2411cb0ef41Sopenharmony_ci}
2421cb0ef41Sopenharmony_ci
2431cb0ef41Sopenharmony_ciMaybe<StreamPipe*> StreamPipe::New(StreamBase* source,
2441cb0ef41Sopenharmony_ci                                   StreamBase* sink,
2451cb0ef41Sopenharmony_ci                                   Local<Object> obj) {
2461cb0ef41Sopenharmony_ci  std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj));
2471cb0ef41Sopenharmony_ci
2481cb0ef41Sopenharmony_ci  // Set up links between this object and the source/sink objects.
2491cb0ef41Sopenharmony_ci  // In particular, this makes sure that they are garbage collected as a group,
2501cb0ef41Sopenharmony_ci  // if that applies to the given streams (for example, Http2Streams use
2511cb0ef41Sopenharmony_ci  // weak references).
2521cb0ef41Sopenharmony_ci  Environment* env = source->stream_env();
2531cb0ef41Sopenharmony_ci  if (obj->Set(env->context(), env->source_string(), source->GetObject())
2541cb0ef41Sopenharmony_ci          .IsNothing()) {
2551cb0ef41Sopenharmony_ci    return Nothing<StreamPipe*>();
2561cb0ef41Sopenharmony_ci  }
2571cb0ef41Sopenharmony_ci  if (source->GetObject()
2581cb0ef41Sopenharmony_ci          ->Set(env->context(), env->pipe_target_string(), obj)
2591cb0ef41Sopenharmony_ci          .IsNothing()) {
2601cb0ef41Sopenharmony_ci    return Nothing<StreamPipe*>();
2611cb0ef41Sopenharmony_ci  }
2621cb0ef41Sopenharmony_ci  if (obj->Set(env->context(), env->sink_string(), sink->GetObject())
2631cb0ef41Sopenharmony_ci          .IsNothing()) {
2641cb0ef41Sopenharmony_ci    return Nothing<StreamPipe*>();
2651cb0ef41Sopenharmony_ci  }
2661cb0ef41Sopenharmony_ci  if (sink->GetObject()
2671cb0ef41Sopenharmony_ci          ->Set(env->context(), env->pipe_source_string(), obj)
2681cb0ef41Sopenharmony_ci          .IsNothing()) {
2691cb0ef41Sopenharmony_ci    return Nothing<StreamPipe*>();
2701cb0ef41Sopenharmony_ci  }
2711cb0ef41Sopenharmony_ci
2721cb0ef41Sopenharmony_ci  return Just(stream_pipe.release());
2731cb0ef41Sopenharmony_ci}
2741cb0ef41Sopenharmony_ci
2751cb0ef41Sopenharmony_civoid StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
2761cb0ef41Sopenharmony_ci  CHECK(args.IsConstructCall());
2771cb0ef41Sopenharmony_ci  CHECK(args[0]->IsObject());
2781cb0ef41Sopenharmony_ci  CHECK(args[1]->IsObject());
2791cb0ef41Sopenharmony_ci  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
2801cb0ef41Sopenharmony_ci  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
2811cb0ef41Sopenharmony_ci
2821cb0ef41Sopenharmony_ci  if (StreamPipe::New(source, sink, args.This()).IsNothing()) return;
2831cb0ef41Sopenharmony_ci}
2841cb0ef41Sopenharmony_ci
2851cb0ef41Sopenharmony_civoid StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
2861cb0ef41Sopenharmony_ci  StreamPipe* pipe;
2871cb0ef41Sopenharmony_ci  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
2881cb0ef41Sopenharmony_ci  pipe->is_closed_ = false;
2891cb0ef41Sopenharmony_ci  pipe->writable_listener_.OnStreamWantsWrite(65536);
2901cb0ef41Sopenharmony_ci}
2911cb0ef41Sopenharmony_ci
2921cb0ef41Sopenharmony_civoid StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
2931cb0ef41Sopenharmony_ci  StreamPipe* pipe;
2941cb0ef41Sopenharmony_ci  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
2951cb0ef41Sopenharmony_ci  pipe->Unpipe();
2961cb0ef41Sopenharmony_ci}
2971cb0ef41Sopenharmony_ci
2981cb0ef41Sopenharmony_civoid StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
2991cb0ef41Sopenharmony_ci  StreamPipe* pipe;
3001cb0ef41Sopenharmony_ci  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
3011cb0ef41Sopenharmony_ci  args.GetReturnValue().Set(pipe->is_closed_);
3021cb0ef41Sopenharmony_ci}
3031cb0ef41Sopenharmony_ci
3041cb0ef41Sopenharmony_civoid StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
3051cb0ef41Sopenharmony_ci  StreamPipe* pipe;
3061cb0ef41Sopenharmony_ci  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
3071cb0ef41Sopenharmony_ci  args.GetReturnValue().Set(pipe->pending_writes_);
3081cb0ef41Sopenharmony_ci}
3091cb0ef41Sopenharmony_ci
3101cb0ef41Sopenharmony_cinamespace {
3111cb0ef41Sopenharmony_ci
3121cb0ef41Sopenharmony_civoid InitializeStreamPipe(Local<Object> target,
3131cb0ef41Sopenharmony_ci                          Local<Value> unused,
3141cb0ef41Sopenharmony_ci                          Local<Context> context,
3151cb0ef41Sopenharmony_ci                          void* priv) {
3161cb0ef41Sopenharmony_ci  Environment* env = Environment::GetCurrent(context);
3171cb0ef41Sopenharmony_ci  Isolate* isolate = env->isolate();
3181cb0ef41Sopenharmony_ci
3191cb0ef41Sopenharmony_ci  // Create FunctionTemplate for FileHandle::CloseReq
3201cb0ef41Sopenharmony_ci  Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New);
3211cb0ef41Sopenharmony_ci  SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe);
3221cb0ef41Sopenharmony_ci  SetProtoMethod(isolate, pipe, "start", StreamPipe::Start);
3231cb0ef41Sopenharmony_ci  SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed);
3241cb0ef41Sopenharmony_ci  SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites);
3251cb0ef41Sopenharmony_ci  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
3261cb0ef41Sopenharmony_ci  pipe->InstanceTemplate()->SetInternalFieldCount(
3271cb0ef41Sopenharmony_ci      StreamPipe::kInternalFieldCount);
3281cb0ef41Sopenharmony_ci  SetConstructorFunction(context, target, "StreamPipe", pipe);
3291cb0ef41Sopenharmony_ci}
3301cb0ef41Sopenharmony_ci
3311cb0ef41Sopenharmony_ci}  // anonymous namespace
3321cb0ef41Sopenharmony_ci
3331cb0ef41Sopenharmony_ci}  // namespace node
3341cb0ef41Sopenharmony_ci
3351cb0ef41Sopenharmony_ciNODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe)
336