1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2008 Lennart Poettering
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <stdio.h>
25 #include <errno.h>
26
27 #include <pulse/rtclock.h>
28 #include <pulse/timeval.h>
29 #include <pulse/util.h>
30 #include <pulse/xmalloc.h>
31
32 #include <pulsecore/macro.h>
33 #include <pulsecore/module.h>
34 #include <pulsecore/llist.h>
35 #include <pulsecore/sink.h>
36 #include <pulsecore/sink-input.h>
37 #include <pulsecore/memblockq.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/core-rtclock.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/namereg.h>
43 #include <pulsecore/thread.h>
44 #include <pulsecore/thread-mq.h>
45 #include <pulsecore/rtpoll.h>
46
47 #ifdef USE_SMOOTHER_2
48 #include <pulsecore/time-smoother_2.h>
49 #else
50 #include <pulsecore/time-smoother.h>
51 #endif
52
53 #include <pulsecore/strlist.h>
54
55 PA_MODULE_AUTHOR("Lennart Poettering");
56 PA_MODULE_DESCRIPTION("Combine multiple sinks to one");
57 PA_MODULE_VERSION(PACKAGE_VERSION);
58 PA_MODULE_LOAD_ONCE(false);
59 PA_MODULE_USAGE(
60 "sink_name=<name for the sink> "
61 "sink_properties=<properties for the sink> "
62 "slaves=<slave sinks> "
63 "adjust_time=<how often to readjust rates in s> "
64 "resample_method=<method> "
65 "format=<sample format> "
66 "rate=<sample rate> "
67 "channels=<number of channels> "
68 "channel_map=<channel map>"
69 "remix=<boolean>");
70
71 #define DEFAULT_SINK_NAME "combined"
72
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*16)
74
75 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
76
77 #define BLOCK_USEC (PA_USEC_PER_MSEC * 200)
78
79 static const char* const valid_modargs[] = {
80 "sink_name",
81 "sink_properties",
82 "slaves",
83 "adjust_time",
84 "resample_method",
85 "format",
86 "rate",
87 "channels",
88 "channel_map",
89 "remix",
90 NULL
91 };
92
93 struct output {
94 struct userdata *userdata;
95
96 pa_sink *sink;
97 pa_sink_input *sink_input;
98 bool ignore_state_change;
99
100 /* This message queue is only for POST messages, i.e. the messages that
101 * carry audio data from the sink thread to the output thread. The POST
102 * messages need to be handled in a separate queue, because the queue is
103 * processed not only in the output thread mainloop, but also inside the
104 * sink input pop() callback. Processing other messages (such as
105 * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
106 * one reason why it's not safe is that messages that generate rewind
107 * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
108 * in the pop() callback. */
109 pa_asyncmsgq *audio_inq;
110
111 /* This message queue is for all other messages than POST from the sink
112 * thread to the output thread (currently "all other messages" means just
113 * the SET_REQUESTED_LATENCY message). */
114 pa_asyncmsgq *control_inq;
115
116 /* Message queue from the output thread to the sink thread. */
117 pa_asyncmsgq *outq;
118
119 pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
120 pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
121 pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
122
123 pa_memblockq *memblockq;
124
125 /* For communication of the stream latencies to the main thread */
126 pa_usec_t total_latency;
127 struct {
128 pa_usec_t timestamp;
129 pa_usec_t sink_latency;
130 size_t output_memblockq_size;
131 uint64_t receive_counter;
132 } latency_snapshot;
133
134 uint64_t receive_counter;
135
136 /* For communication of the stream parameters to the sink thread */
137 pa_atomic_t max_request;
138 pa_atomic_t max_latency;
139 pa_atomic_t min_latency;
140
141 PA_LLIST_FIELDS(struct output);
142 };
143
144 struct userdata {
145 pa_core *core;
146 pa_module *module;
147 pa_sink *sink;
148
149 pa_thread *thread;
150 pa_thread_mq thread_mq;
151 pa_rtpoll *rtpoll;
152
153 pa_time_event *time_event;
154 pa_usec_t adjust_time;
155
156 bool automatic;
157 bool auto_desc;
158
159 pa_strlist *unlinked_slaves;
160
161 pa_hook_slot *sink_put_slot, *sink_unlink_slot, *sink_state_changed_slot;
162
163 pa_resample_method_t resample_method;
164
165 pa_usec_t block_usec;
166 pa_usec_t default_min_latency;
167 pa_usec_t default_max_latency;
168
169 pa_idxset* outputs; /* managed in main context */
170
171 bool remix;
172
173 struct {
174 PA_LLIST_HEAD(struct output, active_outputs); /* managed in IO thread context */
175 pa_atomic_t running; /* we cache that value here, so that every thread can query it cheaply */
176 pa_usec_t timestamp;
177 bool in_null_mode;
178 #ifdef USE_SMOOTHER_2
179 pa_smoother_2 *smoother;
180 #else
181 pa_smoother *smoother;
182 #endif
183 uint64_t counter;
184
185 uint64_t snapshot_counter;
186 pa_usec_t snapshot_time;
187
188 pa_usec_t render_timestamp;
189 } thread_info;
190 };
191
192 struct sink_snapshot {
193 pa_usec_t timestamp;
194 uint64_t send_counter;
195 };
196
197 enum {
198 SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
199 SINK_MESSAGE_REMOVE_OUTPUT,
200 SINK_MESSAGE_NEED,
201 SINK_MESSAGE_UPDATE_LATENCY,
202 SINK_MESSAGE_UPDATE_MAX_REQUEST,
203 SINK_MESSAGE_UPDATE_LATENCY_RANGE,
204 SINK_MESSAGE_GET_SNAPSHOT
205 };
206
207 enum {
208 SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
209 SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY,
210 SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
211 };
212
213 static void output_disable(struct output *o);
214 static void output_enable(struct output *o);
215 static void output_free(struct output *o);
216 static int output_create_sink_input(struct output *o);
217
218 /* rate controller, called from main context
219 * - maximum deviation from base rate is less than 1%
220 * - controller step size is limited to 2.01‰
221 * - exhibits hunting with USB or Bluetooth devices
222 */
rate_controller( struct output *o, uint32_t base_rate, uint32_t old_rate, int32_t latency_difference_usec)223 static uint32_t rate_controller(
224 struct output *o,
225 uint32_t base_rate, uint32_t old_rate,
226 int32_t latency_difference_usec) {
227
228 double new_rate, new_rate_1, new_rate_2;
229 double min_cycles_1, min_cycles_2;
230
231 /* Calculate next rate that is not more than 2‰ away from the last rate */
232 min_cycles_1 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.002 + 1;
233 new_rate_1 = old_rate + base_rate * (double)latency_difference_usec / min_cycles_1 / o->userdata->adjust_time;
234
235 /* Calculate best rate to correct the current latency offset, limit at
236 * 1% difference from base_rate */
237 min_cycles_2 = (double)abs(latency_difference_usec) / o->userdata->adjust_time / 0.01 + 1;
238 new_rate_2 = (double)base_rate * (1.0 + (double)latency_difference_usec / min_cycles_2 / o->userdata->adjust_time);
239
240 /* Choose the rate that is nearer to base_rate */
241 new_rate = new_rate_2;
242 if (abs(new_rate_1 - base_rate) < abs(new_rate_2 - base_rate))
243 new_rate = new_rate_1;
244
245 return (uint32_t)(new_rate + 0.5);
246 }
247
adjust_rates(struct userdata *u)248 static void adjust_rates(struct userdata *u) {
249 struct output *o;
250 struct sink_snapshot rdata;
251 pa_usec_t avg_total_latency = 0;
252 pa_usec_t target_latency = 0;
253 pa_usec_t max_sink_latency = 0;
254 pa_usec_t min_total_latency = (pa_usec_t)-1;
255 uint32_t base_rate;
256 uint32_t idx;
257 unsigned n = 0;
258 pa_usec_t now;
259 struct output *o_max;
260
261 pa_assert(u);
262 pa_sink_assert_ref(u->sink);
263
264 if (pa_idxset_size(u->outputs) <= 0)
265 return;
266
267 if (u->sink->state != PA_SINK_RUNNING)
268 return;
269
270 /* Get sink snapshot */
271 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_SNAPSHOT, &rdata, 0, NULL);
272
273 /* The sink snapshot time is the time when the last data was rendered.
274 * Latency is calculated for that point in time. */
275 now = rdata.timestamp;
276
277 /* Sink snapshot is not yet valid. */
278 if (!now)
279 return;
280
281 PA_IDXSET_FOREACH(o, u->outputs, idx) {
282 pa_usec_t snapshot_latency;
283 int64_t time_difference;
284
285 if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
286 continue;
287
288 /* The difference may become negative, because it is probable, that the last
289 * render time was before the sink input snapshot. In this case, the sink
290 * had some more latency at the render time, so subtracting the value still
291 * gives the right result. */
292 time_difference = (int64_t)now - (int64_t)o->latency_snapshot.timestamp;
293
294 /* Latency at sink snapshot time is sink input snapshot latency minus time
295 * passed between the two snapshots. */
296 snapshot_latency = o->latency_snapshot.sink_latency
297 + pa_bytes_to_usec(o->latency_snapshot.output_memblockq_size, &o->sink_input->sample_spec)
298 - time_difference;
299
300 /* Add the data that was sent between taking the sink input snapshot
301 * and the sink snapshot. */
302 snapshot_latency += pa_bytes_to_usec(rdata.send_counter - o->latency_snapshot.receive_counter, &o->sink_input->sample_spec);
303
304 /* This is the current combined latency of the slave sink and the related
305 * memblockq at the time of the sink snapshot. */
306 o->total_latency = snapshot_latency;
307 avg_total_latency += snapshot_latency;
308
309 /* Get max_sink_latency and min_total_latency for target selection. */
310 if (min_total_latency == (pa_usec_t)-1 || o->total_latency < min_total_latency)
311 min_total_latency = o->total_latency;
312
313 if (o->latency_snapshot.sink_latency > max_sink_latency) {
314 max_sink_latency = o->latency_snapshot.sink_latency;
315 o_max = o;
316 }
317
318 /* Debug output */
319 pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC);
320
321 if (o->total_latency > 10*PA_USEC_PER_SEC)
322 pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
323
324 n++;
325 }
326
327 /* If there is no valid output there is nothing to do. */
328 if (min_total_latency == (pa_usec_t) -1)
329 return;
330
331 avg_total_latency /= n;
332
333 /* The target selection ensures, that at least one of the
334 * sinks will use the base rate and all other sinks are set
335 * relative to it. */
336 if (max_sink_latency > min_total_latency)
337 target_latency = o_max->total_latency;
338 else
339 target_latency = min_total_latency;
340
341 pa_log_info("[%s] avg total latency is %0.2f msec.", u->sink->name, (double) avg_total_latency / PA_USEC_PER_MSEC);
342 pa_log_info("[%s] target latency for all slaves is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC);
343
344 base_rate = u->sink->sample_spec.rate;
345
346 /* Calculate and set rates for the sink inputs. */
347 PA_IDXSET_FOREACH(o, u->outputs, idx) {
348 uint32_t new_rate;
349 int32_t latency_difference;
350
351 if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
352 continue;
353
354 latency_difference = (int64_t)o->total_latency - (int64_t)target_latency;
355 new_rate = rate_controller(o, base_rate, o->sink_input->sample_spec.rate, latency_difference);
356
357 pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate);
358 pa_sink_input_set_rate(o->sink_input, new_rate);
359 }
360
361 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, NULL, (int64_t) avg_total_latency, NULL);
362 }
363
time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata)364 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
365 struct userdata *u = userdata;
366
367 pa_assert(u);
368 pa_assert(a);
369 pa_assert(u->time_event == e);
370
371 if (u->sink->state == PA_SINK_SUSPENDED) {
372 u->core->mainloop->time_free(e);
373 u->time_event = NULL;
374 } else {
375 struct output *o;
376 uint32_t idx;
377
378 pa_core_rttime_restart(u->core, e, pa_rtclock_now() + u->adjust_time);
379
380 /* Get latency snapshots */
381 PA_IDXSET_FOREACH(o, u->outputs, idx) {
382 pa_asyncmsgq_send(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
383 }
384
385 }
386 adjust_rates(u);
387 }
388
process_render_null(struct userdata *u, pa_usec_t now)389 static void process_render_null(struct userdata *u, pa_usec_t now) {
390 size_t ate = 0;
391
392 pa_assert(u);
393 pa_assert(u->sink->thread_info.state == PA_SINK_RUNNING);
394
395 if (u->thread_info.in_null_mode)
396 u->thread_info.timestamp = now;
397
398 while (u->thread_info.timestamp < now + u->block_usec) {
399 pa_memchunk chunk;
400
401 pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk);
402 pa_memblock_unref(chunk.memblock);
403
404 u->thread_info.counter += chunk.length;
405
406 /* pa_log_debug("Ate %lu bytes.", (unsigned long) chunk.length); */
407 u->thread_info.timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
408
409 ate += chunk.length;
410
411 if (ate >= u->sink->thread_info.max_request)
412 break;
413 }
414
415 /* pa_log_debug("Ate in sum %lu bytes (of %lu)", (unsigned long) ate, (unsigned long) nbytes); */
416
417 #ifdef USE_SMOOTHER_2
418 pa_smoother_2_put(u->thread_info.smoother, now,
419 u->thread_info.counter - pa_usec_to_bytes(u->thread_info.timestamp - now, &u->sink->sample_spec));
420 #else
421 pa_smoother_put(u->thread_info.smoother, now,
422 pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec) - (u->thread_info.timestamp - now));
423 #endif
424 }
425
thread_func(void *userdata)426 static void thread_func(void *userdata) {
427 struct userdata *u = userdata;
428
429 pa_assert(u);
430
431 pa_log_debug("Thread starting up");
432
433 if (u->core->realtime_scheduling)
434 pa_thread_make_realtime(u->core->realtime_priority+1);
435
436 pa_thread_mq_install(&u->thread_mq);
437
438 u->thread_info.timestamp = pa_rtclock_now();
439 u->thread_info.in_null_mode = false;
440
441 for (;;) {
442 int ret;
443
444 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
445 pa_sink_process_rewind(u->sink, 0);
446
447 /* If no outputs are connected, render some data and drop it immediately. */
448 if (u->sink->thread_info.state == PA_SINK_RUNNING && !u->thread_info.active_outputs) {
449 pa_usec_t now;
450
451 now = pa_rtclock_now();
452
453 if (!u->thread_info.in_null_mode || u->thread_info.timestamp <= now)
454 process_render_null(u, now);
455
456 pa_rtpoll_set_timer_absolute(u->rtpoll, u->thread_info.timestamp);
457 u->thread_info.in_null_mode = true;
458 } else {
459 pa_rtpoll_set_timer_disabled(u->rtpoll);
460 u->thread_info.in_null_mode = false;
461 }
462
463 /* Hmm, nothing to do. Let's sleep */
464 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
465 pa_log_info("pa_rtpoll_run() = %i", ret);
466 goto fail;
467 }
468
469 if (ret == 0)
470 goto finish;
471 }
472
473 fail:
474 /* If this was no regular exit from the loop we have to continue
475 * processing messages until we received PA_MESSAGE_SHUTDOWN */
476 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
477 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
478
479 finish:
480 pa_log_debug("Thread shutting down");
481 }
482
483 /* Called from combine sink I/O thread context */
render_memblock(struct userdata *u, struct output *o, size_t length)484 static void render_memblock(struct userdata *u, struct output *o, size_t length) {
485 pa_assert(u);
486 pa_assert(o);
487
488 /* We are run by the sink thread, on behalf of an output (o). The
489 * output is waiting for us, hence it is safe to access its
490 * mainblockq and asyncmsgq directly. */
491
492 /* If we are not running, we cannot produce any data */
493 if (!pa_atomic_load(&u->thread_info.running))
494 return;
495
496 /* Maybe there's some data in the requesting output's queue
497 * now? */
498 while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
499 ;
500
501 /* Ok, now let's prepare some data if we really have to. Save the
502 * the time for latency calculations. */
503 u->thread_info.render_timestamp = pa_rtclock_now();
504
505 while (!pa_memblockq_is_readable(o->memblockq)) {
506 struct output *j;
507 pa_memchunk chunk;
508
509 /* Render data! */
510 pa_sink_render(u->sink, length, &chunk);
511
512 u->thread_info.counter += chunk.length;
513 o->receive_counter += chunk.length;
514
515 /* OK, let's send this data to the other threads */
516 PA_LLIST_FOREACH(j, u->thread_info.active_outputs) {
517 if (j == o)
518 continue;
519
520 pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
521 }
522
523 /* And place it directly into the requesting output's queue */
524 pa_memblockq_push_align(o->memblockq, &chunk);
525 pa_memblock_unref(chunk.memblock);
526 }
527 }
528
529 /* Called from I/O thread context */
request_memblock(struct output *o, size_t length)530 static void request_memblock(struct output *o, size_t length) {
531 pa_assert(o);
532 pa_sink_input_assert_ref(o->sink_input);
533 pa_sink_assert_ref(o->userdata->sink);
534
535 /* If another thread already prepared some data we received
536 * the data over the asyncmsgq, hence let's first process
537 * it. */
538 while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
539 ;
540
541 /* Check whether we're now readable */
542 if (pa_memblockq_is_readable(o->memblockq))
543 return;
544
545 /* OK, we need to prepare new data, but only if the sink is actually running */
546 if (pa_atomic_load(&o->userdata->thread_info.running))
547 pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, (int64_t) length, NULL);
548 }
549
550 /* Called from I/O thread context */
sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk)551 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
552 struct output *o;
553
554 pa_sink_input_assert_ref(i);
555 pa_assert_se(o = i->userdata);
556
557 /* If necessary, get some new data */
558 request_memblock(o, nbytes);
559
560 /* pa_log("%s q size is %u + %u (%u/%u)", */
561 /* i->sink->name, */
562 /* pa_memblockq_get_nblocks(o->memblockq), */
563 /* pa_memblockq_get_nblocks(i->thread_info.render_memblockq), */
564 /* pa_memblockq_get_maxrewind(o->memblockq), */
565 /* pa_memblockq_get_maxrewind(i->thread_info.render_memblockq)); */
566
567 if (pa_memblockq_peek(o->memblockq, chunk) < 0)
568 return -1;
569
570 pa_memblockq_drop(o->memblockq, chunk->length);
571
572 return 0;
573 }
574
575 /* Called from I/O thread context */
sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes)576 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
577 struct output *o;
578
579 pa_sink_input_assert_ref(i);
580 pa_assert_se(o = i->userdata);
581
582 pa_memblockq_rewind(o->memblockq, nbytes);
583 }
584
585 /* Called from I/O thread context */
sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes)586 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
587 struct output *o;
588
589 pa_sink_input_assert_ref(i);
590 pa_assert_se(o = i->userdata);
591
592 pa_memblockq_set_maxrewind(o->memblockq, nbytes);
593 }
594
595 /* Called from I/O thread context */
sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes)596 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
597 struct output *o;
598
599 pa_sink_input_assert_ref(i);
600 pa_assert_se(o = i->userdata);
601
602 if (pa_atomic_load(&o->max_request) == (int) nbytes)
603 return;
604
605 pa_atomic_store(&o->max_request, (int) nbytes);
606 pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes);
607 pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
608 }
609
610 /* Called from thread context */
sink_input_update_sink_latency_range_cb(pa_sink_input *i)611 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
612 struct output *o;
613 pa_usec_t min, max, fix;
614
615 pa_assert(i);
616
617 pa_sink_input_assert_ref(i);
618 pa_assert_se(o = i->userdata);
619
620 fix = i->sink->thread_info.fixed_latency;
621 if (fix > 0) {
622 min = fix;
623 max = fix;
624 } else {
625 min = i->sink->thread_info.min_latency;
626 max = i->sink->thread_info.max_latency;
627 }
628
629 if ((pa_atomic_load(&o->min_latency) == (int) min) &&
630 (pa_atomic_load(&o->max_latency) == (int) max))
631 return;
632
633 pa_atomic_store(&o->min_latency, (int) min);
634 pa_atomic_store(&o->max_latency, (int) max);
635 pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max);
636 pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
637 }
638
639 /* Called from I/O thread context */
sink_input_attach_cb(pa_sink_input *i)640 static void sink_input_attach_cb(pa_sink_input *i) {
641 struct output *o;
642 pa_usec_t fix, min, max;
643 size_t nbytes;
644
645 pa_sink_input_assert_ref(i);
646 pa_assert_se(o = i->userdata);
647
648 /* Set up the queue from the sink thread to us */
649 pa_assert(!o->audio_inq_rtpoll_item_read);
650 pa_assert(!o->control_inq_rtpoll_item_read);
651 pa_assert(!o->outq_rtpoll_item_write);
652
653 o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
654 i->sink->thread_info.rtpoll,
655 PA_RTPOLL_LATE, /* This one is not that important, since we check for data in _peek() anyway. */
656 o->audio_inq);
657
658 o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
659 i->sink->thread_info.rtpoll,
660 PA_RTPOLL_NORMAL,
661 o->control_inq);
662
663 o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
664 i->sink->thread_info.rtpoll,
665 PA_RTPOLL_EARLY,
666 o->outq);
667
668 pa_sink_input_request_rewind(i, 0, false, true, true);
669
670 nbytes = pa_sink_input_get_max_request(i);
671 pa_atomic_store(&o->max_request, (int) nbytes);
672 pa_log_debug("attach max request %lu", (unsigned long) nbytes);
673
674 fix = i->sink->thread_info.fixed_latency;
675 if (fix > 0) {
676 min = max = fix;
677 } else {
678 min = i->sink->thread_info.min_latency;
679 max = i->sink->thread_info.max_latency;
680 }
681 pa_atomic_store(&o->min_latency, (int) min);
682 pa_atomic_store(&o->max_latency, (int) max);
683 pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max);
684
685 /* We register the output. That means that the sink will start to pass data to
686 * this output. */
687 pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_ADD_OUTPUT, o, 0, NULL);
688 }
689
690 /* Called from I/O thread context */
sink_input_detach_cb(pa_sink_input *i)691 static void sink_input_detach_cb(pa_sink_input *i) {
692 struct output *o;
693
694 pa_sink_input_assert_ref(i);
695 pa_assert_se(o = i->userdata);
696
697 /* We unregister the output. That means that the sink doesn't
698 * pass any further data to this output */
699 pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
700
701 if (o->audio_inq_rtpoll_item_read) {
702 pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
703 o->audio_inq_rtpoll_item_read = NULL;
704 }
705
706 if (o->control_inq_rtpoll_item_read) {
707 pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
708 o->control_inq_rtpoll_item_read = NULL;
709 }
710
711 if (o->outq_rtpoll_item_write) {
712 pa_rtpoll_item_free(o->outq_rtpoll_item_write);
713 o->outq_rtpoll_item_write = NULL;
714 }
715
716 }
717
718 /* Called from main context */
sink_input_kill_cb(pa_sink_input *i)719 static void sink_input_kill_cb(pa_sink_input *i) {
720 struct output *o;
721
722 pa_sink_input_assert_ref(i);
723 pa_assert_se(o = i->userdata);
724
725 pa_module_unload_request(o->userdata->module, true);
726 pa_idxset_remove_by_data(o->userdata->outputs, o, NULL);
727 output_free(o);
728 }
729
730 /* Called from thread context */
sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk)731 static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
732 struct output *o = PA_SINK_INPUT(obj)->userdata;
733
734 switch (code) {
735
736 case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
737 pa_usec_t *r = data;
738
739 *r = pa_bytes_to_usec(pa_memblockq_get_length(o->memblockq), &o->sink_input->sample_spec);
740
741 /* Fall through, the default handler will add in the extra
742 * latency added by the resampler */
743 break;
744 }
745
746 case SINK_INPUT_MESSAGE_POST:
747
748 if (o->sink_input->sink->thread_info.state == PA_SINK_RUNNING) {
749 pa_memblockq_push_align(o->memblockq, chunk);
750 o->receive_counter += chunk->length;
751 } else
752 pa_memblockq_flush_write(o->memblockq, true);
753
754 return 0;
755
756 case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: {
757 pa_usec_t latency = (pa_usec_t) offset;
758
759 pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency);
760
761 return 0;
762 }
763
764 case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
765 size_t length;
766
767 length = pa_memblockq_get_length(o->sink_input->thread_info.render_memblockq);
768
769 o->latency_snapshot.output_memblockq_size = pa_memblockq_get_length(o->memblockq);
770
771 /* Add content of memblockq's to sink latency */
772 o->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(o->sink, true) +
773 pa_bytes_to_usec(length, &o->sink->sample_spec);
774 /* Add resampler latency */
775 o->latency_snapshot.sink_latency += pa_resampler_get_delay_usec(o->sink_input->thread_info.resampler);
776
777 o->latency_snapshot.timestamp = pa_rtclock_now();
778
779 o->latency_snapshot.receive_counter = o->receive_counter;
780
781 return 0;
782 }
783 }
784
785 return pa_sink_input_process_msg(obj, code, data, offset, chunk);
786 }
787
788 /* Called from main context */
suspend(struct userdata *u)789 static void suspend(struct userdata *u) {
790 struct output *o;
791 uint32_t idx;
792
793 pa_assert(u);
794
795 /* Let's suspend by unlinking all streams */
796 PA_IDXSET_FOREACH(o, u->outputs, idx)
797 output_disable(o);
798
799 pa_log_info("Device suspended...");
800 }
801
802 /* Called from main context */
unsuspend(struct userdata *u)803 static void unsuspend(struct userdata *u) {
804 struct output *o;
805 uint32_t idx;
806
807 pa_assert(u);
808
809 /* Let's resume */
810 PA_IDXSET_FOREACH(o, u->outputs, idx)
811 output_enable(o);
812
813 pa_log_info("Resumed successfully...");
814 }
815
816 /* Called from main context */
sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause)817 static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
818 struct userdata *u;
819
820 pa_sink_assert_ref(sink);
821 pa_assert_se(u = sink->userdata);
822
823 /* It may be that only the suspend cause is changing, in which
824 * case there's nothing to do. */
825 if (state == u->sink->state)
826 return 0;
827
828 /* Please note that in contrast to the ALSA modules we call
829 * suspend/unsuspend from main context here! */
830
831 switch (state) {
832 case PA_SINK_SUSPENDED:
833 pa_assert(PA_SINK_IS_OPENED(u->sink->state));
834
835 suspend(u);
836 break;
837
838 case PA_SINK_IDLE:
839 case PA_SINK_RUNNING:
840
841 if (u->sink->state == PA_SINK_SUSPENDED)
842 unsuspend(u);
843
844 /* The first smoother update should be done early, otherwise the smoother will
845 * not be aware of the slave sink latencies and report far too small values.
846 * This is especially important if after an unsuspend the sink runs on a different
847 * latency than before. */
848 if (state == PA_SINK_RUNNING && !u->time_event && u->adjust_time > 0)
849 u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + pa_sink_get_requested_latency(u->sink), time_callback, u);
850
851 break;
852
853 case PA_SINK_UNLINKED:
854 case PA_SINK_INIT:
855 case PA_SINK_INVALID_STATE:
856 ;
857 }
858
859 return 0;
860 }
861
862 /* Called from the IO thread. */
sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause)863 static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
864 struct userdata *u;
865 bool running;
866
867 pa_assert(s);
868 pa_assert_se(u = s->userdata);
869
870 /* It may be that only the suspend cause is changing, in which case there's
871 * nothing to do. */
872 if (new_state == s->thread_info.state)
873 return 0;
874
875 running = new_state == PA_SINK_RUNNING;
876 pa_atomic_store(&u->thread_info.running, running);
877
878 if (running) {
879 u->thread_info.render_timestamp = 0;
880 #ifdef USE_SMOOTHER_2
881 pa_smoother_2_resume(u->thread_info.smoother, pa_rtclock_now());
882 } else
883 pa_smoother_2_pause(u->thread_info.smoother, pa_rtclock_now());
884 #else
885 pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
886 } else
887 pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
888 #endif
889
890 return 0;
891 }
892
893 /* Called from IO context */
894 static void update_max_request(struct userdata *u) {
895 size_t max_request = 0;
896 struct output *o;
897
898 pa_assert(u);
899 pa_sink_assert_io_context(u->sink);
900
901 /* Collects the max_request values of all streams and sets the
902 * largest one locally */
903
904 PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
905 size_t mr = (size_t) pa_atomic_load(&o->max_request);
906
907 if (mr > max_request)
908 max_request = mr;
909 }
910
911 if (max_request <= 0)
912 max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
913
914 pa_log_debug("Sink update max request %lu", (unsigned long) max_request);
915 pa_sink_set_max_request_within_thread(u->sink, max_request);
916 }
917
918 /* Called from IO context */
919 static void update_latency_range(struct userdata *u) {
920 pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1;
921 struct output *o;
922
923 pa_assert(u);
924 pa_sink_assert_io_context(u->sink);
925
926 /* Collects the latency_range values of all streams and sets
927 * the max of min and min of max locally */
928 PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
929 pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency);
930 pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency);
931
932 if (min > min_latency)
933 min_latency = min;
934 if (max_latency == (pa_usec_t) -1 || max < max_latency)
935 max_latency = max;
936 }
937 if (max_latency == (pa_usec_t) -1) {
938 /* No outputs, use default limits. */
939 min_latency = u->default_min_latency;
940 max_latency = u->default_max_latency;
941 }
942
943 /* As long as we don't support rewinding, we should limit the max latency
944 * to a conservative value. */
945 if (max_latency > u->default_max_latency)
946 max_latency = u->default_max_latency;
947
948 /* Never ever try to set lower max latency than min latency, it just
949 * doesn't make sense. */
950 if (max_latency < min_latency)
951 max_latency = min_latency;
952
953 pa_log_debug("Sink update latency range %" PRIu64 " %" PRIu64, min_latency, max_latency);
954 pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency);
955 }
956
957 /* Called from thread context of the io thread */
958 static void output_add_within_thread(struct output *o) {
959 pa_assert(o);
960 pa_sink_assert_io_context(o->sink);
961
962 PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
963
964 pa_assert(!o->outq_rtpoll_item_read);
965 pa_assert(!o->audio_inq_rtpoll_item_write);
966 pa_assert(!o->control_inq_rtpoll_item_write);
967
968 o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
969 o->userdata->rtpoll,
970 PA_RTPOLL_EARLY-1, /* This item is very important */
971 o->outq);
972 o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
973 o->userdata->rtpoll,
974 PA_RTPOLL_EARLY,
975 o->audio_inq);
976 o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
977 o->userdata->rtpoll,
978 PA_RTPOLL_NORMAL,
979 o->control_inq);
980 o->receive_counter = o->userdata->thread_info.counter;
981 }
982
983 /* Called from thread context of the io thread */
984 static void output_remove_within_thread(struct output *o) {
985 pa_assert(o);
986 pa_sink_assert_io_context(o->sink);
987
988 PA_LLIST_REMOVE(struct output, o->userdata->thread_info.active_outputs, o);
989
990 if (o->outq_rtpoll_item_read) {
991 pa_rtpoll_item_free(o->outq_rtpoll_item_read);
992 o->outq_rtpoll_item_read = NULL;
993 }
994
995 if (o->audio_inq_rtpoll_item_write) {
996 pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
997 o->audio_inq_rtpoll_item_write = NULL;
998 }
999
1000 if (o->control_inq_rtpoll_item_write) {
1001 pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
1002 o->control_inq_rtpoll_item_write = NULL;
1003 }
1004 }
1005
1006 /* Called from sink I/O thread context */
1007 static void sink_update_requested_latency(pa_sink *s) {
1008 struct userdata *u;
1009 struct output *o;
1010
1011 pa_sink_assert_ref(s);
1012 pa_assert_se(u = s->userdata);
1013
1014 u->block_usec = pa_sink_get_requested_latency_within_thread(s);
1015
1016 if (u->block_usec == (pa_usec_t) -1)
1017 u->block_usec = s->thread_info.max_latency;
1018
1019 pa_log_debug("Sink update requested latency %0.2f", (double) u->block_usec / PA_USEC_PER_MSEC);
1020
1021 /* Just hand this one over to all sink_inputs */
1022 PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
1023 pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
1024 u->block_usec, NULL, NULL);
1025 }
1026 }
1027
1028
1029 /* Called from thread context of the io thread */
1030 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1031 struct userdata *u = PA_SINK(o)->userdata;
1032
1033 switch (code) {
1034
1035 case PA_SINK_MESSAGE_GET_LATENCY: {
1036 int64_t *delay = data;
1037
1038 #ifdef USE_SMOOTHER_2
1039 *delay = pa_smoother_2_get_delay(u->thread_info.smoother, pa_rtclock_now(), u->thread_info.counter);
1040 #else
1041 pa_usec_t x, y, c;
1042
1043 x = pa_rtclock_now();
1044 y = pa_smoother_get(u->thread_info.smoother, x);
1045
1046 c = pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec);
1047
1048 *delay = (int64_t)c - y;
1049 #endif
1050
1051 return 0;
1052 }
1053
1054 case SINK_MESSAGE_ADD_OUTPUT:
1055 output_add_within_thread(data);
1056 update_max_request(u);
1057 update_latency_range(u);
1058 return 0;
1059
1060 case SINK_MESSAGE_REMOVE_OUTPUT:
1061 output_remove_within_thread(data);
1062 update_max_request(u);
1063 update_latency_range(u);
1064 return 0;
1065
1066 case SINK_MESSAGE_NEED:
1067 render_memblock(u, (struct output*) data, (size_t) offset);
1068 return 0;
1069
1070 case SINK_MESSAGE_UPDATE_LATENCY: {
1071 #ifdef USE_SMOOTHER_2
1072 size_t latency;
1073
1074 latency = pa_usec_to_bytes((pa_usec_t)offset, &u->sink->sample_spec);
1075 pa_smoother_2_put(u->thread_info.smoother, u->thread_info.snapshot_time, (int64_t)u->thread_info.snapshot_counter - latency);
1076 #else
1077 pa_usec_t x, y, latency = (pa_usec_t) offset;
1078
1079 /* It may be possible that thread_info.counter has been increased
1080 * since we took the snapshot. Therefore we have to use the snapshot
1081 * time and counter instead of the current values. */
1082 x = u->thread_info.snapshot_time;
1083 y = pa_bytes_to_usec(u->thread_info.snapshot_counter, &u->sink->sample_spec);
1084
1085 if (y > latency)
1086 y -= latency;
1087 else
1088 y = 0;
1089
1090 pa_smoother_put(u->thread_info.smoother, x, y);
1091 #endif
1092 return 0;
1093 }
1094
1095 case SINK_MESSAGE_GET_SNAPSHOT: {
1096 struct sink_snapshot *rdata = data;
1097
1098 rdata->timestamp = u->thread_info.render_timestamp;
1099 rdata->send_counter = u->thread_info.counter;
1100 u->thread_info.snapshot_counter = u->thread_info.counter;
1101 u->thread_info.snapshot_time = u->thread_info.render_timestamp;
1102
1103 return 0;
1104 }
1105
1106 case SINK_MESSAGE_UPDATE_MAX_REQUEST:
1107 update_max_request(u);
1108 break;
1109
1110 case SINK_MESSAGE_UPDATE_LATENCY_RANGE:
1111 update_latency_range(u);
1112 break;
1113
1114 }
1115
1116 return pa_sink_process_msg(o, code, data, offset, chunk);
1117 }
1118
1119 static void update_description(struct userdata *u) {
1120 bool first = true;
1121 char *t;
1122 struct output *o;
1123 uint32_t idx;
1124
1125 pa_assert(u);
1126
1127 if (!u->auto_desc)
1128 return;
1129
1130 if (pa_idxset_isempty(u->outputs)) {
1131 pa_sink_set_description(u->sink, "Simultaneous output");
1132 return;
1133 }
1134
1135 t = pa_xstrdup("Simultaneous output to");
1136
1137 PA_IDXSET_FOREACH(o, u->outputs, idx) {
1138 char *e;
1139
1140 if (first) {
1141 e = pa_sprintf_malloc("%s %s", t, pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1142 first = false;
1143 } else
1144 e = pa_sprintf_malloc("%s, %s", t, pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1145
1146 pa_xfree(t);
1147 t = e;
1148 }
1149
1150 pa_sink_set_description(u->sink, t);
1151 pa_xfree(t);
1152 }
1153
1154 static int output_create_sink_input(struct output *o) {
1155 struct userdata *u;
1156 pa_sink_input_new_data data;
1157
1158 pa_assert(o);
1159
1160 if (o->sink_input)
1161 return 0;
1162
1163 u = o->userdata;
1164
1165 pa_sink_input_new_data_init(&data);
1166 pa_sink_input_new_data_set_sink(&data, o->sink, false, true);
1167 data.driver = __FILE__;
1168 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME, "Simultaneous output on %s", pa_strnull(pa_proplist_gets(o->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1169 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1170 pa_sink_input_new_data_set_sample_spec(&data, &u->sink->sample_spec);
1171 pa_sink_input_new_data_set_channel_map(&data, &u->sink->channel_map);
1172 data.module = u->module;
1173 data.resample_method = u->resample_method;
1174 data.flags = PA_SINK_INPUT_VARIABLE_RATE|PA_SINK_INPUT_DONT_MOVE|PA_SINK_INPUT_NO_CREATE_ON_SUSPEND;
1175 data.origin_sink = u->sink;
1176
1177 if (!u->remix)
1178 data.flags |= PA_SINK_INPUT_NO_REMIX;
1179
1180 pa_sink_input_new(&o->sink_input, u->core, &data);
1181
1182 pa_sink_input_new_data_done(&data);
1183
1184 if (!o->sink_input)
1185 return -1;
1186
1187 o->sink_input->parent.process_msg = sink_input_process_msg;
1188 o->sink_input->pop = sink_input_pop_cb;
1189 o->sink_input->process_rewind = sink_input_process_rewind_cb;
1190 o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1191 o->sink_input->update_max_request = sink_input_update_max_request_cb;
1192 o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1193 o->sink_input->attach = sink_input_attach_cb;
1194 o->sink_input->detach = sink_input_detach_cb;
1195 o->sink_input->kill = sink_input_kill_cb;
1196 o->sink_input->userdata = o;
1197
1198 pa_sink_input_set_requested_latency(o->sink_input, pa_sink_get_requested_latency(u->sink));
1199
1200 return 0;
1201 }
1202
1203 /* Called from main context */
1204 static struct output *output_new(struct userdata *u, pa_sink *sink) {
1205 struct output *o;
1206
1207 pa_assert(u);
1208 pa_assert(sink);
1209 pa_assert(u->sink);
1210
1211 o = pa_xnew0(struct output, 1);
1212 o->userdata = u;
1213
1214 o->audio_inq = pa_asyncmsgq_new(0);
1215 if (!o->audio_inq) {
1216 pa_log("pa_asyncmsgq_new() failed.");
1217 goto fail;
1218 }
1219
1220 o->control_inq = pa_asyncmsgq_new(0);
1221 if (!o->control_inq) {
1222 pa_log("pa_asyncmsgq_new() failed.");
1223 goto fail;
1224 }
1225
1226 o->outq = pa_asyncmsgq_new(0);
1227 if (!o->outq) {
1228 pa_log("pa_asyncmsgq_new() failed.");
1229 goto fail;
1230 }
1231
1232 o->sink = sink;
1233 o->memblockq = pa_memblockq_new(
1234 "module-combine-sink output memblockq",
1235 0,
1236 MEMBLOCKQ_MAXLENGTH,
1237 MEMBLOCKQ_MAXLENGTH,
1238 &u->sink->sample_spec,
1239 1,
1240 0,
1241 0,
1242 &u->sink->silence);
1243
1244 pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
1245 update_description(u);
1246
1247 return o;
1248
1249 fail:
1250 output_free(o);
1251
1252 return NULL;
1253 }
1254
1255 /* Called from main context */
1256 static void output_free(struct output *o) {
1257 pa_assert(o);
1258
1259 output_disable(o);
1260 update_description(o->userdata);
1261
1262 if (o->audio_inq_rtpoll_item_read)
1263 pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
1264 if (o->audio_inq_rtpoll_item_write)
1265 pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
1266
1267 if (o->control_inq_rtpoll_item_read)
1268 pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
1269 if (o->control_inq_rtpoll_item_write)
1270 pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
1271
1272 if (o->outq_rtpoll_item_read)
1273 pa_rtpoll_item_free(o->outq_rtpoll_item_read);
1274 if (o->outq_rtpoll_item_write)
1275 pa_rtpoll_item_free(o->outq_rtpoll_item_write);
1276
1277 if (o->audio_inq)
1278 pa_asyncmsgq_unref(o->audio_inq);
1279
1280 if (o->control_inq)
1281 pa_asyncmsgq_unref(o->control_inq);
1282
1283 if (o->outq)
1284 pa_asyncmsgq_unref(o->outq);
1285
1286 if (o->memblockq)
1287 pa_memblockq_free(o->memblockq);
1288
1289 pa_xfree(o);
1290 }
1291
1292 /* Called from main context */
1293 static void output_enable(struct output *o) {
1294 pa_assert(o);
1295
1296 if (o->sink_input)
1297 return;
1298
1299 /* This might cause the sink to be resumed. The state change hook
1300 * of the sink might hence be called from here, which might then
1301 * cause us to be called in a loop. Make sure that state changes
1302 * for this output don't cause this loop by setting a flag here */
1303 o->ignore_state_change = true;
1304
1305 if (output_create_sink_input(o) >= 0) {
1306
1307 if (o->sink->state != PA_SINK_INIT) {
1308 /* Enable the sink input. That means that the sink
1309 * is now asked for new data. */
1310 pa_sink_input_put(o->sink_input);
1311 }
1312 }
1313
1314 o->ignore_state_change = false;
1315 }
1316
1317 /* Called from main context */
1318 static void output_disable(struct output *o) {
1319 pa_assert(o);
1320
1321 if (!o->sink_input)
1322 return;
1323
1324 /* We disable the sink input. That means that the sink is
1325 * not asked for new data anymore */
1326 pa_sink_input_unlink(o->sink_input);
1327
1328 /* Now deallocate the stream */
1329 pa_sink_input_unref(o->sink_input);
1330 o->sink_input = NULL;
1331
1332 /* Finally, drop all queued data */
1333 pa_memblockq_flush_write(o->memblockq, true);
1334 pa_asyncmsgq_flush(o->audio_inq, false);
1335 pa_asyncmsgq_flush(o->control_inq, false);
1336 pa_asyncmsgq_flush(o->outq, false);
1337 }
1338
1339 /* Called from main context */
1340 static void output_verify(struct output *o) {
1341 pa_assert(o);
1342
1343 if (PA_SINK_IS_OPENED(o->userdata->sink->state))
1344 output_enable(o);
1345 else
1346 output_disable(o);
1347 }
1348
1349 /* Called from main context */
1350 static bool is_suitable_sink(struct userdata *u, pa_sink *s) {
1351 const char *t;
1352
1353 pa_sink_assert_ref(s);
1354
1355 if (s == u->sink)
1356 return false;
1357
1358 if (!(s->flags & PA_SINK_HARDWARE))
1359 return false;
1360
1361 if (!(s->flags & PA_SINK_LATENCY))
1362 return false;
1363
1364 if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
1365 if (!pa_streq(t, "sound"))
1366 return false;
1367
1368 return true;
1369 }
1370
1371 /* Called from main context */
1372 static pa_hook_result_t sink_put_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1373 struct output *o;
1374
1375 pa_core_assert_ref(c);
1376 pa_sink_assert_ref(s);
1377 pa_assert(u);
1378
1379 if (u->automatic) {
1380 if (!is_suitable_sink(u, s))
1381 return PA_HOOK_OK;
1382 } else {
1383 /* Check if the sink is a previously unlinked slave (non-automatic mode) */
1384 pa_strlist *l = u->unlinked_slaves;
1385
1386 while (l && !pa_streq(pa_strlist_data(l), s->name))
1387 l = pa_strlist_next(l);
1388
1389 if (!l)
1390 return PA_HOOK_OK;
1391
1392 u->unlinked_slaves = pa_strlist_remove(u->unlinked_slaves, s->name);
1393 }
1394
1395 pa_log_info("Configuring new sink: %s", s->name);
1396 if (!(o = output_new(u, s))) {
1397 pa_log("Failed to create sink input on sink '%s'.", s->name);
1398 return PA_HOOK_OK;
1399 }
1400
1401 output_verify(o);
1402
1403 return PA_HOOK_OK;
1404 }
1405
1406 /* Called from main context */
1407 static struct output* find_output(struct userdata *u, pa_sink *s) {
1408 struct output *o;
1409 uint32_t idx;
1410
1411 pa_assert(u);
1412 pa_assert(s);
1413
1414 if (u->sink == s)
1415 return NULL;
1416
1417 PA_IDXSET_FOREACH(o, u->outputs, idx)
1418 if (o->sink == s)
1419 return o;
1420
1421 return NULL;
1422 }
1423
1424 /* Called from main context */
1425 static pa_hook_result_t sink_unlink_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1426 struct output *o;
1427
1428 pa_assert(c);
1429 pa_sink_assert_ref(s);
1430 pa_assert(u);
1431
1432 if (!(o = find_output(u, s)))
1433 return PA_HOOK_OK;
1434
1435 pa_log_info("Unconfiguring sink: %s", s->name);
1436
1437 if (!u->automatic)
1438 u->unlinked_slaves = pa_strlist_prepend(u->unlinked_slaves, s->name);
1439
1440 pa_idxset_remove_by_data(u->outputs, o, NULL);
1441 output_free(o);
1442
1443 return PA_HOOK_OK;
1444 }
1445
1446 /* Called from main context */
1447 static pa_hook_result_t sink_state_changed_hook_cb(pa_core *c, pa_sink *s, struct userdata* u) {
1448 struct output *o;
1449
1450 if (!(o = find_output(u, s)))
1451 return PA_HOOK_OK;
1452
1453 /* This state change might be triggered because we are creating a
1454 * stream here, in that case we don't want to create it a second
1455 * time here and enter a loop */
1456 if (o->ignore_state_change)
1457 return PA_HOOK_OK;
1458
1459 output_verify(o);
1460
1461 return PA_HOOK_OK;
1462 }
1463
1464 int pa__init(pa_module*m) {
1465 struct userdata *u;
1466 pa_modargs *ma = NULL;
1467 const char *slaves, *rm;
1468 int resample_method;
1469 pa_sample_spec ss;
1470 pa_channel_map map;
1471 struct output *o;
1472 uint32_t idx;
1473 pa_sink_new_data data;
1474 uint32_t adjust_time_sec;
1475 size_t nbytes;
1476
1477 pa_assert(m);
1478
1479 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1480 pa_log("failed to parse module arguments");
1481 goto fail;
1482 }
1483
1484 resample_method = m->core->resample_method;
1485 if ((rm = pa_modargs_get_value(ma, "resample_method", NULL))) {
1486 if ((resample_method = pa_parse_resample_method(rm)) < 0) {
1487 pa_log("invalid resample method '%s'", rm);
1488 goto fail;
1489 }
1490 }
1491
1492 m->userdata = u = pa_xnew0(struct userdata, 1);
1493 u->core = m->core;
1494 u->module = m;
1495 u->rtpoll = pa_rtpoll_new();
1496
1497 if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
1498 pa_log("pa_thread_mq_init() failed.");
1499 goto fail;
1500 }
1501
1502 u->remix = !m->core->disable_remixing;
1503 if (pa_modargs_get_value_boolean(ma, "remix", &u->remix) < 0) {
1504 pa_log("Invalid boolean remix parameter");
1505 goto fail;
1506 }
1507
1508 u->resample_method = resample_method;
1509 u->outputs = pa_idxset_new(NULL, NULL);
1510 #ifndef USE_SMOOTHER_2
1511 u->thread_info.smoother = pa_smoother_new(
1512 PA_USEC_PER_SEC,
1513 PA_USEC_PER_SEC*2,
1514 true,
1515 true,
1516 10,
1517 pa_rtclock_now(),
1518 true);
1519 #endif
1520
1521 adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1522 if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1523 pa_log("Failed to parse adjust_time value");
1524 goto fail;
1525 }
1526
1527 if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1528 u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1529 else
1530 u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1531
1532 slaves = pa_modargs_get_value(ma, "slaves", NULL);
1533 u->automatic = !slaves;
1534
1535 ss = m->core->default_sample_spec;
1536 map = m->core->default_channel_map;
1537
1538 /* Check the specified slave sinks for sample_spec and channel_map to use for the combined sink */
1539 if (!u->automatic) {
1540 const char*split_state = NULL;
1541 char *n = NULL;
1542 pa_sample_spec slaves_spec;
1543 pa_channel_map slaves_map;
1544 bool is_first_slave = true;
1545
1546 pa_sample_spec_init(&slaves_spec);
1547
1548 while ((n = pa_split(slaves, ",", &split_state))) {
1549 pa_sink *slave_sink;
1550
1551 if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1552 pa_log("Invalid slave sink '%s'", n);
1553 pa_xfree(n);
1554 goto fail;
1555 }
1556
1557 pa_xfree(n);
1558
1559 if (is_first_slave) {
1560 slaves_spec = slave_sink->sample_spec;
1561 slaves_map = slave_sink->channel_map;
1562 is_first_slave = false;
1563 } else {
1564 if (slaves_spec.format != slave_sink->sample_spec.format)
1565 slaves_spec.format = PA_SAMPLE_INVALID;
1566
1567 if (slaves_spec.rate < slave_sink->sample_spec.rate)
1568 slaves_spec.rate = slave_sink->sample_spec.rate;
1569
1570 if (!pa_channel_map_equal(&slaves_map, &slave_sink->channel_map))
1571 slaves_spec.channels = 0;
1572 }
1573 }
1574
1575 if (!is_first_slave) {
1576 if (slaves_spec.format != PA_SAMPLE_INVALID)
1577 ss.format = slaves_spec.format;
1578
1579 ss.rate = slaves_spec.rate;
1580
1581 if (slaves_spec.channels > 0) {
1582 map = slaves_map;
1583 ss.channels = slaves_map.channels;
1584 }
1585 }
1586 }
1587
1588 if ((pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0)) {
1589 pa_log("Invalid sample specification.");
1590 goto fail;
1591 }
1592
1593 pa_sink_new_data_init(&data);
1594 data.namereg_fail = false;
1595 data.driver = __FILE__;
1596 data.module = m;
1597 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
1598 pa_sink_new_data_set_sample_spec(&data, &ss);
1599 pa_sink_new_data_set_channel_map(&data, &map);
1600 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1601
1602 if (slaves)
1603 pa_proplist_sets(data.proplist, "combine.slaves", slaves);
1604
1605 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1606 pa_log("Invalid properties");
1607 pa_sink_new_data_done(&data);
1608 goto fail;
1609 }
1610
1611 /* Check proplist for a description & fill in a default value if not */
1612 u->auto_desc = false;
1613 if (NULL == pa_proplist_gets(data.proplist, PA_PROP_DEVICE_DESCRIPTION)) {
1614 u->auto_desc = true;
1615 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
1616 }
1617
1618 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
1619 pa_sink_new_data_done(&data);
1620
1621 if (!u->sink) {
1622 pa_log("Failed to create sink");
1623 goto fail;
1624 }
1625
1626 #ifdef USE_SMOOTHER_2
1627 /* The smoother window size needs to be larger than the time between updates */
1628 u->thread_info.smoother = pa_smoother_2_new(u->adjust_time + 5*PA_USEC_PER_SEC, pa_rtclock_now(), pa_frame_size(&u->sink->sample_spec), u->sink->sample_spec.rate);
1629 #endif
1630
1631 u->sink->parent.process_msg = sink_process_msg;
1632 u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
1633 u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
1634 u->sink->update_requested_latency = sink_update_requested_latency;
1635 u->sink->userdata = u;
1636
1637 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1638 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1639
1640 nbytes = pa_usec_to_bytes(BLOCK_USEC, &u->sink->sample_spec);
1641 pa_sink_set_max_request(u->sink, nbytes);
1642 pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC);
1643 /* pulse clamps the range, get the real values */
1644 u->default_min_latency = u->sink->thread_info.min_latency;
1645 u->default_max_latency = u->sink->thread_info.max_latency;
1646 u->block_usec = u->sink->thread_info.max_latency;
1647
1648
1649 if (!u->automatic) {
1650 const char*split_state;
1651 char *n = NULL;
1652 pa_assert(slaves);
1653
1654 /* The slaves have been specified manually */
1655
1656 split_state = NULL;
1657 while ((n = pa_split(slaves, ",", &split_state))) {
1658 pa_sink *slave_sink;
1659
1660 if (!(slave_sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK)) || slave_sink == u->sink) {
1661 pa_log("Invalid slave sink '%s'", n);
1662 pa_xfree(n);
1663 goto fail;
1664 }
1665
1666 pa_xfree(n);
1667
1668 if (!output_new(u, slave_sink)) {
1669 pa_log("Failed to create slave sink input on sink '%s'.", slave_sink->name);
1670 goto fail;
1671 }
1672 }
1673
1674 if (pa_idxset_size(u->outputs) <= 1)
1675 pa_log_warn("No slave sinks specified.");
1676
1677 u->sink_put_slot = NULL;
1678
1679 } else {
1680 pa_sink *s;
1681
1682 /* We're in automatic mode, we add every sink that matches our needs */
1683
1684 PA_IDXSET_FOREACH(s, m->core->sinks, idx) {
1685
1686 if (!is_suitable_sink(u, s))
1687 continue;
1688
1689 if (!output_new(u, s)) {
1690 pa_log("Failed to create sink input on sink '%s'.", s->name);
1691 goto fail;
1692 }
1693 }
1694 }
1695
1696 u->sink_put_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PUT], PA_HOOK_LATE, (pa_hook_cb_t) sink_put_hook_cb, u);
1697 u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_EARLY, (pa_hook_cb_t) sink_unlink_hook_cb, u);
1698 u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], PA_HOOK_NORMAL, (pa_hook_cb_t) sink_state_changed_hook_cb, u);
1699
1700 u->thread_info.render_timestamp = 0;
1701
1702 if (!(u->thread = pa_thread_new("combine", thread_func, u))) {
1703 pa_log("Failed to create thread.");
1704 goto fail;
1705 }
1706
1707 /* Activate the sink and the sink inputs */
1708 pa_sink_put(u->sink);
1709
1710 PA_IDXSET_FOREACH(o, u->outputs, idx)
1711 output_verify(o);
1712
1713 if (u->adjust_time > 0)
1714 u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1715
1716 pa_modargs_free(ma);
1717
1718 return 0;
1719
1720 fail:
1721
1722 if (ma)
1723 pa_modargs_free(ma);
1724
1725 pa__done(m);
1726
1727 return -1;
1728 }
1729
1730 void pa__done(pa_module*m) {
1731 struct userdata *u;
1732
1733 pa_assert(m);
1734
1735 if (!(u = m->userdata))
1736 return;
1737
1738 pa_strlist_free(u->unlinked_slaves);
1739
1740 if (u->sink_put_slot)
1741 pa_hook_slot_free(u->sink_put_slot);
1742
1743 if (u->sink_unlink_slot)
1744 pa_hook_slot_free(u->sink_unlink_slot);
1745
1746 if (u->sink_state_changed_slot)
1747 pa_hook_slot_free(u->sink_state_changed_slot);
1748
1749 if (u->outputs)
1750 pa_idxset_free(u->outputs, (pa_free_cb_t) output_free);
1751
1752 if (u->sink)
1753 pa_sink_unlink(u->sink);
1754
1755 if (u->thread) {
1756 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1757 pa_thread_free(u->thread);
1758 }
1759
1760 pa_thread_mq_done(&u->thread_mq);
1761
1762 if (u->sink)
1763 pa_sink_unref(u->sink);
1764
1765 if (u->rtpoll)
1766 pa_rtpoll_free(u->rtpoll);
1767
1768 if (u->time_event)
1769 u->core->mainloop->time_free(u->time_event);
1770
1771 if (u->thread_info.smoother)
1772 #ifdef USE_SMOOTHER_2
1773 pa_smoother_2_free(u->thread_info.smoother);
1774 #else
1775 pa_smoother_free(u->thread_info.smoother);
1776 #endif
1777
1778 pa_xfree(u);
1779 }
1780