1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2004-2006 Lennart Poettering 5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB 6 7 PulseAudio is free software; you can redistribute it and/or modify 8 it under the terms of the GNU Lesser General Public License as published 9 by the Free Software Foundation; either version 2.1 of the License, 10 or (at your option) any later version. 11 12 PulseAudio is distributed in the hope that it will be useful, but 13 WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 General Public License for more details. 16 17 You should have received a copy of the GNU Lesser General Public License 18 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 19***/ 20 21#ifdef HAVE_CONFIG_H 22#include <config.h> 23#endif 24 25#include <string.h> 26#include <stdio.h> 27#include <string.h> 28 29#include <pulse/def.h> 30#include <pulse/timeval.h> 31#include <pulse/rtclock.h> 32#include <pulse/xmalloc.h> 33#include <pulse/fork-detect.h> 34 35#include <pulsecore/pstream-util.h> 36#include <pulsecore/sample-util.h> 37#include <pulsecore/log.h> 38#include <pulsecore/hashmap.h> 39#include <pulsecore/macro.h> 40#include <pulsecore/core-rtclock.h> 41#include <pulsecore/core-util.h> 42 43#include "internal.h" 44#include "stream.h" 45 46/* #define STREAM_DEBUG */ 47 48#define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC) 49#define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC) 50 51#define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC) 52#ifndef USE_SMOOTHER_2 53#define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC) 54#define SMOOTHER_MIN_HISTORY (4) 55#endif 56 57pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) { 58 return pa_stream_new_with_proplist(c, name, ss, map, NULL); 59} 60 61static void reset_callbacks(pa_stream *s) { 62 s->read_callback = NULL; 63 
s->read_userdata = NULL; 64 s->write_callback = NULL; 65 s->write_userdata = NULL; 66 s->state_callback = NULL; 67 s->state_userdata = NULL; 68 s->overflow_callback = NULL; 69 s->overflow_userdata = NULL; 70 s->underflow_callback = NULL; 71 s->underflow_userdata = NULL; 72 s->latency_update_callback = NULL; 73 s->latency_update_userdata = NULL; 74 s->moved_callback = NULL; 75 s->moved_userdata = NULL; 76 s->suspended_callback = NULL; 77 s->suspended_userdata = NULL; 78 s->started_callback = NULL; 79 s->started_userdata = NULL; 80 s->event_callback = NULL; 81 s->event_userdata = NULL; 82 s->buffer_attr_callback = NULL; 83 s->buffer_attr_userdata = NULL; 84 s->underflow_ohos_callback = NULL; 85 s->underflow_ohos_userdata = NULL; 86} 87 88static pa_stream *pa_stream_new_with_proplist_internal( 89 pa_context *c, 90 const char *name, 91 const pa_sample_spec *ss, 92 const pa_channel_map *map, 93 pa_format_info * const *formats, 94 unsigned int n_formats, 95 pa_proplist *p) { 96 97 pa_stream *s; 98 unsigned int i; 99 100 pa_assert(c); 101 pa_assert(PA_REFCNT_VALUE(c) >= 1); 102 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0)); 103 pa_assert(n_formats < PA_MAX_FORMATS); 104 105 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED); 106 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID); 107 108 s = pa_xnew(pa_stream, 1); 109 PA_REFCNT_INIT(s); 110 s->context = c; 111 s->mainloop = c->mainloop; 112 113 s->direction = PA_STREAM_NODIRECTION; 114 s->state = PA_STREAM_UNCONNECTED; 115 s->flags = 0; 116 117 if (ss) 118 s->sample_spec = *ss; 119 else 120 pa_sample_spec_init(&s->sample_spec); 121 122 if (map) 123 s->channel_map = *map; 124 else 125 pa_channel_map_init(&s->channel_map); 126 127 s->n_formats = 0; 128 if (formats) { 129 s->n_formats = n_formats; 130 for (i = 0; i < n_formats; i++) 131 s->req_formats[i] = pa_format_info_copy(formats[i]); 132 } 133 134 /* We'll get the 
final negotiated format after connecting */ 135 s->format = NULL; 136 137 s->direct_on_input = PA_INVALID_INDEX; 138 139 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new(); 140 if (name) 141 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name); 142 143 s->channel = 0; 144 s->channel_valid = false; 145 s->syncid = c->csyncid++; 146 s->stream_index = PA_INVALID_INDEX; 147 148 s->requested_bytes = 0; 149 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr)); 150 151 /* We initialize the target length here, so that if the user 152 * passes no explicit buffering metrics the default is similar to 153 * what older PA versions provided. */ 154 155 s->buffer_attr.maxlength = (uint32_t) -1; 156 if (ss) 157 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */ 158 else { 159 /* FIXME: We assume a worst-case compressed format corresponding to 160 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */ 161 pa_sample_spec tmp_ss = { 162 .format = PA_SAMPLE_S16NE, 163 .rate = 48000, 164 .channels = 2, 165 }; 166 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */ 167 } 168 s->buffer_attr.minreq = (uint32_t) -1; 169 s->buffer_attr.prebuf = (uint32_t) -1; 170 s->buffer_attr.fragsize = (uint32_t) -1; 171 172 s->device_index = PA_INVALID_INDEX; 173 s->device_name = NULL; 174 s->suspended = false; 175 s->corked = false; 176 177 s->write_memblock = NULL; 178 s->write_data = NULL; 179 180 pa_memchunk_reset(&s->peek_memchunk); 181 s->peek_data = NULL; 182 s->record_memblockq = NULL; 183 184 memset(&s->timing_info, 0, sizeof(s->timing_info)); 185 s->timing_info_valid = false; 186 187 s->previous_time = 0; 188 s->latest_underrun_at_index = -1; 189 190 s->read_index_not_before = 0; 191 s->write_index_not_before = 0; 192 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++) 193 s->write_index_corrections[i].valid = 0; 194 s->current_write_index_correction = 0; 195 196 
s->auto_timing_update_event = NULL; 197 s->auto_timing_update_requested = false; 198 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC; 199 200 reset_callbacks(s); 201 202 s->smoother = NULL; 203 204 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */ 205 PA_LLIST_PREPEND(pa_stream, c->streams, s); 206 pa_stream_ref(s); 207 208 return s; 209} 210 211pa_stream *pa_stream_new_with_proplist( 212 pa_context *c, 213 const char *name, 214 const pa_sample_spec *ss, 215 const pa_channel_map *map, 216 pa_proplist *p) { 217 218 pa_channel_map tmap; 219 220 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID); 221 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED); 222 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED); 223 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED); 224 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID); 225 226 if (!map) 227 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID); 228 229 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p); 230} 231 232pa_stream *pa_stream_new_extended( 233 pa_context *c, 234 const char *name, 235 pa_format_info * const *formats, 236 unsigned int n_formats, 237 pa_proplist *p) { 238 239 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED); 240 241 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p); 242} 243 244static void stream_unlink(pa_stream *s) { 245 pa_operation *o, *n; 246 pa_assert(s); 247 248 if (!s->context) 249 return; 250 251 /* Detach from context */ 252 
253 /* Unref all operation objects that point to us */ 254 for (o = s->context->operations; o; o = n) { 255 n = o->next; 256 257 if (o->stream == s) 258 pa_operation_cancel(o); 259 } 260 261 /* Drop all outstanding replies for this stream */ 262 if (s->context->pdispatch) 263 pa_pdispatch_unregister_reply(s->context->pdispatch, s); 264 265 if (s->channel_valid) { 266 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel)); 267 s->channel = 0; 268 s->channel_valid = false; 269 } 270 271 PA_LLIST_REMOVE(pa_stream, s->context->streams, s); 272 pa_stream_unref(s); 273 274 s->context = NULL; 275 276 if (s->auto_timing_update_event) { 277 pa_assert(s->mainloop); 278 s->mainloop->time_free(s->auto_timing_update_event); 279 } 280 281 reset_callbacks(s); 282} 283 284static void stream_free(pa_stream *s) { 285 unsigned int i; 286 287 pa_assert(s); 288 289 stream_unlink(s); 290 291 if (s->write_memblock) { 292 if (s->write_data) 293 pa_memblock_release(s->write_memblock); 294 pa_memblock_unref(s->write_memblock); 295 } 296 297 if (s->peek_memchunk.memblock) { 298 if (s->peek_data) 299 pa_memblock_release(s->peek_memchunk.memblock); 300 pa_memblock_unref(s->peek_memchunk.memblock); 301 } 302 303 if (s->record_memblockq) 304 pa_memblockq_free(s->record_memblockq); 305 306 if (s->proplist) 307 pa_proplist_free(s->proplist); 308 309 if (s->smoother) 310#ifdef USE_SMOOTHER_2 311 pa_smoother_2_free(s->smoother); 312#else 313 pa_smoother_free(s->smoother); 314#endif 315 316 for (i = 0; i < s->n_formats; i++) 317 pa_format_info_free(s->req_formats[i]); 318 319 if (s->format) 320 pa_format_info_free(s->format); 321 322 pa_xfree(s->device_name); 323 pa_xfree(s); 324} 325 326void pa_stream_unref(pa_stream *s) { 327 pa_assert(s); 328 pa_assert(PA_REFCNT_VALUE(s) >= 1); 329 330 if (PA_REFCNT_DEC(s) <= 0) 331 stream_free(s); 332} 333 334pa_stream* pa_stream_ref(pa_stream *s) { 335 pa_assert(s); 336 
pa_assert(PA_REFCNT_VALUE(s) >= 1); 337 338 PA_REFCNT_INC(s); 339 return s; 340} 341 342pa_stream_state_t pa_stream_get_state(const pa_stream *s) { 343 pa_assert(s); 344 pa_assert(PA_REFCNT_VALUE(s) >= 1); 345 346 return s->state; 347} 348 349void pa_stream_terminate(pa_stream *s) { 350 pa_stream_set_state(s, PA_STREAM_TERMINATED); 351} 352 353pa_context* pa_stream_get_context(const pa_stream *s) { 354 pa_assert(s); 355 pa_assert(PA_REFCNT_VALUE(s) >= 1); 356 357 return s->context; 358} 359 360uint32_t pa_stream_get_index(const pa_stream *s) { 361 pa_assert(s); 362 pa_assert(PA_REFCNT_VALUE(s) >= 1); 363 364 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX); 365 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX); 366 367 return s->stream_index; 368} 369 370void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { 371 pa_assert(s); 372 pa_assert(PA_REFCNT_VALUE(s) >= 1); 373 374 if (s->state == st) 375 return; 376 377 pa_stream_ref(s); 378 379 s->state = st; 380 381 if (s->state_callback) 382 s->state_callback(s, s->state_userdata); 383 384 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED)) 385 stream_unlink(s); 386 387 pa_stream_unref(s); 388} 389 390static void request_auto_timing_update(pa_stream *s, bool force) { 391 pa_assert(s); 392 pa_assert(PA_REFCNT_VALUE(s) >= 1); 393 394 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE)) 395 return; 396 397 if (s->state == PA_STREAM_READY && 398 (force || !s->auto_timing_update_requested)) { 399 pa_operation *o; 400 401#ifdef STREAM_DEBUG 402 pa_log_debug("Automatically requesting new timing data"); 403#endif 404 405 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) { 406 pa_operation_unref(o); 407 s->auto_timing_update_requested = true; 408 } 409 } 410 411 if (s->auto_timing_update_event) { 412 if (s->suspended && !force) { 413 pa_assert(s->mainloop); 414 s->mainloop->time_free(s->auto_timing_update_event); 
415 s->auto_timing_update_event = NULL; 416 } else { 417 if (force) 418 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC; 419 420 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec); 421 422 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2); 423 } 424 } 425} 426 427void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 428 pa_context *c = userdata; 429 pa_stream *s; 430 uint32_t channel; 431 432 pa_assert(pd); 433 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED); 434 pa_assert(t); 435 pa_assert(c); 436 pa_assert(PA_REFCNT_VALUE(c) >= 1); 437 438 pa_context_ref(c); 439 440 if (pa_tagstruct_getu32(t, &channel) < 0 || 441 !pa_tagstruct_eof(t)) { 442 pa_context_fail(c, PA_ERR_PROTOCOL); 443 goto finish; 444 } 445 446 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? 
c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) 447 goto finish; 448 449 if (s->state != PA_STREAM_READY) 450 goto finish; 451 452 pa_context_set_error(c, PA_ERR_KILLED); 453 pa_stream_set_state(s, PA_STREAM_FAILED); 454 455finish: 456 pa_context_unref(c); 457} 458 459static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) { 460 pa_usec_t x; 461 462 pa_assert(s); 463 pa_assert(!force_start || !force_stop); 464 465 if (!s->smoother) 466 return; 467 468 x = pa_rtclock_now(); 469 470 if (s->timing_info_valid) { 471 if (aposteriori) 472 x -= s->timing_info.transport_usec; 473 else 474 x += s->timing_info.transport_usec; 475 } 476 477 if (s->suspended || s->corked || force_stop) 478#ifdef USE_SMOOTHER_2 479 pa_smoother_2_pause(s->smoother, x); 480#else 481 pa_smoother_pause(s->smoother, x); 482#endif 483 else if (force_start || s->buffer_attr.prebuf == 0) { 484 485 if (!s->timing_info_valid && 486 !aposteriori && 487 !force_start && 488 !force_stop && 489 s->context->version >= 13) { 490 491 /* If the server supports STARTED events we take them as 492 * indications when audio really starts/stops playing, if 493 * we don't have any timing info yet -- instead of trying 494 * to be smart and guessing the server time. Otherwise the 495 * unknown transport delay adds too much noise to our time 496 * calculations. */ 497 498 return; 499 } 500 501#ifdef USE_SMOOTHER_2 502 pa_smoother_2_resume(s->smoother, x); 503#else 504 pa_smoother_resume(s->smoother, x, true); 505#endif 506 } 507 508 /* Please note that we have no idea if playback actually started 509 * if prebuf is non-zero! 
*/ 510} 511 512static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata); 513 514void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 515 pa_context *c = userdata; 516 pa_stream *s; 517 uint32_t channel; 518 const char *dn; 519 bool suspended; 520 uint32_t di; 521 pa_usec_t usec = 0; 522 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0; 523 524 pa_assert(pd); 525 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED); 526 pa_assert(t); 527 pa_assert(c); 528 pa_assert(PA_REFCNT_VALUE(c) >= 1); 529 530 pa_context_ref(c); 531 532 if (c->version < 12) { 533 pa_context_fail(c, PA_ERR_PROTOCOL); 534 goto finish; 535 } 536 537 if (pa_tagstruct_getu32(t, &channel) < 0 || 538 pa_tagstruct_getu32(t, &di) < 0 || 539 pa_tagstruct_gets(t, &dn) < 0 || 540 pa_tagstruct_get_boolean(t, &suspended) < 0) { 541 pa_context_fail(c, PA_ERR_PROTOCOL); 542 goto finish; 543 } 544 545 if (c->version >= 13) { 546 547 if (command == PA_COMMAND_RECORD_STREAM_MOVED) { 548 if (pa_tagstruct_getu32(t, &maxlength) < 0 || 549 pa_tagstruct_getu32(t, &fragsize) < 0 || 550 pa_tagstruct_get_usec(t, &usec) < 0) { 551 pa_context_fail(c, PA_ERR_PROTOCOL); 552 goto finish; 553 } 554 } else { 555 if (pa_tagstruct_getu32(t, &maxlength) < 0 || 556 pa_tagstruct_getu32(t, &tlength) < 0 || 557 pa_tagstruct_getu32(t, &prebuf) < 0 || 558 pa_tagstruct_getu32(t, &minreq) < 0 || 559 pa_tagstruct_get_usec(t, &usec) < 0) { 560 pa_context_fail(c, PA_ERR_PROTOCOL); 561 goto finish; 562 } 563 } 564 } 565 566 if (!pa_tagstruct_eof(t)) { 567 pa_context_fail(c, PA_ERR_PROTOCOL); 568 goto finish; 569 } 570 571 if (!dn || di == PA_INVALID_INDEX) { 572 pa_context_fail(c, PA_ERR_PROTOCOL); 573 goto finish; 574 } 575 576 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? 
c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) 577 goto finish; 578 579 if (s->state != PA_STREAM_READY) 580 goto finish; 581 582 if (c->version >= 13) { 583 if (s->direction == PA_STREAM_RECORD) 584 s->timing_info.configured_source_usec = usec; 585 else 586 s->timing_info.configured_sink_usec = usec; 587 588 s->buffer_attr.maxlength = maxlength; 589 s->buffer_attr.fragsize = fragsize; 590 s->buffer_attr.tlength = tlength; 591 s->buffer_attr.prebuf = prebuf; 592 s->buffer_attr.minreq = minreq; 593 } 594 595 pa_xfree(s->device_name); 596 s->device_name = pa_xstrdup(dn); 597 s->device_index = di; 598 599 s->suspended = suspended; 600 601 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) { 602 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC; 603 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s); 604 request_auto_timing_update(s, true); 605 } 606 607 check_smoother_status(s, true, false, false); 608 request_auto_timing_update(s, true); 609 610 if (s->moved_callback) 611 s->moved_callback(s, s->moved_userdata); 612 613finish: 614 pa_context_unref(c); 615} 616 617void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 618 pa_context *c = userdata; 619 pa_stream *s; 620 uint32_t channel; 621 pa_usec_t usec = 0; 622 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0; 623 624 pa_assert(pd); 625 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED); 626 pa_assert(t); 627 pa_assert(c); 628 pa_assert(PA_REFCNT_VALUE(c) >= 1); 629 630 pa_context_ref(c); 631 632 if (c->version < 15) { 633 pa_context_fail(c, PA_ERR_PROTOCOL); 634 goto finish; 635 } 636 637 if (pa_tagstruct_getu32(t, &channel) < 0) { 638 pa_context_fail(c, PA_ERR_PROTOCOL); 639 goto finish; 640 } 
641 642 if (command == PA_COMMAND_RECORD_STREAM_MOVED) { 643 if (pa_tagstruct_getu32(t, &maxlength) < 0 || 644 pa_tagstruct_getu32(t, &fragsize) < 0 || 645 pa_tagstruct_get_usec(t, &usec) < 0) { 646 pa_context_fail(c, PA_ERR_PROTOCOL); 647 goto finish; 648 } 649 } else { 650 if (pa_tagstruct_getu32(t, &maxlength) < 0 || 651 pa_tagstruct_getu32(t, &tlength) < 0 || 652 pa_tagstruct_getu32(t, &prebuf) < 0 || 653 pa_tagstruct_getu32(t, &minreq) < 0 || 654 pa_tagstruct_get_usec(t, &usec) < 0) { 655 pa_context_fail(c, PA_ERR_PROTOCOL); 656 goto finish; 657 } 658 } 659 660 if (!pa_tagstruct_eof(t)) { 661 pa_context_fail(c, PA_ERR_PROTOCOL); 662 goto finish; 663 } 664 665 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) 666 goto finish; 667 668 if (s->state != PA_STREAM_READY) 669 goto finish; 670 671 if (s->direction == PA_STREAM_RECORD) 672 s->timing_info.configured_source_usec = usec; 673 else 674 s->timing_info.configured_sink_usec = usec; 675 676 s->buffer_attr.maxlength = maxlength; 677 s->buffer_attr.fragsize = fragsize; 678 s->buffer_attr.tlength = tlength; 679 s->buffer_attr.prebuf = prebuf; 680 s->buffer_attr.minreq = minreq; 681 682 request_auto_timing_update(s, true); 683 684 if (s->buffer_attr_callback) 685 s->buffer_attr_callback(s, s->buffer_attr_userdata); 686 687finish: 688 pa_context_unref(c); 689} 690 691void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 692 pa_context *c = userdata; 693 pa_stream *s; 694 uint32_t channel; 695 bool suspended; 696 697 pa_assert(pd); 698 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED); 699 pa_assert(t); 700 pa_assert(c); 701 pa_assert(PA_REFCNT_VALUE(c) >= 1); 702 703 pa_context_ref(c); 704 705 if (c->version < 12) { 706 pa_context_fail(c, PA_ERR_PROTOCOL); 707 goto finish; 708 } 709 710 if 
(pa_tagstruct_getu32(t, &channel) < 0 || 711 pa_tagstruct_get_boolean(t, &suspended) < 0 || 712 !pa_tagstruct_eof(t)) { 713 pa_context_fail(c, PA_ERR_PROTOCOL); 714 goto finish; 715 } 716 717 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) 718 goto finish; 719 720 if (s->state != PA_STREAM_READY) 721 goto finish; 722 723 s->suspended = suspended; 724 725 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) { 726 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC; 727 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s); 728 request_auto_timing_update(s, true); 729 } 730 731 check_smoother_status(s, true, false, false); 732 request_auto_timing_update(s, true); 733 734 if (s->suspended_callback) 735 s->suspended_callback(s, s->suspended_userdata); 736 737finish: 738 pa_context_unref(c); 739} 740 741void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 742 pa_context *c = userdata; 743 pa_stream *s; 744 uint32_t channel; 745 746 pa_assert(pd); 747 pa_assert(command == PA_COMMAND_STARTED); 748 pa_assert(t); 749 pa_assert(c); 750 pa_assert(PA_REFCNT_VALUE(c) >= 1); 751 752 pa_context_ref(c); 753 754 if (c->version < 13) { 755 pa_context_fail(c, PA_ERR_PROTOCOL); 756 goto finish; 757 } 758 759 if (pa_tagstruct_getu32(t, &channel) < 0 || 760 !pa_tagstruct_eof(t)) { 761 pa_context_fail(c, PA_ERR_PROTOCOL); 762 goto finish; 763 } 764 765 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel)))) 766 goto finish; 767 768 if (s->state != PA_STREAM_READY) 769 goto finish; 770 771 check_smoother_status(s, true, true, false); 772 request_auto_timing_update(s, true); 773 774 if (s->started_callback) 775 s->started_callback(s, s->started_userdata); 776 777finish: 778 
pa_context_unref(c); 779} 780 781void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 782 pa_context *c = userdata; 783 pa_stream *s; 784 uint32_t channel; 785 pa_proplist *pl = NULL; 786 const char *event; 787 788 pa_assert(pd); 789 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT); 790 pa_assert(t); 791 pa_assert(c); 792 pa_assert(PA_REFCNT_VALUE(c) >= 1); 793 794 pa_context_ref(c); 795 796 if (c->version < 15) { 797 pa_context_fail(c, PA_ERR_PROTOCOL); 798 goto finish; 799 } 800 801 pl = pa_proplist_new(); 802 803 if (pa_tagstruct_getu32(t, &channel) < 0 || 804 pa_tagstruct_gets(t, &event) < 0 || 805 pa_tagstruct_get_proplist(t, pl) < 0 || 806 !pa_tagstruct_eof(t) || !event) { 807 pa_context_fail(c, PA_ERR_PROTOCOL); 808 goto finish; 809 } 810 811 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel)))) 812 goto finish; 813 814 if (s->state != PA_STREAM_READY) 815 goto finish; 816 817 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) { 818 /* Let client know what the running time was when the stream had to be killed */ 819 pa_usec_t stream_time; 820 if (pa_stream_get_time(s, &stream_time) == 0) 821 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time); 822 } 823 824 if (s->event_callback) 825 s->event_callback(s, event, pl, s->event_userdata); 826 827finish: 828 pa_context_unref(c); 829 830 if (pl) 831 pa_proplist_free(pl); 832} 833 834void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 835 pa_stream *s; 836 pa_context *c = userdata; 837 uint32_t bytes, channel; 838 839 pa_assert(pd); 840 pa_assert(command == PA_COMMAND_REQUEST); 841 pa_assert(t); 842 pa_assert(c); 843 pa_assert(PA_REFCNT_VALUE(c) >= 1); 844 845 pa_context_ref(c); 846 847 if (pa_tagstruct_getu32(t, &channel) < 0 || 848 
pa_tagstruct_getu32(t, &bytes) < 0 || 849 !pa_tagstruct_eof(t)) { 850 pa_context_fail(c, PA_ERR_PROTOCOL); 851 goto finish; 852 } 853 854 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel)))) 855 goto finish; 856 857 if (s->state != PA_STREAM_READY) 858 goto finish; 859 860 s->requested_bytes += bytes; 861 862#ifdef STREAM_DEBUG 863 pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); 864#endif 865 866 if (s->requested_bytes > 0 && s->write_callback) 867 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata); 868 869finish: 870 pa_context_unref(c); 871} 872 873int64_t pa_stream_get_underflow_index(const pa_stream *p) { 874 pa_assert(p); 875 return p->latest_underrun_at_index; 876} 877 878void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 879 pa_stream *s; 880 pa_context *c = userdata; 881 uint32_t channel; 882 int64_t offset = -1; 883 884 pa_assert(pd); 885 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW 886 || command == PA_COMMAND_UNDERFLOW_OHOS); 887 pa_assert(t); 888 pa_assert(c); 889 pa_assert(PA_REFCNT_VALUE(c) >= 1); 890 891 pa_context_ref(c); 892 893 if (pa_tagstruct_getu32(t, &channel) < 0) { 894 pa_context_fail(c, PA_ERR_PROTOCOL); 895 goto finish; 896 } 897 898 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) { 899 if (pa_tagstruct_gets64(t, &offset) < 0) { 900 pa_context_fail(c, PA_ERR_PROTOCOL); 901 goto finish; 902 } 903 } 904 905 if (!pa_tagstruct_eof(t)) { 906 pa_context_fail(c, PA_ERR_PROTOCOL); 907 goto finish; 908 } 909 910 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel)))) 911 goto finish; 912 913 if (s->state != PA_STREAM_READY) 914 goto finish; 915 916 if (command == PA_COMMAND_UNDERFLOW_OHOS) { 917 if (s->underflow_ohos_callback) { 918 s->underflow_ohos_callback(s, s->underflow_ohos_userdata); 919 } 920 goto finish; 921 } 922 923 
if (offset != -1) 924 s->latest_underrun_at_index = offset; 925 926 if (s->buffer_attr.prebuf > 0) 927 check_smoother_status(s, true, false, true); 928 929 request_auto_timing_update(s, true); 930 931 if (command == PA_COMMAND_OVERFLOW) { 932 if (s->overflow_callback) 933 s->overflow_callback(s, s->overflow_userdata); 934 } else if (command == PA_COMMAND_UNDERFLOW) { 935 if (s->underflow_callback) 936 s->underflow_callback(s, s->underflow_userdata); 937 } 938 939finish: 940 pa_context_unref(c); 941} 942 943static void invalidate_indexes(pa_stream *s, bool r, bool w) { 944 pa_assert(s); 945 pa_assert(PA_REFCNT_VALUE(s) >= 1); 946 947#ifdef STREAM_DEBUG 948 pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); 949#endif 950 951 if (s->state != PA_STREAM_READY) 952 return; 953 954 if (w) { 955 s->write_index_not_before = s->context->ctag; 956 957 if (s->timing_info_valid) 958 s->timing_info.write_index_corrupt = true; 959 960#ifdef STREAM_DEBUG 961 pa_log_debug("write_index invalidated"); 962#endif 963 } 964 965 if (r) { 966 s->read_index_not_before = s->context->ctag; 967 968 if (s->timing_info_valid) 969 s->timing_info.read_index_corrupt = true; 970 971#ifdef STREAM_DEBUG 972 pa_log_debug("read_index invalidated"); 973#endif 974 } 975 976 request_auto_timing_update(s, true); 977} 978 979static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) { 980 pa_stream *s = userdata; 981 982 pa_assert(s); 983 pa_assert(PA_REFCNT_VALUE(s) >= 1); 984 985 pa_stream_ref(s); 986 request_auto_timing_update(s, false); 987 pa_stream_unref(s); 988} 989 990static void create_stream_complete(pa_stream *s) { 991 pa_assert(s); 992 pa_assert(PA_REFCNT_VALUE(s) >= 1); 993 pa_assert(s->state == PA_STREAM_CREATING); 994 995 pa_stream_set_state(s, PA_STREAM_READY); 996 997 if (s->requested_bytes > 0 && s->write_callback) 998 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata); 999 1000 if (s->flags & 
PA_STREAM_AUTO_TIMING_UPDATE) { 1001 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC; 1002 pa_assert(!s->auto_timing_update_event); 1003 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s); 1004 1005 request_auto_timing_update(s, true); 1006 } 1007 1008 check_smoother_status(s, true, false, false); 1009} 1010 1011static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) { 1012 const char *e; 1013 1014 pa_assert(s); 1015 pa_assert(attr); 1016 1017 if ((e = getenv("PULSE_LATENCY_MSEC"))) { 1018 uint32_t ms; 1019 pa_sample_spec ss; 1020 1021 pa_sample_spec_init(&ss); 1022 1023 if (pa_sample_spec_valid(&s->sample_spec)) 1024 ss = s->sample_spec; 1025 else if (s->n_formats == 1) 1026 pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL); 1027 1028 if (pa_atou(e, &ms) < 0 || ms <= 0) 1029 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e); 1030 else if (!pa_sample_spec_valid(&s->sample_spec)) 1031 pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e); 1032 else { 1033 attr->maxlength = (uint32_t) -1; 1034 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss); 1035 attr->minreq = (uint32_t) -1; 1036 attr->prebuf = (uint32_t) -1; 1037 attr->fragsize = attr->tlength; 1038 1039 if (flags) 1040 *flags |= PA_STREAM_ADJUST_LATENCY; 1041 } 1042 } 1043 1044 if (s->context->version >= 13) 1045 return; 1046 1047 /* Version older than 0.9.10 didn't do server side buffer_attr 1048 * selection, hence we have to fake it on the client side. */ 1049 1050 /* We choose fairly conservative values here, to not confuse 1051 * old clients with extremely large playback buffers */ 1052 1053 if (attr->maxlength == (uint32_t) -1) 1054 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. 
*/ 1055 1056 if (attr->tlength == (uint32_t) -1) 1057 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */ 1058 1059 if (attr->minreq == (uint32_t) -1) 1060 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */ 1061 1062 if (attr->prebuf == (uint32_t) -1) 1063 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */ 1064 1065 if (attr->fragsize == (uint32_t) -1) 1066 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */ 1067} 1068 1069void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 1070 pa_stream *s = userdata; 1071 uint32_t requested_bytes = 0; 1072 1073 pa_assert(pd); 1074 pa_assert(s); 1075 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1076 pa_assert(s->state == PA_STREAM_CREATING); 1077 1078 pa_stream_ref(s); 1079 1080 if (command != PA_COMMAND_REPLY) { 1081 if (pa_context_handle_error(s->context, command, t, false) < 0) 1082 goto finish; 1083 1084 pa_stream_set_state(s, PA_STREAM_FAILED); 1085 goto finish; 1086 } 1087 1088 if (pa_tagstruct_getu32(t, &s->channel) < 0 || 1089 s->channel == PA_INVALID_INDEX || 1090 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) || 1091 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) { 1092 pa_context_fail(s->context, PA_ERR_PROTOCOL); 1093 goto finish; 1094 } 1095 1096 s->requested_bytes = (int64_t) requested_bytes; 1097 1098 if (s->context->version >= 9) { 1099 if (s->direction == PA_STREAM_PLAYBACK) { 1100 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 || 1101 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 || 1102 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 || 1103 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) { 1104 
/* Reply handler for the create-stream request; create_stream() registers it
 * with the pdispatch, passing the stream as userdata.
 *
 * On an error reply the stream is moved to PA_STREAM_FAILED (unless
 * pa_context_handle_error() itself reports failure). On a regular reply the
 * fields are read from t in a fixed order that depends on the stream
 * direction and on s->context->version; any read failure, implausible value
 * or trailing data fails the whole context with PA_ERR_PROTOCOL. On success
 * the stream is entered into the context's channel hashmap and
 * create_stream_complete() is called.
 *
 * A reference on s is held for the duration of the call. */
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_stream *s = userdata;
    uint32_t requested_bytes = 0;

    pa_assert(pd);
    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(s->state == PA_STREAM_CREATING);

    pa_stream_ref(s);

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(s->context, command, t, false) < 0)
            goto finish;

        pa_stream_set_state(s, PA_STREAM_FAILED);
        goto finish;
    }

    /* Channel first; the stream index is read for everything but upload
     * streams, the initial request size for everything but record streams. */
    if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
        s->channel == PA_INVALID_INDEX ||
        ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
        ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    s->requested_bytes = (int64_t) requested_bytes;

    /* From version 9 on the reply carries buffer metrics, which overwrite
     * the values stored in s->buffer_attr. */
    if (s->context->version >= 9) {
        if (s->direction == PA_STREAM_PLAYBACK) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else if (s->direction == PA_STREAM_RECORD) {
            if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
                pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        }
    }

    /* From version 12 on the reply carries the sample spec, channel map,
     * device index/name and suspend state for non-upload streams. */
    if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
        pa_sample_spec ss;
        pa_channel_map cm;
        const char *dn = NULL;
        bool suspended;

        if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
            pa_tagstruct_get_channel_map(t, &cm) < 0 ||
            pa_tagstruct_getu32(t, &s->device_index) < 0 ||
            pa_tagstruct_gets(t, &dn) < 0 ||
            pa_tagstruct_get_boolean(t, &suspended) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Sanity-check the reply. When no formats were requested
         * (n_formats == 0), format, rate and channel map must match the
         * request unless the corresponding PA_STREAM_FIX_* flag was set. */
        if (!dn || s->device_index == PA_INVALID_INDEX ||
            ss.channels != cm.channels ||
            !pa_channel_map_valid(&cm) ||
            !pa_sample_spec_valid(&ss) ||
            (s->n_formats == 0 && (
                (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
                (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
                (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        pa_xfree(s->device_name);
        s->device_name = pa_xstrdup(dn);
        s->suspended = suspended;

        s->channel_map = cm;
        s->sample_spec = ss;
    }

#ifdef USE_SMOOTHER_2
    if (s->flags & PA_STREAM_INTERPOLATE_TIMING)
        pa_smoother_2_set_sample_spec(s->smoother, pa_rtclock_now(), &s->sample_spec);
#endif

    /* From version 13 on a usec value follows; it is stored as the
     * configured source or sink latency depending on the direction. */
    if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
        pa_usec_t usec;

        if (pa_tagstruct_get_usec(t, &usec) < 0) {
            pa_context_fail(s->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        if (s->direction == PA_STREAM_RECORD)
            s->timing_info.configured_source_usec = usec;
        else
            s->timing_info.configured_sink_usec = usec;
    }

    /* Format info is read under the same version/direction condition that
     * create_stream() uses when sending the requested formats. */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_format_info *f = pa_format_info_new();

        if (pa_tagstruct_get_format_info(t, f) < 0 || !pa_format_info_valid(f)) {
            pa_format_info_free(f);
            if (s->n_formats > 0) {
                /* We used the extended API, so we should have got back a proper format */
                pa_context_fail(s->context, PA_ERR_PROTOCOL);
                goto finish;
            }
        } else
            s->format = f;
    }

    /* Anything left over in the reply is a protocol error */
    if (!pa_tagstruct_eof(t)) {
        pa_context_fail(s->context, PA_ERR_PROTOCOL);
        goto finish;
    }

    if (s->direction == PA_STREAM_RECORD) {
        pa_assert(!s->record_memblockq);

        s->record_memblockq = pa_memblockq_new(
                "client side record memblockq",
                0,
                s->buffer_attr.maxlength,
                0,
                &s->sample_spec,
                1,
                0,
                0,
                NULL);
    }

    /* Register the stream under its channel number in the per-direction
     * hashmap of the context. */
    s->channel_valid = true;
    pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);

    create_stream_complete(s);

finish:
    pa_stream_unref(s);
}
/* Common implementation behind pa_stream_connect_playback() and
 * pa_stream_connect_record(): validate the request, store the connection
 * parameters in s, build the create-stream command and send it.
 *
 * direction:   PA_STREAM_PLAYBACK or PA_STREAM_RECORD (asserted)
 * s:           stream to connect; must be in state PA_STREAM_UNCONNECTED
 * dev:         device name, or NULL to use the default sink/source from the
 *              client configuration
 * attr:        requested buffer attributes, or NULL to keep s->buffer_attr
 * flags:       stream flags; unknown bits are rejected with PA_ERR_INVALID
 * volume:      initial volume, or NULL for "not set"
 * sync_stream: playback stream to share a sync id with, or NULL
 *
 * Returns 0 once the request has been queued; the reply is handled by
 * pa_create_stream_callback(). The validity checks return early through the
 * PA_CHECK_VALIDITY macro. */
static int create_stream(
        pa_stream_direction_t direction,
        pa_stream *s,
        const char *dev,
        const pa_buffer_attr *attr,
        pa_stream_flags_t flags,
        const pa_cvolume *volume,
        pa_stream *sync_stream) {

    pa_tagstruct *t;
    uint32_t tag;
    bool volume_set = !!volume;
    pa_cvolume cv;
    uint32_t i;

    pa_assert(s);
    pa_assert(PA_REFCNT_VALUE(s) >= 1);
    pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);

    PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
    PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
    PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
                                              PA_STREAM_INTERPOLATE_TIMING|
                                              PA_STREAM_NOT_MONOTONIC|
                                              PA_STREAM_AUTO_TIMING_UPDATE|
                                              PA_STREAM_NO_REMAP_CHANNELS|
                                              PA_STREAM_NO_REMIX_CHANNELS|
                                              PA_STREAM_FIX_FORMAT|
                                              PA_STREAM_FIX_RATE|
                                              PA_STREAM_FIX_CHANNELS|
                                              PA_STREAM_DONT_MOVE|
                                              PA_STREAM_VARIABLE_RATE|
                                              PA_STREAM_PEAK_DETECT|
                                              PA_STREAM_START_MUTED|
                                              PA_STREAM_ADJUST_LATENCY|
                                              PA_STREAM_EARLY_REQUESTS|
                                              PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
                                              PA_STREAM_START_UNMUTED|
                                              PA_STREAM_FAIL_ON_SUSPEND|
                                              PA_STREAM_RELATIVE_VOLUME|
                                              PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);

    PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
    PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
    /* Although some of the other flags are not supported on older
     * version, we don't check for them here, because it doesn't hurt
     * when they are passed but actually not supported. This makes
     * client development easier */

    /* Peak detection is record-only, sync streams are playback-only, and
     * ADJUST_LATENCY and EARLY_REQUESTS are mutually exclusive. */
    PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
    PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);

    pa_stream_ref(s);

    s->direction = direction;

    if (sync_stream)
        s->syncid = sync_stream->syncid;

    if (attr)
        s->buffer_attr = *attr;
    /* May rewrite the attributes and add PA_STREAM_ADJUST_LATENCY to flags */
    patch_buffer_attr(s, &s->buffer_attr, &flags);

    s->flags = flags;
    s->corked = !!(flags & PA_STREAM_START_CORKED);

    if (flags & PA_STREAM_INTERPOLATE_TIMING) {
        pa_usec_t x;

        x = pa_rtclock_now();

        pa_assert(!s->smoother);
#ifdef USE_SMOOTHER_2
        s->smoother = pa_smoother_2_new(SMOOTHER_HISTORY_TIME, x, 0, 0);
#else
        s->smoother = pa_smoother_new(
                SMOOTHER_ADJUST_TIME,
                SMOOTHER_HISTORY_TIME,
                !(flags & PA_STREAM_NOT_MONOTONIC),
                true,
                SMOOTHER_MIN_HISTORY,
                x,
                true);
#endif
    }

    if (!dev)
        dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;

    /* Everything appended to t from here on must stay in this exact order:
     * it is the wire layout of the create-stream command, with later fields
     * gated on the negotiated protocol version. */
    t = pa_tagstruct_command(
            s->context,
            (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
            &tag);

    /* Before version 13 the media name is sent as a plain string; from 13
     * on the whole proplist is sent further below. */
    if (s->context->version < 13)
        pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));

    pa_tagstruct_put(
            t,
            PA_TAG_SAMPLE_SPEC, &s->sample_spec,
            PA_TAG_CHANNEL_MAP, &s->channel_map,
            PA_TAG_U32, PA_INVALID_INDEX,
            PA_TAG_STRING, dev,
            PA_TAG_U32, s->buffer_attr.maxlength,
            PA_TAG_BOOLEAN, s->corked,
            PA_TAG_INVALID);

    if (!volume) {
        if (pa_sample_spec_valid(&s->sample_spec))
            volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
        else {
            /* This is not really relevant, since no volume was set, and
             * the real number of channels is embedded in the format_info
             * structure */
            volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
        }
    }

    if (s->direction == PA_STREAM_PLAYBACK) {
        pa_tagstruct_put(
                t,
                PA_TAG_U32, s->buffer_attr.tlength,
                PA_TAG_U32, s->buffer_attr.prebuf,
                PA_TAG_U32, s->buffer_attr.minreq,
                PA_TAG_U32, s->syncid,
                PA_TAG_INVALID);

        pa_tagstruct_put_cvolume(t, volume);
    } else
        pa_tagstruct_putu32(t, s->buffer_attr.fragsize);

    if (s->context->version >= 12) {
        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
                PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
                PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
                PA_TAG_INVALID);
    }

    if (s->context->version >= 13) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        else
            pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);

        pa_tagstruct_put(
                t,
                PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
                PA_TAG_PROPLIST, s->proplist,
                PA_TAG_INVALID);

        if (s->direction == PA_STREAM_RECORD)
            pa_tagstruct_putu32(t, s->direct_on_input);
    }

    if (s->context->version >= 14) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, volume_set);

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
    }

    if (s->context->version >= 15) {

        if (s->direction == PA_STREAM_PLAYBACK)
            pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));

        pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
    }

    if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);

    if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));

    /* Requested formats: playback from version 21, record from version 22 */
    if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
        || s->context->version >= 22) {

        pa_tagstruct_putu8(t, s->n_formats);
        for (i = 0; i < s->n_formats; i++)
            pa_tagstruct_put_format_info(t, s->req_formats[i]);
    }

    /* From version 22 on, record streams also send volume/mute settings */
    if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
        pa_tagstruct_put_cvolume(t, volume);
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
        pa_tagstruct_put_boolean(t, volume_set);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
        pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
        pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
    }

    pa_pstream_send_tagstruct(s->context->pstream, t);
    pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);

    pa_stream_set_state(s, PA_STREAM_CREATING);

    pa_stream_unref(s);
    return 0;
}
PA_STREAM_CREATING); 1423 1424 pa_stream_unref(s); 1425 return 0; 1426} 1427 1428int pa_stream_connect_playback( 1429 pa_stream *s, 1430 const char *dev, 1431 const pa_buffer_attr *attr, 1432 pa_stream_flags_t flags, 1433 const pa_cvolume *volume, 1434 pa_stream *sync_stream) { 1435 1436 pa_assert(s); 1437 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1438 1439 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream); 1440} 1441 1442int pa_stream_connect_record( 1443 pa_stream *s, 1444 const char *dev, 1445 const pa_buffer_attr *attr, 1446 pa_stream_flags_t flags) { 1447 1448 pa_assert(s); 1449 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1450 1451 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL); 1452} 1453 1454int pa_stream_begin_write( 1455 pa_stream *s, 1456 void **data, 1457 size_t *nbytes) { 1458 1459 pa_assert(s); 1460 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1461 1462 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 1463 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 1464 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 1465 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); 1466 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); 1467 1468 if (*nbytes != (size_t) -1) { 1469 size_t m, fs; 1470 1471 m = pa_mempool_block_size_max(s->context->mempool); 1472 fs = pa_frame_size(&s->sample_spec); 1473 1474 m = (m / fs) * fs; 1475 if (*nbytes > m) 1476 *nbytes = m; 1477 } 1478 1479 if (!s->write_memblock) { 1480 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes); 1481 s->write_data = pa_memblock_acquire(s->write_memblock); 1482 } 1483 1484 *data = s->write_data; 1485 *nbytes = pa_memblock_get_length(s->write_memblock); 1486 1487 return 0; 1488} 1489 1490int pa_stream_cancel_write( 1491 pa_stream *s) { 1492 1493 pa_assert(s); 1494 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1495 1496 
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 1497 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 1498 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 1499 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE); 1500 1501 pa_assert(s->write_data); 1502 1503 pa_memblock_release(s->write_memblock); 1504 pa_memblock_unref(s->write_memblock); 1505 s->write_memblock = NULL; 1506 s->write_data = NULL; 1507 1508 return 0; 1509} 1510 1511int pa_stream_write_ext_free( 1512 pa_stream *s, 1513 const void *data, 1514 size_t length, 1515 pa_free_cb_t free_cb, 1516 void *free_cb_data, 1517 int64_t offset, 1518 pa_seek_mode_t seek) { 1519 1520 pa_assert(s); 1521 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1522 pa_assert(data); 1523 1524 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 1525 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 1526 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 1527 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID); 1528 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID); 1529 PA_CHECK_VALIDITY(s->context, 1530 !s->write_memblock || 1531 ((data >= s->write_data) && 1532 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))), 1533 PA_ERR_INVALID); 1534 PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); 1535 PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); 1536 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID); 1537 1538 if (s->write_memblock) { 1539 pa_memchunk chunk; 1540 1541 /* pa_stream_write_begin() was called before */ 1542 1543 
pa_memblock_release(s->write_memblock); 1544 1545 chunk.memblock = s->write_memblock; 1546 chunk.index = (const char *) data - (const char *) s->write_data; 1547 chunk.length = length; 1548 1549 s->write_memblock = NULL; 1550 s->write_data = NULL; 1551 1552 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk); 1553 pa_memblock_unref(chunk.memblock); 1554 1555 } else { 1556 pa_seek_mode_t t_seek = seek; 1557 int64_t t_offset = offset; 1558 size_t t_length = length; 1559 const void *t_data = data; 1560 1561 /* pa_stream_write_begin() was not called before */ 1562 1563 while (t_length > 0) { 1564 pa_memchunk chunk; 1565 1566 chunk.index = 0; 1567 1568 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) { 1569 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1); 1570 chunk.length = t_length; 1571 } else { 1572 void *d; 1573 size_t blk_size_max; 1574 1575 /* Break large audio streams into _aligned_ blocks or the 1576 * other endpoint will happily discard them upon arrival. */ 1577 blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec); 1578 chunk.length = PA_MIN(t_length, blk_size_max); 1579 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length); 1580 1581 d = pa_memblock_acquire(chunk.memblock); 1582 memcpy(d, t_data, chunk.length); 1583 pa_memblock_release(chunk.memblock); 1584 } 1585 1586 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk); 1587 1588 t_offset = 0; 1589 t_seek = PA_SEEK_RELATIVE; 1590 1591 t_data = (const uint8_t*) t_data + chunk.length; 1592 t_length -= chunk.length; 1593 1594 pa_memblock_unref(chunk.memblock); 1595 } 1596 1597 if (free_cb && pa_pstream_get_shm(s->context->pstream)) 1598 free_cb(free_cb_data); 1599 } 1600 1601 /* This is obviously wrong since we ignore the seeking index . 
But 1602 * that's OK, the server side applies the same error */ 1603 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length; 1604 1605#ifdef STREAM_DEBUG 1606 pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); 1607#endif 1608 1609 if (s->direction == PA_STREAM_PLAYBACK) { 1610 1611 /* Update latency request correction */ 1612 if (s->write_index_corrections[s->current_write_index_correction].valid) { 1613 1614 if (seek == PA_SEEK_ABSOLUTE) { 1615 s->write_index_corrections[s->current_write_index_correction].corrupt = false; 1616 s->write_index_corrections[s->current_write_index_correction].absolute = true; 1617 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length; 1618 } else if (seek == PA_SEEK_RELATIVE) { 1619 if (!s->write_index_corrections[s->current_write_index_correction].corrupt) 1620 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length; 1621 } else 1622 s->write_index_corrections[s->current_write_index_correction].corrupt = true; 1623 } 1624 1625 /* Update the write index in the already available latency data */ 1626 if (s->timing_info_valid) { 1627 1628 if (seek == PA_SEEK_ABSOLUTE) { 1629 s->timing_info.write_index_corrupt = false; 1630 s->timing_info.write_index = offset + (int64_t) length; 1631 } else if (seek == PA_SEEK_RELATIVE) { 1632 if (!s->timing_info.write_index_corrupt) 1633 s->timing_info.write_index += offset + (int64_t) length; 1634 } else 1635 s->timing_info.write_index_corrupt = true; 1636 } 1637 1638 if (!s->timing_info_valid || s->timing_info.write_index_corrupt) 1639 request_auto_timing_update(s, true); 1640 } 1641 1642 return 0; 1643} 1644 1645int pa_stream_write( 1646 pa_stream *s, 1647 const void *data, 1648 size_t length, 1649 pa_free_cb_t free_cb, 1650 int64_t offset, 1651 pa_seek_mode_t seek) { 1652 1653 return pa_stream_write_ext_free(s, data, length, free_cb, (void*) data, 
offset, seek); 1654} 1655 1656int pa_stream_peek(pa_stream *s, const void **data, size_t *length) { 1657 pa_assert(s); 1658 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1659 pa_assert(data); 1660 pa_assert(length); 1661 1662 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 1663 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 1664 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); 1665 1666 if (!s->peek_memchunk.memblock) { 1667 1668 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) { 1669 /* record_memblockq is empty. */ 1670 *data = NULL; 1671 *length = 0; 1672 return 0; 1673 1674 } else if (!s->peek_memchunk.memblock) { 1675 /* record_memblockq isn't empty, but it doesn't have any data at 1676 * the current read index. */ 1677 *data = NULL; 1678 *length = s->peek_memchunk.length; 1679 return 0; 1680 } 1681 1682 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock); 1683 } 1684 1685 pa_assert(s->peek_data); 1686 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index; 1687 *length = s->peek_memchunk.length; 1688 return 0; 1689} 1690 1691int pa_stream_drop(pa_stream *s) { 1692 pa_assert(s); 1693 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1694 1695 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 1696 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 1697 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); 1698 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE); 1699 1700 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length); 1701 1702 /* Fix the simulated local read index */ 1703 if (s->timing_info_valid && !s->timing_info.read_index_corrupt) 1704 s->timing_info.read_index += (int64_t) s->peek_memchunk.length; 1705 1706 if (s->peek_memchunk.memblock) { 1707 pa_assert(s->peek_data); 1708 s->peek_data = NULL; 1709 pa_memblock_release(s->peek_memchunk.memblock); 1710 
pa_memblock_unref(s->peek_memchunk.memblock); 1711 } 1712 1713 pa_memchunk_reset(&s->peek_memchunk); 1714 1715 return 0; 1716} 1717 1718size_t pa_stream_writable_size(const pa_stream *s) { 1719 pa_assert(s); 1720 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1721 1722 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1); 1723 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); 1724 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); 1725 1726 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0; 1727} 1728 1729size_t pa_stream_readable_size(const pa_stream *s) { 1730 pa_assert(s); 1731 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1732 1733 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1); 1734 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); 1735 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); 1736 1737 return pa_memblockq_get_length(s->record_memblockq); 1738} 1739 1740pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { 1741 pa_operation *o; 1742 pa_tagstruct *t; 1743 uint32_t tag; 1744 1745 pa_assert(s); 1746 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1747 1748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 1749 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 1750 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); 1751 1752 /* Ask for a timing update before we cork/uncork to get the best 1753 * accuracy for the transport latency suitable for the 1754 * check_smoother_status() call in the started callback */ 1755 request_auto_timing_update(s, true); 1756 1757 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 1758 1759 t = 
pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag); 1760 pa_tagstruct_putu32(t, s->channel); 1761 pa_pstream_send_tagstruct(s->context->pstream, t); 1762 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 1763 1764 /* This might cause the read index to continue again, hence 1765 * let's request a timing update */ 1766 request_auto_timing_update(s, true); 1767 1768 return o; 1769} 1770 1771static pa_usec_t calc_time(const pa_stream *s, bool ignore_transport) { 1772 pa_usec_t usec; 1773 1774 pa_assert(s); 1775 pa_assert(PA_REFCNT_VALUE(s) >= 1); 1776 pa_assert(s->state == PA_STREAM_READY); 1777 pa_assert(s->direction != PA_STREAM_UPLOAD); 1778 pa_assert(s->timing_info_valid); 1779 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt); 1780 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt); 1781 1782 if (s->direction == PA_STREAM_PLAYBACK) { 1783 /* The last byte that was written into the output device 1784 * had this time value associated */ 1785 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 
0 : (uint64_t) s->timing_info.read_index, &s->sample_spec); 1786 1787 if (!s->corked && !s->suspended) { 1788 1789 if (!ignore_transport) 1790 /* Because the latency info took a little time to come 1791 * to us, we assume that the real output time is actually 1792 * a little ahead */ 1793 usec += s->timing_info.transport_usec; 1794 1795 /* However, the output device usually maintains a buffer 1796 too, hence the real sample currently played is a little 1797 back */ 1798 if (s->timing_info.sink_usec >= usec) 1799 usec = 0; 1800 else 1801 usec -= s->timing_info.sink_usec; 1802 } 1803 1804 } else { 1805 pa_assert(s->direction == PA_STREAM_RECORD); 1806 1807 /* The last byte written into the server side queue had 1808 * this time value associated */ 1809 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec); 1810 1811 if (!s->corked && !s->suspended) { 1812 1813 if (!ignore_transport) 1814 /* Add transport latency */ 1815 usec += s->timing_info.transport_usec; 1816 1817 /* Add latency of data in device buffer */ 1818 usec += s->timing_info.source_usec; 1819 1820 /* If this is a monitor source, we need to correct the 1821 * time by the playback device buffer */ 1822 if (s->timing_info.sink_usec >= usec) 1823 usec = 0; 1824 else 1825 usec -= s->timing_info.sink_usec; 1826 } 1827 } 1828 1829 return usec; 1830} 1831 1832#ifdef USE_SMOOTHER_2 1833static inline uint64_t calc_bytes(pa_stream *s, bool ignore_transport) { 1834 return (uint64_t)(calc_time(s, ignore_transport) * s->sample_spec.rate / PA_USEC_PER_SEC * pa_frame_size(&s->sample_spec)); 1835} 1836#endif 1837 1838static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 1839 pa_operation *o = userdata; 1840 struct timeval local, remote, now; 1841 pa_timing_info *i; 1842 bool playing = false; 1843 uint64_t underrun_for = 0, playing_for = 0; 1844 1845 pa_assert(pd); 1846 pa_assert(o); 
/* Reply handler for the latency queries sent by
 * pa_stream_update_timing_info(); userdata is the operation created there.
 *
 * The stream's timing info is first marked invalid. A regular reply is then
 * parsed into o->stream->timing_info, the transport latency and timestamp
 * are derived from the local/remote timevals, the pending write index
 * corrections (playback) or the locally queued data (record) are applied,
 * and the smoother, if any, is fed with the new sample. Afterwards the
 * latency update callback and the operation's own callback are invoked.
 *
 * Malformed replies fail the whole context with PA_ERR_PROTOCOL. The
 * operation is completed and unreferenced on every path. */
