1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34 
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42 
43 #include "internal.h"
44 #include "stream.h"
45 
46 /* #define STREAM_DEBUG */
47 
/* Bounds for the automatic timing update interval: the interval starts at
 * the lower value and is doubled on each restart until it hits the upper
 * value. */
#define AUTO_TIMING_INTERVAL_START_USEC (10 * PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC (1500 * PA_USEC_PER_MSEC)

/* Smoother parameters -- presumably consumed where the smoother is created
 * (not visible in this part of the file). The adjust time and the minimum
 * history are only defined when USE_SMOOTHER_2 is not set. */
#define SMOOTHER_HISTORY_TIME (5000 * PA_USEC_PER_MSEC)
#ifndef USE_SMOOTHER_2
#define SMOOTHER_ADJUST_TIME (1000 * PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY (4)
#endif
56 
pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map)57 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58     return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59 }
60 
/* Clears every callback slot of the stream together with the userdata
 * pointer that belongs to it. */
static void reset_callbacks(pa_stream *s) {
    /* Callback function pointers */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;
    s->underflow_ohos_callback = NULL;

    /* Userdata handed to the callbacks above */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
    s->underflow_ohos_userdata = NULL;
}
87 
pa_stream_new_with_proplist_internal( pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_format_info * const *formats, unsigned int n_formats, pa_proplist *p)88 static pa_stream *pa_stream_new_with_proplist_internal(
89         pa_context *c,
90         const char *name,
91         const pa_sample_spec *ss,
92         const pa_channel_map *map,
93         pa_format_info * const *formats,
94         unsigned int n_formats,
95         pa_proplist *p) {
96 
97     pa_stream *s;
98     unsigned int i;
99 
100     pa_assert(c);
101     pa_assert(PA_REFCNT_VALUE(c) >= 1);
102     pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
103     pa_assert(n_formats < PA_MAX_FORMATS);
104 
105     PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
106     PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
107 
108     s = pa_xnew(pa_stream, 1);
109     PA_REFCNT_INIT(s);
110     s->context = c;
111     s->mainloop = c->mainloop;
112 
113     s->direction = PA_STREAM_NODIRECTION;
114     s->state = PA_STREAM_UNCONNECTED;
115     s->flags = 0;
116 
117     if (ss)
118         s->sample_spec = *ss;
119     else
120         pa_sample_spec_init(&s->sample_spec);
121 
122     if (map)
123         s->channel_map = *map;
124     else
125         pa_channel_map_init(&s->channel_map);
126 
127     s->n_formats = 0;
128     if (formats) {
129         s->n_formats = n_formats;
130         for (i = 0; i < n_formats; i++)
131             s->req_formats[i] = pa_format_info_copy(formats[i]);
132     }
133 
134     /* We'll get the final negotiated format after connecting */
135     s->format = NULL;
136 
137     s->direct_on_input = PA_INVALID_INDEX;
138 
139     s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
140     if (name)
141         pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
142 
143     s->channel = 0;
144     s->channel_valid = false;
145     s->syncid = c->csyncid++;
146     s->stream_index = PA_INVALID_INDEX;
147 
148     s->requested_bytes = 0;
149     memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
150 
151     /* We initialize the target length here, so that if the user
152      * passes no explicit buffering metrics the default is similar to
153      * what older PA versions provided. */
154 
155     s->buffer_attr.maxlength = (uint32_t) -1;
156     if (ss)
157         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
158     else {
159         /* FIXME: We assume a worst-case compressed format corresponding to
160          * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
161         pa_sample_spec tmp_ss = {
162             .format   = PA_SAMPLE_S16NE,
163             .rate     = 48000,
164             .channels = 2,
165         };
166         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
167     }
168     s->buffer_attr.minreq = (uint32_t) -1;
169     s->buffer_attr.prebuf = (uint32_t) -1;
170     s->buffer_attr.fragsize = (uint32_t) -1;
171 
172     s->device_index = PA_INVALID_INDEX;
173     s->device_name = NULL;
174     s->suspended = false;
175     s->corked = false;
176 
177     s->write_memblock = NULL;
178     s->write_data = NULL;
179 
180     pa_memchunk_reset(&s->peek_memchunk);
181     s->peek_data = NULL;
182     s->record_memblockq = NULL;
183 
184     memset(&s->timing_info, 0, sizeof(s->timing_info));
185     s->timing_info_valid = false;
186 
187     s->previous_time = 0;
188     s->latest_underrun_at_index = -1;
189 
190     s->read_index_not_before = 0;
191     s->write_index_not_before = 0;
192     for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
193         s->write_index_corrections[i].valid = 0;
194     s->current_write_index_correction = 0;
195 
196     s->auto_timing_update_event = NULL;
197     s->auto_timing_update_requested = false;
198     s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199 
200     reset_callbacks(s);
201 
202     s->smoother = NULL;
203 
204     /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
205     PA_LLIST_PREPEND(pa_stream, c->streams, s);
206     pa_stream_ref(s);
207 
208     return s;
209 }
210 
pa_stream_new_with_proplist( pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_proplist *p)211 pa_stream *pa_stream_new_with_proplist(
212         pa_context *c,
213         const char *name,
214         const pa_sample_spec *ss,
215         const pa_channel_map *map,
216         pa_proplist *p) {
217 
218     pa_channel_map tmap;
219 
220     PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
221     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
222     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
223     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
224     PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
225 
226     if (!map)
227         PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
228 
229     return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
230 }
231 
pa_stream_new_extended( pa_context *c, const char *name, pa_format_info * const *formats, unsigned int n_formats, pa_proplist *p)232 pa_stream *pa_stream_new_extended(
233         pa_context *c,
234         const char *name,
235         pa_format_info * const *formats,
236         unsigned int n_formats,
237         pa_proplist *p) {
238 
239     PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
240 
241     return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
242 }
243 
/* Detaches stream s from its context. Does nothing if the stream has no
 * context any more.
 *
 * Cancels all context operations that refer to the stream, unregisters its
 * pending replies, removes it from the per-direction channel map and from
 * the context's stream list, drops one stream reference, releases the
 * auto timing update event and finally clears all callbacks. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *n;
    pa_assert(s);

    if (!s->context)
        return;

    /* Detach from context */

    /* Unref all operation objects that point to us. The successor is saved
     * before o is cancelled so that the walk never reads o afterwards. */
    for (o = s->context->operations; o; o = n) {
        n = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        /* The stream object may outlive the unlink; never leave a pointer
         * to the freed event behind. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
283 
/* Destroys stream s: unlinks it from its context and releases every
 * resource the stream object holds before freeing the object itself. */
static void stream_free(pa_stream *s) {
    unsigned int idx;

    pa_assert(s);

    stream_unlink(s);

    /* Pending write buffer: release the data access first if one is open */
    if (s->write_memblock) {
        if (s->write_data)
            pa_memblock_release(s->write_memblock);

        pa_memblock_unref(s->write_memblock);
    }

    /* Same for the chunk currently being peeked at */
    if (s->peek_memchunk.memblock) {
        if (s->peek_data)
            pa_memblock_release(s->peek_memchunk.memblock);

        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq)
        pa_memblockq_free(s->record_memblockq);

    if (s->proplist)
        pa_proplist_free(s->proplist);

#ifdef USE_SMOOTHER_2
    if (s->smoother)
        pa_smoother_2_free(s->smoother);
#else
    if (s->smoother)
        pa_smoother_free(s->smoother);
#endif

    /* Requested formats were copied at construction time */
    for (idx = 0; idx < s->n_formats; idx++)
        pa_format_info_free(s->req_formats[idx]);

    if (s->format)
        pa_format_info_free(s->format);

    pa_xfree(s->device_name);
    pa_xfree(s);
}
325 
/* Drops one reference to s and frees the stream once the counter is no
 * longer positive. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (PA_REFCNT_DEC(s) > 0)
        return;

    stream_free(s);
}
333 
/* Takes an additional reference to s and hands the same pointer back so
 * that the call can be used inside an expression. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_REFCNT_INC(s);

    return s;
}
341 
pa_stream_get_state(const pa_stream *s)342 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
343     pa_assert(s);
344     pa_assert(PA_REFCNT_VALUE(s) >= 1);
345 
346     return s->state;
347 }
348 
/* Moves stream s into PA_STREAM_TERMINATED by way of pa_stream_set_state(). */
void pa_stream_terminate(pa_stream *s) {
    const pa_stream_state_t final_state = PA_STREAM_TERMINATED;

    pa_stream_set_state(s, final_state);
}
352 
pa_stream_get_context(const pa_stream *s)353 pa_context* pa_stream_get_context(const pa_stream *s) {
354     pa_assert(s);
355     pa_assert(PA_REFCNT_VALUE(s) >= 1);
356 
357     return s->context;
358 }
359 
pa_stream_get_index(const pa_stream *s)360 uint32_t pa_stream_get_index(const pa_stream *s) {
361     pa_assert(s);
362     pa_assert(PA_REFCNT_VALUE(s) >= 1);
363 
364     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
365     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
366 
367     return s->stream_index;
368 }
369 
/* Switches stream s to state st and notifies the state callback.
 *
 * Nothing happens if the stream is already in that state. Entering
 * PA_STREAM_FAILED or PA_STREAM_TERMINATED additionally unlinks the stream
 * from its context. A temporary reference keeps s alive for the duration
 * of the call. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (st == s->state)
        return;

    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback)
        s->state_callback(s, s->state_userdata);

    switch (st) {
        case PA_STREAM_FAILED:
        case PA_STREAM_TERMINATED:
            /* Final states: detach from the context */
            stream_unlink(s);
            break;

        default:
            break;
    }

    pa_stream_unref(s);
}
389 
/* Requests fresh timing info for s and re-arms the periodic update timer.
 *
 * Only active for streams with PA_STREAM_AUTO_TIMING_UPDATE set. A request
 * is sent when the stream is ready and either force is set or no request
 * has been recorded as pending. The timer is dropped for a suspended stream
 * (unless force is set); otherwise it is restarted, with the interval reset
 * to its start value when forced and doubled up to the upper bound
 * afterwards. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
        return;

    if (s->state == PA_STREAM_READY) {
        if (force || !s->auto_timing_update_requested) {
            pa_operation *op;

#ifdef STREAM_DEBUG
            pa_log_debug("Automatically requesting new timing data");
#endif

            op = pa_stream_update_timing_info(s, NULL, NULL);
            if (op) {
                pa_operation_unref(op);
                s->auto_timing_update_requested = true;
            }
        }
    }

    /* No timer, nothing to re-arm */
    if (!s->auto_timing_update_event)
        return;

    if (s->suspended && !force) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force)
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off for the next round */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
426 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * Reads the channel from t, looks up the matching stream and, if it is in
 * PA_STREAM_READY, sets the context error to PA_ERR_KILLED and moves the
 * stream to PA_STREAM_FAILED. A malformed packet fails the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (command == PA_COMMAND_PLAYBACK_STREAM_KILLED)
        streams = c->playback_streams;
    else
        streams = c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channels and streams that are not ready are ignored */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    pa_context_set_error(c, PA_ERR_KILLED);
    pa_stream_set_state(s, PA_STREAM_FAILED);

