1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#ifndef INTELL_VOICE_BUFFER_QUEUE_H
16#define INTELL_VOICE_BUFFER_QUEUE_H
17
18#include <unistd.h>
19#include <queue>
20#include <mutex>
21#include <condition_variable>
22#include <chrono>
23#include "array_buffer_util.h"
24#include "intell_voice_log.h"
25
26#define LOG_TAG "QueueUtil"
27
28namespace OHOS {
29namespace IntellVoiceUtils {
// Default element capacity used by QueueUtil::Init() when the caller passes none.
// Despite the name, Init() does not clamp a caller-supplied capacity to this value.
constexpr uint32_t MAX_CAPACITY = 500;
31
32template <typename T>
33class QueueUtil {
34public:
35    QueueUtil() = default;
36    ~QueueUtil()
37    {
38        Uninit();
39    }
40    bool Init(uint32_t capacity = MAX_CAPACITY)
41    {
42        std::unique_lock<std::mutex> lock(queueMutex_);
43        SetAvailable(true);
44        capacity_ = capacity;
45        return true;
46    }
47    bool Push(const T &element, bool isWait = true)
48    {
49        std::unique_lock<std::mutex> lock(queueMutex_);
50        CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
51
52        while (queue_.size() >= capacity_) {
53            CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait");
54            notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); });
55            CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
56        }
57
58        queue_.push(element);
59        notEmptyCv_.notify_one();
60        return true;
61    }
62    bool Push(T &&element, bool isWait = true)
63    {
64        std::unique_lock<std::mutex> lock(queueMutex_);
65        CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
66
67        while (queue_.size() >= capacity_) {
68            CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait");
69            notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); });
70            CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
71        }
72
73        queue_.push(std::move(element));
74        notEmptyCv_.notify_one();
75        return true;
76    }
77    bool Pop(T &element)
78    {
79        std::unique_lock<std::mutex> lock(queueMutex_);
80        CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
81
82        while (queue_.empty()) {
83            notEmptyCv_.wait(lock, [&] { return (!queue_.empty() || !IsAvailable()); });
84            CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
85        }
86
87        element = std::move(queue_.front());
88        queue_.pop();
89        notFullCv_.notify_one();
90        return true;
91    }
92    bool PopUntilTimeout(uint32_t timeLenMs, T &element)
93    {
94        std::unique_lock<std::mutex> lock(queueMutex_);
95        CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
96
97        while (queue_.empty()) {
98            if (!(notEmptyCv_.wait_for(lock, std::chrono::milliseconds(timeLenMs),
99                [&] { return (!queue_.empty() || !IsAvailable()); }))) {
100                INTELL_VOICE_LOG_WARN("wait time out");
101                return false;
102            }
103            CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
104        }
105
106        element = std::move(queue_.front());
107        queue_.pop();
108        notFullCv_.notify_one();
109        return true;
110    }
111    void Uninit()
112    {
113        {
114            std::unique_lock<std::mutex> lock(queueMutex_);
115            capacity_ = 0;
116            ClearQueue();
117            SetAvailable(false);
118        }
119        notEmptyCv_.notify_all();
120        notFullCv_.notify_all();
121    }
122private:
123    bool IsAvailable() const
124    {
125        return isAvailable_;
126    }
127    void SetAvailable(bool isAvailable)
128    {
129        isAvailable_ = isAvailable;
130    }
131    void ClearQueue()
132    {
133        while (!queue_.empty()) {
134            queue_.pop();
135        }
136    }
137
138private:
139    bool isAvailable_ = false;
140    uint32_t capacity_ = 0;
141    std::mutex queueMutex_;
142    std::condition_variable notEmptyCv_;
143    std::condition_variable notFullCv_;
144    std::queue<T> queue_;
145};
146
// Queue whose elements are std::unique_ptr<Uint8ArrayBuffer>. Because std::unique_ptr
// is move-only, elements have to be handed over through the Push(T &&) overload.
using Uint8ArrayBufferQueue = QueueUtil<std::unique_ptr<Uint8ArrayBuffer>>;
148}
149}
150
151#undef LOG_TAG
152
153#endif