1/* 2 * Copyright (c) 2022-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#define HST_LOG_TAG "DataStreamImpl" 17 18#include "data_stream_impl.h" 19#include "foundation/log.h" 20 21namespace OHOS { 22namespace Media { 23DataStreamImpl::DataStreamImpl(size_t size, size_t count, MemoryType type) 24{ 25 FALSE_LOG(type == MemoryType::VIRTUAL_ADDR); 26 for (size_t i = 0; i < count; ++i) { 27 auto buffer = std::make_shared<VirtualDataBuffer>(size); 28 emptyBuffers_.push(buffer); 29 allBuffers_.emplace_back(buffer); 30 } 31} 32 33bool DataStreamImpl::GetDataBuffer(std::shared_ptr<DataBuffer>& buffer, int timeout) 34{ 35 OSAL::ScopedLock lock(dataMutex_); 36 if (dataBuffers_.empty()) { 37 dataCondition_.Wait(lock, [this] { return !dataBuffers_.empty(); }); 38 } 39 buffer = dataBuffers_.front(); 40 dataBuffers_.pop(); 41 return true; 42} 43 44bool DataStreamImpl::QueueEmptyBuffer(const std::shared_ptr<DataBuffer>& buffer) 45{ 46 OSAL::ScopedLock lock(emptyMutex_); 47 emptyBuffers_.push(buffer); 48 emptyCondition_.NotifyAll(); 49 return true; 50} 51 52bool DataStreamImpl::QueueEmptyBuffer(uint8_t* address) 53{ 54 OSAL::ScopedLock lock(emptyMutex_); 55 for (size_t i = 0; i < allBuffers_.size(); i++) { 56 if (allBuffers_[i]->GetAddress() == address) { 57 emptyBuffers_.push(allBuffers_[i]); 58 emptyCondition_.NotifyAll(); 59 return true; 60 } 61 } 62 MEDIA_LOG_E("Queue empty buffer address not in DataStream."); 63 return false; 64} 65 66bool DataStreamImpl::GetEmptyBuffer(std::shared_ptr<DataBuffer>& buffer, int timeout) 67{ 68 OSAL::ScopedLock lock(emptyMutex_); 69 if (emptyBuffers_.empty()) { 70 emptyCondition_.Wait(lock, [this] { return !emptyBuffers_.empty(); }); 71 } 72 buffer = emptyBuffers_.front(); 73 emptyBuffers_.pop(); 74 return true; 75} 76 77bool DataStreamImpl::QueueDataBuffer(const std::shared_ptr<DataBuffer>& buffer) 78{ 79 OSAL::ScopedLock lock(dataMutex_); 80 dataBuffers_.push(buffer); 81 dataCondition_.NotifyAll(); 82 return true; 83} 84 85VirtualDataBuffer::VirtualDataBuffer(size_t capacity) : capacity_(capacity) 86{ 87 address_ = new uint8_t[capacity]; 88} 89 90VirtualDataBuffer::~VirtualDataBuffer() 91{ 92 delete [] address_; 93} 94 95bool VirtualDataBuffer::IsEos() 96{ 97 return isEos_; 98} 99 100void VirtualDataBuffer::SetEos(bool isEos) 101{ 102 isEos_ = isEos; 103} 104 105uint8_t* VirtualDataBuffer::GetAddress() 106{ 107 return address_; 108} 109 110size_t VirtualDataBuffer::GetCapacity() 111{ 112 return capacity_; 113} 114 115size_t VirtualDataBuffer::GetSize() 116{ 117 return size_; 118} 119 120void VirtualDataBuffer::SetSize(size_t size) 121{ 122 size_ = size; 123} 124} // Media 125} // OHOS