#pragma once

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include <base_object.h>
#include <memory_tracker.h>
#include <node.h>
#include <node_bob.h>
#include <node_file.h>
#include <stream_base.h>
#include <uv.h>
#include <v8.h>

#include <cstdint>
#include <memory>
#include <optional>
#include <vector>

namespace node {

// Represents a sequenced collection of data sources that can be
// consumed as a single logical stream of data. Sources can be
// memory-resident or streaming.
//
// There are two essential kinds of DataQueue:
//
// * Idempotent - Multiple reads always produce the same result.
//                This is even the case if individual sources
//                are not memory-resident. Reads never change
//                the state of the DataQueue. Every entry in
//                an Idempotent DataQueue must also be idempotent.
//
// * Non-idempotent - Reads are destructive of the internal state.
//                    A non-idempotent DataQueue can be read at
//                    most once and only by a single reader.
//                    Entries in a non-idempotent DataQueue can
//                    be a mix of idempotent and non-idempotent
//                    entries.
//
// The DataQueue is essentially a collection of DataQueue::Entry
// instances. A DataQueue::Entry is a single logical source of
// data. The data may be memory-resident or streaming. The entry
// can be idempotent or non-idempotent. An entry cannot be read
// by itself, it must be part of a DataQueue to be consumed.
//
// Example of creating an idempotent DataQueue:
//
//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
//
//   std::vector<std::unique_ptr<DataQueue::Entry>> list;
//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store1, 0, len1));
//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store2, 0, len2));
//
//   std::shared_ptr<DataQueue> data_queue =
//       DataQueue::CreateIdempotent(std::move(list));
//
// Importantly, idempotent DataQueues are immutable and all entries
// must be provided when the DataQueue is constructed. Every entry
// must be idempotent with known sizes. The entries may be memory
// resident or streaming. Streaming entries must be capable of
// being read multiple times.
//
// Because idempotent DataQueues will always produce the same results
// when read, they can be sliced. Slices yield a new DataQueue instance
// that is a subset view over the original:
//
//   std::shared_ptr<DataQueue> slice = data_queue->slice(5, 10);
//
// Example of creating a non-idempotent DataQueue:
//
//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
//
//   std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
//
//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store1, 0, len1));
//
//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store2, 0, len2));
//
// These data-queues can have new entries appended to them. Entries can
// be memory-resident or streaming. Streaming entries might not have
// a known size. Entries may not be capable of being read multiple
// times.
//
// A non-idempotent data queue will, by default, allow any number of
// entries to be appended to it. To limit the size of the DataQueue,
// or to close the DataQueue (preventing new entries from being
// appended), use the cap() method. The DataQueue can be capped
// at a specific size or at whatever size it currently is.
//
// It might not be possible for a non-idempotent DataQueue to provide
// a size because it might not know how much data a streaming entry
// will ultimately provide.
//
// Non-idempotent DataQueues cannot be sliced.
//
// To read from a DataQueue, we use the node::bob::Source API
// (see src/node_bob.h).
//
//   std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
//
//   reader->Pull(
//       [](int status, const DataQueue::Vec* vecs,
//          uint64_t count, Done done) {
//         // status is one of node::bob::Status
//         // vecs is zero or more data buffers containing the read data
//         // count is the number of vecs
//         // done is a callback to be invoked when done processing the data
//       }, options, nullptr, 0, 16);
//
// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
//
// For idempotent DataQueues, any number of readers can be created and
// pull concurrently from the same DataQueue. The DataQueue can be read
// multiple times. Successful reads should always produce the same result.
// If, for whatever reason, the implementation cannot ensure that the
// data read will remain the same, the read must fail with an error status.
//
// For non-idempotent DataQueues, only a single reader is ever allowed for
// the DataQueue, and the data can only ever be read once.

class DataQueue : public MemoryRetainer {
 public:
  // A single data buffer handed to the Pull() callback of a Reader.
  // NOTE(review): `base` is presumably the address of the first byte and
  // `len` the number of bytes available at `base` - confirm against the
  // implementation.
  struct Vec {
    uint8_t* base;
    uint64_t len;
  };

