1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <string.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <pulse/def.h>
30 #include <pulse/timeval.h>
31 #include <pulse/rtclock.h>
32 #include <pulse/xmalloc.h>
33 #include <pulse/fork-detect.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/sample-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41 #include <pulsecore/core-util.h>
42
43 #include "internal.h"
44 #include "stream.h"
45
46 /* #define STREAM_DEBUG */
47
48 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
49 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #ifndef USE_SMOOTHER_2
53 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55 #endif
56
pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map)57 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
58 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
59 }
60
/* Clear every callback pointer stored on the stream, together with the
 * userdata pointer that belongs to each of them. */
static void reset_callbacks(pa_stream *s) {
    /* Callback function pointers */
    s->read_callback = NULL;
    s->write_callback = NULL;
    s->state_callback = NULL;
    s->overflow_callback = NULL;
    s->underflow_callback = NULL;
    s->latency_update_callback = NULL;
    s->moved_callback = NULL;
    s->suspended_callback = NULL;
    s->started_callback = NULL;
    s->event_callback = NULL;
    s->buffer_attr_callback = NULL;
    s->underflow_ohos_callback = NULL;

    /* Userdata pointers handed to the callbacks above */
    s->read_userdata = NULL;
    s->write_userdata = NULL;
    s->state_userdata = NULL;
    s->overflow_userdata = NULL;
    s->underflow_userdata = NULL;
    s->latency_update_userdata = NULL;
    s->moved_userdata = NULL;
    s->suspended_userdata = NULL;
    s->started_userdata = NULL;
    s->event_userdata = NULL;
    s->buffer_attr_userdata = NULL;
    s->underflow_ohos_userdata = NULL;
}
87
pa_stream_new_with_proplist_internal( pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_format_info * const *formats, unsigned int n_formats, pa_proplist *p)88 static pa_stream *pa_stream_new_with_proplist_internal(
89 pa_context *c,
90 const char *name,
91 const pa_sample_spec *ss,
92 const pa_channel_map *map,
93 pa_format_info * const *formats,
94 unsigned int n_formats,
95 pa_proplist *p) {
96
97 pa_stream *s;
98 unsigned int i;
99
100 pa_assert(c);
101 pa_assert(PA_REFCNT_VALUE(c) >= 1);
102 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
103 pa_assert(n_formats < PA_MAX_FORMATS);
104
105 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
106 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
107
108 s = pa_xnew(pa_stream, 1);
109 PA_REFCNT_INIT(s);
110 s->context = c;
111 s->mainloop = c->mainloop;
112
113 s->direction = PA_STREAM_NODIRECTION;
114 s->state = PA_STREAM_UNCONNECTED;
115 s->flags = 0;
116
117 if (ss)
118 s->sample_spec = *ss;
119 else
120 pa_sample_spec_init(&s->sample_spec);
121
122 if (map)
123 s->channel_map = *map;
124 else
125 pa_channel_map_init(&s->channel_map);
126
127 s->n_formats = 0;
128 if (formats) {
129 s->n_formats = n_formats;
130 for (i = 0; i < n_formats; i++)
131 s->req_formats[i] = pa_format_info_copy(formats[i]);
132 }
133
134 /* We'll get the final negotiated format after connecting */
135 s->format = NULL;
136
137 s->direct_on_input = PA_INVALID_INDEX;
138
139 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
140 if (name)
141 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
142
143 s->channel = 0;
144 s->channel_valid = false;
145 s->syncid = c->csyncid++;
146 s->stream_index = PA_INVALID_INDEX;
147
148 s->requested_bytes = 0;
149 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
150
151 /* We initialize the target length here, so that if the user
152 * passes no explicit buffering metrics the default is similar to
153 * what older PA versions provided. */
154
155 s->buffer_attr.maxlength = (uint32_t) -1;
156 if (ss)
157 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
158 else {
159 /* FIXME: We assume a worst-case compressed format corresponding to
160 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
161 pa_sample_spec tmp_ss = {
162 .format = PA_SAMPLE_S16NE,
163 .rate = 48000,
164 .channels = 2,
165 };
166 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
167 }
168 s->buffer_attr.minreq = (uint32_t) -1;
169 s->buffer_attr.prebuf = (uint32_t) -1;
170 s->buffer_attr.fragsize = (uint32_t) -1;
171
172 s->device_index = PA_INVALID_INDEX;
173 s->device_name = NULL;
174 s->suspended = false;
175 s->corked = false;
176
177 s->write_memblock = NULL;
178 s->write_data = NULL;
179
180 pa_memchunk_reset(&s->peek_memchunk);
181 s->peek_data = NULL;
182 s->record_memblockq = NULL;
183
184 memset(&s->timing_info, 0, sizeof(s->timing_info));
185 s->timing_info_valid = false;
186
187 s->previous_time = 0;
188 s->latest_underrun_at_index = -1;
189
190 s->read_index_not_before = 0;
191 s->write_index_not_before = 0;
192 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
193 s->write_index_corrections[i].valid = 0;
194 s->current_write_index_correction = 0;
195
196 s->auto_timing_update_event = NULL;
197 s->auto_timing_update_requested = false;
198 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199
200 reset_callbacks(s);
201
202 s->smoother = NULL;
203
204 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
205 PA_LLIST_PREPEND(pa_stream, c->streams, s);
206 pa_stream_ref(s);
207
208 return s;
209 }
210
pa_stream_new_with_proplist( pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map, pa_proplist *p)211 pa_stream *pa_stream_new_with_proplist(
212 pa_context *c,
213 const char *name,
214 const pa_sample_spec *ss,
215 const pa_channel_map *map,
216 pa_proplist *p) {
217
218 pa_channel_map tmap;
219
220 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
222 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
223 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
224 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
225
226 if (!map)
227 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
228
229 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
230 }
231
pa_stream_new_extended( pa_context *c, const char *name, pa_format_info * const *formats, unsigned int n_formats, pa_proplist *p)232 pa_stream *pa_stream_new_extended(
233 pa_context *c,
234 const char *name,
235 pa_format_info * const *formats,
236 unsigned int n_formats,
237 pa_proplist *p) {
238
239 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
240
241 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
242 }
243
/* Detach the stream from its context.
 *
 * Cancels the context's operations that refer to this stream, unregisters
 * pending replies, removes the stream from the per-direction channel map and
 * from the context's stream list, drops the reference taken when the stream
 * was linked, frees the automatic timing update timer and clears all
 * callbacks. Does nothing when the stream has no context. */
static void stream_unlink(pa_stream *s) {
    pa_operation *o, *next;

    pa_assert(s);

    /* No context: nothing to detach from */
    if (!s->context)
        return;

    /* Cancel all operation objects that point to us. The successor is
     * fetched before the current entry is touched. */
    for (o = s->context->operations; o; o = next) {
        next = o->next;

        if (o->stream == s)
            pa_operation_cancel(o);
    }

    /* Drop all outstanding replies for this stream */
    if (s->context->pdispatch)
        pa_pdispatch_unregister_reply(s->context->pdispatch, s);

    if (s->channel_valid) {
        pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
        s->channel = 0;
        s->channel_valid = false;
    }

    PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
    pa_stream_unref(s);

    s->context = NULL;

    if (s->auto_timing_update_event) {
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        /* The stream object can stay alive after being unlinked; never
         * leave a pointer to the freed timer event behind. */
        s->auto_timing_update_event = NULL;
    }

    reset_callbacks(s);
}
283
/* Destroy a stream object: unlink it from its context, release every
 * resource it still holds and finally free the structure itself. */
static void stream_free(pa_stream *s) {
    unsigned int k;

    pa_assert(s);

    stream_unlink(s);

    /* Pending write buffer: release the mapping (if any), then the block */
    if (s->write_memblock) {
        if (s->write_data)
            pa_memblock_release(s->write_memblock);

        pa_memblock_unref(s->write_memblock);
    }

    /* Pending peek buffer: same procedure */
    if (s->peek_memchunk.memblock) {
        if (s->peek_data)
            pa_memblock_release(s->peek_memchunk.memblock);

        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    if (s->record_memblockq)
        pa_memblockq_free(s->record_memblockq);

    if (s->proplist)
        pa_proplist_free(s->proplist);

    if (s->smoother) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_free(s->smoother);
#else
        pa_smoother_free(s->smoother);
#endif
    }

    /* Requested formats copied at construction time */
    for (k = 0; k < s->n_formats; k++)
        pa_format_info_free(s->req_formats[k]);

    if (s->format)
        pa_format_info_free(s->format);

    pa_xfree(s->device_name);
    pa_xfree(s);
}
325
/* Drop one reference to the stream; the stream is freed once the decremented
 * count is no longer positive. */
void pa_stream_unref(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (PA_REFCNT_DEC(s) > 0)
        return;

    stream_free(s);
}
333
/* Take an additional reference to the stream and return the same pointer,
 * so the call can be used inline. */
pa_stream* pa_stream_ref(pa_stream *s) {
    pa_stream *same = s;

    pa_assert(same);
    pa_assert(PA_REFCNT_VALUE(same) >= 1);

    PA_REFCNT_INC(same);

    return same;
}
341
pa_stream_get_state(const pa_stream *s)342 pa_stream_state_t pa_stream_get_state(const pa_stream *s) {
343 pa_assert(s);
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346 return s->state;
347 }
348
/* Move the stream into the PA_STREAM_TERMINATED state.
 *
 * Thin wrapper around pa_stream_set_state(); argument checking is left to
 * that function. */
void pa_stream_terminate(pa_stream *s) {
    const pa_stream_state_t target = PA_STREAM_TERMINATED;

    pa_stream_set_state(s, target);
}
352
pa_stream_get_context(const pa_stream *s)353 pa_context* pa_stream_get_context(const pa_stream *s) {
354 pa_assert(s);
355 pa_assert(PA_REFCNT_VALUE(s) >= 1);
356
357 return s->context;
358 }
359
pa_stream_get_index(const pa_stream *s)360 uint32_t pa_stream_get_index(const pa_stream *s) {
361 pa_assert(s);
362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
363
364 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
365 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
366
367 return s->stream_index;
368 }
369
/* Switch the stream to state st.
 *
 * A no-op when the stream is already in that state. Otherwise the new state
 * is stored, the state callback (if set) is invoked, and for the FAILED and
 * TERMINATED states the stream is unlinked from its context. A temporary
 * reference is held for the duration of the call. */
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (st == s->state)
        return;

    pa_stream_ref(s);

    s->state = st;

    if (s->state_callback)
        s->state_callback(s, s->state_userdata);

    /* Decide on the requested state, not on s->state */
    switch (st) {
        case PA_STREAM_FAILED:
        case PA_STREAM_TERMINATED:
            stream_unlink(s);
            break;

        default:
            break;
    }

    pa_stream_unref(s);
}
389
/* Issue an automatic timing info update and re-arm the update timer.
 *
 * Only acts on streams created with PA_STREAM_AUTO_TIMING_UPDATE. A new
 * update is requested when the stream is READY and either force is set or
 * no automatic request is marked as outstanding. If a timer event exists it
 * is freed for suspended streams (unless force is set); otherwise it is
 * restarted using the current interval, which is then doubled up to
 * AUTO_TIMING_INTERVAL_END_USEC. With force the interval first restarts at
 * AUTO_TIMING_INTERVAL_START_USEC. */
static void request_auto_timing_update(pa_stream *s, bool force) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) == 0)
        return;

    if (s->state == PA_STREAM_READY && (force || !s->auto_timing_update_requested)) {
        pa_operation *op;

#ifdef STREAM_DEBUG
        pa_log_debug("Automatically requesting new timing data");
#endif

        op = pa_stream_update_timing_info(s, NULL, NULL);
        if (op) {
            pa_operation_unref(op);
            s->auto_timing_update_requested = true;
        }
    }

    /* Everything below is about the timer event */
    if (!s->auto_timing_update_event)
        return;

    if (!force && s->suspended) {
        /* Suspended and not forced: stop the timer */
        pa_assert(s->mainloop);
        s->mainloop->time_free(s->auto_timing_update_event);
        s->auto_timing_update_event = NULL;
        return;
    }

    if (force)
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

    pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);

    /* Back off: double the interval for the next round, capped at the end value */
    s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
}
426
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_KILLED and
 * PA_COMMAND_RECORD_STREAM_KILLED.
 *
 * Reads the channel from the tagstruct, looks up the matching stream in the
 * context's playback or record map and, when that stream is READY, sets the
 * context error to PA_ERR_KILLED and moves the stream to PA_STREAM_FAILED.
 * A malformed message fails the whole context with PA_ERR_PROTOCOL. */
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (command == PA_COMMAND_PLAYBACK_STREAM_KILLED)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    pa_context_set_error(c, PA_ERR_KILLED);
    pa_stream_set_state(s, PA_STREAM_FAILED);

