/*** This file is part of PulseAudio. Copyright 2004-2006 Lennart Poettering Copyright 2006 Pierre Ossman for Cendio AB PulseAudio is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. PulseAudio is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with PulseAudio; if not, see . ***/ #ifdef HAVE_CONFIG_H #include #endif #include "restart-module.h" #include #include #include #include #include #include #ifdef HAVE_X11 #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef USE_SMOOTHER_2 #include #else #include #endif #include #include #include #include #include #include #include #include #ifdef HAVE_X11 #include #endif #define ENV_DEFAULT_SINK "PULSE_SINK" #define ENV_DEFAULT_SOURCE "PULSE_SOURCE" #define ENV_DEFAULT_SERVER "PULSE_SERVER" #define ENV_COOKIE_FILE "PULSE_COOKIE" #ifdef TUNNEL_SINK PA_MODULE_DESCRIPTION("Tunnel module for sinks"); PA_MODULE_USAGE( "sink_name= " "sink_properties= " "auto= " "server=
" "sink= " "reconnect_interval_ms= " "cookie= " "format= " "channels= " "rate= " "latency_msec= " "channel_map="); #else PA_MODULE_DESCRIPTION("Tunnel module for sources"); PA_MODULE_USAGE( "source_name= " "source_properties= " "auto= " "server=
" "source= " "reconnect_interval_ms= " "cookie= " "format= " "channels= " "rate= " "latency_msec= " "channel_map="); #endif PA_MODULE_AUTHOR("Lennart Poettering"); PA_MODULE_VERSION(PACKAGE_VERSION); PA_MODULE_LOAD_ONCE(false); static const char* const valid_modargs[] = { "auto", "server", "cookie", "format", "channels", "rate", "latency_msec", "reconnect_interval_ms", #ifdef TUNNEL_SINK "sink_name", "sink_properties", "sink", #else "source_name", "source_properties", "source", #endif "channel_map", NULL, }; #define DEFAULT_TIMEOUT 5 #define LATENCY_INTERVAL (1*PA_USEC_PER_SEC) #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC) #ifdef TUNNEL_SINK enum { SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX, SINK_MESSAGE_REMOTE_SUSPEND, SINK_MESSAGE_UPDATE_LATENCY, SINK_MESSAGE_GET_LATENCY_SNAPSHOT, SINK_MESSAGE_POST, }; #define DEFAULT_LATENCY_MSEC 100 #else enum { SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX, SOURCE_MESSAGE_REMOTE_SUSPEND, SOURCE_MESSAGE_UPDATE_LATENCY, SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT, }; #define DEFAULT_LATENCY_MSEC 25 #endif struct tunnel_msg { pa_msgobject parent; }; typedef struct tunnel_msg tunnel_msg; PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject); enum { TUNNEL_MESSAGE_MAYBE_RESTART, }; static int do_init(pa_module *m); static void do_done(pa_module *m); #ifdef TUNNEL_SINK static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); #endif static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata); static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = { #ifdef TUNNEL_SINK [PA_COMMAND_REQUEST] = command_request, [PA_COMMAND_STARTED] = command_started, #endif [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event, [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow, [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow, [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed, [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed, [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended, [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended, [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved, [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved, [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event, [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event, [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event, [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed, [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed, [PA_COMMAND_UNDERFLOW_OHOS] = command_overflow_or_underflow, }; struct userdata { pa_core *core; pa_module *module; pa_thread_mq thread_mq; pa_rtpoll *rtpoll; pa_thread *thread; pa_socket_client *client; pa_pstream *pstream; pa_pdispatch *pdispatch; char *server_name; #ifdef TUNNEL_SINK char *sink_name; char *configured_sink_name; pa_sink *sink; size_t requested_bytes; #else char *source_name; char *configured_source_name; pa_source *source; pa_mcalign *mcalign; #endif pa_auth_cookie *auth_cookie; uint32_t version; uint32_t ctag; uint32_t device_index; uint32_t channel; uint32_t latency; int64_t counter; uint64_t receive_counter; uint64_t receive_snapshot; bool remote_corked:1; bool remote_suspended:1; bool shutting_down:1; pa_usec_t transport_usec; /* maintained in the main thread */ pa_usec_t thread_transport_usec; /* maintained in the IO thread */ uint32_t ignore_latency_before; pa_time_event *time_event; #ifdef USE_SMOOTHER_2 pa_smoother_2 *smoother; #else pa_smoother *smoother; #endif char *device_description; char *server_fqdn; char *user_name; uint32_t maxlength; #ifdef TUNNEL_SINK uint32_t tlength; uint32_t minreq; uint32_t prebuf; pa_proplist *sink_proplist; #else uint32_t fragsize; pa_proplist *source_proplist; #endif pa_sample_spec sample_spec; pa_channel_map channel_map; tunnel_msg *msg; pa_iochannel *io; pa_usec_t reconnect_interval_us; pa_usec_t snapshot_time; }; struct module_restart_data { struct userdata *userdata; pa_restart_data *restart_data; }; static void request_latency(struct userdata *u); #ifdef TUNNEL_SINK static void create_sink(struct userdata *u); static void on_sink_created(struct userdata *u); #else static void create_source(struct userdata *u); static void on_source_created(struct userdata *u); #endif /* Do a reinit of the module. Note that u will be freed as a result of this * call. */ static void unload_module(struct module_restart_data *rd) { struct userdata *u = rd->userdata; if (rd->restart_data) { pa_log_debug("Restart already pending"); return; } if (u->reconnect_interval_us > 0) { /* The handle returned here must be freed when do_init() was successful and when the * module exits. */ rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); } else pa_module_unload_request(u->module, true); } /* Called from main context */ static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { pa_log_debug("Got stream or client event."); } /* Called from main context */ static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); pa_log_warn("Stream killed"); unload_module(u->module->userdata); } /* Called from main context */ static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); pa_log_info("Server signalled buffer overrun/underrun."); request_latency(u); } /* Called from main context */ static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t channel; bool suspended; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); if (pa_tagstruct_getu32(t, &channel) < 0 || pa_tagstruct_get_boolean(t, &suspended) < 0 || !pa_tagstruct_eof(t)) { pa_log("Invalid packet."); unload_module(u->module->userdata); return; } pa_log_debug("Server reports device suspend."); #ifdef TUNNEL_SINK pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL); #else pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL); #endif request_latency(u); } /* Called from main context */ static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t channel, di; const char *dn; bool suspended; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); if (pa_tagstruct_getu32(t, &channel) < 0 || pa_tagstruct_getu32(t, &di) < 0 || pa_tagstruct_gets(t, &dn) < 0 || pa_tagstruct_get_boolean(t, &suspended) < 0) { pa_log_error("Invalid packet."); unload_module(u->module->userdata); return; } pa_log_debug("Server reports a stream move."); #ifdef TUNNEL_SINK pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL); #else pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(suspended), 0, NULL); #endif request_latency(u); } static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq; pa_usec_t usec; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); if (pa_tagstruct_getu32(t, &channel) < 0 || pa_tagstruct_getu32(t, &maxlength) < 0) { pa_log_error("Invalid packet."); unload_module(u->module->userdata); return; } if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) { if (pa_tagstruct_getu32(t, &fragsize) < 0 || pa_tagstruct_get_usec(t, &usec) < 0) { pa_log_error("Invalid packet."); unload_module(u->module->userdata); return; } } else { if (pa_tagstruct_getu32(t, &tlength) < 0 || pa_tagstruct_getu32(t, &prebuf) < 0 || pa_tagstruct_getu32(t, &minreq) < 0 || pa_tagstruct_get_usec(t, &usec) < 0) { pa_log_error("Invalid packet."); unload_module(u->module->userdata); return; } } #ifdef TUNNEL_SINK pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength); #endif request_latency(u); } #ifdef TUNNEL_SINK /* Called from main context */ static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); pa_log_debug("Server reports playback started."); request_latency(u); } #endif /* Called from IO thread context */ static void check_smoother_status(struct userdata *u, bool past) { pa_usec_t x; pa_assert(u); x = pa_rtclock_now(); /* Correct by the time the requested issued needs to travel to the * other side. This is a valid thread-safe access, because the * main thread is waiting for us */ if (past) x -= u->thread_transport_usec; else x += u->thread_transport_usec; if (u->remote_suspended || u->remote_corked) #ifdef USE_SMOOTHER_2 pa_smoother_2_pause(u->smoother, x); else pa_smoother_2_resume(u->smoother, x); #else pa_smoother_pause(u->smoother, x); else pa_smoother_resume(u->smoother, x, true); #endif } /* Called from IO thread context */ static void stream_cork_within_thread(struct userdata *u, bool cork) { pa_assert(u); if (u->remote_corked == cork) return; u->remote_corked = cork; check_smoother_status(u, false); } /* Called from main context */ static void stream_cork(struct userdata *u, bool cork) { pa_tagstruct *t; pa_assert(u); if (!u->pstream) return; t = pa_tagstruct_new(); #ifdef TUNNEL_SINK pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM); #else pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM); #endif pa_tagstruct_putu32(t, u->ctag++); pa_tagstruct_putu32(t, u->channel); pa_tagstruct_put_boolean(t, cork); pa_pstream_send_tagstruct(u->pstream, t); request_latency(u); } /* Called from IO thread context */ static void stream_suspend_within_thread(struct userdata *u, bool suspend) { pa_assert(u); if (u->remote_suspended == suspend) return; u->remote_suspended = suspend; check_smoother_status(u, true); } #ifdef TUNNEL_SINK /* Called from IO thread context */ static void send_data(struct userdata *u) { pa_assert(u); while (u->requested_bytes > 0) { pa_memchunk memchunk; pa_sink_render(u->sink, u->requested_bytes, &memchunk); pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL); pa_memblock_unref(memchunk.memblock); u->requested_bytes -= memchunk.length; u->counter += (int64_t) memchunk.length; } } /* This function is called from IO context -- except when it is not. */ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; switch (code) { case PA_SINK_MESSAGE_SET_STATE: { int r; /* First, change the state, because otherwise pa_sink_render() would fail */ if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) { stream_cork_within_thread(u, u->sink->thread_info.state == PA_SINK_SUSPENDED); if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) send_data(u); } return r; } case PA_SINK_MESSAGE_GET_LATENCY: { int64_t *usec = data; #ifdef USE_SMOOTHER_2 *usec = pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter); #else pa_usec_t yl, yr; yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec); yr = pa_smoother_get(u->smoother, pa_rtclock_now()); *usec = (int64_t)yl - yr; #endif return 0; } case SINK_MESSAGE_GET_LATENCY_SNAPSHOT: { int64_t *send_counter = data; *send_counter = u->counter; return 0; } case SINK_MESSAGE_REQUEST: pa_assert(offset > 0); u->requested_bytes += (size_t) offset; if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) send_data(u); return 0; case SINK_MESSAGE_REMOTE_SUSPEND: stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data)); return 0; case SINK_MESSAGE_UPDATE_LATENCY: { #ifdef USE_SMOOTHER_2 int64_t bytes; if (offset < 0) bytes = - pa_usec_to_bytes(- offset, &u->sink->sample_spec); else bytes = pa_usec_to_bytes(offset, &u->sink->sample_spec); if (u->counter > bytes) bytes = u->counter - bytes; else bytes = 0; /* We may use u->snapshot time because the main thread is waiting */ pa_smoother_2_put(u->smoother, u->snapshot_time, bytes); #else pa_usec_t y; y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec); if (y > (pa_usec_t) offset) y -= (pa_usec_t) offset; else y = 0; /* We may use u->snapshot time because the main thread is waiting */ pa_smoother_put(u->smoother, u->snapshot_time, y); #endif /* We can access this freely here, since the main thread is waiting for us */ u->thread_transport_usec = u->transport_usec; return 0; } case SINK_MESSAGE_POST: /* OK, This might be a bit confusing. This message is * delivered to us from the main context -- NOT from the * IO thread context where the rest of the messages are * dispatched. Yeah, ugly, but I am a lazy bastard. */ pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk); u->receive_counter += chunk->length; return 0; } return pa_sink_process_msg(o, code, data, offset, chunk); } /* Called from main context */ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { struct userdata *u; pa_sink_assert_ref(s); u = s->userdata; /* It may be that only the suspend cause is changing, in which * case there's nothing to do. */ if (state == s->state) return 0; switch ((pa_sink_state_t) state) { case PA_SINK_SUSPENDED: pa_assert(PA_SINK_IS_OPENED(s->state)); stream_cork(u, true); break; case PA_SINK_IDLE: case PA_SINK_RUNNING: if (s->state == PA_SINK_SUSPENDED) stream_cork(u, false); break; case PA_SINK_UNLINKED: case PA_SINK_INIT: case PA_SINK_INVALID_STATE: ; } return 0; } #else /* This function is called from IO context -- except when it is not. */ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SOURCE(o)->userdata; switch (code) { case PA_SOURCE_MESSAGE_SET_STATE: { int r; if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0) stream_cork_within_thread(u, u->source->thread_info.state == PA_SOURCE_SUSPENDED); return r; } case PA_SOURCE_MESSAGE_GET_LATENCY: { int64_t *usec = data; #ifdef USE_SMOOTHER_2 *usec = - pa_smoother_2_get_delay(u->smoother, pa_rtclock_now(), u->counter); #else pa_usec_t yr, yl; yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec); yr = pa_smoother_get(u->smoother, pa_rtclock_now()); *usec = (int64_t)yr - yl; #endif return 0; } case SOURCE_MESSAGE_GET_LATENCY_SNAPSHOT: { int64_t *send_counter = data; *send_counter = u->counter; return 0; } case SOURCE_MESSAGE_POST: { pa_memchunk c; pa_mcalign_push(u->mcalign, chunk); while (pa_mcalign_pop(u->mcalign, &c) >= 0) { if (PA_SOURCE_IS_OPENED(u->source->thread_info.state)) pa_source_post(u->source, &c); pa_memblock_unref(c.memblock); u->counter += (int64_t) c.length; } return 0; } case SOURCE_MESSAGE_REMOTE_SUSPEND: stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data)); return 0; case SOURCE_MESSAGE_UPDATE_LATENCY: { #ifdef USE_SMOOTHER_2 int64_t bytes; if (offset < 0) bytes = - pa_usec_to_bytes(- offset, &u->source->sample_spec); else bytes = pa_usec_to_bytes(offset, &u->source->sample_spec); bytes += u->counter; /* We may use u->snapshot time because the main thread is waiting */ pa_smoother_2_put(u->smoother, u->snapshot_time, bytes); #else pa_usec_t y; y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec); y += offset; /* We may use u->snapshot time because the main thread is waiting */ pa_smoother_put(u->smoother, u->snapshot_time, y); #endif /* We can access this freely here, since the main thread is waiting for us */ u->thread_transport_usec = u->transport_usec; return 0; } } return pa_source_process_msg(o, code, data, offset, chunk); } /* Called from main context */ static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) { struct userdata *u; pa_source_assert_ref(s); u = s->userdata; /* It may be that only the suspend cause is changing, in which * case there's nothing to do. */ if (state == s->state) return 0; switch ((pa_source_state_t) state) { case PA_SOURCE_SUSPENDED: pa_assert(PA_SOURCE_IS_OPENED(s->state)); stream_cork(u, true); break; case PA_SOURCE_IDLE: case PA_SOURCE_RUNNING: if (s->state == PA_SOURCE_SUSPENDED) stream_cork(u, false); break; case PA_SOURCE_UNLINKED: case PA_SOURCE_INIT: case PA_SOURCE_INVALID_STATE: ; } return 0; } #endif static void thread_func(void *userdata) { struct userdata *u = userdata; pa_assert(u); pa_log_debug("Thread starting up"); pa_thread_mq_install(&u->thread_mq); for (;;) { int ret; #ifdef TUNNEL_SINK if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested)) pa_sink_process_rewind(u->sink, 0); #endif if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) goto fail; if (ret == 0) goto finish; } fail: /* If this was no regular exit from the loop we have to continue * processing messages until we received PA_MESSAGE_SHUTDOWN */ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_MAYBE_RESTART, u, 0, NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); finish: pa_log_debug("Thread shutting down"); } #ifdef TUNNEL_SINK /* Called from main context */ static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t bytes, channel; pa_assert(pd); pa_assert(command == PA_COMMAND_REQUEST); pa_assert(t); pa_assert(u); pa_assert(u->pdispatch == pd); if (pa_tagstruct_getu32(t, &channel) < 0 || pa_tagstruct_getu32(t, &bytes) < 0) { pa_log("Invalid protocol reply"); goto fail; } if (channel != u->channel) { pa_log("Received data for invalid channel"); goto fail; } pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL); return; fail: unload_module(u->module->userdata); } #endif /* Called from main context */ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_usec_t sink_usec, source_usec; bool playing; int64_t write_index, read_index; struct timeval local, remote, now; pa_sample_spec *ss; int64_t delay; #ifdef TUNNEL_SINK uint64_t send_counter; #endif pa_assert(pd); pa_assert(u); if (command != PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR) pa_log("Failed to get latency."); else pa_log("Protocol error."); goto fail; } if (pa_tagstruct_get_usec(t, &sink_usec) < 0 || pa_tagstruct_get_usec(t, &source_usec) < 0 || pa_tagstruct_get_boolean(t, &playing) < 0 || pa_tagstruct_get_timeval(t, &local) < 0 || pa_tagstruct_get_timeval(t, &remote) < 0 || pa_tagstruct_gets64(t, &write_index) < 0 || pa_tagstruct_gets64(t, &read_index) < 0) { pa_log("Invalid reply."); goto fail; } #ifdef TUNNEL_SINK if (u->version >= 13) { uint64_t underrun_for = 0, playing_for = 0; if (pa_tagstruct_getu64(t, &underrun_for) < 0 || pa_tagstruct_getu64(t, &playing_for) < 0) { pa_log("Invalid reply."); goto fail; } } #endif if (!pa_tagstruct_eof(t)) { pa_log("Invalid reply."); goto fail; } if (tag < u->ignore_latency_before) { return; } pa_gettimeofday(&now); /* Calculate transport usec */ if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now) < 0) { /* local and remote seem to have synchronized clocks */ #ifdef TUNNEL_SINK u->transport_usec = pa_timeval_diff(&remote, &local); #else u->transport_usec = pa_timeval_diff(&now, &remote); #endif } else u->transport_usec = pa_timeval_diff(&now, &local)/2; /* First, take the device's delay */ #ifdef TUNNEL_SINK delay = (int64_t) sink_usec; ss = &u->sink->sample_spec; #else delay = (int64_t) source_usec; ss = &u->source->sample_spec; #endif /* Add the length of our server-side buffer */ if (write_index >= read_index) delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss); else delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss); /* Our measurements are already out of date, hence correct by the * * transport latency */ #ifdef TUNNEL_SINK delay -= (int64_t) u->transport_usec; #else delay += (int64_t) u->transport_usec; #endif /* Now correct by what we have have written since we requested the update. This * is not necessary for the source, because if data is received between request * and reply, it was already posted before we requested the source latency. */ #ifdef TUNNEL_SINK pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_LATENCY_SNAPSHOT, &send_counter, 0, NULL); delay += (int64_t) pa_bytes_to_usec(send_counter - u->receive_snapshot, ss); #endif /* It may take some time before the async message is executed, so we take a timestamp here */ u->snapshot_time = pa_rtclock_now(); #ifdef TUNNEL_SINK pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL); #else pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL); #endif return; fail: unload_module(u->module->userdata); } /* Called from main context */ static void request_latency(struct userdata *u) { pa_tagstruct *t; struct timeval now; uint32_t tag; pa_assert(u); t = pa_tagstruct_new(); #ifdef TUNNEL_SINK pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY); #else pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY); #endif pa_tagstruct_putu32(t, tag = u->ctag++); pa_tagstruct_putu32(t, u->channel); pa_tagstruct_put_timeval(t, pa_gettimeofday(&now)); pa_pstream_send_tagstruct(u->pstream, t); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL); u->ignore_latency_before = tag; u->receive_snapshot = u->receive_counter; } /* Called from main context */ static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) { struct userdata *u = userdata; pa_assert(m); pa_assert(e); pa_assert(u); request_latency(u); pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL); } /* Called from main context */ static void update_description(struct userdata *u) { char *d; char un[128], hn[128]; pa_tagstruct *t; pa_assert(u); if (!u->server_fqdn || !u->user_name || !u->device_description) return; d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn); #ifdef TUNNEL_SINK pa_sink_set_description(u->sink, d); pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name); pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn); pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description); #else pa_source_set_description(u->source, d); pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name); pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn); pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description); #endif pa_xfree(d); d = pa_sprintf_malloc("%s for %s@%s", u->device_description, pa_get_user_name(un, sizeof(un)), pa_get_host_name(hn, sizeof(hn))); t = pa_tagstruct_new(); #ifdef TUNNEL_SINK pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME); #else pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME); #endif pa_tagstruct_putu32(t, u->ctag++); pa_tagstruct_putu32(t, u->channel); pa_tagstruct_puts(t, d); pa_pstream_send_tagstruct(u->pstream, t); pa_xfree(d); } /* Called from main context */ static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_sample_spec ss; pa_channel_map cm; const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name; uint32_t cookie; pa_assert(pd); pa_assert(u); if (command != PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR) pa_log("Failed to get info."); else pa_log("Protocol error."); goto fail; } if (pa_tagstruct_gets(t, &server_name) < 0 || pa_tagstruct_gets(t, &server_version) < 0 || pa_tagstruct_gets(t, &user_name) < 0 || pa_tagstruct_gets(t, &host_name) < 0 || pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_gets(t, &default_sink_name) < 0 || pa_tagstruct_gets(t, &default_source_name) < 0 || pa_tagstruct_getu32(t, &cookie) < 0 || (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) { pa_log("Parse failure"); goto fail; } if (!pa_tagstruct_eof(t)) { pa_log("Packet too long"); goto fail; } pa_xfree(u->server_fqdn); u->server_fqdn = pa_xstrdup(host_name); pa_xfree(u->user_name); u->user_name = pa_xstrdup(user_name); update_description(u); return; fail: unload_module(u->module->userdata); } static int read_ports(struct userdata *u, pa_tagstruct *t) { if (u->version >= 16) { uint32_t n_ports; const char *s; if (pa_tagstruct_getu32(t, &n_ports)) { pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } for (uint32_t j = 0; j < n_ports; j++) { uint32_t priority; if (pa_tagstruct_gets(t, &s) < 0 || /* name */ pa_tagstruct_gets(t, &s) < 0 || /* description */ pa_tagstruct_getu32(t, &priority) < 0) { pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } if (u->version >= 24) { if (pa_tagstruct_getu32(t, &priority) < 0) { /* available */ pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } if (u->version >= 34 && (pa_tagstruct_gets(t, &s) < 0 || /* availability group */ pa_tagstruct_getu32(t, &priority) < 0)) { /* device port type */ pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } } } if (pa_tagstruct_gets(t, &s) < 0) { /* active port */ pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } } return 0; } static int read_formats(struct userdata *u, pa_tagstruct *t) { uint8_t n_formats; pa_format_info *format; if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */ pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } for (uint8_t j = 0; j < n_formats; j++) { format = pa_format_info_new(); if (pa_tagstruct_get_format_info(t, format)) { /* format info */ pa_format_info_free(format); pa_log("Parse failure"); return -PA_ERR_PROTOCOL; } pa_format_info_free(format); } return 0; } #ifdef TUNNEL_SINK /* Called from main context */ static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t idx, owner_module, monitor_source, flags; const char *name, *description, *monitor_source_name, *driver; pa_sample_spec ss; pa_channel_map cm; pa_cvolume volume; bool mute; pa_usec_t latency; pa_assert(pd); pa_assert(u); if (command != PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR) pa_log("Failed to get info."); else pa_log("Protocol error."); goto fail; } if (pa_tagstruct_getu32(t, &idx) < 0 || pa_tagstruct_gets(t, &name) < 0 || pa_tagstruct_gets(t, &description) < 0 || pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_get_channel_map(t, &cm) < 0 || pa_tagstruct_getu32(t, &owner_module) < 0 || pa_tagstruct_get_cvolume(t, &volume) < 0 || pa_tagstruct_get_boolean(t, &mute) < 0 || pa_tagstruct_getu32(t, &monitor_source) < 0 || pa_tagstruct_gets(t, &monitor_source_name) < 0 || pa_tagstruct_get_usec(t, &latency) < 0 || pa_tagstruct_gets(t, &driver) < 0 || pa_tagstruct_getu32(t, &flags) < 0) { pa_log("Parse failure"); goto fail; } if (u->version >= 13) { pa_usec_t configured_latency; if (pa_tagstruct_get_proplist(t, NULL) < 0 || pa_tagstruct_get_usec(t, &configured_latency) < 0) { pa_log("Parse failure"); goto fail; } } if (u->version >= 15) { pa_volume_t base_volume; uint32_t state, n_volume_steps, card; if (pa_tagstruct_get_volume(t, &base_volume) < 0 || pa_tagstruct_getu32(t, &state) < 0 || pa_tagstruct_getu32(t, &n_volume_steps) < 0 || pa_tagstruct_getu32(t, &card) < 0) { pa_log("Parse failure"); goto fail; } } if (read_ports(u, t) < 0) goto fail; if (u->version >= 21 && read_formats(u, t) < 0) goto fail; if (!pa_tagstruct_eof(t)) { pa_log("Packet too long"); goto fail; } if (!u->sink_name || !pa_streq(name, u->sink_name)) return; pa_xfree(u->device_description); u->device_description = pa_xstrdup(description); update_description(u); return; fail: unload_module(u->module->userdata); } /* Called from main context */ static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t idx, owner_module, client, sink; pa_usec_t buffer_usec, sink_usec; const char *name, *driver, *resample_method; bool mute = false; pa_sample_spec sample_spec; pa_channel_map channel_map; pa_cvolume volume; bool b; pa_assert(pd); pa_assert(u); if (command != PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR) pa_log("Failed to get info."); else pa_log("Protocol error."); goto fail; } if (pa_tagstruct_getu32(t, &idx) < 0 || pa_tagstruct_gets(t, &name) < 0 || pa_tagstruct_getu32(t, &owner_module) < 0 || pa_tagstruct_getu32(t, &client) < 0 || pa_tagstruct_getu32(t, &sink) < 0 || pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 || pa_tagstruct_get_channel_map(t, &channel_map) < 0 || pa_tagstruct_get_cvolume(t, &volume) < 0 || pa_tagstruct_get_usec(t, &buffer_usec) < 0 || pa_tagstruct_get_usec(t, &sink_usec) < 0 || pa_tagstruct_gets(t, &resample_method) < 0 || pa_tagstruct_gets(t, &driver) < 0) { pa_log("Parse failure"); goto fail; } if (u->version >= 11) { if (pa_tagstruct_get_boolean(t, &mute) < 0) { pa_log("Parse failure"); goto fail; } } if (u->version >= 13) { if (pa_tagstruct_get_proplist(t, NULL) < 0) { pa_log("Parse failure"); goto fail; } } if (u->version >= 19) { if (pa_tagstruct_get_boolean(t, &b) < 0) { pa_log("Parse failure"); goto fail; } } if (u->version >= 20) { if (pa_tagstruct_get_boolean(t, &b) < 0 || pa_tagstruct_get_boolean(t, &b) < 0) { pa_log("Parse failure"); goto fail; } } if (u->version >= 21) { pa_format_info *format = pa_format_info_new(); if (pa_tagstruct_get_format_info(t, format) < 0) { pa_format_info_free(format); pa_log("Parse failure"); goto fail; } pa_format_info_free(format); } if (!pa_tagstruct_eof(t)) { pa_log("Packet too long"); goto fail; } if (idx != u->device_index) return; pa_assert(u->sink); if ((u->version < 11 || mute == u->sink->muted) && pa_cvolume_equal(&volume, &u->sink->real_volume)) return; pa_sink_volume_changed(u->sink, &volume); if (u->version >= 11) pa_sink_mute_changed(u->sink, mute); return; fail: unload_module(u->module->userdata); } #else /* Called from main context */ static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; uint32_t idx, owner_module, monitor_of_sink, flags; const char *name, *description, *monitor_of_sink_name, *driver; pa_sample_spec ss; pa_channel_map cm; pa_cvolume volume; bool mute; pa_usec_t latency, configured_latency; pa_assert(pd); pa_assert(u); if (command != PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR) pa_log("Failed to get info."); else pa_log("Protocol error."); goto fail; } if (pa_tagstruct_getu32(t, &idx) < 0 || pa_tagstruct_gets(t, &name) < 0 || pa_tagstruct_gets(t, &description) < 0 || pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_get_channel_map(t, &cm) < 0 || pa_tagstruct_getu32(t, &owner_module) < 0 || pa_tagstruct_get_cvolume(t, &volume) < 0 || pa_tagstruct_get_boolean(t, &mute) < 0 || pa_tagstruct_getu32(t, &monitor_of_sink) < 0 || pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 || pa_tagstruct_get_usec(t, &latency) < 0 || pa_tagstruct_gets(t, &driver) < 0 || pa_tagstruct_getu32(t, &flags) < 0) { pa_log("Parse failure"); goto fail; } if (u->version >= 13) { if (pa_tagstruct_get_proplist(t, NULL) < 0 || pa_tagstruct_get_usec(t, &configured_latency) < 0) { pa_log("Parse failure"); goto fail; } } if (u->version >= 15) { pa_volume_t base_volume; uint32_t state, n_volume_steps, card; if (pa_tagstruct_get_volume(t, &base_volume) < 0 || pa_tagstruct_getu32(t, &state) < 0 || pa_tagstruct_getu32(t, &n_volume_steps) < 0 || pa_tagstruct_getu32(t, &card) < 0) { pa_log("Parse failure"); goto fail; } } if (read_ports(u, t) < 0) goto fail; if (u->version >= 22 && read_formats(u, t) < 0) goto fail; if (!pa_tagstruct_eof(t)) { pa_log("Packet too long"); goto fail; } if (!u->source_name || !pa_streq(name, u->source_name)) return; pa_xfree(u->device_description); u->device_description = pa_xstrdup(description); update_description(u); return; fail: unload_module(u->module->userdata); } #endif /* Called from main context */ static void request_info(struct userdata *u) { pa_tagstruct *t; uint32_t tag; pa_assert(u); t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO); pa_tagstruct_putu32(t, tag = u->ctag++); pa_pstream_send_tagstruct(u->pstream, t); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL); #ifdef TUNNEL_SINK t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO); pa_tagstruct_putu32(t, tag = u->ctag++); pa_tagstruct_putu32(t, u->device_index); pa_pstream_send_tagstruct(u->pstream, t); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL); if (u->sink_name) { t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO); pa_tagstruct_putu32(t, tag = u->ctag++); pa_tagstruct_putu32(t, PA_INVALID_INDEX); pa_tagstruct_puts(t, u->sink_name); pa_pstream_send_tagstruct(u->pstream, t); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL); } #else if (u->source_name) { t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO); pa_tagstruct_putu32(t, tag = u->ctag++); pa_tagstruct_putu32(t, PA_INVALID_INDEX); pa_tagstruct_puts(t, u->source_name); pa_pstream_send_tagstruct(u->pstream, t); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL); } #endif } /* Called from main context */ static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_subscription_event_type_t e; uint32_t idx; pa_assert(pd); pa_assert(t); pa_assert(u); pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT); if (pa_tagstruct_getu32(t, &e) < 0 || pa_tagstruct_getu32(t, &idx) < 0) { pa_log("Invalid protocol reply"); unload_module(u->module->userdata); return; } if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) && #ifdef TUNNEL_SINK e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) && e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE) #else e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE) #endif ) return; request_info(u); } /* Called from main context */ static void start_subscribe(struct userdata *u) { pa_tagstruct *t; pa_assert(u); t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE); pa_tagstruct_putu32(t, u->ctag++); pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER| #ifdef TUNNEL_SINK PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK #else PA_SUBSCRIPTION_MASK_SOURCE #endif ); pa_pstream_send_tagstruct(u->pstream, t); } /* Called from main context */ static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; #ifdef TUNNEL_SINK uint32_t bytes; #endif pa_assert(pd); pa_assert(u); pa_assert(u->pdispatch == pd); if (command != PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR) pa_log("Failed to create stream."); else pa_log("Protocol error."); goto fail; } if (pa_tagstruct_getu32(t, &u->channel) < 0 || pa_tagstruct_getu32(t, &u->device_index) < 0 #ifdef TUNNEL_SINK || pa_tagstruct_getu32(t, &bytes) < 0 #endif ) goto parse_error; if (u->version >= 9) { #ifdef TUNNEL_SINK if (pa_tagstruct_getu32(t, &u->maxlength) < 0 || pa_tagstruct_getu32(t, &u->tlength) < 0 || pa_tagstruct_getu32(t, &u->prebuf) < 0 || pa_tagstruct_getu32(t, &u->minreq) < 0) goto parse_error; #else if (pa_tagstruct_getu32(t, &u->maxlength) < 0 || pa_tagstruct_getu32(t, &u->fragsize) < 0) goto parse_error; #endif } if (u->version >= 12) { pa_sample_spec ss; pa_channel_map cm; uint32_t device_index; const char *dn; bool suspended; if (pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_get_channel_map(t, &cm) < 0 || pa_tagstruct_getu32(t, &device_index) < 0 || pa_tagstruct_gets(t, &dn) < 0 || pa_tagstruct_get_boolean(t, &suspended) < 0) goto parse_error; #ifdef TUNNEL_SINK pa_xfree(u->sink_name); u->sink_name = pa_xstrdup(dn); #else pa_xfree(u->source_name); u->source_name = pa_xstrdup(dn); #endif } if (u->version >= 13) { pa_usec_t usec; if (pa_tagstruct_get_usec(t, &usec) < 0) goto parse_error; /* #ifdef TUNNEL_SINK */ /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */ /* #else */ /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */ /* #endif */ } if (u->version >= 21) { pa_format_info *format = pa_format_info_new(); if (pa_tagstruct_get_format_info(t, format) < 0) { pa_format_info_free(format); goto parse_error; } pa_format_info_free(format); } if (!pa_tagstruct_eof(t)) goto parse_error; start_subscribe(u); request_info(u); pa_assert(!u->time_event); u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u); request_latency(u); pa_log_debug("Stream created."); #ifdef TUNNEL_SINK pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL); #endif return; parse_error: pa_log("Invalid reply. (Create stream)"); fail: unload_module(u->module->userdata); } /* Called from main context */ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct userdata *u = userdata; pa_tagstruct *reply; char name[256], un[128], hn[128]; pa_cvolume volume; pa_assert(pd); pa_assert(u); pa_assert(u->pdispatch == pd); if (command != PA_COMMAND_REPLY || pa_tagstruct_getu32(t, &u->version) < 0 || !pa_tagstruct_eof(t)) { if (command == PA_COMMAND_ERROR) pa_log("Failed to authenticate"); else pa_log("Protocol error."); goto fail; } /* Minimum supported protocol version */ if (u->version < 8) { pa_log("Incompatible protocol version"); goto fail; } /* Starting with protocol version 13 the MSB of the version tag reflects if shm is enabled for this connection or not. We don't support SHM here at all, so we just ignore this. */ if (u->version >= 13) u->version &= 0x7FFFFFFFU; pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION); #ifdef TUNNEL_SINK pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version); pa_sink_update_proplist(u->sink, 0, NULL); pa_snprintf(name, sizeof(name), "%s for %s@%s", u->sink_name, pa_get_user_name(un, sizeof(un)), pa_get_host_name(hn, sizeof(hn))); #else pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version); pa_source_update_proplist(u->source, 0, NULL); pa_snprintf(name, sizeof(name), "%s for %s@%s", u->source_name, pa_get_user_name(un, sizeof(un)), pa_get_host_name(hn, sizeof(hn))); #endif reply = pa_tagstruct_new(); pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME); pa_tagstruct_putu32(reply, u->ctag++); if (u->version >= 13) { pa_proplist *pl; pl = pa_proplist_new(); pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio"); pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION); pa_init_proplist(pl); pa_tagstruct_put_proplist(reply, pl); pa_proplist_free(pl); } else pa_tagstruct_puts(reply, "PulseAudio"); pa_pstream_send_tagstruct(u->pstream, reply); /* We ignore the server's reply here */ reply = pa_tagstruct_new(); if (u->version < 13) /* Only for older PA versions we need to fill in the maxlength */ u->maxlength = 4*1024*1024; #ifdef TUNNEL_SINK u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->sink->sample_spec); u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency / 4, &u->sink->sample_spec); u->prebuf = u->tlength; #else u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * u->latency, &u->source->sample_spec); #endif #ifdef TUNNEL_SINK pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM); pa_tagstruct_putu32(reply, tag = u->ctag++); if (u->version < 13) pa_tagstruct_puts(reply, name); pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec); pa_tagstruct_put_channel_map(reply, &u->sink->channel_map); pa_tagstruct_putu32(reply, PA_INVALID_INDEX); pa_tagstruct_puts(reply, u->sink_name); pa_tagstruct_putu32(reply, u->maxlength); pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(u->sink->state)); pa_tagstruct_putu32(reply, u->tlength); pa_tagstruct_putu32(reply, u->prebuf); pa_tagstruct_putu32(reply, u->minreq); pa_tagstruct_putu32(reply, 0); pa_cvolume_reset(&volume, u->sink->sample_spec.channels); pa_tagstruct_put_cvolume(reply, &volume); #else pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM); pa_tagstruct_putu32(reply, tag = u->ctag++); if (u->version < 13) pa_tagstruct_puts(reply, name); pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec); pa_tagstruct_put_channel_map(reply, &u->source->channel_map); pa_tagstruct_putu32(reply, PA_INVALID_INDEX); pa_tagstruct_puts(reply, u->source_name); pa_tagstruct_putu32(reply, u->maxlength); pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(u->source->state)); pa_tagstruct_putu32(reply, u->fragsize); #endif if (u->version >= 12) { pa_tagstruct_put_boolean(reply, false); /* no_remap */ pa_tagstruct_put_boolean(reply, false); /* no_remix */ pa_tagstruct_put_boolean(reply, false); /* fix_format */ pa_tagstruct_put_boolean(reply, false); /* fix_rate */ pa_tagstruct_put_boolean(reply, false); /* fix_channels */ pa_tagstruct_put_boolean(reply, true); /* no_move */ pa_tagstruct_put_boolean(reply, false); /* variable_rate */ } if (u->version >= 13) { pa_proplist *pl; pa_tagstruct_put_boolean(reply, false); /* start muted/peak detect*/ pa_tagstruct_put_boolean(reply, true); /* adjust_latency */ pl = pa_proplist_new(); pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name); pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract"); pa_tagstruct_put_proplist(reply, pl); pa_proplist_free(pl); #ifndef TUNNEL_SINK pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */ #endif } if (u->version >= 14) { #ifdef TUNNEL_SINK pa_tagstruct_put_boolean(reply, false); /* volume_set */ #endif pa_tagstruct_put_boolean(reply, true); /* early rquests */ } if (u->version >= 15) { #ifdef TUNNEL_SINK pa_tagstruct_put_boolean(reply, false); /* muted_set */ #endif pa_tagstruct_put_boolean(reply, false); /* don't inhibit auto suspend */ pa_tagstruct_put_boolean(reply, false); /* fail on suspend */ } #ifdef TUNNEL_SINK if (u->version >= 17) pa_tagstruct_put_boolean(reply, false); /* relative volume */ if (u->version >= 18) pa_tagstruct_put_boolean(reply, false); /* passthrough stream */ #endif #ifdef TUNNEL_SINK if (u->version >= 21) { /* We're not using the extended API, so n_formats = 0 and that's that */ pa_tagstruct_putu8(reply, 0); } #else if (u->version >= 22) { /* We're not using the extended API, so n_formats = 0 and that's that */ pa_tagstruct_putu8(reply, 0); pa_cvolume_reset(&volume, u->source->sample_spec.channels); pa_tagstruct_put_cvolume(reply, &volume); pa_tagstruct_put_boolean(reply, false); /* muted */ pa_tagstruct_put_boolean(reply, false); /* volume_set */ pa_tagstruct_put_boolean(reply, false); /* muted_set */ pa_tagstruct_put_boolean(reply, false); /* relative volume */ pa_tagstruct_put_boolean(reply, false); /* passthrough stream */ } #endif pa_pstream_send_tagstruct(u->pstream, reply); pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL); pa_log_debug("Connection authenticated, creating stream ..."); return; fail: unload_module(u->module->userdata); } /* Called from main context */ static void pstream_die_callback(pa_pstream *p, void *userdata) { struct userdata *u = userdata; pa_assert(p); pa_assert(u); pa_log_warn("Stream died."); unload_module(u->module->userdata); } /* Called from main context */ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data, void *userdata) { struct userdata *u = userdata; pa_assert(p); pa_assert(packet); pa_assert(u); if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) { pa_log("Invalid packet"); unload_module(u->module->userdata); return; } } #ifndef TUNNEL_SINK /* Called from main context */ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) { struct userdata *u = userdata; pa_assert(p); pa_assert(chunk); pa_assert(u); if (channel != u->channel) { pa_log("Received memory block on bad channel."); unload_module(u->module->userdata); return; } pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk); u->receive_counter += chunk->length; } #endif /* Called from main context */ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) { struct userdata *u = userdata; pa_assert_ctl_context(); pa_assert(sc); pa_assert(u); pa_assert(u->client == sc); pa_socket_client_unref(u->client); u->client = NULL; if (!io) { pa_log("Connection failed: %s", pa_cstrerror(errno)); unload_module(u->module->userdata); return; } u->io = io; #ifdef TUNNEL_SINK create_sink(u); if (!u->sink) { unload_module(u->module->userdata); return; } on_sink_created(u); #else create_source(u); if (!u->source) { unload_module(u->module->userdata); return; } on_source_created(u); #endif } #ifdef TUNNEL_SINK static void on_sink_created(struct userdata *u) #else static void on_source_created(struct userdata *u) #endif { pa_tagstruct *t; uint32_t tag; u->pstream = pa_pstream_new(u->core->mainloop, u->io, u->core->mempool); u->pdispatch = pa_pdispatch_new(u->core->mainloop, true, command_table, PA_COMMAND_MAX); pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u); pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u); #ifndef TUNNEL_SINK pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u); #endif t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_AUTH); pa_tagstruct_putu32(t, tag = u->ctag++); pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION); pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH); #ifdef HAVE_CREDS { pa_creds ucred; if (pa_iochannel_creds_supported(u->io)) pa_iochannel_creds_enable(u->io); ucred.uid = getuid(); ucred.gid = getgid(); pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred); } #else pa_pstream_send_tagstruct(u->pstream, t); #endif pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL); pa_log_debug("Connection established, authenticating ..."); } #ifdef TUNNEL_SINK /* Called from main context */ static void sink_set_volume(pa_sink *sink) { struct userdata *u; pa_tagstruct *t; pa_assert(sink); u = sink->userdata; pa_assert(u); t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME); pa_tagstruct_putu32(t, u->ctag++); pa_tagstruct_putu32(t, u->device_index); pa_tagstruct_put_cvolume(t, &sink->real_volume); pa_pstream_send_tagstruct(u->pstream, t); } /* Called from main context */ static void sink_set_mute(pa_sink *sink) { struct userdata *u; pa_tagstruct *t; pa_assert(sink); u = sink->userdata; pa_assert(u); if (u->version < 11) return; t = pa_tagstruct_new(); pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE); pa_tagstruct_putu32(t, u->ctag++); pa_tagstruct_putu32(t, u->device_index); pa_tagstruct_put_boolean(t, sink->muted); pa_pstream_send_tagstruct(u->pstream, t); } #endif #ifdef TUNNEL_SINK static void create_sink(struct userdata *u) { pa_sink_new_data data; char *data_name = NULL; if (!(data_name = pa_xstrdup(u->configured_sink_name))) data_name = pa_sprintf_malloc("tunnel-sink.%s", u->server_name); pa_sink_new_data_init(&data); data.driver = __FILE__; data.module = u->module; data.namereg_fail = false; pa_sink_new_data_set_name(&data, data_name); pa_sink_new_data_set_sample_spec(&data, &u->sample_spec); pa_sink_new_data_set_channel_map(&data, &u->channel_map); pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name); pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name); if (u->sink_name) pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name); pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->sink_proplist); u->sink = pa_sink_new(u->module->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY); if (!u->sink) { pa_log("Failed to create sink."); goto finish; } u->sink->parent.process_msg = sink_process_msg; u->sink->userdata = u; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; pa_sink_set_set_volume_callback(u->sink, sink_set_volume); pa_sink_set_set_mute_callback(u->sink, sink_set_mute); u->sink->refresh_volume = u->sink->refresh_muted = false; /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */ pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_rtpoll(u->sink, u->rtpoll); pa_sink_set_fixed_latency(u->sink, u->latency * PA_USEC_PER_MSEC); pa_sink_put(u->sink); finish: pa_sink_new_data_done(&data); pa_xfree(data_name); } #else static void create_source(struct userdata *u) { pa_source_new_data data; char *data_name = NULL; if (!(data_name = pa_xstrdup(u->configured_source_name))) data_name = pa_sprintf_malloc("tunnel-source.%s", u->server_name); pa_source_new_data_init(&data); data.driver = __FILE__; data.module = u->module; data.namereg_fail = false; pa_source_new_data_set_name(&data, data_name); pa_source_new_data_set_sample_spec(&data, &u->sample_spec); pa_source_new_data_set_channel_map(&data, &u->channel_map); pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name); pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name); if (u->source_name) pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name); pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, u->source_proplist); u->source = pa_source_new(u->module->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY); if (!u->source) { pa_log("Failed to create source."); goto finish; } u->source->parent.process_msg = source_process_msg; u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb; u->source->userdata = u; /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */ pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); pa_source_set_rtpoll(u->source, u->rtpoll); pa_source_set_fixed_latency(u->source, u->latency * PA_USEC_PER_MSEC); u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec)); pa_source_put(u->source); finish: pa_source_new_data_done(&data); pa_xfree(data_name); } #endif /* Runs in PA mainloop context */ static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = (struct userdata *) data; pa_assert(u); pa_assert_ctl_context(); if (u->shutting_down) return 0; switch (code) { case TUNNEL_MESSAGE_MAYBE_RESTART: unload_module(u->module->userdata); break; } return 0; } static int start_connect(struct userdata *u, char *server, bool automatic) { pa_strlist *server_list = NULL; int rc = 0; if (server) { if (!(server_list = pa_strlist_parse(server))) { pa_log("Invalid server specified."); rc = -1; goto done; } } else { char *ufn; if (!automatic) { pa_log("No server specified."); rc = -1; goto done; } pa_log("No server address found. Attempting default local sockets."); /* The system wide instance via PF_LOCAL */ server_list = pa_strlist_prepend(server_list, PA_SYSTEM_RUNTIME_PATH PA_PATH_SEP PA_NATIVE_DEFAULT_UNIX_SOCKET); /* The user instance via PF_LOCAL */ if ((ufn = pa_runtime_path(PA_NATIVE_DEFAULT_UNIX_SOCKET))) { server_list = pa_strlist_prepend(server_list, ufn); pa_xfree(ufn); } } for (;;) { server_list = pa_strlist_pop(server_list, &u->server_name); if (!u->server_name) { if (server) pa_log("Failed to connect to server '%s'", server); else pa_log("Failed to connect"); rc = -1; goto done; } pa_log_debug("Trying to connect to %s...", u->server_name); if (!(u->client = pa_socket_client_new_string(u->module->core->mainloop, true, u->server_name, PA_NATIVE_DEFAULT_PORT))) { pa_xfree(u->server_name); u->server_name = NULL; continue; } break; } if (u->client) pa_socket_client_set_callback(u->client, on_connection, u); done: pa_strlist_free(server_list); return rc; } static int do_init(pa_module *m) { pa_modargs *ma = NULL; struct userdata *u = NULL; struct module_restart_data *rd; char *server = NULL; uint32_t latency_msec; bool automatic; #ifdef HAVE_X11 xcb_connection_t *xcb = NULL; #endif const char *cookie_path; uint32_t reconnect_interval_ms = 0; pa_assert(m); pa_assert(m->userdata); rd = m->userdata; if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log("Failed to parse module arguments"); goto fail; } rd->userdata = u = pa_xnew0(struct userdata, 1); u->core = m->core; u->module = m; u->client = NULL; u->pdispatch = NULL; u->pstream = NULL; u->server_name = NULL; #ifdef TUNNEL_SINK u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));; u->configured_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL)); u->sink = NULL; u->requested_bytes = 0; #else u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));; u->configured_source_name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL)); u->source = NULL; #endif #ifndef USE_SMOOTHER_2 u->smoother = pa_smoother_new( PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, true, true, 10, pa_rtclock_now(), false); #endif u->ctag = 1; u->device_index = u->channel = PA_INVALID_INDEX; u->time_event = NULL; u->ignore_latency_before = 0; u->transport_usec = u->thread_transport_usec = 0; u->remote_suspended = u->remote_corked = false; u->counter = 0; u->receive_snapshot = 0; u->receive_counter = 0; u->msg = pa_msgobject_new(tunnel_msg); u->msg->parent.process_msg = tunnel_process_msg; u->rtpoll = pa_rtpoll_new(); if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { pa_log("pa_thread_mq_init() failed."); goto fail; } automatic = false; if (pa_modargs_get_value_boolean(ma, "auto", &automatic) < 0) { pa_log("Failed to parse argument \"auto\"."); goto fail; } /* Allow latencies between 5ms and 500ms */ latency_msec = DEFAULT_LATENCY_MSEC; if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 5 || latency_msec > 500) { pa_log("Invalid latency specification"); goto fail; } u->latency = latency_msec; cookie_path = pa_modargs_get_value(ma, "cookie", NULL); server = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)); if (automatic) { #ifdef HAVE_X11 /* Need an X11 connection to get root properties */ if (getenv("DISPLAY") != NULL) { if (!(xcb = xcb_connect(getenv("DISPLAY"), NULL))) pa_log("xcb_connect() failed"); else { if (xcb_connection_has_error(xcb)) { pa_log("xcb_connection_has_error() returned true"); xcb_disconnect(xcb); xcb = NULL; } } } #endif /* Figure out the cookie the same way a normal client would */ if (!cookie_path) cookie_path = getenv(ENV_COOKIE_FILE); #ifdef HAVE_X11 if (!cookie_path && xcb) { char t[1024]; if (pa_x11_get_prop(xcb, 0, "PULSE_COOKIE", t, sizeof(t))) { uint8_t cookie[PA_NATIVE_COOKIE_LENGTH]; if (pa_parsehex(t, cookie, sizeof(cookie)) != sizeof(cookie)) pa_log("Failed to parse cookie data"); else { if (!(u->auth_cookie = pa_auth_cookie_create(u->core, cookie, sizeof(cookie)))) goto fail; } } } #endif /* Same thing for the server name */ if (!server) server = pa_xstrdup(getenv(ENV_DEFAULT_SERVER)); #ifdef HAVE_X11 if (!server && xcb) { char t[1024]; if (pa_x11_get_prop(xcb, 0, "PULSE_SERVER", t, sizeof(t))) server = pa_xstrdup(t); } #endif /* Also determine the default sink/source on the other server */ #ifdef TUNNEL_SINK if (!u->sink_name) u->sink_name = pa_xstrdup(getenv(ENV_DEFAULT_SINK)); #ifdef HAVE_X11 if (!u->sink_name && xcb) { char t[1024]; if (pa_x11_get_prop(xcb, 0, "PULSE_SINK", t, sizeof(t))) u->sink_name = pa_xstrdup(t); } #endif #else if (!u->source_name) u->source_name = pa_xstrdup(getenv(ENV_DEFAULT_SOURCE)); #ifdef HAVE_X11 if (!u->source_name && xcb) { char t[1024]; if (pa_x11_get_prop(xcb, 0, "PULSE_SOURCE", t, sizeof(t))) u->source_name = pa_xstrdup(t); } #endif #endif } if (!cookie_path && !u->auth_cookie) cookie_path = PA_NATIVE_COOKIE_FILE; if (cookie_path) { if (!(u->auth_cookie = pa_auth_cookie_get(u->core, cookie_path, true, PA_NATIVE_COOKIE_LENGTH))) goto fail; } u->sample_spec = m->core->default_sample_spec; u->channel_map = m->core->default_channel_map; if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->sample_spec, &u->channel_map, PA_CHANNEL_MAP_DEFAULT) < 0) { pa_log("Invalid sample format specification"); goto fail; } #ifdef USE_SMOOTHER_2 /* Smoother window must be larger than time between updates. */ u->smoother = pa_smoother_2_new(LATENCY_INTERVAL + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sample_spec), u->sample_spec.rate); #endif pa_modargs_get_value_u32(ma, "reconnect_interval_ms", &reconnect_interval_ms); u->reconnect_interval_us = reconnect_interval_ms * PA_USEC_PER_MSEC; #ifdef TUNNEL_SINK u->sink_proplist = pa_proplist_new(); if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); goto fail; } #else u->source_proplist = pa_proplist_new(); if (pa_modargs_get_proplist(ma, "source_properties", u->source_proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); goto fail; } #endif u->time_event = NULL; u->maxlength = (uint32_t) -1; #ifdef TUNNEL_SINK u->tlength = u->minreq = u->prebuf = (uint32_t) -1; #else u->fragsize = (uint32_t) -1; #endif if (start_connect(u, server, automatic) < 0) { goto fail; } if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) { pa_log("Failed to create thread."); goto fail; } if (server) pa_xfree(server); #ifdef HAVE_X11 if (xcb) xcb_disconnect(xcb); #endif /* If the module is restarting and do_init() finishes successfully, the * restart data is no longer needed. If do_init() fails, don't touch the * restart data, because following restart attempts will continue to use * the same data. If restart_data is NULL, that means no restart is * currently pending. */ if (rd->restart_data) { pa_restart_free(rd->restart_data); rd->restart_data = NULL; } pa_modargs_free(ma); return 0; fail: if (server) pa_xfree(server); #ifdef HAVE_X11 if (xcb) xcb_disconnect(xcb); #endif if (ma) pa_modargs_free(ma); return -1; } static void do_done(pa_module *m) { struct userdata *u = NULL; struct module_restart_data *rd; pa_assert(m); if (!(rd = m->userdata)) return; if (!(u = rd->userdata)) return; u->shutting_down = true; #ifdef TUNNEL_SINK if (u->sink) pa_sink_unlink(u->sink); #else if (u->source) pa_source_unlink(u->source); #endif if (u->thread) { pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); pa_thread_free(u->thread); } pa_thread_mq_done(&u->thread_mq); #ifdef TUNNEL_SINK if (u->sink) pa_sink_unref(u->sink); #else if (u->source) pa_source_unref(u->source); #endif if (u->rtpoll) pa_rtpoll_free(u->rtpoll); if (u->pstream) { pa_pstream_unlink(u->pstream); pa_pstream_unref(u->pstream); } if (u->pdispatch) pa_pdispatch_unref(u->pdispatch); if (u->client) pa_socket_client_unref(u->client); if (u->auth_cookie) pa_auth_cookie_unref(u->auth_cookie); if (u->smoother) #ifdef USE_SMOOTHER_2 pa_smoother_2_free(u->smoother); #else pa_smoother_free(u->smoother); #endif if (u->time_event) u->core->mainloop->time_free(u->time_event); #ifndef TUNNEL_SINK if (u->mcalign) pa_mcalign_free(u->mcalign); #endif #ifdef TUNNEL_SINK pa_xfree(u->sink_name); pa_xfree(u->configured_sink_name); pa_proplist_free(u->sink_proplist); #else pa_xfree(u->source_name); pa_xfree(u->configured_source_name); pa_proplist_free(u->source_proplist); #endif pa_xfree(u->server_name); pa_xfree(u->device_description); pa_xfree(u->server_fqdn); pa_xfree(u->user_name); pa_xfree(u->msg); pa_xfree(u); rd->userdata = NULL; } int pa__init(pa_module *m) { int ret; pa_assert(m); m->userdata = pa_xnew0(struct module_restart_data, 1); ret = do_init(m); if (ret < 0) pa__done(m); return ret; } void pa__done(pa_module *m) { pa_assert(m); do_done(m); if (m->userdata) { struct module_restart_data *rd = m->userdata; if (rd->restart_data) pa_restart_free(rd->restart_data); pa_xfree(m->userdata); } }