xref: /third_party/node/src/stream_pipe.cc (revision 1cb0ef41)
1#include "stream_pipe.h"
2#include "stream_base-inl.h"
3#include "node_buffer.h"
4#include "util-inl.h"
5
6namespace node {
7
8using v8::BackingStore;
9using v8::Context;
10using v8::Function;
11using v8::FunctionCallbackInfo;
12using v8::FunctionTemplate;
13using v8::HandleScope;
14using v8::Isolate;
15using v8::Just;
16using v8::Local;
17using v8::Maybe;
18using v8::Nothing;
19using v8::Object;
20using v8::Value;
21
22StreamPipe::StreamPipe(StreamBase* source,
23                       StreamBase* sink,
24                       Local<Object> obj)
25    : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
26  MakeWeak();
27
28  CHECK_NOT_NULL(sink);
29  CHECK_NOT_NULL(source);
30
31  source->PushStreamListener(&readable_listener_);
32  sink->PushStreamListener(&writable_listener_);
33
34  uses_wants_write_ = sink->HasWantsWrite();
35}
36
37StreamPipe::~StreamPipe() {
38  Unpipe(true);
39}
40
41StreamBase* StreamPipe::source() {
42  return static_cast<StreamBase*>(readable_listener_.stream());
43}
44
45StreamBase* StreamPipe::sink() {
46  return static_cast<StreamBase*>(writable_listener_.stream());
47}
48
49void StreamPipe::Unpipe(bool is_in_deletion) {
50  if (is_closed_)
51    return;
52
53  // Note that we possibly cannot use virtual methods on `source` and `sink`
54  // here, because this function can be called from their destructors via
55  // `OnStreamDestroy()`.
56  if (!source_destroyed_)
57    source()->ReadStop();
58
59  is_closed_ = true;
60  is_reading_ = false;
61  source()->RemoveStreamListener(&readable_listener_);
62  if (pending_writes_ == 0)
63    sink()->RemoveStreamListener(&writable_listener_);
64
65  if (is_in_deletion) return;
66
67  // Delay the JS-facing part with SetImmediate, because this might be from
68  // inside the garbage collector, so we can’t run JS here.
69  HandleScope handle_scope(env()->isolate());
70  BaseObjectPtr<StreamPipe> strong_ref{this};
71  env()->SetImmediate([this, strong_ref](Environment* env) {
72    HandleScope handle_scope(env->isolate());
73    Context::Scope context_scope(env->context());
74    Local<Object> object = this->object();
75
76    Local<Value> onunpipe;
77    if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
78      return;
79    if (onunpipe->IsFunction() &&
80        MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
81      return;
82    }
83
84    // Set all the links established in the constructor to `null`.
85    Local<Value> null = Null(env->isolate());
86
87    Local<Value> source_v;
88    Local<Value> sink_v;
89    if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
90        !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
91        !source_v->IsObject() || !sink_v->IsObject()) {
92      return;
93    }
94
95    if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
96        object->Set(env->context(), env->sink_string(), null).IsNothing() ||
97        source_v.As<Object>()
98            ->Set(env->context(), env->pipe_target_string(), null)
99            .IsNothing() ||
100        sink_v.As<Object>()
101            ->Set(env->context(), env->pipe_source_string(), null)
102            .IsNothing()) {
103      return;
104    }
105  });
106}
107
108uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
109  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
110  size_t size = std::min(suggested_size, pipe->wanted_data_);
111  CHECK_GT(size, 0);
112  return pipe->env()->allocate_managed_buffer(size);
113}
114
115void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
116                                                const uv_buf_t& buf_) {
117  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
118  std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
119  if (nread < 0) {
120    // EOF or error; stop reading and pass the error to the previous listener
121    // (which might end up in JS).
122    pipe->is_eof_ = true;
123    // Cache `sink()` here because the previous listener might do things
124    // that eventually lead to an `Unpipe()` call.
125    StreamBase* sink = pipe->sink();
126    stream()->ReadStop();
127    CHECK_NOT_NULL(previous_listener_);
128    previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
129    // If we’re not writing, close now. Otherwise, we’ll do that in
130    // `OnStreamAfterWrite()`.
131    if (pipe->pending_writes_ == 0) {
132      sink->Shutdown();
133      pipe->Unpipe();
134    }
135    return;
136  }
137
138  pipe->ProcessData(nread, std::move(bs));
139}
140
141void StreamPipe::ProcessData(size_t nread,
142                             std::unique_ptr<BackingStore> bs) {
143  CHECK(uses_wants_write_ || pending_writes_ == 0);
144  uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
145  StreamWriteResult res = sink()->Write(&buffer, 1);
146  pending_writes_++;
147  if (!res.async) {
148    writable_listener_.OnStreamAfterWrite(nullptr, res.err);
149  } else {
150    is_reading_ = false;
151    res.wrap->SetBackingStore(std::move(bs));
152    if (source() != nullptr)
153      source()->ReadStop();
154  }
155}
156
157void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
158                                                      int status) {
159  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
160  pipe->pending_writes_--;
161  if (pipe->is_closed_) {
162    if (pipe->pending_writes_ == 0) {
163      Environment* env = pipe->env();
164      HandleScope handle_scope(env->isolate());
165      Context::Scope context_scope(env->context());
166      if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
167        return;
168      stream()->RemoveStreamListener(this);
169    }
170    return;
171  }
172
173  if (pipe->is_eof_) {
174    HandleScope handle_scope(pipe->env()->isolate());
175    InternalCallbackScope callback_scope(pipe,
176        InternalCallbackScope::kSkipTaskQueues);
177    pipe->sink()->Shutdown();
178    pipe->Unpipe();
179    return;
180  }
181
182  if (status != 0) {
183    CHECK_NOT_NULL(previous_listener_);
184    StreamListener* prev = previous_listener_;
185    pipe->Unpipe();
186    prev->OnStreamAfterWrite(w, status);
187    return;
188  }
189
190  if (!pipe->uses_wants_write_) {
191    OnStreamWantsWrite(65536);
192  }
193}
194
195void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
196                                                         int status) {
197  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
198  CHECK_NOT_NULL(previous_listener_);
199  StreamListener* prev = previous_listener_;
200  pipe->Unpipe();
201  prev->OnStreamAfterShutdown(w, status);
202}
203
204void StreamPipe::ReadableListener::OnStreamDestroy() {
205  StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
206  pipe->source_destroyed_ = true;
207  if (!pipe->is_eof_) {
208    OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
209  }
210}
211
212void StreamPipe::WritableListener::OnStreamDestroy() {
213  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
214  pipe->sink_destroyed_ = true;
215  pipe->is_eof_ = true;
216  pipe->pending_writes_ = 0;
217  pipe->Unpipe();
218}
219
220void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
221  StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
222  pipe->wanted_data_ = suggested_size;
223  if (pipe->is_reading_ || pipe->is_closed_)
224    return;
225  HandleScope handle_scope(pipe->env()->isolate());
226  InternalCallbackScope callback_scope(pipe,
227      InternalCallbackScope::kSkipTaskQueues);
228  pipe->is_reading_ = true;
229  pipe->source()->ReadStart();
230}
231
232uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
233  CHECK_NOT_NULL(previous_listener_);
234  return previous_listener_->OnStreamAlloc(suggested_size);
235}
236
237void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
238                                                const uv_buf_t& buf) {
239  CHECK_NOT_NULL(previous_listener_);
240  return previous_listener_->OnStreamRead(nread, buf);
241}
242
243Maybe<StreamPipe*> StreamPipe::New(StreamBase* source,
244                                   StreamBase* sink,
245                                   Local<Object> obj) {
246  std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj));
247
248  // Set up links between this object and the source/sink objects.
249  // In particular, this makes sure that they are garbage collected as a group,
250  // if that applies to the given streams (for example, Http2Streams use
251  // weak references).
252  Environment* env = source->stream_env();
253  if (obj->Set(env->context(), env->source_string(), source->GetObject())
254          .IsNothing()) {
255    return Nothing<StreamPipe*>();
256  }
257  if (source->GetObject()
258          ->Set(env->context(), env->pipe_target_string(), obj)
259          .IsNothing()) {
260    return Nothing<StreamPipe*>();
261  }
262  if (obj->Set(env->context(), env->sink_string(), sink->GetObject())
263          .IsNothing()) {
264    return Nothing<StreamPipe*>();
265  }
266  if (sink->GetObject()
267          ->Set(env->context(), env->pipe_source_string(), obj)
268          .IsNothing()) {
269    return Nothing<StreamPipe*>();
270  }
271
272  return Just(stream_pipe.release());
273}
274
275void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
276  CHECK(args.IsConstructCall());
277  CHECK(args[0]->IsObject());
278  CHECK(args[1]->IsObject());
279  StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
280  StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
281
282  if (StreamPipe::New(source, sink, args.This()).IsNothing()) return;
283}
284
285void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
286  StreamPipe* pipe;
287  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
288  pipe->is_closed_ = false;
289  pipe->writable_listener_.OnStreamWantsWrite(65536);
290}
291
292void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
293  StreamPipe* pipe;
294  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
295  pipe->Unpipe();
296}
297
298void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
299  StreamPipe* pipe;
300  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
301  args.GetReturnValue().Set(pipe->is_closed_);
302}
303
304void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
305  StreamPipe* pipe;
306  ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
307  args.GetReturnValue().Set(pipe->pending_writes_);
308}
309
310namespace {
311
312void InitializeStreamPipe(Local<Object> target,
313                          Local<Value> unused,
314                          Local<Context> context,
315                          void* priv) {
316  Environment* env = Environment::GetCurrent(context);
317  Isolate* isolate = env->isolate();
318
319  // Create FunctionTemplate for FileHandle::CloseReq
320  Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New);
321  SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe);
322  SetProtoMethod(isolate, pipe, "start", StreamPipe::Start);
323  SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed);
324  SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites);
325  pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
326  pipe->InstanceTemplate()->SetInternalFieldCount(
327      StreamPipe::kInternalFieldCount);
328  SetConstructorFunction(context, target, "StreamPipe", pipe);
329}
330
331}  // anonymous namespace
332
333}  // namespace node
334
335NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe)
336