#pragma once

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include <base_object.h>
#include <memory_tracker.h>
#include <node.h>
#include <node_bob.h>
#include <node_file.h>
#include <stream_base.h>
#include <uv.h>
#include <v8.h>

#include <cstdint>
#include <memory>
#include <optional>
#include <vector>

namespace node {

// Represents a sequenced collection of data sources that can be
// consumed as a single logical stream of data. Sources can be
// memory-resident or streaming.
//
// There are two essential kinds of DataQueue:
//
// * Idempotent - Multiple reads always produce the same result.
//                This is even the case if individual sources
//                are not memory-resident. Reads never change
//                the state of the DataQueue. Every entry in
//                an idempotent DataQueue must also be idempotent.
//
// * Non-idempotent - Reads are destructive of the internal state.
//                    A non-idempotent DataQueue can be read at
//                    most once and only by a single reader.
//                    Entries in a non-idempotent DataQueue can
//                    be a mix of idempotent and non-idempotent
//                    entries.
//
// The DataQueue is essentially a collection of DataQueue::Entry
// instances. A DataQueue::Entry is a single logical source of
// data. The data may be memory-resident or streaming. The entry
// can be idempotent or non-idempotent. An entry cannot be read
// by itself; it must be part of a DataQueue to be consumed.
//
// Example of creating an idempotent DataQueue:
//
//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
//
//   std::vector<std::unique_ptr<DataQueue::Entry>> list;
//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store1, 0, len1));
//   list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store2, 0, len2));
//
//   std::shared_ptr<DataQueue> data_queue =
//       DataQueue::CreateIdempotent(std::move(list));
//
// Importantly, idempotent DataQueues are immutable and all entries
// must be provided when the DataQueue is constructed. Every entry
// must be idempotent with a known size. The entries may be memory
// resident or streaming. Streaming entries must be capable of
// being read multiple times.
//
// Because idempotent DataQueues will always produce the same results
// when read, they can be sliced. Slices yield a new DataQueue instance
// that is a subset view over the original:
//
//   std::shared_ptr<DataQueue> slice = data_queue->slice(5, 10);
//
// Example of creating a non-idempotent DataQueue:
//
//   std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
//   std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
//
//   std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
//
//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store1, 0, len1));
//
//   data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
//       store2, 0, len2));
//
// These data queues can have new entries appended to them. Entries can
// be memory-resident or streaming. Streaming entries might not have
// a known size. Entries may not be capable of being read multiple
// times.
//
// A non-idempotent data queue will, by default, allow any number of
// entries to be appended to it. To limit the size of the DataQueue,
// or to close the DataQueue (preventing new entries from being
// appended), use the cap() method. The DataQueue can be capped
// at a specific size or at whatever size it currently is.
//
// It might not be possible for a non-idempotent DataQueue to provide
// a size because it might not know how much data a streaming entry
// will ultimately provide.
//
// Non-idempotent DataQueues cannot be sliced.
//
// To read from a DataQueue, we use the node::bob::Source API
// (see src/node_bob.h).
//
//   std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
//
//   reader->Pull(
//       [](int status, const DataQueue::Vec* vecs,
//          uint64_t count, Done done) {
//         // status is one of node::bob::Status
//         // vecs is zero or more data buffers containing the read data
//         // count is the number of vecs
//         // done is a callback to be invoked when done processing the data
//       }, options, nullptr, 0, 16);
//
// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
//
// For idempotent DataQueues, any number of readers can be created and
// pull concurrently from the same DataQueue. The DataQueue can be read
// multiple times. Successful reads should always produce the same result.
// If, for whatever reason, the implementation cannot ensure that the
// data read will remain the same, the read must fail with an error status.
//
// For non-idempotent DataQueues, only a single reader is ever allowed for
// the DataQueue, and the data can only ever be read once.

class DataQueue : public MemoryRetainer {
 public:
  // A single data buffer handed to a reader's callback: base points at
  // the start of the data and len is its length (presumably in bytes,
  // given the uint8_t element type).
  struct Vec {
    uint8_t* base;
    uint64_t len;
  };

  // A DataQueue::Reader consumes the DataQueue. If the data queue is
  // idempotent, multiple Readers can be attached to the DataQueue at
  // any given time, all guaranteed to yield the same result when the
  // data is read. Otherwise, only a single Reader can be attached.
  class Reader : public MemoryRetainer, public bob::Source<Vec> {
   public:
    // Convenience aliases for the bob callback types specialized on Vec.
    using Next = bob::Next<Vec>;
    using Done = bob::Done;
  };

  // A DataQueue::Entry represents a logical chunk of data in the queue.
  // The entry may or may not represent memory-resident data. It may
  // or may not be consumable more than once.
  class Entry : public MemoryRetainer {
   public:
    // Returns a new Entry that is a view over this entry's data
    // from the start offset to the ending offset. If the end
    // offset is omitted, the slice extends to the end of the
    // data.
    //
    // Creating a slice is only possible if is_idempotent() returns
    // true. This is because consuming either the original entry or
    // the new entry would change the state of the other in non-
    // deterministic ways. When is_idempotent() returns false, slice()
    // must return a null unique_ptr.
    //
    // Creating a slice is also only possible if the size of the
    // entry is known. If size() returns std::nullopt, slice()
    // must return a null unique_ptr.
    virtual std::unique_ptr<Entry> slice(
        uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;

    // Returns the number of bytes represented by this Entry if it is
    // known. Certain types of entries, such as those backed by streams,
    // might not know the size in advance and therefore cannot provide
    // a value. In such cases, size() must return std::nullopt.
    //
    // If the entry is idempotent, a size should always be available.
    virtual std::optional<uint64_t> size() const = 0;

    // When true, multiple reads on the object must produce the exact
    // same data or the reads will fail. Some sources of entry data,
    // such as streams, may not be capable of preserving idempotency
    // and therefore must not claim to be. If an entry claims to be
    // idempotent and cannot preserve that quality, subsequent reads
    // must fail with an error when a variance is detected.
    virtual bool is_idempotent() const = 0;
  };

