1/*
2 * Copyright (c) 2022-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#define HST_LOG_TAG "DataStreamImpl"
17
18#include "data_stream_impl.h"
19#include "foundation/log.h"
20
21namespace OHOS {
22namespace Media {
23DataStreamImpl::DataStreamImpl(size_t size, size_t count, MemoryType type)
24{
25    FALSE_LOG(type == MemoryType::VIRTUAL_ADDR);
26    for (size_t i = 0; i < count; ++i) {
27        auto buffer = std::make_shared<VirtualDataBuffer>(size);
28        emptyBuffers_.push(buffer);
29        allBuffers_.emplace_back(buffer);
30    }
31}
32
33bool DataStreamImpl::GetDataBuffer(std::shared_ptr<DataBuffer>& buffer, int timeout)
34{
35    OSAL::ScopedLock lock(dataMutex_);
36    if (dataBuffers_.empty()) {
37        dataCondition_.Wait(lock, [this] { return !dataBuffers_.empty(); });
38    }
39    buffer = dataBuffers_.front();
40    dataBuffers_.pop();
41    return true;
42}
43
44bool DataStreamImpl::QueueEmptyBuffer(const std::shared_ptr<DataBuffer>& buffer)
45{
46    OSAL::ScopedLock lock(emptyMutex_);
47    emptyBuffers_.push(buffer);
48    emptyCondition_.NotifyAll();
49    return true;
50}
51
52bool DataStreamImpl::QueueEmptyBuffer(uint8_t* address)
53{
54    OSAL::ScopedLock lock(emptyMutex_);
55    for (size_t i = 0; i < allBuffers_.size(); i++) {
56        if (allBuffers_[i]->GetAddress() == address) {
57            emptyBuffers_.push(allBuffers_[i]);
58            emptyCondition_.NotifyAll();
59            return true;
60        }
61    }
62    MEDIA_LOG_E("Queue empty buffer address not in DataStream.");
63    return false;
64}
65
66bool DataStreamImpl::GetEmptyBuffer(std::shared_ptr<DataBuffer>& buffer, int timeout)
67{
68    OSAL::ScopedLock lock(emptyMutex_);
69    if (emptyBuffers_.empty()) {
70        emptyCondition_.Wait(lock, [this] { return !emptyBuffers_.empty(); });
71    }
72    buffer = emptyBuffers_.front();
73    emptyBuffers_.pop();
74    return true;
75}
76
77bool DataStreamImpl::QueueDataBuffer(const std::shared_ptr<DataBuffer>& buffer)
78{
79    OSAL::ScopedLock lock(dataMutex_);
80    dataBuffers_.push(buffer);
81    dataCondition_.NotifyAll();
82    return true;
83}
84
85VirtualDataBuffer::VirtualDataBuffer(size_t capacity) : capacity_(capacity)
86{
87    address_ = new uint8_t[capacity];
88}
89
90VirtualDataBuffer::~VirtualDataBuffer()
91{
92    delete [] address_;
93}
94
95bool VirtualDataBuffer::IsEos()
96{
97    return isEos_;
98}
99
100void VirtualDataBuffer::SetEos(bool isEos)
101{
102    isEos_ = isEos;
103}
104
105uint8_t* VirtualDataBuffer::GetAddress()
106{
107    return address_;
108}
109
110size_t VirtualDataBuffer::GetCapacity()
111{
112    return capacity_;
113}
114
115size_t VirtualDataBuffer::GetSize()
116{
117    return size_;
118}
119
120void VirtualDataBuffer::SetSize(size_t size)
121{
122    size_ = size;
123}
124} // Media
125} // OHOS