/*
 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "rtp_codec_ts.h"
#include <securec.h>
#include "common/common_macro.h"
#include "common/media_log.h"
#include "frame/aac_frame.h"
#include "frame/h264_frame.h"

namespace OHOS {
namespace Sharing {
constexpr int32_t FF_BUFFER_SIZE = 1500;
static std::mutex frameLock;

RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}

RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}

void RtpDecoderTs::Release()
{
    {
        std::lock_guard<std::mutex> lock(frameLock);
        onFrame_ = nullptr;
    }

    exit_ = true;
    if (decodeThread_ && decodeThread_->joinable()) {
        decodeThread_->join();
        decodeThread_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_close_input(&avFormatContext_);
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }
}

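// Queue an incoming RTP payload for demuxing; the decode thread is started
// lazily on the first packet and drains the queue through ReadPacket().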
void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
{
    MEDIA_LOGD("trace.");
    RETURN_IF_NULL(rtp);
    if (exit_) {
        return;
    }

    if (decodeThread_ == nullptr) {
        decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
    }

    auto payloadSize = rtp->GetPayloadSize();
    if (payloadSize <= 0) {
        return;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(rtp);
}

void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
{
    // take frameLock so the callback cannot change while the decode thread is dispatching frames
    std::lock_guard<std::mutex> lock(frameLock);
    onFrame_ = cb;
}

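// Decode-thread entry point: wraps the queued RTP payloads behind a custom AVIO
// read callback, lets FFmpeg demux the MPEG-TS stream, then splits video packets
// into H.264 NAL units and forwards video/audio frames through onFrame_.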
void RtpDecoderTs::StartDecoding()
{
    SHARING_LOGE("trace.");
    avFormatContext_ = avformat_alloc_context();
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_context failed.");
        return;
    }

    avioCtxBuffer_ = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }

    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;

    int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
    if (ret != 0) {
        SHARING_LOGE("avformat_open_input failed.");
        return;
    }

    for (uint32_t i = 0; i < avFormatContext_->nb_streams; i++) {
        if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
            videoStreamIndex_ = i;
            SHARING_LOGD("find video stream %{public}u.", i);
        } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            audioStreamIndex_ = i;
            SHARING_LOGD("find audio stream %{public}u.", i);
        }
    }

    AVPacket *packet = av_packet_alloc();
    if (packet == nullptr) {
        SHARING_LOGE("av_packet_alloc failed.");
        return;
    }
    while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
        if (exit_) {
            SHARING_LOGI("ignore av read frame.");
            break;
        }
        if (packet->stream_index == videoStreamIndex_) {
            SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
                if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
                    return;
                }
                auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
                                                            (uint32_t)packet->pts, prefix);
                std::lock_guard<std::mutex> lock(frameLock);
                if (onFrame_) {
                    onFrame_(outFrame);
                }
            });
        } else if (packet->stream_index == audioStreamIndex_) {
            auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
                                                       (uint32_t)packet->pts);
            std::lock_guard<std::mutex> lock(frameLock);
            if (onFrame_) {
                onFrame_(outFrame);
            }
        }
        av_packet_unref(packet);
    }

    av_packet_free(&packet);
    SHARING_LOGD("ts decoding thread exit.");
}

int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);
    RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
    if (decoder == nullptr) {
        SHARING_LOGE("decoder is nullptr.");
        return 0;
    }
    return decoder->ReadPacket(buf, buf_size);
}

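// AVIO read callback (instance side): blocks until an RTP payload is queued,
// then copies one payload into the buffer supplied by the FFmpeg demuxer.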
int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(buf);
    while (dataQueue_.empty()) {
        if (exit_) {
            SHARING_LOGI("read packet exit.");
            return 0;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait time in microseconds
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    auto &rtp = dataQueue_.front();
    auto data = rtp->GetPayload();
    int length = rtp->GetPayloadSize();
    if (length > buf_size) {
        SHARING_LOGE("rtp length exceeds buf_size!");
        return 0;
    }
    auto ret = memcpy_s(buf, buf_size, data, length);
    if (ret != EOK) {
        return 0;
    }

    dataQueue_.pop();
    return length;
}

RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
    : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
{
    SHARING_LOGI("RtpEncoderTs CTOR IN");
    merger_.SetType(FrameMerger::H264_PREFIX);
}

RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGI("RtpEncoderTs DTOR IN");
    Release();
}

void RtpEncoderTs::Release()
{
    SHARING_LOGD("trace.");
    {
        std::lock_guard<std::mutex> lock(cbLockMutex_);
        onRtpPack_ = nullptr;
    }
    exit_ = true;
    if (encodeThread_ && encodeThread_->joinable()) {
        encodeThread_->join();
        encodeThread_ = nullptr;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!dataQueue_.empty()) {
            dataQueue_.pop();
        }
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_free_context(avFormatContext_);
        avFormatContext_ = nullptr;
    }
}

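// Queue a media frame for muxing. H.264 config and key frames are first merged
// into a single access unit; the encode thread is started once the audio codec
// (AAC or PCM) has been identified from the incoming frames.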
void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
{
    RETURN_IF_NULL(frame);
    if (exit_) {
        return;
    }

    DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
    switch (frame->GetCodecId()) {
        case CODEC_H264:
            // merge sps, pps and key frame into one packet
            merger_.InputFrame(
                frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
                    auto outFrame =
                        std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
                                                    PrefixSize((char *)buffer->Data(), buffer->Size()));
                    SaveFrame(outFrame);
                });
            break;
        case CODEC_AAC:
        case CODEC_PCM:
            SaveFrame(frame);
            break;
        default:
            SHARING_LOGW("unknown codec: %{public}d.", frame->GetCodecId());
            break;
    }

    if (audioCodeId_ == AV_CODEC_ID_NONE) {
        if (frame->GetCodecId() == CODEC_AAC) {
            audioCodeId_ = AV_CODEC_ID_AAC;
        } else if (frame->GetCodecId() == CODEC_PCM) {
            audioCodeId_ = AV_CODEC_ID_PCM_S16BE;
        }
    }
    if (encodeThread_ == nullptr && audioCodeId_ != AV_CODEC_ID_NONE) {
        encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
    }
}

void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
{
    SHARING_LOGD("trace.");
    // take cbLockMutex_ so the callback cannot change while WritePacket() is delivering RTP packets
    std::lock_guard<std::mutex> lock(cbLockMutex_);
    onRtpPack_ = cb;
}

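// Encode-thread entry point: builds an FFmpeg "mpegts" muxer with one H.264 video
// stream and one audio stream, routes the muxer output through a custom AVIO write
// callback, and feeds it frames popped from the queue until Release() is called.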
void RtpEncoderTs::StartEncoding()
{
    SHARING_LOGD("trace.");
    avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_output_context2 failed.");
        return;
    }

    videoStream = avformat_new_stream(avFormatContext_, NULL);
    if (videoStream == nullptr) {
        SHARING_LOGE("avformat_new_stream for video failed.");
        return;
    }
    videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
    videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
    videoStream->codecpar->codec_tag = 0;
    videoStream->time_base.num = 1;
    videoStream->time_base.den = SAMPLE_RATE_90K; // 90000: video sample rate

    audioStream = avformat_new_stream(avFormatContext_, NULL);
    if (audioStream == nullptr) {
        SHARING_LOGE("avformat_new_stream for audio failed.");
        return;
    }
    audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
    audioStream->codecpar->codec_id = audioCodeId_;
    audioStream->codecpar->codec_tag = 0;
    audioStream->codecpar->channel_layout = av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
    audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
    audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
    audioStream->time_base.num = 1;
    audioStream->time_base.den = AUDIO_SAMPLE_RATE_48000;
    SHARING_LOGI("audio stream id: %{public}d, video stream id: %{public}d, audio codecid: %{public}d.",
        audioStream->index, videoStream->index, audioCodeId_);

    avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }
    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;
    avFormatContext_->flags |= AVFMT_FLAG_CUSTOM_IO;

    int32_t ret = avformat_write_header(avFormatContext_, NULL);
    if (ret < 0) {
        SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
        return;
    }

    AVPacket *packet = av_packet_alloc();
    if (packet == nullptr) {
        SHARING_LOGE("av_packet_alloc failed.");
        return;
    }
    while (!exit_) {
        auto frame = ReadFrame(packet);
        if (frame == nullptr) {
            break;
        }
        av_write_frame(avFormatContext_, packet);
        av_packet_unref(packet);
    }

    av_write_trailer(avFormatContext_);
    av_packet_free(&packet);
}

void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
{
    RETURN_IF_NULL(frame);
    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(frame);
}

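// Blocking pop used by the encode thread: waits for the next queued frame, points the
// AVPacket at its data, and rescales the millisecond timestamps to the stream time base.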
Frame::Ptr RtpEncoderTs::ReadFrame(AVPacket *packet)
{
    if (packet == nullptr) {
        SHARING_LOGE("packet is null!");
        return nullptr;
    }
    while (dataQueue_.empty()) {
        if (exit_) {
            SHARING_LOGI("exit when read frame.");
            return nullptr;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait time in microseconds
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    Frame::Ptr frame = dataQueue_.front();
    dataQueue_.pop();
    packet->data = frame->Data();
    packet->size = frame->Size();

    keyFrame_ = frame->KeyFrame();
    if (frame->GetTrackType() == TRACK_VIDEO) {
        packet->dts = av_rescale(frame->Dts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->pts = av_rescale(frame->Pts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->stream_index = videoStream->index;
        timeStamp_ = frame->Dts();
    } else if (frame->GetTrackType() == TRACK_AUDIO) {
        packet->dts = av_rescale(frame->Dts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->pts = av_rescale(frame->Pts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->stream_index = audioStream->index;
        timeStamp_ = frame->Dts();
    }
    return frame;
}

void RtpEncoderTs::RemoveFrameAfterMuxing()
{
    std::unique_lock<std::mutex> lock(queueMutex_);
    if (!dataQueue_.empty()) {
        dataQueue_.pop();
    }
}

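// AVIO write callback: each chunk of muxed TS data emitted by FFmpeg is wrapped
// into an RTP packet via MakeRtp() and handed to the registered onRtpPack_ callback.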
int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);

    RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
    std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
    if (encoder->onRtpPack_) {
        auto rtp =
            encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
        encoder->onRtpPack_(rtp);
    }

    return 0;
}
} // namespace Sharing
} // namespace OHOS