finish:
    pa_context_unref(c);
}
458 
/* Pauses or resumes the stream's smoother according to the stream state.
 *
 * The reference time is "now", shifted by the known transport latency:
 * backwards if aposteriori is set, forwards otherwise. The smoother is
 * paused when the stream is suspended, corked or force_stop is set. It is
 * resumed when force_start is set or prebuf is zero. force_start and
 * force_stop must not both be set. Streams without a smoother are left
 * untouched. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t when;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    when = pa_rtclock_now();

    if (s->timing_info_valid) {
        const pa_usec_t transport = s->timing_info.transport_usec;

        when = aposteriori ? when - transport : when + transport;
    }

    if (s->suspended || s->corked || force_stop) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_pause(s->smoother, when);
#else
        pa_smoother_pause(s->smoother, when);
#endif
        return;
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! */
    if (!force_start && s->buffer_attr.prebuf != 0)
        return;

    if (!s->timing_info_valid &&
        !aposteriori &&
        !force_start &&
        !force_stop &&
        s->context->version >= 13) {

        /* If the server supports STARTED events we take them as
         * indications when audio really starts/stops playing, if
         * we don't have any timing info yet -- instead of trying
         * to be smart and guessing the server time. Otherwise the
         * unknown transport delay adds too much noise to our time
         * calculations. */

        return;
    }

#ifdef USE_SMOOTHER_2
    pa_smoother_2_resume(s->smoother, when);
#else
    pa_smoother_resume(s->smoother, when, true);
#endif
}
511 
512 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
513 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Parses channel, device index, device name and suspend flag from t; with
 * c->version >= 13 the direction-specific buffer metrics and the configured
 * latency follow. For a ready stream the new device, buffer attributes and
 * suspend state are stored, the auto timing timer is created if needed, the
 * smoother state is re-evaluated and the moved callback is invoked.
 * Anything malformed, or a context version below 12, fails the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context. */
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    uint32_t di;
    const char *dn;
    bool suspended;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &di) < 0 ||
        pa_tagstruct_gets(t, &dn) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0)
        goto protocol_error;

    if (c->version >= 13) {

        if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
            /* Record layout: maxlength, fragsize, latency */
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &fragsize) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0)
                goto protocol_error;
        } else {
            /* Playback layout: maxlength, tlength, prebuf, minreq, latency */
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &tlength) < 0 ||
                pa_tagstruct_getu32(t, &prebuf) < 0 ||
                pa_tagstruct_getu32(t, &minreq) < 0 ||
                pa_tagstruct_get_usec(t, &usec) < 0)
                goto protocol_error;
        }
    }

    /* No trailing data allowed, and the new device must be identified */
    if (!pa_tagstruct_eof(t) || !dn || di == PA_INVALID_INDEX)
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_MOVED)
        streams = c->playback_streams;
    else
        streams = c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (c->version >= 13) {
        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;

        s->buffer_attr.maxlength = maxlength;
        s->buffer_attr.fragsize = fragsize;
        s->buffer_attr.tlength = tlength;
        s->buffer_attr.prebuf = prebuf;
        s->buffer_attr.minreq = minreq;
    }

    pa_xfree(s->device_name);
    s->device_name = pa_xstrdup(dn);
    s->device_index = di;

    s->suspended = suspended;

    /* A stream that is no longer suspended may need its update timer back */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->moved_callback)
        s->moved_callback(s, s->moved_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
616 
/* Handler for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
 *
 * Parses the channel followed by the direction-specific buffer metrics and
 * the configured latency from t. For a ready stream the values are stored
 * in s->buffer_attr / s->timing_info, a timing update is requested and the
 * buffer_attr callback is invoked. A malformed packet, or a context version
 * below 15, fails the context with PA_ERR_PROTOCOL. userdata is the
 * pa_context. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The record variant of this command has to be selected here; the
     * only commands that can reach this handler are the two
     * *_BUFFER_ATTR_CHANGED ones asserted above. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        /* Record layout: maxlength, fragsize, latency */
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        /* Playback layout: maxlength, tlength, prebuf, minreq, latency */
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* Unknown channels and streams that are not ready are ignored */
    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Fields that are not part of the received layout keep the zero they
     * were initialized with above. */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
690 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * Reads channel and suspend flag from t. For a ready stream the flag is
 * stored, the auto timing timer is created if needed, the smoother state is
 * re-evaluated and the suspended callback is invoked. A malformed packet,
 * or a context version below 12, fails the context with PA_ERR_PROTOCOL.
 * userdata is the pa_context. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_hashmap *streams;
    pa_stream *s;
    uint32_t channel;
    bool suspended;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t))
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED)
        streams = c->playback_streams;
    else
        streams = c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* A resumed stream may need its update timer back */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback)
        s->suspended_callback(s, s->suspended_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
740 
/* Handler for PA_COMMAND_STARTED.
 *
 * Reads the channel from t and, for a ready playback stream, force-starts
 * the smoother, requests a timing update and invokes the started callback.
 * A malformed packet, or a context version below 13, fails the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 13)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback)
        s->started_callback(s, s->started_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
780 
/* Handler for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * Reads channel, event name and a property list from t and passes them to
 * the event callback of the matching ready stream. For
 * PA_STREAM_EVENT_FORMAT_LOST the current stream time is added to the
 * property list as "stream-time" when it can be determined. A malformed
 * packet, or a context version below 15, fails the context with
 * PA_ERR_PROTOCOL. userdata is the pa_context. The temporary property list
 * is always freed before returning. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_proplist *pl = NULL;
    pa_hashmap *streams;
    pa_stream *s;
    const char *event;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15)
        goto protocol_error;

    pl = pa_proplist_new();

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, pl) < 0 ||
        !pa_tagstruct_eof(t) || !event)
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_EVENT)
        streams = c->playback_streams;
    else
        streams = c->record_streams;

    s = pa_hashmap_get(streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed  */
        pa_usec_t stream_time;

        if (pa_stream_get_time(s, &stream_time) == 0)
            pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, pl, s->event_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);

    if (pl)
        pa_proplist_free(pl);
}
833 
/* Handler for PA_COMMAND_REQUEST.
 *
 * Reads channel and byte count from t, adds the count to the requested
 * bytes of the matching ready playback stream and, if the total is
 * positive, invokes the write callback with it. A malformed packet fails
 * the context with PA_ERR_PROTOCOL. userdata is the pa_context. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    uint32_t bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    /* Only bother the application when there is something to write */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

