1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "power_message_queue.h"
16 
17 #include <dlfcn.h>
18 #include <unistd.h>
19 
20 #include "securec.h"
21 
22 
PowerMessageQueue(size_t maxSize)23 PowerMessageQueue::PowerMessageQueue(size_t maxSize)
24 {
25     maxSize_ = maxSize;
26 }
27 
~PowerMessageQueue()28 PowerMessageQueue::~PowerMessageQueue() {}
29 
IsShutDown()30 bool PowerMessageQueue::IsShutDown()
31 {
32     return this->shutDown_;
33 }
34 
ShutDown()35 void PowerMessageQueue::ShutDown()
36 {
37     std::unique_lock<std::mutex> lock(mutex_);
38     dataQueue_.clear();
39     this->shutDown_ = true;
40     fullCon_.notify_all();
41     emptyCon_.notify_all();
42 }
43 
WaitAndPop(std::shared_ptr<PowerOptimizeData> &value, const std::chrono::microseconds realTime)44 bool PowerMessageQueue::WaitAndPop(std::shared_ptr<PowerOptimizeData> &value, const std::chrono::microseconds realTime)
45 {
46     // the relative timeout rel_time expires
47     std::unique_lock<std::mutex> lock(mutex_);
48     if (IsShutDown()) {
49         return false;
50     }
51     if (emptyCon_.wait_for(lock, realTime, [&] { return !dataQueue_.empty(); })) {
52         value = dataQueue_.front();
53         dataQueue_.pop_front();
54     } else {
55         return false;
56     }
57     lock.unlock();
58     fullCon_.notify_one();
59     return true;
60 }
61 
WaitAndPopBatch(std::vector<std::shared_ptr<PowerOptimizeData>> &array, const std::chrono::microseconds realTime, size_t batchCount)62 bool PowerMessageQueue::WaitAndPopBatch(std::vector<std::shared_ptr<PowerOptimizeData>> &array,
63                                         const std::chrono::microseconds realTime, size_t batchCount)
64 {
65     std::unique_lock<std::mutex> lock(mutex_);
66     if (IsShutDown()) {
67         return false;
68     }
69     uint32_t queueSize = 0;
70     if (emptyCon_.wait_for(lock, realTime, [&] { return !dataQueue_.empty(); })) {
71         queueSize = dataQueue_.size();
72         size_t resultSize = queueSize > batchCount ? batchCount : queueSize;
73         for (size_t i = 0; i < resultSize; i++) {
74             std::shared_ptr<PowerOptimizeData> result = dataQueue_.front();
75             dataQueue_.pop_front();
76             array[i] = result;
77         }
78     } else {
79         return false;
80     }
81     lock.unlock();
82     fullCon_.notify_one();
83     return true;
84 }
85 
PushBack(std::shared_ptr<PowerOptimizeData> &item)86 void PowerMessageQueue::PushBack(std::shared_ptr<PowerOptimizeData> &item)
87 {
88     std::unique_lock<std::mutex> mlock(mutex_);
89     while (dataQueue_.size() >= maxSize_) {
90         fullCon_.wait(mlock);
91     }
92     dataQueue_.push_back(item);
93     mlock.unlock();         // unlock before notificiation to minimize mutex con
94     emptyCon_.notify_one(); // notify one waiting thread
95 }
96 
Size()97 size_t PowerMessageQueue::Size()
98 {
99     std::unique_lock<std::mutex> mlock(mutex_);
100     size_t size = dataQueue_.size();
101     mlock.unlock();
102     return size;
103 }
104 
Empty()105 bool PowerMessageQueue::Empty()
106 {
107     std::unique_lock<std::mutex> mlock(mutex_);
108     bool isEmpty = dataQueue_.empty();
109     mlock.unlock();
110     return isEmpty;
111 }
112