1/* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include <iostream> 16#include <cstring> 17#include <string> 18#include <thread> 19 20#include "log.h" 21#include "securec.h" 22#include "ring_buffer.h" 23 24namespace Updater { 25RingBuffer::~RingBuffer() 26{ 27 Release(); 28} 29 30bool RingBuffer::Init(uint32_t singleSize, uint32_t num) 31{ 32 if (singleSize == 0 || num == 0 || (num & (num - 1)) != 0) { // power of 2 33 LOG(ERROR) << "singleSize:" << singleSize << " num:" << num << " error"; 34 return false; 35 } 36 bufArray_ = new (std::nothrow) uint8_t* [num] {}; 37 lenArray_ = new (std::nothrow) uint32_t [num] {}; 38 if (bufArray_ == nullptr || lenArray_ == nullptr) { 39 LOG(ERROR) << "new buf or len " << num << " error"; 40 return false; 41 } 42 for (uint32_t i = 0; i < num; i++) { 43 bufArray_[i] = new (std::nothrow) uint8_t [singleSize] {}; 44 if (bufArray_[i] == nullptr) { 45 LOG(ERROR) << "new buf " << i << " size " << singleSize << " error"; 46 return false; 47 } 48 } 49 50 writeIndex_ = 0; 51 readIndex_ = 0; 52 num_ = num; 53 singleSize_ = singleSize; 54 return true; 55} 56 57void RingBuffer::Reset() 58{ 59 isStop_ = false; 60 writeIndex_ = 0; 61 readIndex_ = 0; 62 for (uint32_t i = 0; i < num_; ++i) { 63 lenArray_[i] = 0; 64 } 65} 66 67bool RingBuffer::IsFull() 68{ 69 // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1 70 // when writeIndex_ - readIndex_ == n means full 71 return writeIndex_ == (readIndex_ ^ num_); 72} 73 74bool RingBuffer::IsEmpty() 75{ 76 // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1 77 // when same means empty 78 return writeIndex_ == readIndex_; 79} 80 81bool RingBuffer::Push(uint8_t *buf, uint32_t len) 82{ 83 if (buf == nullptr || len == 0 || len > singleSize_) { 84 LOG(ERROR) << "RingBuffer push error, len:" << len << " singleSize:" << singleSize_; 85 return false; 86 } 87 if (IsFull()) { 88 std::unique_lock<std::mutex> pushLock(notifyMtx_); 89 while (IsFull()) { 90 if (isStop_) { 91 LOG(WARNING) << "RingBuffer push stopped"; 92 return false; 93 } 94 LOG(DEBUG) << "RingBuffer full, wait !!!"; 95 notFull_.wait(pushLock); 96 } 97 } 98 99 uint32_t index = writeIndex_ & (num_ - 1); 100 if (memcpy_s(bufArray_[index], singleSize_, buf, len) != EOK) { 101 LOG(ERROR) << "memcpy error, len:" << len; 102 return false; 103 } 104 lenArray_[index] = len; 105 writeIndex_ = (writeIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size 106 107 std::unique_lock<std::mutex> popLock(notifyMtx_); 108 notEmpty_.notify_all(); 109 return true; 110} 111 112bool RingBuffer::Pop(uint8_t *buf, uint32_t maxLen, uint32_t &len) 113{ 114 if (buf == nullptr) { 115 LOG(ERROR) << "RingBuffer pop para error"; 116 return false; 117 } 118 if (IsEmpty()) { 119 std::unique_lock<std::mutex> popLock(notifyMtx_); 120 while (IsEmpty()) { 121 if (isStop_) { 122 LOG(WARNING) << "RingBuffer pop stopped"; 123 return false; 124 } 125 LOG(DEBUG) << "RingBuffer empty, wait !!!"; 126 notEmpty_.wait(popLock); 127 } 128 } 129 130 uint32_t index = readIndex_ & (num_ - 1); 131 if (memcpy_s(buf, maxLen, bufArray_[index], lenArray_[index]) != EOK) { 132 LOG(ERROR) << "memcpy error, len:" << lenArray_[index]; 133 return false; 134 } 135 len = lenArray_[index]; 136 readIndex_ = (readIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size 137 138 std::unique_lock<std::mutex> popLock(notifyMtx_); 139 notFull_.notify_all(); 140 return true; 141} 142 143void RingBuffer::Stop() 144{ 145 isStop_ = true; 146 notFull_.notify_all(); 147 notEmpty_.notify_all(); 148} 149 150void RingBuffer::StopPush() 151{ 152 { 153 std::unique_lock<std::mutex> pushLock(notifyMtx_); 154 isStop_ = true; 155 } 156 notFull_.notify_all(); 157} 158 159void RingBuffer::StopPop() 160{ 161 { 162 std::unique_lock<std::mutex> popLock(notifyMtx_); 163 isStop_ = true; 164 } 165 notEmpty_.notify_all(); 166} 167 168void RingBuffer::Release() 169{ 170 if (lenArray_ != nullptr) { 171 delete[] lenArray_; 172 lenArray_ = nullptr; 173 } 174 175 if (bufArray_ != nullptr) { 176 for (uint32_t i = 0; i < num_ && bufArray_[i] != nullptr; i++) { 177 delete[] bufArray_[i]; 178 bufArray_[i] = nullptr; 179 } 180 delete[] bufArray_; 181 bufArray_ = nullptr; 182 } 183} 184} // namespace Updater 185