finish:
    pa_context_unref(c);
}
458
/* Pause or resume the stream's time smoother according to the stream state.
 *
 * The reference time is the current rtclock time, shifted by the known
 * transport latency when timing info is valid: backwards if aposteriori is
 * set, forwards otherwise. The smoother is paused when the stream is
 * suspended, corked or force_stop is set; it is resumed when force_start is
 * set or prebuf is 0. force_start and force_stop are mutually exclusive.
 * Nothing happens when the stream has no smoother. */
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
    pa_usec_t when;

    pa_assert(s);
    pa_assert(!force_start || !force_stop);

    if (!s->smoother)
        return;

    when = pa_rtclock_now();

    if (s->timing_info_valid) {
        if (aposteriori)
            when -= s->timing_info.transport_usec;
        else
            when += s->timing_info.transport_usec;
    }

    if (s->suspended || s->corked || force_stop) {
#ifdef USE_SMOOTHER_2
        pa_smoother_2_pause(s->smoother, when);
#else
        pa_smoother_pause(s->smoother, when);
#endif
        return;
    }

    /* Please note that we have no idea if playback actually started
     * if prebuf is non-zero! */
    if (!force_start && s->buffer_attr.prebuf != 0)
        return;

    /* If the server supports STARTED events we take them as
     * indications when audio really starts/stops playing, if
     * we don't have any timing info yet -- instead of trying
     * to be smart and guessing the server time. Otherwise the
     * unknown transport delay adds too much noise to our time
     * calculations. */
    if (!s->timing_info_valid &&
        !aposteriori &&
        !force_start &&
        !force_stop &&
        s->context->version >= 13)
        return;

#ifdef USE_SMOOTHER_2
    pa_smoother_2_resume(s->smoother, when);
#else
    pa_smoother_resume(s->smoother, when, true);
#endif
}
511
512 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
513
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_MOVED and
 * PA_COMMAND_RECORD_STREAM_MOVED.
 *
 * Message layout as parsed here: channel, device index, device name,
 * suspended flag; for context versions >= 13 additionally the buffer
 * metrics (record: maxlength, fragsize; playback: maxlength, tlength,
 * prebuf, minreq) followed by a latency value in usec.
 *
 * On success the stream's device, suspend state, buffer attributes and
 * configured latency are updated, automatic timing updates are (re)armed
 * and the moved callback is invoked. Any parse error, a context version
 * below 12, a missing device name or an invalid device index fail the
 * context with PA_ERR_PROTOCOL. */
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    uint32_t device_index;
    const char *device_name;
    bool suspended;
    pa_usec_t latency_usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &device_index) < 0 ||
        pa_tagstruct_gets(t, &device_name) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0)
        goto protocol_error;

    if (c->version >= 13) {
        if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &fragsize) < 0 ||
                pa_tagstruct_get_usec(t, &latency_usec) < 0)
                goto protocol_error;
        } else {
            if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
                pa_tagstruct_getu32(t, &tlength) < 0 ||
                pa_tagstruct_getu32(t, &prebuf) < 0 ||
                pa_tagstruct_getu32(t, &minreq) < 0 ||
                pa_tagstruct_get_usec(t, &latency_usec) < 0)
                goto protocol_error;
        }
    }

    /* Trailing data, a missing device name or an invalid index are all protocol errors */
    if (!pa_tagstruct_eof(t) || !device_name || device_index == PA_INVALID_INDEX)
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_MOVED)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (c->version >= 13) {
        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = latency_usec;
        else
            s->timing_info.configured_sink_usec = latency_usec;

        /* Fields not present for this direction keep their 0 initialiser */
        s->buffer_attr.maxlength = maxlength;
        s->buffer_attr.fragsize = fragsize;
        s->buffer_attr.tlength = tlength;
        s->buffer_attr.prebuf = prebuf;
        s->buffer_attr.minreq = minreq;
    }

    pa_xfree(s->device_name);
    s->device_name = pa_xstrdup(device_name);
    s->device_index = device_index;

    s->suspended = suspended;

    /* Re-create the timer if auto timing is wanted, we are not suspended and no timer exists */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->moved_callback)
        s->moved_callback(s, s->moved_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
616
/* Dispatcher callback for PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED and
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED.
 *
 * Message layout as parsed here: channel, then the buffer metrics for the
 * direction (record: maxlength, fragsize; playback: maxlength, tlength,
 * prebuf, minreq), then a latency value in usec.
 *
 * On success the stream's buffer attributes and configured latency are
 * replaced, a timing update is requested and the buffer_attr callback is
 * invoked. A context version below 15 or any parse error fails the context
 * with PA_ERR_PROTOCOL. Unknown channels and streams that are not READY are
 * ignored. */
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_usec_t usec = 0;
    uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (pa_tagstruct_getu32(t, &channel) < 0) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    /* The record variant must be selected with the BUFFER_ATTR_CHANGED
     * command code: this handler is only ever entered with one of the two
     * *_BUFFER_ATTR_CHANGED commands (see the assertion above), so testing
     * any other command code would make this branch unreachable and parse
     * record messages with the playback layout. */
    if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &fragsize) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    } else {
        if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
            pa_tagstruct_getu32(t, &tlength) < 0 ||
            pa_tagstruct_getu32(t, &prebuf) < 0 ||
            pa_tagstruct_getu32(t, &minreq) < 0 ||
            pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(c, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
        goto finish;

    if (s->state != PA_STREAM_READY)
        goto finish;

    if (s->direction == PA_STREAM_RECORD)
        s->timing_info.configured_source_usec = usec;
    else
        s->timing_info.configured_sink_usec = usec;

    /* Fields not present for this direction keep their 0 initialiser */
    s->buffer_attr.maxlength = maxlength;
    s->buffer_attr.fragsize = fragsize;
    s->buffer_attr.tlength = tlength;
    s->buffer_attr.prebuf = prebuf;
    s->buffer_attr.minreq = minreq;

    request_auto_timing_update(s, true);

    if (s->buffer_attr_callback)
        s->buffer_attr_callback(s, s->buffer_attr_userdata);

finish:
    pa_context_unref(c);
}
690
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_SUSPENDED and
 * PA_COMMAND_RECORD_STREAM_SUSPENDED.
 *
 * Reads the channel and the new suspended flag, stores the flag on the
 * matching READY stream, (re)arms automatic timing updates, re-evaluates the
 * smoother and invokes the suspended callback. A context version below 12
 * or a malformed message fails the context with PA_ERR_PROTOCOL. */
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    bool suspended;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 12)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_get_boolean(t, &suspended) < 0 ||
        !pa_tagstruct_eof(t))
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->suspended = suspended;

    /* Re-create the timer if auto timing is wanted, we are not suspended and no timer exists */
    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
    request_auto_timing_update(s, true);

    if (s->suspended_callback)
        s->suspended_callback(s, s->suspended_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
740
/* Dispatcher callback for PA_COMMAND_STARTED.
 *
 * Reads the channel, looks the stream up among the context's playback
 * streams and, when it is READY, force-starts the smoother, requests a
 * timing update and invokes the started callback. A context version below
 * 13 or a malformed message fails the context with PA_ERR_PROTOCOL. */
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_STARTED);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 13)
        goto protocol_error;

    if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    check_smoother_status(s, true, true, false);
    request_auto_timing_update(s, true);

    if (s->started_callback)
        s->started_callback(s, s->started_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
780
/* Dispatcher callback for PA_COMMAND_PLAYBACK_STREAM_EVENT and
 * PA_COMMAND_RECORD_STREAM_EVENT.
 *
 * Reads the channel, the event name and a property list, then passes the
 * event and the property list to the event callback of the matching READY
 * stream. For PA_STREAM_EVENT_FORMAT_LOST the current stream time, when it
 * can be obtained, is added to the property list as "stream-time". A context
 * version below 15, a malformed message or a NULL event name fail the
 * context with PA_ERR_PROTOCOL. The temporary property list is freed before
 * returning. */
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    pa_proplist *props = NULL;
    const char *event;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (c->version < 15)
        goto protocol_error;

    props = pa_proplist_new();

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_gets(t, &event) < 0 ||
        pa_tagstruct_get_proplist(t, props) < 0 ||
        !pa_tagstruct_eof(t) ||
        !event)
        goto protocol_error;

    if (command == PA_COMMAND_PLAYBACK_STREAM_EVENT)
        s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));
    else
        s = pa_hashmap_get(c->record_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
        /* Let client know what the running time was when the stream had to be killed */
        pa_usec_t running_time;

        if (pa_stream_get_time(s, &running_time) == 0)
            pa_proplist_setf(props, "stream-time", "%llu", (unsigned long long) running_time);
    }

    if (s->event_callback)
        s->event_callback(s, event, props, s->event_userdata);

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);

    if (props)
        pa_proplist_free(props);
}
833
/* Dispatcher callback for PA_COMMAND_REQUEST.
 *
 * Reads the channel and a byte count, adds the byte count to the
 * requested_bytes counter of the matching READY playback stream and, when
 * the counter is positive and a write callback is set, invokes that callback
 * with the counter value. A malformed message fails the context with
 * PA_ERR_PROTOCOL. */
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel, bytes;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_REQUEST);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0 ||
        pa_tagstruct_getu32(t, &bytes) < 0 ||
        !pa_tagstruct_eof(t)) {
        pa_context_fail(c, PA_ERR_PROTOCOL);
        goto finish;
    }

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    s->requested_bytes += bytes;

