1#pragma once 2 3#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 4 5#include <base_object.h> 6#include <memory_tracker.h> 7#include <node.h> 8#include <node_bob.h> 9#include <node_file.h> 10#include <stream_base.h> 11#include <uv.h> 12#include <v8.h> 13 14#include <memory> 15#include <optional> 16#include <vector> 17 18namespace node { 19 20// Represents a sequenced collection of data sources that can be 21// consumed as a single logical stream of data. Sources can be 22// memory-resident or streaming. 23// 24// There are two essential kinds of DataQueue: 25// 26// * Idempotent - Multiple reads always produce the same result. 27// This is even the case if individual sources 28// are not memory-resident. Reads never change 29// the state of the DataQueue. Every entry in 30// an Idempotent DataQueue must also be idempotent. 31// 32// * Non-idempotent - Reads are destructive of the internal state. 33// A non-idempotent DataQueue can be read at 34// most once and only by a single reader. 35// Entries in a non-idempotent DataQueue can 36// be a mix of idempotent and non-idempotent 37// entries. 38// 39// The DataQueue is essentially a collection of DataQueue::Entry 40// instances. A DataQueue::Entry is a single logical source of 41// data. The data may be memory-resident or streaming. The entry 42// can be idempotent or non-idempotent. An entry cannot be read 43// by itself, it must be part of a DataQueue to be consumed. 44// 45// Example of creating an idempotent DataQueue: 46// 47// std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow(); 48// std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow(); 49// 50// std::vector<std::unique_ptr<DataQueue::Entry>> list; 51// list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( 52// store1, 0, len1)); 53// list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( 54// store2, 0, len2)); 55// 56// std::shared_ptr<DataQueue> data_queue = 57// DataQueue::CreateIdempotent(std::move(list)); 58// 59// Importantly, idempotent DataQueue's are immutable and all entries 60// must be provided when the DataQueue is constructed. Every entry 61// must be idempotent with known sizes. The entries may be memory 62// resident or streaming. Streaming entries must be capable of 63// being read multiple times. 64// 65// Because idempotent DataQueue's will always produce the same results 66// when read, they can be sliced. Slices yield a new DataQueue instance 67// that is a subset view over the original: 68// 69// std::shared_ptr<DataQueue> slice = data_queue.slice( 70// 5, v8::Just(10UL)); 71// 72// Example of creating a non-idempotent DataQueue: 73// 74// std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow(); 75// std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow(); 76// 77// std::shared_ptr<DataQueue> data_queue = DataQueue::Create(); 78// 79// data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore( 80// store1, 0, len1)); 81// 82// data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore( 83// store2, 0, len2)); 84// 85// These data-queues can have new entries appended to them. Entries can 86// be memory-resident or streaming. Streaming entries might not have 87// a known size. Entries may not be capable of being read multiple 88// times. 89// 90// A non-idempotent data queue will, by default, allow any amount of 91// entries to be appended to it. To limit the size of the DataQueue, 92// or the close the DataQueue (preventing new entries from being 93// appending), use the cap() method. The DataQueue can be capped 94// at a specific size or whatever size it currently it. 95// 96// It might not be possible for a non-idempotent DataQueue to provide 97// a size because it might not know how much data a streaming entry 98// will ultimately provide. 99// 100// Non-idempotent DataQueues cannot be sliced. 101// 102// To read from a DataQueue, we use the node::bob::Source API 103// (see src/node_bob.h). 104// 105// std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader(); 106// 107// reader->Pull( 108// [](int status, const DataQueue::Vec* vecs, 109// uint64_t count, Done done) { 110// // status is one of node::bob::Status 111// // vecs is zero or more data buffers containing the read data 112// // count is the number of vecs 113// // done is a callback to be invoked when done processing the data 114// }, options, nullptr, 0, 16); 115// 116// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS. 117// 118// For idempotent DataQueues, any number of readers can be created and 119// pull concurrently from the same DataQueue. The DataQueue can be read 120// multiple times. Successful reads should always produce the same result. 121// If, for whatever reason, the implementation cannot ensure that the 122// data read will remain the same, the read must fail with an error status. 123// 124// For non-idempotent DataQueues, only a single reader is ever allowed for 125// the DataQueue, and the data can only ever be read once. 126 127class DataQueue : public MemoryRetainer { 128 public: 129 struct Vec { 130 uint8_t* base; 131 uint64_t len; 132 }; 133 134 // A DataQueue::Reader consumes the DataQueue. If the data queue is 135 // idempotent, multiple Readers can be attached to the DataQueue at 136 // any given time, all guaranteed to yield the same result when the 137 // data is read. Otherwise, only a single Reader can be attached. 138 class Reader : public MemoryRetainer, public bob::Source<Vec> { 139 public: 140 using Next = bob::Next<Vec>; 141 using Done = bob::Done; 142 }; 143 144 // A DataQueue::Entry represents a logical chunk of data in the queue. 145 // The entry may or may not represent memory-resident data. It may 146 // or may not be consumable more than once. 147 class Entry : public MemoryRetainer { 148 public: 149 // Returns a new Entry that is a view over this entries data 150 // from the start offset to the ending offset. If the end 151 // offset is omitted, the slice extends to the end of the 152 // data. 153 // 154 // Creating a slice is only possible if is_idempotent() returns 155 // true. This is because consuming either the original entry or 156 // the new entry would change the state of the other in non- 157 // deterministic ways. When is_idempotent() returns false, slice() 158 // must return a nulled unique_ptr. 159 // 160 // Creating a slice is also only possible if the size of the 161 // entry is known. If size() returns std::nullopt, slice() 162 // must return a nulled unique_ptr. 163 virtual std::unique_ptr<Entry> slice( 164 uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0; 165 166 // Returns the number of bytes represented by this Entry if it is 167 // known. Certain types of entries, such as those backed by streams 168 // might not know the size in advance and therefore cannot provide 169 // a value. In such cases, size() must return v8::Nothing<uint64_t>. 170 // 171 // If the entry is idempotent, a size should always be available. 172 virtual std::optional<uint64_t> size() const = 0; 173 174 // When true, multiple reads on the object must produce the exact 175 // same data or the reads will fail. Some sources of entry data, 176 // such as streams, may not be capable of preserving idempotency 177 // and therefore must not claim to be. If an entry claims to be 178 // idempotent and cannot preserve that quality, subsequent reads 179 // must fail with an error when a variance is detected. 180 virtual bool is_idempotent() const = 0; 181 }; 182 183 // Creates an idempotent DataQueue with a pre-established collection 184 // of entries. All of the entries must also be idempotent otherwise 185 // an empty std::unique_ptr will be returned. 186 static std::shared_ptr<DataQueue> CreateIdempotent( 187 std::vector<std::unique_ptr<Entry>> list); 188 189 // Creates a non-idempotent DataQueue. This kind of queue can be 190 // mutated and updated such that multiple reads are not guaranteed 191 // to produce the same result. The entries added can be of any type. 192 static std::shared_ptr<DataQueue> Create( 193 std::optional<uint64_t> capped = std::nullopt); 194 195 // Creates an idempotent Entry from a v8::ArrayBufferView. To help 196 // ensure idempotency, the underlying ArrayBuffer is detached from 197 // the BackingStore. It is the callers responsibility to ensure that 198 // the BackingStore is not otherwise modified through any other 199 // means. If the ArrayBuffer is not detachable, nullptr will be 200 // returned. 201 static std::unique_ptr<Entry> CreateInMemoryEntryFromView( 202 v8::Local<v8::ArrayBufferView> view); 203 204 // Creates an idempotent Entry from a v8::BackingStore. It is the 205 // callers responsibility to ensure that the BackingStore is not 206 // otherwise modified through any other means. If the ArrayBuffer 207 // is not detachable, nullptr will be returned. 208 static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore( 209 std::shared_ptr<v8::BackingStore> store, 210 uint64_t offset, 211 uint64_t length); 212 213 static std::unique_ptr<Entry> CreateDataQueueEntry( 214 std::shared_ptr<DataQueue> data_queue); 215 216 static std::unique_ptr<Entry> CreateFdEntry(Environment* env, 217 v8::Local<v8::Value> path); 218 219 // Creates a Reader for the given queue. If the queue is idempotent, 220 // any number of readers can be created, all of which are guaranteed 221 // to provide the same data. Otherwise, only a single reader is 222 // permitted. 223 virtual std::shared_ptr<Reader> get_reader() = 0; 224 225 // Append a single new entry to the queue. Appending is only allowed 226 // when is_idempotent() is false. std::nullopt will be returned 227 // if is_idempotent() is true. std::optional(false) will be returned if the 228 // data queue is not idempotent but the entry otherwise cannot be added. 229 virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0; 230 231 // Caps the size of this DataQueue preventing additional entries to 232 // be added if those cause the size to extend beyond the specified 233 // limit. 234 // 235 // If limit is zero, or is less than the known current size of the 236 // data queue, the limit is set to the current known size, meaning 237 // that no additional entries can be added at all. 238 // 239 // If the size of the data queue is not known, the limit will be 240 // ignored and no additional entries will be allowed at all. 241 // 242 // If is_idempotent is true capping is unnecessary because the data 243 // queue cannot be appended to. In that case, cap() is a non-op. 244 // 245 // If the data queue has already been capped, cap can be called 246 // again with a smaller size. 247 virtual void cap(uint64_t limit = 0) = 0; 248 249 // Returns a new DataQueue that is a view over this queues data 250 // from the start offset to the ending offset. If the end offset 251 // is omitted, the slice extends to the end of the data. 252 // 253 // The slice will coverage a range from start up to, but excluding, end. 254 // 255 // Creating a slice is only possible if is_idempotent() returns 256 // true. This is because consuming either the original DataQueue or 257 // the new queue would change the state of the other in non- 258 // deterministic ways. When is_idempotent() returns false, slice() 259 // must return a nulled unique_ptr. 260 // 261 // Creating a slice is also only possible if the size of the 262 // DataQueue is known. If size() returns std::nullopt, slice() 263 // must return a null unique_ptr. 264 virtual std::shared_ptr<DataQueue> slice( 265 uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0; 266 267 // The size of DataQueue is the total size of all of its member entries. 268 // If any of the entries is not able to specify a size, the DataQueue 269 // will also be incapable of doing so, in which case size() must return 270 // std::nullopt. 271 virtual std::optional<uint64_t> size() const = 0; 272 273 // A DataQueue is idempotent only if all of its member entries are 274 // idempotent. 275 virtual bool is_idempotent() const = 0; 276 277 // True only if cap is called or the data queue is a limited to a 278 // fixed size. 279 virtual bool is_capped() const = 0; 280 281 // If the data queue has been capped, and the size of the data queue 282 // is known, maybeCapRemaining will return the number of additional 283 // bytes the data queue can receive before reaching the cap limit. 284 // If the size of the queue cannot be known, or the cap has not 285 // been set, maybeCapRemaining() will return std::nullopt. 286 virtual std::optional<uint64_t> maybeCapRemaining() const = 0; 287 288 static void Initialize(Environment* env, v8::Local<v8::Object> target); 289 static void RegisterExternalReferences(ExternalReferenceRegistry* registry); 290}; 291 292} // namespace node 293 294#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 295