  // A DataQueue::Reader consumes the DataQueue. If the data queue is
  // idempotent, multiple Readers can be attached to the DataQueue at
  // any given time, all guaranteed to yield the same result when the
  // data is read. Otherwise, only a single Reader can be attached.
  class Reader : public MemoryRetainer, public bob::Source<Vec> {
   public:
    using Next = bob::Next<Vec>;
    using Done = bob::Done;
  };

  // A DataQueue::Entry represents a logical chunk of data in the queue.
  // The entry may or may not represent memory-resident data. It may
  // or may not be consumable more than once.
  class Entry : public MemoryRetainer {
   public:
    // Returns a new Entry that is a view over this entry's data
    // from the start offset to the ending offset. If the end
    // offset is omitted, the slice extends to the end of the
    // data.
    //
    // Creating a slice is only possible if is_idempotent() returns
    // true. This is because consuming either the original entry or
    // the new entry would change the state of the other in non-
    // deterministic ways. When is_idempotent() returns false, slice()
    // must return a null std::unique_ptr.
    //
    // Creating a slice is also only possible if the size of the
    // entry is known. If size() returns std::nullopt, slice()
    // must return a null std::unique_ptr.
    virtual std::unique_ptr<Entry> slice(
        uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;

    // Returns the number of bytes represented by this Entry if it is
    // known. Certain types of entries, such as those backed by streams
    // might not know the size in advance and therefore cannot provide
    // a value. In such cases, size() must return std::nullopt.
    //
    // If the entry is idempotent, a size should always be available.
    virtual std::optional<uint64_t> size() const = 0;

    // When true, multiple reads on the object must produce the exact
    // same data or the reads will fail. Some sources of entry data,
    // such as streams, may not be capable of preserving idempotency
    // and therefore must not claim to be. If an entry claims to be
    // idempotent and cannot preserve that quality, subsequent reads
    // must fail with an error when a variance is detected.
    virtual bool is_idempotent() const = 0;
  };

  // Creates an idempotent DataQueue with a pre-established collection
  // of entries. All of the entries must also be idempotent otherwise
  // an empty std::shared_ptr will be returned.
  static std::shared_ptr<DataQueue> CreateIdempotent(
      std::vector<std::unique_ptr<Entry>> list);

  // Creates a non-idempotent DataQueue. This kind of queue can be
  // mutated and updated such that multiple reads are not guaranteed
  // to produce the same result. The entries added can be of any type.
  //
  // NOTE(review): `capped` is presumably an optional initial size limit
  // equivalent to calling cap() on the new queue - confirm against the
  // implementation.
  static std::shared_ptr<DataQueue> Create(
      std::optional<uint64_t> capped = std::nullopt);

  // Creates an idempotent Entry from a v8::ArrayBufferView. To help
  // ensure idempotency, the underlying ArrayBuffer is detached from
  // the BackingStore. It is the caller's responsibility to ensure that
  // the BackingStore is not otherwise modified through any other
  // means. If the ArrayBuffer is not detachable, nullptr will be
  // returned.
  static std::unique_ptr<Entry> CreateInMemoryEntryFromView(
      v8::Local<v8::ArrayBufferView> view);

  // Creates an idempotent Entry from a v8::BackingStore. It is the
  // caller's responsibility to ensure that the BackingStore is not
  // otherwise modified through any other means. If the ArrayBuffer
  // is not detachable, nullptr will be returned.
  //
  // NOTE(review): this function receives a BackingStore rather than an
  // ArrayBuffer, so the "not detachable" condition above looks like it
  // was carried over from CreateInMemoryEntryFromView(). Confirm which
  // conditions (for example an offset/length outside of the store)
  // actually yield nullptr.
  static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
      std::shared_ptr<v8::BackingStore> store,
      uint64_t offset,
      uint64_t length);

