/***
    This file is part of PulseAudio.

    Copyright 2013 Alexander Couzens

    PulseAudio is free software; you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published
    by the Free Software Foundation; either version 2.1 of the License,
    or (at your option) any later version.

    PulseAudio is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/
1953a5a1b3Sopenharmony_ci
2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H
2153a5a1b3Sopenharmony_ci#include <config.h>
2253a5a1b3Sopenharmony_ci#endif
2353a5a1b3Sopenharmony_ci
2453a5a1b3Sopenharmony_ci#include "restart-module.h"
2553a5a1b3Sopenharmony_ci
2653a5a1b3Sopenharmony_ci#include <pulse/context.h>
2753a5a1b3Sopenharmony_ci#include <pulse/timeval.h>
2853a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h>
2953a5a1b3Sopenharmony_ci#include <pulse/stream.h>
3053a5a1b3Sopenharmony_ci#include <pulse/mainloop.h>
3153a5a1b3Sopenharmony_ci#include <pulse/introspect.h>
3253a5a1b3Sopenharmony_ci#include <pulse/error.h>
3353a5a1b3Sopenharmony_ci
3453a5a1b3Sopenharmony_ci#include <pulsecore/core.h>
3553a5a1b3Sopenharmony_ci#include <pulsecore/core-util.h>
3653a5a1b3Sopenharmony_ci#include <pulsecore/i18n.h>
3753a5a1b3Sopenharmony_ci#include <pulsecore/source.h>
3853a5a1b3Sopenharmony_ci#include <pulsecore/modargs.h>
3953a5a1b3Sopenharmony_ci#include <pulsecore/log.h>
4053a5a1b3Sopenharmony_ci#include <pulsecore/thread.h>
4153a5a1b3Sopenharmony_ci#include <pulsecore/thread-mq.h>
4253a5a1b3Sopenharmony_ci#include <pulsecore/poll.h>
4353a5a1b3Sopenharmony_ci#include <pulsecore/rtpoll.h>
4453a5a1b3Sopenharmony_ci#include <pulsecore/proplist-util.h>
4553a5a1b3Sopenharmony_ci
4653a5a1b3Sopenharmony_ciPA_MODULE_AUTHOR("Alexander Couzens");
4753a5a1b3Sopenharmony_ciPA_MODULE_DESCRIPTION("Create a network source which connects via a stream to a remote PulseAudio server");
4853a5a1b3Sopenharmony_ciPA_MODULE_VERSION(PACKAGE_VERSION);
4953a5a1b3Sopenharmony_ciPA_MODULE_LOAD_ONCE(false);
5053a5a1b3Sopenharmony_ciPA_MODULE_USAGE(
5153a5a1b3Sopenharmony_ci        "server=<address> "
5253a5a1b3Sopenharmony_ci        "source=<name of the remote source> "
5353a5a1b3Sopenharmony_ci        "source_name=<name for the local source> "
5453a5a1b3Sopenharmony_ci        "source_properties=<properties for the local source> "
5553a5a1b3Sopenharmony_ci        "reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
5653a5a1b3Sopenharmony_ci        "format=<sample format> "
5753a5a1b3Sopenharmony_ci        "channels=<number of channels> "
5853a5a1b3Sopenharmony_ci        "rate=<sample rate> "
5953a5a1b3Sopenharmony_ci        "channel_map=<channel map> "
6053a5a1b3Sopenharmony_ci        "cookie=<cookie file path>"
6153a5a1b3Sopenharmony_ci        );
6253a5a1b3Sopenharmony_ci
/* Return value passed to the IO-thread mainloop's quit() when the tunnel hits
 * an unrecoverable error. thread_func() treats a zero mainloop return value as
 * a clean exit and anything else as a failure. */