finish:
    pa_context_unref(c);
}
872 
pa_stream_get_underflow_index(const pa_stream *p)873 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
874     pa_assert(p);
875     return p->latest_underrun_at_index;
876 }
877 
/* Handler for PA_COMMAND_OVERFLOW, PA_COMMAND_UNDERFLOW and
 * PA_COMMAND_UNDERFLOW_OHOS.
 *
 * Reads the channel from t; for PA_COMMAND_UNDERFLOW with c->version >= 23
 * an additional 64 bit offset follows. For a ready playback stream:
 *  - PA_COMMAND_UNDERFLOW_OHOS only invokes the underflow_ohos callback;
 *  - otherwise a received offset is stored as the latest underrun index,
 *    the smoother is force-stopped when prebuf is non-zero, a timing update
 *    is requested and the overflow or underflow callback is invoked.
 * A malformed packet fails the context with PA_ERR_PROTOCOL. userdata is
 * the pa_context. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
        || command == PA_COMMAND_UNDERFLOW_OHOS);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0)
        goto protocol_error;

    if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
        if (pa_tagstruct_gets64(t, &offset) < 0)
            goto protocol_error;
    }

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (command == PA_COMMAND_UNDERFLOW_OHOS) {
        /* This variant is a plain notification, no state is touched */
        if (s->underflow_ohos_callback)
            s->underflow_ohos_callback(s, s->underflow_ohos_userdata);

        goto finish;
    }

    if (offset != -1)
        s->latest_underrun_at_index = offset;

    if (s->buffer_attr.prebuf > 0)
        check_smoother_status(s, true, false, true);

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;

        default:
            break;
    }

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
942 
/* Flag the locally cached read index (r) and/or write index (w) of a stream
 * as no longer trustworthy and ask for a fresh timing update.
 *
 * For each selected index the current context tag is stored in the matching
 * *_index_not_before field and, if timing info is currently valid, the
 * matching *_index_corrupt flag in it is set. Nothing happens unless the
 * stream is in the READY state. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state == PA_STREAM_READY) {

        if (w) {
            if (s->timing_info_valid)
                s->timing_info.write_index_corrupt = true;

            s->write_index_not_before = s->context->ctag;

#ifdef STREAM_DEBUG
            pa_log_debug("write_index invalidated");
#endif
        }

        if (r) {
            if (s->timing_info_valid)
                s->timing_info.read_index_corrupt = true;

            s->read_index_not_before = s->context->ctag;

#ifdef STREAM_DEBUG
            pa_log_debug("read_index invalidated");
#endif
        }

        request_auto_timing_update(s, true);
    }
}
978 
/* Time event handler installed by create_stream_complete(). The stream is
 * passed in as userdata; m, e and t are not used. A reference is held on the
 * stream for the duration of the request_auto_timing_update() call. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *s;

    s = userdata;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    pa_stream_ref(s);

    /* false: this is the periodic request, not a forced one */
    request_auto_timing_update(s, false);

    pa_stream_unref(s);
}
989 
/* Final step of stream creation: move the stream from CREATING to READY,
 * hand any already requested bytes to the write callback, set up the
 * automatic timing update timer if PA_STREAM_AUTO_TIMING_UPDATE was
 * requested, and finally re-evaluate the smoother state. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    /* Let the application start writing right away if data was requested */
    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

    if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
        pa_usec_t first_deadline;

        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        pa_assert(!s->auto_timing_update_event);

        /* The timer starts out with the short start interval */
        first_deadline = pa_rtclock_now() + s->auto_timing_interval_usec;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, first_deadline, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
1010 
/* Adjust the buffer attributes (and possibly the stream flags) that are
 * about to be sent to the server.
 *
 * s     - stream the attributes belong to; its sample spec, requested
 *         formats and context protocol version are consulted
 * attr  - buffer attributes to patch in place
 * flags - optional; if non-NULL, PA_STREAM_ADJUST_LATENCY is added when the
 *         latency override from the environment is applied
 *
 * Two independent adjustments are made:
 *
 *  1. If $PULSE_LATENCY_MSEC is set to a positive integer and a valid sample
 *     spec can be determined, tlength and fragsize are set to that many
 *     milliseconds of audio and the remaining fields are reset to
 *     (uint32_t) -1.
 *
 *  2. For servers speaking a protocol version older than 13, every field
 *     still set to (uint32_t) -1 is replaced by a client side default. */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        pa_sample_spec_init(&ss);

        /* Prefer the stream's own sample spec. Streams that only carry a
         * single requested format fall back to a sample spec derived from
         * that format. */
        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1)
            pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);

        if (pa_atou(e, &ms) < 0 || ms <= 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        /* Validate the spec that is actually used for the conversion below.
         * Checking s->sample_spec here instead would throw away the
         * format-derived spec computed above. */
        else if (!pa_sample_spec_valid(&ss))
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1068 
/* Reply handler for a stream creation request; create_stream() registers it
 * with pa_pdispatch_register_reply(), passing the stream as userdata.
 *
 * pd       - dispatcher that delivered the reply (only asserted non-NULL)
 * command  - PA_COMMAND_REPLY on success, anything else is treated as an
 *            error reply
 * tag      - unused here
 * t        - tagstruct with the reply payload
 * userdata - the pa_stream being created; must be in the CREATING state
 *
 * The fields present in the reply depend on the stream direction and on the
 * protocol version of the context, so the reads below must stay in exactly
 * this order. Any malformed or inconsistent reply fails the whole context
 * with PA_ERR_PROTOCOL. On success the stream is entered into the context's
 * record or playback stream map and create_stream_complete() is called. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t requested_bytes = 0;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    /* Keep the stream alive until the 'finish' label */
    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        /* If pa_context_handle_error() itself reports failure we leave the
         * stream state untouched; otherwise only this stream is failed. */
        if (pa_context_handle_error(s->context, command, t, false) < 0)
            goto finish;

        pa_stream_set_state(s, PA_STREAM_FAILED);
        goto finish;
    }

    /* Channel is always present. The stream index is skipped for upload
     * streams and the initially requested byte count is skipped for record
     * streams. */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
        s->channel == PA_INVALID_INDEX ||
        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    s->requested_bytes = (int64_t) requested_bytes;

    /* Protocol version >= 9: the reply carries buffer attributes, which
     * overwrite the ones stored in the stream */
    if (s->context->version >= 9) {
        if (s->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else if (s->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        }
    }

    /* Protocol version >= 12 (not for uploads): sample spec, channel map,
     * device index, device name and suspended flag */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec ss;
        pa_channel_map cm;
        const char *dn = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &dn) < 0 ||
            pa_tagstruct_get_boolean(t, &suspended) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Sanity check what was received. When no format list was requested
         * (n_formats == 0), format, rate and channel map must additionally
         * equal what the stream asked for, unless the corresponding
         * PA_STREAM_FIX_* flag is set. */
        if (!dn || s->device_index == PA_INVALID_INDEX ||
            ss.channels != cm.channels ||
            !pa_channel_map_valid(&cm) ||
            !pa_sample_spec_valid(&ss) ||
            (s->n_formats == 0 && (
                (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
                (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
                (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Replace any previously stored device name with a private copy */
        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(dn);
        s->suspended = suspended;

        s->channel_map = cm;
        s->sample_spec = ss;
    }

#ifdef USE_SMOOTHER_2
    /* Tell the smoother which sample spec is in effect at this point */
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    /* Protocol version >= 13 (not for uploads): one usec value, stored as
     * the configured source or sink usec depending on the direction */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t usec;

        if (pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;
    }

    /* Format info is read for playback streams from version 21 on and for
     * every direction from version 22 on */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *f = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
            pa_format_info_free(f);
            if (s->n_formats > 0) {
                /* We used the extended API, so we should have got back a proper format */
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else
            s->format = f; /* the stream keeps the pointer from here on */
    }

    /* Anything left over in the reply is a protocol violation */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        /* Client side queue for recorded data, created with the maxlength
         * and sample spec that were just read from the reply */
        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    /* Register the stream under its channel so that incoming packets can be
     * looked up in the context's record or playback stream map */
    s->channel_valid = true;
    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);

finish:
    pa_stream_unref(s);
}
1214 
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record(): validate the arguments, initialise the
 * client side stream state and send the stream creation command.
 *
 * direction   - PA_STREAM_PLAYBACK or PA_STREAM_RECORD
 * s           - stream to connect; must be in the UNCONNECTED state
 * dev         - device name, or NULL to use the default sink/source from
 *               the context's client configuration
 * attr        - optional buffer attributes, copied into the stream
 * flags       - stream flags; only the flags listed in the check below are
 *               accepted
 * volume      - optional initial volume; if NULL a reset volume is sent and
 *               the "volume set" flag of the request is false
 * sync_stream - optional playback stream whose syncid is taken over
 *
 * Returns 0 once the request has been queued. The argument checks use
 * PA_CHECK_VALIDITY, which is defined elsewhere; presumably it records the
 * given error on the context and returns a negative value - confirm against
 * the macro definition. The reply is processed asynchronously by
 * pa_create_stream_callback().
 *
 * The fields appended to the tagstruct depend on the direction and on the
 * protocol version of the context; their order must not be changed. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    pa_tagstruct *t;
    uint32_t tag;
    bool volume_set = !!volume; /* remember whether the caller passed a volume */
    pa_cvolume cv;
    uint32_t i;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    /* direct_on_input may only be set on record streams */
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    /* Reject any flag bit that is not in this list */
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Although some of the other flags are not supported on older
     * version, we don't check for them here, because it doesn't hurt
     * when they are passed but actually not supported. This makes
     * client development easier */

    /* Peak detection is record only, stream syncing is playback only, and
     * ADJUST_LATENCY and EARLY_REQUESTS are mutually exclusive */
    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    /* Hold a reference until the request has been sent */
    pa_stream_ref(s);

    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    /* May rewrite the buffer attributes and add PA_STREAM_ADJUST_LATENCY to
     * flags, hence the flags are stored in the stream only afterwards */
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        pa_usec_t x;

        x = pa_rtclock_now();

        /* Create the smoother used for timing interpolation; which
         * implementation is used is a compile time choice */
        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, x, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                x,
                true);
#endif
    }

    /* Fall back to the default device from the client configuration */
    if (!dev)
        dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;

    t = pa_tagstruct_command(
            s->context,
            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &tag);

    /* Before version 13 the media name is sent as a separate string; from
     * version 13 on the whole property list is sent further below */
    if (s->context->version < 13)
        pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    /* Fields common to both directions. The device is addressed by name, so
     * PA_INVALID_INDEX is sent in the U32 field preceding it. */
    pa_tagstruct_put(
            t,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    /* A volume structure is always transmitted; if the caller gave none a
     * reset volume is used and volume_set (false) tells the two apart */
    if (!volume) {
        if (pa_sample_spec_valid(&s->sample_spec))
            volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
        else {
            /* This is not really relevant, since no volume was set, and
             * the real number of channels is embedded in the format_info
             * structure */
            volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
        }
    }

    /* Direction specific buffer attributes */
    if (s->direction == PA_STREAM_PLAYBACK) {
        pa_tagstruct_put(
                t,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(t, volume);
    } else
        pa_tagstruct_putu32(t, s->buffer_attr.fragsize);

    /* Everything below is appended only if the context's protocol version
     * is at least the one checked */

    if (s->context->version >= 12) {
        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (s->context->version >= 13) {

        /* This boolean is "start muted" for playback and "peak detect" for
         * record streams */
        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (s->direction == PA_STREAM_RECORD)
            pa_tagstruct_putu32(t, s->direct_on_input);
    }

    if (s->context->version >= 14) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, volume_set);

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (s->context->version >= 15) {

        /* True if either START_MUTED or START_UNMUTED was given, i.e. the
         * caller stated an explicit initial mute state */
        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);

    if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));

    /* Requested formats: playback from version 21 on, every direction from
     * version 22 on. This is the same condition pa_create_stream_callback()
     * uses when reading the negotiated format back. */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_tagstruct_putu8(t, s->n_formats);
        for (i = 0; i < s->n_formats; i++)
            pa_tagstruct_put_format_info(t, s->req_formats[i]);
    }

    /* From version 22 on record streams send the volume and mute related
     * fields as well */
    if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
        pa_tagstruct_put_cvolume(t, volume);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(t, volume_set);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
    }

    /* Queue the request and register the handler for its reply */
    pa_pstream_send_tagstruct(s->context->pstream, t);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1427 
