1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2006 Lennart Poettering 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as published 8 by the Free Software Foundation; either version 2.1 of the License, 9 or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <stdio.h> 25#include <sys/socket.h> 26#include <netinet/in.h> 27#include <errno.h> 28#include <unistd.h> 29 30#include <pulse/rtclock.h> 31#include <pulse/timeval.h> 32#include <pulse/util.h> 33#include <pulse/xmalloc.h> 34 35#include <pulsecore/core-error.h> 36#include <pulsecore/module.h> 37#include <pulsecore/source.h> 38#include <pulsecore/source-output.h> 39#include <pulsecore/memblockq.h> 40#include <pulsecore/log.h> 41#include <pulsecore/core-util.h> 42#include <pulsecore/modargs.h> 43#include <pulsecore/namereg.h> 44#include <pulsecore/sample-util.h> 45#include <pulsecore/macro.h> 46#include <pulsecore/socket-util.h> 47#include <pulsecore/arpa-inet.h> 48 49#include "rtp.h" 50#include "sdp.h" 51#include "sap.h" 52 53PA_MODULE_AUTHOR("Lennart Poettering"); 54PA_MODULE_DESCRIPTION("Read data from source and send it to the network via RTP/SAP/SDP"); 55PA_MODULE_VERSION(PACKAGE_VERSION); 56PA_MODULE_LOAD_ONCE(false); 57PA_MODULE_USAGE( 58 "source=<name of the source> " 59 "format=<sample format> " 60 "channels=<number of channels> " 61 "rate=<sample rate> " 62 "destination_ip=<destination IP address> " 63 "source_ip=<source IP address> " 64 "port=<port number> " 65 "mtu=<maximum transfer unit> " 66 "loop=<loopback to local host?> " 67 "ttl=<ttl value> " 68 "inhibit_auto_suspend=<always|never|only_with_non_monitor_sources>" 69 "stream_name=<name of the stream>" 70 "enable_opus=<enable OPUS codec>" 71); 72 73#define DEFAULT_PORT 46000 74#define DEFAULT_TTL 1 75#define SAP_PORT 9875 76#define DEFAULT_SOURCE_IP "0.0.0.0" 77#define DEFAULT_DESTINATION_IP "224.0.0.56" 78#define MEMBLOCKQ_MAXLENGTH (1024*170) 79#define DEFAULT_MTU 1280 80#define SAP_INTERVAL (5*PA_USEC_PER_SEC) 81 82static const char* const valid_modargs[] = { 83 "source", 84 "format", 85 "channels", 86 "rate", 87 "destination", /* Compatbility */ 88 "destination_ip", 89 "source_ip", 90 "port", 91 "mtu" , 92 "loop", 93 "ttl", 94 "inhibit_auto_suspend", 95 "stream_name", 96 "enable_opus", 97 NULL 98}; 99 100enum inhibit_auto_suspend { 101 INHIBIT_AUTO_SUSPEND_ALWAYS, 102 INHIBIT_AUTO_SUSPEND_NEVER, 103 INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES 104}; 105 106struct userdata { 107 pa_module *module; 108 109 pa_source_output *source_output; 110 pa_memblockq *memblockq; 111 112 pa_rtp_context *rtp_context; 113 pa_sap_context sap_context; 114 115 pa_time_event *sap_event; 116 117 enum inhibit_auto_suspend inhibit_auto_suspend; 118}; 119 120/* Called from I/O thread context */ 121static int source_output_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 122 struct userdata *u; 123 pa_assert_se(u = PA_SOURCE_OUTPUT(o)->userdata); 124 125 switch (code) { 126 case PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY: 127 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->source_output->sample_spec); 128 129 /* Fall through, the default handler will add in the extra 130 * latency added by the resampler */ 131 break; 132 } 133 134 return pa_source_output_process_msg(o, code, data, offset, chunk); 135} 136 137/* Called from I/O thread context */ 138static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { 139 struct userdata *u; 140 pa_source_output_assert_ref(o); 141 pa_assert_se(u = o->userdata); 142 143 if (pa_memblockq_push(u->memblockq, chunk) < 0) { 144 pa_log_warn("Failed to push chunk into memblockq."); 145 return; 146 } 147 148 pa_rtp_send(u->rtp_context, u->memblockq); 149} 150 151static pa_source_output_flags_t get_dont_inhibit_auto_suspend_flag(pa_source *source, 152 enum inhibit_auto_suspend inhibit_auto_suspend) { 153 pa_assert(source); 154 155 switch (inhibit_auto_suspend) { 156 case INHIBIT_AUTO_SUSPEND_ALWAYS: 157 return 0; 158 159 case INHIBIT_AUTO_SUSPEND_NEVER: 160 return PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; 161 162 case INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES: 163 return source->monitor_of ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0; 164 } 165 166 pa_assert_not_reached(); 167} 168 169/* Called from the main thread. */ 170static void source_output_moving_cb(pa_source_output *o, pa_source *dest) { 171 struct userdata *u; 172 173 pa_assert(o); 174 175 u = o->userdata; 176 177 if (!dest) 178 return; 179 180 o->flags &= ~PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; 181 o->flags |= get_dont_inhibit_auto_suspend_flag(dest, u->inhibit_auto_suspend); 182} 183 184/* Called from main context */ 185static void source_output_kill_cb(pa_source_output* o) { 186 struct userdata *u; 187 pa_source_output_assert_ref(o); 188 pa_assert_se(u = o->userdata); 189 190 pa_module_unload_request(u->module, true); 191 192 pa_source_output_unlink(u->source_output); 193 pa_source_output_unref(u->source_output); 194 u->source_output = NULL; 195} 196 197static void sap_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { 198 struct userdata *u = userdata; 199 200 pa_assert(m); 201 pa_assert(t); 202 pa_assert(u); 203 204 pa_sap_send(&u->sap_context, 0); 205 206 pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + SAP_INTERVAL); 207} 208 209int pa__init(pa_module*m) { 210 struct userdata *u; 211 pa_modargs *ma = NULL; 212 const char *dst_addr; 213 const char *src_addr; 214 uint32_t port = DEFAULT_PORT, mtu; 215 uint32_t ttl = DEFAULT_TTL; 216 sa_family_t af; 217 int fd = -1, sap_fd = -1; 218 pa_source *s; 219 pa_sample_spec ss; 220 pa_channel_map cm; 221 struct sockaddr_in dst_sa4, dst_sap_sa4, src_sa4, src_sap_sa4; 222#ifdef HAVE_IPV6 223 struct sockaddr_in6 dst_sa6, dst_sap_sa6, src_sa6, src_sap_sa6; 224#endif 225 struct sockaddr_storage sa_dst; 226 pa_source_output *o = NULL; 227 uint8_t payload; 228 char *p; 229 int r, j; 230 socklen_t k; 231 char hn[128], *n; 232 bool loop = false; 233 bool enable_opus = false; 234 enum inhibit_auto_suspend inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES; 235 const char *inhibit_auto_suspend_str; 236 pa_source_output_new_data data; 237 238 pa_assert(m); 239 240 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { 241 pa_log("Failed to parse module arguments"); 242 goto fail; 243 } 244 245 if (!(s = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source", NULL), PA_NAMEREG_SOURCE))) { 246 pa_log("Source does not exist."); 247 goto fail; 248 } 249 250 if (pa_modargs_get_value_boolean(ma, "loop", &loop) < 0) { 251 pa_log("Failed to parse \"loop\" parameter."); 252 goto fail; 253 } 254 255 if (pa_modargs_get_value_boolean(ma, "enable_opus", &enable_opus) < 0) { 256 pa_log("Failed to parse \"use_opus\" parameter."); 257 goto fail; 258 } 259 260 if ((inhibit_auto_suspend_str = pa_modargs_get_value(ma, "inhibit_auto_suspend", NULL))) { 261 if (pa_streq(inhibit_auto_suspend_str, "always")) 262 inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ALWAYS; 263 else if (pa_streq(inhibit_auto_suspend_str, "never")) 264 inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_NEVER; 265 else if (pa_streq(inhibit_auto_suspend_str, "only_with_non_monitor_sources")) 266 inhibit_auto_suspend = INHIBIT_AUTO_SUSPEND_ONLY_WITH_NON_MONITOR_SOURCES; 267 else { 268 pa_log("Failed to parse the \"inhibit_auto_suspend\" parameter."); 269 goto fail; 270 } 271 } 272 273 ss = s->sample_spec; 274 pa_rtp_sample_spec_fixup(&ss, enable_opus); 275 cm = s->channel_map; 276 if (pa_modargs_get_sample_spec(ma, &ss) < 0) { 277 pa_log("Failed to parse sample specification"); 278 goto fail; 279 } 280 281 if (!pa_rtp_sample_spec_valid(&ss)) { 282 pa_log("Specified sample type not compatible with RTP"); 283 goto fail; 284 } 285 286 if (enable_opus && ss.rate != 48000) { 287 pa_log_warn("OPUS requires sample rate as 48 KHz. Setting rate=48000."); 288 ss.rate = 48000; 289 } 290 291 if (ss.channels != cm.channels) 292 pa_channel_map_init_auto(&cm, ss.channels, PA_CHANNEL_MAP_AIFF); 293 294 payload = pa_rtp_payload_from_sample_spec(&ss); 295 296 mtu = (uint32_t) pa_frame_align(DEFAULT_MTU, &ss); 297 298 if (pa_modargs_get_value_u32(ma, "mtu", &mtu) < 0 || mtu < 1 || mtu % pa_frame_size(&ss) != 0) { 299 pa_log("Invalid MTU."); 300 goto fail; 301 } 302 303 port = DEFAULT_PORT + ((uint32_t) (rand() % 512) << 1); 304 if (pa_modargs_get_value_u32(ma, "port", &port) < 0 || port < 1 || port > 0xFFFF) { 305 pa_log("port= expects a numerical argument between 1 and 65535."); 306 goto fail; 307 } 308 309 if (port & 1) 310 pa_log_warn("Port number not even as suggested in RFC3550!"); 311 312 if (pa_modargs_get_value_u32(ma, "ttl", &ttl) < 0 || ttl < 1 || ttl > 0xFF) { 313 pa_log("ttl= expects a numerical argument between 1 and 255."); 314 goto fail; 315 } 316 317 src_addr = pa_modargs_get_value(ma, "source_ip", DEFAULT_SOURCE_IP); 318 319 if (inet_pton(AF_INET, src_addr, &src_sa4.sin_addr) > 0) { 320 src_sa4.sin_family = af = AF_INET; 321 src_sa4.sin_port = htons(0); 322 memset(&src_sa4.sin_zero, 0, sizeof(src_sa4.sin_zero)); 323 src_sap_sa4 = src_sa4; 324#ifdef HAVE_IPV6 325 } else if (inet_pton(AF_INET6, src_addr, &src_sa6.sin6_addr) > 0) { 326 src_sa6.sin6_family = af = AF_INET6; 327 src_sa6.sin6_port = htons(0); 328 src_sa6.sin6_flowinfo = 0; 329 src_sa6.sin6_scope_id = 0; 330 src_sap_sa6 = src_sa6; 331#endif 332 } else { 333 pa_log("Invalid source address '%s'", src_addr); 334 goto fail; 335 } 336 337 dst_addr = pa_modargs_get_value(ma, "destination", NULL); 338 if (dst_addr == NULL) 339 dst_addr = pa_modargs_get_value(ma, "destination_ip", DEFAULT_DESTINATION_IP); 340 341 if (inet_pton(AF_INET, dst_addr, &dst_sa4.sin_addr) > 0) { 342 dst_sa4.sin_family = af = AF_INET; 343 dst_sa4.sin_port = htons((uint16_t) port); 344 memset(&dst_sa4.sin_zero, 0, sizeof(dst_sa4.sin_zero)); 345 dst_sap_sa4 = dst_sa4; 346 dst_sap_sa4.sin_port = htons(SAP_PORT); 347#ifdef HAVE_IPV6 348 } else if (inet_pton(AF_INET6, dst_addr, &dst_sa6.sin6_addr) > 0) { 349 dst_sa6.sin6_family = af = AF_INET6; 350 dst_sa6.sin6_port = htons((uint16_t) port); 351 dst_sa6.sin6_flowinfo = 0; 352 dst_sa6.sin6_scope_id = 0; 353 dst_sap_sa6 = dst_sa6; 354 dst_sap_sa6.sin6_port = htons(SAP_PORT); 355#endif 356 } else { 357 pa_log("Invalid destination '%s'", dst_addr); 358 goto fail; 359 } 360 361 if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { 362 pa_log("socket() failed: %s", pa_cstrerror(errno)); 363 goto fail; 364 } 365 366 if (af == AF_INET && bind(fd, (struct sockaddr*) &src_sa4, sizeof(src_sa4)) < 0) { 367 pa_log("bind() failed: %s", pa_cstrerror(errno)); 368 goto fail; 369#ifdef HAVE_IPV6 370 } else if (af == AF_INET6 && bind(fd, (struct sockaddr*) &src_sa6, sizeof(src_sa6)) < 0) { 371 pa_log("bind() failed: %s", pa_cstrerror(errno)); 372 goto fail; 373#endif 374 } 375 376 if (af == AF_INET && connect(fd, (struct sockaddr*) &dst_sa4, sizeof(dst_sa4)) < 0) { 377 pa_log("connect() failed: %s", pa_cstrerror(errno)); 378 goto fail; 379#ifdef HAVE_IPV6 380 } else if (af == AF_INET6 && connect(fd, (struct sockaddr*) &dst_sa6, sizeof(dst_sa6)) < 0) { 381 pa_log("connect() failed: %s", pa_cstrerror(errno)); 382 goto fail; 383#endif 384 } 385 386 if ((sap_fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { 387 pa_log("socket() failed: %s", pa_cstrerror(errno)); 388 goto fail; 389 } 390 391 if (af == AF_INET && bind(sap_fd, (struct sockaddr*) &src_sap_sa4, sizeof(src_sap_sa4)) < 0) { 392 pa_log("bind() failed: %s", pa_cstrerror(errno)); 393 goto fail; 394#ifdef HAVE_IPV6 395 } else if (af == AF_INET6 && bind(sap_fd, (struct sockaddr*) &src_sap_sa6, sizeof(src_sap_sa6)) < 0) { 396 pa_log("bind() failed: %s", pa_cstrerror(errno)); 397 goto fail; 398#endif 399 } 400 401 if (af == AF_INET && connect(sap_fd, (struct sockaddr*) &dst_sap_sa4, sizeof(dst_sap_sa4)) < 0) { 402 pa_log("connect() failed: %s", pa_cstrerror(errno)); 403 goto fail; 404#ifdef HAVE_IPV6 405 } else if (af == AF_INET6 && connect(sap_fd, (struct sockaddr*) &dst_sap_sa6, sizeof(dst_sap_sa6)) < 0) { 406 pa_log("connect() failed: %s", pa_cstrerror(errno)); 407 goto fail; 408#endif 409 } 410 411 j = loop; 412 if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &j, sizeof(j)) < 0 || 413 setsockopt(sap_fd, IPPROTO_IP, IP_MULTICAST_LOOP, &j, sizeof(j)) < 0) { 414 pa_log("IP_MULTICAST_LOOP failed: %s", pa_cstrerror(errno)); 415 goto fail; 416 } 417 418 if (ttl != DEFAULT_TTL) { 419 int _ttl = (int) ttl; 420 421 if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &_ttl, sizeof(_ttl)) < 0) { 422 pa_log("IP_MULTICAST_TTL failed: %s", pa_cstrerror(errno)); 423 goto fail; 424 } 425 426 if (setsockopt(sap_fd, IPPROTO_IP, IP_MULTICAST_TTL, &_ttl, sizeof(_ttl)) < 0) { 427 pa_log("IP_MULTICAST_TTL (sap) failed: %s", pa_cstrerror(errno)); 428 goto fail; 429 } 430 } 431 432 /* If the socket queue is full, let's drop packets */ 433 pa_make_fd_nonblock(fd); 434 pa_make_udp_socket_low_delay(fd); 435 436 pa_source_output_new_data_init(&data); 437 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_NAME, "RTP Monitor Stream"); 438 pa_proplist_sets(data.proplist, "rtp.source", src_addr); 439 pa_proplist_sets(data.proplist, "rtp.destination", dst_addr); 440 pa_proplist_setf(data.proplist, "rtp.mtu", "%lu", (unsigned long) mtu); 441 pa_proplist_setf(data.proplist, "rtp.port", "%lu", (unsigned long) port); 442 pa_proplist_setf(data.proplist, "rtp.ttl", "%lu", (unsigned long) ttl); 443 data.driver = __FILE__; 444 data.module = m; 445 pa_source_output_new_data_set_source(&data, s, false, true); 446 pa_source_output_new_data_set_sample_spec(&data, &ss); 447 pa_source_output_new_data_set_channel_map(&data, &cm); 448 data.flags |= get_dont_inhibit_auto_suspend_flag(s, inhibit_auto_suspend); 449 450 pa_source_output_new(&o, m->core, &data); 451 pa_source_output_new_data_done(&data); 452 453 if (!o) { 454 pa_log("failed to create source output."); 455 goto fail; 456 } 457 458 o->parent.process_msg = source_output_process_msg; 459 o->push = source_output_push_cb; 460 o->moving = source_output_moving_cb; 461 o->kill = source_output_kill_cb; 462 463 pa_log_info("Configured source latency of %llu ms.", 464 (unsigned long long) pa_source_output_set_requested_latency(o, pa_bytes_to_usec(mtu, &o->sample_spec)) / PA_USEC_PER_MSEC); 465 466 m->userdata = o->userdata = u = pa_xnew(struct userdata, 1); 467 u->module = m; 468 u->source_output = o; 469 470 u->memblockq = pa_memblockq_new( 471 "module-rtp-send memblockq", 472 0, 473 MEMBLOCKQ_MAXLENGTH, 474 MEMBLOCKQ_MAXLENGTH, 475 &ss, 476 1, 477 0, 478 0, 479 NULL); 480 481 k = sizeof(sa_dst); 482 pa_assert_se((r = getsockname(fd, (struct sockaddr*) &sa_dst, &k)) >= 0); 483 484 n = pa_xstrdup(pa_modargs_get_value(ma, "stream_name", NULL)); 485 if (n == NULL) 486 n = pa_sprintf_malloc("PulseAudio RTP Stream on %s", pa_get_fqdn(hn, sizeof(hn))); 487 488 if (af == AF_INET) { 489 p = pa_sdp_build(af, 490 (void*) &((struct sockaddr_in*) &sa_dst)->sin_addr, 491 (void*) &dst_sa4.sin_addr, 492 n, (uint16_t) port, payload, &ss, enable_opus); 493#ifdef HAVE_IPV6 494 } else { 495 p = pa_sdp_build(af, 496 (void*) &((struct sockaddr_in6*) &sa_dst)->sin6_addr, 497 (void*) &dst_sa6.sin6_addr, 498 n, (uint16_t) port, payload, &ss, enable_opus); 499#endif 500 } 501 502 pa_xfree(n); 503 504 if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, &ss, enable_opus))) 505 goto fail; 506 pa_sap_context_init_send(&u->sap_context, sap_fd, p); 507 508 pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, payload=%u", 509 mtu, dst_addr, port, src_addr, ttl, payload); 510 pa_log_info("SDP-Data:\n%s\nEOF", p); 511 512 pa_sap_send(&u->sap_context, 0); 513 514 u->sap_event = pa_core_rttime_new(m->core, pa_rtclock_now() + SAP_INTERVAL, sap_event_cb, u); 515 u->inhibit_auto_suspend = inhibit_auto_suspend; 516 517 pa_source_output_put(u->source_output); 518 519 pa_modargs_free(ma); 520 521 return 0; 522 523fail: 524 if (ma) 525 pa_modargs_free(ma); 526 527 if (fd >= 0) 528 pa_close(fd); 529 530 if (sap_fd >= 0) 531 pa_close(sap_fd); 532 533 return -1; 534} 535 536void pa__done(pa_module*m) { 537 struct userdata *u; 538 pa_assert(m); 539 540 if (!(u = m->userdata)) 541 return; 542 543 if (u->sap_event) 544 m->core->mainloop->time_free(u->sap_event); 545 546 if (u->source_output) { 547 pa_source_output_unlink(u->source_output); 548 pa_source_output_unref(u->source_output); 549 } 550 551 pa_rtp_context_free(u->rtp_context); 552 553 pa_sap_send(&u->sap_context, 1); 554 pa_sap_context_destroy(&u->sap_context); 555 556 if (u->memblockq) 557 pa_memblockq_free(u->memblockq); 558 559 pa_xfree(u); 560} 561