11cb0ef41Sopenharmony_ci#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ci#include "logstream.h"
41cb0ef41Sopenharmony_ci#include <async_wrap-inl.h>
51cb0ef41Sopenharmony_ci#include <base_object-inl.h>
61cb0ef41Sopenharmony_ci#include <env-inl.h>
71cb0ef41Sopenharmony_ci#include <memory_tracker-inl.h>
81cb0ef41Sopenharmony_ci#include <node_external_reference.h>
91cb0ef41Sopenharmony_ci#include <stream_base-inl.h>
101cb0ef41Sopenharmony_ci#include <uv.h>
111cb0ef41Sopenharmony_ci#include <v8.h>
121cb0ef41Sopenharmony_ci#include "bindingdata.h"
131cb0ef41Sopenharmony_ci
141cb0ef41Sopenharmony_cinamespace node {
151cb0ef41Sopenharmony_ci
161cb0ef41Sopenharmony_ciusing v8::FunctionTemplate;
171cb0ef41Sopenharmony_ciusing v8::Local;
181cb0ef41Sopenharmony_ciusing v8::Object;
191cb0ef41Sopenharmony_ci
201cb0ef41Sopenharmony_cinamespace quic {
211cb0ef41Sopenharmony_ci
221cb0ef41Sopenharmony_ciLocal<FunctionTemplate> LogStream::GetConstructorTemplate(Environment* env) {
231cb0ef41Sopenharmony_ci  auto& state = BindingData::Get(env);
241cb0ef41Sopenharmony_ci  auto tmpl = state.logstream_constructor_template();
251cb0ef41Sopenharmony_ci  if (tmpl.IsEmpty()) {
261cb0ef41Sopenharmony_ci    tmpl = FunctionTemplate::New(env->isolate());
271cb0ef41Sopenharmony_ci    tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
281cb0ef41Sopenharmony_ci    tmpl->InstanceTemplate()->SetInternalFieldCount(
291cb0ef41Sopenharmony_ci        StreamBase::kInternalFieldCount);
301cb0ef41Sopenharmony_ci    tmpl->SetClassName(state.logstream_string());
311cb0ef41Sopenharmony_ci    StreamBase::AddMethods(env, tmpl);
321cb0ef41Sopenharmony_ci    state.set_logstream_constructor_template(tmpl);
331cb0ef41Sopenharmony_ci  }
341cb0ef41Sopenharmony_ci  return tmpl;
351cb0ef41Sopenharmony_ci}
361cb0ef41Sopenharmony_ci
371cb0ef41Sopenharmony_ciBaseObjectPtr<LogStream> LogStream::Create(Environment* env) {
381cb0ef41Sopenharmony_ci  v8::Local<v8::Object> obj;
391cb0ef41Sopenharmony_ci  if (!GetConstructorTemplate(env)
401cb0ef41Sopenharmony_ci           ->InstanceTemplate()
411cb0ef41Sopenharmony_ci           ->NewInstance(env->context())
421cb0ef41Sopenharmony_ci           .ToLocal(&obj)) {
431cb0ef41Sopenharmony_ci    return BaseObjectPtr<LogStream>();
441cb0ef41Sopenharmony_ci  }
451cb0ef41Sopenharmony_ci  return MakeDetachedBaseObject<LogStream>(env, obj);
461cb0ef41Sopenharmony_ci}
471cb0ef41Sopenharmony_ci
481cb0ef41Sopenharmony_ciLogStream::LogStream(Environment* env, Local<Object> obj)
491cb0ef41Sopenharmony_ci    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_QUIC_LOGSTREAM), StreamBase(env) {
501cb0ef41Sopenharmony_ci  MakeWeak();
511cb0ef41Sopenharmony_ci  StreamBase::AttachToObject(GetObject());
521cb0ef41Sopenharmony_ci}
531cb0ef41Sopenharmony_ci
541cb0ef41Sopenharmony_civoid LogStream::Emit(const uint8_t* data, size_t len, EmitOption option) {
551cb0ef41Sopenharmony_ci  if (fin_seen_) return;
561cb0ef41Sopenharmony_ci  fin_seen_ = option == EmitOption::FIN;
571cb0ef41Sopenharmony_ci
581cb0ef41Sopenharmony_ci  size_t remaining = len;
591cb0ef41Sopenharmony_ci  // If the len is greater than the size of the buffer returned by
601cb0ef41Sopenharmony_ci  // EmitAlloc then EmitRead will be called multiple times.
611cb0ef41Sopenharmony_ci  while (remaining != 0) {
621cb0ef41Sopenharmony_ci    uv_buf_t buf = EmitAlloc(len);
631cb0ef41Sopenharmony_ci    size_t len = std::min<size_t>(remaining, buf.len);
641cb0ef41Sopenharmony_ci    memcpy(buf.base, data, len);
651cb0ef41Sopenharmony_ci    remaining -= len;
661cb0ef41Sopenharmony_ci    data += len;
671cb0ef41Sopenharmony_ci    // If we are actively reading from the stream, we'll call emit
681cb0ef41Sopenharmony_ci    // read immediately. Otherwise we buffer the chunk and will push
691cb0ef41Sopenharmony_ci    // the chunks out the next time ReadStart() is called.
701cb0ef41Sopenharmony_ci    if (reading_) {
711cb0ef41Sopenharmony_ci      EmitRead(len, buf);
721cb0ef41Sopenharmony_ci    } else {
731cb0ef41Sopenharmony_ci      // The total measures the total memory used so we always
741cb0ef41Sopenharmony_ci      // increment but buf.len and not chunk len.
751cb0ef41Sopenharmony_ci      ensure_space(buf.len);
761cb0ef41Sopenharmony_ci      total_ += buf.len;
771cb0ef41Sopenharmony_ci      buffer_.push_back(Chunk{len, buf});
781cb0ef41Sopenharmony_ci    }
791cb0ef41Sopenharmony_ci  }
801cb0ef41Sopenharmony_ci
811cb0ef41Sopenharmony_ci  if (ended_ && reading_) {
821cb0ef41Sopenharmony_ci    EmitRead(UV_EOF);
831cb0ef41Sopenharmony_ci  }
841cb0ef41Sopenharmony_ci}
851cb0ef41Sopenharmony_ci
861cb0ef41Sopenharmony_civoid LogStream::Emit(const std::string_view line, EmitOption option) {
871cb0ef41Sopenharmony_ci  Emit(reinterpret_cast<const uint8_t*>(line.data()), line.length(), option);
881cb0ef41Sopenharmony_ci}
891cb0ef41Sopenharmony_ci
901cb0ef41Sopenharmony_civoid LogStream::End() {
911cb0ef41Sopenharmony_ci  ended_ = true;
921cb0ef41Sopenharmony_ci}
931cb0ef41Sopenharmony_ci
941cb0ef41Sopenharmony_ciint LogStream::ReadStart() {
951cb0ef41Sopenharmony_ci  if (reading_) return 0;
961cb0ef41Sopenharmony_ci  // Flush any chunks that have already been buffered.
971cb0ef41Sopenharmony_ci  for (const auto& chunk : buffer_) EmitRead(chunk.len, chunk.buf);
981cb0ef41Sopenharmony_ci  total_ = 0;
991cb0ef41Sopenharmony_ci  buffer_.clear();
1001cb0ef41Sopenharmony_ci  if (fin_seen_) {
1011cb0ef41Sopenharmony_ci    // If we've already received the fin, there's nothing else to wait for.
1021cb0ef41Sopenharmony_ci    EmitRead(UV_EOF);
1031cb0ef41Sopenharmony_ci    return ReadStop();
1041cb0ef41Sopenharmony_ci  }
1051cb0ef41Sopenharmony_ci  // Otherwise, we're going to wait for more chunks to be written.
1061cb0ef41Sopenharmony_ci  reading_ = true;
1071cb0ef41Sopenharmony_ci  return 0;
1081cb0ef41Sopenharmony_ci}
1091cb0ef41Sopenharmony_ci
1101cb0ef41Sopenharmony_ciint LogStream::ReadStop() {
1111cb0ef41Sopenharmony_ci  reading_ = false;
1121cb0ef41Sopenharmony_ci  return 0;
1131cb0ef41Sopenharmony_ci}
1141cb0ef41Sopenharmony_ci
1151cb0ef41Sopenharmony_ci// We do not use either of these.
1161cb0ef41Sopenharmony_ciint LogStream::DoShutdown(ShutdownWrap* req_wrap) {
1171cb0ef41Sopenharmony_ci  UNREACHABLE();
1181cb0ef41Sopenharmony_ci}
1191cb0ef41Sopenharmony_ciint LogStream::DoWrite(WriteWrap* w,
1201cb0ef41Sopenharmony_ci                       uv_buf_t* bufs,
1211cb0ef41Sopenharmony_ci                       size_t count,
1221cb0ef41Sopenharmony_ci                       uv_stream_t* send_handle) {
1231cb0ef41Sopenharmony_ci  UNREACHABLE();
1241cb0ef41Sopenharmony_ci}
1251cb0ef41Sopenharmony_ci
1261cb0ef41Sopenharmony_cibool LogStream::IsAlive() {
1271cb0ef41Sopenharmony_ci  return !ended_;
1281cb0ef41Sopenharmony_ci}
1291cb0ef41Sopenharmony_ci
1301cb0ef41Sopenharmony_cibool LogStream::IsClosing() {
1311cb0ef41Sopenharmony_ci  return ended_;
1321cb0ef41Sopenharmony_ci}
1331cb0ef41Sopenharmony_ci
1341cb0ef41Sopenharmony_ciAsyncWrap* LogStream::GetAsyncWrap() {
1351cb0ef41Sopenharmony_ci  return this;
1361cb0ef41Sopenharmony_ci}
1371cb0ef41Sopenharmony_ci
1381cb0ef41Sopenharmony_civoid LogStream::MemoryInfo(MemoryTracker* tracker) const {
1391cb0ef41Sopenharmony_ci  tracker->TrackFieldWithSize("buffer", total_);
1401cb0ef41Sopenharmony_ci}
1411cb0ef41Sopenharmony_ci
1421cb0ef41Sopenharmony_ci// The LogStream buffer enforces a maximum size of kMaxLogStreamBuffer.
1431cb0ef41Sopenharmony_civoid LogStream::ensure_space(size_t amt) {
1441cb0ef41Sopenharmony_ci  while (total_ + amt > kMaxLogStreamBuffer) {
1451cb0ef41Sopenharmony_ci    total_ -= buffer_.front().buf.len;
1461cb0ef41Sopenharmony_ci    buffer_.pop_front();
1471cb0ef41Sopenharmony_ci  }
1481cb0ef41Sopenharmony_ci}
1491cb0ef41Sopenharmony_ci}  // namespace quic
1501cb0ef41Sopenharmony_ci}  // namespace node
1511cb0ef41Sopenharmony_ci
1521cb0ef41Sopenharmony_ci#endif  // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
153