/* Connect the stream for playback. This is a thin front end that forwards
 * every argument to create_stream() with the direction fixed to
 * PA_STREAM_PLAYBACK and returns its result. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    r = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return r;
}
1441 
/* Connect the stream for recording. This is a thin front end that forwards
 * to create_stream() with the direction fixed to PA_STREAM_RECORD; no
 * volume and no sync stream are passed. Returns create_stream()'s result. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    r = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return r;
}
1453 
/* Hand out a buffer taken from the context's memory pool that the caller
 * can fill and later pass to pa_stream_write_ext_free()/pa_stream_write().
 *
 * s      - a READY playback or upload stream
 * data   - receives the address of the buffer
 * nbytes - in: wanted size, or (size_t) -1 to leave the size to the pool;
 *          must not be 0. out: actual size of the buffer.
 *
 * A wanted size larger than the pool's maximum block size, rounded down to
 * a multiple of the frame size, is clamped to that value. If a buffer from
 * an earlier call is still pending it is handed out again and the wanted
 * size is ignored. Returns 0 on success. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        size_t pool_max, frame, limit;

        pool_max = pa_mempool_block_size_max(s->context->mempool);
        frame = pa_frame_size(&s->sample_spec);

        /* Largest whole number of frames that fits into one pool block */
        limit = pool_max - (pool_max % frame);

        if (limit < *nbytes)
            *nbytes = limit;
    }

    /* Only allocate when no buffer from a previous call is outstanding */
    if (!s->write_memblock) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1489 
/* Give back a buffer obtained from pa_stream_begin_write() without writing
 * it. The stream must be a READY playback or upload stream with a pending
 * write buffer. Returns 0 on success. */
int pa_stream_cancel_write(
        pa_stream *s) {

    pa_memblock *pending;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    pa_assert(s->write_data);

    pending = s->write_memblock;

    /* Undo the acquire and drop the reference taken in pa_stream_begin_write() */
    pa_memblock_release(pending);
    pa_memblock_unref(pending);

    s->write_data = NULL;
    s->write_memblock = NULL;

    return 0;
}
1510 
/* Write audio data to a READY playback or upload stream.
 *
 * s            - stream to write to
 * data         - data to send; if a buffer from pa_stream_begin_write() is
 *                pending, data must lie completely inside that buffer
 * length       - number of bytes, must be a multiple of the frame size
 * free_cb      - optional function that is called to release the data once
 *                it is no longer needed; must be NULL when a buffer from
 *                pa_stream_begin_write() is pending
 * free_cb_data - argument that is passed to free_cb
 * offset       - seek offset in bytes, must be a multiple of the frame size
 *                (negative values are allowed); must be 0 for upload streams
 * seek         - seek mode; must be PA_SEEK_RELATIVE for upload streams
 *
 * Returns 0 on success. Argument problems are reported through
 * PA_CHECK_VALIDITY with the error codes given below.
 *
 * Besides sending the data this updates the client side bookkeeping:
 * requested_bytes is reduced and, for playback streams, the write index
 * kept in the pending correction slot and in the timing info is advanced,
 * replaced or marked corrupt, depending on the seek mode. */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);
    /* The remainder has to be computed in a signed type: if the frame size
     * is left as size_t, a negative offset is converted to a huge unsigned
     * value where size_t is 64 bit wide, and frame aligned negative offsets
     * are then rejected for frame sizes that are not a power of two. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        /* Send the part of the pending buffer that data/length describe */
        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                /* Wrap the caller's buffer directly; free_cb is passed on to
                 * the memblock. The whole remainder goes out in one chunk. */
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);

            /* The seek applies to the first chunk only, all following
             * chunks are simply appended */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* In the SHM case the data was copied above, so the caller's buffer
         * can be released right away */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index . But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        /* Without a usable write index, ask the server for fresh timing data */
        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1644 
/* Convenience front end for pa_stream_write_ext_free() for the case where
 * the argument handed to free_cb is the data pointer itself. Returns
 * whatever pa_stream_write_ext_free() returns. */
