1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#ifndef PKG_STREAM_H
16#define PKG_STREAM_H
17#ifndef __WIN32
18#include <sys/mman.h>
19#endif
20#include <atomic>
21#include "pkg_manager.h"
22#include "pkg_utils.h"
23#include "ring_buffer/ring_buffer.h"
24
25namespace Hpackage {
26class PkgStreamImpl : public PkgStream {
27public:
28    explicit PkgStreamImpl(PkgManager::PkgManagerPtr pkgManager, const std::string fileName)
29        : fileName_(fileName), refCount_(0), pkgManager_(pkgManager) {}
30
31    virtual ~PkgStreamImpl() {}
32
33    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override
34    {
35        UNUSED(data);
36        UNUSED(start);
37        UNUSED(readLen);
38        UNUSED(needRead);
39        return PKG_SUCCESS;
40    }
41
42    int32_t GetBuffer(PkgBuffer &buffer) const override
43    {
44        buffer.length = 0;
45        buffer.buffer = nullptr;
46        return PKG_SUCCESS;
47    }
48
49    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override
50    {
51        UNUSED(data);
52        UNUSED(size);
53        UNUSED(start);
54        return PKG_SUCCESS;
55    }
56
57    virtual int32_t Seek(long int offset, int whence) = 0;
58
59    int32_t Flush(size_t size) override
60    {
61        UNUSED(size);
62        return PKG_SUCCESS;
63    }
64
65    const std::string GetFileName() const override;
66
67    int32_t GetStreamType() const override
68    {
69        return PkgStreamType_Read;
70    };
71
72    void AddRef() override;
73
74    void DelRef() override;
75
76    bool IsRef() const override;
77
78    static PkgStreamPtr ConvertPkgStream(PkgManager::StreamPtr stream);
79
80protected:
81    void PostDecodeProgress(int type, size_t writeDataLen, const void *context) const;
82    std::string fileName_;
83
84private:
85    std::atomic_int refCount_;
86    PkgManager::PkgManagerPtr pkgManager_ = nullptr;
87};
88
89class FileStream : public PkgStreamImpl {
90public:
91    FileStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, FILE *stream, int32_t streamType)
92        : PkgStreamImpl(pkgManager, fileName), stream_(stream), fileLength_(0), streamType_(streamType) {}
93
94    ~FileStream() override;
95
96    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;
97
98    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override;
99
100    int32_t Seek(long int offset, int whence) override;
101
102    int32_t Flush(size_t size) override;
103
104    size_t GetFileLength() override;
105
106    int32_t GetStreamType() const override
107    {
108        return streamType_;
109    }
110
111private:
112    FILE *stream_;
113    size_t fileLength_;
114    int32_t streamType_;
115};
116
117class MemoryMapStream : public PkgStreamImpl {
118public:
119    MemoryMapStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, const PkgBuffer &buffer,
120        int32_t streamType = PkgStreamType_MemoryMap) : PkgStreamImpl(pkgManager, fileName), memMap_(buffer.buffer),
121        memSize_(buffer.length), currOffset_(0), streamType_(streamType) {}
122    ~MemoryMapStream() override;
123
124    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;
125
126    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override;
127
128    int32_t Seek(long int offset, int whence) override;
129
130    int32_t GetStreamType() const override
131    {
132        return streamType_;
133    }
134
135    size_t GetFileLength() override
136    {
137        return memSize_;
138    }
139
140    int32_t Flush(size_t size) override
141    {
142        if (size != memSize_) {
143            PKG_LOGE("Flush size %zu local size:%zu", size, memSize_);
144        }
145        if (streamType_ == PkgStreamType_MemoryMap) {
146            msync(static_cast<void *>(memMap_), memSize_, MS_ASYNC);
147        }
148        currOffset_ = size;
149        return PKG_SUCCESS;
150    }
151
152    int32_t GetBuffer(PkgBuffer &buffer) const override
153    {
154        buffer.buffer = memMap_;
155        buffer.length = memSize_;
156        return PKG_SUCCESS;
157    }
158
159private:
160    uint8_t *memMap_;
161    size_t memSize_;
162    size_t currOffset_;
163    int32_t streamType_;
164};
165
166class ProcessorStream : public PkgStreamImpl {
167public:
168    ProcessorStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName,
169        ExtractFileProcessor processor, const void *context)
170        : PkgStreamImpl(pkgManager, fileName), processor_(processor), context_(context) {}
171
172    ~ProcessorStream() override {}
173
174    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override
175    {
176        UNUSED(data);
177        UNUSED(start);
178        UNUSED(readLen);
179        UNUSED(needRead);
180        return PKG_INVALID_STREAM;
181    }
182
183    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override
184    {
185        if (processor_ == nullptr) {
186            PKG_LOGE("processor not exist");
187            return PKG_INVALID_STREAM;
188        }
189        int ret = processor_(data, size, start, false, context_);
190        PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr);
191        return ret;
192    }
193
194    int32_t Seek(long int size, int whence) override
195    {
196        UNUSED(size);
197        UNUSED(whence);
198        return PKG_SUCCESS;
199    }
200
201    int32_t GetStreamType() const override
202    {
203        return PkgStreamType_Process;
204    }
205
206    size_t GetFileLength() override
207    {
208        return 0;
209    }
210
211    int32_t Flush(size_t size) override
212    {
213        UNUSED(size);
214        if (processor_ == nullptr) {
215            PKG_LOGE("processor not exist");
216            return PKG_INVALID_STREAM;
217        }
218        PkgBuffer data = {};
219        return processor_(data, 0, 0, true, context_);
220    }
221
222private:
223    ExtractFileProcessor processor_ = nullptr;
224    const void *context_;
225};
226
227constexpr uint32_t MAX_FLOW_BUFFER_SIZE = 4 * 1024 * 1024;
228
229class FlowDataStream : public Hpackage::PkgStreamImpl {
230public:
231    FlowDataStream(Hpackage::PkgManager::PkgManagerPtr pkgManager, const std::string fileName,
232        const size_t fileSize, Updater::RingBuffer *buffer, int32_t streamType =
233        PkgStreamType_FlowData) : PkgStreamImpl(pkgManager, fileName), fileLength_(fileSize),
234        ringBuf_(buffer), streamType_(streamType) {}
235    ~FlowDataStream() override {}
236
237    int32_t Read(Hpackage::PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;
238
239    int32_t Write(const Hpackage::PkgBuffer &data, size_t size, size_t start) override;
240
241    int32_t Seek(long int offset, int whence) override
242    {
243        UNUSED(offset);
244        UNUSED(whence);
245        return Hpackage::PKG_INVALID_STREAM;
246    }
247
248    int32_t GetStreamType() const override
249    {
250        return streamType_;
251    }
252
253    size_t GetFileLength() override
254    {
255        return fileLength_;
256    }
257
258    int32_t Flush(size_t size) override
259    {
260        UNUSED(size);
261        return Hpackage::PKG_INVALID_STREAM;
262    }
263
264    size_t GetReadOffset() const override
265    {
266        return readOffset_;
267    }
268
269    void Stop() override;
270
271private:
272    int32_t ReadFromRingBuf(uint8_t *&buff, const uint32_t needLen, uint32_t &readLen);
273
274    size_t fileLength_ {};
275    Updater::RingBuffer *ringBuf_ {};
276    int32_t streamType_;
277    uint8_t buff_[MAX_FLOW_BUFFER_SIZE] = {0};
278    uint32_t avail_ {};
279    uint32_t bufOffset_ {};
280    size_t readOffset_ {};
281    size_t writeOffset_ {};
282};
283} // namespace Hpackage
284#endif // PKG_STREAM_H
285