  // Creates an idempotent DataQueue with a pre-established collection
  // of entries. All of the entries must also be idempotent, otherwise
  // an empty std::shared_ptr will be returned.
  static std::shared_ptr<DataQueue> CreateIdempotent(
      std::vector<std::unique_ptr<Entry>> list);

  // Creates a non-idempotent DataQueue. This kind of queue can be
  // mutated and updated such that multiple reads are not guaranteed
  // to produce the same result. The entries added can be of any type.
  //
  // NOTE(review): `capped` presumably establishes an initial size limit
  // equivalent to calling cap() -- confirm against the implementation.
  static std::shared_ptr<DataQueue> Create(
      std::optional<uint64_t> capped = std::nullopt);

  // Creates an idempotent Entry from a v8::ArrayBufferView. To help
  // ensure idempotency, the underlying ArrayBuffer is detached from
  // the BackingStore. It is the caller's responsibility to ensure that
  // the BackingStore is not otherwise modified through any other
  // means. If the ArrayBuffer is not detachable, nullptr will be
  // returned.
  static std::unique_ptr<Entry> CreateInMemoryEntryFromView(
      v8::Local<v8::ArrayBufferView> view);

  // Creates an idempotent Entry from a v8::BackingStore. It is the
  // caller's responsibility to ensure that the BackingStore is not
  // otherwise modified through any other means. As in the examples at
  // the top of this file, offset and length select the portion of the
  // store represented by the entry.
  //
  // NOTE(review): this comment previously stated "If the ArrayBuffer is
  // not detachable, nullptr will be returned", but this overload takes
  // no ArrayBuffer. Confirm under which conditions (if any) nullptr is
  // returned.
  static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
      std::shared_ptr<v8::BackingStore> store,
      uint64_t offset,
      uint64_t length);

  // Creates an Entry from an existing DataQueue.
  // NOTE(review): presumably this lets one queue be nested as a single
  // entry of another; idempotency and size constraints are not visible
  // from this header -- confirm against the implementation.
  static std::unique_ptr<Entry> CreateDataQueueEntry(
      std::shared_ptr<DataQueue> data_queue);

  // Creates an Entry from a path value.
  // NOTE(review): presumably the entry reads from the file at `path`;
  // accepted value types and failure behavior are not visible from this
  // header -- confirm against the implementation.
  static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
                                              v8::Local<v8::Value> path);

  // Creates a Reader for the given queue. If the queue is idempotent,
  // any number of readers can be created, all of which are guaranteed
  // to provide the same data. Otherwise, only a single reader is
  // permitted.
  virtual std::shared_ptr<Reader> get_reader() = 0;

  // Append a single new entry to the queue. Appending is only allowed
  // when is_idempotent() is false. std::nullopt will be returned
  // if is_idempotent() is true. std::optional(false) will be returned if the
  // data queue is not idempotent but the entry otherwise cannot be added.
  virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;

  // Caps the size of this DataQueue, preventing additional entries from
  // being added if those would cause the size to extend beyond the
  // specified limit.
  //
  // If limit is zero, or is less than the known current size of the
  // data queue, the limit is set to the current known size, meaning
  // that no additional entries can be added at all.
  //
  // If the size of the data queue is not known, the limit will be
  // ignored and no additional entries will be allowed at all.
  //
  // If is_idempotent() is true, capping is unnecessary because the data
  // queue cannot be appended to. In that case, cap() is a no-op.
  //
  // If the data queue has already been capped, cap can be called
  // again with a smaller size.
  virtual void cap(uint64_t limit = 0) = 0;

  // Returns a new DataQueue that is a view over this queue's data
  // from the start offset to the ending offset. If the end offset
  // is omitted, the slice extends to the end of the data.
  //
  // The slice will cover a range from start up to, but excluding, end.
  //
  // Creating a slice is only possible if is_idempotent() returns
  // true. This is because consuming either the original DataQueue or
  // the new queue would change the state of the other in non-
  // deterministic ways. When is_idempotent() returns false, slice()
  // must return a null shared_ptr.
  //
  // Creating a slice is also only possible if the size of the
  // DataQueue is known. If size() returns std::nullopt, slice()
  // must return a null shared_ptr.
  virtual std::shared_ptr<DataQueue> slice(
      uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;

  // The size of DataQueue is the total size of all of its member entries.
  // If any of the entries is not able to specify a size, the DataQueue
  // will also be incapable of doing so, in which case size() must return
  // std::nullopt.
  virtual std::optional<uint64_t> size() const = 0;

  // A DataQueue is idempotent only if all of its member entries are
  // idempotent.
  virtual bool is_idempotent() const = 0;

  // True only if cap is called or the data queue is limited to a
  // fixed size.
  virtual bool is_capped() const = 0;

  // If the data queue has been capped, and the size of the data queue
  // is known, maybeCapRemaining will return the number of additional
  // bytes the data queue can receive before reaching the cap limit.
  // If the size of the queue cannot be known, or the cap has not
  // been set, maybeCapRemaining() will return std::nullopt.
  virtual std::optional<uint64_t> maybeCapRemaining() const = 0;

  // Binding setup hooks, defined in the implementation file.
  // NOTE(review): presumably Initialize() installs the JS-facing
  // bindings on `target` and RegisterExternalReferences() registers the
  // corresponding native callbacks with `registry` -- confirm.
  static void Initialize(Environment* env, v8::Local<v8::Object> target);
  static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};

}  // namespace node

#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS