1#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC 2 3#include "logstream.h" 4#include <async_wrap-inl.h> 5#include <base_object-inl.h> 6#include <env-inl.h> 7#include <memory_tracker-inl.h> 8#include <node_external_reference.h> 9#include <stream_base-inl.h> 10#include <uv.h> 11#include <v8.h> 12#include "bindingdata.h" 13 14namespace node { 15 16using v8::FunctionTemplate; 17using v8::Local; 18using v8::Object; 19 20namespace quic { 21 22Local<FunctionTemplate> LogStream::GetConstructorTemplate(Environment* env) { 23 auto& state = BindingData::Get(env); 24 auto tmpl = state.logstream_constructor_template(); 25 if (tmpl.IsEmpty()) { 26 tmpl = FunctionTemplate::New(env->isolate()); 27 tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); 28 tmpl->InstanceTemplate()->SetInternalFieldCount( 29 StreamBase::kInternalFieldCount); 30 tmpl->SetClassName(state.logstream_string()); 31 StreamBase::AddMethods(env, tmpl); 32 state.set_logstream_constructor_template(tmpl); 33 } 34 return tmpl; 35} 36 37BaseObjectPtr<LogStream> LogStream::Create(Environment* env) { 38 v8::Local<v8::Object> obj; 39 if (!GetConstructorTemplate(env) 40 ->InstanceTemplate() 41 ->NewInstance(env->context()) 42 .ToLocal(&obj)) { 43 return BaseObjectPtr<LogStream>(); 44 } 45 return MakeDetachedBaseObject<LogStream>(env, obj); 46} 47 48LogStream::LogStream(Environment* env, Local<Object> obj) 49 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_QUIC_LOGSTREAM), StreamBase(env) { 50 MakeWeak(); 51 StreamBase::AttachToObject(GetObject()); 52} 53 54void LogStream::Emit(const uint8_t* data, size_t len, EmitOption option) { 55 if (fin_seen_) return; 56 fin_seen_ = option == EmitOption::FIN; 57 58 size_t remaining = len; 59 // If the len is greater than the size of the buffer returned by 60 // EmitAlloc then EmitRead will be called multiple times. 61 while (remaining != 0) { 62 uv_buf_t buf = EmitAlloc(len); 63 size_t len = std::min<size_t>(remaining, buf.len); 64 memcpy(buf.base, data, len); 65 remaining -= len; 66 data += len; 67 // If we are actively reading from the stream, we'll call emit 68 // read immediately. Otherwise we buffer the chunk and will push 69 // the chunks out the next time ReadStart() is called. 70 if (reading_) { 71 EmitRead(len, buf); 72 } else { 73 // The total measures the total memory used so we always 74 // increment but buf.len and not chunk len. 75 ensure_space(buf.len); 76 total_ += buf.len; 77 buffer_.push_back(Chunk{len, buf}); 78 } 79 } 80 81 if (ended_ && reading_) { 82 EmitRead(UV_EOF); 83 } 84} 85 86void LogStream::Emit(const std::string_view line, EmitOption option) { 87 Emit(reinterpret_cast<const uint8_t*>(line.data()), line.length(), option); 88} 89 90void LogStream::End() { 91 ended_ = true; 92} 93 94int LogStream::ReadStart() { 95 if (reading_) return 0; 96 // Flush any chunks that have already been buffered. 97 for (const auto& chunk : buffer_) EmitRead(chunk.len, chunk.buf); 98 total_ = 0; 99 buffer_.clear(); 100 if (fin_seen_) { 101 // If we've already received the fin, there's nothing else to wait for. 102 EmitRead(UV_EOF); 103 return ReadStop(); 104 } 105 // Otherwise, we're going to wait for more chunks to be written. 106 reading_ = true; 107 return 0; 108} 109 110int LogStream::ReadStop() { 111 reading_ = false; 112 return 0; 113} 114 115// We do not use either of these. 116int LogStream::DoShutdown(ShutdownWrap* req_wrap) { 117 UNREACHABLE(); 118} 119int LogStream::DoWrite(WriteWrap* w, 120 uv_buf_t* bufs, 121 size_t count, 122 uv_stream_t* send_handle) { 123 UNREACHABLE(); 124} 125 126bool LogStream::IsAlive() { 127 return !ended_; 128} 129 130bool LogStream::IsClosing() { 131 return ended_; 132} 133 134AsyncWrap* LogStream::GetAsyncWrap() { 135 return this; 136} 137 138void LogStream::MemoryInfo(MemoryTracker* tracker) const { 139 tracker->TrackFieldWithSize("buffer", total_); 140} 141 142// The LogStream buffer enforces a maximum size of kMaxLogStreamBuffer. 143void LogStream::ensure_space(size_t amt) { 144 while (total_ + amt > kMaxLogStreamBuffer) { 145 total_ -= buffer_.front().buf.len; 146 buffer_.pop_front(); 147 } 148} 149} // namespace quic 150} // namespace node 151 152#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC 153