#ifdef STREAM_DEBUG
    pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
#endif

    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

finish:
    pa_context_unref(c);
}
872
pa_stream_get_underflow_index(const pa_stream *p)873 int64_t pa_stream_get_underflow_index(const pa_stream *p) {
874 pa_assert(p);
875 return p->latest_underrun_at_index;
876 }
877
/* Dispatcher callback for PA_COMMAND_OVERFLOW, PA_COMMAND_UNDERFLOW and
 * PA_COMMAND_UNDERFLOW_OHOS.
 *
 * Reads the channel and, for PA_COMMAND_UNDERFLOW on context versions >= 23,
 * a signed 64 bit offset. For the matching READY playback stream:
 *  - PA_COMMAND_UNDERFLOW_OHOS only invokes the underflow_ohos callback;
 *  - otherwise the offset (if not -1) is recorded as the latest underrun
 *    index, the smoother is force-stopped when prebuf is positive, a timing
 *    update is requested and the overflow or underflow callback is invoked.
 * A malformed message fails the context with PA_ERR_PROTOCOL. */
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_context *c = userdata;
    pa_stream *s;
    uint32_t channel;
    int64_t offset = -1;

    pa_assert(pd);
    pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW
        || command == PA_COMMAND_UNDERFLOW_OHOS);
    pa_assert(t);
    pa_assert(c);
    pa_assert(PA_REFCNT_VALUE(c) >= 1);

    pa_context_ref(c);

    if (pa_tagstruct_getu32(t, &channel) < 0)
        goto protocol_error;

    /* Only plain underflow messages on version >= 23 carry an offset */
    if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW &&
        pa_tagstruct_gets64(t, &offset) < 0)
        goto protocol_error;

    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel));

    /* Unknown channel or stream not READY: silently ignore */
    if (!s || s->state != PA_STREAM_READY)
        goto finish;

    if (command == PA_COMMAND_UNDERFLOW_OHOS) {
        /* This variant touches neither the smoother nor the timing state */
        if (s->underflow_ohos_callback)
            s->underflow_ohos_callback(s, s->underflow_ohos_userdata);

        goto finish;
    }

    if (offset != -1)
        s->latest_underrun_at_index = offset;

    if (s->buffer_attr.prebuf > 0)
        check_smoother_status(s, true, false, true);

    request_auto_timing_update(s, true);

    switch (command) {
        case PA_COMMAND_OVERFLOW:
            if (s->overflow_callback)
                s->overflow_callback(s, s->overflow_userdata);
            break;

        case PA_COMMAND_UNDERFLOW:
            if (s->underflow_callback)
                s->underflow_callback(s, s->underflow_userdata);
            break;

        default:
            break;
    }

    goto finish;

protocol_error:
    pa_context_fail(c, PA_ERR_PROTOCOL);

finish:
    pa_context_unref(c);
}
942
/* Mark the stream's write index (w) and/or read index (r) as unreliable.
 *
 * For each selected index the context's current ctag is stored in the
 * matching *_index_not_before field and, when timing info is valid, the
 * corresponding *_index_corrupt flag is set. A forced timing update is
 * requested afterwards. Streams that are not READY are left untouched. */
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

#ifdef STREAM_DEBUG
    pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
#endif

    if (s->state != PA_STREAM_READY)
        return;

    if (w) {
        s->write_index_not_before = s->context->ctag;

        if (s->timing_info_valid)
            s->timing_info.write_index_corrupt = true;

#ifdef STREAM_DEBUG
        pa_log_debug("write_index invalidated");
#endif
    }

    if (r) {
        s->read_index_not_before = s->context->ctag;

        if (s->timing_info_valid)
            s->timing_info.read_index_corrupt = true;

#ifdef STREAM_DEBUG
        pa_log_debug("read_index invalidated");
#endif
    }

    /* Get fresh timing data to replace what was just invalidated */
    request_auto_timing_update(s, true);
}
978
/* Time event callback registered for the automatic timing update timer.
 *
 * userdata is the stream. Requests a non-forced timing update while holding
 * a temporary reference to the stream. */
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
    pa_stream *stream = userdata;

    pa_assert(stream);
    pa_assert(PA_REFCNT_VALUE(stream) >= 1);

    pa_stream_ref(stream);

    request_auto_timing_update(stream, false);

    pa_stream_unref(stream);
}
989
/* Finish stream creation: move the stream from CREATING to READY, invoke the
 * write callback when bytes are already requested, start automatic timing
 * updates for streams that asked for them and evaluate the smoother. */
static void create_stream_complete(pa_stream *s) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_set_state(s, PA_STREAM_READY);

    if (s->write_callback && s->requested_bytes > 0)
        s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);

    if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) != 0) {
        s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;

        /* The timer must not exist yet at this point */
        pa_assert(!s->auto_timing_update_event);
        s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);

        request_auto_timing_update(s, true);
    }

    check_smoother_status(s, true, false, false);
}
1010
/* Adjust the buffer attributes in attr before they are used for stream s.
 *
 * Two independent steps:
 *
 * 1. If the environment variable PULSE_LATENCY_MSEC holds a positive integer
 *    and a valid sample spec is available, the attributes are overridden:
 *    tlength and fragsize are set to that many milliseconds of audio, all
 *    other fields are reset to (uint32_t) -1, and PA_STREAM_ADJUST_LATENCY
 *    is added to *flags (when flags is non-NULL). The sample spec is taken
 *    from the stream, or, for streams without one, derived from the single
 *    requested format.
 *
 * 2. For context versions below 13, every field still set to (uint32_t) -1
 *    is replaced by a client-side default. */
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
    const char *e;

    pa_assert(s);
    pa_assert(attr);

    if ((e = getenv("PULSE_LATENCY_MSEC"))) {
        uint32_t ms;
        pa_sample_spec ss;

        pa_sample_spec_init(&ss);

        if (pa_sample_spec_valid(&s->sample_spec))
            ss = s->sample_spec;
        else if (s->n_formats == 1)
            pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL);

        if (pa_atou(e, &ms) < 0 || ms <= 0)
            pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
        else if (!pa_sample_spec_valid(&ss))
            /* Validate the spec we are actually going to use. Checking
             * s->sample_spec here would discard the spec derived from the
             * requested format just above. */
            pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e);
        else {
            attr->maxlength = (uint32_t) -1;
            attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss);
            attr->minreq = (uint32_t) -1;
            attr->prebuf = (uint32_t) -1;
            attr->fragsize = attr->tlength;

            if (flags)
                *flags |= PA_STREAM_ADJUST_LATENCY;
        }
    }

    if (s->context->version >= 13)
        return;

    /* Version older than 0.9.10 didn't do server side buffer_attr
     * selection, hence we have to fake it on the client side. */

    /* We choose fairly conservative values here, to not confuse
     * old clients with extremely large playback buffers */

    if (attr->maxlength == (uint32_t) -1)
        attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */

    if (attr->tlength == (uint32_t) -1)
        attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */

    if (attr->minreq == (uint32_t) -1)
        attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */

    if (attr->prebuf == (uint32_t) -1)
        attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */

    if (attr->fragsize == (uint32_t) -1)
        attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
}
1068
/* Reply handler for the stream creation command; create_stream() registers
 * it for playback and record streams, and the code also copes with
 * PA_STREAM_UPLOAD streams.
 *
 * pd        The dispatcher delivering the reply (only asserted).
 * command   PA_COMMAND_REPLY on success, anything else is treated as an error.
 * tag       Not used here.
 * t         The reply payload, parsed field by field below.
 * userdata  The pa_stream being created; must be in PA_STREAM_CREATING.
 *
 * The reply is read strictly in wire order. Which fields are expected
 * depends on the stream direction and on the context's protocol version. Any
 * missing, malformed or trailing data fails the whole context with
 * PA_ERR_PROTOCOL. On success the stream is entered into the context's
 * channel table and create_stream_complete() is called. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    pa_hashmap *streams;
    uint32_t initial_request = 0;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        /* Only mark the stream as failed ourselves if the generic error
         * handler did not report a negative result. */
        if (pa_context_handle_error(s->context, command, t, false) >= 0)
            pa_stream_set_state(s, PA_STREAM_FAILED);

        goto finish;
    }

    /* Channel number, always present */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 || s->channel == PA_INVALID_INDEX)
        goto protocol_error;

    /* Stream index, for everything but upload streams */
    if (s->direction != PA_STREAM_UPLOAD &&
        (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX))
        goto protocol_error;

    /* Initial request, for everything but record streams */
    if (s->direction != PA_STREAM_RECORD && pa_tagstruct_getu32(t, &initial_request) < 0)
        goto protocol_error;

    s->requested_bytes = (int64_t) initial_request;

    /* Buffer attributes as reported back by the server */
    if (s->context->version >= 9) {
        switch (s->direction) {

            case PA_STREAM_PLAYBACK:
                if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0)
                    goto protocol_error;
                break;

            case PA_STREAM_RECORD:
                if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                    pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0)
                    goto protocol_error;
                break;

            default:
                break;
        }
    }

    /* Sample spec, channel map and device information */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec server_ss;
        pa_channel_map server_map;
        const char *device = NULL;
        bool is_suspended;

        if (pa_tagstruct_get_sample_spec(t, &server_ss) < 0 ||
            pa_tagstruct_get_channel_map(t, &server_map) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &device) < 0 ||
            pa_tagstruct_get_boolean(t, &is_suspended) < 0)
            goto protocol_error;

        /* Basic sanity of what we were sent */
        if (!device || s->device_index == PA_INVALID_INDEX ||
            server_ss.channels != server_map.channels ||
            !pa_channel_map_valid(&server_map) ||
            !pa_sample_spec_valid(&server_ss))
            goto protocol_error;

        /* When no format list was requested, the reported format, rate and
         * channel map have to match what we asked for, unless the respective
         * FIX flag was set. */
        if (s->n_formats == 0) {
            if (!(s->flags & PA_STREAM_FIX_FORMAT) && server_ss.format != s->sample_spec.format)
                goto protocol_error;

            if (!(s->flags & PA_STREAM_FIX_RATE) && server_ss.rate != s->sample_spec.rate)
                goto protocol_error;

            if (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&server_map, &s->channel_map))
                goto protocol_error;
        }

        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(device);
        s->suspended = is_suspended;

        s->channel_map = server_map;
        s->sample_spec = server_ss;
    }