static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    struct timeval local, remote, now;
    pa_timing_info *i;
    bool playing = false;
    uint64_t underrun_for = 0, playing_for = 0;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* The operation has lost its context or stream in the meantime */
    if (!o->context || !o->stream)
        goto finish;

    i = &o->stream->timing_info;

    /* Assume the worst until the reply has been parsed completely */
    o->stream->timing_info_valid = false;
    i->write_index_corrupt = true;
    i->read_index_corrupt = true;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, false) < 0)
            goto finish;

    } else {

        if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
            pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
            pa_tagstruct_get_boolean(t, &playing) < 0 ||
            pa_tagstruct_get_timeval(t, &local) < 0 ||
            pa_tagstruct_get_timeval(t, &remote) < 0 ||
            pa_tagstruct_gets64(t, &i->write_index) < 0 ||
            pa_tagstruct_gets64(t, &i->read_index) < 0) {

            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* Two more counters follow for playback streams from version 13 on */
        if (o->context->version >= 13 &&
            o->stream->direction == PA_STREAM_PLAYBACK)
            if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
                pa_tagstruct_getu64(t, &playing_for) < 0) {

                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }

        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
        o->stream->timing_info_valid = true;
        i->write_index_corrupt = false;
        i->read_index_corrupt = false;

        i->playing = (int) playing;
        i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);

        pa_gettimeofday(&now);

        /* Calculate timestamps */
        if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
            /* local and remote seem to have synchronized clocks */

            if (o->stream->direction == PA_STREAM_PLAYBACK)
                i->transport_usec = pa_timeval_diff(&remote, &local);
            else
                i->transport_usec = pa_timeval_diff(&now, &remote);

            i->synchronized_clocks = true;
            i->timestamp = remote;
        } else {
            /* clocks are not synchronized, let's estimate latency then:
             * half of the round trip time */
            i->transport_usec = pa_timeval_diff(&now, &local)/2;
            i->synchronized_clocks = false;
            i->timestamp = local;
            pa_timeval_add(&i->timestamp, i->transport_usec);
        }

        /* Invalidate read and write indexes if necessary */
        if (tag < o->stream->read_index_not_before)
            i->read_index_corrupt = true;

        if (tag < o->stream->write_index_not_before)
            i->write_index_corrupt = true;

        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            /* Write index correction */

            int n, j;
            uint32_t ctag = tag;

            /* Go through the saved correction values and add up the
             * total correction. The walk starts right after the current
             * slot and visits every slot of the ring once. */
            for (n = 0, j = o->stream->current_write_index_correction+1;
                 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
                 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {

                /* Step over invalid data or out-of-date data */
                if (!o->stream->write_index_corrections[j].valid ||
                    o->stream->write_index_corrections[j].tag < ctag)
                    continue;

                /* Make sure that everything is in order */
                ctag = o->stream->write_index_corrections[j].tag+1;

                /* Now fix the write index */
                if (o->stream->write_index_corrections[j].corrupt) {
                    /* A corrupting seek was made */
                    i->write_index_corrupt = true;
                } else if (o->stream->write_index_corrections[j].absolute) {
                    /* An absolute seek was made */
                    i->write_index = o->stream->write_index_corrections[j].value;
                    i->write_index_corrupt = false;
                } else if (!i->write_index_corrupt) {
                    /* A relative seek was made */
                    i->write_index += o->stream->write_index_corrections[j].value;
                }
            }

            /* Clear old correction entries, i.e. those belonging to this
             * reply or an earlier request */
            for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
                if (!o->stream->write_index_corrections[n].valid)
                    continue;

                if (o->stream->write_index_corrections[n].tag <= tag)
                    o->stream->write_index_corrections[n].valid = false;
            }
        }

        if (o->stream->direction == PA_STREAM_RECORD) {
            /* Read index correction: subtract what is still queued locally */

            if (!i->read_index_corrupt)
                i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
        }

        /* Update smoother if we're not corked */
        if (o->stream->smoother && !o->stream->corked) {
            pa_usec_t u, x;

            /* u is the time the sample is recorded for; x is the time used
             * for pausing/resuming and may be shifted below. */
            u = x = pa_rtclock_now() - i->transport_usec;

            if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
                pa_usec_t su;

                /* If we weren't playing then it will take some time
                 * until the audio will actually come out through the
                 * speakers. Since we follow that timing here, we need
                 * to try to fix this up */

                su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);

                if (su < i->sink_usec)
                    x += i->sink_usec - su;
            }

            if (!i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_pause(o->stream->smoother, x);
#else
                pa_smoother_pause(o->stream->smoother, x);
#endif

            /* Update the smoother, but only with an index that is usable
             * for this direction */
            if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
                (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
#ifdef USE_SMOOTHER_2
                pa_smoother_2_put(o->stream->smoother, u, calc_bytes(o->stream, true));
#else
                pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
#endif

            if (i->playing)
#ifdef USE_SMOOTHER_2
                pa_smoother_2_resume(o->stream->smoother, x);
#else
                pa_smoother_resume(o->stream->smoother, x, true);
#endif
        }
    }

    o->stream->auto_timing_update_requested = false;

    if (o->stream->latency_update_callback)
        o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);

    /* o->stream is checked again after the user callback above has run */
    if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, o->stream->timing_info_valid, o->userdata);
    }

