1/***
2  This file is part of PulseAudio.
3
4  Copyright 2004-2006 Lennart Poettering
5  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7  PulseAudio is free software; you can redistribute it and/or modify
8  it under the terms of the GNU Lesser General Public License as published
9  by the Free Software Foundation; either version 2.1 of the License,
10  or (at your option) any later version.
11
12  PulseAudio is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  General Public License for more details.
16
17  You should have received a copy of the GNU Lesser General Public License
18  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19***/
20
21#ifdef HAVE_CONFIG_H
22#include <config.h>
23#endif
24
25#include "restart-module.h"
26
27#include <unistd.h>
28#include <string.h>
29#include <errno.h>
30#include <sys/types.h>
31#include <stdio.h>
32#include <stdlib.h>
33
34#ifdef HAVE_X11
35#include <xcb/xcb.h>
36#endif
37
38#include <pulse/rtclock.h>
39#include <pulse/timeval.h>
40#include <pulse/util.h>
41#include <pulse/version.h>
42#include <pulse/xmalloc.h>
43
44#include <pulsecore/module.h>
45#include <pulsecore/core-util.h>
46#include <pulsecore/modargs.h>
47#include <pulsecore/log.h>
48#include <pulsecore/core-subscribe.h>
49#include <pulsecore/pdispatch.h>
50#include <pulsecore/pstream.h>
51#include <pulsecore/pstream-util.h>
52#include <pulsecore/socket-client.h>
53
54#ifdef USE_SMOOTHER_2
55#include <pulsecore/time-smoother_2.h>
56#else
57#include <pulsecore/time-smoother.h>
58#endif
59
60#include <pulsecore/thread.h>
61#include <pulsecore/thread-mq.h>
62#include <pulsecore/core-rtclock.h>
63#include <pulsecore/core-error.h>
64#include <pulsecore/proplist-util.h>
65#include <pulsecore/auth-cookie.h>
66#include <pulsecore/mcalign.h>
67#include <pulsecore/strlist.h>
68
69#ifdef HAVE_X11
70#include <pulsecore/x11prop.h>
71#endif
72
/* Names of environment variables. Their users are outside this excerpt;
 * presumably they are consulted when server/device/cookie are determined
 * automatically -- TODO confirm against the code that reads them. */
#define ENV_DEFAULT_SINK "PULSE_SINK"
#define ENV_DEFAULT_SOURCE "PULSE_SOURCE"
#define ENV_DEFAULT_SERVER "PULSE_SERVER"
#define ENV_COOKIE_FILE "PULSE_COOKIE"
77
/* Module metadata. The description and the usage string depend on whether
 * this file is built as the sink variant (TUNNEL_SINK defined) or as the
 * source variant; the two usage strings differ only in the sink/source
 * specific argument names. */
#ifdef TUNNEL_SINK
PA_MODULE_DESCRIPTION("Tunnel module for sinks");
PA_MODULE_USAGE(
        "sink_name=<name for the local sink> "
        "sink_properties=<properties for the local sink> "
        "auto=<determine server/sink/cookie automatically> "
        "server=<address> "
        "sink=<remote sink name> "
        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "latency_msec=<fixed latency in ms> "
        "channel_map=<channel map>");
#else
PA_MODULE_DESCRIPTION("Tunnel module for sources");
PA_MODULE_USAGE(
        "source_name=<name for the local source> "
        "source_properties=<properties for the local source> "
        "auto=<determine server/source/cookie automatically> "
        "server=<address> "
        "source=<remote source name> "
        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "latency_msec=<fixed latency in ms> "
        "channel_map=<channel map>");
#endif

PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(false);
113
/* NULL-terminated list of the module argument names this module accepts.
 * The names mirror the keys listed in PA_MODULE_USAGE; the sink/source
 * specific ones are selected by TUNNEL_SINK. Presumably handed to the
 * modargs parser during module init -- the call is outside this excerpt. */
static const char* const valid_modargs[] = {
    "auto",
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
    "latency_msec",
    "reconnect_interval_ms",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif
    "channel_map",
    NULL,
};
135
/* Timeout passed to pa_pdispatch_register_reply() when waiting for a reply
 * to a latency request (presumably in seconds -- TODO confirm). */
#define DEFAULT_TIMEOUT 5

/* Interval after which timeout_callback() re-arms itself to request the
 * remote latency again. */
#define LATENCY_INTERVAL (1*PA_USEC_PER_SEC)

/* NOTE(review): not referenced in this excerpt; the name suggests a lower
 * bound for the assumed network latency -- confirm at the point of use. */
#define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
141
#ifdef TUNNEL_SINK

/* Private message codes handled by sink_process_msg(); numbering starts
 * right after the generic sink messages. */
enum {
    SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX, /* remote side asked for data; offset carries the byte count */
    SINK_MESSAGE_REMOTE_SUSPEND,                /* remote suspend state changed; data carries the flag */
    SINK_MESSAGE_UPDATE_LATENCY,                /* feed a new delay measurement (offset) into the smoother */
    SINK_MESSAGE_GET_LATENCY_SNAPSHOT,          /* copy u->counter into the int64_t pointed to by data */
    SINK_MESSAGE_POST,                          /* hand a rendered chunk to the pstream */
};

/* NOTE(review): not referenced in this excerpt; presumably the default for
 * the latency_msec module argument -- confirm at the point of use. */
#define DEFAULT_LATENCY_MSEC 100

#else

/* Private message codes handled by source_process_msg(); numbering starts
 * right after the generic source messages. */
enum {
    SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX, /* push a received chunk through mcalign and post it to the source */
    SOURCE_MESSAGE_REMOTE_SUSPEND,               /* remote suspend state changed; data carries the flag */
    SOURCE_MESSAGE_UPDATE_LATENCY,               /* feed a new delay measurement (offset) into the smoother */
    SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT,         /* copy u->counter into the int64_t pointed to by data */
};

/* NOTE(review): not referenced in this excerpt; presumably the default for
 * the latency_msec module argument -- confirm at the point of use. */
#define DEFAULT_LATENCY_MSEC 25

#endif
166
/* Message object that serves as the target of TUNNEL_MESSAGE_MAYBE_RESTART.
 * It carries no state beyond the pa_msgobject base. */
struct tunnel_msg {
    pa_msgobject parent;
};

typedef struct tunnel_msg tunnel_msg;
PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);

/* Message codes for tunnel_msg objects. */
enum {
    /* Posted by thread_func() on the thread_mq outq when pa_rtpoll_run()
     * returned an error. */
    TUNNEL_MESSAGE_MAYBE_RESTART,
};
177
/* Module (re)initialisation and teardown entry points; both are handed to
 * pa_restart_module_reinit() by unload_module(). */
static int do_init(pa_module *m);
static void do_done(pa_module *m);

/* Handlers referenced by command_table below. command_request and
 * command_started exist only in the sink variant. */
#ifdef TUNNEL_SINK
static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
#endif
static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
192
/* Maps protocol command codes to their handlers. Slots that are not named
 * here stay NULL (designated initializers). Playback and record flavours of
 * a command share one handler. Presumably installed on the pdispatch when
 * the connection is set up -- that code is outside this excerpt. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
#ifdef TUNNEL_SINK
    [PA_COMMAND_REQUEST] = command_request,
    [PA_COMMAND_STARTED] = command_started,
#endif
    [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
    [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
    [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
    [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
    [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
    [PA_COMMAND_UNDERFLOW_OHOS] = command_overflow_or_underflow,
};
214
/* Per-instance state of the tunnel. Fields are shared between the main
 * thread (protocol handling) and the IO thread (thread_func and the
 * sink/source message handler); see the comments on individual fields. */
struct userdata {
    pa_core *core;
    pa_module *module;

    /* IO thread plumbing: thread_func() installs thread_mq and runs rtpoll. */
    pa_thread_mq thread_mq;
    pa_rtpoll *rtpoll;
    pa_thread *thread;

    pa_socket_client *client;
    pa_pstream *pstream;     /* used to send tagstructs and memblocks to the remote server */
    pa_pdispatch *pdispatch; /* replies to our requests are registered here */

    char *server_name;
#ifdef TUNNEL_SINK
    char *sink_name;
    char *configured_sink_name;
    pa_sink *sink;
    size_t requested_bytes; /* grown by SINK_MESSAGE_REQUEST, consumed by send_data() */
#else
    char *source_name;
    char *configured_source_name;
    pa_source *source;
    pa_mcalign *mcalign;    /* re-aligns received chunks in SOURCE_MESSAGE_POST */
#endif

    pa_auth_cookie *auth_cookie;

    uint32_t version;       /* compared against 13 when parsing latency replies */
    uint32_t ctag;          /* next request tag; post-incremented for every request sent */
    uint32_t device_index;
    uint32_t channel;       /* stream channel put into requests and checked on PA_COMMAND_REQUEST */
    uint32_t latency;

    /* Sink: bytes rendered by send_data(). Source: bytes popped from
     * mcalign in SOURCE_MESSAGE_POST. */
    int64_t counter;
    uint64_t receive_counter;  /* bytes handed to the pstream in SINK_MESSAGE_POST */
    uint64_t receive_snapshot; /* receive_counter as of the last request_latency() */

    bool remote_corked:1;    /* updated by stream_cork_within_thread() */
    bool remote_suspended:1; /* updated by stream_suspend_within_thread() */
    bool shutting_down:1;

    pa_usec_t transport_usec; /* maintained in the main thread */
    pa_usec_t thread_transport_usec; /* maintained in the IO thread */

    /* Tag of the most recent latency request; replies carrying a smaller
     * tag are dropped by stream_get_latency_callback(). */
    uint32_t ignore_latency_before;

    pa_time_event *time_event;

#ifdef USE_SMOOTHER_2
    pa_smoother_2 *smoother;
#else
    pa_smoother *smoother;
#endif