#ifdef USE_SMOOTHER_2
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    /* Configured device latency */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t configured;

        if (pa_tagstruct_get_usec(t, &configured) < 0)
            goto protocol_error;

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = configured;
        else
            s->timing_info.configured_sink_usec = configured;
    }

    /* Format info */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *negotiated = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, negotiated) >= 0 && pa_format_info_valid(negotiated))
            s->format = negotiated;
        else {
            pa_format_info_free(negotiated);

            /* We used the extended API, so we should have got back a proper format */
            if (s->n_formats > 0)
                goto protocol_error;
        }
    }

    /* Nothing may follow */
    if (!pa_tagstruct_eof(t))
        goto protocol_error;

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    /* Register the stream under its channel; everything that is not a record
     * stream goes into the playback table. */
    s->channel_valid = true;
    streams = (s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams;
    pa_hashmap_put(streams, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);
    goto finish;

protocol_error:
    pa_context_fail(s->context, PA_ERR_PROTOCOL);

finish:
    pa_stream_unref(s);
}
1214
/* Shared implementation of pa_stream_connect_playback() and
 * pa_stream_connect_record(): validates the arguments, stores the connection
 * parameters in the stream, builds the stream creation command for the
 * context's protocol version and sends it.
 *
 * direction    PA_STREAM_PLAYBACK or PA_STREAM_RECORD.
 * s            The stream to connect; must be PA_STREAM_UNCONNECTED.
 * dev          Device name, or NULL to use the default sink/source from the
 *              client configuration.
 * attr         Buffer attributes, or NULL to leave s->buffer_attr as it is
 *              (it is still run through patch_buffer_attr()).
 * flags        Stream flags; unknown bits are rejected.
 * volume       Initial volume, or NULL. When NULL a reset volume is sent in
 *              its place and the "volume set" flag on the wire is false.
 * sync_stream  Optional playback stream whose syncid is adopted.
 *
 * Returns 0 once the request has been queued and the stream has been moved to
 * PA_STREAM_CREATING; failed validity checks return through
 * PA_CHECK_VALIDITY. The reply is handled by pa_create_stream_callback().
 *
 * The order of the fields appended to the command is part of the wire
 * protocol and must not be changed. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    const bool playback = (direction == PA_STREAM_PLAYBACK);
    const bool record = (direction == PA_STREAM_RECORD);
    const bool have_volume = !!volume;
    pa_tagstruct *cmd;
    pa_cvolume default_volume;
    uint32_t reply_tag;
    uint32_t idx;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || record, PA_ERR_BADSTATE);

    /* Reject any flag bit we do not know about */
    PA_CHECK_VALIDITY(s->context,
                      !(flags & ~(PA_STREAM_START_CORKED|
                                  PA_STREAM_INTERPOLATE_TIMING|
                                  PA_STREAM_NOT_MONOTONIC|
                                  PA_STREAM_AUTO_TIMING_UPDATE|
                                  PA_STREAM_NO_REMAP_CHANNELS|
                                  PA_STREAM_NO_REMIX_CHANNELS|
                                  PA_STREAM_FIX_FORMAT|
                                  PA_STREAM_FIX_RATE|
                                  PA_STREAM_FIX_CHANNELS|
                                  PA_STREAM_DONT_MOVE|
                                  PA_STREAM_VARIABLE_RATE|
                                  PA_STREAM_PEAK_DETECT|
                                  PA_STREAM_START_MUTED|
                                  PA_STREAM_ADJUST_LATENCY|
                                  PA_STREAM_EARLY_REQUESTS|
                                  PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                  PA_STREAM_START_UNMUTED|
                                  PA_STREAM_FAIL_ON_SUSPEND|
                                  PA_STREAM_RELATIVE_VOLUME|
                                  PA_STREAM_PASSTHROUGH)),
                      PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Some of the remaining flags are not supported by older servers
     * either. They are deliberately not checked: passing them does no harm,
     * and it keeps client development simpler. */

    /* Peak detection is for record streams only */
    PA_CHECK_VALIDITY(s->context, record || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    /* Only playback streams can be synced with each other */
    PA_CHECK_VALIDITY(s->context, !sync_stream || (playback && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    /* ADJUST_LATENCY and EARLY_REQUESTS exclude each other */
    PA_CHECK_VALIDITY(s->context, !((flags & PA_STREAM_ADJUST_LATENCY) && (flags & PA_STREAM_EARLY_REQUESTS)), PA_ERR_INVALID);

    pa_stream_ref(s);

    /* Record the connection parameters in the stream object */
    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        pa_usec_t now;

        now = pa_rtclock_now();

        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, now, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                now,
                true);
#endif
    }

    if (!dev)
        dev = playback ? s->context->conf->default_sink : s->context->conf->default_source;

    /* Build the request, field by field in wire order */
    cmd = pa_tagstruct_command(
            s->context,
            (uint32_t) (playback ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &reply_tag);

    if (s->context->version < 13)
        pa_tagstruct_puts(cmd, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            cmd,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    if (!volume) {
        /* With an invalid sample spec the channel count used here is not
         * really relevant: no volume was set, and the real number of
         * channels is embedded in the format_info structure. */
        volume = pa_cvolume_reset(
                &default_volume,
                pa_sample_spec_valid(&s->sample_spec) ? s->sample_spec.channels : PA_CHANNELS_MAX);
    }

    if (playback) {
        pa_tagstruct_put(
                cmd,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(cmd, volume);
    } else
        pa_tagstruct_putu32(cmd, s->buffer_attr.fragsize);

    if (s->context->version >= 12) {
        pa_tagstruct_put(
                cmd,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (s->context->version >= 13) {

        /* The same slot carries "start muted" for playback streams and
         * "peak detect" for all others */
        pa_tagstruct_put_boolean(cmd, flags & (playback ? PA_STREAM_START_MUTED : PA_STREAM_PEAK_DETECT));

        pa_tagstruct_put(
                cmd,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (record)
            pa_tagstruct_putu32(cmd, s->direct_on_input);
    }

    if (s->context->version >= 14) {

        if (playback)
            pa_tagstruct_put_boolean(cmd, have_volume);

        pa_tagstruct_put_boolean(cmd, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (s->context->version >= 15) {

        if (playback)
            pa_tagstruct_put_boolean(cmd, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(cmd, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(cmd, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (playback) {
        if (s->context->version >= 17)
            pa_tagstruct_put_boolean(cmd, flags & PA_STREAM_RELATIVE_VOLUME);

        if (s->context->version >= 18)
            pa_tagstruct_put_boolean(cmd, flags & (PA_STREAM_PASSTHROUGH));
    }

    if ((s->context->version >= 21 && playback) || s->context->version >= 22) {

        pa_tagstruct_putu8(cmd, s->n_formats);

        for (idx = 0; idx < s->n_formats; idx++)
            pa_tagstruct_put_format_info(cmd, s->req_formats[idx]);
    }

    if (s->context->version >= 22 && record) {
        pa_tagstruct_put_cvolume(cmd, volume);
        pa_tagstruct_put_boolean(cmd, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(cmd, have_volume);
        pa_tagstruct_put_boolean(cmd, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(cmd, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(cmd, flags & (PA_STREAM_PASSTHROUGH));
    }

    /* Send the request and wait for the reply */
    pa_pstream_send_tagstruct(s->context->pstream, cmd);
    pa_pdispatch_register_reply(s->context->pdispatch, reply_tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
1427
/* Connects the stream for playback.
 *
 * This is a thin front end for create_stream() with the direction fixed to
 * PA_STREAM_PLAYBACK; all other arguments are passed through unchanged and
 * the result of create_stream() is returned as is. */
int pa_stream_connect_playback(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    r = create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);

    return r;
}
1441
/* Connects the stream for recording.
 *
 * This is a thin front end for create_stream() with the direction fixed to
 * PA_STREAM_RECORD. No volume and no sync stream are passed (both NULL); the
 * result of create_stream() is returned as is. */
int pa_stream_connect_record(
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags) {

    int r;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    r = create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);

    return r;
}
1453
/* Hands out a buffer, backed by a memblock from the context's pool, that the
 * caller can fill and later pass on to pa_stream_write_ext_free().
 *
 * s       A ready playback or upload stream.
 * data    Receives the address of the buffer. Must not be NULL.
 * nbytes  In: the wanted size, which must not be 0; (size_t) -1 skips the
 *         clamping below. Out: the length of the memblock behind the buffer.
 *
 * A wanted size other than (size_t) -1 is clamped to the pool's maximum block
 * size, rounded down to a whole number of frames. If a buffer from an earlier
 * call is still pending it is handed out again, so the size reported back may
 * differ from the one asked for.
 *
 * Returns 0 on success. */
int pa_stream_begin_write(
        pa_stream *s,
        void **data,
        size_t *nbytes) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);

    if (*nbytes != (size_t) -1) {
        const size_t block_max = pa_mempool_block_size_max(s->context->mempool);
        const size_t frame = pa_frame_size(&s->sample_spec);
        /* Largest multiple of the frame size that still fits into one block */
        const size_t limit = block_max - (block_max % frame);

        if (*nbytes > limit)
            *nbytes = limit;
    }

    /* Only allocate when there is no pending buffer yet */
    if (!s->write_memblock) {
        s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
        s->write_data = pa_memblock_acquire(s->write_memblock);
    }

    *nbytes = pa_memblock_get_length(s->write_memblock);
    *data = s->write_data;

    return 0;
}
1489
/* Gives back the buffer obtained from pa_stream_begin_write() without
 * writing it.
 *
 * s  A ready playback or upload stream that has a pending write buffer;
 *    PA_ERR_BADSTATE is raised if it has none.
 *
 * Returns 0 on success. */
int pa_stream_cancel_write(
        pa_stream *s) {

    pa_memblock *pending;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);

    pa_assert(s->write_data);

    /* Detach the block from the stream first, then undo the acquire and the
     * reference taken in pa_stream_begin_write(). */
    pending = s->write_memblock;
    s->write_memblock = NULL;
    s->write_data = NULL;

    pa_memblock_release(pending);
    pa_memblock_unref(pending);

    return 0;
}
1510
/* Queues audio data on a playback or upload stream.
 *
 * s             A ready playback or upload stream.
 * data          The data to send; must not be NULL. If a buffer from
 *               pa_stream_begin_write() is pending, data/length have to lie
 *               entirely inside that buffer.
 * length        Number of bytes; must be a multiple of the frame size.
 * free_cb       Optional callback for releasing 'data'. Not allowed together
 *               with a pending pa_stream_begin_write() buffer.
 * free_cb_data  Argument for free_cb.
 * offset        Seek offset in bytes; must be a multiple of the frame size
 *               (negative values included). Has to be 0 for upload streams.
 * seek          Seek mode; upload streams only accept PA_SEEK_RELATIVE.
 *
 * Three transfer paths exist:
 *   - a pending pa_stream_begin_write() buffer is sent as is,
 *   - with free_cb and without SHM, the caller's memory is wrapped in a user
 *     memblock together with free_cb/free_cb_data and sent without copying,
 *   - otherwise the data is copied into frame-aligned pool blocks; if free_cb
 *     was given it is invoked right after the copy.
 *
 * Afterwards the local request counter and, for playback streams, the write
 * index bookkeeping are updated.
 *
 * Returns 0 on success. */
int pa_stream_write_ext_free(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        void *free_cb_data,
        int64_t offset,
        pa_seek_mode_t seek) {

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context,
                      !s->write_memblock ||
                      ((data >= s->write_data) &&
                       ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
                      PA_ERR_INVALID);
    /* The modulo has to be computed in signed arithmetic. Where size_t is as
     * wide as int64_t, a negative offset would otherwise be converted to a
     * huge unsigned value first, and frame-aligned negative offsets would be
     * rejected for every frame size that is not a power of two. */
    PA_CHECK_VALIDITY(s->context, offset % (int64_t) pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);

    if (s->write_memblock) {
        pa_memchunk chunk;

        /* pa_stream_begin_write() was called before */

        pa_memblock_release(s->write_memblock);

        chunk.memblock = s->write_memblock;
        chunk.index = (const char *) data - (const char *) s->write_data;
        chunk.length = length;

        s->write_memblock = NULL;
        s->write_data = NULL;

        pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
        pa_memblock_unref(chunk.memblock);

    } else {
        pa_seek_mode_t t_seek = seek;
        int64_t t_offset = offset;
        size_t t_length = length;
        const void *t_data = data;

        /* pa_stream_begin_write() was not called before */

        while (t_length > 0) {
            pa_memchunk chunk;

            chunk.index = 0;

            if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
                /* Wrap the caller's memory instead of copying it */
                chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
                chunk.length = t_length;
            } else {
                void *d;
                size_t blk_size_max;

                /* Break large audio streams into _aligned_ blocks or the
                 * other endpoint will happily discard them upon arrival. */
                blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);
                chunk.length = PA_MIN(t_length, blk_size_max);
                chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);

                d = pa_memblock_acquire(chunk.memblock);
                memcpy(d, t_data, chunk.length);
                pa_memblock_release(chunk.memblock);
            }

            pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);

            /* Only the first chunk carries the caller's seek; the following
             * ones are simply appended. */
            t_offset = 0;
            t_seek = PA_SEEK_RELATIVE;

            t_data = (const uint8_t*) t_data + chunk.length;
            t_length -= chunk.length;

            pa_memblock_unref(chunk.memblock);
        }

        /* The data was copied above, so the caller's buffer can go now */
        if (free_cb && pa_pstream_get_shm(s->context->pstream))
            free_cb(free_cb_data);
    }

    /* This is obviously wrong since we ignore the seeking index. But
     * that's OK, the server side applies the same error */
    s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;

#ifdef STREAM_DEBUG
    pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
#endif

    if (s->direction == PA_STREAM_PLAYBACK) {

        /* Update latency request correction */
        if (s->write_index_corrections[s->current_write_index_correction].valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->write_index_corrections[s->current_write_index_correction].corrupt = false;
                s->write_index_corrections[s->current_write_index_correction].absolute = true;
                s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
                    s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
            } else
                s->write_index_corrections[s->current_write_index_correction].corrupt = true;
        }

        /* Update the write index in the already available latency data */
        if (s->timing_info_valid) {

            if (seek == PA_SEEK_ABSOLUTE) {
                s->timing_info.write_index_corrupt = false;
                s->timing_info.write_index = offset + (int64_t) length;
            } else if (seek == PA_SEEK_RELATIVE) {
                if (!s->timing_info.write_index_corrupt)
                    s->timing_info.write_index += offset + (int64_t) length;
            } else
                s->timing_info.write_index_corrupt = true;
        }

        /* Without a usable write index, ask the server for fresh data */
        if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
            request_auto_timing_update(s, true);
    }

    return 0;
}
1644
/* Convenience variant of pa_stream_write_ext_free() in which free_cb, if
 * given, is handed the data pointer itself as its argument. All other
 * arguments and the return value are passed through unchanged. */
int pa_stream_write(
        pa_stream *s,
        const void *data,
        size_t length,
        pa_free_cb_t free_cb,
        int64_t offset,
        pa_seek_mode_t seek) {

    /* The free callback takes a non-const pointer, hence the cast */
    void *free_cb_data = (void*) data;

    return pa_stream_write_ext_free(s, data, length, free_cb, free_cb_data, offset, seek);
}
1655
/* Gives access to the data at the current read position of a ready record
 * stream without removing it.
 *
 * s       The record stream.
 * data    Receives a pointer to the data, or NULL. Must not be NULL itself.
 * length  Receives the number of bytes. Must not be NULL itself.
 *
 * Three outcomes are possible, all of which return 0:
 *   - *data == NULL and *length == 0: the record queue is empty,
 *   - *data == NULL and *length  > 0: the queue has no data at the current
 *     read index, *length is the length of the chunk that was peeked,
 *   - otherwise *data/*length describe the peeked chunk.
 *
 * A chunk that has been peeked stays cached in the stream, so repeated calls
 * hand out the same data again. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(data);
    pa_assert(length);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);

    if (!s->peek_memchunk.memblock) {
        /* Nothing cached yet, so look into the queue */
        const bool queue_empty = pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0;

        if (queue_empty || !s->peek_memchunk.memblock) {
            /* Either record_memblockq is empty, or it isn't but it doesn't
             * have any data at the current read index. */
            *data = NULL;
            *length = queue_empty ? 0 : s->peek_memchunk.length;
            return 0;
        }

        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
    }

    pa_assert(s->peek_data);

    *length = s->peek_memchunk.length;
    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;

    return 0;
}
1690
/* Removes the chunk last returned by pa_stream_peek() from the record queue.
 *
 * s  A ready record stream with a peeked chunk of non-zero length;
 *    PA_ERR_BADSTATE is raised otherwise.
 *
 * The locally simulated read index is advanced by the dropped length when it
 * is known to be good. If the chunk carried a memblock, that block is
 * released and unreferenced.
 *
 * Returns 0 on success. */
int pa_stream_drop(pa_stream *s) {
    size_t dropped;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);

    dropped = s->peek_memchunk.length;

    pa_memblockq_drop(s->record_memblockq, dropped);

    /* Fix the simulated local read index */
    if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
        s->timing_info.read_index += (int64_t) dropped;

    /* A chunk without a memblock has nothing to give back */
    if (s->peek_memchunk.memblock) {
        pa_assert(s->peek_data);

        s->peek_data = NULL;
        pa_memblock_release(s->peek_memchunk.memblock);
        pa_memblock_unref(s->peek_memchunk.memblock);
    }

    pa_memchunk_reset(&s->peek_memchunk);

    return 0;
}
1717
pa_stream_writable_size(const pa_stream *s)1718 size_t pa_stream_writable_size(const pa_stream *s) {
1719 pa_assert(s);
1720 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1721
1722 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1723 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1724 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1725
1726 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1727 }
1728
pa_stream_readable_size(const pa_stream *s)1729 size_t pa_stream_readable_size(const pa_stream *s) {
1730 pa_assert(s);
1731 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1732
1733 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1734 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1735 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1736
1737 return pa_memblockq_get_length(s->record_memblockq);
1738 }
1739
pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)1740 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1741 pa_operation *o;
1742 pa_tagstruct *t;
1743 uint32_t tag;
1744
1745 pa_assert(s);
1746 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1747
1748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1749 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1750 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1751
1752 /* Ask for a timing update before we cork/uncork to get the best
1753 * accuracy for the transport latency suitable for the
1754 * check_smoother_status() call in the started callback */
1755 request_auto_timing_update(s, true);
1756
1757 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1758
1759 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1760 pa_tagstruct_putu32(t, s->channel);
1761 pa_pstream_send_tagstruct(s->context->pstream, t);
1762 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1763
1764 /* This might cause the read index to continue again, hence
1765 * let's request a timing update */
1766 request_auto_timing_update(s, true);
1767
1768 return o;
1769 }
1770
calc_time(const pa_stream *s, bool ignore_transport)1771 static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) {
1772 pa_usec_t usec;
1773
1774 pa_assert(s);
1775 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1776 pa_assert(s->state == PA_STREAM_READY);
1777 pa_assert(s->direction != PA_STREAM_UPLOAD);
1778 pa_assert(s->timing_info_valid);
1779 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1780 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1781
1782 if (s->direction == PA_STREAM_PLAYBACK) {
1783 /* The last byte that was written into the output device
1784 * had this time value associated */
1785 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1786
1787 if (!s->corked && !s->suspended) {
1788
1789 if (!ignore_transport)
1790 /* Because the latency info took a little time to come
1791 * to us, we assume that the real output time is actually
1792 * a little ahead */
1793 usec += s->timing_info.transport_usec;
1794
1795 /* However, the output device usually maintains a buffer
1796 too, hence the real sample currently played is a little
1797 back */
1798 if (s->timing_info.sink_usec >= usec)
1799 usec = 0;
1800 else
1801 usec -= s->timing_info.sink_usec;
1802 }
1803
1804 } else {
1805 pa_assert(s->direction == PA_STREAM_RECORD);
1806
1807 /* The last byte written into the server side queue had
1808 * this time value associated */
1809 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1810
1811 if (!s->corked && !s->suspended) {
1812
1813 if (!ignore_transport)
1814 /* Add transport latency */
1815 usec += s->timing_info.transport_usec;
1816
1817 /* Add latency of data in device buffer */
1818 usec += s->timing_info.source_usec;
1819
1820 /* If this is a monitor source, we need to correct the
1821 * time by the playback device buffer */
1822 if (s->timing_info.sink_usec >= usec)
1823 usec = 0;
1824 else
1825 usec -= s->timing_info.sink_usec;
1826 }
1827 }
1828
1829 return usec;
1830 }
1831
1832 #ifdef USE_SMOOTHER_2
/* Byte-position counterpart of calc_time(): converts the stream time into
 * whole frames at the stream's sample rate (rounding down) and multiplies
 * that by the frame size. */
static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) {
    const pa_usec_t usec = calc_time(s, ignore_transport);
    const pa_usec_t frames = usec * s->sample_spec.rate / PA_USEC_PER_SEC;

    return (uint64_t) (frames * pa_frame_size(&s->sample_spec));
}
1836 #endif
1837
stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata)1838 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1839 pa_operation *o = userdata;
1840 struct timeval local, remote, now;
1841 pa_timing_info *i;
1842 bool playing = false;
1843 uint64_t underrun_for = 0, playing_for = 0;
1844
1845 pa_assert(pd);
1846 pa_assert(o);
1847 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1848
1849 if (!o->context || !o->stream)
1850 goto finish;
1851
1852 i = &o->stream->timing_info;
1853
1854 o->stream->timing_info_valid = false;
1855 i->write_index_corrupt = true;
1856 i->read_index_corrupt = true;
1857
1858 if (command != PA_COMMAND_REPLY) {
1859 if (pa_context_handle_error(o->context, command, t, false) < 0)
1860 goto finish;
1861
1862 } else {
1863
1864 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1865 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1866 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1867 pa_tagstruct_get_timeval(t, &local) < 0 ||
1868 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1869 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1870 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1871
1872 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1873 goto finish;
1874 }
1875
1876 if (o->context->version >= 13 &&
1877 o->stream->direction == PA_STREAM_PLAYBACK)
1878 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1879 pa_tagstruct_getu64(t, &playing_for) < 0) {
1880
1881 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1882 goto finish;
1883 }
1884
1885 if (!pa_tagstruct_eof(t)) {
1886 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1887 goto finish;
1888 }
1889 o->stream->timing_info_valid = true;
1890 i->write_index_corrupt = false;
1891 i->read_index_corrupt = false;
1892
1893 i->playing = (int) playing;
1894 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1895
1896 pa_gettimeofday(&now);
1897
1898 /* Calculate timestamps */
1899 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1900 /* local and remote seem to have synchronized clocks */
1901
1902 if (o->stream->direction == PA_STREAM_PLAYBACK)
1903 i->transport_usec = pa_timeval_diff(&remote, &local);
1904 else
1905 i->transport_usec = pa_timeval_diff(&now, &remote);
1906
1907 i->synchronized_clocks = true;
1908 i->timestamp = remote;
1909 } else {
1910 /* clocks are not synchronized, let's estimate latency then */
1911 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1912 i->synchronized_clocks = false;
1913 i->timestamp = local;
1914 pa_timeval_add(&i->timestamp, i->transport_usec);
1915 }
1916
1917 /* Invalidate read and write indexes if necessary */
1918 if (tag < o->stream->read_index_not_before)
1919 i->read_index_corrupt = true;
1920
1921 if (tag < o->stream->write_index_not_before)
1922 i->write_index_corrupt = true;
1923
1924 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1925 /* Write index correction */
1926
1927 int n, j;
1928 uint32_t ctag = tag;
1929
1930 /* Go through the saved correction values and add up the
1931 * total correction.*/
1932 for (n = 0, j = o->stream->current_write_index_correction+1;
1933 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1934 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1935
1936 /* Step over invalid data or out-of-date data */
1937 if (!o->stream->write_index_corrections[j].valid ||
1938 o->stream->write_index_corrections[j].tag < ctag)
1939 continue;
1940
1941 /* Make sure that everything is in order */
1942 ctag = o->stream->write_index_corrections[j].tag+1;
1943
1944 /* Now fix the write index */
1945 if (o->stream->write_index_corrections[j].corrupt) {
1946 /* A corrupting seek was made */
1947 i->write_index_corrupt = true;
1948 } else if (o->stream->write_index_corrections[j].absolute) {
1949 /* An absolute seek was made */
1950 i->write_index = o->stream->write_index_corrections[j].value;
1951 i->write_index_corrupt = false;
1952 } else if (!i->write_index_corrupt) {
1953 /* A relative seek was made */
1954 i->write_index += o->stream->write_index_corrections[j].value;
1955 }
1956 }
1957
1958 /* Clear old correction entries */
1959 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1960 if (!o->stream->write_index_corrections[n].valid)
1961 continue;
1962
1963 if (o->stream->write_index_corrections[n].tag <= tag)
1964 o->stream->write_index_corrections[n].valid = false;
1965 }
1966 }
1967
1968 if (o->stream->direction == PA_STREAM_RECORD) {
1969 /* Read index correction */
1970
1971 if (!i->read_index_corrupt)
1972 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1973 }
1974
1975 /* Update smoother if we're not corked */
1976 if (o->stream->smoother && !o->stream->corked) {
1977 pa_usec_t u, x;
1978
1979 u = x = pa_rtclock_now() - i->transport_usec;
1980
1981 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1982 pa_usec_t su;
1983
1984 /* If we weren't playing then it will take some time
1985 * until the audio will actually come out through the
1986 * speakers. Since we follow that timing here, we need
1987 * to try to fix this up */
1988
1989 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1990
1991 if (su < i->sink_usec)
1992 x += i->sink_usec - su;
1993 }
1994
1995 if (!i->playing)
1996 #ifdef USE_SMOOTHER_2
1997 pa_smoother_2_pause(o->stream->smoother, x);
1998 #else
1999 pa_smoother_pause(o->stream->smoother, x);
2000 #endif
2001
2002 /* Update the smoother */
2003 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
2004 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
2005 #ifdef USE_SMOOTHER_2
2006 pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
2007 #else
2008 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
2009 #endif
2010
2011 if (i->playing)
2012 #ifdef USE_SMOOTHER_2
2013 pa_smoother_2_resume(o->stream->smoother, x);
2014 #else
2015 pa_smoother_resume(o->stream->smoother, x, true);
2016 #endif
2017 }
2018 }
2019
2020 o->stream->auto_timing_update_requested = false;
2021
2022 if (o->stream->latency_update_callback)
2023 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
2024
2025 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
2026 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2027 cb(o->stream, o->stream->timing_info_valid, o->userdata);
2028 }
2029
2030 finish:
2031
2032 pa_operation_done(o);
2033 pa_operation_unref(o);
2034 }
2035
pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2036 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2037 uint32_t tag;
2038 pa_operation *o;
2039 pa_tagstruct *t;
2040 struct timeval now;
2041 int cidx = 0;
2042
2043 pa_assert(s);
2044 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2045
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2047 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2048 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2049
2050 if (s->direction == PA_STREAM_PLAYBACK) {
2051 /* Find a place to store the write_index correction data for this entry */
2052 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
2053
2054 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
2055 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
2056 }
2057 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2058
2059 t = pa_tagstruct_command(
2060 s->context,
2061 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
2062 &tag);
2063 pa_tagstruct_putu32(t, s->channel);
2064 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
2065
2066 pa_pstream_send_tagstruct(s->context->pstream, t);
2067 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2068
2069 if (s->direction == PA_STREAM_PLAYBACK) {
2070 /* Fill in initial correction data */
2071
2072 s->current_write_index_correction = cidx;
2073
2074 s->write_index_corrections[cidx].valid = true;
2075 s->write_index_corrections[cidx].absolute = false;
2076 s->write_index_corrections[cidx].corrupt = false;
2077 s->write_index_corrections[cidx].tag = tag;
2078 s->write_index_corrections[cidx].value = 0;
2079 }
2080
2081 return o;
2082 }
2083
/* Reply handler for the delete-stream request sent by pa_stream_disconnect().
 * userdata is the stream. A clean reply moves the stream to
 * PA_STREAM_TERMINATED, an error reply that the context could handle moves it
 * to PA_STREAM_FAILED, and a reply with trailing data fails the context with
 * PA_ERR_PROTOCOL. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    /* Hold a reference for the duration of the handler. */
    pa_stream_ref(s);

    if (command == PA_COMMAND_REPLY) {
        if (pa_tagstruct_eof(t))
            pa_stream_set_state(s, PA_STREAM_TERMINATED);
        else
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
    } else if (pa_context_handle_error(s->context, command, t, false) >= 0) {
        /* Only mark the stream failed when error handling itself succeeded. */
        pa_stream_set_state(s, PA_STREAM_FAILED);
    }

    pa_stream_unref(s);
}
2109
/* Ask the server to delete stream s. The delete command is chosen from the
 * stream direction (playback, record, otherwise upload). The reply is handled
 * by pa_stream_disconnect_callback(). Returns 0 once the request is queued;
 * the PA_CHECK_VALIDITY guards return early when pa_detect_fork() fires, the
 * stream has no valid channel, or the context is not ready. */
int pa_stream_disconnect(pa_stream *s) {
    pa_tagstruct *request;
    uint32_t reply_tag;
    uint32_t delete_cmd;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);

    pa_stream_ref(s);

    if (s->direction == PA_STREAM_PLAYBACK)
        delete_cmd = (uint32_t) PA_COMMAND_DELETE_PLAYBACK_STREAM;
    else if (s->direction == PA_STREAM_RECORD)
        delete_cmd = (uint32_t) PA_COMMAND_DELETE_RECORD_STREAM;
    else
        delete_cmd = (uint32_t) PA_COMMAND_DELETE_UPLOAD_STREAM;

    request = pa_tagstruct_command(s->context, delete_cmd, &reply_tag);
    pa_tagstruct_putu32(request, s->channel);
    pa_pstream_send_tagstruct(s->context->pstream, request);

    /* The stream is passed as userdata with no free callback. */
    pa_pdispatch_register_reply(s->context->pdispatch, reply_tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);

    pa_stream_unref(s);
    return 0;
}
2135
/* Store cb/userdata as the read callback of stream s. Nothing is stored when
 * pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->read_userdata = userdata;
    s->read_callback = cb;
}
2149
/* Store cb/userdata as the write callback of stream s. Nothing is stored when
 * pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->write_userdata = userdata;
    s->write_callback = cb;
}
2163
/* Store cb/userdata as the state-change callback of stream s. Nothing is
 * stored when pa_detect_fork() fires or the stream is already terminated or
 * failed. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->state_userdata = userdata;
    s->state_callback = cb;
}
2177
/* Store cb/userdata as the overflow callback of stream s. Nothing is stored
 * when pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->overflow_userdata = userdata;
    s->overflow_callback = cb;
}
2191
/* Store cb/userdata as the underflow callback of stream s. Nothing is stored
 * when pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_userdata = userdata;
    s->underflow_callback = cb;
}
2205
/* Store cb/userdata in the underflow_ohos callback slot of stream s. Nothing
 * is stored when pa_detect_fork() fires or the stream is already terminated
 * or failed. */