finish:

    pa_operation_done(o);
    pa_operation_unref(o);
}
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2049 2050 if (s->direction == PA_STREAM_PLAYBACK) { 2051 /* Find a place to store the write_index correction data for this entry */ 2052 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS; 2053 2054 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */ 2055 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL); 2056 } 2057 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2058 2059 t = pa_tagstruct_command( 2060 s->context, 2061 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY), 2062 &tag); 2063 pa_tagstruct_putu32(t, s->channel); 2064 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now)); 2065 2066 pa_pstream_send_tagstruct(s->context->pstream, t); 2067 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2068 2069 if (s->direction == PA_STREAM_PLAYBACK) { 2070 /* Fill in initial correction data */ 2071 2072 s->current_write_index_correction = cidx; 2073 2074 s->write_index_corrections[cidx].valid = true; 2075 s->write_index_corrections[cidx].absolute = false; 2076 s->write_index_corrections[cidx].corrupt = false; 2077 s->write_index_corrections[cidx].tag = tag; 2078 s->write_index_corrections[cidx].value = 0; 2079 } 2080 2081 return o; 2082} 2083 2084void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 2085 pa_stream *s = userdata; 2086 2087 pa_assert(pd); 2088 pa_assert(s); 2089 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2090 2091 pa_stream_ref(s); 2092 2093 if (command != PA_COMMAND_REPLY) { 2094 if (pa_context_handle_error(s->context, command, t, false) < 0) 2095 goto finish; 2096 2097 
pa_stream_set_state(s, PA_STREAM_FAILED); 2098 goto finish; 2099 } else if (!pa_tagstruct_eof(t)) { 2100 pa_context_fail(s->context, PA_ERR_PROTOCOL); 2101 goto finish; 2102 } 2103 2104 pa_stream_set_state(s, PA_STREAM_TERMINATED); 2105 2106finish: 2107 pa_stream_unref(s); 2108} 2109 2110int pa_stream_disconnect(pa_stream *s) { 2111 pa_tagstruct *t; 2112 uint32_t tag; 2113 2114 pa_assert(s); 2115 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2116 2117 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2118 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE); 2119 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); 2120 2121 pa_stream_ref(s); 2122 2123 t = pa_tagstruct_command( 2124 s->context, 2125 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM : 2126 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)), 2127 &tag); 2128 pa_tagstruct_putu32(t, s->channel); 2129 pa_pstream_send_tagstruct(s->context->pstream, t); 2130 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL); 2131 2132 pa_stream_unref(s); 2133 return 0; 2134} 2135 2136void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) { 2137 pa_assert(s); 2138 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2139 2140 if (pa_detect_fork()) 2141 return; 2142 2143 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2144 return; 2145 2146 s->read_callback = cb; 2147 s->read_userdata = userdata; 2148} 2149 2150void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) { 2151 pa_assert(s); 2152 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2153 2154 if (pa_detect_fork()) 2155 return; 2156 2157 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2158 return; 2159 2160 s->write_callback = cb; 2161 s->write_userdata = userdata; 2162} 2163 2164void 
pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2165 pa_assert(s); 2166 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2167 2168 if (pa_detect_fork()) 2169 return; 2170 2171 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2172 return; 2173 2174 s->state_callback = cb; 2175 s->state_userdata = userdata; 2176} 2177 2178void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2179 pa_assert(s); 2180 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2181 2182 if (pa_detect_fork()) 2183 return; 2184 2185 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2186 return; 2187 2188 s->overflow_callback = cb; 2189 s->overflow_userdata = userdata; 2190} 2191 2192void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2193 pa_assert(s); 2194 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2195 2196 if (pa_detect_fork()) 2197 return; 2198 2199 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2200 return; 2201 2202 s->underflow_callback = cb; 2203 s->underflow_userdata = userdata; 2204} 2205 2206void pa_stream_set_underflow_ohos_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2207 pa_assert(s); 2208 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2209 2210 if (pa_detect_fork()) 2211 return; 2212 2213 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2214 return; 2215 2216 s->underflow_ohos_callback = cb; 2217 s->underflow_ohos_userdata = userdata; 2218} 2219 2220void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2221 pa_assert(s); 2222 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2223 2224 if (pa_detect_fork()) 2225 return; 2226 2227 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2228 return; 2229 2230 s->latency_update_callback = cb; 2231 s->latency_update_userdata = userdata; 2232} 2233 2234void pa_stream_set_moved_callback(pa_stream *s, 
pa_stream_notify_cb_t cb, void *userdata) { 2235 pa_assert(s); 2236 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2237 2238 if (pa_detect_fork()) 2239 return; 2240 2241 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2242 return; 2243 2244 s->moved_callback = cb; 2245 s->moved_userdata = userdata; 2246} 2247 2248void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2249 pa_assert(s); 2250 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2251 2252 if (pa_detect_fork()) 2253 return; 2254 2255 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2256 return; 2257 2258 s->suspended_callback = cb; 2259 s->suspended_userdata = userdata; 2260} 2261 2262void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2263 pa_assert(s); 2264 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2265 2266 if (pa_detect_fork()) 2267 return; 2268 2269 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2270 return; 2271 2272 s->started_callback = cb; 2273 s->started_userdata = userdata; 2274} 2275 2276void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) { 2277 pa_assert(s); 2278 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2279 2280 if (pa_detect_fork()) 2281 return; 2282 2283 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2284 return; 2285 2286 s->event_callback = cb; 2287 s->event_userdata = userdata; 2288} 2289 2290void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { 2291 pa_assert(s); 2292 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2293 2294 if (pa_detect_fork()) 2295 return; 2296 2297 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED) 2298 return; 2299 2300 s->buffer_attr_callback = cb; 2301 s->buffer_attr_userdata = userdata; 2302} 2303 2304void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 2305 pa_operation *o = 
userdata; 2306 int success = 1; 2307 2308 pa_assert(pd); 2309 pa_assert(o); 2310 pa_assert(PA_REFCNT_VALUE(o) >= 1); 2311 2312 if (!o->context) 2313 goto finish; 2314 2315 if (command != PA_COMMAND_REPLY) { 2316 if (pa_context_handle_error(o->context, command, t, false) < 0) 2317 goto finish; 2318 2319 success = 0; 2320 } else if (!pa_tagstruct_eof(t)) { 2321 pa_context_fail(o->context, PA_ERR_PROTOCOL); 2322 goto finish; 2323 } 2324 2325 if (o->callback) { 2326 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback; 2327 cb(o->stream, success, o->userdata); 2328 } 2329 2330finish: 2331 pa_operation_done(o); 2332 pa_operation_unref(o); 2333} 2334 2335pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) { 2336 pa_operation *o; 2337 pa_tagstruct *t; 2338 uint32_t tag; 2339 2340 pa_assert(s); 2341 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2342 2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2344 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2345 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2346 2347 /* Ask for a timing update before we cork/uncork to get the best 2348 * accuracy for the transport latency suitable for the 2349 * check_smoother_status() call in the started callback */ 2350 request_auto_timing_update(s, true); 2351 2352 s->corked = b; 2353 2354 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2355 2356 t = pa_tagstruct_command( 2357 s->context, 2358 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? 
PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM), 2359 &tag); 2360 pa_tagstruct_putu32(t, s->channel); 2361 pa_tagstruct_put_boolean(t, !!b); 2362 pa_pstream_send_tagstruct(s->context->pstream, t); 2363 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2364 2365 check_smoother_status(s, false, false, false); 2366 2367 /* This might cause the indexes to hang/start again, hence let's 2368 * request a timing update, after the cork/uncork, too */ 2369 request_auto_timing_update(s, true); 2370 2371 return o; 2372} 2373 2374static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) { 2375 pa_tagstruct *t; 2376 pa_operation *o; 2377 uint32_t tag; 2378 2379 pa_assert(s); 2380 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2381 2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2384 2385 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2386 2387 t = pa_tagstruct_command(s->context, command, &tag); 2388 pa_tagstruct_putu32(t, s->channel); 2389 pa_pstream_send_tagstruct(s->context->pstream, t); 2390 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2391 2392 return o; 2393} 2394 2395pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { 2396 pa_operation *o; 2397 2398 pa_assert(s); 2399 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2400 2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2403 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2404 2405 /* Ask 
for a timing update *before* the flush, so that the 2406 * transport usec is as up to date as possible when we get the 2407 * underflow message and update the smoother status*/ 2408 request_auto_timing_update(s, true); 2409 2410 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata))) 2411 return NULL; 2412 2413 if (s->direction == PA_STREAM_PLAYBACK) { 2414 2415 if (s->write_index_corrections[s->current_write_index_correction].valid) 2416 s->write_index_corrections[s->current_write_index_correction].corrupt = true; 2417 2418 if (s->buffer_attr.prebuf > 0) 2419 check_smoother_status(s, false, false, true); 2420 2421 /* This will change the write index, but leave the 2422 * read index untouched. */ 2423 invalidate_indexes(s, false, true); 2424 2425 } else 2426 /* For record streams this has no influence on the write 2427 * index, but the read index might jump. */ 2428 invalidate_indexes(s, true, false); 2429 2430 /* Note that we do not update requested_bytes here. This is 2431 * because we cannot really know how data actually was dropped 2432 * from the write index due to this. This 'error' will be applied 2433 * by both client and server and hence we should be fine. 
*/ 2434 2435 return o; 2436} 2437 2438pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { 2439 pa_operation *o; 2440 2441 pa_assert(s); 2442 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2443 2444 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2445 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2446 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); 2447 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE); 2448 2449 /* Ask for a timing update before we cork/uncork to get the best 2450 * accuracy for the transport latency suitable for the 2451 * check_smoother_status() call in the started callback */ 2452 request_auto_timing_update(s, true); 2453 2454 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata))) 2455 return NULL; 2456 2457 /* This might cause the read index to hang again, hence 2458 * let's request a timing update */ 2459 request_auto_timing_update(s, true); 2460 2461 return o; 2462} 2463 2464pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { 2465 pa_operation *o; 2466 2467 pa_assert(s); 2468 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2469 2470 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2471 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2472 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); 2473 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE); 2474 2475 /* Ask for a timing update before we cork/uncork to get the best 2476 * accuracy for the transport latency suitable for the 2477 * check_smoother_status() call in the started callback */ 2478 request_auto_timing_update(s, true); 2479 2480 if (!(o = stream_send_simple_command(s, 
PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata))) 2481 return NULL; 2482 2483 /* This might cause the read index to start moving again, hence 2484 * let's request a timing update */ 2485 request_auto_timing_update(s, true); 2486 2487 return o; 2488} 2489 2490pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) { 2491 pa_operation *o; 2492 2493 pa_assert(s); 2494 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2495 pa_assert(name); 2496 2497 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2500 2501 if (s->context->version >= 13) { 2502 pa_proplist *p = pa_proplist_new(); 2503 2504 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name); 2505 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata); 2506 pa_proplist_free(p); 2507 } else { 2508 pa_tagstruct *t; 2509 uint32_t tag; 2510 2511 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2512 t = pa_tagstruct_command( 2513 s->context, 2514 (uint32_t) (s->direction == PA_STREAM_RECORD ? 
PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME), 2515 &tag); 2516 pa_tagstruct_putu32(t, s->channel); 2517 pa_tagstruct_puts(t, name); 2518 pa_pstream_send_tagstruct(s->context->pstream, t); 2519 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2520 } 2521 2522 return o; 2523} 2524 2525int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { 2526 pa_usec_t usec; 2527 2528 pa_assert(s); 2529 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2530 2531 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2532 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2533 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2534 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); 2535 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA); 2536 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA); 2537 2538 if (s->smoother) 2539#ifdef USE_SMOOTHER_2 2540 usec = pa_smoother_2_get(s->smoother, pa_rtclock_now()); 2541#else 2542 usec = pa_smoother_get(s->smoother, pa_rtclock_now()); 2543#endif 2544 2545 else 2546 usec = calc_time(s, false); 2547 2548 /* Make sure the time runs monotonically */ 2549 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) { 2550 if (usec < s->previous_time) 2551 usec = s->previous_time; 2552 else 2553 s->previous_time = usec; 2554 } 2555 2556 if (r_usec) 2557 *r_usec = usec; 2558 2559 return 0; 2560} 2561 2562static pa_usec_t time_counter_diff(const pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) { 2563 pa_assert(s); 2564 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2565 2566 if (negative) 2567 *negative = 0; 2568 2569 if (a >= b) 2570 return a-b; 2571 else { 2572 if (negative && s->direction == PA_STREAM_RECORD) { 2573 *negative = 
1; 2574 return b-a; 2575 } else 2576 return 0; 2577 } 2578} 2579 2580int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) { 2581 pa_usec_t t, c; 2582 int r; 2583 int64_t cindex; 2584 2585 pa_assert(s); 2586 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2587 pa_assert(r_usec); 2588 2589 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2590 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2591 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2592 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); 2593 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA); 2594 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA); 2595 2596 if ((r = pa_stream_get_time(s, &t)) < 0) 2597 return r; 2598 2599 if (s->direction == PA_STREAM_PLAYBACK) 2600 cindex = s->timing_info.write_index; 2601 else 2602 cindex = s->timing_info.read_index; 2603 2604 if (cindex < 0) 2605 cindex = 0; 2606 2607 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec); 2608 2609 if (s->direction == PA_STREAM_PLAYBACK) 2610 *r_usec = time_counter_diff(s, c, t, negative); 2611 else 2612 *r_usec = time_counter_diff(s, t, c, negative); 2613 2614 return 0; 2615} 2616 2617const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) { 2618 pa_assert(s); 2619 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2620 2621 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2622 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2624 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA); 2625 2626 return &s->timing_info; 2627} 2628 2629const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) { 2630 pa_assert(s); 
2631 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2632 2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2634 2635 return &s->sample_spec; 2636} 2637 2638const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) { 2639 pa_assert(s); 2640 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2641 2642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2643 2644 return &s->channel_map; 2645} 2646 2647const pa_format_info* pa_stream_get_format_info(const pa_stream *s) { 2648 pa_assert(s); 2649 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2650 2651 /* We don't have the format till routing is done */ 2652 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2653 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2654 2655 return s->format; 2656} 2657const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) { 2658 pa_assert(s); 2659 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2660 2661 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2663 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED); 2664 2665 return &s->buffer_attr; 2666} 2667 2668static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 2669 pa_operation *o = userdata; 2670 int success = 1; 2671 2672 pa_assert(pd); 2673 pa_assert(o); 2674 pa_assert(PA_REFCNT_VALUE(o) >= 1); 2675 2676 if (!o->context) 2677 goto finish; 2678 2679 if (command != PA_COMMAND_REPLY) { 2680 if (pa_context_handle_error(o->context, command, t, false) < 0) 2681 goto finish; 2682 2683 success = 0; 2684 } else { 2685 if (o->stream->direction == PA_STREAM_PLAYBACK) { 2686 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 || 2687 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 || 2688 
pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 || 2689 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) { 2690 pa_context_fail(o->context, PA_ERR_PROTOCOL); 2691 goto finish; 2692 } 2693 } else if (o->stream->direction == PA_STREAM_RECORD) { 2694 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 || 2695 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) { 2696 pa_context_fail(o->context, PA_ERR_PROTOCOL); 2697 goto finish; 2698 } 2699 } 2700 2701 if (o->stream->context->version >= 13) { 2702 pa_usec_t usec; 2703 2704 if (pa_tagstruct_get_usec(t, &usec) < 0) { 2705 pa_context_fail(o->context, PA_ERR_PROTOCOL); 2706 goto finish; 2707 } 2708 2709 if (o->stream->direction == PA_STREAM_RECORD) 2710 o->stream->timing_info.configured_source_usec = usec; 2711 else 2712 o->stream->timing_info.configured_sink_usec = usec; 2713 } 2714 2715 if (!pa_tagstruct_eof(t)) { 2716 pa_context_fail(o->context, PA_ERR_PROTOCOL); 2717 goto finish; 2718 } 2719 } 2720 2721 if (o->callback) { 2722 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback; 2723 cb(o->stream, success, o->userdata); 2724 } 2725 2726finish: 2727 pa_operation_done(o); 2728 pa_operation_unref(o); 2729} 2730 2731pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) { 2732 pa_operation *o; 2733 pa_tagstruct *t; 2734 uint32_t tag; 2735 pa_buffer_attr copy; 2736 2737 pa_assert(s); 2738 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2739 pa_assert(attr); 2740 2741 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2742 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2743 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED); 2745 2746 /* Ask for a timing update before we cork/uncork to get the best 2747 * 
accuracy for the transport latency suitable for the 2748 * check_smoother_status() call in the started callback */ 2749 request_auto_timing_update(s, true); 2750 2751 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2752 2753 t = pa_tagstruct_command( 2754 s->context, 2755 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR), 2756 &tag); 2757 pa_tagstruct_putu32(t, s->channel); 2758 2759 copy = *attr; 2760 patch_buffer_attr(s, ©, NULL); 2761 attr = © 2762 2763 pa_tagstruct_putu32(t, attr->maxlength); 2764 2765 if (s->direction == PA_STREAM_PLAYBACK) 2766 pa_tagstruct_put( 2767 t, 2768 PA_TAG_U32, attr->tlength, 2769 PA_TAG_U32, attr->prebuf, 2770 PA_TAG_U32, attr->minreq, 2771 PA_TAG_INVALID); 2772 else 2773 pa_tagstruct_putu32(t, attr->fragsize); 2774 2775 if (s->context->version >= 13) 2776 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY)); 2777 2778 if (s->context->version >= 14) 2779 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS)); 2780 2781 pa_pstream_send_tagstruct(s->context->pstream, t); 2782 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2783 2784 /* This might cause changes in the read/write index, hence let's 2785 * request a timing update */ 2786 request_auto_timing_update(s, true); 2787 2788 return o; 2789} 2790 2791uint32_t pa_stream_get_device_index(const pa_stream *s) { 2792 pa_assert(s); 2793 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2794 2795 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX); 2796 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX); 2797 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX); 2798 
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX); 2799 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX); 2800 2801 return s->device_index; 2802} 2803 2804const char *pa_stream_get_device_name(const pa_stream *s) { 2805 pa_assert(s); 2806 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2807 2808 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED); 2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE); 2813 2814 return s->device_name; 2815} 2816 2817int pa_stream_is_suspended(const pa_stream *s) { 2818 pa_assert(s); 2819 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2820 2821 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2822 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2823 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2824 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED); 2825 2826 return s->suspended; 2827} 2828 2829int pa_stream_is_corked(const pa_stream *s) { 2830 pa_assert(s); 2831 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2832 2833 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2834 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2835 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2836 2837 return s->corked; 2838} 2839 2840static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { 2841 pa_operation *o = userdata; 2842 int success = 1; 2843 2844 pa_assert(pd); 2845 
pa_assert(o); 2846 pa_assert(PA_REFCNT_VALUE(o) >= 1); 2847 2848 if (!o->context) 2849 goto finish; 2850 2851 if (command != PA_COMMAND_REPLY) { 2852 if (pa_context_handle_error(o->context, command, t, false) < 0) 2853 goto finish; 2854 2855 success = 0; 2856 } else { 2857 2858 if (!pa_tagstruct_eof(t)) { 2859 pa_context_fail(o->context, PA_ERR_PROTOCOL); 2860 goto finish; 2861 } 2862 } 2863 2864 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private); 2865#ifdef USE_SMOOTHER_2 2866 if (o->stream->smoother) 2867 pa_smoother_2_set_rate(o->stream->smoother, pa_rtclock_now(), o->stream->sample_spec.rate); 2868#endif 2869 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec)); 2870 2871 if (o->callback) { 2872 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback; 2873 cb(o->stream, success, o->userdata); 2874 } 2875 2876finish: 2877 pa_operation_done(o); 2878 pa_operation_unref(o); 2879} 2880 2881pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) { 2882 pa_operation *o; 2883 pa_tagstruct *t; 2884 uint32_t tag; 2885 2886 pa_assert(s); 2887 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2888 2889 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2890 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID); 2891 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE); 2894 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED); 2895 2896 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2897 o->private = PA_UINT_TO_PTR(rate); 2898 2899 t = pa_tagstruct_command( 2900 s->context, 2901 (uint32_t) (s->direction == PA_STREAM_RECORD ? 
PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE), 2902 &tag); 2903 pa_tagstruct_putu32(t, s->channel); 2904 pa_tagstruct_putu32(t, rate); 2905 2906 pa_pstream_send_tagstruct(s->context->pstream, t); 2907 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2908 2909 return o; 2910} 2911 2912pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) { 2913 pa_operation *o; 2914 pa_tagstruct *t; 2915 uint32_t tag; 2916 2917 pa_assert(s); 2918 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2919 2920 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID); 2922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2923 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2924 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED); 2925 2926 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2927 2928 t = pa_tagstruct_command( 2929 s->context, 2930 (uint32_t) (s->direction == PA_STREAM_RECORD ? 
PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST), 2931 &tag); 2932 pa_tagstruct_putu32(t, s->channel); 2933 pa_tagstruct_putu32(t, (uint32_t) mode); 2934 pa_tagstruct_put_proplist(t, p); 2935 2936 pa_pstream_send_tagstruct(s->context->pstream, t); 2937 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2938 2939 /* Please note that we don't update s->proplist here, because we 2940 * don't export that field */ 2941 2942 return o; 2943} 2944 2945pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) { 2946 pa_operation *o; 2947 pa_tagstruct *t; 2948 uint32_t tag; 2949 const char * const*k; 2950 2951 pa_assert(s); 2952 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2953 2954 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2955 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID); 2956 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); 2957 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); 2958 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED); 2959 2960 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); 2961 2962 t = pa_tagstruct_command( 2963 s->context, 2964 (uint32_t) (s->direction == PA_STREAM_RECORD ? 
PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST), 2965 &tag); 2966 pa_tagstruct_putu32(t, s->channel); 2967 2968 for (k = keys; *k; k++) 2969 pa_tagstruct_puts(t, *k); 2970 2971 pa_tagstruct_puts(t, NULL); 2972 2973 pa_pstream_send_tagstruct(s->context->pstream, t); 2974 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); 2975 2976 /* Please note that we don't update s->proplist here, because we 2977 * don't export that field */ 2978 2979 return o; 2980} 2981 2982int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) { 2983 pa_assert(s); 2984 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2985 2986 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); 2987 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID); 2988 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE); 2989 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED); 2990 2991 s->direct_on_input = sink_input_idx; 2992 2993 return 0; 2994} 2995 2996uint32_t pa_stream_get_monitor_stream(const pa_stream *s) { 2997 pa_assert(s); 2998 pa_assert(PA_REFCNT_VALUE(s) >= 1); 2999 3000 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX); 3001 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX); 3002 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX); 3003 3004 return s->direct_on_input; 3005} 3006