xref: /third_party/node/src/dataqueue/queue.h (revision 1cb0ef41)
1#pragma once
2
3#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
4
5#include <base_object.h>
6#include <memory_tracker.h>
7#include <node.h>
8#include <node_bob.h>
9#include <node_file.h>
10#include <stream_base.h>
11#include <uv.h>
12#include <v8.h>
13
14#include <memory>
15#include <optional>
16#include <vector>
17
18namespace node {
19
20// Represents a sequenced collection of data sources that can be
21// consumed as a single logical stream of data. Sources can be
22// memory-resident or streaming.
23//
24// There are two essential kinds of DataQueue:
25//
26// * Idempotent - Multiple reads always produce the same result.
27//                This is even the case if individual sources
28//                are not memory-resident. Reads never change
29//                the state of the DataQueue. Every entry in
30//                an Idempotent DataQueue must also be idempotent.
31//
32// * Non-idempotent - Reads are destructive of the internal state.
33//                    A non-idempotent DataQueue can be read at
34//                    most once and only by a single reader.
35//                    Entries in a non-idempotent DataQueue can
36//                    be a mix of idempotent and non-idempotent
37//                    entries.
38//
39// The DataQueue is essentially a collection of DataQueue::Entry
40// instances. A DataQueue::Entry is a single logical source of
41// data. The data may be memory-resident or streaming. The entry
42// can be idempotent or non-idempotent. An entry cannot be read
43// by itself, it must be part of a DataQueue to be consumed.
44//
45// Example of creating an idempotent DataQueue:
46//
47//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
48//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
49//
50//   std::vector<std::unique_ptr<DataQueue::Entry>> list;
51//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
52//       store1, 0, len1));
53//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
54//       store2, 0, len2));
55//
56//   std::shared_ptr<DataQueue> data_queue =
57//       DataQueue::CreateIdempotent(std::move(list));
58//
59// Importantly, idempotent DataQueues are immutable and all entries
60// must be provided when the DataQueue is constructed. Every entry
61// must be idempotent with known sizes. The entries may be memory
62// resident or streaming. Streaming entries must be capable of
63// being read multiple times.
64//
65// Because idempotent DataQueues will always produce the same results
66// when read, they can be sliced. Slices yield a new DataQueue instance
67// that is a subset view over the original:
68//
69//   std::shared_ptr<DataQueue> slice = data_queue->slice(
70//       5, 10);
71//
72// Example of creating a non-idempotent DataQueue:
73//
74//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
75//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
76//
77//   std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
78//
79//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
80//       store1, 0, len1));
81//
82//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
83//       store2, 0, len2));
84//
85// These data-queues can have new entries appended to them. Entries can
86// be memory-resident or streaming. Streaming entries might not have
87// a known size. Entries may not be capable of being read multiple
88// times.
89//
90// A non-idempotent data queue will, by default, allow any number of
91// entries to be appended to it. To limit the size of the DataQueue,
92// or to close the DataQueue (preventing new entries from being
93// appended), use the cap() method. The DataQueue can be capped
94// at a specific size or at whatever size it currently is.
95//
96// It might not be possible for a non-idempotent DataQueue to provide
97// a size because it might not know how much data a streaming entry
98// will ultimately provide.
99//
100// Non-idempotent DataQueues cannot be sliced.
101//
102// To read from a DataQueue, we use the node::bob::Source API
103// (see src/node_bob.h).
104//
105//   std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
106//
107//   reader->Pull(
108//        [](int status, const DataQueue::Vec* vecs,
109//           uint64_t count, Done done) {
110//     // status is one of node::bob::Status
111//     // vecs is zero or more data buffers containing the read data
112//     // count is the number of vecs
113//     // done is a callback to be invoked when done processing the data
114//   }, options, nullptr, 0, 16);
115//
116// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
117//
118// For idempotent DataQueues, any number of readers can be created and
119// pull concurrently from the same DataQueue. The DataQueue can be read
120// multiple times. Successful reads should always produce the same result.
121// If, for whatever reason, the implementation cannot ensure that the
122// data read will remain the same, the read must fail with an error status.
123//
124// For non-idempotent DataQueues, only a single reader is ever allowed for
125// the DataQueue, and the data can only ever be read once.
126
127class DataQueue : public MemoryRetainer {
128 public:
129  struct Vec {
130    uint8_t* base;
131    uint64_t len;
132  };
133
134  // A DataQueue::Reader consumes the DataQueue. If the data queue is
135  // idempotent, multiple Readers can be attached to the DataQueue at
136  // any given time, all guaranteed to yield the same result when the
137  // data is read. Otherwise, only a single Reader can be attached.
138  class Reader : public MemoryRetainer, public bob::Source<Vec> {
139   public:
140    using Next = bob::Next<Vec>;
141    using Done = bob::Done;
142  };
143
144  // A DataQueue::Entry represents a logical chunk of data in the queue.
145  // The entry may or may not represent memory-resident data. It may
146  // or may not be consumable more than once.
147  class Entry : public MemoryRetainer {
148   public:
149    // Returns a new Entry that is a view over this entry's data
150    // from the start offset to the ending offset. If the end
151    // offset is omitted, the slice extends to the end of the
152    // data.
153    //
154    // Creating a slice is only possible if is_idempotent() returns
155    // true. This is because consuming either the original entry or
156    // the new entry would change the state of the other in non-
157    // deterministic ways. When is_idempotent() returns false, slice()
158    // must return a null unique_ptr.
159    //
160    // Creating a slice is also only possible if the size of the
161    // entry is known. If size() returns std::nullopt, slice()
162    // must return a null unique_ptr.
163    virtual std::unique_ptr<Entry> slice(
164        uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
165
166    // Returns the number of bytes represented by this Entry if it is
167    // known. Certain types of entries, such as those backed by streams
168    // might not know the size in advance and therefore cannot provide
169    // a value. In such cases, size() must return std::nullopt.
170    //
171    // If the entry is idempotent, a size should always be available.
172    virtual std::optional<uint64_t> size() const = 0;
173
174    // When true, multiple reads on the object must produce the exact
175    // same data or the reads will fail. Some sources of entry data,
176    // such as streams, may not be capable of preserving idempotency
177    // and therefore must not claim to be. If an entry claims to be
178    // idempotent and cannot preserve that quality, subsequent reads
179    // must fail with an error when a variance is detected.
180    virtual bool is_idempotent() const = 0;
181  };
182
183  // Creates an idempotent DataQueue with a pre-established collection
184  // of entries. All of the entries must also be idempotent otherwise
185  // an empty std::shared_ptr will be returned.
186  static std::shared_ptr<DataQueue> CreateIdempotent(
187      std::vector<std::unique_ptr<Entry>> list);
188
189  // Creates a non-idempotent DataQueue. This kind of queue can be
190  // mutated and updated such that multiple reads are not guaranteed
191  // to produce the same result. The entries added can be of any type.
192  static std::shared_ptr<DataQueue> Create(
193      std::optional<uint64_t> capped = std::nullopt);
194
195  // Creates an idempotent Entry from a v8::ArrayBufferView. To help
196  // ensure idempotency, the underlying ArrayBuffer is detached from
197  // the BackingStore. It is the caller's responsibility to ensure that
198  // the BackingStore is not otherwise modified through any other
199  // means. If the ArrayBuffer is not detachable, nullptr will be
200  // returned.
201  static std::unique_ptr<Entry> CreateInMemoryEntryFromView(
202      v8::Local<v8::ArrayBufferView> view);
203
204  // Creates an idempotent Entry from a v8::BackingStore. It is the caller's
205  // responsibility to ensure that the BackingStore is not otherwise modified
206  // through any other means. If the ArrayBuffer is not detachable, nullptr
207  // will be returned. NOTE(review): no ArrayBuffer is passed here; confirm.
208  static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
209      std::shared_ptr<v8::BackingStore> store,
210      uint64_t offset,
211      uint64_t length);
212
213  static std::unique_ptr<Entry> CreateDataQueueEntry(
214      std::shared_ptr<DataQueue> data_queue);
215
216  static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
217                                              v8::Local<v8::Value> path);
218
219  // Creates a Reader for the given queue. If the queue is idempotent,
220  // any number of readers can be created, all of which are guaranteed
221  // to provide the same data. Otherwise, only a single reader is
222  // permitted.
223  virtual std::shared_ptr<Reader> get_reader() = 0;
224
225  // Append a single new entry to the queue. Appending is only allowed
226  // when is_idempotent() is false. std::nullopt will be returned
227  // if is_idempotent() is true. std::optional(false) will be returned if the
228  // data queue is not idempotent but the entry otherwise cannot be added.
229  virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;
230
231  // Caps the size of this DataQueue, preventing additional entries from
232  // being added if those would cause the size to extend beyond the
233  // specified limit.
234  //
235  // If limit is zero, or is less than the known current size of the
236  // data queue, the limit is set to the current known size, meaning
237  // that no additional entries can be added at all.
238  //
239  // If the size of the data queue is not known, the limit will be
240  // ignored and no additional entries will be allowed at all.
241  //
242  // If is_idempotent() is true, capping is unnecessary because the data
243  // queue cannot be appended to. In that case, cap() is a no-op.
244  //
245  // If the data queue has already been capped, cap can be called
246  // again with a smaller size.
247  virtual void cap(uint64_t limit = 0) = 0;
248
249  // Returns a new DataQueue that is a view over this queue's data
250  // from the start offset to the ending offset. If the end offset
251  // is omitted, the slice extends to the end of the data.
252  //
253  // The slice will cover a range from start up to, but excluding, end.
254  //
255  // Creating a slice is only possible if is_idempotent() returns
256  // true. This is because consuming either the original DataQueue or
257  // the new queue would change the state of the other in non-
258  // deterministic ways. When is_idempotent() returns false, slice()
259  // must return a null shared_ptr.
260  //
261  // Creating a slice is also only possible if the size of the
262  // DataQueue is known. If size() returns std::nullopt, slice()
263  // must return a null shared_ptr.
264  virtual std::shared_ptr<DataQueue> slice(
265      uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
266
267  // The size of DataQueue is the total size of all of its member entries.
268  // If any of the entries is not able to specify a size, the DataQueue
269  // will also be incapable of doing so, in which case size() must return
270  // std::nullopt.
271  virtual std::optional<uint64_t> size() const = 0;
272
273  // A DataQueue is idempotent only if all of its member entries are
274  // idempotent.
275  virtual bool is_idempotent() const = 0;
276
277  // True only if cap is called or the data queue is limited to a
278  // fixed size.
279  virtual bool is_capped() const = 0;
280
281  // If the data queue has been capped, and the size of the data queue
282  // is known, maybeCapRemaining will return the number of additional
283  // bytes the data queue can receive before reaching the cap limit.
284  // If the size of the queue cannot be known, or the cap has not
285  // been set, maybeCapRemaining() will return std::nullopt.
286  virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
287
288  static void Initialize(Environment* env, v8::Local<v8::Object> target);
289  static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
290};
291
292}  // namespace node
293
294#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
295