1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include "parameter.h"
18 #include "soundpool.h"
19 #include "media_log.h"
20 #include "media_errors.h"
21 #include "stream_id_manager.h"
22
23 namespace {
24 // audiorender max concurrency.
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_SOUNDPOOL, "StreamIDManager"};
26 static const std::string THREAD_POOL_NAME = "OS_StreamMgr";
27 static const int32_t MAX_THREADS_NUM = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
28 }
29
30 namespace OHOS {
31 namespace Media {
StreamIDManager(int32_t maxStreams, AudioStandard::AudioRendererInfo audioRenderInfo)32 StreamIDManager::StreamIDManager(int32_t maxStreams,
33 AudioStandard::AudioRendererInfo audioRenderInfo) : audioRendererInfo_(audioRenderInfo), maxStreams_(maxStreams)
34 {
35 MEDIA_LOGI("Construction StreamIDManager.");
36 InitThreadPool();
37 }
38
~StreamIDManager()39 StreamIDManager::~StreamIDManager()
40 {
41 MEDIA_LOGI("Destruction StreamIDManager");
42 if (callback_ != nullptr) {
43 callback_.reset();
44 }
45 if (frameWriteCallback_ != nullptr) {
46 frameWriteCallback_.reset();
47 }
48 for (auto cacheBuffer : cacheBuffers_) {
49 if (cacheBuffer.second != nullptr) {
50 int32_t streamID = cacheBuffer.second->GetStreamID();
51 cacheBuffer.second->Stop(streamID);
52 cacheBuffer.second->Release();
53 }
54 }
55 cacheBuffers_.clear();
56 if (isStreamPlayingThreadPoolStarted_.load()) {
57 if (streamPlayingThreadPool_ != nullptr) {
58 streamPlayingThreadPool_->Stop();
59 }
60 isStreamPlayingThreadPoolStarted_.store(false);
61 }
62 }
63
InitThreadPool()64 int32_t StreamIDManager::InitThreadPool()
65 {
66 if (isStreamPlayingThreadPoolStarted_.load()) {
67 return MSERR_OK;
68 }
69 streamPlayingThreadPool_ = std::make_unique<ThreadPool>(THREAD_POOL_NAME);
70 CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
71 "Failed to obtain playing ThreadPool");
72 if (maxStreams_ > MAX_PLAY_STREAMS_NUMBER) {
73 maxStreams_ = MAX_PLAY_STREAMS_NUMBER;
74 MEDIA_LOGI("more than max play stream number, align to max play strem number.");
75 }
76 if (maxStreams_ < MIN_PLAY_STREAMS_NUMBER) {
77 maxStreams_ = MIN_PLAY_STREAMS_NUMBER;
78 MEDIA_LOGI("less than min play stream number, align to min play strem number.");
79 }
80 MEDIA_LOGI("stream playing thread pool maxStreams_:%{public}d", maxStreams_);
81 // For stream priority logic, thread num need align to task num.
82 streamPlayingThreadPool_->Start(maxStreams_);
83 streamPlayingThreadPool_->SetMaxTaskNum(maxStreams_);
84 isStreamPlayingThreadPoolStarted_.store(true);
85
86 return MSERR_OK;
87 }
88
Play(std::shared_ptr<SoundParser> soundParser, PlayParams playParameters)89 int32_t StreamIDManager::Play(std::shared_ptr<SoundParser> soundParser, PlayParams playParameters)
90 {
91 MediaTrace trace("StreamIDManager::Play");
92 CHECK_AND_RETURN_RET_LOG(soundParser != nullptr, -1, "Invalid soundParser.");
93 int32_t soundID = soundParser->GetSoundID();
94 int32_t streamID = GetFreshStreamID(soundID, playParameters);
95 {
96 std::lock_guard lock(streamIDManagerLock_);
97 if (streamID <= 0) {
98 do {
99 nextStreamID_ = nextStreamID_ == INT32_MAX ? 1 : nextStreamID_ + 1;
100 } while (FindCacheBuffer(nextStreamID_) != nullptr);
101 streamID = nextStreamID_;
102 std::deque<std::shared_ptr<AudioBufferEntry>> cacheData;
103 soundParser->GetSoundData(cacheData);
104 size_t cacheDataTotalSize = soundParser->GetSoundDataTotalSize();
105 MEDIA_LOGI("cacheData size:%{public}zu , cacheDataTotalSize:%{public}zu",
106 cacheData.size(), cacheDataTotalSize);
107 auto cacheBuffer =
108 std::make_shared<CacheBuffer>(soundParser->GetSoundTrackFormat(), cacheData, cacheDataTotalSize,
109 soundID, streamID);
110 CHECK_AND_RETURN_RET_LOG(cacheBuffer != nullptr, -1, "failed to create cache buffer");
111 CHECK_AND_RETURN_RET_LOG(callback_ != nullptr, MSERR_INVALID_VAL, "Invalid callback.");
112 cacheBuffer->SetCallback(callback_);
113 cacheBufferCallback_ = std::make_shared<CacheBufferCallBack>(weak_from_this());
114 CHECK_AND_RETURN_RET_LOG(cacheBufferCallback_ != nullptr, MSERR_INVALID_VAL,
115 "Invalid cachebuffer callback");
116 cacheBuffer->SetCacheBufferCallback(cacheBufferCallback_);
117 if (frameWriteCallback_ != nullptr) {
118 cacheBuffer->SetFrameWriteCallback(frameWriteCallback_);
119 }
120 cacheBuffers_.emplace(streamID, cacheBuffer);
121 }
122 }
123 MEDIA_LOGI("StreamIDManager::SetPlay start soundID:%{public}d, streamID:%{public}d", soundID, streamID);
124 SetPlay(soundID, streamID, playParameters);
125 return streamID;
126 }
127
SetPlay(const int32_t soundID, const int32_t streamID, const PlayParams playParameters)128 int32_t StreamIDManager::SetPlay(const int32_t soundID, const int32_t streamID, const PlayParams playParameters)
129 {
130 MediaTrace trace("StreamIDManager::SetPlay");
131 if (!isStreamPlayingThreadPoolStarted_.load()) {
132 InitThreadPool();
133 }
134
135 CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
136 "Failed to obtain stream play threadpool.");
137 // CacheBuffer must prepare before play.
138 std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(streamID);
139 CHECK_AND_RETURN_RET_LOG(freshCacheBuffer != nullptr, -1, "Invalid fresh cache buffer");
140 freshCacheBuffer->PreparePlay(streamID, audioRendererInfo_, playParameters);
141 int32_t tempMaxStream = maxStreams_;
142 MEDIA_LOGI("StreamIDManager cur task num:%{public}zu, maxStreams_:%{public}d",
143 playingStreamIDs_.size(), maxStreams_);
144 if (playingStreamIDs_.size() < static_cast<size_t>(tempMaxStream)) {
145 AddPlayTask(streamID, playParameters);
146 } else {
147 int32_t playingStreamID = playingStreamIDs_.back();
148 std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
149 CHECK_AND_RETURN_RET_LOG(freshCacheBuffer != nullptr, -1, "Invalid fresh cache buffer");
150 CHECK_AND_RETURN_RET_LOG(playingCacheBuffer != nullptr, -1, "Invalid playingCacheBuffer");
151 MEDIA_LOGI("StreamIDManager fresh sound priority:%{public}d, playing stream priority:%{public}d",
152 freshCacheBuffer->GetPriority(), playingCacheBuffer->GetPriority());
153 if (freshCacheBuffer->GetPriority() >= playingCacheBuffer->GetPriority()) {
154 MEDIA_LOGI("StreamIDManager stop playing low priority sound:%{public}d", playingStreamID);
155 playingCacheBuffer->Stop(playingStreamID);
156 MEDIA_LOGI("StreamIDManager to playing fresh streamID:%{public}d.", streamID);
157 AddPlayTask(streamID, playParameters);
158 } else {
159 std::lock_guard lock(streamIDManagerLock_);
160 MEDIA_LOGI("StreamIDManager queue will play streams, streamID:%{public}d.", streamID);
161 StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo;
162 freshStreamIDAndPlayParamsInfo.streamID = streamID;
163 freshStreamIDAndPlayParamsInfo.playParameters = playParameters;
164 QueueAndSortWillPlayStreamID(freshStreamIDAndPlayParamsInfo);
165 }
166 }
167 for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
168 int32_t playingStreamID = playingStreamIDs_[i];
169 MEDIA_LOGD("StreamIDManager::SetPlay playingStreamID:%{public}d", playingStreamID);
170 }
171 for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
172 StreamIDAndPlayParamsInfo willPlayInfo = willPlayStreamInfos_[i];
173 MEDIA_LOGD("StreamIDManager::SetPlay willPlayStreamID:%{public}d", willPlayInfo.streamID);
174 }
175 return MSERR_OK;
176 }
177
178 // Sort in descending order
179 // 0 has the lowest priority, and the higher the value, the higher the priority
180 // The queue head has the highest value and priority
QueueAndSortPlayingStreamID(int32_t streamID)181 void StreamIDManager::QueueAndSortPlayingStreamID(int32_t streamID)
182 {
183 if (playingStreamIDs_.empty()) {
184 playingStreamIDs_.emplace_back(streamID);
185 } else {
186 bool shouldReCombinePlayingQueue = false;
187 for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
188 int32_t playingStreamID = playingStreamIDs_[i];
189 std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(streamID);
190 std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
191 if (playingCacheBuffer == nullptr) {
192 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
193 shouldReCombinePlayingQueue = true;
194 break;
195 }
196 if (freshCacheBuffer == nullptr) {
197 break;
198 }
199 if (freshCacheBuffer->GetPriority() >= playingCacheBuffer->GetPriority()) {
200 playingStreamIDs_.insert(playingStreamIDs_.begin() + i, streamID);
201 break;
202 }
203 if (playingStreamIDs_.size() >= 1 && i == playingStreamIDs_.size() - 1 &&
204 freshCacheBuffer->GetPriority() < playingCacheBuffer->GetPriority()) {
205 playingStreamIDs_.push_back(streamID);
206 break;
207 }
208 }
209 if (shouldReCombinePlayingQueue) {
210 QueueAndSortPlayingStreamID(streamID);
211 }
212 }
213 }
214
215 // Sort in descending order.
216 // 0 has the lowest priority, and the higher the value, the higher the priority
217 // The queue head has the highest value and priority
QueueAndSortWillPlayStreamID(StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo)218 void StreamIDManager::QueueAndSortWillPlayStreamID(StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo)
219 {
220 if (willPlayStreamInfos_.empty()) {
221 willPlayStreamInfos_.emplace_back(freshStreamIDAndPlayParamsInfo);
222 } else {
223 bool shouldReCombineWillPlayQueue = false;
224 for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
225 std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(freshStreamIDAndPlayParamsInfo.streamID);
226 std::shared_ptr<CacheBuffer> willPlayCacheBuffer = FindCacheBuffer(willPlayStreamInfos_[i].streamID);
227 if (willPlayCacheBuffer == nullptr) {
228 willPlayStreamInfos_.erase(willPlayStreamInfos_.begin() + i);
229 shouldReCombineWillPlayQueue = true;
230 break;
231 }
232 if (freshCacheBuffer == nullptr) {
233 break;
234 }
235 if (freshCacheBuffer->GetPriority() >= willPlayCacheBuffer->GetPriority()) {
236 willPlayStreamInfos_.insert(willPlayStreamInfos_.begin() + i, freshStreamIDAndPlayParamsInfo);
237 break;
238 }
239 if (willPlayStreamInfos_.size() >= 1 && i == willPlayStreamInfos_.size() - 1 &&
240 freshCacheBuffer->GetPriority() < willPlayCacheBuffer->GetPriority()) {
241 willPlayStreamInfos_.push_back(freshStreamIDAndPlayParamsInfo);
242 break;
243 }
244 }
245 if (shouldReCombineWillPlayQueue) {
246 QueueAndSortWillPlayStreamID(freshStreamIDAndPlayParamsInfo);
247 }
248 }
249 }
250
AddPlayTask(const int32_t streamID, const PlayParams playParameters)251 int32_t StreamIDManager::AddPlayTask(const int32_t streamID, const PlayParams playParameters)
252 {
253 ThreadPool::Task streamPlayTask = [this, streamID] { this->DoPlay(streamID); };
254 CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
255 "Failed to obtain playing ThreadPool");
256 CHECK_AND_RETURN_RET_LOG(streamPlayTask != nullptr, MSERR_INVALID_VAL, "Failed to obtain stream play Task");
257 streamPlayingThreadPool_->AddTask(streamPlayTask);
258 std::lock_guard lock(streamIDManagerLock_);
259 QueueAndSortPlayingStreamID(streamID);
260 return MSERR_OK;
261 }
262
DoPlay(const int32_t streamID)263 int32_t StreamIDManager::DoPlay(const int32_t streamID)
264 {
265 MEDIA_LOGI("StreamIDManager::DoPlay start streamID:%{public}d", streamID);
266 std::shared_ptr<CacheBuffer> cacheBuffer = FindCacheBuffer(streamID);
267 CHECK_AND_RETURN_RET_LOG(cacheBuffer.get() != nullptr, MSERR_INVALID_VAL, "cachebuffer invalid.");
268 if (cacheBuffer->DoPlay(streamID) == MSERR_OK) {
269 MEDIA_LOGI("StreamIDManager::DoPlay success streamID:%{public}d", streamID);
270 return MSERR_OK;
271 }
272 MEDIA_LOGI("StreamIDManager::DoPlay failed streamID:%{public}d", streamID);
273 {
274 std::lock_guard lock(streamIDManagerLock_);
275 for (int32_t i = 0; i < static_cast<int32_t>(playingStreamIDs_.size()); i++) {
276 int32_t playingStreamID = playingStreamIDs_[i];
277 std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
278 if (playingCacheBuffer != nullptr && !playingCacheBuffer->IsRunning()) {
279 MEDIA_LOGI("StreamIDManager::DoPlay fail erase playingStreamID:%{public}d", playingStreamID);
280 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
281 i--;
282 }
283 }
284 }
285 return MSERR_INVALID_VAL;
286 }
287
FindCacheBuffer(const int32_t streamID)288 std::shared_ptr<CacheBuffer> StreamIDManager::FindCacheBuffer(const int32_t streamID)
289 {
290 if (cacheBuffers_.empty()) {
291 MEDIA_LOGI("StreamIDManager::FindCacheBuffer cacheBuffers_ empty");
292 return nullptr;
293 }
294 CHECK_AND_RETURN_RET_LOG(streamID >= 0, nullptr, "streamID invalid.");
295 if (cacheBuffers_.find(streamID) != cacheBuffers_.end()) {
296 return cacheBuffers_.at(streamID);
297 }
298 return nullptr;
299 }
300
GetStreamIDBySoundID(const int32_t soundID)301 int32_t StreamIDManager::GetStreamIDBySoundID(const int32_t soundID)
302 {
303 PlayParams playParameters;
304 return GetFreshStreamID(soundID, playParameters);
305 }
306
ReorderStream(int32_t streamID, int32_t priority)307 int32_t StreamIDManager::ReorderStream(int32_t streamID, int32_t priority)
308 {
309 std::lock_guard lock(streamIDManagerLock_);
310 int32_t playingSize = static_cast<int32_t>(playingStreamIDs_.size());
311 for (int32_t i = 0; i < playingSize - 1; ++i) {
312 for (int32_t j = 0; j < playingSize - 1 - i; ++j) {
313 std::shared_ptr<CacheBuffer> left = FindCacheBuffer(playingStreamIDs_[j]);
314 std::shared_ptr<CacheBuffer> right = FindCacheBuffer(playingStreamIDs_[j + 1]);
315 if (left != nullptr && right != nullptr && left->GetPriority() < right->GetPriority()) {
316 int32_t streamIdTemp = playingStreamIDs_[j];
317 playingStreamIDs_[j] = playingStreamIDs_[j + 1];
318 playingStreamIDs_[j + 1] = streamIdTemp;
319 }
320 }
321 }
322 for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
323 int32_t playingStreamID = playingStreamIDs_[i];
324 MEDIA_LOGD("StreamIDManager::ReorderStream playingStreamID:%{public}d", playingStreamID);
325 }
326
327 int32_t willPlaySize = static_cast<int32_t>(willPlayStreamInfos_.size());
328 for (int32_t i = 0; i < willPlaySize - 1; ++i) {
329 for (int32_t j = 0; j < willPlaySize - 1 - i; ++j) {
330 std::shared_ptr<CacheBuffer> left = FindCacheBuffer(willPlayStreamInfos_[j].streamID);
331 std::shared_ptr<CacheBuffer> right = FindCacheBuffer(willPlayStreamInfos_[j + 1].streamID);
332 if (left != nullptr && right != nullptr && left->GetPriority() < right->GetPriority()) {
333 StreamIDAndPlayParamsInfo willPlayInfoTemp = willPlayStreamInfos_[j];
334 willPlayStreamInfos_[j] = willPlayStreamInfos_[j + 1];
335 willPlayStreamInfos_[j + 1] = willPlayInfoTemp;
336 }
337 }
338 }
339 for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
340 StreamIDAndPlayParamsInfo willPlayInfo = willPlayStreamInfos_[i];
341 MEDIA_LOGD("StreamIDManager::ReorderStream willPlayStreamID:%{public}d", willPlayInfo.streamID);
342 }
343 return MSERR_OK;
344 }
345
ClearStreamIDInDeque(int32_t streamID)346 int32_t StreamIDManager::ClearStreamIDInDeque(int32_t streamID)
347 {
348 std::lock_guard lock(streamIDManagerLock_);
349 for (auto it = playingStreamIDs_.begin(); it != playingStreamIDs_.end();) {
350 if (*it == streamID) {
351 MEDIA_LOGI("StreamIDManager::ClearStreamIDInDeque playingDel streamID:%{public}d", streamID);
352 it = playingStreamIDs_.erase(it);
353 } else {
354 ++it;
355 }
356 }
357 for (auto it = willPlayStreamInfos_.begin(); it != willPlayStreamInfos_.end();) {
358 if (it->streamID == streamID) {
359 MEDIA_LOGI("StreamIDManager::ClearStreamIDInDeque willPlayDel streamID:%{public}d", streamID);
360 it = willPlayStreamInfos_.erase(it);
361 } else {
362 ++it;
363 }
364 }
365 return MSERR_OK;
366 }
367
GetFreshStreamID(const int32_t soundID, PlayParams playParameters)368 int32_t StreamIDManager::GetFreshStreamID(const int32_t soundID, PlayParams playParameters)
369 {
370 int32_t streamID = 0;
371 if (cacheBuffers_.empty()) {
372 MEDIA_LOGI("StreamIDManager::GetFreshStreamID cacheBuffers_ empty");
373 return streamID;
374 }
375 for (auto cacheBuffer : cacheBuffers_) {
376 if (cacheBuffer.second == nullptr) {
377 MEDIA_LOGE("Invalid cacheBuffer, soundID:%{public}d", soundID);
378 continue;
379 }
380 if (soundID == cacheBuffer.second->GetSoundID()) {
381 streamID = cacheBuffer.second->GetStreamID();
382 MEDIA_LOGI("Have cache soundID:%{public}d, streamID:%{public}d", soundID, streamID);
383 break;
384 }
385 }
386 return streamID;
387 }
388
OnPlayFinished()389 void StreamIDManager::OnPlayFinished()
390 {
391 {
392 std::lock_guard lock(streamIDManagerLock_);
393 for (int32_t i = 0; i < static_cast<int32_t>(playingStreamIDs_.size()); i++) {
394 int32_t playingStreamID = playingStreamIDs_[i];
395 std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
396 if (playingCacheBuffer != nullptr && !playingCacheBuffer->IsRunning()) {
397 MEDIA_LOGI("StreamIDManager::OnPlayFinished erase playingStreamID:%{public}d", playingStreamID);
398 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
399 i--;
400 }
401 }
402 }
403 if (!willPlayStreamInfos_.empty()) {
404 MEDIA_LOGI("StreamIDManager OnPlayFinished will play streams non empty, get the front.");
405 StreamIDAndPlayParamsInfo willPlayStreamInfo = willPlayStreamInfos_.front();
406 AddPlayTask(willPlayStreamInfo.streamID, willPlayStreamInfo.playParameters);
407 std::lock_guard lock(streamIDManagerLock_);
408 willPlayStreamInfos_.pop_front();
409 }
410 }
411
SetCallback(const std::shared_ptr<ISoundPoolCallback> &callback)412 int32_t StreamIDManager::SetCallback(const std::shared_ptr<ISoundPoolCallback> &callback)
413 {
414 callback_ = callback;
415 return MSERR_OK;
416 }
417
SetFrameWriteCallback(const std::shared_ptr<ISoundPoolFrameWriteCallback> &callback)418 int32_t StreamIDManager::SetFrameWriteCallback(const std::shared_ptr<ISoundPoolFrameWriteCallback> &callback)
419 {
420 frameWriteCallback_ = callback;
421 return MSERR_OK;
422 }
423 } // namespace Media
424 } // namespace OHOS
425