void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->underflow_ohos_userdata = userdata;
    s->underflow_ohos_callback = cb;
}
2219
/* Store cb/userdata as the latency-update callback of stream s. Nothing is
 * stored when pa_detect_fork() fires or the stream is already terminated or
 * failed. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->latency_update_userdata = userdata;
    s->latency_update_callback = cb;
}
2233
/* Store cb/userdata as the moved callback of stream s. Nothing is stored when
 * pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->moved_userdata = userdata;
    s->moved_callback = cb;
}
2247
/* Store cb/userdata as the suspended callback of stream s. Nothing is stored
 * when pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->suspended_userdata = userdata;
    s->suspended_callback = cb;
}
2261
/* Store cb/userdata as the started callback of stream s. Nothing is stored
 * when pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->started_userdata = userdata;
    s->started_callback = cb;
}
2275
/* Store cb/userdata as the event callback of stream s. Nothing is stored when
 * pa_detect_fork() fires or the stream is already terminated or failed. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->event_userdata = userdata;
    s->event_callback = cb;
}
2289
/* Store cb/userdata as the buffer-attribute callback of stream s. Nothing is
 * stored when pa_detect_fork() fires or the stream is already terminated or
 * failed. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (pa_detect_fork() || s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
        return;

    s->buffer_attr_userdata = userdata;
    s->buffer_attr_callback = cb;
}
2303
/* Generic reply handler for stream commands whose reply carries no payload.
 * userdata is the pa_operation. The operation's callback, if any, is called
 * with success = 1 for a clean reply and success = 0 for an error reply that
 * the context could handle. It is not called when the operation has no
 * context, when error handling fails, or when the reply has trailing data
 * (which fails the context with PA_ERR_PROTOCOL). The operation is always
 * marked done and unreferenced. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int acked;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    if (!o->context)
        goto finish;

    if (command == PA_COMMAND_REPLY) {
        /* A valid acknowledgement is an empty reply. */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        acked = 1;
    } else {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        acked = 0;
    }

    if (o->callback) {
        pa_stream_success_cb_t notify = (pa_stream_success_cb_t) o->callback;
        notify(o->stream, acked, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2334
pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata)2335 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2336 pa_operation *o;
2337 pa_tagstruct *t;
2338 uint32_t tag;
2339
2340 pa_assert(s);
2341 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342
2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2344 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2345 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2346
2347 /* Ask for a timing update before we cork/uncork to get the best
2348 * accuracy for the transport latency suitable for the
2349 * check_smoother_status() call in the started callback */
2350 request_auto_timing_update(s, true);
2351
2352 s->corked = b;
2353
2354 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2355
2356 t = pa_tagstruct_command(
2357 s->context,
2358 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2359 &tag);
2360 pa_tagstruct_putu32(t, s->channel);
2361 pa_tagstruct_put_boolean(t, !!b);
2362 pa_pstream_send_tagstruct(s->context->pstream, t);
2363 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2364
2365 check_smoother_status(s, false, false, false);
2366
2367 /* This might cause the indexes to hang/start again, hence let's
2368 * request a timing update, after the cork/uncork, too */
2369 request_auto_timing_update(s, true);
2370
2371 return o;
2372 }
2373
stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata)2374 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2375 pa_tagstruct *t;
2376 pa_operation *o;
2377 uint32_t tag;
2378
2379 pa_assert(s);
2380 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2381
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2384
2385 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2386
2387 t = pa_tagstruct_command(s->context, command, &tag);
2388 pa_tagstruct_putu32(t, s->channel);
2389 pa_pstream_send_tagstruct(s->context->pstream, t);
2390 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2391
2392 return o;
2393 }
2394
pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2395 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2396 pa_operation *o;
2397
2398 pa_assert(s);
2399 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2400
2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2403 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2404
2405 /* Ask for a timing update *before* the flush, so that the
2406 * transport usec is as up to date as possible when we get the
2407 * underflow message and update the smoother status*/
2408 request_auto_timing_update(s, true);
2409
2410 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2411 return NULL;
2412
2413 if (s->direction == PA_STREAM_PLAYBACK) {
2414
2415 if (s->write_index_corrections[s->current_write_index_correction].valid)
2416 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2417
2418 if (s->buffer_attr.prebuf > 0)
2419 check_smoother_status(s, false, false, true);
2420
2421 /* This will change the write index, but leave the
2422 * read index untouched. */
2423 invalidate_indexes(s, false, true);
2424
2425 } else
2426 /* For record streams this has no influence on the write
2427 * index, but the read index might jump. */
2428 invalidate_indexes(s, true, false);
2429
2430 /* Note that we do not update requested_bytes here. This is
2431 * because we cannot really know how data actually was dropped
2432 * from the write index due to this. This 'error' will be applied
2433 * by both client and server and hence we should be fine. */
2434
2435 return o;
2436 }
2437
pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2438 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2439 pa_operation *o;
2440
2441 pa_assert(s);
2442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443
2444 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2445 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2446 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2447 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2448
2449 /* Ask for a timing update before we cork/uncork to get the best
2450 * accuracy for the transport latency suitable for the
2451 * check_smoother_status() call in the started callback */
2452 request_auto_timing_update(s, true);
2453
2454 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2455 return NULL;
2456
2457 /* This might cause the read index to hang again, hence
2458 * let's request a timing update */
2459 request_auto_timing_update(s, true);
2460
2461 return o;
2462 }
2463
pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)2464 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2465 pa_operation *o;
2466
2467 pa_assert(s);
2468 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2469
2470 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2471 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2472 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2473 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2474
2475 /* Ask for a timing update before we cork/uncork to get the best
2476 * accuracy for the transport latency suitable for the
2477 * check_smoother_status() call in the started callback */
2478 request_auto_timing_update(s, true);
2479
2480 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2481 return NULL;
2482
2483 /* This might cause the read index to start moving again, hence
2484 * let's request a timing update */
2485 request_auto_timing_update(s, true);
2486
2487 return o;
2488 }
2489
pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata)2490 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2491 pa_operation *o;
2492
2493 pa_assert(s);
2494 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2495 pa_assert(name);
2496
2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2500
2501 if (s->context->version >= 13) {
2502 pa_proplist *p = pa_proplist_new();
2503
2504 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2505 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2506 pa_proplist_free(p);
2507 } else {
2508 pa_tagstruct *t;
2509 uint32_t tag;
2510
2511 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2512 t = pa_tagstruct_command(
2513 s->context,
2514 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2515 &tag);
2516 pa_tagstruct_putu32(t, s->channel);
2517 pa_tagstruct_puts(t, name);
2518 pa_pstream_send_tagstruct(s->context->pstream, t);
2519 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2520 }
2521
2522 return o;
2523 }
2524
/* Compute the current stream time in usec and store it in *r_usec (when
 * r_usec is non-NULL). The value comes from the smoother if the stream has
 * one, otherwise from calc_time(). Unless PA_STREAM_NOT_MONOTONIC is set, the
 * result never goes below the previously reported time. Returns 0 on success;
 * the PA_CHECK_VALIDITY guards return early when timing data is missing or
 * the relevant index is corrupt. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
    pa_usec_t stream_time;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);

    if (!s->smoother) {
        stream_time = calc_time(s, false);
    } else {
#ifdef USE_SMOOTHER_2
        stream_time = pa_smoother_2_get(s->smoother, pa_rtclock_now());
#else
        stream_time = pa_smoother_get(s->smoother, pa_rtclock_now());
#endif
    }

    /* Keep the reported time monotonic: clamp to the last value or advance it. */
    if ((s->flags & PA_STREAM_NOT_MONOTONIC) == 0) {
        if (stream_time >= s->previous_time)
            s->previous_time = stream_time;
        else
            stream_time = s->previous_time;
    }

    if (r_usec)
        *r_usec = stream_time;

    return 0;
}
2561
/* Return a - b when a >= b. Otherwise the result depends on the caller:
 * for a record stream with a non-NULL negative pointer, *negative is set to 1
 * and b - a is returned; in every other case 0 is returned. *negative, when
 * given, is reset to 0 first. */
