1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2016 Arun Raghavan <mail@arunraghavan.net> 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as published 8 by the Free Software Foundation; either version 2.1 of the License, 9 or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <pulse/timeval.h> 25#include <pulsecore/fdsem.h> 26#include <pulsecore/core-rtclock.h> 27 28#include "rtp.h" 29 30#include <gio/gio.h> 31 32#include <gst/gst.h> 33#include <gst/app/gstappsrc.h> 34#include <gst/app/gstappsink.h> 35#include <gst/base/gstadapter.h> 36#include <gst/rtp/gstrtpbuffer.h> 37 38#define MAKE_ELEMENT_NAMED(v, e, n) \ 39 v = gst_element_factory_make(e, n); \ 40 if (!v) { \ 41 pa_log("Could not create %s element", e); \ 42 goto fail; \ 43 } 44 45#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL) 46#define RTP_HEADER_SIZE 12 47 48/* 49 * As per RFC 7587, the RTP payload type for OPUS is to be assigned 50 * dynamically. Considering that pa_rtp_payload_from_sample_spec uses 51 * 127 for anything other than format == S16BE and rate == 44.1 KHz, 52 * we use 127 for OPUS here as rate == 48 KHz for OPUS. 53 */ 54#define RTP_OPUS_PAYLOAD_TYPE 127 55 56struct pa_rtp_context { 57 pa_fdsem *fdsem; 58 pa_sample_spec ss; 59 60 GstElement *pipeline; 61 GstElement *appsrc; 62 GstElement *appsink; 63 GstCaps *meta_reference; 64 65 bool first_buffer; 66 uint32_t last_timestamp; 67 68 uint8_t *send_buf; 69 size_t mtu; 70}; 71 72static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss, bool enable_opus) { 73 if (ss->format != PA_SAMPLE_S16BE && ss->format != PA_SAMPLE_S16LE) 74 return NULL; 75 76 return gst_caps_new_simple("audio/x-raw", 77 "format", G_TYPE_STRING, enable_opus ? "S16LE" : "S16BE", 78 "rate", G_TYPE_INT, (int) ss->rate, 79 "channels", G_TYPE_INT, (int) ss->channels, 80 "layout", G_TYPE_STRING, "interleaved", 81 NULL); 82} 83 84static bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss, bool enable_opus) { 85 GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL; 86 GstElement *opusenc = NULL; 87 GstCaps *caps; 88 GSocket *socket; 89 GInetSocketAddress *addr; 90 GInetAddress *iaddr; 91 guint16 port; 92 gchar *addr_str; 93 94 MAKE_ELEMENT(appsrc, "appsrc"); 95 if (enable_opus) { 96 MAKE_ELEMENT(opusenc, "opusenc"); 97 MAKE_ELEMENT(pay, "rtpopuspay"); 98 } else { 99 MAKE_ELEMENT(pay, "rtpL16pay"); 100 } 101 MAKE_ELEMENT(capsf, "capsfilter"); 102 MAKE_ELEMENT(rtpbin, "rtpbin"); 103 MAKE_ELEMENT(sink, "udpsink"); 104 105 c->pipeline = gst_pipeline_new(NULL); 106 107 gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL); 108 109 if (enable_opus) 110 gst_bin_add_many(GST_BIN(c->pipeline), opusenc, NULL); 111 112 caps = caps_from_sample_spec(ss, enable_opus); 113 if (!caps) { 114 pa_log("Unsupported format to payload"); 115 goto fail; 116 } 117 118 socket = g_socket_new_from_fd(fd, NULL); 119 if (!socket) { 120 pa_log("Failed to create socket"); 121 goto fail; 122 } 123 124 addr = G_INET_SOCKET_ADDRESS(g_socket_get_remote_address(socket, NULL)); 125 iaddr = g_inet_socket_address_get_address(addr); 126 addr_str = g_inet_address_to_string(iaddr); 127 port = g_inet_socket_address_get_port(addr); 128 129 g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL); 130 g_object_set(pay, "mtu", mtu, NULL); 131 g_object_set(sink, "socket", socket, "host", addr_str, "port", port, 132 "enable-last-sample", FALSE, "sync", FALSE, "loop", 133 g_socket_get_multicast_loopback(socket), "ttl", 134 g_socket_get_ttl(socket), "ttl-mc", 135 g_socket_get_multicast_ttl(socket), "auto-multicast", FALSE, 136 NULL); 137 138 g_free(addr_str); 139 g_object_unref(addr); 140 g_object_unref(socket); 141 142 gst_caps_unref(caps); 143 144 /* Force the payload type that we want */ 145 if (enable_opus) 146 caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) RTP_OPUS_PAYLOAD_TYPE, "encoding-name", G_TYPE_STRING, "OPUS", NULL); 147 else 148 caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, "encoding-name", G_TYPE_STRING, "L16", NULL); 149 150 g_object_set(capsf, "caps", caps, NULL); 151 gst_caps_unref(caps); 152 153 if (enable_opus) { 154 if (!gst_element_link(appsrc, opusenc) || 155 !gst_element_link(opusenc, pay) || 156 !gst_element_link(pay, capsf) || 157 !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") || 158 !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) { 159 160 pa_log("Could not set up send pipeline"); 161 goto fail; 162 } 163 } else { 164 if (!gst_element_link(appsrc, pay) || 165 !gst_element_link(pay, capsf) || 166 !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") || 167 !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) { 168 169 pa_log("Could not set up send pipeline"); 170 goto fail; 171 } 172 } 173 174 if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { 175 pa_log("Could not start pipeline"); 176 goto fail; 177 } 178 179 c->appsrc = gst_object_ref(appsrc); 180 181 return true; 182 183fail: 184 if (c->pipeline) { 185 gst_object_unref(c->pipeline); 186 } else { 187 /* These weren't yet added to pipeline, so we still have a ref */ 188 if (appsrc) 189 gst_object_unref(appsrc); 190 if (opusenc) 191 gst_object_unref(opusenc); 192 if (pay) 193 gst_object_unref(pay); 194 if (capsf) 195 gst_object_unref(capsf); 196 if (rtpbin) 197 gst_object_unref(rtpbin); 198 if (sink) 199 gst_object_unref(sink); 200 } 201 202 return false; 203} 204 205pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss, bool enable_opus) { 206 pa_rtp_context *c = NULL; 207 GError *error = NULL; 208 209 pa_assert(fd >= 0); 210 211 pa_log_info("Initialising GStreamer RTP backend for send"); 212 213 if (enable_opus) 214 pa_log_info("Using OPUS encoding for RTP send"); 215 216 c = pa_xnew0(pa_rtp_context, 1); 217 218 c->ss = *ss; 219 c->mtu = mtu - RTP_HEADER_SIZE; 220 c->send_buf = pa_xmalloc(c->mtu); 221 222 if (!gst_init_check(NULL, NULL, &error)) { 223 pa_log_error("Could not initialise GStreamer: %s", error->message); 224 g_error_free(error); 225 goto fail; 226 } 227 228 if (!init_send_pipeline(c, fd, payload, mtu, ss, enable_opus)) 229 goto fail; 230 231 return c; 232 233fail: 234 pa_rtp_context_free(c); 235 return NULL; 236} 237 238/* Called from I/O thread context */ 239static bool process_bus_messages(pa_rtp_context *c) { 240 GstBus *bus; 241 GstMessage *message; 242 bool ret = true; 243 244 bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); 245 246 while (ret && (message = gst_bus_pop(bus))) { 247 if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { 248 GError *error = NULL; 249 250 ret = false; 251 252 gst_message_parse_error(message, &error, NULL); 253 pa_log("Got an error: %s", error->message); 254 255 g_error_free(error); 256 } 257 258 gst_message_unref(message); 259 } 260 261 gst_object_unref(bus); 262 263 return ret; 264} 265 266/* Called from I/O thread context */ 267int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { 268 GstBuffer *buf; 269 size_t n = 0; 270 271 pa_assert(c); 272 pa_assert(q); 273 274 if (!process_bus_messages(c)) 275 return -1; 276 277 /* 278 * While we check here for atleast MTU worth of data being available in 279 * memblockq, we might not have exact equivalent to MTU. Hence, we walk 280 * over the memchunks in memblockq and accumulate MTU bytes next. 281 */ 282 if (pa_memblockq_get_length(q) < c->mtu) 283 return 0; 284 285 for (;;) { 286 pa_memchunk chunk; 287 int r; 288 289 pa_memchunk_reset(&chunk); 290 291 if ((r = pa_memblockq_peek(q, &chunk)) >= 0) { 292 /* 293 * Accumulate MTU bytes of data before sending. If the current 294 * chunk length + accumulated bytes exceeds MTU, we drop bytes 295 * considered for transfer in this iteration from memblockq. 296 * 297 * The remaining bytes will be available in the next iteration, 298 * as these will be tracked and maintained by memblockq. 299 */ 300 size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length; 301 302 pa_assert(chunk.memblock); 303 304 memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k); 305 pa_memblock_release(chunk.memblock); 306 pa_memblock_unref(chunk.memblock); 307 308 n += k; 309 pa_memblockq_drop(q, k); 310 } 311 312 if (r < 0 || n >= c->mtu) { 313 GstClock *clock; 314 GstClockTime timestamp, clock_time; 315 GstMapInfo info; 316 317 if (n > 0) { 318 clock = gst_element_get_clock(c->pipeline); 319 clock_time = gst_clock_get_time(clock); 320 gst_object_unref(clock); 321 322 timestamp = gst_element_get_base_time(c->pipeline); 323 if (timestamp > clock_time) 324 timestamp -= clock_time; 325 else 326 timestamp = 0; 327 328 buf = gst_buffer_new_allocate(NULL, n, NULL); 329 pa_assert(buf); 330 331 GST_BUFFER_PTS(buf) = timestamp; 332 333 pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE)); 334 335 memcpy(info.data, c->send_buf, n); 336 gst_buffer_unmap(buf, &info); 337 338 if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { 339 pa_log_error("Could not push buffer"); 340 return -1; 341 } 342 } 343 344 if (r < 0 || pa_memblockq_get_length(q) < c->mtu) 345 break; 346 347 n = 0; 348 } 349 } 350 351 return 0; 352} 353 354static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss, bool enable_opus) { 355 if (ss->format != PA_SAMPLE_S16BE && ss->format != PA_SAMPLE_S16LE) 356 return NULL; 357 358 if (enable_opus) 359 return gst_caps_new_simple("application/x-rtp", 360 "media", G_TYPE_STRING, "audio", 361 "encoding-name", G_TYPE_STRING, "OPUS", 362 "clock-rate", G_TYPE_INT, (int) 48000, 363 "payload", G_TYPE_INT, (int) RTP_OPUS_PAYLOAD_TYPE, 364 NULL); 365 366 return gst_caps_new_simple("application/x-rtp", 367 "media", G_TYPE_STRING, "audio", 368 "encoding-name", G_TYPE_STRING, "L16", 369 "clock-rate", G_TYPE_INT, (int) ss->rate, 370 "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss), 371 "layout", G_TYPE_STRING, "interleaved", 372 NULL); 373} 374 375static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) { 376 pa_rtp_context *c = (pa_rtp_context *) userdata; 377 GstElement *depay; 378 GstPad *sinkpad; 379 GstPadLinkReturn ret; 380 381 depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay"); 382 pa_assert(depay); 383 384 sinkpad = gst_element_get_static_pad(depay, "sink"); 385 386 ret = gst_pad_link(pad, sinkpad); 387 if (ret != GST_PAD_LINK_OK) { 388 GstBus *bus; 389 GError *error; 390 391 bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); 392 error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader"); 393 gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL)); 394 395 /* Actually cause the I/O thread to wake up and process the error */ 396 pa_fdsem_post(c->fdsem); 397 398 g_error_free(error); 399 gst_object_unref(bus); 400 } 401 402 gst_object_unref(sinkpad); 403 gst_object_unref(depay); 404} 405 406static GstPadProbeReturn udpsrc_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer userdata) { 407 struct timeval tv; 408 pa_usec_t timestamp; 409 pa_rtp_context *c = (pa_rtp_context *) userdata; 410 411 pa_assert(info->type & GST_PAD_PROBE_TYPE_BUFFER); 412 413 pa_gettimeofday(&tv); 414 timestamp = pa_timeval_load(&tv); 415 416 gst_buffer_add_reference_timestamp_meta(GST_BUFFER(info->data), c->meta_reference, timestamp * GST_USECOND, 417 GST_CLOCK_TIME_NONE); 418 419 return GST_PAD_PROBE_OK; 420} 421 422static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss, bool enable_opus) { 423 GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL; 424 GstElement *resample = NULL, *opusdec = NULL; 425 GstCaps *caps, *sink_caps; 426 GstPad *pad; 427 GSocket *socket; 428 GError *error = NULL; 429 430 MAKE_ELEMENT(udpsrc, "udpsrc"); 431 MAKE_ELEMENT(rtpbin, "rtpbin"); 432 if (enable_opus) { 433 MAKE_ELEMENT_NAMED(depay, "rtpopusdepay", "depay"); 434 MAKE_ELEMENT(opusdec, "opusdec"); 435 MAKE_ELEMENT(resample, "audioresample"); 436 } else { 437 MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay"); 438 } 439 MAKE_ELEMENT(appsink, "appsink"); 440 441 c->pipeline = gst_pipeline_new(NULL); 442 443 gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL); 444 445 if (enable_opus) 446 gst_bin_add_many(GST_BIN(c->pipeline), opusdec, resample, NULL); 447 448 socket = g_socket_new_from_fd(fd, &error); 449 if (error) { 450 pa_log("Could not create socket: %s", error->message); 451 g_error_free(error); 452 goto fail; 453 } 454 455 caps = rtp_caps_from_sample_spec(ss, enable_opus); 456 if (!caps) { 457 pa_log("Unsupported format to payload"); 458 goto fail; 459 } 460 461 g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL); 462 g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL); 463 g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL); 464 465 if (enable_opus) { 466 sink_caps = gst_caps_new_simple("audio/x-raw", 467 "format", G_TYPE_STRING, "S16LE", 468 "layout", G_TYPE_STRING, "interleaved", 469 "clock-rate", G_TYPE_INT, (int) ss->rate, 470 "channels", G_TYPE_INT, (int) ss->channels, 471 NULL); 472 g_object_set(appsink, "caps", sink_caps, NULL); 473 g_object_set(opusdec, "plc", TRUE, NULL); 474 gst_caps_unref(sink_caps); 475 } 476 477 gst_caps_unref(caps); 478 g_object_unref(socket); 479 480 if (enable_opus) { 481 if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || 482 !gst_element_link(depay, opusdec) || 483 !gst_element_link(opusdec, resample) || 484 !gst_element_link(resample, appsink)) { 485 486 pa_log("Could not set up receive pipeline"); 487 goto fail; 488 } 489 } else { 490 if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || 491 !gst_element_link(depay, appsink)) { 492 493 pa_log("Could not set up receive pipeline"); 494 goto fail; 495 } 496 } 497 498 g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c); 499 500 /* This logic should go into udpsrc, and we should be populating the 501 * receive timestamp using SCM_TIMESTAMP, but until we have that ... */ 502 c->meta_reference = gst_caps_new_empty_simple("timestamp/x-pulseaudio-wallclock"); 503 504 pad = gst_element_get_static_pad(udpsrc, "src"); 505 gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, udpsrc_buffer_probe, c, NULL); 506 gst_object_unref(pad); 507 508 if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { 509 pa_log("Could not start pipeline"); 510 goto fail; 511 } 512 513 c->appsink = gst_object_ref(appsink); 514 515 return true; 516 517fail: 518 if (c->pipeline) { 519 gst_object_unref(c->pipeline); 520 } else { 521 /* These weren't yet added to pipeline, so we still have a ref */ 522 if (udpsrc) 523 gst_object_unref(udpsrc); 524 if (depay) 525 gst_object_unref(depay); 526 if (rtpbin) 527 gst_object_unref(rtpbin); 528 if (opusdec) 529 gst_object_unref(opusdec); 530 if (resample) 531 gst_object_unref(resample); 532 if (appsink) 533 gst_object_unref(appsink); 534 } 535 536 return false; 537} 538 539/* Called from the GStreamer streaming thread */ 540static void appsink_eos(GstAppSink *appsink, gpointer userdata) { 541 pa_rtp_context *c = (pa_rtp_context *) userdata; 542 543 pa_fdsem_post(c->fdsem); 544} 545 546/* Called from the GStreamer streaming thread */ 547static GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) { 548 pa_rtp_context *c = (pa_rtp_context *) userdata; 549 550 pa_fdsem_post(c->fdsem); 551 552 return GST_FLOW_OK; 553} 554 555pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss, bool enable_opus) { 556 pa_rtp_context *c = NULL; 557 GstAppSinkCallbacks callbacks = { 0, }; 558 GError *error = NULL; 559 560 pa_assert(fd >= 0); 561 562 pa_log_info("Initialising GStreamer RTP backend for receive"); 563 564 if (enable_opus) 565 pa_log_info("Using OPUS encoding for RTP recv"); 566 567 c = pa_xnew0(pa_rtp_context, 1); 568 569 c->fdsem = pa_fdsem_new(); 570 c->ss = *ss; 571 c->send_buf = NULL; 572 c->first_buffer = true; 573 574 if (!gst_init_check(NULL, NULL, &error)) { 575 pa_log_error("Could not initialise GStreamer: %s", error->message); 576 g_error_free(error); 577 goto fail; 578 } 579 580 if (!init_receive_pipeline(c, fd, ss, enable_opus)) 581 goto fail; 582 583 callbacks.eos = appsink_eos; 584 callbacks.new_sample = appsink_new_sample; 585 gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL); 586 587 return c; 588 589fail: 590 pa_rtp_context_free(c); 591 return NULL; 592} 593 594/* Called from I/O thread context */ 595int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { 596 GstSample *sample = NULL; 597 GstBufferList *buf_list; 598 GstAdapter *adapter = NULL; 599 GstBuffer *buf; 600 GstMapInfo info; 601 GstClockTime timestamp = GST_CLOCK_TIME_NONE; 602 uint8_t *data; 603 uint64_t data_len = 0; 604 605 if (!process_bus_messages(c)) 606 goto fail; 607 608 adapter = gst_adapter_new(); 609 pa_assert(adapter); 610 611 while (true) { 612 sample = gst_app_sink_try_pull_sample(GST_APP_SINK(c->appsink), 0); 613 if (!sample) 614 break; 615 616 buf = gst_sample_get_buffer(sample); 617 618 /* Get the timestamp from the first buffer */ 619 if (timestamp == GST_CLOCK_TIME_NONE) { 620 GstReferenceTimestampMeta *meta = gst_buffer_get_reference_timestamp_meta(buf, c->meta_reference); 621 622 /* Use the meta if we were able to insert it and it came through, 623 * else try to fallback to the DTS, which is only available in 624 * GStreamer 1.16 and earlier. */ 625 if (meta) 626 timestamp = meta->timestamp; 627 else if (GST_BUFFER_DTS(buf) != GST_CLOCK_TIME_NONE) 628 timestamp = GST_BUFFER_DTS(buf); 629 else 630 timestamp = 0; 631 } 632 633 if (GST_BUFFER_IS_DISCONT(buf)) 634 pa_log_info("Discontinuity detected, possibly lost some packets"); 635 636 if (!gst_buffer_map(buf, &info, GST_MAP_READ)) { 637 pa_log_info("Failed to map buffer"); 638 gst_sample_unref(sample); 639 goto fail; 640 } 641 642 data_len += info.size; 643 /* We need the buffer to be valid longer than the sample, which will 644 * be valid only for the duration of this loop. 645 * 646 * To do this, increase the ref count. Ownership is transferred to the 647 * adapter in gst_adapter_push. 648 */ 649 gst_buffer_ref(buf); 650 gst_adapter_push(adapter, buf); 651 gst_buffer_unmap(buf, &info); 652 653 gst_sample_unref(sample); 654 } 655 656 buf_list = gst_adapter_take_buffer_list(adapter, data_len); 657 pa_assert(buf_list); 658 659 pa_assert(pa_mempool_block_size_max(pool) >= data_len); 660 661 chunk->memblock = pa_memblock_new(pool, data_len); 662 chunk->index = 0; 663 chunk->length = data_len; 664 665 data = (uint8_t *) pa_memblock_acquire_chunk(chunk); 666 667 for (int i = 0; i < gst_buffer_list_length(buf_list); i++) { 668 buf = gst_buffer_list_get(buf_list, i); 669 670 if (!gst_buffer_map(buf, &info, GST_MAP_READ)) { 671 gst_buffer_list_unref(buf_list); 672 goto fail; 673 } 674 675 memcpy(data, info.data, info.size); 676 data += info.size; 677 gst_buffer_unmap(buf, &info); 678 } 679 680 pa_memblock_release(chunk->memblock); 681 682 /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted 683 * to time units (instead of clock-rate units as is in the header) and 684 * wraparound-corrected. */ 685 *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(gst_buffer_list_get(buf_list, 0)), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU; 686 if (timestamp != GST_CLOCK_TIME_NONE) 687 pa_timeval_rtstore(tstamp, timestamp / PA_NSEC_PER_USEC, false); 688 689 if (c->first_buffer) { 690 c->first_buffer = false; 691 c->last_timestamp = *rtp_tstamp; 692 } else { 693 /* The RTP clock -> time domain -> RTP clock transformation above might 694 * add a ±1 rounding error, so let's get rid of that */ 695 uint32_t expected = c->last_timestamp + (uint32_t) (data_len / pa_rtp_context_get_frame_size(c)); 696 int delta = *rtp_tstamp - expected; 697 698 if (delta == 1 || delta == -1) 699 *rtp_tstamp -= delta; 700 701 c->last_timestamp = *rtp_tstamp; 702 } 703 704 gst_buffer_list_unref(buf_list); 705 gst_object_unref(adapter); 706 707 return 0; 708 709fail: 710 if (adapter) 711 gst_object_unref(adapter); 712 713 if (chunk->memblock) 714 pa_memblock_unref(chunk->memblock); 715 716 return -1; 717} 718 719void pa_rtp_context_free(pa_rtp_context *c) { 720 pa_assert(c); 721 722 if (c->meta_reference) 723 gst_caps_unref(c->meta_reference); 724 725 if (c->appsrc) { 726 gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc)); 727 gst_object_unref(c->appsrc); 728 pa_xfree(c->send_buf); 729 } 730 731 if (c->appsink) 732 gst_object_unref(c->appsink); 733 734 if (c->pipeline) { 735 gst_element_set_state(c->pipeline, GST_STATE_NULL); 736 gst_object_unref(c->pipeline); 737 } 738 739 if (c->fdsem) 740 pa_fdsem_free(c->fdsem); 741 742 pa_xfree(c); 743} 744 745pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) { 746 return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem); 747} 748 749size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) { 750 return pa_frame_size(&c->ss); 751} 752