int pa_stream_write(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        int64_t offset,
        pa_seek_mode_t seek) {

    /* free_cb takes a non-const pointer, hence the cast */
    void *free_cb_data = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_data, offset, seek);
}
1655 
/* Look at the next fragment of recorded data without removing it.
 *
 * s      - a READY record stream
 * data   - receives a pointer to the fragment, or NULL
 * length - receives the size of the fragment in bytes
 *
 * Three outcomes are possible, all reported with return value 0:
 *   - *data != NULL:                data is available
 *   - *data == NULL, *length == 0:  the record queue is empty
 *   - *data == NULL, *length  > 0:  the queue is not empty but holds no data
 *                                   at the current read index
 *
 * A fragment that was peeked before stays current until it is dropped, so
 * repeated calls return the same fragment. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (!s->peek_memchunk.memblock) {
        /* Nothing cached from an earlier call, fetch the next chunk */
        const bool queue_empty = pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0;

        if (queue_empty || !s->peek_memchunk.memblock) {
            /* Either record_memblockq is empty, or it isn't empty but
             * doesn't have any data at the current read index. Only in the
             * second case the chunk length is meaningful. */
            *data = NULL;
            *length = queue_empty ? 0 : s->peek_memchunk.length;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1690 
/* Discards the chunk currently cached by pa_stream_peek(): drops its
 * length from the record queue, moves the locally simulated read index
 * forward by the same amount (when the timing info is valid and the read
 * index is not corrupt) and releases the cached memblock, if there is one.
 * A cached chunk length of 0 is rejected with PA_ERR_BADSTATE. Returns 0
 * once the checks have passed. */
int pa_stream_drop(pa_stream *s) {
    size_t nbytes;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    nbytes = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, nbytes);

    /* Keep the simulated local read index in step with the queue */
    if (s->timing_info_valid) {
        if (!s->timing_info.read_index_corrupt)
            s->timing_info.read_index += (int64_t) nbytes;
    }

    /* A chunk without a memblock has nothing to release */
    if (s->peek_memchunk.memblock) {
        pa_assert(s->peek_data);
        s->peek_data = NULL;

        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1717 
pa_stream_writable_size(const pa_stream *s)1718 size_t pa_stream_writable_size(const pa_stream *s) {
1719     pa_assert(s);
1720     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1721 
1722     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1723     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1724     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1725 
1726     return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1727 }
1728 
pa_stream_readable_size(const pa_stream *s)1729 size_t pa_stream_readable_size(const pa_stream *s) {
1730     pa_assert(s);
1731     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1732 
1733     PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1734     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1735     PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1736 
1737     return pa_memblockq_get_length(s->record_memblockq);
1738 }
1739 
pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)1740 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1741     pa_operation *o;
1742     pa_tagstruct *t;
1743     uint32_t tag;
1744 
1745     pa_assert(s);
1746     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1747 
1748     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1749     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1750     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1751 
1752     /* Ask for a timing update before we cork/uncork to get the best
1753      * accuracy for the transport latency suitable for the
1754      * check_smoother_status() call in the started callback */
1755     request_auto_timing_update(s, true);
1756 
1757     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1758 
1759     t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1760     pa_tagstruct_putu32(t, s->channel);
1761     pa_pstream_send_tagstruct(s->context->pstream, t);
1762     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1763 
1764     /* This might cause the read index to continue again, hence
1765      * let's request a timing update */
1766     request_auto_timing_update(s, true);
1767 
1768     return o;
1769 }
1770 
calc_time(const pa_stream *s, bool ignore_transport)1771 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1772     pa_usec_t usec;
1773 
1774     pa_assert(s);
1775     pa_assert(PA_REFCNT_VALUE(s) >= 1);
1776     pa_assert(s->state == PA_STREAM_READY);
1777     pa_assert(s->direction != PA_STREAM_UPLOAD);
1778     pa_assert(s->timing_info_valid);
1779     pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1780     pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1781 
1782     if (s->direction == PA_STREAM_PLAYBACK) {
1783         /* The last byte that was written into the output device
1784          * had this time value associated */
1785         usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1786 
1787         if (!s->corked && !s->suspended) {
1788 
1789             if (!ignore_transport)
1790                 /* Because the latency info took a little time to come
1791                  * to us, we assume that the real output time is actually
1792                  * a little ahead */
1793                 usec += s->timing_info.transport_usec;
1794 
1795             /* However, the output device usually maintains a buffer
1796                too, hence the real sample currently played is a little
1797                back  */
1798             if (s->timing_info.sink_usec >= usec)
1799                 usec = 0;
1800             else
1801                 usec -= s->timing_info.sink_usec;
1802         }
1803 
1804     } else {
1805         pa_assert(s->direction == PA_STREAM_RECORD);
1806 
1807         /* The last byte written into the server side queue had
1808          * this time value associated */
1809         usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1810 
1811         if (!s->corked && !s->suspended) {
1812 
1813             if (!ignore_transport)
1814                 /* Add transport latency */
1815                 usec += s->timing_info.transport_usec;
1816 
1817             /* Add latency of data in device buffer */
1818             usec += s->timing_info.source_usec;
1819 
1820             /* If this is a monitor source, we need to correct the
1821              * time by the playback device buffer */
1822             if (s->timing_info.sink_usec >= usec)
1823                 usec = 0;
1824             else
1825                 usec -= s->timing_info.sink_usec;
1826         }
1827     }
1828 
1829     return usec;
1830 }
1831 
1832 #ifdef USE_SMOOTHER_2
/* Expresses calc_time() as a byte count: microseconds are turned into
 * whole frames at the stream's sample rate (integer division truncates)
 * and then multiplied by the frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    pa_usec_t usec = calc_time(s, ignore_transport);
    uint64_t frames = usec * s->sample_spec.rate / PA_USEC_PER_SEC;

    return (uint64_t)(frames * pa_frame_size(&s->sample_spec));
}
1836 #endif
1837 
/* Reply handler for the latency query sent by
 * pa_stream_update_timing_info(); userdata is the pa_operation that was
 * registered together with the request.
 *
 * On a well-formed reply the stream's timing_info is refilled, the
 * transport latency is derived from the timestamps, the write index
 * (playback) or read index (record) is corrected for client side
 * activity, and the smoother is fed. Afterwards the latency update
 * callback and the operation's own callback are invoked. In every case
 * the operation is marked done and one reference to it is dropped. */
static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    /* NOTE(review): 'local' is presumably the client timestamp sent with
     * the request and echoed back, 'remote' the server's own timestamp --
     * confirm against the protocol. */
    struct timeval local, remote, now;
    pa_timing_info *i;
    bool playing = false;
    uint64_t underrun_for = 0, playing_for = 0;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* Nothing to update if the operation lost its context or stream */
    if (!o->context || !o->stream)
        goto finish;

    i = &o->stream->timing_info;

    /* Treat the timing info as unusable until the reply has been parsed
     * completely */
    o->stream->timing_info_valid = false;
    i->write_index_corrupt = true;
    i->read_index_corrupt = true;

    if (command != PA_COMMAND_REPLY) {
        /* A negative result skips the callbacks; otherwise execution
         * continues below with the timing info still marked invalid */
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

    } else {

        if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
            pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
            pa_tagstruct_get_boolean(t, &playing) < 0 ||
            pa_tagstruct_get_timeval(t, &local) < 0 ||
            pa_tagstruct_get_timeval(t, &remote) < 0 ||
            pa_tagstruct_gets64(t, &i->write_index) < 0 ||
            pa_tagstruct_gets64(t, &i->read_index) < 0) {

            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* For playback streams, two additional u64 fields are read when
         * o->context->version is at least 13 */
        if (o->context->version >= 13 &&
            o->stream->direction == PA_STREAM_PLAYBACK)
            if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
                pa_tagstruct_getu64(t, &playing_for) < 0) {

                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

        /* Trailing data in the reply is a protocol error */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
        o->stream->timing_info_valid = true;
        i->write_index_corrupt = false;
        i->read_index_corrupt = false;

        i->playing = (int) playing;
        /* since_underrun carries playing_for while playing and
         * underrun_for otherwise */
        i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);

        pa_gettimeofday(&now);

        /* Calculate timestamps */
        if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
            /* local and remote seem to have synchronized clocks */

            if (o->stream->direction == PA_STREAM_PLAYBACK)
                i->transport_usec = pa_timeval_diff(&remote, &local);
            else
                i->transport_usec = pa_timeval_diff(&now, &remote);

            i->synchronized_clocks = true;
            i->timestamp = remote;
        } else {
            /* clocks are not synchronized, let's estimate latency then */
            i->transport_usec = pa_timeval_diff(&now, &local)/2;
            i->synchronized_clocks = false;
            i->timestamp = local;
            pa_timeval_add(&i->timestamp, i->transport_usec);
        }

        /* Invalidate read and write indexes if necessary */
        if (tag < o->stream->read_index_not_before)
            i->read_index_corrupt = true;

        if (tag < o->stream->write_index_not_before)
            i->write_index_corrupt = true;

        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            /* Write index correction */

            int n, j;
            /* Smallest tag still accepted; raised past every entry that
             * gets applied so entries are consumed in increasing tag order */
            uint32_t ctag = tag;

            /* Go through the saved correction values and add up the
             * total correction. The walk starts at the slot following
             * the current one and visits every slot once. */
            for (n = 0, j = o->stream->current_write_index_correction+1;
                 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
                 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {

                /* Step over invalid data or out-of-date data */
                if (!o->stream->write_index_corrections[j].valid ||
                    o->stream->write_index_corrections[j].tag < ctag)
                    continue;

                /* Make sure that everything is in order */
                ctag = o->stream->write_index_corrections[j].tag+1;

                /* Now fix the write index */
                if (o->stream->write_index_corrections[j].corrupt) {
                    /* A corrupting seek was made */
                    i->write_index_corrupt = true;
                } else if (o->stream->write_index_corrections[j].absolute) {
                    /* An absolute seek was made */
                    i->write_index = o->stream->write_index_corrections[j].value;
                    i->write_index_corrupt = false;
                } else if (!i->write_index_corrupt) {
                    /* A relative seek was made */
                    i->write_index += o->stream->write_index_corrections[j].value;
                }
            }

            /* Clear old correction entries, i.e. those whose tag is not
             * newer than the tag of this reply */
            for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
                if (!o->stream->write_index_corrections[n].valid)
                    continue;

                if (o->stream->write_index_corrections[n].tag <= tag)
                    o->stream->write_index_corrections[n].valid = false;
            }
        }

        if (o->stream->direction == PA_STREAM_RECORD) {
            /* Read index correction: subtract what is still sitting in
             * the local record queue */

            if (!i->read_index_corrupt)
                i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
        }

        /* Update smoother if we're not corked */
        if (o->stream->smoother && !o->stream->corked) {
            pa_usec_t u, x;

            /* Both start as "now minus transport latency"; u is used for
             * the smoother sample, x for pausing/resuming it */
            u = x = pa_rtclock_now() - i->transport_usec;

            if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
                pa_usec_t su;

                /* If we weren't playing then it will take some time
                 * until the audio will actually come out through the
                 * speakers. Since we follow that timing here, we need
                 * to try to fix this up */

                su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);

                if (su < i->sink_usec)
                    x += i->sink_usec - su;
            }

            if (!i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_pause(o->stream->smoother, x);
#else
                pa_smoother_pause(o->stream->smoother, x);
#endif

            /* Update the smoother, but only when the index that
             * calc_time() relies on for this direction is trustworthy */
            if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
                (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
#ifdef USE_SMOOTHER_2
                pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
#else
                pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
#endif

            if (i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_resume(o->stream->smoother, x);
#else
                pa_smoother_resume(o->stream->smoother, x, true);
#endif
        }
    }

    o->stream->auto_timing_update_requested = false;

    if (o->stream->latency_update_callback)
        o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);

    /* o->stream is checked again here, after the latency update callback
     * has run; the second argument tells the callback whether the timing
     * info is valid */
    if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, o->stream->timing_info_valid, o->userdata);
    }

