xref: /third_party/node/src/quic/logstream.cc (revision 1cb0ef41)
1#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
2
3#include "logstream.h"
4#include <async_wrap-inl.h>
5#include <base_object-inl.h>
6#include <env-inl.h>
7#include <memory_tracker-inl.h>
8#include <node_external_reference.h>
9#include <stream_base-inl.h>
10#include <uv.h>
11#include <v8.h>
12#include "bindingdata.h"
13
14namespace node {
15
16using v8::FunctionTemplate;
17using v8::Local;
18using v8::Object;
19
20namespace quic {
21
22Local<FunctionTemplate> LogStream::GetConstructorTemplate(Environment* env) {
23  auto& state = BindingData::Get(env);
24  auto tmpl = state.logstream_constructor_template();
25  if (tmpl.IsEmpty()) {
26    tmpl = FunctionTemplate::New(env->isolate());
27    tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
28    tmpl->InstanceTemplate()->SetInternalFieldCount(
29        StreamBase::kInternalFieldCount);
30    tmpl->SetClassName(state.logstream_string());
31    StreamBase::AddMethods(env, tmpl);
32    state.set_logstream_constructor_template(tmpl);
33  }
34  return tmpl;
35}
36
37BaseObjectPtr<LogStream> LogStream::Create(Environment* env) {
38  v8::Local<v8::Object> obj;
39  if (!GetConstructorTemplate(env)
40           ->InstanceTemplate()
41           ->NewInstance(env->context())
42           .ToLocal(&obj)) {
43    return BaseObjectPtr<LogStream>();
44  }
45  return MakeDetachedBaseObject<LogStream>(env, obj);
46}
47
48LogStream::LogStream(Environment* env, Local<Object> obj)
49    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_QUIC_LOGSTREAM), StreamBase(env) {
50  MakeWeak();
51  StreamBase::AttachToObject(GetObject());
52}
53
54void LogStream::Emit(const uint8_t* data, size_t len, EmitOption option) {
55  if (fin_seen_) return;
56  fin_seen_ = option == EmitOption::FIN;
57
58  size_t remaining = len;
59  // If the len is greater than the size of the buffer returned by
60  // EmitAlloc then EmitRead will be called multiple times.
61  while (remaining != 0) {
62    uv_buf_t buf = EmitAlloc(len);
63    size_t len = std::min<size_t>(remaining, buf.len);
64    memcpy(buf.base, data, len);
65    remaining -= len;
66    data += len;
67    // If we are actively reading from the stream, we'll call emit
68    // read immediately. Otherwise we buffer the chunk and will push
69    // the chunks out the next time ReadStart() is called.
70    if (reading_) {
71      EmitRead(len, buf);
72    } else {
73      // The total measures the total memory used so we always
74      // increment but buf.len and not chunk len.
75      ensure_space(buf.len);
76      total_ += buf.len;
77      buffer_.push_back(Chunk{len, buf});
78    }
79  }
80
81  if (ended_ && reading_) {
82    EmitRead(UV_EOF);
83  }
84}
85
86void LogStream::Emit(const std::string_view line, EmitOption option) {
87  Emit(reinterpret_cast<const uint8_t*>(line.data()), line.length(), option);
88}
89
90void LogStream::End() {
91  ended_ = true;
92}
93
94int LogStream::ReadStart() {
95  if (reading_) return 0;
96  // Flush any chunks that have already been buffered.
97  for (const auto& chunk : buffer_) EmitRead(chunk.len, chunk.buf);
98  total_ = 0;
99  buffer_.clear();
100  if (fin_seen_) {
101    // If we've already received the fin, there's nothing else to wait for.
102    EmitRead(UV_EOF);
103    return ReadStop();
104  }
105  // Otherwise, we're going to wait for more chunks to be written.
106  reading_ = true;
107  return 0;
108}
109
110int LogStream::ReadStop() {
111  reading_ = false;
112  return 0;
113}
114
115// We do not use either of these.
116int LogStream::DoShutdown(ShutdownWrap* req_wrap) {
117  UNREACHABLE();
118}
119int LogStream::DoWrite(WriteWrap* w,
120                       uv_buf_t* bufs,
121                       size_t count,
122                       uv_stream_t* send_handle) {
123  UNREACHABLE();
124}
125
126bool LogStream::IsAlive() {
127  return !ended_;
128}
129
130bool LogStream::IsClosing() {
131  return ended_;
132}
133
134AsyncWrap* LogStream::GetAsyncWrap() {
135  return this;
136}
137
138void LogStream::MemoryInfo(MemoryTracker* tracker) const {
139  tracker->TrackFieldWithSize("buffer", total_);
140}
141
142// The LogStream buffer enforces a maximum size of kMaxLogStreamBuffer.
143void LogStream::ensure_space(size_t amt) {
144  while (total_ + amt > kMaxLogStreamBuffer) {
145    total_ -= buffer_.front().buf.len;
146    buffer_.pop_front();
147  }
148}
149}  // namespace quic
150}  // namespace node
151
152#endif  // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
153