#ifndef SRC_STREAM_BASE_INL_H_
#define SRC_STREAM_BASE_INL_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "async_wrap-inl.h"
#include "base_object-inl.h"
#include "node.h"
#include "stream_base.h"
#include "v8.h"

namespace node {

// Remembers the owning stream and attaches this request to `req_wrap_obj`.
StreamReq::StreamReq(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj)
    : stream_(stream) {
  AttachToObject(req_wrap_obj);
}

// Stores `this` in the kStreamReqField internal field of `req_wrap_obj`.
// The field must still be null when this is called.
void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
  CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
               StreamReq::kStreamReqField),
           nullptr);
  req_wrap_obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField,
                                                 this);
}

// Reads back the pointer held in the kStreamReqField internal field of
// `req_wrap_obj` and returns it as a StreamReq*.
StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
  void* raw = req_wrap_obj->GetAlignedPointerFromInternalField(
      StreamReq::kStreamReqField);
  return static_cast<StreamReq*>(raw);
}

// Clears the kStreamReqField internal field on the request object and then
// detaches the associated AsyncWrap.
void StreamReq::Dispose() {
  // Taken before the field is cleared.
  // NOTE(review): presumably this keeps the wrap alive until Detach() has
  // run; confirm against BaseObjectPtr semantics.
  BaseObjectPtr<AsyncWrap> wrap_ref{GetAsyncWrap()};
  object()->SetAlignedPointerInInternalField(StreamReq::kStreamReqField,
                                             nullptr);
  wrap_ref->Detach();
}

// Returns the object of the AsyncWrap associated with this request.
v8::Local<v8::Object> StreamReq::object() {
  return GetAsyncWrap()->object();
}

// Forwards both arguments to the StreamReq constructor.
ShutdownWrap::ShutdownWrap(StreamBase* stream,
                           v8::Local<v8::Object> req_wrap_obj)
    : StreamReq(stream, req_wrap_obj) {}

// Forwards both arguments to the StreamReq constructor.
WriteWrap::WriteWrap(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj)
    : StreamReq(stream, req_wrap_obj) {}

// Reports `nread` to the previous listener together with an empty buffer.
// A previous listener must exist.
void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
  CHECK_NOT_NULL(previous_listener_);
  const uv_buf_t empty_buf = uv_buf_init(nullptr, 0);
  previous_listener_->OnStreamRead(nread, empty_buf);
}

// Makes `listener` the current listener of this resource. The listener that
// was current before the call becomes its `previous_listener_`. `listener`
// must be non-null and must not already have a stream assigned.
void StreamResource::PushStreamListener(StreamListener* listener) {
  CHECK_NOT_NULL(listener);
  CHECK_NULL(listener->stream_);

  listener->stream_ = this;
  listener->previous_listener_ = listener_;

  listener_ = listener;
}

// Asks the current listener for a buffer, passing `suggested_size` along.
uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
  DebugSealHandleScope seal_handle_scope;
  return listener_->OnStreamAlloc(suggested_size);
}

// Adds positive `nread` values to `bytes_read_`, then hands `nread` and `buf`
// to the current listener.
void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
  DebugSealHandleScope seal_handle_scope;
  if (nread > 0) {
    bytes_read_ += static_cast<uint64_t>(nread);
  }
  listener_->OnStreamRead(nread, buf);
}

// Forwards `w` and `status` to the current listener's OnStreamAfterWrite().
void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
  DebugSealHandleScope seal_handle_scope;
  listener_->OnStreamAfterWrite(w, status);
}

// Forwards `w` and `status` to the current listener's OnStreamAfterShutdown().
void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
  DebugSealHandleScope seal_handle_scope;
  listener_->OnStreamAfterShutdown(w, status);
}

// Forwards `suggested_size` to the current listener's OnStreamWantsWrite().
void StreamResource::EmitWantsWrite(size_t suggested_size) {
  DebugSealHandleScope seal_handle_scope;
  listener_->OnStreamWantsWrite(suggested_size);
}

// Stores `env` and pushes the built-in default listener.
StreamBase::StreamBase(Environment* env) : env_(env) {
  PushStreamListener(&default_listener_);
}

// Initializes the ShutdownWrap base, then `OtherBase` with the stream's
// environment, the request object and the SHUTDOWNWRAP provider type.
template <typename OtherBase>
SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
    StreamBase* stream,
    v8::Local<v8::Object> req_wrap_obj)
    : ShutdownWrap(stream, req_wrap_obj),
      OtherBase(stream->stream_env(),
                req_wrap_obj,
                AsyncWrap::PROVIDER_SHUTDOWNWRAP) {}

// Initializes the WriteWrap base, then `OtherBase` with the stream's
// environment, the request object and the WRITEWRAP provider type.
template <typename OtherBase>
SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
    StreamBase* stream,
    v8::Local<v8::Object> req_wrap_obj)
    : WriteWrap(stream, req_wrap_obj),
      OtherBase(stream->stream_env(),
                req_wrap_obj,
                AsyncWrap::PROVIDER_WRITEWRAP) {}

// Stores `this` in the kStreamBaseField internal field of `obj`.
void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
  obj->SetAlignedPointerInInternalField(StreamBase::kStreamBaseField, this);
}

// Returns the StreamBase stored in the kStreamBaseField internal field of
// `obj`, or nullptr when the kSlot internal field of `obj` is null.
StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
  void* slot = obj->GetAlignedPointerFromInternalField(StreamBase::kSlot);
  if (slot == nullptr) {
    return nullptr;
  }

  void* raw =
      obj->GetAlignedPointerFromInternalField(StreamBase::kStreamBaseField);
  return static_cast<StreamBase*>(raw);
}

// Looks up the StreamReq attached to `req_wrap_obj` and returns it as a
// WriteWrap*.
WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
  StreamReq* req = StreamReq::FromObject(req_wrap_obj);
  return static_cast<WriteWrap*>(req);
}

// Overload taking a base-object pointer: yields nullptr when `base_obj` is
// empty, otherwise resolves through the pointee's object().
template <typename T, bool kIsWeak>
WriteWrap* WriteWrap::FromObject(
    const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
  if (!base_obj) {
    return nullptr;
  }
  return FromObject(base_obj->object());
}

// Looks up the StreamReq attached to `req_wrap_obj` and returns it as a
// ShutdownWrap*.
ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
  StreamReq* req = StreamReq::FromObject(req_wrap_obj);
  return static_cast<ShutdownWrap*>(req);
}

// Overload taking a base-object pointer: yields nullptr when `base_obj` is
// empty, otherwise resolves through the pointee's object().
template <typename T, bool kIsWeak>
ShutdownWrap* ShutdownWrap::FromObject(
    const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
  if (!base_obj) {
    return nullptr;
  }
  return FromObject(base_obj->object());
}

// Takes ownership of `bs`. A backing store must not have been set already.
void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
  CHECK(!backing_store_);
  backing_store_ = std::move(bs);
}

// Nulls the kSlot and kStreamReqField internal fields of `obj`. Debug builds
// verify that `obj` has more internal fields than kStreamReqField.
void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
  DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);

  obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
  obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
}

}  // namespace node

#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif  // SRC_STREAM_BASE_INL_H_