finish:

    pa_operation_done(o);
    pa_operation_unref(o);
}
2035 
pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2036 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2037     uint32_t tag;
2038     pa_operation *o;
2039     pa_tagstruct *t;
2040     struct timeval now;
2041     int cidx = 0;
2042 
2043     pa_assert(s);
2044     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2045 
2046     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2047     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2048     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2049 
2050     if (s->direction == PA_STREAM_PLAYBACK) {
2051         /* Find a place to store the write_index correction data for this entry */
2052         cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2053 
2054         /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2055         PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2056     }
2057     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2058 
2059     t = pa_tagstruct_command(
2060             s->context,
2061             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2062             &tag);
2063     pa_tagstruct_putu32(t, s->channel);
2064     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2065 
2066     pa_pstream_send_tagstruct(s->context->pstream, t);
2067     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2068 
2069     if (s->direction == PA_STREAM_PLAYBACK) {
2070         /* Fill in initial correction data */
2071 
2072         s->current_write_index_correction = cidx;
2073 
2074         s->write_index_corrections[cidx].valid = true;
2075         s->write_index_corrections[cidx].absolute = false;
2076         s->write_index_corrections[cidx].corrupt = false;
2077         s->write_index_corrections[cidx].tag = tag;
2078         s->write_index_corrections[cidx].value = 0;
2079     }
2080 
2081     return o;
2082 }
2083 
/* Reply handler for the stream deletion request; userdata is the stream.
 *   - error reply: handed to pa_context_handle_error(); unless that
 *     returns a negative value the stream is moved to PA_STREAM_FAILED.
 *   - reply with trailing data: the context is failed with
 *     PA_ERR_PROTOCOL.
 *   - clean reply: the stream is moved to PA_STREAM_TERMINATED.
 * The stream is referenced for the duration of the handler. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(pd);
    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    pa_stream_ref(stream);

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(stream->context, command, t, false) >= 0)
            pa_stream_set_state(stream, PA_STREAM_FAILED);
    } else if (pa_tagstruct_eof(t))
        pa_stream_set_state(stream, PA_STREAM_TERMINATED);
    else
        pa_context_fail(stream->context, PA_ERR_PROTOCOL);

    pa_stream_unref(stream);
}
2109 
/* Sends the delete command matching the stream's direction (playback,
 * record, or otherwise upload) for its channel and registers
 * pa_stream_disconnect_callback() for the reply. Requires a non-forked
 * process, a valid channel and a ready context. Returns 0 once the
 * request has been queued. */
int pa_stream_disconnect(pa_stream *s) {
    pa_tagstruct *request;
    uint32_t tag, command;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    /* Hold a reference while the request is being set up */
    pa_stream_ref(s);

    if (s->direction == PA_STREAM_PLAYBACK)
        command = (uint32_t) PA_COMMAND_DELETE_PLAYBACK_STREAM;
    else if (s->direction == PA_STREAM_RECORD)
        command = (uint32_t) PA_COMMAND_DELETE_RECORD_STREAM;
    else
        command = (uint32_t) PA_COMMAND_DELETE_UPLOAD_STREAM;

    request = pa_tagstruct_command(s->context, command, &tag);
    pa_tagstruct_putu32(request, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, request);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);

    return 0;
}
2135 
/* Stores cb/userdata as the stream's read callback. The call is silently
 * ignored when a fork is detected or when the stream is already in the
 * terminated or failed state. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2149 
/* Stores cb/userdata as the stream's write callback. The call is silently
 * ignored when a fork is detected or when the stream is already in the
 * terminated or failed state. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2163 
/* Stores cb/userdata as the stream's state callback. The call is silently
 * ignored when a fork is detected or when the stream is already in the
 * terminated or failed state. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2177 
/* Stores cb/userdata as the stream's overflow callback. The call is
 * silently ignored when a fork is detected or when the stream is already
 * in the terminated or failed state. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2191 
/* Stores cb/userdata as the stream's underflow callback. The call is
 * silently ignored when a fork is detected or when the stream is already
 * in the terminated or failed state. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2205 
/* Stores cb/userdata in the stream's underflow_ohos callback slot. The
 * call is silently ignored when a fork is detected or when the stream is
 * already in the terminated or failed state. */
void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_ohos_userdata = userdata;
    s->underflow_ohos_callback = cb;
}
2219 
/* Stores cb/userdata as the stream's latency update callback. The call is
 * silently ignored when a fork is detected or when the stream is already
 * in the terminated or failed state. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2233 
/* Stores cb/userdata as the stream's moved callback. The call is silently
 * ignored when a fork is detected or when the stream is already in the
 * terminated or failed state. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2247 
/* Stores cb/userdata as the stream's suspended callback. The call is
 * silently ignored when a fork is detected or when the stream is already
 * in the terminated or failed state. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2261 
/* Stores cb/userdata as the stream's started callback. The call is
 * silently ignored when a fork is detected or when the stream is already
 * in the terminated or failed state. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2275 
/* Stores cb/userdata as the stream's event callback. The call is silently
 * ignored when a fork is detected or when the stream is already in the
 * terminated or failed state. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2289 
/* Stores cb/userdata as the stream's buffer attribute callback. The call
 * is silently ignored when a fork is detected or when the stream is
 * already in the terminated or failed state. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2303 
/* Generic reply handler for stream commands whose reply carries no
 * payload; userdata is the pa_operation registered with the request.
 *
 * The operation's callback, if set, is invoked with success == 1 for a
 * clean reply and success == 0 for an error reply that
 * pa_context_handle_error() did not answer with a negative value. It is
 * not invoked when the operation has no context, when error handling
 * returns a negative value, or when the reply has trailing data (which
 * fails the context with PA_ERR_PROTOCOL). The operation is always marked
 * done and one reference to it is dropped. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *op = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(op);
    pa_assert(PA_REFCNT_VALUE(op) >= 1);

    if (!op->context)
        goto done;

    if (command == PA_COMMAND_REPLY) {
        /* A proper acknowledgement has no payload */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(op->context, PA_ERR_PROTOCOL);
            goto done;
        }
    } else {
        if (pa_context_handle_error(op->context, command, t, false) < 0)
            goto done;

        success = 0;
    }

    if (op->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) op->callback;
        notify(op->stream, success, op->userdata);
    }

