153a5a1b3Sopenharmony_ci 253a5a1b3Sopenharmony_ci/*** 353a5a1b3Sopenharmony_ci This file is part of PulseAudio. 453a5a1b3Sopenharmony_ci 553a5a1b3Sopenharmony_ci Copyright 2006 Lennart Poettering 653a5a1b3Sopenharmony_ci 753a5a1b3Sopenharmony_ci PulseAudio is free software; you can redistribute it and/or modify 853a5a1b3Sopenharmony_ci it under the terms of the GNU Lesser General Public License as published 953a5a1b3Sopenharmony_ci by the Free Software Foundation; either version 2.1 of the License, 1053a5a1b3Sopenharmony_ci or (at your option) any later version. 1153a5a1b3Sopenharmony_ci 1253a5a1b3Sopenharmony_ci PulseAudio is distributed in the hope that it will be useful, but 1353a5a1b3Sopenharmony_ci WITHOUT ANY WARRANTY; without even the implied warranty of 1453a5a1b3Sopenharmony_ci MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1553a5a1b3Sopenharmony_ci General Public License for more details. 1653a5a1b3Sopenharmony_ci 1753a5a1b3Sopenharmony_ci You should have received a copy of the GNU Lesser General Public License 1853a5a1b3Sopenharmony_ci along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 1953a5a1b3Sopenharmony_ci***/ 2053a5a1b3Sopenharmony_ci 2153a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H 2253a5a1b3Sopenharmony_ci#include <config.h> 2353a5a1b3Sopenharmony_ci#endif 2453a5a1b3Sopenharmony_ci 2553a5a1b3Sopenharmony_ci#include <stdio.h> 2653a5a1b3Sopenharmony_ci#include <sys/socket.h> 2753a5a1b3Sopenharmony_ci#include <netinet/in.h> 2853a5a1b3Sopenharmony_ci#include <errno.h> 2953a5a1b3Sopenharmony_ci#include <string.h> 3053a5a1b3Sopenharmony_ci#include <unistd.h> 3153a5a1b3Sopenharmony_ci#include <math.h> 3253a5a1b3Sopenharmony_ci 3353a5a1b3Sopenharmony_ci#include <pulse/rtclock.h> 3453a5a1b3Sopenharmony_ci#include <pulse/timeval.h> 3553a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h> 3653a5a1b3Sopenharmony_ci 3753a5a1b3Sopenharmony_ci#include <pulsecore/core-error.h> 3853a5a1b3Sopenharmony_ci#include <pulsecore/module.h> 3953a5a1b3Sopenharmony_ci#include <pulsecore/llist.h> 4053a5a1b3Sopenharmony_ci#include <pulsecore/sink.h> 4153a5a1b3Sopenharmony_ci#include <pulsecore/sink-input.h> 4253a5a1b3Sopenharmony_ci#include <pulsecore/memblockq.h> 4353a5a1b3Sopenharmony_ci#include <pulsecore/log.h> 4453a5a1b3Sopenharmony_ci#include <pulsecore/core-rtclock.h> 4553a5a1b3Sopenharmony_ci#include <pulsecore/core-util.h> 4653a5a1b3Sopenharmony_ci#include <pulsecore/modargs.h> 4753a5a1b3Sopenharmony_ci#include <pulsecore/namereg.h> 4853a5a1b3Sopenharmony_ci#include <pulsecore/sample-util.h> 4953a5a1b3Sopenharmony_ci#include <pulsecore/macro.h> 5053a5a1b3Sopenharmony_ci#include <pulsecore/socket-util.h> 5153a5a1b3Sopenharmony_ci#include <pulsecore/atomic.h> 5253a5a1b3Sopenharmony_ci#include <pulsecore/once.h> 5353a5a1b3Sopenharmony_ci#include <pulsecore/poll.h> 5453a5a1b3Sopenharmony_ci#include <pulsecore/arpa-inet.h> 5553a5a1b3Sopenharmony_ci 5653a5a1b3Sopenharmony_ci#include "rtp.h" 5753a5a1b3Sopenharmony_ci#include "sdp.h" 5853a5a1b3Sopenharmony_ci#include "sap.h" 5953a5a1b3Sopenharmony_ci 6053a5a1b3Sopenharmony_ciPA_MODULE_AUTHOR("Lennart Poettering"); 6153a5a1b3Sopenharmony_ciPA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP"); 6253a5a1b3Sopenharmony_ciPA_MODULE_VERSION(PACKAGE_VERSION); 6353a5a1b3Sopenharmony_ciPA_MODULE_LOAD_ONCE(false); 6453a5a1b3Sopenharmony_ciPA_MODULE_USAGE( 6553a5a1b3Sopenharmony_ci "sink=<name of the sink> " 6653a5a1b3Sopenharmony_ci "sap_address=<multicast address to listen on> " 6753a5a1b3Sopenharmony_ci "latency_msec=<latency in ms> " 6853a5a1b3Sopenharmony_ci); 6953a5a1b3Sopenharmony_ci 7053a5a1b3Sopenharmony_ci#define SAP_PORT 9875 7153a5a1b3Sopenharmony_ci#define DEFAULT_SAP_ADDRESS "224.0.0.56" 7253a5a1b3Sopenharmony_ci#define DEFAULT_LATENCY_MSEC 500 7353a5a1b3Sopenharmony_ci#define MEMBLOCKQ_MAXLENGTH (1024*1024*40) 7453a5a1b3Sopenharmony_ci#define MAX_SESSIONS 16 7553a5a1b3Sopenharmony_ci#define DEATH_TIMEOUT 20 7653a5a1b3Sopenharmony_ci#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC) 7753a5a1b3Sopenharmony_ci 7853a5a1b3Sopenharmony_cistatic const char* const valid_modargs[] = { 7953a5a1b3Sopenharmony_ci "sink", 8053a5a1b3Sopenharmony_ci "sap_address", 8153a5a1b3Sopenharmony_ci "latency_msec", 8253a5a1b3Sopenharmony_ci NULL 8353a5a1b3Sopenharmony_ci}; 8453a5a1b3Sopenharmony_ci 8553a5a1b3Sopenharmony_cistruct session { 8653a5a1b3Sopenharmony_ci struct userdata *userdata; 8753a5a1b3Sopenharmony_ci PA_LLIST_FIELDS(struct session); 8853a5a1b3Sopenharmony_ci 8953a5a1b3Sopenharmony_ci pa_sink_input *sink_input; 9053a5a1b3Sopenharmony_ci pa_memblockq *memblockq; 9153a5a1b3Sopenharmony_ci 9253a5a1b3Sopenharmony_ci bool first_packet; 9353a5a1b3Sopenharmony_ci uint32_t offset; 9453a5a1b3Sopenharmony_ci 9553a5a1b3Sopenharmony_ci struct pa_sdp_info sdp_info; 9653a5a1b3Sopenharmony_ci 9753a5a1b3Sopenharmony_ci pa_rtp_context *rtp_context; 9853a5a1b3Sopenharmony_ci 9953a5a1b3Sopenharmony_ci pa_rtpoll_item *rtpoll_item; 10053a5a1b3Sopenharmony_ci 10153a5a1b3Sopenharmony_ci pa_atomic_t timestamp; 10253a5a1b3Sopenharmony_ci 10353a5a1b3Sopenharmony_ci pa_usec_t intended_latency; 10453a5a1b3Sopenharmony_ci pa_usec_t sink_latency; 10553a5a1b3Sopenharmony_ci 10653a5a1b3Sopenharmony_ci unsigned int base_rate; 10753a5a1b3Sopenharmony_ci pa_usec_t last_rate_update; 10853a5a1b3Sopenharmony_ci pa_usec_t last_latency; 10953a5a1b3Sopenharmony_ci double estimated_rate; 11053a5a1b3Sopenharmony_ci double avg_estimated_rate; 11153a5a1b3Sopenharmony_ci}; 11253a5a1b3Sopenharmony_ci 11353a5a1b3Sopenharmony_cistruct userdata { 11453a5a1b3Sopenharmony_ci pa_module *module; 11553a5a1b3Sopenharmony_ci pa_core *core; 11653a5a1b3Sopenharmony_ci 11753a5a1b3Sopenharmony_ci pa_sap_context sap_context; 11853a5a1b3Sopenharmony_ci pa_io_event* sap_event; 11953a5a1b3Sopenharmony_ci 12053a5a1b3Sopenharmony_ci pa_time_event *check_death_event; 12153a5a1b3Sopenharmony_ci 12253a5a1b3Sopenharmony_ci char *sink_name; 12353a5a1b3Sopenharmony_ci 12453a5a1b3Sopenharmony_ci PA_LLIST_HEAD(struct session, sessions); 12553a5a1b3Sopenharmony_ci pa_hashmap *by_origin; 12653a5a1b3Sopenharmony_ci int n_sessions; 12753a5a1b3Sopenharmony_ci 12853a5a1b3Sopenharmony_ci pa_usec_t latency; 12953a5a1b3Sopenharmony_ci}; 13053a5a1b3Sopenharmony_ci 13153a5a1b3Sopenharmony_cistatic void session_free(struct session *s); 13253a5a1b3Sopenharmony_ci 13353a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 13453a5a1b3Sopenharmony_cistatic int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 13553a5a1b3Sopenharmony_ci struct session *s = PA_SINK_INPUT(o)->userdata; 13653a5a1b3Sopenharmony_ci 13753a5a1b3Sopenharmony_ci switch (code) { 13853a5a1b3Sopenharmony_ci case PA_SINK_INPUT_MESSAGE_GET_LATENCY: 13953a5a1b3Sopenharmony_ci *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec); 14053a5a1b3Sopenharmony_ci 14153a5a1b3Sopenharmony_ci /* Fall through, the default handler will add in the extra 14253a5a1b3Sopenharmony_ci * latency added by the resampler */ 14353a5a1b3Sopenharmony_ci break; 14453a5a1b3Sopenharmony_ci } 14553a5a1b3Sopenharmony_ci 14653a5a1b3Sopenharmony_ci return pa_sink_input_process_msg(o, code, data, offset, chunk); 14753a5a1b3Sopenharmony_ci} 14853a5a1b3Sopenharmony_ci 14953a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 15053a5a1b3Sopenharmony_cistatic int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { 15153a5a1b3Sopenharmony_ci struct session *s; 15253a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 15353a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 15453a5a1b3Sopenharmony_ci 15553a5a1b3Sopenharmony_ci if (pa_memblockq_peek(s->memblockq, chunk) < 0) 15653a5a1b3Sopenharmony_ci return -1; 15753a5a1b3Sopenharmony_ci 15853a5a1b3Sopenharmony_ci pa_memblockq_drop(s->memblockq, chunk->length); 15953a5a1b3Sopenharmony_ci 16053a5a1b3Sopenharmony_ci return 0; 16153a5a1b3Sopenharmony_ci} 16253a5a1b3Sopenharmony_ci 16353a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 16453a5a1b3Sopenharmony_cistatic void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { 16553a5a1b3Sopenharmony_ci struct session *s; 16653a5a1b3Sopenharmony_ci 16753a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 16853a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 16953a5a1b3Sopenharmony_ci 17053a5a1b3Sopenharmony_ci pa_memblockq_rewind(s->memblockq, nbytes); 17153a5a1b3Sopenharmony_ci} 17253a5a1b3Sopenharmony_ci 17353a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 17453a5a1b3Sopenharmony_cistatic void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { 17553a5a1b3Sopenharmony_ci struct session *s; 17653a5a1b3Sopenharmony_ci 17753a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 17853a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 17953a5a1b3Sopenharmony_ci 18053a5a1b3Sopenharmony_ci pa_memblockq_set_maxrewind(s->memblockq, nbytes); 18153a5a1b3Sopenharmony_ci} 18253a5a1b3Sopenharmony_ci 18353a5a1b3Sopenharmony_ci/* Called from main context */ 18453a5a1b3Sopenharmony_cistatic void sink_input_kill(pa_sink_input* i) { 18553a5a1b3Sopenharmony_ci struct session *s; 18653a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 18753a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 18853a5a1b3Sopenharmony_ci 18953a5a1b3Sopenharmony_ci pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin); 19053a5a1b3Sopenharmony_ci} 19153a5a1b3Sopenharmony_ci 19253a5a1b3Sopenharmony_ci/* Called from IO context */ 19353a5a1b3Sopenharmony_cistatic void sink_input_suspend_within_thread(pa_sink_input* i, bool b) { 19453a5a1b3Sopenharmony_ci struct session *s; 19553a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 19653a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 19753a5a1b3Sopenharmony_ci 19853a5a1b3Sopenharmony_ci if (b) 19953a5a1b3Sopenharmony_ci pa_memblockq_flush_read(s->memblockq); 20053a5a1b3Sopenharmony_ci else 20153a5a1b3Sopenharmony_ci s->first_packet = false; 20253a5a1b3Sopenharmony_ci} 20353a5a1b3Sopenharmony_ci 20453a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 20553a5a1b3Sopenharmony_cistatic int rtpoll_work_cb(pa_rtpoll_item *i) { 20653a5a1b3Sopenharmony_ci pa_memchunk chunk; 20753a5a1b3Sopenharmony_ci uint32_t timestamp; 20853a5a1b3Sopenharmony_ci int64_t k, j, delta; 20953a5a1b3Sopenharmony_ci struct timeval now = { 0, 0 }; 21053a5a1b3Sopenharmony_ci struct session *s; 21153a5a1b3Sopenharmony_ci struct pollfd *p; 21253a5a1b3Sopenharmony_ci 21353a5a1b3Sopenharmony_ci pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i)); 21453a5a1b3Sopenharmony_ci 21553a5a1b3Sopenharmony_ci p = pa_rtpoll_item_get_pollfd(i, NULL); 21653a5a1b3Sopenharmony_ci 21753a5a1b3Sopenharmony_ci if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) { 21853a5a1b3Sopenharmony_ci pa_log("poll() signalled bad revents."); 21953a5a1b3Sopenharmony_ci return -1; 22053a5a1b3Sopenharmony_ci } 22153a5a1b3Sopenharmony_ci 22253a5a1b3Sopenharmony_ci if ((p->revents & POLLIN) == 0) 22353a5a1b3Sopenharmony_ci return 0; 22453a5a1b3Sopenharmony_ci 22553a5a1b3Sopenharmony_ci p->revents = 0; 22653a5a1b3Sopenharmony_ci 22753a5a1b3Sopenharmony_ci if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0) 22853a5a1b3Sopenharmony_ci return 0; 22953a5a1b3Sopenharmony_ci 23053a5a1b3Sopenharmony_ci if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) { 23153a5a1b3Sopenharmony_ci pa_memblock_unref(chunk.memblock); 23253a5a1b3Sopenharmony_ci return 0; 23353a5a1b3Sopenharmony_ci } 23453a5a1b3Sopenharmony_ci 23553a5a1b3Sopenharmony_ci if (!s->first_packet) { 23653a5a1b3Sopenharmony_ci s->first_packet = true; 23753a5a1b3Sopenharmony_ci s->offset = timestamp; 23853a5a1b3Sopenharmony_ci } 23953a5a1b3Sopenharmony_ci 24053a5a1b3Sopenharmony_ci /* Check whether there was a timestamp overflow */ 24153a5a1b3Sopenharmony_ci k = (int64_t) timestamp - (int64_t) s->offset; 24253a5a1b3Sopenharmony_ci j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp; 24353a5a1b3Sopenharmony_ci 24453a5a1b3Sopenharmony_ci if ((k < 0 ? -k : k) < (j < 0 ? -j : j)) 24553a5a1b3Sopenharmony_ci delta = k; 24653a5a1b3Sopenharmony_ci else 24753a5a1b3Sopenharmony_ci delta = j; 24853a5a1b3Sopenharmony_ci 24953a5a1b3Sopenharmony_ci pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE, 25053a5a1b3Sopenharmony_ci true); 25153a5a1b3Sopenharmony_ci 25253a5a1b3Sopenharmony_ci if (now.tv_sec == 0) { 25353a5a1b3Sopenharmony_ci PA_ONCE_BEGIN { 25453a5a1b3Sopenharmony_ci pa_log_warn("Using artificial time instead of timestamp"); 25553a5a1b3Sopenharmony_ci } PA_ONCE_END; 25653a5a1b3Sopenharmony_ci pa_rtclock_get(&now); 25753a5a1b3Sopenharmony_ci } else 25853a5a1b3Sopenharmony_ci pa_rtclock_from_wallclock(&now); 25953a5a1b3Sopenharmony_ci 26053a5a1b3Sopenharmony_ci if (pa_memblockq_push(s->memblockq, &chunk) < 0) { 26153a5a1b3Sopenharmony_ci pa_log_warn("Queue overrun"); 26253a5a1b3Sopenharmony_ci pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true); 26353a5a1b3Sopenharmony_ci } 26453a5a1b3Sopenharmony_ci 26553a5a1b3Sopenharmony_ci/* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */ 26653a5a1b3Sopenharmony_ci 26753a5a1b3Sopenharmony_ci pa_memblock_unref(chunk.memblock); 26853a5a1b3Sopenharmony_ci 26953a5a1b3Sopenharmony_ci /* The next timestamp we expect */ 27053a5a1b3Sopenharmony_ci s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context)); 27153a5a1b3Sopenharmony_ci 27253a5a1b3Sopenharmony_ci pa_atomic_store(&s->timestamp, (int) now.tv_sec); 27353a5a1b3Sopenharmony_ci 27453a5a1b3Sopenharmony_ci if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) { 27553a5a1b3Sopenharmony_ci pa_usec_t wi, ri, render_delay, sink_delay = 0, latency; 27653a5a1b3Sopenharmony_ci uint32_t current_rate = s->sink_input->sample_spec.rate; 27753a5a1b3Sopenharmony_ci uint32_t new_rate; 27853a5a1b3Sopenharmony_ci double estimated_rate, alpha = 0.02; 27953a5a1b3Sopenharmony_ci 28053a5a1b3Sopenharmony_ci pa_log_debug("Updating sample rate"); 28153a5a1b3Sopenharmony_ci 28253a5a1b3Sopenharmony_ci wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec); 28353a5a1b3Sopenharmony_ci ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec); 28453a5a1b3Sopenharmony_ci 28553a5a1b3Sopenharmony_ci pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri); 28653a5a1b3Sopenharmony_ci 28753a5a1b3Sopenharmony_ci sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false); 28853a5a1b3Sopenharmony_ci sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler); 28953a5a1b3Sopenharmony_ci render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec); 29053a5a1b3Sopenharmony_ci 29153a5a1b3Sopenharmony_ci if (ri > render_delay+sink_delay) 29253a5a1b3Sopenharmony_ci ri -= render_delay+sink_delay; 29353a5a1b3Sopenharmony_ci else 29453a5a1b3Sopenharmony_ci ri = 0; 29553a5a1b3Sopenharmony_ci 29653a5a1b3Sopenharmony_ci if (wi < ri) 29753a5a1b3Sopenharmony_ci latency = 0; 29853a5a1b3Sopenharmony_ci else 29953a5a1b3Sopenharmony_ci latency = wi - ri; 30053a5a1b3Sopenharmony_ci 30153a5a1b3Sopenharmony_ci pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC); 30253a5a1b3Sopenharmony_ci 30353a5a1b3Sopenharmony_ci /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in 30453a5a1b3Sopenharmony_ci * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that 30553a5a1b3Sopenharmony_ci * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate 30653a5a1b3Sopenharmony_ci * T 30753a5a1b3Sopenharmony_ci * R̂ = ─────────────── Rⁿ . (1) 30853a5a1b3Sopenharmony_ci * T - (Lⁿ - Lⁿ⁻ⁱ) 30953a5a1b3Sopenharmony_ci * 31053a5a1b3Sopenharmony_ci * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂ 31153a5a1b3Sopenharmony_ci * is correct). But there is also the requirement to keep the buffer at a predefined target 31253a5a1b3Sopenharmony_ci * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R 31353a5a1b3Sopenharmony_ci * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time 31453a5a1b3Sopenharmony_ci * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements 31553a5a1b3Sopenharmony_ci * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1 31653a5a1b3Sopenharmony_ci * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ . 31753a5a1b3Sopenharmony_ci * ʲ⁼ⁱ R̂ a a 31853a5a1b3Sopenharmony_ci * Solving for Rⁿ⁺ⁱ gives 31953a5a1b3Sopenharmony_ci * T - ²∕ₐ₊₁(L̂ - Lⁿ) 32053a5a1b3Sopenharmony_ci * Rⁿ⁺ⁱ = ───────────────── R̂ . (2) 32153a5a1b3Sopenharmony_ci * T 32253a5a1b3Sopenharmony_ci * In the code below a = 7 is used. 32353a5a1b3Sopenharmony_ci * 32453a5a1b3Sopenharmony_ci * Equation (1) is not directly used in (2), but instead an exponentially weighted average 32553a5a1b3Sopenharmony_ci * of the estimated rate R̂ is used. This average R̅ is defined as 32653a5a1b3Sopenharmony_ci * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ . 32753a5a1b3Sopenharmony_ci * Because it is difficult to find a fixed value for the coefficient α such that the 32853a5a1b3Sopenharmony_ci * averaging is without significant lag but oscillations are filtered out, a heuristic is 32953a5a1b3Sopenharmony_ci * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a 33053a5a1b3Sopenharmony_ci * sudden spike in the estimated rate α→0, such that the deviation is given little weight. 33153a5a1b3Sopenharmony_ci */ 33253a5a1b3Sopenharmony_ci estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency); 33353a5a1b3Sopenharmony_ci if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) { 33453a5a1b3Sopenharmony_ci double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate); 33553a5a1b3Sopenharmony_ci alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8); 33653a5a1b3Sopenharmony_ci } 33753a5a1b3Sopenharmony_ci s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate; 33853a5a1b3Sopenharmony_ci s->estimated_rate = estimated_rate; 33953a5a1b3Sopenharmony_ci pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha); 34053a5a1b3Sopenharmony_ci new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate); 34153a5a1b3Sopenharmony_ci s->last_latency = latency; 34253a5a1b3Sopenharmony_ci 34353a5a1b3Sopenharmony_ci if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) { 34453a5a1b3Sopenharmony_ci pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate); 34553a5a1b3Sopenharmony_ci new_rate = s->base_rate; 34653a5a1b3Sopenharmony_ci } else { 34753a5a1b3Sopenharmony_ci if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20) 34853a5a1b3Sopenharmony_ci new_rate = s->base_rate; 34953a5a1b3Sopenharmony_ci /* Do the adjustment in small steps; 2‰ can be considered inaudible */ 35053a5a1b3Sopenharmony_ci if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) { 35153a5a1b3Sopenharmony_ci pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate); 35253a5a1b3Sopenharmony_ci new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002)); 35353a5a1b3Sopenharmony_ci } 35453a5a1b3Sopenharmony_ci } 35553a5a1b3Sopenharmony_ci s->sink_input->sample_spec.rate = new_rate; 35653a5a1b3Sopenharmony_ci 35753a5a1b3Sopenharmony_ci pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec)); 35853a5a1b3Sopenharmony_ci 35953a5a1b3Sopenharmony_ci pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate); 36053a5a1b3Sopenharmony_ci 36153a5a1b3Sopenharmony_ci pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate); 36253a5a1b3Sopenharmony_ci 36353a5a1b3Sopenharmony_ci s->last_rate_update = pa_timeval_load(&now); 36453a5a1b3Sopenharmony_ci } 36553a5a1b3Sopenharmony_ci 36653a5a1b3Sopenharmony_ci if (pa_memblockq_is_readable(s->memblockq) && 36753a5a1b3Sopenharmony_ci s->sink_input->thread_info.underrun_for > 0) { 36853a5a1b3Sopenharmony_ci pa_log_debug("Requesting rewind due to end of underrun"); 36953a5a1b3Sopenharmony_ci pa_sink_input_request_rewind(s->sink_input, 37053a5a1b3Sopenharmony_ci (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for), 37153a5a1b3Sopenharmony_ci false, true, false); 37253a5a1b3Sopenharmony_ci } 37353a5a1b3Sopenharmony_ci 37453a5a1b3Sopenharmony_ci return 1; 37553a5a1b3Sopenharmony_ci} 37653a5a1b3Sopenharmony_ci 37753a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 37853a5a1b3Sopenharmony_cistatic void sink_input_attach(pa_sink_input *i) { 37953a5a1b3Sopenharmony_ci struct session *s; 38053a5a1b3Sopenharmony_ci 38153a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 38253a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 38353a5a1b3Sopenharmony_ci 38453a5a1b3Sopenharmony_ci pa_assert(!s->rtpoll_item); 38553a5a1b3Sopenharmony_ci s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll); 38653a5a1b3Sopenharmony_ci 38753a5a1b3Sopenharmony_ci pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s); 38853a5a1b3Sopenharmony_ci} 38953a5a1b3Sopenharmony_ci 39053a5a1b3Sopenharmony_ci/* Called from I/O thread context */ 39153a5a1b3Sopenharmony_cistatic void sink_input_detach(pa_sink_input *i) { 39253a5a1b3Sopenharmony_ci struct session *s; 39353a5a1b3Sopenharmony_ci pa_sink_input_assert_ref(i); 39453a5a1b3Sopenharmony_ci pa_assert_se(s = i->userdata); 39553a5a1b3Sopenharmony_ci 39653a5a1b3Sopenharmony_ci pa_assert(s->rtpoll_item); 39753a5a1b3Sopenharmony_ci pa_rtpoll_item_free(s->rtpoll_item); 39853a5a1b3Sopenharmony_ci s->rtpoll_item = NULL; 39953a5a1b3Sopenharmony_ci} 40053a5a1b3Sopenharmony_ci 40153a5a1b3Sopenharmony_cistatic int mcast_socket(const struct sockaddr* sa, socklen_t salen) { 40253a5a1b3Sopenharmony_ci int af, fd = -1, r, one; 40353a5a1b3Sopenharmony_ci 40453a5a1b3Sopenharmony_ci pa_assert(sa); 40553a5a1b3Sopenharmony_ci pa_assert(salen > 0); 40653a5a1b3Sopenharmony_ci 40753a5a1b3Sopenharmony_ci af = sa->sa_family; 40853a5a1b3Sopenharmony_ci if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { 40953a5a1b3Sopenharmony_ci pa_log("Failed to create socket: %s", pa_cstrerror(errno)); 41053a5a1b3Sopenharmony_ci goto fail; 41153a5a1b3Sopenharmony_ci } 41253a5a1b3Sopenharmony_ci 41353a5a1b3Sopenharmony_ci pa_make_udp_socket_low_delay(fd); 41453a5a1b3Sopenharmony_ci 41553a5a1b3Sopenharmony_ci#ifdef SO_TIMESTAMP 41653a5a1b3Sopenharmony_ci one = 1; 41753a5a1b3Sopenharmony_ci if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) { 41853a5a1b3Sopenharmony_ci pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno)); 41953a5a1b3Sopenharmony_ci goto fail; 42053a5a1b3Sopenharmony_ci } 42153a5a1b3Sopenharmony_ci#else 42253a5a1b3Sopenharmony_ci pa_log("SO_TIMESTAMP unsupported on this platform"); 42353a5a1b3Sopenharmony_ci goto fail; 42453a5a1b3Sopenharmony_ci#endif 42553a5a1b3Sopenharmony_ci 42653a5a1b3Sopenharmony_ci one = 1; 42753a5a1b3Sopenharmony_ci if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) { 42853a5a1b3Sopenharmony_ci pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno)); 42953a5a1b3Sopenharmony_ci goto fail; 43053a5a1b3Sopenharmony_ci } 43153a5a1b3Sopenharmony_ci 43253a5a1b3Sopenharmony_ci r = 0; 43353a5a1b3Sopenharmony_ci if (af == AF_INET) { 43453a5a1b3Sopenharmony_ci /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */ 43553a5a1b3Sopenharmony_ci static const uint32_t ipv4_mcast_mask = 0xe0000000; 43653a5a1b3Sopenharmony_ci 43753a5a1b3Sopenharmony_ci if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) { 43853a5a1b3Sopenharmony_ci struct ip_mreq mr4; 43953a5a1b3Sopenharmony_ci memset(&mr4, 0, sizeof(mr4)); 44053a5a1b3Sopenharmony_ci mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr; 44153a5a1b3Sopenharmony_ci r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); 44253a5a1b3Sopenharmony_ci } 44353a5a1b3Sopenharmony_ci#ifdef HAVE_IPV6 44453a5a1b3Sopenharmony_ci } else if (af == AF_INET6) { 44553a5a1b3Sopenharmony_ci /* IPv6 multicast addresses have 255 as the most significant byte */ 44653a5a1b3Sopenharmony_ci if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) { 44753a5a1b3Sopenharmony_ci struct ipv6_mreq mr6; 44853a5a1b3Sopenharmony_ci memset(&mr6, 0, sizeof(mr6)); 44953a5a1b3Sopenharmony_ci mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr; 45053a5a1b3Sopenharmony_ci r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); 45153a5a1b3Sopenharmony_ci } 45253a5a1b3Sopenharmony_ci#endif 45353a5a1b3Sopenharmony_ci } else 45453a5a1b3Sopenharmony_ci pa_assert_not_reached(); 45553a5a1b3Sopenharmony_ci 45653a5a1b3Sopenharmony_ci if (r < 0) { 45753a5a1b3Sopenharmony_ci pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno)); 45853a5a1b3Sopenharmony_ci goto fail; 45953a5a1b3Sopenharmony_ci } 46053a5a1b3Sopenharmony_ci 46153a5a1b3Sopenharmony_ci if (bind(fd, sa, salen) < 0) { 46253a5a1b3Sopenharmony_ci pa_log("bind() failed: %s", pa_cstrerror(errno)); 46353a5a1b3Sopenharmony_ci goto fail; 46453a5a1b3Sopenharmony_ci } 46553a5a1b3Sopenharmony_ci 46653a5a1b3Sopenharmony_ci return fd; 46753a5a1b3Sopenharmony_ci 46853a5a1b3Sopenharmony_cifail: 46953a5a1b3Sopenharmony_ci if (fd >= 0) 47053a5a1b3Sopenharmony_ci close(fd); 47153a5a1b3Sopenharmony_ci 47253a5a1b3Sopenharmony_ci return -1; 47353a5a1b3Sopenharmony_ci} 47453a5a1b3Sopenharmony_ci 47553a5a1b3Sopenharmony_cistatic struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) { 47653a5a1b3Sopenharmony_ci struct session *s = NULL; 47753a5a1b3Sopenharmony_ci pa_sink *sink; 47853a5a1b3Sopenharmony_ci int fd = -1; 47953a5a1b3Sopenharmony_ci pa_memchunk silence; 48053a5a1b3Sopenharmony_ci pa_sink_input_new_data data; 48153a5a1b3Sopenharmony_ci struct timeval now; 48253a5a1b3Sopenharmony_ci 48353a5a1b3Sopenharmony_ci pa_assert(u); 48453a5a1b3Sopenharmony_ci pa_assert(sdp_info); 48553a5a1b3Sopenharmony_ci 48653a5a1b3Sopenharmony_ci if (u->n_sessions >= MAX_SESSIONS) { 48753a5a1b3Sopenharmony_ci pa_log("Session limit reached."); 48853a5a1b3Sopenharmony_ci goto fail; 48953a5a1b3Sopenharmony_ci } 49053a5a1b3Sopenharmony_ci 49153a5a1b3Sopenharmony_ci if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) { 49253a5a1b3Sopenharmony_ci pa_log("Sink does not exist."); 49353a5a1b3Sopenharmony_ci goto fail; 49453a5a1b3Sopenharmony_ci } 49553a5a1b3Sopenharmony_ci 49653a5a1b3Sopenharmony_ci pa_rtclock_get(&now); 49753a5a1b3Sopenharmony_ci 49853a5a1b3Sopenharmony_ci s = pa_xnew0(struct session, 1); 49953a5a1b3Sopenharmony_ci s->userdata = u; 50053a5a1b3Sopenharmony_ci s->first_packet = false; 50153a5a1b3Sopenharmony_ci s->sdp_info = *sdp_info; 50253a5a1b3Sopenharmony_ci s->rtpoll_item = NULL; 50353a5a1b3Sopenharmony_ci s->intended_latency = u->latency; 50453a5a1b3Sopenharmony_ci s->last_rate_update = pa_timeval_load(&now); 50553a5a1b3Sopenharmony_ci s->last_latency = u->latency; 50653a5a1b3Sopenharmony_ci pa_atomic_store(&s->timestamp, (int) now.tv_sec); 50753a5a1b3Sopenharmony_ci 50853a5a1b3Sopenharmony_ci if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0) 50953a5a1b3Sopenharmony_ci goto fail; 51053a5a1b3Sopenharmony_ci 51153a5a1b3Sopenharmony_ci pa_sink_input_new_data_init(&data); 51253a5a1b3Sopenharmony_ci pa_sink_input_new_data_set_sink(&data, sink, false, true); 51353a5a1b3Sopenharmony_ci data.driver = __FILE__; 51453a5a1b3Sopenharmony_ci pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream"); 51553a5a1b3Sopenharmony_ci pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, 51653a5a1b3Sopenharmony_ci "RTP Stream%s%s%s", 51753a5a1b3Sopenharmony_ci sdp_info->session_name ? " (" : "", 51853a5a1b3Sopenharmony_ci sdp_info->session_name ? sdp_info->session_name : "", 51953a5a1b3Sopenharmony_ci sdp_info->session_name ? ")" : ""); 52053a5a1b3Sopenharmony_ci 52153a5a1b3Sopenharmony_ci if (sdp_info->session_name) 52253a5a1b3Sopenharmony_ci pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name); 52353a5a1b3Sopenharmony_ci pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin); 52453a5a1b3Sopenharmony_ci pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload); 52553a5a1b3Sopenharmony_ci data.module = u->module; 52653a5a1b3Sopenharmony_ci pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec); 52753a5a1b3Sopenharmony_ci data.flags = PA_SINK_INPUT_VARIABLE_RATE; 52853a5a1b3Sopenharmony_ci 52953a5a1b3Sopenharmony_ci pa_sink_input_new(&s->sink_input, u->module->core, &data); 53053a5a1b3Sopenharmony_ci pa_sink_input_new_data_done(&data); 53153a5a1b3Sopenharmony_ci 53253a5a1b3Sopenharmony_ci if (!s->sink_input) { 53353a5a1b3Sopenharmony_ci pa_log("Failed to create sink input."); 53453a5a1b3Sopenharmony_ci goto fail; 53553a5a1b3Sopenharmony_ci } 53653a5a1b3Sopenharmony_ci 53753a5a1b3Sopenharmony_ci s->base_rate = (double) s->sink_input->sample_spec.rate; 53853a5a1b3Sopenharmony_ci s->estimated_rate = (double) s->sink_input->sample_spec.rate; 53953a5a1b3Sopenharmony_ci s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate; 54053a5a1b3Sopenharmony_ci 54153a5a1b3Sopenharmony_ci s->sink_input->userdata = s; 54253a5a1b3Sopenharmony_ci 54353a5a1b3Sopenharmony_ci s->sink_input->parent.process_msg = sink_input_process_msg; 54453a5a1b3Sopenharmony_ci s->sink_input->pop = sink_input_pop_cb; 54553a5a1b3Sopenharmony_ci s->sink_input->process_rewind = sink_input_process_rewind_cb; 54653a5a1b3Sopenharmony_ci s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; 54753a5a1b3Sopenharmony_ci s->sink_input->kill = sink_input_kill; 54853a5a1b3Sopenharmony_ci s->sink_input->attach = sink_input_attach; 54953a5a1b3Sopenharmony_ci s->sink_input->detach = sink_input_detach; 55053a5a1b3Sopenharmony_ci s->sink_input->suspend_within_thread = sink_input_suspend_within_thread; 55153a5a1b3Sopenharmony_ci 55253a5a1b3Sopenharmony_ci pa_sink_input_get_silence(s->sink_input, &silence); 55353a5a1b3Sopenharmony_ci 55453a5a1b3Sopenharmony_ci s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2); 55553a5a1b3Sopenharmony_ci 55653a5a1b3Sopenharmony_ci if (s->intended_latency < s->sink_latency*2) 55753a5a1b3Sopenharmony_ci s->intended_latency = s->sink_latency*2; 55853a5a1b3Sopenharmony_ci 55953a5a1b3Sopenharmony_ci s->memblockq = pa_memblockq_new( 56053a5a1b3Sopenharmony_ci "module-rtp-recv memblockq", 56153a5a1b3Sopenharmony_ci 0, 56253a5a1b3Sopenharmony_ci MEMBLOCKQ_MAXLENGTH, 56353a5a1b3Sopenharmony_ci MEMBLOCKQ_MAXLENGTH, 56453a5a1b3Sopenharmony_ci &s->sink_input->sample_spec, 56553a5a1b3Sopenharmony_ci pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec), 56653a5a1b3Sopenharmony_ci 0, 56753a5a1b3Sopenharmony_ci 0, 56853a5a1b3Sopenharmony_ci &silence); 56953a5a1b3Sopenharmony_ci 57053a5a1b3Sopenharmony_ci pa_memblock_unref(silence.memblock); 57153a5a1b3Sopenharmony_ci 57253a5a1b3Sopenharmony_ci if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus))) 57353a5a1b3Sopenharmony_ci goto fail; 57453a5a1b3Sopenharmony_ci 57553a5a1b3Sopenharmony_ci pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s); 57653a5a1b3Sopenharmony_ci u->n_sessions++; 57753a5a1b3Sopenharmony_ci PA_LLIST_PREPEND(struct session, s->userdata->sessions, s); 57853a5a1b3Sopenharmony_ci 57953a5a1b3Sopenharmony_ci pa_sink_input_put(s->sink_input); 58053a5a1b3Sopenharmony_ci 58153a5a1b3Sopenharmony_ci pa_log_info("New session '%s'", s->sdp_info.session_name); 58253a5a1b3Sopenharmony_ci 58353a5a1b3Sopenharmony_ci return s; 58453a5a1b3Sopenharmony_ci 58553a5a1b3Sopenharmony_cifail: 58653a5a1b3Sopenharmony_ci pa_xfree(s); 58753a5a1b3Sopenharmony_ci 58853a5a1b3Sopenharmony_ci if (fd >= 0) 58953a5a1b3Sopenharmony_ci pa_close(fd); 59053a5a1b3Sopenharmony_ci 59153a5a1b3Sopenharmony_ci return NULL; 59253a5a1b3Sopenharmony_ci} 59353a5a1b3Sopenharmony_ci 59453a5a1b3Sopenharmony_cistatic void session_free(struct session *s) { 59553a5a1b3Sopenharmony_ci pa_assert(s); 59653a5a1b3Sopenharmony_ci 59753a5a1b3Sopenharmony_ci pa_log_info("Freeing session '%s'", s->sdp_info.session_name); 59853a5a1b3Sopenharmony_ci 59953a5a1b3Sopenharmony_ci pa_sink_input_unlink(s->sink_input); 60053a5a1b3Sopenharmony_ci pa_sink_input_unref(s->sink_input); 60153a5a1b3Sopenharmony_ci 60253a5a1b3Sopenharmony_ci PA_LLIST_REMOVE(struct session, s->userdata->sessions, s); 60353a5a1b3Sopenharmony_ci pa_assert(s->userdata->n_sessions >= 1); 60453a5a1b3Sopenharmony_ci s->userdata->n_sessions--; 60553a5a1b3Sopenharmony_ci 60653a5a1b3Sopenharmony_ci pa_memblockq_free(s->memblockq); 60753a5a1b3Sopenharmony_ci pa_sdp_info_destroy(&s->sdp_info); 60853a5a1b3Sopenharmony_ci pa_rtp_context_free(s->rtp_context); 60953a5a1b3Sopenharmony_ci 61053a5a1b3Sopenharmony_ci pa_xfree(s); 61153a5a1b3Sopenharmony_ci} 61253a5a1b3Sopenharmony_ci 61353a5a1b3Sopenharmony_cistatic void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) { 61453a5a1b3Sopenharmony_ci struct userdata *u = userdata; 61553a5a1b3Sopenharmony_ci bool goodbye = false; 61653a5a1b3Sopenharmony_ci pa_sdp_info info; 61753a5a1b3Sopenharmony_ci struct session *s; 61853a5a1b3Sopenharmony_ci 61953a5a1b3Sopenharmony_ci pa_assert(m); 62053a5a1b3Sopenharmony_ci pa_assert(e); 62153a5a1b3Sopenharmony_ci pa_assert(u); 62253a5a1b3Sopenharmony_ci pa_assert(fd == u->sap_context.fd); 62353a5a1b3Sopenharmony_ci pa_assert(flags == PA_IO_EVENT_INPUT); 62453a5a1b3Sopenharmony_ci 62553a5a1b3Sopenharmony_ci if (pa_sap_recv(&u->sap_context, &goodbye) < 0) 62653a5a1b3Sopenharmony_ci return; 62753a5a1b3Sopenharmony_ci 62853a5a1b3Sopenharmony_ci if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye)) 62953a5a1b3Sopenharmony_ci return; 63053a5a1b3Sopenharmony_ci 63153a5a1b3Sopenharmony_ci if (goodbye) { 63253a5a1b3Sopenharmony_ci pa_hashmap_remove_and_free(u->by_origin, info.origin); 63353a5a1b3Sopenharmony_ci pa_sdp_info_destroy(&info); 63453a5a1b3Sopenharmony_ci } else { 63553a5a1b3Sopenharmony_ci 63653a5a1b3Sopenharmony_ci if (!(s = pa_hashmap_get(u->by_origin, info.origin))) { 63753a5a1b3Sopenharmony_ci if (!session_new(u, &info)) 63853a5a1b3Sopenharmony_ci pa_sdp_info_destroy(&info); 63953a5a1b3Sopenharmony_ci 64053a5a1b3Sopenharmony_ci } else { 64153a5a1b3Sopenharmony_ci struct timeval now; 64253a5a1b3Sopenharmony_ci pa_rtclock_get(&now); 64353a5a1b3Sopenharmony_ci pa_atomic_store(&s->timestamp, (int) now.tv_sec); 64453a5a1b3Sopenharmony_ci 64553a5a1b3Sopenharmony_ci pa_sdp_info_destroy(&info); 64653a5a1b3Sopenharmony_ci } 64753a5a1b3Sopenharmony_ci } 64853a5a1b3Sopenharmony_ci} 64953a5a1b3Sopenharmony_ci 65053a5a1b3Sopenharmony_cistatic void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { 65153a5a1b3Sopenharmony_ci struct session *s, *n; 65253a5a1b3Sopenharmony_ci struct userdata *u = userdata; 65353a5a1b3Sopenharmony_ci struct timeval now; 65453a5a1b3Sopenharmony_ci 65553a5a1b3Sopenharmony_ci pa_assert(m); 65653a5a1b3Sopenharmony_ci pa_assert(t); 65753a5a1b3Sopenharmony_ci pa_assert(u); 65853a5a1b3Sopenharmony_ci 65953a5a1b3Sopenharmony_ci pa_rtclock_get(&now); 66053a5a1b3Sopenharmony_ci 66153a5a1b3Sopenharmony_ci pa_log_debug("Checking for dead streams ..."); 66253a5a1b3Sopenharmony_ci 66353a5a1b3Sopenharmony_ci for (s = u->sessions; s; s = n) { 66453a5a1b3Sopenharmony_ci int k; 66553a5a1b3Sopenharmony_ci n = s->next; 66653a5a1b3Sopenharmony_ci 66753a5a1b3Sopenharmony_ci k = pa_atomic_load(&s->timestamp); 66853a5a1b3Sopenharmony_ci 66953a5a1b3Sopenharmony_ci if (k + DEATH_TIMEOUT < now.tv_sec) 67053a5a1b3Sopenharmony_ci pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin); 67153a5a1b3Sopenharmony_ci } 67253a5a1b3Sopenharmony_ci 67353a5a1b3Sopenharmony_ci /* Restart timer */ 67453a5a1b3Sopenharmony_ci pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC); 67553a5a1b3Sopenharmony_ci} 67653a5a1b3Sopenharmony_ci 67753a5a1b3Sopenharmony_ciint pa__init(pa_module*m) { 67853a5a1b3Sopenharmony_ci struct userdata *u; 67953a5a1b3Sopenharmony_ci pa_modargs *ma = NULL; 68053a5a1b3Sopenharmony_ci struct sockaddr_in sa4; 68153a5a1b3Sopenharmony_ci#ifdef HAVE_IPV6 68253a5a1b3Sopenharmony_ci struct sockaddr_in6 sa6; 68353a5a1b3Sopenharmony_ci#endif 68453a5a1b3Sopenharmony_ci struct sockaddr *sa; 68553a5a1b3Sopenharmony_ci socklen_t salen; 68653a5a1b3Sopenharmony_ci const char *sap_address; 68753a5a1b3Sopenharmony_ci uint32_t latency_msec; 68853a5a1b3Sopenharmony_ci int fd = -1; 68953a5a1b3Sopenharmony_ci 69053a5a1b3Sopenharmony_ci pa_assert(m); 69153a5a1b3Sopenharmony_ci 69253a5a1b3Sopenharmony_ci if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { 69353a5a1b3Sopenharmony_ci pa_log("failed to parse module arguments"); 69453a5a1b3Sopenharmony_ci goto fail; 69553a5a1b3Sopenharmony_ci } 69653a5a1b3Sopenharmony_ci 69753a5a1b3Sopenharmony_ci sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS); 69853a5a1b3Sopenharmony_ci 69953a5a1b3Sopenharmony_ci if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) { 70053a5a1b3Sopenharmony_ci sa4.sin_family = AF_INET; 70153a5a1b3Sopenharmony_ci sa4.sin_port = htons(SAP_PORT); 70253a5a1b3Sopenharmony_ci sa = (struct sockaddr*) &sa4; 70353a5a1b3Sopenharmony_ci salen = sizeof(sa4); 70453a5a1b3Sopenharmony_ci#ifdef HAVE_IPV6 70553a5a1b3Sopenharmony_ci } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) { 70653a5a1b3Sopenharmony_ci sa6.sin6_family = AF_INET6; 70753a5a1b3Sopenharmony_ci sa6.sin6_port = htons(SAP_PORT); 70853a5a1b3Sopenharmony_ci sa = (struct sockaddr*) &sa6; 70953a5a1b3Sopenharmony_ci salen = sizeof(sa6); 71053a5a1b3Sopenharmony_ci#endif 71153a5a1b3Sopenharmony_ci } else { 71253a5a1b3Sopenharmony_ci pa_log("Invalid SAP address '%s'", sap_address); 71353a5a1b3Sopenharmony_ci goto fail; 71453a5a1b3Sopenharmony_ci } 71553a5a1b3Sopenharmony_ci 71653a5a1b3Sopenharmony_ci latency_msec = DEFAULT_LATENCY_MSEC; 71753a5a1b3Sopenharmony_ci if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) { 71853a5a1b3Sopenharmony_ci pa_log("Invalid latency specification"); 71953a5a1b3Sopenharmony_ci goto fail; 72053a5a1b3Sopenharmony_ci } 72153a5a1b3Sopenharmony_ci 72253a5a1b3Sopenharmony_ci if ((fd = mcast_socket(sa, salen)) < 0) 72353a5a1b3Sopenharmony_ci goto fail; 72453a5a1b3Sopenharmony_ci 72553a5a1b3Sopenharmony_ci m->userdata = u = pa_xnew(struct userdata, 1); 72653a5a1b3Sopenharmony_ci u->module = m; 72753a5a1b3Sopenharmony_ci u->core = m->core; 72853a5a1b3Sopenharmony_ci u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); 72953a5a1b3Sopenharmony_ci u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC; 73053a5a1b3Sopenharmony_ci 73153a5a1b3Sopenharmony_ci u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u); 73253a5a1b3Sopenharmony_ci pa_sap_context_init_recv(&u->sap_context, fd); 73353a5a1b3Sopenharmony_ci 73453a5a1b3Sopenharmony_ci PA_LLIST_HEAD_INIT(struct session, u->sessions); 73553a5a1b3Sopenharmony_ci u->n_sessions = 0; 73653a5a1b3Sopenharmony_ci u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free); 73753a5a1b3Sopenharmony_ci 73853a5a1b3Sopenharmony_ci u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u); 73953a5a1b3Sopenharmony_ci 74053a5a1b3Sopenharmony_ci pa_modargs_free(ma); 74153a5a1b3Sopenharmony_ci 74253a5a1b3Sopenharmony_ci return 0; 74353a5a1b3Sopenharmony_ci 74453a5a1b3Sopenharmony_cifail: 74553a5a1b3Sopenharmony_ci if (ma) 74653a5a1b3Sopenharmony_ci pa_modargs_free(ma); 74753a5a1b3Sopenharmony_ci 74853a5a1b3Sopenharmony_ci if (fd >= 0) 74953a5a1b3Sopenharmony_ci pa_close(fd); 75053a5a1b3Sopenharmony_ci 75153a5a1b3Sopenharmony_ci return -1; 75253a5a1b3Sopenharmony_ci} 75353a5a1b3Sopenharmony_ci 75453a5a1b3Sopenharmony_civoid pa__done(pa_module*m) { 75553a5a1b3Sopenharmony_ci struct userdata *u; 75653a5a1b3Sopenharmony_ci 75753a5a1b3Sopenharmony_ci pa_assert(m); 75853a5a1b3Sopenharmony_ci 75953a5a1b3Sopenharmony_ci if (!(u = m->userdata)) 76053a5a1b3Sopenharmony_ci return; 76153a5a1b3Sopenharmony_ci 76253a5a1b3Sopenharmony_ci if (u->sap_event) 76353a5a1b3Sopenharmony_ci m->core->mainloop->io_free(u->sap_event); 76453a5a1b3Sopenharmony_ci 76553a5a1b3Sopenharmony_ci if (u->check_death_event) 76653a5a1b3Sopenharmony_ci m->core->mainloop->time_free(u->check_death_event); 76753a5a1b3Sopenharmony_ci 76853a5a1b3Sopenharmony_ci pa_sap_context_destroy(&u->sap_context); 76953a5a1b3Sopenharmony_ci 77053a5a1b3Sopenharmony_ci if (u->by_origin) 77153a5a1b3Sopenharmony_ci pa_hashmap_free(u->by_origin); 77253a5a1b3Sopenharmony_ci 77353a5a1b3Sopenharmony_ci pa_xfree(u->sink_name); 77453a5a1b3Sopenharmony_ci pa_xfree(u); 77553a5a1b3Sopenharmony_ci} 776