153a5a1b3Sopenharmony_ci/*** 253a5a1b3Sopenharmony_ci This file is part of PulseAudio. 353a5a1b3Sopenharmony_ci 453a5a1b3Sopenharmony_ci Copyright 2016 Arun Raghavan <mail@arunraghavan.net> 553a5a1b3Sopenharmony_ci 653a5a1b3Sopenharmony_ci PulseAudio is free software; you can redistribute it and/or modify 753a5a1b3Sopenharmony_ci it under the terms of the GNU Lesser General Public License as published 853a5a1b3Sopenharmony_ci by the Free Software Foundation; either version 2.1 of the License, 953a5a1b3Sopenharmony_ci or (at your option) any later version. 1053a5a1b3Sopenharmony_ci 1153a5a1b3Sopenharmony_ci PulseAudio is distributed in the hope that it will be useful, but 1253a5a1b3Sopenharmony_ci WITHOUT ANY WARRANTY; without even the implied warranty of 1353a5a1b3Sopenharmony_ci MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1453a5a1b3Sopenharmony_ci General Public License for more details. 1553a5a1b3Sopenharmony_ci 1653a5a1b3Sopenharmony_ci You should have received a copy of the GNU Lesser General Public License 1753a5a1b3Sopenharmony_ci along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 1853a5a1b3Sopenharmony_ci***/ 1953a5a1b3Sopenharmony_ci 2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H 2153a5a1b3Sopenharmony_ci#include <config.h> 2253a5a1b3Sopenharmony_ci#endif 2353a5a1b3Sopenharmony_ci 2453a5a1b3Sopenharmony_ci#include <pulse/timeval.h> 2553a5a1b3Sopenharmony_ci#include <pulsecore/fdsem.h> 2653a5a1b3Sopenharmony_ci#include <pulsecore/core-rtclock.h> 2753a5a1b3Sopenharmony_ci 2853a5a1b3Sopenharmony_ci#include "rtp.h" 2953a5a1b3Sopenharmony_ci 3053a5a1b3Sopenharmony_ci#include <gio/gio.h> 3153a5a1b3Sopenharmony_ci 3253a5a1b3Sopenharmony_ci#include <gst/gst.h> 3353a5a1b3Sopenharmony_ci#include <gst/app/gstappsrc.h> 3453a5a1b3Sopenharmony_ci#include <gst/app/gstappsink.h> 3553a5a1b3Sopenharmony_ci#include <gst/base/gstadapter.h> 3653a5a1b3Sopenharmony_ci#include <gst/rtp/gstrtpbuffer.h> 3753a5a1b3Sopenharmony_ci 3853a5a1b3Sopenharmony_ci#define MAKE_ELEMENT_NAMED(v, e, n) \ 3953a5a1b3Sopenharmony_ci v = gst_element_factory_make(e, n); \ 4053a5a1b3Sopenharmony_ci if (!v) { \ 4153a5a1b3Sopenharmony_ci pa_log("Could not create %s element", e); \ 4253a5a1b3Sopenharmony_ci goto fail; \ 4353a5a1b3Sopenharmony_ci } 4453a5a1b3Sopenharmony_ci 4553a5a1b3Sopenharmony_ci#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL) 4653a5a1b3Sopenharmony_ci#define RTP_HEADER_SIZE 12 4753a5a1b3Sopenharmony_ci 4853a5a1b3Sopenharmony_ci/* 4953a5a1b3Sopenharmony_ci * As per RFC 7587, the RTP payload type for OPUS is to be assigned 5053a5a1b3Sopenharmony_ci * dynamically. Considering that pa_rtp_payload_from_sample_spec uses 5153a5a1b3Sopenharmony_ci * 127 for anything other than format == S16BE and rate == 44.1 KHz, 5253a5a1b3Sopenharmony_ci * we use 127 for OPUS here as rate == 48 KHz for OPUS. 5353a5a1b3Sopenharmony_ci */ 5453a5a1b3Sopenharmony_ci#define RTP_OPUS_PAYLOAD_TYPE 127 5553a5a1b3Sopenharmony_ci 5653a5a1b3Sopenharmony_cistruct pa_rtp_context { 5753a5a1b3Sopenharmony_ci pa_fdsem *fdsem; 5853a5a1b3Sopenharmony_ci pa_sample_spec ss; 5953a5a1b3Sopenharmony_ci 6053a5a1b3Sopenharmony_ci GstElement *pipeline; 6153a5a1b3Sopenharmony_ci GstElement *appsrc; 6253a5a1b3Sopenharmony_ci GstElement *appsink; 6353a5a1b3Sopenharmony_ci GstCaps *meta_reference; 6453a5a1b3Sopenharmony_ci 6553a5a1b3Sopenharmony_ci bool first_buffer; 6653a5a1b3Sopenharmony_ci uint32_t last_timestamp; 6753a5a1b3Sopenharmony_ci 6853a5a1b3Sopenharmony_ci uint8_t *send_buf; 6953a5a1b3Sopenharmony_ci size_t mtu; 7053a5a1b3Sopenharmony_ci}; 7153a5a1b3Sopenharmony_ci 7253a5a1b3Sopenharmony_cistatic GstCaps* caps_from_sample_spec(const pa_sample_spec *ss, bool enable_opus) { 7353a5a1b3Sopenharmony_ci if (ss->format != PA_SAMPLE_S16BE && ss->format != PA_SAMPLE_S16LE) 7453a5a1b3Sopenharmony_ci return NULL; 7553a5a1b3Sopenharmony_ci 7653a5a1b3Sopenharmony_ci return gst_caps_new_simple("audio/x-raw", 7753a5a1b3Sopenharmony_ci "format", G_TYPE_STRING, enable_opus ? "S16LE" : "S16BE", 7853a5a1b3Sopenharmony_ci "rate", G_TYPE_INT, (int) ss->rate, 7953a5a1b3Sopenharmony_ci "channels", G_TYPE_INT, (int) ss->channels, 8053a5a1b3Sopenharmony_ci "layout", G_TYPE_STRING, "interleaved", 8153a5a1b3Sopenharmony_ci NULL); 8253a5a1b3Sopenharmony_ci} 8353a5a1b3Sopenharmony_ci 8453a5a1b3Sopenharmony_cistatic bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss, bool enable_opus) { 8553a5a1b3Sopenharmony_ci GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL; 8653a5a1b3Sopenharmony_ci GstElement *opusenc = NULL; 8753a5a1b3Sopenharmony_ci GstCaps *caps; 8853a5a1b3Sopenharmony_ci GSocket *socket; 8953a5a1b3Sopenharmony_ci GInetSocketAddress *addr; 9053a5a1b3Sopenharmony_ci GInetAddress *iaddr; 9153a5a1b3Sopenharmony_ci guint16 port; 9253a5a1b3Sopenharmony_ci gchar *addr_str; 9353a5a1b3Sopenharmony_ci 9453a5a1b3Sopenharmony_ci MAKE_ELEMENT(appsrc, "appsrc"); 9553a5a1b3Sopenharmony_ci if (enable_opus) { 9653a5a1b3Sopenharmony_ci MAKE_ELEMENT(opusenc, "opusenc"); 9753a5a1b3Sopenharmony_ci MAKE_ELEMENT(pay, "rtpopuspay"); 9853a5a1b3Sopenharmony_ci } else { 9953a5a1b3Sopenharmony_ci MAKE_ELEMENT(pay, "rtpL16pay"); 10053a5a1b3Sopenharmony_ci } 10153a5a1b3Sopenharmony_ci MAKE_ELEMENT(capsf, "capsfilter"); 10253a5a1b3Sopenharmony_ci MAKE_ELEMENT(rtpbin, "rtpbin"); 10353a5a1b3Sopenharmony_ci MAKE_ELEMENT(sink, "udpsink"); 10453a5a1b3Sopenharmony_ci 10553a5a1b3Sopenharmony_ci c->pipeline = gst_pipeline_new(NULL); 10653a5a1b3Sopenharmony_ci 10753a5a1b3Sopenharmony_ci gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL); 10853a5a1b3Sopenharmony_ci 10953a5a1b3Sopenharmony_ci if (enable_opus) 11053a5a1b3Sopenharmony_ci gst_bin_add_many(GST_BIN(c->pipeline), opusenc, NULL); 11153a5a1b3Sopenharmony_ci 11253a5a1b3Sopenharmony_ci caps = caps_from_sample_spec(ss, enable_opus); 11353a5a1b3Sopenharmony_ci if (!caps) { 11453a5a1b3Sopenharmony_ci pa_log("Unsupported format to payload"); 11553a5a1b3Sopenharmony_ci goto fail; 11653a5a1b3Sopenharmony_ci } 11753a5a1b3Sopenharmony_ci 11853a5a1b3Sopenharmony_ci socket = g_socket_new_from_fd(fd, NULL); 11953a5a1b3Sopenharmony_ci if (!socket) { 12053a5a1b3Sopenharmony_ci pa_log("Failed to create socket"); 12153a5a1b3Sopenharmony_ci goto fail; 12253a5a1b3Sopenharmony_ci } 12353a5a1b3Sopenharmony_ci 12453a5a1b3Sopenharmony_ci addr = G_INET_SOCKET_ADDRESS(g_socket_get_remote_address(socket, NULL)); 12553a5a1b3Sopenharmony_ci iaddr = g_inet_socket_address_get_address(addr); 12653a5a1b3Sopenharmony_ci addr_str = g_inet_address_to_string(iaddr); 12753a5a1b3Sopenharmony_ci port = g_inet_socket_address_get_port(addr); 12853a5a1b3Sopenharmony_ci 12953a5a1b3Sopenharmony_ci g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL); 13053a5a1b3Sopenharmony_ci g_object_set(pay, "mtu", mtu, NULL); 13153a5a1b3Sopenharmony_ci g_object_set(sink, "socket", socket, "host", addr_str, "port", port, 13253a5a1b3Sopenharmony_ci "enable-last-sample", FALSE, "sync", FALSE, "loop", 13353a5a1b3Sopenharmony_ci g_socket_get_multicast_loopback(socket), "ttl", 13453a5a1b3Sopenharmony_ci g_socket_get_ttl(socket), "ttl-mc", 13553a5a1b3Sopenharmony_ci g_socket_get_multicast_ttl(socket), "auto-multicast", FALSE, 13653a5a1b3Sopenharmony_ci NULL); 13753a5a1b3Sopenharmony_ci 13853a5a1b3Sopenharmony_ci g_free(addr_str); 13953a5a1b3Sopenharmony_ci g_object_unref(addr); 14053a5a1b3Sopenharmony_ci g_object_unref(socket); 14153a5a1b3Sopenharmony_ci 14253a5a1b3Sopenharmony_ci gst_caps_unref(caps); 14353a5a1b3Sopenharmony_ci 14453a5a1b3Sopenharmony_ci /* Force the payload type that we want */ 14553a5a1b3Sopenharmony_ci if (enable_opus) 14653a5a1b3Sopenharmony_ci caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) RTP_OPUS_PAYLOAD_TYPE, "encoding-name", G_TYPE_STRING, "OPUS", NULL); 14753a5a1b3Sopenharmony_ci else 14853a5a1b3Sopenharmony_ci caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, "encoding-name", G_TYPE_STRING, "L16", NULL); 14953a5a1b3Sopenharmony_ci 15053a5a1b3Sopenharmony_ci g_object_set(capsf, "caps", caps, NULL); 15153a5a1b3Sopenharmony_ci gst_caps_unref(caps); 15253a5a1b3Sopenharmony_ci 15353a5a1b3Sopenharmony_ci if (enable_opus) { 15453a5a1b3Sopenharmony_ci if (!gst_element_link(appsrc, opusenc) || 15553a5a1b3Sopenharmony_ci !gst_element_link(opusenc, pay) || 15653a5a1b3Sopenharmony_ci !gst_element_link(pay, capsf) || 15753a5a1b3Sopenharmony_ci !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") || 15853a5a1b3Sopenharmony_ci !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) { 15953a5a1b3Sopenharmony_ci 16053a5a1b3Sopenharmony_ci pa_log("Could not set up send pipeline"); 16153a5a1b3Sopenharmony_ci goto fail; 16253a5a1b3Sopenharmony_ci } 16353a5a1b3Sopenharmony_ci } else { 16453a5a1b3Sopenharmony_ci if (!gst_element_link(appsrc, pay) || 16553a5a1b3Sopenharmony_ci !gst_element_link(pay, capsf) || 16653a5a1b3Sopenharmony_ci !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") || 16753a5a1b3Sopenharmony_ci !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) { 16853a5a1b3Sopenharmony_ci 16953a5a1b3Sopenharmony_ci pa_log("Could not set up send pipeline"); 17053a5a1b3Sopenharmony_ci goto fail; 17153a5a1b3Sopenharmony_ci } 17253a5a1b3Sopenharmony_ci } 17353a5a1b3Sopenharmony_ci 17453a5a1b3Sopenharmony_ci if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { 17553a5a1b3Sopenharmony_ci pa_log("Could not start pipeline"); 17653a5a1b3Sopenharmony_ci goto fail; 17753a5a1b3Sopenharmony_ci } 17853a5a1b3Sopenharmony_ci 17953a5a1b3Sopenharmony_ci c->appsrc = gst_object_ref(appsrc); 18053a5a1b3Sopenharmony_ci 18153a5a1b3Sopenharmony_ci return true; 18253a5a1b3Sopenharmony_ci 18353a5a1b3Sopenharmony_cifail: 18453a5a1b3Sopenharmony_ci if (c->pipeline) { 18553a5a1b3Sopenharmony_ci gst_object_unref(c->pipeline); 18653a5a1b3Sopenharmony_ci } else { 18753a5a1b3Sopenharmony_ci /* These weren't yet added to pipeline, so we still have a ref */ 18853a5a1b3Sopenharmony_ci if (appsrc) 18953a5a1b3Sopenharmony_ci gst_object_unref(appsrc); 19053a5a1b3Sopenharmony_ci if (opusenc) 19153a5a1b3Sopenharmony_ci gst_object_unref(opusenc); 19253a5a1b3Sopenharmony_ci if (pay) 19353a5a1b3Sopenharmony_ci gst_object_unref(pay); 19453a5a1b3Sopenharmony_ci if (capsf) 19553a5a1b3Sopenharmony_ci gst_object_unref(capsf); 19653a5a1b3Sopenharmony_ci if (rtpbin) 19753a5a1b3Sopenharmony_ci gst_object_unref(rtpbin); 19853a5a1b3Sopenharmony_ci if (sink) 19953a5a1b3Sopenharmony_ci gst_object_unref(sink); 20053a5a1b3Sopenharmony_ci } 20153a5a1b3Sopenharmony_ci 20253a5a1b3Sopenharmony_ci return false; 20353a5a1b3Sopenharmony_ci} 20453a5a1b3Sopenharmony_ci 20553a5a1b3Sopenharmony_cipa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss, bool enable_opus) { 20653a5a1b3Sopenharmony_ci pa_rtp_context *c = NULL; 20753a5a1b3Sopenharmony_ci GError *error = NULL; 20853a5a1b3Sopenharmony_ci 20953a5a1b3Sopenharmony_ci pa_assert(fd >= 0); 21053a5a1b3Sopenharmony_ci 21153a5a1b3Sopenharmony_ci pa_log_info("Initialising GStreamer RTP backend for send"); 21253a5a1b3Sopenharmony_ci 21353a5a1b3Sopenharmony_ci if (enable_opus) 21453a5a1b3Sopenharmony_ci pa_log_info("Using OPUS encoding for RTP send"); 21553a5a1b3Sopenharmony_ci 21653a5a1b3Sopenharmony_ci c = pa_xnew0(pa_rtp_context, 1); 21753a5a1b3Sopenharmony_ci 21853a5a1b3Sopenharmony_ci c->ss = *ss; 21953a5a1b3Sopenharmony_ci c->mtu = mtu - RTP_HEADER_SIZE; 22053a5a1b3Sopenharmony_ci c->send_buf = pa_xmalloc(c->mtu); 22153a5a1b3Sopenharmony_ci 22253a5a1b3Sopenharmony_ci if (!gst_init_check(NULL, NULL, &error)) { 22353a5a1b3Sopenharmony_ci pa_log_error("Could not initialise GStreamer: %s", error->message); 22453a5a1b3Sopenharmony_ci g_error_free(error); 22553a5a1b3Sopenharmony_ci goto fail; 22653a5a1b3Sopenharmony_ci } 22753a5a1b3Sopenharmony_ci 22853a5a1b3Sopenharmony_ci if (!init_send_pipeline(c, fd, payload, mtu, ss, enable_opus)) 22953a5a1b3Sopenharmony_ci goto fail; 23053a5a1b3Sopenharmony_ci 23153a5a1b3Sopenharmony_ci return c; 23253a5a1b3Sopenharmony_ci 23353a5a1b3Sopenharmony_cifail: 23453a5a1b3Sopenharmony_ci pa_rtp_context_free(c); 23553a5a1b3Sopenharmony_ci return NULL; 23653a5a1b3Sopenharmony_ci} 23753a5a1b3Sopenharmony_ci 23853a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 23953a5a1b3Sopenharmony_cistatic bool process_bus_messages(pa_rtp_context *c) { 24053a5a1b3Sopenharmony_ci GstBus *bus; 24153a5a1b3Sopenharmony_ci GstMessage *message; 24253a5a1b3Sopenharmony_ci bool ret = true; 24353a5a1b3Sopenharmony_ci 24453a5a1b3Sopenharmony_ci bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); 24553a5a1b3Sopenharmony_ci 24653a5a1b3Sopenharmony_ci while (ret && (message = gst_bus_pop(bus))) { 24753a5a1b3Sopenharmony_ci if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { 24853a5a1b3Sopenharmony_ci GError *error = NULL; 24953a5a1b3Sopenharmony_ci 25053a5a1b3Sopenharmony_ci ret = false; 25153a5a1b3Sopenharmony_ci 25253a5a1b3Sopenharmony_ci gst_message_parse_error(message, &error, NULL); 25353a5a1b3Sopenharmony_ci pa_log("Got an error: %s", error->message); 25453a5a1b3Sopenharmony_ci 25553a5a1b3Sopenharmony_ci g_error_free(error); 25653a5a1b3Sopenharmony_ci } 25753a5a1b3Sopenharmony_ci 25853a5a1b3Sopenharmony_ci gst_message_unref(message); 25953a5a1b3Sopenharmony_ci } 26053a5a1b3Sopenharmony_ci 26153a5a1b3Sopenharmony_ci gst_object_unref(bus); 26253a5a1b3Sopenharmony_ci 26353a5a1b3Sopenharmony_ci return ret; 26453a5a1b3Sopenharmony_ci} 26553a5a1b3Sopenharmony_ci 26653a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 26753a5a1b3Sopenharmony_ciint pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { 26853a5a1b3Sopenharmony_ci GstBuffer *buf; 26953a5a1b3Sopenharmony_ci size_t n = 0; 27053a5a1b3Sopenharmony_ci 27153a5a1b3Sopenharmony_ci pa_assert(c); 27253a5a1b3Sopenharmony_ci pa_assert(q); 27353a5a1b3Sopenharmony_ci 27453a5a1b3Sopenharmony_ci if (!process_bus_messages(c)) 27553a5a1b3Sopenharmony_ci return -1; 27653a5a1b3Sopenharmony_ci 27753a5a1b3Sopenharmony_ci /* 27853a5a1b3Sopenharmony_ci * While we check here for atleast MTU worth of data being available in 27953a5a1b3Sopenharmony_ci * memblockq, we might not have exact equivalent to MTU. Hence, we walk 28053a5a1b3Sopenharmony_ci * over the memchunks in memblockq and accumulate MTU bytes next. 28153a5a1b3Sopenharmony_ci */ 28253a5a1b3Sopenharmony_ci if (pa_memblockq_get_length(q) < c->mtu) 28353a5a1b3Sopenharmony_ci return 0; 28453a5a1b3Sopenharmony_ci 28553a5a1b3Sopenharmony_ci for (;;) { 28653a5a1b3Sopenharmony_ci pa_memchunk chunk; 28753a5a1b3Sopenharmony_ci int r; 28853a5a1b3Sopenharmony_ci 28953a5a1b3Sopenharmony_ci pa_memchunk_reset(&chunk); 29053a5a1b3Sopenharmony_ci 29153a5a1b3Sopenharmony_ci if ((r = pa_memblockq_peek(q, &chunk)) >= 0) { 29253a5a1b3Sopenharmony_ci /* 29353a5a1b3Sopenharmony_ci * Accumulate MTU bytes of data before sending. If the current 29453a5a1b3Sopenharmony_ci * chunk length + accumulated bytes exceeds MTU, we drop bytes 29553a5a1b3Sopenharmony_ci * considered for transfer in this iteration from memblockq. 29653a5a1b3Sopenharmony_ci * 29753a5a1b3Sopenharmony_ci * The remaining bytes will be available in the next iteration, 29853a5a1b3Sopenharmony_ci * as these will be tracked and maintained by memblockq. 29953a5a1b3Sopenharmony_ci */ 30053a5a1b3Sopenharmony_ci size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length; 30153a5a1b3Sopenharmony_ci 30253a5a1b3Sopenharmony_ci pa_assert(chunk.memblock); 30353a5a1b3Sopenharmony_ci 30453a5a1b3Sopenharmony_ci memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k); 30553a5a1b3Sopenharmony_ci pa_memblock_release(chunk.memblock); 30653a5a1b3Sopenharmony_ci pa_memblock_unref(chunk.memblock); 30753a5a1b3Sopenharmony_ci 30853a5a1b3Sopenharmony_ci n += k; 30953a5a1b3Sopenharmony_ci pa_memblockq_drop(q, k); 31053a5a1b3Sopenharmony_ci } 31153a5a1b3Sopenharmony_ci 31253a5a1b3Sopenharmony_ci if (r < 0 || n >= c->mtu) { 31353a5a1b3Sopenharmony_ci GstClock *clock; 31453a5a1b3Sopenharmony_ci GstClockTime timestamp, clock_time; 31553a5a1b3Sopenharmony_ci GstMapInfo info; 31653a5a1b3Sopenharmony_ci 31753a5a1b3Sopenharmony_ci if (n > 0) { 31853a5a1b3Sopenharmony_ci clock = gst_element_get_clock(c->pipeline); 31953a5a1b3Sopenharmony_ci clock_time = gst_clock_get_time(clock); 32053a5a1b3Sopenharmony_ci gst_object_unref(clock); 32153a5a1b3Sopenharmony_ci 32253a5a1b3Sopenharmony_ci timestamp = gst_element_get_base_time(c->pipeline); 32353a5a1b3Sopenharmony_ci if (timestamp > clock_time) 32453a5a1b3Sopenharmony_ci timestamp -= clock_time; 32553a5a1b3Sopenharmony_ci else 32653a5a1b3Sopenharmony_ci timestamp = 0; 32753a5a1b3Sopenharmony_ci 32853a5a1b3Sopenharmony_ci buf = gst_buffer_new_allocate(NULL, n, NULL); 32953a5a1b3Sopenharmony_ci pa_assert(buf); 33053a5a1b3Sopenharmony_ci 33153a5a1b3Sopenharmony_ci GST_BUFFER_PTS(buf) = timestamp; 33253a5a1b3Sopenharmony_ci 33353a5a1b3Sopenharmony_ci pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE)); 33453a5a1b3Sopenharmony_ci 33553a5a1b3Sopenharmony_ci memcpy(info.data, c->send_buf, n); 33653a5a1b3Sopenharmony_ci gst_buffer_unmap(buf, &info); 33753a5a1b3Sopenharmony_ci 33853a5a1b3Sopenharmony_ci if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { 33953a5a1b3Sopenharmony_ci pa_log_error("Could not push buffer"); 34053a5a1b3Sopenharmony_ci return -1; 34153a5a1b3Sopenharmony_ci } 34253a5a1b3Sopenharmony_ci } 34353a5a1b3Sopenharmony_ci 34453a5a1b3Sopenharmony_ci if (r < 0 || pa_memblockq_get_length(q) < c->mtu) 34553a5a1b3Sopenharmony_ci break; 34653a5a1b3Sopenharmony_ci 34753a5a1b3Sopenharmony_ci n = 0; 34853a5a1b3Sopenharmony_ci } 34953a5a1b3Sopenharmony_ci } 35053a5a1b3Sopenharmony_ci 35153a5a1b3Sopenharmony_ci return 0; 35253a5a1b3Sopenharmony_ci} 35353a5a1b3Sopenharmony_ci 35453a5a1b3Sopenharmony_cistatic GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss, bool enable_opus) { 35553a5a1b3Sopenharmony_ci if (ss->format != PA_SAMPLE_S16BE && ss->format != PA_SAMPLE_S16LE) 35653a5a1b3Sopenharmony_ci return NULL; 35753a5a1b3Sopenharmony_ci 35853a5a1b3Sopenharmony_ci if (enable_opus) 35953a5a1b3Sopenharmony_ci return gst_caps_new_simple("application/x-rtp", 36053a5a1b3Sopenharmony_ci "media", G_TYPE_STRING, "audio", 36153a5a1b3Sopenharmony_ci "encoding-name", G_TYPE_STRING, "OPUS", 36253a5a1b3Sopenharmony_ci "clock-rate", G_TYPE_INT, (int) 48000, 36353a5a1b3Sopenharmony_ci "payload", G_TYPE_INT, (int) RTP_OPUS_PAYLOAD_TYPE, 36453a5a1b3Sopenharmony_ci NULL); 36553a5a1b3Sopenharmony_ci 36653a5a1b3Sopenharmony_ci return gst_caps_new_simple("application/x-rtp", 36753a5a1b3Sopenharmony_ci "media", G_TYPE_STRING, "audio", 36853a5a1b3Sopenharmony_ci "encoding-name", G_TYPE_STRING, "L16", 36953a5a1b3Sopenharmony_ci "clock-rate", G_TYPE_INT, (int) ss->rate, 37053a5a1b3Sopenharmony_ci "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss), 37153a5a1b3Sopenharmony_ci "layout", G_TYPE_STRING, "interleaved", 37253a5a1b3Sopenharmony_ci NULL); 37353a5a1b3Sopenharmony_ci} 37453a5a1b3Sopenharmony_ci 37553a5a1b3Sopenharmony_cistatic void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) { 37653a5a1b3Sopenharmony_ci pa_rtp_context *c = (pa_rtp_context *) userdata; 37753a5a1b3Sopenharmony_ci GstElement *depay; 37853a5a1b3Sopenharmony_ci GstPad *sinkpad; 37953a5a1b3Sopenharmony_ci GstPadLinkReturn ret; 38053a5a1b3Sopenharmony_ci 38153a5a1b3Sopenharmony_ci depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay"); 38253a5a1b3Sopenharmony_ci pa_assert(depay); 38353a5a1b3Sopenharmony_ci 38453a5a1b3Sopenharmony_ci sinkpad = gst_element_get_static_pad(depay, "sink"); 38553a5a1b3Sopenharmony_ci 38653a5a1b3Sopenharmony_ci ret = gst_pad_link(pad, sinkpad); 38753a5a1b3Sopenharmony_ci if (ret != GST_PAD_LINK_OK) { 38853a5a1b3Sopenharmony_ci GstBus *bus; 38953a5a1b3Sopenharmony_ci GError *error; 39053a5a1b3Sopenharmony_ci 39153a5a1b3Sopenharmony_ci bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline)); 39253a5a1b3Sopenharmony_ci error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader"); 39353a5a1b3Sopenharmony_ci gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL)); 39453a5a1b3Sopenharmony_ci 39553a5a1b3Sopenharmony_ci /* Actually cause the I/O thread to wake up and process the error */ 39653a5a1b3Sopenharmony_ci pa_fdsem_post(c->fdsem); 39753a5a1b3Sopenharmony_ci 39853a5a1b3Sopenharmony_ci g_error_free(error); 39953a5a1b3Sopenharmony_ci gst_object_unref(bus); 40053a5a1b3Sopenharmony_ci } 40153a5a1b3Sopenharmony_ci 40253a5a1b3Sopenharmony_ci gst_object_unref(sinkpad); 40353a5a1b3Sopenharmony_ci gst_object_unref(depay); 40453a5a1b3Sopenharmony_ci} 40553a5a1b3Sopenharmony_ci 40653a5a1b3Sopenharmony_cistatic GstPadProbeReturn udpsrc_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer userdata) { 40753a5a1b3Sopenharmony_ci struct timeval tv; 40853a5a1b3Sopenharmony_ci pa_usec_t timestamp; 40953a5a1b3Sopenharmony_ci pa_rtp_context *c = (pa_rtp_context *) userdata; 41053a5a1b3Sopenharmony_ci 41153a5a1b3Sopenharmony_ci pa_assert(info->type & GST_PAD_PROBE_TYPE_BUFFER); 41253a5a1b3Sopenharmony_ci 41353a5a1b3Sopenharmony_ci pa_gettimeofday(&tv); 41453a5a1b3Sopenharmony_ci timestamp = pa_timeval_load(&tv); 41553a5a1b3Sopenharmony_ci 41653a5a1b3Sopenharmony_ci gst_buffer_add_reference_timestamp_meta(GST_BUFFER(info->data), c->meta_reference, timestamp * GST_USECOND, 41753a5a1b3Sopenharmony_ci GST_CLOCK_TIME_NONE); 41853a5a1b3Sopenharmony_ci 41953a5a1b3Sopenharmony_ci return GST_PAD_PROBE_OK; 42053a5a1b3Sopenharmony_ci} 42153a5a1b3Sopenharmony_ci 42253a5a1b3Sopenharmony_cistatic bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss, bool enable_opus) { 42353a5a1b3Sopenharmony_ci GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL; 42453a5a1b3Sopenharmony_ci GstElement *resample = NULL, *opusdec = NULL; 42553a5a1b3Sopenharmony_ci GstCaps *caps, *sink_caps; 42653a5a1b3Sopenharmony_ci GstPad *pad; 42753a5a1b3Sopenharmony_ci GSocket *socket; 42853a5a1b3Sopenharmony_ci GError *error = NULL; 42953a5a1b3Sopenharmony_ci 43053a5a1b3Sopenharmony_ci MAKE_ELEMENT(udpsrc, "udpsrc"); 43153a5a1b3Sopenharmony_ci MAKE_ELEMENT(rtpbin, "rtpbin"); 43253a5a1b3Sopenharmony_ci if (enable_opus) { 43353a5a1b3Sopenharmony_ci MAKE_ELEMENT_NAMED(depay, "rtpopusdepay", "depay"); 43453a5a1b3Sopenharmony_ci MAKE_ELEMENT(opusdec, "opusdec"); 43553a5a1b3Sopenharmony_ci MAKE_ELEMENT(resample, "audioresample"); 43653a5a1b3Sopenharmony_ci } else { 43753a5a1b3Sopenharmony_ci MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay"); 43853a5a1b3Sopenharmony_ci } 43953a5a1b3Sopenharmony_ci MAKE_ELEMENT(appsink, "appsink"); 44053a5a1b3Sopenharmony_ci 44153a5a1b3Sopenharmony_ci c->pipeline = gst_pipeline_new(NULL); 44253a5a1b3Sopenharmony_ci 44353a5a1b3Sopenharmony_ci gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL); 44453a5a1b3Sopenharmony_ci 44553a5a1b3Sopenharmony_ci if (enable_opus) 44653a5a1b3Sopenharmony_ci gst_bin_add_many(GST_BIN(c->pipeline), opusdec, resample, NULL); 44753a5a1b3Sopenharmony_ci 44853a5a1b3Sopenharmony_ci socket = g_socket_new_from_fd(fd, &error); 44953a5a1b3Sopenharmony_ci if (error) { 45053a5a1b3Sopenharmony_ci pa_log("Could not create socket: %s", error->message); 45153a5a1b3Sopenharmony_ci g_error_free(error); 45253a5a1b3Sopenharmony_ci goto fail; 45353a5a1b3Sopenharmony_ci } 45453a5a1b3Sopenharmony_ci 45553a5a1b3Sopenharmony_ci caps = rtp_caps_from_sample_spec(ss, enable_opus); 45653a5a1b3Sopenharmony_ci if (!caps) { 45753a5a1b3Sopenharmony_ci pa_log("Unsupported format to payload"); 45853a5a1b3Sopenharmony_ci goto fail; 45953a5a1b3Sopenharmony_ci } 46053a5a1b3Sopenharmony_ci 46153a5a1b3Sopenharmony_ci g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL); 46253a5a1b3Sopenharmony_ci g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL); 46353a5a1b3Sopenharmony_ci g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL); 46453a5a1b3Sopenharmony_ci 46553a5a1b3Sopenharmony_ci if (enable_opus) { 46653a5a1b3Sopenharmony_ci sink_caps = gst_caps_new_simple("audio/x-raw", 46753a5a1b3Sopenharmony_ci "format", G_TYPE_STRING, "S16LE", 46853a5a1b3Sopenharmony_ci "layout", G_TYPE_STRING, "interleaved", 46953a5a1b3Sopenharmony_ci "clock-rate", G_TYPE_INT, (int) ss->rate, 47053a5a1b3Sopenharmony_ci "channels", G_TYPE_INT, (int) ss->channels, 47153a5a1b3Sopenharmony_ci NULL); 47253a5a1b3Sopenharmony_ci g_object_set(appsink, "caps", sink_caps, NULL); 47353a5a1b3Sopenharmony_ci g_object_set(opusdec, "plc", TRUE, NULL); 47453a5a1b3Sopenharmony_ci gst_caps_unref(sink_caps); 47553a5a1b3Sopenharmony_ci } 47653a5a1b3Sopenharmony_ci 47753a5a1b3Sopenharmony_ci gst_caps_unref(caps); 47853a5a1b3Sopenharmony_ci g_object_unref(socket); 47953a5a1b3Sopenharmony_ci 48053a5a1b3Sopenharmony_ci if (enable_opus) { 48153a5a1b3Sopenharmony_ci if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || 48253a5a1b3Sopenharmony_ci !gst_element_link(depay, opusdec) || 48353a5a1b3Sopenharmony_ci !gst_element_link(opusdec, resample) || 48453a5a1b3Sopenharmony_ci !gst_element_link(resample, appsink)) { 48553a5a1b3Sopenharmony_ci 48653a5a1b3Sopenharmony_ci pa_log("Could not set up receive pipeline"); 48753a5a1b3Sopenharmony_ci goto fail; 48853a5a1b3Sopenharmony_ci } 48953a5a1b3Sopenharmony_ci } else { 49053a5a1b3Sopenharmony_ci if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") || 49153a5a1b3Sopenharmony_ci !gst_element_link(depay, appsink)) { 49253a5a1b3Sopenharmony_ci 49353a5a1b3Sopenharmony_ci pa_log("Could not set up receive pipeline"); 49453a5a1b3Sopenharmony_ci goto fail; 49553a5a1b3Sopenharmony_ci } 49653a5a1b3Sopenharmony_ci } 49753a5a1b3Sopenharmony_ci 49853a5a1b3Sopenharmony_ci g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c); 49953a5a1b3Sopenharmony_ci 50053a5a1b3Sopenharmony_ci /* This logic should go into udpsrc, and we should be populating the 50153a5a1b3Sopenharmony_ci * receive timestamp using SCM_TIMESTAMP, but until we have that ... */ 50253a5a1b3Sopenharmony_ci c->meta_reference = gst_caps_new_empty_simple("timestamp/x-pulseaudio-wallclock"); 50353a5a1b3Sopenharmony_ci 50453a5a1b3Sopenharmony_ci pad = gst_element_get_static_pad(udpsrc, "src"); 50553a5a1b3Sopenharmony_ci gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, udpsrc_buffer_probe, c, NULL); 50653a5a1b3Sopenharmony_ci gst_object_unref(pad); 50753a5a1b3Sopenharmony_ci 50853a5a1b3Sopenharmony_ci if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { 50953a5a1b3Sopenharmony_ci pa_log("Could not start pipeline"); 51053a5a1b3Sopenharmony_ci goto fail; 51153a5a1b3Sopenharmony_ci } 51253a5a1b3Sopenharmony_ci 51353a5a1b3Sopenharmony_ci c->appsink = gst_object_ref(appsink); 51453a5a1b3Sopenharmony_ci 51553a5a1b3Sopenharmony_ci return true; 51653a5a1b3Sopenharmony_ci 51753a5a1b3Sopenharmony_cifail: 51853a5a1b3Sopenharmony_ci if (c->pipeline) { 51953a5a1b3Sopenharmony_ci gst_object_unref(c->pipeline); 52053a5a1b3Sopenharmony_ci } else { 52153a5a1b3Sopenharmony_ci /* These weren't yet added to pipeline, so we still have a ref */ 52253a5a1b3Sopenharmony_ci if (udpsrc) 52353a5a1b3Sopenharmony_ci gst_object_unref(udpsrc); 52453a5a1b3Sopenharmony_ci if (depay) 52553a5a1b3Sopenharmony_ci gst_object_unref(depay); 52653a5a1b3Sopenharmony_ci if (rtpbin) 52753a5a1b3Sopenharmony_ci gst_object_unref(rtpbin); 52853a5a1b3Sopenharmony_ci if (opusdec) 52953a5a1b3Sopenharmony_ci gst_object_unref(opusdec); 53053a5a1b3Sopenharmony_ci if (resample) 53153a5a1b3Sopenharmony_ci gst_object_unref(resample); 53253a5a1b3Sopenharmony_ci if (appsink) 53353a5a1b3Sopenharmony_ci gst_object_unref(appsink); 53453a5a1b3Sopenharmony_ci } 53553a5a1b3Sopenharmony_ci 53653a5a1b3Sopenharmony_ci return false; 53753a5a1b3Sopenharmony_ci} 53853a5a1b3Sopenharmony_ci 53953a5a1b3Sopenharmony_ci/* Called from the GStreamer streaming thread */ 54053a5a1b3Sopenharmony_cistatic void appsink_eos(GstAppSink *appsink, gpointer userdata) { 54153a5a1b3Sopenharmony_ci pa_rtp_context *c = (pa_rtp_context *) userdata; 54253a5a1b3Sopenharmony_ci 54353a5a1b3Sopenharmony_ci pa_fdsem_post(c->fdsem); 54453a5a1b3Sopenharmony_ci} 54553a5a1b3Sopenharmony_ci 54653a5a1b3Sopenharmony_ci/* Called from the GStreamer streaming thread */ 54753a5a1b3Sopenharmony_cistatic GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) { 54853a5a1b3Sopenharmony_ci pa_rtp_context *c = (pa_rtp_context *) userdata; 54953a5a1b3Sopenharmony_ci 55053a5a1b3Sopenharmony_ci pa_fdsem_post(c->fdsem); 55153a5a1b3Sopenharmony_ci 55253a5a1b3Sopenharmony_ci return GST_FLOW_OK; 55353a5a1b3Sopenharmony_ci} 55453a5a1b3Sopenharmony_ci 55553a5a1b3Sopenharmony_cipa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss, bool enable_opus) { 55653a5a1b3Sopenharmony_ci pa_rtp_context *c = NULL; 55753a5a1b3Sopenharmony_ci GstAppSinkCallbacks callbacks = { 0, }; 55853a5a1b3Sopenharmony_ci GError *error = NULL; 55953a5a1b3Sopenharmony_ci 56053a5a1b3Sopenharmony_ci pa_assert(fd >= 0); 56153a5a1b3Sopenharmony_ci 56253a5a1b3Sopenharmony_ci pa_log_info("Initialising GStreamer RTP backend for receive"); 56353a5a1b3Sopenharmony_ci 56453a5a1b3Sopenharmony_ci if (enable_opus) 56553a5a1b3Sopenharmony_ci pa_log_info("Using OPUS encoding for RTP recv"); 56653a5a1b3Sopenharmony_ci 56753a5a1b3Sopenharmony_ci c = pa_xnew0(pa_rtp_context, 1); 56853a5a1b3Sopenharmony_ci 56953a5a1b3Sopenharmony_ci c->fdsem = pa_fdsem_new(); 57053a5a1b3Sopenharmony_ci c->ss = *ss; 57153a5a1b3Sopenharmony_ci c->send_buf = NULL; 57253a5a1b3Sopenharmony_ci c->first_buffer = true; 57353a5a1b3Sopenharmony_ci 57453a5a1b3Sopenharmony_ci if (!gst_init_check(NULL, NULL, &error)) { 57553a5a1b3Sopenharmony_ci pa_log_error("Could not initialise GStreamer: %s", error->message); 57653a5a1b3Sopenharmony_ci g_error_free(error); 57753a5a1b3Sopenharmony_ci goto fail; 57853a5a1b3Sopenharmony_ci } 57953a5a1b3Sopenharmony_ci 58053a5a1b3Sopenharmony_ci if (!init_receive_pipeline(c, fd, ss, enable_opus)) 58153a5a1b3Sopenharmony_ci goto fail; 58253a5a1b3Sopenharmony_ci 58353a5a1b3Sopenharmony_ci callbacks.eos = appsink_eos; 58453a5a1b3Sopenharmony_ci callbacks.new_sample = appsink_new_sample; 58553a5a1b3Sopenharmony_ci gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL); 58653a5a1b3Sopenharmony_ci 58753a5a1b3Sopenharmony_ci return c; 58853a5a1b3Sopenharmony_ci 58953a5a1b3Sopenharmony_cifail: 59053a5a1b3Sopenharmony_ci pa_rtp_context_free(c); 59153a5a1b3Sopenharmony_ci return NULL; 59253a5a1b3Sopenharmony_ci} 59353a5a1b3Sopenharmony_ci 59453a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 59553a5a1b3Sopenharmony_ciint pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) { 59653a5a1b3Sopenharmony_ci GstSample *sample = NULL; 59753a5a1b3Sopenharmony_ci GstBufferList *buf_list; 59853a5a1b3Sopenharmony_ci GstAdapter *adapter = NULL; 59953a5a1b3Sopenharmony_ci GstBuffer *buf; 60053a5a1b3Sopenharmony_ci GstMapInfo info; 60153a5a1b3Sopenharmony_ci GstClockTime timestamp = GST_CLOCK_TIME_NONE; 60253a5a1b3Sopenharmony_ci uint8_t *data; 60353a5a1b3Sopenharmony_ci uint64_t data_len = 0; 60453a5a1b3Sopenharmony_ci 60553a5a1b3Sopenharmony_ci if (!process_bus_messages(c)) 60653a5a1b3Sopenharmony_ci goto fail; 60753a5a1b3Sopenharmony_ci 60853a5a1b3Sopenharmony_ci adapter = gst_adapter_new(); 60953a5a1b3Sopenharmony_ci pa_assert(adapter); 61053a5a1b3Sopenharmony_ci 61153a5a1b3Sopenharmony_ci while (true) { 61253a5a1b3Sopenharmony_ci sample = gst_app_sink_try_pull_sample(GST_APP_SINK(c->appsink), 0); 61353a5a1b3Sopenharmony_ci if (!sample) 61453a5a1b3Sopenharmony_ci break; 61553a5a1b3Sopenharmony_ci 61653a5a1b3Sopenharmony_ci buf = gst_sample_get_buffer(sample); 61753a5a1b3Sopenharmony_ci 61853a5a1b3Sopenharmony_ci /* Get the timestamp from the first buffer */ 61953a5a1b3Sopenharmony_ci if (timestamp == GST_CLOCK_TIME_NONE) { 62053a5a1b3Sopenharmony_ci GstReferenceTimestampMeta *meta = gst_buffer_get_reference_timestamp_meta(buf, c->meta_reference); 62153a5a1b3Sopenharmony_ci 62253a5a1b3Sopenharmony_ci /* Use the meta if we were able to insert it and it came through, 62353a5a1b3Sopenharmony_ci * else try to fallback to the DTS, which is only available in 62453a5a1b3Sopenharmony_ci * GStreamer 1.16 and earlier. */ 62553a5a1b3Sopenharmony_ci if (meta) 62653a5a1b3Sopenharmony_ci timestamp = meta->timestamp; 62753a5a1b3Sopenharmony_ci else if (GST_BUFFER_DTS(buf) != GST_CLOCK_TIME_NONE) 62853a5a1b3Sopenharmony_ci timestamp = GST_BUFFER_DTS(buf); 62953a5a1b3Sopenharmony_ci else 63053a5a1b3Sopenharmony_ci timestamp = 0; 63153a5a1b3Sopenharmony_ci } 63253a5a1b3Sopenharmony_ci 63353a5a1b3Sopenharmony_ci if (GST_BUFFER_IS_DISCONT(buf)) 63453a5a1b3Sopenharmony_ci pa_log_info("Discontinuity detected, possibly lost some packets"); 63553a5a1b3Sopenharmony_ci 63653a5a1b3Sopenharmony_ci if (!gst_buffer_map(buf, &info, GST_MAP_READ)) { 63753a5a1b3Sopenharmony_ci pa_log_info("Failed to map buffer"); 63853a5a1b3Sopenharmony_ci gst_sample_unref(sample); 63953a5a1b3Sopenharmony_ci goto fail; 64053a5a1b3Sopenharmony_ci } 64153a5a1b3Sopenharmony_ci 64253a5a1b3Sopenharmony_ci data_len += info.size; 64353a5a1b3Sopenharmony_ci /* We need the buffer to be valid longer than the sample, which will 64453a5a1b3Sopenharmony_ci * be valid only for the duration of this loop. 64553a5a1b3Sopenharmony_ci * 64653a5a1b3Sopenharmony_ci * To do this, increase the ref count. Ownership is transferred to the 64753a5a1b3Sopenharmony_ci * adapter in gst_adapter_push. 64853a5a1b3Sopenharmony_ci */ 64953a5a1b3Sopenharmony_ci gst_buffer_ref(buf); 65053a5a1b3Sopenharmony_ci gst_adapter_push(adapter, buf); 65153a5a1b3Sopenharmony_ci gst_buffer_unmap(buf, &info); 65253a5a1b3Sopenharmony_ci 65353a5a1b3Sopenharmony_ci gst_sample_unref(sample); 65453a5a1b3Sopenharmony_ci } 65553a5a1b3Sopenharmony_ci 65653a5a1b3Sopenharmony_ci buf_list = gst_adapter_take_buffer_list(adapter, data_len); 65753a5a1b3Sopenharmony_ci pa_assert(buf_list); 65853a5a1b3Sopenharmony_ci 65953a5a1b3Sopenharmony_ci pa_assert(pa_mempool_block_size_max(pool) >= data_len); 66053a5a1b3Sopenharmony_ci 66153a5a1b3Sopenharmony_ci chunk->memblock = pa_memblock_new(pool, data_len); 66253a5a1b3Sopenharmony_ci chunk->index = 0; 66353a5a1b3Sopenharmony_ci chunk->length = data_len; 66453a5a1b3Sopenharmony_ci 66553a5a1b3Sopenharmony_ci data = (uint8_t *) pa_memblock_acquire_chunk(chunk); 66653a5a1b3Sopenharmony_ci 66753a5a1b3Sopenharmony_ci for (int i = 0; i < gst_buffer_list_length(buf_list); i++) { 66853a5a1b3Sopenharmony_ci buf = gst_buffer_list_get(buf_list, i); 66953a5a1b3Sopenharmony_ci 67053a5a1b3Sopenharmony_ci if (!gst_buffer_map(buf, &info, GST_MAP_READ)) { 67153a5a1b3Sopenharmony_ci gst_buffer_list_unref(buf_list); 67253a5a1b3Sopenharmony_ci goto fail; 67353a5a1b3Sopenharmony_ci } 67453a5a1b3Sopenharmony_ci 67553a5a1b3Sopenharmony_ci memcpy(data, info.data, info.size); 67653a5a1b3Sopenharmony_ci data += info.size; 67753a5a1b3Sopenharmony_ci gst_buffer_unmap(buf, &info); 67853a5a1b3Sopenharmony_ci } 67953a5a1b3Sopenharmony_ci 68053a5a1b3Sopenharmony_ci pa_memblock_release(chunk->memblock); 68153a5a1b3Sopenharmony_ci 68253a5a1b3Sopenharmony_ci /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted 68353a5a1b3Sopenharmony_ci * to time units (instead of clock-rate units as is in the header) and 68453a5a1b3Sopenharmony_ci * wraparound-corrected. */ 68553a5a1b3Sopenharmony_ci *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(gst_buffer_list_get(buf_list, 0)), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU; 68653a5a1b3Sopenharmony_ci if (timestamp != GST_CLOCK_TIME_NONE) 68753a5a1b3Sopenharmony_ci pa_timeval_rtstore(tstamp, timestamp / PA_NSEC_PER_USEC, false); 68853a5a1b3Sopenharmony_ci 68953a5a1b3Sopenharmony_ci if (c->first_buffer) { 69053a5a1b3Sopenharmony_ci c->first_buffer = false; 69153a5a1b3Sopenharmony_ci c->last_timestamp = *rtp_tstamp; 69253a5a1b3Sopenharmony_ci } else { 69353a5a1b3Sopenharmony_ci /* The RTP clock -> time domain -> RTP clock transformation above might 69453a5a1b3Sopenharmony_ci * add a ±1 rounding error, so let's get rid of that */ 69553a5a1b3Sopenharmony_ci uint32_t expected = c->last_timestamp + (uint32_t) (data_len / pa_rtp_context_get_frame_size(c)); 69653a5a1b3Sopenharmony_ci int delta = *rtp_tstamp - expected; 69753a5a1b3Sopenharmony_ci 69853a5a1b3Sopenharmony_ci if (delta == 1 || delta == -1) 69953a5a1b3Sopenharmony_ci *rtp_tstamp -= delta; 70053a5a1b3Sopenharmony_ci 70153a5a1b3Sopenharmony_ci c->last_timestamp = *rtp_tstamp; 70253a5a1b3Sopenharmony_ci } 70353a5a1b3Sopenharmony_ci 70453a5a1b3Sopenharmony_ci gst_buffer_list_unref(buf_list); 70553a5a1b3Sopenharmony_ci gst_object_unref(adapter); 70653a5a1b3Sopenharmony_ci 70753a5a1b3Sopenharmony_ci return 0; 70853a5a1b3Sopenharmony_ci 70953a5a1b3Sopenharmony_cifail: 71053a5a1b3Sopenharmony_ci if (adapter) 71153a5a1b3Sopenharmony_ci gst_object_unref(adapter); 71253a5a1b3Sopenharmony_ci 71353a5a1b3Sopenharmony_ci if (chunk->memblock) 71453a5a1b3Sopenharmony_ci pa_memblock_unref(chunk->memblock); 71553a5a1b3Sopenharmony_ci 71653a5a1b3Sopenharmony_ci return -1; 71753a5a1b3Sopenharmony_ci} 71853a5a1b3Sopenharmony_ci 71953a5a1b3Sopenharmony_civoid pa_rtp_context_free(pa_rtp_context *c) { 72053a5a1b3Sopenharmony_ci pa_assert(c); 72153a5a1b3Sopenharmony_ci 72253a5a1b3Sopenharmony_ci if (c->meta_reference) 72353a5a1b3Sopenharmony_ci gst_caps_unref(c->meta_reference); 72453a5a1b3Sopenharmony_ci 72553a5a1b3Sopenharmony_ci if (c->appsrc) { 72653a5a1b3Sopenharmony_ci gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc)); 72753a5a1b3Sopenharmony_ci gst_object_unref(c->appsrc); 72853a5a1b3Sopenharmony_ci pa_xfree(c->send_buf); 72953a5a1b3Sopenharmony_ci } 73053a5a1b3Sopenharmony_ci 73153a5a1b3Sopenharmony_ci if (c->appsink) 73253a5a1b3Sopenharmony_ci gst_object_unref(c->appsink); 73353a5a1b3Sopenharmony_ci 73453a5a1b3Sopenharmony_ci if (c->pipeline) { 73553a5a1b3Sopenharmony_ci gst_element_set_state(c->pipeline, GST_STATE_NULL); 73653a5a1b3Sopenharmony_ci gst_object_unref(c->pipeline); 73753a5a1b3Sopenharmony_ci } 73853a5a1b3Sopenharmony_ci 73953a5a1b3Sopenharmony_ci if (c->fdsem) 74053a5a1b3Sopenharmony_ci pa_fdsem_free(c->fdsem); 74153a5a1b3Sopenharmony_ci 74253a5a1b3Sopenharmony_ci pa_xfree(c); 74353a5a1b3Sopenharmony_ci} 74453a5a1b3Sopenharmony_ci 74553a5a1b3Sopenharmony_cipa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) { 74653a5a1b3Sopenharmony_ci return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem); 74753a5a1b3Sopenharmony_ci} 74853a5a1b3Sopenharmony_ci 74953a5a1b3Sopenharmony_cisize_t pa_rtp_context_get_frame_size(pa_rtp_context *c) { 75053a5a1b3Sopenharmony_ci return pa_frame_size(&c->ss); 75153a5a1b3Sopenharmony_ci} 752