1fb299fa2Sopenharmony_ci/*
2fb299fa2Sopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd.
3fb299fa2Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4fb299fa2Sopenharmony_ci * you may not use this file except in compliance with the License.
5fb299fa2Sopenharmony_ci * You may obtain a copy of the License at
6fb299fa2Sopenharmony_ci *
7fb299fa2Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8fb299fa2Sopenharmony_ci *
9fb299fa2Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10fb299fa2Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11fb299fa2Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12fb299fa2Sopenharmony_ci * See the License for the specific language governing permissions and
13fb299fa2Sopenharmony_ci * limitations under the License.
14fb299fa2Sopenharmony_ci */
15fb299fa2Sopenharmony_ci#include "pkg_stream.h"
16fb299fa2Sopenharmony_ci#include <fcntl.h>
17fb299fa2Sopenharmony_ci#include <sys/types.h>
18fb299fa2Sopenharmony_ci#include <sys/stat.h>
19fb299fa2Sopenharmony_ci#include <unistd.h>
20fb299fa2Sopenharmony_ci#include <cstdio>
21fb299fa2Sopenharmony_ci#include "dump.h"
22fb299fa2Sopenharmony_ci#include "pkg_manager.h"
23fb299fa2Sopenharmony_ci#include "pkg_utils.h"
24fb299fa2Sopenharmony_ci#include "securec.h"
25fb299fa2Sopenharmony_ci
// Apple builds map the *64 names used below onto their off_t-based counterparts.
// fseeko (not fseek) is used so that the seek offset has the same off_t type as
// the value produced by ftello.
#ifdef __APPLE__
#define off64_t off_t
#define fopen64 fopen
#define ftello64 ftello
#define fseeko64 fseeko
#endif
32fb299fa2Sopenharmony_ci
33fb299fa2Sopenharmony_cinamespace Hpackage {
34fb299fa2Sopenharmony_ciconst std::string PkgStreamImpl::GetFileName() const
35fb299fa2Sopenharmony_ci{
36fb299fa2Sopenharmony_ci    return fileName_;
37fb299fa2Sopenharmony_ci}
38fb299fa2Sopenharmony_ci
39fb299fa2Sopenharmony_ciPkgStreamPtr PkgStreamImpl::ConvertPkgStream(PkgManager::StreamPtr stream)
40fb299fa2Sopenharmony_ci{
41fb299fa2Sopenharmony_ci    return (PkgStreamPtr)stream;
42fb299fa2Sopenharmony_ci}
43fb299fa2Sopenharmony_ci
44fb299fa2Sopenharmony_civoid PkgStreamImpl::AddRef()
45fb299fa2Sopenharmony_ci{
46fb299fa2Sopenharmony_ci    refCount_++;
47fb299fa2Sopenharmony_ci}
48fb299fa2Sopenharmony_ci
49fb299fa2Sopenharmony_civoid PkgStreamImpl::DelRef()
50fb299fa2Sopenharmony_ci{
51fb299fa2Sopenharmony_ci    refCount_--;
52fb299fa2Sopenharmony_ci}
53fb299fa2Sopenharmony_ci
54fb299fa2Sopenharmony_cibool PkgStreamImpl::IsRef() const
55fb299fa2Sopenharmony_ci{
56fb299fa2Sopenharmony_ci    return refCount_ == 0;
57fb299fa2Sopenharmony_ci}
58fb299fa2Sopenharmony_ci
59fb299fa2Sopenharmony_civoid PkgStreamImpl::PostDecodeProgress(int type, size_t writeDataLen, const void *context) const
60fb299fa2Sopenharmony_ci{
61fb299fa2Sopenharmony_ci    if (pkgManager_ != nullptr) {
62fb299fa2Sopenharmony_ci        pkgManager_->PostDecodeProgress(type, writeDataLen, context);
63fb299fa2Sopenharmony_ci    }
64fb299fa2Sopenharmony_ci}
65fb299fa2Sopenharmony_ci
66fb299fa2Sopenharmony_ciFileStream::~FileStream()
67fb299fa2Sopenharmony_ci{
68fb299fa2Sopenharmony_ci    if (stream_ != nullptr) {
69fb299fa2Sopenharmony_ci        fflush(stream_);
70fb299fa2Sopenharmony_ci        fclose(stream_);
71fb299fa2Sopenharmony_ci        stream_ = nullptr;
72fb299fa2Sopenharmony_ci    }
73fb299fa2Sopenharmony_ci}
74fb299fa2Sopenharmony_ci
75fb299fa2Sopenharmony_ciint32_t FileStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
76fb299fa2Sopenharmony_ci{
77fb299fa2Sopenharmony_ci    Updater::UPDATER_INIT_RECORD;
78fb299fa2Sopenharmony_ci    if (stream_ == nullptr) {
79fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream");
80fb299fa2Sopenharmony_ci        UPDATER_LAST_WORD(PKG_INVALID_STREAM);
81fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
82fb299fa2Sopenharmony_ci    }
83fb299fa2Sopenharmony_ci    if (data.length < needRead) {
84fb299fa2Sopenharmony_ci        PKG_LOGE("insufficient buffer capacity");
85fb299fa2Sopenharmony_ci        UPDATER_LAST_WORD(PKG_INVALID_STREAM);
86fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
87fb299fa2Sopenharmony_ci    }
88fb299fa2Sopenharmony_ci    readLen = 0;
89fb299fa2Sopenharmony_ci    if (fseeko64(stream_, start, SEEK_SET) != 0) {
90fb299fa2Sopenharmony_ci        PKG_LOGE("read data fail");
91fb299fa2Sopenharmony_ci        UPDATER_LAST_WORD(PKG_INVALID_STREAM);
92fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
93fb299fa2Sopenharmony_ci    }
94fb299fa2Sopenharmony_ci    if (start > GetFileLength()) {
95fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid start");
96fb299fa2Sopenharmony_ci        UPDATER_LAST_WORD(PKG_INVALID_STREAM);
97fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
98fb299fa2Sopenharmony_ci    }
99fb299fa2Sopenharmony_ci    if (data.buffer == nullptr) {
100fb299fa2Sopenharmony_ci        data.data.resize(data.length);
101fb299fa2Sopenharmony_ci        data.buffer = data.data.data();
102fb299fa2Sopenharmony_ci    }
103fb299fa2Sopenharmony_ci    readLen = fread(data.buffer, 1, needRead, stream_);
104fb299fa2Sopenharmony_ci    if (readLen == 0) {
105fb299fa2Sopenharmony_ci        PKG_LOGE("read data fail");
106fb299fa2Sopenharmony_ci        UPDATER_LAST_WORD(PKG_INVALID_STREAM);
107fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
108fb299fa2Sopenharmony_ci    }
109fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
110fb299fa2Sopenharmony_ci}
111fb299fa2Sopenharmony_ci
112fb299fa2Sopenharmony_ciint32_t FileStream::Write(const PkgBuffer &data, size_t size, size_t start)
113fb299fa2Sopenharmony_ci{
114fb299fa2Sopenharmony_ci    if (streamType_ != PkgStreamType_Write) {
115fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream type");
116fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
117fb299fa2Sopenharmony_ci    }
118fb299fa2Sopenharmony_ci    if (stream_ == nullptr) {
119fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream");
120fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
121fb299fa2Sopenharmony_ci    }
122fb299fa2Sopenharmony_ci    if (fseeko64(stream_, start, SEEK_SET) != 0) {
123fb299fa2Sopenharmony_ci        PKG_LOGE("write data fail");
124fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
125fb299fa2Sopenharmony_ci    }
126fb299fa2Sopenharmony_ci    size_t len = fwrite(data.buffer, size, 1, stream_);
127fb299fa2Sopenharmony_ci    if (len != 1) {
128fb299fa2Sopenharmony_ci        PKG_LOGE("Write buffer fail");
129fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
130fb299fa2Sopenharmony_ci    }
131fb299fa2Sopenharmony_ci    PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr);
132fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
133fb299fa2Sopenharmony_ci}
134fb299fa2Sopenharmony_ci
135fb299fa2Sopenharmony_cisize_t FileStream::GetFileLength()
136fb299fa2Sopenharmony_ci{
137fb299fa2Sopenharmony_ci    if (stream_ == nullptr) {
138fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream");
139fb299fa2Sopenharmony_ci        return 0;
140fb299fa2Sopenharmony_ci    }
141fb299fa2Sopenharmony_ci    if (fileLength_ == 0) {
142fb299fa2Sopenharmony_ci        if (Seek(0, SEEK_END) != 0) {
143fb299fa2Sopenharmony_ci            PKG_LOGE("Invalid stream");
144fb299fa2Sopenharmony_ci            return 0;
145fb299fa2Sopenharmony_ci        }
146fb299fa2Sopenharmony_ci        off64_t ret = ftello64(stream_);
147fb299fa2Sopenharmony_ci        if (ret < 0) {
148fb299fa2Sopenharmony_ci            PKG_LOGE("ftell64 failed");
149fb299fa2Sopenharmony_ci            return 0;
150fb299fa2Sopenharmony_ci        }
151fb299fa2Sopenharmony_ci        fileLength_ = static_cast<size_t>(ret);
152fb299fa2Sopenharmony_ci        if (fseek(stream_, 0, SEEK_SET) != 0) {
153fb299fa2Sopenharmony_ci            PKG_LOGE("fseek failed");
154fb299fa2Sopenharmony_ci            return 0;
155fb299fa2Sopenharmony_ci        }
156fb299fa2Sopenharmony_ci    }
157fb299fa2Sopenharmony_ci    return fileLength_;
158fb299fa2Sopenharmony_ci}
159fb299fa2Sopenharmony_ci
160fb299fa2Sopenharmony_ciint32_t FileStream::Seek(long int offset, int whence)
161fb299fa2Sopenharmony_ci{
162fb299fa2Sopenharmony_ci    if (stream_ == nullptr) {
163fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream");
164fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
165fb299fa2Sopenharmony_ci    }
166fb299fa2Sopenharmony_ci    return fseek(stream_, offset, whence);
167fb299fa2Sopenharmony_ci}
168fb299fa2Sopenharmony_ci
169fb299fa2Sopenharmony_ciint32_t FileStream::Flush(size_t size)
170fb299fa2Sopenharmony_ci{
171fb299fa2Sopenharmony_ci    if (stream_ == nullptr) {
172fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream");
173fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
174fb299fa2Sopenharmony_ci    }
175fb299fa2Sopenharmony_ci    if (fileLength_ == 0) {
176fb299fa2Sopenharmony_ci        fileLength_ = size;
177fb299fa2Sopenharmony_ci    }
178fb299fa2Sopenharmony_ci    if (fseek(stream_, 0, SEEK_END) != 0) {
179fb299fa2Sopenharmony_ci        PKG_LOGE("fseek failed");
180fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
181fb299fa2Sopenharmony_ci    }
182fb299fa2Sopenharmony_ci    off64_t ret = ftello64(stream_);
183fb299fa2Sopenharmony_ci    if (ret < 0) {
184fb299fa2Sopenharmony_ci        PKG_LOGE("ftell64 failed");
185fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
186fb299fa2Sopenharmony_ci    }
187fb299fa2Sopenharmony_ci    fileLength_ = static_cast<size_t>(ret);
188fb299fa2Sopenharmony_ci    if (size != fileLength_) {
189fb299fa2Sopenharmony_ci        PKG_LOGE("Flush size %zu local size:%zu", size, fileLength_);
190fb299fa2Sopenharmony_ci    }
191fb299fa2Sopenharmony_ci    if (fflush(stream_) != 0) {
192fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid stream");
193fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
194fb299fa2Sopenharmony_ci    }
195fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
196fb299fa2Sopenharmony_ci}
197fb299fa2Sopenharmony_ci
198fb299fa2Sopenharmony_ciMemoryMapStream::~MemoryMapStream()
199fb299fa2Sopenharmony_ci{
200fb299fa2Sopenharmony_ci    if (memMap_ == nullptr) {
201fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid memory map");
202fb299fa2Sopenharmony_ci        return;
203fb299fa2Sopenharmony_ci    }
204fb299fa2Sopenharmony_ci    if (streamType_ == PkgStreamType_MemoryMap) {
205fb299fa2Sopenharmony_ci        ReleaseMemory(memMap_, memSize_);
206fb299fa2Sopenharmony_ci    }
207fb299fa2Sopenharmony_ci}
208fb299fa2Sopenharmony_ci
209fb299fa2Sopenharmony_ciint32_t MemoryMapStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
210fb299fa2Sopenharmony_ci{
211fb299fa2Sopenharmony_ci    if (memMap_ == nullptr) {
212fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid memory map");
213fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
214fb299fa2Sopenharmony_ci    }
215fb299fa2Sopenharmony_ci    if (start > memSize_) {
216fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid start");
217fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
218fb299fa2Sopenharmony_ci    }
219fb299fa2Sopenharmony_ci    if (data.length < needRead) {
220fb299fa2Sopenharmony_ci        PKG_LOGE("insufficient buffer capacity");
221fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
222fb299fa2Sopenharmony_ci    }
223fb299fa2Sopenharmony_ci    size_t copyLen = GetFileLength() - start;
224fb299fa2Sopenharmony_ci    readLen = ((copyLen > needRead) ? needRead : copyLen);
225fb299fa2Sopenharmony_ci    if (data.data.size() == 0) {
226fb299fa2Sopenharmony_ci        data.buffer = memMap_ + start;
227fb299fa2Sopenharmony_ci    } else {
228fb299fa2Sopenharmony_ci        if (memcpy_s(data.buffer, needRead, memMap_ + start, readLen) != EOK) {
229fb299fa2Sopenharmony_ci            PKG_LOGE("Memcpy failed size:%zu, start:%zu copyLen:%zu %zu", needRead, start, copyLen, readLen);
230fb299fa2Sopenharmony_ci            return PKG_NONE_MEMORY;
231fb299fa2Sopenharmony_ci        }
232fb299fa2Sopenharmony_ci    }
233fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
234fb299fa2Sopenharmony_ci}
235fb299fa2Sopenharmony_ci
236fb299fa2Sopenharmony_ciint32_t MemoryMapStream::Write(const PkgBuffer &data, size_t size, size_t start)
237fb299fa2Sopenharmony_ci{
238fb299fa2Sopenharmony_ci    if (memMap_ == nullptr) {
239fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid memory map");
240fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
241fb299fa2Sopenharmony_ci    }
242fb299fa2Sopenharmony_ci    if (start > memSize_) {
243fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid start");
244fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
245fb299fa2Sopenharmony_ci    }
246fb299fa2Sopenharmony_ci
247fb299fa2Sopenharmony_ci    currOffset_ = start;
248fb299fa2Sopenharmony_ci    size_t copyLen = memSize_ - start;
249fb299fa2Sopenharmony_ci    if (copyLen < size) {
250fb299fa2Sopenharmony_ci        PKG_LOGE("Write fail copyLen %zu, %zu", copyLen, size);
251fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
252fb299fa2Sopenharmony_ci    }
253fb299fa2Sopenharmony_ci    int32_t ret = memcpy_s(memMap_ + currOffset_, memSize_ - currOffset_, data.buffer, size);
254fb299fa2Sopenharmony_ci    if (ret != PKG_SUCCESS) {
255fb299fa2Sopenharmony_ci        PKG_LOGE("Write fail");
256fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
257fb299fa2Sopenharmony_ci    }
258fb299fa2Sopenharmony_ci    PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr);
259fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
260fb299fa2Sopenharmony_ci}
261fb299fa2Sopenharmony_ci
262fb299fa2Sopenharmony_ciint32_t MemoryMapStream::Seek(long int offset, int whence)
263fb299fa2Sopenharmony_ci{
264fb299fa2Sopenharmony_ci    if (whence == SEEK_SET) {
265fb299fa2Sopenharmony_ci        if (offset < 0) {
266fb299fa2Sopenharmony_ci            PKG_LOGE("Invalid offset");
267fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
268fb299fa2Sopenharmony_ci        }
269fb299fa2Sopenharmony_ci        if (static_cast<size_t>(offset) > memSize_) {
270fb299fa2Sopenharmony_ci            PKG_LOGE("Invalid offset");
271fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
272fb299fa2Sopenharmony_ci        }
273fb299fa2Sopenharmony_ci        currOffset_ = static_cast<size_t>(offset);
274fb299fa2Sopenharmony_ci    } else if (whence == SEEK_CUR) {
275fb299fa2Sopenharmony_ci        if (static_cast<size_t>(offset) > (memSize_ - currOffset_)) {
276fb299fa2Sopenharmony_ci            PKG_LOGE("Invalid offset");
277fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
278fb299fa2Sopenharmony_ci        }
279fb299fa2Sopenharmony_ci        currOffset_ += static_cast<size_t>(offset);
280fb299fa2Sopenharmony_ci    } else {
281fb299fa2Sopenharmony_ci        if (offset > 0) {
282fb299fa2Sopenharmony_ci            PKG_LOGE("Invalid offset");
283fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
284fb299fa2Sopenharmony_ci        }
285fb299fa2Sopenharmony_ci        auto memSize = static_cast<long long>(memSize_);
286fb299fa2Sopenharmony_ci        if (memSize + offset < 0) {
287fb299fa2Sopenharmony_ci            PKG_LOGE("Invalid offset");
288fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
289fb299fa2Sopenharmony_ci        }
290fb299fa2Sopenharmony_ci        currOffset_ = static_cast<size_t>(memSize + offset);
291fb299fa2Sopenharmony_ci    }
292fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
293fb299fa2Sopenharmony_ci}
294fb299fa2Sopenharmony_ci
295fb299fa2Sopenharmony_ciint32_t FlowDataStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
296fb299fa2Sopenharmony_ci{
297fb299fa2Sopenharmony_ci    if (readOffset_ != start) {
298fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid start, readOffset_: %d, start: %d", readOffset_, start);
299fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
300fb299fa2Sopenharmony_ci    }
301fb299fa2Sopenharmony_ci
302fb299fa2Sopenharmony_ci    if (data.length < needRead) {
303fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid need length");
304fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
305fb299fa2Sopenharmony_ci    }
306fb299fa2Sopenharmony_ci
307fb299fa2Sopenharmony_ci    if (data.buffer == nullptr) {
308fb299fa2Sopenharmony_ci        data.data.resize(needRead);
309fb299fa2Sopenharmony_ci        data.buffer = data.data.data();
310fb299fa2Sopenharmony_ci    }
311fb299fa2Sopenharmony_ci
312fb299fa2Sopenharmony_ci    readLen = 0;
313fb299fa2Sopenharmony_ci    uint8_t *buffer = nullptr;
314fb299fa2Sopenharmony_ci    while (needRead - readLen > 0) {
315fb299fa2Sopenharmony_ci        uint32_t readOnce = 0;
316fb299fa2Sopenharmony_ci        if (ReadFromRingBuf(buffer, needRead - readLen, readOnce) != PKG_SUCCESS) {
317fb299fa2Sopenharmony_ci            PKG_LOGE("Fail to read header");
318fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
319fb299fa2Sopenharmony_ci        }
320fb299fa2Sopenharmony_ci        if (buffer == nullptr || readOnce == 0) {
321fb299fa2Sopenharmony_ci            PKG_LOGE("Fail to read header, readOnce: %d", readOnce);
322fb299fa2Sopenharmony_ci            return PKG_INVALID_STREAM;
323fb299fa2Sopenharmony_ci        }
324fb299fa2Sopenharmony_ci        if (memcpy_s(data.buffer + readLen, readOnce, buffer, readOnce) != EOK) {
325fb299fa2Sopenharmony_ci            PKG_LOGE("Memcpy failed size:%zu, copyLen:%zu", needRead, readOnce);
326fb299fa2Sopenharmony_ci            return PKG_NONE_MEMORY;
327fb299fa2Sopenharmony_ci        }
328fb299fa2Sopenharmony_ci        readLen += readOnce;
329fb299fa2Sopenharmony_ci    }
330fb299fa2Sopenharmony_ci    readOffset_ += needRead;
331fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
332fb299fa2Sopenharmony_ci}
333fb299fa2Sopenharmony_ci
334fb299fa2Sopenharmony_ciint32_t FlowDataStream::ReadFromRingBuf(uint8_t *&buff, const uint32_t needLen, uint32_t &readLen)
335fb299fa2Sopenharmony_ci{
336fb299fa2Sopenharmony_ci    if (ringBuf_ == nullptr) {
337fb299fa2Sopenharmony_ci        PKG_LOGE("ringBuf_ is nullptr");
338fb299fa2Sopenharmony_ci        buff = nullptr;
339fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
340fb299fa2Sopenharmony_ci    }
341fb299fa2Sopenharmony_ci
342fb299fa2Sopenharmony_ci    // buf_ is empty, read from ringbuf
343fb299fa2Sopenharmony_ci    if ((avail_ == 0) && !ringBuf_->Pop(buff_, MAX_FLOW_BUFFER_SIZE, avail_)) {
344fb299fa2Sopenharmony_ci        PKG_LOGE("read data fail");
345fb299fa2Sopenharmony_ci        buff = nullptr;
346fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
347fb299fa2Sopenharmony_ci    }
348fb299fa2Sopenharmony_ci
349fb299fa2Sopenharmony_ci    buff = buff_ + bufOffset_;
350fb299fa2Sopenharmony_ci    readLen = needLen <= avail_ ? needLen : avail_;
351fb299fa2Sopenharmony_ci    avail_ -= readLen;
352fb299fa2Sopenharmony_ci    bufOffset_ = avail_ == 0 ? 0 : bufOffset_ + readLen;
353fb299fa2Sopenharmony_ci    return PKG_SUCCESS;
354fb299fa2Sopenharmony_ci}
355fb299fa2Sopenharmony_ci
356fb299fa2Sopenharmony_ciint32_t FlowDataStream::Write(const PkgBuffer &data, size_t size, size_t start)
357fb299fa2Sopenharmony_ci{
358fb299fa2Sopenharmony_ci    if (ringBuf_ == nullptr) {
359fb299fa2Sopenharmony_ci        PKG_LOGE("ringBuf_ is nullptr");
360fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
361fb299fa2Sopenharmony_ci    }
362fb299fa2Sopenharmony_ci
363fb299fa2Sopenharmony_ci    if (writeOffset_ != start) {
364fb299fa2Sopenharmony_ci        PKG_LOGE("Invalid start, writeOffset: %zu, start: %zu", writeOffset_, start);
365fb299fa2Sopenharmony_ci        return PKG_INVALID_STREAM;
366fb299fa2Sopenharmony_ci    }
367fb299fa2Sopenharmony_ci
368fb299fa2Sopenharmony_ci    if (ringBuf_->Push(data.buffer, size)) {
369fb299fa2Sopenharmony_ci        writeOffset_ += size;
370fb299fa2Sopenharmony_ci        return PKG_SUCCESS;
371fb299fa2Sopenharmony_ci    }
372fb299fa2Sopenharmony_ci    PKG_LOGE("Write ring buffer fail");
373fb299fa2Sopenharmony_ci    return PKG_INVALID_STREAM;
374fb299fa2Sopenharmony_ci}
375fb299fa2Sopenharmony_ci
376fb299fa2Sopenharmony_civoid FlowDataStream::Stop()
377fb299fa2Sopenharmony_ci{
378fb299fa2Sopenharmony_ci    PKG_LOGI("FlowDataStream stop");
379fb299fa2Sopenharmony_ci    if (ringBuf_ != nullptr) {
380fb299fa2Sopenharmony_ci        ringBuf_->Stop();
381fb299fa2Sopenharmony_ci    }
382fb299fa2Sopenharmony_ci}
383fb299fa2Sopenharmony_ci} // namespace Hpackage
384