1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "power_message_queue.h"
16
17 #include <dlfcn.h>
18 #include <unistd.h>
19
20 #include "securec.h"
21
22
PowerMessageQueue(size_t maxSize)23 PowerMessageQueue::PowerMessageQueue(size_t maxSize)
24 {
25 maxSize_ = maxSize;
26 }
27
~PowerMessageQueue()28 PowerMessageQueue::~PowerMessageQueue() {}
29
IsShutDown()30 bool PowerMessageQueue::IsShutDown()
31 {
32 return this->shutDown_;
33 }
34
ShutDown()35 void PowerMessageQueue::ShutDown()
36 {
37 std::unique_lock<std::mutex> lock(mutex_);
38 dataQueue_.clear();
39 this->shutDown_ = true;
40 fullCon_.notify_all();
41 emptyCon_.notify_all();
42 }
43
WaitAndPop(std::shared_ptr<PowerOptimizeData> &value, const std::chrono::microseconds realTime)44 bool PowerMessageQueue::WaitAndPop(std::shared_ptr<PowerOptimizeData> &value, const std::chrono::microseconds realTime)
45 {
46 // the relative timeout rel_time expires
47 std::unique_lock<std::mutex> lock(mutex_);
48 if (IsShutDown()) {
49 return false;
50 }
51 if (emptyCon_.wait_for(lock, realTime, [&] { return !dataQueue_.empty(); })) {
52 value = dataQueue_.front();
53 dataQueue_.pop_front();
54 } else {
55 return false;
56 }
57 lock.unlock();
58 fullCon_.notify_one();
59 return true;
60 }
61
WaitAndPopBatch(std::vector<std::shared_ptr<PowerOptimizeData>> &array, const std::chrono::microseconds realTime, size_t batchCount)62 bool PowerMessageQueue::WaitAndPopBatch(std::vector<std::shared_ptr<PowerOptimizeData>> &array,
63 const std::chrono::microseconds realTime, size_t batchCount)
64 {
65 std::unique_lock<std::mutex> lock(mutex_);
66 if (IsShutDown()) {
67 return false;
68 }
69 uint32_t queueSize = 0;
70 if (emptyCon_.wait_for(lock, realTime, [&] { return !dataQueue_.empty(); })) {
71 queueSize = dataQueue_.size();
72 size_t resultSize = queueSize > batchCount ? batchCount : queueSize;
73 for (size_t i = 0; i < resultSize; i++) {
74 std::shared_ptr<PowerOptimizeData> result = dataQueue_.front();
75 dataQueue_.pop_front();
76 array[i] = result;
77 }
78 } else {
79 return false;
80 }
81 lock.unlock();
82 fullCon_.notify_one();
83 return true;
84 }
85
PushBack(std::shared_ptr<PowerOptimizeData> &item)86 void PowerMessageQueue::PushBack(std::shared_ptr<PowerOptimizeData> &item)
87 {
88 std::unique_lock<std::mutex> mlock(mutex_);
89 while (dataQueue_.size() >= maxSize_) {
90 fullCon_.wait(mlock);
91 }
92 dataQueue_.push_back(item);
93 mlock.unlock(); // unlock before notificiation to minimize mutex con
94 emptyCon_.notify_one(); // notify one waiting thread
95 }
96
Size()97 size_t PowerMessageQueue::Size()
98 {
99 std::unique_lock<std::mutex> mlock(mutex_);
100 size_t size = dataQueue_.size();
101 mlock.unlock();
102 return size;
103 }
104
Empty()105 bool PowerMessageQueue::Empty()
106 {
107 std::unique_lock<std::mutex> mlock(mutex_);
108 bool isEmpty = dataQueue_.empty();
109 mlock.unlock();
110 return isEmpty;
111 }
112