#pragma once

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include <base_object.h>
#include <memory_tracker.h>
#include <node.h>
#include <node_bob.h>
#include <node_file.h>
#include <stream_base.h>
#include <uv.h>
#include <v8.h>

#include <cstdint>
#include <memory>
#include <optional>
#include <vector>
171cb0ef41Sopenharmony_ci
181cb0ef41Sopenharmony_cinamespace node {
191cb0ef41Sopenharmony_ci
201cb0ef41Sopenharmony_ci// Represents a sequenced collection of data sources that can be
211cb0ef41Sopenharmony_ci// consumed as a single logical stream of data. Sources can be
221cb0ef41Sopenharmony_ci// memory-resident or streaming.
231cb0ef41Sopenharmony_ci//
241cb0ef41Sopenharmony_ci// There are two essential kinds of DataQueue:
251cb0ef41Sopenharmony_ci//
261cb0ef41Sopenharmony_ci// * Idempotent - Multiple reads always produce the same result.
271cb0ef41Sopenharmony_ci//                This is even the case if individual sources
281cb0ef41Sopenharmony_ci//                are not memory-resident. Reads never change
291cb0ef41Sopenharmony_ci//                the state of the DataQueue. Every entry in
301cb0ef41Sopenharmony_ci//                an Idempotent DataQueue must also be idempotent.
311cb0ef41Sopenharmony_ci//
321cb0ef41Sopenharmony_ci// * Non-idempotent - Reads are destructive of the internal state.
331cb0ef41Sopenharmony_ci//                    A non-idempotent DataQueue can be read at
341cb0ef41Sopenharmony_ci//                    most once and only by a single reader.
351cb0ef41Sopenharmony_ci//                    Entries in a non-idempotent DataQueue can
361cb0ef41Sopenharmony_ci//                    be a mix of idempotent and non-idempotent
371cb0ef41Sopenharmony_ci//                    entries.
381cb0ef41Sopenharmony_ci//
391cb0ef41Sopenharmony_ci// The DataQueue is essentially a collection of DataQueue::Entry
401cb0ef41Sopenharmony_ci// instances. A DataQueue::Entry is a single logical source of
411cb0ef41Sopenharmony_ci// data. The data may be memory-resident or streaming. The entry
421cb0ef41Sopenharmony_ci// can be idempotent or non-idempotent. An entry cannot be read
431cb0ef41Sopenharmony_ci// by itself, it must be part of a DataQueue to be consumed.
441cb0ef41Sopenharmony_ci//
451cb0ef41Sopenharmony_ci// Example of creating an idempotent DataQueue:
461cb0ef41Sopenharmony_ci//
471cb0ef41Sopenharmony_ci//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
481cb0ef41Sopenharmony_ci//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
491cb0ef41Sopenharmony_ci//
501cb0ef41Sopenharmony_ci//   std::vector<std::unique_ptr<DataQueue::Entry>> list;
511cb0ef41Sopenharmony_ci//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
521cb0ef41Sopenharmony_ci//       store1, 0, len1));
531cb0ef41Sopenharmony_ci//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
541cb0ef41Sopenharmony_ci//       store2, 0, len2));
551cb0ef41Sopenharmony_ci//
561cb0ef41Sopenharmony_ci//   std::shared_ptr<DataQueue> data_queue =
571cb0ef41Sopenharmony_ci//       DataQueue::CreateIdempotent(std::move(list));
581cb0ef41Sopenharmony_ci//
591cb0ef41Sopenharmony_ci// Importantly, idempotent DataQueue's are immutable and all entries
601cb0ef41Sopenharmony_ci// must be provided when the DataQueue is constructed. Every entry
611cb0ef41Sopenharmony_ci// must be idempotent with known sizes. The entries may be memory
621cb0ef41Sopenharmony_ci// resident or streaming. Streaming entries must be capable of
631cb0ef41Sopenharmony_ci// being read multiple times.
641cb0ef41Sopenharmony_ci//
651cb0ef41Sopenharmony_ci// Because idempotent DataQueue's will always produce the same results
661cb0ef41Sopenharmony_ci// when read, they can be sliced. Slices yield a new DataQueue instance
671cb0ef41Sopenharmony_ci// that is a subset view over the original:
681cb0ef41Sopenharmony_ci//
691cb0ef41Sopenharmony_ci//   std::shared_ptr<DataQueue> slice = data_queue.slice(
701cb0ef41Sopenharmony_ci//       5, v8::Just(10UL));
711cb0ef41Sopenharmony_ci//
721cb0ef41Sopenharmony_ci// Example of creating a non-idempotent DataQueue:
731cb0ef41Sopenharmony_ci//
741cb0ef41Sopenharmony_ci//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
751cb0ef41Sopenharmony_ci//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
761cb0ef41Sopenharmony_ci//
771cb0ef41Sopenharmony_ci//   std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
781cb0ef41Sopenharmony_ci//
791cb0ef41Sopenharmony_ci//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
801cb0ef41Sopenharmony_ci//       store1, 0, len1));
811cb0ef41Sopenharmony_ci//
821cb0ef41Sopenharmony_ci//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
831cb0ef41Sopenharmony_ci//       store2, 0, len2));
841cb0ef41Sopenharmony_ci//
851cb0ef41Sopenharmony_ci// These data-queues can have new entries appended to them. Entries can
861cb0ef41Sopenharmony_ci// be memory-resident or streaming. Streaming entries might not have
871cb0ef41Sopenharmony_ci// a known size. Entries may not be capable of being read multiple
881cb0ef41Sopenharmony_ci// times.
891cb0ef41Sopenharmony_ci//
901cb0ef41Sopenharmony_ci// A non-idempotent data queue will, by default, allow any amount of
911cb0ef41Sopenharmony_ci// entries to be appended to it. To limit the size of the DataQueue,
921cb0ef41Sopenharmony_ci// or the close the DataQueue (preventing new entries from being
931cb0ef41Sopenharmony_ci// appending), use the cap() method. The DataQueue can be capped
941cb0ef41Sopenharmony_ci// at a specific size or whatever size it currently it.
951cb0ef41Sopenharmony_ci//
961cb0ef41Sopenharmony_ci// It might not be possible for a non-idempotent DataQueue to provide
971cb0ef41Sopenharmony_ci// a size because it might not know how much data a streaming entry
981cb0ef41Sopenharmony_ci// will ultimately provide.
991cb0ef41Sopenharmony_ci//
1001cb0ef41Sopenharmony_ci// Non-idempotent DataQueues cannot be sliced.
1011cb0ef41Sopenharmony_ci//
1021cb0ef41Sopenharmony_ci// To read from a DataQueue, we use the node::bob::Source API
1031cb0ef41Sopenharmony_ci// (see src/node_bob.h).
1041cb0ef41Sopenharmony_ci//
1051cb0ef41Sopenharmony_ci//   std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
1061cb0ef41Sopenharmony_ci//
1071cb0ef41Sopenharmony_ci//   reader->Pull(
1081cb0ef41Sopenharmony_ci//        [](int status, const DataQueue::Vec* vecs,
1091cb0ef41Sopenharmony_ci//           uint64_t count, Done done) {
1101cb0ef41Sopenharmony_ci//     // status is one of node::bob::Status
1111cb0ef41Sopenharmony_ci//     // vecs is zero or more data buffers containing the read data
1121cb0ef41Sopenharmony_ci//     // count is the number of vecs
1131cb0ef41Sopenharmony_ci//     // done is a callback to be invoked when done processing the data
1141cb0ef41Sopenharmony_ci//   }, options, nullptr, 0, 16);
1151cb0ef41Sopenharmony_ci//
1161cb0ef41Sopenharmony_ci// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
1171cb0ef41Sopenharmony_ci//
1181cb0ef41Sopenharmony_ci// For idempotent DataQueues, any number of readers can be created and
1191cb0ef41Sopenharmony_ci// pull concurrently from the same DataQueue. The DataQueue can be read
1201cb0ef41Sopenharmony_ci// multiple times. Successful reads should always produce the same result.
1211cb0ef41Sopenharmony_ci// If, for whatever reason, the implementation cannot ensure that the
1221cb0ef41Sopenharmony_ci// data read will remain the same, the read must fail with an error status.
1231cb0ef41Sopenharmony_ci//
1241cb0ef41Sopenharmony_ci// For non-idempotent DataQueues, only a single reader is ever allowed for
1251cb0ef41Sopenharmony_ci// the DataQueue, and the data can only ever be read once.
1261cb0ef41Sopenharmony_ci
1271cb0ef41Sopenharmony_ciclass DataQueue : public MemoryRetainer {
1281cb0ef41Sopenharmony_ci public:
1291cb0ef41Sopenharmony_ci  struct Vec {
1301cb0ef41Sopenharmony_ci    uint8_t* base;
1311cb0ef41Sopenharmony_ci    uint64_t len;
1321cb0ef41Sopenharmony_ci  };
1331cb0ef41Sopenharmony_ci
1341cb0ef41Sopenharmony_ci  // A DataQueue::Reader consumes the DataQueue. If the data queue is
1351cb0ef41Sopenharmony_ci  // idempotent, multiple Readers can be attached to the DataQueue at
1361cb0ef41Sopenharmony_ci  // any given time, all guaranteed to yield the same result when the
1371cb0ef41Sopenharmony_ci  // data is read. Otherwise, only a single Reader can be attached.
1381cb0ef41Sopenharmony_ci  class Reader : public MemoryRetainer, public bob::Source<Vec> {
1391cb0ef41Sopenharmony_ci   public:
1401cb0ef41Sopenharmony_ci    using Next = bob::Next<Vec>;
1411cb0ef41Sopenharmony_ci    using Done = bob::Done;
1421cb0ef41Sopenharmony_ci  };
1431cb0ef41Sopenharmony_ci
1441cb0ef41Sopenharmony_ci  // A DataQueue::Entry represents a logical chunk of data in the queue.
1451cb0ef41Sopenharmony_ci  // The entry may or may not represent memory-resident data. It may
1461cb0ef41Sopenharmony_ci  // or may not be consumable more than once.
1471cb0ef41Sopenharmony_ci  class Entry : public MemoryRetainer {
1481cb0ef41Sopenharmony_ci   public:
1491cb0ef41Sopenharmony_ci    // Returns a new Entry that is a view over this entries data
1501cb0ef41Sopenharmony_ci    // from the start offset to the ending offset. If the end
1511cb0ef41Sopenharmony_ci    // offset is omitted, the slice extends to the end of the
1521cb0ef41Sopenharmony_ci    // data.
1531cb0ef41Sopenharmony_ci    //
1541cb0ef41Sopenharmony_ci    // Creating a slice is only possible if is_idempotent() returns
1551cb0ef41Sopenharmony_ci    // true. This is because consuming either the original entry or
1561cb0ef41Sopenharmony_ci    // the new entry would change the state of the other in non-
1571cb0ef41Sopenharmony_ci    // deterministic ways. When is_idempotent() returns false, slice()
1581cb0ef41Sopenharmony_ci    // must return a nulled unique_ptr.
1591cb0ef41Sopenharmony_ci    //
1601cb0ef41Sopenharmony_ci    // Creating a slice is also only possible if the size of the
1611cb0ef41Sopenharmony_ci    // entry is known. If size() returns std::nullopt, slice()
1621cb0ef41Sopenharmony_ci    // must return a nulled unique_ptr.
1631cb0ef41Sopenharmony_ci    virtual std::unique_ptr<Entry> slice(
1641cb0ef41Sopenharmony_ci        uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
1651cb0ef41Sopenharmony_ci
1661cb0ef41Sopenharmony_ci    // Returns the number of bytes represented by this Entry if it is
1671cb0ef41Sopenharmony_ci    // known. Certain types of entries, such as those backed by streams
1681cb0ef41Sopenharmony_ci    // might not know the size in advance and therefore cannot provide
1691cb0ef41Sopenharmony_ci    // a value. In such cases, size() must return v8::Nothing<uint64_t>.
1701cb0ef41Sopenharmony_ci    //
1711cb0ef41Sopenharmony_ci    // If the entry is idempotent, a size should always be available.
1721cb0ef41Sopenharmony_ci    virtual std::optional<uint64_t> size() const = 0;
1731cb0ef41Sopenharmony_ci
1741cb0ef41Sopenharmony_ci    // When true, multiple reads on the object must produce the exact
1751cb0ef41Sopenharmony_ci    // same data or the reads will fail. Some sources of entry data,
1761cb0ef41Sopenharmony_ci    // such as streams, may not be capable of preserving idempotency
1771cb0ef41Sopenharmony_ci    // and therefore must not claim to be. If an entry claims to be
1781cb0ef41Sopenharmony_ci    // idempotent and cannot preserve that quality, subsequent reads
1791cb0ef41Sopenharmony_ci    // must fail with an error when a variance is detected.
1801cb0ef41Sopenharmony_ci    virtual bool is_idempotent() const = 0;
1811cb0ef41Sopenharmony_ci  };
1821cb0ef41Sopenharmony_ci
1831cb0ef41Sopenharmony_ci  // Creates an idempotent DataQueue with a pre-established collection
1841cb0ef41Sopenharmony_ci  // of entries. All of the entries must also be idempotent otherwise
1851cb0ef41Sopenharmony_ci  // an empty std::unique_ptr will be returned.
1861cb0ef41Sopenharmony_ci  static std::shared_ptr<DataQueue> CreateIdempotent(
1871cb0ef41Sopenharmony_ci      std::vector<std::unique_ptr<Entry>> list);
1881cb0ef41Sopenharmony_ci
1891cb0ef41Sopenharmony_ci  // Creates a non-idempotent DataQueue. This kind of queue can be
1901cb0ef41Sopenharmony_ci  // mutated and updated such that multiple reads are not guaranteed
1911cb0ef41Sopenharmony_ci  // to produce the same result. The entries added can be of any type.
1921cb0ef41Sopenharmony_ci  static std::shared_ptr<DataQueue> Create(
1931cb0ef41Sopenharmony_ci      std::optional<uint64_t> capped = std::nullopt);
1941cb0ef41Sopenharmony_ci
1951cb0ef41Sopenharmony_ci  // Creates an idempotent Entry from a v8::ArrayBufferView. To help
1961cb0ef41Sopenharmony_ci  // ensure idempotency, the underlying ArrayBuffer is detached from
1971cb0ef41Sopenharmony_ci  // the BackingStore. It is the callers responsibility to ensure that
1981cb0ef41Sopenharmony_ci  // the BackingStore is not otherwise modified through any other
1991cb0ef41Sopenharmony_ci  // means. If the ArrayBuffer is not detachable, nullptr will be
2001cb0ef41Sopenharmony_ci  // returned.
2011cb0ef41Sopenharmony_ci  static std::unique_ptr<Entry> CreateInMemoryEntryFromView(
2021cb0ef41Sopenharmony_ci      v8::Local<v8::ArrayBufferView> view);
2031cb0ef41Sopenharmony_ci
2041cb0ef41Sopenharmony_ci  // Creates an idempotent Entry from a v8::BackingStore. It is the
2051cb0ef41Sopenharmony_ci  // callers responsibility to ensure that the BackingStore is not
2061cb0ef41Sopenharmony_ci  // otherwise modified through any other means. If the ArrayBuffer
2071cb0ef41Sopenharmony_ci  // is not detachable, nullptr will be returned.
2081cb0ef41Sopenharmony_ci  static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
2091cb0ef41Sopenharmony_ci      std::shared_ptr<v8::BackingStore> store,
2101cb0ef41Sopenharmony_ci      uint64_t offset,
2111cb0ef41Sopenharmony_ci      uint64_t length);
2121cb0ef41Sopenharmony_ci
2131cb0ef41Sopenharmony_ci  static std::unique_ptr<Entry> CreateDataQueueEntry(
2141cb0ef41Sopenharmony_ci      std::shared_ptr<DataQueue> data_queue);
2151cb0ef41Sopenharmony_ci
2161cb0ef41Sopenharmony_ci  static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
2171cb0ef41Sopenharmony_ci                                              v8::Local<v8::Value> path);
2181cb0ef41Sopenharmony_ci
2191cb0ef41Sopenharmony_ci  // Creates a Reader for the given queue. If the queue is idempotent,
2201cb0ef41Sopenharmony_ci  // any number of readers can be created, all of which are guaranteed
2211cb0ef41Sopenharmony_ci  // to provide the same data. Otherwise, only a single reader is
2221cb0ef41Sopenharmony_ci  // permitted.
2231cb0ef41Sopenharmony_ci  virtual std::shared_ptr<Reader> get_reader() = 0;
2241cb0ef41Sopenharmony_ci
2251cb0ef41Sopenharmony_ci  // Append a single new entry to the queue. Appending is only allowed
2261cb0ef41Sopenharmony_ci  // when is_idempotent() is false. std::nullopt will be returned
2271cb0ef41Sopenharmony_ci  // if is_idempotent() is true. std::optional(false) will be returned if the
2281cb0ef41Sopenharmony_ci  // data queue is not idempotent but the entry otherwise cannot be added.
2291cb0ef41Sopenharmony_ci  virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;
2301cb0ef41Sopenharmony_ci
2311cb0ef41Sopenharmony_ci  // Caps the size of this DataQueue preventing additional entries to
2321cb0ef41Sopenharmony_ci  // be added if those cause the size to extend beyond the specified
2331cb0ef41Sopenharmony_ci  // limit.
2341cb0ef41Sopenharmony_ci  //
2351cb0ef41Sopenharmony_ci  // If limit is zero, or is less than the known current size of the
2361cb0ef41Sopenharmony_ci  // data queue, the limit is set to the current known size, meaning
2371cb0ef41Sopenharmony_ci  // that no additional entries can be added at all.
2381cb0ef41Sopenharmony_ci  //
2391cb0ef41Sopenharmony_ci  // If the size of the data queue is not known, the limit will be
2401cb0ef41Sopenharmony_ci  // ignored and no additional entries will be allowed at all.
2411cb0ef41Sopenharmony_ci  //
2421cb0ef41Sopenharmony_ci  // If is_idempotent is true capping is unnecessary because the data
2431cb0ef41Sopenharmony_ci  // queue cannot be appended to. In that case, cap() is a non-op.
2441cb0ef41Sopenharmony_ci  //
2451cb0ef41Sopenharmony_ci  // If the data queue has already been capped, cap can be called
2461cb0ef41Sopenharmony_ci  // again with a smaller size.
2471cb0ef41Sopenharmony_ci  virtual void cap(uint64_t limit = 0) = 0;
2481cb0ef41Sopenharmony_ci
2491cb0ef41Sopenharmony_ci  // Returns a new DataQueue that is a view over this queues data
2501cb0ef41Sopenharmony_ci  // from the start offset to the ending offset. If the end offset
2511cb0ef41Sopenharmony_ci  // is omitted, the slice extends to the end of the data.
2521cb0ef41Sopenharmony_ci  //
2531cb0ef41Sopenharmony_ci  // The slice will coverage a range from start up to, but excluding, end.
2541cb0ef41Sopenharmony_ci  //
2551cb0ef41Sopenharmony_ci  // Creating a slice is only possible if is_idempotent() returns
2561cb0ef41Sopenharmony_ci  // true. This is because consuming either the original DataQueue or
2571cb0ef41Sopenharmony_ci  // the new queue would change the state of the other in non-
2581cb0ef41Sopenharmony_ci  // deterministic ways. When is_idempotent() returns false, slice()
2591cb0ef41Sopenharmony_ci  // must return a nulled unique_ptr.
2601cb0ef41Sopenharmony_ci  //
2611cb0ef41Sopenharmony_ci  // Creating a slice is also only possible if the size of the
2621cb0ef41Sopenharmony_ci  // DataQueue is known. If size() returns std::nullopt, slice()
2631cb0ef41Sopenharmony_ci  // must return a null unique_ptr.
2641cb0ef41Sopenharmony_ci  virtual std::shared_ptr<DataQueue> slice(
2651cb0ef41Sopenharmony_ci      uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
2661cb0ef41Sopenharmony_ci
2671cb0ef41Sopenharmony_ci  // The size of DataQueue is the total size of all of its member entries.
2681cb0ef41Sopenharmony_ci  // If any of the entries is not able to specify a size, the DataQueue
2691cb0ef41Sopenharmony_ci  // will also be incapable of doing so, in which case size() must return
2701cb0ef41Sopenharmony_ci  // std::nullopt.
2711cb0ef41Sopenharmony_ci  virtual std::optional<uint64_t> size() const = 0;
2721cb0ef41Sopenharmony_ci
2731cb0ef41Sopenharmony_ci  // A DataQueue is idempotent only if all of its member entries are
2741cb0ef41Sopenharmony_ci  // idempotent.
2751cb0ef41Sopenharmony_ci  virtual bool is_idempotent() const = 0;
2761cb0ef41Sopenharmony_ci
2771cb0ef41Sopenharmony_ci  // True only if cap is called or the data queue is a limited to a
2781cb0ef41Sopenharmony_ci  // fixed size.
2791cb0ef41Sopenharmony_ci  virtual bool is_capped() const = 0;
2801cb0ef41Sopenharmony_ci
2811cb0ef41Sopenharmony_ci  // If the data queue has been capped, and the size of the data queue
2821cb0ef41Sopenharmony_ci  // is known, maybeCapRemaining will return the number of additional
2831cb0ef41Sopenharmony_ci  // bytes the data queue can receive before reaching the cap limit.
2841cb0ef41Sopenharmony_ci  // If the size of the queue cannot be known, or the cap has not
2851cb0ef41Sopenharmony_ci  // been set, maybeCapRemaining() will return std::nullopt.
2861cb0ef41Sopenharmony_ci  virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
2871cb0ef41Sopenharmony_ci
2881cb0ef41Sopenharmony_ci  static void Initialize(Environment* env, v8::Local<v8::Object> target);
2891cb0ef41Sopenharmony_ci  static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
2901cb0ef41Sopenharmony_ci};
2911cb0ef41Sopenharmony_ci
2921cb0ef41Sopenharmony_ci}  // namespace node
2931cb0ef41Sopenharmony_ci
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS