1 2/*** 3 This file is part of PulseAudio. 4 5 Copyright 2006 Lennart Poettering 6 7 PulseAudio is free software; you can redistribute it and/or modify 8 it under the terms of the GNU Lesser General Public License as published 9 by the Free Software Foundation; either version 2.1 of the License, 10 or (at your option) any later version. 11 12 PulseAudio is distributed in the hope that it will be useful, but 13 WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 General Public License for more details. 16 17 You should have received a copy of the GNU Lesser General Public License 18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 19***/ 20 21#ifdef HAVE_CONFIG_H 22#include <config.h> 23#endif 24 25#include <stdio.h> 26#include <sys/socket.h> 27#include <netinet/in.h> 28#include <errno.h> 29#include <string.h> 30#include <unistd.h> 31#include <math.h> 32 33#include <pulse/rtclock.h> 34#include <pulse/timeval.h> 35#include <pulse/xmalloc.h> 36 37#include <pulsecore/core-error.h> 38#include <pulsecore/module.h> 39#include <pulsecore/llist.h> 40#include <pulsecore/sink.h> 41#include <pulsecore/sink-input.h> 42#include <pulsecore/memblockq.h> 43#include <pulsecore/log.h> 44#include <pulsecore/core-rtclock.h> 45#include <pulsecore/core-util.h> 46#include <pulsecore/modargs.h> 47#include <pulsecore/namereg.h> 48#include <pulsecore/sample-util.h> 49#include <pulsecore/macro.h> 50#include <pulsecore/socket-util.h> 51#include <pulsecore/atomic.h> 52#include <pulsecore/once.h> 53#include <pulsecore/poll.h> 54#include <pulsecore/arpa-inet.h> 55 56#include "rtp.h" 57#include "sdp.h" 58#include "sap.h" 59 60PA_MODULE_AUTHOR("Lennart Poettering"); 61PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP"); 62PA_MODULE_VERSION(PACKAGE_VERSION); 63PA_MODULE_LOAD_ONCE(false); 64PA_MODULE_USAGE( 65 "sink=<name of the sink> " 66 "sap_address=<multicast address to listen on> " 67 "latency_msec=<latency in ms> " 68); 69 70#define SAP_PORT 9875 71#define DEFAULT_SAP_ADDRESS "224.0.0.56" 72#define DEFAULT_LATENCY_MSEC 500 73#define MEMBLOCKQ_MAXLENGTH (1024*1024*40) 74#define MAX_SESSIONS 16 75#define DEATH_TIMEOUT 20 76#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC) 77 78static const char* const valid_modargs[] = { 79 "sink", 80 "sap_address", 81 "latency_msec", 82 NULL 83}; 84 85struct session { 86 struct userdata *userdata; 87 PA_LLIST_FIELDS(struct session); 88 89 pa_sink_input *sink_input; 90 pa_memblockq *memblockq; 91 92 bool first_packet; 93 uint32_t offset; 94 95 struct pa_sdp_info sdp_info; 96 97 pa_rtp_context *rtp_context; 98 99 pa_rtpoll_item *rtpoll_item; 100 101 pa_atomic_t timestamp; 102 103 pa_usec_t intended_latency; 104 pa_usec_t sink_latency; 105 106 unsigned int base_rate; 107 pa_usec_t last_rate_update; 108 pa_usec_t last_latency; 109 double estimated_rate; 110 double avg_estimated_rate; 111}; 112 113struct userdata { 114 pa_module *module; 115 pa_core *core; 116 117 pa_sap_context sap_context; 118 pa_io_event* sap_event; 119 120 pa_time_event *check_death_event; 121 122 char *sink_name; 123 124 PA_LLIST_HEAD(struct session, sessions); 125 pa_hashmap *by_origin; 126 int n_sessions; 127 128 pa_usec_t latency; 129}; 130 131static void session_free(struct session *s); 132 133/* Called from I/O thread context */ 134static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 135 struct session *s = PA_SINK_INPUT(o)->userdata; 136 137 switch (code) { 138 case PA_SINK_INPUT_MESSAGE_GET_LATENCY: 139 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec); 140 141 /* Fall through, the default handler will add in the extra 142 * latency added by the resampler */ 143 break; 144 } 145 146 return pa_sink_input_process_msg(o, code, data, offset, chunk); 147} 148 149/* Called from I/O thread context */ 150static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { 151 struct session *s; 152 pa_sink_input_assert_ref(i); 153 pa_assert_se(s = i->userdata); 154 155 if (pa_memblockq_peek(s->memblockq, chunk) < 0) 156 return -1; 157 158 pa_memblockq_drop(s->memblockq, chunk->length); 159 160 return 0; 161} 162 163/* Called from I/O thread context */ 164static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { 165 struct session *s; 166 167 pa_sink_input_assert_ref(i); 168 pa_assert_se(s = i->userdata); 169 170 pa_memblockq_rewind(s->memblockq, nbytes); 171} 172 173/* Called from I/O thread context */ 174static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { 175 struct session *s; 176 177 pa_sink_input_assert_ref(i); 178 pa_assert_se(s = i->userdata); 179 180 pa_memblockq_set_maxrewind(s->memblockq, nbytes); 181} 182 183/* Called from main context */ 184static void sink_input_kill(pa_sink_input* i) { 185 struct session *s; 186 pa_sink_input_assert_ref(i); 187 pa_assert_se(s = i->userdata); 188 189 pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin); 190} 191 192/* Called from IO context */ 193static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) { 194 struct session *s; 195 pa_sink_input_assert_ref(i); 196 pa_assert_se(s = i->userdata); 197 198 if (b) 199 pa_memblockq_flush_read(s->memblockq); 200 else 201 s->first_packet = false; 202} 203 204/* Called from I/O thread context */ 205static int rtpoll_work_cb(pa_rtpoll_item *i) { 206 pa_memchunk chunk; 207 uint32_t timestamp; 208 int64_t k, j, delta; 209 struct timeval now = { 0, 0 }; 210 struct session *s; 211 struct pollfd *p; 212 213 pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i)); 214 215 p = pa_rtpoll_item_get_pollfd(i, NULL); 216 217 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) { 218 pa_log("poll() signalled bad revents."); 219 return -1; 220 } 221 222 if ((p->revents & POLLIN) == 0) 223 return 0; 224 225 p->revents = 0; 226 227 if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, ×tamp, &now) < 0) 228 return 0; 229 230 if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) { 231 pa_memblock_unref(chunk.memblock); 232 return 0; 233 } 234 235 if (!s->first_packet) { 236 s->first_packet = true; 237 s->offset = timestamp; 238 } 239 240 /* Check whether there was a timestamp overflow */ 241 k = (int64_t) timestamp - (int64_t) s->offset; 242 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp; 243 244 if ((k < 0 ? -k : k) < (j < 0 ? -j : j)) 245 delta = k; 246 else 247 delta = j; 248 249 pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE, 250 true); 251 252 if (now.tv_sec == 0) { 253 PA_ONCE_BEGIN { 254 pa_log_warn("Using artificial time instead of timestamp"); 255 } PA_ONCE_END; 256 pa_rtclock_get(&now); 257 } else 258 pa_rtclock_from_wallclock(&now); 259 260 if (pa_memblockq_push(s->memblockq, &chunk) < 0) { 261 pa_log_warn("Queue overrun"); 262 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true); 263 } 264 265/* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */ 266 267 pa_memblock_unref(chunk.memblock); 268 269 /* The next timestamp we expect */ 270 s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context)); 271 272 pa_atomic_store(&s->timestamp, (int) now.tv_sec); 273 274 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) { 275 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency; 276 uint32_t current_rate = s->sink_input->sample_spec.rate; 277 uint32_t new_rate; 278 double estimated_rate, alpha = 0.02; 279 280 pa_log_debug("Updating sample rate"); 281 282 wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec); 283 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec); 284 285 pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri); 286 287 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false); 288 sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler); 289 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec); 290 291 if (ri > render_delay+sink_delay) 292 ri -= render_delay+sink_delay; 293 else 294 ri = 0; 295 296 if (wi < ri) 297 latency = 0; 298 else 299 latency = wi - ri; 300 301 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC); 302 303 /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in 304 * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that 305 * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate 306 * T 307 * R̂ = ─────────────── Rⁿ . (1) 308 * T - (Lⁿ - Lⁿ⁻ⁱ) 309 * 310 * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂ 311 * is correct). But there is also the requirement to keep the buffer at a predefined target 312 * latency L̂. So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R 313 * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time 314 * aT the latency is reduced from Lⁿ to L̂. This strategy translates to the requirements 315 * ₐ R̂ - Rⁿ⁺ʲ a-j+1 j-1 316 * Σ T ────────── = L̂ - Lⁿ with Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ . 317 * ʲ⁼ⁱ R̂ a a 318 * Solving for Rⁿ⁺ⁱ gives 319 * T - ²∕ₐ₊₁(L̂ - Lⁿ) 320 * Rⁿ⁺ⁱ = ───────────────── R̂ . (2) 321 * T 322 * In the code below a = 7 is used. 323 * 324 * Equation (1) is not directly used in (2), but instead an exponentially weighted average 325 * of the estimated rate R̂ is used. This average R̅ is defined as 326 * R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ . 327 * Because it is difficult to find a fixed value for the coefficient α such that the 328 * averaging is without significant lag but oscillations are filtered out, a heuristic is 329 * used. When the successive estimates R̂ⁿ do not change much then α→1, but when there is a 330 * sudden spike in the estimated rate α→0, such that the deviation is given little weight. 331 */ 332 estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency); 333 if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) { 334 double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate); 335 alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8); 336 } 337 s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate; 338 s->estimated_rate = estimated_rate; 339 pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha); 340 new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate); 341 s->last_latency = latency; 342 343 if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) { 344 pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate); 345 new_rate = s->base_rate; 346 } else { 347 if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20) 348 new_rate = s->base_rate; 349 /* Do the adjustment in small steps; 2‰ can be considered inaudible */ 350 if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) { 351 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate); 352 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002)); 353 } 354 } 355 s->sink_input->sample_spec.rate = new_rate; 356 357 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec)); 358 359 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate); 360 361 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate); 362 363 s->last_rate_update = pa_timeval_load(&now); 364 } 365 366 if (pa_memblockq_is_readable(s->memblockq) && 367 s->sink_input->thread_info.underrun_for > 0) { 368 pa_log_debug("Requesting rewind due to end of underrun"); 369 pa_sink_input_request_rewind(s->sink_input, 370 (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for), 371 false, true, false); 372 } 373 374 return 1; 375} 376 377/* Called from I/O thread context */ 378static void sink_input_attach(pa_sink_input *i) { 379 struct session *s; 380 381 pa_sink_input_assert_ref(i); 382 pa_assert_se(s = i->userdata); 383 384 pa_assert(!s->rtpoll_item); 385 s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll); 386 387 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s); 388} 389 390/* Called from I/O thread context */ 391static void sink_input_detach(pa_sink_input *i) { 392 struct session *s; 393 pa_sink_input_assert_ref(i); 394 pa_assert_se(s = i->userdata); 395 396 pa_assert(s->rtpoll_item); 397 pa_rtpoll_item_free(s->rtpoll_item); 398 s->rtpoll_item = NULL; 399} 400 401static int mcast_socket(const struct sockaddr* sa, socklen_t salen) { 402 int af, fd = -1, r, one; 403 404 pa_assert(sa); 405 pa_assert(salen > 0); 406 407 af = sa->sa_family; 408 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { 409 pa_log("Failed to create socket: %s", pa_cstrerror(errno)); 410 goto fail; 411 } 412 413 pa_make_udp_socket_low_delay(fd); 414 415#ifdef SO_TIMESTAMP 416 one = 1; 417 if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) { 418 pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno)); 419 goto fail; 420 } 421#else 422 pa_log("SO_TIMESTAMP unsupported on this platform"); 423 goto fail; 424#endif 425 426 one = 1; 427 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) { 428 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno)); 429 goto fail; 430 } 431 432 r = 0; 433 if (af == AF_INET) { 434 /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */ 435 static const uint32_t ipv4_mcast_mask = 0xe0000000; 436 437 if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) { 438 struct ip_mreq mr4; 439 memset(&mr4, 0, sizeof(mr4)); 440 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr; 441 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); 442 } 443#ifdef HAVE_IPV6 444 } else if (af == AF_INET6) { 445 /* IPv6 multicast addresses have 255 as the most significant byte */ 446 if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) { 447 struct ipv6_mreq mr6; 448 memset(&mr6, 0, sizeof(mr6)); 449 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr; 450 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); 451 } 452#endif 453 } else 454 pa_assert_not_reached(); 455 456 if (r < 0) { 457 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno)); 458 goto fail; 459 } 460 461 if (bind(fd, sa, salen) < 0) { 462 pa_log("bind() failed: %s", pa_cstrerror(errno)); 463 goto fail; 464 } 465 466 return fd; 467 468fail: 469 if (fd >= 0) 470 close(fd); 471 472 return -1; 473} 474 475static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) { 476 struct session *s = NULL; 477 pa_sink *sink; 478 int fd = -1; 479 pa_memchunk silence; 480 pa_sink_input_new_data data; 481 struct timeval now; 482 483 pa_assert(u); 484 pa_assert(sdp_info); 485 486 if (u->n_sessions >= MAX_SESSIONS) { 487 pa_log("Session limit reached."); 488 goto fail; 489 } 490 491 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) { 492 pa_log("Sink does not exist."); 493 goto fail; 494 } 495 496 pa_rtclock_get(&now); 497 498 s = pa_xnew0(struct session, 1); 499 s->userdata = u; 500 s->first_packet = false; 501 s->sdp_info = *sdp_info; 502 s->rtpoll_item = NULL; 503 s->intended_latency = u->latency; 504 s->last_rate_update = pa_timeval_load(&now); 505 s->last_latency = u->latency; 506 pa_atomic_store(&s->timestamp, (int) now.tv_sec); 507 508 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0) 509 goto fail; 510 511 pa_sink_input_new_data_init(&data); 512 pa_sink_input_new_data_set_sink(&data, sink, false, true); 513 data.driver = __FILE__; 514 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream"); 515 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, 516 "RTP Stream%s%s%s", 517 sdp_info->session_name ? " (" : "", 518 sdp_info->session_name ? sdp_info->session_name : "", 519 sdp_info->session_name ? ")" : ""); 520 521 if (sdp_info->session_name) 522 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name); 523 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin); 524 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload); 525 data.module = u->module; 526 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec); 527 data.flags = PA_SINK_INPUT_VARIABLE_RATE; 528 529 pa_sink_input_new(&s->sink_input, u->module->core, &data); 530 pa_sink_input_new_data_done(&data); 531 532 if (!s->sink_input) { 533 pa_log("Failed to create sink input."); 534 goto fail; 535 } 536 537 s->base_rate = (double) s->sink_input->sample_spec.rate; 538 s->estimated_rate = (double) s->sink_input->sample_spec.rate; 539 s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate; 540 541 s->sink_input->userdata = s; 542 543 s->sink_input->parent.process_msg = sink_input_process_msg; 544 s->sink_input->pop = sink_input_pop_cb; 545 s->sink_input->process_rewind = sink_input_process_rewind_cb; 546 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; 547 s->sink_input->kill = sink_input_kill; 548 s->sink_input->attach = sink_input_attach; 549 s->sink_input->detach = sink_input_detach; 550 s->sink_input->suspend_within_thread = sink_input_suspend_within_thread; 551 552 pa_sink_input_get_silence(s->sink_input, &silence); 553 554 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2); 555 556 if (s->intended_latency < s->sink_latency*2) 557 s->intended_latency = s->sink_latency*2; 558 559 s->memblockq = pa_memblockq_new( 560 "module-rtp-recv memblockq", 561 0, 562 MEMBLOCKQ_MAXLENGTH, 563 MEMBLOCKQ_MAXLENGTH, 564 &s->sink_input->sample_spec, 565 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec), 566 0, 567 0, 568 &silence); 569 570 pa_memblock_unref(silence.memblock); 571 572 if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus))) 573 goto fail; 574 575 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s); 576 u->n_sessions++; 577 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s); 578 579 pa_sink_input_put(s->sink_input); 580 581 pa_log_info("New session '%s'", s->sdp_info.session_name); 582 583 return s; 584 585fail: 586 pa_xfree(s); 587 588 if (fd >= 0) 589 pa_close(fd); 590 591 return NULL; 592} 593 594static void session_free(struct session *s) { 595 pa_assert(s); 596 597 pa_log_info("Freeing session '%s'", s->sdp_info.session_name); 598 599 pa_sink_input_unlink(s->sink_input); 600 pa_sink_input_unref(s->sink_input); 601 602 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s); 603 pa_assert(s->userdata->n_sessions >= 1); 604 s->userdata->n_sessions--; 605 606 pa_memblockq_free(s->memblockq); 607 pa_sdp_info_destroy(&s->sdp_info); 608 pa_rtp_context_free(s->rtp_context); 609 610 pa_xfree(s); 611} 612 613static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) { 614 struct userdata *u = userdata; 615 bool goodbye = false; 616 pa_sdp_info info; 617 struct session *s; 618 619 pa_assert(m); 620 pa_assert(e); 621 pa_assert(u); 622 pa_assert(fd == u->sap_context.fd); 623 pa_assert(flags == PA_IO_EVENT_INPUT); 624 625 if (pa_sap_recv(&u->sap_context, &goodbye) < 0) 626 return; 627 628 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye)) 629 return; 630 631 if (goodbye) { 632 pa_hashmap_remove_and_free(u->by_origin, info.origin); 633 pa_sdp_info_destroy(&info); 634 } else { 635 636 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) { 637 if (!session_new(u, &info)) 638 pa_sdp_info_destroy(&info); 639 640 } else { 641 struct timeval now; 642 pa_rtclock_get(&now); 643 pa_atomic_store(&s->timestamp, (int) now.tv_sec); 644 645 pa_sdp_info_destroy(&info); 646 } 647 } 648} 649 650static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { 651 struct session *s, *n; 652 struct userdata *u = userdata; 653 struct timeval now; 654 655 pa_assert(m); 656 pa_assert(t); 657 pa_assert(u); 658 659 pa_rtclock_get(&now); 660 661 pa_log_debug("Checking for dead streams ..."); 662 663 for (s = u->sessions; s; s = n) { 664 int k; 665 n = s->next; 666 667 k = pa_atomic_load(&s->timestamp); 668 669 if (k + DEATH_TIMEOUT < now.tv_sec) 670 pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin); 671 } 672 673 /* Restart timer */ 674 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC); 675} 676 677int pa__init(pa_module*m) { 678 struct userdata *u; 679 pa_modargs *ma = NULL; 680 struct sockaddr_in sa4; 681#ifdef HAVE_IPV6 682 struct sockaddr_in6 sa6; 683#endif 684 struct sockaddr *sa; 685 socklen_t salen; 686 const char *sap_address; 687 uint32_t latency_msec; 688 int fd = -1; 689 690 pa_assert(m); 691 692 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { 693 pa_log("failed to parse module arguments"); 694 goto fail; 695 } 696 697 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS); 698 699 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) { 700 sa4.sin_family = AF_INET; 701 sa4.sin_port = htons(SAP_PORT); 702 sa = (struct sockaddr*) &sa4; 703 salen = sizeof(sa4); 704#ifdef HAVE_IPV6 705 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) { 706 sa6.sin6_family = AF_INET6; 707 sa6.sin6_port = htons(SAP_PORT); 708 sa = (struct sockaddr*) &sa6; 709 salen = sizeof(sa6); 710#endif 711 } else { 712 pa_log("Invalid SAP address '%s'", sap_address); 713 goto fail; 714 } 715 716 latency_msec = DEFAULT_LATENCY_MSEC; 717 if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) { 718 pa_log("Invalid latency specification"); 719 goto fail; 720 } 721 722 if ((fd = mcast_socket(sa, salen)) < 0) 723 goto fail; 724 725 m->userdata = u = pa_xnew(struct userdata, 1); 726 u->module = m; 727 u->core = m->core; 728 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); 729 u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC; 730 731 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u); 732 pa_sap_context_init_recv(&u->sap_context, fd); 733 734 PA_LLIST_HEAD_INIT(struct session, u->sessions); 735 u->n_sessions = 0; 736 u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free); 737 738 u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u); 739 740 pa_modargs_free(ma); 741 742 return 0; 743 744fail: 745 if (ma) 746 pa_modargs_free(ma); 747 748 if (fd >= 0) 749 pa_close(fd); 750 751 return -1; 752} 753 754void pa__done(pa_module*m) { 755 struct userdata *u; 756 757 pa_assert(m); 758 759 if (!(u = m->userdata)) 760 return; 761 762 if (u->sap_event) 763 m->core->mainloop->io_free(u->sap_event); 764 765 if (u->check_death_event) 766 m->core->mainloop->time_free(u->check_death_event); 767 768 pa_sap_context_destroy(&u->sap_context); 769 770 if (u->by_origin) 771 pa_hashmap_free(u->by_origin); 772 773 pa_xfree(u->sink_name); 774 pa_xfree(u); 775} 776