/*
 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "rtp_codec_ts.h"
#include <securec.h>
#include "common/common_macro.h"
#include "common/media_log.h"
#include "frame/aac_frame.h"
#include "frame/h264_frame.h"

namespace OHOS {
namespace Sharing {
constexpr int32_t FF_BUFFER_SIZE = 1500;
static std::mutex frameLock;

RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}

RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}

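// Tears down the decoder: clears the frame callback, signals the decode thread to
// exit and joins it, then releases the FFmpeg demuxer, AVIO context and AVIO buffer.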
void RtpDecoderTs::Release()
{
    {
        std::lock_guard<std::mutex> lock(frameLock);
        onFrame_ = nullptr;
    }

    exit_ = true;
    if (decodeThread_ && decodeThread_->joinable()) {
        decodeThread_->join();
        decodeThread_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_close_input(&avFormatContext_);
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }
}

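// Feeds one RTP packet into the decoder. The decode thread is started lazily on the
// first packet; non-empty payloads are queued for the demuxer to consume via ReadPacket().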
void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
{
    MEDIA_LOGD("trace.");
    RETURN_IF_NULL(rtp);
    if (exit_) {
        return;
    }

    if (decodeThread_ == nullptr) {
        decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
    }

    auto payload_size = rtp->GetPayloadSize();
    if (payload_size <= 0) {
        return;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(rtp);
}

void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
{
    // onFrame_ is read by the decode thread under frameLock, so guard the write as well
    std::lock_guard<std::mutex> lock(frameLock);
    onFrame_ = cb;
}

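// Decode thread entry. Opens an MPEG-TS demuxer over the queued RTP payloads through a
// custom AVIO read callback, locates the audio and video streams, then splits video
// packets into H.264 NAL frames (dropping AUD NALs) and wraps audio packets as AAC
// frames, delivering each one through the onFrame_ callback.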
void RtpDecoderTs::StartDecoding()
{
    SHARING_LOGD("trace.");
    avFormatContext_ = avformat_alloc_context();
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_context failed.");
        return;
    }

    avioCtxBuffer_ = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }

    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;

    int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
    if (ret != 0) {
        SHARING_LOGE("avformat_open_input failed.");
        return;
    }

    for (uint32_t i = 0; i < avFormatContext_->nb_streams; i++) {
        if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
            videoStreamIndex_ = i;
            SHARING_LOGD("find video stream %{public}u.", i);
        } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            audioStreamIndex_ = i;
            SHARING_LOGD("find audio stream %{public}u.", i);
        }
    }

    AVPacket *packet = av_packet_alloc();
    while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
        if (exit_) {
            SHARING_LOGI("ignore av read frame.");
            break;
        }
        if (packet->stream_index == videoStreamIndex_) {
            SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
                if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
                    return;
                }
                auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
                                                            (uint32_t)packet->pts, prefix);
                std::lock_guard<std::mutex> lock(frameLock);
                if (onFrame_) {
                    onFrame_(outFrame);
                }
            });
        } else if (packet->stream_index == audioStreamIndex_) {
            auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
                                                       (uint32_t)packet->pts);
            std::lock_guard<std::mutex> lock(frameLock);
            if (onFrame_) {
                onFrame_(outFrame);
            }
        }
        av_packet_unref(packet);
    }

    av_packet_free(&packet);
    SHARING_LOGD("ts decoding Thread_ exit.");
}

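// AVIO read trampoline: forwards FFmpeg's read request to the RtpDecoderTs instance
// passed as the opaque pointer.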
int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);
    RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
    return decoder->ReadPacket(buf, buf_size);
}

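// Blocks (polling) until an RTP payload is available or the decoder is released, then
// copies one payload into FFmpeg's read buffer and returns its length.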
int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(buf);
    std::unique_lock<std::mutex> lock(queueMutex_);
    while (dataQueue_.empty()) {
        if (exit_) {
            SHARING_LOGI("read packet exit.");
            return 0;
        }
        // release the lock while polling so InputRtp can enqueue new payloads
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: polling interval in us
        lock.lock();
    }

    auto &rtp = dataQueue_.front();
    auto data = rtp->GetPayload();
    int length = rtp->GetPayloadSize();
    if (length > buf_size) {
        SHARING_LOGE("rtp length exceeds buf_size!");
        return 0;
    }
    auto ret = memcpy_s(buf, buf_size, data, length);
    if (ret != EOK) {
        return 0;
    }

    dataQueue_.pop();
    return length;
}

RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
    : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
{
    SHARING_LOGI("RtpEncoderTs CTOR IN");
    merger_.SetType(FrameMerger::H264_PREFIX);
}

RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGI("RtpEncoderTs DTOR IN");
    Release();
}

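// Tears down the encoder: clears the RTP callback, signals the encode thread to exit
// and joins it, drains the frame queue, then releases the FFmpeg muxer, AVIO context
// and AVIO buffer.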
void RtpEncoderTs::Release()
{
    SHARING_LOGD("trace.");
    {
        std::lock_guard<std::mutex> lock(cbLockMutex_);
        onRtpPack_ = nullptr;
    }
    exit_ = true;
    if (encodeThread_ && encodeThread_->joinable()) {
        encodeThread_->join();
        encodeThread_ = nullptr;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!dataQueue_.empty()) {
            dataQueue_.pop();
        }
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_free_context(avFormatContext_);
        avFormatContext_ = nullptr;
    }
}

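// Queues one media frame for muxing. For H.264, SPS/PPS and the following key frame
// are merged into a single frame first. The encode thread is started only once the
// audio codec is known (AAC or PCM), so the mpegts audio stream can be created with
// the correct codec id.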
void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
{
    RETURN_IF_NULL(frame);
    if (exit_) {
        return;
    }

    DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
    switch (frame->GetCodecId()) {
        case CODEC_H264:
            // merge sps, pps and key frame into one packet
            merger_.InputFrame(
                frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
                    auto outFrame =
                        std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
                                                    PrefixSize((char *)buffer->Data(), buffer->Size()));
                    SaveFrame(outFrame);
                });
            break;
        case CODEC_AAC:
        case CODEC_PCM:
            SaveFrame(frame);
            break;
        default:
            SHARING_LOGW("unknown codec: %{public}d.", frame->GetCodecId());
            break;
    }

    if (audioCodeId_ == AV_CODEC_ID_NONE) {
        if (frame->GetCodecId() == CODEC_AAC) {
            audioCodeId_ = AV_CODEC_ID_AAC;
        } else if (frame->GetCodecId() == CODEC_PCM) {
            audioCodeId_ = AV_CODEC_ID_PCM_S16BE;
        }
    }
    if (encodeThread_ == nullptr && audioCodeId_ != AV_CODEC_ID_NONE) {
        encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
    }
}

void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
{
    SHARING_LOGD("trace.");
    // onRtpPack_ is read in WritePacket() under cbLockMutex_, so guard the write as well
    std::lock_guard<std::mutex> lock(cbLockMutex_);
    onRtpPack_ = cb;
}

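// Encode thread entry. Builds an mpegts muxer with one H.264 video stream and one audio
// stream, attaches a custom AVIO write callback that packs the muxed TS data into RTP,
// then keeps pulling frames from the queue and writing them until Release() is called.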
void RtpEncoderTs::StartEncoding()
{
    SHARING_LOGD("trace.");
    avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_output_context2 failed.");
        return;
    }

    videoStream = avformat_new_stream(avFormatContext_, NULL);
    RETURN_IF_NULL(videoStream);
    videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
    videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
    videoStream->codecpar->codec_tag = 0;
    videoStream->time_base.num = 1;
    videoStream->time_base.den = SAMPLE_RATE_90K; // 90000: video sample rate

    audioStream = avformat_new_stream(avFormatContext_, NULL);
    RETURN_IF_NULL(audioStream);
    audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
    audioStream->codecpar->codec_id = audioCodeId_;
    audioStream->codecpar->codec_tag = 0;
    audioStream->codecpar->channel_layout = av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
    audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
    audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
    audioStream->time_base.num = 1;
    audioStream->time_base.den = AUDIO_SAMPLE_RATE_48000;
    SHARING_LOGI("audio stream id: %{public}d, video stream id: %{public}d, audio codecid: %{public}d.",
                 audioStream->index, videoStream->index, audioCodeId_);

    avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }
    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;
    avFormatContext_->flags |= AVFMT_FLAG_CUSTOM_IO;

    int32_t ret = avformat_write_header(avFormatContext_, NULL);
    if (ret < 0) {
        SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
        return;
    }

    while (!exit_) {
        AVPacket *packet = av_packet_alloc();
        if (packet == nullptr) {
            break;
        }
        auto frame = ReadFrame(packet);
        if (frame == nullptr) {
            av_packet_free(&packet);
            break;
        }
        av_write_frame(avFormatContext_, packet);
        // the packet only borrows the frame's buffer, so freeing it releases just the AVPacket itself
        av_packet_free(&packet);
    }

    av_write_trailer(avFormatContext_);
}

void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
{
    RETURN_IF_NULL(frame);
    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(frame);
}

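// Blocks (polling) until a frame is queued or the encoder is released, then fills the
// given AVPacket with the frame's data and with timestamps rescaled from milliseconds
// to the stream time base. Returns the frame so its buffer stays alive while the
// packet is written.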
Frame::Ptr RtpEncoderTs::ReadFrame(AVPacket *packet)
{
    if (packet == nullptr) {
        SHARING_LOGE("packet is null!");
        return nullptr;
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    while (dataQueue_.empty()) {
        if (exit_) {
            SHARING_LOGI("exit when read frame.");
            return nullptr;
        }
        // release the lock while polling so InputFrame can enqueue new frames
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: polling interval in us
        lock.lock();
    }

    Frame::Ptr frame = dataQueue_.front();
    dataQueue_.pop();
    packet->data = frame->Data();
    packet->size = frame->Size();

    keyFrame_ = frame->KeyFrame();
    if (frame->GetTrackType() == TRACK_VIDEO) {
        packet->dts = av_rescale(frame->Dts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->pts = av_rescale(frame->Pts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->stream_index = videoStream->index;
        timeStamp_ = frame->Dts();
    } else if (frame->GetTrackType() == TRACK_AUDIO) {
        packet->dts = av_rescale(frame->Dts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->pts = av_rescale(frame->Pts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->stream_index = audioStream->index;
        timeStamp_ = frame->Dts();
    }
    return frame;
}

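// Drops the frame at the head of the queue once the muxer has finished with its data.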
void RtpEncoderTs::RemoveFrameAfterMuxing()
{
    std::unique_lock<std::mutex> lock(queueMutex_);
    if (!dataQueue_.empty()) {
        dataQueue_.pop();
    }
}

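// AVIO write trampoline: wraps the muxed TS bytes handed over by FFmpeg into an RTP
// packet via MakeRtp() and delivers it through the onRtpPack_ callback.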
int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);

    RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
    std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
    if (encoder->onRtpPack_) {
        auto rtp =
            encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
        encoder->onRtpPack_(rtp);
    }

    // report the whole buffer as consumed
    return buf_size;
}
} // namespace Sharing
} // namespace OHOS