static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    if (negative)
        *negative = 0;

    if (a >= b)
        return a - b;

    /* A negative difference is only reported for record streams, and only
     * when the caller supplied somewhere to flag it. */
    if (!negative || s->direction != PA_STREAM_RECORD)
        return 0;

    *negative = 1;
    return b - a;
}
2579
/* Compute the latency of stream s in usec and store it in *r_usec. The
 * latency is the difference between the stream time from pa_stream_get_time()
 * and the time equivalent of the write index (playback) or read index
 * (record); a negative index is treated as 0. negative is forwarded to
 * time_counter_diff(). Returns 0 on success or the error from
 * pa_stream_get_time(). */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
    pa_usec_t stream_time, index_time;
    int64_t index;
    int rc;
    int is_playback;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(r_usec);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
    PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);

    rc = pa_stream_get_time(s, &stream_time);
    if (rc < 0)
        return rc;

    is_playback = (s->direction == PA_STREAM_PLAYBACK);

    index = is_playback ? s->timing_info.write_index : s->timing_info.read_index;
    if (index < 0)
        index = 0;

    index_time = pa_bytes_to_usec((uint64_t) index, &s->sample_spec);

    /* Playback: index time minus stream time. Record: the other way round. */
    if (is_playback)
        *r_usec = time_counter_diff(s, index_time, stream_time, negative);
    else
        *r_usec = time_counter_diff(s, stream_time, index_time, negative);

    return 0;
}
2616
pa_stream_get_timing_info(pa_stream *s)2617 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2618 pa_assert(s);
2619 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2620
2621 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2622 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2624 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2625
2626 return &s->timing_info;
2627 }
2628
pa_stream_get_sample_spec(pa_stream *s)2629 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2630 pa_assert(s);
2631 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2632
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2634
2635 return &s->sample_spec;
2636 }
2637
pa_stream_get_channel_map(pa_stream *s)2638 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2639 pa_assert(s);
2640 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2641
2642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2643
2644 return &s->channel_map;
2645 }
2646
pa_stream_get_format_info(const pa_stream *s)2647 const pa_format_info* pa_stream_get_format_info(const pa_stream *s) {
2648 pa_assert(s);
2649 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2650
2651 /* We don't have the format till routing is done */
2652 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2653 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2654
2655 return s->format;
2656 }
pa_stream_get_buffer_attr(pa_stream *s)2657 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2658 pa_assert(s);
2659 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660
2661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2663 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2664
2665 return &s->buffer_attr;
2666 }
2667
/* Reply handler for the set-buffer-attr request sent by
 * pa_stream_set_buffer_attr(). userdata is the pa_operation.
 *
 * On a successful reply the effective buffer metrics are read back into
 * o->stream->buffer_attr (maxlength/tlength/prebuf/minreq for playback,
 * maxlength/fragsize for record). With protocol version >= 13 the configured
 * sink or source latency is also stored in the stream's timing info. A
 * malformed reply fails the context with PA_ERR_PROTOCOL.
 *
 * The operation's callback, if any, is called with success = 1 for a clean
 * reply and success = 0 for an error reply that the context could handle. The
 * operation is always marked done and unreferenced. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* o->stream is dereferenced below, so require it in addition to the
     * context, matching the guard in stream_get_timing_info_callback(). */
    if (!o->context || !o->stream)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        success = 0;
    } else {
        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else if (o->stream->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        }

        if (o->stream->context->version >= 13) {
            pa_usec_t usec;

            if (pa_tagstruct_get_usec(t, &usec) < 0) {
                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

            /* The single latency value belongs to the source for record
             * streams and to the sink otherwise. */
            if (o->stream->direction == PA_STREAM_RECORD)
                o->stream->timing_info.configured_source_usec = usec;
            else
                o->stream->timing_info.configured_sink_usec = usec;
        }

        /* Anything left in the reply is a protocol violation. */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2730
pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata)2731 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2732 pa_operation *o;
2733 pa_tagstruct *t;
2734 uint32_t tag;
2735 pa_buffer_attr copy;
2736
2737 pa_assert(s);
2738 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2739 pa_assert(attr);
2740
2741 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2742 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2743 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2745
2746 /* Ask for a timing update before we cork/uncork to get the best
2747 * accuracy for the transport latency suitable for the
2748 * check_smoother_status() call in the started callback */
2749 request_auto_timing_update(s, true);
2750
2751 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2752
2753 t = pa_tagstruct_command(
2754 s->context,
2755 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2756 &tag);
2757 pa_tagstruct_putu32(t, s->channel);
2758
2759 copy = *attr;
2760 patch_buffer_attr(s, ©, NULL);
2761 attr = ©
2762
2763 pa_tagstruct_putu32(t, attr->maxlength);
2764
2765 if (s->direction == PA_STREAM_PLAYBACK)
2766 pa_tagstruct_put(
2767 t,
2768 PA_TAG_U32, attr->tlength,
2769 PA_TAG_U32, attr->prebuf,
2770 PA_TAG_U32, attr->minreq,
2771 PA_TAG_INVALID);
2772 else
2773 pa_tagstruct_putu32(t, attr->fragsize);
2774
2775 if (s->context->version >= 13)
2776 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2777
2778 if (s->context->version >= 14)
2779 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2780
2781 pa_pstream_send_tagstruct(s->context->pstream, t);
2782 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2783
2784 /* This might cause changes in the read/write index, hence let's
2785 * request a timing update */
2786 request_auto_timing_update(s, true);
2787
2788 return o;
2789 }
2790
pa_stream_get_device_index(const pa_stream *s)2791 uint32_t pa_stream_get_device_index(const pa_stream *s) {
2792 pa_assert(s);
2793 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2794
2795 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2796 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2797 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2798 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2799 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2800
2801 return s->device_index;
2802 }
2803
pa_stream_get_device_name(const pa_stream *s)2804 const char *pa_stream_get_device_name(const pa_stream *s) {
2805 pa_assert(s);
2806 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2807
2808 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2813
2814 return s->device_name;
2815 }
2816
pa_stream_is_suspended(const pa_stream *s)2817 int pa_stream_is_suspended(const pa_stream *s) {
2818 pa_assert(s);
2819 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2820
2821 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2822 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2823 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2824 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2825
2826 return s->suspended;
2827 }
2828
pa_stream_is_corked(const pa_stream *s)2829 int pa_stream_is_corked(const pa_stream *s) {
2830 pa_assert(s);
2831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2832
2833 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2834 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2835 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2836
2837 return s->corked;
2838 }
2839
/* Reply handler for the request sent by pa_stream_update_sample_rate().
 *
 * userdata is the pa_operation that was registered together with the
 * request; the requested rate travels in o->private. The new rate is
 * copied into the stream's sample spec only when the server answered
 * with a well-formed PA_COMMAND_REPLY. If the server reported an error
 * the local sample spec is left untouched and the user callback is
 * invoked with success == 0. The operation is completed and the
 * reference held for this callback is dropped on every path. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    int success = 1;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* Nothing to report when the operation has no context any more */
    if (!o->context)
        goto finish;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

        /* The request was rejected: keep the rate we had before */
        success = 0;
    } else {

        /* A successful reply carries no payload */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
#ifdef USE_SMOOTHER_2
        if (o->stream->smoother)
            pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate);
