1/***
2    This file is part of PulseAudio.
3
4    Copyright 2013 Alexander Couzens
5
6    PulseAudio is free software; you can redistribute it and/or modify
7    it under the terms of the GNU Lesser General Public License as published
8    by the Free Software Foundation; either version 2.1 of the License,
9    or (at your option) any later version.
10
11    PulseAudio is distributed in the hope that it will be useful, but
12    WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14    General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public License
17    along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include "restart-module.h"
25
26#include <pulse/context.h>
27#include <pulse/timeval.h>
28#include <pulse/xmalloc.h>
29#include <pulse/stream.h>
30#include <pulse/mainloop.h>
31#include <pulse/introspect.h>
32#include <pulse/error.h>
33
34#include <pulsecore/core.h>
35#include <pulsecore/core-util.h>
36#include <pulsecore/i18n.h>
37#include <pulsecore/source.h>
38#include <pulsecore/modargs.h>
39#include <pulsecore/log.h>
40#include <pulsecore/thread.h>
41#include <pulsecore/thread-mq.h>
42#include <pulsecore/poll.h>
43#include <pulsecore/rtpoll.h>
44#include <pulsecore/proplist-util.h>
45
46PA_MODULE_AUTHOR("Alexander Couzens");
47PA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
48PA_MODULE_VERSION(PACKAGE_VERSION);
49PA_MODULE_LOAD_ONCE(false);
50PA_MODULE_USAGE(
51        "server=<address> "
52        "source=<name of the remote source> "
53        "source_name=<name for the local source> "
54        "source_properties=<properties for the local source> "
55        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
56        "format=<sample format> "
57        "channels=<number of channels> "
58        "rate=<sample rate> "
59        "channel_map=<channel map> "
60        "cookie=<cookie file path>"
61        );
62
/* Quit code handed to the IO thread's mainloop when something went wrong.
 * thread_func() treats a quit code of 0 as a clean shutdown and any other
 * value (such as this one) as a failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1
64
65static int do_init(pa_module *m);
66static void do_done(pa_module *m);
67static void stream_state_cb(pa_stream *stream, void *userdata);
68static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
69static void context_state_cb(pa_context *c, void *userdata);
70static void source_update_requested_latency_cb(pa_source *s);
71
/* Message object used as the target of messages the IO thread posts on the
 * thread_mq's outq; its process_msg handler is set to tunnel_process_msg()
 * in do_init(). It carries no state beyond the pa_msgobject base. */
struct tunnel_msg {
    pa_msgobject parent;
};

typedef struct tunnel_msg tunnel_msg;
PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
78
/* Message codes posted by the IO thread to the tunnel_msg object and
 * dispatched by tunnel_process_msg(). */
enum {
    /* The remote context is ready; create the local source. */
    TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
    /* The IO thread failed; restart or unload the module. */
    TUNNEL_MESSAGE_MAYBE_RESTART,
};
83
/* Extra message code understood by the source object itself (see
 * source_process_msg_cb()). It starts at PA_SOURCE_MESSAGE_MAX so it does not
 * collide with the codes handled by pa_source_process_msg(). */
enum {
    /* Sent by create_source() once source creation has been attempted. */
    TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX,
};
87
/* Per-instance state of the module. Allocated zero-filled in do_init() and
 * released in do_done(). */
struct userdata {
    pa_module *module;
    pa_source *source;                    /* local source; created by create_source() */
    pa_thread *thread;                    /* IO thread running thread_func() */
    pa_thread_mq *thread_mq;              /* message queues between the ctl and IO thread */
    pa_mainloop *thread_mainloop;         /* mainloop iterated by the IO thread */
    pa_mainloop_api *thread_mainloop_api; /* API of thread_mainloop */

    pa_context *context;                  /* libpulse connection to the remote server */
    pa_stream *stream;                    /* record stream on the remote server */
    pa_rtpoll *rtpoll;                    /* never run; only handed to the source (see do_init()) */

