1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include <iostream>
16#include <cstring>
17#include <string>
18#include <thread>
19
20#include "log.h"
21#include "securec.h"
22#include "ring_buffer.h"
23
24namespace Updater {
25RingBuffer::~RingBuffer()
26{
27    Release();
28}
29
30bool RingBuffer::Init(uint32_t singleSize, uint32_t num)
31{
32    if (singleSize == 0 || num == 0 || (num & (num - 1)) != 0) { // power of 2
33        LOG(ERROR) << "singleSize:" <<  singleSize << " num:" << num << " error";
34        return false;
35    }
36    bufArray_ = new (std::nothrow) uint8_t* [num] {};
37    lenArray_ = new (std::nothrow) uint32_t [num] {};
38    if (bufArray_ == nullptr || lenArray_ == nullptr) {
39        LOG(ERROR) << "new buf or len " << num << " error";
40        return false;
41    }
42    for (uint32_t i = 0; i < num; i++) {
43        bufArray_[i] = new (std::nothrow) uint8_t [singleSize] {};
44        if (bufArray_[i] == nullptr) {
45            LOG(ERROR) << "new buf " << i << " size " << singleSize << " error";
46            return false;
47        }
48    }
49
50    writeIndex_ = 0;
51    readIndex_ = 0;
52    num_ = num;
53    singleSize_ = singleSize;
54    return true;
55}
56
57void RingBuffer::Reset()
58{
59    isStop_ = false;
60    writeIndex_ = 0;
61    readIndex_ = 0;
62    for (uint32_t i = 0; i < num_; ++i) {
63        lenArray_[i] = 0;
64    }
65}
66
67bool RingBuffer::IsFull()
68{
69    // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1
70    // when writeIndex_ - readIndex_ == n means full
71    return writeIndex_ == (readIndex_ ^ num_);
72}
73
74bool RingBuffer::IsEmpty()
75{
76    // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1
77    // when same means empty
78    return writeIndex_ == readIndex_;
79}
80
81bool RingBuffer::Push(uint8_t *buf, uint32_t len)
82{
83    if (buf == nullptr || len == 0 || len > singleSize_) {
84        LOG(ERROR) << "RingBuffer push error, len:" << len << " singleSize:" << singleSize_;
85        return false;
86    }
87    if (IsFull()) {
88        std::unique_lock<std::mutex> pushLock(notifyMtx_);
89        while (IsFull()) {
90            if (isStop_) {
91                LOG(WARNING) << "RingBuffer push stopped";
92                return false;
93            }
94            LOG(DEBUG) << "RingBuffer full, wait !!!";
95            notFull_.wait(pushLock);
96        }
97    }
98
99    uint32_t index = writeIndex_ & (num_ - 1);
100    if (memcpy_s(bufArray_[index], singleSize_, buf, len) != EOK) {
101        LOG(ERROR) << "memcpy error, len:" << len;
102        return false;
103    }
104    lenArray_[index] = len;
105    writeIndex_ = (writeIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size
106
107    std::unique_lock<std::mutex> popLock(notifyMtx_);
108    notEmpty_.notify_all();
109    return true;
110}
111
112bool RingBuffer::Pop(uint8_t *buf, uint32_t maxLen, uint32_t &len)
113{
114    if (buf == nullptr) {
115        LOG(ERROR) << "RingBuffer pop para error";
116        return false;
117    }
118    if (IsEmpty()) {
119        std::unique_lock<std::mutex> popLock(notifyMtx_);
120        while (IsEmpty()) {
121            if (isStop_) {
122                LOG(WARNING) << "RingBuffer pop stopped";
123                return false;
124            }
125            LOG(DEBUG) << "RingBuffer empty, wait !!!";
126            notEmpty_.wait(popLock);
127        }
128    }
129
130    uint32_t index = readIndex_ & (num_ - 1);
131    if (memcpy_s(buf, maxLen, bufArray_[index], lenArray_[index]) != EOK) {
132        LOG(ERROR) << "memcpy error, len:" << lenArray_[index];
133        return false;
134    }
135    len = lenArray_[index];
136    readIndex_ = (readIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size
137
138    std::unique_lock<std::mutex> popLock(notifyMtx_);
139    notFull_.notify_all();
140    return true;
141}
142
143void RingBuffer::Stop()
144{
145    isStop_ = true;
146    notFull_.notify_all();
147    notEmpty_.notify_all();
148}
149
150void RingBuffer::StopPush()
151{
152    {
153        std::unique_lock<std::mutex> pushLock(notifyMtx_);
154        isStop_ = true;
155    }
156    notFull_.notify_all();
157}
158
159void RingBuffer::StopPop()
160{
161    {
162        std::unique_lock<std::mutex> popLock(notifyMtx_);
163        isStop_ = true;
164    }
165    notEmpty_.notify_all();
166}
167
168void RingBuffer::Release()
169{
170    if (lenArray_ != nullptr) {
171        delete[] lenArray_;
172        lenArray_ = nullptr;
173    }
174
175    if (bufArray_ != nullptr) {
176        for (uint32_t i = 0; i < num_ && bufArray_[i] != nullptr; i++) {
177            delete[] bufArray_[i];
178            bufArray_[i] = nullptr;
179        }
180        delete[] bufArray_;
181        bufArray_ = nullptr;
182    }
183}
184} // namespace Updater
185