1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_SHARING_BUFFER_DISPATCHER_H
17 #define OHOS_SHARING_BUFFER_DISPATCHER_H
18 #include <condition_variable>
19 #include <functional>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <shared_mutex>
24 #include <thread>
25 #include <unordered_map>
26 #include "common/identifier.h"
27 #include "common/media_log.h"
28 #include "media_channel_def.h"
29 #include "utils/circular_buffer.h"
30 #include "utils/data_buffer.h"
31 #include "utils/timeout_timer.h"
32 
33 constexpr size_t INITIAL_BUFFER_CAPACITY = 50;
34 constexpr size_t MAX_RECEIVER_SIZE = 16;
35 constexpr uint32_t INVALID_INDEX = static_cast<uint32_t>(-1);
36 constexpr uint32_t RECV_FLAG_BASE = 0x0001;
37 enum MediaDispacherMode { MEDIA_VIDEO_ONLY, MEDIA_AUDIO_ONLY, MEDIA_VIDEO_AUDIO_MIXED };
38 namespace OHOS {
39 namespace Sharing {
40 
41 class IBufferReader : public std::enable_shared_from_this<IBufferReader> {
42 public:
43     using Ptr = std::shared_ptr<IBufferReader>;
44 
IBufferReader()45     IBufferReader(){};
~IBufferReader()46     virtual ~IBufferReader(){};
47 
GetLatestAudioIndex()48     uint32_t GetLatestAudioIndex()
49     {
50         MEDIA_LOGD("trace.");
51         return lastAudioIndex_;
52     }
53 
GetLatestVideoIndex()54     uint32_t GetLatestVideoIndex()
55     {
56         MEDIA_LOGD("trace.");
57         return lastVideoIndex_;
58     }
59 
EnableKeyRedirect(bool enable)60     void EnableKeyRedirect(bool enable)
61     {
62         MEDIA_LOGD("trace.");
63         keyRedirect_ = enable;
64     }
65 
66 public:
67     virtual bool IsRead(uint32_t receiverId, uint32_t index) = 0;
68     virtual void ClearReadBit(uint32_t receiverId, MediaType type) = 0;
69     virtual void ClearDataBit(uint32_t receiverId, MediaType type) = 0;
70     virtual void NotifyReadReady(uint32_t receiverId, MediaType type) = 0;
71 
72     virtual int32_t ReadBufferData(uint32_t receiverId, MediaType type,
73                                    std::function<void(const MediaData::Ptr &data)> cb) = 0;
74 
75     virtual size_t GetBufferSize() = 0;
76     virtual uint32_t GetDispatcherId() = 0;
77     virtual const MediaData::Ptr GetSPS() = 0;
78     virtual const MediaData::Ptr GetPPS() = 0;
79 
80 protected:
81     uint32_t lastAudioIndex_ = INVALID_INDEX;
82     uint32_t lastVideoIndex_ = INVALID_INDEX;
83     volatile std::atomic<bool> keyRedirect_ = false;
84     volatile std::atomic<uint32_t> recvBitRef_ = 0x0000;
85     volatile std::atomic<uint32_t> dataBitRef_ = 0x0000;
86 };
87 
88 class IBufferReceiverListener {
89 public:
90     using Ptr = std::shared_ptr<IBufferReceiverListener>;
91 
IBufferReceiverListener()92     IBufferReceiverListener(){};
~IBufferReceiverListener()93     virtual ~IBufferReceiverListener(){};
94 
95     virtual void OnAccelerationDoneNotify() = 0;
96     virtual void OnKeyModeNotify(bool enable) = 0;
97 };
98 
99 class BufferReceiver : public IdentifierInfo {
100 public:
101     using Ptr = std::shared_ptr<BufferReceiver>;
102 
BufferReceiver()103     BufferReceiver(){};
~BufferReceiver()104     virtual ~BufferReceiver(){};
105 
106     virtual bool IsMixedReceiver();
107     virtual int32_t OnMediaDataNotify();
108     virtual int32_t OnAudioDataNotify();
109     virtual int32_t OnVideoDataNotify();
110     virtual int32_t RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb);
111     virtual void SetSource(IBufferReader::Ptr dataReader);
112 
113     uint32_t GetReceiverId();
114     uint32_t GetDispatcherId();
115 
116     void NotifyReadStop();
117     void NotifyReadStart();
118     void EnableKeyMode(bool enable);
119 
120     bool IsKeyMode();
121     bool IsKeyRedirect();
122 
123     const MediaData::Ptr GetSPS();
124     const MediaData::Ptr GetPPS();
125 
126     bool NeedAcceleration();
127     void DisableAcceleration();
128     virtual void SendAccelerationDone();
129     virtual void EnableKeyRedirect(bool enable);
130     void SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener);
131 
132 protected:
133     std::mutex mutex_;
134     std::atomic<bool> dataReady_ = false;
135     std::atomic<bool> nonBlockAudio_ = false;
136     std::atomic<bool> nonBlockVideo_ = false;
137 
138     std::atomic<bool> firstVRead_ = true;
139     std::atomic<bool> firstARead_ = true;
140     std::atomic<bool> firstMRead_ = true;
141 
142     std::condition_variable notifyAudio_;
143     std::condition_variable notifyVideo_;
144     std::condition_variable notifyData_;
145     std::weak_ptr<IBufferReceiverListener> listener_;
146 
147     volatile std::atomic<bool> mixed_ = false;
148     volatile std::atomic<bool> keyOnly_ = false;
149     volatile std::atomic<bool> keyRedirect_ = false;
150     volatile std::atomic<bool> accelerationDone_ = false;
151 
152     IBufferReader::Ptr bufferReader_ = nullptr;
153 };
154 
155 class BufferDispatcherListener {
156 public:
157     using Ptr = std::shared_ptr<BufferDispatcherListener>;
158     virtual ~BufferDispatcherListener() = default;
159 
160     virtual void OnWriteTimeout() = 0;
161 };
162 
163 class BufferDispatcher : public IBufferReader,
164                          public IdentifierInfo {
165 public:
166     using Ptr = std::shared_ptr<BufferDispatcher>;
167 
168     class DataNotifier {
169     public:
170         using Ptr = std::shared_ptr<DataNotifier>;
171 
DataNotifier()172         DataNotifier(){};
~DataNotifier()173         ~DataNotifier(){};
174 
SetReadIndex(uint32_t index)175         void SetReadIndex(uint32_t index)
176         {
177             MEDIA_LOGD("trace.");
178             readIndex_ = index;
179         }
180 
GetReadIndex()181         uint32_t GetReadIndex()
182         {
183             MEDIA_LOGD("trace.");
184             return readIndex_;
185         }
186 
187     public:
188         void SetBlock();
189         void SendAccelerationDone();
190         void NotifyDataReceiver(MediaType type);
191         void SetNeedUpdate(bool enable, MediaType type);
192         void SetNotifyReceiver(BufferReceiver::Ptr receiver);
193         void SetListenDispatcher(IBufferReader::Ptr dispatcher);
194 
195         bool IsMixedReceiver();
196         bool NeedAcceleration();
197         bool IsKeyModeReceiver();
198         bool IsKeyRedirectReceiver();
199         bool DataAvailable(MediaType type);
200 
201         uint32_t GetReceiverId();
202         uint32_t GetReceiverReadIndex(MediaType type);
203 
204         BufferReceiver::Ptr GetBufferReceiver();
205 
206     public:
207         uint32_t audioIndex = INVALID_INDEX;
208         uint32_t videoIndex = INVALID_INDEX;
209         std::atomic<bool> needUpdateAIndex = true;
210         std::atomic<bool> needUpdateVIndex = true;
211 
212     private:
213         bool block_ = false;
214         uint32_t readIndex_ = INVALID_INDEX;
215         std::weak_ptr<BufferReceiver> receiver_;
216         std::weak_ptr<IBufferReader> dispatcher_;
217     };
218 
219     struct DataSpec {
220         using Ptr = std::shared_ptr<DataSpec>;
221 
222         volatile std::atomic<uint16_t> reserveFlag;
223         uint64_t seq;
224         MediaData::Ptr mediaData;
225     };
226 
227 public:
228     explicit BufferDispatcher(uint32_t maxCapacity = MAX_BUFFER_CAPACITY,
229                      uint32_t capacityIncrement = BUFFER_CAPACITY_INCREMENT);
230     ~BufferDispatcher() override;
231 
232     inline uint32_t GetDispatcherId() override
233     {
234         MEDIA_LOGD("trace.");
235         return GetId();
236     }
237 
238 public:
239     void StopDispatch();
240     void StartDispatch();
241     void CancelReserve();
242     void ReleaseAllReceiver();
243     int32_t AttachReceiver(BufferReceiver::Ptr receiver);
244     int32_t DetachReceiver(BufferReceiver::Ptr receiver);
245     int32_t DetachReceiver(uint32_t receiverId, DataNotifier::Ptr notifier);
246     void SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener);
247 
248     void SetSpsNalu(MediaData::Ptr spsbuf);
249     void SetPpsNalu(MediaData::Ptr ppsbuf);
250     void SetBufferCapacity(size_t capacity);
251     void SetDataMode(MediaDispacherMode dataMode);
252     int32_t InputData(const MediaData::Ptr &data);
253 
254     void FlushBuffer();
255     void ReleaseIdleBuffer();
256     void EnableKeyMode(bool enable);
257     void EnableRapidMode(bool enable);
258     void ClearReadBit(uint32_t receiverId, MediaType type) override;
259     void ClearDataBit(uint32_t receiverId, MediaType type) override;
260     void SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec);
261     bool IsRecevierExist(uint32_t receiverId);
262     bool IsRead(uint32_t receiverId, uint32_t index) override;
263 
264     uint32_t GetCurrentGop();
265     size_t GetBufferSize() override;
266     void NotifyReadReady(uint32_t receiverId, MediaType type) override;
267     int32_t ReadBufferData(uint32_t receiverId, MediaType type,
268                            std::function<void(const MediaData::Ptr &data)> cb) override;
269     const MediaData::Ptr GetSPS() override;
270     const MediaData::Ptr GetPPS() override;
271     MediaData::Ptr RequestDataBuffer(MediaType type, uint32_t size);
272     DataNotifier::Ptr GetNotifierByReceiverId(uint32_t receiverId);
273     DataNotifier::Ptr GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver);
274 
275 private:
276     void UpdateIndex();
277     void ResetAllIndex();
278     bool IsVideoData(const DataSpec::Ptr &dataSpec);
279     bool IsAudioData(const DataSpec::Ptr &dataSpec);
280     bool IsKeyVideoFrame(const DataSpec::Ptr &dataSpec);
281     bool IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec);
282 
283     uint32_t FindNextDeleteVideoIndex();
284     uint32_t FindLastIndex(MediaType type);
285     uint32_t FindNextIndex(uint32_t index, MediaType type);
286     uint32_t FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId);
287 
288     void EraseOldGopDatas();
289     void ReCalculateCapacity(bool keyFrame);
290     void ReturnIdleBuffer(DataSpec::Ptr &data);
291     void DeleteHeadDatas(uint32_t size, bool forceDelete);
292     void PreProcessDataSpec(const DataSpec::Ptr &dataSpec);
293 
294     bool HeadFrameNeedReserve();
295     bool NeedExtendToDBCapacity();
296     bool NeedRestoreToNormalCapacity();
297     int32_t WriteDataIntoBuffer(const DataSpec::Ptr &data);
298 
299     void OnKeyRedirect();
300     void SetDataRef(uint32_t bitref);
301     void SetReadRef(uint32_t bitref);
302     void UnlockWaitingReceiverIndex(MediaType type);
303     void ActiveDataRef(MediaType type, bool keyFrame);
304     void ActivateReceiverIndex(uint32_t index, MediaType type);
305     void SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready);
306     void SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready);
307     void UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type);
308 
309     uint32_t GetDataRef();
310     uint32_t GetReadRef();
311     uint32_t FindReceiverIndex(uint32_t receiverId);
312     uint32_t GetReceiverDataRef(uint32_t receiverId);
313     uint32_t GetReceiverReadRef(uint32_t receiverId);
314     uint32_t GetReceiverIndexRef(uint32_t receiverId);
315     static int32_t NotifyThreadWorker(void *userParam);
316 
317 private:
318     bool running_ = false;
319     bool writing_ = false;
320     bool videoNeedActivate_ = false;
321     bool audioNeedActivate_ = false;
322     bool capacityEvaluating_ = false;
323     volatile bool keyOnly_ = false;
324     volatile bool waitingKey_ = true;
325     volatile bool rapidMode_ = false;
326     uint16_t readRefFlag_ = 0x0000;
327     uint32_t baseCounter_ = 0;
328     uint32_t videoFrameCnt_ = 0;
329     uint32_t audioFrameCnt_ = 0;
330     uint32_t maxBufferCapacity_ = MAX_BUFFER_CAPACITY;
331     uint32_t baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
332     uint32_t doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2;
333     uint32_t bufferCapacityIncrement_ = BUFFER_CAPACITY_INCREMENT;
334 
335     mutable std::shared_mutex bufferMutex_;
336 
337     std::atomic<bool> continueNotify_ = false;
338     std::atomic<uint32_t> gop_ = 0;
339     std::mutex idleMutex_;
340     std::mutex notifyMutex_;
341     std::thread notifyThread_;
342     std::condition_variable dataCV_;
343     std::list<uint32_t> keyIndexList_;
344     std::weak_ptr<BufferDispatcherListener> listener_;
345     std::unique_ptr<TimeoutTimer> writingTimer_ = nullptr;
346     std::unordered_map<uint32_t, DataNotifier::Ptr> notifiers_;
347 
348     circular_buffer<DataSpec::Ptr> circularBuffer_;
349     circular_buffer<MediaData::Ptr> idleVideoBuffer_;
350     circular_buffer<MediaData::Ptr> idleAudioBuffer_;
351 
352     MediaData::Ptr spsBuf_ = nullptr;
353     MediaData::Ptr ppsBuf_ = nullptr;
354     DataSpec::Ptr refHead_ = nullptr;
355     MediaDispacherMode dataMode_ = MEDIA_VIDEO_AUDIO_MIXED;
356 };
357 
358 } // namespace Sharing
359 } // namespace OHOS
360 #endif