1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2008 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <stdio.h>
25 #include <errno.h>
26 
27 #include <pulse/rtclock.h>
28 #include <pulse/timeval.h>
29 #include <pulse/util.h>
30 #include <pulse/xmalloc.h>
31 
32 #include <pulsecore/macro.h>
33 #include <pulsecore/module.h>
34 #include <pulsecore/llist.h>
35 #include <pulsecore/sink.h>
36 #include <pulsecore/sink-input.h>
37 #include <pulsecore/memblockq.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/core-rtclock.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/namereg.h>
43 #include <pulsecore/thread.h>
44 #include <pulsecore/thread-mq.h>
45 #include <pulsecore/rtpoll.h>
46 
47 #ifdef USE_SMOOTHER_2
48 #include <pulsecore/time-smoother_2.h>
49 #else
50 #include <pulsecore/time-smoother.h>
51 #endif
52 
53 #include <pulsecore/strlist.h>
54 
55 PA_MODULE_AUTHOR("Lennart Poettering");
56 PA_MODULE_DESCRIPTION("Combine multiple sinks to one");
57 PA_MODULE_VERSION(PACKAGE_VERSION);
58 PA_MODULE_LOAD_ONCE(false);
59 PA_MODULE_USAGE(
60         "sink_name=<name for the sink> "
61         "sink_properties=<properties for the sink> "
62         "slaves=<slave sinks> "
63         "adjust_time=<how often to readjust rates in s> "
64         "resample_method=<method> "
65         "format=<sample format> "
66         "rate=<sample rate> "
67         "channels=<number of channels> "
68         "channel_map=<channel map>"
69         "remix=<boolean>");
70 
71 #define DEFAULT_SINK_NAME "combined"
72 
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*16)
74 
75 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
76 
77 #define BLOCK_USEC (PA_USEC_PER_MSEC * 200)
78 
79 static const char* const valid_modargs[] = {
80     "sink_name",
81     "sink_properties",
82     "slaves",
83     "adjust_time",
84     "resample_method",
85     "format",
86     "rate",
87     "channels",
88     "channel_map",
89     "remix",
90     NULL
91 };
92 
93 struct output {
94     struct userdata *userdata;
95 
96     pa_sink *sink;
97     pa_sink_input *sink_input;
98     bool ignore_state_change;
99 
100     /* This message queue is only for POST messages, i.e. the messages that
101      * carry audio data from the sink thread to the output thread. The POST
102      * messages need to be handled in a separate queue, because the queue is
103      * processed not only in the output thread mainloop, but also inside the
104      * sink input pop() callback. Processing other messages (such as
105      * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
106      * one reason why it's not safe is that messages that generate rewind
107      * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
108      * in the pop() callback. */
109     pa_asyncmsgq *audio_inq;
110 
111     /* This message queue is for all other messages than POST from the sink
112      * thread to the output thread (currently "all other messages" means just
113      * the SET_REQUESTED_LATENCY message). */
114     pa_asyncmsgq *control_inq;
115 
116     /* Message queue from the output thread to the sink thread. */
117     pa_asyncmsgq *outq;
118 
119     pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
120     pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
121     pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
122 
123     pa_memblockq *memblockq;
124 
125     /* For communication of the stream latencies to the main thread */
126     pa_usec_t total_latency;
127     struct {
128         pa_usec_t timestamp;
129         pa_usec_t sink_latency;
130         size_t output_memblockq_size;
131         uint64_t receive_counter;
132     } latency_snapshot;
133 
134     uint64_t receive_counter;
135 
136     /* For communication of the stream parameters to the sink thread */
137     pa_atomic_t max_request;
138     pa_atomic_t max_latency;
139     pa_atomic_t min_latency;
140 
141     PA_LLIST_FIELDS(struct output);
142 };
143 
144 struct userdata {
145     pa_core *core;
146     pa_module *module;
147     pa_sink *sink;
148 
149     pa_thread *thread;
150     pa_thread_mq thread_mq;
151     pa_rtpoll *rtpoll;
152 
153     pa_time_event *time_event;
154     pa_usec_t adjust_time;
155 
156     bool automatic;
157     bool auto_desc;
158 
159     pa_strlist *unlinked_slaves;
160 
161     pa_hook_slot *sink_put_slot, *sink_unlink_slot, *sink_state_changed_slot;
162 
163     pa_resample_method_t resample_method;
164 
165     pa_usec_t block_usec;
166     pa_usec_t default_min_latency;
167     pa_usec_t default_max_latency;
168 
169     pa_idxset* outputs; /* managed in main context */
170 
171     bool remix;
172 
173     struct {
174         PA_LLIST_HEAD(struct output, active_outputs); /* managed in IO thread context */
175         pa_atomic_t running;  /* we cache that value here, so that every thread can query it cheaply */
176         pa_usec_t timestamp;
177         bool in_null_mode;
178 #ifdef USE_SMOOTHER_2
179         pa_smoother_2 *smoother;
180 #else
181          pa_smoother *smoother;
182 #endif
183         uint64_t counter;
184 
185         uint64_t snapshot_counter;
186         pa_usec_t snapshot_time;
187 
188         pa_usec_t render_timestamp;
189     } thread_info;
190 };
191 
192 struct sink_snapshot {
193     pa_usec_t timestamp;
194     uint64_t send_counter;
195 };
196 
197 enum {
198     SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
199     SINK_MESSAGE_REMOVE_OUTPUT,
200     SINK_MESSAGE_NEED,
201     SINK_MESSAGE_UPDATE_LATENCY,
202     SINK_MESSAGE_UPDATE_MAX_REQUEST,
203     SINK_MESSAGE_UPDATE_LATENCY_RANGE,
204     SINK_MESSAGE_GET_SNAPSHOT
205 };
206 
207 enum {
208     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
209     SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY,
210     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
211 };
212 
213 static void output_disable(struct output *o);
214 static void output_enable(struct output *o);
215 static void output_free(struct output *o);
216 static int output_create_sink_input(struct output *o);
217 
218 /* rate controller, called from main context
219  * - maximum deviation from base rate is less than 1%
220  * - controller step size is limited to 2.01‰
221  * - exhibits hunting with USB or Bluetooth devices
222  */
rate_controller( struct output *o, uint32_t base_rate, uint32_t old_rate, int32_t latency_difference_usec)223 static uint32_t rate_controller(
224                 struct output *o,
225                 uint32_t base_rate, uint32_t old_rate,
226                 int32_t latency_difference_usec) {
227 
228     double new_rate, new_rate_1, new_rate_2;
229     double min_cycles_1, min_cycles_2;
230 
231     /* Calculate next rate that is not more than 2‰ away from the last rate */
232     min_cycles_1 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.002 + 1;
233     new_rate_1 = old_rate + base_rate * (double)latency_difference_usec / min_cycles_1 / o->userdata->adjust_time;
234 
235     /* Calculate best rate to correct the current latency offset, limit at
236      * 1% difference from base_rate */
237     min_cycles_2 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.01 + 1;
238     new_rate_2 = (double)base_rate * (1.0 + (double)latency_difference_usec / min_cycles_2 / o->userdata->adjust_time);
239 
240     /* Choose the rate that is nearer to base_rate */
241     new_rate = new_rate_2;
242     if (abs(new_rate_1 - base_rate) < abs(new_rate_2 - base_rate))
243         new_rate = new_rate_1;
244 
245     return (uint32_t)(new_rate + 0.5);
246 }
247 
adjust_rates(struct userdata *u)248 static void adjust_rates(struct userdata *u) {
249     struct output *o;
250     struct sink_snapshot rdata;
251     pa_usec_t avg_total_latency = 0;
252     pa_usec_t target_latency = 0;
253     pa_usec_t max_sink_latency = 0;
254     pa_usec_t min_total_latency = (pa_usec_t)-1;
255     uint32_t base_rate;
256     uint32_t idx;
257     unsigned n = 0;
258     pa_usec_t now;
259     struct output *o_max;
260 
261     pa_assert(u);
262     pa_sink_assert_ref(u->sink);
263 
264     if (pa_idxset_size(u->outputs) <= 0)
265         return;
266 
267     if (u->sink->state != PA_SINK_RUNNING)
268         return;
269 
270     /* Get sink snapshot */
271     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_SNAPSHOT, &rdata, 0, NULL);
272 
273     /* The sink snapshot time is the time when the last data was rendered.
274      * Latency is calculated for that point in time. */
275     now = rdata.timestamp;
276 
277     /* Sink snapshot is not yet valid. */
278     if (!now)
279         return;
280 
281     PA_IDXSET_FOREACH(o, u->outputs, idx) {
282         pa_usec_t snapshot_latency;
283         int64_t time_difference;
284 
285         if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
286             continue;
287 
288         /* The difference may become negative, because it is probable, that the last
289          * render time was before the sink input snapshot. In this case, the sink
290          * had some more latency at the render time, so subtracting the value still
291          * gives the right result. */
292         time_difference = (int64_t)now - (int64_t)o->latency_snapshot.timestamp;
293 
294         /* Latency at sink snapshot time is sink input snapshot latency minus time
295          * passed between the two snapshots. */
296         snapshot_latency = o->latency_snapshot.sink_latency
297                            + pa_bytes_to_usec(o->latency_snapshot.output_memblockq_size, &o->sink_input->sample_spec)
298                            - time_difference;
299 
300         /* Add the data that was sent between taking the sink input snapshot
301          * and the sink snapshot. */
302         snapshot_latency += pa_bytes_to_usec(rdata.send_counter - o->latency_snapshot.receive_counter, &o->sink_input->sample_spec);
303 
304         /* This is the current combined latency of the slave sink and the related
305          * memblockq at the time of the sink snapshot. */
306         o->total_latency = snapshot_latency;
307         avg_total_latency += snapshot_latency;
308 
309         /* Get max_sink_latency and min_total_latency for target selection. */
310         if (min_total_latency == (pa_usec_t)-1 || o->total_latency < min_total_latency)
311             min_total_latency = o->total_latency;
312 
313         if (o->latency_snapshot.sink_latency > max_sink_latency) {
314             max_sink_latency = o->latency_snapshot.sink_latency;
315             o_max = o;
316         }
317 
318         /* Debug output */
319         pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC);
320 
321         if (o->total_latency > 10*PA_USEC_PER_SEC)
322             pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
323 
324         n++;
325     }
326 
327     /* If there is no valid output there is nothing to do. */
328     if (min_total_latency == (pa_usec_t) -1)
329         return;
330 
331     avg_total_latency /= n;
332 
333     /* The target selection ensures, that at least one of the
334      * sinks will use the base rate and all other sinks are set
335      * relative to it. */
336     if (max_sink_latency > min_total_latency)
337         target_latency = o_max->total_latency;
338     else
339         target_latency = min_total_latency;
340 
341     pa_log_info("[%s] avg total latency is %0.2f msec.", u->sink->name, (double) avg_total_latency / PA_USEC_PER_MSEC);
342     pa_log_info("[%s] target latency for all slaves is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC);
343 
344     base_rate = u->sink->sample_spec.rate;
345 
346     /* Calculate and set rates for the sink inputs. */
347     PA_IDXSET_FOREACH(o, u->outputs, idx) {
348         uint32_t new_rate;
349         int32_t latency_difference;
350 
351         if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
352             continue;
353 
354         latency_difference = (int64_t)o->total_latency - (int64_t)target_latency;
355         new_rate = rate_controller(o, base_rate, o->sink_input->sample_spec.rate, latency_difference);
356 
357         pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate);
358         pa_sink_input_set_rate(o->sink_input, new_rate);
359     }
360 
361     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, NULL, (int64_t) avg_total_latency, NULL);
362 }
363 
time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata)364 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
365     struct userdata *u = userdata;
366 
367     pa_assert(u);
368     pa_assert(a);
369     pa_assert(u->time_event == e);
370 
371     if (u->sink->state == PA_SINK_SUSPENDED) {
372         u->core->mainloop->time_free(e);
373         u->time_event = NULL;
374     } else {
375         struct output *o;
376         uint32_t idx;
377 
378         pa_core_rttime_restart(u->core, e, pa_rtclock_now() + u->adjust_time);
379 
380         /* Get latency snapshots */
381         PA_IDXSET_FOREACH(o, u->outputs, idx) {
382             pa_asyncmsgq_send(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
383         }
384 
385     }
386     adjust_rates(u);
387 }
388 
process_render_null(struct userdata *u, pa_usec_t now)389 static void process_render_null(struct userdata *u, pa_usec_t now) {
390     size_t ate = 0;
391 
392     pa_assert(u);
393     pa_assert(u->sink->thread_info.state == PA_SINK_RUNNING);
394 
395     if (u->thread_info.in_null_mode)
396         u->thread_info.timestamp = now;
397 
398     while (u->thread_info.timestamp < now + u->block_usec) {
399         pa_memchunk chunk;
400 
401         pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk);
402         pa_memblock_unref(chunk.memblock);
403 
404         u->thread_info.counter += chunk.length;
405 
406 /*         pa_log_debug("Ate %lu bytes.", (unsigned long) chunk.length); */
407         u->thread_info.timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
408 
409         ate += chunk.length;
410 
411         if (ate >= u->sink->thread_info.max_request)
412             break;
413     }
414 
415 /*     pa_log_debug("Ate in sum %lu bytes (of %lu)", (unsigned long) ate, (unsigned long) nbytes); */
416 
417 #ifdef USE_SMOOTHER_2
418     pa_smoother_2_put(u->thread_info.smoother, now,
419                     u->thread_info.counter - pa_usec_to_bytes(u->thread_info.timestamp - now, &u->sink->sample_spec));
420 #else
421      pa_smoother_put(u->thread_info.smoother, now,
422                      pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec) - (u->thread_info.timestamp - now));
423 #endif
424 }
425 
thread_func(void *userdata)426 static void thread_func(void *userdata) {
427     struct userdata *u = userdata;
428 
429     pa_assert(u);
430 
431     pa_log_debug("Thread starting up");
432 
433     if (u->core->realtime_scheduling)
434         pa_thread_make_realtime(u->core->realtime_priority+1);
435 
436     pa_thread_mq_install(&u->thread_mq);
437 
438     u->thread_info.timestamp = pa_rtclock_now();
439     u->thread_info.in_null_mode = false;
440 
441     for (;;) {
442         int ret;
443 
444         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
445             pa_sink_process_rewind(u->sink, 0);
446 
447         /* If no outputs are connected, render some data and drop it immediately. */
448         if (u->sink->thread_info.state == PA_SINK_RUNNING && !u->thread_info.active_outputs) {
449             pa_usec_t now;
450 
451             now = pa_rtclock_now();
452 
453             if (!u->thread_info.in_null_mode || u->thread_info.timestamp <= now)
454                 process_render_null(u, now);
455 
456             pa_rtpoll_set_timer_absolute(u->rtpoll, u->thread_info.timestamp);
457             u->thread_info.in_null_mode = true;
458         } else {
459             pa_rtpoll_set_timer_disabled(u->rtpoll);
460             u->thread_info.in_null_mode = false;
461         }
462 
463         /* Hmm, nothing to do. Let's sleep */
464         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
465             pa_log_info("pa_rtpoll_run() = %i", ret);
466             goto fail;
467         }
468 
469         if (ret == 0)
470             goto finish;
471     }
472 
473 fail:
474     /* If this was no regular exit from the loop we have to continue
475      * processing messages until we received PA_MESSAGE_SHUTDOWN */
476     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
477     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
478 
479 finish:
480     pa_log_debug("Thread shutting down");
481 }
482 
483 /* Called from combine sink I/O thread context */
render_memblock(struct userdata *u, struct output *o, size_t length)484 static void render_memblock(struct userdata *u, struct output *o, size_t length) {
485     pa_assert(u);
486     pa_assert(o);
487 
488     /* We are run by the sink thread, on behalf of an output (o). The
489      * output is waiting for us, hence it is safe to access its
490      * mainblockq and asyncmsgq directly. */
491 
492     /* If we are not running, we cannot produce any data */
493     if (!pa_atomic_load(&u->thread_info.running))
494         return;
495 
496     /* Maybe there's some data in the requesting output's queue
497      * now? */
498     while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
499         ;
500 
501     /* Ok, now let's prepare some data if we really have to. Save the
502      * the time for latency calculations. */
503     u->thread_info.render_timestamp = pa_rtclock_now();
504 
505     while (!pa_memblockq_is_readable(o->memblockq)) {
506         struct output *j;
507         pa_memchunk chunk;
508 
509         /* Render data! */
510         pa_sink_render(u->sink, length, &chunk);
511 
512         u->thread_info.counter += chunk.length;
513         o->receive_counter += chunk.length;
514 
515         /* OK, let's send this data to the other threads */
516         PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
517             if (j == o)
518                 continue;
519 
520             pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
521         }
522 
523         /* And place it directly into the requesting output's queue */
524         pa_memblockq_push_align(o->memblockq, &chunk);
525         pa_memblock_unref(chunk.memblock);
526     }
527 }
528 
529 /* Called from I/O thread context */
request_memblock(struct output *o, size_t length)530 static void request_memblock(struct output *o, size_t length) {
531     pa_assert(o);
532     pa_sink_input_assert_ref(o->sink_input);
533     pa_sink_assert_ref(o->userdata->sink);
534 
535     /* If another thread already prepared some data we received
536      * the data over the asyncmsgq, hence let's first process
537      * it. */
538     while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
539         ;
540 
541     /* Check whether we're now readable */
542     if (pa_memblockq_is_readable(o->memblockq))
543         return;
544 
545     /* OK, we need to prepare new data, but only if the sink is actually running */
546     if (pa_atomic_load(&o->userdata->thread_info.running))
547         pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, (int64_t) length, NULL);
548 }
549 
550 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk)551 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
552     struct output *o;
553 
554     pa_sink_input_assert_ref(i);
555     pa_assert_se(o = i->userdata);
556 
557     /* If necessary, get some new data */
558     request_memblock(o, nbytes);
559 
560     /* pa_log("%s q size is %u + %u (%u/%u)", */
561     /*        i->sink->name, */
562     /*        pa_memblockq_get_nblocks(o->memblockq), */
563     /*        pa_memblockq_get_nblocks(i->thread_info.render_memblockq), */
564     /*        pa_memblockq_get_maxrewind(o->memblockq), */
565     /*        pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)); */
566 
567     if (pa_memblockq_peek(o->memblockq, chunk) < 0)
568         return -1;
569 
570     pa_memblockq_drop(o->memblockq, chunk->length);
571 
572     return 0;
573 }
574 
575 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes)576 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
577     struct output *o;
578 
579     pa_sink_input_assert_ref(i);
580     pa_assert_se(o = i->userdata);
581 
582     pa_memblockq_rewind(o->memblockq, nbytes);
583 }
584 
585 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes)586 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
587     struct output *o;
588 
589     pa_sink_input_assert_ref(i);
590     pa_assert_se(o = i->userdata);
591 
592     pa_memblockq_set_maxrewind(o->memblockq, nbytes);
593 }
594 
595 /* Called from I/O thread context */
sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes)596 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
597     struct output *o;
598 
599     pa_sink_input_assert_ref(i);
600     pa_assert_se(o = i->userdata);
601 
602     if (pa_atomic_load(&o->max_request) == (int) nbytes)
603         return;
604 
605     pa_atomic_store(&o->max_request, (int) nbytes);
606     pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes);
607     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
608 }
609 
610 /* Called from thread context */
sink_input_update_sink_latency_range_cb(pa_sink_input *i)611 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
612     struct output *o;
613     pa_usec_t min, max, fix;
614 
615     pa_assert(i);
616 
617     pa_sink_input_assert_ref(i);
618     pa_assert_se(o = i->userdata);
619 
620     fix = i->sink->thread_info.fixed_latency;
621     if (fix > 0) {
622         min = fix;
623         max = fix;
624     } else {
625         min = i->sink->thread_info.min_latency;
626         max = i->sink->thread_info.max_latency;
627     }
628 
629     if ((pa_atomic_load(&o->min_latency) == (int) min) &&
630         (pa_atomic_load(&o->max_latency) == (int) max))
631         return;
632 
633     pa_atomic_store(&o->min_latency, (int) min);
634     pa_atomic_store(&o->max_latency, (int) max);
635     pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max);
636     pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
637 }
638 
639 /* Called from I/O thread context */
sink_input_attach_cb(pa_sink_input *i)640 static void sink_input_attach_cb(pa_sink_input *i) {
641     struct output *o;
642     pa_usec_t fix, min, max;
643     size_t nbytes;
644 
645     pa_sink_input_assert_ref(i);
646     pa_assert_se(o = i->userdata);
647 
648     /* Set up the queue from the sink thread to us */
649     pa_assert(!o->audio_inq_rtpoll_item_read);
650     pa_assert(!o->control_inq_rtpoll_item_read);
651     pa_assert(!o->outq_rtpoll_item_write);
652 
653     o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
654             i->sink->thread_info.rtpoll,
655             PA_RTPOLL_LATE,  /* This one is not that important, since we check for data in _peek() anyway. */
656             o->audio_inq);
657 
658     o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
659             i->sink->thread_info.rtpoll,
660             PA_RTPOLL_NORMAL,
661             o->control_inq);
662 
663     o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
664             i->sink->thread_info.rtpoll,
665             PA_RTPOLL_EARLY,
666             o->outq);
667 
668     pa_sink_input_request_rewind(i, 0, false, true, true);
669 
670     nbytes = pa_sink_input_get_max_request(i);
671     pa_atomic_store(&o->max_request, (int) nbytes);
672     pa_log_debug("attach max request %lu", (unsigned long) nbytes);
673 
674     fix = i->sink->thread_info.fixed_latency;
675     if (fix > 0) {
676         min = max = fix;
677     } else {
678         min = i->sink->thread_info.min_latency;
679         max = i->sink->thread_info.max_latency;
680     }
681     pa_atomic_store(&o->min_latency, (int) min);
682     pa_atomic_store(&o->max_latency, (int) max);
683     pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max);
684 
685     /* We register the output. That means that the sink will start to pass data to
686      * this output. */
687     pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
688 }
689 
690 /* Called from I/O thread context */
sink_input_detach_cb(pa_sink_input *i)691 static void sink_input_detach_cb(pa_sink_input *i) {
692     struct output *o;
693 
694     pa_sink_input_assert_ref(i);
695     pa_assert_se(o = i->userdata);
696 
697     /* We unregister the output. That means that the sink doesn't
698      * pass any further data to this output */
699     pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
700 
701     if (o->audio_inq_rtpoll_item_read) {
702         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
703         o->audio_inq_rtpoll_item_read = NULL;
704     }
705 
706     if (o->control_inq_rtpoll_item_read) {
707         pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
708         o->control_inq_rtpoll_item_read = NULL;
709     }
710 
711     if (o->outq_rtpoll_item_write) {
712         pa_rtpoll_item_free(o->outq_rtpoll_item_write);
713         o->outq_rtpoll_item_write = NULL;
714     }
715 
716 }
717 
718 /* Called from main context */
sink_input_kill_cb(pa_sink_input *i)719 static void sink_input_kill_cb(pa_sink_input *i) {
720     struct output *o;
721 
722     pa_sink_input_assert_ref(i);
723     pa_assert_se(o = i->userdata);
724 
725     pa_module_unload_request(o->userdata->module, true);
726     pa_idxset_remove_by_data(o->userdata->outputs, o, NULL);
727     output_free(o);
728 }
729 
730 /* Called from thread context */
sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk)731 static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
732     struct output *o = PA_SINK_INPUT(obj)->userdata;
733 
734     switch (code) {
735 
736         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
737             pa_usec_t *r = data;
738 
739             *r = pa_bytes_to_usec(pa_memblockq_get_length(o->memblockq), &o->sink_input->sample_spec);
740 
741             /* Fall through, the default handler will add in the extra
742              * latency added by the resampler */
743             break;
744         }
745 
746         case SINK_INPUT_MESSAGE_POST:
747 
748             if (o->sink_input->sink->thread_info.state == PA_SINK_RUNNING) {
749                 pa_memblockq_push_align(o->memblockq, chunk);
750                 o->receive_counter += chunk->length;
751             } else
752                 pa_memblockq_flush_write(o->memblockq, true);
753 
754             return 0;
755 
756         case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: {
757             pa_usec_t latency = (pa_usec_t) offset;
758 
759             pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency);
760 
761             return 0;
762         }
763 
764         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
765             size_t length;
766 
767             length = pa_memblockq_get_length(o->sink_input->thread_info.render_memblockq);
768 
769             o->latency_snapshot.output_memblockq_size = pa_memblockq_get_length(o->memblockq);
770 
771             /* Add content of memblockq's to sink latency */
772             o->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(o->sink, true) +
773                                                pa_bytes_to_usec(length, &o->sink->sample_spec);
774             /* Add resampler latency */
775             o->latency_snapshot.sink_latency += pa_resampler_get_delay_usec(o->sink_input->thread_info.resampler);
776 
777             o->latency_snapshot.timestamp = pa_rtclock_now();
778 
779             o->latency_snapshot.receive_counter = o->receive_counter;
780 
781             return 0;
782         }
783     }
784 
785     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
786 }
787 
788 /* Called from main context */
suspend(struct userdata *u)789 static void suspend(struct userdata *u) {
790     struct output *o;
791     uint32_t idx;
792 
793     pa_assert(u);
794 
795     /* Let's suspend by unlinking all streams */
796     PA_IDXSET_FOREACH(o, u->outputs, idx)
797         output_disable(o);
798 
799     pa_log_info("Device suspended...");
800 }
801 
802 /* Called from main context */
unsuspend(struct userdata *u)803 static void unsuspend(struct userdata *u) {
804     struct output *o;
805     uint32_t idx;
806 
807     pa_assert(u);
808 
809     /* Let's resume */
810     PA_IDXSET_FOREACH(o, u->outputs, idx)
811         output_enable(o);
812 
813     pa_log_info("Resumed successfully...");
814 }
815 
816 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause)817 static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
818     struct userdata *u;
819 
820     pa_sink_assert_ref(sink);
821     pa_assert_se(u = sink->userdata);
822 
823     /* It may be that only the suspend cause is changing, in which
824      * case there's nothing to do. */
825     if (state == u->sink->state)
826         return 0;
827 
828     /* Please note that in contrast to the ALSA modules we call
829      * suspend/unsuspend from main context here! */
830 
831     switch (state) {
832         case PA_SINK_SUSPENDED:
833             pa_assert(PA_SINK_IS_OPENED(u->sink->state));
834 
835             suspend(u);
836             break;
837 
838         case PA_SINK_IDLE:
839         case PA_SINK_RUNNING:
840 
841             if (u->sink->state == PA_SINK_SUSPENDED)
842                 unsuspend(u);
843 
844             /* The first smoother update should be done early, otherwise the smoother will
845              * not be aware of the slave sink latencies and report far too small values.
846              * This is especially important if after an unsuspend the sink runs on a different
847              * latency than before. */
848             if (state == PA_SINK_RUNNING && !u->time_event && u->adjust_time > 0)
849                 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + pa_sink_get_requested_latency(u->sink), time_callback, u);
850 
851             break;
852 
853         case PA_SINK_UNLINKED:
854         case PA_SINK_INIT:
855         case PA_SINK_INVALID_STATE:
856             ;
857     }
858 
859     return 0;
860 }
861 
862 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause)863 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
864     struct userdata *u;
865     bool running;
866 
867     pa_assert(s);
868     pa_assert_se(u = s->userdata);
869 
870     /* It may be that only the suspend cause is changing, in which case there's
871      * nothing to do. */
872     if (new_state == s->thread_info.state)
873         return 0;
874 
875     running = new_state == PA_SINK_RUNNING;
876     pa_atomic_store(&u->thread_info.running, running);
877 
878     if (running) {
879         u->thread_info.render_timestamp = 0;
880 #ifdef USE_SMOOTHER_2
881         pa_smoother_2_resume(u->thread_info.smoother, pa_rtclock_now());
882     } else
883         pa_smoother_2_pause(u->thread_info.smoother, pa_rtclock_now());
884 #else
885         pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
886     } else
887         pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
888 #endif
889 
890     return 0;
891 }
892 
893 /* Called from IO context */
894 static void update_max_request(struct userdata *u) {
895     size_t max_request = 0;
896     struct output *o;
897 
898     pa_assert(u);
899     pa_sink_assert_io_context(u->sink);
900 
901     /* Collects the max_request values of all streams and sets the
902      * largest one locally */
903 
904     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
905         size_t mr = (size_t) pa_atomic_load(&o->max_request);
906 
907         if (mr > max_request)
908             max_request = mr;
909     }
910 
911     if (max_request <= 0)
912         max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
913 
914     pa_log_debug("Sink update max request %lu", (unsigned long) max_request);
915     pa_sink_set_max_request_within_thread(u->sink, max_request);
916 }
917 
918 /* Called from IO context */
919 static void update_latency_range(struct userdata *u) {
920     pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1;
921     struct output *o;
922 
923     pa_assert(u);
924     pa_sink_assert_io_context(u->sink);
925 
926     /* Collects the latency_range values of all streams and sets
927      * the max of min and min of max locally */
928     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
929         pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency);
930         pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency);
931 
932         if (min > min_latency)
933             min_latency = min;
934         if (max_latency == (pa_usec_t) -1 || max < max_latency)
935             max_latency = max;
936     }
937     if (max_latency == (pa_usec_t) -1) {
938         /* No outputs, use default limits. */
939         min_latency = u->default_min_latency;
940         max_latency = u->default_max_latency;
941     }
942 
943     /* As long as we don't support rewinding, we should limit the max latency
944      * to a conservative value. */
945     if (max_latency > u->default_max_latency)
946         max_latency = u->default_max_latency;
947 
948     /* Never ever try to set lower max latency than min latency, it just
949      * doesn't make sense. */
950     if (max_latency < min_latency)
951         max_latency = min_latency;
952 
953     pa_log_debug("Sink update latency range %" PRIu64 " %" PRIu64, min_latency, max_latency);
954     pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency);
955 }
956 
957 /* Called from thread context of the io thread */
958 static void output_add_within_thread(struct output *o) {
959     pa_assert(o);
960     pa_sink_assert_io_context(o->sink);
961 
962     PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
963 
964     pa_assert(!o->outq_rtpoll_item_read);
965     pa_assert(!o->audio_inq_rtpoll_item_write);
966     pa_assert(!o->control_inq_rtpoll_item_write);
967 
968     o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
969             o->userdata->rtpoll,
970             PA_RTPOLL_EARLY-1,  /* This item is very important */
971             o->outq);
972     o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
973             o->userdata->rtpoll,
974             PA_RTPOLL_EARLY,
975             o->audio_inq);
976     o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
977             o->userdata->rtpoll,
978             PA_RTPOLL_NORMAL,
979             o->control_inq);
980     o->receive_counter = o->userdata->thread_info.counter;
981 }
982 
983 /* Called from thread context of the io thread */
984 static void output_remove_within_thread(struct output *o) {
985     pa_assert(o);
986     pa_sink_assert_io_context(o->sink);
987 
988     PA_LLIST_REMOVE(struct output, o->userdata->thread_info.active_outputs, o);
989 
990     if (o->outq_rtpoll_item_read) {
991         pa_rtpoll_item_free(o->outq_rtpoll_item_read);
992         o->outq_rtpoll_item_read = NULL;
993     }
994 
995     if (o->audio_inq_rtpoll_item_write) {
996         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
997         o->audio_inq_rtpoll_item_write = NULL;
998     }
999 
1000     if (o->control_inq_rtpoll_item_write) {
1001         pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
1002         o->control_inq_rtpoll_item_write = NULL;
1003     }
1004 }
1005 
1006 /* Called from sink I/O thread context */
1007 static void sink_update_requested_latency(pa_sink *s) {
1008     struct userdata *u;
1009     struct output *o;
1010 
1011     pa_sink_assert_ref(s);
1012     pa_assert_se(u = s->userdata);
1013 
1014     u->block_usec = pa_sink_get_requested_latency_within_thread(s);
1015 
1016     if (u->block_usec == (pa_usec_t) -1)
1017         u->block_usec = s->thread_info.max_latency;
1018 
1019     pa_log_debug("Sink update requested latency %0.2f", (double) u->block_usec / PA_USEC_PER_MSEC);
1020 
1021     /* Just hand this one over to all sink_inputs */
1022     PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
1023         pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
1024                           u->block_usec, NULL, NULL);
1025     }
1026 }
1027 
1028 
1029 /* Called from thread context of the io thread */
1030 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1031     struct userdata *u = PA_SINK(o)->userdata;
1032 
1033     switch (code) {
1034 
1035         case PA_SINK_MESSAGE_GET_LATENCY: {
1036             int64_t *delay = data;
1037 
1038 #ifdef USE_SMOOTHER_2
1039             *delay = pa_smoother_2_get_delay(u->thread_info.smoother, pa_rtclock_now(), u->thread_info.counter);
1040 #else
1041             pa_usec_t x, y, c;
1042 
1043             x = pa_rtclock_now();
1044             y = pa_smoother_get(u->thread_info.smoother, x);
1045 
1046             c = pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec);
1047 
1048             *delay = (int64_t)c - y;
1049 #endif
1050 
1051             return 0;
1052         }
1053 
1054         case SINK_MESSAGE_ADD_OUTPUT:
1055             output_add_within_thread(data);
1056             update_max_request(u);
1057             update_latency_range(u);
1058             return 0;
1059 
1060         case SINK_MESSAGE_REMOVE_OUTPUT:
1061             output_remove_within_thread(data);
1062             update_max_request(u);
1063             update_latency_range(u);
1064             return 0;
1065 
1066         case SINK_MESSAGE_NEED:
1067             render_memblock(u, (struct output*) data, (size_t) offset);
1068             return 0;
1069 
1070         case SINK_MESSAGE_UPDATE_LATENCY: {
1071 #ifdef USE_SMOOTHER_2
1072             size_t latency;
1073 
1074             latency = pa_usec_to_bytes((pa_usec_t)offset,  &u->sink->sample_spec);
1075             pa_smoother_2_put(u->thread_info.smoother, u->thread_info.snapshot_time, (int64_t)u->thread_info.snapshot_counter - latency);
1076 #else
1077             pa_usec_t x, y, latency = (pa_usec_t) offset;
1078 
1079             /* It may be possible that thread_info.counter has been increased
1080              * since we took the snapshot. Therefore we have to use the snapshot
1081              * time and counter instead of the current values. */
1082             x = u->thread_info.snapshot_time;
1083             y = pa_bytes_to_usec(u->thread_info.snapshot_counter, &u->sink->sample_spec);
1084 
1085             if (y > latency)
1086                 y -= latency;
1087             else
1088                 y = 0;
1089 
1090             pa_smoother_put(u->thread_info.smoother, x, y);
1091 #endif
1092             return 0;
1093         }
1094 
1095         case SINK_MESSAGE_GET_SNAPSHOT: {
1096             struct sink_snapshot *rdata = data;
1097 
1098             rdata->timestamp = u->thread_info.render_timestamp;
1099             rdata->send_counter = u->thread_info.counter;
1100             u->thread_info.snapshot_counter = u->thread_info.counter;
1101             u->thread_info.snapshot_time = u->thread_info.render_timestamp;
1102 
1103             return 0;
1104         }
1105 
1106         case SINK_MESSAGE_UPDATE_MAX_REQUEST:
1107             update_max_request(u);
1108             break;
1109 
1110         case SINK_MESSAGE_UPDATE_LATENCY_RANGE:
1111             update_latency_range(u);
1112             break;
1113 
1114 }
1115 
1116     return pa_sink_process_msg(o, code, data, offset, chunk);
1117 }
1118 
1119 static void update_description(struct userdata *u) {
1120     bool first = true;
1121     char *t;
1122     struct output *o;
1123     uint32_t idx;
1124 
1125     pa_assert(u);
1126 
1127     if (!u->auto_desc)
1128         return;
1129 
1130     if (pa_idxset_isempty(u->outputs)) {
1131         pa_sink_set_description(u->sink, "Simultaneous output");
1132         return;
1133     }
1134 
1135     t = pa_xstrdup("Simultaneous output to");
1136 
1137     PA_IDXSET_FOREACH(o, u->outputs, idx) {
1138         char *e;
1139 
1140         if (first) {
1141             e = pa_sprintf_malloc("%s %s", t, pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1142             first = false;
1143         } else
1144             e = pa_sprintf_malloc("%s, %s", t, pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1145 
1146         pa_xfree(t);
1147         t = e;
1148     }
1149 
1150     pa_sink_set_description(u->sink, t);
1151     pa_xfree(t);
1152 }
1153 
1154 static int output_create_sink_input(struct output *o) {
1155     struct userdata *u;
1156     pa_sink_input_new_data data;
1157 
1158     pa_assert(o);
1159 
1160     if (o->sink_input)
1161         return 0;
1162 
1163     u = o->userdata;
1164 
1165     pa_sink_input_new_data_init(&data);
1166     pa_sink_input_new_data_set_sink(&data, o->sink, false, true);
1167     data.driver = __FILE__;
1168     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, "Simultaneous output on %s", pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1169     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1170     pa_sink_input_new_data_set_sample_spec(&data, &u->sink->sample_spec);
1171     pa_sink_input_new_data_set_channel_map(&data, &u->sink->channel_map);
1172     data.module = u->module;
1173     data.resample_method = u->resample_method;
1174     data.flags = PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE|PA_SINK_INPUT_NO_CREATE_ON_SUSPEND;
1175     data.origin_sink = u->sink;
1176 
1177     if (!u->remix)
1178         data.flags |= PA_SINK_INPUT_NO_REMIX;
1179 
1180     pa_sink_input_new(&o->sink_input, u->core, &data);
1181 
1182     pa_sink_input_new_data_done(&data);
1183 
1184     if (!o->sink_input)
1185         return -1;
1186 
1187     o->sink_input->parent.process_msg = sink_input_process_msg;
1188     o->sink_input->pop = sink_input_pop_cb;
1189     o->sink_input->process_rewind = sink_input_process_rewind_cb;
1190     o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1191     o->sink_input->update_max_request = sink_input_update_max_request_cb;
1192     o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1193     o->sink_input->attach = sink_input_attach_cb;
1194     o->sink_input->detach = sink_input_detach_cb;
1195     o->sink_input->kill = sink_input_kill_cb;
1196     o->sink_input->userdata = o;
1197 
1198     pa_sink_input_set_requested_latency(o->sink_input, pa_sink_get_requested_latency(u->sink));
1199 
1200     return 0;
1201 }
1202 
1203 /* Called from main context */
1204 static struct output *output_new(struct userdata *u, pa_sink *sink) {
1205     struct output *o;
1206 
1207     pa_assert(u);
1208     pa_assert(sink);
1209     pa_assert(u->sink);
1210 
1211     o = pa_xnew0(struct output, 1);
1212     o->userdata = u;
1213 
1214     o->audio_inq = pa_asyncmsgq_new(0);
1215     if (!o->audio_inq) {
1216         pa_log("pa_asyncmsgq_new() failed.");
1217         goto fail;
1218     }
1219 
1220     o->control_inq = pa_asyncmsgq_new(0);
1221     if (!o->control_inq) {
1222         pa_log("pa_asyncmsgq_new() failed.");
1223         goto fail;
1224     }
1225 
1226     o->outq = pa_asyncmsgq_new(0);
1227     if (!o->outq) {
1228         pa_log("pa_asyncmsgq_new() failed.");
1229         goto fail;
1230     }
1231 
1232     o->sink = sink;
1233     o->memblockq = pa_memblockq_new(
1234             "module-combine-sink output memblockq",
1235             0,
1236             MEMBLOCKQ_MAXLENGTH,
1237             MEMBLOCKQ_MAXLENGTH,
1238             &u->sink->sample_spec,
1239             1,
1240             0,
1241             0,
1242             &u->sink->silence);
1243 
1244     pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
1245     update_description(u);
1246 
1247     return o;
1248 
1249 fail:
1250     output_free(o);
1251 
1252     return NULL;
1253 }
1254 
1255 /* Called from main context */
1256 static void output_free(struct output *o) {
1257     pa_assert(o);
1258 
1259     output_disable(o);
1260     update_description(o->userdata);
1261 
1262     if (o->audio_inq_rtpoll_item_read)
1263         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
1264     if (o->audio_inq_rtpoll_item_write)
1265         pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
1266 
1267     if (o->control_inq_rtpoll_item_read)
1268         pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
1269     if (o->control_inq_rtpoll_item_write)
1270         pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
1271 
1272     if (o->outq_rtpoll_item_read)
1273         pa_rtpoll_item_free(o->outq_rtpoll_item_read);
1274     if (o->outq_rtpoll_item_write)
1275         pa_rtpoll_item_free(o->outq_rtpoll_item_write);
1276 
1277     if (o->audio_inq)
1278         pa_asyncmsgq_unref(o->audio_inq);
1279 
1280     if (o->control_inq)
1281         pa_asyncmsgq_unref(o->control_inq);
1282 
1283     if (o->outq)
1284         pa_asyncmsgq_unref(o->outq);
1285 
1286     if (o->memblockq)
1287         pa_memblockq_free(o->memblockq);
1288 
1289     pa_xfree(o);
1290 }
1291 
1292 /* Called from main context */
1293 static void output_enable(struct output *o) {
1294     pa_assert(o);
1295 
1296     if (o->sink_input)
1297         return;
1298 
1299     /* This might cause the sink to be resumed. The state change hook
1300      * of the sink might hence be called from here, which might then
1301      * cause us to be called in a loop. Make sure that state changes
1302      * for this output don't cause this loop by setting a flag here */
1303     o->ignore_state_change = true;
1304 
1305     if (output_create_sink_input(o) >= 0) {
1306 
1307         if (o->sink->state != PA_SINK_INIT) {
1308             /* Enable the sink input. That means that the sink
1309              * is now asked for new data. */
1310             pa_sink_input_put(o->sink_input);
1311         }
1312     }
1313 
1314     o->ignore_state_change = false;
1315 }
1316 
1317 /* Called from main context */
1318 static void output_disable(struct output *o) {
1319     pa_assert(o);
1320 
1321     if (!o->sink_input)
1322         return;
1323 
1324     /* We disable the sink input. That means that the sink is
1325      * not asked for new data anymore  */
1326     pa_sink_input_unlink(o->sink_input);
1327 
1328     /* Now deallocate the stream */
1329     pa_sink_input_unref(o->sink_input);
1330     o->sink_input = NULL;
1331 
1332     /* Finally, drop all queued data */
1333     pa_memblockq_flush_write(o->memblockq, true);
1334     pa_asyncmsgq_flush(o->audio_inq, false);
1335     pa_asyncmsgq_flush(o->control_inq, false);
1336     pa_asyncmsgq_flush(o->outq, false);
1337 }
1338 
1339 /* Called from main context */
1340 static void output_verify(struct output *o) {
1341     pa_assert(o);
1342 
1343     if (PA_SINK_IS_OPENED(o->userdata->sink->state))
1344         output_enable(o);
1345     else
1346         output_disable(o);
1347 }
1348 
1349 /* Called from main context */
1350 static bool is_suitable_sink(struct userdata *u, pa_sink *s) {
1351     const char *t;
1352 
1353     pa_sink_assert_ref(s);
1354 
1355     if (s == u->sink)
1356         return false;
1357 
1358     if (!(s->flags & PA_SINK_HARDWARE))
1359         return false;
1360 
1361     if (!(s->flags & PA_SINK_LATENCY))
1362         return false;
1363 
1364     if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
1365         if (!pa_streq(t, "sound"))
1366             return false;
1367 
1368     return true;
1369 }
1370 
1371 /* Called from main context */
1372 static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1373     struct output *o;
1374 
1375     pa_core_assert_ref(c);
1376     pa_sink_assert_ref(s);
1377     pa_assert(u);
1378 
1379     if (u->automatic) {
1380         if (!is_suitable_sink(u, s))
1381             return PA_HOOK_OK;
1382     } else {
1383         /* Check if the sink is a previously unlinked slave (non-automatic mode) */
1384         pa_strlist *l = u->unlinked_slaves;
1385 
1386         while (l && !pa_streq(pa_strlist_data(l), s->name))
1387             l = pa_strlist_next(l);
1388 
1389         if (!l)
1390             return PA_HOOK_OK;
1391 
1392         u->unlinked_slaves = pa_strlist_remove(u->unlinked_slaves, s->name);
1393     }
1394 
1395     pa_log_info("Configuring new sink: %s", s->name);
1396     if (!(o = output_new(u, s))) {
1397         pa_log("Failed to create sink input on sink '%s'.", s->name);
1398         return PA_HOOK_OK;
1399     }
1400 
1401     output_verify(o);
1402 
1403     return PA_HOOK_OK;
1404 }
1405 
1406 /* Called from main context */
1407 static struct output* find_output(struct userdata *u, pa_sink *s) {
1408     struct output *o;
1409     uint32_t idx;
1410 
1411     pa_assert(u);
1412     pa_assert(s);
1413 
1414     if (u->sink == s)
1415         return NULL;
1416 
1417     PA_IDXSET_FOREACH(o, u->outputs, idx)
1418         if (o->sink == s)
1419             return o;
1420 
1421     return NULL;
1422 }
1423 
1424 /* Called from main context */
1425 static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1426     struct output *o;
1427 
1428     pa_assert(c);
1429     pa_sink_assert_ref(s);
1430     pa_assert(u);
1431 
1432     if (!(o = find_output(u, s)))
1433         return PA_HOOK_OK;
1434 
1435     pa_log_info("Unconfiguring sink: %s", s->name);
1436 
1437     if (!u->automatic)
1438         u->unlinked_slaves = pa_strlist_prepend(u->unlinked_slaves, s->name);
1439 
1440     pa_idxset_remove_by_data(u->outputs, o, NULL);
1441     output_free(o);
1442 
1443     return PA_HOOK_OK;
1444 }
1445 
1446 /* Called from main context */
1447 static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1448     struct output *o;
1449 
1450     if (!(o = find_output(u, s)))
1451         return PA_HOOK_OK;
1452 
1453     /* This state change might be triggered because we are creating a
1454      * stream here, in that case we don't want to create it a second
1455      * time here and enter a loop */
1456     if (o->ignore_state_change)
1457         return PA_HOOK_OK;
1458 
1459     output_verify(o);
1460 
1461     return PA_HOOK_OK;
1462 }
1463 
1464 int pa__init(pa_module*m) {
1465     struct userdata *u;
1466     pa_modargs *ma = NULL;
1467     const char *slaves, *rm;
1468     int resample_method;
1469     pa_sample_spec ss;
1470     pa_channel_map map;
1471     struct output *o;
1472     uint32_t idx;
1473     pa_sink_new_data data;
1474     uint32_t adjust_time_sec;
1475     size_t nbytes;
1476 
1477     pa_assert(m);
1478 
1479     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1480         pa_log("failed to parse module arguments");
1481         goto fail;
1482     }
1483 
1484     resample_method = m->core->resample_method;
1485     if ((rm = pa_modargs_get_value(ma, "resample_method", NULL))) {
1486         if ((resample_method = pa_parse_resample_method(rm)) < 0) {
1487             pa_log("invalid resample method '%s'", rm);
1488             goto fail;
1489         }
1490     }
1491 
1492     m->userdata = u = pa_xnew0(struct userdata, 1);
1493     u->core = m->core;
1494     u->module = m;
1495     u->rtpoll = pa_rtpoll_new();
1496 
1497     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
1498         pa_log("pa_thread_mq_init() failed.");
1499         goto fail;
1500     }
1501 
1502     u->remix = !m->core->disable_remixing;
1503     if (pa_modargs_get_value_boolean(ma, "remix", &u->remix) < 0) {
1504         pa_log("Invalid boolean remix parameter");
1505         goto fail;
1506     }
1507 
1508     u->resample_method = resample_method;
1509     u->outputs = pa_idxset_new(NULL, NULL);
1510 #ifndef USE_SMOOTHER_2
1511     u->thread_info.smoother = pa_smoother_new(
1512             PA_USEC_PER_SEC,
1513             PA_USEC_PER_SEC*2,
1514             true,
1515             true,
1516             10,
1517             pa_rtclock_now(),
1518             true);
1519 #endif
1520 
1521     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1522     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1523         pa_log("Failed to parse adjust_time value");
1524         goto fail;
1525     }
1526 
1527     if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1528         u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1529     else
1530         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1531 
1532     slaves = pa_modargs_get_value(ma, "slaves", NULL);
1533     u->automatic = !slaves;
1534 
1535     ss = m->core->default_sample_spec;
1536     map = m->core->default_channel_map;
1537 
1538     /* Check the specified slave sinks for sample_spec and channel_map to use for the combined sink */
1539     if (!u->automatic) {
1540         const char*split_state = NULL;
1541         char *n = NULL;
1542         pa_sample_spec slaves_spec;
1543         pa_channel_map slaves_map;
1544         bool is_first_slave = true;
1545 
1546         pa_sample_spec_init(&slaves_spec);
1547 
1548         while ((n = pa_split(slaves, ",", &split_state))) {
1549             pa_sink *slave_sink;
1550 
1551             if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1552                 pa_log("Invalid slave sink '%s'", n);
1553                 pa_xfree(n);
1554                 goto fail;
1555             }
1556 
1557             pa_xfree(n);
1558 
1559             if (is_first_slave) {
1560                 slaves_spec = slave_sink->sample_spec;
1561                 slaves_map = slave_sink->channel_map;
1562                 is_first_slave = false;
1563             } else {
1564                 if (slaves_spec.format != slave_sink->sample_spec.format)
1565                     slaves_spec.format = PA_SAMPLE_INVALID;
1566 
1567                 if (slaves_spec.rate < slave_sink->sample_spec.rate)
1568                     slaves_spec.rate = slave_sink->sample_spec.rate;
1569 
1570                 if (!pa_channel_map_equal(&slaves_map, &slave_sink->channel_map))
1571                     slaves_spec.channels = 0;
1572             }
1573         }
1574 
1575         if (!is_first_slave) {
1576             if (slaves_spec.format != PA_SAMPLE_INVALID)
1577                 ss.format = slaves_spec.format;
1578 
1579             ss.rate = slaves_spec.rate;
1580 
1581             if (slaves_spec.channels > 0) {
1582                 map = slaves_map;
1583                 ss.channels = slaves_map.channels;
1584             }
1585         }
1586     }
1587 
1588     if ((pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0)) {
1589         pa_log("Invalid sample specification.");
1590         goto fail;
1591     }
1592 
1593     pa_sink_new_data_init(&data);
1594     data.namereg_fail = false;
1595     data.driver = __FILE__;
1596     data.module = m;
1597     pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
1598     pa_sink_new_data_set_sample_spec(&data, &ss);
1599     pa_sink_new_data_set_channel_map(&data, &map);
1600     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1601 
1602     if (slaves)
1603         pa_proplist_sets(data.proplist, "combine.slaves", slaves);
1604 
1605     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1606         pa_log("Invalid properties");
1607         pa_sink_new_data_done(&data);
1608         goto fail;
1609     }
1610 
1611     /* Check proplist for a description & fill in a default value if not */
1612     u->auto_desc = false;
1613     if (NULL == pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION)) {
1614         u->auto_desc = true;
1615         pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
1616     }
1617 
1618     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
1619     pa_sink_new_data_done(&data);
1620 
1621     if (!u->sink) {
1622         pa_log("Failed to create sink");
1623         goto fail;
1624     }
1625 
1626 #ifdef USE_SMOOTHER_2
1627     /* The smoother window size needs to be larger than the time between updates */
1628     u->thread_info.smoother = pa_smoother_2_new(u->adjust_time + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sink->sample_spec), u->sink->sample_spec.rate);
1629 #endif
1630 
1631     u->sink->parent.process_msg = sink_process_msg;
1632     u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
1633     u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
1634     u->sink->update_requested_latency = sink_update_requested_latency;
1635     u->sink->userdata = u;
1636 
1637     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1638     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1639 
1640     nbytes = pa_usec_to_bytes(BLOCK_USEC, &u->sink->sample_spec);
1641     pa_sink_set_max_request(u->sink, nbytes);
1642     pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC);
1643     /* pulse clamps the range, get the real values */
1644     u->default_min_latency = u->sink->thread_info.min_latency;
1645     u->default_max_latency = u->sink->thread_info.max_latency;
1646     u->block_usec = u->sink->thread_info.max_latency;
1647 
1648 
1649     if (!u->automatic) {
1650         const char*split_state;
1651         char *n = NULL;
1652         pa_assert(slaves);
1653 
1654         /* The slaves have been specified manually */
1655 
1656         split_state = NULL;
1657         while ((n = pa_split(slaves, ",", &split_state))) {
1658             pa_sink *slave_sink;
1659 
1660             if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK)) || slave_sink == u->sink) {
1661                 pa_log("Invalid slave sink '%s'", n);
1662                 pa_xfree(n);
1663                 goto fail;
1664             }
1665 
1666             pa_xfree(n);
1667 
1668             if (!output_new(u, slave_sink)) {
1669                 pa_log("Failed to create slave sink input on sink '%s'.", slave_sink->name);
1670                 goto fail;
1671             }
1672         }
1673 
1674         if (pa_idxset_size(u->outputs) <= 1)
1675             pa_log_warn("No slave sinks specified.");
1676 
1677         u->sink_put_slot = NULL;
1678 
1679     } else {
1680         pa_sink *s;
1681 
1682         /* We're in automatic mode, we add every sink that matches our needs  */
1683 
1684         PA_IDXSET_FOREACH(s, m->core->sinks, idx) {
1685 
1686             if (!is_suitable_sink(u, s))
1687                 continue;
1688 
1689             if (!output_new(u, s)) {
1690                 pa_log("Failed to create sink input on sink '%s'.", s->name);
1691                 goto fail;
1692             }
1693         }
1694     }
1695 
1696     u->sink_put_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PUT], PA_HOOK_LATE, (pa_hook_cb_t) sink_put_hook_cb, u);
1697     u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_EARLY, (pa_hook_cb_t) sink_unlink_hook_cb, u);
1698     u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], PA_HOOK_NORMAL, (pa_hook_cb_t) sink_state_changed_hook_cb, u);
1699 
1700     u->thread_info.render_timestamp = 0;
1701 
1702     if (!(u->thread = pa_thread_new("combine", thread_func, u))) {
1703         pa_log("Failed to create thread.");
1704         goto fail;
1705     }
1706 
1707     /* Activate the sink and the sink inputs */
1708     pa_sink_put(u->sink);
1709 
1710     PA_IDXSET_FOREACH(o, u->outputs, idx)
1711         output_verify(o);
1712 
1713     if (u->adjust_time > 0)
1714         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1715 
1716     pa_modargs_free(ma);
1717 
1718     return 0;
1719 
1720 fail:
1721 
1722     if (ma)
1723         pa_modargs_free(ma);
1724 
1725     pa__done(m);
1726 
1727     return -1;
1728 }
1729 
1730 void pa__done(pa_module*m) {
1731     struct userdata *u;
1732 
1733     pa_assert(m);
1734 
1735     if (!(u = m->userdata))
1736         return;
1737 
1738     pa_strlist_free(u->unlinked_slaves);
1739 
1740     if (u->sink_put_slot)
1741         pa_hook_slot_free(u->sink_put_slot);
1742 
1743     if (u->sink_unlink_slot)
1744         pa_hook_slot_free(u->sink_unlink_slot);
1745 
1746     if (u->sink_state_changed_slot)
1747         pa_hook_slot_free(u->sink_state_changed_slot);
1748 
1749     if (u->outputs)
1750         pa_idxset_free(u->outputs, (pa_free_cb_t) output_free);
1751 
1752     if (u->sink)
1753         pa_sink_unlink(u->sink);
1754 
1755     if (u->thread) {
1756         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1757         pa_thread_free(u->thread);
1758     }
1759 
1760     pa_thread_mq_done(&u->thread_mq);
1761 
1762     if (u->sink)
1763         pa_sink_unref(u->sink);
1764 
1765     if (u->rtpoll)
1766         pa_rtpoll_free(u->rtpoll);
1767 
1768     if (u->time_event)
1769         u->core->mainloop->time_free(u->time_event);
1770 
1771     if (u->thread_info.smoother)
1772 #ifdef USE_SMOOTHER_2
1773         pa_smoother_2_free(u->thread_info.smoother);
1774 #else
1775         pa_smoother_free(u->thread_info.smoother);
1776 #endif
1777 
1778     pa_xfree(u);
1779 }
1780