1/***
2    This file is part of PulseAudio.
3
4    Copyright 2009 Intel Corporation
5    Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
6
7    PulseAudio is free software; you can redistribute it and/or modify
8    it under the terms of the GNU Lesser General Public License as published
9    by the Free Software Foundation; either version 2.1 of the License,
10    or (at your option) any later version.
11
12    PulseAudio is distributed in the hope that it will be useful, but
13    WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15    General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public License
18    along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19***/
20
21#ifdef HAVE_CONFIG_H
22#include <config.h>
23#endif
24
25#include <stdio.h>
26
27#include <pulse/xmalloc.h>
28
29#include <pulsecore/sink-input.h>
30#include <pulsecore/module.h>
31#include <pulsecore/modargs.h>
32#include <pulsecore/namereg.h>
33#include <pulsecore/log.h>
34#include <pulsecore/core-util.h>
35
36#include <pulse/rtclock.h>
37#include <pulse/timeval.h>
38
39PA_MODULE_AUTHOR("Pierre-Louis Bossart, Georg Chini");
40PA_MODULE_DESCRIPTION("Loopback from source to sink");
41PA_MODULE_VERSION(PACKAGE_VERSION);
42PA_MODULE_LOAD_ONCE(false);
43PA_MODULE_USAGE(
44        "source=<source to connect to> "
45        "sink=<sink to connect to> "
46        "adjust_time=<how often to readjust rates in s> "
47        "latency_msec=<latency in ms> "
48        "max_latency_msec=<maximum latency in ms> "
49        "log_interval=<how often to log in s> "
50        "fast_adjust_threshold_msec=<threshold for fast adjust in ms> "
51        "adjust_threshold_usec=<threshold for latency adjustment in usec> "
52        "format=<sample format> "
53        "rate=<sample rate> "
54        "channels=<number of channels> "
55        "channel_map=<channel map> "
56        "sink_input_properties=<proplist> "
57        "source_output_properties=<proplist> "
58        "source_dont_move=<boolean> "
59        "sink_dont_move=<boolean> "
60        "remix=<remix channels?> ");
61
62#define DEFAULT_LATENCY_MSEC 200
63
64#define FILTER_PARAMETER 0.125
65
66#define DEFAULT_ADJUST_THRESHOLD_USEC 250
67
68#define MEMBLOCKQ_MAXLENGTH (1024*1024*32)
69
70#define MIN_DEVICE_LATENCY (2.5*PA_USEC_PER_MSEC)
71
72#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
73
74typedef struct loopback_msg loopback_msg;
75
76struct userdata {
77    pa_core *core;
78    pa_module *module;
79
80    loopback_msg *msg;
81
82    pa_sink_input *sink_input;
83    pa_source_output *source_output;
84
85    pa_asyncmsgq *asyncmsgq;
86    pa_memblockq *memblockq;
87
88    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
89
90    pa_time_event *time_event;
91
92    /* Variables used to calculate the average time between
93     * subsequent calls of adjust_rates() */
94    pa_usec_t adjust_time_stamp;
95    pa_usec_t real_adjust_time;
96    pa_usec_t real_adjust_time_sum;
97
98    /* Values from command line configuration */
99    pa_usec_t latency;
100    pa_usec_t max_latency;
101    pa_usec_t adjust_time;
102    pa_usec_t fast_adjust_threshold;
103    uint32_t adjust_threshold;
104    uint32_t log_interval;
105
106    /* Latency boundaries and current values */
107    pa_usec_t min_source_latency;
108    pa_usec_t max_source_latency;
109    pa_usec_t min_sink_latency;
110    pa_usec_t max_sink_latency;
111    pa_usec_t configured_sink_latency;
112    pa_usec_t configured_source_latency;
113    int64_t source_latency_offset;
114    int64_t sink_latency_offset;
115    pa_usec_t minimum_latency;
116
117    /* State variable of the latency controller */
118    int32_t last_latency_difference;
119    int64_t last_source_latency_offset;
120    int64_t last_sink_latency_offset;
121    int64_t next_latency_with_drift;
122    int64_t next_latency_at_optimum_rate_with_drift;
123
124    /* Filter varables used for 2nd order filter */
125    double drift_filter;
126    double drift_compensation_rate;
127
128    /* Variables for Kalman filter and error tracking*/
129    double latency_variance;
130    double kalman_variance;
131    double latency_error;
132
133    /* lower latency limit found by underruns */
134    pa_usec_t underrun_latency_limit;
135
136    /* Various counters */
137    uint32_t iteration_counter;
138    uint32_t underrun_counter;
139    uint32_t adjust_counter;
140    uint32_t target_latency_cross_counter;
141    uint32_t log_counter;
142
143    /* Various booleans */
144    bool fixed_alsa_source;
145    bool source_sink_changed;
146    bool underrun_occured;
147    bool source_latency_offset_changed;
148    bool sink_latency_offset_changed;
149    bool initial_adjust_pending;
150
151    /* Used for sink input and source output snapshots */
152    struct {
153        int64_t send_counter;
154        int64_t source_latency;
155        pa_usec_t source_timestamp;
156
157        int64_t recv_counter;
158        size_t loopback_memblockq_length;
159        int64_t sink_latency;
160        pa_usec_t sink_timestamp;
161    } latency_snapshot;
162
163    /* Input thread variable */
164    int64_t send_counter;
165
166    /* Output thread variables */
167    struct {
168        int64_t recv_counter;
169        pa_usec_t effective_source_latency;
170
171        /* Copied from main thread */
172        pa_usec_t minimum_latency;
173
174        /* Various booleans */
175        bool in_pop;
176        bool pop_called;
177        bool pop_adjust;
178        bool first_pop_done;
179        bool push_called;
180    } output_thread_info;
181};
182
183struct loopback_msg {
184    pa_msgobject parent;
185    struct userdata *userdata;
186    bool dead;
187};
188
189PA_DEFINE_PRIVATE_CLASS(loopback_msg, pa_msgobject);
190#define LOOPBACK_MSG(o) (loopback_msg_cast(o))
191
192static const char* const valid_modargs[] = {
193    "source",
194    "sink",
195    "adjust_time",
196    "latency_msec",
197    "max_latency_msec",
198    "log_interval",
199    "fast_adjust_threshold_msec",
200    "adjust_threshold_usec",
201    "format",
202    "rate",
203    "channels",
204    "channel_map",
205    "sink_input_properties",
206    "source_output_properties",
207    "source_dont_move",
208    "sink_dont_move",
209    "remix",
210    NULL,
211};
212
213enum {
214    SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
215    SINK_INPUT_MESSAGE_REWIND,
216    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
217    SINK_INPUT_MESSAGE_SOURCE_CHANGED,
218    SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY,
219    SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY,
220    SINK_INPUT_MESSAGE_FAST_ADJUST,
221};
222
223enum {
224    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT = PA_SOURCE_OUTPUT_MESSAGE_MAX,
225};
226
227enum {
228    LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED,
229    LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED,
230    LOOPBACK_MESSAGE_UNDERRUN,
231    LOOPBACK_MESSAGE_ADJUST_DONE,
232};
233
234static void enable_adjust_timer(struct userdata *u, bool enable);
235
236/* Called from main context */
237static void teardown(struct userdata *u) {
238    pa_assert(u);
239    pa_assert_ctl_context();
240
241    u->adjust_time = 0;
242    enable_adjust_timer(u, false);
243
244    if (u->msg)
245        u->msg->dead = true;
246
247    /* Handling the asyncmsgq between the source output and the sink input
248     * requires some care. When the source output is unlinked, nothing needs
249     * to be done for the asyncmsgq, because the source output is the sending
250     * end. But when the sink input is unlinked, we should ensure that the
251     * asyncmsgq is emptied, because the messages in the queue hold references
252     * to the sink input. Also, we need to ensure that new messages won't be
253     * written to the queue after we have emptied it.
254     *
255     * Emptying the queue can be done in the state_change() callback of the
256     * sink input, when the new state is "unlinked".
257     *
258     * Preventing new messages from being written to the queue can be achieved
259     * by unlinking the source output before unlinking the sink input. There
260     * are no other writers for that queue, so this is sufficient. */
261
262    if (u->source_output) {
263        pa_source_output_unlink(u->source_output);
264        pa_source_output_unref(u->source_output);
265        u->source_output = NULL;
266    }
267
268    if (u->sink_input) {
269        pa_sink_input_unlink(u->sink_input);
270        pa_sink_input_unref(u->sink_input);
271        u->sink_input = NULL;
272    }
273}
274
275/* rate controller, called from main context
276 * - maximum deviation from optimum rate for P-controller is less than 1%
277 * - P-controller step size is limited to 2.01‰
278 * - will calculate an optimum rate
279*/
280static uint32_t rate_controller(
281                struct userdata *u,
282                uint32_t base_rate, uint32_t old_rate,
283                int32_t latency_difference_at_optimum_rate,
284                int32_t latency_difference_at_base_rate) {
285
286    double new_rate, new_rate_1, new_rate_2;
287    double min_cycles_1, min_cycles_2, drift_rate, latency_drift, controller_weight, min_weight;
288    uint32_t base_rate_with_drift;
289
290    base_rate_with_drift = (int)(base_rate + u->drift_compensation_rate);
291
292    /* If we are less than 2‰ away from the optimum rate, lower weight of the
293     * P-controller. The weight is determined by the fact that a correction
294     * of 0.5 Hz needs to be applied by the controller when the latency
295     * difference gets larger than the threshold. The weight follows
296     * from the definition of the controller. The minimum will only
297     * be reached when one adjust threshold away from the target. Start
298     * using the weight after the target latency has been reached for the
299     * second time to accelerate initial convergence. The second time has
300     * been chosen because it takes a while before the smoother returns
301     * reliable latencies. */
302    controller_weight = 1;
303    min_weight = PA_CLAMP(0.5 / (double)base_rate * (100.0 + (double)u->real_adjust_time / u->adjust_threshold), 0, 1.0);
304    if ((double)abs((int)(old_rate - base_rate_with_drift)) / base_rate_with_drift < 0.002 && u->target_latency_cross_counter >= 2)
305        controller_weight = PA_CLAMP((double)abs(latency_difference_at_optimum_rate) / u->adjust_threshold * min_weight, min_weight, 1.0);
306
307    /* Calculate next rate that is not more than 2‰ away from the last rate */
308    min_cycles_1 = (double)abs(latency_difference_at_optimum_rate) / u->real_adjust_time / 0.002 + 1;
309    new_rate_1 = old_rate + base_rate * (double)latency_difference_at_optimum_rate / min_cycles_1 / u->real_adjust_time;
310
311    /* Calculate best rate to correct the current latency offset, limit at
312     * 1% difference from base_rate */
313    min_cycles_2 = (double)abs(latency_difference_at_optimum_rate) / u->real_adjust_time / 0.01 + 1;
314    new_rate_2 = (double)base_rate * (1.0 + controller_weight * latency_difference_at_optimum_rate / min_cycles_2 / u->real_adjust_time);
315
316    /* Choose the rate that is nearer to base_rate unless we are already near
317     * to the desired latency and rate */
318    if (abs((int)(new_rate_1 - base_rate)) < abs((int)(new_rate_2 - base_rate)) && controller_weight > 0.99)
319        new_rate = new_rate_1;
320    else
321        new_rate = new_rate_2;
322
323    /* Calculate rate difference between source and sink. Skip calculation
324     * after a source/sink change, an underrun or latency offset change */
325
326    if (!u->underrun_occured && !u->source_sink_changed && !u->source_latency_offset_changed && !u->sink_latency_offset_changed) {
327        /* Latency difference between last iterations */
328        latency_drift = latency_difference_at_base_rate - u->last_latency_difference;
329
330        /* Calculate frequency difference between source and sink */
331        drift_rate = latency_drift * old_rate / u->real_adjust_time + old_rate - base_rate;
332
333        /* The maximum accepted sample rate difference between source and
334         * sink is 1% of the base rate. If the result is larger, something
335         * went wrong, so do not use it. Pass in 0 instead to allow the
336         * filter to decay. */
337        if (abs((int)drift_rate) > base_rate / 100)
338            drift_rate = 0;
339
340        /* 2nd order lowpass filter */
341        u->drift_filter = (1 - FILTER_PARAMETER) * u->drift_filter + FILTER_PARAMETER * drift_rate;
342        u->drift_compensation_rate =  (1 - FILTER_PARAMETER) * u->drift_compensation_rate + FILTER_PARAMETER * u->drift_filter;
343    }
344
345    /* Use drift compensation. Though not likely, the rate might exceed the maximum allowed rate now. */
346    new_rate = new_rate + u->drift_compensation_rate + 0.5;
347
348    if (new_rate > base_rate * 101 / 100)
349        return base_rate * 101 / 100;
350    else if (new_rate < base_rate * 99 / 100)
351        return base_rate * 99 / 100;
352    else
353        return (int)new_rate;
354}
355
356/* Called from main thread.
357 * It has been a matter of discussion how to correctly calculate the minimum
358 * latency that module-loopback can deliver with a given source and sink.
359 * The calculation has been placed in a separate function so that the definition
360 * can easily be changed. The resulting estimate is not very exact because it
361 * depends on the reported latency ranges. In cases were the lower bounds of
362 * source and sink latency are not reported correctly (USB) the result will
363 * be wrong. */
364static void update_minimum_latency(struct userdata *u, pa_sink *sink, bool print_msg) {
365
366    if (u->underrun_latency_limit)
367        /* If we already detected a real latency limit because of underruns, use it */
368        u->minimum_latency = u->underrun_latency_limit;
369
370    else {
371        /* Calculate latency limit from latency ranges */
372
373        u->minimum_latency = u->min_sink_latency;
374        if (u->fixed_alsa_source)
375            /* If we are using an alsa source with fixed latency, we will get a wakeup when
376             * one fragment is filled, and then we empty the source buffer, so the source
377             * latency never grows much beyond one fragment (assuming that the CPU doesn't
378             * cause a bottleneck). */
379            u->minimum_latency += u->core->default_fragment_size_msec * PA_USEC_PER_MSEC;
380
381        else
382            /* In all other cases the source will deliver new data at latest after one source latency.
383             * Make sure there is enough data available that the sink can keep on playing until new
384             * data is pushed. */
385            u->minimum_latency += u->min_source_latency;
386
387        /* Multiply by 1.1 as a safety margin for delays that are proportional to the buffer sizes */
388        u->minimum_latency *= 1.1;
389
390        /* Add 1.5 ms as a safety margin for delays not related to the buffer sizes */
391        u->minimum_latency += 1.5 * PA_USEC_PER_MSEC;
392    }
393
394    /* Add the latency offsets */
395    if (-(u->sink_latency_offset + u->source_latency_offset) <= (int64_t)u->minimum_latency)
396        u->minimum_latency += u->sink_latency_offset + u->source_latency_offset;
397    else
398        u->minimum_latency = 0;
399
400    /* If the sink is valid, send a message to update the minimum latency to
401     * the output thread, else set the variable directly */
402    if (sink)
403        pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY, NULL, u->minimum_latency, NULL);
404    else
405        u->output_thread_info.minimum_latency = u->minimum_latency;
406
407    if (print_msg) {
408        pa_log_info("Minimum possible end to end latency: %0.2f ms", (double)u->minimum_latency / PA_USEC_PER_MSEC);
409        if (u->latency < u->minimum_latency)
410            pa_log_warn("Configured latency of %0.2f ms is smaller than minimum latency, using minimum instead", (double)u->latency / PA_USEC_PER_MSEC);
411    }
412}
413
414/* Called from main context */
415static void adjust_rates(struct userdata *u) {
416    size_t buffer;
417    uint32_t old_rate, base_rate, new_rate, run_hours;
418    int32_t latency_difference;
419    pa_usec_t current_buffer_latency, snapshot_delay;
420    int64_t current_source_sink_latency, current_latency, latency_at_optimum_rate;
421    pa_usec_t final_latency, now, time_passed;
422    double filtered_latency, current_latency_error, latency_correction, base_rate_with_drift;
423
424    pa_assert(u);
425    pa_assert_ctl_context();
426
427    /* Runtime and counters since last change of source or sink
428     * or source/sink latency */
429    run_hours = u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600;
430    u->iteration_counter +=1;
431
432    /* If we are seeing underruns then the latency is too small */
433    if (u->underrun_counter > 2) {
434        pa_usec_t target_latency;
435
436        target_latency = PA_MAX(u->latency, u->minimum_latency) + 5 * PA_USEC_PER_MSEC;
437
438        if (u->max_latency == 0 || target_latency < u->max_latency) {
439            u->underrun_latency_limit = PA_CLIP_SUB((int64_t)target_latency, u->sink_latency_offset + u->source_latency_offset);
440            pa_log_warn("Too many underruns, increasing latency to %0.2f ms", (double)target_latency / PA_USEC_PER_MSEC);
441        } else {
442            u->underrun_latency_limit = PA_CLIP_SUB((int64_t)u->max_latency, u->sink_latency_offset + u->source_latency_offset);
443            pa_log_warn("Too many underruns, configured maximum latency of %0.2f ms is reached", (double)u->max_latency / PA_USEC_PER_MSEC);
444            pa_log_warn("Consider increasing the max_latency_msec");
445        }
446
447        update_minimum_latency(u, u->sink_input->sink, false);
448        u->underrun_counter = 0;
449    }
450
451    /* Allow one underrun per hour */
452    if (u->iteration_counter * u->real_adjust_time / PA_USEC_PER_SEC / 3600 > run_hours) {
453        u->underrun_counter = PA_CLIP_SUB(u->underrun_counter, 1u);
454        pa_log_info("Underrun counter: %u", u->underrun_counter);
455    }
456
457    /* Calculate real adjust time if source or sink did not change and if the system has
458     * not been suspended. If the time between two calls is more than 5% longer than the
459     * configured adjust time, we assume that the system has been sleeping and skip the
460     * calculation for this iteration. When source or sink changed or the system has been
461     * sleeping, we need to reset the parameters for drift compensation. */
462    now = pa_rtclock_now();
463    time_passed = now - u->adjust_time_stamp;
464    if (!u->source_sink_changed && time_passed < u->adjust_time * 1.05) {
465        u->adjust_counter++;
466        u->real_adjust_time_sum += time_passed;
467        u->real_adjust_time = u->real_adjust_time_sum / u->adjust_counter;
468    } else {
469        u->drift_compensation_rate = 0;
470        u->drift_filter = 0;
471        /* Ensure that source_sink_changed is set, so that the Kalman filter parameters
472         * will also be reset. */
473        u->source_sink_changed = true;
474    }
475    u->adjust_time_stamp = now;
476
477    /* Rates and latencies */
478    old_rate = u->sink_input->sample_spec.rate;
479    base_rate = u->source_output->sample_spec.rate;
480
481    buffer = u->latency_snapshot.loopback_memblockq_length;
482    if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
483        buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
484    else
485        buffer = PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
486
487    current_buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
488    snapshot_delay = u->latency_snapshot.source_timestamp - u->latency_snapshot.sink_timestamp;
489    current_source_sink_latency = u->latency_snapshot.sink_latency + u->latency_snapshot.source_latency - snapshot_delay;
490
491    /* Current latency */
492    current_latency = current_source_sink_latency + current_buffer_latency;
493
494    /* Latency at optimum rate and latency difference */
495    latency_at_optimum_rate = current_source_sink_latency + current_buffer_latency * old_rate / (u->drift_compensation_rate + base_rate);
496
497    final_latency = PA_MAX(u->latency, u->minimum_latency);
498    latency_difference = (int32_t)(current_latency - final_latency);
499
500    /* Do not filter or calculate error if source or sink changed or if there was an underrun */
501    if (u->source_sink_changed || u->underrun_occured) {
502        /* Initial conditions are very unsure, so use a high variance */
503        u->kalman_variance = 10000000;
504        filtered_latency = latency_at_optimum_rate;
505        u->next_latency_at_optimum_rate_with_drift = latency_at_optimum_rate;
506        u->next_latency_with_drift = current_latency;
507
508    } else {
509        /* Correct predictions if one of the latency offsets changed between iterations */
510        u->next_latency_at_optimum_rate_with_drift += u->source_latency_offset - u->last_source_latency_offset;
511        u->next_latency_at_optimum_rate_with_drift += u->sink_latency_offset - u->last_sink_latency_offset;
512        u->next_latency_with_drift += u->source_latency_offset - u->last_source_latency_offset;
513        u->next_latency_with_drift += u->sink_latency_offset - u->last_sink_latency_offset;
514        /* Low pass filtered latency error. This value reflects how well the measured values match the prediction. */
515        u->latency_error = (1 - FILTER_PARAMETER) * u->latency_error + FILTER_PARAMETER * (double)abs((int32_t)(current_latency - u->next_latency_with_drift));
516        /* Low pass filtered latency variance */
517        current_latency_error = (double)abs((int32_t)(latency_at_optimum_rate - u->next_latency_at_optimum_rate_with_drift));
518        u->latency_variance = (1.0 - FILTER_PARAMETER) * u->latency_variance + FILTER_PARAMETER * current_latency_error * current_latency_error;
519        /* Kalman filter */
520        filtered_latency = (latency_at_optimum_rate * u->kalman_variance + u->next_latency_at_optimum_rate_with_drift * u->latency_variance) / (u->kalman_variance + u->latency_variance);
521        u->kalman_variance = u->kalman_variance * u->latency_variance / (u->kalman_variance + u->latency_variance) + u->latency_variance / 4 + 200;
522    }
523
524    /* Drop or insert samples if fast_adjust_threshold_msec was specified and the latency difference is too large. */
525    if (u->fast_adjust_threshold > 0 && abs(latency_difference) > u->fast_adjust_threshold) {
526        pa_log_debug ("Latency difference larger than %" PRIu64 " msec, skipping or inserting samples.", u->fast_adjust_threshold / PA_USEC_PER_MSEC);
527
528        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_FAST_ADJUST, NULL, current_source_sink_latency, NULL);
529
530        /* Skip real adjust time calculation and reset drift compensation parameters on next iteration. */
531        u->source_sink_changed = true;
532
533        /* We probably need to adjust again, reset cross_counter. */
534        u->target_latency_cross_counter = 0;
535        return;
536    }
537
538    /* Calculate new rate */
539    new_rate = rate_controller(u, base_rate, old_rate, (int32_t)(filtered_latency - final_latency), latency_difference);
540
541    /* Log every log_interval iterations if the log_interval parameter is set */
542    if (u->log_interval != 0) {
543        u->log_counter--;
544        if (u->log_counter == 0) {
545            pa_log_debug("Loopback status %s to %s:\n    Source latency: %0.2f ms\n    Buffer: %0.2f ms\n    Sink latency: %0.2f ms\n    End-to-end latency: %0.2f ms\n"
546                         "    Deviation from target latency at optimum rate: %0.2f usec\n    Average prediction error: ± %0.2f usec\n    Optimum rate: %0.2f Hz\n    Deviation from base rate: %i Hz",
547                        u->source_output->source->name,
548                        u->sink_input->sink->name,
549                        (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
550                        (double) current_buffer_latency / PA_USEC_PER_MSEC,
551                        (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
552                        (double) current_latency / PA_USEC_PER_MSEC,
553                        (double) latency_at_optimum_rate - final_latency,
554                        (double) u->latency_error,
555                        u->drift_compensation_rate + base_rate,
556                        (int32_t)(new_rate - base_rate));
557            u->log_counter = u->log_interval;
558        }
559    }
560
561    /* If the latency difference changed sign, we have crossed the target latency. */
562    if ((int64_t)latency_difference * u->last_latency_difference < 0)
563        u->target_latency_cross_counter++;
564
565    /* Save current latency difference at new rate for next cycle and reset flags */
566    u->last_latency_difference = current_source_sink_latency + current_buffer_latency * old_rate / new_rate - final_latency;
567
568    /* Set variables that may change between calls of adjust_rate() */
569    u->source_sink_changed = false;
570    u->underrun_occured = false;
571    u->last_source_latency_offset = u->source_latency_offset;
572    u->last_sink_latency_offset = u->sink_latency_offset;
573    u->source_latency_offset_changed = false;
574    u->sink_latency_offset_changed = false;
575
576    /* Predicton of next latency */
577
578    /* Evaluate optimum rate */
579    base_rate_with_drift = u->drift_compensation_rate + base_rate;
580
581    /* Latency correction on next iteration */
582    latency_correction = (base_rate_with_drift - new_rate) * (int64_t)u->real_adjust_time / new_rate;
583
584    if ((int)new_rate != (int)base_rate_with_drift || new_rate != old_rate) {
585        /* While we are correcting, the next latency is determined by the current value and the difference
586         * between the new sampling rate and the base rate*/
587        u->next_latency_with_drift = current_latency + latency_correction + ((double)old_rate / new_rate - 1) * current_buffer_latency;
588        u->next_latency_at_optimum_rate_with_drift = filtered_latency + latency_correction * new_rate / base_rate_with_drift;
589
590    } else {
591        /* We are in steady state, now only the fractional drift should matter.
592         * To make sure that we do not drift away due to errors in the fractional
593         * drift, use a running average of the measured and predicted values */
594        u->next_latency_with_drift = (filtered_latency + u->next_latency_with_drift) / 2.0 + (1.0 - (double)(int)base_rate_with_drift / base_rate_with_drift) * (int64_t)u->real_adjust_time;
595
596        /* We are at the optimum rate, so nothing to correct */
597        u->next_latency_at_optimum_rate_with_drift = u->next_latency_with_drift;
598    }
599
600    /* Set rate */
601    pa_sink_input_set_rate(u->sink_input, new_rate);
602}
603
604/* Called from main context */
605static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
606    struct userdata *u = userdata;
607
608    pa_assert(u);
609    pa_assert(a);
610    pa_assert(u->time_event == e);
611
612    /* Restart timer right away */
613    pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
614
615    /* If the initial latency adjustment has not been done yet, we have to skip
616     * adjust_rates(). The estimation of the optimum rate cannot be done in that
617     * situation */
618    if (u->initial_adjust_pending)
619        return;
620
621    /* Get sink and source latency snapshot */
622    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
623    pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
624
625    adjust_rates(u);
626}
627
628/* Called from main context
629 * When source or sink changes, give it a third of a second to settle down, then call adjust_rates for the first time */
630static void enable_adjust_timer(struct userdata *u, bool enable) {
631    if (enable) {
632        if (!u->adjust_time)
633            return;
634        if (u->time_event)
635            u->core->mainloop->time_free(u->time_event);
636
637        u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + 333 * PA_USEC_PER_MSEC, time_callback, u);
638    } else {
639        if (!u->time_event)
640            return;
641
642        u->core->mainloop->time_free(u->time_event);
643        u->time_event = NULL;
644    }
645}
646
647/* Called from main context */
648static void update_adjust_timer(struct userdata *u) {
649    if (u->sink_input->state == PA_SINK_INPUT_CORKED || u->source_output->state == PA_SOURCE_OUTPUT_CORKED)
650        enable_adjust_timer(u, false);
651    else
652        enable_adjust_timer(u, true);
653}
654
655/* Called from main thread
656 * Calculates minimum and maximum possible latency for source and sink */
657static void update_latency_boundaries(struct userdata *u, pa_source *source, pa_sink *sink) {
658    const char *s;
659
660    if (source) {
661        /* Source latencies */
662        u->fixed_alsa_source = false;
663        if (source->flags & PA_SOURCE_DYNAMIC_LATENCY)
664            pa_source_get_latency_range(source, &u->min_source_latency, &u->max_source_latency);
665        else {
666            u->min_source_latency = pa_source_get_fixed_latency(source);
667            u->max_source_latency = u->min_source_latency;
668            if ((s = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_API))) {
669                if (pa_streq(s, "alsa"))
670                    u->fixed_alsa_source = true;
671            }
672        }
673        /* Source offset */
674        u->source_latency_offset = source->port_latency_offset;
675
676        /* Latencies below 2.5 ms cause problems, limit source latency if possible */
677        if (u->max_source_latency >= MIN_DEVICE_LATENCY)
678            u->min_source_latency = PA_MAX(u->min_source_latency, MIN_DEVICE_LATENCY);
679        else
680            u->min_source_latency = u->max_source_latency;
681    }
682
683    if (sink) {
684        /* Sink latencies */
685        if (sink->flags & PA_SINK_DYNAMIC_LATENCY)
686            pa_sink_get_latency_range(sink, &u->min_sink_latency, &u->max_sink_latency);
687        else {
688            u->min_sink_latency = pa_sink_get_fixed_latency(sink);
689            u->max_sink_latency = u->min_sink_latency;
690        }
691        /* Sink offset */
692        u->sink_latency_offset = sink->port_latency_offset;
693
694        /* Latencies below 2.5 ms cause problems, limit sink latency if possible */
695        if (u->max_sink_latency >= MIN_DEVICE_LATENCY)
696            u->min_sink_latency = PA_MAX(u->min_sink_latency, MIN_DEVICE_LATENCY);
697        else
698            u->min_sink_latency = u->max_sink_latency;
699    }
700
701    update_minimum_latency(u, sink, true);
702}
703
704/* Called from output context
705 * Sets the memblockq to the configured latency corrected by latency_offset_usec */
706static void memblockq_adjust(struct userdata *u, int64_t latency_offset_usec, bool allow_push) {
707    size_t current_memblockq_length, requested_memblockq_length, buffer_correction;
708    int64_t requested_buffer_latency;
709    pa_usec_t final_latency, requested_sink_latency;
710
711    final_latency = PA_MAX(u->latency, u->output_thread_info.minimum_latency);
712
713    /* If source or sink have some large negative latency offset, we might want to
714     * hold more than final_latency in the memblockq */
715    requested_buffer_latency = (int64_t)final_latency - latency_offset_usec;
716
717    /* Keep at least one sink latency in the queue to make sure that the sink
718     * never underruns initially */
719    requested_sink_latency = pa_sink_get_requested_latency_within_thread(u->sink_input->sink);
720    if (requested_buffer_latency < (int64_t)requested_sink_latency)
721        requested_buffer_latency = requested_sink_latency;
722
723    requested_memblockq_length = pa_usec_to_bytes(requested_buffer_latency, &u->sink_input->sample_spec);
724    current_memblockq_length = pa_memblockq_get_length(u->memblockq);
725
726    if (current_memblockq_length > requested_memblockq_length) {
727        /* Drop audio from queue */
728        buffer_correction = current_memblockq_length - requested_memblockq_length;
729        pa_log_info("Dropping %" PRIu64 " usec of audio from queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
730        pa_memblockq_drop(u->memblockq, buffer_correction);
731
732    } else if (current_memblockq_length < requested_memblockq_length && allow_push) {
733        /* Add silence to queue */
734        buffer_correction = requested_memblockq_length - current_memblockq_length;
735        pa_log_info("Adding %" PRIu64 " usec of silence to queue", pa_bytes_to_usec(buffer_correction, &u->sink_input->sample_spec));
736        pa_memblockq_seek(u->memblockq, (int64_t)buffer_correction, PA_SEEK_RELATIVE, true);
737    }
738}
739
740/* Called from input thread context */
741static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
742    struct userdata *u;
743    pa_usec_t push_time;
744    int64_t current_source_latency;
745
746    pa_source_output_assert_ref(o);
747    pa_source_output_assert_io_context(o);
748    pa_assert_se(u = o->userdata);
749
750    /* Send current source latency and timestamp with the message */
751    push_time = pa_rtclock_now();
752    current_source_latency = pa_source_get_latency_within_thread(u->source_output->source, true);
753    current_source_latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);
754
755    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, PA_INT_TO_PTR(current_source_latency), push_time, chunk, NULL);
756    u->send_counter += (int64_t) chunk->length;
757}
758
759/* Called from input thread context */
760static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
761    struct userdata *u;
762
763    pa_source_output_assert_ref(o);
764    pa_source_output_assert_io_context(o);
765    pa_assert_se(u = o->userdata);
766
767    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
768    u->send_counter -= (int64_t) nbytes;
769}
770
771/* Called from input thread context */
772static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
773    struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
774
775    switch (code) {
776
777        case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
778            size_t length;
779
780            length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
781
782            u->latency_snapshot.send_counter = u->send_counter;
783            /* Add content of delay memblockq to the source latency */
784            u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source, true) +
785                                                 pa_bytes_to_usec(length, &u->source_output->source->sample_spec);
786            /* Add resampler latency */
787            u->latency_snapshot.source_latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);
788
789            u->latency_snapshot.source_timestamp = pa_rtclock_now();
790
791            return 0;
792        }
793    }
794
795    return pa_source_output_process_msg(obj, code, data, offset, chunk);
796}
797
798/* Called from main thread.
799 * Get current effective latency of the source. If the source is in use with
800 * smaller latency than the configured latency, it will continue running with
801 * the smaller value when the source output is switched to the source. */
802static void update_effective_source_latency(struct userdata *u, pa_source *source, pa_sink *sink) {
803    pa_usec_t effective_source_latency;
804
805    effective_source_latency = u->configured_source_latency;
806
807    if (source) {
808        effective_source_latency = pa_source_get_requested_latency(source);
809        if (effective_source_latency == 0 || effective_source_latency > u->configured_source_latency)
810            effective_source_latency = u->configured_source_latency;
811    }
812
813    /* If the sink is valid, send a message to the output thread, else set the variable directly */
814    if (sink)
815        pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY, NULL, (int64_t)effective_source_latency, NULL);
816    else
817       u->output_thread_info.effective_source_latency = effective_source_latency;
818}
819
820/* Called from main thread.
821 * Set source output latency to one third of the overall latency if possible.
822 * The choice of one third is rather arbitrary somewhere between the minimum
823 * possible latency which would cause a lot of CPU load and half the configured
824 * latency which would quickly lead to underruns */
825static void set_source_output_latency(struct userdata *u, pa_source *source) {
826    pa_usec_t latency, requested_latency;
827
828    requested_latency = u->latency / 3;
829
830    /* Normally we try to configure sink and source latency equally. If the
831     * sink latency cannot match the requested source latency try to set the
832     * source latency to a smaller value to avoid underruns */
833    if (u->min_sink_latency > requested_latency) {
834        latency = PA_MAX(u->latency, u->minimum_latency);
835        requested_latency = (latency - u->min_sink_latency) / 2;
836    }
837
838    latency = PA_CLAMP(requested_latency , u->min_source_latency, u->max_source_latency);
839    u->configured_source_latency = pa_source_output_set_requested_latency(u->source_output, latency);
840    if (u->configured_source_latency != requested_latency)
841        pa_log_warn("Cannot set requested source latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_source_latency / PA_USEC_PER_MSEC);
842}
843
844/* Called from input thread context */
845static void source_output_attach_cb(pa_source_output *o) {
846    struct userdata *u;
847
848    pa_source_output_assert_ref(o);
849    pa_source_output_assert_io_context(o);
850    pa_assert_se(u = o->userdata);
851
852    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
853            o->source->thread_info.rtpoll,
854            PA_RTPOLL_LATE,
855            u->asyncmsgq);
856}
857
858/* Called from input thread context */
859static void source_output_detach_cb(pa_source_output *o) {
860    struct userdata *u;
861
862    pa_source_output_assert_ref(o);
863    pa_source_output_assert_io_context(o);
864    pa_assert_se(u = o->userdata);
865
866    if (u->rtpoll_item_write) {
867        pa_rtpoll_item_free(u->rtpoll_item_write);
868        u->rtpoll_item_write = NULL;
869    }
870}
871
872/* Called from main thread */
873static void source_output_kill_cb(pa_source_output *o) {
874    struct userdata *u;
875
876    pa_source_output_assert_ref(o);
877    pa_assert_ctl_context();
878    pa_assert_se(u = o->userdata);
879
880    teardown(u);
881    pa_module_unload_request(u->module, true);
882}
883
884/* Called from main thread */
885static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
886    struct userdata *u;
887
888    pa_source_output_assert_ref(o);
889    pa_assert_ctl_context();
890    pa_assert_se(u = o->userdata);
891
892    if (!u->sink_input || !u->sink_input->sink)
893        return true;
894
895    return dest != u->sink_input->sink->monitor_source;
896}
897
898/* Called from main thread */
899static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
900    struct userdata *u;
901    char *input_description;
902    const char *n;
903
904    if (!dest)
905        return;
906
907    pa_source_output_assert_ref(o);
908    pa_assert_ctl_context();
909    pa_assert_se(u = o->userdata);
910
911    input_description = pa_sprintf_malloc("Loopback of %s",
912                                          pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
913    pa_sink_input_set_property(u->sink_input, PA_PROP_MEDIA_NAME, input_description);
914    pa_xfree(input_description);
915
916    if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
917        pa_sink_input_set_property(u->sink_input, PA_PROP_DEVICE_ICON_NAME, n);
918
919    /* Set latency and calculate latency limits */
920    u->underrun_latency_limit = 0;
921    u->last_source_latency_offset = dest->port_latency_offset;
922    u->initial_adjust_pending = true;
923    update_latency_boundaries(u, dest, u->sink_input->sink);
924    set_source_output_latency(u, dest);
925    update_effective_source_latency(u, dest, u->sink_input->sink);
926
927    /* Uncork the sink input unless the destination is suspended for other
928     * reasons than idle. */
929    if (dest->state == PA_SOURCE_SUSPENDED)
930        pa_sink_input_cork(u->sink_input, (dest->suspend_cause != PA_SUSPEND_IDLE));
931    else
932        pa_sink_input_cork(u->sink_input, false);
933
934    update_adjust_timer(u);
935
936    /* Reset counters */
937    u->iteration_counter = 0;
938    u->underrun_counter = 0;
939
940    /* Reset booleans, latency error and counters */
941    u->source_sink_changed = true;
942    u->underrun_occured = false;
943    u->source_latency_offset_changed = false;
944    u->target_latency_cross_counter = 0;
945    u->log_counter = u->log_interval;
946    u->latency_error = 0;
947
948    /* Send a mesage to the output thread that the source has changed.
949     * If the sink is invalid here during a profile switching situation
950     * we can safely set push_called to false directly. */
951    if (u->sink_input->sink)
952        pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
953    else
954        u->output_thread_info.push_called = false;
955
956    /* The sampling rate may be far away from the default rate if we are still
957     * recovering from a previous source or sink change, so reset rate to
958     * default before moving the source. */
959    pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
960}
961
962/* Called from main thread */
963static void source_output_suspend_cb(pa_source_output *o, pa_source_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
964    struct userdata *u;
965    bool suspended;
966
967    pa_source_output_assert_ref(o);
968    pa_assert_ctl_context();
969    pa_assert_se(u = o->userdata);
970
971    /* State has not changed, nothing to do */
972    if (old_state == o->source->state)
973        return;
974
975    suspended = (o->source->state == PA_SOURCE_SUSPENDED);
976
977    /* If the source has been suspended, we need to handle this like
978     * a source change when the source is resumed */
979    if (suspended) {
980        if (u->sink_input->sink)
981            pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_SOURCE_CHANGED, NULL, 0, NULL);
982        else
983            u->output_thread_info.push_called = false;
984
985    } else
986        /* Get effective source latency on unsuspend */
987        update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
988
989    pa_sink_input_cork(u->sink_input, suspended);
990
991    update_adjust_timer(u);
992}
993
994/* Called from input thread context */
995static void update_source_latency_range_cb(pa_source_output *i) {
996    struct userdata *u;
997
998    pa_source_output_assert_ref(i);
999    pa_source_output_assert_io_context(i);
1000    pa_assert_se(u = i->userdata);
1001
1002    /* Source latency may have changed */
1003    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
1004}
1005
1006/* Called from output thread context */
1007static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1008    struct userdata *u;
1009
1010    pa_sink_input_assert_ref(i);
1011    pa_sink_input_assert_io_context(i);
1012    pa_assert_se(u = i->userdata);
1013    pa_assert(chunk);
1014
1015    /* It seems necessary to handle outstanding push messages here, though it is not clear
1016     * why. Removing this part leads to underruns when low latencies are configured. */
1017    u->output_thread_info.in_pop = true;
1018    while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
1019        ;
1020    u->output_thread_info.in_pop = false;
1021
1022    /* While pop has not been called, latency adjustments in SINK_INPUT_MESSAGE_POST are
1023     * enabled. Disable them on second pop and enable the final adjustment during the
1024     * next push. The adjustment must be done on the next push, because there is no way
1025     * to retrieve the source latency here. We are waiting for the second pop, because
1026     * the first pop may be called before the sink is actually started. */
1027    if (!u->output_thread_info.pop_called && u->output_thread_info.first_pop_done) {
1028        u->output_thread_info.pop_adjust = true;
1029        u->output_thread_info.pop_called = true;
1030    }
1031    u->output_thread_info.first_pop_done = true;
1032
1033    if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
1034        pa_log_info("Could not peek into queue");
1035        return -1;
1036    }
1037
1038    chunk->length = PA_MIN(chunk->length, nbytes);
1039    pa_memblockq_drop(u->memblockq, chunk->length);
1040
1041    /* Adjust the memblockq to ensure that there is
1042     * enough data in the queue to avoid underruns. */
1043    if (!u->output_thread_info.push_called)
1044        memblockq_adjust(u, 0, true);
1045
1046    return 0;
1047}
1048
1049/* Called from output thread context */
1050static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1051    struct userdata *u;
1052
1053    pa_sink_input_assert_ref(i);
1054    pa_sink_input_assert_io_context(i);
1055    pa_assert_se(u = i->userdata);
1056
1057    pa_memblockq_rewind(u->memblockq, nbytes);
1058}
1059
1060/* Called from output thread context */
1061static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1062    struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1063
1064    pa_sink_input_assert_io_context(u->sink_input);
1065
1066    switch (code) {
1067
1068        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1069            pa_usec_t *r = data;
1070
1071            *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);
1072
1073            /* Fall through, the default handler will add in the extra
1074             * latency added by the resampler */
1075            break;
1076        }
1077
1078        case SINK_INPUT_MESSAGE_POST:
1079
1080            pa_memblockq_push_align(u->memblockq, chunk);
1081
1082            /* If push has not been called yet, latency adjustments in sink_input_pop_cb()
1083             * are enabled. Disable them on first push and correct the memblockq. If pop
1084             * has not been called yet, wait until the pop_cb() requests the adjustment */
1085            if (u->output_thread_info.pop_called && (!u->output_thread_info.push_called || u->output_thread_info.pop_adjust)) {
1086                int64_t time_delta;
1087
1088                /* This is the source latency at the time push was called */
1089                time_delta = PA_PTR_TO_INT(data);
1090                /* Add the time between push and post */
1091                time_delta += pa_rtclock_now() - (pa_usec_t) offset;
1092                /* Add the sink and resampler latency */
1093                time_delta += pa_sink_get_latency_within_thread(u->sink_input->sink, true);
1094                time_delta += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);
1095
1096                /* The source latency report includes the audio in the chunk,
1097                 * but since we already pushed the chunk to the memblockq, we need
1098                 * to subtract the chunk size from the source latency so that it
1099                 * won't be counted towards both the memblockq latency and the
1100                 * source latency.
1101                 *
1102                 * Sometimes the alsa source reports way too low latency (might
1103                 * be a bug in the alsa source code). This seems to happen when
1104                 * there's an overrun. As an attempt to detect overruns, we
1105                 * check if the chunk size is larger than the configured source
1106                 * latency. If so, we assume that the source should have pushed
1107                 * a chunk whose size equals the configured latency, so we
1108                 * modify time_delta only by that amount, which makes
1109                 * memblockq_adjust() drop more data than it would otherwise.
1110                 * This seems to work quite well, but it's possible that the
1111                 * next push also contains too much data, and in that case the
1112                 * resulting latency will be wrong. */
1113                if (pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec) > u->output_thread_info.effective_source_latency)
1114                    time_delta -= (int64_t)u->output_thread_info.effective_source_latency;
1115                else
1116                    time_delta -= (int64_t)pa_bytes_to_usec(chunk->length, &u->sink_input->sample_spec);
1117
1118                /* FIXME: We allow pushing silence here to fix up the latency. This
1119                 * might lead to a gap in the stream */
1120                memblockq_adjust(u, time_delta, true);
1121
1122                /* Notify main thread when the initial adjustment is done. */
1123                if (u->output_thread_info.pop_called)
1124                    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_ADJUST_DONE, NULL, 0, NULL, NULL);
1125
1126                u->output_thread_info.pop_adjust = false;
1127                u->output_thread_info.push_called = true;
1128            }
1129
1130            /* If pop has not been called yet, make sure the latency does not grow too much.
1131             * Don't push any silence here, because we already have new data in the queue */
1132            if (!u->output_thread_info.pop_called)
1133                 memblockq_adjust(u, 0, false);
1134
1135            /* Is this the end of an underrun? Then let's start things
1136             * right-away */
1137            if (u->sink_input->sink->thread_info.state != PA_SINK_SUSPENDED &&
1138                u->sink_input->thread_info.underrun_for > 0 &&
1139                pa_memblockq_is_readable(u->memblockq) &&
1140                u->output_thread_info.pop_called) {
1141
1142                pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_UNDERRUN, NULL, 0, NULL, NULL);
1143                /* If called from within the pop callback skip the rewind */
1144                if (!u->output_thread_info.in_pop) {
1145                    pa_log_debug("Requesting rewind due to end of underrun.");
1146                    pa_sink_input_request_rewind(u->sink_input,
1147                                                 (size_t) (u->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
1148                                                 false, true, false);
1149                }
1150            }
1151
1152            u->output_thread_info.recv_counter += (int64_t) chunk->length;
1153
1154            return 0;
1155
1156        case SINK_INPUT_MESSAGE_REWIND:
1157
1158            /* Do not try to rewind if no data was pushed yet */
1159            if (u->output_thread_info.push_called)
1160                pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, true);
1161
1162            u->output_thread_info.recv_counter -= offset;
1163
1164            return 0;
1165
1166        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1167            size_t length;
1168
1169            length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1170
1171            u->latency_snapshot.recv_counter = u->output_thread_info.recv_counter;
1172            u->latency_snapshot.loopback_memblockq_length = pa_memblockq_get_length(u->memblockq);
1173            /* Add content of render memblockq to sink latency */
1174            u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
1175                                               pa_bytes_to_usec(length, &u->sink_input->sink->sample_spec);
1176            /* Add resampler latency */
1177            u->latency_snapshot.sink_latency += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);
1178
1179            u->latency_snapshot.sink_timestamp = pa_rtclock_now();
1180
1181            return 0;
1182        }
1183
1184        case SINK_INPUT_MESSAGE_SOURCE_CHANGED:
1185
1186            u->output_thread_info.push_called = false;
1187
1188            return 0;
1189
1190        case SINK_INPUT_MESSAGE_SET_EFFECTIVE_SOURCE_LATENCY:
1191
1192            u->output_thread_info.effective_source_latency = (pa_usec_t)offset;
1193
1194            return 0;
1195
1196        case SINK_INPUT_MESSAGE_UPDATE_MIN_LATENCY:
1197
1198            u->output_thread_info.minimum_latency = (pa_usec_t)offset;
1199
1200            return 0;
1201
1202        case SINK_INPUT_MESSAGE_FAST_ADJUST:
1203
1204            memblockq_adjust(u, offset, true);
1205
1206            return 0;
1207    }
1208
1209    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1210}
1211/* Called from main thread.
1212 * Set sink input latency to one third of the overall latency if possible.
1213 * The choice of one third is rather arbitrary somewhere between the minimum
1214 * possible latency which would cause a lot of CPU load and half the configured
1215 * latency which would quickly lead to underruns. */
1216static void set_sink_input_latency(struct userdata *u, pa_sink *sink) {
1217     pa_usec_t latency, requested_latency;
1218
1219    requested_latency = u->latency / 3;
1220
1221    /* Normally we try to configure sink and source latency equally. If the
1222     * source latency cannot match the requested sink latency try to set the
1223     * sink latency to a smaller value to avoid underruns */
1224    if (u->min_source_latency > requested_latency) {
1225        latency = PA_MAX(u->latency, u->minimum_latency);
1226        requested_latency = (latency - u->min_source_latency) / 2;
1227        /* In the case of a fixed alsa source, u->minimum_latency is calculated from
1228         * the default fragment size while u->min_source_latency is the reported minimum
1229         * of the source latency (nr_of_fragments * fragment_size). This can lead to a
1230         * situation where u->minimum_latency < u->min_source_latency. We only fall
1231         * back to use the fragment size instead of min_source_latency if the calculation
1232         * above does not deliver a usable result. */
1233        if (u->fixed_alsa_source && u->min_source_latency >= latency)
1234            requested_latency = (latency - u->core->default_fragment_size_msec * PA_USEC_PER_MSEC) / 2;
1235    }
1236
1237    latency = PA_CLAMP(requested_latency , u->min_sink_latency, u->max_sink_latency);
1238    u->configured_sink_latency = pa_sink_input_set_requested_latency(u->sink_input, latency);
1239    if (u->configured_sink_latency != requested_latency)
1240        pa_log_warn("Cannot set requested sink latency of %0.2f ms, adjusting to %0.2f ms", (double)requested_latency / PA_USEC_PER_MSEC, (double)u->configured_sink_latency / PA_USEC_PER_MSEC);
1241}
1242
1243/* Called from output thread context */
1244static void sink_input_attach_cb(pa_sink_input *i) {
1245    struct userdata *u;
1246
1247    pa_sink_input_assert_ref(i);
1248    pa_sink_input_assert_io_context(i);
1249    pa_assert_se(u = i->userdata);
1250
1251    u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1252            i->sink->thread_info.rtpoll,
1253            PA_RTPOLL_LATE,
1254            u->asyncmsgq);
1255
1256    pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
1257    pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
1258}
1259
1260/* Called from output thread context */
1261static void sink_input_detach_cb(pa_sink_input *i) {
1262    struct userdata *u;
1263
1264    pa_sink_input_assert_ref(i);
1265    pa_sink_input_assert_io_context(i);
1266    pa_assert_se(u = i->userdata);
1267
1268    if (u->rtpoll_item_read) {
1269        pa_rtpoll_item_free(u->rtpoll_item_read);
1270        u->rtpoll_item_read = NULL;
1271    }
1272}
1273
1274/* Called from output thread context */
1275static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1276    struct userdata *u;
1277
1278    pa_sink_input_assert_ref(i);
1279    pa_sink_input_assert_io_context(i);
1280    pa_assert_se(u = i->userdata);
1281
1282    pa_memblockq_set_maxrewind(u->memblockq, nbytes);
1283}
1284
1285/* Called from output thread context */
1286static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1287    struct userdata *u;
1288
1289    pa_sink_input_assert_ref(i);
1290    pa_sink_input_assert_io_context(i);
1291    pa_assert_se(u = i->userdata);
1292
1293    pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
1294    pa_log_info("Max request changed");
1295}
1296
1297/* Called from main thread */
1298static void sink_input_kill_cb(pa_sink_input *i) {
1299    struct userdata *u;
1300
1301    pa_sink_input_assert_ref(i);
1302    pa_assert_ctl_context();
1303    pa_assert_se(u = i->userdata);
1304
1305    teardown(u);
1306    pa_module_unload_request(u->module, true);
1307}
1308
1309/* Called from the output thread context */
1310static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1311    struct userdata *u;
1312
1313    pa_sink_input_assert_ref(i);
1314    pa_assert_se(u = i->userdata);
1315
1316    if (state == PA_SINK_INPUT_UNLINKED)
1317        pa_asyncmsgq_flush(u->asyncmsgq, false);
1318}
1319
1320/* Called from main thread */
1321static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1322    struct userdata *u;
1323    char *output_description;
1324    const char *n;
1325
1326    if (!dest)
1327        return;
1328
1329    pa_sink_input_assert_ref(i);
1330    pa_assert_ctl_context();
1331    pa_assert_se(u = i->userdata);
1332
1333    output_description = pa_sprintf_malloc("Loopback to %s",
1334                                           pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1335    pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_NAME, output_description);
1336    pa_xfree(output_description);
1337
1338    if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
1339        pa_source_output_set_property(u->source_output, PA_PROP_MEDIA_ICON_NAME, n);
1340
1341    /* Set latency and calculate latency limits */
1342    u->underrun_latency_limit = 0;
1343    u->last_sink_latency_offset = dest->port_latency_offset;
1344    u->initial_adjust_pending = true;
1345    update_latency_boundaries(u, NULL, dest);
1346    set_sink_input_latency(u, dest);
1347    update_effective_source_latency(u, u->source_output->source, dest);
1348
1349    /* Uncork the source output unless the destination is suspended for other
1350     * reasons than idle */
1351    if (dest->state == PA_SINK_SUSPENDED)
1352        pa_source_output_cork(u->source_output, (dest->suspend_cause != PA_SUSPEND_IDLE));
1353    else
1354        pa_source_output_cork(u->source_output, false);
1355
1356    update_adjust_timer(u);
1357
1358    /* Reset counters */
1359    u->iteration_counter = 0;
1360    u->underrun_counter = 0;
1361
1362    /* Reset booleans, latency error and counters */
1363    u->source_sink_changed = true;
1364    u->underrun_occured = false;
1365    u->sink_latency_offset_changed = false;
1366    u->target_latency_cross_counter = 0;
1367    u->log_counter = u->log_interval;
1368    u->latency_error = 0;
1369
1370    u->output_thread_info.pop_called = false;
1371    u->output_thread_info.first_pop_done = false;
1372
1373    /* Sample rate may be far away from the default rate if we are still
1374     * recovering from a previous source or sink change, so reset rate to
1375     * default before moving the sink. */
1376    pa_sink_input_set_rate(u->sink_input, u->source_output->sample_spec.rate);
1377}
1378
1379/* Called from main thread */
1380static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1381    struct userdata *u;
1382
1383    pa_sink_input_assert_ref(i);
1384    pa_assert_ctl_context();
1385    pa_assert_se(u = i->userdata);
1386
1387    if (!u->source_output || !u->source_output->source)
1388        return true;
1389
1390    return dest != u->source_output->source->monitor_of;
1391}
1392
1393/* Called from main thread */
1394static void sink_input_suspend_cb(pa_sink_input *i, pa_sink_state_t old_state, pa_suspend_cause_t old_suspend_cause) {
1395    struct userdata *u;
1396    bool suspended;
1397
1398    pa_sink_input_assert_ref(i);
1399    pa_assert_ctl_context();
1400    pa_assert_se(u = i->userdata);
1401
1402    /* State has not changed, nothing to do */
1403    if (old_state == i->sink->state)
1404        return;
1405
1406    suspended = (i->sink->state == PA_SINK_SUSPENDED);
1407
1408    /* If the sink has been suspended, we need to handle this like
1409     * a sink change when the sink is resumed. Because the sink
1410     * is suspended, we can set the variables directly. */
1411    if (suspended) {
1412        u->output_thread_info.pop_called = false;
1413        u->output_thread_info.first_pop_done = false;
1414
1415    } else
1416        /* Set effective source latency on unsuspend */
1417        update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1418
1419    pa_source_output_cork(u->source_output, suspended);
1420
1421    update_adjust_timer(u);
1422}
1423
1424/* Called from output thread context */
1425static void update_sink_latency_range_cb(pa_sink_input *i) {
1426    struct userdata *u;
1427
1428    pa_sink_input_assert_ref(i);
1429    pa_sink_input_assert_io_context(i);
1430    pa_assert_se(u = i->userdata);
1431
1432    /* Sink latency may have changed */
1433    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED, NULL, 0, NULL, NULL);
1434}
1435
1436/* Called from main context */
1437static int loopback_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1438    struct loopback_msg *msg;
1439    struct userdata *u;
1440    pa_usec_t current_latency;
1441
1442    pa_assert(o);
1443    pa_assert_ctl_context();
1444
1445    msg = LOOPBACK_MSG(o);
1446
1447    /* If messages are processed after a module unload request, they
1448     * must be ignored. */
1449    if (msg->dead)
1450        return 0;
1451
1452    pa_assert_se(u = msg->userdata);
1453
1454    switch (code) {
1455
1456        case LOOPBACK_MESSAGE_SOURCE_LATENCY_RANGE_CHANGED:
1457
1458            update_effective_source_latency(u, u->source_output->source, u->sink_input->sink);
1459            current_latency = pa_source_get_requested_latency(u->source_output->source);
1460            if (current_latency > u->configured_source_latency) {
1461                /* The minimum latency has changed to a value larger than the configured latency, so
1462                 * the source latency has been increased. The case that the minimum latency changes
1463                 * back to a smaller value is not handled because this never happens with the current
1464                 * source implementations. */
1465                pa_log_warn("Source minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1466                u->configured_source_latency = current_latency;
1467                update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1468                /* We re-start counting when the latency has changed */
1469                u->iteration_counter = 0;
1470                u->underrun_counter = 0;
1471            }
1472
1473            return 0;
1474
1475        case LOOPBACK_MESSAGE_SINK_LATENCY_RANGE_CHANGED:
1476
1477            current_latency = pa_sink_get_requested_latency(u->sink_input->sink);
1478            if (current_latency > u->configured_sink_latency) {
1479                /* The minimum latency has changed to a value larger than the configured latency, so
1480                 * the sink latency has been increased. The case that the minimum latency changes back
1481                 * to a smaller value is not handled because this never happens with the current sink
1482                 * implementations. */
1483                pa_log_warn("Sink minimum latency increased to %0.2f ms", (double)current_latency / PA_USEC_PER_MSEC);
1484                u->configured_sink_latency = current_latency;
1485                update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1486                /* We re-start counting when the latency has changed */
1487                u->iteration_counter = 0;
1488                u->underrun_counter = 0;
1489            }
1490
1491            return 0;
1492
1493        case LOOPBACK_MESSAGE_UNDERRUN:
1494
1495            u->underrun_counter++;
1496            u->underrun_occured = true;
1497            u->target_latency_cross_counter = 0;
1498            pa_log_debug("Underrun detected, counter incremented to %u", u->underrun_counter);
1499
1500            return 0;
1501
1502        case LOOPBACK_MESSAGE_ADJUST_DONE:
1503
1504            u->initial_adjust_pending = false;
1505
1506            return 0;
1507
1508    }
1509
1510    return 0;
1511}
1512
1513/* Called from main thread */
1514static pa_hook_result_t sink_port_latency_offset_changed_cb(pa_core *core, pa_sink *sink, struct userdata *u) {
1515
1516    if (sink != u->sink_input->sink)
1517        return PA_HOOK_OK;
1518
1519    if (!u->sink_latency_offset_changed)
1520        u->last_sink_latency_offset = u->sink_latency_offset;
1521    u->sink_latency_offset_changed = true;
1522    u->sink_latency_offset = sink->port_latency_offset;
1523    update_minimum_latency(u, sink, true);
1524
1525    /* We might need to adjust again, reset counter */
1526    u->target_latency_cross_counter = 0;
1527
1528    return PA_HOOK_OK;
1529}
1530
1531/* Called from main thread */
1532static pa_hook_result_t source_port_latency_offset_changed_cb(pa_core *core, pa_source *source, struct userdata *u) {
1533
1534    if (source != u->source_output->source)
1535        return PA_HOOK_OK;
1536
1537    if (!u->source_latency_offset_changed)
1538        u->last_source_latency_offset = u->source_latency_offset;
1539    u->source_latency_offset_changed = true;
1540    u->source_latency_offset = source->port_latency_offset;
1541    update_minimum_latency(u, u->sink_input->sink, true);
1542
1543    /* We might need to adjust again, reset counter */
1544    u->target_latency_cross_counter = 0;
1545
1546    return PA_HOOK_OK;
1547}
1548
1549int pa__init(pa_module *m) {
1550    pa_modargs *ma = NULL;
1551    struct userdata *u;
1552    pa_sink *sink = NULL;
1553    pa_sink_input_new_data sink_input_data;
1554    bool sink_dont_move;
1555    pa_source *source = NULL;
1556    pa_source_output_new_data source_output_data;
1557    bool source_dont_move;
1558    uint32_t latency_msec;
1559    uint32_t max_latency_msec;
1560    uint32_t fast_adjust_threshold;
1561    uint32_t adjust_threshold;
1562    pa_sample_spec ss;
1563    pa_channel_map map;
1564    bool format_set = false;
1565    bool rate_set = false;
1566    bool channels_set = false;
1567    pa_memchunk silence;
1568    double adjust_time_sec;
1569    double log_interval_sec;
1570    const char *n;
1571    bool remix = true;
1572
1573    pa_assert(m);
1574
1575    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1576        pa_log("Failed to parse module arguments");
1577        goto fail;
1578    }
1579
1580    n = pa_modargs_get_value(ma, "source", NULL);
1581    if (n && !(source = pa_namereg_get(m->core, n, PA_NAMEREG_SOURCE))) {
1582        pa_log("No such source.");
1583        goto fail;
1584    }
1585
1586    n = pa_modargs_get_value(ma, "sink", NULL);
1587    if (n && !(sink = pa_namereg_get(m->core, n, PA_NAMEREG_SINK))) {
1588        pa_log("No such sink.");
1589        goto fail;
1590    }
1591
1592    if (pa_modargs_get_value_boolean(ma, "remix", &remix) < 0) {
1593        pa_log("Invalid boolean remix parameter");
1594        goto fail;
1595    }
1596
1597    if (source) {
1598        ss = source->sample_spec;
1599        map = source->channel_map;
1600        format_set = true;
1601        rate_set = true;
1602        channels_set = true;
1603    } else if (sink) {
1604        ss = sink->sample_spec;
1605        map = sink->channel_map;
1606        format_set = true;
1607        rate_set = true;
1608        channels_set = true;
1609    } else {
1610        /* FIXME: Dummy stream format, needed because pa_sink_input_new()
1611         * requires valid sample spec and channel map even when all the FIX_*
1612         * stream flags are specified. pa_sink_input_new() should be changed
1613         * to ignore the sample spec and channel map when the FIX_* flags are
1614         * present. */
1615        ss.format = PA_SAMPLE_U8;
1616        ss.rate = 8000;
1617        ss.channels = 1;
1618        map.channels = 1;
1619        map.map[0] = PA_CHANNEL_POSITION_MONO;
1620    }
1621
1622    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1623        pa_log("Invalid sample format specification or channel map");
1624        goto fail;
1625    }
1626
1627    if (ss.rate < 4000 || ss.rate > PA_RATE_MAX) {
1628        pa_log("Invalid rate specification, valid range is 4000 Hz to %i Hz", PA_RATE_MAX);
1629        goto fail;
1630    }
1631
1632    if (pa_modargs_get_value(ma, "format", NULL))
1633        format_set = true;
1634
1635    if (pa_modargs_get_value(ma, "rate", NULL))
1636        rate_set = true;
1637
1638    if (pa_modargs_get_value(ma, "channels", NULL) || pa_modargs_get_value(ma, "channel_map", NULL))
1639        channels_set = true;
1640
1641    adjust_threshold = DEFAULT_ADJUST_THRESHOLD_USEC;
1642    if (pa_modargs_get_value_u32(ma, "adjust_threshold_usec", &adjust_threshold) < 0 || adjust_threshold < 1 || adjust_threshold > 10000) {
1643        pa_log_info("Invalid adjust threshold specification");
1644        goto fail;
1645    }
1646
1647    latency_msec = DEFAULT_LATENCY_MSEC;
1648    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 30000) {
1649        pa_log("Invalid latency specification");
1650        goto fail;
1651    }
1652
1653    fast_adjust_threshold = 0;
1654    if (pa_modargs_get_value_u32(ma, "fast_adjust_threshold_msec", &fast_adjust_threshold) < 0 || (fast_adjust_threshold != 0 && fast_adjust_threshold < 100)) {
1655        pa_log("Invalid fast adjust threshold specification");
1656        goto fail;
1657    }
1658
1659    max_latency_msec = 0;
1660    if (pa_modargs_get_value_u32(ma, "max_latency_msec", &max_latency_msec) < 0) {
1661        pa_log("Invalid maximum latency specification");
1662        goto fail;
1663    }
1664
1665    if (max_latency_msec > 0 && max_latency_msec < latency_msec) {
1666        pa_log_warn("Configured maximum latency is smaller than latency, using latency instead");
1667        max_latency_msec = latency_msec;
1668    }
1669
1670    m->userdata = u = pa_xnew0(struct userdata, 1);
1671    u->core = m->core;
1672    u->module = m;
1673    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
1674    u->max_latency = (pa_usec_t) max_latency_msec * PA_USEC_PER_MSEC;
1675    u->output_thread_info.pop_called = false;
1676    u->output_thread_info.pop_adjust = false;
1677    u->output_thread_info.push_called = false;
1678    u->iteration_counter = 0;
1679    u->underrun_counter = 0;
1680    u->underrun_latency_limit = 0;
1681    u->source_sink_changed = true;
1682    u->real_adjust_time_sum = 0;
1683    u->adjust_counter = 0;
1684    u->fast_adjust_threshold = fast_adjust_threshold * PA_USEC_PER_MSEC;
1685    u->underrun_occured = false;
1686    u->source_latency_offset_changed = false;
1687    u->sink_latency_offset_changed = false;
1688    u->latency_error = 0;
1689    u->adjust_threshold = adjust_threshold;
1690    u->target_latency_cross_counter = 0;
1691    u->initial_adjust_pending = true;
1692
1693    adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1694    if (pa_modargs_get_value_double(ma, "adjust_time", &adjust_time_sec) < 0) {
1695        pa_log("Failed to parse adjust_time value");
1696        goto fail;
1697    }
1698
1699    /* Allow values >= 0.1 and also 0 which means no adjustment */
1700    if (adjust_time_sec < 0.1) {
1701        if (adjust_time_sec < 0 || adjust_time_sec > 0) {
1702            pa_log("Failed to parse adjust_time value");
1703            goto fail;
1704        }
1705    }
1706
1707    u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1708    u->real_adjust_time = u->adjust_time;
1709
1710    pa_source_output_new_data_init(&source_output_data);
1711    source_output_data.driver = __FILE__;
1712    source_output_data.module = m;
1713    if (source)
1714        pa_source_output_new_data_set_source(&source_output_data, source, false, true);
1715
1716    if (pa_modargs_get_proplist(ma, "source_output_properties", source_output_data.proplist, PA_UPDATE_REPLACE) < 0) {
1717        pa_log("Failed to parse the source_output_properties value.");
1718        pa_source_output_new_data_done(&source_output_data);
1719        goto fail;
1720    }
1721
1722    if (!pa_proplist_contains(source_output_data.proplist, PA_PROP_MEDIA_ROLE))
1723        pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1724
1725    pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
1726    pa_source_output_new_data_set_channel_map(&source_output_data, &map);
1727    source_output_data.flags = PA_SOURCE_OUTPUT_START_CORKED;
1728
1729    if (!remix)
1730        source_output_data.flags |= PA_SOURCE_OUTPUT_NO_REMIX;
1731
1732    if (!format_set)
1733        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_FORMAT;
1734
1735    if (!rate_set)
1736        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_RATE;
1737
1738    if (!channels_set)
1739        source_output_data.flags |= PA_SOURCE_OUTPUT_FIX_CHANNELS;
1740
1741    source_dont_move = false;
1742    if (pa_modargs_get_value_boolean(ma, "source_dont_move", &source_dont_move) < 0) {
1743        pa_log("source_dont_move= expects a boolean argument.");
1744        goto fail;
1745    }
1746
1747    if (source_dont_move)
1748        source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
1749
1750    pa_source_output_new(&u->source_output, m->core, &source_output_data);
1751    pa_source_output_new_data_done(&source_output_data);
1752
1753    if (!u->source_output)
1754        goto fail;
1755
1756    u->source_output->parent.process_msg = source_output_process_msg_cb;
1757    u->source_output->push = source_output_push_cb;
1758    u->source_output->process_rewind = source_output_process_rewind_cb;
1759    u->source_output->kill = source_output_kill_cb;
1760    u->source_output->attach = source_output_attach_cb;
1761    u->source_output->detach = source_output_detach_cb;
1762    u->source_output->may_move_to = source_output_may_move_to_cb;
1763    u->source_output->moving = source_output_moving_cb;
1764    u->source_output->suspend = source_output_suspend_cb;
1765    u->source_output->update_source_latency_range = update_source_latency_range_cb;
1766    u->source_output->update_source_fixed_latency = update_source_latency_range_cb;
1767    u->source_output->userdata = u;
1768
1769    /* If format, rate or channels were originally unset, they are set now
1770     * after the pa_source_output_new() call. */
1771    ss = u->source_output->sample_spec;
1772    map = u->source_output->channel_map;
1773
1774    /* Get log interval, default is 0, which means no logging */
1775    log_interval_sec = 0;
1776    if (pa_modargs_get_value_double(ma, "log_interval", &log_interval_sec) < 0) {
1777        pa_log_info("Invalid log interval specification");
1778        goto fail;
1779    }
1780
1781    /* Allow values >= 0.1 and also 0 */
1782    if (log_interval_sec < 0.1) {
1783        if (log_interval_sec < 0 || log_interval_sec > 0) {
1784            pa_log("Failed to parse log_interval value");
1785            goto fail;
1786        }
1787    }
1788
1789    /* Estimate number of iterations for logging. */
1790    u->log_interval = 0;
1791    if (u->adjust_time != 0 && log_interval_sec != 0) {
1792        u->log_interval = (int)(log_interval_sec * PA_USEC_PER_SEC / u->adjust_time + 0.5);
1793        /* Logging was specified, but log interval parameter was too small,
1794         * therefore log on every iteration */
1795        if (u->log_interval == 0)
1796            u->log_interval = 1;
1797    }
1798    u->log_counter = u->log_interval;
1799
1800    pa_sink_input_new_data_init(&sink_input_data);
1801    sink_input_data.driver = __FILE__;
1802    sink_input_data.module = m;
1803
1804    if (sink)
1805        pa_sink_input_new_data_set_sink(&sink_input_data, sink, false, true);
1806
1807    if (pa_modargs_get_proplist(ma, "sink_input_properties", sink_input_data.proplist, PA_UPDATE_REPLACE) < 0) {
1808        pa_log("Failed to parse the sink_input_properties value.");
1809        pa_sink_input_new_data_done(&sink_input_data);
1810        goto fail;
1811    }
1812
1813    if (!pa_proplist_contains(sink_input_data.proplist, PA_PROP_MEDIA_ROLE))
1814        pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
1815
1816    pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
1817    pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
1818    sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;
1819
1820    if (!remix)
1821        sink_input_data.flags |= PA_SINK_INPUT_NO_REMIX;
1822
1823    sink_dont_move = false;
1824    if (pa_modargs_get_value_boolean(ma, "sink_dont_move", &sink_dont_move) < 0) {
1825        pa_log("sink_dont_move= expects a boolean argument.");
1826        goto fail;
1827    }
1828
1829    if (sink_dont_move)
1830        sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;
1831
1832    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1833    pa_sink_input_new_data_done(&sink_input_data);
1834
1835    if (!u->sink_input)
1836        goto fail;
1837
1838    u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1839    u->sink_input->pop = sink_input_pop_cb;
1840    u->sink_input->process_rewind = sink_input_process_rewind_cb;
1841    u->sink_input->kill = sink_input_kill_cb;
1842    u->sink_input->state_change = sink_input_state_change_cb;
1843    u->sink_input->attach = sink_input_attach_cb;
1844    u->sink_input->detach = sink_input_detach_cb;
1845    u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1846    u->sink_input->update_max_request = sink_input_update_max_request_cb;
1847    u->sink_input->may_move_to = sink_input_may_move_to_cb;
1848    u->sink_input->moving = sink_input_moving_cb;
1849    u->sink_input->suspend = sink_input_suspend_cb;
1850    u->sink_input->update_sink_latency_range = update_sink_latency_range_cb;
1851    u->sink_input->update_sink_fixed_latency = update_sink_latency_range_cb;
1852    u->sink_input->userdata = u;
1853
1854    u->last_source_latency_offset = u->source_output->source->port_latency_offset;
1855    u->last_sink_latency_offset = u->sink_input->sink->port_latency_offset;
1856    update_latency_boundaries(u, u->source_output->source, u->sink_input->sink);
1857    set_sink_input_latency(u, u->sink_input->sink);
1858    set_source_output_latency(u, u->source_output->source);
1859
1860    pa_sink_input_get_silence(u->sink_input, &silence);
1861    u->memblockq = pa_memblockq_new(
1862            "module-loopback memblockq",
1863            0,                      /* idx */
1864            MEMBLOCKQ_MAXLENGTH,    /* maxlength */
1865            MEMBLOCKQ_MAXLENGTH,    /* tlength */
1866            &ss,                    /* sample_spec */
1867            0,                      /* prebuf */
1868            0,                      /* minreq */
1869            0,                      /* maxrewind */
1870            &silence);              /* silence frame */
1871    pa_memblock_unref(silence.memblock);
1872    /* Fill the memblockq with silence */
1873    pa_memblockq_seek(u->memblockq, pa_usec_to_bytes(u->latency, &u->sink_input->sample_spec), PA_SEEK_RELATIVE, true);
1874
1875    u->asyncmsgq = pa_asyncmsgq_new(0);
1876    if (!u->asyncmsgq) {
1877        pa_log("pa_asyncmsgq_new() failed.");
1878        goto fail;
1879    }
1880
1881    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_NAME))
1882        pa_proplist_setf(u->source_output->proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
1883                         pa_strnull(pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1884
1885    if (!pa_proplist_contains(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME)
1886            && (n = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_ICON_NAME)))
1887        pa_proplist_sets(u->source_output->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1888
1889    if (!pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_NAME))
1890        pa_proplist_setf(u->sink_input->proplist, PA_PROP_MEDIA_NAME, "Loopback from %s",
1891                         pa_strnull(pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION)));
1892
1893    if (source && !pa_proplist_contains(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME)
1894            && (n = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_ICON_NAME)))
1895        pa_proplist_sets(u->sink_input->proplist, PA_PROP_MEDIA_ICON_NAME, n);
1896
1897    /* Hooks to track changes of latency offsets */
1898    pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SINK_PORT_LATENCY_OFFSET_CHANGED],
1899                           PA_HOOK_NORMAL, (pa_hook_cb_t) sink_port_latency_offset_changed_cb, u);
1900    pa_module_hook_connect(m, &m->core->hooks[PA_CORE_HOOK_SOURCE_PORT_LATENCY_OFFSET_CHANGED],
1901                           PA_HOOK_NORMAL, (pa_hook_cb_t) source_port_latency_offset_changed_cb, u);
1902
1903    /* Setup message handler for main thread */
1904    u->msg = pa_msgobject_new(loopback_msg);
1905    u->msg->parent.process_msg = loopback_process_msg_cb;
1906    u->msg->userdata = u;
1907    u->msg->dead = false;
1908
1909    /* The output thread is not yet running, set effective_source_latency directly */
1910    update_effective_source_latency(u, u->source_output->source, NULL);
1911
1912    pa_sink_input_put(u->sink_input);
1913    pa_source_output_put(u->source_output);
1914
1915    if (u->source_output->source->state != PA_SOURCE_SUSPENDED)
1916        pa_sink_input_cork(u->sink_input, false);
1917
1918    if (u->sink_input->sink->state != PA_SINK_SUSPENDED)
1919        pa_source_output_cork(u->source_output, false);
1920
1921    update_adjust_timer(u);
1922
1923    pa_modargs_free(ma);
1924    return 0;
1925
1926fail:
1927    if (ma)
1928        pa_modargs_free(ma);
1929
1930    pa__done(m);
1931
1932    return -1;
1933}
1934
1935void pa__done(pa_module*m) {
1936    struct userdata *u;
1937
1938    pa_assert(m);
1939
1940    if (!(u = m->userdata))
1941        return;
1942
1943    teardown(u);
1944
1945    if (u->memblockq)
1946        pa_memblockq_free(u->memblockq);
1947
1948    if (u->asyncmsgq)
1949        pa_asyncmsgq_unref(u->asyncmsgq);
1950
1951    if (u->msg)
1952        loopback_msg_unref(u->msg);
1953
1954    pa_xfree(u);
1955}
1956