1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "buffer_dispatcher.h"
17 #include <cmath>
18 #include <cstdarg>
19 #include <cstdint>
20 #include "common/common_macro.h"
21 #include "media_channel_def.h"
22
23 namespace OHOS {
24 namespace Sharing {
25
// Timeout handed to the writing timer in SetBufferDispatcherListener().
// NOTE(review): unit is presumably seconds — confirm against TimeoutTimer::StartTimer.
constexpr int32_t WRITING_TIMTOUT = 30;
// Fixed offsets of two and one; their uses are not visible in this part of the file.
constexpr int32_t FIX_OFFSET_TWO = 2;
constexpr int32_t FIX_OFFSET_ONE = 1;
29
SetSource(IBufferReader::Ptr dataReader)30 void BufferReceiver::SetSource(IBufferReader::Ptr dataReader)
31 {
32 SHARING_LOGD("trace.");
33 bufferReader_ = dataReader;
34 }
35
OnMediaDataNotify()36 int32_t BufferReceiver::OnMediaDataNotify()
37 {
38 SHARING_LOGD("BufferReceiver Media notified.");
39 dataReady_ = true;
40 notifyData_.notify_one();
41 return 0;
42 }
43
OnAudioDataNotify()44 int32_t BufferReceiver::OnAudioDataNotify()
45 {
46 MEDIA_LOGD("BufferReceiver Audio notified.");
47 nonBlockAudio_ = true;
48 notifyAudio_.notify_one();
49 return 0;
50 }
51
OnVideoDataNotify()52 int32_t BufferReceiver::OnVideoDataNotify()
53 {
54 MEDIA_LOGD("BufferReceiver Video notified.");
55 nonBlockVideo_ = true;
56 notifyVideo_.notify_one();
57 return 0;
58 }
59
IsMixedReceiver()60 bool BufferReceiver::IsMixedReceiver()
61 {
62 MEDIA_LOGD("trace.");
63 return mixed_;
64 }
65
RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)66 int32_t BufferReceiver::RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)
67 {
68 MEDIA_LOGD("trace.");
69 if (bufferReader_ == nullptr) {
70 SHARING_LOGE("BufferReceiver read failed null dispatcher.");
71 return -1;
72 }
73
74 if (firstMRead_ && type == MEDIA_TYPE_AV) {
75 bufferReader_->NotifyReadReady(GetReceiverId(), type);
76 mixed_ = true;
77 firstMRead_ = false;
78 } else if (firstARead_ && type == MEDIA_TYPE_AUDIO) {
79 bufferReader_->NotifyReadReady(GetReceiverId(), type);
80 firstARead_ = false;
81 } else if (firstVRead_ && type == MEDIA_TYPE_VIDEO) {
82 bufferReader_->NotifyReadReady(GetReceiverId(), type);
83 firstVRead_ = false;
84 }
85 std::unique_lock<std::mutex> locker(mutex_);
86 MEDIA_LOGD("BufferDispatcher NotifyThreadWorker before wait, receiverId: %{public}u.", GetReceiverId());
87 switch (type) {
88 /* cv will waiting if pred is false;
89 so set waiting audio pred (type != MEDIA_TYPE_AUDIO) to NOT block other type.*/
90 case MEDIA_TYPE_AUDIO:
91 MEDIA_LOGD("wait Audio, receiverId: %{public}u.", GetReceiverId());
92 notifyAudio_.wait(locker, [=]() { return nonBlockAudio_ || type != MEDIA_TYPE_AUDIO; });
93 nonBlockAudio_ = false;
94 break;
95 case MEDIA_TYPE_VIDEO:
96 MEDIA_LOGD("wait Video, receiverId: %{public}u.", GetReceiverId());
97 notifyVideo_.wait(locker, [=]() { return nonBlockVideo_ || type != MEDIA_TYPE_VIDEO; });
98 nonBlockVideo_ = false;
99 break;
100 case MEDIA_TYPE_AV:
101 MEDIA_LOGD("wait Mixed, receiverId: %{public}u.", GetReceiverId());
102 notifyData_.wait(locker, [=]() { return dataReady_ || type != MEDIA_TYPE_AV; });
103 dataReady_ = false;
104 break;
105 default:
106 return 0;
107 break;
108 }
109
110 bufferReader_->ClearDataBit(GetReceiverId(), type);
111 bufferReader_->ClearReadBit(GetReceiverId(), type);
112 MEDIA_LOGD("BufferDispatcher NotifyThreadWorker after wait start read, receiverId: %{public}u.", GetReceiverId());
113 int32_t ret = bufferReader_->ReadBufferData(GetReceiverId(), type, cb);
114 bufferReader_->NotifyReadReady(GetReceiverId(), type);
115 dataReady_ = false;
116
117 return ret;
118 }
119
NotifyReadStart()120 void BufferReceiver::NotifyReadStart()
121 {
122 SHARING_LOGD("receiverId: %{public}u notify start read.", GetReceiverId());
123 firstARead_ = true;
124 firstVRead_ = true;
125 firstMRead_ = true;
126 }
127
GetReceiverId()128 uint32_t BufferReceiver::GetReceiverId()
129 {
130 MEDIA_LOGD("trace.");
131 return GetId();
132 }
133
GetDispatcherId()134 uint32_t BufferReceiver::GetDispatcherId()
135 {
136 SHARING_LOGD("trace.");
137 if (bufferReader_) {
138 return bufferReader_->GetDispatcherId();
139 }
140
141 return 0;
142 }
143
NotifyReadStop()144 void BufferReceiver::NotifyReadStop()
145 {
146 SHARING_LOGD("receiverId: %{public}u notify stop read.", GetReceiverId());
147 nonBlockAudio_ = true;
148 nonBlockVideo_ = true;
149 dataReady_ = true;
150 notifyAudio_.notify_all();
151 notifyVideo_.notify_all();
152 notifyData_.notify_all();
153 }
154
EnableKeyMode(bool enable)155 void BufferReceiver::EnableKeyMode(bool enable)
156 {
157 SHARING_LOGD("bufferReceiver id %{public}u SetKeyOnlyMode %{public}d.", GetReceiverId(), enable);
158 if (keyOnly_ == true && enable == false) {
159 SHARING_LOGD("Set KeyOnlyMode false, need report fast read over.");
160 accelerationDone_ = true;
161 }
162
163 keyOnly_ = enable;
164 if (bufferReader_ && enable) {
165 bufferReader_->ClearDataBit(GetReceiverId(), MEDIA_TYPE_VIDEO);
166 }
167
168 auto listener = listener_.lock();
169 if (listener) {
170 listener->OnKeyModeNotify(enable);
171 }
172 }
173
IsKeyMode()174 bool BufferReceiver::IsKeyMode()
175 {
176 MEDIA_LOGD("trace.");
177 return keyOnly_;
178 }
179
IsKeyRedirect()180 bool BufferReceiver::IsKeyRedirect()
181 {
182 SHARING_LOGD("trace.");
183 return keyRedirect_;
184 }
185
GetSPS()186 const MediaData::Ptr BufferReceiver::GetSPS()
187 {
188 MEDIA_LOGD("trace.");
189 if (bufferReader_) {
190 return bufferReader_->GetSPS();
191 }
192
193 return nullptr;
194 }
195
GetPPS()196 const MediaData::Ptr BufferReceiver::GetPPS()
197 {
198 MEDIA_LOGD("trace.");
199 if (bufferReader_) {
200 return bufferReader_->GetPPS();
201 }
202
203 return nullptr;
204 }
205
NeedAcceleration()206 bool BufferReceiver::NeedAcceleration()
207 {
208 MEDIA_LOGD("trace.");
209 return accelerationDone_;
210 }
211
DisableAcceleration()212 void BufferReceiver::DisableAcceleration()
213 {
214 SHARING_LOGD("trace.");
215 accelerationDone_ = false;
216 }
217
SendAccelerationDone()218 void BufferReceiver::SendAccelerationDone()
219 {
220 SHARING_LOGD("trace.");
221 auto listener = listener_.lock();
222 if (listener) {
223 listener->OnAccelerationDoneNotify();
224 }
225 }
226
EnableKeyRedirect(bool enable)227 void BufferReceiver::EnableKeyRedirect(bool enable)
228 {
229 SHARING_LOGD("trace.");
230 if (bufferReader_ && enable) {
231 bufferReader_->EnableKeyRedirect(enable);
232 }
233 keyRedirect_ = enable;
234 }
235
SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)236 void BufferReceiver::SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)
237 {
238 SHARING_LOGD("trace.");
239 listener_ = listener;
240 }
241
// Shorthand so the nested BufferDispatcher::DataNotifier members below can be defined as DataNotifier::...
using DataNotifier = BufferDispatcher::DataNotifier;
243
NotifyDataReceiver(MediaType type)244 void DataNotifier::NotifyDataReceiver(MediaType type)
245 {
246 MEDIA_LOGD("trace.");
247 if (receiver_.lock() == nullptr) {
248 SHARING_LOGE("target receiver NOT exist.");
249 return;
250 }
251
252 if (block_) {
253 return;
254 }
255
256 MEDIA_LOGD("notify target type %{public}d.", type);
257 switch (type) {
258 case MEDIA_TYPE_AUDIO:
259 GetBufferReceiver()->OnAudioDataNotify();
260 break;
261 case MEDIA_TYPE_VIDEO:
262 GetBufferReceiver()->OnVideoDataNotify();
263 break;
264 case MEDIA_TYPE_AV:
265 GetBufferReceiver()->OnMediaDataNotify();
266 break;
267 default:
268 SHARING_LOGI("none process case.");
269 break;
270 }
271 }
272
GetBufferReceiver()273 BufferReceiver::Ptr DataNotifier::GetBufferReceiver()
274 {
275 MEDIA_LOGD("trace.");
276 return receiver_.lock();
277 }
278
GetReceiverId()279 uint32_t DataNotifier::GetReceiverId()
280 {
281 MEDIA_LOGD("trace.");
282 auto receiver = receiver_.lock();
283 if (receiver == nullptr) {
284 SHARING_LOGE("target receiver NOT exist.");
285 return INVALID_INDEX;
286 }
287
288 return receiver->GetReceiverId();
289 }
290
SetListenDispatcher(IBufferReader::Ptr dispatcher)291 void DataNotifier::SetListenDispatcher(IBufferReader::Ptr dispatcher)
292 {
293 SHARING_LOGD("trace.");
294 dispatcher_ = dispatcher;
295 }
296
SetNotifyReceiver(BufferReceiver::Ptr receiver)297 void DataNotifier::SetNotifyReceiver(BufferReceiver::Ptr receiver)
298 {
299 SHARING_LOGD("trace.");
300 receiver_ = receiver;
301 }
302
SetBlock()303 void DataNotifier::SetBlock()
304 {
305 SHARING_LOGD("trace.");
306 block_ = true;
307 }
308
SetNeedUpdate(bool enable, MediaType type)309 void DataNotifier::SetNeedUpdate(bool enable, MediaType type)
310 {
311 MEDIA_LOGD("trace.");
312 if (type == MEDIA_TYPE_AUDIO) {
313 needUpdateAIndex = enable;
314 } else {
315 needUpdateVIndex = enable;
316 }
317 }
318
DataAvailable(MediaType type)319 bool DataNotifier::DataAvailable(MediaType type)
320 {
321 MEDIA_LOGD("trace.");
322 auto dispatcher = dispatcher_.lock();
323 if (dispatcher == nullptr) {
324 SHARING_LOGE("target dispatcher NOT exist.");
325 return false;
326 }
327
328 if (type == MEDIA_TYPE_AUDIO) {
329 return audioIndex != INVALID_INDEX &&
330 (audioIndex < dispatcher->GetLatestAudioIndex() || !dispatcher->IsRead(GetReceiverId(), audioIndex + 1));
331 } else if (type == MEDIA_TYPE_VIDEO) {
332 return videoIndex != INVALID_INDEX &&
333 (videoIndex < dispatcher->GetLatestVideoIndex() || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
334 } else {
335 return videoIndex != INVALID_INDEX &&
336 (videoIndex < dispatcher->GetBufferSize() - 1 || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
337 }
338
339 return false;
340 }
341
IsMixedReceiver()342 bool DataNotifier::IsMixedReceiver()
343 {
344 MEDIA_LOGD("trace.");
345 auto receiver = receiver_.lock();
346 if (receiver == nullptr) {
347 SHARING_LOGE("target receiver NOT exist.");
348 return false;
349 }
350
351 return receiver->IsMixedReceiver();
352 }
353
GetReceiverReadIndex(MediaType type)354 uint32_t DataNotifier::GetReceiverReadIndex(MediaType type)
355 {
356 MEDIA_LOGD("trace.");
357 switch (type) {
358 case MEDIA_TYPE_VIDEO:
359 MEDIA_LOGD("Video Recvid:%{public}d index: %{public}d.", GetReceiverId(), videoIndex);
360 return videoIndex;
361 break;
362 case MEDIA_TYPE_AUDIO:
363 MEDIA_LOGD("Audio Recvid:%{public}d index: %{public}d.", GetReceiverId(), audioIndex);
364 return audioIndex;
365 break;
366 case MEDIA_TYPE_AV:
367 MEDIA_LOGD("Mixed Recvid:%{public}d vindex: %{public}d aindex: %{public}d.", GetReceiverId(), videoIndex,
368 audioIndex);
369 if (audioIndex != INVALID_INDEX && videoIndex != INVALID_INDEX) {
370 return audioIndex <= videoIndex ? audioIndex : videoIndex;
371 } else if (audioIndex == INVALID_INDEX && videoIndex == INVALID_INDEX) {
372 return INVALID_INDEX;
373 } else {
374 return audioIndex == INVALID_INDEX ? videoIndex : audioIndex;
375 }
376 break;
377 default:
378 return INVALID_INDEX;
379 break;
380 }
381 }
382
IsKeyModeReceiver()383 bool DataNotifier::IsKeyModeReceiver()
384 {
385 MEDIA_LOGD("trace.");
386 auto receiver = receiver_.lock();
387 if (receiver) {
388 return receiver->IsKeyMode();
389 }
390
391 return false;
392 }
393
IsKeyRedirectReceiver()394 bool DataNotifier::IsKeyRedirectReceiver()
395 {
396 SHARING_LOGD("trace.");
397 auto receiver = receiver_.lock();
398 if (receiver) {
399 return receiver->IsKeyRedirect();
400 }
401
402 return false;
403 }
404
NeedAcceleration()405 bool DataNotifier::NeedAcceleration()
406 {
407 MEDIA_LOGD("trace.");
408 auto receiver = receiver_.lock();
409 if (receiver == nullptr) {
410 SHARING_LOGE("target receiver NOT exist.");
411 return false;
412 }
413
414 return receiver->NeedAcceleration();
415 }
416
SendAccelerationDone()417 void DataNotifier::SendAccelerationDone()
418 {
419 SHARING_LOGD("trace.");
420 auto receiver = receiver_.lock();
421 if (receiver == nullptr) {
422 SHARING_LOGE("target receiver NOT exist.");
423 return;
424 }
425
426 receiver->SendAccelerationDone();
427 receiver->DisableAcceleration();
428 }
429
BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)430 BufferDispatcher::BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)
431 {
432 SHARING_LOGD("BufferDispatcher ctor, set capacity: %{public}u.", maxCapacity);
433 maxBufferCapacity_ = maxCapacity;
434 bufferCapacityIncrement_ = capacityIncrement;
435 {
436 std::lock_guard<std::mutex> lock(idleMutex_);
437 idleAudioBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
438 idleVideoBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
439 for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
440 MediaData::Ptr adata = std::make_shared<MediaData>();
441 MediaData::Ptr vdata = std::make_shared<MediaData>();
442 adata->buff = std::make_shared<DataBuffer>();
443 vdata->buff = std::make_shared<DataBuffer>();
444 idleAudioBuffer_.push_back(adata);
445 idleVideoBuffer_.push_back(vdata);
446 }
447 }
448
449 writingTimer_ = std::make_unique<TimeoutTimer>("dispatcher-writing-timer");
450
451 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
452 circularBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
453 StartDispatch();
454 }
455
~BufferDispatcher()456 BufferDispatcher::~BufferDispatcher()
457 {
458 SHARING_LOGI("BufferDispatcher dtor.");
459 running_ = false;
460 StopDispatch();
461 FlushBuffer();
462 ReleaseIdleBuffer();
463 ReleaseAllReceiver();
464 }
465
StartDispatch()466 void BufferDispatcher::StartDispatch()
467 {
468 SHARING_LOGD("trace.");
469 running_ = true;
470 notifyThread_ = std::thread(&BufferDispatcher::NotifyThreadWorker, this);
471 std::string name = "notifyThread";
472 pthread_setname_np(notifyThread_.native_handle(), name.c_str());
473 }
474
StopDispatch()475 void BufferDispatcher::StopDispatch()
476 {
477 SHARING_LOGD("trace.");
478 running_ = false;
479 continueNotify_ = true;
480
481 if (writingTimer_) {
482 writingTimer_.reset();
483 }
484
485 dataCV_.notify_all();
486 if (notifyThread_.joinable()) {
487 notifyThread_.join();
488 }
489 }
490
SetBufferCapacity(size_t capacity)491 void BufferDispatcher::SetBufferCapacity(size_t capacity)
492 {
493 SHARING_LOGD("trace.");
494 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
495 circularBuffer_.set_capacity(capacity);
496 }
497
SetDataMode(MediaDispacherMode dataMode)498 void BufferDispatcher::SetDataMode(MediaDispacherMode dataMode)
499 {
500 SHARING_LOGD("trace.");
501 dataMode_ = dataMode;
502 }
503
ReleaseIdleBuffer()504 void BufferDispatcher::ReleaseIdleBuffer()
505 {
506 SHARING_LOGD("BufferDispatcher idle Release Start.");
507 std::unique_lock<std::mutex> locker(idleMutex_);
508 for (auto &data : idleAudioBuffer_) {
509 if (data != nullptr && data->buff != nullptr) {
510 data->buff.reset();
511 }
512 }
513
514 idleAudioBuffer_.clear();
515 for (auto &data : idleVideoBuffer_) {
516 if (data != nullptr && data->buff != nullptr) {
517 data->buff.reset();
518 }
519 }
520
521 idleVideoBuffer_.clear();
522 SHARING_LOGD("BufferDispatcher idle Release End.");
523 }
524
FlushBuffer()525 void BufferDispatcher::FlushBuffer()
526 {
527 SHARING_LOGI("BufferDispatcher Start flushing, dispatcherId: %{public}u.", GetDispatcherId());
528 {
529 std::lock_guard<std::mutex> lock(idleMutex_);
530 idleAudioBuffer_.clear();
531 idleVideoBuffer_.clear();
532 for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
533 MediaData::Ptr adata = std::make_shared<MediaData>();
534 MediaData::Ptr vdata = std::make_shared<MediaData>();
535 adata->buff = std::make_shared<DataBuffer>();
536 vdata->buff = std::make_shared<DataBuffer>();
537 idleAudioBuffer_.push_back(adata);
538 idleVideoBuffer_.push_back(vdata);
539 }
540 }
541
542 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
543 for (auto &data : circularBuffer_) {
544 if (data->mediaData != nullptr && data->mediaData->buff != nullptr) {
545 data->mediaData->buff.reset();
546 }
547 }
548
549 circularBuffer_.clear();
550 waitingKey_ = true;
551 gop_ = 0;
552 audioFrameCnt_ = 0;
553 videoFrameCnt_ = 0;
554 ResetAllIndex();
555 SHARING_LOGD("BufferDispatcher Dispatcher flushing end, dispatcherId: %{public}u.", GetDispatcherId());
556 }
557
RequestDataBuffer(MediaType type, uint32_t size)558 MediaData::Ptr BufferDispatcher::RequestDataBuffer(MediaType type, uint32_t size)
559 {
560 SHARING_LOGD("trace.");
561 std::lock_guard<std::mutex> lock(idleMutex_);
562 if (size <= 0) {
563 SHARING_LOGE("Size invalid.");
564 return nullptr;
565 }
566
567 MediaData::Ptr retData;
568 if (type == MEDIA_TYPE_VIDEO) {
569 if (!idleVideoBuffer_.empty()) {
570 SHARING_LOGD("video From idle.");
571 retData = idleVideoBuffer_.front();
572 idleVideoBuffer_.pop_front();
573 if (retData == nullptr) {
574 MEDIA_LOGW("video From alloc when idle nullptr.");
575 retData = std::make_shared<MediaData>();
576 }
577 return retData;
578 }
579 } else {
580 if (!idleAudioBuffer_.empty()) {
581 SHARING_LOGD("Audio From idle.");
582 retData = idleAudioBuffer_.front();
583 idleAudioBuffer_.pop_front();
584 if (retData == nullptr) {
585 MEDIA_LOGW("Audio From alloc when idle nullptr.");
586 retData = std::make_shared<MediaData>();
587 }
588 return retData;
589 }
590 }
591
592 SHARING_LOGD("Audio/video from alloc.");
593 retData = std::make_shared<MediaData>();
594 return retData;
595 }
596
ReturnIdleBuffer(DataSpec::Ptr &data)597 void BufferDispatcher::ReturnIdleBuffer(DataSpec::Ptr &data)
598 {
599 MEDIA_LOGD("trace.");
600 std::lock_guard<std::mutex> lock(idleMutex_);
601 if (data == nullptr || data->mediaData == nullptr) {
602 return;
603 }
604 if (data->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
605 if (idleVideoBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
606 idleVideoBuffer_.push_back(data->mediaData);
607 MEDIA_LOGD("data: push_back in idleVideoBuffer_, size: %{public}zu.", idleVideoBuffer_.size());
608 }
609 } else {
610 if (idleAudioBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
611 idleAudioBuffer_.push_back(data->mediaData);
612 MEDIA_LOGD("data: push_back in idleAudioBuffer_, size: %{public}zu.", idleAudioBuffer_.size());
613 }
614 }
615
616 data.reset();
617 }
618
GetBufferSize()619 size_t BufferDispatcher::GetBufferSize()
620 {
621 SHARING_LOGD("trace.");
622 return circularBuffer_.size();
623 }
624
FindReceiverIndex(uint32_t receiverId)625 uint32_t BufferDispatcher::FindReceiverIndex(uint32_t receiverId)
626 {
627 MEDIA_LOGD("trace.");
628 if (notifiers_.find(receiverId) != notifiers_.end()) {
629 return notifiers_[receiverId]->GetReadIndex();
630 }
631
632 return INVALID_INDEX;
633 }
634
IsRecevierExist(uint32_t receiverId)635 bool BufferDispatcher::IsRecevierExist(uint32_t receiverId)
636 {
637 SHARING_LOGD("trace.");
638 auto notifier = GetNotifierByReceiverId(receiverId);
639 if (notifier == nullptr) {
640 return false;
641 }
642
643 return true;
644 }
645
EnableKeyMode(bool enable)646 void BufferDispatcher::EnableKeyMode(bool enable)
647 {
648 SHARING_LOGD("trace.");
649 keyOnly_ = enable;
650 }
651
AttachReceiver(BufferReceiver::Ptr receiver)652 int32_t BufferDispatcher::AttachReceiver(BufferReceiver::Ptr receiver)
653 {
654 SHARING_LOGD("trace.");
655 if (receiver == nullptr) {
656 return -1;
657 }
658
659 if (IsRecevierExist(receiver->GetReceiverId())) {
660 SHARING_LOGE("Exist.");
661 return 0;
662 }
663
664 receiver->NotifyReadStart();
665 std::lock_guard<std::mutex> locker(notifyMutex_);
666 if (readRefFlag_ == 0xFFFF) {
667 SHARING_LOGE("readRefFlag limited.");
668 return -1;
669 }
670
671 DataNotifier::Ptr notifier = std::make_shared<DataNotifier>();
672 notifier->SetListenDispatcher(shared_from_this());
673 notifier->SetNotifyReceiver(receiver);
674
675 auto usableRef = ~readRefFlag_ & (-(~readRefFlag_));
676
677 if ((usableRef & (usableRef - 1)) != 0) {
678 SHARING_LOGE("usableRef: %{public}d invalid.", usableRef);
679 return -1;
680 }
681
682 readRefFlag_ |= usableRef;
683 notifier->SetReadIndex(static_cast<uint32_t>(log2(usableRef)));
684 SHARING_LOGI("receiverId: %{public}d, readIndex: %{public}d, usableRef: %{public}d, readRefFlag_: %{public}d.",
685 receiver->GetReceiverId(), notifier->GetReadIndex(), usableRef, readRefFlag_);
686 receiver->SetSource(shared_from_this());
687 notifiers_.emplace(receiver->GetReceiverId(), notifier);
688
689 if (circularBuffer_.empty()) {
690 notifier->audioIndex = INVALID_INDEX;
691 notifier->videoIndex = INVALID_INDEX;
692 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
693 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
694 SHARING_LOGD("BufferDispatcher Attach when buffer empty RecvId: %{public}d.", receiver->GetReceiverId());
695 videoNeedActivate_ = true;
696 audioNeedActivate_ = true;
697 return 0;
698 }
699
700 if (dataMode_ == MEDIA_AUDIO_ONLY) {
701 notifier->audioIndex = circularBuffer_.size() - 1;
702 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
703 notifier->videoIndex = INVALID_INDEX;
704 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
705 SHARING_LOGD("BufferDispatcher Attach when Keyindex List empty RecvId: %{public}d.",
706 receiver->GetReceiverId());
707 } else {
708 if (!keyIndexList_.empty()) {
709 SHARING_LOGD("BufferDispatcher Attach with Keyindex RecvId: %{public}d KeyIndex:%{public}d.",
710 receiver->GetReceiverId(), keyIndexList_.back());
711 uint32_t tempIndex = FindNextIndex(keyIndexList_.back(), MEDIA_TYPE_AUDIO);
712 notifier->audioIndex = tempIndex == keyIndexList_.back() ? INVALID_INDEX : tempIndex;
713 notifier->videoIndex = keyIndexList_.back();
714 bool isAudioReady = tempIndex != INVALID_INDEX ? true : false;
715 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, isAudioReady);
716 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, true);
717 if (lastAudioIndex_ == INVALID_INDEX) {
718 audioNeedActivate_ = true;
719 }
720 } else {
721 SHARING_LOGD("BufferDispatcher Attach with Non Keyindex Exist RecvId: %{public}d.",
722 receiver->GetReceiverId());
723 uint32_t tempIndex = FindLastIndex(MEDIA_TYPE_AUDIO);
724 notifier->audioIndex = tempIndex;
725 notifier->videoIndex = INVALID_INDEX;
726 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
727 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
728 if (lastAudioIndex_ == INVALID_INDEX) {
729 audioNeedActivate_ = true;
730 }
731 }
732 }
733
734 return 0;
735 }
736
DetachReceiver(BufferReceiver::Ptr receiver)737 int32_t BufferDispatcher::DetachReceiver(BufferReceiver::Ptr receiver)
738 {
739 SHARING_LOGI("buffer dispatcher: Detach receiver in.");
740 if (receiver == nullptr) {
741 SHARING_LOGE("buffer dispatcher: Detach receiver failed - null receiver.");
742 return -1;
743 }
744
745 if (!IsRecevierExist(receiver->GetReceiverId())) {
746 SHARING_LOGE("BufferDispatcher AttachReceiver No Vaild Recevier Exist.");
747 return 0;
748 }
749
750 auto notifier = GetNotifierByReceiverPtr(receiver);
751 if (notifier == nullptr) {
752 SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
753 return -1;
754 }
755
756 std::lock_guard<std::mutex> locker(notifyMutex_);
757 notifier->SetBlock();
758 SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
759 SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
760
761 readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
762 notifiers_.erase(receiver->GetReceiverId());
763 SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
764 return 0;
765 }
766
DetachReceiver(uint32_t receiverId, DataNotifier::Ptr notifier)767 int32_t BufferDispatcher::DetachReceiver(uint32_t receiverId, DataNotifier::Ptr notifier)
768 {
769 SHARING_LOGI("buffer dispatcher: Detach notifier in.");
770 if (notifier == nullptr) {
771 SHARING_LOGE("buffer dispatcher: Detach receiver failed - null notifier.");
772 return -1;
773 }
774 notifier->SetBlock();
775 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
776 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
777
778 readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
779 notifiers_.erase(receiverId);
780 SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
781 return 0;
782 }
783
ReleaseAllReceiver()784 void BufferDispatcher::ReleaseAllReceiver()
785 {
786 SHARING_LOGD("trace.");
787 std::lock_guard<std::mutex> locker(notifyMutex_);
788 for (auto it = notifiers_.begin(); it != notifiers_.end();) {
789 auto notifier = it->second;
790 if (notifier == nullptr) {
791 ++it;
792 continue;
793 }
794
795 auto receiver = notifier->GetBufferReceiver();
796 if (receiver == nullptr) {
797 ++it;
798 continue;
799 }
800
801 auto receiverId = receiver->GetReceiverId();
802 if (notifiers_.find(receiverId) != notifiers_.end()) {
803 auto notifierFind = notifiers_[receiverId];
804 ++it;
805 DetachReceiver(receiverId, notifierFind);
806 } else {
807 ++it;
808 SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
809 }
810 }
811
812 notifiers_.clear();
813 SHARING_LOGD("release all receiver out.");
814 }
815
SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)816 void BufferDispatcher::SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)
817 {
818 SHARING_LOGD("trace.");
819 listener_ = listener;
820 RETURN_IF_NULL(writingTimer_);
821 writingTimer_->StartTimer(
822 WRITING_TIMTOUT, "waiting for continuous data inputs",
823 [this]() {
824 if (!writing_) {
825 SHARING_LOGI("writing timeout");
826 auto listener = listener_.lock();
827 if (listener) {
828 listener->OnWriteTimeout();
829 }
830 } else {
831 SHARING_LOGI("restart timer");
832 writing_ = false;
833 }
834 },
835 true);
836 }
837
GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)838 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)
839 {
840 SHARING_LOGD("trace.");
841
842 return GetNotifierByReceiverId(receiver->GetReceiverId());
843 }
844
GetNotifierByReceiverId(uint32_t receiverId)845 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverId(uint32_t receiverId)
846 {
847 MEDIA_LOGD("trace.");
848 std::lock_guard<std::mutex> locker(notifyMutex_);
849 if (notifiers_.find(receiverId) != notifiers_.end()) {
850 return notifiers_[receiverId];
851 }
852
853 return nullptr;
854 }
855
ReadBufferData(uint32_t receiverId, MediaType type, std::function<void(const MediaData::Ptr &data)> cb)856 int32_t BufferDispatcher::ReadBufferData(uint32_t receiverId, MediaType type,
857 std::function<void(const MediaData::Ptr &data)> cb)
858 {
859 MEDIA_LOGD("in, receiverId: %{public}u.", receiverId);
860 auto notifier = GetNotifierByReceiverId(receiverId);
861 if (notifier == nullptr) {
862 SHARING_LOGE("notifier is nullptr.");
863 return -1;
864 }
865
866 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
867 uint32_t readIndex = notifier->GetReceiverReadIndex(type);
868 if (readIndex >= circularBuffer_.size()) {
869 SHARING_LOGE("Read wrong index exceed size.");
870 return -1;
871 }
872
873 if ((keyOnly_ || notifier->IsKeyModeReceiver()) && type == MEDIA_TYPE_VIDEO &&
874 !IsKeyVideoFrame(circularBuffer_.at(readIndex))) {
875 UpdateReceiverReadIndex(receiverId, readIndex, type);
876 SHARING_LOGE("Read Non Key Video in KeyOnly Mode index: %{public}u.", readIndex);
877 return -1;
878 }
879
880 if (IsDataReaded(receiverId, circularBuffer_.at(readIndex))) {
881 UpdateReceiverReadIndex(receiverId, readIndex, type);
882 return -1;
883 }
884
885 readIndex = notifier->GetReceiverReadIndex(type);
886 if (readIndex >= circularBuffer_.size()) {
887 return -1;
888 }
889
890 auto data = circularBuffer_.at(readIndex);
891 if (data == nullptr) {
892 SHARING_LOGE("BufferDispatcher Read data nullptr.");
893 return -1;
894 }
895
896 if (IsKeyVideoFrame(data)) {
897 int32_t bufferVideoCacheCnt = 0;
898 for (size_t i = readIndex + 1; i < circularBuffer_.size(); i++) {
899 if (circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO)
900 bufferVideoCacheCnt++;
901 }
902 MEDIA_LOGD("TEST STATISTIC:interval: buffer cache %{public}d frames.", bufferVideoCacheCnt);
903 }
904
905 SetReceiverReadFlag(receiverId, data);
906 if (cb != nullptr) {
907 cb(data->mediaData);
908 }
909
910 MEDIA_LOGD("Current data readed, Recvid:%{public}d, remain %{public}zu data, readIndex: %{public}u, "
911 "readtype: %{public}d, diff: %{public}zu.",
912 receiverId, circularBuffer_.size(), readIndex, int32_t(type), circularBuffer_.size() - readIndex);
913 UpdateReceiverReadIndex(receiverId, readIndex, type);
914 return 0;
915 }
916
InputData(const MediaData::Ptr &data)917 int32_t BufferDispatcher::InputData(const MediaData::Ptr &data)
918 {
919 if (data == nullptr || data->buff == nullptr) {
920 SHARING_LOGE("data nullptr.");
921 return -1;
922 }
923 MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".", data->mediaType,
924 data->keyFrame, data->pts);
925
926 if (!writing_) {
927 writing_ = true;
928 }
929
930 DataSpec::Ptr dataSpec = std::make_shared<DataSpec>();
931 dataSpec->mediaData = data;
932 if (dataMode_ == MEDIA_AUDIO_ONLY) {
933 WriteDataIntoBuffer(dataSpec);
934 } else {
935 PreProcessDataSpec(dataSpec);
936 }
937
938 if (circularBuffer_.size() > 0) {
939 MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".",
940 circularBuffer_[circularBuffer_.size() - 1]->mediaData->mediaType,
941 circularBuffer_[circularBuffer_.size() - 1]->mediaData->keyFrame,
942 circularBuffer_[circularBuffer_.size() - 1]->mediaData->pts);
943 }
944
945 if (data->keyFrame) {
946 MEDIA_LOGD("dispatcherId: %{public}u, after InputData, current circularBuffer_ size: %{public}zu, "
947 "idleVideoBuffer_ size: %{public}zu, idle_audioBuffer_ size: %{public}zu, "
948 "keyFrame: %{public}s, data size: %{public}d, adataCount:%{public}d.",
949 GetDispatcherId(), circularBuffer_.size(), idleVideoBuffer_.size(), idleAudioBuffer_.size(),
950 data->keyFrame ? "true" : "false", data->buff->Size(), audioFrameCnt_);
951 }
952
953 return 0;
954 }
955
PreProcessDataSpec(const DataSpec::Ptr &dataSpec)956 void BufferDispatcher::PreProcessDataSpec(const DataSpec::Ptr &dataSpec)
957 {
958 MEDIA_LOGD("trace.");
959 if (waitingKey_) {
960 if (IsAudioData(dataSpec)) {
961 } else if (!IsKeyVideoFrame(dataSpec)) {
962 SHARING_LOGD("BufferDispatcher Waiting First Key Video Frame.");
963 return;
964 } else {
965 SHARING_LOGD("BufferDispatcher received first key video frame and restore from uncontinuous...Flushing.");
966 FlushBuffer();
967 baseCounter_++;
968 capacityEvaluating_ = true;
969 waitingKey_ = false;
970 }
971 } else {
972 if (capacityEvaluating_) {
973 ReCalculateCapacity(IsKeyVideoFrame(dataSpec));
974 }
975 }
976
977 WriteDataIntoBuffer(dataSpec);
978 }
979
WriteDataIntoBuffer(const DataSpec::Ptr &data)980 int32_t BufferDispatcher::WriteDataIntoBuffer(const DataSpec::Ptr &data)
981 {
982 MEDIA_LOGD("trace.");
983 if (data->mediaData == nullptr || data->mediaData->buff == nullptr) {
984 SHARING_LOGE("null data.");
985 return -1;
986 }
987
988 if (NeedExtendToDBCapacity()) {
989 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d CRTL_SIZE.", doubleBufferCapacity_);
990 SetBufferCapacity(doubleBufferCapacity_);
991 }
992
993 if (NeedRestoreToNormalCapacity()) {
994 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
995 int32_t popSize = circularBuffer_.size() - INITIAL_BUFFER_CAPACITY;
996 for (int32_t i = 0; i < popSize; i++) {
997 if (HeadFrameNeedReserve()) {
998 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
999 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1000 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1001 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1002 circularBuffer_.front()->mediaData->pts);
1003 }
1004
1005 MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: "
1006 "%{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1007 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1008 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1009 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1010 circularBuffer_.pop_front();
1011 audioFrameCnt_--;
1012 UpdateIndex();
1013 }
1014
1015 baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1016 doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2; // 2 : increasement
1017 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d NORMALSIZE.", baseBufferCapacity_);
1018 circularBuffer_.set_capacity(baseBufferCapacity_);
1019 }
1020
1021 if (IsKeyVideoFrame(data) && !keyOnly_) {
1022 EraseOldGopDatas();
1023 }
1024
1025 bool updateIndexFlag = false;
1026 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1027 if (circularBuffer_.size() >= circularBuffer_.capacity()) {
1028 updateIndexFlag = true;
1029 }
1030
1031 if (updateIndexFlag) {
1032 uint32_t nextDeleteIndex = 1;
1033 if (IsVideoData(data)) {
1034 nextDeleteIndex = FindNextDeleteVideoIndex();
1035 }
1036
1037 for (size_t i = 0; i <= nextDeleteIndex; i++) {
1038 MediaType headType = circularBuffer_.front()->mediaData->mediaType;
1039 DataSpec::Ptr retBuff = circularBuffer_.front();
1040 if (HeadFrameNeedReserve()) {
1041 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1042 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1043 GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1044 retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1045 }
1046
1047 MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, "
1048 "keyFrame: %{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1049 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1050 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1051 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1052
1053 circularBuffer_.pop_front();
1054 ReturnIdleBuffer(retBuff);
1055 headType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1056 UpdateIndex();
1057 }
1058 }
1059
1060 data->reserveFlag = 0;
1061 MEDIA_LOGD("WriteDataIntoBuffer data type: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1062 ", cur_size: %{public}zu, capacity: %{public}zu dispatcher[%{public}u].",
1063 int32_t(data->mediaData->mediaType), data->mediaData->keyFrame ? "true" : "false", data->mediaData->pts,
1064 circularBuffer_.size(), circularBuffer_.capacity(), GetDispatcherId());
1065 circularBuffer_.push_back(data);
1066 if (IsAudioData(data)) {
1067 lastAudioIndex_ = circularBuffer_.size() - 1;
1068 ActiveDataRef(MEDIA_TYPE_AUDIO, false);
1069 audioFrameCnt_++;
1070 } else {
1071 lastVideoIndex_ = circularBuffer_.size() - 1;
1072 if (!keyOnly_ || (keyOnly_ && IsKeyVideoFrame(data))) {
1073 ActiveDataRef(MEDIA_TYPE_VIDEO, IsKeyVideoFrame(data));
1074 }
1075 videoFrameCnt_++;
1076 }
1077
1078 if (audioNeedActivate_ && IsAudioData(data)) {
1079 MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By AudioData.");
1080 ActivateReceiverIndex(circularBuffer_.size() - 1, MEDIA_TYPE_AUDIO);
1081 }
1082
1083 if (IsKeyVideoFrame(data)) {
1084 uint32_t keyIndex = circularBuffer_.size() - 1;
1085 {
1086 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1087 keyIndexList_.push_back(keyIndex);
1088 }
1089 if (videoNeedActivate_) {
1090 MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By KeyVideo Frame index: %{public}d.", keyIndex);
1091 ActivateReceiverIndex(keyIndex, MEDIA_TYPE_VIDEO);
1092 }
1093 if (keyRedirect_) {
1094 OnKeyRedirect();
1095 EnableKeyRedirect(false);
1096 }
1097 }
1098
1099 continueNotify_ = true;
1100 dataCV_.notify_one();
1101 return 0;
1102 }
1103
EraseOldGopDatas()1104 void BufferDispatcher::EraseOldGopDatas()
1105 {
1106 MEDIA_LOGD("BufferDispatcher Delete old datas In.");
1107 if (dataMode_ == MEDIA_AUDIO_ONLY) {
1108 FlushBuffer();
1109 return;
1110 }
1111
1112 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1113 uint32_t nextKey = 0;
1114 {
1115 std::lock_guard<std::mutex> lock(notifyMutex_);
1116 if (!keyIndexList_.empty() && keyIndexList_.back() > 0) {
1117 MEDIA_LOGD("find next key listsize %{public}zu, back:%{public}d.", keyIndexList_.size(),
1118 keyIndexList_.back());
1119 nextKey = keyIndexList_.back();
1120 keyIndexList_.clear();
1121 keyIndexList_.push_back(nextKey);
1122 }
1123 }
1124
1125 MEDIA_LOGD("erase between 0 to next Video Frame %{public}d.", nextKey);
1126 DeleteHeadDatas(nextKey, false);
1127 nextKey = FindNextDeleteVideoIndex();
1128 DeleteHeadDatas(nextKey, true);
1129 std::string indexs;
1130
1131 MEDIA_LOGD("circularBuffer_ size: %{public}zu.", circularBuffer_.size());
1132 for (auto &keyIndex : keyIndexList_) {
1133 indexs += std::to_string(keyIndex) + ", ";
1134 MEDIA_LOGD("keyIndex update to %{public}d.", keyIndex);
1135 }
1136
1137 MEDIA_LOGD("current keyIndex: %{public}s.", indexs.c_str());
1138 }
1139
DeleteHeadDatas(uint32_t size, bool forceDelete)1140 void BufferDispatcher::DeleteHeadDatas(uint32_t size, bool forceDelete)
1141 {
1142 SHARING_LOGI("%{public}s, size %{public}d.", __FUNCTION__, size);
1143 if (size <= 0) {
1144 MEDIA_LOGW("invalid Size, dispatcherId: %{public}u!", GetDispatcherId());
1145 return;
1146 }
1147
1148 for (size_t i = 0; i < size; i++) {
1149 if (HeadFrameNeedReserve() && !forceDelete) {
1150 MEDIA_LOGD("index %{public}zu need reserve.", i);
1151 break;
1152 }
1153 if (circularBuffer_.empty()) {
1154 return;
1155 }
1156 DataSpec::Ptr retBuff = circularBuffer_.front();
1157 MEDIA_LOGD("BufferDispatcher pop out headtype %{public}d.", retBuff->mediaData->mediaType);
1158 retBuff->mediaData->mediaType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1159 if (HeadFrameNeedReserve()) {
1160 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1161 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1162 GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1163 retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1164 }
1165
1166 MEDIA_LOGD(
1167 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1168 ", reserveFlag: %{public}x.",
1169 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1170 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false", circularBuffer_.front()->mediaData->pts,
1171 circularBuffer_.front()->reserveFlag.load());
1172 circularBuffer_.pop_front();
1173 ReturnIdleBuffer(retBuff);
1174 UpdateIndex();
1175 }
1176
1177 if (circularBuffer_.size() < baseBufferCapacity_ && circularBuffer_.capacity() > baseBufferCapacity_) {
1178 MEDIA_LOGE("capacity return to base %{public}d.", baseBufferCapacity_);
1179 circularBuffer_.set_capacity(baseBufferCapacity_);
1180 }
1181 }
1182
UpdateIndex()1183 void BufferDispatcher::UpdateIndex()
1184 {
1185 MEDIA_LOGD("trace.");
1186 std::lock_guard<std::mutex> locker(notifyMutex_);
1187 if (!keyIndexList_.empty() && keyIndexList_.front() == 0) {
1188 keyIndexList_.pop_front();
1189 MEDIA_LOGD("BufferDispatcher pop out first 0 keyIndex after listsize %{public}zu.", keyIndexList_.size());
1190 }
1191
1192 for (auto &keyIndex : keyIndexList_) {
1193 if (keyIndex > 0) {
1194 keyIndex--;
1195 }
1196 }
1197
1198 for (auto &[recvId, notifier] : notifiers_) {
1199 if (notifier->videoIndex > 0 && notifier->videoIndex != INVALID_INDEX) {
1200 notifier->videoIndex--;
1201 }
1202 if (notifier->audioIndex > 0 && notifier->audioIndex != INVALID_INDEX) {
1203 notifier->audioIndex--;
1204 }
1205 }
1206
1207 if (lastVideoIndex_ > 0 && lastVideoIndex_ != INVALID_INDEX) {
1208 lastVideoIndex_--;
1209 }
1210
1211 if (lastAudioIndex_ > 0 && lastAudioIndex_ != INVALID_INDEX) {
1212 lastAudioIndex_--;
1213 }
1214 }
1215
FindNextDeleteVideoIndex()1216 uint32_t BufferDispatcher::FindNextDeleteVideoIndex()
1217 {
1218 MEDIA_LOGD("trace.");
1219 for (size_t i = 0; i < circularBuffer_.size(); i++) {
1220 if (circularBuffer_[i]->mediaData != nullptr && circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
1221 return i;
1222 }
1223 }
1224
1225 return 0;
1226 }
1227
FindLastIndex(MediaType type)1228 uint32_t BufferDispatcher::FindLastIndex(MediaType type)
1229 {
1230 SHARING_LOGD("trace.");
1231 if (circularBuffer_.empty()) {
1232 return INVALID_INDEX;
1233 }
1234
1235 return type == MEDIA_TYPE_AUDIO ? lastAudioIndex_ : lastVideoIndex_;
1236 }
1237
UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)1238 void BufferDispatcher::UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)
1239 {
1240 MEDIA_LOGD("trace.");
1241 uint32_t nextIndex = FindNextIndex(readIndex, type, receiverId);
1242 bool noAvaliableData = false;
1243 if (nextIndex == readIndex) {
1244 noAvaliableData = true;
1245 }
1246
1247 auto notifier = GetNotifierByReceiverId(receiverId);
1248 if (notifier == nullptr) {
1249 SHARING_LOGE("notifier is nullptr.");
1250 return;
1251 }
1252
1253 bool readOver = circularBuffer_.size() - readIndex < 3;
1254 if (readOver && notifier->NeedAcceleration() && type == MEDIA_TYPE_VIDEO) {
1255 SHARING_LOGD("BufferDispatcher SendAccelerationDone.");
1256 notifier->SendAccelerationDone();
1257 }
1258
1259 notifier->SetNeedUpdate(noAvaliableData, type);
1260
1261 if (type == MEDIA_TYPE_VIDEO) {
1262 notifier->videoIndex = nextIndex;
1263 } else if (type == MEDIA_TYPE_AUDIO) {
1264 notifier->audioIndex = nextIndex;
1265 } else {
1266 notifier->videoIndex = nextIndex;
1267 notifier->audioIndex = nextIndex;
1268 }
1269
1270 MEDIA_LOGD("After UpdateReceiverReadIndex type %{public}d, aindex %{public}d, vindex %{public}d.", type,
1271 notifier->audioIndex, notifier->videoIndex);
1272 }
1273
FindNextIndex(uint32_t index, MediaType type)1274 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type)
1275 {
1276 MEDIA_LOGD("trace.");
1277 if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1278 return index;
1279 }
1280
1281 if (type == MEDIA_TYPE_AV) {
1282 return index + 1;
1283 }
1284
1285 for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1286 if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1287 if (keyOnly_ && type == MEDIA_TYPE_VIDEO && !IsKeyVideoFrame(circularBuffer_[i])) {
1288 continue;
1289 } else {
1290 return i;
1291 }
1292 }
1293 }
1294
1295 return index;
1296 }
1297
FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)1298 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)
1299 {
1300 MEDIA_LOGD("trace.");
1301 if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1302 return index;
1303 }
1304
1305 if (type == MEDIA_TYPE_AV) {
1306 return index + 1;
1307 }
1308
1309 auto notifier = GetNotifierByReceiverId(receiverId);
1310 if (notifier == nullptr) {
1311 SHARING_LOGE("FindNextIndex GetNotifier nullptr.");
1312 return INVALID_INDEX;
1313 }
1314
1315 bool keyModeReceiver = notifier->IsKeyModeReceiver();
1316 for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1317 if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1318 if ((keyOnly_ || keyModeReceiver) && type == MEDIA_TYPE_VIDEO) {
1319 if (!IsKeyVideoFrame(circularBuffer_[i])) {
1320 continue;
1321 } else {
1322 for (size_t bIndex = index + 1; bIndex < i; bIndex++) {
1323 SetReceiverReadFlag(receiverId, circularBuffer_[bIndex]);
1324 }
1325 return i;
1326 }
1327 } else {
1328 return i;
1329 }
1330 }
1331 }
1332
1333 return index;
1334 }
1335
ResetAllIndex()1336 void BufferDispatcher::ResetAllIndex()
1337 {
1338 SHARING_LOGD("trace.");
1339 std::lock_guard<std::mutex> locker(notifyMutex_);
1340 keyIndexList_.clear();
1341 for (auto &[recvId, notifier] : notifiers_) {
1342 notifier->videoIndex = INVALID_INDEX;
1343 notifier->audioIndex = INVALID_INDEX;
1344 }
1345
1346 videoNeedActivate_ = true;
1347 audioNeedActivate_ = true;
1348 lastAudioIndex_ = INVALID_INDEX;
1349 lastVideoIndex_ = INVALID_INDEX;
1350 }
1351
IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)1352 bool BufferDispatcher::IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1353 {
1354 MEDIA_LOGD("trace.");
1355 uint32_t index = FindReceiverIndex(receiverId);
1356 if (index == INVALID_INDEX) {
1357 return false;
1358 }
1359
1360 return dataSpec->reserveFlag & RECV_FLAG_BASE << index;
1361 }
1362
IsVideoData(const DataSpec::Ptr &dataSpec)1363 bool BufferDispatcher::IsVideoData(const DataSpec::Ptr &dataSpec)
1364 {
1365 MEDIA_LOGD("trace.");
1366 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1367 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1368 return false;
1369 }
1370
1371 return dataSpec->mediaData->mediaType == MEDIA_TYPE_VIDEO;
1372 }
1373
IsAudioData(const DataSpec::Ptr &dataSpec)1374 bool BufferDispatcher::IsAudioData(const DataSpec::Ptr &dataSpec)
1375 {
1376 MEDIA_LOGD("trace.");
1377 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1378 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1379 return false;
1380 }
1381
1382 return dataSpec->mediaData->mediaType == MEDIA_TYPE_AUDIO;
1383 }
1384
IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)1385 bool BufferDispatcher::IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)
1386 {
1387 MEDIA_LOGD("trace.");
1388 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1389 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1390 return false;
1391 }
1392
1393 return IsVideoData(dataSpec) && dataSpec->mediaData->keyFrame;
1394 }
1395
HeadFrameNeedReserve()1396 bool BufferDispatcher::HeadFrameNeedReserve()
1397 {
1398 MEDIA_LOGD("trace.");
1399 if (!circularBuffer_.empty()) {
1400 uint8_t temp = readRefFlag_;
1401 MEDIA_LOGD("IsHeadFrameNeedReserve Head reserveFlag %{public}d readRefFlag_ %{public}d.",
1402 circularBuffer_.front()->reserveFlag.load(), readRefFlag_);
1403 return temp ^ circularBuffer_.front()->reserveFlag;
1404 }
1405
1406 return false;
1407 }
1408
NeedExtendToDBCapacity()1409 bool BufferDispatcher::NeedExtendToDBCapacity()
1410 {
1411 MEDIA_LOGD("trace.");
1412 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1413 return (circularBuffer_.size() >= circularBuffer_.capacity() &&
1414 circularBuffer_.capacity() < doubleBufferCapacity_ && HeadFrameNeedReserve());
1415 }
1416
NeedRestoreToNormalCapacity()1417 bool BufferDispatcher::NeedRestoreToNormalCapacity()
1418 {
1419 MEDIA_LOGD("trace.");
1420 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1421 return audioFrameCnt_ >= circularBuffer_.capacity() && circularBuffer_.capacity() != INITIAL_BUFFER_CAPACITY;
1422 }
1423
ReCalculateCapacity(bool keyFrame)1424 void BufferDispatcher::ReCalculateCapacity(bool keyFrame)
1425 {
1426 MEDIA_LOGD("trace.");
1427 baseCounter_++;
1428 if (baseCounter_ >= maxBufferCapacity_) {
1429 SHARING_LOGE("BufferDispatcher too many Audiodata need Set Capacity to default.");
1430 }
1431
1432 if (baseCounter_ >= circularBuffer_.capacity() && !keyFrame) {
1433 uint32_t tmpSize = circularBuffer_.capacity() + bufferCapacityIncrement_ < maxBufferCapacity_
1434 ? circularBuffer_.capacity() + bufferCapacityIncrement_
1435 : maxBufferCapacity_;
1436 doubleBufferCapacity_ = tmpSize * 2 < maxBufferCapacity_ ? tmpSize * 2 : maxBufferCapacity_; // 2: increasement
1437 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d in adaptive capacity calculating.", tmpSize);
1438 SetBufferCapacity(tmpSize);
1439 return;
1440 }
1441
1442 if (keyFrame) {
1443 baseBufferCapacity_ = baseCounter_ + bufferCapacityIncrement_ < maxBufferCapacity_
1444 ? baseCounter_ + bufferCapacityIncrement_
1445 : maxBufferCapacity_;
1446 if (baseBufferCapacity_ < INITIAL_BUFFER_CAPACITY) {
1447 baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1448 }
1449 doubleBufferCapacity_ = baseBufferCapacity_ * 2 < maxBufferCapacity_ // 2: increasement
1450 ? baseBufferCapacity_ * 2 // 2: increasement
1451 : maxBufferCapacity_;
1452 gop_ = baseCounter_;
1453 capacityEvaluating_ = gop_ > 0 ? false : true;
1454 SetBufferCapacity(baseBufferCapacity_);
1455 baseCounter_ = 0;
1456 SHARING_LOGI(
1457 "The gop is %{public}d and BufferDispatcher buffer Extended to %{public}d on base capacity confirmed.",
1458 GetCurrentGop(), baseBufferCapacity_);
1459 }
1460 }
1461
1462 int32_t BufferDispatcher::NotifyThreadWorker(void *userParam)
1463 {
1464 SHARING_LOGI("BufferDispatcher DataNotifier thread in.");
1465 RETURN_INVALID_IF_NULL(userParam);
1466 BufferDispatcher *dispatcher = (BufferDispatcher *)userParam;
1467 while (dispatcher->running_) {
1468 std::unique_lock<std::mutex> locker(dispatcher->notifyMutex_);
1469 uint32_t notifyRef = dispatcher->dataBitRef_ & dispatcher->recvBitRef_;
1470 MEDIA_LOGD("DataBitRef %{public}u recvBitRef_ %{public}d notifyRef_ %{public}d.",
1471 dispatcher->dataBitRef_.load(), dispatcher->recvBitRef_.load(), notifyRef);
1472
1473 for (auto &[recvId, notifier] : dispatcher->notifiers_) {
1474 auto index = notifier->GetReadIndex();
1475 if (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ||
1476 ((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO + FIX_OFFSET_ONE)) & notifyRef)) {
1477 MediaType notifyType;
1478 if (notifier->IsMixedReceiver()) {
1479 notifyType = MEDIA_TYPE_AV;
1480 } else {
1481 notifyType = (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ? MEDIA_TYPE_AUDIO
1482 : MEDIA_TYPE_VIDEO);
1483 }
1484 MEDIA_LOGD("notify the receiveId: %{public}d, notifyType: %{public}d, notifyRef: %{public}x.", recvId,
1485 notifyType, notifyRef);
1486 notifier->NotifyDataReceiver(notifyType);
1487 }
1488 }
1489
1490 dispatcher->dataCV_.wait(locker, [&dispatcher]() { return dispatcher->continueNotify_.load(); });
1491 dispatcher->continueNotify_ = false;
1492 }
1493
1494 return 0;
1495 }
1496
NotifyReadReady(uint32_t receiverId, MediaType type)1497 void BufferDispatcher::NotifyReadReady(uint32_t receiverId, MediaType type)
1498 {
1499 MEDIA_LOGD("trace.");
1500 auto notifier = GetNotifierByReceiverId(receiverId);
1501 if (notifier == nullptr) {
1502 SHARING_LOGE("notifier is nullptr.");
1503 return;
1504 }
1505
1506 std::shared_lock<std::shared_mutex> lock(bufferMutex_);
1507 std::unique_lock<std::mutex> locker(notifyMutex_);
1508 if (type == MEDIA_TYPE_AV) {
1509 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, true);
1510 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, true);
1511 } else {
1512 SetReceiverReadRef(receiverId, type, true);
1513 }
1514
1515 bool dataAvaliable = notifier->DataAvailable(type);
1516 MEDIA_LOGD("receiverId %{public}d MediaType %{public}d dataAvaliable %{public}d.", receiverId, type,
1517 dataAvaliable);
1518 if (type == MEDIA_TYPE_AV) {
1519 SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, dataAvaliable);
1520 SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, dataAvaliable);
1521 } else {
1522 SetReceiverDataRef(receiverId, type, dataAvaliable);
1523 }
1524
1525 if (!dataAvaliable) {
1526 return;
1527 }
1528
1529 continueNotify_ = true;
1530 dataCV_.notify_one();
1531 }
1532
SetDataRef(uint32_t bitref)1533 void BufferDispatcher::SetDataRef(uint32_t bitref)
1534 {
1535 SHARING_LOGD("trace.");
1536 dataBitRef_ &= bitref;
1537 }
1538
GetDataRef()1539 uint32_t BufferDispatcher::GetDataRef()
1540 {
1541 SHARING_LOGD("trace.");
1542 return dataBitRef_;
1543 }
1544
SetReadRef(uint32_t bitref)1545 void BufferDispatcher::SetReadRef(uint32_t bitref)
1546 {
1547 SHARING_LOGD("trace.");
1548 recvBitRef_ &= bitref;
1549 }
1550
GetReadRef()1551 uint32_t BufferDispatcher::GetReadRef()
1552 {
1553 SHARING_LOGD("trace.");
1554 return recvBitRef_;
1555 }
1556
ActiveDataRef(MediaType type, bool keyFrame)1557 void BufferDispatcher::ActiveDataRef(MediaType type, bool keyFrame)
1558 {
1559 MEDIA_LOGD("trace.");
1560 std::unique_lock<std::mutex> locker(notifyMutex_);
1561 uint32_t bitRef = 0x0000;
1562 for (auto &[recvId, notifier] : notifiers_) {
1563 auto index = notifier->GetReadIndex();
1564 if (type == MEDIA_TYPE_AUDIO) {
1565 bitRef |= RECV_FLAG_BASE << (index * 2); // 2: fix offset, get audio notifyer id
1566 continue;
1567 }
1568 bool keyModeReceiver = false;
1569 keyModeReceiver = notifier->IsKeyModeReceiver();
1570 if (keyFrame && keyModeReceiver && keyIndexList_.empty()) {
1571 notifier->videoIndex = circularBuffer_.size() - 1;
1572 }
1573 if ((!keyModeReceiver || (keyModeReceiver && keyFrame))) {
1574 if (index != INVALID_INDEX) {
1575 bitRef |= RECV_FLAG_BASE << (index * 2 + 1); // 2: fix offset, get video notifyer id
1576 }
1577 }
1578 }
1579
1580 dataBitRef_ |= bitRef;
1581 }
1582
SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)1583 void BufferDispatcher::SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)
1584 {
1585 MEDIA_LOGD("trace.");
1586 uint32_t index = FindReceiverIndex(receiverId);
1587 if (index == INVALID_INDEX) {
1588 return;
1589 }
1590
1591 if (type == MEDIA_TYPE_AUDIO) {
1592 uint32_t audioBit = index * 2;
1593 uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1594 MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1595 if (!ready) {
1596 bitRef = ~bitRef;
1597 dataBitRef_ &= bitRef;
1598 } else {
1599 dataBitRef_ |= bitRef;
1600 }
1601 } else if (type == MEDIA_TYPE_VIDEO) {
1602 uint32_t videoBit = index * 2 + 1;
1603 uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1604 MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1605 if (!ready) {
1606 bitRef = ~bitRef;
1607 dataBitRef_ &= bitRef;
1608 } else {
1609 dataBitRef_ |= bitRef;
1610 }
1611 }
1612 }
1613
SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)1614 void BufferDispatcher::SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)
1615 {
1616 MEDIA_LOGD("trace.");
1617 uint32_t index = FindReceiverIndex(receiverId);
1618 if (index == INVALID_INDEX) {
1619 return;
1620 }
1621
1622 if (type == MEDIA_TYPE_AUDIO) {
1623 uint32_t audioBit = index * 2;
1624 uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1625 MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1626
1627 if (!ready) {
1628 bitRef = ~bitRef;
1629 recvBitRef_ &= bitRef;
1630 } else {
1631 recvBitRef_ |= bitRef;
1632 }
1633 } else if (type == MEDIA_TYPE_VIDEO) {
1634 uint32_t videoBit = index * 2 + 1;
1635 uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1636 MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1637
1638 if (!ready) {
1639 bitRef = ~bitRef;
1640 recvBitRef_ &= bitRef;
1641 } else {
1642 recvBitRef_ |= bitRef;
1643 }
1644 }
1645 }
1646
GetReceiverDataRef(uint32_t receiverId)1647 uint32_t BufferDispatcher::GetReceiverDataRef(uint32_t receiverId)
1648 {
1649 SHARING_LOGD("trace.");
1650 return 0;
1651 }
1652
GetReceiverReadRef(uint32_t receiverId)1653 uint32_t BufferDispatcher::GetReceiverReadRef(uint32_t receiverId)
1654 {
1655 SHARING_LOGD("trace.");
1656 uint32_t retBitRef = GetReceiverIndexRef(receiverId);
1657 retBitRef &= recvBitRef_;
1658 return retBitRef;
1659 }
1660
GetReceiverIndexRef(uint32_t receiverId)1661 uint32_t BufferDispatcher::GetReceiverIndexRef(uint32_t receiverId)
1662 {
1663 SHARING_LOGD("trace.");
1664 uint32_t index = FindReceiverIndex(receiverId);
1665 uint32_t audioBit = index * 2;
1666 uint32_t videoBit = index * 2 + 1;
1667 uint32_t retBitRef = 0x0000;
1668 retBitRef |= RECV_FLAG_BASE << audioBit;
1669 retBitRef |= RECV_FLAG_BASE << videoBit;
1670 return retBitRef;
1671 }
1672
ClearReadBit(uint32_t receiverId, MediaType type)1673 void BufferDispatcher::ClearReadBit(uint32_t receiverId, MediaType type)
1674 {
1675 MEDIA_LOGD("trace.");
1676 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1677 if (type != MEDIA_TYPE_AV) {
1678 SetReceiverReadRef(receiverId, type, false);
1679 } else {
1680 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
1681 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
1682 }
1683 }
1684
ClearDataBit(uint32_t receiverId, MediaType type)1685 void BufferDispatcher::ClearDataBit(uint32_t receiverId, MediaType type)
1686 {
1687 MEDIA_LOGD("trace.");
1688 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1689 if (type != MEDIA_TYPE_AV) {
1690 SetReceiverDataRef(receiverId, type, false);
1691 } else {
1692 SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, false);
1693 SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, false);
1694 }
1695 }
1696
IsRead(uint32_t receiverId, uint32_t index)1697 bool BufferDispatcher::IsRead(uint32_t receiverId, uint32_t index)
1698 {
1699 MEDIA_LOGD("trace.");
1700 if (index >= circularBuffer_.size()) {
1701 return true;
1702 } else {
1703 return IsDataReaded(receiverId, circularBuffer_.at(index));
1704 }
1705 }
1706
ActivateReceiverIndex(uint32_t index, MediaType type)1707 void BufferDispatcher::ActivateReceiverIndex(uint32_t index, MediaType type)
1708 {
1709 MEDIA_LOGD("trace.");
1710 std::unique_lock<std::mutex> lock(notifyMutex_);
1711 for (auto &[recvId, notifier] : notifiers_) {
1712 if (type == MEDIA_TYPE_VIDEO) {
1713 if (notifier->videoIndex == INVALID_INDEX) {
1714 notifier->videoIndex = index;
1715 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->videoIndex);
1716 videoNeedActivate_ = false;
1717 }
1718 } else {
1719 if (notifier->audioIndex == INVALID_INDEX) {
1720 notifier->audioIndex = index;
1721 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->audioIndex);
1722 audioNeedActivate_ = false;
1723 }
1724 }
1725 }
1726 }
UnlockWaitingReceiverIndex(MediaType type)1727 void BufferDispatcher::UnlockWaitingReceiverIndex(MediaType type)
1728 {
1729 SHARING_LOGD("trace.");
1730 }
1731
SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)1732 void BufferDispatcher::SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1733 {
1734 MEDIA_LOGD("trace.");
1735 RETURN_IF_NULL(dataSpec);
1736 uint32_t index = FindReceiverIndex(receiverId);
1737 if (index != INVALID_INDEX) {
1738 dataSpec->reserveFlag |= RECV_FLAG_BASE << index;
1739 MEDIA_LOGD("mediaType: %{public}d, pts: %{public}" PRIu64
1740 ", reserveFlag: %{public}x, receiverId: %{public}d, index: %{public}d.",
1741 dataSpec->mediaData->mediaType, dataSpec->mediaData->pts, dataSpec->reserveFlag.load(), receiverId,
1742 index);
1743 }
1744 }
1745
CancelReserve()1746 void BufferDispatcher::CancelReserve()
1747 {
1748 SHARING_LOGD("trace.");
1749 for (auto &data : circularBuffer_) {
1750 data->reserveFlag = 0xff;
1751 }
1752 }
1753
SetSpsNalu(MediaData::Ptr spsbuf)1754 void BufferDispatcher::SetSpsNalu(MediaData::Ptr spsbuf)
1755 {
1756 SHARING_LOGD("trace.");
1757 spsBuf_ = spsbuf;
1758 }
1759
GetSPS()1760 const MediaData::Ptr BufferDispatcher::GetSPS()
1761 {
1762 MEDIA_LOGD("trace.");
1763 return spsBuf_;
1764 }
1765
SetPpsNalu(MediaData::Ptr ppsbuf)1766 void BufferDispatcher::SetPpsNalu(MediaData::Ptr ppsbuf)
1767 {
1768 SHARING_LOGD("trace.");
1769 ppsBuf_ = ppsbuf;
1770 }
1771
GetPPS()1772 const MediaData::Ptr BufferDispatcher::GetPPS()
1773 {
1774 MEDIA_LOGD("trace.");
1775 return ppsBuf_;
1776 }
1777
GetCurrentGop()1778 uint32_t BufferDispatcher::GetCurrentGop()
1779 {
1780 SHARING_LOGD("trace.");
1781 return gop_;
1782 }
1783
OnKeyRedirect()1784 void BufferDispatcher::OnKeyRedirect()
1785 {
1786 SHARING_LOGD("trace.");
1787 std::unique_lock<std::mutex> lock(notifyMutex_);
1788 if (keyIndexList_.empty()) {
1789 return;
1790 }
1791
1792 auto nextIndex = keyIndexList_.back();
1793
1794 for (auto &[recvId, notifier] : notifiers_) {
1795 if (notifier->IsKeyRedirectReceiver()) {
1796 SHARING_LOGD("receiverId: %{public}u, videoIndex: %{public}d, nextIndex: %{public}d.",
1797 notifier->GetReceiverId(), notifier->videoIndex, nextIndex);
1798 auto curIndex = notifier->videoIndex;
1799 notifier->videoIndex = nextIndex;
1800 for (auto i = curIndex; i < nextIndex; i++) {
1801 SetReceiverReadFlag(notifier->GetReceiverId(), circularBuffer_[i]);
1802 }
1803 if (!rapidMode_) {
1804 auto receiver = notifier->GetBufferReceiver();
1805 if (receiver != nullptr) {
1806 receiver->EnableKeyRedirect(false);
1807 }
1808 }
1809 }
1810 }
1811 }
1812
EnableRapidMode(bool enable)1813 void BufferDispatcher::EnableRapidMode(bool enable)
1814 {
1815 SHARING_LOGD("trace.");
1816 rapidMode_ = enable;
1817 }
1818
1819 } // namespace Sharing
1820 } // namespace OHOS
1821