    /* Set when a latency update arrives while the stream is still being
     * created; applied once the stream becomes ready. */
    bool update_stream_bufferattr_after_connect;
    bool connected;                       /* set after the record stream connect was issued */
    bool shutting_down;                   /* set by do_done(); makes tunnel_process_msg() a no-op */
    bool new_data;                        /* set by stream_read_cb(), cleared by read_new_samples() */

    char *cookie_file;                    /* "cookie" module argument, may be NULL */
    char *remote_server;                  /* "server" module argument */
    char *remote_source_name;             /* "source" module argument, may be NULL */
    char *source_name;                    /* name used for the local source */

    pa_proplist *source_proplist;         /* properties applied to the local source */
    pa_sample_spec sample_spec;
    pa_channel_map channel_map;

    tunnel_msg *msg;                      /* target object for messages to the ctl thread */

    pa_usec_t reconnect_interval_us;      /* 0 means: do not reconnect, unload instead */
};
118
/* Stored in pa_module.userdata. It outlives a single struct userdata so that
 * do_done()/do_init() can tear down and recreate the instance state while a
 * restart is pending. */
struct module_restart_data {
    struct userdata *userdata;     /* current instance state, NULL after do_done() */
    pa_restart_data *restart_data; /* non-NULL while a restart is pending */
};
123
/* NULL-terminated list of the module argument names accepted by this module;
 * passed to pa_modargs_new() in do_init(). */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "server",
    "source",
    "format",
    "channels",
    "rate",
    "channel_map",
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
137
138static void cork_stream(struct userdata *u, bool cork) {
139    pa_operation *operation;
140
141    pa_assert(u);
142    pa_assert(u->stream);
143
144    if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
145        pa_operation_unref(operation);
146}
147
148static void reset_bufferattr(pa_buffer_attr *bufferattr) {
149    pa_assert(bufferattr);
150    bufferattr->fragsize = (uint32_t) -1;
151    bufferattr->minreq = (uint32_t) -1;
152    bufferattr->maxlength = (uint32_t) -1;
153    bufferattr->prebuf = (uint32_t) -1;
154    bufferattr->tlength = (uint32_t) -1;
155}
156
157static pa_proplist* tunnel_new_proplist(struct userdata *u) {
158    pa_proplist *proplist = pa_proplist_new();
159    pa_assert(proplist);
160    pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
161    pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
162    pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
163    pa_init_proplist(proplist);
164
165    return proplist;
166}
167
168static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
169    struct userdata *u = userdata;
170    u->new_data = true;
171}
172
173/* called from io context to read samples from the stream into our source */
174static void read_new_samples(struct userdata *u) {
175    const void *p;
176    size_t readable = 0;
177    pa_memchunk memchunk;
178
179    pa_assert(u);
180    u->new_data = false;
181
182    pa_memchunk_reset(&memchunk);
183
184    if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
185        return;
186
187    readable = pa_stream_readable_size(u->stream);
188    while (readable > 0) {
189        size_t nbytes = 0;
190        if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
191            pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
192            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
193            return;
194        }
195
196        if (PA_LIKELY(p)) {
197            /* we have valid data */
198            memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
199            memchunk.length = nbytes;
200            memchunk.index = 0;
201
202            pa_source_post(u->source, &memchunk);
203            pa_memblock_unref_fixed(memchunk.memblock);
204        } else {
205            size_t bytes_to_generate = nbytes;
206
207            /* we have a hole. generate silence */
208            memchunk = u->source->silence;
209            pa_memblock_ref(memchunk.memblock);
210
211            while (bytes_to_generate > 0) {
212                if (bytes_to_generate < memchunk.length)
213                    memchunk.length = bytes_to_generate;
214
215                pa_source_post(u->source, &memchunk);
216                bytes_to_generate -= memchunk.length;
217            }
218
219            pa_memblock_unref(memchunk.memblock);
220        }
221
222        pa_stream_drop(u->stream);
223        readable -= nbytes;
224    }
225}
226
227static void thread_func(void *userdata) {
228    struct userdata *u = userdata;
229    pa_proplist *proplist;
230
231    pa_assert(u);
232
233    pa_log_debug("Thread starting up");
234    pa_thread_mq_install(u->thread_mq);
235
236    proplist = tunnel_new_proplist(u);
237    u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
238                                              "PulseAudio",
239                                              proplist);
240    pa_proplist_free(proplist);
241
242    if (!u->context) {
243        pa_log("Failed to create libpulse context");
244        goto fail;
245    }
246
247    if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
248        pa_log_error("Can not load cookie file!");
249        goto fail;
250    }
251
252    pa_context_set_state_callback(u->context, context_state_cb, u);
253    if (pa_context_connect(u->context,
254                           u->remote_server,
255                           PA_CONTEXT_NOAUTOSPAWN,
256                           NULL) < 0) {
257        pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
258        goto fail;
259    }
260
261    for (;;) {
262        int ret;
263
264        if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
265            if (ret == 0)
266                goto finish;
267            else
268                goto fail;
269        }
270
271        if (u->new_data)
272            read_new_samples(u);
273    }
274fail:
275    /* send a message to the ctl thread to ask it to either terminate us, or
276     * restart us, but either way this thread will exit, so then wait for the
277     * shutdown message */
278    pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
279    pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
280
281finish:
282    if (u->stream) {
283        pa_stream_disconnect(u->stream);
284        pa_stream_unref(u->stream);
285        u->stream = NULL;
286    }
287
288    if (u->context) {
289        pa_context_disconnect(u->context);
290        pa_context_unref(u->context);
291        u->context = NULL;
292    }
293
294    pa_log_debug("Thread shutting down");
295}
296
297static void stream_state_cb(pa_stream *stream, void *userdata) {
298    struct userdata *u = userdata;
299
300    pa_assert(u);
301
302    switch (pa_stream_get_state(stream)) {
303        case PA_STREAM_FAILED:
304            pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
305            u->connected = false;
306            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
307            break;
308        case PA_STREAM_TERMINATED:
309            pa_log_debug("Stream terminated.");
310            break;
311        case PA_STREAM_READY:
312            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
313                cork_stream(u, false);
314
315            /* Only call our requested_latency_cb when requested_latency
316             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
317             * we don't want to override the initial fragsize set by the server
318             * without a good reason. */
319            if (u->update_stream_bufferattr_after_connect)
320                source_update_requested_latency_cb(u->source);
321        case PA_STREAM_UNCONNECTED:
322        case PA_STREAM_CREATING:
323            break;
324    }
325}
326
327/* Do a reinit of the module.  Note that u will be freed as a result of this
328 * call. */
329static void maybe_restart(struct module_restart_data *rd) {
330    struct userdata *u = rd->userdata;
331
332    if (rd->restart_data) {
333        pa_log_debug("Restart already pending");
334        return;
335    }
336
337    if (u->reconnect_interval_us > 0) {
338        /* The handle returned here must be freed when do_init() finishes successfully
339         * and when the module exits. */
340        rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
341    } else {
342        /* exit the module */
343        pa_module_unload_request(u->module, true);
344    }
345}
346
347static void on_source_created(struct userdata *u) {
348    pa_proplist *proplist;
349    pa_buffer_attr bufferattr;
350    pa_usec_t requested_latency;
351    char *username = pa_get_user_name_malloc();
352    char *hostname = pa_get_host_name_malloc();
353    /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
354    char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
355    pa_xfree(username);
356    pa_xfree(hostname);
357
358    pa_assert_io_context();
359
360    /* if we still don't have a source, then source creation failed, and we
361     * should kill this io thread */
362    if (!u->source) {
363        pa_log_error("Could not create a source.");
364        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
365        return;
366    }
367
368    proplist = tunnel_new_proplist(u);
369    u->stream = pa_stream_new_with_proplist(u->context,
370                                            stream_name,
371                                            &u->source->sample_spec,
372                                            &u->source->channel_map,
373                                            proplist);
374    pa_proplist_free(proplist);
375    pa_xfree(stream_name);
376
377    if (!u->stream) {
378        pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
379        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
380        return;
381    }
382
383    requested_latency = pa_source_get_requested_latency_within_thread(u->source);
384    if (requested_latency == (uint32_t) -1)
385        requested_latency = u->source->thread_info.max_latency;
386
387    reset_bufferattr(&bufferattr);
388    bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
389
390    pa_stream_set_state_callback(u->stream, stream_state_cb, u);
391    pa_stream_set_read_callback(u->stream, stream_read_cb, u);
392    if (pa_stream_connect_record(u->stream,
393                                 u->remote_source_name,
394                                 &bufferattr,
395                                 PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) {
396        pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
397        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
398    }
399    u->connected = true;
400}
401
402static void context_state_cb(pa_context *c, void *userdata) {
403    struct userdata *u = userdata;
404    pa_assert(u);
405
406    switch (pa_context_get_state(c)) {
407        case PA_CONTEXT_UNCONNECTED:
408        case PA_CONTEXT_CONNECTING:
409        case PA_CONTEXT_AUTHORIZING:
410        case PA_CONTEXT_SETTING_NAME:
411            break;
412        case PA_CONTEXT_READY:
413            pa_log_debug("Connection successful. Creating stream.");
414            pa_assert(!u->stream);
415            pa_assert(!u->source);
416
417            pa_log_debug("Asking ctl thread to create source.");
418            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, u, 0, NULL, NULL);
419            break;
420        case PA_CONTEXT_FAILED:
421            pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
422            u->connected = false;
423            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
424            break;
425        case PA_CONTEXT_TERMINATED:
426            pa_log_debug("Context terminated.");
427            u->connected = false;
428            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
429            break;
430    }
431}
432
433static void source_update_requested_latency_cb(pa_source *s) {
434    struct userdata *u;
435    pa_operation *operation;
436    size_t nbytes;
437    pa_usec_t block_usec;
438    pa_buffer_attr bufferattr;
439
440    pa_source_assert_ref(s);
441    pa_assert_se(u = s->userdata);
442
443    block_usec = pa_source_get_requested_latency_within_thread(s);
444    if (block_usec == (pa_usec_t) -1)
445        block_usec = s->thread_info.max_latency;
446
447    nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
448
449    if (u->stream) {
450        switch (pa_stream_get_state(u->stream)) {
451            case PA_STREAM_READY:
452                if (pa_stream_get_buffer_attr(u->stream)->fragsize == nbytes)
453                    break;
454
455                reset_bufferattr(&bufferattr);
456                bufferattr.fragsize = nbytes;
457                if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, NULL, NULL)))
458                    pa_operation_unref(operation);
459                break;
460            case PA_STREAM_CREATING:
461                /* we have to delay our request until stream is ready */
462                u->update_stream_bufferattr_after_connect = true;
463                break;
464            default:
465                break;
466        }
467    }
468}
469
470static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
471    struct userdata *u = PA_SOURCE(o)->userdata;
472
473    switch (code) {
474        case PA_SOURCE_MESSAGE_GET_LATENCY: {
475            int negative;
476            pa_usec_t remote_latency;
477
478            if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
479                *((int64_t*) data) = 0;
480                return 0;
481            }
482
483            if (!u->stream) {
484                *((int64_t*) data) = 0;
485                return 0;
486            }
487
488            if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
489                *((int64_t*) data) = 0;
490                return 0;
491            }
492
493            if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
494                *((int64_t*) data) = 0;
495                return 0;
496            }
497
498            if (negative)
499                *((int64_t*) data) = - (int64_t)remote_latency;
500            else
501                *((int64_t*) data) = remote_latency;
502
503            return 0;
504        }
505        case TUNNEL_MESSAGE_SOURCE_CREATED:
506            on_source_created(u);
507            return 0;
508    }
509    return pa_source_process_msg(o, code, data, offset, chunk);
510}
511
512/* Called from the IO thread. */
513static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
514    struct userdata *u;
515
516    pa_assert(s);
517    pa_assert_se(u = s->userdata);
518
519    /* It may be that only the suspend cause is changing, in which case there's
520     * nothing to do. */
521    if (new_state == s->thread_info.state)
522        return 0;
523
524    if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
525        return 0;
526
527    switch (new_state) {
528        case PA_SOURCE_SUSPENDED: {
529            cork_stream(u, true);
530            break;
531        }
532        case PA_SOURCE_IDLE:
533        case PA_SOURCE_RUNNING: {
534            cork_stream(u, false);
535            break;
536        }
537        case PA_SOURCE_INVALID_STATE:
538        case PA_SOURCE_INIT:
539        case PA_SOURCE_UNLINKED:
540            break;
541    }
542
543    return 0;
544}
545
546/* Creates a source in the main thread.
547 *
548 * This method is called when we receive a message from the io thread that a
549 * connection has been established with the server.  We defer creation of the
550 * source until the connection is established, because we don't have a source
551 * if the remote server isn't there.
552 */
553static void create_source(struct userdata *u) {
554    pa_source_new_data source_data;
555
556    pa_assert_ctl_context();
557
558    /* Create source */
559    pa_source_new_data_init(&source_data);
560    source_data.driver = __FILE__;
561    source_data.module = u->module;
562
563    pa_source_new_data_set_name(&source_data, u->source_name);
564    pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec);
565    pa_source_new_data_set_channel_map(&source_data, &u->channel_map);
566
567    pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
568
569    if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
570        pa_log("Failed to create source.");
571        goto finish;
572    }
573
574    u->source->userdata = u;
575    u->source->parent.process_msg = source_process_msg_cb;
576    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
577    u->source->update_requested_latency = source_update_requested_latency_cb;
578
579    pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
580    pa_source_set_rtpoll(u->source, u->rtpoll);
581
582    pa_source_put(u->source);
583
584finish:
585    pa_source_new_data_done(&source_data);
586
587    /* tell any interested io threads that the sink they asked for has now been
588     * created (even if we failed, we still notify the thread, so they can
589     * either handle or kill the thread, rather than deadlock waiting for a
590     * message that will never come */
591    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL);
592}
593
594/* Runs in PA mainloop context */
595static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596    struct userdata *u = (struct userdata *) data;
597
598    pa_assert(u);
599    pa_assert_ctl_context();
600
601    if (u->shutting_down)
602        return 0;
603
604    switch (code) {
605        case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST:
606            create_source(u);
607            break;
608        case TUNNEL_MESSAGE_MAYBE_RESTART:
609            maybe_restart(u->module->userdata);
610            break;
611    }
612
613    return 0;
614}
615
616static int do_init(pa_module *m) {
617    struct userdata *u = NULL;
618    struct module_restart_data *rd;
619    pa_modargs *ma = NULL;
620    const char *remote_server = NULL;
621    char *default_source_name = NULL;
622    uint32_t reconnect_interval_ms = 0;
623
624    pa_assert(m);
625    pa_assert(m->userdata);
626
627    rd = m->userdata;
628
629    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
630        pa_log("Failed to parse module arguments.");
631        goto fail;
632    }
633
634    u = pa_xnew0(struct userdata, 1);
635    u->module = m;
636    rd->userdata = u;
637
638    u->sample_spec = m->core->default_sample_spec;
639    u->channel_map = m->core->default_channel_map;
640    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
641        pa_log("Invalid sample format specification or channel map");
642        goto fail;
643    }
644
645    remote_server = pa_modargs_get_value(ma, "server", NULL);
646    if (!remote_server) {
647        pa_log("No server given!");
648        goto fail;
649    }
650
651    u->remote_server = pa_xstrdup(remote_server);
652    u->thread_mainloop = pa_mainloop_new();
653    if (u->thread_mainloop == NULL) {
654        pa_log("Failed to create mainloop");
655        goto fail;
656    }
657    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
658    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
659    u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
660
661    u->thread_mq = pa_xnew0(pa_thread_mq, 1);
662
663    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
664        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
665        goto fail;
666    }
667
668    u->msg = pa_msgobject_new(tunnel_msg);
669    u->msg->parent.process_msg = tunnel_process_msg;
670
671    /* The rtpoll created here is never run. It is only necessary to avoid crashes
672     * when module-tunnel-source-new is used together with module-loopback.
673     * module-loopback bases the asyncmsq on the rtpoll provided by the source and
674     * only works because it calls pa_asyncmsq_process_one(). */
675    u->rtpoll = pa_rtpoll_new();
676
677    default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
678    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name));
679
680    u->source_proplist = pa_proplist_new();
681    pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound");
682    pa_proplist_setf(u->source_proplist,
683                     PA_PROP_DEVICE_DESCRIPTION,
684                     _("Tunnel to %s/%s"),
685                     remote_server,
686                     pa_strempty(u->remote_source_name));
687
688    if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
689        pa_log("Invalid properties");
690        goto fail;
691    }
692
693    pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
694    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
695
696    if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
697        pa_log("Failed to create thread.");
698        goto fail;
699    }
700
701    /* If the module is restarting and do_init() finishes successfully, the
702     * restart data is no longer needed. If do_init() fails, don't touch the
703     * restart data, because following restart attempts will continue to use
704     * the same data. If restart_data is NULL, that means no restart is
705     * currently pending. */
706    if (rd->restart_data) {
707        pa_restart_free(rd->restart_data);
708        rd->restart_data = NULL;
709    }
710
711    pa_modargs_free(ma);
712    pa_xfree(default_source_name);
713
714    return 0;
715
716fail:
717    if (ma)
718        pa_modargs_free(ma);
719
720    if (default_source_name)
721        pa_xfree(default_source_name);
722
723    return -1;
724}
725
726static void do_done(pa_module *m) {
727    struct userdata *u = NULL;
728    struct module_restart_data *rd;
729
730    pa_assert(m);
731
732    if (!(rd = m->userdata))
733        return;
734    if (!(u = rd->userdata))
735        return;
736
737    u->shutting_down = true;
738
739    if (u->source)
740        pa_source_unlink(u->source);
741
742    if (u->thread) {
743        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
744        pa_thread_free(u->thread);
745    }
746
747    if (u->thread_mq) {
748        pa_thread_mq_done(u->thread_mq);
749        pa_xfree(u->thread_mq);
750    }
751
752    if (u->thread_mainloop)
753        pa_mainloop_free(u->thread_mainloop);
754
755    if (u->cookie_file)
756        pa_xfree(u->cookie_file);
757
758    if (u->remote_source_name)
759        pa_xfree(u->remote_source_name);
760
761    if (u->remote_server)
762        pa_xfree(u->remote_server);
763
764    if (u->source)
765        pa_source_unref(u->source);
766
767    if (u->rtpoll)
768        pa_rtpoll_free(u->rtpoll);
769
770    if (u->source_proplist)
771        pa_proplist_free(u->source_proplist);
772
773    if (u->source_name)
774        pa_xfree(u->source_name);
775
776    pa_xfree(u->msg);
777
778    pa_xfree(u);
779
780    rd->userdata = NULL;
781}
782
783int pa__init(pa_module *m) {
784    int ret;
785
786    pa_assert(m);
787
788    m->userdata = pa_xnew0(struct module_restart_data, 1);
789
790    ret = do_init(m);
791
792    if (ret < 0)
793        pa__done(m);
794
795    return ret;
796}
797
798void pa__done(pa_module *m) {
799    pa_assert(m);
800
801    do_done(m);
802
803    if (m->userdata) {
804        struct module_restart_data *rd = m->userdata;
805
806        if (rd->restart_data)
807            pa_restart_free(rd->restart_data);
808
809        pa_xfree(m->userdata);
810    }
811}
812