1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2013 Alexander Couzens 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as published 8 by the Free Software Foundation; either version 2.1 of the License, 9 or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include "restart-module.h" 25 26#include <pulse/context.h> 27#include <pulse/timeval.h> 28#include <pulse/xmalloc.h> 29#include <pulse/stream.h> 30#include <pulse/mainloop.h> 31#include <pulse/introspect.h> 32#include <pulse/error.h> 33 34#include <pulsecore/core.h> 35#include <pulsecore/core-util.h> 36#include <pulsecore/i18n.h> 37#include <pulsecore/sink.h> 38#include <pulsecore/modargs.h> 39#include <pulsecore/log.h> 40#include <pulsecore/thread.h> 41#include <pulsecore/thread-mq.h> 42#include <pulsecore/poll.h> 43#include <pulsecore/rtpoll.h> 44#include <pulsecore/proplist-util.h> 45 46PA_MODULE_AUTHOR("Alexander Couzens"); 47PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server"); 48PA_MODULE_VERSION(PACKAGE_VERSION); 49PA_MODULE_LOAD_ONCE(false); 50PA_MODULE_USAGE( 51 "server=<address> " 52 "sink=<name of the remote sink> " 53 "sink_name=<name for the local sink> " 54 "sink_properties=<properties for the local sink> " 55 "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> " 56 "format=<sample format> " 57 "channels=<number of channels> " 58 "rate=<sample rate> " 59 "channel_map=<channel map> " 60 "cookie=<cookie file path>" 61 ); 62 63#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC) 64#define TUNNEL_THREAD_FAILED_MAINLOOP 1 65 66static int do_init(pa_module *m); 67static void do_done(pa_module *m); 68static void stream_state_cb(pa_stream *stream, void *userdata); 69static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata); 70static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata); 71static void context_state_cb(pa_context *c, void *userdata); 72static void sink_update_requested_latency_cb(pa_sink *s); 73 74struct tunnel_msg { 75 pa_msgobject parent; 76}; 77 78typedef struct tunnel_msg tunnel_msg; 79PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject); 80 81enum { 82 TUNNEL_MESSAGE_CREATE_SINK_REQUEST, 83 TUNNEL_MESSAGE_MAYBE_RESTART, 84}; 85 86enum { 87 TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX, 88}; 89 90struct userdata { 91 pa_module *module; 92 pa_sink *sink; 93 pa_thread *thread; 94 pa_thread_mq *thread_mq; 95 pa_mainloop *thread_mainloop; 96 pa_mainloop_api *thread_mainloop_api; 97 98 pa_context *context; 99 pa_stream *stream; 100 pa_rtpoll *rtpoll; 101 102 bool update_stream_bufferattr_after_connect; 103 104 bool connected; 105 bool shutting_down; 106 107 char *cookie_file; 108 char *remote_server; 109 char *remote_sink_name; 110 char *sink_name; 111 112 pa_proplist *sink_proplist; 113 pa_sample_spec sample_spec; 114 pa_channel_map channel_map; 115 116 tunnel_msg *msg; 117 118 pa_usec_t reconnect_interval_us; 119}; 120 121struct module_restart_data { 122 struct userdata *userdata; 123 pa_restart_data *restart_data; 124}; 125 126static const char* const valid_modargs[] = { 127 "sink_name", 128 "sink_properties", 129 "server", 130 "sink", 131 "format", 132 "channels", 133 "rate", 134 "channel_map", 135 "cookie", 136 "reconnect_interval_ms", 137 NULL, 138}; 139 140static void cork_stream(struct userdata *u, bool cork) { 141 pa_operation *operation; 142 143 pa_assert(u); 144 pa_assert(u->stream); 145 146 if (cork) { 147 /* When the sink becomes suspended (which is the only case where we 148 * cork the stream), we don't want to keep any old data around, because 149 * the old data is most likely unrelated to the audio that will be 150 * played at the time when the sink starts running again. */ 151 if ((operation = pa_stream_flush(u->stream, NULL, NULL))) 152 pa_operation_unref(operation); 153 } 154 155 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL))) 156 pa_operation_unref(operation); 157} 158 159static void reset_bufferattr(pa_buffer_attr *bufferattr) { 160 pa_assert(bufferattr); 161 bufferattr->fragsize = (uint32_t) -1; 162 bufferattr->minreq = (uint32_t) -1; 163 bufferattr->maxlength = (uint32_t) -1; 164 bufferattr->prebuf = (uint32_t) -1; 165 bufferattr->tlength = (uint32_t) -1; 166} 167 168static pa_proplist* tunnel_new_proplist(struct userdata *u) { 169 pa_proplist *proplist = pa_proplist_new(); 170 pa_assert(proplist); 171 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio"); 172 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio"); 173 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION); 174 pa_init_proplist(proplist); 175 176 return proplist; 177} 178 179static void thread_func(void *userdata) { 180 struct userdata *u = userdata; 181 pa_proplist *proplist; 182 183 pa_assert(u); 184 185 pa_log_debug("Thread starting up"); 186 pa_thread_mq_install(u->thread_mq); 187 188 proplist = tunnel_new_proplist(u); 189 u->context = pa_context_new_with_proplist(u->thread_mainloop_api, 190 "PulseAudio", 191 proplist); 192 pa_proplist_free(proplist); 193 194 if (!u->context) { 195 pa_log("Failed to create libpulse context"); 196 goto fail; 197 } 198 199 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) { 200 pa_log_error("Can not load cookie file!"); 201 goto fail; 202 } 203 204 pa_context_set_state_callback(u->context, context_state_cb, u); 205 if (pa_context_connect(u->context, 206 u->remote_server, 207 PA_CONTEXT_NOAUTOSPAWN, 208 NULL) < 0) { 209 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context))); 210 goto fail; 211 } 212 213 for (;;) { 214 int ret; 215 216 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) { 217 if (ret == 0) 218 goto finish; 219 else 220 goto fail; 221 } 222 223 if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested)) 224 pa_sink_process_rewind(u->sink, 0); 225 226 if (u->connected && 227 pa_stream_get_state(u->stream) == PA_STREAM_READY && 228 PA_SINK_IS_LINKED(u->sink->thread_info.state)) { 229 size_t writable; 230 231 writable = pa_stream_writable_size(u->stream); 232 if (writable > 0) { 233 pa_memchunk memchunk; 234 const void *p; 235 236 pa_sink_render_full(u->sink, writable, &memchunk); 237 238 pa_assert(memchunk.length > 0); 239 240 /* we have new data to write */ 241 p = pa_memblock_acquire(memchunk.memblock); 242 /* TODO: Use pa_stream_begin_write() to reduce copying. */ 243 ret = pa_stream_write(u->stream, 244 (uint8_t*) p + memchunk.index, 245 memchunk.length, 246 NULL, /**< A cleanup routine for the data or NULL to request an internal copy */ 247 0, /** offset */ 248 PA_SEEK_RELATIVE); 249 pa_memblock_release(memchunk.memblock); 250 pa_memblock_unref(memchunk.memblock); 251 252 if (ret != 0) { 253 pa_log_error("Could not write data into the stream ... ret = %i", ret); 254 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 255 } 256 257 } 258 } 259 } 260fail: 261 /* send a message to the ctl thread to ask it to either terminate us, or 262 * restart us, but either way this thread will exit, so then wait for the 263 * shutdown message */ 264 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL); 265 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN); 266 267finish: 268 if (u->stream) { 269 pa_stream_disconnect(u->stream); 270 pa_stream_unref(u->stream); 271 u->stream = NULL; 272 } 273 274 if (u->context) { 275 pa_context_disconnect(u->context); 276 pa_context_unref(u->context); 277 u->context = NULL; 278 } 279 280 pa_log_debug("Thread shutting down"); 281} 282 283static void stream_state_cb(pa_stream *stream, void *userdata) { 284 struct userdata *u = userdata; 285 286 pa_assert(u); 287 288 switch (pa_stream_get_state(stream)) { 289 case PA_STREAM_FAILED: 290 pa_log_error("Stream failed."); 291 u->connected = false; 292 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 293 break; 294 case PA_STREAM_TERMINATED: 295 pa_log_debug("Stream terminated."); 296 break; 297 case PA_STREAM_READY: 298 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) 299 cork_stream(u, false); 300 301 /* Only call our requested_latency_cb when requested_latency 302 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because 303 * we don't want to override the initial tlength set by the server 304 * without a good reason. */ 305 if (u->update_stream_bufferattr_after_connect) 306 sink_update_requested_latency_cb(u->sink); 307 else 308 stream_changed_buffer_attr_cb(stream, userdata); 309 case PA_STREAM_CREATING: 310 case PA_STREAM_UNCONNECTED: 311 break; 312 } 313} 314 315/* called when remote server changes the stream buffer_attr */ 316static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) { 317 struct userdata *u = userdata; 318 const pa_buffer_attr *bufferattr; 319 pa_assert(u); 320 321 bufferattr = pa_stream_get_buffer_attr(u->stream); 322 pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength); 323 324 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu.", 325 (unsigned long) bufferattr->tlength); 326} 327 328/* called after we requested a change of the stream buffer_attr */ 329static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) { 330 stream_changed_buffer_attr_cb(stream, userdata); 331} 332 333/* called when the server experiences an underrun of our buffer */ 334static void stream_underflow_callback(pa_stream *stream, void *userdata) { 335 pa_log_info("Server signalled buffer underrun."); 336} 337 338/* called when the server experiences an overrun of our buffer */ 339static void stream_overflow_callback(pa_stream *stream, void *userdata) { 340 pa_log_info("Server signalled buffer overrun."); 341} 342 343/* Do a reinit of the module. Note that u will be freed as a result of this 344 * call. */ 345static void maybe_restart(struct module_restart_data *rd) { 346 struct userdata *u = rd->userdata; 347 348 if (rd->restart_data) { 349 pa_log_debug("Restart already pending"); 350 return; 351 } 352 353 if (u->reconnect_interval_us > 0) { 354 /* The handle returned here must be freed when do_init() finishes successfully 355 * and when the module exits. */ 356 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); 357 } else { 358 /* exit the module */ 359 pa_module_unload_request(u->module, true); 360 } 361} 362 363static void on_sink_created(struct userdata *u) { 364 pa_proplist *proplist; 365 pa_buffer_attr bufferattr; 366 pa_usec_t requested_latency; 367 char *username = pa_get_user_name_malloc(); 368 char *hostname = pa_get_host_name_malloc(); 369 /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */ 370 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname); 371 pa_xfree(hostname); 372 pa_xfree(username); 373 374 pa_assert_io_context(); 375 376 /* if we still don't have a sink, then sink creation failed, and we should 377 * kill this io thread */ 378 if (!u->sink) { 379 pa_log_error("Could not create a sink."); 380 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 381 return; 382 } 383 384 proplist = tunnel_new_proplist(u); 385 u->stream = pa_stream_new_with_proplist(u->context, 386 stream_name, 387 &u->sink->sample_spec, 388 &u->sink->channel_map, 389 proplist); 390 pa_proplist_free(proplist); 391 pa_xfree(stream_name); 392 393 if (!u->stream) { 394 pa_log_error("Could not create a stream."); 395 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 396 return; 397 } 398 399 requested_latency = pa_sink_get_requested_latency_within_thread(u->sink); 400 if (requested_latency == (pa_usec_t) -1) 401 requested_latency = u->sink->thread_info.max_latency; 402 403 reset_bufferattr(&bufferattr); 404 bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec); 405 406 pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength); 407 408 pa_stream_set_state_callback(u->stream, stream_state_cb, u); 409 pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u); 410 pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u); 411 pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u); 412 if (pa_stream_connect_playback(u->stream, 413 u->remote_sink_name, 414 &bufferattr, 415 PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY, 416 NULL, 417 NULL) < 0) { 418 pa_log_error("Could not connect stream."); 419 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 420 } 421 u->connected = true; 422} 423 424static void context_state_cb(pa_context *c, void *userdata) { 425 struct userdata *u = userdata; 426 pa_assert(u); 427 428 switch (pa_context_get_state(c)) { 429 case PA_CONTEXT_UNCONNECTED: 430 case PA_CONTEXT_CONNECTING: 431 case PA_CONTEXT_AUTHORIZING: 432 case PA_CONTEXT_SETTING_NAME: 433 break; 434 case PA_CONTEXT_READY: 435 /* now that we're connected, ask the control thread to create a sink for 436 * us, and wait for that to complete before proceeding, we'll 437 * receive TUNNEL_MESSAGE_SINK_CREATED in response when the sink is 438 * created (see sink_process_msg_cb()) */ 439 pa_log_debug("Connection successful. Creating stream."); 440 pa_assert(!u->stream); 441 pa_assert(!u->sink); 442 443 pa_log_debug("Asking ctl thread to create sink."); 444 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL); 445 break; 446 case PA_CONTEXT_FAILED: 447 pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context))); 448 u->connected = false; 449 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 450 break; 451 case PA_CONTEXT_TERMINATED: 452 pa_log_debug("Context terminated."); 453 u->connected = false; 454 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 455 break; 456 } 457} 458 459static void sink_update_requested_latency_cb(pa_sink *s) { 460 struct userdata *u; 461 pa_operation *operation; 462 size_t nbytes; 463 pa_usec_t block_usec; 464 pa_buffer_attr bufferattr; 465 466 pa_sink_assert_ref(s); 467 pa_assert_se(u = s->userdata); 468 469 block_usec = pa_sink_get_requested_latency_within_thread(s); 470 if (block_usec == (pa_usec_t) -1) 471 block_usec = s->thread_info.max_latency; 472 473 nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec); 474 pa_sink_set_max_request_within_thread(s, nbytes); 475 476 if (u->stream) { 477 switch (pa_stream_get_state(u->stream)) { 478 case PA_STREAM_READY: 479 if (pa_stream_get_buffer_attr(u->stream)->tlength == nbytes) 480 break; 481 482 pa_log_debug("Requesting new buffer attrs. tlength requested at %lu.", 483 (unsigned long) nbytes); 484 485 reset_bufferattr(&bufferattr); 486 bufferattr.tlength = nbytes; 487 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, stream_set_buffer_attr_cb, u))) 488 pa_operation_unref(operation); 489 break; 490 case PA_STREAM_CREATING: 491 /* we have to delay our request until stream is ready */ 492 u->update_stream_bufferattr_after_connect = true; 493 break; 494 default: 495 break; 496 } 497 } 498} 499 500static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 501 struct userdata *u = PA_SINK(o)->userdata; 502 503 switch (code) { 504 case PA_SINK_MESSAGE_GET_LATENCY: { 505 int negative; 506 pa_usec_t remote_latency; 507 508 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) { 509 *((int64_t*) data) = 0; 510 return 0; 511 } 512 513 if (!u->stream) { 514 *((int64_t*) data) = 0; 515 return 0; 516 } 517 518 if (pa_stream_get_state(u->stream) != PA_STREAM_READY) { 519 *((int64_t*) data) = 0; 520 return 0; 521 } 522 523 if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) { 524 *((int64_t*) data) = 0; 525 return 0; 526 } 527 528 *((int64_t*) data) = remote_latency; 529 return 0; 530 } 531 case TUNNEL_MESSAGE_SINK_CREATED: 532 on_sink_created(u); 533 return 0; 534 } 535 return pa_sink_process_msg(o, code, data, offset, chunk); 536} 537 538/* Called from the IO thread. */ 539static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) { 540 struct userdata *u; 541 542 pa_assert(s); 543 pa_assert_se(u = s->userdata); 544 545 /* It may be that only the suspend cause is changing, in which case there's 546 * nothing to do. */ 547 if (new_state == s->thread_info.state) 548 return 0; 549 550 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) 551 return 0; 552 553 switch (new_state) { 554 case PA_SINK_SUSPENDED: { 555 cork_stream(u, true); 556 break; 557 } 558 case PA_SINK_IDLE: 559 case PA_SINK_RUNNING: { 560 cork_stream(u, false); 561 break; 562 } 563 case PA_SINK_INVALID_STATE: 564 case PA_SINK_INIT: 565 case PA_SINK_UNLINKED: 566 break; 567 } 568 569 return 0; 570} 571 572/* Creates a sink in the main thread. 573 * 574 * This method is called when we receive a message from the io thread that a 575 * connection has been established with the server. We defer creation of the 576 * sink until the connection is established, because we don't have a sink if 577 * the remote server isn't there. 578 */ 579static void create_sink(struct userdata *u) { 580 pa_sink_new_data sink_data; 581 582 pa_assert_ctl_context(); 583 584 /* Create sink */ 585 pa_sink_new_data_init(&sink_data); 586 sink_data.driver = __FILE__; 587 sink_data.module = u->module; 588 589 pa_sink_new_data_set_name(&sink_data, u->sink_name); 590 pa_sink_new_data_set_sample_spec(&sink_data, &u->sample_spec); 591 pa_sink_new_data_set_channel_map(&sink_data, &u->channel_map); 592 593 pa_proplist_update(sink_data.proplist, PA_UPDATE_REPLACE, u->sink_proplist); 594 595 if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) { 596 pa_log("Failed to create sink."); 597 goto finish; 598 } 599 600 u->sink->userdata = u; 601 u->sink->parent.process_msg = sink_process_msg_cb; 602 u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; 603 u->sink->update_requested_latency = sink_update_requested_latency_cb; 604 pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC); 605 606 /* set thread message queue */ 607 pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq); 608 pa_sink_set_rtpoll(u->sink, u->rtpoll); 609 610 pa_sink_put(u->sink); 611 612finish: 613 pa_sink_new_data_done(&sink_data); 614 615 /* tell any interested io threads that the sink they asked for has now been 616 * created (even if we failed, we still notify the thread, so they can 617 * either handle or kill the thread, rather than deadlock waiting for a 618 * message that will never come */ 619 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL); 620} 621 622/* Runs in PA mainloop context */ 623static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 624 struct userdata *u = (struct userdata *) data; 625 626 pa_assert(u); 627 pa_assert_ctl_context(); 628 629 if (u->shutting_down) 630 return 0; 631 632 switch (code) { 633 case TUNNEL_MESSAGE_CREATE_SINK_REQUEST: 634 create_sink(u); 635 break; 636 case TUNNEL_MESSAGE_MAYBE_RESTART: 637 maybe_restart(u->module->userdata); 638 break; 639 } 640 641 return 0; 642} 643 644static int do_init(pa_module *m) { 645 struct userdata *u = NULL; 646 struct module_restart_data *rd; 647 pa_modargs *ma = NULL; 648 const char *remote_server = NULL; 649 char *default_sink_name = NULL; 650 uint32_t reconnect_interval_ms = 0; 651 652 pa_assert(m); 653 pa_assert(m->userdata); 654 655 rd = m->userdata; 656 657 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { 658 pa_log("Failed to parse module arguments."); 659 goto fail; 660 } 661 662 u = pa_xnew0(struct userdata, 1); 663 u->module = m; 664 rd->userdata = u; 665 666 u->sample_spec = m->core->default_sample_spec; 667 u->channel_map = m->core->default_channel_map; 668 if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) { 669 pa_log("Invalid sample format specification or channel map"); 670 goto fail; 671 } 672 673 remote_server = pa_modargs_get_value(ma, "server", NULL); 674 if (!remote_server) { 675 pa_log("No server given!"); 676 goto fail; 677 } 678 679 u->remote_server = pa_xstrdup(remote_server); 680 u->thread_mainloop = pa_mainloop_new(); 681 if (u->thread_mainloop == NULL) { 682 pa_log("Failed to create mainloop"); 683 goto fail; 684 } 685 u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop); 686 u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL)); 687 u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); 688 689 u->thread_mq = pa_xnew0(pa_thread_mq, 1); 690 691 if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) { 692 pa_log("pa_thread_mq_init_thread_mainloop() failed."); 693 goto fail; 694 } 695 696 u->msg = pa_msgobject_new(tunnel_msg); 697 u->msg->parent.process_msg = tunnel_process_msg; 698 699 /* The rtpoll created here is never run. It is only necessary to avoid crashes 700 * when module-tunnel-sink-new is used together with module-loopback or 701 * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided 702 * by the sink. module-loopback and combine-sink only work because they call 703 * pa_asyncmsq_process_one() themselves. module_rtp_recv also uses the rtpoll, 704 * but never calls pa_asyncmsq_process_one(), so it will not work in combination 705 * with module-tunnel-sink-new. */ 706 u->rtpoll = pa_rtpoll_new(); 707 708 default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server); 709 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", default_sink_name)); 710 711 u->sink_proplist = pa_proplist_new(); 712 pa_proplist_sets(u->sink_proplist, PA_PROP_DEVICE_CLASS, "sound"); 713 pa_proplist_setf(u->sink_proplist, 714 PA_PROP_DEVICE_DESCRIPTION, 715 _("Tunnel to %s/%s"), 716 remote_server, 717 pa_strempty(u->remote_sink_name)); 718 719 if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) { 720 pa_log("Invalid properties"); 721 goto fail; 722 } 723 724 pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms); 725 u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC; 726 727 if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) { 728 pa_log("Failed to create thread."); 729 goto fail; 730 } 731 732 /* If the module is restarting and do_init() finishes successfully, the 733 * restart data is no longer needed. If do_init() fails, don't touch the 734 * restart data, because following restart attempts will continue to use 735 * the same data. If restart_data is NULL, that means no restart is 736 * currently pending. */ 737 if (rd->restart_data) { 738 pa_restart_free(rd->restart_data); 739 rd->restart_data = NULL; 740 } 741 742 pa_modargs_free(ma); 743 pa_xfree(default_sink_name); 744 745 return 0; 746 747fail: 748 if (ma) 749 pa_modargs_free(ma); 750 751 if (default_sink_name) 752 pa_xfree(default_sink_name); 753 754 return -1; 755} 756 757static void do_done(pa_module *m) { 758 struct userdata *u = NULL; 759 struct module_restart_data *rd; 760 761 pa_assert(m); 762 763 if (!(rd = m->userdata)) 764 return; 765 if (!(u = rd->userdata)) 766 return; 767 768 u->shutting_down = true; 769 770 if (u->sink) 771 pa_sink_unlink(u->sink); 772 773 if (u->thread) { 774 pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); 775 pa_thread_free(u->thread); 776 } 777 778 if (u->thread_mq) { 779 pa_thread_mq_done(u->thread_mq); 780 pa_xfree(u->thread_mq); 781 } 782 783 if (u->thread_mainloop) 784 pa_mainloop_free(u->thread_mainloop); 785 786 if (u->cookie_file) 787 pa_xfree(u->cookie_file); 788 789 if (u->remote_sink_name) 790 pa_xfree(u->remote_sink_name); 791 792 if (u->remote_server) 793 pa_xfree(u->remote_server); 794 795 if (u->sink) 796 pa_sink_unref(u->sink); 797 798 if (u->rtpoll) 799 pa_rtpoll_free(u->rtpoll); 800 801 if (u->sink_proplist) 802 pa_proplist_free(u->sink_proplist); 803 804 if (u->sink_name) 805 pa_xfree(u->sink_name); 806 807 pa_xfree(u->msg); 808 809 pa_xfree(u); 810 811 rd->userdata = NULL; 812} 813 814int pa__init(pa_module *m) { 815 int ret; 816 817 pa_assert(m); 818 819 m->userdata = pa_xnew0(struct module_restart_data, 1); 820 821 ret = do_init(m); 822 823 if (ret < 0) 824 pa__done(m); 825 826 return ret; 827} 828 829void pa__done(pa_module *m) { 830 pa_assert(m); 831 832 do_done(m); 833 834 if (m->userdata) { 835 struct module_restart_data *rd = m->userdata; 836 837 if (rd->restart_data) 838 pa_restart_free(rd->restart_data); 839 840 pa_xfree(m->userdata); 841 } 842} 843