done:
    pa_operation_done(op);
    pa_operation_unref(op);
}
2334 
pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata)2335 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2336     pa_operation *o;
2337     pa_tagstruct *t;
2338     uint32_t tag;
2339 
2340     pa_assert(s);
2341     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342 
2343     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2344     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2345     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2346 
2347     /* Ask for a timing update before we cork/uncork to get the best
2348      * accuracy for the transport latency suitable for the
2349      * check_smoother_status() call in the started callback */
2350     request_auto_timing_update(s, true);
2351 
2352     s->corked = b;
2353 
2354     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2355 
2356     t = pa_tagstruct_command(
2357             s->context,
2358             (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2359             &tag);
2360     pa_tagstruct_putu32(t, s->channel);
2361     pa_tagstruct_put_boolean(t, !!b);
2362     pa_pstream_send_tagstruct(s->context->pstream, t);
2363     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2364 
2365     check_smoother_status(s, false, false, false);
2366 
2367     /* This might cause the indexes to hang/start again, hence let's
2368      * request a timing update, after the cork/uncork, too */
2369     request_auto_timing_update(s, true);
2370 
2371     return o;
2372 }
2373 
stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata)2374 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2375     pa_tagstruct *t;
2376     pa_operation *o;
2377     uint32_t tag;
2378 
2379     pa_assert(s);
2380     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2381 
2382     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2383     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2384 
2385     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2386 
2387     t = pa_tagstruct_command(s->context, command, &tag);
2388     pa_tagstruct_putu32(t, s->channel);
2389     pa_pstream_send_tagstruct(s->context->pstream, t);
2390     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2391 
2392     return o;
2393 }
2394 
pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2395 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2396     pa_operation *o;
2397 
2398     pa_assert(s);
2399     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2400 
2401     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2402     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2403     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2404 
2405     /* Ask for a timing update *before* the flush, so that the
2406      * transport usec is as up to date as possible when we get the
2407      * underflow message and update the smoother status*/
2408     request_auto_timing_update(s, true);
2409 
2410     if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2411         return NULL;
2412 
2413     if (s->direction == PA_STREAM_PLAYBACK) {
2414 
2415         if (s->write_index_corrections[s->current_write_index_correction].valid)
2416             s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2417 
2418         if (s->buffer_attr.prebuf > 0)
2419             check_smoother_status(s, false, false, true);
2420 
2421         /* This will change the write index, but leave the
2422          * read index untouched. */
2423         invalidate_indexes(s, false, true);
2424 
2425     } else
2426         /* For record streams this has no influence on the write
2427          * index, but the read index might jump. */
2428         invalidate_indexes(s, true, false);
2429 
2430     /* Note that we do not update requested_bytes here. This is
2431      * because we cannot really know how data actually was dropped
2432      * from the write index due to this. This 'error' will be applied
2433      * by both client and server and hence we should be fine. */
2434 
2435     return o;
2436 }
2437 
pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2438 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2439     pa_operation *o;
2440 
2441     pa_assert(s);
2442     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443 
2444     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2445     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2446     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2447     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2448 
2449     /* Ask for a timing update before we cork/uncork to get the best
2450      * accuracy for the transport latency suitable for the
2451      * check_smoother_status() call in the started callback */
2452     request_auto_timing_update(s, true);
2453 
2454     if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2455         return NULL;
2456 
2457     /* This might cause the read index to hang again, hence
2458      * let's request a timing update */
2459     request_auto_timing_update(s, true);
2460 
2461     return o;
2462 }
2463 
pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2464 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2465     pa_operation *o;
2466 
2467     pa_assert(s);
2468     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2469 
2470     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2471     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2472     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2473     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2474 
2475     /* Ask for a timing update before we cork/uncork to get the best
2476      * accuracy for the transport latency suitable for the
2477      * check_smoother_status() call in the started callback */
2478     request_auto_timing_update(s, true);
2479 
2480     if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2481         return NULL;
2482 
2483     /* This might cause the read index to start moving again, hence
2484      * let's request a timing update */
2485     request_auto_timing_update(s, true);
2486 
2487     return o;
2488 }
2489 
pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata)2490 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2491     pa_operation *o;
2492 
2493     pa_assert(s);
2494     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2495     pa_assert(name);
2496 
2497     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2498     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2499     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2500 
2501     if (s->context->version >= 13) {
2502         pa_proplist *p = pa_proplist_new();
2503 
2504         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2505         o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2506         pa_proplist_free(p);
2507     } else {
2508         pa_tagstruct *t;
2509         uint32_t tag;
2510 
2511         o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2512         t = pa_tagstruct_command(
2513                 s->context,
2514                 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2515                 &tag);
2516         pa_tagstruct_putu32(t, s->channel);
2517         pa_tagstruct_puts(t, name);
2518         pa_pstream_send_tagstruct(s->context->pstream, t);
2519         pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2520     }
2521 
2522     return o;
2523 }
2524 
/* Compute the current stream time in microseconds.
 *
 * The value comes from the smoother when the stream has one, otherwise
 * from calc_time(). Unless PA_STREAM_NOT_MONOTONIC is set, the result
 * is clamped so that it never goes below the previously returned time.
 * r_usec may be NULL, in which case nothing is stored.
 *
 * Returns 0 on success. A failing validity check returns early through
 * PA_CHECK_VALIDITY with the corresponding error code. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t usec;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    if (!s->smoother)
        usec = calc_time(s, false);
    else {
#ifdef USE_SMOOTHER_2
        usec = pa_smoother_2_get(s->smoother, pa_rtclock_now());
#else
        usec = pa_smoother_get(s->smoother, pa_rtclock_now());
#endif
    }

    /* Keep the reported time monotonic unless the caller opted out */
    if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
        if (usec >= s->previous_time)
            s->previous_time = usec;
        else
            usec = s->previous_time;
    }

    if (r_usec)
        *r_usec = usec;

    return 0;
}
2561 
/* Return the distance between two time counters.
 *
 * For a >= b this is simply a-b. For a < b the result is b-a with
 * *negative set to 1, but only if the caller passed a 'negative'
 * pointer and the stream is a record stream; in every other case the
 * result is 0. *negative, when supplied, is always initialised to 0
 * first. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a-b;

    /* A negative difference is only reported for record streams, and
     * only when the caller can receive the sign. */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b-a;
}
2579 
/* Compute the stream latency in microseconds.
 *
 * The latency is the difference between the current stream time from
 * pa_stream_get_time() and the time position of the write index
 * (playback) or read index (record), taken from the cached timing
 * info. A negative index is treated as 0. The sign handling is done by
 * time_counter_diff(); 'negative' is passed through to it unchanged.
 *
 * Returns 0 on success, or the error from a failing validity check or
 * from pa_stream_get_time(). */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t now, index_usec;
    int64_t idx;
    int ret;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    ret = pa_stream_get_time(s, &now);
    if (ret < 0)
        return ret;

    /* Playback latency is measured against the write index, everything
     * else against the read index. */
    idx = s->direction == PA_STREAM_PLAYBACK ? s->timing_info.write_index : s->timing_info.read_index;

    if (idx < 0)
        idx = 0;

    index_usec = pa_bytes_to_usec((uint64_t) idx, &s->sample_spec);

    *r_usec = s->direction == PA_STREAM_PLAYBACK
        ? time_counter_diff(s, index_usec, now, negative)
        : time_counter_diff(s, now, index_usec, negative);

    return 0;
}
2616 
/* Return a pointer to the timing info stored inside the stream object.
 *
 * Returns NULL (via PA_CHECK_VALIDITY_RETURN_NULL, with the error code
 * reported on s->context) when called after a fork, when the stream is
 * not READY, when it is an upload stream, or when no valid timing info
 * is available yet (PA_ERR_NODATA). */
const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);

    /* Points into the stream object itself; no copy is made */
    return &s->timing_info;
}
2628 
/* Return a pointer to the sample spec stored inside the stream object.
 *
 * The only runtime check is the fork check; no particular stream state
 * is required. Returns NULL with PA_ERR_FORKED when called after a
 * fork. */
const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);

    return &s->sample_spec;
}
2637 
/* Return a pointer to the channel map stored inside the stream object.
 *
 * The only runtime check is the fork check; no particular stream state
 * is required. Returns NULL with PA_ERR_FORKED when called after a
 * fork. */
const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);

    return &s->channel_map;
}
2646 
/* Return the stream's format info pointer (s->format).
 *
 * Returns NULL with PA_ERR_BADSTATE when the stream is not READY, and
 * NULL with PA_ERR_FORKED when called after a fork. Note that, unlike
 * the neighbouring accessors, the state check runs before the fork
 * check here. */