    char *device_description;
    char *server_fqdn;
    char *user_name;

    uint32_t maxlength;
#ifdef TUNNEL_SINK
    uint32_t tlength;
    uint32_t minreq;
    uint32_t prebuf;

    pa_proplist *sink_proplist;
#else
    uint32_t fragsize;

    pa_proplist *source_proplist;
#endif

    pa_sample_spec sample_spec;
    pa_channel_map channel_map;

    tunnel_msg *msg; /* target object of TUNNEL_MESSAGE_MAYBE_RESTART */

    pa_iochannel *io;

    pa_usec_t reconnect_interval_us; /* 0 disables restarting, see unload_module() */
    /* Timestamp taken in stream_get_latency_callback() right before the
     * UPDATE_LATENCY message is sent; consumed by its handler. */
    pa_usec_t snapshot_time;
};
296
/* Object stored in pa_module::userdata (callers pass u->module->userdata to
 * unload_module()). It pairs the current tunnel state with the handle of a
 * pending restart, if any. */
struct module_restart_data {
    struct userdata *userdata;
    pa_restart_data *restart_data; /* non-NULL while a restart is pending */
};
301
/* Forward declarations of helpers defined further down in the file. */
static void request_latency(struct userdata *u);
#ifdef TUNNEL_SINK
static void create_sink(struct userdata *u);
static void on_sink_created(struct userdata *u);
#else
static void create_source(struct userdata *u);
static void on_source_created(struct userdata *u);
#endif
310
311/* Do a reinit of the module.  Note that u will be freed as a result of this
312 * call. */
313static void unload_module(struct module_restart_data *rd) {
314    struct userdata *u = rd->userdata;
315
316    if (rd->restart_data) {
317        pa_log_debug("Restart already pending");
318        return;
319    }
320
321    if (u->reconnect_interval_us > 0) {
322        /* The handle returned here must be freed when do_init() was successful and when the
323         * module exits. */
324        rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
325    } else
326        pa_module_unload_request(u->module, true);
327}
328
329/* Called from main context */
330static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
331    pa_log_debug("Got stream or client event.");
332}
333
334/* Called from main context */
335static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
336    struct userdata *u = userdata;
337
338    pa_assert(pd);
339    pa_assert(t);
340    pa_assert(u);
341    pa_assert(u->pdispatch == pd);
342
343    pa_log_warn("Stream killed");
344    unload_module(u->module->userdata);
345}
346
347/* Called from main context */
348static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
349    struct userdata *u = userdata;
350
351    pa_assert(pd);
352    pa_assert(t);
353    pa_assert(u);
354    pa_assert(u->pdispatch == pd);
355
356    pa_log_info("Server signalled buffer overrun/underrun.");
357    request_latency(u);
358}
359
360/* Called from main context */
361static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
362    struct userdata *u = userdata;
363    uint32_t channel;
364    bool suspended;
365
366    pa_assert(pd);
367    pa_assert(t);
368    pa_assert(u);
369    pa_assert(u->pdispatch == pd);
370
371    if (pa_tagstruct_getu32(t, &channel) < 0 ||
372        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
373        !pa_tagstruct_eof(t)) {
374
375        pa_log("Invalid packet.");
376        unload_module(u->module->userdata);
377        return;
378    }
379
380    pa_log_debug("Server reports device suspend.");
381
382#ifdef TUNNEL_SINK
383    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
384#else
385    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
386#endif
387
388    request_latency(u);
389}
390
391/* Called from main context */
392static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
393    struct userdata *u = userdata;
394    uint32_t channel, di;
395    const char *dn;
396    bool suspended;
397
398    pa_assert(pd);
399    pa_assert(t);
400    pa_assert(u);
401    pa_assert(u->pdispatch == pd);
402
403    if (pa_tagstruct_getu32(t, &channel) < 0 ||
404        pa_tagstruct_getu32(t, &di) < 0 ||
405        pa_tagstruct_gets(t, &dn) < 0 ||
406        pa_tagstruct_get_boolean(t, &suspended) < 0) {
407
408        pa_log_error("Invalid packet.");
409        unload_module(u->module->userdata);
410        return;
411    }
412
413    pa_log_debug("Server reports a stream move.");
414
415#ifdef TUNNEL_SINK
416    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
417#else
418    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL);
419#endif
420
421    request_latency(u);
422}
423
424static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
425    struct userdata *u = userdata;
426    uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
427    pa_usec_t usec;
428
429    pa_assert(pd);
430    pa_assert(t);
431    pa_assert(u);
432    pa_assert(u->pdispatch == pd);
433
434    if (pa_tagstruct_getu32(t, &channel) < 0 ||
435        pa_tagstruct_getu32(t, &maxlength) < 0) {
436
437        pa_log_error("Invalid packet.");
438        unload_module(u->module->userdata);
439        return;
440    }
441
442    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
443        if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
444            pa_tagstruct_get_usec(t, &usec) < 0) {
445
446            pa_log_error("Invalid packet.");
447            unload_module(u->module->userdata);
448            return;
449        }
450    } else {
451        if (pa_tagstruct_getu32(t, &tlength) < 0 ||
452            pa_tagstruct_getu32(t, &prebuf) < 0 ||
453            pa_tagstruct_getu32(t, &minreq) < 0 ||
454            pa_tagstruct_get_usec(t, &usec) < 0) {
455
456            pa_log_error("Invalid packet.");
457            unload_module(u->module->userdata);
458            return;
459        }
460    }
461
462#ifdef TUNNEL_SINK
463    pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
464#endif
465
466    request_latency(u);
467}
468
469#ifdef TUNNEL_SINK
470
471/* Called from main context */
472static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
473    struct userdata *u = userdata;
474
475    pa_assert(pd);
476    pa_assert(t);
477    pa_assert(u);
478    pa_assert(u->pdispatch == pd);
479
480    pa_log_debug("Server reports playback started.");
481    request_latency(u);
482}
483
484#endif
485
486/* Called from IO thread context */
487static void check_smoother_status(struct userdata *u, bool past) {
488    pa_usec_t x;
489
490    pa_assert(u);
491
492    x = pa_rtclock_now();
493
494    /* Correct by the time the requested issued needs to travel to the
495     * other side.  This is a valid thread-safe access, because the
496     * main thread is waiting for us */
497
498    if (past)
499        x -= u->thread_transport_usec;
500    else
501        x += u->thread_transport_usec;
502
503    if (u->remote_suspended || u->remote_corked)
504#ifdef USE_SMOOTHER_2
505        pa_smoother_2_pause(u->smoother, x);
506    else
507        pa_smoother_2_resume(u->smoother, x);
508#else
509        pa_smoother_pause(u->smoother, x);
510    else
511        pa_smoother_resume(u->smoother, x, true);
512#endif
513}
514
515/* Called from IO thread context */
516static void stream_cork_within_thread(struct userdata *u, bool cork) {
517    pa_assert(u);
518
519    if (u->remote_corked == cork)
520        return;
521
522    u->remote_corked = cork;
523    check_smoother_status(u, false);
524}
525
526/* Called from main context */
527static void stream_cork(struct userdata *u, bool cork) {
528    pa_tagstruct *t;
529    pa_assert(u);
530
531    if (!u->pstream)
532        return;
533
534    t = pa_tagstruct_new();
535#ifdef TUNNEL_SINK
536    pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
537#else
538    pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
539#endif
540    pa_tagstruct_putu32(t, u->ctag++);
541    pa_tagstruct_putu32(t, u->channel);
542    pa_tagstruct_put_boolean(t, cork);
543    pa_pstream_send_tagstruct(u->pstream, t);
544
545    request_latency(u);
546}
547
548/* Called from IO thread context */
549static void stream_suspend_within_thread(struct userdata *u, bool suspend) {
550    pa_assert(u);
551
552    if (u->remote_suspended == suspend)
553        return;
554
555    u->remote_suspended = suspend;
556    check_smoother_status(u, true);
557}
558
559#ifdef TUNNEL_SINK
560
561/* Called from IO thread context */
562static void send_data(struct userdata *u) {
563    pa_assert(u);
564
565    while (u->requested_bytes > 0) {
566        pa_memchunk memchunk;
567
568        pa_sink_render(u->sink, u->requested_bytes, &memchunk);
569        pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
570        pa_memblock_unref(memchunk.memblock);
571
572        u->requested_bytes -= memchunk.length;
573
574        u->counter += (int64_t) memchunk.length;
575    }
576}
577
578/* This function is called from IO context -- except when it is not. */
579static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
580    struct userdata *u = PA_SINK(o)->userdata;
581
582    switch (code) {
583
584        case PA_SINK_MESSAGE_SET_STATE: {
585            int r;
586
587            /* First, change the state, because otherwise pa_sink_render() would fail */
588            if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
589
590                stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED);
591
592                if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
593                    send_data(u);
594            }
595
596            return r;
597        }
598
599        case PA_SINK_MESSAGE_GET_LATENCY: {
600            int64_t *usec = data;
601
602#ifdef USE_SMOOTHER_2
603            *usec = pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter);
604#else
605            pa_usec_t yl, yr;
606
607            yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
608            yr = pa_smoother_get(u->smoother, pa_rtclock_now());
609
610            *usec = (int64_t)yl - yr;
611#endif
612            return 0;
613        }
614
615        case SINK_MESSAGE_GET_LATENCY_SNAPSHOT: {
616            int64_t *send_counter = data;
617
618            *send_counter = u->counter;
619            return 0;
620        }
621
622        case SINK_MESSAGE_REQUEST:
623
624            pa_assert(offset > 0);
625            u->requested_bytes += (size_t) offset;
626
627            if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
628                send_data(u);
629
630            return 0;
631
632        case SINK_MESSAGE_REMOTE_SUSPEND:
633
634            stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
635            return 0;
636
637        case SINK_MESSAGE_UPDATE_LATENCY: {
638#ifdef USE_SMOOTHER_2
639            int64_t bytes;
640
641            if (offset < 0)
642                bytes = - pa_usec_to_bytes(- offset, &u->sink->sample_spec);
643            else
644                bytes = pa_usec_to_bytes(offset, &u->sink->sample_spec);
645
646            if (u->counter > bytes)
647                bytes = u->counter - bytes;
648            else
649                bytes = 0;
650
651            /* We may use u->snapshot time because the main thread is waiting */
652             pa_smoother_2_put(u->smoother, u->snapshot_time, bytes);
653#else
654            pa_usec_t y;
655
656            y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
657
658            if (y > (pa_usec_t) offset)
659                y -= (pa_usec_t) offset;
660            else
661                y = 0;
662
663            /* We may use u->snapshot time because the main thread is waiting */
664            pa_smoother_put(u->smoother, u->snapshot_time, y);
665#endif
666
667            /* We can access this freely here, since the main thread is waiting for us */
668            u->thread_transport_usec = u->transport_usec;
669
670            return 0;
671        }
672
673        case SINK_MESSAGE_POST:
674
675            /* OK, This might be a bit confusing. This message is
676             * delivered to us from the main context -- NOT from the
677             * IO thread context where the rest of the messages are
678             * dispatched. Yeah, ugly, but I am a lazy bastard. */
679
680            pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
681
682            u->receive_counter += chunk->length;
683
684            return 0;
685    }
686
687    return pa_sink_process_msg(o, code, data, offset, chunk);
688}
689
690/* Called from main context */
691static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
692    struct userdata *u;
693    pa_sink_assert_ref(s);
694    u = s->userdata;
695
696    /* It may be that only the suspend cause is changing, in which
697     * case there's nothing to do. */
698    if (state == s->state)
699        return 0;
700
701    switch ((pa_sink_state_t) state) {
702
703        case PA_SINK_SUSPENDED:
704            pa_assert(PA_SINK_IS_OPENED(s->state));
705            stream_cork(u, true);
706            break;
707
708        case PA_SINK_IDLE:
709        case PA_SINK_RUNNING:
710            if (s->state == PA_SINK_SUSPENDED)
711                stream_cork(u, false);
712            break;
713
714        case PA_SINK_UNLINKED:
715        case PA_SINK_INIT:
716        case PA_SINK_INVALID_STATE:
717            ;
718    }
719
720    return 0;
721}
722
723#else
724
725/* This function is called from IO context -- except when it is not. */
726static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
727    struct userdata *u = PA_SOURCE(o)->userdata;
728
729    switch (code) {
730
731        case PA_SOURCE_MESSAGE_SET_STATE: {
732            int r;
733
734            if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
735                stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED);
736
737            return r;
738        }
739
740        case PA_SOURCE_MESSAGE_GET_LATENCY: {
741            int64_t *usec = data;
742
743#ifdef USE_SMOOTHER_2
744            *usec = - pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter);
745#else
746            pa_usec_t yr, yl;
747
748            yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
749            yr = pa_smoother_get(u->smoother, pa_rtclock_now());
750
751            *usec = (int64_t)yr - yl;
752#endif
753            return 0;
754        }
755
756        case SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT: {
757            int64_t *send_counter = data;
758
759            *send_counter = u->counter;
760            return 0;
761        }
762
763        case SOURCE_MESSAGE_POST: {
764            pa_memchunk c;
765
766            pa_mcalign_push(u->mcalign, chunk);
767
768            while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
769
770                if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
771                    pa_source_post(u->source, &c);
772
773                pa_memblock_unref(c.memblock);
774
775                u->counter += (int64_t) c.length;
776            }
777
778            return 0;
779        }
780
781        case SOURCE_MESSAGE_REMOTE_SUSPEND:
782
783            stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
784            return 0;
785
786        case SOURCE_MESSAGE_UPDATE_LATENCY: {
787#ifdef USE_SMOOTHER_2
788            int64_t bytes;
789
790            if (offset < 0)
791                bytes = - pa_usec_to_bytes(- offset, &u->source->sample_spec);
792            else
793                bytes = pa_usec_to_bytes(offset, &u->source->sample_spec);
794
795            bytes += u->counter;
796
797            /* We may use u->snapshot time because the main thread is waiting */
798            pa_smoother_2_put(u->smoother, u->snapshot_time, bytes);
799#else
800            pa_usec_t y;
801
802            y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
803            y += offset;
804
805            /* We may use u->snapshot time because the main thread is waiting */
806            pa_smoother_put(u->smoother, u->snapshot_time, y);
807#endif
808
809            /* We can access this freely here, since the main thread is waiting for us */
810            u->thread_transport_usec = u->transport_usec;
811
812            return 0;
813        }
814    }
815
816    return pa_source_process_msg(o, code, data, offset, chunk);
817}
818
819/* Called from main context */
820static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
821    struct userdata *u;
822    pa_source_assert_ref(s);
823    u = s->userdata;
824
825    /* It may be that only the suspend cause is changing, in which
826     * case there's nothing to do. */
827    if (state == s->state)
828        return 0;
829
830    switch ((pa_source_state_t) state) {
831
832        case PA_SOURCE_SUSPENDED:
833            pa_assert(PA_SOURCE_IS_OPENED(s->state));
834            stream_cork(u, true);
835            break;
836
837        case PA_SOURCE_IDLE:
838        case PA_SOURCE_RUNNING:
839            if (s->state == PA_SOURCE_SUSPENDED)
840                stream_cork(u, false);
841            break;
842
843        case PA_SOURCE_UNLINKED:
844        case PA_SOURCE_INIT:
845        case PA_SOURCE_INVALID_STATE:
846            ;
847    }
848
849    return 0;
850}
851
852#endif
853
854static void thread_func(void *userdata) {
855    struct userdata *u = userdata;
856
857    pa_assert(u);
858
859    pa_log_debug("Thread starting up");
860
861    pa_thread_mq_install(&u->thread_mq);
862
863    for (;;) {
864        int ret;
865
866#ifdef TUNNEL_SINK
867        if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
868            pa_sink_process_rewind(u->sink, 0);
869#endif
870
871        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
872            goto fail;
873
874        if (ret == 0)
875            goto finish;
876    }
877
878fail:
879    /* If this was no regular exit from the loop we have to continue
880     * processing messages until we received PA_MESSAGE_SHUTDOWN */
881    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
882    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
883
884finish:
885    pa_log_debug("Thread shutting down");
886}
887
888#ifdef TUNNEL_SINK
889/* Called from main context */
890static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
891    struct userdata *u = userdata;
892    uint32_t bytes, channel;
893
894    pa_assert(pd);
895    pa_assert(command == PA_COMMAND_REQUEST);
896    pa_assert(t);
897    pa_assert(u);
898    pa_assert(u->pdispatch == pd);
899
900    if (pa_tagstruct_getu32(t, &channel) < 0 ||
901        pa_tagstruct_getu32(t, &bytes) < 0) {
902        pa_log("Invalid protocol reply");
903        goto fail;
904    }
905
906    if (channel != u->channel) {
907        pa_log("Received data for invalid channel");
908        goto fail;
909    }
910
911    pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
912    return;
913
914fail:
915    unload_module(u->module->userdata);
916}
917
918#endif
919
920/* Called from main context */
921static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
922    struct userdata *u = userdata;
923    pa_usec_t sink_usec, source_usec;
924    bool playing;
925    int64_t write_index, read_index;
926    struct timeval local, remote, now;
927    pa_sample_spec *ss;
928    int64_t delay;
929#ifdef TUNNEL_SINK
930    uint64_t send_counter;
931#endif
932
933    pa_assert(pd);
934    pa_assert(u);
935
936    if (command != PA_COMMAND_REPLY) {
937        if (command == PA_COMMAND_ERROR)
938            pa_log("Failed to get latency.");
939        else
940            pa_log("Protocol error.");
941        goto fail;
942    }
943
944    if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
945        pa_tagstruct_get_usec(t, &source_usec) < 0 ||
946        pa_tagstruct_get_boolean(t, &playing) < 0 ||
947        pa_tagstruct_get_timeval(t, &local) < 0 ||
948        pa_tagstruct_get_timeval(t, &remote) < 0 ||
949        pa_tagstruct_gets64(t, &write_index) < 0 ||
950        pa_tagstruct_gets64(t, &read_index) < 0) {
951        pa_log("Invalid reply.");
952        goto fail;
953    }
954
955#ifdef TUNNEL_SINK
956    if (u->version >= 13) {
957        uint64_t underrun_for = 0, playing_for = 0;
958
959        if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
960            pa_tagstruct_getu64(t, &playing_for) < 0) {
961            pa_log("Invalid reply.");
962            goto fail;
963        }
964    }
965#endif
966
967    if (!pa_tagstruct_eof(t)) {
968        pa_log("Invalid reply.");
969        goto fail;
970    }
971
972    if (tag < u->ignore_latency_before) {
973        return;
974    }
975
976    pa_gettimeofday(&now);
977
978    /* Calculate transport usec */
979    if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now) < 0) {
980        /* local and remote seem to have synchronized clocks */
981#ifdef TUNNEL_SINK
982        u->transport_usec = pa_timeval_diff(&remote, &local);
983#else
984        u->transport_usec = pa_timeval_diff(&now, &remote);
985#endif
986    } else
987        u->transport_usec = pa_timeval_diff(&now, &local)/2;
988
989    /* First, take the device's delay */
990#ifdef TUNNEL_SINK
991    delay = (int64_t) sink_usec;
992    ss = &u->sink->sample_spec;
993#else
994    delay = (int64_t) source_usec;
995    ss = &u->source->sample_spec;
996#endif
997
998    /* Add the length of our server-side buffer */
999    if (write_index >= read_index)
1000        delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
1001    else
1002        delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
1003
1004    /* Our measurements are already out of date, hence correct by the     *
1005     * transport latency */
1006#ifdef TUNNEL_SINK
1007    delay -= (int64_t) u->transport_usec;
1008#else
1009    delay += (int64_t) u->transport_usec;
1010#endif
1011
1012    /* Now correct by what we have have written since we requested the update. This
1013     * is not necessary for the source, because if data is received between request
1014     * and reply, it was already posted before we requested the source latency. */
1015#ifdef TUNNEL_SINK
1016    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_LATENCY_SNAPSHOT, &send_counter, 0, NULL);
1017    delay += (int64_t) pa_bytes_to_usec(send_counter - u->receive_snapshot, ss);
1018#endif
1019
1020    /* It may take some time before the async message is executed, so we take a timestamp here */
1021    u->snapshot_time = pa_rtclock_now();
1022
1023#ifdef TUNNEL_SINK
1024    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
1025#else
1026    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
1027#endif
1028
1029    return;
1030
1031fail:
1032
1033    unload_module(u->module->userdata);
1034}
1035
1036/* Called from main context */
1037static void request_latency(struct userdata *u) {
1038    pa_tagstruct *t;
1039    struct timeval now;
1040    uint32_t tag;
1041    pa_assert(u);
1042
1043    t = pa_tagstruct_new();
1044#ifdef TUNNEL_SINK
1045    pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
1046#else
1047    pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
1048#endif
1049    pa_tagstruct_putu32(t, tag = u->ctag++);
1050    pa_tagstruct_putu32(t, u->channel);
1051
1052    pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1053
1054    pa_pstream_send_tagstruct(u->pstream, t);
1055    pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
1056
1057    u->ignore_latency_before = tag;
1058    u->receive_snapshot = u->receive_counter;
1059}
1060
1061/* Called from main context */
1062static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
1063    struct userdata *u = userdata;
1064
1065    pa_assert(m);
1066    pa_assert(e);
1067    pa_assert(u);
1068
1069    request_latency(u);
1070
1071    pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
1072}
1073
1074/* Called from main context */
1075static void update_description(struct userdata *u) {
1076    char *d;
1077    char un[128], hn[128];
1078    pa_tagstruct *t;
1079
1080    pa_assert(u);
1081
1082    if (!u->server_fqdn || !u->user_name || !u->device_description)
1083        return;
1084
1085    d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
1086
1087#ifdef TUNNEL_SINK
1088    pa_sink_set_description(u->sink, d);
1089    pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
1090    pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
1091    pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
1092#else
1093    pa_source_set_description(u->source, d);
1094    pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
1095    pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
1096    pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
1097#endif
1098
1099    pa_xfree(d);
1100
1101    d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
1102                          pa_get_user_name(un, sizeof(un)),
1103                          pa_get_host_name(hn, sizeof(hn)));
1104
1105    t = pa_tagstruct_new();
1106#ifdef TUNNEL_SINK
1107    pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
1108#else
1109    pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
1110#endif
1111    pa_tagstruct_putu32(t, u->ctag++);
1112    pa_tagstruct_putu32(t, u->channel);
1113    pa_tagstruct_puts(t, d);
1114    pa_pstream_send_tagstruct(u->pstream, t);
1115
1116    pa_xfree(d);
1117}
1118
1119/* Called from main context */
1120static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1121    struct userdata *u = userdata;
1122    pa_sample_spec ss;
1123    pa_channel_map cm;
1124    const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
1125    uint32_t cookie;
1126
1127    pa_assert(pd);
1128    pa_assert(u);
1129
1130    if (command != PA_COMMAND_REPLY) {
1131        if (command == PA_COMMAND_ERROR)
1132            pa_log("Failed to get info.");
1133        else
1134            pa_log("Protocol error.");
1135        goto fail;
1136    }
1137
1138    if (pa_tagstruct_gets(t, &server_name) < 0 ||
1139        pa_tagstruct_gets(t, &server_version) < 0 ||
1140        pa_tagstruct_gets(t, &user_name) < 0 ||
1141        pa_tagstruct_gets(t, &host_name) < 0 ||
1142        pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1143        pa_tagstruct_gets(t, &default_sink_name) < 0 ||
1144        pa_tagstruct_gets(t, &default_source_name) < 0 ||
1145        pa_tagstruct_getu32(t, &cookie) < 0 ||
1146        (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
1147
1148        pa_log("Parse failure");
1149        goto fail;
1150    }
1151
1152    if (!pa_tagstruct_eof(t)) {
1153        pa_log("Packet too long");
1154        goto fail;
1155    }
1156
1157    pa_xfree(u->server_fqdn);
1158    u->server_fqdn = pa_xstrdup(host_name);
1159
1160    pa_xfree(u->user_name);
1161    u->user_name = pa_xstrdup(user_name);
1162
1163    update_description(u);
1164
1165    return;
1166
1167fail:
1168    unload_module(u->module->userdata);
1169}
1170
1171static int read_ports(struct userdata *u, pa_tagstruct *t) {
1172    if (u->version >= 16) {
1173        uint32_t n_ports;
1174        const char *s;
1175
1176        if (pa_tagstruct_getu32(t, &n_ports)) {
1177            pa_log("Parse failure");
1178            return -PA_ERR_PROTOCOL;
1179        }
1180
1181        for (uint32_t j = 0; j < n_ports; j++) {
1182            uint32_t priority;
1183
1184            if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1185                pa_tagstruct_gets(t, &s) < 0 || /* description */
1186                pa_tagstruct_getu32(t, &priority) < 0) {
1187
1188                pa_log("Parse failure");
1189                return -PA_ERR_PROTOCOL;
1190            }
1191            if (u->version >= 24) {
1192                if (pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1193                    pa_log("Parse failure");
1194                    return -PA_ERR_PROTOCOL;
1195                }
1196                if (u->version >= 34 &&
1197                    (pa_tagstruct_gets(t, &s) < 0 || /* availability group */
1198                     pa_tagstruct_getu32(t, &priority) < 0)) { /* device port type */
1199                    pa_log("Parse failure");
1200                    return -PA_ERR_PROTOCOL;
1201                }
1202            }
1203        }
1204
1205        if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1206            pa_log("Parse failure");
1207            return -PA_ERR_PROTOCOL;
1208        }
1209    }
1210    return 0;
1211}
1212
1213static int read_formats(struct userdata *u, pa_tagstruct *t) {
1214    uint8_t n_formats;
1215    pa_format_info *format;
1216
1217    if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1218        pa_log("Parse failure");
1219        return -PA_ERR_PROTOCOL;
1220    }
1221
1222    for (uint8_t j = 0; j < n_formats; j++) {
1223        format = pa_format_info_new();
1224        if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1225            pa_format_info_free(format);
1226            pa_log("Parse failure");
1227            return -PA_ERR_PROTOCOL;
1228        }
1229        pa_format_info_free(format);
1230    }
1231    return 0;
1232}
1233
1234#ifdef TUNNEL_SINK
1235
1236/* Called from main context */
1237static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1238    struct userdata *u = userdata;
1239    uint32_t idx, owner_module, monitor_source, flags;
1240    const char *name, *description, *monitor_source_name, *driver;
1241    pa_sample_spec ss;
1242    pa_channel_map cm;
1243    pa_cvolume volume;
1244    bool mute;
1245    pa_usec_t latency;
1246
1247    pa_assert(pd);
1248    pa_assert(u);
1249
1250    if (command != PA_COMMAND_REPLY) {
1251        if (command == PA_COMMAND_ERROR)
1252            pa_log("Failed to get info.");
1253        else
1254            pa_log("Protocol error.");
1255        goto fail;
1256    }
1257
1258    if (pa_tagstruct_getu32(t, &idx) < 0 ||
1259        pa_tagstruct_gets(t, &name) < 0 ||
1260        pa_tagstruct_gets(t, &description) < 0 ||
1261        pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1262        pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1263        pa_tagstruct_getu32(t, &owner_module) < 0 ||
1264        pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1265        pa_tagstruct_get_boolean(t, &mute) < 0 ||
1266        pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1267        pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1268        pa_tagstruct_get_usec(t, &latency) < 0 ||
1269        pa_tagstruct_gets(t, &driver) < 0 ||
1270        pa_tagstruct_getu32(t, &flags) < 0) {
1271
1272        pa_log("Parse failure");
1273        goto fail;
1274    }
1275
1276    if (u->version >= 13) {
1277        pa_usec_t configured_latency;
1278
1279        if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1280            pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1281
1282            pa_log("Parse failure");
1283            goto fail;
1284        }
1285    }
1286
1287    if (u->version >= 15) {
1288        pa_volume_t base_volume;
1289        uint32_t state, n_volume_steps, card;
1290
1291        if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1292            pa_tagstruct_getu32(t, &state) < 0 ||
1293            pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1294            pa_tagstruct_getu32(t, &card) < 0) {
1295
1296            pa_log("Parse failure");
1297            goto fail;
1298        }
1299    }
1300
1301    if (read_ports(u, t) < 0)
1302        goto fail;
1303
1304    if (u->version >= 21 && read_formats(u, t) < 0)
1305        goto fail;
1306
1307    if (!pa_tagstruct_eof(t)) {
1308        pa_log("Packet too long");
1309        goto fail;
1310    }
1311
1312    if (!u->sink_name || !pa_streq(name, u->sink_name))
1313        return;
1314
1315    pa_xfree(u->device_description);
1316    u->device_description = pa_xstrdup(description);
1317
1318    update_description(u);
1319
1320    return;
1321
1322fail:
1323    unload_module(u->module->userdata);
1324}
1325
1326/* Called from main context */
1327static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1328    struct userdata *u = userdata;
1329    uint32_t idx, owner_module, client, sink;
1330    pa_usec_t buffer_usec, sink_usec;
1331    const char *name, *driver, *resample_method;
1332    bool mute = false;
1333    pa_sample_spec sample_spec;
1334    pa_channel_map channel_map;
1335    pa_cvolume volume;
1336    bool b;
1337
1338    pa_assert(pd);
1339    pa_assert(u);
1340
1341    if (command != PA_COMMAND_REPLY) {
1342        if (command == PA_COMMAND_ERROR)
1343            pa_log("Failed to get info.");
1344        else
1345            pa_log("Protocol error.");
1346        goto fail;
1347    }
1348
1349    if (pa_tagstruct_getu32(t, &idx) < 0 ||
1350        pa_tagstruct_gets(t, &name) < 0 ||
1351        pa_tagstruct_getu32(t, &owner_module) < 0 ||
1352        pa_tagstruct_getu32(t, &client) < 0 ||
1353        pa_tagstruct_getu32(t, &sink) < 0 ||
1354        pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1355        pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1356        pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1357        pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1358        pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1359        pa_tagstruct_gets(t, &resample_method) < 0 ||
1360        pa_tagstruct_gets(t, &driver) < 0) {
1361
1362        pa_log("Parse failure");
1363        goto fail;
1364    }
1365
1366    if (u->version >= 11) {
1367        if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1368
1369            pa_log("Parse failure");
1370            goto fail;
1371        }
1372    }
1373
1374    if (u->version >= 13) {
1375        if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1376
1377            pa_log("Parse failure");
1378            goto fail;
1379        }
1380    }
1381
1382    if (u->version >= 19) {
1383        if (pa_tagstruct_get_boolean(t, &b) < 0) {
1384
1385            pa_log("Parse failure");
1386            goto fail;
1387        }
1388    }
1389
1390    if (u->version >= 20) {
1391        if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1392            pa_tagstruct_get_boolean(t, &b) < 0) {
1393
1394            pa_log("Parse failure");
1395            goto fail;
1396        }
1397    }
1398
1399    if (u->version >= 21) {
1400        pa_format_info *format = pa_format_info_new();
1401
1402        if (pa_tagstruct_get_format_info(t, format) < 0) {
1403            pa_format_info_free(format);
1404            pa_log("Parse failure");
1405            goto fail;
1406        }
1407        pa_format_info_free(format);
1408    }
1409
1410    if (!pa_tagstruct_eof(t)) {
1411        pa_log("Packet too long");
1412        goto fail;
1413    }
1414
1415    if (idx != u->device_index)
1416        return;
1417
1418    pa_assert(u->sink);
1419
1420    if ((u->version < 11 || mute == u->sink->muted) &&
1421        pa_cvolume_equal(&volume, &u->sink->real_volume))
1422        return;
1423
1424    pa_sink_volume_changed(u->sink, &volume);
1425
1426    if (u->version >= 11)
1427        pa_sink_mute_changed(u->sink, mute);
1428
1429    return;
1430
1431fail:
1432    unload_module(u->module->userdata);
1433}
1434
1435#else
1436
1437/* Called from main context */
1438static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1439    struct userdata *u = userdata;
1440    uint32_t idx, owner_module, monitor_of_sink, flags;
1441    const char *name, *description, *monitor_of_sink_name, *driver;
1442    pa_sample_spec ss;
1443    pa_channel_map cm;
1444    pa_cvolume volume;
1445    bool mute;
1446    pa_usec_t latency, configured_latency;
1447
1448    pa_assert(pd);
1449    pa_assert(u);
1450
1451    if (command != PA_COMMAND_REPLY) {
1452        if (command == PA_COMMAND_ERROR)
1453            pa_log("Failed to get info.");
1454        else
1455            pa_log("Protocol error.");
1456        goto fail;
1457    }
1458
1459    if (pa_tagstruct_getu32(t, &idx) < 0 ||
1460        pa_tagstruct_gets(t, &name) < 0 ||
1461        pa_tagstruct_gets(t, &description) < 0 ||
1462        pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1463        pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1464        pa_tagstruct_getu32(t, &owner_module) < 0 ||
1465        pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1466        pa_tagstruct_get_boolean(t, &mute) < 0 ||
1467        pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1468        pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1469        pa_tagstruct_get_usec(t, &latency) < 0 ||
1470        pa_tagstruct_gets(t, &driver) < 0 ||
1471        pa_tagstruct_getu32(t, &flags) < 0) {
1472
1473        pa_log("Parse failure");
1474        goto fail;
1475    }
1476
1477    if (u->version >= 13) {
1478        if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1479            pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1480
1481            pa_log("Parse failure");
1482            goto fail;
1483        }
1484    }
1485
1486    if (u->version >= 15) {
1487        pa_volume_t base_volume;
1488        uint32_t state, n_volume_steps, card;
1489
1490        if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1491            pa_tagstruct_getu32(t, &state) < 0 ||
1492            pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1493            pa_tagstruct_getu32(t, &card) < 0) {
1494
1495            pa_log("Parse failure");
1496            goto fail;
1497        }
1498    }
1499
1500    if (read_ports(u, t) < 0)
1501        goto fail;
1502
1503    if (u->version >= 22 && read_formats(u, t) < 0)
1504        goto fail;
1505
1506    if (!pa_tagstruct_eof(t)) {
1507        pa_log("Packet too long");
1508        goto fail;
1509    }
1510
1511    if (!u->source_name || !pa_streq(name, u->source_name))
1512        return;
1513
1514    pa_xfree(u->device_description);
1515    u->device_description = pa_xstrdup(description);
1516
1517    update_description(u);
1518
1519    return;
1520
1521fail:
1522    unload_module(u->module->userdata);
1523}
1524
1525#endif
1526
1527/* Called from main context */
1528static void request_info(struct userdata *u) {
1529    pa_tagstruct *t;
1530    uint32_t tag;
1531    pa_assert(u);
1532
1533    t = pa_tagstruct_new();
1534    pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1535    pa_tagstruct_putu32(t, tag = u->ctag++);
1536    pa_pstream_send_tagstruct(u->pstream, t);
1537    pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1538
1539#ifdef TUNNEL_SINK
1540    t = pa_tagstruct_new();
1541    pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1542    pa_tagstruct_putu32(t, tag = u->ctag++);
1543    pa_tagstruct_putu32(t, u->device_index);
1544    pa_pstream_send_tagstruct(u->pstream, t);
1545    pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1546
1547    if (u->sink_name) {
1548        t = pa_tagstruct_new();
1549        pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1550        pa_tagstruct_putu32(t, tag = u->ctag++);
1551        pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1552        pa_tagstruct_puts(t, u->sink_name);
1553        pa_pstream_send_tagstruct(u->pstream, t);
1554        pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1555    }
1556#else
1557    if (u->source_name) {
1558        t = pa_tagstruct_new();
1559        pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1560        pa_tagstruct_putu32(t, tag = u->ctag++);
1561        pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1562        pa_tagstruct_puts(t, u->source_name);
1563        pa_pstream_send_tagstruct(u->pstream, t);
1564        pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1565    }
1566#endif
1567}
1568
1569/* Called from main context */
1570static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1571    struct userdata *u = userdata;
1572    pa_subscription_event_type_t e;
1573    uint32_t idx;
1574
1575    pa_assert(pd);
1576    pa_assert(t);
1577    pa_assert(u);
1578    pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1579
1580    if (pa_tagstruct_getu32(t, &e) < 0 ||
1581        pa_tagstruct_getu32(t, &idx) < 0) {
1582        pa_log("Invalid protocol reply");
1583        unload_module(u->module->userdata);
1584        return;
1585    }
1586
1587    if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1588#ifdef TUNNEL_SINK
1589        e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1590        e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1591#else
1592        e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1593#endif
1594        )
1595        return;
1596
1597    request_info(u);
1598}
1599
1600/* Called from main context */
1601static void start_subscribe(struct userdata *u) {
1602    pa_tagstruct *t;
1603    pa_assert(u);
1604
1605    t = pa_tagstruct_new();
1606    pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1607    pa_tagstruct_putu32(t, u->ctag++);
1608    pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1609#ifdef TUNNEL_SINK
1610                        PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1611#else
1612                        PA_SUBSCRIPTION_MASK_SOURCE
1613#endif
1614                        );
1615
1616    pa_pstream_send_tagstruct(u->pstream, t);
1617}
1618
1619/* Called from main context */
1620static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1621    struct userdata *u = userdata;
1622#ifdef TUNNEL_SINK
1623    uint32_t bytes;
1624#endif
1625
1626    pa_assert(pd);
1627    pa_assert(u);
1628    pa_assert(u->pdispatch == pd);
1629
1630    if (command != PA_COMMAND_REPLY) {
1631        if (command == PA_COMMAND_ERROR)
1632            pa_log("Failed to create stream.");
1633        else
1634            pa_log("Protocol error.");
1635        goto fail;
1636    }
1637
1638    if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1639        pa_tagstruct_getu32(t, &u->device_index) < 0
1640#ifdef TUNNEL_SINK
1641        || pa_tagstruct_getu32(t, &bytes) < 0
1642#endif
1643        )
1644        goto parse_error;
1645
1646    if (u->version >= 9) {
1647#ifdef TUNNEL_SINK
1648        if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1649            pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1650            pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1651            pa_tagstruct_getu32(t, &u->minreq) < 0)
1652            goto parse_error;
1653#else
1654        if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1655            pa_tagstruct_getu32(t, &u->fragsize) < 0)
1656            goto parse_error;
1657#endif
1658    }
1659
1660    if (u->version >= 12) {
1661        pa_sample_spec ss;
1662        pa_channel_map cm;
1663        uint32_t device_index;
1664        const char *dn;
1665        bool suspended;
1666
1667        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1668            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1669            pa_tagstruct_getu32(t, &device_index) < 0 ||
1670            pa_tagstruct_gets(t, &dn) < 0 ||
1671            pa_tagstruct_get_boolean(t, &suspended) < 0)
1672            goto parse_error;
1673
1674#ifdef TUNNEL_SINK
1675        pa_xfree(u->sink_name);
1676        u->sink_name = pa_xstrdup(dn);
1677#else
1678        pa_xfree(u->source_name);
1679        u->source_name = pa_xstrdup(dn);
1680#endif
1681    }
1682
1683    if (u->version >= 13) {
1684        pa_usec_t usec;
1685
1686        if (pa_tagstruct_get_usec(t, &usec) < 0)
1687            goto parse_error;
1688
1689/* #ifdef TUNNEL_SINK */
1690/*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1691/* #else */
1692/*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1693/* #endif */
1694    }
1695
1696    if (u->version >= 21) {
1697        pa_format_info *format = pa_format_info_new();
1698
1699        if (pa_tagstruct_get_format_info(t, format) < 0) {
1700            pa_format_info_free(format);
1701            goto parse_error;
1702        }
1703
1704        pa_format_info_free(format);
1705    }
1706
1707    if (!pa_tagstruct_eof(t))
1708        goto parse_error;
1709
1710    start_subscribe(u);
1711    request_info(u);
1712
1713    pa_assert(!u->time_event);
1714    u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1715
1716    request_latency(u);
1717
1718    pa_log_debug("Stream created.");
1719
1720#ifdef TUNNEL_SINK
1721    pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1722#endif
1723
1724    return;
1725
1726parse_error:
1727    pa_log("Invalid reply. (Create stream)");
1728
1729fail:
1730    unload_module(u->module->userdata);
1731
1732}
1733
1734/* Called from main context */
1735static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1736    struct userdata *u = userdata;
1737    pa_tagstruct *reply;
1738    char name[256], un[128], hn[128];
1739    pa_cvolume volume;
1740
1741    pa_assert(pd);
1742    pa_assert(u);
1743    pa_assert(u->pdispatch == pd);
1744
1745    if (command != PA_COMMAND_REPLY ||
1746        pa_tagstruct_getu32(t, &u->version) < 0 ||
1747        !pa_tagstruct_eof(t)) {
1748
1749        if (command == PA_COMMAND_ERROR)
1750            pa_log("Failed to authenticate");
1751        else
1752            pa_log("Protocol error.");
1753
1754        goto fail;
1755    }
1756
1757    /* Minimum supported protocol version */
1758    if (u->version < 8) {
1759        pa_log("Incompatible protocol version");
1760        goto fail;
1761    }
1762
1763    /* Starting with protocol version 13 the MSB of the version tag
1764    reflects if shm is enabled for this connection or not. We don't
1765    support SHM here at all, so we just ignore this. */
1766
1767    if (u->version >= 13)
1768        u->version &= 0x7FFFFFFFU;
1769
1770    pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1771
1772#ifdef TUNNEL_SINK
1773    pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1774    pa_sink_update_proplist(u->sink, 0, NULL);
1775
1776    pa_snprintf(name, sizeof(name), "%s for %s@%s",
1777                u->sink_name,
1778                pa_get_user_name(un, sizeof(un)),
1779                pa_get_host_name(hn, sizeof(hn)));
1780#else
1781    pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1782    pa_source_update_proplist(u->source, 0, NULL);
1783
1784    pa_snprintf(name, sizeof(name), "%s for %s@%s",
1785                u->source_name,
1786                pa_get_user_name(un, sizeof(un)),
1787                pa_get_host_name(hn, sizeof(hn)));
1788#endif
1789
1790    reply = pa_tagstruct_new();
1791    pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1792    pa_tagstruct_putu32(reply, u->ctag++);
1793
1794    if (u->version >= 13) {
1795        pa_proplist *pl;
1796        pl = pa_proplist_new();
1797        pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1798        pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1799        pa_init_proplist(pl);
1800        pa_tagstruct_put_proplist(reply, pl);
1801        pa_proplist_free(pl);
1802    } else
1803        pa_tagstruct_puts(reply, "PulseAudio");
1804
1805    pa_pstream_send_tagstruct(u->pstream, reply);
1806    /* We ignore the server's reply here */
1807
1808    reply = pa_tagstruct_new();
1809
1810    if (u->version < 13)
1811        /* Only for older PA versions we need to fill in the maxlength */
1812        u->maxlength = 4*1024*1024;
1813
1814#ifdef TUNNEL_SINK
1815    u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->sink->sample_spec);
1816    u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency / 4, &u->sink->sample_spec);
1817    u->prebuf = u->tlength;
1818#else
1819    u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->source->sample_spec);
1820#endif
1821
1822#ifdef TUNNEL_SINK
1823    pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1824    pa_tagstruct_putu32(reply, tag = u->ctag++);
1825
1826    if (u->version < 13)
1827        pa_tagstruct_puts(reply, name);
1828
1829    pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1830    pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1831    pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1832    pa_tagstruct_puts(reply, u->sink_name);
1833    pa_tagstruct_putu32(reply, u->maxlength);
1834    pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(u->sink->state));
1835    pa_tagstruct_putu32(reply, u->tlength);
1836    pa_tagstruct_putu32(reply, u->prebuf);
1837    pa_tagstruct_putu32(reply, u->minreq);
1838    pa_tagstruct_putu32(reply, 0);
1839    pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1840    pa_tagstruct_put_cvolume(reply, &volume);
1841#else
1842    pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1843    pa_tagstruct_putu32(reply, tag = u->ctag++);
1844
1845    if (u->version < 13)
1846        pa_tagstruct_puts(reply, name);
1847
1848    pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1849    pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1850    pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1851    pa_tagstruct_puts(reply, u->source_name);
1852    pa_tagstruct_putu32(reply, u->maxlength);
1853    pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(u->source->state));
1854    pa_tagstruct_putu32(reply, u->fragsize);
1855#endif
1856
1857    if (u->version >= 12) {
1858        pa_tagstruct_put_boolean(reply, false); /* no_remap */
1859        pa_tagstruct_put_boolean(reply, false); /* no_remix */
1860        pa_tagstruct_put_boolean(reply, false); /* fix_format */
1861        pa_tagstruct_put_boolean(reply, false); /* fix_rate */
1862        pa_tagstruct_put_boolean(reply, false); /* fix_channels */
1863        pa_tagstruct_put_boolean(reply, true); /* no_move */
1864        pa_tagstruct_put_boolean(reply, false); /* variable_rate */
1865    }
1866
1867    if (u->version >= 13) {
1868        pa_proplist *pl;
1869
1870        pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/
1871        pa_tagstruct_put_boolean(reply, true); /* adjust_latency */
1872
1873        pl = pa_proplist_new();
1874        pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1875        pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1876        pa_tagstruct_put_proplist(reply, pl);
1877        pa_proplist_free(pl);
1878
1879#ifndef TUNNEL_SINK
1880        pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1881#endif
1882    }
1883
1884    if (u->version >= 14) {
1885#ifdef TUNNEL_SINK
1886        pa_tagstruct_put_boolean(reply, false); /* volume_set */
1887#endif
1888        pa_tagstruct_put_boolean(reply, true); /* early rquests */
1889    }
1890
1891    if (u->version >= 15) {
1892#ifdef TUNNEL_SINK
1893        pa_tagstruct_put_boolean(reply, false); /* muted_set */
1894#endif
1895        pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */
1896        pa_tagstruct_put_boolean(reply, false); /* fail on suspend */
1897    }
1898
1899#ifdef TUNNEL_SINK
1900    if (u->version >= 17)
1901        pa_tagstruct_put_boolean(reply, false); /* relative volume */
1902
1903    if (u->version >= 18)
1904        pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1905#endif
1906
1907#ifdef TUNNEL_SINK
1908    if (u->version >= 21) {
1909        /* We're not using the extended API, so n_formats = 0 and that's that */
1910        pa_tagstruct_putu8(reply, 0);
1911    }
1912#else
1913    if (u->version >= 22) {
1914        /* We're not using the extended API, so n_formats = 0 and that's that */
1915        pa_tagstruct_putu8(reply, 0);
1916        pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1917        pa_tagstruct_put_cvolume(reply, &volume);
1918        pa_tagstruct_put_boolean(reply, false); /* muted */
1919        pa_tagstruct_put_boolean(reply, false); /* volume_set */
1920        pa_tagstruct_put_boolean(reply, false); /* muted_set */
1921        pa_tagstruct_put_boolean(reply, false); /* relative volume */
1922        pa_tagstruct_put_boolean(reply, false); /* passthrough stream */
1923    }
1924#endif
1925
1926    pa_pstream_send_tagstruct(u->pstream, reply);
1927    pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1928
1929    pa_log_debug("Connection authenticated, creating stream ...");
1930
1931    return;
1932
1933fail:
1934    unload_module(u->module->userdata);
1935}
1936
1937/* Called from main context */
1938static void pstream_die_callback(pa_pstream *p, void *userdata) {
1939    struct userdata *u = userdata;
1940
1941    pa_assert(p);
1942    pa_assert(u);
1943
1944    pa_log_warn("Stream died.");
1945    unload_module(u->module->userdata);
1946}
1947
1948/* Called from main context */
1949static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) {
1950    struct userdata *u = userdata;
1951
1952    pa_assert(p);
1953    pa_assert(packet);
1954    pa_assert(u);
1955
1956    if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) {
1957        pa_log("Invalid packet");
1958        unload_module(u->module->userdata);
1959        return;
1960    }
1961}
1962
1963#ifndef TUNNEL_SINK
1964/* Called from main context */
1965static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1966    struct userdata *u = userdata;
1967
1968    pa_assert(p);
1969    pa_assert(chunk);
1970    pa_assert(u);
1971
1972    if (channel != u->channel) {
1973        pa_log("Received memory block on bad channel.");
1974        unload_module(u->module->userdata);
1975        return;
1976    }
1977
1978    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1979
1980    u->receive_counter += chunk->length;
1981}
1982#endif
1983
1984/* Called from main context */
1985static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1986    struct userdata *u = userdata;
1987
1988    pa_assert_ctl_context();
1989
1990    pa_assert(sc);
1991    pa_assert(u);
1992    pa_assert(u->client == sc);
1993
1994    pa_socket_client_unref(u->client);
1995    u->client = NULL;
1996
1997    if (!io) {
1998        pa_log("Connection failed: %s", pa_cstrerror(errno));
1999        unload_module(u->module->userdata);
2000        return;
2001    }
2002
2003    u->io = io;
2004
2005#ifdef TUNNEL_SINK
2006    create_sink(u);
2007    if (!u->sink) {
2008        unload_module(u->module->userdata);
2009        return;
2010    }
2011    on_sink_created(u);
2012#else
2013    create_source(u);
2014    if (!u->source) {
2015        unload_module(u->module->userdata);
2016        return;
2017    }
2018    on_source_created(u);
2019#endif
2020}
2021
2022#ifdef TUNNEL_SINK
2023static void on_sink_created(struct userdata *u)
2024#else
2025static void on_source_created(struct userdata *u)
2026#endif
2027{
2028    pa_tagstruct *t;
2029    uint32_t tag;
2030
2031    u->pstream = pa_pstream_new(u->core->mainloop, u->io, u->core->mempool);
2032    u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX);
2033
2034    pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
2035    pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
2036#ifndef TUNNEL_SINK
2037    pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
2038#endif
2039
2040    t = pa_tagstruct_new();
2041    pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
2042    pa_tagstruct_putu32(t, tag = u->ctag++);
2043    pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
2044
2045    pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
2046
2047#ifdef HAVE_CREDS
2048{
2049    pa_creds ucred;
2050
2051    if (pa_iochannel_creds_supported(u->io))
2052        pa_iochannel_creds_enable(u->io);
2053
2054    ucred.uid = getuid();
2055    ucred.gid = getgid();
2056
2057    pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
2058}
2059#else
2060    pa_pstream_send_tagstruct(u->pstream, t);
2061#endif
2062
2063    pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
2064
2065    pa_log_debug("Connection established, authenticating ...");
2066}
2067
2068#ifdef TUNNEL_SINK
2069
2070/* Called from main context */
2071static void sink_set_volume(pa_sink *sink) {
2072    struct userdata *u;
2073    pa_tagstruct *t;
2074
2075    pa_assert(sink);
2076    u = sink->userdata;
2077    pa_assert(u);
2078
2079    t = pa_tagstruct_new();
2080    pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
2081    pa_tagstruct_putu32(t, u->ctag++);
2082    pa_tagstruct_putu32(t, u->device_index);
2083    pa_tagstruct_put_cvolume(t, &sink->real_volume);
2084    pa_pstream_send_tagstruct(u->pstream, t);
2085}
2086
2087/* Called from main context */
2088static void sink_set_mute(pa_sink *sink) {
2089    struct userdata *u;
2090    pa_tagstruct *t;
2091
2092    pa_assert(sink);
2093    u = sink->userdata;
2094    pa_assert(u);
2095
2096    if (u->version < 11)
2097        return;
2098
2099    t = pa_tagstruct_new();
2100    pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
2101    pa_tagstruct_putu32(t, u->ctag++);
2102    pa_tagstruct_putu32(t, u->device_index);
2103    pa_tagstruct_put_boolean(t, sink->muted);
2104    pa_pstream_send_tagstruct(u->pstream, t);
2105}
2106
2107#endif
2108
2109#ifdef TUNNEL_SINK
2110static void create_sink(struct userdata *u) {
2111    pa_sink_new_data data;
2112    char *data_name = NULL;
2113
2114    if (!(data_name = pa_xstrdup(u->configured_sink_name)))
2115        data_name = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
2116
2117    pa_sink_new_data_init(&data);
2118    data.driver = __FILE__;
2119    data.module = u->module;
2120    data.namereg_fail = false;
2121    pa_sink_new_data_set_name(&data, data_name);
2122    pa_sink_new_data_set_sample_spec(&data, &u->sample_spec);
2123    pa_sink_new_data_set_channel_map(&data, &u->channel_map);
2124    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
2125    pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2126    if (u->sink_name)
2127        pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
2128
2129    pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
2130
2131    u->sink = pa_sink_new(u->module->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2132
2133    if (!u->sink) {
2134        pa_log("Failed to create sink.");
2135        goto finish;
2136    }
2137
2138    u->sink->parent.process_msg = sink_process_msg;
2139    u->sink->userdata = u;
2140    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
2141    pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2142    pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2143
2144    u->sink->refresh_volume = u->sink->refresh_muted = false;
2145
2146/*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2147
2148    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2149    pa_sink_set_rtpoll(u->sink, u->rtpoll);
2150    pa_sink_set_fixed_latency(u->sink, u->latency * PA_USEC_PER_MSEC);
2151
2152    pa_sink_put(u->sink);
2153
2154finish:
2155    pa_sink_new_data_done(&data);
2156    pa_xfree(data_name);
2157}
2158#else
2159static void create_source(struct userdata *u) {
2160    pa_source_new_data data;
2161    char *data_name = NULL;
2162
2163    if (!(data_name = pa_xstrdup(u->configured_source_name)))
2164        data_name = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2165
2166    pa_source_new_data_init(&data);
2167    data.driver = __FILE__;
2168    data.module = u->module;
2169    data.namereg_fail = false;
2170    pa_source_new_data_set_name(&data, data_name);
2171    pa_source_new_data_set_sample_spec(&data, &u->sample_spec);
2172    pa_source_new_data_set_channel_map(&data, &u->channel_map);
2173    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2174    pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2175    if (u->source_name)
2176        pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2177
2178    pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
2179
2180    u->source = pa_source_new(u->module->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2181
2182    if (!u->source) {
2183        pa_log("Failed to create source.");
2184        goto finish;
2185    }
2186
2187    u->source->parent.process_msg = source_process_msg;
2188    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
2189    u->source->userdata = u;
2190
2191/*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2192
2193    pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2194    pa_source_set_rtpoll(u->source, u->rtpoll);
2195    pa_source_set_fixed_latency(u->source, u->latency * PA_USEC_PER_MSEC);
2196
2197    u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2198
2199    pa_source_put(u->source);
2200
2201finish:
2202    pa_source_new_data_done(&data);
2203    pa_xfree(data_name);
2204}
2205#endif
2206
2207/* Runs in PA mainloop context */
2208static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
2209    struct userdata *u = (struct userdata *) data;
2210
2211    pa_assert(u);
2212    pa_assert_ctl_context();
2213
2214    if (u->shutting_down)
2215        return 0;
2216
2217    switch (code) {
2218
2219        case TUNNEL_MESSAGE_MAYBE_RESTART:
2220            unload_module(u->module->userdata);
2221            break;
2222    }
2223
2224    return 0;
2225}
2226
2227static int start_connect(struct userdata *u, char *server, bool automatic) {
2228    pa_strlist *server_list = NULL;
2229    int rc = 0;
2230
2231    if (server) {
2232        if (!(server_list = pa_strlist_parse(server))) {
2233            pa_log("Invalid server specified.");
2234            rc = -1;
2235            goto done;
2236        }
2237    } else {
2238        char *ufn;
2239
2240        if (!automatic) {
2241            pa_log("No server specified.");
2242            rc = -1;
2243            goto done;
2244        }
2245
2246        pa_log("No server address found. Attempting default local sockets.");
2247
2248        /* The system wide instance via PF_LOCAL */
2249        server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET);
2250
2251        /* The user instance via PF_LOCAL */
2252        if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) {
2253            server_list = pa_strlist_prepend(server_list, ufn);
2254            pa_xfree(ufn);
2255        }
2256    }
2257
2258    for (;;) {
2259        server_list = pa_strlist_pop(server_list, &u->server_name);
2260
2261        if (!u->server_name) {
2262            if (server)
2263                pa_log("Failed to connect to server '%s'", server);
2264            else
2265                pa_log("Failed to connect");
2266            rc = -1;
2267            goto done;
2268        }
2269
2270        pa_log_debug("Trying to connect to %s...", u->server_name);
2271
2272        if (!(u->client = pa_socket_client_new_string(u->module->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
2273            pa_xfree(u->server_name);
2274            u->server_name = NULL;
2275            continue;
2276        }
2277
2278        break;
2279    }
2280
2281    if (u->client)
2282        pa_socket_client_set_callback(u->client, on_connection, u);
2283
2284done:
2285    pa_strlist_free(server_list);
2286
2287    return rc;
2288}
2289
2290static int do_init(pa_module *m) {
2291    pa_modargs *ma = NULL;
2292    struct userdata *u = NULL;
2293    struct module_restart_data *rd;
2294    char *server = NULL;
2295    uint32_t latency_msec;
2296    bool automatic;
2297#ifdef HAVE_X11
2298    xcb_connection_t *xcb = NULL;
2299#endif
2300    const char *cookie_path;
2301    uint32_t reconnect_interval_ms = 0;
2302
2303    pa_assert(m);
2304    pa_assert(m->userdata);
2305
2306    rd = m->userdata;
2307
2308    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
2309        pa_log("Failed to parse module arguments");
2310        goto fail;
2311    }
2312
2313    rd->userdata = u = pa_xnew0(struct userdata, 1);
2314    u->core = m->core;
2315    u->module = m;
2316    u->client = NULL;
2317    u->pdispatch = NULL;
2318    u->pstream = NULL;
2319    u->server_name = NULL;
2320#ifdef TUNNEL_SINK
2321    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
2322    u->configured_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL));
2323    u->sink = NULL;
2324    u->requested_bytes = 0;
2325#else
2326    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
2327    u->configured_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL));
2328    u->source = NULL;
2329#endif
2330#ifndef USE_SMOOTHER_2
2331    u->smoother = pa_smoother_new(
2332            PA_USEC_PER_SEC,
2333            PA_USEC_PER_SEC*2,
2334            true,
2335            true,
2336            10,
2337            pa_rtclock_now(),
2338            false);
2339#endif
2340    u->ctag = 1;
2341    u->device_index = u->channel = PA_INVALID_INDEX;
2342    u->time_event = NULL;
2343    u->ignore_latency_before = 0;
2344    u->transport_usec = u->thread_transport_usec = 0;
2345    u->remote_suspended = u->remote_corked = false;
2346    u->counter = 0;
2347    u->receive_snapshot = 0;
2348    u->receive_counter = 0;
2349
2350    u->msg = pa_msgobject_new(tunnel_msg);
2351    u->msg->parent.process_msg = tunnel_process_msg;
2352
2353    u->rtpoll = pa_rtpoll_new();
2354
2355    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
2356        pa_log("pa_thread_mq_init() failed.");
2357        goto fail;
2358    }
2359
2360    automatic = false;
2361    if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) {
2362        pa_log("Failed to parse argument \"auto\".");
2363        goto fail;
2364    }
2365
2366    /* Allow latencies between 5ms and 500ms */
2367    latency_msec = DEFAULT_LATENCY_MSEC;
2368    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 5 || latency_msec > 500) {
2369        pa_log("Invalid latency specification");
2370        goto fail;
2371    }
2372
2373    u->latency = latency_msec;
2374
2375    cookie_path = pa_modargs_get_value(ma, "cookie", NULL);
2376    server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL));
2377
2378    if (automatic) {
2379#ifdef HAVE_X11
2380        /* Need an X11 connection to get root properties */
2381        if (getenv("DISPLAY") != NULL) {
2382            if (!(xcb = xcb_connect(getenv("DISPLAY"), NULL)))
2383                pa_log("xcb_connect() failed");
2384            else {
2385                if (xcb_connection_has_error(xcb)) {
2386                    pa_log("xcb_connection_has_error() returned true");
2387                    xcb_disconnect(xcb);
2388                    xcb = NULL;
2389                }
2390            }
2391        }
2392#endif
2393
2394        /* Figure out the cookie the same way a normal client would */
2395        if (!cookie_path)
2396            cookie_path = getenv(ENV_COOKIE_FILE);
2397
2398#ifdef HAVE_X11
2399        if (!cookie_path && xcb) {
2400            char t[1024];
2401            if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) {
2402                uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];
2403
2404                if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie))
2405                    pa_log("Failed to parse cookie data");
2406                else {
2407                    if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie))))
2408                        goto fail;
2409                }
2410            }
2411        }
2412#endif
2413
2414        /* Same thing for the server name */
2415        if (!server)
2416            server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER));
2417
2418#ifdef HAVE_X11
2419        if (!server && xcb) {
2420            char t[1024];
2421            if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t)))
2422                server = pa_xstrdup(t);
2423        }
2424#endif
2425
2426        /* Also determine the default sink/source on the other server */
2427#ifdef TUNNEL_SINK
2428        if (!u->sink_name)
2429            u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK));
2430
2431#ifdef HAVE_X11
2432        if (!u->sink_name && xcb) {
2433            char t[1024];
2434            if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t)))
2435                u->sink_name = pa_xstrdup(t);
2436        }
2437#endif
2438#else
2439        if (!u->source_name)
2440            u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE));
2441
2442#ifdef HAVE_X11
2443        if (!u->source_name && xcb) {
2444            char t[1024];
2445            if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t)))
2446                u->source_name = pa_xstrdup(t);
2447        }
2448#endif
2449#endif
2450    }
2451
2452    if (!cookie_path && !u->auth_cookie)
2453        cookie_path = PA_NATIVE_COOKIE_FILE;
2454
2455    if (cookie_path) {
2456        if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH)))
2457            goto fail;
2458    }
2459
2460    u->sample_spec = m->core->default_sample_spec;
2461    u->channel_map = m->core->default_channel_map;
2462    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
2463        pa_log("Invalid sample format specification");
2464        goto fail;
2465    }
2466
2467#ifdef USE_SMOOTHER_2
2468    /* Smoother window must be larger than time between updates. */
2469    u->smoother = pa_smoother_2_new(LATENCY_INTERVAL + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sample_spec), u->sample_spec.rate);
2470#endif
2471
2472    pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
2473    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
2474
2475#ifdef TUNNEL_SINK
2476
2477    u->sink_proplist = pa_proplist_new();
2478    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
2479        pa_log("Invalid properties");
2480        goto fail;
2481    }
2482
2483#else
2484
2485    u->source_proplist = pa_proplist_new();
2486    if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
2487        pa_log("Invalid properties");
2488        goto fail;
2489    }
2490
2491#endif
2492
2493    u->time_event = NULL;
2494
2495    u->maxlength = (uint32_t) -1;
2496#ifdef TUNNEL_SINK
2497    u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2498#else
2499    u->fragsize = (uint32_t) -1;
2500#endif
2501
2502    if (start_connect(u, server, automatic) < 0) {
2503        goto fail;
2504    }
2505
2506    if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2507        pa_log("Failed to create thread.");
2508        goto fail;
2509    }
2510
2511    if (server)
2512        pa_xfree(server);
2513
2514#ifdef HAVE_X11
2515    if (xcb)
2516        xcb_disconnect(xcb);
2517#endif
2518
2519    /* If the module is restarting and do_init() finishes successfully, the
2520     * restart data is no longer needed. If do_init() fails, don't touch the
2521     * restart data, because following restart attempts will continue to use
2522     * the same data. If restart_data is NULL, that means no restart is
2523     * currently pending. */
2524    if (rd->restart_data) {
2525        pa_restart_free(rd->restart_data);
2526        rd->restart_data = NULL;
2527    }
2528
2529    pa_modargs_free(ma);
2530
2531    return 0;
2532
2533fail:
2534    if (server)
2535        pa_xfree(server);
2536
2537#ifdef HAVE_X11
2538    if (xcb)
2539        xcb_disconnect(xcb);
2540#endif
2541
2542    if (ma)
2543        pa_modargs_free(ma);
2544
2545    return -1;
2546}
2547
2548static void do_done(pa_module *m) {
2549    struct userdata *u = NULL;
2550    struct module_restart_data *rd;
2551
2552    pa_assert(m);
2553
2554    if (!(rd = m->userdata))
2555        return;
2556    if (!(u = rd->userdata))
2557        return;
2558
2559    u->shutting_down = true;
2560
2561#ifdef TUNNEL_SINK
2562    if (u->sink)
2563        pa_sink_unlink(u->sink);
2564#else
2565    if (u->source)
2566        pa_source_unlink(u->source);
2567#endif
2568
2569    if (u->thread) {
2570        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2571        pa_thread_free(u->thread);
2572    }
2573
2574    pa_thread_mq_done(&u->thread_mq);
2575
2576#ifdef TUNNEL_SINK
2577    if (u->sink)
2578        pa_sink_unref(u->sink);
2579#else
2580    if (u->source)
2581        pa_source_unref(u->source);
2582#endif
2583
2584    if (u->rtpoll)
2585        pa_rtpoll_free(u->rtpoll);
2586
2587    if (u->pstream) {
2588        pa_pstream_unlink(u->pstream);
2589        pa_pstream_unref(u->pstream);
2590    }
2591
2592    if (u->pdispatch)
2593        pa_pdispatch_unref(u->pdispatch);
2594
2595    if (u->client)
2596        pa_socket_client_unref(u->client);
2597
2598    if (u->auth_cookie)
2599        pa_auth_cookie_unref(u->auth_cookie);
2600
2601    if (u->smoother)
2602#ifdef USE_SMOOTHER_2
2603        pa_smoother_2_free(u->smoother);
2604#else
2605        pa_smoother_free(u->smoother);
2606#endif
2607
2608    if (u->time_event)
2609        u->core->mainloop->time_free(u->time_event);
2610
2611#ifndef TUNNEL_SINK
2612    if (u->mcalign)
2613        pa_mcalign_free(u->mcalign);
2614#endif
2615
2616#ifdef TUNNEL_SINK
2617    pa_xfree(u->sink_name);
2618    pa_xfree(u->configured_sink_name);
2619    pa_proplist_free(u->sink_proplist);
2620#else
2621    pa_xfree(u->source_name);
2622    pa_xfree(u->configured_source_name);
2623    pa_proplist_free(u->source_proplist);
2624#endif
2625    pa_xfree(u->server_name);
2626
2627    pa_xfree(u->device_description);
2628    pa_xfree(u->server_fqdn);
2629    pa_xfree(u->user_name);
2630
2631    pa_xfree(u->msg);
2632
2633    pa_xfree(u);
2634
2635    rd->userdata = NULL;
2636}
2637
2638int pa__init(pa_module *m) {
2639    int ret;
2640
2641    pa_assert(m);
2642
2643    m->userdata = pa_xnew0(struct module_restart_data, 1);
2644
2645    ret = do_init(m);
2646
2647    if (ret < 0)
2648        pa__done(m);
2649
2650    return ret;
2651}
2652
2653void pa__done(pa_module *m) {
2654    pa_assert(m);
2655
2656    do_done(m);
2657
2658    if (m->userdata) {
2659        struct module_restart_data *rd = m->userdata;
2660
2661        if (rd->restart_data)
2662            pa_restart_free(rd->restart_data);
2663
2664        pa_xfree(m->userdata);
2665    }
2666}
2667