106f6ba60Sopenharmony_ci/*
206f6ba60Sopenharmony_ci * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
306f6ba60Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
406f6ba60Sopenharmony_ci * you may not use this file except in compliance with the License.
506f6ba60Sopenharmony_ci * You may obtain a copy of the License at
606f6ba60Sopenharmony_ci *
706f6ba60Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0
806f6ba60Sopenharmony_ci *
906f6ba60Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
1006f6ba60Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
1106f6ba60Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1206f6ba60Sopenharmony_ci * See the License for the specific language governing permissions and
1306f6ba60Sopenharmony_ci * limitations under the License.
1406f6ba60Sopenharmony_ci */
1506f6ba60Sopenharmony_ci#include "power_message_queue.h"
1606f6ba60Sopenharmony_ci
1706f6ba60Sopenharmony_ci#include <dlfcn.h>
1806f6ba60Sopenharmony_ci#include <unistd.h>
1906f6ba60Sopenharmony_ci
2006f6ba60Sopenharmony_ci#include "securec.h"
2106f6ba60Sopenharmony_ci
2206f6ba60Sopenharmony_ci
2306f6ba60Sopenharmony_ciPowerMessageQueue::PowerMessageQueue(size_t maxSize)
2406f6ba60Sopenharmony_ci{
2506f6ba60Sopenharmony_ci    maxSize_ = maxSize;
2606f6ba60Sopenharmony_ci}
2706f6ba60Sopenharmony_ci
2806f6ba60Sopenharmony_ciPowerMessageQueue::~PowerMessageQueue() {}
2906f6ba60Sopenharmony_ci
3006f6ba60Sopenharmony_cibool PowerMessageQueue::IsShutDown()
3106f6ba60Sopenharmony_ci{
3206f6ba60Sopenharmony_ci    return this->shutDown_;
3306f6ba60Sopenharmony_ci}
3406f6ba60Sopenharmony_ci
3506f6ba60Sopenharmony_civoid PowerMessageQueue::ShutDown()
3606f6ba60Sopenharmony_ci{
3706f6ba60Sopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
3806f6ba60Sopenharmony_ci    dataQueue_.clear();
3906f6ba60Sopenharmony_ci    this->shutDown_ = true;
4006f6ba60Sopenharmony_ci    fullCon_.notify_all();
4106f6ba60Sopenharmony_ci    emptyCon_.notify_all();
4206f6ba60Sopenharmony_ci}
4306f6ba60Sopenharmony_ci
4406f6ba60Sopenharmony_cibool PowerMessageQueue::WaitAndPop(std::shared_ptr<PowerOptimizeData> &value, const std::chrono::microseconds realTime)
4506f6ba60Sopenharmony_ci{
4606f6ba60Sopenharmony_ci    // the relative timeout rel_time expires
4706f6ba60Sopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
4806f6ba60Sopenharmony_ci    if (IsShutDown()) {
4906f6ba60Sopenharmony_ci        return false;
5006f6ba60Sopenharmony_ci    }
5106f6ba60Sopenharmony_ci    if (emptyCon_.wait_for(lock, realTime, [&] { return !dataQueue_.empty(); })) {
5206f6ba60Sopenharmony_ci        value = dataQueue_.front();
5306f6ba60Sopenharmony_ci        dataQueue_.pop_front();
5406f6ba60Sopenharmony_ci    } else {
5506f6ba60Sopenharmony_ci        return false;
5606f6ba60Sopenharmony_ci    }
5706f6ba60Sopenharmony_ci    lock.unlock();
5806f6ba60Sopenharmony_ci    fullCon_.notify_one();
5906f6ba60Sopenharmony_ci    return true;
6006f6ba60Sopenharmony_ci}
6106f6ba60Sopenharmony_ci
6206f6ba60Sopenharmony_cibool PowerMessageQueue::WaitAndPopBatch(std::vector<std::shared_ptr<PowerOptimizeData>> &array,
6306f6ba60Sopenharmony_ci                                        const std::chrono::microseconds realTime, size_t batchCount)
6406f6ba60Sopenharmony_ci{
6506f6ba60Sopenharmony_ci    std::unique_lock<std::mutex> lock(mutex_);
6606f6ba60Sopenharmony_ci    if (IsShutDown()) {
6706f6ba60Sopenharmony_ci        return false;
6806f6ba60Sopenharmony_ci    }
6906f6ba60Sopenharmony_ci    uint32_t queueSize = 0;
7006f6ba60Sopenharmony_ci    if (emptyCon_.wait_for(lock, realTime, [&] { return !dataQueue_.empty(); })) {
7106f6ba60Sopenharmony_ci        queueSize = dataQueue_.size();
7206f6ba60Sopenharmony_ci        size_t resultSize = queueSize > batchCount ? batchCount : queueSize;
7306f6ba60Sopenharmony_ci        for (size_t i = 0; i < resultSize; i++) {
7406f6ba60Sopenharmony_ci            std::shared_ptr<PowerOptimizeData> result = dataQueue_.front();
7506f6ba60Sopenharmony_ci            dataQueue_.pop_front();
7606f6ba60Sopenharmony_ci            array[i] = result;
7706f6ba60Sopenharmony_ci        }
7806f6ba60Sopenharmony_ci    } else {
7906f6ba60Sopenharmony_ci        return false;
8006f6ba60Sopenharmony_ci    }
8106f6ba60Sopenharmony_ci    lock.unlock();
8206f6ba60Sopenharmony_ci    fullCon_.notify_one();
8306f6ba60Sopenharmony_ci    return true;
8406f6ba60Sopenharmony_ci}
8506f6ba60Sopenharmony_ci
8606f6ba60Sopenharmony_civoid PowerMessageQueue::PushBack(std::shared_ptr<PowerOptimizeData> &item)
8706f6ba60Sopenharmony_ci{
8806f6ba60Sopenharmony_ci    std::unique_lock<std::mutex> mlock(mutex_);
8906f6ba60Sopenharmony_ci    while (dataQueue_.size() >= maxSize_) {
9006f6ba60Sopenharmony_ci        fullCon_.wait(mlock);
9106f6ba60Sopenharmony_ci    }
9206f6ba60Sopenharmony_ci    dataQueue_.push_back(item);
9306f6ba60Sopenharmony_ci    mlock.unlock();         // unlock before notificiation to minimize mutex con
9406f6ba60Sopenharmony_ci    emptyCon_.notify_one(); // notify one waiting thread
9506f6ba60Sopenharmony_ci}
9606f6ba60Sopenharmony_ci
9706f6ba60Sopenharmony_cisize_t PowerMessageQueue::Size()
9806f6ba60Sopenharmony_ci{
9906f6ba60Sopenharmony_ci    std::unique_lock<std::mutex> mlock(mutex_);
10006f6ba60Sopenharmony_ci    size_t size = dataQueue_.size();
10106f6ba60Sopenharmony_ci    mlock.unlock();
10206f6ba60Sopenharmony_ci    return size;
10306f6ba60Sopenharmony_ci}
10406f6ba60Sopenharmony_ci
10506f6ba60Sopenharmony_cibool PowerMessageQueue::Empty()
10606f6ba60Sopenharmony_ci{
10706f6ba60Sopenharmony_ci    std::unique_lock<std::mutex> mlock(mutex_);
10806f6ba60Sopenharmony_ci    bool isEmpty = dataQueue_.empty();
10906f6ba60Sopenharmony_ci    mlock.unlock();
11006f6ba60Sopenharmony_ci    return isEmpty;
11106f6ba60Sopenharmony_ci}
112