const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    /* We don't have the format till routing is done */
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);

    return s->format;
}
pa_stream_get_buffer_attr(pa_stream *s)2657 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2658     pa_assert(s);
2659     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660 
2661     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2664 
2665     return &s->buffer_attr;
2666 }
2667 
/* Reply handler for the SET_*_STREAM_BUFFER_ATTR commands.
 *
 * userdata is the pa_operation registered for the request. On a
 * successful reply, the buffer attributes sent back are stored in the
 * stream: maxlength/tlength/prebuf/minreq for playback,
 * maxlength/fragsize for record. For context version >= 13 the
 * configured sink or source latency is stored as well. A malformed
 * reply fails the whole context with PA_ERR_PROTOCOL. The user
 * callback, if any, is invoked with success = 1 for a reply and
 * success = 0 for an error that pa_context_handle_error() accepted.
 * The operation is always completed and unreferenced on exit. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* The operation no longer has a context; nothing to report */
    if (!o->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {

        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0)
                goto protocol_error;
        } else if (o->stream->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0)
                goto protocol_error;
        }

        if (o->stream->context->version >= 13) {
            pa_usec_t usec;

            if (pa_tagstruct_get_usec(t, &usec) < 0)
                goto protocol_error;

            if (o->stream->direction == PA_STREAM_RECORD)
                o->stream->timing_info.configured_source_usec = usec;
            else
                o->stream->timing_info.configured_sink_usec = usec;
        }

        /* Trailing data in the reply is a protocol violation, too */
        if (!pa_tagstruct_eof(t))
            goto protocol_error;

    } else {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

    goto finish;

protocol_error:
    pa_context_fail(o->context, PA_ERR_PROTOCOL);

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2730 
pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata)2731 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2732     pa_operation *o;
2733     pa_tagstruct *t;
2734     uint32_t tag;
2735     pa_buffer_attr copy;
2736 
2737     pa_assert(s);
2738     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2739     pa_assert(attr);
2740 
2741     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2742     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2743     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2744     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2745 
2746     /* Ask for a timing update before we cork/uncork to get the best
2747      * accuracy for the transport latency suitable for the
2748      * check_smoother_status() call in the started callback */
2749     request_auto_timing_update(s, true);
2750 
2751     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2752 
2753     t = pa_tagstruct_command(
2754             s->context,
2755             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2756             &tag);
2757     pa_tagstruct_putu32(t, s->channel);
2758 
2759     copy = *attr;
2760     patch_buffer_attr(s, &copy, NULL);
2761     attr = &copy;
2762 
2763     pa_tagstruct_putu32(t, attr->maxlength);
2764 
2765     if (s->direction == PA_STREAM_PLAYBACK)
2766         pa_tagstruct_put(
2767                 t,
2768                 PA_TAG_U32, attr->tlength,
2769                 PA_TAG_U32, attr->prebuf,
2770                 PA_TAG_U32, attr->minreq,
2771                 PA_TAG_INVALID);
2772     else
2773         pa_tagstruct_putu32(t, attr->fragsize);
2774 
2775     if (s->context->version >= 13)
2776         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2777 
2778     if (s->context->version >= 14)
2779         pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2780 
2781     pa_pstream_send_tagstruct(s->context->pstream, t);
2782     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2783 
2784     /* This might cause changes in the read/write index, hence let's
2785      * request a timing update */
2786     request_auto_timing_update(s, true);
2787 
2788     return o;
2789 }
2790 
/* Return the device index stored in the stream (s->device_index).
 *
 * Returns PA_INVALID_INDEX (with the error code reported on
 * s->context) when called after a fork, when the stream is not READY
 * or is an upload stream, when the context version is below 12, or
 * when no device index has been recorded for the stream. */
uint32_t pa_stream_get_device_index(const pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
    /* An unset index is reported as a bad state rather than returned silently */
    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);

    return s->device_index;
}
2803 
/* Return the device name stored in the stream (s->device_name).
 *
 * The returned string is the stream's own pointer, not a copy. Returns
 * NULL (with the error code reported on s->context) when called after
 * a fork, when the stream is not READY or is an upload stream, when
 * the context version is below 12, or when no device name is set. */
const char *pa_stream_get_device_name(const pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
    /* A missing name is reported as a bad state */
    PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);

    return s->device_name;
}
2816 
/* Return the stream's cached 'suspended' flag.
 *
 * A failing validity check returns early through PA_CHECK_VALIDITY
 * with the corresponding error code: PA_ERR_FORKED after a fork,
 * PA_ERR_BADSTATE when the stream is not READY or is an upload stream,
 * PA_ERR_NOTSUPPORTED when the context version is below 12. */
int pa_stream_is_suspended(const pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);

    return s->suspended;
}
2828 
/* Return the stream's cached 'corked' flag.
 *
 * A failing validity check returns early through PA_CHECK_VALIDITY
 * with the corresponding error code: PA_ERR_FORKED after a fork,
 * PA_ERR_BADSTATE when the stream is not READY or is an upload
 * stream. No minimum context version is required here. */
int pa_stream_is_corked(const pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);

    return s->corked;
}
2839 
/* Reply handler for the UPDATE_*_STREAM_SAMPLE_RATE commands.
 *
 * userdata is the pa_operation registered for the request; the
 * requested rate travels in o->private. The locally cached sample rate
 * (and, with USE_SMOOTHER_2, the smoother rate) is only updated when
 * the server acknowledged the change. If the server answered with an
 * error, the cached rate is left as it was, because the server did not
 * apply the new rate either. The user callback, if any, is invoked
 * with success = 1 for a reply and success = 0 for an error that
 * pa_context_handle_error() accepted. The operation is always
 * completed and unreferenced on exit. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* The operation no longer has a context; nothing to report */
    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    } else {

        /* The reply carries no payload; anything else is a protocol error */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    /* Only adopt the new rate if the server accepted it; on failure the
     * client-side sample spec must keep matching the server. */
    if (success) {
        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
#ifdef USE_SMOOTHER_2
        if (o->stream->smoother)
            pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
#endif
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2880 
pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata)2881 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2882     pa_operation *o;
2883     pa_tagstruct *t;
2884     uint32_t tag;
2885 
2886     pa_assert(s);
2887     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2888 
2889     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2890     PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2891     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2892     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2893     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2894     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2895 
2896     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2897     o->private = PA_UINT_TO_PTR(rate);
2898 
2899     t = pa_tagstruct_command(
2900             s->context,
2901             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2902             &tag);
2903     pa_tagstruct_putu32(t, s->channel);
2904     pa_tagstruct_putu32(t, rate);
2905 
2906     pa_pstream_send_tagstruct(s->context->pstream, t);
2907     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2908 
2909     return o;
2910 }
2911 
pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata)2912 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2913     pa_operation *o;
2914     pa_tagstruct *t;
2915     uint32_t tag;
2916 
2917     pa_assert(s);
2918     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2919 
2920     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2921     PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2922     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2923     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2924     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2925 
2926     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2927 
2928     t = pa_tagstruct_command(
2929             s->context,
2930             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2931             &tag);
2932     pa_tagstruct_putu32(t, s->channel);
2933     pa_tagstruct_putu32(t, (uint32_t) mode);
2934     pa_tagstruct_put_proplist(t, p);
2935 
2936     pa_pstream_send_tagstruct(s->context->pstream, t);
2937     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2938 
2939     /* Please note that we don't update s->proplist here, because we
2940      * don't export that field */
2941 
2942     return o;
2943 }
2944 
pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata)2945 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2946     pa_operation *o;
2947     pa_tagstruct *t;
2948     uint32_t tag;
2949     const char * const*k;
2950 
2951     pa_assert(s);
2952     pa_assert(PA_REFCNT_VALUE(s) >= 1);
2953 
2954     PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2955     PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2956     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2957     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2958     PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2959 
2960     o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2961 
2962     t = pa_tagstruct_command(
2963             s->context,
2964             (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2965             &tag);
2966     pa_tagstruct_putu32(t, s->channel);
2967 
2968     for (k = keys; *k; k++)
2969         pa_tagstruct_puts(t, *k);
2970 
2971     pa_tagstruct_puts(t, NULL);
2972 
2973     pa_pstream_send_tagstruct(s->context->pstream, t);
2974     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2975 
2976     /* Please note that we don't update s->proplist here, because we
2977      * don't export that field */
2978 
2979     return o;
2980 }
2981 
/* Record the sink input index this stream should monitor.
 *
 * The index is stored in s->direct_on_input; nothing is sent to the
 * server here. Only allowed while the stream is still UNCONNECTED.
 * Returns 0 on success. A failing validity check returns early through
 * PA_CHECK_VALIDITY: PA_ERR_FORKED after a fork, PA_ERR_INVALID for
 * PA_INVALID_INDEX, PA_ERR_BADSTATE for any other stream state,
 * PA_ERR_NOTSUPPORTED when the context version is below 13. */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);

    s->direct_on_input = sink_input_idx;

    return 0;
}
2995 
/* Return the sink input index stored in s->direct_on_input.
 *
 * Returns PA_INVALID_INDEX (with the error code reported on
 * s->context) when called after a fork, when no index has been set
 * (PA_ERR_BADSTATE), or when the context version is below 13
 * (PA_ERR_NOTSUPPORTED). */
uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
    PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);

    return s->direct_on_input;
}
3006