#endif
        pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
    }

    if (o->callback) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, success, o->userdata);
    }

finish:
    pa_operation_done(o);
    pa_operation_unref(o);
}
2880
pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata)2881 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2882 pa_operation *o;
2883 pa_tagstruct *t;
2884 uint32_t tag;
2885
2886 pa_assert(s);
2887 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2888
2889 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2890 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2891 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2894 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2895
2896 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2897 o->private = PA_UINT_TO_PTR(rate);
2898
2899 t = pa_tagstruct_command(
2900 s->context,
2901 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2902 &tag);
2903 pa_tagstruct_putu32(t, s->channel);
2904 pa_tagstruct_putu32(t, rate);
2905
2906 pa_pstream_send_tagstruct(s->context->pstream, t);
2907 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2908
2909 return o;
2910 }
2911
pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata)2912 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2913 pa_operation *o;
2914 pa_tagstruct *t;
2915 uint32_t tag;
2916
2917 pa_assert(s);
2918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2919
2920 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2923 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2924 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2925
2926 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2927
2928 t = pa_tagstruct_command(
2929 s->context,
2930 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2931 &tag);
2932 pa_tagstruct_putu32(t, s->channel);
2933 pa_tagstruct_putu32(t, (uint32_t) mode);
2934 pa_tagstruct_put_proplist(t, p);
2935
2936 pa_pstream_send_tagstruct(s->context->pstream, t);
2937 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2938
2939 /* Please note that we don't update s->proplist here, because we
2940 * don't export that field */
2941
2942 return o;
2943 }
2944
pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata)2945 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2946 pa_operation *o;
2947 pa_tagstruct *t;
2948 uint32_t tag;
2949 const char * const*k;
2950
2951 pa_assert(s);
2952 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2953
2954 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2955 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2956 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2957 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2958 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2959
2960 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2961
2962 t = pa_tagstruct_command(
2963 s->context,
2964 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2965 &tag);
2966 pa_tagstruct_putu32(t, s->channel);
2967
2968 for (k = keys; *k; k++)
2969 pa_tagstruct_puts(t, *k);
2970
2971 pa_tagstruct_puts(t, NULL);
2972
2973 pa_pstream_send_tagstruct(s->context->pstream, t);
2974 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2975
2976 /* Please note that we don't update s->proplist here, because we
2977 * don't export that field */
2978
2979 return o;
2980 }
2981
/* Store sink_input_idx in the stream's direct_on_input field.
 *
 * This is only accepted while the stream is still UNCONNECTED. The
 * other validity checks require an unforked process, an index other
 * than PA_INVALID_INDEX and a protocol version of at least 13.
 * Returns 0 once the index has been stored. */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
    pa_context *ctx;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);

    ctx = s->context;

    PA_CHECK_VALIDITY(ctx, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(ctx, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
    PA_CHECK_VALIDITY(ctx, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(ctx, ctx->version >= 13, PA_ERR_NOTSUPPORTED);

    s->direct_on_input = sink_input_idx;

    return 0;
}
2995
pa_stream_get_monitor_stream(const pa_stream *s)2996 uint32_t pa_stream_get_monitor_stream(const pa_stream *s) {
2997 pa_assert(s);
2998 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2999
3000 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
3001 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
3002 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
3003
3004 return s->direct_on_input;
3005 }
3006