#define TUNNEL_THREAD_FAILED_MAINLOOP 1
6453a5a1b3Sopenharmony_ci
6553a5a1b3Sopenharmony_cistatic int do_init(pa_module *m);
6653a5a1b3Sopenharmony_cistatic void do_done(pa_module *m);
6753a5a1b3Sopenharmony_cistatic void stream_state_cb(pa_stream *stream, void *userdata);
6853a5a1b3Sopenharmony_cistatic void stream_read_cb(pa_stream *s, size_t length, void *userdata);
6953a5a1b3Sopenharmony_cistatic void context_state_cb(pa_context *c, void *userdata);
7053a5a1b3Sopenharmony_cistatic void source_update_requested_latency_cb(pa_source *s);
7153a5a1b3Sopenharmony_ci
7253a5a1b3Sopenharmony_cistruct tunnel_msg {
7353a5a1b3Sopenharmony_ci    pa_msgobject parent;
7453a5a1b3Sopenharmony_ci};
7553a5a1b3Sopenharmony_ci
7653a5a1b3Sopenharmony_citypedef struct tunnel_msg tunnel_msg;
7753a5a1b3Sopenharmony_ciPA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
7853a5a1b3Sopenharmony_ci
7953a5a1b3Sopenharmony_cienum {
8053a5a1b3Sopenharmony_ci    TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST,
8153a5a1b3Sopenharmony_ci    TUNNEL_MESSAGE_MAYBE_RESTART,
8253a5a1b3Sopenharmony_ci};
8353a5a1b3Sopenharmony_ci
8453a5a1b3Sopenharmony_cienum {
8553a5a1b3Sopenharmony_ci    TUNNEL_MESSAGE_SOURCE_CREATED = PA_SOURCE_MESSAGE_MAX,
8653a5a1b3Sopenharmony_ci};
8753a5a1b3Sopenharmony_ci
8853a5a1b3Sopenharmony_cistruct userdata {
8953a5a1b3Sopenharmony_ci    pa_module *module;
9053a5a1b3Sopenharmony_ci    pa_source *source;
9153a5a1b3Sopenharmony_ci    pa_thread *thread;
9253a5a1b3Sopenharmony_ci    pa_thread_mq *thread_mq;
9353a5a1b3Sopenharmony_ci    pa_mainloop *thread_mainloop;
9453a5a1b3Sopenharmony_ci    pa_mainloop_api *thread_mainloop_api;
9553a5a1b3Sopenharmony_ci
9653a5a1b3Sopenharmony_ci    pa_context *context;
9753a5a1b3Sopenharmony_ci    pa_stream *stream;
9853a5a1b3Sopenharmony_ci    pa_rtpoll *rtpoll;
9953a5a1b3Sopenharmony_ci
10053a5a1b3Sopenharmony_ci    bool update_stream_bufferattr_after_connect;
10153a5a1b3Sopenharmony_ci    bool connected;
10253a5a1b3Sopenharmony_ci    bool shutting_down;
10353a5a1b3Sopenharmony_ci    bool new_data;
10453a5a1b3Sopenharmony_ci
10553a5a1b3Sopenharmony_ci    char *cookie_file;
10653a5a1b3Sopenharmony_ci    char *remote_server;
10753a5a1b3Sopenharmony_ci    char *remote_source_name;
10853a5a1b3Sopenharmony_ci    char *source_name;
10953a5a1b3Sopenharmony_ci
11053a5a1b3Sopenharmony_ci    pa_proplist *source_proplist;
11153a5a1b3Sopenharmony_ci    pa_sample_spec sample_spec;
11253a5a1b3Sopenharmony_ci    pa_channel_map channel_map;
11353a5a1b3Sopenharmony_ci
11453a5a1b3Sopenharmony_ci    tunnel_msg *msg;
11553a5a1b3Sopenharmony_ci
11653a5a1b3Sopenharmony_ci    pa_usec_t reconnect_interval_us;
11753a5a1b3Sopenharmony_ci};
11853a5a1b3Sopenharmony_ci
11953a5a1b3Sopenharmony_cistruct module_restart_data {
12053a5a1b3Sopenharmony_ci    struct userdata *userdata;
12153a5a1b3Sopenharmony_ci    pa_restart_data *restart_data;
12253a5a1b3Sopenharmony_ci};
12353a5a1b3Sopenharmony_ci
/* NULL-terminated list of the module argument keys this module accepts. */
static const char* const valid_modargs[] = {
    /* local source */
    "source_name",
    "source_properties",
    /* remote end */
    "server",
    "source",
    /* sample format */
    "format",
    "channels",
    "rate",
    "channel_map",
    /* authentication and reconnect behaviour */
    "cookie",
    "reconnect_interval_ms",
    NULL,
};
13753a5a1b3Sopenharmony_ci
13853a5a1b3Sopenharmony_cistatic void cork_stream(struct userdata *u, bool cork) {
13953a5a1b3Sopenharmony_ci    pa_operation *operation;
14053a5a1b3Sopenharmony_ci
14153a5a1b3Sopenharmony_ci    pa_assert(u);
14253a5a1b3Sopenharmony_ci    pa_assert(u->stream);
14353a5a1b3Sopenharmony_ci
14453a5a1b3Sopenharmony_ci    if ((operation = pa_stream_cork(u->stream, cork, NULL, NULL)))
14553a5a1b3Sopenharmony_ci        pa_operation_unref(operation);
14653a5a1b3Sopenharmony_ci}
14753a5a1b3Sopenharmony_ci
14853a5a1b3Sopenharmony_cistatic void reset_bufferattr(pa_buffer_attr *bufferattr) {
14953a5a1b3Sopenharmony_ci    pa_assert(bufferattr);
15053a5a1b3Sopenharmony_ci    bufferattr->fragsize = (uint32_t) -1;
15153a5a1b3Sopenharmony_ci    bufferattr->minreq = (uint32_t) -1;
15253a5a1b3Sopenharmony_ci    bufferattr->maxlength = (uint32_t) -1;
15353a5a1b3Sopenharmony_ci    bufferattr->prebuf = (uint32_t) -1;
15453a5a1b3Sopenharmony_ci    bufferattr->tlength = (uint32_t) -1;
15553a5a1b3Sopenharmony_ci}
15653a5a1b3Sopenharmony_ci
15753a5a1b3Sopenharmony_cistatic pa_proplist* tunnel_new_proplist(struct userdata *u) {
15853a5a1b3Sopenharmony_ci    pa_proplist *proplist = pa_proplist_new();
15953a5a1b3Sopenharmony_ci    pa_assert(proplist);
16053a5a1b3Sopenharmony_ci    pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio");
16153a5a1b3Sopenharmony_ci    pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
16253a5a1b3Sopenharmony_ci    pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
16353a5a1b3Sopenharmony_ci    pa_init_proplist(proplist);
16453a5a1b3Sopenharmony_ci
16553a5a1b3Sopenharmony_ci    return proplist;
16653a5a1b3Sopenharmony_ci}
16753a5a1b3Sopenharmony_ci
16853a5a1b3Sopenharmony_cistatic void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
16953a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
17053a5a1b3Sopenharmony_ci    u->new_data = true;
17153a5a1b3Sopenharmony_ci}
17253a5a1b3Sopenharmony_ci
17353a5a1b3Sopenharmony_ci/* called from io context to read samples from the stream into our source */
17453a5a1b3Sopenharmony_cistatic void read_new_samples(struct userdata *u) {
17553a5a1b3Sopenharmony_ci    const void *p;
17653a5a1b3Sopenharmony_ci    size_t readable = 0;
17753a5a1b3Sopenharmony_ci    pa_memchunk memchunk;
17853a5a1b3Sopenharmony_ci
17953a5a1b3Sopenharmony_ci    pa_assert(u);
18053a5a1b3Sopenharmony_ci    u->new_data = false;
18153a5a1b3Sopenharmony_ci
18253a5a1b3Sopenharmony_ci    pa_memchunk_reset(&memchunk);
18353a5a1b3Sopenharmony_ci
18453a5a1b3Sopenharmony_ci    if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
18553a5a1b3Sopenharmony_ci        return;
18653a5a1b3Sopenharmony_ci
18753a5a1b3Sopenharmony_ci    readable = pa_stream_readable_size(u->stream);
18853a5a1b3Sopenharmony_ci    while (readable > 0) {
18953a5a1b3Sopenharmony_ci        size_t nbytes = 0;
19053a5a1b3Sopenharmony_ci        if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
19153a5a1b3Sopenharmony_ci            pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
19253a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
19353a5a1b3Sopenharmony_ci            return;
19453a5a1b3Sopenharmony_ci        }
19553a5a1b3Sopenharmony_ci
19653a5a1b3Sopenharmony_ci        if (PA_LIKELY(p)) {
19753a5a1b3Sopenharmony_ci            /* we have valid data */
19853a5a1b3Sopenharmony_ci            memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
19953a5a1b3Sopenharmony_ci            memchunk.length = nbytes;
20053a5a1b3Sopenharmony_ci            memchunk.index = 0;
20153a5a1b3Sopenharmony_ci
20253a5a1b3Sopenharmony_ci            pa_source_post(u->source, &memchunk);
20353a5a1b3Sopenharmony_ci            pa_memblock_unref_fixed(memchunk.memblock);
20453a5a1b3Sopenharmony_ci        } else {
20553a5a1b3Sopenharmony_ci            size_t bytes_to_generate = nbytes;
20653a5a1b3Sopenharmony_ci
20753a5a1b3Sopenharmony_ci            /* we have a hole. generate silence */
20853a5a1b3Sopenharmony_ci            memchunk = u->source->silence;
20953a5a1b3Sopenharmony_ci            pa_memblock_ref(memchunk.memblock);
21053a5a1b3Sopenharmony_ci
21153a5a1b3Sopenharmony_ci            while (bytes_to_generate > 0) {
21253a5a1b3Sopenharmony_ci                if (bytes_to_generate < memchunk.length)
21353a5a1b3Sopenharmony_ci                    memchunk.length = bytes_to_generate;
21453a5a1b3Sopenharmony_ci
21553a5a1b3Sopenharmony_ci                pa_source_post(u->source, &memchunk);
21653a5a1b3Sopenharmony_ci                bytes_to_generate -= memchunk.length;
21753a5a1b3Sopenharmony_ci            }
21853a5a1b3Sopenharmony_ci
21953a5a1b3Sopenharmony_ci            pa_memblock_unref(memchunk.memblock);
22053a5a1b3Sopenharmony_ci        }
22153a5a1b3Sopenharmony_ci
22253a5a1b3Sopenharmony_ci        pa_stream_drop(u->stream);
22353a5a1b3Sopenharmony_ci        readable -= nbytes;
22453a5a1b3Sopenharmony_ci    }
22553a5a1b3Sopenharmony_ci}
22653a5a1b3Sopenharmony_ci
22753a5a1b3Sopenharmony_cistatic void thread_func(void *userdata) {
22853a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
22953a5a1b3Sopenharmony_ci    pa_proplist *proplist;
23053a5a1b3Sopenharmony_ci
23153a5a1b3Sopenharmony_ci    pa_assert(u);
23253a5a1b3Sopenharmony_ci
23353a5a1b3Sopenharmony_ci    pa_log_debug("Thread starting up");
23453a5a1b3Sopenharmony_ci    pa_thread_mq_install(u->thread_mq);
23553a5a1b3Sopenharmony_ci
23653a5a1b3Sopenharmony_ci    proplist = tunnel_new_proplist(u);
23753a5a1b3Sopenharmony_ci    u->context = pa_context_new_with_proplist(u->thread_mainloop_api,
23853a5a1b3Sopenharmony_ci                                              "PulseAudio",
23953a5a1b3Sopenharmony_ci                                              proplist);
24053a5a1b3Sopenharmony_ci    pa_proplist_free(proplist);
24153a5a1b3Sopenharmony_ci
24253a5a1b3Sopenharmony_ci    if (!u->context) {
24353a5a1b3Sopenharmony_ci        pa_log("Failed to create libpulse context");
24453a5a1b3Sopenharmony_ci        goto fail;
24553a5a1b3Sopenharmony_ci    }
24653a5a1b3Sopenharmony_ci
24753a5a1b3Sopenharmony_ci    if (u->cookie_file && pa_context_load_cookie_from_file(u->context, u->cookie_file) != 0) {
24853a5a1b3Sopenharmony_ci        pa_log_error("Can not load cookie file!");
24953a5a1b3Sopenharmony_ci        goto fail;
25053a5a1b3Sopenharmony_ci    }
25153a5a1b3Sopenharmony_ci
25253a5a1b3Sopenharmony_ci    pa_context_set_state_callback(u->context, context_state_cb, u);
25353a5a1b3Sopenharmony_ci    if (pa_context_connect(u->context,
25453a5a1b3Sopenharmony_ci                           u->remote_server,
25553a5a1b3Sopenharmony_ci                           PA_CONTEXT_NOAUTOSPAWN,
25653a5a1b3Sopenharmony_ci                           NULL) < 0) {
25753a5a1b3Sopenharmony_ci        pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
25853a5a1b3Sopenharmony_ci        goto fail;
25953a5a1b3Sopenharmony_ci    }
26053a5a1b3Sopenharmony_ci
26153a5a1b3Sopenharmony_ci    for (;;) {
26253a5a1b3Sopenharmony_ci        int ret;
26353a5a1b3Sopenharmony_ci
26453a5a1b3Sopenharmony_ci        if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) {
26553a5a1b3Sopenharmony_ci            if (ret == 0)
26653a5a1b3Sopenharmony_ci                goto finish;
26753a5a1b3Sopenharmony_ci            else
26853a5a1b3Sopenharmony_ci                goto fail;
26953a5a1b3Sopenharmony_ci        }
27053a5a1b3Sopenharmony_ci
27153a5a1b3Sopenharmony_ci        if (u->new_data)
27253a5a1b3Sopenharmony_ci            read_new_samples(u);
27353a5a1b3Sopenharmony_ci    }
27453a5a1b3Sopenharmony_cifail:
27553a5a1b3Sopenharmony_ci    /* send a message to the ctl thread to ask it to either terminate us, or
27653a5a1b3Sopenharmony_ci     * restart us, but either way this thread will exit, so then wait for the
27753a5a1b3Sopenharmony_ci     * shutdown message */
27853a5a1b3Sopenharmony_ci    pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL);
27953a5a1b3Sopenharmony_ci    pa_asyncmsgq_wait_for(u->thread_mq->inq, PA_MESSAGE_SHUTDOWN);
28053a5a1b3Sopenharmony_ci
28153a5a1b3Sopenharmony_cifinish:
28253a5a1b3Sopenharmony_ci    if (u->stream) {
28353a5a1b3Sopenharmony_ci        pa_stream_disconnect(u->stream);
28453a5a1b3Sopenharmony_ci        pa_stream_unref(u->stream);
28553a5a1b3Sopenharmony_ci        u->stream = NULL;
28653a5a1b3Sopenharmony_ci    }
28753a5a1b3Sopenharmony_ci
28853a5a1b3Sopenharmony_ci    if (u->context) {
28953a5a1b3Sopenharmony_ci        pa_context_disconnect(u->context);
29053a5a1b3Sopenharmony_ci        pa_context_unref(u->context);
29153a5a1b3Sopenharmony_ci        u->context = NULL;
29253a5a1b3Sopenharmony_ci    }
29353a5a1b3Sopenharmony_ci
29453a5a1b3Sopenharmony_ci    pa_log_debug("Thread shutting down");
29553a5a1b3Sopenharmony_ci}
29653a5a1b3Sopenharmony_ci
29753a5a1b3Sopenharmony_cistatic void stream_state_cb(pa_stream *stream, void *userdata) {
29853a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
29953a5a1b3Sopenharmony_ci
30053a5a1b3Sopenharmony_ci    pa_assert(u);
30153a5a1b3Sopenharmony_ci
30253a5a1b3Sopenharmony_ci    switch (pa_stream_get_state(stream)) {
30353a5a1b3Sopenharmony_ci        case PA_STREAM_FAILED:
30453a5a1b3Sopenharmony_ci            pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
30553a5a1b3Sopenharmony_ci            u->connected = false;
30653a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
30753a5a1b3Sopenharmony_ci            break;
30853a5a1b3Sopenharmony_ci        case PA_STREAM_TERMINATED:
30953a5a1b3Sopenharmony_ci            pa_log_debug("Stream terminated.");
31053a5a1b3Sopenharmony_ci            break;
31153a5a1b3Sopenharmony_ci        case PA_STREAM_READY:
31253a5a1b3Sopenharmony_ci            if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
31353a5a1b3Sopenharmony_ci                cork_stream(u, false);
31453a5a1b3Sopenharmony_ci
31553a5a1b3Sopenharmony_ci            /* Only call our requested_latency_cb when requested_latency
31653a5a1b3Sopenharmony_ci             * changed between PA_STREAM_CREATING -> PA_STREAM_READY, because
31753a5a1b3Sopenharmony_ci             * we don't want to override the initial fragsize set by the server
31853a5a1b3Sopenharmony_ci             * without a good reason. */
31953a5a1b3Sopenharmony_ci            if (u->update_stream_bufferattr_after_connect)
32053a5a1b3Sopenharmony_ci                source_update_requested_latency_cb(u->source);
32153a5a1b3Sopenharmony_ci        case PA_STREAM_UNCONNECTED:
32253a5a1b3Sopenharmony_ci        case PA_STREAM_CREATING:
32353a5a1b3Sopenharmony_ci            break;
32453a5a1b3Sopenharmony_ci    }
32553a5a1b3Sopenharmony_ci}
32653a5a1b3Sopenharmony_ci
32753a5a1b3Sopenharmony_ci/* Do a reinit of the module.  Note that u will be freed as a result of this
32853a5a1b3Sopenharmony_ci * call. */
32953a5a1b3Sopenharmony_cistatic void maybe_restart(struct module_restart_data *rd) {
33053a5a1b3Sopenharmony_ci    struct userdata *u = rd->userdata;
33153a5a1b3Sopenharmony_ci
33253a5a1b3Sopenharmony_ci    if (rd->restart_data) {
33353a5a1b3Sopenharmony_ci        pa_log_debug("Restart already pending");
33453a5a1b3Sopenharmony_ci        return;
33553a5a1b3Sopenharmony_ci    }
33653a5a1b3Sopenharmony_ci
33753a5a1b3Sopenharmony_ci    if (u->reconnect_interval_us > 0) {
33853a5a1b3Sopenharmony_ci        /* The handle returned here must be freed when do_init() finishes successfully
33953a5a1b3Sopenharmony_ci         * and when the module exits. */
34053a5a1b3Sopenharmony_ci        rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us);
34153a5a1b3Sopenharmony_ci    } else {
34253a5a1b3Sopenharmony_ci        /* exit the module */
34353a5a1b3Sopenharmony_ci        pa_module_unload_request(u->module, true);
34453a5a1b3Sopenharmony_ci    }
34553a5a1b3Sopenharmony_ci}
34653a5a1b3Sopenharmony_ci
34753a5a1b3Sopenharmony_cistatic void on_source_created(struct userdata *u) {
34853a5a1b3Sopenharmony_ci    pa_proplist *proplist;
34953a5a1b3Sopenharmony_ci    pa_buffer_attr bufferattr;
35053a5a1b3Sopenharmony_ci    pa_usec_t requested_latency;
35153a5a1b3Sopenharmony_ci    char *username = pa_get_user_name_malloc();
35253a5a1b3Sopenharmony_ci    char *hostname = pa_get_host_name_malloc();
35353a5a1b3Sopenharmony_ci    /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
35453a5a1b3Sopenharmony_ci    char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
35553a5a1b3Sopenharmony_ci    pa_xfree(username);
35653a5a1b3Sopenharmony_ci    pa_xfree(hostname);
35753a5a1b3Sopenharmony_ci
35853a5a1b3Sopenharmony_ci    pa_assert_io_context();
35953a5a1b3Sopenharmony_ci
36053a5a1b3Sopenharmony_ci    /* if we still don't have a source, then source creation failed, and we
36153a5a1b3Sopenharmony_ci     * should kill this io thread */
36253a5a1b3Sopenharmony_ci    if (!u->source) {
36353a5a1b3Sopenharmony_ci        pa_log_error("Could not create a source.");
36453a5a1b3Sopenharmony_ci        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
36553a5a1b3Sopenharmony_ci        return;
36653a5a1b3Sopenharmony_ci    }
36753a5a1b3Sopenharmony_ci
36853a5a1b3Sopenharmony_ci    proplist = tunnel_new_proplist(u);
36953a5a1b3Sopenharmony_ci    u->stream = pa_stream_new_with_proplist(u->context,
37053a5a1b3Sopenharmony_ci                                            stream_name,
37153a5a1b3Sopenharmony_ci                                            &u->source->sample_spec,
37253a5a1b3Sopenharmony_ci                                            &u->source->channel_map,
37353a5a1b3Sopenharmony_ci                                            proplist);
37453a5a1b3Sopenharmony_ci    pa_proplist_free(proplist);
37553a5a1b3Sopenharmony_ci    pa_xfree(stream_name);
37653a5a1b3Sopenharmony_ci
37753a5a1b3Sopenharmony_ci    if (!u->stream) {
37853a5a1b3Sopenharmony_ci        pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
37953a5a1b3Sopenharmony_ci        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
38053a5a1b3Sopenharmony_ci        return;
38153a5a1b3Sopenharmony_ci    }
38253a5a1b3Sopenharmony_ci
38353a5a1b3Sopenharmony_ci    requested_latency = pa_source_get_requested_latency_within_thread(u->source);
38453a5a1b3Sopenharmony_ci    if (requested_latency == (uint32_t) -1)
38553a5a1b3Sopenharmony_ci        requested_latency = u->source->thread_info.max_latency;
38653a5a1b3Sopenharmony_ci
38753a5a1b3Sopenharmony_ci    reset_bufferattr(&bufferattr);
38853a5a1b3Sopenharmony_ci    bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
38953a5a1b3Sopenharmony_ci
39053a5a1b3Sopenharmony_ci    pa_stream_set_state_callback(u->stream, stream_state_cb, u);
39153a5a1b3Sopenharmony_ci    pa_stream_set_read_callback(u->stream, stream_read_cb, u);
39253a5a1b3Sopenharmony_ci    if (pa_stream_connect_record(u->stream,
39353a5a1b3Sopenharmony_ci                                 u->remote_source_name,
39453a5a1b3Sopenharmony_ci                                 &bufferattr,
39553a5a1b3Sopenharmony_ci                                 PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED|PA_STREAM_ADJUST_LATENCY) < 0) {
39653a5a1b3Sopenharmony_ci        pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
39753a5a1b3Sopenharmony_ci        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
39853a5a1b3Sopenharmony_ci    }
39953a5a1b3Sopenharmony_ci    u->connected = true;
40053a5a1b3Sopenharmony_ci}
40153a5a1b3Sopenharmony_ci
40253a5a1b3Sopenharmony_cistatic void context_state_cb(pa_context *c, void *userdata) {
40353a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
40453a5a1b3Sopenharmony_ci    pa_assert(u);
40553a5a1b3Sopenharmony_ci
40653a5a1b3Sopenharmony_ci    switch (pa_context_get_state(c)) {
40753a5a1b3Sopenharmony_ci        case PA_CONTEXT_UNCONNECTED:
40853a5a1b3Sopenharmony_ci        case PA_CONTEXT_CONNECTING:
40953a5a1b3Sopenharmony_ci        case PA_CONTEXT_AUTHORIZING:
41053a5a1b3Sopenharmony_ci        case PA_CONTEXT_SETTING_NAME:
41153a5a1b3Sopenharmony_ci            break;
41253a5a1b3Sopenharmony_ci        case PA_CONTEXT_READY:
41353a5a1b3Sopenharmony_ci            pa_log_debug("Connection successful. Creating stream.");
41453a5a1b3Sopenharmony_ci            pa_assert(!u->stream);
41553a5a1b3Sopenharmony_ci            pa_assert(!u->source);
41653a5a1b3Sopenharmony_ci
41753a5a1b3Sopenharmony_ci            pa_log_debug("Asking ctl thread to create source.");
41853a5a1b3Sopenharmony_ci            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST, u, 0, NULL, NULL);
41953a5a1b3Sopenharmony_ci            break;
42053a5a1b3Sopenharmony_ci        case PA_CONTEXT_FAILED:
42153a5a1b3Sopenharmony_ci            pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
42253a5a1b3Sopenharmony_ci            u->connected = false;
42353a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
42453a5a1b3Sopenharmony_ci            break;
42553a5a1b3Sopenharmony_ci        case PA_CONTEXT_TERMINATED:
42653a5a1b3Sopenharmony_ci            pa_log_debug("Context terminated.");
42753a5a1b3Sopenharmony_ci            u->connected = false;
42853a5a1b3Sopenharmony_ci            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
42953a5a1b3Sopenharmony_ci            break;
43053a5a1b3Sopenharmony_ci    }
43153a5a1b3Sopenharmony_ci}
43253a5a1b3Sopenharmony_ci
43353a5a1b3Sopenharmony_cistatic void source_update_requested_latency_cb(pa_source *s) {
43453a5a1b3Sopenharmony_ci    struct userdata *u;
43553a5a1b3Sopenharmony_ci    pa_operation *operation;
43653a5a1b3Sopenharmony_ci    size_t nbytes;
43753a5a1b3Sopenharmony_ci    pa_usec_t block_usec;
43853a5a1b3Sopenharmony_ci    pa_buffer_attr bufferattr;
43953a5a1b3Sopenharmony_ci
44053a5a1b3Sopenharmony_ci    pa_source_assert_ref(s);
44153a5a1b3Sopenharmony_ci    pa_assert_se(u = s->userdata);
44253a5a1b3Sopenharmony_ci
44353a5a1b3Sopenharmony_ci    block_usec = pa_source_get_requested_latency_within_thread(s);
44453a5a1b3Sopenharmony_ci    if (block_usec == (pa_usec_t) -1)
44553a5a1b3Sopenharmony_ci        block_usec = s->thread_info.max_latency;
44653a5a1b3Sopenharmony_ci
44753a5a1b3Sopenharmony_ci    nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec);
44853a5a1b3Sopenharmony_ci
44953a5a1b3Sopenharmony_ci    if (u->stream) {
45053a5a1b3Sopenharmony_ci        switch (pa_stream_get_state(u->stream)) {
45153a5a1b3Sopenharmony_ci            case PA_STREAM_READY:
45253a5a1b3Sopenharmony_ci                if (pa_stream_get_buffer_attr(u->stream)->fragsize == nbytes)
45353a5a1b3Sopenharmony_ci                    break;
45453a5a1b3Sopenharmony_ci
45553a5a1b3Sopenharmony_ci                reset_bufferattr(&bufferattr);
45653a5a1b3Sopenharmony_ci                bufferattr.fragsize = nbytes;
45753a5a1b3Sopenharmony_ci                if ((operation = pa_stream_set_buffer_attr(u->stream, &bufferattr, NULL, NULL)))
45853a5a1b3Sopenharmony_ci                    pa_operation_unref(operation);
45953a5a1b3Sopenharmony_ci                break;
46053a5a1b3Sopenharmony_ci            case PA_STREAM_CREATING:
46153a5a1b3Sopenharmony_ci                /* we have to delay our request until stream is ready */
46253a5a1b3Sopenharmony_ci                u->update_stream_bufferattr_after_connect = true;
46353a5a1b3Sopenharmony_ci                break;
46453a5a1b3Sopenharmony_ci            default:
46553a5a1b3Sopenharmony_ci                break;
46653a5a1b3Sopenharmony_ci        }
46753a5a1b3Sopenharmony_ci    }
46853a5a1b3Sopenharmony_ci}
46953a5a1b3Sopenharmony_ci
47053a5a1b3Sopenharmony_cistatic int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
47153a5a1b3Sopenharmony_ci    struct userdata *u = PA_SOURCE(o)->userdata;
47253a5a1b3Sopenharmony_ci
47353a5a1b3Sopenharmony_ci    switch (code) {
47453a5a1b3Sopenharmony_ci        case PA_SOURCE_MESSAGE_GET_LATENCY: {
47553a5a1b3Sopenharmony_ci            int negative;
47653a5a1b3Sopenharmony_ci            pa_usec_t remote_latency;
47753a5a1b3Sopenharmony_ci
47853a5a1b3Sopenharmony_ci            if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
47953a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
48053a5a1b3Sopenharmony_ci                return 0;
48153a5a1b3Sopenharmony_ci            }
48253a5a1b3Sopenharmony_ci
48353a5a1b3Sopenharmony_ci            if (!u->stream) {
48453a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
48553a5a1b3Sopenharmony_ci                return 0;
48653a5a1b3Sopenharmony_ci            }
48753a5a1b3Sopenharmony_ci
48853a5a1b3Sopenharmony_ci            if (pa_stream_get_state(u->stream) != PA_STREAM_READY) {
48953a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
49053a5a1b3Sopenharmony_ci                return 0;
49153a5a1b3Sopenharmony_ci            }
49253a5a1b3Sopenharmony_ci
49353a5a1b3Sopenharmony_ci            if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) {
49453a5a1b3Sopenharmony_ci                *((int64_t*) data) = 0;
49553a5a1b3Sopenharmony_ci                return 0;
49653a5a1b3Sopenharmony_ci            }
49753a5a1b3Sopenharmony_ci
49853a5a1b3Sopenharmony_ci            if (negative)
49953a5a1b3Sopenharmony_ci                *((int64_t*) data) = - (int64_t)remote_latency;
50053a5a1b3Sopenharmony_ci            else
50153a5a1b3Sopenharmony_ci                *((int64_t*) data) = remote_latency;
50253a5a1b3Sopenharmony_ci
50353a5a1b3Sopenharmony_ci            return 0;
50453a5a1b3Sopenharmony_ci        }
50553a5a1b3Sopenharmony_ci        case TUNNEL_MESSAGE_SOURCE_CREATED:
50653a5a1b3Sopenharmony_ci            on_source_created(u);
50753a5a1b3Sopenharmony_ci            return 0;
50853a5a1b3Sopenharmony_ci    }
50953a5a1b3Sopenharmony_ci    return pa_source_process_msg(o, code, data, offset, chunk);
51053a5a1b3Sopenharmony_ci}
51153a5a1b3Sopenharmony_ci
51253a5a1b3Sopenharmony_ci/* Called from the IO thread. */
51353a5a1b3Sopenharmony_cistatic int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
51453a5a1b3Sopenharmony_ci    struct userdata *u;
51553a5a1b3Sopenharmony_ci
51653a5a1b3Sopenharmony_ci    pa_assert(s);
51753a5a1b3Sopenharmony_ci    pa_assert_se(u = s->userdata);
51853a5a1b3Sopenharmony_ci
51953a5a1b3Sopenharmony_ci    /* It may be that only the suspend cause is changing, in which case there's
52053a5a1b3Sopenharmony_ci     * nothing to do. */
52153a5a1b3Sopenharmony_ci    if (new_state == s->thread_info.state)
52253a5a1b3Sopenharmony_ci        return 0;
52353a5a1b3Sopenharmony_ci
52453a5a1b3Sopenharmony_ci    if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
52553a5a1b3Sopenharmony_ci        return 0;
52653a5a1b3Sopenharmony_ci
52753a5a1b3Sopenharmony_ci    switch (new_state) {
52853a5a1b3Sopenharmony_ci        case PA_SOURCE_SUSPENDED: {
52953a5a1b3Sopenharmony_ci            cork_stream(u, true);
53053a5a1b3Sopenharmony_ci            break;
53153a5a1b3Sopenharmony_ci        }
53253a5a1b3Sopenharmony_ci        case PA_SOURCE_IDLE:
53353a5a1b3Sopenharmony_ci        case PA_SOURCE_RUNNING: {
53453a5a1b3Sopenharmony_ci            cork_stream(u, false);
53553a5a1b3Sopenharmony_ci            break;
53653a5a1b3Sopenharmony_ci        }
53753a5a1b3Sopenharmony_ci        case PA_SOURCE_INVALID_STATE:
53853a5a1b3Sopenharmony_ci        case PA_SOURCE_INIT:
53953a5a1b3Sopenharmony_ci        case PA_SOURCE_UNLINKED:
54053a5a1b3Sopenharmony_ci            break;
54153a5a1b3Sopenharmony_ci    }
54253a5a1b3Sopenharmony_ci
54353a5a1b3Sopenharmony_ci    return 0;
54453a5a1b3Sopenharmony_ci}
54553a5a1b3Sopenharmony_ci
54653a5a1b3Sopenharmony_ci/* Creates a source in the main thread.
54753a5a1b3Sopenharmony_ci *
54853a5a1b3Sopenharmony_ci * This method is called when we receive a message from the io thread that a
54953a5a1b3Sopenharmony_ci * connection has been established with the server.  We defer creation of the
55053a5a1b3Sopenharmony_ci * source until the connection is established, because we don't have a source
55153a5a1b3Sopenharmony_ci * if the remote server isn't there.
55253a5a1b3Sopenharmony_ci */
55353a5a1b3Sopenharmony_cistatic void create_source(struct userdata *u) {
55453a5a1b3Sopenharmony_ci    pa_source_new_data source_data;
55553a5a1b3Sopenharmony_ci
55653a5a1b3Sopenharmony_ci    pa_assert_ctl_context();
55753a5a1b3Sopenharmony_ci
55853a5a1b3Sopenharmony_ci    /* Create source */
55953a5a1b3Sopenharmony_ci    pa_source_new_data_init(&source_data);
56053a5a1b3Sopenharmony_ci    source_data.driver = __FILE__;
56153a5a1b3Sopenharmony_ci    source_data.module = u->module;
56253a5a1b3Sopenharmony_ci
56353a5a1b3Sopenharmony_ci    pa_source_new_data_set_name(&source_data, u->source_name);
56453a5a1b3Sopenharmony_ci    pa_source_new_data_set_sample_spec(&source_data, &u->sample_spec);
56553a5a1b3Sopenharmony_ci    pa_source_new_data_set_channel_map(&source_data, &u->channel_map);
56653a5a1b3Sopenharmony_ci
56753a5a1b3Sopenharmony_ci    pa_proplist_update(source_data.proplist, PA_UPDATE_REPLACE, u->source_proplist);
56853a5a1b3Sopenharmony_ci
56953a5a1b3Sopenharmony_ci    if (!(u->source = pa_source_new(u->module->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
57053a5a1b3Sopenharmony_ci        pa_log("Failed to create source.");
57153a5a1b3Sopenharmony_ci        goto finish;
57253a5a1b3Sopenharmony_ci    }
57353a5a1b3Sopenharmony_ci
57453a5a1b3Sopenharmony_ci    u->source->userdata = u;
57553a5a1b3Sopenharmony_ci    u->source->parent.process_msg = source_process_msg_cb;
57653a5a1b3Sopenharmony_ci    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
57753a5a1b3Sopenharmony_ci    u->source->update_requested_latency = source_update_requested_latency_cb;
57853a5a1b3Sopenharmony_ci
57953a5a1b3Sopenharmony_ci    pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
58053a5a1b3Sopenharmony_ci    pa_source_set_rtpoll(u->source, u->rtpoll);
58153a5a1b3Sopenharmony_ci
58253a5a1b3Sopenharmony_ci    pa_source_put(u->source);
58353a5a1b3Sopenharmony_ci
58453a5a1b3Sopenharmony_cifinish:
58553a5a1b3Sopenharmony_ci    pa_source_new_data_done(&source_data);
58653a5a1b3Sopenharmony_ci
58753a5a1b3Sopenharmony_ci    /* tell any interested io threads that the sink they asked for has now been
58853a5a1b3Sopenharmony_ci     * created (even if we failed, we still notify the thread, so they can
58953a5a1b3Sopenharmony_ci     * either handle or kill the thread, rather than deadlock waiting for a
59053a5a1b3Sopenharmony_ci     * message that will never come */
59153a5a1b3Sopenharmony_ci    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), TUNNEL_MESSAGE_SOURCE_CREATED, u, 0, NULL);
59253a5a1b3Sopenharmony_ci}
59353a5a1b3Sopenharmony_ci
59453a5a1b3Sopenharmony_ci/* Runs in PA mainloop context */
59553a5a1b3Sopenharmony_cistatic int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
59653a5a1b3Sopenharmony_ci    struct userdata *u = (struct userdata *) data;
59753a5a1b3Sopenharmony_ci
59853a5a1b3Sopenharmony_ci    pa_assert(u);
59953a5a1b3Sopenharmony_ci    pa_assert_ctl_context();
60053a5a1b3Sopenharmony_ci
60153a5a1b3Sopenharmony_ci    if (u->shutting_down)
60253a5a1b3Sopenharmony_ci        return 0;
60353a5a1b3Sopenharmony_ci
60453a5a1b3Sopenharmony_ci    switch (code) {
60553a5a1b3Sopenharmony_ci        case TUNNEL_MESSAGE_CREATE_SOURCE_REQUEST:
60653a5a1b3Sopenharmony_ci            create_source(u);
60753a5a1b3Sopenharmony_ci            break;
60853a5a1b3Sopenharmony_ci        case TUNNEL_MESSAGE_MAYBE_RESTART:
60953a5a1b3Sopenharmony_ci            maybe_restart(u->module->userdata);
61053a5a1b3Sopenharmony_ci            break;
61153a5a1b3Sopenharmony_ci    }
61253a5a1b3Sopenharmony_ci
61353a5a1b3Sopenharmony_ci    return 0;
61453a5a1b3Sopenharmony_ci}
61553a5a1b3Sopenharmony_ci
61653a5a1b3Sopenharmony_cistatic int do_init(pa_module *m) {
61753a5a1b3Sopenharmony_ci    struct userdata *u = NULL;
61853a5a1b3Sopenharmony_ci    struct module_restart_data *rd;
61953a5a1b3Sopenharmony_ci    pa_modargs *ma = NULL;
62053a5a1b3Sopenharmony_ci    const char *remote_server = NULL;
62153a5a1b3Sopenharmony_ci    char *default_source_name = NULL;
62253a5a1b3Sopenharmony_ci    uint32_t reconnect_interval_ms = 0;
62353a5a1b3Sopenharmony_ci
62453a5a1b3Sopenharmony_ci    pa_assert(m);
62553a5a1b3Sopenharmony_ci    pa_assert(m->userdata);
62653a5a1b3Sopenharmony_ci
62753a5a1b3Sopenharmony_ci    rd = m->userdata;
62853a5a1b3Sopenharmony_ci
62953a5a1b3Sopenharmony_ci    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
63053a5a1b3Sopenharmony_ci        pa_log("Failed to parse module arguments.");
63153a5a1b3Sopenharmony_ci        goto fail;
63253a5a1b3Sopenharmony_ci    }
63353a5a1b3Sopenharmony_ci
63453a5a1b3Sopenharmony_ci    u = pa_xnew0(struct userdata, 1);
63553a5a1b3Sopenharmony_ci    u->module = m;
63653a5a1b3Sopenharmony_ci    rd->userdata = u;
63753a5a1b3Sopenharmony_ci
63853a5a1b3Sopenharmony_ci    u->sample_spec = m->core->default_sample_spec;
63953a5a1b3Sopenharmony_ci    u->channel_map = m->core->default_channel_map;
64053a5a1b3Sopenharmony_ci    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
64153a5a1b3Sopenharmony_ci        pa_log("Invalid sample format specification or channel map");
64253a5a1b3Sopenharmony_ci        goto fail;
64353a5a1b3Sopenharmony_ci    }
64453a5a1b3Sopenharmony_ci
64553a5a1b3Sopenharmony_ci    remote_server = pa_modargs_get_value(ma, "server", NULL);
64653a5a1b3Sopenharmony_ci    if (!remote_server) {
64753a5a1b3Sopenharmony_ci        pa_log("No server given!");
64853a5a1b3Sopenharmony_ci        goto fail;
64953a5a1b3Sopenharmony_ci    }
65053a5a1b3Sopenharmony_ci
65153a5a1b3Sopenharmony_ci    u->remote_server = pa_xstrdup(remote_server);
65253a5a1b3Sopenharmony_ci    u->thread_mainloop = pa_mainloop_new();
65353a5a1b3Sopenharmony_ci    if (u->thread_mainloop == NULL) {
65453a5a1b3Sopenharmony_ci        pa_log("Failed to create mainloop");
65553a5a1b3Sopenharmony_ci        goto fail;
65653a5a1b3Sopenharmony_ci    }
65753a5a1b3Sopenharmony_ci    u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop);
65853a5a1b3Sopenharmony_ci    u->cookie_file = pa_xstrdup(pa_modargs_get_value(ma, "cookie", NULL));
65953a5a1b3Sopenharmony_ci    u->remote_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
66053a5a1b3Sopenharmony_ci
66153a5a1b3Sopenharmony_ci    u->thread_mq = pa_xnew0(pa_thread_mq, 1);
66253a5a1b3Sopenharmony_ci
66353a5a1b3Sopenharmony_ci    if (pa_thread_mq_init_thread_mainloop(u->thread_mq, m->core->mainloop, u->thread_mainloop_api) < 0) {
66453a5a1b3Sopenharmony_ci        pa_log("pa_thread_mq_init_thread_mainloop() failed.");
66553a5a1b3Sopenharmony_ci        goto fail;
66653a5a1b3Sopenharmony_ci    }
66753a5a1b3Sopenharmony_ci
66853a5a1b3Sopenharmony_ci    u->msg = pa_msgobject_new(tunnel_msg);
66953a5a1b3Sopenharmony_ci    u->msg->parent.process_msg = tunnel_process_msg;
67053a5a1b3Sopenharmony_ci
67153a5a1b3Sopenharmony_ci    /* The rtpoll created here is never run. It is only necessary to avoid crashes
67253a5a1b3Sopenharmony_ci     * when module-tunnel-source-new is used together with module-loopback.
67353a5a1b3Sopenharmony_ci     * module-loopback bases the asyncmsq on the rtpoll provided by the source and
67453a5a1b3Sopenharmony_ci     * only works because it calls pa_asyncmsq_process_one(). */
67553a5a1b3Sopenharmony_ci    u->rtpoll = pa_rtpoll_new();
67653a5a1b3Sopenharmony_ci
67753a5a1b3Sopenharmony_ci    default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
67853a5a1b3Sopenharmony_ci    u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", default_source_name));
67953a5a1b3Sopenharmony_ci
68053a5a1b3Sopenharmony_ci    u->source_proplist = pa_proplist_new();
68153a5a1b3Sopenharmony_ci    pa_proplist_sets(u->source_proplist, PA_PROP_DEVICE_CLASS, "sound");
68253a5a1b3Sopenharmony_ci    pa_proplist_setf(u->source_proplist,
68353a5a1b3Sopenharmony_ci                     PA_PROP_DEVICE_DESCRIPTION,
68453a5a1b3Sopenharmony_ci                     _("Tunnel to %s/%s"),
68553a5a1b3Sopenharmony_ci                     remote_server,
68653a5a1b3Sopenharmony_ci                     pa_strempty(u->remote_source_name));
68753a5a1b3Sopenharmony_ci
68853a5a1b3Sopenharmony_ci    if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) {
68953a5a1b3Sopenharmony_ci        pa_log("Invalid properties");
69053a5a1b3Sopenharmony_ci        goto fail;
69153a5a1b3Sopenharmony_ci    }
69253a5a1b3Sopenharmony_ci
69353a5a1b3Sopenharmony_ci    pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms);
69453a5a1b3Sopenharmony_ci    u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC;
69553a5a1b3Sopenharmony_ci
69653a5a1b3Sopenharmony_ci    if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
69753a5a1b3Sopenharmony_ci        pa_log("Failed to create thread.");
69853a5a1b3Sopenharmony_ci        goto fail;
69953a5a1b3Sopenharmony_ci    }
70053a5a1b3Sopenharmony_ci
70153a5a1b3Sopenharmony_ci    /* If the module is restarting and do_init() finishes successfully, the
70253a5a1b3Sopenharmony_ci     * restart data is no longer needed. If do_init() fails, don't touch the
70353a5a1b3Sopenharmony_ci     * restart data, because following restart attempts will continue to use
70453a5a1b3Sopenharmony_ci     * the same data. If restart_data is NULL, that means no restart is
70553a5a1b3Sopenharmony_ci     * currently pending. */
70653a5a1b3Sopenharmony_ci    if (rd->restart_data) {
70753a5a1b3Sopenharmony_ci        pa_restart_free(rd->restart_data);
70853a5a1b3Sopenharmony_ci        rd->restart_data = NULL;
70953a5a1b3Sopenharmony_ci    }
71053a5a1b3Sopenharmony_ci
71153a5a1b3Sopenharmony_ci    pa_modargs_free(ma);
71253a5a1b3Sopenharmony_ci    pa_xfree(default_source_name);
71353a5a1b3Sopenharmony_ci
71453a5a1b3Sopenharmony_ci    return 0;
71553a5a1b3Sopenharmony_ci
71653a5a1b3Sopenharmony_cifail:
71753a5a1b3Sopenharmony_ci    if (ma)
71853a5a1b3Sopenharmony_ci        pa_modargs_free(ma);
71953a5a1b3Sopenharmony_ci
72053a5a1b3Sopenharmony_ci    if (default_source_name)
72153a5a1b3Sopenharmony_ci        pa_xfree(default_source_name);
72253a5a1b3Sopenharmony_ci
72353a5a1b3Sopenharmony_ci    return -1;
72453a5a1b3Sopenharmony_ci}
72553a5a1b3Sopenharmony_ci
72653a5a1b3Sopenharmony_cistatic void do_done(pa_module *m) {
72753a5a1b3Sopenharmony_ci    struct userdata *u = NULL;
72853a5a1b3Sopenharmony_ci    struct module_restart_data *rd;
72953a5a1b3Sopenharmony_ci
73053a5a1b3Sopenharmony_ci    pa_assert(m);
73153a5a1b3Sopenharmony_ci
73253a5a1b3Sopenharmony_ci    if (!(rd = m->userdata))
73353a5a1b3Sopenharmony_ci        return;
73453a5a1b3Sopenharmony_ci    if (!(u = rd->userdata))
73553a5a1b3Sopenharmony_ci        return;
73653a5a1b3Sopenharmony_ci
73753a5a1b3Sopenharmony_ci    u->shutting_down = true;
73853a5a1b3Sopenharmony_ci
73953a5a1b3Sopenharmony_ci    if (u->source)
74053a5a1b3Sopenharmony_ci        pa_source_unlink(u->source);
74153a5a1b3Sopenharmony_ci
74253a5a1b3Sopenharmony_ci    if (u->thread) {
74353a5a1b3Sopenharmony_ci        pa_asyncmsgq_send(u->thread_mq->inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
74453a5a1b3Sopenharmony_ci        pa_thread_free(u->thread);
74553a5a1b3Sopenharmony_ci    }
74653a5a1b3Sopenharmony_ci
74753a5a1b3Sopenharmony_ci    if (u->thread_mq) {
74853a5a1b3Sopenharmony_ci        pa_thread_mq_done(u->thread_mq);
74953a5a1b3Sopenharmony_ci        pa_xfree(u->thread_mq);
75053a5a1b3Sopenharmony_ci    }
75153a5a1b3Sopenharmony_ci
75253a5a1b3Sopenharmony_ci    if (u->thread_mainloop)
75353a5a1b3Sopenharmony_ci        pa_mainloop_free(u->thread_mainloop);
75453a5a1b3Sopenharmony_ci
75553a5a1b3Sopenharmony_ci    if (u->cookie_file)
75653a5a1b3Sopenharmony_ci        pa_xfree(u->cookie_file);
75753a5a1b3Sopenharmony_ci
75853a5a1b3Sopenharmony_ci    if (u->remote_source_name)
75953a5a1b3Sopenharmony_ci        pa_xfree(u->remote_source_name);
76053a5a1b3Sopenharmony_ci
76153a5a1b3Sopenharmony_ci    if (u->remote_server)
76253a5a1b3Sopenharmony_ci        pa_xfree(u->remote_server);
76353a5a1b3Sopenharmony_ci
76453a5a1b3Sopenharmony_ci    if (u->source)
76553a5a1b3Sopenharmony_ci        pa_source_unref(u->source);
76653a5a1b3Sopenharmony_ci
76753a5a1b3Sopenharmony_ci    if (u->rtpoll)
76853a5a1b3Sopenharmony_ci        pa_rtpoll_free(u->rtpoll);
76953a5a1b3Sopenharmony_ci
77053a5a1b3Sopenharmony_ci    if (u->source_proplist)
77153a5a1b3Sopenharmony_ci        pa_proplist_free(u->source_proplist);
77253a5a1b3Sopenharmony_ci
77353a5a1b3Sopenharmony_ci    if (u->source_name)
77453a5a1b3Sopenharmony_ci        pa_xfree(u->source_name);
77553a5a1b3Sopenharmony_ci
77653a5a1b3Sopenharmony_ci    pa_xfree(u->msg);
77753a5a1b3Sopenharmony_ci
77853a5a1b3Sopenharmony_ci    pa_xfree(u);
77953a5a1b3Sopenharmony_ci
78053a5a1b3Sopenharmony_ci    rd->userdata = NULL;
78153a5a1b3Sopenharmony_ci}
78253a5a1b3Sopenharmony_ci
78353a5a1b3Sopenharmony_ciint pa__init(pa_module *m) {
78453a5a1b3Sopenharmony_ci    int ret;
78553a5a1b3Sopenharmony_ci
78653a5a1b3Sopenharmony_ci    pa_assert(m);
78753a5a1b3Sopenharmony_ci
78853a5a1b3Sopenharmony_ci    m->userdata = pa_xnew0(struct module_restart_data, 1);
78953a5a1b3Sopenharmony_ci
79053a5a1b3Sopenharmony_ci    ret = do_init(m);
79153a5a1b3Sopenharmony_ci
79253a5a1b3Sopenharmony_ci    if (ret < 0)
79353a5a1b3Sopenharmony_ci        pa__done(m);
79453a5a1b3Sopenharmony_ci
79553a5a1b3Sopenharmony_ci    return ret;
79653a5a1b3Sopenharmony_ci}
79753a5a1b3Sopenharmony_ci
79853a5a1b3Sopenharmony_civoid pa__done(pa_module *m) {
79953a5a1b3Sopenharmony_ci    pa_assert(m);
80053a5a1b3Sopenharmony_ci
80153a5a1b3Sopenharmony_ci    do_done(m);
80253a5a1b3Sopenharmony_ci
80353a5a1b3Sopenharmony_ci    if (m->userdata) {
80453a5a1b3Sopenharmony_ci        struct module_restart_data *rd = m->userdata;
80553a5a1b3Sopenharmony_ci
80653a5a1b3Sopenharmony_ci        if (rd->restart_data)
80753a5a1b3Sopenharmony_ci            pa_restart_free(rd->restart_data);
80853a5a1b3Sopenharmony_ci
80953a5a1b3Sopenharmony_ci        pa_xfree(m->userdata);
81053a5a1b3Sopenharmony_ci    }
81153a5a1b3Sopenharmony_ci}
812