1/***
2    This file is part of PulseAudio.
3
4    Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6    Based on module-virtual-sink.c
7             module-virtual-source.c
8             module-loopback.c
9
10        Copyright 2010 Intel Corporation
11        Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13    PulseAudio is free software; you can redistribute it and/or modify
14    it under the terms of the GNU Lesser General Public License as published
15    by the Free Software Foundation; either version 2.1 of the License,
16    or (at your option) any later version.
17
18    PulseAudio is distributed in the hope that it will be useful, but
19    WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21    General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public License
24    along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
25***/
26
27#ifdef HAVE_CONFIG_H
28#include <config.h>
29#endif
30
31#include <stdio.h>
32#include <math.h>
33
34#include "echo-cancel.h"
35
36#include <pulse/xmalloc.h>
37#include <pulse/timeval.h>
38#include <pulse/rtclock.h>
39
40#include <pulsecore/i18n.h>
41#include <pulsecore/atomic.h>
42#include <pulsecore/macro.h>
43#include <pulsecore/namereg.h>
44#include <pulsecore/sink.h>
45#include <pulsecore/module.h>
46#include <pulsecore/core-rtclock.h>
47#include <pulsecore/core-util.h>
48#include <pulsecore/modargs.h>
49#include <pulsecore/log.h>
50#include <pulsecore/rtpoll.h>
51#include <pulsecore/sample-util.h>
52#include <pulsecore/ltdl-helper.h>
53
54PA_MODULE_AUTHOR("Wim Taymans");
55PA_MODULE_DESCRIPTION("Echo Cancellation");
56PA_MODULE_VERSION(PACKAGE_VERSION);
57PA_MODULE_LOAD_ONCE(false);
58PA_MODULE_USAGE(
59        _("source_name=<name for the source> "
60          "source_properties=<properties for the source> "
61          "source_master=<name of source to filter> "
62          "sink_name=<name for the sink> "
63          "sink_properties=<properties for the sink> "
64          "sink_master=<name of sink to filter> "
65          "adjust_time=<how often to readjust rates in s> "
66          "adjust_threshold=<how much drift to readjust after in ms> "
67          "format=<sample format> "
68          "rate=<sample rate> "
69          "channels=<number of channels> "
70          "channel_map=<channel map> "
71          "aec_method=<implementation to use> "
72          "aec_args=<parameters for the AEC engine> "
73          "save_aec=<save AEC data in /tmp> "
74          "autoloaded=<set if this module is being loaded automatically> "
75          "use_volume_sharing=<yes or no> "
76          "use_master_format=<yes or no> "
77        ));
78
79/* NOTE: Make sure the enum and ec_table are maintained in the correct order */
80typedef enum {
81    PA_ECHO_CANCELLER_INVALID = -1,
82    PA_ECHO_CANCELLER_NULL,
83#ifdef HAVE_SPEEX
84    PA_ECHO_CANCELLER_SPEEX,
85#endif
86#ifdef HAVE_ADRIAN_EC
87    PA_ECHO_CANCELLER_ADRIAN,
88#endif
89#ifdef HAVE_WEBRTC
90    PA_ECHO_CANCELLER_WEBRTC,
91#endif
92} pa_echo_canceller_method_t;
93
94#ifdef HAVE_WEBRTC
95#define DEFAULT_ECHO_CANCELLER "webrtc"
96#else
97#define DEFAULT_ECHO_CANCELLER "speex"
98#endif
99
100static const pa_echo_canceller ec_table[] = {
101    {
102        /* Null, Dummy echo canceller (just copies data) */
103        .init                   = pa_null_ec_init,
104        .run                    = pa_null_ec_run,
105        .done                   = pa_null_ec_done,
106    },
107#ifdef HAVE_SPEEX
108    {
109        /* Speex */
110        .init                   = pa_speex_ec_init,
111        .run                    = pa_speex_ec_run,
112        .done                   = pa_speex_ec_done,
113    },
114#endif
115#ifdef HAVE_ADRIAN_EC
116    {
117        /* Adrian Andre's NLMS implementation */
118        .init                   = pa_adrian_ec_init,
119        .run                    = pa_adrian_ec_run,
120        .done                   = pa_adrian_ec_done,
121    },
122#endif
123#ifdef HAVE_WEBRTC
124    {
125        /* WebRTC's audio processing engine */
126        .init                   = pa_webrtc_ec_init,
127        .play                   = pa_webrtc_ec_play,
128        .record                 = pa_webrtc_ec_record,
129        .set_drift              = pa_webrtc_ec_set_drift,
130        .run                    = pa_webrtc_ec_run,
131        .done                   = pa_webrtc_ec_done,
132    },
133#endif
134};
135
136#define DEFAULT_RATE 32000
137#define DEFAULT_CHANNELS 1
138#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
139#define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
140#define DEFAULT_SAVE_AEC false
141#define DEFAULT_AUTOLOADED false
142#define DEFAULT_USE_MASTER_FORMAT false
143
144#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
145
146#define MAX_LATENCY_BLOCKS 10
147
148/* Can only be used in main context */
149#define IS_ACTIVE(u) (((u)->source->state == PA_SOURCE_RUNNING) && \
150                      ((u)->sink->state == PA_SINK_RUNNING))
151
152/* This module creates a new (virtual) source and sink.
153 *
154 * The data sent to the new sink is kept in a memblockq before being
155 * forwarded to the real sink_master.
156 *
157 * Data read from source_master is matched against the saved sink data and
158 * echo canceled data is then pushed onto the new source.
159 *
160 * Both source and sink masters have their own threads to push/pull data
161 * respectively. We however perform all our actions in the source IO thread.
162 * To do this we send all played samples to the source IO thread where they
163 * are then pushed into the memblockq.
164 *
165 * Alignment is performed in two steps:
166 *
167 * 1) when something happens that requires quick adjustment of the alignment of
168 *    capture and playback samples, we perform a resync. This adjusts the
169 *    position in the playback memblock to the requested sample. Quick
170 *    adjustments include moving the playback samples before the capture
171 *    samples (because else the echo canceller does not work) or when the
172 *    playback pointer drifts too far away.
173 *
174 * 2) periodically check the difference between capture and playback. We use a
175 *    low and high watermark for adjusting the alignment. Playback should always
176 *    be before capture and the difference should not be bigger than one frame
177 *    size. We would ideally like to resample the sink_input but most driver
178 *    don't give enough accuracy to be able to do that right now.
179 */
180
181struct userdata;
182
183struct pa_echo_canceller_msg {
184    pa_msgobject parent;
185    bool dead;
186    struct userdata *userdata;
187};
188
189PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
190#define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
191
192struct snapshot {
193    pa_usec_t sink_now;
194    pa_usec_t sink_latency;
195    size_t sink_delay;
196    int64_t send_counter;
197
198    pa_usec_t source_now;
199    pa_usec_t source_latency;
200    size_t source_delay;
201    int64_t recv_counter;
202    size_t rlen;
203    size_t plen;
204};
205
206struct userdata {
207    pa_core *core;
208    pa_module *module;
209
210    bool dead;
211    bool save_aec;
212
213    pa_echo_canceller *ec;
214    uint32_t source_output_blocksize;
215    uint32_t source_blocksize;
216    uint32_t sink_blocksize;
217
218    bool need_realign;
219
220    /* to wakeup the source I/O thread */
221    pa_asyncmsgq *asyncmsgq;
222    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
223
224    pa_source *source;
225    bool source_auto_desc;
226    pa_source_output *source_output;
227    pa_memblockq *source_memblockq; /* echo canceller needs fixed sized chunks */
228    size_t source_skip;
229
230    pa_sink *sink;
231    bool sink_auto_desc;
232    pa_sink_input *sink_input;
233    pa_memblockq *sink_memblockq;
234    int64_t send_counter;          /* updated in sink IO thread */
235    int64_t recv_counter;
236    size_t sink_skip;
237
238    /* Bytes left over from previous iteration */
239    size_t sink_rem;
240    size_t source_rem;
241
242    pa_atomic_t request_resync;
243
244    pa_time_event *time_event;
245    pa_usec_t adjust_time;
246    int adjust_threshold;
247
248    FILE *captured_file;
249    FILE *played_file;
250    FILE *canceled_file;
251    FILE *drift_file;
252
253    bool use_volume_sharing;
254
255    struct {
256        pa_cvolume current_volume;
257    } thread_info;
258};
259
260static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
261
262static const char* const valid_modargs[] = {
263    "source_name",
264    "source_properties",
265    "source_master",
266    "sink_name",
267    "sink_properties",
268    "sink_master",
269    "adjust_time",
270    "adjust_threshold",
271    "format",
272    "rate",
273    "channels",
274    "channel_map",
275    "aec_method",
276    "aec_args",
277    "save_aec",
278    "autoloaded",
279    "use_volume_sharing",
280    "use_master_format",
281    NULL
282};
283
284enum {
285    SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
286    SOURCE_OUTPUT_MESSAGE_REWIND,
287    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
288    SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
289};
290
291enum {
292    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
293};
294
295enum {
296    ECHO_CANCELLER_MESSAGE_SET_VOLUME,
297};
298
299static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
300    int64_t diff_time, buffer_latency;
301    pa_usec_t plen, rlen, source_delay, sink_delay, recv_counter, send_counter;
302
303    /* get latency difference between playback and record */
304    plen = pa_bytes_to_usec(snapshot->plen, &u->sink_input->sample_spec);
305    rlen = pa_bytes_to_usec(snapshot->rlen, &u->source_output->sample_spec);
306    if (plen > rlen)
307        buffer_latency = plen - rlen;
308    else
309        buffer_latency = 0;
310
311    source_delay = pa_bytes_to_usec(snapshot->source_delay, &u->source_output->sample_spec);
312    sink_delay = pa_bytes_to_usec(snapshot->sink_delay, &u->sink_input->sample_spec);
313    buffer_latency += source_delay + sink_delay;
314
315    /* add the latency difference due to samples not yet transferred */
316    send_counter = pa_bytes_to_usec(snapshot->send_counter, &u->sink->sample_spec);
317    recv_counter = pa_bytes_to_usec(snapshot->recv_counter, &u->sink->sample_spec);
318    if (recv_counter <= send_counter)
319        buffer_latency += (int64_t) (send_counter - recv_counter);
320    else
321        buffer_latency = PA_CLIP_SUB(buffer_latency, (int64_t) (recv_counter - send_counter));
322
323    /* capture and playback are perfectly aligned when diff_time is 0 */
324    diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
325          (snapshot->source_now - snapshot->source_latency);
326
327    pa_log_debug("Diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
328        (long long) snapshot->sink_latency,
329        (long long) buffer_latency, (long long) snapshot->source_latency,
330        (long long) source_delay, (long long) sink_delay,
331        (long long) (send_counter - recv_counter),
332        (long long) (snapshot->sink_now - snapshot->source_now));
333
334    return diff_time;
335}
336
337/* Called from main context */
338static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
339    struct userdata *u = userdata;
340    uint32_t old_rate, base_rate, new_rate;
341    int64_t diff_time;
342    /*size_t fs*/
343    struct snapshot latency_snapshot;
344
345    pa_assert(u);
346    pa_assert(a);
347    pa_assert(u->time_event == e);
348    pa_assert_ctl_context();
349
350    if (!IS_ACTIVE(u))
351        return;
352
353    /* update our snapshots */
354    pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
355    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
356
357    /* calculate drift between capture and playback */
358    diff_time = calc_diff(u, &latency_snapshot);
359
360    /*fs = pa_frame_size(&u->source_output->sample_spec);*/
361    old_rate = u->sink_input->sample_spec.rate;
362    base_rate = u->source_output->sample_spec.rate;
363
364    if (diff_time < 0) {
365        /* recording before playback, we need to adjust quickly. The echo
366         * canceller does not work in this case. */
367        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
368            NULL, diff_time, NULL, NULL);
369        /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
370        new_rate = base_rate;
371    }
372    else {
373        if (diff_time > u->adjust_threshold) {
374            /* diff too big, quickly adjust */
375            pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
376                NULL, diff_time, NULL, NULL);
377        }
378
379        /* recording behind playback, we need to slowly adjust the rate to match */
380        /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
381
382        /* assume equal samplerates for now */
383        new_rate = base_rate;
384    }
385
386    /* make sure we don't make too big adjustments because that sounds horrible */
387    if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
388        new_rate = base_rate;
389
390    if (new_rate != old_rate) {
391        pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
392
393        pa_sink_input_set_rate(u->sink_input, new_rate);
394    }
395
396    pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
397}
398
399/* Called from source I/O thread context */
400static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
401    struct userdata *u = PA_SOURCE(o)->userdata;
402
403    switch (code) {
404
405        case PA_SOURCE_MESSAGE_GET_LATENCY:
406
407            /* The source is _put() before the source output is, so let's
408             * make sure we don't access it in that time. Also, the
409             * source output is first shut down, the source second. */
410            if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
411                !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
412                *((int64_t*) data) = 0;
413                return 0;
414            }
415
416            *((int64_t*) data) =
417
418                /* Get the latency of the master source */
419                pa_source_get_latency_within_thread(u->source_output->source, true) +
420                /* Add the latency internal to our source output on top */
421                pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
422                /* and the buffering we do on the source */
423                pa_bytes_to_usec(u->source_output_blocksize, &u->source_output->source->sample_spec);
424
425            /* Add resampler delay */
426            *((int64_t*) data) += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);
427
428            return 0;
429
430        case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
431            u->thread_info.current_volume = u->source->reference_volume;
432            break;
433    }
434
435    return pa_source_process_msg(o, code, data, offset, chunk);
436}
437
438/* Called from sink I/O thread context */
439static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
440    struct userdata *u = PA_SINK(o)->userdata;
441
442    switch (code) {
443
444        case PA_SINK_MESSAGE_GET_LATENCY:
445
446            /* The sink is _put() before the sink input is, so let's
447             * make sure we don't access it in that time. Also, the
448             * sink input is first shut down, the sink second. */
449            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
450                !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
451                *((int64_t*) data) = 0;
452                return 0;
453            }
454
455            *((int64_t*) data) =
456
457                /* Get the latency of the master sink */
458                pa_sink_get_latency_within_thread(u->sink_input->sink, true) +
459
460                /* Add the latency internal to our sink input on top */
461                pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
462
463            /* Add resampler delay */
464            *((int64_t*) data) += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);
465
466            return 0;
467    }
468
469    return pa_sink_process_msg(o, code, data, offset, chunk);
470}
471
472/* Called from main context */
473static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
474    struct userdata *u;
475
476    pa_source_assert_ref(s);
477    pa_assert_se(u = s->userdata);
478
479    if (!PA_SOURCE_IS_LINKED(state) ||
480        !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->state))
481        return 0;
482
483    if (state == PA_SOURCE_RUNNING) {
484        /* restart timer when both sink and source are active */
485        if ((u->sink->state == PA_SINK_RUNNING) && u->adjust_time)
486            pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
487
488        pa_atomic_store(&u->request_resync, 1);
489        pa_source_output_cork(u->source_output, false);
490    } else if (state == PA_SOURCE_SUSPENDED) {
491        pa_source_output_cork(u->source_output, true);
492    }
493
494    return 0;
495}
496
497/* Called from main context */
498static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
499    struct userdata *u;
500
501    pa_sink_assert_ref(s);
502    pa_assert_se(u = s->userdata);
503
504    if (!PA_SINK_IS_LINKED(state) ||
505        !PA_SINK_INPUT_IS_LINKED(u->sink_input->state))
506        return 0;
507
508    if (state == PA_SINK_RUNNING) {
509        /* restart timer when both sink and source are active */
510        if ((u->source->state == PA_SOURCE_RUNNING) && u->adjust_time)
511            pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
512
513        pa_atomic_store(&u->request_resync, 1);
514        pa_sink_input_cork(u->sink_input, false);
515    } else if (state == PA_SINK_SUSPENDED) {
516        pa_sink_input_cork(u->sink_input, true);
517    }
518
519    return 0;
520}
521
522/* Called from the IO thread. */
523static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
524    struct userdata *u;
525
526    pa_assert(s);
527    pa_assert_se(u = s->userdata);
528
529    /* When set to running or idle for the first time, request a rewind
530     * of the master sink to make sure we are heard immediately */
531    if (PA_SINK_IS_OPENED(new_state) && s->thread_info.state == PA_SINK_INIT) {
532        pa_log_debug("Requesting rewind due to state change.");
533        pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
534    }
535
536    return 0;
537}
538
539/* Called from source I/O thread context */
540static void source_update_requested_latency_cb(pa_source *s) {
541    struct userdata *u;
542    pa_usec_t latency;
543
544    pa_source_assert_ref(s);
545    pa_assert_se(u = s->userdata);
546
547    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
548        !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
549        return;
550
551    pa_log_debug("Source update requested latency");
552
553    /* Cap the maximum latency so we don't have to process too large chunks */
554    latency = PA_MIN(pa_source_get_requested_latency_within_thread(s),
555                     pa_bytes_to_usec(u->source_blocksize, &s->sample_spec) * MAX_LATENCY_BLOCKS);
556
557    pa_source_output_set_requested_latency_within_thread(u->source_output, latency);
558}
559
560/* Called from sink I/O thread context */
561static void sink_update_requested_latency_cb(pa_sink *s) {
562    struct userdata *u;
563    pa_usec_t latency;
564
565    pa_sink_assert_ref(s);
566    pa_assert_se(u = s->userdata);
567
568    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
569        !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
570        return;
571
572    pa_log_debug("Sink update requested latency");
573
574    /* Cap the maximum latency so we don't have to process too large chunks */
575    latency = PA_MIN(pa_sink_get_requested_latency_within_thread(s),
576                     pa_bytes_to_usec(u->sink_blocksize, &s->sample_spec) * MAX_LATENCY_BLOCKS);
577
578    pa_sink_input_set_requested_latency_within_thread(u->sink_input, latency);
579}
580
581/* Called from sink I/O thread context */
582static void sink_request_rewind_cb(pa_sink *s) {
583    struct userdata *u;
584
585    pa_sink_assert_ref(s);
586    pa_assert_se(u = s->userdata);
587
588    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
589        !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
590        return;
591
592    pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
593
594    /* Just hand this one over to the master sink */
595    pa_sink_input_request_rewind(u->sink_input,
596                                 s->thread_info.rewind_nbytes, true, false, false);
597}
598
599/* Called from main context */
600static void source_set_volume_cb(pa_source *s) {
601    struct userdata *u;
602
603    pa_source_assert_ref(s);
604    pa_assert_se(u = s->userdata);
605
606    if (!PA_SOURCE_IS_LINKED(s->state) ||
607        !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->state))
608        return;
609
610    pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, true);
611}
612
613/* Called from main context */
614static void sink_set_volume_cb(pa_sink *s) {
615    struct userdata *u;
616
617    pa_sink_assert_ref(s);
618    pa_assert_se(u = s->userdata);
619
620    if (!PA_SINK_IS_LINKED(s->state) ||
621        !PA_SINK_INPUT_IS_LINKED(u->sink_input->state))
622        return;
623
624    pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, true);
625}
626
627/* Called from main context. */
628static void source_get_volume_cb(pa_source *s) {
629    struct userdata *u;
630    pa_cvolume v;
631
632    pa_source_assert_ref(s);
633    pa_assert_se(u = s->userdata);
634
635    if (!PA_SOURCE_IS_LINKED(s->state) ||
636        !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->state))
637        return;
638
639    pa_source_output_get_volume(u->source_output, &v, true);
640
641    if (pa_cvolume_equal(&s->real_volume, &v))
642        /* no change */
643        return;
644
645    s->real_volume = v;
646    pa_source_set_soft_volume(s, NULL);
647}
648
649/* Called from main context */
650static void source_set_mute_cb(pa_source *s) {
651    struct userdata *u;
652
653    pa_source_assert_ref(s);
654    pa_assert_se(u = s->userdata);
655
656    if (!PA_SOURCE_IS_LINKED(s->state) ||
657        !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->state))
658        return;
659
660    pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
661}
662
663/* Called from main context */
664static void sink_set_mute_cb(pa_sink *s) {
665    struct userdata *u;
666
667    pa_sink_assert_ref(s);
668    pa_assert_se(u = s->userdata);
669
670    if (!PA_SINK_IS_LINKED(s->state) ||
671        !PA_SINK_INPUT_IS_LINKED(u->sink_input->state))
672        return;
673
674    pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
675}
676
677/* Called from source I/O thread context. */
678static void apply_diff_time(struct userdata *u, int64_t diff_time) {
679    int64_t diff;
680
681    if (diff_time < 0) {
682        diff = pa_usec_to_bytes(-diff_time, &u->sink_input->sample_spec);
683
684        if (diff > 0) {
685            /* add some extra safety samples to compensate for jitter in the
686             * timings */
687            diff += 10 * pa_frame_size (&u->sink_input->sample_spec);
688
689            pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
690
691            u->sink_skip = diff;
692            u->source_skip = 0;
693        }
694    } else if (diff_time > 0) {
695        diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
696
697        if (diff > 0) {
698            pa_log("Playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
699
700            u->source_skip = diff;
701            u->sink_skip = 0;
702        }
703    }
704}
705
706/* Called from source I/O thread context. */
707static void do_resync(struct userdata *u) {
708    int64_t diff_time;
709    struct snapshot latency_snapshot;
710
711    pa_log("Doing resync");
712
713    /* update our snapshot */
714    /* 1. Get sink input latency snapshot, might cause buffers to be sent to source thread */
715    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
716    /* 2. Pick up any in-flight buffers (and discard if needed) */
717    while (pa_asyncmsgq_process_one(u->asyncmsgq))
718        ;
719    /* 3. Now get the source output latency snapshot */
720    source_output_snapshot_within_thread(u, &latency_snapshot);
721
722    /* calculate drift between capture and playback */
723    diff_time = calc_diff(u, &latency_snapshot);
724
725    /* and adjust for the drift */
726    apply_diff_time(u, diff_time);
727}
728
729/* 1. Calculate drift at this point, pass to canceller
730 * 2. Push out playback samples in blocksize chunks
731 * 3. Push out capture samples in blocksize chunks
732 * 4. ???
733 * 5. Profit
734 *
735 * Called from source I/O thread context.
736 */
737static void do_push_drift_comp(struct userdata *u) {
738    size_t rlen, plen;
739    pa_memchunk rchunk, pchunk, cchunk;
740    uint8_t *rdata, *pdata, *cdata;
741    float drift;
742    int unused PA_GCC_UNUSED;
743
744    rlen = pa_memblockq_get_length(u->source_memblockq);
745    plen = pa_memblockq_get_length(u->sink_memblockq);
746
747    /* Estimate snapshot drift as follows:
748     *   pd: amount of data consumed since last time
749     *   rd: amount of data consumed since last time
750     *
751     *   drift = (pd - rd) / rd;
752     *
753     * We calculate pd and rd as the memblockq length less the number of
754     * samples left from the last iteration (to avoid double counting
755     * those remainder samples.
756     */
757    drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
758    u->sink_rem = plen % u->sink_blocksize;
759    u->source_rem = rlen % u->source_output_blocksize;
760
761    if (u->save_aec) {
762        if (u->drift_file)
763            fprintf(u->drift_file, "d %a\n", drift);
764    }
765
766    /* Send in the playback samples first */
767    while (plen >= u->sink_blocksize) {
768        pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
769        pdata = pa_memblock_acquire(pchunk.memblock);
770        pdata += pchunk.index;
771
772        u->ec->play(u->ec, pdata);
773
774        if (u->save_aec) {
775            if (u->drift_file)
776                fprintf(u->drift_file, "p %d\n", u->sink_blocksize);
777            if (u->played_file)
778                unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
779        }
780
781        pa_memblock_release(pchunk.memblock);
782        pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
783        pa_memblock_unref(pchunk.memblock);
784
785        plen -= u->sink_blocksize;
786    }
787
788    /* And now the capture samples */
789    while (rlen >= u->source_output_blocksize) {
790        pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_output_blocksize, &rchunk);
791
792        rdata = pa_memblock_acquire(rchunk.memblock);
793        rdata += rchunk.index;
794
795        cchunk.index = 0;
796        cchunk.length = u->source_output_blocksize;
797        cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
798        cdata = pa_memblock_acquire(cchunk.memblock);
799
800        u->ec->set_drift(u->ec, drift);
801        u->ec->record(u->ec, rdata, cdata);
802
803        if (u->save_aec) {
804            if (u->drift_file)
805                fprintf(u->drift_file, "c %d\n", u->source_output_blocksize);
806            if (u->captured_file)
807                unused = fwrite(rdata, 1, u->source_output_blocksize, u->captured_file);
808            if (u->canceled_file)
809                unused = fwrite(cdata, 1, u->source_output_blocksize, u->canceled_file);
810        }
811
812        pa_memblock_release(cchunk.memblock);
813        pa_memblock_release(rchunk.memblock);
814
815        pa_memblock_unref(rchunk.memblock);
816
817        pa_source_post(u->source, &cchunk);
818        pa_memblock_unref(cchunk.memblock);
819
820        pa_memblockq_drop(u->source_memblockq, u->source_output_blocksize);
821        rlen -= u->source_output_blocksize;
822    }
823}
824
825/* This one's simpler than the drift compensation case -- we just iterate over
826 * the capture buffer, and pass the canceller blocksize bytes of playback and
827 * capture data. If playback is currently inactive, we just push silence.
828 *
829 * Called from source I/O thread context. */
830static void do_push(struct userdata *u) {
831    size_t rlen, plen;
832    pa_memchunk rchunk, pchunk, cchunk;
833    uint8_t *rdata, *pdata, *cdata;
834    int unused PA_GCC_UNUSED;
835
836    rlen = pa_memblockq_get_length(u->source_memblockq);
837    plen = pa_memblockq_get_length(u->sink_memblockq);
838
839    while (rlen >= u->source_output_blocksize) {
840
841        /* take fixed blocks from recorded and played samples */
842        pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_output_blocksize, &rchunk);
843        pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
844
845        /* we ran out of played data and pchunk has been filled with silence bytes */
846        if (plen < u->sink_blocksize)
847            pa_memblockq_seek(u->sink_memblockq, u->sink_blocksize - plen, PA_SEEK_RELATIVE, true);
848
849        rdata = pa_memblock_acquire(rchunk.memblock);
850        rdata += rchunk.index;
851        pdata = pa_memblock_acquire(pchunk.memblock);
852        pdata += pchunk.index;
853
854        cchunk.index = 0;
855        cchunk.length = u->source_blocksize;
856        cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
857        cdata = pa_memblock_acquire(cchunk.memblock);
858
859        if (u->save_aec) {
860            if (u->captured_file)
861                unused = fwrite(rdata, 1, u->source_output_blocksize, u->captured_file);
862            if (u->played_file)
863                unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
864        }
865
866        /* perform echo cancellation */
867        u->ec->run(u->ec, rdata, pdata, cdata);
868
869        if (u->save_aec) {
870            if (u->canceled_file)
871                unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
872        }
873
874        pa_memblock_release(cchunk.memblock);
875        pa_memblock_release(pchunk.memblock);
876        pa_memblock_release(rchunk.memblock);
877
878        /* drop consumed source samples */
879        pa_memblockq_drop(u->source_memblockq, u->source_output_blocksize);
880        pa_memblock_unref(rchunk.memblock);
881        rlen -= u->source_output_blocksize;
882
883        /* drop consumed sink samples */
884        pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
885        pa_memblock_unref(pchunk.memblock);
886
887        if (plen >= u->sink_blocksize)
888            plen -= u->sink_blocksize;
889        else
890            plen = 0;
891
892        /* forward the (echo-canceled) data to the virtual source */
893        pa_source_post(u->source, &cchunk);
894        pa_memblock_unref(cchunk.memblock);
895    }
896}
897
898/* Called from source I/O thread context. */
899static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
900    struct userdata *u;
901    size_t rlen, plen, to_skip;
902    pa_memchunk rchunk;
903
904    pa_source_output_assert_ref(o);
905    pa_source_output_assert_io_context(o);
906    pa_assert_se(u = o->userdata);
907
908    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state))
909        return;
910
911    if (!PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
912        pa_log("Push when no link?");
913        return;
914    }
915
916    /* handle queued messages, do any message sending of our own */
917    while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
918        ;
919
920    pa_memblockq_push_align(u->source_memblockq, chunk);
921
922    rlen = pa_memblockq_get_length(u->source_memblockq);
923    plen = pa_memblockq_get_length(u->sink_memblockq);
924
925    /* Let's not do anything else till we have enough data to process */
926    if (rlen < u->source_output_blocksize)
927        return;
928
929    /* See if we need to drop samples in order to sync */
930    if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
931        do_resync(u);
932    }
933
934    /* Okay, skip cancellation for skipped source samples if needed. */
935    if (PA_UNLIKELY(u->source_skip)) {
936        /* The slightly tricky bit here is that we drop all but modulo
937         * blocksize bytes and then adjust for that last bit on the sink side.
938         * We do this because the source data is coming at a fixed rate, which
939         * means the only way to try to catch up is drop sink samples and let
940         * the canceller cope up with this. */
941        to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
942        to_skip -= to_skip % u->source_output_blocksize;
943
944        if (to_skip) {
945            pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
946            pa_source_post(u->source, &rchunk);
947
948            pa_memblock_unref(rchunk.memblock);
949            pa_memblockq_drop(u->source_memblockq, to_skip);
950
951            rlen -= to_skip;
952            u->source_skip -= to_skip;
953        }
954
955        if (rlen && u->source_skip % u->source_output_blocksize) {
956            u->sink_skip += (uint64_t) (u->source_output_blocksize - (u->source_skip % u->source_output_blocksize)) * u->sink_blocksize / u->source_output_blocksize;
957            u->source_skip -= (u->source_skip % u->source_output_blocksize);
958        }
959    }
960
961    /* And for the sink, these samples have been played back already, so we can
962     * just drop them and get on with it. */
963    if (PA_UNLIKELY(u->sink_skip)) {
964        to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
965
966        pa_memblockq_drop(u->sink_memblockq, to_skip);
967
968        plen -= to_skip;
969        u->sink_skip -= to_skip;
970    }
971
972    /* process and push out samples */
973    if (u->ec->params.drift_compensation)
974        do_push_drift_comp(u);
975    else
976        do_push(u);
977}
978
979/* Called from sink I/O thread context. */
980static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
981    struct userdata *u;
982
983    pa_sink_input_assert_ref(i);
984    pa_assert(chunk);
985    pa_assert_se(u = i->userdata);
986
987    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
988        return -1;
989
990    if (u->sink->thread_info.rewind_requested)
991        pa_sink_process_rewind(u->sink, 0);
992
993    pa_sink_render_full(u->sink, nbytes, chunk);
994
995    if (i->thread_info.underrun_for > 0) {
996        pa_log_debug("Handling end of underrun.");
997        pa_atomic_store(&u->request_resync, 1);
998    }
999
1000    /* let source thread handle the chunk. pass the sample count as well so that
1001     * the source IO thread can update the right variables. */
1002    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
1003        NULL, 0, chunk, NULL);
1004    u->send_counter += chunk->length;
1005
1006    return 0;
1007}
1008
1009/* Called from source I/O thread context. */
1010static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
1011    struct userdata *u;
1012
1013    pa_source_output_assert_ref(o);
1014    pa_source_output_assert_io_context(o);
1015    pa_assert_se(u = o->userdata);
1016
1017    /* If the source is not yet linked, there is nothing to rewind */
1018    if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state))
1019        return;
1020
1021    pa_source_process_rewind(u->source, nbytes);
1022
1023    /* go back on read side, we need to use older sink data for this */
1024    pa_memblockq_rewind(u->sink_memblockq, nbytes);
1025
1026    /* manipulate write index */
1027    pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, true);
1028
1029    pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
1030        (long long) pa_memblockq_get_length (u->source_memblockq));
1031}
1032
1033/* Called from sink I/O thread context. */
1034static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1035    struct userdata *u;
1036
1037    pa_sink_input_assert_ref(i);
1038    pa_assert_se(u = i->userdata);
1039
1040    /* If the sink is not yet linked, there is nothing to rewind */
1041    if (!PA_SINK_IS_LINKED(u->sink->thread_info.state))
1042        return;
1043
1044    pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1045
1046    pa_sink_process_rewind(u->sink, nbytes);
1047
1048    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1049    u->send_counter -= nbytes;
1050}
1051
1052/* Called from source I/O thread context. */
1053static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1054    size_t delay, rlen, plen;
1055    pa_usec_t now, latency;
1056
1057    now = pa_rtclock_now();
1058    latency = pa_source_get_latency_within_thread(u->source_output->source, false);
1059    /* Add resampler delay */
1060    latency += pa_resampler_get_delay_usec(u->source_output->thread_info.resampler);
1061
1062    delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1063
1064    delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1065    rlen = pa_memblockq_get_length(u->source_memblockq);
1066    plen = pa_memblockq_get_length(u->sink_memblockq);
1067
1068    snapshot->source_now = now;
1069    snapshot->source_latency = latency;
1070    snapshot->source_delay = delay;
1071    snapshot->recv_counter = u->recv_counter;
1072    snapshot->rlen = rlen + u->sink_skip;
1073    snapshot->plen = plen + u->source_skip;
1074}
1075
1076/* Called from source I/O thread context. */
1077static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1078    struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1079
1080    switch (code) {
1081
1082        case SOURCE_OUTPUT_MESSAGE_POST:
1083
1084            pa_source_output_assert_io_context(u->source_output);
1085
1086            if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1087                pa_memblockq_push_align(u->sink_memblockq, chunk);
1088            else
1089                pa_memblockq_flush_write(u->sink_memblockq, true);
1090
1091            u->recv_counter += (int64_t) chunk->length;
1092
1093            return 0;
1094
1095        case SOURCE_OUTPUT_MESSAGE_REWIND:
1096            pa_source_output_assert_io_context(u->source_output);
1097
1098            /* manipulate write index, never go past what we have */
1099            if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1100                pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, true);
1101            else
1102                pa_memblockq_flush_write(u->sink_memblockq, true);
1103
1104            pa_log_debug("Sink rewind (%lld)", (long long) offset);
1105
1106            u->recv_counter -= offset;
1107
1108            return 0;
1109
1110        case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1111            struct snapshot *snapshot = (struct snapshot *) data;
1112
1113            source_output_snapshot_within_thread(u, snapshot);
1114            return 0;
1115        }
1116
1117        case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1118            apply_diff_time(u, offset);
1119            return 0;
1120
1121    }
1122
1123    return pa_source_output_process_msg(obj, code, data, offset, chunk);
1124}
1125
1126/* Called from sink I/O thread context. */
1127static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1128    struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1129
1130    switch (code) {
1131
1132        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1133            size_t delay;
1134            pa_usec_t now, latency;
1135            struct snapshot *snapshot = (struct snapshot *) data;
1136
1137            pa_sink_input_assert_io_context(u->sink_input);
1138
1139            now = pa_rtclock_now();
1140            latency = pa_sink_get_latency_within_thread(u->sink_input->sink, false);
1141            /* Add resampler delay */
1142            latency += pa_resampler_get_delay_usec(u->sink_input->thread_info.resampler);
1143
1144            delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1145
1146            delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1147
1148            snapshot->sink_now = now;
1149            snapshot->sink_latency = latency;
1150            snapshot->sink_delay = delay;
1151            snapshot->send_counter = u->send_counter;
1152            return 0;
1153        }
1154    }
1155
1156    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1157}
1158
1159/* Called from sink I/O thread context. */
1160static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1161    struct userdata *u;
1162
1163    pa_sink_input_assert_ref(i);
1164    pa_assert_se(u = i->userdata);
1165
1166    pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1167
1168    /* FIXME: Too small max_rewind:
1169     * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1170    pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1171    pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1172}
1173
1174/* Called from source I/O thread context. */
1175static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1176    struct userdata *u;
1177
1178    pa_source_output_assert_ref(o);
1179    pa_assert_se(u = o->userdata);
1180
1181    pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1182
1183    pa_source_set_max_rewind_within_thread(u->source, nbytes);
1184}
1185
1186/* Called from sink I/O thread context. */
1187static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1188    struct userdata *u;
1189
1190    pa_sink_input_assert_ref(i);
1191    pa_assert_se(u = i->userdata);
1192
1193    pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1194
1195    pa_sink_set_max_request_within_thread(u->sink, nbytes);
1196}
1197
1198/* Called from sink I/O thread context. */
1199static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1200    struct userdata *u;
1201    pa_usec_t latency;
1202
1203    pa_sink_input_assert_ref(i);
1204    pa_assert_se(u = i->userdata);
1205
1206    latency = pa_sink_get_requested_latency_within_thread(i->sink);
1207
1208    pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1209}
1210
1211/* Called from source I/O thread context. */
1212static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1213    struct userdata *u;
1214    pa_usec_t latency;
1215
1216    pa_source_output_assert_ref(o);
1217    pa_assert_se(u = o->userdata);
1218
1219    latency = pa_source_get_requested_latency_within_thread(o->source);
1220
1221    pa_log_debug("Source output update requested latency %lld", (long long) latency);
1222}
1223
1224/* Called from sink I/O thread context. */
1225static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1226    struct userdata *u;
1227
1228    pa_sink_input_assert_ref(i);
1229    pa_assert_se(u = i->userdata);
1230
1231    pa_log_debug("Sink input update latency range %lld %lld",
1232        (long long) i->sink->thread_info.min_latency,
1233        (long long) i->sink->thread_info.max_latency);
1234
1235    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1236}
1237
1238/* Called from source I/O thread context. */
1239static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1240    struct userdata *u;
1241
1242    pa_source_output_assert_ref(o);
1243    pa_assert_se(u = o->userdata);
1244
1245    pa_log_debug("Source output update latency range %lld %lld",
1246        (long long) o->source->thread_info.min_latency,
1247        (long long) o->source->thread_info.max_latency);
1248
1249    pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1250}
1251
1252/* Called from sink I/O thread context. */
1253static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1254    struct userdata *u;
1255
1256    pa_sink_input_assert_ref(i);
1257    pa_assert_se(u = i->userdata);
1258
1259    pa_log_debug("Sink input update fixed latency %lld",
1260        (long long) i->sink->thread_info.fixed_latency);
1261
1262    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1263}
1264
1265/* Called from source I/O thread context. */
1266static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1267    struct userdata *u;
1268
1269    pa_source_output_assert_ref(o);
1270    pa_assert_se(u = o->userdata);
1271
1272    pa_log_debug("Source output update fixed latency %lld",
1273        (long long) o->source->thread_info.fixed_latency);
1274
1275    pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1276}
1277
1278/* Called from source I/O thread context. */
1279static void source_output_attach_cb(pa_source_output *o) {
1280    struct userdata *u;
1281
1282    pa_source_output_assert_ref(o);
1283    pa_source_output_assert_io_context(o);
1284    pa_assert_se(u = o->userdata);
1285
1286    pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1287    pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1288    pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1289    pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1290
1291    pa_log_debug("Source output %d attach", o->index);
1292
1293    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
1294        pa_source_attach_within_thread(u->source);
1295
1296    u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1297            o->source->thread_info.rtpoll,
1298            PA_RTPOLL_LATE,
1299            u->asyncmsgq);
1300}
1301
1302/* Called from sink I/O thread context. */
1303static void sink_input_attach_cb(pa_sink_input *i) {
1304    struct userdata *u;
1305
1306    pa_sink_input_assert_ref(i);
1307    pa_assert_se(u = i->userdata);
1308
1309    pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1310    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1311
1312    /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1313     * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1314    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1315
1316    /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1317     * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1318     * HERE. SEE (6) */
1319    pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1320
1321    /* FIXME: Too small max_rewind:
1322     * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1323    pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1324
1325    pa_log_debug("Sink input %d attach", i->index);
1326
1327    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1328            i->sink->thread_info.rtpoll,
1329            PA_RTPOLL_LATE,
1330            u->asyncmsgq);
1331
1332    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
1333        pa_sink_attach_within_thread(u->sink);
1334}
1335
1336/* Called from source I/O thread context. */
1337static void source_output_detach_cb(pa_source_output *o) {
1338    struct userdata *u;
1339
1340    pa_source_output_assert_ref(o);
1341    pa_source_output_assert_io_context(o);
1342    pa_assert_se(u = o->userdata);
1343
1344    if (PA_SOURCE_IS_LINKED(u->source->thread_info.state))
1345        pa_source_detach_within_thread(u->source);
1346    pa_source_set_rtpoll(u->source, NULL);
1347
1348    pa_log_debug("Source output %d detach", o->index);
1349
1350    if (u->rtpoll_item_read) {
1351        pa_rtpoll_item_free(u->rtpoll_item_read);
1352        u->rtpoll_item_read = NULL;
1353    }
1354}
1355
1356/* Called from sink I/O thread context. */
1357static void sink_input_detach_cb(pa_sink_input *i) {
1358    struct userdata *u;
1359
1360    pa_sink_input_assert_ref(i);
1361    pa_assert_se(u = i->userdata);
1362
1363    if (PA_SINK_IS_LINKED(u->sink->thread_info.state))
1364        pa_sink_detach_within_thread(u->sink);
1365
1366    pa_sink_set_rtpoll(u->sink, NULL);
1367
1368    pa_log_debug("Sink input %d detach", i->index);
1369
1370    if (u->rtpoll_item_write) {
1371        pa_rtpoll_item_free(u->rtpoll_item_write);
1372        u->rtpoll_item_write = NULL;
1373    }
1374}
1375
1376/* Called from source I/O thread context except when cork() is called without valid source. */
1377static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1378    struct userdata *u;
1379
1380    pa_source_output_assert_ref(o);
1381    pa_assert_se(u = o->userdata);
1382
1383    pa_log_debug("Source output %d state %d", o->index, state);
1384}
1385
1386/* Called from sink I/O thread context. */
1387static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1388    struct userdata *u;
1389
1390    pa_sink_input_assert_ref(i);
1391    pa_assert_se(u = i->userdata);
1392
1393    pa_log_debug("Sink input %d state %d", i->index, state);
1394}
1395
1396/* Called from main context. */
1397static void source_output_kill_cb(pa_source_output *o) {
1398    struct userdata *u;
1399
1400    pa_source_output_assert_ref(o);
1401    pa_assert_ctl_context();
1402    pa_assert_se(u = o->userdata);
1403
1404    u->dead = true;
1405
1406    /* The order here matters! We first kill the source so that streams can
1407     * properly be moved away while the source output is still connected to
1408     * the master. */
1409    pa_source_output_cork(u->source_output, true);
1410    pa_source_unlink(u->source);
1411    pa_source_output_unlink(u->source_output);
1412
1413    pa_source_output_unref(u->source_output);
1414    u->source_output = NULL;
1415
1416    pa_source_unref(u->source);
1417    u->source = NULL;
1418
1419    pa_log_debug("Source output kill %d", o->index);
1420
1421    pa_module_unload_request(u->module, true);
1422}
1423
1424/* Called from main context */
1425static void sink_input_kill_cb(pa_sink_input *i) {
1426    struct userdata *u;
1427
1428    pa_sink_input_assert_ref(i);
1429    pa_assert_se(u = i->userdata);
1430
1431    u->dead = true;
1432
1433    /* The order here matters! We first kill the sink so that streams
1434     * can properly be moved away while the sink input is still connected
1435     * to the master. */
1436    pa_sink_input_cork(u->sink_input, true);
1437    pa_sink_unlink(u->sink);
1438    pa_sink_input_unlink(u->sink_input);
1439
1440    pa_sink_input_unref(u->sink_input);
1441    u->sink_input = NULL;
1442
1443    pa_sink_unref(u->sink);
1444    u->sink = NULL;
1445
1446    pa_log_debug("Sink input kill %d", i->index);
1447
1448    pa_module_unload_request(u->module, true);
1449}
1450
1451/* Called from main context. */
1452static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1453    struct userdata *u;
1454
1455    pa_source_output_assert_ref(o);
1456    pa_assert_ctl_context();
1457    pa_assert_se(u = o->userdata);
1458
1459    if (u->dead)
1460        return false;
1461
1462    return (u->source != dest) && (u->sink != dest->monitor_of);
1463}
1464
1465/* Called from main context */
1466static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1467    struct userdata *u;
1468
1469    pa_sink_input_assert_ref(i);
1470    pa_assert_se(u = i->userdata);
1471
1472    if (u->dead)
1473        return false;
1474
1475    return u->sink != dest;
1476}
1477
1478/* Called from main context. */
1479static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1480    struct userdata *u;
1481    uint32_t idx;
1482    pa_source_output *output;
1483
1484    pa_source_output_assert_ref(o);
1485    pa_assert_ctl_context();
1486    pa_assert_se(u = o->userdata);
1487
1488    if (dest) {
1489        pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1490        pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1491    } else
1492        pa_source_set_asyncmsgq(u->source, NULL);
1493
1494    /* Propagate asyncmsq change to attached virtual sources */
1495    PA_IDXSET_FOREACH(output, u->source->outputs, idx) {
1496        if (output->destination_source && output->moving)
1497            output->moving(output, u->source);
1498    }
1499
1500    if (u->source_auto_desc && dest) {
1501        const char *y, *z;
1502        pa_proplist *pl;
1503
1504        pl = pa_proplist_new();
1505        if (u->sink_input->sink) {
1506            pa_proplist_sets(pl, PA_PROP_DEVICE_MASTER_DEVICE, u->sink_input->sink->name);
1507            y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1508        } else
1509            y = "<unknown>"; /* Probably in the middle of a move */
1510        z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1511        pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1512                y ? y : u->sink_input->sink->name);
1513
1514        pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1515        pa_proplist_free(pl);
1516    }
1517}
1518
1519/* Called from main context */
1520static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1521    struct userdata *u;
1522
1523    pa_sink_input_assert_ref(i);
1524    pa_assert_se(u = i->userdata);
1525
1526    if (dest) {
1527        pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1528        pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1529    } else
1530        pa_sink_set_asyncmsgq(u->sink, NULL);
1531
1532    if (u->sink_auto_desc && dest) {
1533        const char *y, *z;
1534        pa_proplist *pl;
1535
1536        pl = pa_proplist_new();
1537        if (u->source_output->source) {
1538            pa_proplist_sets(pl, PA_PROP_DEVICE_MASTER_DEVICE, u->source_output->source->name);
1539            y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1540        } else
1541            y = "<unknown>"; /* Probably in the middle of a move */
1542        z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1543        pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1544                         y ? y : u->source_output->source->name);
1545
1546        pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1547        pa_proplist_free(pl);
1548    }
1549}
1550
1551/* Called from main context */
1552static void sink_input_volume_changed_cb(pa_sink_input *i) {
1553    struct userdata *u;
1554
1555    pa_sink_input_assert_ref(i);
1556    pa_assert_se(u = i->userdata);
1557
1558    pa_sink_volume_changed(u->sink, &i->volume);
1559}
1560
1561/* Called from main context */
1562static void sink_input_mute_changed_cb(pa_sink_input *i) {
1563    struct userdata *u;
1564
1565    pa_sink_input_assert_ref(i);
1566    pa_assert_se(u = i->userdata);
1567
1568    pa_sink_mute_changed(u->sink, i->muted);
1569}
1570
1571/* Called from main context */
1572static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1573    struct pa_echo_canceller_msg *msg;
1574    struct userdata *u;
1575
1576    pa_assert(o);
1577
1578    msg = PA_ECHO_CANCELLER_MSG(o);
1579
1580    /* When the module is unloaded, there may still remain queued messages for
1581     * the canceller. Messages are sent to the main thread using the master
1582     * source's asyncmsgq, and that message queue isn't (and can't be, at least
1583     * with the current asyncmsgq API) cleared from the canceller messages when
1584     * module-echo-cancel is unloaded.
1585     *
1586     * The userdata may already have been freed at this point, but the
1587     * asyncmsgq holds a reference to the pa_echo_canceller_msg object, which
1588     * contains a flag to indicate that all remaining messages have to be
1589     * ignored. */
1590    if (msg->dead)
1591        return 0;
1592
1593    u = msg->userdata;
1594
1595    switch (code) {
1596        case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1597            pa_volume_t v = PA_PTR_TO_UINT(userdata);
1598            pa_cvolume vol;
1599
1600            if (u->use_volume_sharing) {
1601                pa_cvolume_set(&vol, u->source->sample_spec.channels, v);
1602                pa_source_set_volume(u->source, &vol, true, false);
1603            } else {
1604                pa_cvolume_set(&vol, u->source_output->sample_spec.channels, v);
1605                pa_source_output_set_volume(u->source_output, &vol, false, true);
1606            }
1607
1608            break;
1609        }
1610
1611        default:
1612            pa_assert_not_reached();
1613            break;
1614    }
1615
1616    return 0;
1617}
1618
1619/* Called by the canceller, so source I/O thread context. */
1620pa_volume_t pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec) {
1621#ifndef ECHO_CANCEL_TEST
1622    return pa_cvolume_avg(&ec->msg->userdata->thread_info.current_volume);
1623#else
1624    return PA_VOLUME_NORM;
1625#endif
1626}
1627
1628/* Called by the canceller, so source I/O thread context. */
1629void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_volume_t v) {
1630#ifndef ECHO_CANCEL_TEST
1631    if (pa_cvolume_avg(&ec->msg->userdata->thread_info.current_volume) != v) {
1632        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, PA_UINT_TO_PTR(v),
1633                0, NULL, NULL);
1634    }
1635#endif
1636}
1637
1638uint32_t pa_echo_canceller_blocksize_power2(unsigned rate, unsigned ms) {
1639    unsigned nframes = (rate * ms) / 1000;
1640    uint32_t y = 1 << ((8 * sizeof(uint32_t)) - 2);
1641
1642    pa_assert(rate >= 4000);
1643    pa_assert(ms >= 1);
1644
1645    /* nframes should be a power of 2, round down to nearest power of two */
1646    while (y > nframes)
1647        y >>= 1;
1648
1649    pa_assert(y >= 1);
1650    return y;
1651}
1652
1653static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1654    if (pa_streq(method, "null"))
1655        return PA_ECHO_CANCELLER_NULL;
1656#ifdef HAVE_SPEEX
1657    if (pa_streq(method, "speex"))
1658        return PA_ECHO_CANCELLER_SPEEX;
1659#endif
1660#ifdef HAVE_ADRIAN_EC
1661    if (pa_streq(method, "adrian"))
1662        return PA_ECHO_CANCELLER_ADRIAN;
1663#endif
1664#ifdef HAVE_WEBRTC
1665    if (pa_streq(method, "webrtc"))
1666        return PA_ECHO_CANCELLER_WEBRTC;
1667#endif
1668    return PA_ECHO_CANCELLER_INVALID;
1669}
1670
1671/* Common initialisation bits between module-echo-cancel and the standalone
1672 * test program.
1673 *
1674 * Called from main context. */
1675static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1676    const char *ec_string;
1677    pa_echo_canceller_method_t ec_method;
1678
1679    if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1680        pa_log("Invalid sample format specification or channel map");
1681        goto fail;
1682    }
1683
1684    u->ec = pa_xnew0(pa_echo_canceller, 1);
1685    if (!u->ec) {
1686        pa_log("Failed to alloc echo canceller");
1687        goto fail;
1688    }
1689
1690    ec_string = pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER);
1691    if ((ec_method = get_ec_method_from_string(ec_string)) < 0) {
1692        pa_log("Invalid echo canceller implementation '%s'", ec_string);
1693        goto fail;
1694    }
1695
1696    pa_log_info("Using AEC engine: %s", ec_string);
1697
1698    u->ec->init = ec_table[ec_method].init;
1699    u->ec->play = ec_table[ec_method].play;
1700    u->ec->record = ec_table[ec_method].record;
1701    u->ec->set_drift = ec_table[ec_method].set_drift;
1702    u->ec->run = ec_table[ec_method].run;
1703    u->ec->done = ec_table[ec_method].done;
1704
1705    return 0;
1706
1707fail:
1708    return -1;
1709}
1710
1711/* Called from main context. */
1712int pa__init(pa_module*m) {
1713    struct userdata *u;
1714    pa_sample_spec source_output_ss, source_ss, sink_ss;
1715    pa_channel_map source_output_map, source_map, sink_map;
1716    pa_modargs *ma;
1717    pa_source *source_master=NULL;
1718    pa_sink *sink_master=NULL;
1719    bool autoloaded;
1720    pa_source_output_new_data source_output_data;
1721    pa_sink_input_new_data sink_input_data;
1722    pa_source_new_data source_data;
1723    pa_sink_new_data sink_data;
1724    pa_memchunk silence;
1725    uint32_t temp;
1726    uint32_t nframes = 0;
1727    bool use_master_format;
1728    pa_usec_t blocksize_usec;
1729
1730    pa_assert(m);
1731
1732    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1733        pa_log("Failed to parse module arguments.");
1734        goto fail;
1735    }
1736
1737    if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1738        pa_log("Master source not found");
1739        goto fail;
1740    }
1741    pa_assert(source_master);
1742
1743    if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1744        pa_log("Master sink not found");
1745        goto fail;
1746    }
1747    pa_assert(sink_master);
1748
1749    if (source_master->monitor_of == sink_master) {
1750        pa_log("Can't cancel echo between a sink and its monitor");
1751        goto fail;
1752    }
1753
1754    /* Set to true if we just want to inherit sample spec and channel map from the sink and source master */
1755    use_master_format = DEFAULT_USE_MASTER_FORMAT;
1756    if (pa_modargs_get_value_boolean(ma, "use_master_format", &use_master_format) < 0) {
1757        pa_log("use_master_format= expects a boolean argument");
1758        goto fail;
1759    }
1760
1761    source_ss = source_master->sample_spec;
1762    sink_ss = sink_master->sample_spec;
1763
1764    if (use_master_format) {
1765        source_map = source_master->channel_map;
1766        sink_map = sink_master->channel_map;
1767    } else {
1768        source_ss = source_master->sample_spec;
1769        source_ss.rate = DEFAULT_RATE;
1770        source_ss.channels = DEFAULT_CHANNELS;
1771        pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1772
1773        sink_ss = sink_master->sample_spec;
1774        sink_ss.rate = DEFAULT_RATE;
1775        sink_ss.channels = DEFAULT_CHANNELS;
1776        pa_channel_map_init_auto(&sink_map, sink_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1777    }
1778
1779    u = pa_xnew0(struct userdata, 1);
1780    if (!u) {
1781        pa_log("Failed to alloc userdata");
1782        goto fail;
1783    }
1784    u->core = m->core;
1785    u->module = m;
1786    m->userdata = u;
1787    u->dead = false;
1788
1789    u->use_volume_sharing = true;
1790    if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1791        pa_log("use_volume_sharing= expects a boolean argument");
1792        goto fail;
1793    }
1794
1795    temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1796    if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1797        pa_log("Failed to parse adjust_time value");
1798        goto fail;
1799    }
1800
1801    if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1802        u->adjust_time = temp * PA_USEC_PER_SEC;
1803    else
1804        u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1805
1806    temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1807    if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1808        pa_log("Failed to parse adjust_threshold value");
1809        goto fail;
1810    }
1811
1812    if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1813        u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1814    else
1815        u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1816
1817    u->save_aec = DEFAULT_SAVE_AEC;
1818    if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1819        pa_log("Failed to parse save_aec value");
1820        goto fail;
1821    }
1822
1823    autoloaded = DEFAULT_AUTOLOADED;
1824    if (pa_modargs_get_value_boolean(ma, "autoloaded", &autoloaded) < 0) {
1825        pa_log("Failed to parse autoloaded value");
1826        goto fail;
1827    }
1828
1829    if (init_common(ma, u, &source_ss, &source_map) < 0)
1830        goto fail;
1831
1832    u->asyncmsgq = pa_asyncmsgq_new(0);
1833    if (!u->asyncmsgq) {
1834        pa_log("pa_asyncmsgq_new() failed.");
1835        goto fail;
1836    }
1837
1838    u->need_realign = true;
1839
1840    source_output_ss = source_ss;
1841    source_output_map = source_map;
1842
1843    if (sink_ss.rate != source_ss.rate) {
1844        pa_log_info("Sample rates of play and out stream differ. Adjusting rate of play stream.");
1845        sink_ss.rate = source_ss.rate;
1846    }
1847
1848    pa_assert(u->ec->init);
1849    if (!u->ec->init(u->core, u->ec, &source_output_ss, &source_output_map, &sink_ss, &sink_map, &source_ss, &source_map, &nframes, pa_modargs_get_value(ma, "aec_args", NULL))) {
1850        pa_log("Failed to init AEC engine");
1851        goto fail;
1852    }
1853
1854    pa_assert(source_output_ss.rate == source_ss.rate);
1855    pa_assert(sink_ss.rate == source_ss.rate);
1856
1857    u->source_output_blocksize = nframes * pa_frame_size(&source_output_ss);
1858    u->source_blocksize = nframes * pa_frame_size(&source_ss);
1859    u->sink_blocksize = nframes * pa_frame_size(&sink_ss);
1860
1861    if (u->ec->params.drift_compensation)
1862        pa_assert(u->ec->set_drift);
1863
1864    /* Create source */
1865    pa_source_new_data_init(&source_data);
1866    source_data.driver = __FILE__;
1867    source_data.module = m;
1868    if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1869        source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1870    pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1871    pa_source_new_data_set_channel_map(&source_data, &source_map);
1872    pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1873    pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1874    if (!autoloaded)
1875        pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1876
1877    if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1878        pa_log("Invalid properties");
1879        pa_source_new_data_done(&source_data);
1880        goto fail;
1881    }
1882
1883    if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1884        const char *y, *z;
1885
1886        y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1887        z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1888        pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1889                z ? z : source_master->name, y ? y : sink_master->name);
1890    }
1891
1892    u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1893                                                     | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1894    pa_source_new_data_done(&source_data);
1895
1896    if (!u->source) {
1897        pa_log("Failed to create source.");
1898        goto fail;
1899    }
1900
1901    u->source->parent.process_msg = source_process_msg_cb;
1902    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
1903    u->source->update_requested_latency = source_update_requested_latency_cb;
1904    pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1905    if (!u->use_volume_sharing) {
1906        pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1907        pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1908        pa_source_enable_decibel_volume(u->source, true);
1909    }
1910    u->source->userdata = u;
1911
1912    pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1913
1914    /* Create sink */
1915    pa_sink_new_data_init(&sink_data);
1916    sink_data.driver = __FILE__;
1917    sink_data.module = m;
1918    if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1919        sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1920    pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1921    pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1922    pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1923    pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1924    if (!autoloaded)
1925        pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1926
1927    if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1928        pa_log("Invalid properties");
1929        pa_sink_new_data_done(&sink_data);
1930        goto fail;
1931    }
1932
1933    if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1934        const char *y, *z;
1935
1936        y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1937        z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1938        pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1939                z ? z : sink_master->name, y ? y : source_master->name);
1940    }
1941
1942    u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1943                                               | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1944    pa_sink_new_data_done(&sink_data);
1945
1946    if (!u->sink) {
1947        pa_log("Failed to create sink.");
1948        goto fail;
1949    }
1950
1951    u->sink->parent.process_msg = sink_process_msg_cb;
1952    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
1953    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
1954    u->sink->update_requested_latency = sink_update_requested_latency_cb;
1955    u->sink->request_rewind = sink_request_rewind_cb;
1956    pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1957    if (!u->use_volume_sharing) {
1958        pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1959        pa_sink_enable_decibel_volume(u->sink, true);
1960    }
1961    u->sink->userdata = u;
1962
1963    pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1964
1965    /* Create source output */
1966    pa_source_output_new_data_init(&source_output_data);
1967    source_output_data.driver = __FILE__;
1968    source_output_data.module = m;
1969    pa_source_output_new_data_set_source(&source_output_data, source_master, false, true);
1970    source_output_data.destination_source = u->source;
1971
1972    pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1973    pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1974    pa_source_output_new_data_set_sample_spec(&source_output_data, &source_output_ss);
1975    pa_source_output_new_data_set_channel_map(&source_output_data, &source_output_map);
1976    source_output_data.flags |= PA_SOURCE_OUTPUT_START_CORKED;
1977
1978    if (autoloaded)
1979        source_output_data.flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
1980
1981    pa_source_output_new(&u->source_output, m->core, &source_output_data);
1982    pa_source_output_new_data_done(&source_output_data);
1983
1984    if (!u->source_output)
1985        goto fail;
1986
1987    u->source_output->parent.process_msg = source_output_process_msg_cb;
1988    u->source_output->push = source_output_push_cb;
1989    u->source_output->process_rewind = source_output_process_rewind_cb;
1990    u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1991    u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1992    u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1993    u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1994    u->source_output->kill = source_output_kill_cb;
1995    u->source_output->attach = source_output_attach_cb;
1996    u->source_output->detach = source_output_detach_cb;
1997    u->source_output->state_change = source_output_state_change_cb;
1998    u->source_output->may_move_to = source_output_may_move_to_cb;
1999    u->source_output->moving = source_output_moving_cb;
2000    u->source_output->userdata = u;
2001
2002    u->source->output_from_master = u->source_output;
2003
2004    /* Create sink input */
2005    pa_sink_input_new_data_init(&sink_input_data);
2006    sink_input_data.driver = __FILE__;
2007    sink_input_data.module = m;
2008    pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, false, true);
2009    sink_input_data.origin_sink = u->sink;
2010    pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
2011    pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
2012    pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
2013    pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
2014    sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE | PA_SINK_INPUT_START_CORKED;
2015
2016    if (autoloaded)
2017        sink_input_data.flags |= PA_SINK_INPUT_DONT_MOVE;
2018
2019    pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
2020    pa_sink_input_new_data_done(&sink_input_data);
2021
2022    if (!u->sink_input)
2023        goto fail;
2024
2025    u->sink_input->parent.process_msg = sink_input_process_msg_cb;
2026    u->sink_input->pop = sink_input_pop_cb;
2027    u->sink_input->process_rewind = sink_input_process_rewind_cb;
2028    u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
2029    u->sink_input->update_max_request = sink_input_update_max_request_cb;
2030    u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
2031    u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
2032    u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
2033    u->sink_input->kill = sink_input_kill_cb;
2034    u->sink_input->attach = sink_input_attach_cb;
2035    u->sink_input->detach = sink_input_detach_cb;
2036    u->sink_input->state_change = sink_input_state_change_cb;
2037    u->sink_input->may_move_to = sink_input_may_move_to_cb;
2038    u->sink_input->moving = sink_input_moving_cb;
2039    if (!u->use_volume_sharing)
2040        u->sink_input->volume_changed = sink_input_volume_changed_cb;
2041    u->sink_input->mute_changed = sink_input_mute_changed_cb;
2042    u->sink_input->userdata = u;
2043
2044    u->sink->input_to_master = u->sink_input;
2045
2046    pa_sink_input_get_silence(u->sink_input, &silence);
2047
2048    u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
2049        &source_output_ss, 1, 1, 0, &silence);
2050    u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
2051        &sink_ss, 0, 1, 0, &silence);
2052
2053    pa_memblock_unref(silence.memblock);
2054
2055    if (!u->source_memblockq || !u->sink_memblockq) {
2056        pa_log("Failed to create memblockq.");
2057        goto fail;
2058    }
2059
2060    if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
2061        u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
2062    else if (u->ec->params.drift_compensation) {
2063        pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
2064        u->adjust_time = 0;
2065        /* Perform resync just once to give the canceller a leg up */
2066        pa_atomic_store(&u->request_resync, 1);
2067    }
2068
2069    if (u->save_aec) {
2070        pa_log("Creating AEC files in /tmp");
2071        u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
2072        if (u->captured_file == NULL)
2073            perror ("fopen failed");
2074        u->played_file = fopen("/tmp/aec_play.sw", "wb");
2075        if (u->played_file == NULL)
2076            perror ("fopen failed");
2077        u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
2078        if (u->canceled_file == NULL)
2079            perror ("fopen failed");
2080        if (u->ec->params.drift_compensation) {
2081            u->drift_file = fopen("/tmp/aec_drift.txt", "w");
2082            if (u->drift_file == NULL)
2083                perror ("fopen failed");
2084        }
2085    }
2086
2087    u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
2088    u->ec->msg->parent.process_msg = canceller_process_msg_cb;
2089    u->ec->msg->userdata = u;
2090
2091    u->thread_info.current_volume = u->source->reference_volume;
2092
2093    /* We don't want to deal with too many chunks at a time */
2094    blocksize_usec = pa_bytes_to_usec(u->source_blocksize, &u->source->sample_spec);
2095    if (u->source->flags & PA_SOURCE_DYNAMIC_LATENCY)
2096        pa_source_set_latency_range(u->source, blocksize_usec, blocksize_usec * MAX_LATENCY_BLOCKS);
2097    pa_source_output_set_requested_latency(u->source_output, blocksize_usec * MAX_LATENCY_BLOCKS);
2098
2099    blocksize_usec = pa_bytes_to_usec(u->sink_blocksize, &u->sink->sample_spec);
2100    if (u->sink->flags & PA_SINK_DYNAMIC_LATENCY)
2101        pa_sink_set_latency_range(u->sink, blocksize_usec, blocksize_usec * MAX_LATENCY_BLOCKS);
2102    pa_sink_input_set_requested_latency(u->sink_input, blocksize_usec * MAX_LATENCY_BLOCKS);
2103
2104    /* The order here is important. The input/output must be put first,
2105     * otherwise streams might attach to the sink/source before the
2106     * sink input or source output is attached to the master. */
2107    pa_sink_input_put(u->sink_input);
2108    pa_source_output_put(u->source_output);
2109
2110    pa_sink_put(u->sink);
2111    pa_source_put(u->source);
2112
2113    pa_source_output_cork(u->source_output, false);
2114    pa_sink_input_cork(u->sink_input, false);
2115
2116    pa_modargs_free(ma);
2117
2118    return 0;
2119
2120fail:
2121    if (ma)
2122        pa_modargs_free(ma);
2123
2124    pa__done(m);
2125
2126    return -1;
2127}
2128
2129/* Called from main context. */
2130int pa__get_n_used(pa_module *m) {
2131    struct userdata *u;
2132
2133    pa_assert(m);
2134    pa_assert_se(u = m->userdata);
2135
2136    return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
2137}
2138
2139/* Called from main context. */
2140void pa__done(pa_module*m) {
2141    struct userdata *u;
2142
2143    pa_assert(m);
2144
2145    if (!(u = m->userdata))
2146        return;
2147
2148    u->dead = true;
2149
2150    /* See comments in source_output_kill_cb() above regarding
2151     * destruction order! */
2152
2153    if (u->time_event)
2154        u->core->mainloop->time_free(u->time_event);
2155
2156    if (u->source_output)
2157        pa_source_output_cork(u->source_output, true);
2158    if (u->sink_input)
2159        pa_sink_input_cork(u->sink_input, true);
2160
2161    if (u->source)
2162        pa_source_unlink(u->source);
2163    if (u->sink)
2164        pa_sink_unlink(u->sink);
2165
2166    if (u->source_output) {
2167        pa_source_output_unlink(u->source_output);
2168        pa_source_output_unref(u->source_output);
2169    }
2170
2171    if (u->sink_input) {
2172        pa_sink_input_unlink(u->sink_input);
2173        pa_sink_input_unref(u->sink_input);
2174    }
2175
2176    if (u->source)
2177        pa_source_unref(u->source);
2178    if (u->sink)
2179        pa_sink_unref(u->sink);
2180
2181    if (u->source_memblockq)
2182        pa_memblockq_free(u->source_memblockq);
2183    if (u->sink_memblockq)
2184        pa_memblockq_free(u->sink_memblockq);
2185
2186    if (u->ec) {
2187        if (u->ec->done)
2188            u->ec->done(u->ec);
2189
2190        if (u->ec->msg) {
2191            u->ec->msg->dead = true;
2192            pa_echo_canceller_msg_unref(u->ec->msg);
2193        }
2194
2195        pa_xfree(u->ec);
2196    }
2197
2198    if (u->asyncmsgq)
2199        pa_asyncmsgq_unref(u->asyncmsgq);
2200
2201    if (u->save_aec) {
2202        if (u->played_file)
2203            fclose(u->played_file);
2204        if (u->captured_file)
2205            fclose(u->captured_file);
2206        if (u->canceled_file)
2207            fclose(u->canceled_file);
2208        if (u->drift_file)
2209            fclose(u->drift_file);
2210    }
2211
2212    pa_xfree(u);
2213}
2214
2215#ifdef ECHO_CANCEL_TEST
2216/*
2217 * Stand-alone test program for running in the canceller on pre-recorded files.
2218 */
2219int main(int argc, char* argv[]) {
2220    struct userdata u;
2221    pa_sample_spec source_output_ss, source_ss, sink_ss;
2222    pa_channel_map source_output_map, source_map, sink_map;
2223    pa_modargs *ma = NULL;
2224    uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2225    int unused PA_GCC_UNUSED;
2226    int ret = 0, i;
2227    char c;
2228    float drift;
2229    uint32_t nframes;
2230
2231    if (!getenv("MAKE_CHECK"))
2232        pa_log_set_level(PA_LOG_DEBUG);
2233
2234    pa_memzero(&u, sizeof(u));
2235
2236    if (argc < 4 || argc > 7) {
2237        goto usage;
2238    }
2239
2240    u.captured_file = fopen(argv[2], "rb");
2241    if (u.captured_file == NULL) {
2242        perror ("Could not open capture file");
2243        goto fail;
2244    }
2245    u.played_file = fopen(argv[1], "rb");
2246    if (u.played_file == NULL) {
2247        perror ("Could not open play file");
2248        goto fail;
2249    }
2250    u.canceled_file = fopen(argv[3], "wb");
2251    if (u.canceled_file == NULL) {
2252        perror ("Could not open canceled file");
2253        goto fail;
2254    }
2255
2256    u.core = pa_xnew0(pa_core, 1);
2257    u.core->cpu_info.cpu_type = PA_CPU_X86;
2258    u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2259
2260    if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2261        pa_log("Failed to parse module arguments.");
2262        goto fail;
2263    }
2264
2265    source_ss.format = PA_SAMPLE_FLOAT32LE;
2266    source_ss.rate = DEFAULT_RATE;
2267    source_ss.channels = DEFAULT_CHANNELS;
2268    pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2269
2270    sink_ss.format = PA_SAMPLE_FLOAT32LE;
2271    sink_ss.rate = DEFAULT_RATE;
2272    sink_ss.channels = DEFAULT_CHANNELS;
2273    pa_channel_map_init_auto(&sink_map, sink_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2274
2275    if (init_common(ma, &u, &source_ss, &source_map) < 0)
2276        goto fail;
2277
2278    source_output_ss = source_ss;
2279    source_output_map = source_map;
2280
2281    if (!u.ec->init(u.core, u.ec, &source_output_ss, &source_output_map, &sink_ss, &sink_map, &source_ss, &source_map, &nframes,
2282                     pa_modargs_get_value(ma, "aec_args", NULL))) {
2283        pa_log("Failed to init AEC engine");
2284        goto fail;
2285    }
2286    u.source_output_blocksize = nframes * pa_frame_size(&source_output_ss);
2287    u.source_blocksize = nframes * pa_frame_size(&source_ss);
2288    u.sink_blocksize = nframes * pa_frame_size(&sink_ss);
2289
2290    if (u.ec->params.drift_compensation) {
2291        if (argc < 6) {
2292            pa_log("Drift compensation enabled but drift file not specified");
2293            goto fail;
2294        }
2295
2296        u.drift_file = fopen(argv[5], "rt");
2297
2298        if (u.drift_file == NULL) {
2299            perror ("Could not open drift file");
2300            goto fail;
2301        }
2302    }
2303
2304    rdata = pa_xmalloc(u.source_output_blocksize);
2305    pdata = pa_xmalloc(u.sink_blocksize);
2306    cdata = pa_xmalloc(u.source_blocksize);
2307
2308    if (!u.ec->params.drift_compensation) {
2309        while (fread(rdata, u.source_output_blocksize, 1, u.captured_file) > 0) {
2310            if (fread(pdata, u.sink_blocksize, 1, u.played_file) == 0) {
2311                perror("Played file ended before captured file");
2312                goto fail;
2313            }
2314
2315            u.ec->run(u.ec, rdata, pdata, cdata);
2316
2317            unused = fwrite(cdata, u.source_blocksize, 1, u.canceled_file);
2318        }
2319    } else {
2320        while (fscanf(u.drift_file, "%c", &c) > 0) {
2321            switch (c) {
2322                case 'd':
2323                    if (!fscanf(u.drift_file, "%a", &drift)) {
2324                        perror("Drift file incomplete");
2325                        goto fail;
2326                    }
2327
2328                    u.ec->set_drift(u.ec, drift);
2329
2330                    break;
2331
2332                case 'c':
2333                    if (!fscanf(u.drift_file, "%d", &i)) {
2334                        perror("Drift file incomplete");
2335                        goto fail;
2336                    }
2337
2338                    if (fread(rdata, i, 1, u.captured_file) <= 0) {
2339                        perror("Captured file ended prematurely");
2340                        goto fail;
2341                    }
2342
2343                    u.ec->record(u.ec, rdata, cdata);
2344
2345                    unused = fwrite(cdata, i, 1, u.canceled_file);
2346
2347                    break;
2348
2349                case 'p':
2350                    if (!fscanf(u.drift_file, "%d", &i)) {
2351                        perror("Drift file incomplete");
2352                        goto fail;
2353                    }
2354
2355                    if (fread(pdata, i, 1, u.played_file) <= 0) {
2356                        perror("Played file ended prematurely");
2357                        goto fail;
2358                    }
2359
2360                    u.ec->play(u.ec, pdata);
2361
2362                    break;
2363            }
2364        }
2365
2366        if (fread(rdata, i, 1, u.captured_file) > 0)
2367            pa_log("All capture data was not consumed");
2368        if (fread(pdata, i, 1, u.played_file) > 0)
2369            pa_log("All playback data was not consumed");
2370    }
2371
2372    u.ec->done(u.ec);
2373    u.ec->msg->dead = true;
2374    pa_echo_canceller_msg_unref(u.ec->msg);
2375
2376out:
2377    if (u.captured_file)
2378        fclose(u.captured_file);
2379    if (u.played_file)
2380        fclose(u.played_file);
2381    if (u.canceled_file)
2382        fclose(u.canceled_file);
2383    if (u.drift_file)
2384        fclose(u.drift_file);
2385
2386    pa_xfree(rdata);
2387    pa_xfree(pdata);
2388    pa_xfree(cdata);
2389
2390    pa_xfree(u.ec);
2391    pa_xfree(u.core);
2392
2393    if (ma)
2394        pa_modargs_free(ma);
2395
2396    return ret;
2397
2398usage:
2399    pa_log("Usage: %s play_file rec_file out_file [module args] [drift_file]", argv[0]);
2400
2401fail:
2402    ret = -1;
2403    goto out;
2404}
2405#endif /* ECHO_CANCEL_TEST */
2406