xref: /third_party/pulseaudio/src/pulse/stream.c (revision 53a5a1b3)
1/***
2  This file is part of PulseAudio.
3
4  Copyright 2004-2006 Lennart Poettering
5  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7  PulseAudio is free software; you can redistribute it and/or modify
8  it under the terms of the GNU Lesser General Public License as published
9  by the Free Software Foundation; either version 2.1 of the License,
10  or (at your option) any later version.
11
12  PulseAudio is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  General Public License for more details.
16
17  You should have received a copy of the GNU Lesser General Public License
18  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19***/
20
21#ifdef HAVE_CONFIG_H
22#include <config.h>
23#endif
24
25#include <string.h>
26#include <stdio.h>
27#include <string.h>
28
29#include <pulse/def.h>
30#include <pulse/timeval.h>
31#include <pulse/rtclock.h>
32#include <pulse/xmalloc.h>
33#include <pulse/fork-detect.h>
34
35#include <pulsecore/pstream-util.h>
36#include <pulsecore/sample-util.h>
37#include <pulsecore/log.h>
38#include <pulsecore/hashmap.h>
39#include <pulsecore/macro.h>
40#include <pulsecore/core-rtclock.h>
41#include <pulsecore/core-util.h>
42
43#include "internal.h"
44#include "stream.h"
45
46/* #define STREAM_DEBUG */
47
/* Bounds of the automatic timing-update interval. The interval is (re)set to
 * the START value and doubled after every timer restart until it is capped
 * at the END value. */
48#define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
49#define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50
/* Tuning constants for the time smoother. SMOOTHER_ADJUST_TIME and
 * SMOOTHER_MIN_HISTORY are only defined when USE_SMOOTHER_2 is not set.
 * NOTE(review): the code consuming these values is not visible in this part
 * of the file -- presumably the smoother constructor; confirm there. */
