/***
    This file is part of PulseAudio.

    Copyright 2013 Alexander Couzens

    PulseAudio is free software; you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published
    by the Free Software Foundation; either version 2.1 of the License,
    or (at your option) any later version.

    PulseAudio is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/
1953a5a1b3Sopenharmony_ci
2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H
2153a5a1b3Sopenharmony_ci#include <config.h>
2253a5a1b3Sopenharmony_ci#endif
2353a5a1b3Sopenharmony_ci
2453a5a1b3Sopenharmony_ci#include "restart-module.h"
2553a5a1b3Sopenharmony_ci
2653a5a1b3Sopenharmony_ci#include <pulse/context.h>
2753a5a1b3Sopenharmony_ci#include <pulse/timeval.h>
2853a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h>
2953a5a1b3Sopenharmony_ci#include <pulse/stream.h>
3053a5a1b3Sopenharmony_ci#include <pulse/mainloop.h>
3153a5a1b3Sopenharmony_ci#include <pulse/introspect.h>
3253a5a1b3Sopenharmony_ci#include <pulse/error.h>
3353a5a1b3Sopenharmony_ci
3453a5a1b3Sopenharmony_ci#include <pulsecore/core.h>
3553a5a1b3Sopenharmony_ci#include <pulsecore/core-util.h>
3653a5a1b3Sopenharmony_ci#include <pulsecore/i18n.h>
3753a5a1b3Sopenharmony_ci#include <pulsecore/sink.h>
3853a5a1b3Sopenharmony_ci#include <pulsecore/modargs.h>
3953a5a1b3Sopenharmony_ci#include <pulsecore/log.h>
4053a5a1b3Sopenharmony_ci#include <pulsecore/thread.h>
4153a5a1b3Sopenharmony_ci#include <pulsecore/thread-mq.h>
4253a5a1b3Sopenharmony_ci#include <pulsecore/poll.h>
4353a5a1b3Sopenharmony_ci#include <pulsecore/rtpoll.h>
4453a5a1b3Sopenharmony_ci#include <pulsecore/proplist-util.h>
4553a5a1b3Sopenharmony_ci
4653a5a1b3Sopenharmony_ciPA_MODULE_AUTHOR("Alexander Couzens");
4753a5a1b3Sopenharmony_ciPA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server");
4853a5a1b3Sopenharmony_ciPA_MODULE_VERSION(PACKAGE_VERSION);
4953a5a1b3Sopenharmony_ciPA_MODULE_LOAD_ONCE(false);
5053a5a1b3Sopenharmony_ciPA_MODULE_USAGE(
5153a5a1b3Sopenharmony_ci        "server=<address> "
5253a5a1b3Sopenharmony_ci        "sink=<name of the remote sink> "
5353a5a1b3Sopenharmony_ci        "sink_name=<name for the local sink> "
5453a5a1b3Sopenharmony_ci        "sink_properties=<properties for the local sink> "
5553a5a1b3Sopenharmony_ci        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
5653a5a1b3Sopenharmony_ci        "format=<sample format> "
5753a5a1b3Sopenharmony_ci        "channels=<number of channels> "
5853a5a1b3Sopenharmony_ci        "rate=<sample rate> "
5953a5a1b3Sopenharmony_ci        "channel_map=<channel map> "
6053a5a1b3Sopenharmony_ci        "cookie=<cookie file path>"
6153a5a1b3Sopenharmony_ci        );
6253a5a1b3Sopenharmony_ci
/* Upper latency bound: 200 * PA_USEC_PER_MSEC (presumably 200 ms expressed in
 * microseconds; it is not referenced in this part of the file). */
#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)

/* Return value handed to the thread main loop's quit() whenever the IO thread
 * hits an error; thread_func() treats any non-zero quit value as a failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1
6553a5a1b3Sopenharmony_ci
6653a5a1b3Sopenharmony_cistatic int do_init(pa_module *m);
6753a5a1b3Sopenharmony_cistatic void do_done(pa_module *m);
6853a5a1b3Sopenharmony_cistatic void stream_state_cb(pa_stream *stream, void *userdata);
6953a5a1b3Sopenharmony_cistatic void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
7053a5a1b3Sopenharmony_cistatic void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
7153a5a1b3Sopenharmony_cistatic void context_state_cb(pa_context *c, void *userdata);
7253a5a1b3Sopenharmony_cistatic void sink_update_requested_latency_cb(pa_sink *s);
7353a5a1b3Sopenharmony_ci
7453a5a1b3Sopenharmony_cistruct tunnel_msg {
7553a5a1b3Sopenharmony_ci    pa_msgobject parent;
7653a5a1b3Sopenharmony_ci};
7753a5a1b3Sopenharmony_ci
7853a5a1b3Sopenharmony_citypedef struct tunnel_msg tunnel_msg;
7953a5a1b3Sopenharmony_ciPA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
8053a5a1b3Sopenharmony_ci
8153a5a1b3Sopenharmony_cienum {
8253a5a1b3Sopenharmony_ci    TUNNEL_MESSAGE_CREATE_SINK_REQUEST,
8353a5a1b3Sopenharmony_ci    TUNNEL_MESSAGE_MAYBE_RESTART,
8453a5a1b3Sopenharmony_ci};
8553a5a1b3Sopenharmony_ci
8653a5a1b3Sopenharmony_cienum {
8753a5a1b3Sopenharmony_ci    TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX,
8853a5a1b3Sopenharmony_ci};
8953a5a1b3Sopenharmony_ci
9053a5a1b3Sopenharmony_cistruct userdata {
9153a5a1b3Sopenharmony_ci    pa_module *module;
9253a5a1b3Sopenharmony_ci    pa_sink *sink;
9353a5a1b3Sopenharmony_ci    pa_thread *thread;
9453a5a1b3Sopenharmony_ci    pa_thread_mq *thread_mq;
9553a5a1b3Sopenharmony_ci    pa_mainloop *thread_mainloop;
9653a5a1b3Sopenharmony_ci    pa_mainloop_api *thread_mainloop_api;
9753a5a1b3Sopenharmony_ci
9853a5a1b3Sopenharmony_ci    pa_context *context;
9953a5a1b3Sopenharmony_ci    pa_stream *stream;
10053a5a1b3Sopenharmony_ci    pa_rtpoll *rtpoll;
10153a5a1b3Sopenharmony_ci
10253a5a1b3Sopenharmony_ci    bool update_stream_bufferattr_after_connect;
10353a5a1b3Sopenharmony_ci
10453a5a1b3Sopenharmony_ci    bool connected;
10553a5a1b3Sopenharmony_ci    bool shutting_down;
10653a5a1b3Sopenharmony_ci
10753a5a1b3Sopenharmony_ci    char *cookie_file;
10853a5a1b3Sopenharmony_ci    char *remote_server;
10953a5a1b3Sopenharmony_ci    char *remote_sink_name;
11053a5a1b3Sopenharmony_ci    char *sink_name;
11153a5a1b3Sopenharmony_ci
11253a5a1b3Sopenharmony_ci    pa_proplist *sink_proplist;
11353a5a1b3Sopenharmony_ci    pa_sample_spec sample_spec;
11453a5a1b3Sopenharmony_ci    pa_channel_map channel_map;
11553a5a1b3Sopenharmony_ci
11653a5a1b3Sopenharmony_ci    tunnel_msg *msg;
11753a5a1b3Sopenharmony_ci
11853a5a1b3Sopenharmony_ci    pa_usec_t reconnect_interval_us;
11953a5a1b3Sopenharmony_ci};
12053a5a1b3Sopenharmony_ci
12153a5a1b3Sopenharmony_cistruct module_restart_data {
12253a5a1b3Sopenharmony_ci    struct userdata *userdata;
12353a5a1b3Sopenharmony_ci    pa_restart_data *restart_data;
12453a5a1b3Sopenharmony_ci};
12553a5a1b3Sopenharmony_ci
/* NULL-terminated list of the module argument keys this module accepts. */
static const char *const valid_modargs[] = {
    /* local sink */
    "sink_name",
    "sink_properties",
    /* remote end */
    "server",
    "sink",
    /* sample format */
    "format",
    "channels",
    "rate",
    "channel_map",
    /* authentication and reconnect behaviour */
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
13953a5a1b3Sopenharmony_ci
14053a5a1b3Sopenharmony_cistatic void cork_stream(struct userdata *u, bool cork) {
14153a5a1b3Sopenharmony_ci    pa_operation *operation;
14253a5a1b3Sopenharmony_ci
14353a5a1b3Sopenharmony_ci    pa_assert(u);
14453a5a1b3Sopenharmony_ci    pa_assert(u->stream);
14553a5a1b3Sopenharmony_ci
14653a5a1b3Sopenharmony_ci    if (cork) {
14753a5a1b3Sopenharmony_ci        /* When the sink becomes suspended (which is the only case where we
14853a5a1b3Sopenharmony_ci         * cork the stream), we don't want to keep any old data around, because
14953a5a1b3Sopenharmony_ci         * the old data is most likely unrelated to the audio that will be
15053a5a1b3Sopenharmony_ci         * played at the time when the sink starts running again. */
15153a5a1b3Sopenharmony_ci        if ((operation = pa_stream_flush(u->stream, NULL, NULL)))
15253a5a1b3Sopenharmony_ci            pa_operation_unref(operation);
15353a5a1b3Sopenharmony_ci    }
15453a5a1b3Sopenharmony_ci
15553a5a1b3Sopenharmony_ci    if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
15653a5a1b3Sopenharmony_ci        pa_operation_unref(operation);
15753a5a1b3Sopenharmony_ci}
15853a5a1b3Sopenharmony_ci
15953a5a1b3Sopenharmony_cistatic void reset_bufferattr(pa_buffer_attr *bufferattr) {
16053a5a1b3Sopenharmony_ci    pa_assert(bufferattr);
16153a5a1b3Sopenharmony_ci    bufferattr->fragsize = (uint32_t) -1;
16253a5a1b3Sopenharmony_ci    bufferattr->minreq = (uint32_t) -1;
16353a5a1b3Sopenharmony_ci    bufferattr->maxlength = (uint32_t) -1;
16453a5a1b3Sopenharmony_ci    bufferattr->prebuf = (uint32_t) -1;
16553a5a1b3Sopenharmony_ci    bufferattr->tlength = (uint32_t) -1;
16653a5a1b3Sopenharmony_ci}
16753a5a1b3Sopenharmony_ci
16853a5a1b3Sopenharmony_cistatic pa_proplist* tunnel_new_proplist(struct userdata *u) {
16953a5a1b3Sopenharmony_ci    pa_proplist *proplist = pa_proplist_new();
17053a5a1b3Sopenharmony_ci    pa_assert(proplist);
17153a5a1b3Sopenharmony_ci    pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
17253a5a1b3Sopenharmony_ci    pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
17353a5a1b3Sopenharmony_ci    pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
17453a5a1b3Sopenharmony_ci    pa_init_proplist(proplist);
17553a5a1b3Sopenharmony_ci
17653a5a1b3Sopenharmony_ci    return proplist;
17753a5a1b3Sopenharmony_ci}
17853a5a1b3Sopenharmony_ci
17953a5a1b3Sopenharmony_cistatic void thread_func(void *userdata) {
18053a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
18153a5a1b3Sopenharmony_ci    pa_proplist *proplist;
18253a5a1b3Sopenharmony_ci
18353a5a1b3Sopenharmony_ci    pa_assert(u);
18453a5a1b3Sopenharmony_ci
18553a5a1b3Sopenharmony_ci    pa_log_debug("Thread starting up");
18653a5a1b3Sopenharmony_ci    pa_thread_mq_install(u->thread_mq);
18753a5a1b3Sopenharmony_ci
18853a5a1b3Sopenharmony_ci    proplist = tunnel_new_proplist(u);
18953a5a1b3Sopenharmony_ci    u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
19053a5a1b3Sopenharmony_ci                                              "PulseAudio",
19153a5a1b3Sopenharmony_ci                                              proplist);
19253a5a1b3Sopenharmony_ci    pa_proplist_free(proplist);
19353a5a1b3Sopenharmony_ci
19453a5a1b3Sopenharmony_ci    if (!u->context) {
19553a5a1b3Sopenharmony_ci        pa_log("Failed to create libpulse context");
19653a5a1b3Sopenharmony_ci        goto fail;
19753a5a1b3Sopenharmony_ci    }
19853a5a1b3Sopenharmony_ci
19953a5a1b3Sopenharmony_ci    if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
20053a5a1b3Sopenharmony_ci        pa_log_error("Can not load cookie file!");
20153a5a1b3Sopenharmony_ci        goto fail;
20253a5a1b3Sopenharmony_ci    }
20353a5a1b3Sopenharmony_ci
20453a5a1b3Sopenharmony_ci    pa_context_set_state_callback(u->context, context_state_cb, u);
20553a5a1b3Sopenharmony_ci    if (pa_context_connect(u->context,
20653a5a1b3Sopenharmony_ci                           u->remote_server,
20753a5a1b3Sopenharmony_ci                           PA_CONTEXT_NOAUTOSPAWN,
20853a5a1b3Sopenharmony_ci                           NULL) < 0) {
20953a5a1b3Sopenharmony_ci        pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
21053a5a1b3Sopenharmony_ci        goto fail;
21153a5a1b3Sopenharmony_ci    }
21253a5a1b3Sopenharmony_ci
21353a5a1b3Sopenharmony_ci    for (;;) {
21453a5a1b3Sopenharmony_ci        int ret;
21553a5a1b3Sopenharmony_ci
21653a5a1b3Sopenharmony_ci        if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
21753a5a1b3Sopenharmony_ci            if (ret == 0)
21853a5a1b3Sopenharmony_ci                goto finish;
21953a5a1b3Sopenharmony_ci            else
22053a5a1b3Sopenharmony_ci                goto fail;
22153a5a1b3Sopenharmony_ci        }
22253a5a1b3Sopenharmony_ci
22353a5a1b3Sopenharmony_ci        if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
22453a5a1b3Sopenharmony_ci            pa_sink_process_rewind(u->sink, 0);
22553a5a1b3Sopenharmony_ci
22653a5a1b3Sopenharmony_ci        if (u->connected &&
22753a5a1b3Sopenharmony_ci                pa_stream_get_state(u->stream) == PA_STREAM_READY &&
22853a5a1b3Sopenharmony_ci                PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
22953a5a1b3Sopenharmony_ci            size_t writable;
23053a5a1b3Sopenharmony_ci
23153a5a1b3Sopenharmony_ci            writable = pa_stream_writable_size(u->stream);
23253a5a1b3Sopenharmony_ci            if (writable > 0) {
23353a5a1b3Sopenharmony_ci                pa_memchunk memchunk;
23453a5a1b3Sopenharmony_ci                const void *p;
23553a5a1b3Sopenharmony_ci
23653a5a1b3Sopenharmony_ci                pa_sink_render_full(u->sink, writable, &memchunk);
23753a5a1b3Sopenharmony_ci
23853a5a1b3Sopenharmony_ci                pa_assert(memchunk.length > 0);
23953a5a1b3Sopenharmony_ci
24053a5a1b3Sopenharmony_ci                /* we have new data to write */
24153a5a1b3Sopenharmony_ci                p = pa_memblock_acquire(memchunk.memblock);
24253a5a1b3Sopenharmony_ci                /* TODO: Use pa_stream_begin_write() to reduce copying. */
24353a5a1b3Sopenharmony_ci                ret = pa_stream_write(u->stream,
24453a5a1b3Sopenharmony_ci                                      (uint8_t*) p + memchunk.index,
24553a5a1b3Sopenharmony_ci                                      memchunk.length,
24653a5a1b3Sopenharmony_ci                                      NULL,     /**< A cleanup routine for the data or NULL to request an internal copy */
24753a5a1b3Sopenharmony_ci                                      0,        /** offset */
24853a5a1b3Sopenharmony_ci                                      PA_SEEK_RELATIVE);
24953a5a1b3Sopenharmony_ci                pa_memblock_release(memchunk.memblock);
25053a5a1b3Sopenharmony_ci                pa_memblock_unref(memchunk.memblock);
25153a5a1b3Sopenharmony_ci
25253a5a1b3Sopenharmony_ci                if (ret != 0) {
25353a5a1b3Sopenharmony_ci                    pa_log_error("Could not write data into the stream ... ret = %i", ret);
25453a5a1b3Sopenharmony_ci                    u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
25553a5a1b3Sopenharmony_ci                }
25653a5a1b3Sopenharmony_ci
25753a5a1b3Sopenharmony_ci            }
25853a5a1b3Sopenharmony_ci        }
25953a5a1b3Sopenharmony_ci    }
26053a5a1b3Sopenharmony_cifail:
26153a5a1b3Sopenharmony_ci    /* send a message to the ctl thread to ask it to either terminate us, or
26253a5a1b3Sopenharmony_ci     * restart us, but either way this thread will exit, so then wait for the
26353a5a1b3Sopenharmony_ci     * shutdown message */
26453a5a1b3Sopenharmony_ci    pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
26553a5a1b3Sopenharmony_ci    pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
26653a5a1b3Sopenharmony_ci
26753a5a1b3Sopenharmony_cifinish:
26853a5a1b3Sopenharmony_ci    if (u->stream) {
26953a5a1b3Sopenharmony_ci        pa_stream_disconnect(u->stream);
27053a5a1b3Sopenharmony_ci        pa_stream_unref(u->stream);
27153a5a1b3Sopenharmony_ci        u->stream = NULL;
27253a5a1b3Sopenharmony_ci    }
27353a5a1b3Sopenharmony_ci
27453a5a1b3Sopenharmony_ci    if (u->context) {
27553a5a1b3Sopenharmony_ci        pa_context_disconnect(u->context);
27653a5a1b3Sopenharmony_ci        pa_context_unref(u->context);
27753a5a1b3Sopenharmony_ci        u->context = NULL;
27853a5a1b3Sopenharmony_ci    }
27953a5a1b3Sopenharmony_ci
28053a5a1b3Sopenharmony_ci    pa_log_debug("Thread shutting down");
28153a5a1b3Sopenharmony_ci}
28253a5a1b3Sopenharmony_ci
28353a5a1b3Sopenharmony_cistatic void stream_state_cb(pa_stream *stream, void *userdata) {
28453a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
28553a5a1b3Sopenharmony_ci
28653a5a1b3Sopenharmony_ci    pa_assert(u);
28753a5a1b3Sopenharmony_ci
28853a5a1b3Sopenharmony_ci    switch (pa_stream_get_state(stream)) {
28953a5a1b3Sopenharmony_ci        case PA_STREAM_FAILED:
29053a5a1b3Sopenharmony_ci            pa_log_error("Stream failed.");
29153a5a1b3Sopenharmony_ci            u->connected = false;
29253a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
29353a5a1b3Sopenharmony_ci            break;
29453a5a1b3Sopenharmony_ci        case PA_STREAM_TERMINATED:
29553a5a1b3Sopenharmony_ci            pa_log_debug("Stream terminated.");
29653a5a1b3Sopenharmony_ci            break;
29753a5a1b3Sopenharmony_ci        case PA_STREAM_READY:
29853a5a1b3Sopenharmony_ci            if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
29953a5a1b3Sopenharmony_ci                cork_stream(u, false);
30053a5a1b3Sopenharmony_ci
30153a5a1b3Sopenharmony_ci            /* Only call our requested_latency_cb when requested_latency
30253a5a1b3Sopenharmony_ci             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
30353a5a1b3Sopenharmony_ci             * we don't want to override the initial tlength set by the server
30453a5a1b3Sopenharmony_ci             * without a good reason. */
30553a5a1b3Sopenharmony_ci            if (u->update_stream_bufferattr_after_connect)
30653a5a1b3Sopenharmony_ci                sink_update_requested_latency_cb(u->sink);
30753a5a1b3Sopenharmony_ci            else
30853a5a1b3Sopenharmony_ci                stream_changed_buffer_attr_cb(stream, userdata);
30953a5a1b3Sopenharmony_ci        case PA_STREAM_CREATING:
31053a5a1b3Sopenharmony_ci        case PA_STREAM_UNCONNECTED:
31153a5a1b3Sopenharmony_ci            break;
31253a5a1b3Sopenharmony_ci    }
31353a5a1b3Sopenharmony_ci}
31453a5a1b3Sopenharmony_ci
31553a5a1b3Sopenharmony_ci/* called when remote server changes the stream buffer_attr */
31653a5a1b3Sopenharmony_cistatic void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata) {
31753a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
31853a5a1b3Sopenharmony_ci    const pa_buffer_attr *bufferattr;
31953a5a1b3Sopenharmony_ci    pa_assert(u);
32053a5a1b3Sopenharmony_ci
32153a5a1b3Sopenharmony_ci    bufferattr = pa_stream_get_buffer_attr(u->stream);
32253a5a1b3Sopenharmony_ci    pa_sink_set_max_request_within_thread(u->sink, bufferattr->tlength);
32353a5a1b3Sopenharmony_ci
32453a5a1b3Sopenharmony_ci    pa_log_debug("Server reports buffer attrs changed. tlength now at %lu.",
32553a5a1b3Sopenharmony_ci                 (unsigned long) bufferattr->tlength);
32653a5a1b3Sopenharmony_ci}
32753a5a1b3Sopenharmony_ci
32853a5a1b3Sopenharmony_ci/* called after we requested a change of the stream buffer_attr */
32953a5a1b3Sopenharmony_cistatic void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata) {
33053a5a1b3Sopenharmony_ci    stream_changed_buffer_attr_cb(stream, userdata);
33153a5a1b3Sopenharmony_ci}
33253a5a1b3Sopenharmony_ci
33353a5a1b3Sopenharmony_ci/* called when the server experiences an underrun of our buffer */
33453a5a1b3Sopenharmony_cistatic void stream_underflow_callback(pa_stream *stream, void *userdata) {
33553a5a1b3Sopenharmony_ci    pa_log_info("Server signalled buffer underrun.");
33653a5a1b3Sopenharmony_ci}
33753a5a1b3Sopenharmony_ci
33853a5a1b3Sopenharmony_ci/* called when the server experiences an overrun of our buffer */
33953a5a1b3Sopenharmony_cistatic void stream_overflow_callback(pa_stream *stream, void *userdata) {
34053a5a1b3Sopenharmony_ci    pa_log_info("Server signalled buffer overrun.");
34153a5a1b3Sopenharmony_ci}
34253a5a1b3Sopenharmony_ci
34353a5a1b3Sopenharmony_ci/* Do a reinit of the module.  Note that u will be freed as a result of this
34453a5a1b3Sopenharmony_ci * call. */
34553a5a1b3Sopenharmony_cistatic void maybe_restart(struct module_restart_data *rd) {
34653a5a1b3Sopenharmony_ci    struct userdata *u = rd->userdata;
34753a5a1b3Sopenharmony_ci
34853a5a1b3Sopenharmony_ci    if (rd->restart_data) {
34953a5a1b3Sopenharmony_ci        pa_log_debug("Restart already pending");
35053a5a1b3Sopenharmony_ci        return;
35153a5a1b3Sopenharmony_ci    }
35253a5a1b3Sopenharmony_ci
35353a5a1b3Sopenharmony_ci    if (u->reconnect_interval_us > 0) {
35453a5a1b3Sopenharmony_ci        /* The handle returned here must be freed when do_init() finishes successfully
35553a5a1b3Sopenharmony_ci         * and when the module exits. */
35653a5a1b3Sopenharmony_ci        rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
35753a5a1b3Sopenharmony_ci    } else {
35853a5a1b3Sopenharmony_ci        /* exit the module */
35953a5a1b3Sopenharmony_ci        pa_module_unload_request(u->module, true);
36053a5a1b3Sopenharmony_ci    }
36153a5a1b3Sopenharmony_ci}
36253a5a1b3Sopenharmony_ci
36353a5a1b3Sopenharmony_cistatic void on_sink_created(struct userdata *u) {
36453a5a1b3Sopenharmony_ci    pa_proplist *proplist;
36553a5a1b3Sopenharmony_ci    pa_buffer_attr bufferattr;
36653a5a1b3Sopenharmony_ci    pa_usec_t requested_latency;
36753a5a1b3Sopenharmony_ci    char *username = pa_get_user_name_malloc();
36853a5a1b3Sopenharmony_ci    char *hostname = pa_get_host_name_malloc();
36953a5a1b3Sopenharmony_ci    /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
37053a5a1b3Sopenharmony_ci    char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
37153a5a1b3Sopenharmony_ci    pa_xfree(hostname);
37253a5a1b3Sopenharmony_ci    pa_xfree(username);
37353a5a1b3Sopenharmony_ci
37453a5a1b3Sopenharmony_ci    pa_assert_io_context();
37553a5a1b3Sopenharmony_ci
37653a5a1b3Sopenharmony_ci    /* if we still don't have a sink, then sink creation failed, and we should
37753a5a1b3Sopenharmony_ci     * kill this io thread */
37853a5a1b3Sopenharmony_ci    if (!u->sink) {
37953a5a1b3Sopenharmony_ci        pa_log_error("Could not create a sink.");
38053a5a1b3Sopenharmony_ci        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
38153a5a1b3Sopenharmony_ci        return;
38253a5a1b3Sopenharmony_ci    }
38353a5a1b3Sopenharmony_ci
38453a5a1b3Sopenharmony_ci    proplist = tunnel_new_proplist(u);
38553a5a1b3Sopenharmony_ci    u->stream = pa_stream_new_with_proplist(u->context,
38653a5a1b3Sopenharmony_ci                                            stream_name,
38753a5a1b3Sopenharmony_ci                                            &u->sink->sample_spec,
38853a5a1b3Sopenharmony_ci                                            &u->sink->channel_map,
38953a5a1b3Sopenharmony_ci                                            proplist);
39053a5a1b3Sopenharmony_ci    pa_proplist_free(proplist);
39153a5a1b3Sopenharmony_ci    pa_xfree(stream_name);
39253a5a1b3Sopenharmony_ci
39353a5a1b3Sopenharmony_ci    if (!u->stream) {
39453a5a1b3Sopenharmony_ci        pa_log_error("Could not create a stream.");
39553a5a1b3Sopenharmony_ci        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
39653a5a1b3Sopenharmony_ci        return;
39753a5a1b3Sopenharmony_ci    }
39853a5a1b3Sopenharmony_ci
39953a5a1b3Sopenharmony_ci    requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
40053a5a1b3Sopenharmony_ci    if (requested_latency == (pa_usec_t) -1)
40153a5a1b3Sopenharmony_ci        requested_latency = u->sink->thread_info.max_latency;
40253a5a1b3Sopenharmony_ci
40353a5a1b3Sopenharmony_ci    reset_bufferattr(&bufferattr);
40453a5a1b3Sopenharmony_ci    bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
40553a5a1b3Sopenharmony_ci
40653a5a1b3Sopenharmony_ci    pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
40753a5a1b3Sopenharmony_ci
40853a5a1b3Sopenharmony_ci    pa_stream_set_state_callback(u->stream, stream_state_cb, u);
40953a5a1b3Sopenharmony_ci    pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
41053a5a1b3Sopenharmony_ci    pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u);
41153a5a1b3Sopenharmony_ci    pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u);
41253a5a1b3Sopenharmony_ci    if (pa_stream_connect_playback(u->stream,
41353a5a1b3Sopenharmony_ci                                   u->remote_sink_name,
41453a5a1b3Sopenharmony_ci                                   &bufferattr,
41553a5a1b3Sopenharmony_ci                                   PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY,
41653a5a1b3Sopenharmony_ci                                   NULL,
41753a5a1b3Sopenharmony_ci                                   NULL) < 0) {
41853a5a1b3Sopenharmony_ci        pa_log_error("Could not connect stream.");
41953a5a1b3Sopenharmony_ci        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
42053a5a1b3Sopenharmony_ci    }
42153a5a1b3Sopenharmony_ci    u->connected = true;
42253a5a1b3Sopenharmony_ci}
42353a5a1b3Sopenharmony_ci
42453a5a1b3Sopenharmony_cistatic void context_state_cb(pa_context *c, void *userdata) {
42553a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
42653a5a1b3Sopenharmony_ci    pa_assert(u);
42753a5a1b3Sopenharmony_ci
42853a5a1b3Sopenharmony_ci    switch (pa_context_get_state(c)) {
42953a5a1b3Sopenharmony_ci        case PA_CONTEXT_UNCONNECTED:
43053a5a1b3Sopenharmony_ci        case PA_CONTEXT_CONNECTING:
43153a5a1b3Sopenharmony_ci        case PA_CONTEXT_AUTHORIZING:
43253a5a1b3Sopenharmony_ci        case PA_CONTEXT_SETTING_NAME:
43353a5a1b3Sopenharmony_ci            break;
43453a5a1b3Sopenharmony_ci        case PA_CONTEXT_READY:
43553a5a1b3Sopenharmony_ci            /* now that we're connected, ask the control thread to create a sink for
43653a5a1b3Sopenharmony_ci             * us, and wait for that to complete before proceeding, we'll
43753a5a1b3Sopenharmony_ci             * receive TUNNEL_MESSAGE_SINK_CREATED in response when the sink is
43853a5a1b3Sopenharmony_ci             * created (see sink_process_msg_cb()) */
43953a5a1b3Sopenharmony_ci            pa_log_debug("Connection successful. Creating stream.");
44053a5a1b3Sopenharmony_ci            pa_assert(!u->stream);
44153a5a1b3Sopenharmony_ci            pa_assert(!u->sink);
44253a5a1b3Sopenharmony_ci
44353a5a1b3Sopenharmony_ci            pa_log_debug("Asking ctl thread to create sink.");
44453a5a1b3Sopenharmony_ci            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL);
44553a5a1b3Sopenharmony_ci            break;
44653a5a1b3Sopenharmony_ci        case PA_CONTEXT_FAILED:
44753a5a1b3Sopenharmony_ci            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
44853a5a1b3Sopenharmony_ci            u->connected = false;
44953a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
45053a5a1b3Sopenharmony_ci            break;
45153a5a1b3Sopenharmony_ci        case PA_CONTEXT_TERMINATED:
45253a5a1b3Sopenharmony_ci            pa_log_debug("Context terminated.");
45353a5a1b3Sopenharmony_ci            u->connected = false;
45453a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
45553a5a1b3Sopenharmony_ci            break;
45653a5a1b3Sopenharmony_ci    }
45753a5a1b3Sopenharmony_ci}
45853a5a1b3Sopenharmony_ci
45953a5a1b3Sopenharmony_cistatic void sink_update_requested_latency_cb(pa_sink *s) {
46053a5a1b3Sopenharmony_ci    struct userdata *u;
46153a5a1b3Sopenharmony_ci    pa_operation *operation;
46253a5a1b3Sopenharmony_ci    size_t nbytes;
46353a5a1b3Sopenharmony_ci    pa_usec_t block_usec;
46453a5a1b3Sopenharmony_ci    pa_buffer_attr bufferattr;
46553a5a1b3Sopenharmony_ci
46653a5a1b3Sopenharmony_ci    pa_sink_assert_ref(s);
46753a5a1b3Sopenharmony_ci    pa_assert_se(u = s->userdata);
46853a5a1b3Sopenharmony_ci
46953a5a1b3Sopenharmony_ci    block_usec = pa_sink_get_requested_latency_within_thread(s);
47053a5a1b3Sopenharmony_ci    if (block_usec == (pa_usec_t) -1)
47153a5a1b3Sopenharmony_ci        block_usec = s->thread_info.max_latency;
47253a5a1b3Sopenharmony_ci
47353a5a1b3Sopenharmony_ci    nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
47453a5a1b3Sopenharmony_ci    pa_sink_set_max_request_within_thread(s, nbytes);
47553a5a1b3Sopenharmony_ci
47653a5a1b3Sopenharmony_ci    if (u->stream) {
47753a5a1b3Sopenharmony_ci        switch (pa_stream_get_state(u->stream)) {
47853a5a1b3Sopenharmony_ci            case PA_STREAM_READY:
47953a5a1b3Sopenharmony_ci                if (pa_stream_get_buffer_attr(u->stream)->tlength == nbytes)
48053a5a1b3Sopenharmony_ci                    break;
48153a5a1b3Sopenharmony_ci
48253a5a1b3Sopenharmony_ci                pa_log_debug("Requesting new buffer attrs. tlength requested at %lu.",
48353a5a1b3Sopenharmony_ci                             (unsigned long) nbytes);
48453a5a1b3Sopenharmony_ci
48553a5a1b3Sopenharmony_ci                reset_bufferattr(&bufferattr);
48653a5a1b3Sopenharmony_ci                bufferattr.tlength = nbytes;
48753a5a1b3Sopenharmony_ci                if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, stream_set_buffer_attr_cb, u)))
48853a5a1b3Sopenharmony_ci                    pa_operation_unref(operation);
48953a5a1b3Sopenharmony_ci                break;
49053a5a1b3Sopenharmony_ci            case PA_STREAM_CREATING:
49153a5a1b3Sopenharmony_ci                /* we have to delay our request until stream is ready */
49253a5a1b3Sopenharmony_ci                u->update_stream_bufferattr_after_connect = true;
49353a5a1b3Sopenharmony_ci                break;
49453a5a1b3Sopenharmony_ci            default:
49553a5a1b3Sopenharmony_ci                break;
49653a5a1b3Sopenharmony_ci        }
49753a5a1b3Sopenharmony_ci    }
49853a5a1b3Sopenharmony_ci}
49953a5a1b3Sopenharmony_ci
50053a5a1b3Sopenharmony_cistatic int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
50153a5a1b3Sopenharmony_ci    struct userdata *u = PA_SINK(o)->userdata;
50253a5a1b3Sopenharmony_ci
50353a5a1b3Sopenharmony_ci    switch (code) {
50453a5a1b3Sopenharmony_ci        case PA_SINK_MESSAGE_GET_LATENCY: {
50553a5a1b3Sopenharmony_ci            int negative;
50653a5a1b3Sopenharmony_ci            pa_usec_t remote_latency;
50753a5a1b3Sopenharmony_ci
50853a5a1b3Sopenharmony_ci            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
50953a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
51053a5a1b3Sopenharmony_ci                return 0;
51153a5a1b3Sopenharmony_ci            }
51253a5a1b3Sopenharmony_ci
51353a5a1b3Sopenharmony_ci            if (!u->stream) {
51453a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
51553a5a1b3Sopenharmony_ci                return 0;
51653a5a1b3Sopenharmony_ci            }
51753a5a1b3Sopenharmony_ci
51853a5a1b3Sopenharmony_ci            if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
51953a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
52053a5a1b3Sopenharmony_ci                return 0;
52153a5a1b3Sopenharmony_ci            }
52253a5a1b3Sopenharmony_ci
52353a5a1b3Sopenharmony_ci            if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
52453a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
52553a5a1b3Sopenharmony_ci                return 0;
52653a5a1b3Sopenharmony_ci            }
52753a5a1b3Sopenharmony_ci
52853a5a1b3Sopenharmony_ci            *((int64_t*) data) = remote_latency;
52953a5a1b3Sopenharmony_ci            return 0;
53053a5a1b3Sopenharmony_ci        }
53153a5a1b3Sopenharmony_ci        case TUNNEL_MESSAGE_SINK_CREATED:
53253a5a1b3Sopenharmony_ci            on_sink_created(u);
53353a5a1b3Sopenharmony_ci            return 0;
53453a5a1b3Sopenharmony_ci    }
53553a5a1b3Sopenharmony_ci    return pa_sink_process_msg(o, code, data, offset, chunk);
53653a5a1b3Sopenharmony_ci}
53753a5a1b3Sopenharmony_ci
53853a5a1b3Sopenharmony_ci/* Called from the IO thread. */
53953a5a1b3Sopenharmony_cistatic int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
54053a5a1b3Sopenharmony_ci    struct userdata *u;
54153a5a1b3Sopenharmony_ci
54253a5a1b3Sopenharmony_ci    pa_assert(s);
54353a5a1b3Sopenharmony_ci    pa_assert_se(u = s->userdata);
54453a5a1b3Sopenharmony_ci
54553a5a1b3Sopenharmony_ci    /* It may be that only the suspend cause is changing, in which case there's
54653a5a1b3Sopenharmony_ci     * nothing to do. */
54753a5a1b3Sopenharmony_ci    if (new_state == s->thread_info.state)
54853a5a1b3Sopenharmony_ci        return 0;
54953a5a1b3Sopenharmony_ci
55053a5a1b3Sopenharmony_ci    if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
55153a5a1b3Sopenharmony_ci        return 0;
55253a5a1b3Sopenharmony_ci
55353a5a1b3Sopenharmony_ci    switch (new_state) {
55453a5a1b3Sopenharmony_ci        case PA_SINK_SUSPENDED: {
55553a5a1b3Sopenharmony_ci            cork_stream(u, true);
55653a5a1b3Sopenharmony_ci            break;
55753a5a1b3Sopenharmony_ci        }
55853a5a1b3Sopenharmony_ci        case PA_SINK_IDLE:
55953a5a1b3Sopenharmony_ci        case PA_SINK_RUNNING: {
56053a5a1b3Sopenharmony_ci            cork_stream(u, false);
56153a5a1b3Sopenharmony_ci            break;
56253a5a1b3Sopenharmony_ci        }
56353a5a1b3Sopenharmony_ci        case PA_SINK_INVALID_STATE:
56453a5a1b3Sopenharmony_ci        case PA_SINK_INIT:
56553a5a1b3Sopenharmony_ci        case PA_SINK_UNLINKED:
56653a5a1b3Sopenharmony_ci            break;
56753a5a1b3Sopenharmony_ci    }
56853a5a1b3Sopenharmony_ci
56953a5a1b3Sopenharmony_ci    return 0;
57053a5a1b3Sopenharmony_ci}
57153a5a1b3Sopenharmony_ci
57253a5a1b3Sopenharmony_ci/* Creates a sink in the main thread.
57353a5a1b3Sopenharmony_ci *
57453a5a1b3Sopenharmony_ci * This method is called when we receive a message from the io thread that a
57553a5a1b3Sopenharmony_ci * connection has been established with the server.  We defer creation of the
57653a5a1b3Sopenharmony_ci * sink until the connection is established, because we don't have a sink if
57753a5a1b3Sopenharmony_ci * the remote server isn't there.
57853a5a1b3Sopenharmony_ci */
57953a5a1b3Sopenharmony_cistatic void create_sink(struct userdata *u) {
58053a5a1b3Sopenharmony_ci    pa_sink_new_data sink_data;
58153a5a1b3Sopenharmony_ci
58253a5a1b3Sopenharmony_ci    pa_assert_ctl_context();
58353a5a1b3Sopenharmony_ci
58453a5a1b3Sopenharmony_ci    /* Create sink */
58553a5a1b3Sopenharmony_ci    pa_sink_new_data_init(&sink_data);
58653a5a1b3Sopenharmony_ci    sink_data.driver = __FILE__;
58753a5a1b3Sopenharmony_ci    sink_data.module = u->module;
58853a5a1b3Sopenharmony_ci
58953a5a1b3Sopenharmony_ci    pa_sink_new_data_set_name(&sink_data, u->sink_name);
59053a5a1b3Sopenharmony_ci    pa_sink_new_data_set_sample_spec(&sink_data, &u->sample_spec);
59153a5a1b3Sopenharmony_ci    pa_sink_new_data_set_channel_map(&sink_data, &u->channel_map);
59253a5a1b3Sopenharmony_ci
59353a5a1b3Sopenharmony_ci    pa_proplist_update(sink_data.proplist, PA_UPDATE_REPLACE, u->sink_proplist);
59453a5a1b3Sopenharmony_ci
59553a5a1b3Sopenharmony_ci    if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
59653a5a1b3Sopenharmony_ci        pa_log("Failed to create sink.");
59753a5a1b3Sopenharmony_ci        goto finish;
59853a5a1b3Sopenharmony_ci    }
59953a5a1b3Sopenharmony_ci
60053a5a1b3Sopenharmony_ci    u->sink->userdata = u;
60153a5a1b3Sopenharmony_ci    u->sink->parent.process_msg = sink_process_msg_cb;
60253a5a1b3Sopenharmony_ci    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
60353a5a1b3Sopenharmony_ci    u->sink->update_requested_latency = sink_update_requested_latency_cb;
60453a5a1b3Sopenharmony_ci    pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
60553a5a1b3Sopenharmony_ci
60653a5a1b3Sopenharmony_ci    /* set thread message queue */
60753a5a1b3Sopenharmony_ci    pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
60853a5a1b3Sopenharmony_ci    pa_sink_set_rtpoll(u->sink, u->rtpoll);
60953a5a1b3Sopenharmony_ci
61053a5a1b3Sopenharmony_ci    pa_sink_put(u->sink);
61153a5a1b3Sopenharmony_ci
61253a5a1b3Sopenharmony_cifinish:
61353a5a1b3Sopenharmony_ci    pa_sink_new_data_done(&sink_data);
61453a5a1b3Sopenharmony_ci
61553a5a1b3Sopenharmony_ci    /* tell any interested io threads that the sink they asked for has now been
61653a5a1b3Sopenharmony_ci     * created (even if we failed, we still notify the thread, so they can
61753a5a1b3Sopenharmony_ci     * either handle or kill the thread, rather than deadlock waiting for a
61853a5a1b3Sopenharmony_ci     * message that will never come */
61953a5a1b3Sopenharmony_ci    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL);
62053a5a1b3Sopenharmony_ci}
62153a5a1b3Sopenharmony_ci
62253a5a1b3Sopenharmony_ci/* Runs in PA mainloop context */
62353a5a1b3Sopenharmony_cistatic int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
62453a5a1b3Sopenharmony_ci    struct userdata *u = (struct userdata *) data;
62553a5a1b3Sopenharmony_ci
62653a5a1b3Sopenharmony_ci    pa_assert(u);
62753a5a1b3Sopenharmony_ci    pa_assert_ctl_context();
62853a5a1b3Sopenharmony_ci
62953a5a1b3Sopenharmony_ci    if (u->shutting_down)
63053a5a1b3Sopenharmony_ci        return 0;
63153a5a1b3Sopenharmony_ci
63253a5a1b3Sopenharmony_ci    switch (code) {
63353a5a1b3Sopenharmony_ci        case TUNNEL_MESSAGE_CREATE_SINK_REQUEST:
63453a5a1b3Sopenharmony_ci            create_sink(u);
63553a5a1b3Sopenharmony_ci            break;
63653a5a1b3Sopenharmony_ci        case TUNNEL_MESSAGE_MAYBE_RESTART:
63753a5a1b3Sopenharmony_ci            maybe_restart(u->module->userdata);
63853a5a1b3Sopenharmony_ci            break;
63953a5a1b3Sopenharmony_ci    }
64053a5a1b3Sopenharmony_ci
64153a5a1b3Sopenharmony_ci    return 0;
64253a5a1b3Sopenharmony_ci}
64353a5a1b3Sopenharmony_ci
64453a5a1b3Sopenharmony_cistatic int do_init(pa_module *m) {
64553a5a1b3Sopenharmony_ci    struct userdata *u = NULL;
64653a5a1b3Sopenharmony_ci    struct module_restart_data *rd;
64753a5a1b3Sopenharmony_ci    pa_modargs *ma = NULL;
64853a5a1b3Sopenharmony_ci    const char *remote_server = NULL;
64953a5a1b3Sopenharmony_ci    char *default_sink_name = NULL;
65053a5a1b3Sopenharmony_ci    uint32_t reconnect_interval_ms = 0;
65153a5a1b3Sopenharmony_ci
65253a5a1b3Sopenharmony_ci    pa_assert(m);
65353a5a1b3Sopenharmony_ci    pa_assert(m->userdata);
65453a5a1b3Sopenharmony_ci
65553a5a1b3Sopenharmony_ci    rd = m->userdata;
65653a5a1b3Sopenharmony_ci
65753a5a1b3Sopenharmony_ci    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
65853a5a1b3Sopenharmony_ci        pa_log("Failed to parse module arguments.");
65953a5a1b3Sopenharmony_ci        goto fail;
66053a5a1b3Sopenharmony_ci    }
66153a5a1b3Sopenharmony_ci
66253a5a1b3Sopenharmony_ci    u = pa_xnew0(struct userdata, 1);
66353a5a1b3Sopenharmony_ci    u->module = m;
66453a5a1b3Sopenharmony_ci    rd->userdata = u;
66553a5a1b3Sopenharmony_ci
66653a5a1b3Sopenharmony_ci    u->sample_spec = m->core->default_sample_spec;
66753a5a1b3Sopenharmony_ci    u->channel_map = m->core->default_channel_map;
66853a5a1b3Sopenharmony_ci    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
66953a5a1b3Sopenharmony_ci        pa_log("Invalid sample format specification or channel map");
67053a5a1b3Sopenharmony_ci        goto fail;
67153a5a1b3Sopenharmony_ci    }
67253a5a1b3Sopenharmony_ci
67353a5a1b3Sopenharmony_ci    remote_server = pa_modargs_get_value(ma, "server", NULL);
67453a5a1b3Sopenharmony_ci    if (!remote_server) {
67553a5a1b3Sopenharmony_ci        pa_log("No server given!");
67653a5a1b3Sopenharmony_ci        goto fail;
67753a5a1b3Sopenharmony_ci    }
67853a5a1b3Sopenharmony_ci
67953a5a1b3Sopenharmony_ci    u->remote_server = pa_xstrdup(remote_server);
68053a5a1b3Sopenharmony_ci    u->thread_mainloop = pa_mainloop_new();
68153a5a1b3Sopenharmony_ci    if (u->thread_mainloop == NULL) {
68253a5a1b3Sopenharmony_ci        pa_log("Failed to create mainloop");
68353a5a1b3Sopenharmony_ci        goto fail;
68453a5a1b3Sopenharmony_ci    }
68553a5a1b3Sopenharmony_ci    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
68653a5a1b3Sopenharmony_ci    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
68753a5a1b3Sopenharmony_ci    u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
68853a5a1b3Sopenharmony_ci
68953a5a1b3Sopenharmony_ci    u->thread_mq = pa_xnew0(pa_thread_mq, 1);
69053a5a1b3Sopenharmony_ci
69153a5a1b3Sopenharmony_ci    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
69253a5a1b3Sopenharmony_ci        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
69353a5a1b3Sopenharmony_ci        goto fail;
69453a5a1b3Sopenharmony_ci    }
69553a5a1b3Sopenharmony_ci
69653a5a1b3Sopenharmony_ci    u->msg = pa_msgobject_new(tunnel_msg);
69753a5a1b3Sopenharmony_ci    u->msg->parent.process_msg = tunnel_process_msg;
69853a5a1b3Sopenharmony_ci
69953a5a1b3Sopenharmony_ci    /* The rtpoll created here is never run. It is only necessary to avoid crashes
70053a5a1b3Sopenharmony_ci     * when module-tunnel-sink-new is used together with module-loopback or
70153a5a1b3Sopenharmony_ci     * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
70253a5a1b3Sopenharmony_ci     * by the sink. module-loopback and combine-sink only work because they call
70353a5a1b3Sopenharmony_ci     * pa_asyncmsq_process_one() themselves. module_rtp_recv also uses the rtpoll,
70453a5a1b3Sopenharmony_ci     * but never calls pa_asyncmsq_process_one(), so it will not work in combination
70553a5a1b3Sopenharmony_ci     * with module-tunnel-sink-new. */
70653a5a1b3Sopenharmony_ci    u->rtpoll = pa_rtpoll_new();
70753a5a1b3Sopenharmony_ci
70853a5a1b3Sopenharmony_ci    default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
70953a5a1b3Sopenharmony_ci    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", default_sink_name));
71053a5a1b3Sopenharmony_ci
71153a5a1b3Sopenharmony_ci    u->sink_proplist = pa_proplist_new();
71253a5a1b3Sopenharmony_ci    pa_proplist_sets(u->sink_proplist, PA_PROP_DEVICE_CLASS, "sound");
71353a5a1b3Sopenharmony_ci    pa_proplist_setf(u->sink_proplist,
71453a5a1b3Sopenharmony_ci                     PA_PROP_DEVICE_DESCRIPTION,
71553a5a1b3Sopenharmony_ci                     _("Tunnel to %s/%s"),
71653a5a1b3Sopenharmony_ci                     remote_server,
71753a5a1b3Sopenharmony_ci                     pa_strempty(u->remote_sink_name));
71853a5a1b3Sopenharmony_ci
71953a5a1b3Sopenharmony_ci    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) {
72053a5a1b3Sopenharmony_ci        pa_log("Invalid properties");
72153a5a1b3Sopenharmony_ci        goto fail;
72253a5a1b3Sopenharmony_ci    }
72353a5a1b3Sopenharmony_ci
72453a5a1b3Sopenharmony_ci    pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
72553a5a1b3Sopenharmony_ci    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
72653a5a1b3Sopenharmony_ci
72753a5a1b3Sopenharmony_ci    if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
72853a5a1b3Sopenharmony_ci        pa_log("Failed to create thread.");
72953a5a1b3Sopenharmony_ci        goto fail;
73053a5a1b3Sopenharmony_ci    }
73153a5a1b3Sopenharmony_ci
73253a5a1b3Sopenharmony_ci    /* If the module is restarting and do_init() finishes successfully, the
73353a5a1b3Sopenharmony_ci     * restart data is no longer needed. If do_init() fails, don't touch the
73453a5a1b3Sopenharmony_ci     * restart data, because following restart attempts will continue to use
73553a5a1b3Sopenharmony_ci     * the same data. If restart_data is NULL, that means no restart is
73653a5a1b3Sopenharmony_ci     * currently pending. */
73753a5a1b3Sopenharmony_ci    if (rd->restart_data) {
73853a5a1b3Sopenharmony_ci        pa_restart_free(rd->restart_data);
73953a5a1b3Sopenharmony_ci        rd->restart_data = NULL;
74053a5a1b3Sopenharmony_ci    }
74153a5a1b3Sopenharmony_ci
74253a5a1b3Sopenharmony_ci    pa_modargs_free(ma);
74353a5a1b3Sopenharmony_ci    pa_xfree(default_sink_name);
74453a5a1b3Sopenharmony_ci
74553a5a1b3Sopenharmony_ci    return 0;
74653a5a1b3Sopenharmony_ci
74753a5a1b3Sopenharmony_cifail:
74853a5a1b3Sopenharmony_ci    if (ma)
74953a5a1b3Sopenharmony_ci        pa_modargs_free(ma);
75053a5a1b3Sopenharmony_ci
75153a5a1b3Sopenharmony_ci    if (default_sink_name)
75253a5a1b3Sopenharmony_ci        pa_xfree(default_sink_name);
75353a5a1b3Sopenharmony_ci
75453a5a1b3Sopenharmony_ci    return -1;
75553a5a1b3Sopenharmony_ci}
75653a5a1b3Sopenharmony_ci
75753a5a1b3Sopenharmony_cistatic void do_done(pa_module *m) {
75853a5a1b3Sopenharmony_ci    struct userdata *u = NULL;
75953a5a1b3Sopenharmony_ci    struct module_restart_data *rd;
76053a5a1b3Sopenharmony_ci
76153a5a1b3Sopenharmony_ci    pa_assert(m);
76253a5a1b3Sopenharmony_ci
76353a5a1b3Sopenharmony_ci    if (!(rd = m->userdata))
76453a5a1b3Sopenharmony_ci        return;
76553a5a1b3Sopenharmony_ci    if (!(u = rd->userdata))
76653a5a1b3Sopenharmony_ci        return;
76753a5a1b3Sopenharmony_ci
76853a5a1b3Sopenharmony_ci    u->shutting_down = true;
76953a5a1b3Sopenharmony_ci
77053a5a1b3Sopenharmony_ci    if (u->sink)
77153a5a1b3Sopenharmony_ci        pa_sink_unlink(u->sink);
77253a5a1b3Sopenharmony_ci
77353a5a1b3Sopenharmony_ci    if (u->thread) {
77453a5a1b3Sopenharmony_ci        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
77553a5a1b3Sopenharmony_ci        pa_thread_free(u->thread);
77653a5a1b3Sopenharmony_ci    }
77753a5a1b3Sopenharmony_ci
77853a5a1b3Sopenharmony_ci    if (u->thread_mq) {
77953a5a1b3Sopenharmony_ci        pa_thread_mq_done(u->thread_mq);
78053a5a1b3Sopenharmony_ci        pa_xfree(u->thread_mq);
78153a5a1b3Sopenharmony_ci    }
78253a5a1b3Sopenharmony_ci
78353a5a1b3Sopenharmony_ci    if (u->thread_mainloop)
78453a5a1b3Sopenharmony_ci        pa_mainloop_free(u->thread_mainloop);
78553a5a1b3Sopenharmony_ci
78653a5a1b3Sopenharmony_ci    if (u->cookie_file)
78753a5a1b3Sopenharmony_ci        pa_xfree(u->cookie_file);
78853a5a1b3Sopenharmony_ci
78953a5a1b3Sopenharmony_ci    if (u->remote_sink_name)
79053a5a1b3Sopenharmony_ci        pa_xfree(u->remote_sink_name);
79153a5a1b3Sopenharmony_ci
79253a5a1b3Sopenharmony_ci    if (u->remote_server)
79353a5a1b3Sopenharmony_ci        pa_xfree(u->remote_server);
79453a5a1b3Sopenharmony_ci
79553a5a1b3Sopenharmony_ci    if (u->sink)
79653a5a1b3Sopenharmony_ci        pa_sink_unref(u->sink);
79753a5a1b3Sopenharmony_ci
79853a5a1b3Sopenharmony_ci    if (u->rtpoll)
79953a5a1b3Sopenharmony_ci        pa_rtpoll_free(u->rtpoll);
80053a5a1b3Sopenharmony_ci
80153a5a1b3Sopenharmony_ci    if (u->sink_proplist)
80253a5a1b3Sopenharmony_ci        pa_proplist_free(u->sink_proplist);
80353a5a1b3Sopenharmony_ci
80453a5a1b3Sopenharmony_ci    if (u->sink_name)
80553a5a1b3Sopenharmony_ci        pa_xfree(u->sink_name);
80653a5a1b3Sopenharmony_ci
80753a5a1b3Sopenharmony_ci    pa_xfree(u->msg);
80853a5a1b3Sopenharmony_ci
80953a5a1b3Sopenharmony_ci    pa_xfree(u);
81053a5a1b3Sopenharmony_ci
81153a5a1b3Sopenharmony_ci    rd->userdata = NULL;
81253a5a1b3Sopenharmony_ci}
81353a5a1b3Sopenharmony_ci
81453a5a1b3Sopenharmony_ciint pa__init(pa_module *m) {
81553a5a1b3Sopenharmony_ci    int ret;
81653a5a1b3Sopenharmony_ci
81753a5a1b3Sopenharmony_ci    pa_assert(m);
81853a5a1b3Sopenharmony_ci
81953a5a1b3Sopenharmony_ci    m->userdata = pa_xnew0(struct module_restart_data, 1);
82053a5a1b3Sopenharmony_ci
82153a5a1b3Sopenharmony_ci    ret = do_init(m);
82253a5a1b3Sopenharmony_ci
82353a5a1b3Sopenharmony_ci    if (ret < 0)
82453a5a1b3Sopenharmony_ci        pa__done(m);
82553a5a1b3Sopenharmony_ci
82653a5a1b3Sopenharmony_ci    return ret;
82753a5a1b3Sopenharmony_ci}
82853a5a1b3Sopenharmony_ci
82953a5a1b3Sopenharmony_civoid pa__done(pa_module *m) {
83053a5a1b3Sopenharmony_ci    pa_assert(m);
83153a5a1b3Sopenharmony_ci
83253a5a1b3Sopenharmony_ci    do_done(m);
83353a5a1b3Sopenharmony_ci
83453a5a1b3Sopenharmony_ci    if (m->userdata) {
83553a5a1b3Sopenharmony_ci        struct module_restart_data *rd = m->userdata;
83653a5a1b3Sopenharmony_ci
83753a5a1b3Sopenharmony_ci        if (rd->restart_data)
83853a5a1b3Sopenharmony_ci            pa_restart_free(rd->restart_data);
83953a5a1b3Sopenharmony_ci
84053a5a1b3Sopenharmony_ci        pa_xfree(m->userdata);
84153a5a1b3Sopenharmony_ci    }
84253a5a1b3Sopenharmony_ci}
843