1 #pragma once
2 
3 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
4 
5 #include <base_object.h>
6 #include <memory_tracker.h>
7 #include <node.h>
8 #include <node_bob.h>
9 #include <node_file.h>
10 #include <stream_base.h>
11 #include <uv.h>
12 #include <v8.h>
13 
14 #include <memory>
15 #include <optional>
16 #include <vector>
17 
18 namespace node {
19 
20 // Represents a sequenced collection of data sources that can be
21 // consumed as a single logical stream of data. Sources can be
22 // memory-resident or streaming.
23 //
24 // There are two essential kinds of DataQueue:
25 //
26 // * Idempotent - Multiple reads always produce the same result.
27 //                This is even the case if individual sources
28 //                are not memory-resident. Reads never change
29 //                the state of the DataQueue. Every entry in
30 //                an Idempotent DataQueue must also be idempotent.
31 //
32 // * Non-idempotent - Reads are destructive of the internal state.
33 //                    A non-idempotent DataQueue can be read at
34 //                    most once and only by a single reader.
35 //                    Entries in a non-idempotent DataQueue can
36 //                    be a mix of idempotent and non-idempotent
37 //                    entries.
38 //
39 // The DataQueue is essentially a collection of DataQueue::Entry
40 // instances. A DataQueue::Entry is a single logical source of
41 // data. The data may be memory-resident or streaming. The entry
42 // can be idempotent or non-idempotent. An entry cannot be read
43 // by itself, it must be part of a DataQueue to be consumed.
44 //
45 // Example of creating an idempotent DataQueue:
46 //
47 //   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
48 //   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
49 //
50 //   std::vector<std::unique_ptr<DataQueue::Entry>> list;
51 //   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
52 //       store1, 0, len1));
53 //   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
54 //       store2, 0, len2));
55 //
56 //   std::shared_ptr<DataQueue> data_queue =
57 //       DataQueue::CreateIdempotent(std::move(list));
58 //
59 // Importantly, idempotent DataQueues are immutable and all entries
60 // must be provided when the DataQueue is constructed. Every entry
61 // must be idempotent with known sizes. The entries may be memory
62 // resident or streaming. Streaming entries must be capable of
63 // being read multiple times.
64 //
65 // Because idempotent DataQueues will always produce the same results
66 // when read, they can be sliced. Slices yield a new DataQueue instance
67 // that is a subset view over the original:
68 //
69 //   std::shared_ptr<DataQueue> slice = data_queue->slice(
70 //       5, 10UL);
71 //
72 // Example of creating a non-idempotent DataQueue:
73 //
74 //   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
75 //   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
76 //
77 //   std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
78 //
79 //   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
80 //       store1, 0, len1));
81 //
82 //   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
83 //       store2, 0, len2));
84 //
85 // These data-queues can have new entries appended to them. Entries can
86 // be memory-resident or streaming. Streaming entries might not have
87 // a known size. Entries may not be capable of being read multiple
88 // times.
89 //
90 // A non-idempotent data queue will, by default, allow any amount of
91 // entries to be appended to it. To limit the size of the DataQueue,
92 // or to close the DataQueue (preventing new entries from being
93 // appended), use the cap() method. The DataQueue can be capped
94 // at a specific size or at whatever size it currently is.
95 //
96 // It might not be possible for a non-idempotent DataQueue to provide
97 // a size because it might not know how much data a streaming entry
98 // will ultimately provide.
99 //
100 // Non-idempotent DataQueues cannot be sliced.
101 //
102 // To read from a DataQueue, we use the node::bob::Source API
103 // (see src/node_bob.h).
104 //
105 //   std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
106 //
107 //   reader->Pull(
108 //        [](int status, const DataQueue::Vec* vecs,
109 //           uint64_t count, Done done) {
110 //     // status is one of node::bob::Status
111 //     // vecs is zero or more data buffers containing the read data
112 //     // count is the number of vecs
113 //     // done is a callback to be invoked when done processing the data
114 //   }, options, nullptr, 0, 16);
115 //
116 // Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
117 //
118 // For idempotent DataQueues, any number of readers can be created and
119 // pull concurrently from the same DataQueue. The DataQueue can be read
120 // multiple times. Successful reads should always produce the same result.
121 // If, for whatever reason, the implementation cannot ensure that the
122 // data read will remain the same, the read must fail with an error status.
123 //
124 // For non-idempotent DataQueues, only a single reader is ever allowed for
125 // the DataQueue, and the data can only ever be read once.
126 
127 class DataQueue : public MemoryRetainer {
128  public:
129   struct Vec {  // One contiguous data buffer handed to a reader's callback.
130     uint8_t* base;  // Start of the buffer.
131     uint64_t len;   // Size of the buffer (presumably in bytes).
132   };
133 
134   // A DataQueue::Reader consumes the DataQueue. If the data queue is
135   // idempotent, multiple Readers can be attached to the DataQueue at
136   // any given time, all guaranteed to yield the same result when the
137   // data is read. Otherwise, only a single Reader can be attached.
138   class Reader : public MemoryRetainer, public bob::Source<Vec> {
139    public:
140     using Next = bob::Next<Vec>;  // bob::Next specialized for Vec.
141     using Done = bob::Done;       // Alias of bob::Done.
142   };
143 
144   // A DataQueue::Entry represents a logical chunk of data in the queue.
145   // The entry may or may not represent memory-resident data. It may
146   // or may not be consumable more than once.
147   class Entry : public MemoryRetainer {
148    public:
149     // Returns a new Entry that is a view over this entry's data
150     // from the start offset to the ending offset. If the end
151     // offset is omitted, the slice extends to the end of the
152     // data.
153     //
154     // Creating a slice is only possible if is_idempotent() returns
155     // true. This is because consuming either the original entry or
156     // the new entry would change the state of the other in non-
157     // deterministic ways. When is_idempotent() returns false, slice()
158     // must return a null unique_ptr.
159     //
160     // Creating a slice is also only possible if the size of the
161     // entry is known. If size() returns std::nullopt, slice()
162     // must return a null unique_ptr.
163     virtual std::unique_ptr<Entry> slice(
164         uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
165 
166     // Returns the number of bytes represented by this Entry if it is
167     // known. Certain types of entries, such as those backed by streams
168     // might not know the size in advance and therefore cannot provide
169     // a value. In such cases, size() must return std::nullopt.
170     //
171     // If the entry is idempotent, a size should always be available.
172     virtual std::optional<uint64_t> size() const = 0;
173 
174     // When true, multiple reads on the object must produce the exact
175     // same data or the reads will fail. Some sources of entry data,
176     // such as streams, may not be capable of preserving idempotency
177     // and therefore must not claim to be. If an entry claims to be
178     // idempotent and cannot preserve that quality, subsequent reads
179     // must fail with an error when a variance is detected.
180     virtual bool is_idempotent() const = 0;
181   };
182 
183   // Creates an idempotent DataQueue with a pre-established collection
184   // of entries. All of the entries must also be idempotent otherwise
185   // an empty std::shared_ptr will be returned.
186   static std::shared_ptr<DataQueue> CreateIdempotent(
187       std::vector<std::unique_ptr<Entry>> list);
188 
189   // Creates a non-idempotent DataQueue. This kind of queue can be
190   // mutated and updated such that multiple reads are not guaranteed
191   // to produce the same result. The entries added can be of any type.
192   static std::shared_ptr<DataQueue> Create(
193       std::optional<uint64_t> capped = std::nullopt);
194 
195   // Creates an idempotent Entry from a v8::ArrayBufferView. To help
196   // ensure idempotency, the underlying ArrayBuffer is detached from
197   // the BackingStore. It is the caller's responsibility to ensure that
198   // the BackingStore is not otherwise modified through any other
199   // means. If the ArrayBuffer is not detachable, nullptr will be
200   // returned.
201   static std::unique_ptr<Entry> CreateInMemoryEntryFromView(
202       v8::Local<v8::ArrayBufferView> view);
203 
204   // Creates an idempotent Entry from a v8::BackingStore. It is the
205   // caller's responsibility to ensure that the BackingStore is not
206   // otherwise modified through any other means. nullptr may be returned;
207   // NOTE(review): no ArrayBuffer is involved here - confirm the condition.
208   static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
209       std::shared_ptr<v8::BackingStore> store,
210       uint64_t offset,
211       uint64_t length);
212   // Creates an Entry backed by another DataQueue (presumably for nesting).
213   static std::unique_ptr<Entry> CreateDataQueueEntry(
214       std::shared_ptr<DataQueue> data_queue);
215   // Creates an Entry from `path`; NOTE(review): presumably file-backed.
216   static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
217                                               v8::Local<v8::Value> path);
218 
219   // Creates a Reader for the given queue. If the queue is idempotent,
220   // any number of readers can be created, all of which are guaranteed
221   // to provide the same data. Otherwise, only a single reader is
222   // permitted.
223   virtual std::shared_ptr<Reader> get_reader() = 0;
224 
225   // Append a single new entry to the queue. Appending is only allowed
226   // when is_idempotent() is false. std::nullopt will be returned
227   // if is_idempotent() is true. std::optional(false) will be returned if the
228   // data queue is not idempotent but the entry otherwise cannot be added.
229   virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;
230 
231   // Caps the size of this DataQueue, preventing additional entries from
232   // being added if those cause the size to extend beyond the specified
233   // limit.
234   //
235   // If limit is zero, or is less than the known current size of the
236   // data queue, the limit is set to the current known size, meaning
237   // that no additional entries can be added at all.
238   //
239   // If the size of the data queue is not known, the limit will be
240   // ignored and no additional entries will be allowed at all.
241   //
242   // If is_idempotent is true capping is unnecessary because the data
243   // queue cannot be appended to. In that case, cap() is a no-op.
244   //
245   // If the data queue has already been capped, cap can be called
246   // again with a smaller size.
247   virtual void cap(uint64_t limit = 0) = 0;
248 
249   // Returns a new DataQueue that is a view over this queue's data
250   // from the start offset to the ending offset. If the end offset
251   // is omitted, the slice extends to the end of the data.
252   //
253   // The slice will cover a range from start up to, but excluding, end.
254   //
255   // Creating a slice is only possible if is_idempotent() returns
256   // true. This is because consuming either the original DataQueue or
257   // the new queue would change the state of the other in non-
258   // deterministic ways. When is_idempotent() returns false, slice()
259   // must return a null shared_ptr.
260   //
261   // Creating a slice is also only possible if the size of the
262   // DataQueue is known. If size() returns std::nullopt, slice()
263   // must return a null shared_ptr.
264   virtual std::shared_ptr<DataQueue> slice(
265       uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
266 
267   // The size of DataQueue is the total size of all of its member entries.
268   // If any of the entries is not able to specify a size, the DataQueue
269   // will also be incapable of doing so, in which case size() must return
270   // std::nullopt.
271   virtual std::optional<uint64_t> size() const = 0;
272 
273   // A DataQueue is idempotent only if all of its member entries are
274   // idempotent.
275   virtual bool is_idempotent() const = 0;
276 
277   // True only if cap is called or the data queue is limited to a
278   // fixed size.
279   virtual bool is_capped() const = 0;
280 
281   // If the data queue has been capped, and the size of the data queue
282   // is known, maybeCapRemaining will return the number of additional
283   // bytes the data queue can receive before reaching the cap limit.
284   // If the size of the queue cannot be known, or the cap has not
285   // been set, maybeCapRemaining() will return std::nullopt.
286   virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
287 
288   static void Initialize(Environment* env, v8::Local<v8::Object> target);
289   static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
290 };
291 
292 }  // namespace node
293 
294 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
295