  // Creates an Entry from an existing DataQueue.
  // NOTE(review): presumably this allows one DataQueue to be nested as a
  // single entry of another, inheriting its size and idempotency - confirm
  // against the implementation.
  static std::unique_ptr<Entry> CreateDataQueueEntry(
      std::shared_ptr<DataQueue> data_queue);

  // Creates an Entry from the given `path` value.
  // NOTE(review): presumably the entry streams its data from the file
  // identified by `path` via a file descriptor; whether nullptr is
  // returned on failure is not visible here - confirm against the
  // implementation.
  static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
                                              v8::Local<v8::Value> path);

  // Creates a Reader for the given queue. If the queue is idempotent,
  // any number of readers can be created, all of which are guaranteed
  // to provide the same data. Otherwise, only a single reader is
  // permitted.
  virtual std::shared_ptr<Reader> get_reader() = 0;

  // Append a single new entry to the queue. Appending is only allowed
  // when is_idempotent() is false. std::nullopt will be returned
  // if is_idempotent() is true. std::optional(false) will be returned if the
  // data queue is not idempotent but the entry otherwise cannot be added.
  virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;

  // Caps the size of this DataQueue preventing additional entries to
  // be added if those cause the size to extend beyond the specified
  // limit.
  //
  // If limit is zero, or is less than the known current size of the
  // data queue, the limit is set to the current known size, meaning
  // that no additional entries can be added at all.
  //
  // If the size of the data queue is not known, the limit will be
  // ignored and no additional entries will be allowed at all.
  //
  // If is_idempotent() is true, capping is unnecessary because the data
  // queue cannot be appended to. In that case, cap() is a no-op.
  //
  // If the data queue has already been capped, cap can be called
  // again with a smaller size.
  virtual void cap(uint64_t limit = 0) = 0;

  // Returns a new DataQueue that is a view over this queue's data
  // from the start offset to the ending offset. If the end offset
  // is omitted, the slice extends to the end of the data.
  //
  // The slice covers the range from start up to, but excluding, end.
  //
  // Creating a slice is only possible if is_idempotent() returns
  // true. This is because consuming either the original DataQueue or
  // the new queue would change the state of the other in non-
  // deterministic ways. When is_idempotent() returns false, slice()
  // must return a null std::shared_ptr.
  //
  // Creating a slice is also only possible if the size of the
  // DataQueue is known. If size() returns std::nullopt, slice()
  // must return a null std::shared_ptr.
  virtual std::shared_ptr<DataQueue> slice(
      uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;

  // The size of DataQueue is the total size of all of its member entries.
  // If any of the entries is not able to specify a size, the DataQueue
  // will also be incapable of doing so, in which case size() must return
  // std::nullopt.
  virtual std::optional<uint64_t> size() const = 0;

  // A DataQueue is idempotent only if all of its member entries are
  // idempotent.
  virtual bool is_idempotent() const = 0;

  // True only if cap() has been called or the data queue is limited to a
  // fixed size.
  virtual bool is_capped() const = 0;

  // If the data queue has been capped, and the size of the data queue
  // is known, maybeCapRemaining will return the number of additional
  // bytes the data queue can receive before reaching the cap limit.
  // If the size of the queue cannot be known, or the cap has not
  // been set, maybeCapRemaining() will return std::nullopt.
  virtual std::optional<uint64_t> maybeCapRemaining() const = 0;

  // Binding hooks.
  // NOTE(review): presumably Initialize() installs the DataQueue related
  // bindings on `target` and RegisterExternalReferences() registers the
  // matching native callbacks with `registry` - confirm against the
  // implementation file.
  static void Initialize(Environment* env, v8::Local<v8::Object> target);
  static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};

}  // namespace node

#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS