1#include "stream_pipe.h" 2#include "stream_base-inl.h" 3#include "node_buffer.h" 4#include "util-inl.h" 5 6namespace node { 7 8using v8::BackingStore; 9using v8::Context; 10using v8::Function; 11using v8::FunctionCallbackInfo; 12using v8::FunctionTemplate; 13using v8::HandleScope; 14using v8::Isolate; 15using v8::Just; 16using v8::Local; 17using v8::Maybe; 18using v8::Nothing; 19using v8::Object; 20using v8::Value; 21 22StreamPipe::StreamPipe(StreamBase* source, 23 StreamBase* sink, 24 Local<Object> obj) 25 : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) { 26 MakeWeak(); 27 28 CHECK_NOT_NULL(sink); 29 CHECK_NOT_NULL(source); 30 31 source->PushStreamListener(&readable_listener_); 32 sink->PushStreamListener(&writable_listener_); 33 34 uses_wants_write_ = sink->HasWantsWrite(); 35} 36 37StreamPipe::~StreamPipe() { 38 Unpipe(true); 39} 40 41StreamBase* StreamPipe::source() { 42 return static_cast<StreamBase*>(readable_listener_.stream()); 43} 44 45StreamBase* StreamPipe::sink() { 46 return static_cast<StreamBase*>(writable_listener_.stream()); 47} 48 49void StreamPipe::Unpipe(bool is_in_deletion) { 50 if (is_closed_) 51 return; 52 53 // Note that we possibly cannot use virtual methods on `source` and `sink` 54 // here, because this function can be called from their destructors via 55 // `OnStreamDestroy()`. 56 if (!source_destroyed_) 57 source()->ReadStop(); 58 59 is_closed_ = true; 60 is_reading_ = false; 61 source()->RemoveStreamListener(&readable_listener_); 62 if (pending_writes_ == 0) 63 sink()->RemoveStreamListener(&writable_listener_); 64 65 if (is_in_deletion) return; 66 67 // Delay the JS-facing part with SetImmediate, because this might be from 68 // inside the garbage collector, so we can’t run JS here. 69 HandleScope handle_scope(env()->isolate()); 70 BaseObjectPtr<StreamPipe> strong_ref{this}; 71 env()->SetImmediate([this, strong_ref](Environment* env) { 72 HandleScope handle_scope(env->isolate()); 73 Context::Scope context_scope(env->context()); 74 Local<Object> object = this->object(); 75 76 Local<Value> onunpipe; 77 if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe)) 78 return; 79 if (onunpipe->IsFunction() && 80 MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) { 81 return; 82 } 83 84 // Set all the links established in the constructor to `null`. 85 Local<Value> null = Null(env->isolate()); 86 87 Local<Value> source_v; 88 Local<Value> sink_v; 89 if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) || 90 !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) || 91 !source_v->IsObject() || !sink_v->IsObject()) { 92 return; 93 } 94 95 if (object->Set(env->context(), env->source_string(), null).IsNothing() || 96 object->Set(env->context(), env->sink_string(), null).IsNothing() || 97 source_v.As<Object>() 98 ->Set(env->context(), env->pipe_target_string(), null) 99 .IsNothing() || 100 sink_v.As<Object>() 101 ->Set(env->context(), env->pipe_source_string(), null) 102 .IsNothing()) { 103 return; 104 } 105 }); 106} 107 108uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { 109 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); 110 size_t size = std::min(suggested_size, pipe->wanted_data_); 111 CHECK_GT(size, 0); 112 return pipe->env()->allocate_managed_buffer(size); 113} 114 115void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, 116 const uv_buf_t& buf_) { 117 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); 118 std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_); 119 if (nread < 0) { 120 // EOF or error; stop reading and pass the error to the previous listener 121 // (which might end up in JS). 122 pipe->is_eof_ = true; 123 // Cache `sink()` here because the previous listener might do things 124 // that eventually lead to an `Unpipe()` call. 125 StreamBase* sink = pipe->sink(); 126 stream()->ReadStop(); 127 CHECK_NOT_NULL(previous_listener_); 128 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); 129 // If we’re not writing, close now. Otherwise, we’ll do that in 130 // `OnStreamAfterWrite()`. 131 if (pipe->pending_writes_ == 0) { 132 sink->Shutdown(); 133 pipe->Unpipe(); 134 } 135 return; 136 } 137 138 pipe->ProcessData(nread, std::move(bs)); 139} 140 141void StreamPipe::ProcessData(size_t nread, 142 std::unique_ptr<BackingStore> bs) { 143 CHECK(uses_wants_write_ || pending_writes_ == 0); 144 uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread); 145 StreamWriteResult res = sink()->Write(&buffer, 1); 146 pending_writes_++; 147 if (!res.async) { 148 writable_listener_.OnStreamAfterWrite(nullptr, res.err); 149 } else { 150 is_reading_ = false; 151 res.wrap->SetBackingStore(std::move(bs)); 152 if (source() != nullptr) 153 source()->ReadStop(); 154 } 155} 156 157void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, 158 int status) { 159 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 160 pipe->pending_writes_--; 161 if (pipe->is_closed_) { 162 if (pipe->pending_writes_ == 0) { 163 Environment* env = pipe->env(); 164 HandleScope handle_scope(env->isolate()); 165 Context::Scope context_scope(env->context()); 166 if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty()) 167 return; 168 stream()->RemoveStreamListener(this); 169 } 170 return; 171 } 172 173 if (pipe->is_eof_) { 174 HandleScope handle_scope(pipe->env()->isolate()); 175 InternalCallbackScope callback_scope(pipe, 176 InternalCallbackScope::kSkipTaskQueues); 177 pipe->sink()->Shutdown(); 178 pipe->Unpipe(); 179 return; 180 } 181 182 if (status != 0) { 183 CHECK_NOT_NULL(previous_listener_); 184 StreamListener* prev = previous_listener_; 185 pipe->Unpipe(); 186 prev->OnStreamAfterWrite(w, status); 187 return; 188 } 189 190 if (!pipe->uses_wants_write_) { 191 OnStreamWantsWrite(65536); 192 } 193} 194 195void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w, 196 int status) { 197 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 198 CHECK_NOT_NULL(previous_listener_); 199 StreamListener* prev = previous_listener_; 200 pipe->Unpipe(); 201 prev->OnStreamAfterShutdown(w, status); 202} 203 204void StreamPipe::ReadableListener::OnStreamDestroy() { 205 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); 206 pipe->source_destroyed_ = true; 207 if (!pipe->is_eof_) { 208 OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0)); 209 } 210} 211 212void StreamPipe::WritableListener::OnStreamDestroy() { 213 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 214 pipe->sink_destroyed_ = true; 215 pipe->is_eof_ = true; 216 pipe->pending_writes_ = 0; 217 pipe->Unpipe(); 218} 219 220void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) { 221 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); 222 pipe->wanted_data_ = suggested_size; 223 if (pipe->is_reading_ || pipe->is_closed_) 224 return; 225 HandleScope handle_scope(pipe->env()->isolate()); 226 InternalCallbackScope callback_scope(pipe, 227 InternalCallbackScope::kSkipTaskQueues); 228 pipe->is_reading_ = true; 229 pipe->source()->ReadStart(); 230} 231 232uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) { 233 CHECK_NOT_NULL(previous_listener_); 234 return previous_listener_->OnStreamAlloc(suggested_size); 235} 236 237void StreamPipe::WritableListener::OnStreamRead(ssize_t nread, 238 const uv_buf_t& buf) { 239 CHECK_NOT_NULL(previous_listener_); 240 return previous_listener_->OnStreamRead(nread, buf); 241} 242 243Maybe<StreamPipe*> StreamPipe::New(StreamBase* source, 244 StreamBase* sink, 245 Local<Object> obj) { 246 std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj)); 247 248 // Set up links between this object and the source/sink objects. 249 // In particular, this makes sure that they are garbage collected as a group, 250 // if that applies to the given streams (for example, Http2Streams use 251 // weak references). 252 Environment* env = source->stream_env(); 253 if (obj->Set(env->context(), env->source_string(), source->GetObject()) 254 .IsNothing()) { 255 return Nothing<StreamPipe*>(); 256 } 257 if (source->GetObject() 258 ->Set(env->context(), env->pipe_target_string(), obj) 259 .IsNothing()) { 260 return Nothing<StreamPipe*>(); 261 } 262 if (obj->Set(env->context(), env->sink_string(), sink->GetObject()) 263 .IsNothing()) { 264 return Nothing<StreamPipe*>(); 265 } 266 if (sink->GetObject() 267 ->Set(env->context(), env->pipe_source_string(), obj) 268 .IsNothing()) { 269 return Nothing<StreamPipe*>(); 270 } 271 272 return Just(stream_pipe.release()); 273} 274 275void StreamPipe::New(const FunctionCallbackInfo<Value>& args) { 276 CHECK(args.IsConstructCall()); 277 CHECK(args[0]->IsObject()); 278 CHECK(args[1]->IsObject()); 279 StreamBase* source = StreamBase::FromObject(args[0].As<Object>()); 280 StreamBase* sink = StreamBase::FromObject(args[1].As<Object>()); 281 282 if (StreamPipe::New(source, sink, args.This()).IsNothing()) return; 283} 284 285void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) { 286 StreamPipe* pipe; 287 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 288 pipe->is_closed_ = false; 289 pipe->writable_listener_.OnStreamWantsWrite(65536); 290} 291 292void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) { 293 StreamPipe* pipe; 294 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 295 pipe->Unpipe(); 296} 297 298void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) { 299 StreamPipe* pipe; 300 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 301 args.GetReturnValue().Set(pipe->is_closed_); 302} 303 304void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) { 305 StreamPipe* pipe; 306 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); 307 args.GetReturnValue().Set(pipe->pending_writes_); 308} 309 310namespace { 311 312void InitializeStreamPipe(Local<Object> target, 313 Local<Value> unused, 314 Local<Context> context, 315 void* priv) { 316 Environment* env = Environment::GetCurrent(context); 317 Isolate* isolate = env->isolate(); 318 319 // Create FunctionTemplate for FileHandle::CloseReq 320 Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New); 321 SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe); 322 SetProtoMethod(isolate, pipe, "start", StreamPipe::Start); 323 SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed); 324 SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites); 325 pipe->Inherit(AsyncWrap::GetConstructorTemplate(env)); 326 pipe->InstanceTemplate()->SetInternalFieldCount( 327 StreamPipe::kInternalFieldCount); 328 SetConstructorFunction(context, target, "StreamPipe", pipe); 329} 330 331} // anonymous namespace 332 333} // namespace node 334 335NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe) 336