/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

#include <pulse/xmalloc.h>

#include <pulsecore/idxset.h>
#include <pulsecore/socket.h>
#include <pulsecore/queue.h>
#include <pulsecore/log.h>
#include <pulsecore/creds.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/flist.h>
#include <pulsecore/macro.h>

#include "pstream.h"

/* We piggyback information about whether audio data blocks are stored in SHM onto the seek mode field */
#define PA_FLAG_SHMDATA             0x80000000LU
#define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
#define PA_FLAG_SHMRELEASE          0x40000000LU
#define PA_FLAG_SHMREVOKE           0xC0000000LU
#define PA_FLAG_SHMMASK             0xFF000000LU
#define PA_FLAG_SEEKMASK            0x000000FFLU
#define PA_FLAG_SHMWRITABLE         0x00800000LU
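
/* The top byte of the FLAGS descriptor field (PA_FLAG_SHMMASK) encodes the
 * SHM operation (data, release, revoke), with PA_FLAG_SHMDATA_MEMFD_BLOCK
 * acting as a modifier bit; PA_FLAG_SHMWRITABLE sits just below that byte,
 * and the lowest byte (PA_FLAG_SEEKMASK) carries the seek mode. */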

/* The sequence descriptor header consists of five 32-bit integers: */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,
    PA_PSTREAM_DESCRIPTOR_CHANNEL,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
    PA_PSTREAM_DESCRIPTOR_FLAGS,
    PA_PSTREAM_DESCRIPTOR_MAX
};

/* If we have an SHM block, this info follows the descriptor */
enum {
    PA_PSTREAM_SHM_BLOCKID,
    PA_PSTREAM_SHM_SHMID,
    PA_PSTREAM_SHM_INDEX,
    PA_PSTREAM_SHM_LENGTH,
    PA_PSTREAM_SHM_MAX
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

#define MINIBUF_SIZE (256)
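
/* Small frames (short packets and SHM block references) are assembled right
 * after the descriptor in write.minibuf, so that header and payload can go
 * out in a single write. */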

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);

struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,
        PA_PSTREAM_ITEM_MEMBLOCK,
        PA_PSTREAM_ITEM_SHMRELEASE,
        PA_PSTREAM_ITEM_SHMREVOKE
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    bool with_ancil_data;
    pa_cmsg_ancil_data ancil_data;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};

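/* Read state for one input channel (iochannel or srbchannel): the frame
 * descriptor being received, the packet or memblock the payload goes into,
 * and how many bytes of the frame have been consumed so far (index). */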
struct pstream_read {
    pa_pstream_descriptor descriptor;
    pa_memblock *memblock;
    pa_packet *packet;
    uint32_t shm_info[PA_PSTREAM_SHM_MAX];
    void *data;
    size_t index;
};

struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;
    pa_iochannel *io;
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    pa_queue *send_queue;

    bool dead;

    struct {
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current;
        void *data;
        size_t index;
        int minibuf_validsize;
        pa_memchunk memchunk;
    } write;

    struct pstream_read readio, readsrb;

    /* @use_shm: besides copying the full audio data to the other
     * PA end, this pipe supports sending mere references to the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: SHM IDs of registered memfd pools. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    bool non_registered_memfd_id_error_logged;
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;
#endif
};

#ifdef HAVE_CREDS
/*
 * Transfers of blocks from memfd-backed SHM pools occur without passing
 * the pool's fd every time, thus minimizing overhead and avoiding fd leaks.
 * A REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very
 * early on. This command has an ID that uniquely identifies the pool in
 * question. Further block references for that pool can then be made
 * exclusively through this ID; the fd can be safely closed – on both
 * ends – afterwards.
 *
 * On the sending side of this command, we want to close the passed fds
 * directly after they have been sent. Meanwhile we're only allowed to
 * asynchronously schedule packet writes to the pstream, so the job of
 * closing passed fds is left to the pstream's actual writing function
 * do_write(): it knows the exact point in time where the fds are passed
 * to the other end through iochannels and the sendmsg() system call.
 *
 * Nonetheless not all code paths in the system want their socket-passed
 * fds to be closed after the send. srbchannel needs the passed fds to
 * still be open for further communication. System-wide global memfd-backed
 * pools also require the passed fd to be open: they pass the same fd, with
 * the same ID registration mechanism, for each newly connected client to
 * the system.
 *
 * So, given all of the above, never close the ancillary fds on your own
 * and always call the method below instead. It takes care of closing the
 * passed fds _only if allowed_ to do so by the code paths that originally
 * created them. Moreover, it is safe to invoke multiple times: failure
 * handlers can, and should, call it to clean up passed fds without
 * worrying too much about the system state.
 */
void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
    if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
        int i;

        pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);

        for (i = 0; i < ancil->nfd; i++)
            if (ancil->fds[i] != -1) {
                pa_assert_se(pa_close(ancil->fds[i]) == 0);
                ancil->fds[i] = -1;
            }

        ancil->nfd = 0;
        ancil->close_fds_on_cleanup = false;
    }
}
#endif

static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p, struct pstream_read *re);

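/* Flush pending writes and drain readable input on both the srbchannel (if
 * one is set up) and the iochannel. On failure in either direction, notify
 * via the die callback and unlink the stream. */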
static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        int r = 0;

        if (do_write(p) < 0)
            goto fail;

        while (!p->dead && r == 0) {
            r = do_read(p, &p->readsrb);
            if (r < 0)
                goto fail;
        }
    }

    if (!p->dead && pa_iochannel_is_readable(p->io)) {
        if (do_read(p, &p->readio) < 0)
            goto fail;
    } else if (!p->dead && pa_iochannel_is_hungup(p->io))
        goto fail;

    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        int r = do_write(p);
        if (r < 0)
            goto fail;
        if (r == 0)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}

static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    bool b;
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->srb == srb);

    pa_pstream_ref(p);

    do_pstream_read_write(p);

    /* If either the pstream or the srb is going away, return false.
       We need to check this before p is destroyed. */
    b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
    pa_pstream_unref(p);

    return b;
}

static void io_callback(pa_iochannel *io, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->io == io);

    do_pstream_read_write(p);
}

static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->defer_event == e);
    pa_assert(p->mainloop == m);

    do_pstream_read_write(p);
}

static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);

pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
    pa_pstream *p;

    pa_assert(m);
    pa_assert(io);
    pa_assert(pool);

    p = pa_xnew0(pa_pstream, 1);
    PA_REFCNT_INIT(p);
    p->io = io;
    pa_iochannel_set_callback(io, io_callback, p);

    p->mainloop = m;
    p->defer_event = m->defer_new(m, defer_callback, p);
    m->defer_enable(p->defer_event, 0);

    p->send_queue = pa_queue_new();

    p->mempool = pool;

    /* We do importing unconditionally */
    p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);

    pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
    pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));

    return p;
}

/* Attach a memfd<->SHM_ID mapping to the given pstream and its memimport.
 * Check pa_pstream_register_memfd_mempool() for further info.
 *
 * The caller owns the passed @memfd_fd and must close it down when appropriate. */
int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
    int err = -1;

    pa_assert(memfd_fd != -1);

    if (!p->use_memfd) {
        pa_log_warn("Received memfd ID registration request over a pipe "
                    "that does not support memfds");
        return err;
    }

    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
        pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
        return err;
    }

    if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
        pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
        return err;
    }

    pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
    return 0;
}

static void item_free(void *item) {
    struct item_info *i = item;
    pa_assert(i);

    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
        pa_assert(i->chunk.memblock);
        pa_memblock_unref(i->chunk.memblock);
    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
        pa_assert(i->packet);
        pa_packet_unref(i->packet);
    }

#ifdef HAVE_CREDS
    /* On error recovery paths, there might be lingering items
     * on the pstream send queue and they are usually freed with
     * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
     * sure we do not leak any fds in that case! */
    if (i->with_ancil_data)
        pa_cmsg_ancil_data_close_fds(&i->ancil_data);
#endif

    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);
}

static void pstream_free(pa_pstream *p) {
    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free);

    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    if (p->readsrb.memblock)
        pa_memblock_unref(p->readsrb.memblock);

    if (p->readsrb.packet)
        pa_packet_unref(p->readsrb.packet);

    if (p->readio.memblock)
        pa_memblock_unref(p->readio.memblock);

    if (p->readio.packet)
        pa_packet_unref(p->readio.packet);

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}

void pa_pstream_send_packet(pa_pstream *p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    struct item_info *i;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    if ((i->with_ancil_data = !!ancil_data)) {
        i->ancil_data = *ancil_data;
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, i);
    if (PaQueueGetLen(p->send_queue) >= 10) { /* a backlog of 10 or more queued messages may indicate congestion */
        pa_log_warn("[MSG backlog]: PaQueueLen = %u", PaQueueGetLen(p->send_queue));
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}

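/* Queue a memchunk for transmission, splitting it into items of at most
 * pa_mempool_block_size_max() bytes that all reference the same underlying
 * memblock. */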
void pa_pstream_send_memblock(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t length, idx;
    size_t bsm;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    idx = 0;
    length = chunk->length;

    bsm = pa_mempool_block_size_max(p->mempool);

    while (length > 0) {
        struct item_info *i;
        size_t n;

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);
        i->type = PA_PSTREAM_ITEM_MEMBLOCK;

        n = PA_MIN(length, bsm);
        i->chunk.index = chunk->index + idx;
        i->chunk.length = n;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);

        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_ancil_data = false;
#endif

        pa_queue_push(p->send_queue, i);

        idx += n;
        length -= n;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}

void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* pa_log("Releasing block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    if (p->release_callback)
        p->release_callback(p, block_id, p->release_callback_userdata);
    else
        pa_pstream_send_release(p, block_id);
}

void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;
    /* pa_log("Revoking block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_ancil_data = false;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}

/* might be called from thread context */
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->revoke_callback)
        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
    else
        pa_pstream_send_revoke(p, block_id);
}

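/* Pop the next item off the send queue and build the frame descriptor for
 * it. For memblock items this is also where we decide between sending the
 * payload inline over the socket and sending an SHM block reference
 * instead. */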
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;

    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
            /* else */
            /*     FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
            /*     pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}

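/* Complete a pending srbchannel switch requested via
 * pa_pstream_set_srbchannel(). do_write() calls this only once the out
 * queue is empty, so no frame can be torn across the channel switch. */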
static void check_srbpending(pa_pstream *p) {
    if (!p->is_srbpending)
        return;

    if (p->srb)
        pa_srbchannel_free(p->srb);

    p->srb = p->srbpending;
    p->is_srbpending = false;

    if (p->srb)
        pa_srbchannel_set_callback(p->srb, srb_callback, p);
}

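/* Write as much of the current frame as possible, sourcing bytes in order
 * from the minibuf (descriptor plus small payload in one buffer), the bare
 * descriptor, or the packet/memchunk payload. Returns 1 if the requested
 * chunk was written in full, 0 if the queue is empty or the write was
 * short, -1 on error. */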
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    if (p->write.minibuf_validsize > 0) {
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

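/* Deliver a fully received (copied, non-SHM) memblock frame to the
 * receive_memblock callback, reassembling channel, offset and seek mode
 * from the frame descriptor. */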
static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk chunk;
    int64_t offset;

    if (!p->receive_memblock_callback)
        return;

    chunk.memblock = re->memblock;
    chunk.index = 0;
    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;

    offset = (int64_t) (
        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

    p->receive_memblock_callback(
        p,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
        offset,
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &chunk,
        p->receive_memblock_callback_userdata);
}

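/* Read and process one chunk of input for the given read state (srbchannel
 * or iochannel): first the frame descriptor, then the payload it announces.
 * Returns 0 on progress or a completed frame, 1 if the srbchannel had no
 * data available, -1 on error. */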
static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    if (re == &p->readsrb) {
        r = pa_srbchannel_read(p->srb, d, l);
        if (r == 0) {
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

            /* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

            /* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                offset = (int64_t) (
                    (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                    (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                    p,
                    ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                    offset,
                    ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                    &chunk,
                    p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    return 0;

frame_done:
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

#ifdef HAVE_CREDS
    /* FIXME: Close received ancillary-data fds if the pstream's
     * receive_packet_callback did not do so.
     *
     * Malicious clients can attach fds to unknown commands, or attach them
     * to commands that do not expect fds. By doing so, the server will reach
     * its open fd limit and future clients' SHM transfers will always fail.
     */
    p->read_ancil_data.creds_valid = false;
    p->read_ancil_data.nfd = 0;
#endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback = cb;
    p->die_callback_userdata = userdata;
}

void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback = cb;
    p->drain_callback_userdata = userdata;
}

void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback = cb;
    p->receive_packet_callback_userdata = userdata;
}

void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback = cb;
    p->receive_memblock_callback_userdata = userdata;
}

void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback = cb;
    p->release_callback_userdata = userdata;
}

void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback = cb;
    p->revoke_callback_userdata = userdata;
}

bool pa_pstream_is_pending(pa_pstream *p) {
    bool b;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        b = false;
    else
        b = p->write.current || !pa_queue_isempty(p->send_queue);

    return b;
}

void pa_pstream_unref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (PA_REFCNT_DEC(p) <= 0)
        pstream_free(p);
}

pa_pstream* pa_pstream_ref(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);
    return p;
}

void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
        pa_pstream_set_srbchannel(p, NULL);

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
}

void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (enable) {

        if (!p->export)
            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);

    } else {

        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }
    }
}

void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    if (!p->registered_memfd_ids) {
        p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
    }
}

bool pa_pstream_get_shm(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_shm;
}

bool pa_pstream_get_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_memfd;
}

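/* Request switching the pstream to a new srbchannel (or back to the plain
 * iochannel when NULL). The actual switch happens in check_srbpending()
 * once the send queue has drained; only one switch may be pending at a
 * time. */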
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    if (srb == p->srb)
        return;

    /* We can't handle quick switches between srbchannels. */
    pa_assert(!p->is_srbpending);

    p->srbpending = srb;
    p->is_srbpending = true;

    /* Switch immediately, if possible. */
    if (p->dead)
        check_srbpending(p);
    else
        do_write(p);
}