1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2004-2006 Lennart Poettering 5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB 6 7 PulseAudio is free software; you can redistribute it and/or modify 8 it under the terms of the GNU Lesser General Public License as 9 published by the Free Software Foundation; either version 2.1 of the 10 License, or (at your option) any later version. 11 12 PulseAudio is distributed in the hope that it will be useful, but 13 WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 Lesser General Public License for more details. 16 17 You should have received a copy of the GNU Lesser General Public 18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 19***/ 20 21#ifdef HAVE_CONFIG_H 22#include <config.h> 23#endif 24 25#include <stdio.h> 26#include <stdlib.h> 27#include <unistd.h> 28 29#ifdef HAVE_NETINET_IN_H 30#include <netinet/in.h> 31#endif 32 33#include <pulse/xmalloc.h> 34 35#include <pulsecore/idxset.h> 36#include <pulsecore/socket.h> 37#include <pulsecore/queue.h> 38#include <pulsecore/log.h> 39#include <pulsecore/creds.h> 40#include <pulsecore/refcnt.h> 41#include <pulsecore/flist.h> 42#include <pulsecore/macro.h> 43 44#include "pstream.h" 45 46/* We piggyback information if audio data blocks are stored in SHM on the seek mode */ 47#define PA_FLAG_SHMDATA 0x80000000LU 48#define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU 49#define PA_FLAG_SHMRELEASE 0x40000000LU 50#define PA_FLAG_SHMREVOKE 0xC0000000LU 51#define PA_FLAG_SHMMASK 0xFF000000LU 52#define PA_FLAG_SEEKMASK 0x000000FFLU 53#define PA_FLAG_SHMWRITABLE 0x00800000LU 54 55/* The sequence descriptor header consists of 5 32bit integers: */ 56enum { 57 PA_PSTREAM_DESCRIPTOR_LENGTH, 58 PA_PSTREAM_DESCRIPTOR_CHANNEL, 59 PA_PSTREAM_DESCRIPTOR_OFFSET_HI, 60 PA_PSTREAM_DESCRIPTOR_OFFSET_LO, 61 PA_PSTREAM_DESCRIPTOR_FLAGS, 62 PA_PSTREAM_DESCRIPTOR_MAX 63}; 64 65/* If we have an SHM block, this info follows the descriptor */ 66enum { 67 PA_PSTREAM_SHM_BLOCKID, 68 PA_PSTREAM_SHM_SHMID, 69 PA_PSTREAM_SHM_INDEX, 70 PA_PSTREAM_SHM_LENGTH, 71 PA_PSTREAM_SHM_MAX 72}; 73 74typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX]; 75 76#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) 77 78#define MINIBUF_SIZE (256) 79 80/* To allow uploading a single sample in one frame, this value should be the 81 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h. 82 */ 83#define FRAME_SIZE_MAX_ALLOW (1024*1024*16) 84 85PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree); 86 87struct item_info { 88 enum { 89 PA_PSTREAM_ITEM_PACKET, 90 PA_PSTREAM_ITEM_MEMBLOCK, 91 PA_PSTREAM_ITEM_SHMRELEASE, 92 PA_PSTREAM_ITEM_SHMREVOKE 93 } type; 94 95 /* packet info */ 96 pa_packet *packet; 97#ifdef HAVE_CREDS 98 bool with_ancil_data; 99 pa_cmsg_ancil_data ancil_data; 100#endif 101 102 /* memblock info */ 103 pa_memchunk chunk; 104 uint32_t channel; 105 int64_t offset; 106 pa_seek_mode_t seek_mode; 107 108 /* release/revoke info */ 109 uint32_t block_id; 110}; 111 112struct pstream_read { 113 pa_pstream_descriptor descriptor; 114 pa_memblock *memblock; 115 pa_packet *packet; 116 uint32_t shm_info[PA_PSTREAM_SHM_MAX]; 117 void *data; 118 size_t index; 119}; 120 121struct pa_pstream { 122 PA_REFCNT_DECLARE; 123 124 pa_mainloop_api *mainloop; 125 pa_defer_event *defer_event; 126 pa_iochannel *io; 127 pa_srbchannel *srb, *srbpending; 128 bool is_srbpending; 129 130 pa_queue *send_queue; 131 132 bool dead; 133 134 struct { 135 union { 136 uint8_t minibuf[MINIBUF_SIZE]; 137 pa_pstream_descriptor descriptor; 138 }; 139 struct item_info* current; 140 void *data; 141 size_t index; 142 int minibuf_validsize; 143 pa_memchunk memchunk; 144 } write; 145 146 struct pstream_read readio, readsrb; 147 148 /* @use_shm: beside copying the full audio data to the other 149 * PA end, this pipe supports just sending references of the 150 * same audio data blocks if they reside in a SHM pool. 151 * 152 * @use_memfd: pipe supports sending SHM memfd block references 153 * 154 * @registered_memfd_ids: registered memfd pools SHM IDs. Check 155 * pa_pstream_register_memfd_mempool() for more information. */ 156 bool use_shm, use_memfd; 157 bool non_registered_memfd_id_error_logged; 158 pa_idxset *registered_memfd_ids; 159 160 pa_memimport *import; 161 pa_memexport *export; 162 163 pa_pstream_packet_cb_t receive_packet_callback; 164 void *receive_packet_callback_userdata; 165 166 pa_pstream_memblock_cb_t receive_memblock_callback; 167 void *receive_memblock_callback_userdata; 168 169 pa_pstream_notify_cb_t drain_callback; 170 void *drain_callback_userdata; 171 172 pa_pstream_notify_cb_t die_callback; 173 void *die_callback_userdata; 174 175 pa_pstream_block_id_cb_t revoke_callback; 176 void *revoke_callback_userdata; 177 178 pa_pstream_block_id_cb_t release_callback; 179 void *release_callback_userdata; 180 181 pa_mempool *mempool; 182 183#ifdef HAVE_CREDS 184 pa_cmsg_ancil_data read_ancil_data, *write_ancil_data; 185 bool send_ancil_data_now; 186#endif 187}; 188 189#ifdef HAVE_CREDS 190/* 191 * memfd-backed SHM pools blocks transfer occur without passing the pool's 192 * fd every time, thus minimizing overhead and avoiding fd leaks. A 193 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early 194 * on. This command has an ID that uniquely identifies the pool in question. 195 * Further pool's block references can then be exclusively done using such ID; 196 * the fd can be safely closed – on both ends – afterwards. 197 * 198 * On the sending side of this command, we want to close the passed fds 199 * directly after being sent. Meanwhile we're only allowed to asynchronously 200 * schedule packet writes to the pstream, so the job of closing passed fds is 201 * left to the pstream's actual writing function do_write(): it knows the 202 * exact point in time where the fds are passed to the other end through 203 * iochannels and the sendmsg() system call. 204 * 205 * Nonetheless not all code paths in the system desire their socket-passed 206 * fds to be closed after the send. srbchannel needs the passed fds to still 207 * be open for further communication. System-wide global memfd-backed pools 208 * also require the passed fd to be open: they pass the same fd, with the same 209 * ID registration mechanism, for each newly connected client to the system. 210 * 211 * So from all of the above, never close the ancillary fds by your own and 212 * always call below method instead. It takes care of closing the passed fds 213 * _only if allowed_ by the code paths that originally created them to do so. 214 * Moreover, it is multiple-invocations safe: failure handlers can, and 215 * should, call it for passed fds cleanup without worrying too much about 216 * the system state. 217 */ 218void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) { 219 if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) { 220 int i; 221 222 pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS); 223 224 for (i = 0; i < ancil->nfd; i++) 225 if (ancil->fds[i] != -1) { 226 pa_assert_se(pa_close(ancil->fds[i]) == 0); 227 ancil->fds[i] = -1; 228 } 229 230 ancil->nfd = 0; 231 ancil->close_fds_on_cleanup = false; 232 } 233} 234#endif 235 236static int do_write(pa_pstream *p); 237static int do_read(pa_pstream *p, struct pstream_read *re); 238 239static void do_pstream_read_write(pa_pstream *p) { 240 pa_assert(p); 241 pa_assert(PA_REFCNT_VALUE(p) > 0); 242 243 pa_pstream_ref(p); 244 245 p->mainloop->defer_enable(p->defer_event, 0); 246 247 if (!p->dead && p->srb) { 248 int r = 0; 249 250 if(do_write(p) < 0) 251 goto fail; 252 253 while (!p->dead && r == 0) { 254 r = do_read(p, &p->readsrb); 255 if (r < 0) 256 goto fail; 257 } 258 } 259 260 if (!p->dead && pa_iochannel_is_readable(p->io)) { 261 if (do_read(p, &p->readio) < 0) 262 goto fail; 263 } else if (!p->dead && pa_iochannel_is_hungup(p->io)) 264 goto fail; 265 266 while (!p->dead && pa_iochannel_is_writable(p->io)) { 267 int r = do_write(p); 268 if (r < 0) 269 goto fail; 270 if (r == 0) 271 break; 272 } 273 274 pa_pstream_unref(p); 275 return; 276 277fail: 278 279 if (p->die_callback) 280 p->die_callback(p, p->die_callback_userdata); 281 282 pa_pstream_unlink(p); 283 pa_pstream_unref(p); 284} 285 286static bool srb_callback(pa_srbchannel *srb, void *userdata) { 287 bool b; 288 pa_pstream *p = userdata; 289 290 pa_assert(p); 291 pa_assert(PA_REFCNT_VALUE(p) > 0); 292 pa_assert(p->srb == srb); 293 294 pa_pstream_ref(p); 295 296 do_pstream_read_write(p); 297 298 /* If either pstream or the srb is going away, return false. 299 We need to check this before p is destroyed. */ 300 b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb); 301 pa_pstream_unref(p); 302 303 return b; 304} 305 306static void io_callback(pa_iochannel*io, void *userdata) { 307 pa_pstream *p = userdata; 308 309 pa_assert(p); 310 pa_assert(PA_REFCNT_VALUE(p) > 0); 311 pa_assert(p->io == io); 312 313 do_pstream_read_write(p); 314} 315 316static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) { 317 pa_pstream *p = userdata; 318 319 pa_assert(p); 320 pa_assert(PA_REFCNT_VALUE(p) > 0); 321 pa_assert(p->defer_event == e); 322 pa_assert(p->mainloop == m); 323 324 do_pstream_read_write(p); 325} 326 327static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata); 328 329pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) { 330 pa_pstream *p; 331 332 pa_assert(m); 333 pa_assert(io); 334 pa_assert(pool); 335 336 p = pa_xnew0(pa_pstream, 1); 337 PA_REFCNT_INIT(p); 338 p->io = io; 339 pa_iochannel_set_callback(io, io_callback, p); 340 341 p->mainloop = m; 342 p->defer_event = m->defer_new(m, defer_callback, p); 343 m->defer_enable(p->defer_event, 0); 344 345 p->send_queue = pa_queue_new(); 346 347 p->mempool = pool; 348 349 /* We do importing unconditionally */ 350 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p); 351 352 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool)); 353 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool)); 354 355 return p; 356} 357 358/* Attach memfd<->SHM_ID mapping to given pstream and its memimport. 359 * Check pa_pstream_register_memfd_mempool() for further info. 360 * 361 * Caller owns the passed @memfd_fd and must close it down when appropriate. */ 362int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) { 363 int err = -1; 364 365 pa_assert(memfd_fd != -1); 366 367 if (!p->use_memfd) { 368 pa_log_warn("Received memfd ID registration request over a pipe " 369 "that does not support memfds"); 370 return err; 371 } 372 373 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) { 374 pa_log_warn("previously registered memfd SHM ID = %u", shm_id); 375 return err; 376 } 377 378 if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) { 379 pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id); 380 return err; 381 } 382 383 pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0); 384 return 0; 385} 386 387static void item_free(void *item) { 388 struct item_info *i = item; 389 pa_assert(i); 390 391 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) { 392 pa_assert(i->chunk.memblock); 393 pa_memblock_unref(i->chunk.memblock); 394 } else if (i->type == PA_PSTREAM_ITEM_PACKET) { 395 pa_assert(i->packet); 396 pa_packet_unref(i->packet); 397 } 398 399#ifdef HAVE_CREDS 400 /* On error recovery paths, there might be lingering items 401 * on the pstream send queue and they are usually freed with 402 * a call to 'pa_queue_free(p->send_queue, item_free)'. Make 403 * sure we do not leak any fds in that case! */ 404 if (i->with_ancil_data) 405 pa_cmsg_ancil_data_close_fds(&i->ancil_data); 406#endif 407 408 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0) 409 pa_xfree(i); 410} 411 412static void pstream_free(pa_pstream *p) { 413 pa_assert(p); 414 415 pa_pstream_unlink(p); 416 417 pa_queue_free(p->send_queue, item_free); 418 419 if (p->write.current) 420 item_free(p->write.current); 421 422 if (p->write.memchunk.memblock) 423 pa_memblock_unref(p->write.memchunk.memblock); 424 425 if (p->readsrb.memblock) 426 pa_memblock_unref(p->readsrb.memblock); 427 428 if (p->readsrb.packet) 429 pa_packet_unref(p->readsrb.packet); 430 431 if (p->readio.memblock) 432 pa_memblock_unref(p->readio.memblock); 433 434 if (p->readio.packet) 435 pa_packet_unref(p->readio.packet); 436 437 if (p->registered_memfd_ids) 438 pa_idxset_free(p->registered_memfd_ids, NULL); 439 440 pa_xfree(p); 441} 442 443void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) { 444 struct item_info *i; 445 446 pa_assert(p); 447 pa_assert(PA_REFCNT_VALUE(p) > 0); 448 pa_assert(packet); 449 450 if (p->dead) { 451#ifdef HAVE_CREDS 452 pa_cmsg_ancil_data_close_fds(ancil_data); 453#endif 454 return; 455 } 456 457 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) 458 i = pa_xnew(struct item_info, 1); 459 460 i->type = PA_PSTREAM_ITEM_PACKET; 461 i->packet = pa_packet_ref(packet); 462 463#ifdef HAVE_CREDS 464 if ((i->with_ancil_data = !!ancil_data)) { 465 i->ancil_data = *ancil_data; 466 if (ancil_data->creds_valid) 467 pa_assert(ancil_data->nfd == 0); 468 else 469 pa_assert(ancil_data->nfd > 0); 470 } 471#endif 472 473 pa_queue_push(p->send_queue, i); 474 if (PaQueueGetLen(p->send_queue) >= 10) { // 10 maybe have msg backlog 475 pa_log_warn("[MSG backlog]: PaQueueLen = %u", PaQueueGetLen(p->send_queue)); 476 } 477 478 p->mainloop->defer_enable(p->defer_event, 1); 479} 480 481void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) { 482 size_t length, idx; 483 size_t bsm; 484 485 pa_assert(p); 486 pa_assert(PA_REFCNT_VALUE(p) > 0); 487 pa_assert(channel != (uint32_t) -1); 488 pa_assert(chunk); 489 490 if (p->dead) 491 return; 492 493 idx = 0; 494 length = chunk->length; 495 496 bsm = pa_mempool_block_size_max(p->mempool); 497 498 while (length > 0) { 499 struct item_info *i; 500 size_t n; 501 502 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) 503 i = pa_xnew(struct item_info, 1); 504 i->type = PA_PSTREAM_ITEM_MEMBLOCK; 505 506 n = PA_MIN(length, bsm); 507 i->chunk.index = chunk->index + idx; 508 i->chunk.length = n; 509 i->chunk.memblock = pa_memblock_ref(chunk->memblock); 510 511 i->channel = channel; 512 i->offset = offset; 513 i->seek_mode = seek_mode; 514#ifdef HAVE_CREDS 515 i->with_ancil_data = false; 516#endif 517 518 pa_queue_push(p->send_queue, i); 519 520 idx += n; 521 length -= n; 522 } 523 524 p->mainloop->defer_enable(p->defer_event, 1); 525} 526 527void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) { 528 struct item_info *item; 529 pa_assert(p); 530 pa_assert(PA_REFCNT_VALUE(p) > 0); 531 532 if (p->dead) 533 return; 534 535/* pa_log("Releasing block %u", block_id); */ 536 537 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) 538 item = pa_xnew(struct item_info, 1); 539 item->type = PA_PSTREAM_ITEM_SHMRELEASE; 540 item->block_id = block_id; 541#ifdef HAVE_CREDS 542 item->with_ancil_data = false; 543#endif 544 545 pa_queue_push(p->send_queue, item); 546 p->mainloop->defer_enable(p->defer_event, 1); 547} 548 549/* might be called from thread context */ 550static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) { 551 pa_pstream *p = userdata; 552 553 pa_assert(p); 554 pa_assert(PA_REFCNT_VALUE(p) > 0); 555 556 if (p->dead) 557 return; 558 559 if (p->release_callback) 560 p->release_callback(p, block_id, p->release_callback_userdata); 561 else 562 pa_pstream_send_release(p, block_id); 563} 564 565void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) { 566 struct item_info *item; 567 pa_assert(p); 568 pa_assert(PA_REFCNT_VALUE(p) > 0); 569 570 if (p->dead) 571 return; 572/* pa_log("Revoking block %u", block_id); */ 573 574 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) 575 item = pa_xnew(struct item_info, 1); 576 item->type = PA_PSTREAM_ITEM_SHMREVOKE; 577 item->block_id = block_id; 578#ifdef HAVE_CREDS 579 item->with_ancil_data = false; 580#endif 581 582 pa_queue_push(p->send_queue, item); 583 p->mainloop->defer_enable(p->defer_event, 1); 584} 585 586/* might be called from thread context */ 587static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) { 588 pa_pstream *p = userdata; 589 590 pa_assert(p); 591 pa_assert(PA_REFCNT_VALUE(p) > 0); 592 593 if (p->revoke_callback) 594 p->revoke_callback(p, block_id, p->revoke_callback_userdata); 595 else 596 pa_pstream_send_revoke(p, block_id); 597} 598 599static void prepare_next_write_item(pa_pstream *p) { 600 pa_assert(p); 601 pa_assert(PA_REFCNT_VALUE(p) > 0); 602 603 p->write.current = pa_queue_pop(p->send_queue); 604 605 if (!p->write.current) 606 return; 607 p->write.index = 0; 608 p->write.data = NULL; 609 p->write.minibuf_validsize = 0; 610 pa_memchunk_reset(&p->write.memchunk); 611 612 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0; 613 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); 614 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0; 615 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; 616 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0; 617 618 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) { 619 size_t plen; 620 621 pa_assert(p->write.current->packet); 622 623 p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen); 624 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen); 625 626 if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) { 627 memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen); 628 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen; 629 } 630 631 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) { 632 633 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE); 634 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id); 635 636 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) { 637 638 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE); 639 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id); 640 641 } else { 642 uint32_t flags; 643 bool send_payload = true; 644 645 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); 646 pa_assert(p->write.current->chunk.memblock); 647 648 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel); 649 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32)); 650 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset)); 651 652 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK); 653 654 if (p->use_shm) { 655 pa_mem_type_t type; 656 uint32_t block_id, shm_id; 657 size_t offset, length; 658 uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE]; 659 size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX; 660 pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock); 661 pa_memexport *current_export; 662 663 if (p->mempool == current_pool) 664 pa_assert_se(current_export = p->export); 665 else 666 pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p)); 667 668 if (pa_memexport_put(current_export, 669 p->write.current->chunk.memblock, 670 &type, 671 &block_id, 672 &shm_id, 673 &offset, 674 &length) >= 0) { 675 676 if (type == PA_MEM_TYPE_SHARED_POSIX) 677 send_payload = false; 678 679 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) { 680 if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) { 681 flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK; 682 send_payload = false; 683 } else { 684 if (!p->non_registered_memfd_id_error_logged) { 685 pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id); 686 pa_log("Falling back to copying full block data over socket"); 687 pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824"); 688 p->non_registered_memfd_id_error_logged = true; 689 } 690 } 691 } 692 693 if (send_payload) { 694 pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0); 695 } else { 696 flags |= PA_FLAG_SHMDATA; 697 if (pa_mempool_is_remote_writable(current_pool)) 698 flags |= PA_FLAG_SHMWRITABLE; 699 700 shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); 701 shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); 702 shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); 703 shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); 704 705 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size); 706 p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size; 707 } 708 } 709/* else */ 710/* FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */ 711/* pa_log_warn("Failed to export memory block."); */ 712 713 if (current_export != p->export) 714 pa_memexport_free(current_export); 715 pa_mempool_unref(current_pool); 716 } 717 718 if (send_payload) { 719 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); 720 p->write.memchunk = p->write.current->chunk; 721 pa_memblock_ref(p->write.memchunk.memblock); 722 } 723 724 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags); 725 } 726 727#ifdef HAVE_CREDS 728 if ((p->send_ancil_data_now = p->write.current->with_ancil_data)) 729 p->write_ancil_data = &p->write.current->ancil_data; 730#endif 731} 732 733static void check_srbpending(pa_pstream *p) { 734 if (!p->is_srbpending) 735 return; 736 737 if (p->srb) 738 pa_srbchannel_free(p->srb); 739 740 p->srb = p->srbpending; 741 p->is_srbpending = false; 742 743 if (p->srb) 744 pa_srbchannel_set_callback(p->srb, srb_callback, p); 745} 746 747static int do_write(pa_pstream *p) { 748 void *d; 749 size_t l; 750 ssize_t r; 751 pa_memblock *release_memblock = NULL; 752 753 pa_assert(p); 754 pa_assert(PA_REFCNT_VALUE(p) > 0); 755 756 if (!p->write.current) 757 prepare_next_write_item(p); 758 759 if (!p->write.current) { 760 /* The out queue is empty, so switching channels is safe */ 761 check_srbpending(p); 762 return 0; 763 } 764 765 if (p->write.minibuf_validsize > 0) { 766 d = p->write.minibuf + p->write.index; 767 l = p->write.minibuf_validsize - p->write.index; 768 } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) { 769 d = (uint8_t*) p->write.descriptor + p->write.index; 770 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index; 771 } else { 772 pa_assert(p->write.data || p->write.memchunk.memblock); 773 774 if (p->write.data) 775 d = p->write.data; 776 else { 777 d = pa_memblock_acquire_chunk(&p->write.memchunk); 778 release_memblock = p->write.memchunk.memblock; 779 } 780 781 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE; 782 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE); 783 } 784 785 pa_assert(l > 0); 786 787#ifdef HAVE_CREDS 788 if (p->send_ancil_data_now) { 789 if (p->write_ancil_data->creds_valid) { 790 pa_assert(p->write_ancil_data->nfd == 0); 791 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0) 792 goto fail; 793 } 794 else 795 if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0) 796 goto fail; 797 798 pa_cmsg_ancil_data_close_fds(p->write_ancil_data); 799 p->send_ancil_data_now = false; 800 } else 801#endif 802 if (p->srb) 803 r = pa_srbchannel_write(p->srb, d, l); 804 else if ((r = pa_iochannel_write(p->io, d, l)) < 0) 805 goto fail; 806 807 if (release_memblock) 808 pa_memblock_release(release_memblock); 809 810 p->write.index += (size_t) r; 811 812 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { 813 pa_assert(p->write.current); 814 item_free(p->write.current); 815 p->write.current = NULL; 816 817 if (p->write.memchunk.memblock) 818 pa_memblock_unref(p->write.memchunk.memblock); 819 820 pa_memchunk_reset(&p->write.memchunk); 821 822 if (p->drain_callback && !pa_pstream_is_pending(p)) 823 p->drain_callback(p, p->drain_callback_userdata); 824 } 825 826 return (size_t) r == l ? 1 : 0; 827 828fail: 829#ifdef HAVE_CREDS 830 if (p->send_ancil_data_now) 831 pa_cmsg_ancil_data_close_fds(p->write_ancil_data); 832#endif 833 834 if (release_memblock) 835 pa_memblock_release(release_memblock); 836 837 return -1; 838} 839 840static void memblock_complete(pa_pstream *p, struct pstream_read *re) { 841 pa_memchunk chunk; 842 int64_t offset; 843 844 if (!p->receive_memblock_callback) 845 return; 846 847 chunk.memblock = re->memblock; 848 chunk.index = 0; 849 chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE; 850 851 offset = (int64_t) ( 852 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | 853 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); 854 855 p->receive_memblock_callback( 856 p, 857 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), 858 offset, 859 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, 860 &chunk, 861 p->receive_memblock_callback_userdata); 862} 863 864static int do_read(pa_pstream *p, struct pstream_read *re) { 865 void *d; 866 size_t l; 867 ssize_t r; 868 pa_memblock *release_memblock = NULL; 869 pa_assert(p); 870 pa_assert(PA_REFCNT_VALUE(p) > 0); 871 872 if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) { 873 d = (uint8_t*) re->descriptor + re->index; 874 l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index; 875 } else { 876 pa_assert(re->data || re->memblock); 877 878 if (re->data) 879 d = re->data; 880 else { 881 d = pa_memblock_acquire(re->memblock); 882 release_memblock = re->memblock; 883 } 884 885 d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE; 886 l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE); 887 } 888 889 if (re == &p->readsrb) { 890 r = pa_srbchannel_read(p->srb, d, l); 891 if (r == 0) { 892 if (release_memblock) 893 pa_memblock_release(release_memblock); 894 return 1; 895 } 896 } 897 else 898#ifdef HAVE_CREDS 899 { 900 pa_cmsg_ancil_data b; 901 902 if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0) 903 goto fail; 904 905 if (b.creds_valid) { 906 p->read_ancil_data.creds_valid = true; 907 p->read_ancil_data.creds = b.creds; 908 } 909 if (b.nfd > 0) { 910 pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS); 911 p->read_ancil_data.nfd = b.nfd; 912 memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd); 913 p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup; 914 } 915 } 916#else 917 if ((r = pa_iochannel_read(p->io, d, l)) <= 0) 918 goto fail; 919#endif 920 921 if (release_memblock) 922 pa_memblock_release(release_memblock); 923 924 re->index += (size_t) r; 925 926 if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) { 927 uint32_t flags, length, channel; 928 /* Reading of frame descriptor complete */ 929 930 flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); 931 932 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { 933 pa_log_warn("Received SHM frame on a socket where SHM is disabled."); 934 return -1; 935 } 936 937 if (flags == PA_FLAG_SHMRELEASE) { 938 939 /* This is a SHM memblock release frame with no payload */ 940 941/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ 942 943 pa_assert(p->export); 944 pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); 945 946 goto frame_done; 947 948 } else if (flags == PA_FLAG_SHMREVOKE) { 949 950 /* This is a SHM memblock revoke frame with no payload */ 951 952/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ 953 954 pa_assert(p->import); 955 pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); 956 957 goto frame_done; 958 } 959 960 length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); 961 962 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) { 963 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length); 964 return -1; 965 } 966 967 pa_assert(!re->packet && !re->memblock); 968 969 channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); 970 971 if (channel == (uint32_t) -1) { 972 size_t plen; 973 974 if (flags != 0) { 975 pa_log_warn("Received packet frame with invalid flags value."); 976 return -1; 977 } 978 979 /* Frame is a packet frame */ 980 re->packet = pa_packet_new(length); 981 re->data = (void *) pa_packet_data(re->packet, &plen); 982 983 } else { 984 985 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { 986 pa_log_warn("Received memblock frame with invalid seek mode."); 987 return -1; 988 } 989 990 if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) { 991 992 if (length != sizeof(re->shm_info)) { 993 pa_log_warn("Received SHM memblock frame with invalid frame length."); 994 return -1; 995 } 996 997 /* Frame is a memblock frame referencing an SHM memblock */ 998 re->data = re->shm_info; 999 1000 } else if ((flags & PA_FLAG_SHMMASK) == 0) { 1001 1002 /* Frame is a memblock frame */ 1003 1004 re->memblock = pa_memblock_new(p->mempool, length); 1005 re->data = NULL; 1006 } else { 1007 1008 pa_log_warn("Received memblock frame with invalid flags value."); 1009 return -1; 1010 } 1011 } 1012 1013 } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { 1014 /* Frame complete */ 1015 1016 if (re->memblock) { 1017 memblock_complete(p, re); 1018 1019 /* This was a memblock frame. We can unref the memblock now */ 1020 pa_memblock_unref(re->memblock); 1021 1022 } else if (re->packet) { 1023 1024 if (p->receive_packet_callback) 1025#ifdef HAVE_CREDS 1026 p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata); 1027#else 1028 p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata); 1029#endif 1030 1031 pa_packet_unref(re->packet); 1032 } else { 1033 pa_memblock *b = NULL; 1034 uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); 1035 uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]); 1036 pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ? 1037 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX; 1038 1039 pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0); 1040 pa_assert(p->import); 1041 1042 if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd && 1043 !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) { 1044 1045 if (pa_log_ratelimit(PA_LOG_ERROR)) 1046 pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id); 1047 1048 } else if (!(b = pa_memimport_get(p->import, 1049 type, 1050 ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]), 1051 shm_id, 1052 ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]), 1053 ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]), 1054 !!(flags & PA_FLAG_SHMWRITABLE)))) { 1055 1056 if (pa_log_ratelimit(PA_LOG_DEBUG)) 1057 pa_log_debug("Failed to import memory block."); 1058 } 1059 1060 if (p->receive_memblock_callback) { 1061 int64_t offset; 1062 pa_memchunk chunk; 1063 1064 chunk.memblock = b; 1065 chunk.index = 0; 1066 chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]); 1067 1068 offset = (int64_t) ( 1069 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | 1070 (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); 1071 1072 p->receive_memblock_callback( 1073 p, 1074 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), 1075 offset, 1076 ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, 1077 &chunk, 1078 p->receive_memblock_callback_userdata); 1079 } 1080 1081 if (b) 1082 pa_memblock_unref(b); 1083 } 1084 1085 goto frame_done; 1086 } 1087 1088 return 0; 1089 1090frame_done: 1091 re->memblock = NULL; 1092 re->packet = NULL; 1093 re->index = 0; 1094 re->data = NULL; 1095 1096#ifdef HAVE_CREDS 1097 /* FIXME: Close received ancillary data fds if the pstream's 1098 * receive_packet_callback did not do so. 1099 * 1100 * Malicious clients can attach fds to unknown commands, or attach them 1101 * to commands that does not expect fds. By doing so, server will reach 1102 * its open fd limit and future clients' SHM transfers will always fail. 1103 */ 1104 p->read_ancil_data.creds_valid = false; 1105 p->read_ancil_data.nfd = 0; 1106#endif 1107 1108 return 0; 1109 1110fail: 1111 if (release_memblock) 1112 pa_memblock_release(release_memblock); 1113 1114 return -1; 1115} 1116 1117void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { 1118 pa_assert(p); 1119 pa_assert(PA_REFCNT_VALUE(p) > 0); 1120 1121 p->die_callback = cb; 1122 p->die_callback_userdata = userdata; 1123} 1124 1125void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { 1126 pa_assert(p); 1127 pa_assert(PA_REFCNT_VALUE(p) > 0); 1128 1129 p->drain_callback = cb; 1130 p->drain_callback_userdata = userdata; 1131} 1132 1133void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) { 1134 pa_assert(p); 1135 pa_assert(PA_REFCNT_VALUE(p) > 0); 1136 1137 p->receive_packet_callback = cb; 1138 p->receive_packet_callback_userdata = userdata; 1139} 1140 1141void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) { 1142 pa_assert(p); 1143 pa_assert(PA_REFCNT_VALUE(p) > 0); 1144 1145 p->receive_memblock_callback = cb; 1146 p->receive_memblock_callback_userdata = userdata; 1147} 1148 1149void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) { 1150 pa_assert(p); 1151 pa_assert(PA_REFCNT_VALUE(p) > 0); 1152 1153 p->release_callback = cb; 1154 p->release_callback_userdata = userdata; 1155} 1156 1157void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) { 1158 pa_assert(p); 1159 pa_assert(PA_REFCNT_VALUE(p) > 0); 1160 1161 p->revoke_callback = cb; 1162 p->revoke_callback_userdata = userdata; 1163} 1164 1165bool pa_pstream_is_pending(pa_pstream *p) { 1166 bool b; 1167 1168 pa_assert(p); 1169 pa_assert(PA_REFCNT_VALUE(p) > 0); 1170 1171 if (p->dead) 1172 b = false; 1173 else 1174 b = p->write.current || !pa_queue_isempty(p->send_queue); 1175 1176 return b; 1177} 1178 1179void pa_pstream_unref(pa_pstream*p) { 1180 pa_assert(p); 1181 pa_assert(PA_REFCNT_VALUE(p) > 0); 1182 1183 if (PA_REFCNT_DEC(p) <= 0) 1184 pstream_free(p); 1185} 1186 1187pa_pstream* pa_pstream_ref(pa_pstream*p) { 1188 pa_assert(p); 1189 pa_assert(PA_REFCNT_VALUE(p) > 0); 1190 1191 PA_REFCNT_INC(p); 1192 return p; 1193} 1194 1195void pa_pstream_unlink(pa_pstream *p) { 1196 pa_assert(p); 1197 1198 if (p->dead) 1199 return; 1200 1201 p->dead = true; 1202 1203 while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */ 1204 pa_pstream_set_srbchannel(p, NULL); 1205 1206 if (p->import) { 1207 pa_memimport_free(p->import); 1208 p->import = NULL; 1209 } 1210 1211 if (p->export) { 1212 pa_memexport_free(p->export); 1213 p->export = NULL; 1214 } 1215 1216 if (p->io) { 1217 pa_iochannel_free(p->io); 1218 p->io = NULL; 1219 } 1220 1221 if (p->defer_event) { 1222 p->mainloop->defer_free(p->defer_event); 1223 p->defer_event = NULL; 1224 } 1225 1226 p->die_callback = NULL; 1227 p->drain_callback = NULL; 1228 p->receive_packet_callback = NULL; 1229 p->receive_memblock_callback = NULL; 1230} 1231 1232void pa_pstream_enable_shm(pa_pstream *p, bool enable) { 1233 pa_assert(p); 1234 pa_assert(PA_REFCNT_VALUE(p) > 0); 1235 1236 p->use_shm = enable; 1237 1238 if (enable) { 1239 1240 if (!p->export) 1241 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p); 1242 1243 } else { 1244 1245 if (p->export) { 1246 pa_memexport_free(p->export); 1247 p->export = NULL; 1248 } 1249 } 1250} 1251 1252void pa_pstream_enable_memfd(pa_pstream *p) { 1253 pa_assert(p); 1254 pa_assert(PA_REFCNT_VALUE(p) > 0); 1255 pa_assert(p->use_shm); 1256 1257 p->use_memfd = true; 1258 1259 if (!p->registered_memfd_ids) { 1260 p->registered_memfd_ids = pa_idxset_new(NULL, NULL); 1261 } 1262} 1263 1264bool pa_pstream_get_shm(pa_pstream *p) { 1265 pa_assert(p); 1266 pa_assert(PA_REFCNT_VALUE(p) > 0); 1267 1268 return p->use_shm; 1269} 1270 1271bool pa_pstream_get_memfd(pa_pstream *p) { 1272 pa_assert(p); 1273 pa_assert(PA_REFCNT_VALUE(p) > 0); 1274 1275 return p->use_memfd; 1276} 1277 1278void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) { 1279 pa_assert(p); 1280 pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL); 1281 1282 if (srb == p->srb) 1283 return; 1284 1285 /* We can't handle quick switches between srbchannels. */ 1286 pa_assert(!p->is_srbpending); 1287 1288 p->srbpending = srb; 1289 p->is_srbpending = true; 1290 1291 /* Switch immediately, if possible. */ 1292 if (p->dead) 1293 check_srbpending(p); 1294 else 1295 do_write(p); 1296} 1297