1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2013 Alexander Couzens 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as published 8 by the Free Software Foundation; either version 2.1 of the License, 9 or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include "restart-module.h" 25 26#include <pulse/context.h> 27#include <pulse/timeval.h> 28#include <pulse/xmalloc.h> 29#include <pulse/stream.h> 30#include <pulse/mainloop.h> 31#include <pulse/introspect.h> 32#include <pulse/error.h> 33 34#include <pulsecore/core.h> 35#include <pulsecore/core-util.h> 36#include <pulsecore/i18n.h> 37#include <pulsecore/source.h> 38#include <pulsecore/modargs.h> 39#include <pulsecore/log.h> 40#include <pulsecore/thread.h> 41#include <pulsecore/thread-mq.h> 42#include <pulsecore/poll.h> 43#include <pulsecore/rtpoll.h> 44#include <pulsecore/proplist-util.h> 45 46PA_MODULE_AUTHOR("Alexander Couzens"); 47PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server"); 48PA_MODULE_VERSION(PACKAGE_VERSION); 49PA_MODULE_LOAD_ONCE(false); 50PA_MODULE_USAGE( 51 "server=<address> " 52 "source=<name of the remote source> " 53 "source_name=<name for the local source> " 54 "source_properties=<properties for the local source> " 55 "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> " 56 "format=<sample format> " 57 "channels=<number of channels> " 58 "rate=<sample rate> " 59 "channel_map=<channel map> " 60 "cookie=<cookie file path>" 61 ); 62 63#define TUNNEL_THREAD_FAILED_MAINLOOP 1 64 65static int do_init(pa_module *m); 66static void do_done(pa_module *m); 67static void stream_state_cb(pa_stream *stream, void *userdata); 68static void stream_read_cb(pa_stream *s, size_t length, void *userdata); 69static void context_state_cb(pa_context *c, void *userdata); 70static void source_update_requested_latency_cb(pa_source *s); 71 72struct tunnel_msg { 73 pa_msgobject parent; 74}; 75 76typedef struct tunnel_msg tunnel_msg; 77PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject); 78 79enum { 80 TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, 81 TUNNEL_MESSAGE_MAYBE_RESTART, 82}; 83 84enum { 85 TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX, 86}; 87 88struct userdata { 89 pa_module *module; 90 pa_source *source; 91 pa_thread *thread; 92 pa_thread_mq *thread_mq; 93 pa_mainloop *thread_mainloop; 94 pa_mainloop_api *thread_mainloop_api; 95 96 pa_context *context; 97 pa_stream *stream; 98 pa_rtpoll *rtpoll; 99 100 bool update_stream_bufferattr_after_connect; 101 bool connected; 102 bool shutting_down; 103 bool new_data; 104 105 char *cookie_file; 106 char *remote_server; 107 char *remote_source_name; 108 char *source_name; 109 110 pa_proplist *source_proplist; 111 pa_sample_spec sample_spec; 112 pa_channel_map channel_map; 113 114 tunnel_msg *msg; 115 116 pa_usec_t reconnect_interval_us; 117}; 118 119struct module_restart_data { 120 struct userdata *userdata; 121 pa_restart_data *restart_data; 122}; 123 124static const char* const valid_modargs[] = { 125 "source_name", 126 "source_properties", 127 "server", 128 "source", 129 "format", 130 "channels", 131 "rate", 132 "channel_map", 133 "cookie", 134 "reconnect_interval_ms", 135 NULL, 136}; 137 138static void cork_stream(struct userdata *u, bool cork) { 139 pa_operation *operation; 140 141 pa_assert(u); 142 pa_assert(u->stream); 143 144 if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL))) 145 pa_operation_unref(operation); 146} 147 148static void reset_bufferattr(pa_buffer_attr *bufferattr) { 149 pa_assert(bufferattr); 150 bufferattr->fragsize = (uint32_t) -1; 151 bufferattr->minreq = (uint32_t) -1; 152 bufferattr->maxlength = (uint32_t) -1; 153 bufferattr->prebuf = (uint32_t) -1; 154 bufferattr->tlength = (uint32_t) -1; 155} 156 157static pa_proplist* tunnel_new_proplist(struct userdata *u) { 158 pa_proplist *proplist = pa_proplist_new(); 159 pa_assert(proplist); 160 pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio"); 161 pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio"); 162 pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION); 163 pa_init_proplist(proplist); 164 165 return proplist; 166} 167 168static void stream_read_cb(pa_stream *s, size_t length, void *userdata) { 169 struct userdata *u = userdata; 170 u->new_data = true; 171} 172 173/* called from io context to read samples from the stream into our source */ 174static void read_new_samples(struct userdata *u) { 175 const void *p; 176 size_t readable = 0; 177 pa_memchunk memchunk; 178 179 pa_assert(u); 180 u->new_data = false; 181 182 pa_memchunk_reset(&memchunk); 183 184 if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY)) 185 return; 186 187 readable = pa_stream_readable_size(u->stream); 188 while (readable > 0) { 189 size_t nbytes = 0; 190 if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) { 191 pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context))); 192 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 193 return; 194 } 195 196 if (PA_LIKELY(p)) { 197 /* we have valid data */ 198 memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true); 199 memchunk.length = nbytes; 200 memchunk.index = 0; 201 202 pa_source_post(u->source, &memchunk); 203 pa_memblock_unref_fixed(memchunk.memblock); 204 } else { 205 size_t bytes_to_generate = nbytes; 206 207 /* we have a hole. generate silence */ 208 memchunk = u->source->silence; 209 pa_memblock_ref(memchunk.memblock); 210 211 while (bytes_to_generate > 0) { 212 if (bytes_to_generate < memchunk.length) 213 memchunk.length = bytes_to_generate; 214 215 pa_source_post(u->source, &memchunk); 216 bytes_to_generate -= memchunk.length; 217 } 218 219 pa_memblock_unref(memchunk.memblock); 220 } 221 222 pa_stream_drop(u->stream); 223 readable -= nbytes; 224 } 225} 226 227static void thread_func(void *userdata) { 228 struct userdata *u = userdata; 229 pa_proplist *proplist; 230 231 pa_assert(u); 232 233 pa_log_debug("Thread starting up"); 234 pa_thread_mq_install(u->thread_mq); 235 236 proplist = tunnel_new_proplist(u); 237 u->context = pa_context_new_with_proplist(u->thread_mainloop_api, 238 "PulseAudio", 239 proplist); 240 pa_proplist_free(proplist); 241 242 if (!u->context) { 243 pa_log("Failed to create libpulse context"); 244 goto fail; 245 } 246 247 if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) { 248 pa_log_error("Can not load cookie file!"); 249 goto fail; 250 } 251 252 pa_context_set_state_callback(u->context, context_state_cb, u); 253 if (pa_context_connect(u->context, 254 u->remote_server, 255 PA_CONTEXT_NOAUTOSPAWN, 256 NULL) < 0) { 257 pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context))); 258 goto fail; 259 } 260 261 for (;;) { 262 int ret; 263 264 if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) { 265 if (ret == 0) 266 goto finish; 267 else 268 goto fail; 269 } 270 271 if (u->new_data) 272 read_new_samples(u); 273 } 274fail: 275 /* send a message to the ctl thread to ask it to either terminate us, or 276 * restart us, but either way this thread will exit, so then wait for the 277 * shutdown message */ 278 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL); 279 pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN); 280 281finish: 282 if (u->stream) { 283 pa_stream_disconnect(u->stream); 284 pa_stream_unref(u->stream); 285 u->stream = NULL; 286 } 287 288 if (u->context) { 289 pa_context_disconnect(u->context); 290 pa_context_unref(u->context); 291 u->context = NULL; 292 } 293 294 pa_log_debug("Thread shutting down"); 295} 296 297static void stream_state_cb(pa_stream *stream, void *userdata) { 298 struct userdata *u = userdata; 299 300 pa_assert(u); 301 302 switch (pa_stream_get_state(stream)) { 303 case PA_STREAM_FAILED: 304 pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context))); 305 u->connected = false; 306 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 307 break; 308 case PA_STREAM_TERMINATED: 309 pa_log_debug("Stream terminated."); 310 break; 311 case PA_STREAM_READY: 312 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) 313 cork_stream(u, false); 314 315 /* Only call our requested_latency_cb when requested_latency 316 * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because 317 * we don't want to override the initial fragsize set by the server 318 * without a good reason. */ 319 if (u->update_stream_bufferattr_after_connect) 320 source_update_requested_latency_cb(u->source); 321 case PA_STREAM_UNCONNECTED: 322 case PA_STREAM_CREATING: 323 break; 324 } 325} 326 327/* Do a reinit of the module. Note that u will be freed as a result of this 328 * call. */ 329static void maybe_restart(struct module_restart_data *rd) { 330 struct userdata *u = rd->userdata; 331 332 if (rd->restart_data) { 333 pa_log_debug("Restart already pending"); 334 return; 335 } 336 337 if (u->reconnect_interval_us > 0) { 338 /* The handle returned here must be freed when do_init() finishes successfully 339 * and when the module exits. */ 340 rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); 341 } else { 342 /* exit the module */ 343 pa_module_unload_request(u->module, true); 344 } 345} 346 347static void on_source_created(struct userdata *u) { 348 pa_proplist *proplist; 349 pa_buffer_attr bufferattr; 350 pa_usec_t requested_latency; 351 char *username = pa_get_user_name_malloc(); 352 char *hostname = pa_get_host_name_malloc(); 353 /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */ 354 char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname); 355 pa_xfree(username); 356 pa_xfree(hostname); 357 358 pa_assert_io_context(); 359 360 /* if we still don't have a source, then source creation failed, and we 361 * should kill this io thread */ 362 if (!u->source) { 363 pa_log_error("Could not create a source."); 364 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 365 return; 366 } 367 368 proplist = tunnel_new_proplist(u); 369 u->stream = pa_stream_new_with_proplist(u->context, 370 stream_name, 371 &u->source->sample_spec, 372 &u->source->channel_map, 373 proplist); 374 pa_proplist_free(proplist); 375 pa_xfree(stream_name); 376 377 if (!u->stream) { 378 pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context))); 379 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 380 return; 381 } 382 383 requested_latency = pa_source_get_requested_latency_within_thread(u->source); 384 if (requested_latency == (uint32_t) -1) 385 requested_latency = u->source->thread_info.max_latency; 386 387 reset_bufferattr(&bufferattr); 388 bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec); 389 390 pa_stream_set_state_callback(u->stream, stream_state_cb, u); 391 pa_stream_set_read_callback(u->stream, stream_read_cb, u); 392 if (pa_stream_connect_record(u->stream, 393 u->remote_source_name, 394 &bufferattr, 395 PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) { 396 pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context))); 397 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 398 } 399 u->connected = true; 400} 401 402static void context_state_cb(pa_context *c, void *userdata) { 403 struct userdata *u = userdata; 404 pa_assert(u); 405 406 switch (pa_context_get_state(c)) { 407 case PA_CONTEXT_UNCONNECTED: 408 case PA_CONTEXT_CONNECTING: 409 case PA_CONTEXT_AUTHORIZING: 410 case PA_CONTEXT_SETTING_NAME: 411 break; 412 case PA_CONTEXT_READY: 413 pa_log_debug("Connection successful. Creating stream."); 414 pa_assert(!u->stream); 415 pa_assert(!u->source); 416 417 pa_log_debug("Asking ctl thread to create source."); 418 pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, u, 0, NULL, NULL); 419 break; 420 case PA_CONTEXT_FAILED: 421 pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context))); 422 u->connected = false; 423 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 424 break; 425 case PA_CONTEXT_TERMINATED: 426 pa_log_debug("Context terminated."); 427 u->connected = false; 428 u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); 429 break; 430 } 431} 432 433static void source_update_requested_latency_cb(pa_source *s) { 434 struct userdata *u; 435 pa_operation *operation; 436 size_t nbytes; 437 pa_usec_t block_usec; 438 pa_buffer_attr bufferattr; 439 440 pa_source_assert_ref(s); 441 pa_assert_se(u = s->userdata); 442 443 block_usec = pa_source_get_requested_latency_within_thread(s); 444 if (block_usec == (pa_usec_t) -1) 445 block_usec = s->thread_info.max_latency; 446 447 nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec); 448 449 if (u->stream) { 450 switch (pa_stream_get_state(u->stream)) { 451 case PA_STREAM_READY: 452 if (pa_stream_get_buffer_attr(u->stream)->fragsize == nbytes) 453 break; 454 455 reset_bufferattr(&bufferattr); 456 bufferattr.fragsize = nbytes; 457 if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, NULL, NULL))) 458 pa_operation_unref(operation); 459 break; 460 case PA_STREAM_CREATING: 461 /* we have to delay our request until stream is ready */ 462 u->update_stream_bufferattr_after_connect = true; 463 break; 464 default: 465 break; 466 } 467 } 468} 469 470static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 471 struct userdata *u = PA_SOURCE(o)->userdata; 472 473 switch (code) { 474 case PA_SOURCE_MESSAGE_GET_LATENCY: { 475 int negative; 476 pa_usec_t remote_latency; 477 478 if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { 479 *((int64_t*) data) = 0; 480 return 0; 481 } 482 483 if (!u->stream) { 484 *((int64_t*) data) = 0; 485 return 0; 486 } 487 488 if (pa_stream_get_state(u->stream) != PA_STREAM_READY) { 489 *((int64_t*) data) = 0; 490 return 0; 491 } 492 493 if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) { 494 *((int64_t*) data) = 0; 495 return 0; 496 } 497 498 if (negative) 499 *((int64_t*) data) = - (int64_t)remote_latency; 500 else 501 *((int64_t*) data) = remote_latency; 502 503 return 0; 504 } 505 case TUNNEL_MESSAGE_SOURCE_CREATED: 506 on_source_created(u); 507 return 0; 508 } 509 return pa_source_process_msg(o, code, data, offset, chunk); 510} 511 512/* Called from the IO thread. */ 513static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) { 514 struct userdata *u; 515 516 pa_assert(s); 517 pa_assert_se(u = s->userdata); 518 519 /* It may be that only the suspend cause is changing, in which case there's 520 * nothing to do. */ 521 if (new_state == s->thread_info.state) 522 return 0; 523 524 if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) 525 return 0; 526 527 switch (new_state) { 528 case PA_SOURCE_SUSPENDED: { 529 cork_stream(u, true); 530 break; 531 } 532 case PA_SOURCE_IDLE: 533 case PA_SOURCE_RUNNING: { 534 cork_stream(u, false); 535 break; 536 } 537 case PA_SOURCE_INVALID_STATE: 538 case PA_SOURCE_INIT: 539 case PA_SOURCE_UNLINKED: 540 break; 541 } 542 543 return 0; 544} 545 546/* Creates a source in the main thread. 547 * 548 * This method is called when we receive a message from the io thread that a 549 * connection has been established with the server. We defer creation of the 550 * source until the connection is established, because we don't have a source 551 * if the remote server isn't there. 552 */ 553static void create_source(struct userdata *u) { 554 pa_source_new_data source_data; 555 556 pa_assert_ctl_context(); 557 558 /* Create source */ 559 pa_source_new_data_init(&source_data); 560 source_data.driver = __FILE__; 561 source_data.module = u->module; 562 563 pa_source_new_data_set_name(&source_data, u->source_name); 564 pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec); 565 pa_source_new_data_set_channel_map(&source_data, &u->channel_map); 566 567 pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist); 568 569 if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) { 570 pa_log("Failed to create source."); 571 goto finish; 572 } 573 574 u->source->userdata = u; 575 u->source->parent.process_msg = source_process_msg_cb; 576 u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; 577 u->source->update_requested_latency = source_update_requested_latency_cb; 578 579 pa_source_set_asyncmsgq(u->source, u->thread_mq->inq); 580 pa_source_set_rtpoll(u->source, u->rtpoll); 581 582 pa_source_put(u->source); 583 584finish: 585 pa_source_new_data_done(&source_data); 586 587 /* tell any interested io threads that the sink they asked for has now been 588 * created (even if we failed, we still notify the thread, so they can 589 * either handle or kill the thread, rather than deadlock waiting for a 590 * message that will never come */ 591 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL); 592} 593 594/* Runs in PA mainloop context */ 595static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 596 struct userdata *u = (struct userdata *) data; 597 598 pa_assert(u); 599 pa_assert_ctl_context(); 600 601 if (u->shutting_down) 602 return 0; 603 604 switch (code) { 605 case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST: 606 create_source(u); 607 break; 608 case TUNNEL_MESSAGE_MAYBE_RESTART: 609 maybe_restart(u->module->userdata); 610 break; 611 } 612 613 return 0; 614} 615 616static int do_init(pa_module *m) { 617 struct userdata *u = NULL; 618 struct module_restart_data *rd; 619 pa_modargs *ma = NULL; 620 const char *remote_server = NULL; 621 char *default_source_name = NULL; 622 uint32_t reconnect_interval_ms = 0; 623 624 pa_assert(m); 625 pa_assert(m->userdata); 626 627 rd = m->userdata; 628 629 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { 630 pa_log("Failed to parse module arguments."); 631 goto fail; 632 } 633 634 u = pa_xnew0(struct userdata, 1); 635 u->module = m; 636 rd->userdata = u; 637 638 u->sample_spec = m->core->default_sample_spec; 639 u->channel_map = m->core->default_channel_map; 640 if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) { 641 pa_log("Invalid sample format specification or channel map"); 642 goto fail; 643 } 644 645 remote_server = pa_modargs_get_value(ma, "server", NULL); 646 if (!remote_server) { 647 pa_log("No server given!"); 648 goto fail; 649 } 650 651 u->remote_server = pa_xstrdup(remote_server); 652 u->thread_mainloop = pa_mainloop_new(); 653 if (u->thread_mainloop == NULL) { 654 pa_log("Failed to create mainloop"); 655 goto fail; 656 } 657 u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop); 658 u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL)); 659 u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL)); 660 661 u->thread_mq = pa_xnew0(pa_thread_mq, 1); 662 663 if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) { 664 pa_log("pa_thread_mq_init_thread_mainloop() failed."); 665 goto fail; 666 } 667 668 u->msg = pa_msgobject_new(tunnel_msg); 669 u->msg->parent.process_msg = tunnel_process_msg; 670 671 /* The rtpoll created here is never run. It is only necessary to avoid crashes 672 * when module-tunnel-source-new is used together with module-loopback. 673 * module-loopback bases the asyncmsq on the rtpoll provided by the source and 674 * only works because it calls pa_asyncmsq_process_one(). */ 675 u->rtpoll = pa_rtpoll_new(); 676 677 default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server); 678 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name)); 679 680 u->source_proplist = pa_proplist_new(); 681 pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound"); 682 pa_proplist_setf(u->source_proplist, 683 PA_PROP_DEVICE_DESCRIPTION, 684 _("Tunnel to %s/%s"), 685 remote_server, 686 pa_strempty(u->remote_source_name)); 687 688 if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) { 689 pa_log("Invalid properties"); 690 goto fail; 691 } 692 693 pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms); 694 u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC; 695 696 if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) { 697 pa_log("Failed to create thread."); 698 goto fail; 699 } 700 701 /* If the module is restarting and do_init() finishes successfully, the 702 * restart data is no longer needed. If do_init() fails, don't touch the 703 * restart data, because following restart attempts will continue to use 704 * the same data. If restart_data is NULL, that means no restart is 705 * currently pending. */ 706 if (rd->restart_data) { 707 pa_restart_free(rd->restart_data); 708 rd->restart_data = NULL; 709 } 710 711 pa_modargs_free(ma); 712 pa_xfree(default_source_name); 713 714 return 0; 715 716fail: 717 if (ma) 718 pa_modargs_free(ma); 719 720 if (default_source_name) 721 pa_xfree(default_source_name); 722 723 return -1; 724} 725 726static void do_done(pa_module *m) { 727 struct userdata *u = NULL; 728 struct module_restart_data *rd; 729 730 pa_assert(m); 731 732 if (!(rd = m->userdata)) 733 return; 734 if (!(u = rd->userdata)) 735 return; 736 737 u->shutting_down = true; 738 739 if (u->source) 740 pa_source_unlink(u->source); 741 742 if (u->thread) { 743 pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); 744 pa_thread_free(u->thread); 745 } 746 747 if (u->thread_mq) { 748 pa_thread_mq_done(u->thread_mq); 749 pa_xfree(u->thread_mq); 750 } 751 752 if (u->thread_mainloop) 753 pa_mainloop_free(u->thread_mainloop); 754 755 if (u->cookie_file) 756 pa_xfree(u->cookie_file); 757 758 if (u->remote_source_name) 759 pa_xfree(u->remote_source_name); 760 761 if (u->remote_server) 762 pa_xfree(u->remote_server); 763 764 if (u->source) 765 pa_source_unref(u->source); 766 767 if (u->rtpoll) 768 pa_rtpoll_free(u->rtpoll); 769 770 if (u->source_proplist) 771 pa_proplist_free(u->source_proplist); 772 773 if (u->source_name) 774 pa_xfree(u->source_name); 775 776 pa_xfree(u->msg); 777 778 pa_xfree(u); 779 780 rd->userdata = NULL; 781} 782 783int pa__init(pa_module *m) { 784 int ret; 785 786 pa_assert(m); 787 788 m->userdata = pa_xnew0(struct module_restart_data, 1); 789 790 ret = do_init(m); 791 792 if (ret < 0) 793 pa__done(m); 794 795 return ret; 796} 797 798void pa__done(pa_module *m) { 799 pa_assert(m); 800 801 do_done(m); 802 803 if (m->userdata) { 804 struct module_restart_data *rd = m->userdata; 805 806 if (rd->restart_data) 807 pa_restart_free(rd->restart_data); 808 809 pa_xfree(m->userdata); 810 } 811} 812