51#define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52#ifndef USE_SMOOTHER_2
53#define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
54#define SMOOTHER_MIN_HISTORY (4)
55#endif
56
57pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58    return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59}
60
61static void reset_callbacks(pa_stream *s) {
62    s->read_callback = NULL;
63    s->read_userdata = NULL;
64    s->write_callback = NULL;
65    s->write_userdata = NULL;
66    s->state_callback = NULL;
67    s->state_userdata = NULL;
68    s->overflow_callback = NULL;
69    s->overflow_userdata = NULL;
70    s->underflow_callback = NULL;
71    s->underflow_userdata = NULL;
72    s->latency_update_callback = NULL;
73    s->latency_update_userdata = NULL;
74    s->moved_callback = NULL;
75    s->moved_userdata = NULL;
76    s->suspended_callback = NULL;
77    s->suspended_userdata = NULL;
78    s->started_callback = NULL;
79    s->started_userdata = NULL;
80    s->event_callback = NULL;
81    s->event_userdata = NULL;
82    s->buffer_attr_callback = NULL;
83    s->buffer_attr_userdata = NULL;
84    s->underflow_ohos_callback = NULL;
85    s->underflow_ohos_userdata = NULL;
86}
87
88static pa_stream *pa_stream_new_with_proplist_internal(
89        pa_context *c,
90        const char *name,
91        const pa_sample_spec *ss,
92        const pa_channel_map *map,
93        pa_format_info * const *formats,
94        unsigned int n_formats,
95        pa_proplist *p) {
96
97    pa_stream *s;
98    unsigned int i;
99
100    pa_assert(c);
101    pa_assert(PA_REFCNT_VALUE(c) >= 1);
102    pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
103    pa_assert(n_formats < PA_MAX_FORMATS);
104
105    PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
106    PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
107
108    s = pa_xnew(pa_stream, 1);
109    PA_REFCNT_INIT(s);
110    s->context = c;
111    s->mainloop = c->mainloop;
112
113    s->direction = PA_STREAM_NODIRECTION;
114    s->state = PA_STREAM_UNCONNECTED;
115    s->flags = 0;
116
117    if (ss)
118        s->sample_spec = *ss;
119    else
120        pa_sample_spec_init(&s->sample_spec);
121
122    if (map)
123        s->channel_map = *map;
124    else
125        pa_channel_map_init(&s->channel_map);
126
127    s->n_formats = 0;
128    if (formats) {
129        s->n_formats = n_formats;
130        for (i = 0; i < n_formats; i++)
131            s->req_formats[i] = pa_format_info_copy(formats[i]);
132    }
133
134    /* We'll get the final negotiated format after connecting */
135    s->format = NULL;
136
137    s->direct_on_input = PA_INVALID_INDEX;
138
139    s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
140    if (name)
141        pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
142
143    s->channel = 0;
144    s->channel_valid = false;
145    s->syncid = c->csyncid++;
146    s->stream_index = PA_INVALID_INDEX;
147
148    s->requested_bytes = 0;
149    memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
150
151    /* We initialize the target length here, so that if the user
152     * passes no explicit buffering metrics the default is similar to
153     * what older PA versions provided. */
154
155    s->buffer_attr.maxlength = (uint32_t) -1;
156    if (ss)
157        s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
158    else {
159        /* FIXME: We assume a worst-case compressed format corresponding to
160         * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
161        pa_sample_spec tmp_ss = {
162            .format   = PA_SAMPLE_S16NE,
163            .rate     = 48000,
164            .channels = 2,
165        };
166        s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
167    }
168    s->buffer_attr.minreq = (uint32_t) -1;
169    s->buffer_attr.prebuf = (uint32_t) -1;
170    s->buffer_attr.fragsize = (uint32_t) -1;
171
172    s->device_index = PA_INVALID_INDEX;
173    s->device_name = NULL;
174    s->suspended = false;
175    s->corked = false;
176
177    s->write_memblock = NULL;
178    s->write_data = NULL;
179
180    pa_memchunk_reset(&s->peek_memchunk);
181    s->peek_data = NULL;
182    s->record_memblockq = NULL;
183
184    memset(&s->timing_info, 0, sizeof(s->timing_info));
185    s->timing_info_valid = false;
186
187    s->previous_time = 0;
188    s->latest_underrun_at_index = -1;
189
190    s->read_index_not_before = 0;
191    s->write_index_not_before = 0;
192    for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
193        s->write_index_corrections[i].valid = 0;
194    s->current_write_index_correction = 0;
195
196    s->auto_timing_update_event = NULL;
197    s->auto_timing_update_requested = false;
198    s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199
200    reset_callbacks(s);
201
202    s->smoother = NULL;
203
204    /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
205    PA_LLIST_PREPEND(pa_stream, c->streams, s);
206    pa_stream_ref(s);
207
208    return s;
209}
210
211pa_stream *pa_stream_new_with_proplist(
212        pa_context *c,
213        const char *name,
214        const pa_sample_spec *ss,
215        const pa_channel_map *map,
216        pa_proplist *p) {
217
218    pa_channel_map tmap;
219
220    PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
221    PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
222    PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
223    PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
224    PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
225
226    if (!map)
227        PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
228
229    return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
230}
231
232pa_stream *pa_stream_new_extended(
233        pa_context *c,
234        const char *name,
235        pa_format_info * const *formats,
236        unsigned int n_formats,
237        pa_proplist *p) {
238
239    PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
240
241    return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
242}
243
244static void stream_unlink(pa_stream *s) {
245    pa_operation *o, *n;
246    pa_assert(s);
247
248    if (!s->context)
249        return;
250
251    /* Detach from context */
252
253    /* Unref all operation objects that point to us */
254    for (o = s->context->operations; o; o = n) {
255        n = o->next;
256
257        if (o->stream == s)
258            pa_operation_cancel(o);
259    }
260
261    /* Drop all outstanding replies for this stream */
262    if (s->context->pdispatch)
263        pa_pdispatch_unregister_reply(s->context->pdispatch, s);
264
265    if (s->channel_valid) {
266        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
267        s->channel = 0;
268        s->channel_valid = false;
269    }
270
271    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
272    pa_stream_unref(s);
273
274    s->context = NULL;
275
276    if (s->auto_timing_update_event) {
277        pa_assert(s->mainloop);
278        s->mainloop->time_free(s->auto_timing_update_event);
279    }
280
281    reset_callbacks(s);
282}
283
284static void stream_free(pa_stream *s) {
285    unsigned int i;
286
287    pa_assert(s);
288
289    stream_unlink(s);
290
291    if (s->write_memblock) {
292        if (s->write_data)
293            pa_memblock_release(s->write_memblock);
294        pa_memblock_unref(s->write_memblock);
295    }
296
297    if (s->peek_memchunk.memblock) {
298        if (s->peek_data)
299            pa_memblock_release(s->peek_memchunk.memblock);
300        pa_memblock_unref(s->peek_memchunk.memblock);
301    }
302
303    if (s->record_memblockq)
304        pa_memblockq_free(s->record_memblockq);
305
306    if (s->proplist)
307        pa_proplist_free(s->proplist);
308
309    if (s->smoother)
310#ifdef USE_SMOOTHER_2
311        pa_smoother_2_free(s->smoother);
312#else
313        pa_smoother_free(s->smoother);
314#endif
315
316    for (i = 0; i < s->n_formats; i++)
317        pa_format_info_free(s->req_formats[i]);
318
319    if (s->format)
320        pa_format_info_free(s->format);
321
322    pa_xfree(s->device_name);
323    pa_xfree(s);
324}
325
326void pa_stream_unref(pa_stream *s) {
327    pa_assert(s);
328    pa_assert(PA_REFCNT_VALUE(s) >= 1);
329
330    if (PA_REFCNT_DEC(s) <= 0)
331        stream_free(s);
332}
333
334pa_stream* pa_stream_ref(pa_stream *s) {
335    pa_assert(s);
336    pa_assert(PA_REFCNT_VALUE(s) >= 1);
337
338    PA_REFCNT_INC(s);
339    return s;
340}
341
342pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
343    pa_assert(s);
344    pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346    return s->state;
347}
348
349void pa_stream_terminate(pa_stream *s) {
350    pa_stream_set_state(s, PA_STREAM_TERMINATED);
351}
352
353pa_context* pa_stream_get_context(const pa_stream *s) {
354    pa_assert(s);
355    pa_assert(PA_REFCNT_VALUE(s) >= 1);
356
357    return s->context;
358}
359
360uint32_t pa_stream_get_index(const pa_stream *s) {
361    pa_assert(s);
362    pa_assert(PA_REFCNT_VALUE(s) >= 1);
363
364    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
365    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
366
367    return s->stream_index;
368}
369
370void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
371    pa_assert(s);
372    pa_assert(PA_REFCNT_VALUE(s) >= 1);
373
374    if (s->state == st)
375        return;
376
377    pa_stream_ref(s);
378
379    s->state = st;
380
381    if (s->state_callback)
382        s->state_callback(s, s->state_userdata);
383
384    if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
385        stream_unlink(s);
386
387    pa_stream_unref(s);
388}
389
390static void request_auto_timing_update(pa_stream *s, bool force) {
391    pa_assert(s);
392    pa_assert(PA_REFCNT_VALUE(s) >= 1);
393
394    if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
395        return;
396
397    if (s->state == PA_STREAM_READY &&
398        (force || !s->auto_timing_update_requested)) {
399        pa_operation *o;
400
401#ifdef STREAM_DEBUG
402        pa_log_debug("Automatically requesting new timing data");
403#endif
404
405        if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
406            pa_operation_unref(o);
407            s->auto_timing_update_requested = true;
408        }
409    }
410
411    if (s->auto_timing_update_event) {
412        if (s->suspended && !force) {
413            pa_assert(s->mainloop);
414            s->mainloop->time_free(s->auto_timing_update_event);
415            s->auto_timing_update_event = NULL;
416        } else {
417            if (force)
418                s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
419
420            pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
421
422            s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
423        }
424    }
425}
426
427void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
428    pa_context *c = userdata;
429    pa_stream *s;
430    uint32_t channel;
431
432    pa_assert(pd);
433    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
434    pa_assert(t);
435    pa_assert(c);
436    pa_assert(PA_REFCNT_VALUE(c) >= 1);
437
438    pa_context_ref(c);
439
440    if (pa_tagstruct_getu32(t, &channel) < 0 ||
441        !pa_tagstruct_eof(t)) {
442        pa_context_fail(c, PA_ERR_PROTOCOL);
443        goto finish;
444    }
445
446    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
447        goto finish;
448
449    if (s->state != PA_STREAM_READY)
450        goto finish;
451
452    pa_context_set_error(c, PA_ERR_KILLED);
453    pa_stream_set_state(s, PA_STREAM_FAILED);
454
455finish:
456    pa_context_unref(c);
457}
458
459static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
460    pa_usec_t x;
461
462    pa_assert(s);
463    pa_assert(!force_start || !force_stop);
464
465    if (!s->smoother)
466        return;
467
468    x = pa_rtclock_now();
469
470    if (s->timing_info_valid) {
471        if (aposteriori)
472            x -= s->timing_info.transport_usec;
473        else
474            x += s->timing_info.transport_usec;
475    }
476
477    if (s->suspended || s->corked || force_stop)
478#ifdef USE_SMOOTHER_2
479        pa_smoother_2_pause(s->smoother, x);
480#else
481        pa_smoother_pause(s->smoother, x);
482#endif
483    else if (force_start || s->buffer_attr.prebuf == 0) {
484
485        if (!s->timing_info_valid &&
486            !aposteriori &&
487            !force_start &&
488            !force_stop &&
489            s->context->version >= 13) {
490
491            /* If the server supports STARTED events we take them as
492             * indications when audio really starts/stops playing, if
493             * we don't have any timing info yet -- instead of trying
494             * to be smart and guessing the server time. Otherwise the
495             * unknown transport delay adds too much noise to our time
496             * calculations. */
497
498            return;
499        }
500
501#ifdef USE_SMOOTHER_2
502        pa_smoother_2_resume(s->smoother, x);
503#else
504        pa_smoother_resume(s->smoother, x, true);
505#endif
506    }
507
508    /* Please note that we have no idea if playback actually started
509     * if prebuf is non-zero! */
510}
511
512static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
513
514void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515    pa_context *c = userdata;
516    pa_stream *s;
517    uint32_t channel;
518    const char *dn;
519    bool suspended;
520    uint32_t di;
521    pa_usec_t usec = 0;
522    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
523
524    pa_assert(pd);
525    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
526    pa_assert(t);
527    pa_assert(c);
528    pa_assert(PA_REFCNT_VALUE(c) >= 1);
529
530    pa_context_ref(c);
531
532    if (c->version < 12) {
533        pa_context_fail(c, PA_ERR_PROTOCOL);
534        goto finish;
535    }
536
537    if (pa_tagstruct_getu32(t, &channel) < 0 ||
538        pa_tagstruct_getu32(t, &di) < 0 ||
539        pa_tagstruct_gets(t, &dn) < 0 ||
540        pa_tagstruct_get_boolean(t, &suspended) < 0) {
541        pa_context_fail(c, PA_ERR_PROTOCOL);
542        goto finish;
543    }
544
545    if (c->version >= 13) {
546
547        if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
548            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
549                pa_tagstruct_getu32(t, &fragsize) < 0 ||
550                pa_tagstruct_get_usec(t, &usec) < 0) {
551                pa_context_fail(c, PA_ERR_PROTOCOL);
552                goto finish;
553            }
554        } else {
555            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
556                pa_tagstruct_getu32(t, &tlength) < 0 ||
557                pa_tagstruct_getu32(t, &prebuf) < 0 ||
558                pa_tagstruct_getu32(t, &minreq) < 0 ||
559                pa_tagstruct_get_usec(t, &usec) < 0) {
560                pa_context_fail(c, PA_ERR_PROTOCOL);
561                goto finish;
562            }
563        }
564    }
565
566    if (!pa_tagstruct_eof(t)) {
567        pa_context_fail(c, PA_ERR_PROTOCOL);
568        goto finish;
569    }
570
571    if (!dn || di == PA_INVALID_INDEX) {
572        pa_context_fail(c, PA_ERR_PROTOCOL);
573        goto finish;
574    }
575
576    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
577        goto finish;
578
579    if (s->state != PA_STREAM_READY)
580        goto finish;
581
582    if (c->version >= 13) {
583        if (s->direction == PA_STREAM_RECORD)
584            s->timing_info.configured_source_usec = usec;
585        else
586            s->timing_info.configured_sink_usec = usec;
587
588        s->buffer_attr.maxlength = maxlength;
589        s->buffer_attr.fragsize = fragsize;
590        s->buffer_attr.tlength = tlength;
591        s->buffer_attr.prebuf = prebuf;
592        s->buffer_attr.minreq = minreq;
593    }
594
595    pa_xfree(s->device_name);
596    s->device_name = pa_xstrdup(dn);
597    s->device_index = di;
598
599    s->suspended = suspended;
600
601    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
602        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
603        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
604        request_auto_timing_update(s, true);
605    }
606
607    check_smoother_status(s, true, false, false);
608    request_auto_timing_update(s, true);
609
610    if (s->moved_callback)
611        s->moved_callback(s, s->moved_userdata);
612
613finish:
614    pa_context_unref(c);
615}
616
617void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
618    pa_context *c = userdata;
619    pa_stream *s;
620    uint32_t channel;
621    pa_usec_t usec = 0;
622    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
623
624    pa_assert(pd);
625    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
626    pa_assert(t);
627    pa_assert(c);
628    pa_assert(PA_REFCNT_VALUE(c) >= 1);
629
630    pa_context_ref(c);
631
632    if (c->version < 15) {
633        pa_context_fail(c, PA_ERR_PROTOCOL);
634        goto finish;
635    }
636
637    if (pa_tagstruct_getu32(t, &channel) < 0) {
638        pa_context_fail(c, PA_ERR_PROTOCOL);
639        goto finish;
640    }
641
642    if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
643        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
644            pa_tagstruct_getu32(t, &fragsize) < 0 ||
645            pa_tagstruct_get_usec(t, &usec) < 0) {
646            pa_context_fail(c, PA_ERR_PROTOCOL);
647            goto finish;
648        }
649    } else {
650        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
651            pa_tagstruct_getu32(t, &tlength) < 0 ||
652            pa_tagstruct_getu32(t, &prebuf) < 0 ||
653            pa_tagstruct_getu32(t, &minreq) < 0 ||
654            pa_tagstruct_get_usec(t, &usec) < 0) {
655            pa_context_fail(c, PA_ERR_PROTOCOL);
656            goto finish;
657        }
658    }
659
660    if (!pa_tagstruct_eof(t)) {
661        pa_context_fail(c, PA_ERR_PROTOCOL);
662        goto finish;
663    }
664
665    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
666        goto finish;
667
668    if (s->state != PA_STREAM_READY)
669        goto finish;
670
671    if (s->direction == PA_STREAM_RECORD)
672        s->timing_info.configured_source_usec = usec;
673    else
674        s->timing_info.configured_sink_usec = usec;
675
676    s->buffer_attr.maxlength = maxlength;
677    s->buffer_attr.fragsize = fragsize;
678    s->buffer_attr.tlength = tlength;
679    s->buffer_attr.prebuf = prebuf;
680    s->buffer_attr.minreq = minreq;
681
682    request_auto_timing_update(s, true);
683
684    if (s->buffer_attr_callback)
685        s->buffer_attr_callback(s, s->buffer_attr_userdata);
686
687finish:
688    pa_context_unref(c);
689}
690
691void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
692    pa_context *c = userdata;
693    pa_stream *s;
694    uint32_t channel;
695    bool suspended;
696
697    pa_assert(pd);
698    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
699    pa_assert(t);
700    pa_assert(c);
701    pa_assert(PA_REFCNT_VALUE(c) >= 1);
702
703    pa_context_ref(c);
704
705    if (c->version < 12) {
706        pa_context_fail(c, PA_ERR_PROTOCOL);
707        goto finish;
708    }
709
710    if (pa_tagstruct_getu32(t, &channel) < 0 ||
711        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
712        !pa_tagstruct_eof(t)) {
713        pa_context_fail(c, PA_ERR_PROTOCOL);
714        goto finish;
715    }
716
717    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
718        goto finish;
719
720    if (s->state != PA_STREAM_READY)
721        goto finish;
722
723    s->suspended = suspended;
724
725    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
726        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
727        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
728        request_auto_timing_update(s, true);
729    }
730
731    check_smoother_status(s, true, false, false);
732    request_auto_timing_update(s, true);
733
734    if (s->suspended_callback)
735        s->suspended_callback(s, s->suspended_userdata);
736
737finish:
738    pa_context_unref(c);
739}
740
741void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
742    pa_context *c = userdata;
743    pa_stream *s;
744    uint32_t channel;
745
746    pa_assert(pd);
747    pa_assert(command == PA_COMMAND_STARTED);
748    pa_assert(t);
749    pa_assert(c);
750    pa_assert(PA_REFCNT_VALUE(c) >= 1);
751
752    pa_context_ref(c);
753
754    if (c->version < 13) {
755        pa_context_fail(c, PA_ERR_PROTOCOL);
756        goto finish;
757    }
758
759    if (pa_tagstruct_getu32(t, &channel) < 0 ||
760        !pa_tagstruct_eof(t)) {
761        pa_context_fail(c, PA_ERR_PROTOCOL);
762        goto finish;
763    }
764
765    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
766        goto finish;
767
768    if (s->state != PA_STREAM_READY)
769        goto finish;
770
771    check_smoother_status(s, true, true, false);
772    request_auto_timing_update(s, true);
773
774    if (s->started_callback)
775        s->started_callback(s, s->started_userdata);
776
777finish:
778    pa_context_unref(c);
779}
780
781void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
782    pa_context *c = userdata;
783    pa_stream *s;
784    uint32_t channel;
785    pa_proplist *pl = NULL;
786    const char *event;
787
788    pa_assert(pd);
789    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
790    pa_assert(t);
791    pa_assert(c);
792    pa_assert(PA_REFCNT_VALUE(c) >= 1);
793
794    pa_context_ref(c);
795
796    if (c->version < 15) {
797        pa_context_fail(c, PA_ERR_PROTOCOL);
798        goto finish;
799    }
800
801    pl = pa_proplist_new();
802
803    if (pa_tagstruct_getu32(t, &channel) < 0 ||
804        pa_tagstruct_gets(t, &event) < 0 ||
805        pa_tagstruct_get_proplist(t, pl) < 0 ||
806        !pa_tagstruct_eof(t) || !event) {
807        pa_context_fail(c, PA_ERR_PROTOCOL);
808        goto finish;
809    }
810
811    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
812        goto finish;
813
814    if (s->state != PA_STREAM_READY)
815        goto finish;
816
817    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
818        /* Let client know what the running time was when the stream had to be killed  */
819        pa_usec_t stream_time;
820        if (pa_stream_get_time(s, &stream_time) == 0)
821            pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
822    }
823
824    if (s->event_callback)
825        s->event_callback(s, event, pl, s->event_userdata);
826
827finish:
828    pa_context_unref(c);
829
830    if (pl)
831        pa_proplist_free(pl);
832}
833
834void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
835    pa_stream *s;
836    pa_context *c = userdata;
837    uint32_t bytes, channel;
838
839    pa_assert(pd);
840    pa_assert(command == PA_COMMAND_REQUEST);
841    pa_assert(t);
842    pa_assert(c);
843    pa_assert(PA_REFCNT_VALUE(c) >= 1);
844
845    pa_context_ref(c);
846
847    if (pa_tagstruct_getu32(t, &channel) < 0 ||
848        pa_tagstruct_getu32(t, &bytes) < 0 ||
849        !pa_tagstruct_eof(t)) {
850        pa_context_fail(c, PA_ERR_PROTOCOL);
851        goto finish;
852    }
853
854    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
855        goto finish;
856
857    if (s->state != PA_STREAM_READY)
858        goto finish;
859
860    s->requested_bytes += bytes;
861
862#ifdef STREAM_DEBUG
863    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
864#endif
865
866    if (s->requested_bytes > 0 && s->write_callback)
867        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
868
869finish:
870    pa_context_unref(c);
871}
872
873int64_t pa_stream_get_underflow_index(const pa_stream *p) {
874    pa_assert(p);
875    return p->latest_underrun_at_index;
876}
877
878void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
879    pa_stream *s;
880    pa_context *c = userdata;
881    uint32_t channel;
882    int64_t offset = -1;
883
884    pa_assert(pd);
885    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
886        || command == PA_COMMAND_UNDERFLOW_OHOS);
887    pa_assert(t);
888    pa_assert(c);
889    pa_assert(PA_REFCNT_VALUE(c) >= 1);
890
891    pa_context_ref(c);
892
893    if (pa_tagstruct_getu32(t, &channel) < 0) {
894        pa_context_fail(c, PA_ERR_PROTOCOL);
895        goto finish;
896    }
897
898    if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
899        if (pa_tagstruct_gets64(t, &offset) < 0) {
900            pa_context_fail(c, PA_ERR_PROTOCOL);
901            goto finish;
902        }
903    }
904
905    if (!pa_tagstruct_eof(t)) {
906        pa_context_fail(c, PA_ERR_PROTOCOL);
907        goto finish;
908    }
909
910    if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
911        goto finish;
912
913    if (s->state != PA_STREAM_READY)
914        goto finish;
915
916    if (command == PA_COMMAND_UNDERFLOW_OHOS) {
917        if (s->underflow_ohos_callback) {
918            s->underflow_ohos_callback(s, s->underflow_ohos_userdata);
919        }
920        goto finish;
921    }
922
923    if (offset != -1)
924        s->latest_underrun_at_index = offset;
925
926    if (s->buffer_attr.prebuf > 0)
927        check_smoother_status(s, true, false, true);
928
929    request_auto_timing_update(s, true);
930
931    if (command == PA_COMMAND_OVERFLOW) {
932        if (s->overflow_callback)
933            s->overflow_callback(s, s->overflow_userdata);
934    } else if (command == PA_COMMAND_UNDERFLOW) {
935        if (s->underflow_callback)
936            s->underflow_callback(s, s->underflow_userdata);
937    }
938
939finish:
940    pa_context_unref(c);
941}
942
943static void invalidate_indexes(pa_stream *s, bool r, bool w) {
944    pa_assert(s);
945    pa_assert(PA_REFCNT_VALUE(s) >= 1);
946
947#ifdef STREAM_DEBUG
948    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
949#endif
950
951    if (s->state != PA_STREAM_READY)
952        return;
953
954    if (w) {
955        s->write_index_not_before = s->context->ctag;
956
957        if (s->timing_info_valid)
958            s->timing_info.write_index_corrupt = true;
959
960#ifdef STREAM_DEBUG
961        pa_log_debug("write_index invalidated");
962#endif
963    }
964
965    if (r) {
966        s->read_index_not_before = s->context->ctag;
967
968        if (s->timing_info_valid)
969            s->timing_info.read_index_corrupt = true;
970
971#ifdef STREAM_DEBUG
972        pa_log_debug("read_index invalidated");
973#endif
974    }
975
976    request_auto_timing_update(s, true);
977}
978
979static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
980    pa_stream *s = userdata;
981
982    pa_assert(s);
983    pa_assert(PA_REFCNT_VALUE(s) >= 1);
984
985    pa_stream_ref(s);
986    request_auto_timing_update(s, false);
987    pa_stream_unref(s);
988}
989
990static void create_stream_complete(pa_stream *s) {
991    pa_assert(s);
992    pa_assert(PA_REFCNT_VALUE(s) >= 1);
993    pa_assert(s->state == PA_STREAM_CREATING);
994
995    pa_stream_set_state(s, PA_STREAM_READY);
996
997    if (s->requested_bytes > 0 && s->write_callback)
998        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
999
1000    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
1001        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
1002        pa_assert(!s->auto_timing_update_event);
1003        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
1004
1005        request_auto_timing_update(s, true);
1006    }
1007
1008    check_smoother_status(s, true, false, false);
1009}
1010
1011static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
1012    const char *e;
1013
1014    pa_assert(s);
1015    pa_assert(attr);
1016
1017    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
1018        uint32_t ms;
1019        pa_sample_spec ss;
1020
1021        pa_sample_spec_init(&ss);
1022
1023        if (pa_sample_spec_valid(&s->sample_spec))
1024            ss = s->sample_spec;
1025        else if (s->n_formats == 1)
1026            pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);
1027
1028        if (pa_atou(e, &ms) < 0 || ms <= 0)
1029            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
1030        else if (!pa_sample_spec_valid(&s->sample_spec))
1031            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
1032        else {
1033            attr->maxlength = (uint32_t) -1;
1034            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
1035            attr->minreq = (uint32_t) -1;
1036            attr->prebuf = (uint32_t) -1;
1037            attr->fragsize = attr->tlength;
1038
1039            if (flags)
1040                *flags |= PA_STREAM_ADJUST_LATENCY;
1041        }
1042    }
1043
1044    if (s->context->version >= 13)
1045        return;
1046
1047    /* Version older than 0.9.10 didn't do server side buffer_attr
1048     * selection, hence we have to fake it on the client side. */
1049
1050    /* We choose fairly conservative values here, to not confuse
1051     * old clients with extremely large playback buffers */
1052
1053    if (attr->maxlength == (uint32_t) -1)
1054        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1055
1056    if (attr->tlength == (uint32_t) -1)
1057        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1058
1059    if (attr->minreq == (uint32_t) -1)
1060        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1061
1062    if (attr->prebuf == (uint32_t) -1)
1063        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1064
1065    if (attr->fragsize == (uint32_t) -1)
1066        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1067}
1068
1069void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1070    pa_stream *s = userdata;
1071    uint32_t requested_bytes = 0;
1072
1073    pa_assert(pd);
1074    pa_assert(s);
1075    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1076    pa_assert(s->state == PA_STREAM_CREATING);
1077
1078    pa_stream_ref(s);
1079
1080    if (command != PA_COMMAND_REPLY) {
1081        if (pa_context_handle_error(s->context, command, t, false) < 0)
1082            goto finish;
1083
1084        pa_stream_set_state(s, PA_STREAM_FAILED);
1085        goto finish;
1086    }
1087
1088    if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1089        s->channel == PA_INVALID_INDEX ||
1090        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1091        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1092        pa_context_fail(s->context, PA_ERR_PROTOCOL);
1093        goto finish;
1094    }
1095
1096    s->requested_bytes = (int64_t) requested_bytes;
1097
1098    if (s->context->version >= 9) {
1099        if (s->direction == PA_STREAM_PLAYBACK) {
1100            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1101                pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1102                pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1103                pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1104                pa_context_fail(s->context, PA_ERR_PROTOCOL);
1105                goto finish;
1106            }
1107        } else if (s->direction == PA_STREAM_RECORD) {
1108            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1109                pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1110                pa_context_fail(s->context, PA_ERR_PROTOCOL);
1111                goto finish;
1112            }
1113        }
1114    }
1115
1116    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1117        pa_sample_spec ss;
1118        pa_channel_map cm;
1119        const char *dn = NULL;
1120        bool suspended;
1121
1122        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1123            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1124            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1125            pa_tagstruct_gets(t, &dn) < 0 ||
1126            pa_tagstruct_get_boolean(t, &suspended) < 0) {
1127            pa_context_fail(s->context, PA_ERR_PROTOCOL);
1128            goto finish;
1129        }
1130
1131        if (!dn || s->device_index == PA_INVALID_INDEX ||
1132            ss.channels != cm.channels ||
1133            !pa_channel_map_valid(&cm) ||
1134            !pa_sample_spec_valid(&ss) ||
1135            (s->n_formats == 0 && (
1136                (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1137                (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1138                (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1139            pa_context_fail(s->context, PA_ERR_PROTOCOL);
1140            goto finish;
1141        }
1142
1143        pa_xfree(s->device_name);
1144        s->device_name = pa_xstrdup(dn);
1145        s->suspended = suspended;
1146
1147        s->channel_map = cm;
1148        s->sample_spec = ss;
1149    }
1150
1151#ifdef USE_SMOOTHER_2
1152    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
1153        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
1154#endif
1155
1156    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1157        pa_usec_t usec;
1158
1159        if (pa_tagstruct_get_usec(t, &usec) < 0) {
1160            pa_context_fail(s->context, PA_ERR_PROTOCOL);
1161            goto finish;
1162        }
1163
1164        if (s->direction == PA_STREAM_RECORD)
1165            s->timing_info.configured_source_usec = usec;
1166        else
1167            s->timing_info.configured_sink_usec = usec;
1168    }
1169
1170    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1171        || s->context->version >= 22) {
1172
1173        pa_format_info *f = pa_format_info_new();
1174
1175        if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
1176            pa_format_info_free(f);
1177            if (s->n_formats > 0) {
1178                /* We used the extended API, so we should have got back a proper format */
1179                pa_context_fail(s->context, PA_ERR_PROTOCOL);
1180                goto finish;
1181            }
1182        } else
1183            s->format = f;
1184    }
1185
1186    if (!pa_tagstruct_eof(t)) {
1187        pa_context_fail(s->context, PA_ERR_PROTOCOL);
1188        goto finish;
1189    }
1190
1191    if (s->direction == PA_STREAM_RECORD) {
1192        pa_assert(!s->record_memblockq);
1193
1194        s->record_memblockq = pa_memblockq_new(
1195                "client side record memblockq",
1196                0,
1197                s->buffer_attr.maxlength,
1198                0,
1199                &s->sample_spec,
1200                1,
1201                0,
1202                0,
1203                NULL);
1204    }
1205
1206    s->channel_valid = true;
1207    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1208
1209    create_stream_complete(s);
1210
1211finish:
1212    pa_stream_unref(s);
1213}
1214
1215static int create_stream(
1216        pa_stream_direction_t direction,
1217        pa_stream *s,
1218        const char *dev,
1219        const pa_buffer_attr *attr,
1220        pa_stream_flags_t flags,
1221        const pa_cvolume *volume,
1222        pa_stream *sync_stream) {
1223
1224    pa_tagstruct *t;
1225    uint32_t tag;
1226    bool volume_set = !!volume;
1227    pa_cvolume cv;
1228    uint32_t i;
1229
1230    pa_assert(s);
1231    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1232    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1233
1234    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1235    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1236    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1237    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1238                                              PA_STREAM_INTERPOLATE_TIMING|
1239                                              PA_STREAM_NOT_MONOTONIC|
1240                                              PA_STREAM_AUTO_TIMING_UPDATE|
1241                                              PA_STREAM_NO_REMAP_CHANNELS|
1242                                              PA_STREAM_NO_REMIX_CHANNELS|
1243                                              PA_STREAM_FIX_FORMAT|
1244                                              PA_STREAM_FIX_RATE|
1245                                              PA_STREAM_FIX_CHANNELS|
1246                                              PA_STREAM_DONT_MOVE|
1247                                              PA_STREAM_VARIABLE_RATE|
1248                                              PA_STREAM_PEAK_DETECT|
1249                                              PA_STREAM_START_MUTED|
1250                                              PA_STREAM_ADJUST_LATENCY|
1251                                              PA_STREAM_EARLY_REQUESTS|
1252                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1253                                              PA_STREAM_START_UNMUTED|
1254                                              PA_STREAM_FAIL_ON_SUSPEND|
1255                                              PA_STREAM_RELATIVE_VOLUME|
1256                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1257
1258    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1259    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1260    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1261    /* Although some of the other flags are not supported on older
1262     * version, we don't check for them here, because it doesn't hurt
1263     * when they are passed but actually not supported. This makes
1264     * client development easier */
1265
1266    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1267    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1268    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1269
1270    pa_stream_ref(s);
1271
1272    s->direction = direction;
1273
1274    if (sync_stream)
1275        s->syncid = sync_stream->syncid;
1276
1277    if (attr)
1278        s->buffer_attr = *attr;
1279    patch_buffer_attr(s, &s->buffer_attr, &flags);
1280
1281    s->flags = flags;
1282    s->corked = !!(flags & PA_STREAM_START_CORKED);
1283
1284    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1285        pa_usec_t x;
1286
1287        x = pa_rtclock_now();
1288
1289        pa_assert(!s->smoother);
1290#ifdef USE_SMOOTHER_2
1291        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, x, 0, 0);
1292#else
1293        s->smoother = pa_smoother_new(
1294                SMOOTHER_ADJUST_TIME,
1295                SMOOTHER_HISTORY_TIME,
1296                !(flags & PA_STREAM_NOT_MONOTONIC),
1297                true,
1298                SMOOTHER_MIN_HISTORY,
1299                x,
1300                true);
1301#endif
1302    }
1303
1304    if (!dev)
1305        dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1306
1307    t = pa_tagstruct_command(
1308            s->context,
1309            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1310            &tag);
1311
1312    if (s->context->version < 13)
1313        pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1314
1315    pa_tagstruct_put(
1316            t,
1317            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1318            PA_TAG_CHANNEL_MAP, &s->channel_map,
1319            PA_TAG_U32, PA_INVALID_INDEX,
1320            PA_TAG_STRING, dev,
1321            PA_TAG_U32, s->buffer_attr.maxlength,
1322            PA_TAG_BOOLEAN, s->corked,
1323            PA_TAG_INVALID);
1324
1325    if (!volume) {
1326        if (pa_sample_spec_valid(&s->sample_spec))
1327            volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1328        else {
1329            /* This is not really relevant, since no volume was set, and
1330             * the real number of channels is embedded in the format_info
1331             * structure */
1332            volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1333        }
1334    }
1335
1336    if (s->direction == PA_STREAM_PLAYBACK) {
1337        pa_tagstruct_put(
1338                t,
1339                PA_TAG_U32, s->buffer_attr.tlength,
1340                PA_TAG_U32, s->buffer_attr.prebuf,
1341                PA_TAG_U32, s->buffer_attr.minreq,
1342                PA_TAG_U32, s->syncid,
1343                PA_TAG_INVALID);
1344
1345        pa_tagstruct_put_cvolume(t, volume);
1346    } else
1347        pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1348
1349    if (s->context->version >= 12) {
1350        pa_tagstruct_put(
1351                t,
1352                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1353                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1354                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1355                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1356                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1357                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1358                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1359                PA_TAG_INVALID);
1360    }
1361
1362    if (s->context->version >= 13) {
1363
1364        if (s->direction == PA_STREAM_PLAYBACK)
1365            pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1366        else
1367            pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1368
1369        pa_tagstruct_put(
1370                t,
1371                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1372                PA_TAG_PROPLIST, s->proplist,
1373                PA_TAG_INVALID);
1374
1375        if (s->direction == PA_STREAM_RECORD)
1376            pa_tagstruct_putu32(t, s->direct_on_input);
1377    }
1378
1379    if (s->context->version >= 14) {
1380
1381        if (s->direction == PA_STREAM_PLAYBACK)
1382            pa_tagstruct_put_boolean(t, volume_set);
1383
1384        pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1385    }
1386
1387    if (s->context->version >= 15) {
1388
1389        if (s->direction == PA_STREAM_PLAYBACK)
1390            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1391
1392        pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1393        pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1394    }
1395
1396    if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1397        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1398
1399    if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1400        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1401
1402    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1403        || s->context->version >= 22) {
1404
1405        pa_tagstruct_putu8(t, s->n_formats);
1406        for (i = 0; i < s->n_formats; i++)
1407            pa_tagstruct_put_format_info(t, s->req_formats[i]);
1408    }
1409
1410    if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1411        pa_tagstruct_put_cvolume(t, volume);
1412        pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1413        pa_tagstruct_put_boolean(t, volume_set);
1414        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1415        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1416        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1417    }
1418
1419    pa_pstream_send_tagstruct(s->context->pstream, t);
1420    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1421
1422    pa_stream_set_state(s, PA_STREAM_CREATING);
1423
1424    pa_stream_unref(s);
1425    return 0;
1426}
1427
1428int pa_stream_connect_playback(
1429        pa_stream *s,
1430        const char *dev,
1431        const pa_buffer_attr *attr,
1432        pa_stream_flags_t flags,
1433        const pa_cvolume *volume,
1434        pa_stream *sync_stream) {
1435
1436    pa_assert(s);
1437    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1438
1439    return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1440}
1441
1442int pa_stream_connect_record(
1443        pa_stream *s,
1444        const char *dev,
1445        const pa_buffer_attr *attr,
1446        pa_stream_flags_t flags) {
1447
1448    pa_assert(s);
1449    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1450
1451    return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1452}
1453
1454int pa_stream_begin_write(
1455        pa_stream *s,
1456        void **data,
1457        size_t *nbytes) {
1458
1459    pa_assert(s);
1460    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1461
1462    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1463    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1464    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1465    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1466    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1467
1468    if (*nbytes != (size_t) -1) {
1469        size_t m, fs;
1470
1471        m = pa_mempool_block_size_max(s->context->mempool);
1472        fs = pa_frame_size(&s->sample_spec);
1473
1474        m = (m / fs) * fs;
1475        if (*nbytes > m)
1476            *nbytes = m;
1477    }
1478
1479    if (!s->write_memblock) {
1480        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1481        s->write_data = pa_memblock_acquire(s->write_memblock);
1482    }
1483
1484    *data = s->write_data;
1485    *nbytes = pa_memblock_get_length(s->write_memblock);
1486
1487    return 0;
1488}
1489
1490int pa_stream_cancel_write(
1491        pa_stream *s) {
1492
1493    pa_assert(s);
1494    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1495
1496    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1497    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1498    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1499    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1500
1501    pa_assert(s->write_data);
1502
1503    pa_memblock_release(s->write_memblock);
1504    pa_memblock_unref(s->write_memblock);
1505    s->write_memblock = NULL;
1506    s->write_data = NULL;
1507
1508    return 0;
1509}
1510
1511int pa_stream_write_ext_free(
1512        pa_stream *s,
1513        const void *data,
1514        size_t length,
1515        pa_free_cb_t free_cb,
1516        void *free_cb_data,
1517        int64_t offset,
1518        pa_seek_mode_t seek) {
1519
1520    pa_assert(s);
1521    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1522    pa_assert(data);
1523
1524    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1525    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1526    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1527    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1528    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1529    PA_CHECK_VALIDITY(s->context,
1530                      !s->write_memblock ||
1531                      ((data >= s->write_data) &&
1532                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1533                      PA_ERR_INVALID);
1534    PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1535    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1536    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1537
1538    if (s->write_memblock) {
1539        pa_memchunk chunk;
1540
1541        /* pa_stream_write_begin() was called before */
1542
1543        pa_memblock_release(s->write_memblock);
1544
1545        chunk.memblock = s->write_memblock;
1546        chunk.index = (const char *) data - (const char *) s->write_data;
1547        chunk.length = length;
1548
1549        s->write_memblock = NULL;
1550        s->write_data = NULL;
1551
1552        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1553        pa_memblock_unref(chunk.memblock);
1554
1555    } else {
1556        pa_seek_mode_t t_seek = seek;
1557        int64_t t_offset = offset;
1558        size_t t_length = length;
1559        const void *t_data = data;
1560
1561        /* pa_stream_write_begin() was not called before */
1562
1563        while (t_length > 0) {
1564            pa_memchunk chunk;
1565
1566            chunk.index = 0;
1567
1568            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1569                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
1570                chunk.length = t_length;
1571            } else {
1572                void *d;
1573                size_t blk_size_max;
1574
1575                /* Break large audio streams into _aligned_ blocks or the
1576                 * other endpoint will happily discard them upon arrival. */
1577                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
1578                chunk.length = PA_MIN(t_length, blk_size_max);
1579                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1580
1581                d = pa_memblock_acquire(chunk.memblock);
1582                memcpy(d, t_data, chunk.length);
1583                pa_memblock_release(chunk.memblock);
1584            }
1585
1586            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1587
1588            t_offset = 0;
1589            t_seek = PA_SEEK_RELATIVE;
1590
1591            t_data = (const uint8_t*) t_data + chunk.length;
1592            t_length -= chunk.length;
1593
1594            pa_memblock_unref(chunk.memblock);
1595        }
1596
1597        if (free_cb && pa_pstream_get_shm(s->context->pstream))
1598            free_cb(free_cb_data);
1599    }
1600
1601    /* This is obviously wrong since we ignore the seeking index . But
1602     * that's OK, the server side applies the same error */
1603    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1604
1605#ifdef STREAM_DEBUG
1606    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1607#endif
1608
1609    if (s->direction == PA_STREAM_PLAYBACK) {
1610
1611        /* Update latency request correction */
1612        if (s->write_index_corrections[s->current_write_index_correction].valid) {
1613
1614            if (seek == PA_SEEK_ABSOLUTE) {
1615                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1616                s->write_index_corrections[s->current_write_index_correction].absolute = true;
1617                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1618            } else if (seek == PA_SEEK_RELATIVE) {
1619                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1620                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1621            } else
1622                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1623        }
1624
1625        /* Update the write index in the already available latency data */
1626        if (s->timing_info_valid) {
1627
1628            if (seek == PA_SEEK_ABSOLUTE) {
1629                s->timing_info.write_index_corrupt = false;
1630                s->timing_info.write_index = offset + (int64_t) length;
1631            } else if (seek == PA_SEEK_RELATIVE) {
1632                if (!s->timing_info.write_index_corrupt)
1633                    s->timing_info.write_index += offset + (int64_t) length;
1634            } else
1635                s->timing_info.write_index_corrupt = true;
1636        }
1637
1638        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1639            request_auto_timing_update(s, true);
1640    }
1641
1642    return 0;
1643}
1644
1645int pa_stream_write(
1646        pa_stream *s,
1647        const void *data,
1648        size_t length,
1649        pa_free_cb_t free_cb,
1650        int64_t offset,
1651        pa_seek_mode_t seek) {
1652
1653    return pa_stream_write_ext_free(s, data, length, free_cb, (void*) data, offset, seek);
1654}
1655
1656int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1657    pa_assert(s);
1658    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1659    pa_assert(data);
1660    pa_assert(length);
1661
1662    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1663    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1664    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1665
1666    if (!s->peek_memchunk.memblock) {
1667
1668        if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1669            /* record_memblockq is empty. */
1670            *data = NULL;
1671            *length = 0;
1672            return 0;
1673
1674        } else if (!s->peek_memchunk.memblock) {
1675            /* record_memblockq isn't empty, but it doesn't have any data at
1676             * the current read index. */
1677            *data = NULL;
1678            *length = s->peek_memchunk.length;
1679            return 0;
1680        }
1681
1682        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1683    }
1684
1685    pa_assert(s->peek_data);
1686    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1687    *length = s->peek_memchunk.length;
1688    return 0;
1689}
1690
1691int pa_stream_drop(pa_stream *s) {
1692    pa_assert(s);
1693    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1694
1695    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1696    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1697    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1698    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1699
1700    pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1701
1702    /* Fix the simulated local read index */
1703    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1704        s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1705
1706    if (s->peek_memchunk.memblock) {
1707        pa_assert(s->peek_data);
1708        s->peek_data = NULL;
1709        pa_memblock_release(s->peek_memchunk.memblock);
1710        pa_memblock_unref(s->peek_memchunk.memblock);
1711    }
1712
1713    pa_memchunk_reset(&s->peek_memchunk);
1714
1715    return 0;
1716}
1717
1718size_t pa_stream_writable_size(const pa_stream *s) {
1719    pa_assert(s);
1720    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1721
1722    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1723    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1724    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1725
1726    return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1727}
1728
1729size_t pa_stream_readable_size(const pa_stream *s) {
1730    pa_assert(s);
1731    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1732
1733    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1734    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1735    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1736
1737    return pa_memblockq_get_length(s->record_memblockq);
1738}
1739
1740pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1741    pa_operation *o;
1742    pa_tagstruct *t;
1743    uint32_t tag;
1744
1745    pa_assert(s);
1746    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1747
1748    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1749    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1750    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1751
1752    /* Ask for a timing update before we cork/uncork to get the best
1753     * accuracy for the transport latency suitable for the
1754     * check_smoother_status() call in the started callback */
1755    request_auto_timing_update(s, true);
1756
1757    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1758
1759    t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1760    pa_tagstruct_putu32(t, s->channel);
1761    pa_pstream_send_tagstruct(s->context->pstream, t);
1762    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1763
1764    /* This might cause the read index to continue again, hence
1765     * let's request a timing update */
1766    request_auto_timing_update(s, true);
1767
1768    return o;
1769}
1770
1771static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1772    pa_usec_t usec;
1773
1774    pa_assert(s);
1775    pa_assert(PA_REFCNT_VALUE(s) >= 1);
1776    pa_assert(s->state == PA_STREAM_READY);
1777    pa_assert(s->direction != PA_STREAM_UPLOAD);
1778    pa_assert(s->timing_info_valid);
1779    pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1780    pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1781
1782    if (s->direction == PA_STREAM_PLAYBACK) {
1783        /* The last byte that was written into the output device
1784         * had this time value associated */
1785        usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1786
1787        if (!s->corked && !s->suspended) {
1788
1789            if (!ignore_transport)
1790                /* Because the latency info took a little time to come
1791                 * to us, we assume that the real output time is actually
1792                 * a little ahead */
1793                usec += s->timing_info.transport_usec;
1794
1795            /* However, the output device usually maintains a buffer
1796               too, hence the real sample currently played is a little
1797               back  */
1798            if (s->timing_info.sink_usec >= usec)
1799                usec = 0;
1800            else
1801                usec -= s->timing_info.sink_usec;
1802        }
1803
1804    } else {
1805        pa_assert(s->direction == PA_STREAM_RECORD);
1806
1807        /* The last byte written into the server side queue had
1808         * this time value associated */
1809        usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1810
1811        if (!s->corked && !s->suspended) {
1812
1813            if (!ignore_transport)
1814                /* Add transport latency */
1815                usec += s->timing_info.transport_usec;
1816
1817            /* Add latency of data in device buffer */
1818            usec += s->timing_info.source_usec;
1819
1820            /* If this is a monitor source, we need to correct the
1821             * time by the playback device buffer */
1822            if (s->timing_info.sink_usec >= usec)
1823                usec = 0;
1824            else
1825                usec -= s->timing_info.sink_usec;
1826        }
1827    }
1828
1829    return usec;
1830}
1831
#ifdef USE_SMOOTHER_2
/* Same as calc_time(), but expressed in bytes: the time is first
 * converted into a whole number of frames at the stream's sample
 * rate (integer division, rounding down) and then multiplied by the
 * frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    pa_usec_t stream_time = calc_time(s, ignore_transport);
    uint64_t n_frames = stream_time * s->sample_spec.rate / PA_USEC_PER_SEC;

    return (uint64_t) (n_frames * pa_frame_size(&s->sample_spec));
}
#endif
1837
1838static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1839    pa_operation *o = userdata;
1840    struct timeval local, remote, now;
1841    pa_timing_info *i;
1842    bool playing = false;
1843    uint64_t underrun_for = 0, playing_for = 0;
1844
1845    pa_assert(pd);
1846    pa_assert(o);
1847    pa_assert(PA_REFCNT_VALUE(o) >= 1);
1848
1849    if (!o->context || !o->stream)
1850        goto finish;
1851
1852    i = &o->stream->timing_info;
1853
1854    o->stream->timing_info_valid = false;
1855    i->write_index_corrupt = true;
1856    i->read_index_corrupt = true;
1857
1858    if (command != PA_COMMAND_REPLY) {
1859        if (pa_context_handle_error(o->context, command, t, false) < 0)
1860            goto finish;
1861
1862    } else {
1863
1864        if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1865            pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1866            pa_tagstruct_get_boolean(t, &playing) < 0 ||
1867            pa_tagstruct_get_timeval(t, &local) < 0 ||
1868            pa_tagstruct_get_timeval(t, &remote) < 0 ||
1869            pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1870            pa_tagstruct_gets64(t, &i->read_index) < 0) {
1871
1872            pa_context_fail(o->context, PA_ERR_PROTOCOL);
1873            goto finish;
1874        }
1875
1876        if (o->context->version >= 13 &&
1877            o->stream->direction == PA_STREAM_PLAYBACK)
1878            if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1879                pa_tagstruct_getu64(t, &playing_for) < 0) {
1880
1881                pa_context_fail(o->context, PA_ERR_PROTOCOL);
1882                goto finish;
1883            }
1884
1885        if (!pa_tagstruct_eof(t)) {
1886            pa_context_fail(o->context, PA_ERR_PROTOCOL);
1887            goto finish;
1888        }
1889        o->stream->timing_info_valid = true;
1890        i->write_index_corrupt = false;
1891        i->read_index_corrupt = false;
1892
1893        i->playing = (int) playing;
1894        i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1895
1896        pa_gettimeofday(&now);
1897
1898        /* Calculate timestamps */
1899        if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1900            /* local and remote seem to have synchronized clocks */
1901
1902            if (o->stream->direction == PA_STREAM_PLAYBACK)
1903                i->transport_usec = pa_timeval_diff(&remote, &local);
1904            else
1905                i->transport_usec = pa_timeval_diff(&now, &remote);
1906
1907            i->synchronized_clocks = true;
1908            i->timestamp = remote;
1909        } else {
1910            /* clocks are not synchronized, let's estimate latency then */
1911            i->transport_usec = pa_timeval_diff(&now, &local)/2;
1912            i->synchronized_clocks = false;
1913            i->timestamp = local;
1914            pa_timeval_add(&i->timestamp, i->transport_usec);
1915        }
1916
1917        /* Invalidate read and write indexes if necessary */
1918        if (tag < o->stream->read_index_not_before)
1919            i->read_index_corrupt = true;
1920
1921        if (tag < o->stream->write_index_not_before)
1922            i->write_index_corrupt = true;
1923
1924        if (o->stream->direction == PA_STREAM_PLAYBACK) {
1925            /* Write index correction */
1926
1927            int n, j;
1928            uint32_t ctag = tag;
1929
1930            /* Go through the saved correction values and add up the
1931             * total correction.*/
1932            for (n = 0, j = o->stream->current_write_index_correction+1;
1933                 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1934                 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1935
1936                /* Step over invalid data or out-of-date data */
1937                if (!o->stream->write_index_corrections[j].valid ||
1938                    o->stream->write_index_corrections[j].tag < ctag)
1939                    continue;
1940
1941                /* Make sure that everything is in order */
1942                ctag = o->stream->write_index_corrections[j].tag+1;
1943
1944                /* Now fix the write index */
1945                if (o->stream->write_index_corrections[j].corrupt) {
1946                    /* A corrupting seek was made */
1947                    i->write_index_corrupt = true;
1948                } else if (o->stream->write_index_corrections[j].absolute) {
1949                    /* An absolute seek was made */
1950                    i->write_index = o->stream->write_index_corrections[j].value;
1951                    i->write_index_corrupt = false;
1952                } else if (!i->write_index_corrupt) {
1953                    /* A relative seek was made */
1954                    i->write_index += o->stream->write_index_corrections[j].value;
1955                }
1956            }
1957
1958            /* Clear old correction entries */
1959            for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1960                if (!o->stream->write_index_corrections[n].valid)
1961                    continue;
1962
1963                if (o->stream->write_index_corrections[n].tag <= tag)
1964                    o->stream->write_index_corrections[n].valid = false;
1965            }
1966        }
1967
1968        if (o->stream->direction == PA_STREAM_RECORD) {
1969            /* Read index correction */
1970
1971            if (!i->read_index_corrupt)
1972                i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1973        }
1974
1975        /* Update smoother if we're not corked */
1976        if (o->stream->smoother && !o->stream->corked) {
1977            pa_usec_t u, x;
1978
1979            u = x = pa_rtclock_now() - i->transport_usec;
1980
1981            if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1982                pa_usec_t su;
1983
1984                /* If we weren't playing then it will take some time
1985                 * until the audio will actually come out through the
1986                 * speakers. Since we follow that timing here, we need
1987                 * to try to fix this up */
1988
1989                su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1990
1991                if (su < i->sink_usec)
1992                    x += i->sink_usec - su;
1993            }
1994
1995            if (!i->playing)
1996#ifdef USE_SMOOTHER_2
1997                pa_smoother_2_pause(o->stream->smoother, x);
1998#else
1999                pa_smoother_pause(o->stream->smoother, x);
2000#endif
2001
2002            /* Update the smoother */
2003            if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
2004                (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
2005#ifdef USE_SMOOTHER_2
2006                pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
2007#else
2008                pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
2009#endif
2010
2011            if (i->playing)
2012#ifdef USE_SMOOTHER_2
2013                pa_smoother_2_resume(o->stream->smoother, x);
2014#else
2015                pa_smoother_resume(o->stream->smoother, x, true);
2016#endif
2017        }
2018    }
2019
2020    o->stream->auto_timing_update_requested = false;
2021
2022    if (o->stream->latency_update_callback)
2023        o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
2024
2025    if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
2026        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2027        cb(o->stream, o->stream->timing_info_valid, o->userdata);
2028    }
2029
2030finish:
2031
2032    pa_operation_done(o);
2033    pa_operation_unref(o);
2034}
2035
2036pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2037    uint32_t tag;
2038    pa_operation *o;
2039    pa_tagstruct *t;
2040    struct timeval now;
2041    int cidx = 0;
2042
2043    pa_assert(s);
2044    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2045
2046    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2047    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2048    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2049
2050    if (s->direction == PA_STREAM_PLAYBACK) {
2051        /* Find a place to store the write_index correction data for this entry */
2052        cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2053
2054        /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2055        PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2056    }
2057    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2058
2059    t = pa_tagstruct_command(
2060            s->context,
2061            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2062            &tag);
2063    pa_tagstruct_putu32(t, s->channel);
2064    pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2065
2066    pa_pstream_send_tagstruct(s->context->pstream, t);
2067    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2068
2069    if (s->direction == PA_STREAM_PLAYBACK) {
2070        /* Fill in initial correction data */
2071
2072        s->current_write_index_correction = cidx;
2073
2074        s->write_index_corrections[cidx].valid = true;
2075        s->write_index_corrections[cidx].absolute = false;
2076        s->write_index_corrections[cidx].corrupt = false;
2077        s->write_index_corrections[cidx].tag = tag;
2078        s->write_index_corrections[cidx].value = 0;
2079    }
2080
2081    return o;
2082}
2083
2084void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2085    pa_stream *s = userdata;
2086
2087    pa_assert(pd);
2088    pa_assert(s);
2089    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2090
2091    pa_stream_ref(s);
2092
2093    if (command != PA_COMMAND_REPLY) {
2094        if (pa_context_handle_error(s->context, command, t, false) < 0)
2095            goto finish;
2096
2097        pa_stream_set_state(s, PA_STREAM_FAILED);
2098        goto finish;
2099    } else if (!pa_tagstruct_eof(t)) {
2100        pa_context_fail(s->context, PA_ERR_PROTOCOL);
2101        goto finish;
2102    }
2103
2104    pa_stream_set_state(s, PA_STREAM_TERMINATED);
2105
2106finish:
2107    pa_stream_unref(s);
2108}
2109
2110int pa_stream_disconnect(pa_stream *s) {
2111    pa_tagstruct *t;
2112    uint32_t tag;
2113
2114    pa_assert(s);
2115    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2116
2117    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2118    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2119    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2120
2121    pa_stream_ref(s);
2122
2123    t = pa_tagstruct_command(
2124            s->context,
2125            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2126                        (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2127            &tag);
2128    pa_tagstruct_putu32(t, s->channel);
2129    pa_pstream_send_tagstruct(s->context->pstream, t);
2130    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2131
2132    pa_stream_unref(s);
2133    return 0;
2134}
2135
2136void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2137    pa_assert(s);
2138    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2139
2140    if (pa_detect_fork())
2141        return;
2142
2143    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2144        return;
2145
2146    s->read_callback = cb;
2147    s->read_userdata = userdata;
2148}
2149
2150void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2151    pa_assert(s);
2152    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2153
2154    if (pa_detect_fork())
2155        return;
2156
2157    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2158        return;
2159
2160    s->write_callback = cb;
2161    s->write_userdata = userdata;
2162}
2163
2164void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2165    pa_assert(s);
2166    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2167
2168    if (pa_detect_fork())
2169        return;
2170
2171    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2172        return;
2173
2174    s->state_callback = cb;
2175    s->state_userdata = userdata;
2176}
2177
2178void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2179    pa_assert(s);
2180    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2181
2182    if (pa_detect_fork())
2183        return;
2184
2185    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2186        return;
2187
2188    s->overflow_callback = cb;
2189    s->overflow_userdata = userdata;
2190}
2191
2192void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2193    pa_assert(s);
2194    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2195
2196    if (pa_detect_fork())
2197        return;
2198
2199    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2200        return;
2201
2202    s->underflow_callback = cb;
2203    s->underflow_userdata = userdata;
2204}
2205
2206void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2207    pa_assert(s);
2208    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2209
2210    if (pa_detect_fork())
2211        return;
2212
2213    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2214        return;
2215
2216    s->underflow_ohos_callback = cb;
2217    s->underflow_ohos_userdata = userdata;
2218}
2219
2220void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2221    pa_assert(s);
2222    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2223
2224    if (pa_detect_fork())
2225        return;
2226
2227    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2228        return;
2229
2230    s->latency_update_callback = cb;
2231    s->latency_update_userdata = userdata;
2232}
2233
2234void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2235    pa_assert(s);
2236    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2237
2238    if (pa_detect_fork())
2239        return;
2240
2241    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2242        return;
2243
2244    s->moved_callback = cb;
2245    s->moved_userdata = userdata;
2246}
2247
2248void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2249    pa_assert(s);
2250    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2251
2252    if (pa_detect_fork())
2253        return;
2254
2255    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2256        return;
2257
2258    s->suspended_callback = cb;
2259    s->suspended_userdata = userdata;
2260}
2261
2262void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2263    pa_assert(s);
2264    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2265
2266    if (pa_detect_fork())
2267        return;
2268
2269    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2270        return;
2271
2272    s->started_callback = cb;
2273    s->started_userdata = userdata;
2274}
2275
2276void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2277    pa_assert(s);
2278    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279
2280    if (pa_detect_fork())
2281        return;
2282
2283    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2284        return;
2285
2286    s->event_callback = cb;
2287    s->event_userdata = userdata;
2288}
2289
2290void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2291    pa_assert(s);
2292    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2293
2294    if (pa_detect_fork())
2295        return;
2296
2297    if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2298        return;
2299
2300    s->buffer_attr_callback = cb;
2301    s->buffer_attr_userdata = userdata;
2302}
2303
2304void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2305    pa_operation *o = userdata;
2306    int success = 1;
2307
2308    pa_assert(pd);
2309    pa_assert(o);
2310    pa_assert(PA_REFCNT_VALUE(o) >= 1);
2311
2312    if (!o->context)
2313        goto finish;
2314
2315    if (command != PA_COMMAND_REPLY) {
2316        if (pa_context_handle_error(o->context, command, t, false) < 0)
2317            goto finish;
2318
2319        success = 0;
2320    } else if (!pa_tagstruct_eof(t)) {
2321        pa_context_fail(o->context, PA_ERR_PROTOCOL);
2322        goto finish;
2323    }
2324
2325    if (o->callback) {
2326        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2327        cb(o->stream, success, o->userdata);
2328    }
2329
2330finish:
2331    pa_operation_done(o);
2332    pa_operation_unref(o);
2333}
2334
2335pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2336    pa_operation *o;
2337    pa_tagstruct *t;
2338    uint32_t tag;
2339
2340    pa_assert(s);
2341    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342
2343    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2344    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2345    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2346
2347    /* Ask for a timing update before we cork/uncork to get the best
2348     * accuracy for the transport latency suitable for the
2349     * check_smoother_status() call in the started callback */
2350    request_auto_timing_update(s, true);
2351
2352    s->corked = b;
2353
2354    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2355
2356    t = pa_tagstruct_command(
2357            s->context,
2358            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2359            &tag);
2360    pa_tagstruct_putu32(t, s->channel);
2361    pa_tagstruct_put_boolean(t, !!b);
2362    pa_pstream_send_tagstruct(s->context->pstream, t);
2363    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2364
2365    check_smoother_status(s, false, false, false);
2366
2367    /* This might cause the indexes to hang/start again, hence let's
2368     * request a timing update, after the cork/uncork, too */
2369    request_auto_timing_update(s, true);
2370
2371    return o;
2372}
2373
2374static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2375    pa_tagstruct *t;
2376    pa_operation *o;
2377    uint32_t tag;
2378
2379    pa_assert(s);
2380    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2381
2382    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2383    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2384
2385    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2386
2387    t = pa_tagstruct_command(s->context, command, &tag);
2388    pa_tagstruct_putu32(t, s->channel);
2389    pa_pstream_send_tagstruct(s->context->pstream, t);
2390    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2391
2392    return o;
2393}
2394
2395pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2396    pa_operation *o;
2397
2398    pa_assert(s);
2399    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2400
2401    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2402    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2403    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2404
2405    /* Ask for a timing update *before* the flush, so that the
2406     * transport usec is as up to date as possible when we get the
2407     * underflow message and update the smoother status*/
2408    request_auto_timing_update(s, true);
2409
2410    if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2411        return NULL;
2412
2413    if (s->direction == PA_STREAM_PLAYBACK) {
2414
2415        if (s->write_index_corrections[s->current_write_index_correction].valid)
2416            s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2417
2418        if (s->buffer_attr.prebuf > 0)
2419            check_smoother_status(s, false, false, true);
2420
2421        /* This will change the write index, but leave the
2422         * read index untouched. */
2423        invalidate_indexes(s, false, true);
2424
2425    } else
2426        /* For record streams this has no influence on the write
2427         * index, but the read index might jump. */
2428        invalidate_indexes(s, true, false);
2429
2430    /* Note that we do not update requested_bytes here. This is
2431     * because we cannot really know how data actually was dropped
2432     * from the write index due to this. This 'error' will be applied
2433     * by both client and server and hence we should be fine. */
2434
2435    return o;
2436}
2437
2438pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2439    pa_operation *o;
2440
2441    pa_assert(s);
2442    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443
2444    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2445    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2446    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2447    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2448
2449    /* Ask for a timing update before we cork/uncork to get the best
2450     * accuracy for the transport latency suitable for the
2451     * check_smoother_status() call in the started callback */
2452    request_auto_timing_update(s, true);
2453
2454    if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2455        return NULL;
2456
2457    /* This might cause the read index to hang again, hence
2458     * let's request a timing update */
2459    request_auto_timing_update(s, true);
2460
2461    return o;
2462}
2463
2464pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2465    pa_operation *o;
2466
2467    pa_assert(s);
2468    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2469
2470    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2471    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2472    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2473    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2474
2475    /* Ask for a timing update before we cork/uncork to get the best
2476     * accuracy for the transport latency suitable for the
2477     * check_smoother_status() call in the started callback */
2478    request_auto_timing_update(s, true);
2479
2480    if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2481        return NULL;
2482
2483    /* This might cause the read index to start moving again, hence
2484     * let's request a timing update */
2485    request_auto_timing_update(s, true);
2486
2487    return o;
2488}
2489
2490pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2491    pa_operation *o;
2492
2493    pa_assert(s);
2494    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2495    pa_assert(name);
2496
2497    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2498    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2499    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2500
2501    if (s->context->version >= 13) {
2502        pa_proplist *p = pa_proplist_new();
2503
2504        pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2505        o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2506        pa_proplist_free(p);
2507    } else {
2508        pa_tagstruct *t;
2509        uint32_t tag;
2510
2511        o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2512        t = pa_tagstruct_command(
2513                s->context,
2514                (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2515                &tag);
2516        pa_tagstruct_putu32(t, s->channel);
2517        pa_tagstruct_puts(t, name);
2518        pa_pstream_send_tagstruct(s->context->pstream, t);
2519        pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2520    }
2521
2522    return o;
2523}
2524
2525int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2526    pa_usec_t usec;
2527
2528    pa_assert(s);
2529    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2530
2531    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2532    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2533    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2534    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2535    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2536    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2537
2538    if (s->smoother)
2539#ifdef USE_SMOOTHER_2
2540        usec = pa_smoother_2_get(s->smoother, pa_rtclock_now());
2541#else
2542        usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2543#endif
2544
2545    else
2546        usec = calc_time(s, false);
2547
2548    /* Make sure the time runs monotonically */
2549    if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2550        if (usec < s->previous_time)
2551            usec = s->previous_time;
2552        else
2553            s->previous_time = usec;
2554    }
2555
2556    if (r_usec)
2557        *r_usec = usec;
2558
2559    return 0;
2560}
2561
2562static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2563    pa_assert(s);
2564    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2565
2566    if (negative)
2567        *negative = 0;
2568
2569    if (a >= b)
2570        return a-b;
2571    else {
2572        if (negative && s->direction == PA_STREAM_RECORD) {
2573            *negative = 1;
2574            return b-a;
2575        } else
2576            return 0;
2577    }
2578}
2579
2580int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2581    pa_usec_t t, c;
2582    int r;
2583    int64_t cindex;
2584
2585    pa_assert(s);
2586    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2587    pa_assert(r_usec);
2588
2589    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2590    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2591    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2592    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2593    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2594    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2595
2596    if ((r = pa_stream_get_time(s, &t)) < 0)
2597        return r;
2598
2599    if (s->direction == PA_STREAM_PLAYBACK)
2600        cindex = s->timing_info.write_index;
2601    else
2602        cindex = s->timing_info.read_index;
2603
2604    if (cindex < 0)
2605        cindex = 0;
2606
2607    c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2608
2609    if (s->direction == PA_STREAM_PLAYBACK)
2610        *r_usec = time_counter_diff(s, c, t, negative);
2611    else
2612        *r_usec = time_counter_diff(s, t, c, negative);
2613
2614    return 0;
2615}
2616
2617const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2618    pa_assert(s);
2619    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2620
2621    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2622    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2623    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2624    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2625
2626    return &s->timing_info;
2627}
2628
2629const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2630    pa_assert(s);
2631    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2632
2633    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2634
2635    return &s->sample_spec;
2636}
2637
2638const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2639    pa_assert(s);
2640    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2641
2642    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2643
2644    return &s->channel_map;
2645}
2646
2647const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2648    pa_assert(s);
2649    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2650
2651    /* We don't have the format till routing is done */
2652    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2653    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2654
2655    return s->format;
2656}
2657const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2658    pa_assert(s);
2659    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660
2661    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2664
2665    return &s->buffer_attr;
2666}
2667
2668static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2669    pa_operation *o = userdata;
2670    int success = 1;
2671
2672    pa_assert(pd);
2673    pa_assert(o);
2674    pa_assert(PA_REFCNT_VALUE(o) >= 1);
2675
2676    if (!o->context)
2677        goto finish;
2678
2679    if (command != PA_COMMAND_REPLY) {
2680        if (pa_context_handle_error(o->context, command, t, false) < 0)
2681            goto finish;
2682
2683        success = 0;
2684    } else {
2685        if (o->stream->direction == PA_STREAM_PLAYBACK) {
2686            if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2687                pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2688                pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2689                pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2690                pa_context_fail(o->context, PA_ERR_PROTOCOL);
2691                goto finish;
2692            }
2693        } else if (o->stream->direction == PA_STREAM_RECORD) {
2694            if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2695                pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2696                pa_context_fail(o->context, PA_ERR_PROTOCOL);
2697                goto finish;
2698            }
2699        }
2700
2701        if (o->stream->context->version >= 13) {
2702            pa_usec_t usec;
2703
2704            if (pa_tagstruct_get_usec(t, &usec) < 0) {
2705                pa_context_fail(o->context, PA_ERR_PROTOCOL);
2706                goto finish;
2707            }
2708
2709            if (o->stream->direction == PA_STREAM_RECORD)
2710                o->stream->timing_info.configured_source_usec = usec;
2711            else
2712                o->stream->timing_info.configured_sink_usec = usec;
2713        }
2714
2715        if (!pa_tagstruct_eof(t)) {
2716            pa_context_fail(o->context, PA_ERR_PROTOCOL);
2717            goto finish;
2718        }
2719    }
2720
2721    if (o->callback) {
2722        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2723        cb(o->stream, success, o->userdata);
2724    }
2725
2726finish:
2727    pa_operation_done(o);
2728    pa_operation_unref(o);
2729}
2730
2731pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2732    pa_operation *o;
2733    pa_tagstruct *t;
2734    uint32_t tag;
2735    pa_buffer_attr copy;
2736
2737    pa_assert(s);
2738    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2739    pa_assert(attr);
2740
2741    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2742    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2743    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2744    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2745
2746    /* Ask for a timing update before we cork/uncork to get the best
2747     * accuracy for the transport latency suitable for the
2748     * check_smoother_status() call in the started callback */
2749    request_auto_timing_update(s, true);
2750
2751    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2752
2753    t = pa_tagstruct_command(
2754            s->context,
2755            (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2756            &tag);
2757    pa_tagstruct_putu32(t, s->channel);
2758
2759    copy = *attr;
2760    patch_buffer_attr(s, &copy, NULL);
2761    attr = &copy;
2762
2763    pa_tagstruct_putu32(t, attr->maxlength);
2764
2765    if (s->direction == PA_STREAM_PLAYBACK)
2766        pa_tagstruct_put(
2767                t,
2768                PA_TAG_U32, attr->tlength,
2769                PA_TAG_U32, attr->prebuf,
2770                PA_TAG_U32, attr->minreq,
2771                PA_TAG_INVALID);
2772    else
2773        pa_tagstruct_putu32(t, attr->fragsize);
2774
2775    if (s->context->version >= 13)
2776        pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2777
2778    if (s->context->version >= 14)
2779        pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2780
2781    pa_pstream_send_tagstruct(s->context->pstream, t);
2782    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2783
2784    /* This might cause changes in the read/write index, hence let's
2785     * request a timing update */
2786    request_auto_timing_update(s, true);
2787
2788    return o;
2789}
2790
2791uint32_t pa_stream_get_device_index(const pa_stream *s) {
2792    pa_assert(s);
2793    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2794
2795    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2796    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2797    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2798    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2799    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2800
2801    return s->device_index;
2802}
2803
2804const char *pa_stream_get_device_name(const pa_stream *s) {
2805    pa_assert(s);
2806    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2807
2808    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2809    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2810    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2811    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2812    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2813
2814    return s->device_name;
2815}
2816
2817int pa_stream_is_suspended(const pa_stream *s) {
2818    pa_assert(s);
2819    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2820
2821    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2822    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2823    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2824    PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2825
2826    return s->suspended;
2827}
2828
2829int pa_stream_is_corked(const pa_stream *s) {
2830    pa_assert(s);
2831    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2832
2833    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2834    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2835    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2836
2837    return s->corked;
2838}
2839
2840static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2841    pa_operation *o = userdata;
2842    int success = 1;
2843
2844    pa_assert(pd);
2845    pa_assert(o);
2846    pa_assert(PA_REFCNT_VALUE(o) >= 1);
2847
2848    if (!o->context)
2849        goto finish;
2850
2851    if (command != PA_COMMAND_REPLY) {
2852        if (pa_context_handle_error(o->context, command, t, false) < 0)
2853            goto finish;
2854
2855        success = 0;
2856    } else {
2857
2858        if (!pa_tagstruct_eof(t)) {
2859            pa_context_fail(o->context, PA_ERR_PROTOCOL);
2860            goto finish;
2861        }
2862    }
2863
2864    o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2865#ifdef USE_SMOOTHER_2
2866    if (o->stream->smoother)
2867        pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
2868#endif
2869    pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2870
2871    if (o->callback) {
2872        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2873        cb(o->stream, success, o->userdata);
2874    }
2875
2876finish:
2877    pa_operation_done(o);
2878    pa_operation_unref(o);
2879}
2880
2881pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2882    pa_operation *o;
2883    pa_tagstruct *t;
2884    uint32_t tag;
2885
2886    pa_assert(s);
2887    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2888
2889    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2890    PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2891    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2892    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2893    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2894    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2895
2896    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2897    o->private = PA_UINT_TO_PTR(rate);
2898
2899    t = pa_tagstruct_command(
2900            s->context,
2901            (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2902            &tag);
2903    pa_tagstruct_putu32(t, s->channel);
2904    pa_tagstruct_putu32(t, rate);
2905
2906    pa_pstream_send_tagstruct(s->context->pstream, t);
2907    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2908
2909    return o;
2910}
2911
2912pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2913    pa_operation *o;
2914    pa_tagstruct *t;
2915    uint32_t tag;
2916
2917    pa_assert(s);
2918    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2919
2920    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2921    PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2922    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2923    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2924    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2925
2926    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2927
2928    t = pa_tagstruct_command(
2929            s->context,
2930            (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2931            &tag);
2932    pa_tagstruct_putu32(t, s->channel);
2933    pa_tagstruct_putu32(t, (uint32_t) mode);
2934    pa_tagstruct_put_proplist(t, p);
2935
2936    pa_pstream_send_tagstruct(s->context->pstream, t);
2937    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2938
2939    /* Please note that we don't update s->proplist here, because we
2940     * don't export that field */
2941
2942    return o;
2943}
2944
2945pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2946    pa_operation *o;
2947    pa_tagstruct *t;
2948    uint32_t tag;
2949    const char * const*k;
2950
2951    pa_assert(s);
2952    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2953
2954    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2955    PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2956    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2957    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2958    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2959
2960    o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2961
2962    t = pa_tagstruct_command(
2963            s->context,
2964            (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2965            &tag);
2966    pa_tagstruct_putu32(t, s->channel);
2967
2968    for (k = keys; *k; k++)
2969        pa_tagstruct_puts(t, *k);
2970
2971    pa_tagstruct_puts(t, NULL);
2972
2973    pa_pstream_send_tagstruct(s->context->pstream, t);
2974    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2975
2976    /* Please note that we don't update s->proplist here, because we
2977     * don't export that field */
2978
2979    return o;
2980}
2981
2982int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2983    pa_assert(s);
2984    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2985
2986    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2987    PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2988    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2989    PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2990
2991    s->direct_on_input = sink_input_idx;
2992
2993    return 0;
2994}
2995
2996uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2997    pa_assert(s);
2998    pa_assert(PA_REFCNT_VALUE(s) >= 1);
2999
3000    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
3001    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
3002    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
3003
3004    return s->direct_on_input;
3005}
3006