1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <algorithm>
17 #include "parameter.h"
18 #include "soundpool.h"
19 #include "media_log.h"
20 #include "media_errors.h"
21 #include "stream_id_manager.h"
22 
23 namespace {
24     // audiorender max concurrency.
25     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_SOUNDPOOL, "StreamIDManager"};
26     static const std::string THREAD_POOL_NAME = "OS_StreamMgr";
27     static const int32_t MAX_THREADS_NUM = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
28 }
29 
30 namespace OHOS {
31 namespace Media {
StreamIDManager(int32_t maxStreams, AudioStandard::AudioRendererInfo audioRenderInfo)32 StreamIDManager::StreamIDManager(int32_t maxStreams,
33     AudioStandard::AudioRendererInfo audioRenderInfo) : audioRendererInfo_(audioRenderInfo), maxStreams_(maxStreams)
34 {
35     MEDIA_LOGI("Construction StreamIDManager.");
36     InitThreadPool();
37 }
38 
~StreamIDManager()39 StreamIDManager::~StreamIDManager()
40 {
41     MEDIA_LOGI("Destruction StreamIDManager");
42     if (callback_ != nullptr) {
43         callback_.reset();
44     }
45     if (frameWriteCallback_ != nullptr) {
46         frameWriteCallback_.reset();
47     }
48     for (auto cacheBuffer : cacheBuffers_) {
49         if (cacheBuffer.second != nullptr) {
50             int32_t streamID = cacheBuffer.second->GetStreamID();
51             cacheBuffer.second->Stop(streamID);
52             cacheBuffer.second->Release();
53         }
54     }
55     cacheBuffers_.clear();
56     if (isStreamPlayingThreadPoolStarted_.load()) {
57         if (streamPlayingThreadPool_ != nullptr) {
58             streamPlayingThreadPool_->Stop();
59         }
60         isStreamPlayingThreadPoolStarted_.store(false);
61     }
62 }
63 
InitThreadPool()64 int32_t StreamIDManager::InitThreadPool()
65 {
66     if (isStreamPlayingThreadPoolStarted_.load()) {
67         return MSERR_OK;
68     }
69     streamPlayingThreadPool_ = std::make_unique<ThreadPool>(THREAD_POOL_NAME);
70     CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
71         "Failed to obtain playing ThreadPool");
72     if (maxStreams_ > MAX_PLAY_STREAMS_NUMBER) {
73         maxStreams_ = MAX_PLAY_STREAMS_NUMBER;
74         MEDIA_LOGI("more than max play stream number, align to max play strem number.");
75     }
76     if (maxStreams_ < MIN_PLAY_STREAMS_NUMBER) {
77         maxStreams_ = MIN_PLAY_STREAMS_NUMBER;
78         MEDIA_LOGI("less than min play stream number, align to min play strem number.");
79     }
80     MEDIA_LOGI("stream playing thread pool maxStreams_:%{public}d", maxStreams_);
81     // For stream priority logic, thread num need align to task num.
82     streamPlayingThreadPool_->Start(maxStreams_);
83     streamPlayingThreadPool_->SetMaxTaskNum(maxStreams_);
84     isStreamPlayingThreadPoolStarted_.store(true);
85 
86     return MSERR_OK;
87 }
88 
Play(std::shared_ptr<SoundParser> soundParser, PlayParams playParameters)89 int32_t StreamIDManager::Play(std::shared_ptr<SoundParser> soundParser, PlayParams playParameters)
90 {
91     MediaTrace trace("StreamIDManager::Play");
92     CHECK_AND_RETURN_RET_LOG(soundParser != nullptr, -1, "Invalid soundParser.");
93     int32_t soundID = soundParser->GetSoundID();
94     int32_t streamID = GetFreshStreamID(soundID, playParameters);
95     {
96         std::lock_guard lock(streamIDManagerLock_);
97         if (streamID <= 0) {
98             do {
99                 nextStreamID_ = nextStreamID_ == INT32_MAX ? 1 : nextStreamID_ + 1;
100             } while (FindCacheBuffer(nextStreamID_) != nullptr);
101             streamID = nextStreamID_;
102             std::deque<std::shared_ptr<AudioBufferEntry>> cacheData;
103             soundParser->GetSoundData(cacheData);
104             size_t cacheDataTotalSize = soundParser->GetSoundDataTotalSize();
105             MEDIA_LOGI("cacheData size:%{public}zu , cacheDataTotalSize:%{public}zu",
106                 cacheData.size(), cacheDataTotalSize);
107             auto cacheBuffer =
108                 std::make_shared<CacheBuffer>(soundParser->GetSoundTrackFormat(), cacheData, cacheDataTotalSize,
109                      soundID, streamID);
110             CHECK_AND_RETURN_RET_LOG(cacheBuffer != nullptr, -1, "failed to create cache buffer");
111             CHECK_AND_RETURN_RET_LOG(callback_ != nullptr, MSERR_INVALID_VAL, "Invalid callback.");
112             cacheBuffer->SetCallback(callback_);
113             cacheBufferCallback_ = std::make_shared<CacheBufferCallBack>(weak_from_this());
114             CHECK_AND_RETURN_RET_LOG(cacheBufferCallback_ != nullptr, MSERR_INVALID_VAL,
115                 "Invalid cachebuffer callback");
116             cacheBuffer->SetCacheBufferCallback(cacheBufferCallback_);
117             if (frameWriteCallback_ != nullptr) {
118                 cacheBuffer->SetFrameWriteCallback(frameWriteCallback_);
119             }
120             cacheBuffers_.emplace(streamID, cacheBuffer);
121         }
122     }
123     MEDIA_LOGI("StreamIDManager::SetPlay start soundID:%{public}d, streamID:%{public}d", soundID, streamID);
124     SetPlay(soundID, streamID, playParameters);
125     return streamID;
126 }
127 
SetPlay(const int32_t soundID, const int32_t streamID, const PlayParams playParameters)128 int32_t StreamIDManager::SetPlay(const int32_t soundID, const int32_t streamID, const PlayParams playParameters)
129 {
130     MediaTrace trace("StreamIDManager::SetPlay");
131     if (!isStreamPlayingThreadPoolStarted_.load()) {
132         InitThreadPool();
133     }
134 
135     CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
136         "Failed to obtain stream play threadpool.");
137     // CacheBuffer must prepare before play.
138     std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(streamID);
139     CHECK_AND_RETURN_RET_LOG(freshCacheBuffer != nullptr, -1, "Invalid fresh cache buffer");
140     freshCacheBuffer->PreparePlay(streamID, audioRendererInfo_, playParameters);
141     int32_t tempMaxStream = maxStreams_;
142     MEDIA_LOGI("StreamIDManager cur task num:%{public}zu, maxStreams_:%{public}d",
143         playingStreamIDs_.size(), maxStreams_);
144     if (playingStreamIDs_.size() < static_cast<size_t>(tempMaxStream)) {
145         AddPlayTask(streamID, playParameters);
146     } else {
147         int32_t playingStreamID = playingStreamIDs_.back();
148         std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
149         CHECK_AND_RETURN_RET_LOG(freshCacheBuffer != nullptr, -1, "Invalid fresh cache buffer");
150         CHECK_AND_RETURN_RET_LOG(playingCacheBuffer != nullptr, -1, "Invalid playingCacheBuffer");
151         MEDIA_LOGI("StreamIDManager fresh sound priority:%{public}d, playing stream priority:%{public}d",
152             freshCacheBuffer->GetPriority(), playingCacheBuffer->GetPriority());
153         if (freshCacheBuffer->GetPriority() >= playingCacheBuffer->GetPriority()) {
154             MEDIA_LOGI("StreamIDManager stop playing low priority sound:%{public}d", playingStreamID);
155             playingCacheBuffer->Stop(playingStreamID);
156             MEDIA_LOGI("StreamIDManager to playing fresh streamID:%{public}d.", streamID);
157             AddPlayTask(streamID, playParameters);
158         } else {
159             std::lock_guard lock(streamIDManagerLock_);
160             MEDIA_LOGI("StreamIDManager queue will play streams, streamID:%{public}d.", streamID);
161             StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo;
162             freshStreamIDAndPlayParamsInfo.streamID = streamID;
163             freshStreamIDAndPlayParamsInfo.playParameters = playParameters;
164             QueueAndSortWillPlayStreamID(freshStreamIDAndPlayParamsInfo);
165         }
166     }
167     for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
168         int32_t playingStreamID = playingStreamIDs_[i];
169         MEDIA_LOGD("StreamIDManager::SetPlay  playingStreamID:%{public}d", playingStreamID);
170     }
171     for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
172         StreamIDAndPlayParamsInfo willPlayInfo = willPlayStreamInfos_[i];
173         MEDIA_LOGD("StreamIDManager::SetPlay  willPlayStreamID:%{public}d", willPlayInfo.streamID);
174     }
175     return MSERR_OK;
176 }
177 
178 // Sort in descending order
179 // 0 has the lowest priority, and the higher the value, the higher the priority
180 // The queue head has the highest value and priority
QueueAndSortPlayingStreamID(int32_t streamID)181 void StreamIDManager::QueueAndSortPlayingStreamID(int32_t streamID)
182 {
183     if (playingStreamIDs_.empty()) {
184         playingStreamIDs_.emplace_back(streamID);
185     } else {
186         bool shouldReCombinePlayingQueue = false;
187         for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
188             int32_t playingStreamID = playingStreamIDs_[i];
189             std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(streamID);
190             std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
191             if (playingCacheBuffer == nullptr) {
192                 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
193                 shouldReCombinePlayingQueue = true;
194                 break;
195             }
196             if (freshCacheBuffer == nullptr) {
197                 break;
198             }
199             if (freshCacheBuffer->GetPriority() >= playingCacheBuffer->GetPriority()) {
200                 playingStreamIDs_.insert(playingStreamIDs_.begin() + i, streamID);
201                 break;
202             }
203             if (playingStreamIDs_.size() >= 1 && i == playingStreamIDs_.size() - 1 &&
204                 freshCacheBuffer->GetPriority() < playingCacheBuffer->GetPriority()) {
205                 playingStreamIDs_.push_back(streamID);
206                 break;
207             }
208         }
209         if (shouldReCombinePlayingQueue) {
210             QueueAndSortPlayingStreamID(streamID);
211         }
212     }
213 }
214 
215 // Sort in descending order.
216 // 0 has the lowest priority, and the higher the value, the higher the priority
217 // The queue head has the highest value and priority
QueueAndSortWillPlayStreamID(StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo)218 void StreamIDManager::QueueAndSortWillPlayStreamID(StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo)
219 {
220     if (willPlayStreamInfos_.empty()) {
221         willPlayStreamInfos_.emplace_back(freshStreamIDAndPlayParamsInfo);
222     } else {
223         bool shouldReCombineWillPlayQueue = false;
224         for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
225             std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(freshStreamIDAndPlayParamsInfo.streamID);
226             std::shared_ptr<CacheBuffer> willPlayCacheBuffer = FindCacheBuffer(willPlayStreamInfos_[i].streamID);
227             if (willPlayCacheBuffer == nullptr) {
228                 willPlayStreamInfos_.erase(willPlayStreamInfos_.begin() + i);
229                 shouldReCombineWillPlayQueue = true;
230                 break;
231             }
232             if (freshCacheBuffer == nullptr) {
233                 break;
234             }
235             if (freshCacheBuffer->GetPriority() >= willPlayCacheBuffer->GetPriority()) {
236                 willPlayStreamInfos_.insert(willPlayStreamInfos_.begin() + i, freshStreamIDAndPlayParamsInfo);
237                 break;
238             }
239             if (willPlayStreamInfos_.size() >= 1 && i == willPlayStreamInfos_.size() - 1 &&
240                 freshCacheBuffer->GetPriority() < willPlayCacheBuffer->GetPriority()) {
241                 willPlayStreamInfos_.push_back(freshStreamIDAndPlayParamsInfo);
242                 break;
243             }
244         }
245         if (shouldReCombineWillPlayQueue) {
246             QueueAndSortWillPlayStreamID(freshStreamIDAndPlayParamsInfo);
247         }
248     }
249 }
250 
AddPlayTask(const int32_t streamID, const PlayParams playParameters)251 int32_t StreamIDManager::AddPlayTask(const int32_t streamID, const PlayParams playParameters)
252 {
253     ThreadPool::Task streamPlayTask = [this, streamID] { this->DoPlay(streamID); };
254     CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
255         "Failed to obtain playing ThreadPool");
256     CHECK_AND_RETURN_RET_LOG(streamPlayTask != nullptr, MSERR_INVALID_VAL, "Failed to obtain stream play Task");
257     streamPlayingThreadPool_->AddTask(streamPlayTask);
258     std::lock_guard lock(streamIDManagerLock_);
259     QueueAndSortPlayingStreamID(streamID);
260     return MSERR_OK;
261 }
262 
DoPlay(const int32_t streamID)263 int32_t StreamIDManager::DoPlay(const int32_t streamID)
264 {
265     MEDIA_LOGI("StreamIDManager::DoPlay start streamID:%{public}d", streamID);
266     std::shared_ptr<CacheBuffer> cacheBuffer = FindCacheBuffer(streamID);
267     CHECK_AND_RETURN_RET_LOG(cacheBuffer.get() != nullptr, MSERR_INVALID_VAL, "cachebuffer invalid.");
268     if (cacheBuffer->DoPlay(streamID) == MSERR_OK) {
269         MEDIA_LOGI("StreamIDManager::DoPlay success streamID:%{public}d", streamID);
270         return MSERR_OK;
271     }
272     MEDIA_LOGI("StreamIDManager::DoPlay failed streamID:%{public}d", streamID);
273     {
274         std::lock_guard lock(streamIDManagerLock_);
275         for (int32_t i = 0; i < static_cast<int32_t>(playingStreamIDs_.size()); i++) {
276             int32_t playingStreamID = playingStreamIDs_[i];
277             std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
278             if (playingCacheBuffer != nullptr && !playingCacheBuffer->IsRunning()) {
279                 MEDIA_LOGI("StreamIDManager::DoPlay fail erase playingStreamID:%{public}d", playingStreamID);
280                 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
281                 i--;
282             }
283         }
284     }
285     return MSERR_INVALID_VAL;
286 }
287 
FindCacheBuffer(const int32_t streamID)288 std::shared_ptr<CacheBuffer> StreamIDManager::FindCacheBuffer(const int32_t streamID)
289 {
290     if (cacheBuffers_.empty()) {
291         MEDIA_LOGI("StreamIDManager::FindCacheBuffer cacheBuffers_ empty");
292         return nullptr;
293     }
294     CHECK_AND_RETURN_RET_LOG(streamID >= 0, nullptr, "streamID invalid.");
295     if (cacheBuffers_.find(streamID) != cacheBuffers_.end()) {
296         return cacheBuffers_.at(streamID);
297     }
298     return nullptr;
299 }
300 
GetStreamIDBySoundID(const int32_t soundID)301 int32_t StreamIDManager::GetStreamIDBySoundID(const int32_t soundID)
302 {
303     PlayParams playParameters;
304     return GetFreshStreamID(soundID, playParameters);
305 }
306 
ReorderStream(int32_t streamID, int32_t priority)307 int32_t StreamIDManager::ReorderStream(int32_t streamID, int32_t priority)
308 {
309     std::lock_guard lock(streamIDManagerLock_);
310     int32_t playingSize = static_cast<int32_t>(playingStreamIDs_.size());
311     for (int32_t i = 0; i < playingSize - 1; ++i) {
312         for (int32_t j = 0; j < playingSize - 1 - i; ++j) {
313             std::shared_ptr<CacheBuffer> left = FindCacheBuffer(playingStreamIDs_[j]);
314             std::shared_ptr<CacheBuffer> right = FindCacheBuffer(playingStreamIDs_[j + 1]);
315             if (left != nullptr && right != nullptr && left->GetPriority() < right->GetPriority()) {
316                 int32_t streamIdTemp = playingStreamIDs_[j];
317                 playingStreamIDs_[j] = playingStreamIDs_[j + 1];
318                 playingStreamIDs_[j + 1] = streamIdTemp;
319             }
320         }
321     }
322     for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
323         int32_t playingStreamID = playingStreamIDs_[i];
324         MEDIA_LOGD("StreamIDManager::ReorderStream  playingStreamID:%{public}d", playingStreamID);
325     }
326 
327     int32_t willPlaySize = static_cast<int32_t>(willPlayStreamInfos_.size());
328     for (int32_t i = 0; i < willPlaySize - 1; ++i) {
329         for (int32_t j = 0; j < willPlaySize - 1 - i; ++j) {
330             std::shared_ptr<CacheBuffer> left = FindCacheBuffer(willPlayStreamInfos_[j].streamID);
331             std::shared_ptr<CacheBuffer> right = FindCacheBuffer(willPlayStreamInfos_[j + 1].streamID);
332             if (left != nullptr && right != nullptr && left->GetPriority() < right->GetPriority()) {
333                 StreamIDAndPlayParamsInfo willPlayInfoTemp = willPlayStreamInfos_[j];
334                 willPlayStreamInfos_[j] = willPlayStreamInfos_[j + 1];
335                 willPlayStreamInfos_[j + 1] = willPlayInfoTemp;
336             }
337         }
338     }
339     for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
340         StreamIDAndPlayParamsInfo willPlayInfo = willPlayStreamInfos_[i];
341         MEDIA_LOGD("StreamIDManager::ReorderStream  willPlayStreamID:%{public}d", willPlayInfo.streamID);
342     }
343     return MSERR_OK;
344 }
345 
ClearStreamIDInDeque(int32_t streamID)346 int32_t StreamIDManager::ClearStreamIDInDeque(int32_t streamID)
347 {
348     std::lock_guard lock(streamIDManagerLock_);
349     for (auto it = playingStreamIDs_.begin(); it != playingStreamIDs_.end();) {
350         if (*it == streamID) {
351             MEDIA_LOGI("StreamIDManager::ClearStreamIDInDeque playingDel streamID:%{public}d", streamID);
352             it = playingStreamIDs_.erase(it);
353         } else {
354             ++it;
355         }
356     }
357     for (auto it = willPlayStreamInfos_.begin(); it != willPlayStreamInfos_.end();) {
358         if (it->streamID == streamID) {
359             MEDIA_LOGI("StreamIDManager::ClearStreamIDInDeque willPlayDel streamID:%{public}d", streamID);
360             it = willPlayStreamInfos_.erase(it);
361         } else {
362             ++it;
363         }
364     }
365     return MSERR_OK;
366 }
367 
GetFreshStreamID(const int32_t soundID, PlayParams playParameters)368 int32_t StreamIDManager::GetFreshStreamID(const int32_t soundID, PlayParams playParameters)
369 {
370     int32_t streamID = 0;
371     if (cacheBuffers_.empty()) {
372         MEDIA_LOGI("StreamIDManager::GetFreshStreamID cacheBuffers_ empty");
373         return streamID;
374     }
375     for (auto cacheBuffer : cacheBuffers_) {
376         if (cacheBuffer.second == nullptr) {
377             MEDIA_LOGE("Invalid cacheBuffer, soundID:%{public}d", soundID);
378             continue;
379         }
380         if (soundID == cacheBuffer.second->GetSoundID()) {
381             streamID = cacheBuffer.second->GetStreamID();
382             MEDIA_LOGI("Have cache soundID:%{public}d, streamID:%{public}d", soundID, streamID);
383             break;
384         }
385     }
386     return streamID;
387 }
388 
OnPlayFinished()389 void StreamIDManager::OnPlayFinished()
390 {
391     {
392         std::lock_guard lock(streamIDManagerLock_);
393         for (int32_t i = 0; i < static_cast<int32_t>(playingStreamIDs_.size()); i++) {
394             int32_t playingStreamID = playingStreamIDs_[i];
395             std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
396             if (playingCacheBuffer != nullptr && !playingCacheBuffer->IsRunning()) {
397                 MEDIA_LOGI("StreamIDManager::OnPlayFinished erase playingStreamID:%{public}d", playingStreamID);
398                 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
399                 i--;
400             }
401         }
402     }
403     if (!willPlayStreamInfos_.empty()) {
404         MEDIA_LOGI("StreamIDManager OnPlayFinished will play streams non empty, get the front.");
405         StreamIDAndPlayParamsInfo willPlayStreamInfo =  willPlayStreamInfos_.front();
406         AddPlayTask(willPlayStreamInfo.streamID, willPlayStreamInfo.playParameters);
407         std::lock_guard lock(streamIDManagerLock_);
408         willPlayStreamInfos_.pop_front();
409     }
410 }
411 
SetCallback(const std::shared_ptr<ISoundPoolCallback> &callback)412 int32_t StreamIDManager::SetCallback(const std::shared_ptr<ISoundPoolCallback> &callback)
413 {
414     callback_ = callback;
415     return MSERR_OK;
416 }
417 
SetFrameWriteCallback(const std::shared_ptr<ISoundPoolFrameWriteCallback> &callback)418 int32_t StreamIDManager::SetFrameWriteCallback(const std::shared_ptr<ISoundPoolFrameWriteCallback> &callback)
419 {
420     frameWriteCallback_ = callback;
421     return MSERR_OK;
422 }
423 } // namespace Media
424 } // namespace OHOS
425