1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32 
33 #include <pulse/xmalloc.h>
34 
35 #include <pulsecore/idxset.h>
36 #include <pulsecore/socket.h>
37 #include <pulsecore/queue.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/creds.h>
40 #include <pulsecore/refcnt.h>
41 #include <pulsecore/flist.h>
42 #include <pulsecore/macro.h>
43 
44 #include "pstream.h"
45 
/* The FLAGS word of the frame descriptor carries the seek mode in its low
 * byte (PA_FLAG_SEEKMASK). Information about audio data blocks that are
 * stored in SHM is piggybacked onto other bits of the same word. */

/* The frame payload is a SHM block reference (see PA_PSTREAM_SHM_*), not the audio data itself */
#define PA_FLAG_SHMDATA     0x80000000LU
/* Set in addition to PA_FLAG_SHMDATA when the referenced block belongs to a registered memfd pool */
#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
/* Payload-less frame releasing the block whose ID is stored in OFFSET_HI */
#define PA_FLAG_SHMRELEASE  0x40000000LU
/* Payload-less frame revoking the block whose ID is stored in OFFSET_HI.
 * Numerically equal to PA_FLAG_SHMDATA|PA_FLAG_SHMRELEASE; the reader
 * compares the whole flags word against this value. */
#define PA_FLAG_SHMREVOKE   0xC0000000LU
/* Bits the reader refuses when SHM is disabled on the pstream */
#define PA_FLAG_SHMMASK     0xFF000000LU
/* Bits holding the pa_seek_mode_t of a memblock frame */
#define PA_FLAG_SEEKMASK    0x000000FFLU
/* Set on SHM references whose pool is remote-writable. Note that this bit
 * lies outside PA_FLAG_SHMMASK. */
#define PA_FLAG_SHMWRITABLE 0x00800000LU
54 
/* The sequence descriptor header consists of 5 32bit integers. The writer
 * stores every one of them with htonl(); the reader decodes with ntohl(). */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,       /* number of payload bytes following the descriptor */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,      /* channel of a memblock frame; (uint32_t) -1 otherwise */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,    /* upper 32 bits of the offset; block ID for release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,    /* lower 32 bits of the offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS,        /* PA_FLAG_* bits, including the seek mode */
    PA_PSTREAM_DESCRIPTOR_MAX           /* number of descriptor words */
};
64 
/* If we have an SHM block, this info follows the descriptor as the frame
 * payload. All words are stored with htonl() by the writer. */
enum {
    PA_PSTREAM_SHM_BLOCKID,     /* block ID handed out by the memexport */
    PA_PSTREAM_SHM_SHMID,       /* ID of the SHM segment/pool containing the block */
    PA_PSTREAM_SHM_INDEX,       /* exported offset plus the chunk's index */
    PA_PSTREAM_SHM_LENGTH,      /* length of the chunk */
    PA_PSTREAM_SHM_MAX          /* number of SHM info words */
};
73 
/* In-memory representation of one frame descriptor (see PA_PSTREAM_DESCRIPTOR_*) */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of the descriptor in bytes */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Size of the write-side staging buffer in which the descriptor and a small
 * payload (short packet or SHM info) are combined so that they can be handed
 * to the channel in a single write. */
#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

/* Free list used to recycle struct item_info allocations: the send functions
 * pop from it before falling back to pa_xnew(), item_free() pushes back. */
PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
86 
/* One entry of the pstream send queue. Which of the fields below are valid
 * depends on 'type'; the send functions only initialize the fields that
 * belong to the type they enqueue. */
struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,         /* 'packet' (plus optional ancillary data) */
        PA_PSTREAM_ITEM_MEMBLOCK,       /* 'chunk', 'channel', 'offset', 'seek_mode' */
        PA_PSTREAM_ITEM_SHMRELEASE,     /* 'block_id' */
        PA_PSTREAM_ITEM_SHMREVOKE       /* 'block_id' */
    } type;

    /* packet info */
    pa_packet *packet;                  /* referenced while queued, unreferenced in item_free() */
#ifdef HAVE_CREDS
    bool with_ancil_data;               /* set by every send function; true only if 'ancil_data' is valid */
    pa_cmsg_ancil_data ancil_data;      /* by-value copy of the caller's creds or fds */
#endif

    /* memblock info */
    pa_memchunk chunk;                  /* chunk.memblock is referenced while queued */
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};
111 
/* State of one frame that is currently being received. The pstream keeps
 * one instance for the iochannel and one for the srbchannel. */
struct pstream_read {
    pa_pstream_descriptor descriptor;   /* raw descriptor as received (network byte order) */
    pa_memblock *memblock;              /* payload destination, if 'data' is NULL */
    pa_packet *packet;
    uint32_t shm_info[PA_PSTREAM_SHM_MAX];
    void *data;                         /* payload destination, preferred over 'memblock' when set */
    size_t index;                       /* bytes of the frame received so far, descriptor included */
};
120 
/* A reference-counted packet stream on top of an iochannel and, optionally,
 * an srbchannel. */
struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    /* Enabled by the send functions whenever something was queued; disabled
     * again at the start of every read/write pass. */
    pa_defer_event *defer_event;
    pa_iochannel *io;
    /* 'srb' is the active srbchannel (may be NULL). 'srbpending' replaces it
     * once the send queue has run empty, if 'is_srbpending' is set. */
    pa_srbchannel *srb, *srbpending;
    bool is_srbpending;

    /* Queue of struct item_info waiting to be written */
    pa_queue *send_queue;

    /* Once set, the send functions silently drop whatever they are given */
    bool dead;

    /* State of the item that is currently being written */
    struct {
        /* 'descriptor' overlays the first bytes of 'minibuf', so a small
         * payload copied right behind it forms one contiguous frame. */
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current;  /* NULL if no item is in progress */
        void *data;                 /* packet payload, if the item is a packet */
        size_t index;               /* bytes of the frame written so far, descriptor included */
        int minibuf_validsize;      /* > 0: whole frame lives in 'minibuf' and is this many bytes long */
        pa_memchunk memchunk;       /* memblock payload, if the audio data itself is sent */
    } write;

    /* Separate receive state for the iochannel and the srbchannel */
    struct pstream_read readio, readsrb;

    /* @use_shm: besides copying the full audio data to the other
     * PA end, this pipe supports just sending references to the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @non_registered_memfd_id_error_logged: makes sure the fallback to
     * copying for non-registered memfd IDs is only logged once.
     *
     * @registered_memfd_ids: registered memfd pools SHM IDs. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    bool non_registered_memfd_id_error_logged;
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;   /* always created in pa_pstream_new() */
    pa_memexport *export;   /* used for blocks that live in 'mempool' */

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    /* Called after an item was written completely and nothing is pending anymore */
    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    /* Called when a read/write pass fails, right before the pstream is unlinked */
    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    /* If set, called instead of queueing a revoke frame */
    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    /* If set, called instead of queueing a release frame */
    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    /* 'read_ancil_data' collects credentials/fds received on the iochannel;
     * 'write_ancil_data' points into the item being written while
     * 'send_ancil_data_now' is set. */
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;
#endif
};
188 
189 #ifdef HAVE_CREDS
/*
 * Block transfers for memfd-backed SHM pools occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
 * on. This command has an ID that uniquely identifies the pool in question.
 * Further references to the pool's blocks can then be made exclusively using
 * that ID; the fd can be safely closed – on both ends – afterwards.
 *
 * On the sending side of this command, we want to close the passed fds
 * directly after they have been sent. However, we are only allowed to
 * asynchronously schedule packet writes to the pstream, so the job of closing
 * passed fds is left to the pstream's actual writing function do_write(): it
 * knows the exact point in time at which the fds are passed to the other end
 * through iochannels and the sendmsg() system call.
 *
 * Nonetheless, not all code paths in the system want their socket-passed
 * fds to be closed after the send. srbchannel needs the passed fds to still
 * be open for further communication. System-wide global memfd-backed pools
 * also require the passed fd to stay open: they pass the same fd, with the
 * same ID registration mechanism, to each newly connected client.
 *
 * So, given all of the above, never close the ancillary fds on your own;
 * always call the function below instead. It takes care of closing the passed
 * fds _only if allowed_ by the code paths that originally created them.
 * Moreover, it is safe to invoke multiple times: failure handlers can, and
 * should, call it for passed fds cleanup without worrying too much about
 * the system state.
 */
pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil)218 void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
219     if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
220         int i;
221 
222         pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
223 
224         for (i = 0; i < ancil->nfd; i++)
225             if (ancil->fds[i] != -1) {
226                 pa_assert_se(pa_close(ancil->fds[i]) == 0);
227                 ancil->fds[i] = -1;
228             }
229 
230         ancil->nfd = 0;
231         ancil->close_fds_on_cleanup = false;
232     }
233 }
234 #endif
235 
/* Forward declarations: do_pstream_read_write() below drives both of them
 * and treats a negative return value as a fatal stream error. */
static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p, struct pstream_read *re);
238 
/* Run one read/write pass over the srbchannel (if any) and the iochannel.
 * On any failure the die callback is invoked and the pstream is unlinked. */
static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Keep our own reference for the duration of the pass; it is dropped
     * again on both exit paths. */
    pa_pstream_ref(p);

    /* We are running now, no need for the deferred wakeup anymore */
    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && p->srb) {
        int rc;

        if (do_write(p) < 0)
            goto fail;

        /* Keep reading from the srbchannel for as long as do_read() returns 0 */
        for (rc = 0; !p->dead && rc == 0;) {
            rc = do_read(p, &p->readsrb);

            if (rc < 0)
                goto fail;
        }
    }

    if (!p->dead) {
        if (pa_iochannel_is_readable(p->io)) {
            if (do_read(p, &p->readio) < 0)
                goto fail;
        } else if (pa_iochannel_is_hungup(p->io)) {
            goto fail;
        }
    }

    /* Write until the channel stops being writable or do_write() reports 0,
     * i.e. nothing left to send or only a partial write went through. */
    while (!p->dead && pa_iochannel_is_writable(p->io)) {
        int written = do_write(p);

        if (written < 0)
            goto fail;

        if (!written)
            break;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}
285 
/* srbchannel callback: runs a read/write pass and tells the srbchannel
 * whether both the pstream and this particular srb are still in use. */
static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    pa_pstream *p = userdata;
    bool still_valid;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->srb == srb);

    pa_pstream_ref(p);
    do_pstream_read_write(p);

    /* This must be evaluated while we still hold our own reference, i.e.
     * before p can be destroyed. If we are the only holder left, or the
     * pstream meanwhile switched to another srb, report false. */
    still_valid = PA_REFCNT_VALUE(p) > 1 && p->srb == srb;

    pa_pstream_unref(p);
    return still_valid;
}
305 
/* iochannel callback: the socket changed state, so run a read/write pass */
static void io_callback(pa_iochannel*io, void *userdata) {
    pa_pstream *p = (pa_pstream *) userdata;

    /* Sanity checks: valid, still referenced, and the event belongs to our channel */
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->io == io);

    do_pstream_read_write(p);
}
315 
/* Deferred-event callback: something was queued for sending, so run a
 * read/write pass from the main loop. */
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
    pa_pstream *p = (pa_pstream *) userdata;

    /* Sanity checks: valid, still referenced, and the event really is ours */
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->defer_event == e);
    pa_assert(p->mainloop == m);

    do_pstream_read_write(p);
}
326 
/* Forward declaration: pa_pstream_new() registers this as the release
 * callback of the pstream's memimport. */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
328 
pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool)329 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
330     pa_pstream *p;
331 
332     pa_assert(m);
333     pa_assert(io);
334     pa_assert(pool);
335 
336     p = pa_xnew0(pa_pstream, 1);
337     PA_REFCNT_INIT(p);
338     p->io = io;
339     pa_iochannel_set_callback(io, io_callback, p);
340 
341     p->mainloop = m;
342     p->defer_event = m->defer_new(m, defer_callback, p);
343     m->defer_enable(p->defer_event, 0);
344 
345     p->send_queue = pa_queue_new();
346 
347     p->mempool = pool;
348 
349     /* We do importing unconditionally */
350     p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
351 
352     pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
353     pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
354 
355     return p;
356 }
357 
358 /* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
359  * Check pa_pstream_register_memfd_mempool() for further info.
360  *
361  * Caller owns the passed @memfd_fd and must close it down when appropriate. */
pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd)362 int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
363     int err = -1;
364 
365     pa_assert(memfd_fd != -1);
366 
367     if (!p->use_memfd) {
368         pa_log_warn("Received memfd ID registration request over a pipe "
369                     "that does not support memfds");
370         return err;
371     }
372 
373     if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
374         pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
375         return err;
376     }
377 
378     if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
379         pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
380         return err;
381     }
382 
383     pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
384     return 0;
385 }
386 
item_free(void *item)387 static void item_free(void *item) {
388     struct item_info *i = item;
389     pa_assert(i);
390 
391     if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
392         pa_assert(i->chunk.memblock);
393         pa_memblock_unref(i->chunk.memblock);
394     } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
395         pa_assert(i->packet);
396         pa_packet_unref(i->packet);
397     }
398 
399 #ifdef HAVE_CREDS
400     /* On error recovery paths, there might be lingering items
401      * on the pstream send queue and they are usually freed with
402      * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
403      * sure we do not leak any fds in that case! */
404     if (i->with_ancil_data)
405         pa_cmsg_ancil_data_close_fds(&i->ancil_data);
406 #endif
407 
408     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
409         pa_xfree(i);
410 }
411 
/* Tear down a pstream: unlink it, drop everything that is still queued or
 * half-transferred, and release the object itself. */
static void pstream_free(pa_pstream *p) {
    struct pstream_read *readers[2];
    size_t k;

    pa_assert(p);

    pa_pstream_unlink(p);

    /* Items that were never written ... */
    pa_queue_free(p->send_queue, item_free);

    /* ... and the one that was being written */
    if (p->write.current)
        item_free(p->write.current);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    /* Partially received frames, srbchannel first, then iochannel */
    readers[0] = &p->readsrb;
    readers[1] = &p->readio;

    for (k = 0; k < sizeof(readers) / sizeof(readers[0]); k++) {
        if (readers[k]->memblock)
            pa_memblock_unref(readers[k]->memblock);

        if (readers[k]->packet)
            pa_packet_unref(readers[k]->packet);
    }

    if (p->registered_memfd_ids)
        pa_idxset_free(p->registered_memfd_ids, NULL);

    pa_xfree(p);
}
442 
/* Queue @packet for asynchronous transmission on @p.
 *
 * @p: pstream to send on
 * @packet: packet to send; the queue takes its own reference
 * @ancil_data: optional ancillary data (either credentials or fds, never
 *              both) to pass along with the packet, or NULL. It is copied by
 *              value into the queue item.
 *
 * If the pstream is already dead the packet is dropped; the fds of
 * @ancil_data are closed in that case if their creator allowed it. */
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
    /* Send queue length from which on a message backlog is assumed and logged */
    const unsigned backlog_warn_len = 10;
    struct item_info *i;
    unsigned queue_len;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead) {
#ifdef HAVE_CREDS
        pa_cmsg_ancil_data_close_fds(ancil_data);
#endif
        return;
    }

    /* Prefer a recycled item over a fresh allocation */
    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    if ((i->with_ancil_data = !!ancil_data)) {
        i->ancil_data = *ancil_data;
        /* Ancillary data is either credentials or fds, never both */
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif

    pa_queue_push(p->send_queue, i);

    /* Query the length once; the explicit conversion guarantees that the
     * argument matches the %u conversion below. */
    queue_len = (unsigned) PaQueueGetLen(p->send_queue);
    if (queue_len >= backlog_warn_len)
        pa_log_warn("[MSG backlog]: PaQueueLen = %u", queue_len);

    /* Wake up the main loop so that the queue gets written out */
    p->mainloop->defer_enable(p->defer_event, 1);
}
480 
/* Queue the audio data in @chunk for transmission on @channel. Chunks larger
 * than the maximum block size of the pstream's pool are split into several
 * queue items; each item carries the same @offset and @seek_mode. Nothing is
 * queued if the pstream is dead. */
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t remaining;
    size_t consumed = 0;
    size_t piece_max;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    remaining = chunk->length;
    piece_max = pa_mempool_block_size_max(p->mempool);

    while (remaining > 0) {
        struct item_info *piece = pa_flist_pop(PA_STATIC_FLIST_GET(items));
        size_t piece_len = PA_MIN(remaining, piece_max);

        /* Free list was empty, allocate a fresh item */
        if (!piece)
            piece = pa_xnew(struct item_info, 1);

        piece->type = PA_PSTREAM_ITEM_MEMBLOCK;
#ifdef HAVE_CREDS
        piece->with_ancil_data = false;
#endif

        piece->channel = channel;
        piece->offset = offset;
        piece->seek_mode = seek_mode;

        /* Every piece holds its own reference to the shared memblock */
        piece->chunk.memblock = pa_memblock_ref(chunk->memblock);
        piece->chunk.index = chunk->index + consumed;
        piece->chunk.length = piece_len;

        pa_queue_push(p->send_queue, piece);

        consumed += piece_len;
        remaining -= piece_len;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}
526 
/* Queue a frame telling the other side that block @block_id is released.
 * Does nothing if the pstream is dead. */
void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *release;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* Recycle an item if possible, allocate one otherwise */
    release = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!release)
        release = pa_xnew(struct item_info, 1);

#ifdef HAVE_CREDS
    release->with_ancil_data = false;
#endif
    release->block_id = block_id;
    release->type = PA_PSTREAM_ITEM_SHMRELEASE;

    pa_queue_push(p->send_queue, release);
    p->mainloop->defer_enable(p->defer_event, 1);
}
548 
549 /* might be called from thread context */
memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata)550 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
551     pa_pstream *p = userdata;
552 
553     pa_assert(p);
554     pa_assert(PA_REFCNT_VALUE(p) > 0);
555 
556     if (p->dead)
557         return;
558 
559     if (p->release_callback)
560         p->release_callback(p, block_id, p->release_callback_userdata);
561     else
562         pa_pstream_send_release(p, block_id);
563 }
564 
/* Queue a frame telling the other side that block @block_id is revoked.
 * Does nothing if the pstream is dead. */
void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *revoke;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    /* Recycle an item if possible, allocate one otherwise */
    revoke = pa_flist_pop(PA_STATIC_FLIST_GET(items));
    if (!revoke)
        revoke = pa_xnew(struct item_info, 1);

#ifdef HAVE_CREDS
    revoke->with_ancil_data = false;
#endif
    revoke->block_id = block_id;
    revoke->type = PA_PSTREAM_ITEM_SHMREVOKE;

    pa_queue_push(p->send_queue, revoke);
    p->mainloop->defer_enable(p->defer_event, 1);
}
585 
586 /* might be called from thread context */
memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata)587 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
588     pa_pstream *p = userdata;
589 
590     pa_assert(p);
591     pa_assert(PA_REFCNT_VALUE(p) > 0);
592 
593     if (p->revoke_callback)
594         p->revoke_callback(p, block_id, p->revoke_callback_userdata);
595     else
596         pa_pstream_send_revoke(p, block_id);
597 }
598 
/* Pop the next item off the send queue and set up the write state
 * (descriptor, payload pointers, minibuf) for it. If the queue is empty,
 * p->write.current is left NULL and nothing else is touched. */
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    /* Defaults: no payload, no channel, zero offset, no flags. The branches
     * below override whatever applies to the item's type. */
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        /* Small packets are copied right behind the descriptor (which
         * overlays the start of minibuf), so that descriptor and payload
         * can be written in one go. */
        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        /* Payload-less frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        /* Payload-less frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        /* Whether the audio data itself (true) or only a SHM reference to it
         * (false) is sent */
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        /* The 64 bit offset is split into two 32 bit descriptor words */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            /* The SHM info is built directly behind the descriptor in minibuf */
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            /* Blocks from the pstream's own pool go through the long-lived
             * export; for any other pool a temporary export is created here
             * and freed again at the end of this branch. */
            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                /* POSIX SHM blocks can always be sent by reference */
                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                /* memfd blocks can only be sent by reference if the pool's
                 * ID was registered on this pstream beforehand */
                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        /* Log the fallback only once per pstream */
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    /* The data is copied after all, so give the block ID
                     * obtained from pa_memexport_put() above back right away */
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    /* The frame payload is the SHM info; the whole frame
                     * now lives in minibuf */
                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
            /* FIXME: pa_memexport_put() failures are silently ignored here
             * and the block is sent by copy instead. Avoid memexport slot
             * leaks (call pa_memexport_process_release()) and consider
             * logging "Failed to export memory block." */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            /* NOTE(review): presumably drops the pool reference handed out by
             * pa_memblock_get_pool() above; confirm against its definition */
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            /* Send the audio data itself; keep the block referenced until
             * the write has completed */
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    /* If the item carries ancillary data, arrange for do_write() to send it
     * along with the item's first write */
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}
732 
/* If a switch of the srbchannel has been requested, perform it now: free the
 * current srbchannel (if any) and install the pending one, which may be
 * NULL. */
static void check_srbpending(pa_pstream *p) {
    if (p->is_srbpending) {
        if (p->srb != NULL)
            pa_srbchannel_free(p->srb);

        p->is_srbpending = false;
        p->srb = p->srbpending;

        /* A newly installed channel has to report its events to us */
        if (p->srb != NULL)
            pa_srbchannel_set_callback(p->srb, srb_callback, p);
    }
}
746 
/* Write (part of) the current send item to the srbchannel or iochannel,
 * fetching the next item from the send queue first if none is in progress.
 *
 * Returns -1 on a write failure, 0 if there was nothing to write or only a
 * part of the requested bytes could be written, and 1 if everything that
 * was requested in this call was written. */
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    /* Set if a memblock was acquired below and must be released before returning */
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    /* Determine the next span [d, d+l) of the frame to write */
    if (p->write.minibuf_validsize > 0) {
        /* Descriptor and payload were combined in minibuf */
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Still busy with the descriptor */
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        /* Descriptor is out, continue with the payload */
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        /* write.index counts the descriptor as well, so subtract its size
         * to get the position within the payload */
        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_ancil_data_now) {
        /* Ancillary data always goes over the iochannel, even if an
         * srbchannel is active */
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        /* The fds have been passed on; close our copies if we are allowed
         * to, and make sure the ancillary data is only sent once */
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    /* Descriptor plus payload written completely? Then this item is done. */
    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        /* Tell the user once nothing is pending anymore */
        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    /* 1: everything requested was written, the caller may try again;
     * 0: short write */
    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    /* Do not leak the fds of ancillary data that could not be sent */
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
839 
/* Hand a completely received memblock frame over to the receive-memblock
 * callback, if one is installed. Channel, offset and seek mode are decoded
 * from the frame descriptor in @re. */
static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
    pa_memchunk chunk;
    uint64_t offset_hi, offset_lo;
    uint32_t channel;

    if (!p->receive_memblock_callback)
        return;

    /* The chunk covers everything that was read after the descriptor */
    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
    chunk.index = 0;
    chunk.memblock = re->memblock;

    /* Reassemble the 64 bit offset from its two 32 bit halves */
    offset_hi = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]);
    offset_lo = (uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]);

    channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

    p->receive_memblock_callback(
        p,
        channel,
        (int64_t) ((offset_hi << 32) | offset_lo),
        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
        &chunk,
        p->receive_memblock_callback_userdata);
}
863 
/* Perform a single read step for the frame tracked by 're'.
 *
 * Depending on re->index the data is read either into the frame descriptor
 * or into the payload destination (re->data, or the acquired re->memblock).
 * When the descriptor has just been completed it is validated and the
 * payload destination is set up; when the payload has been completed the
 * frame is dispatched to the matching callback and 're' is reset.
 *
 * Returns:
 *    0  the read step succeeded (the frame may or may not be complete yet)
 *    1  're' is the srbchannel read state and pa_srbchannel_read() returned 0
 *   -1  the iochannel read returned <= 0, or the received descriptor failed
 *       validation
 */
static int do_read(pa_pstream *p, struct pstream_read *re) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Descriptor still incomplete: continue filling it */
        d = (uint8_t*) re->descriptor + re->index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
    } else {
        /* Descriptor complete: continue filling the payload destination
         * that was selected when the descriptor was parsed */
        pa_assert(re->data || re->memblock);

        if (re->data)
            d = re->data;
        else {
            /* Remember the memblock so that it is released again on
             * every exit path taken after the read */
            d = pa_memblock_acquire(re->memblock);
            release_memblock = re->memblock;
        }

        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    if (re == &p->readsrb) {
        /* This read state belongs to the srbchannel */
        r = pa_srbchannel_read(p->srb, d, l);
        if (r == 0) {
            if (release_memblock)
                pa_memblock_release(release_memblock);
            return 1;
        }
    }
    else
    /* Otherwise read from the iochannel; with HAVE_CREDS any ancillary
     * data received along with the bytes is recorded in p->read_ancil_data */
#ifdef HAVE_CREDS
    {
        pa_cmsg_ancil_data b;

        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
            goto fail;

        if (b.creds_valid) {
            p->read_ancil_data.creds_valid = true;
            p->read_ancil_data.creds = b.creds;
        }
        if (b.nfd > 0) {
            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
            p->read_ancil_data.nfd = b.nfd;
            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
        }
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    /* The destination buffer is no longer accessed below, so the memblock
     * can be released before the frame state is evaluated */
    if (release_memblock)
        pa_memblock_release(release_memblock);

    re->index += (size_t) r;

    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            /* The block ID is carried in the OFFSET_HI descriptor field */
            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            /* The block ID is carried in the OFFSET_HI descriptor field */
            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        /* 'length' is unsigned, so "<= 0" only rejects a zero length */
        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!re->packet && !re->memblock);

        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        /* A channel of (uint32_t) -1 marks a packet frame; any other
         * value is treated as a memblock frame for that channel */
        if (channel == (uint32_t) -1) {
            size_t plen;

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            re->packet = pa_packet_new(length);
            re->data = (void *) pa_packet_data(re->packet, &plen);

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {

                /* The payload of a SHM frame must be exactly the
                 * shm_info block reference */
                if (length != sizeof(re->shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                re->data = re->shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                /* re->data stays NULL so that the payload is read
                 * directly into the newly allocated memblock */
                re->memblock = pa_memblock_new(p->mempool, length);
                re->data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame complete */

        if (re->memblock) {
            memblock_complete(p, re);

            /* This was a memblock frame. We can unref the memblock now */
            pa_memblock_unref(re->memblock);

        } else if (re->packet) {

            if (p->receive_packet_callback)
#ifdef HAVE_CREDS
                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
#else
                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif

            pa_packet_unref(re->packet);
        } else {
            /* Neither memblock nor packet is set: the payload was read
             * into re->shm_info and references an SHM memblock */
            pa_memblock *b = NULL;
            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;

            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
            pa_assert(p->import);

            /* memfd block references are only imported if their ID is
             * present in registered_memfd_ids; otherwise 'b' stays NULL */
            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {

                if (pa_log_ratelimit(PA_LOG_ERROR))
                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);

            } else if (!(b = pa_memimport_get(p->import,
                                              type,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                              shm_id,
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {

                if (pa_log_ratelimit(PA_LOG_DEBUG))
                    pa_log_debug("Failed to import memory block.");
            }

            /* The callback is invoked even when the import failed; in
             * that case chunk.memblock is NULL and chunk.length is the
             * length announced in shm_info */
            if (p->receive_memblock_callback) {
                int64_t offset;
                pa_memchunk chunk;

                chunk.memblock = b;
                chunk.index = 0;
                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);

                offset = (int64_t) (
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                p->receive_memblock_callback(
                        p,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->receive_memblock_callback_userdata);
            }

            if (b)
                pa_memblock_unref(b);
        }

        goto frame_done;
    }

    /* Frame not complete yet; more data is needed */
    return 0;

frame_done:
    /* Reset the read state so that the next call starts a new frame */
    re->memblock = NULL;
    re->packet = NULL;
    re->index = 0;
    re->data = NULL;

#ifdef HAVE_CREDS
    /* FIXME: Close received ancillary data fds if the pstream's
     * receive_packet_callback did not do so.
     *
     * Malicious clients can attach fds to unknown commands, or attach them
     * to commands that do not expect fds. By doing so, server will reach
     * its open fd limit and future clients' SHM transfers will always fail.
     */
    p->read_ancil_data.creds_valid = false;
    p->read_ancil_data.nfd = 0;
#endif

    return 0;

fail:
    /* Only reached from the iochannel read, i.e. before the regular
     * release above has run */
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
1116 
/* Store the die callback and the userdata pointer that goes with it. */
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback_userdata = userdata;
    p->die_callback = cb;
}
1124 
/* Store the drain callback and the userdata pointer that goes with it. */
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback_userdata = userdata;
    p->drain_callback = cb;
}
1132 
/* Store the callback (and its userdata) that is invoked for every completely
 * received packet frame. While the callback is NULL, packet frames are
 * dropped without notification. */
void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_packet_callback_userdata = userdata;
    p->receive_packet_callback = cb;
}
1140 
/* Store the callback (and its userdata) that is invoked for every completely
 * received memblock frame. While the callback is NULL, memblock frames are
 * dropped without notification. */
void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->receive_memblock_callback_userdata = userdata;
    p->receive_memblock_callback = cb;
}
1148 
/* Store the release callback and the userdata pointer that goes with it. */
void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback_userdata = userdata;
    p->release_callback = cb;
}
1156 
/* Store the revoke callback and the userdata pointer that goes with it. */
void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->revoke_callback_userdata = userdata;
    p->revoke_callback = cb;
}
1164 
/* Report whether the pstream still has outgoing data: either an item that is
 * currently being written or entries left in the send queue. A dead pstream
 * never has anything pending. */
bool pa_pstream_is_pending(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return false;

    return p->write.current || !pa_queue_isempty(p->send_queue);
}
1178 
/* Drop one reference to the pstream and free it through pstream_free() once
 * the reference count has dropped to zero. */
void pa_pstream_unref(pa_pstream*p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* Somebody else still holds a reference */
    if (PA_REFCNT_DEC(p) > 0)
        return;

    pstream_free(p);
}
1186 
/* Take an additional reference to the pstream. Returns 'p' itself so that
 * the call can be used inline in an expression. */
pa_pstream* pa_pstream_ref(pa_pstream*p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);

    return p;
}
1194 
/* Shut the pstream down: mark it dead, drop any srbchannel, free the
 * memimport/memexport, the iochannel and the defer event, and clear the die,
 * drain, receive_packet and receive_memblock callbacks. Calling this on an
 * already dead pstream does nothing. */
void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = true;

    /* An active and a pending srbchannel may exist at the same time (at
     * least in theory), so keep resetting until both are gone. */
    while (p->srb || p->is_srbpending) {
        pa_pstream_set_srbchannel(p, NULL);
    }

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    /* Forget the user callbacks */
    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->receive_packet_callback = NULL;
    p->receive_memblock_callback = NULL;
}
1231 
/* Switch SHM usage on or off for this pstream. Enabling creates the
 * memexport if there is none yet; disabling frees an existing memexport. */
void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (!enable) {
        /* Drop the memexport, if there is one */
        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }

        return;
    }

    /* Create the memexport on first use, keep an existing one */
    if (!p->export)
        p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
}
1251 
/* Switch memfd usage on for this pstream. SHM must already be enabled. The
 * set of registered memfd IDs is created the first time this is called. */
void pa_pstream_enable_memfd(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->use_shm);

    p->use_memfd = true;

    /* Keep an already existing ID set */
    if (p->registered_memfd_ids)
        return;

    p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
}
1263 
/* Return whether SHM usage is currently enabled on this pstream. */
bool pa_pstream_get_shm(pa_pstream *p) {
    bool enabled;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    enabled = p->use_shm;

    return enabled;
}
1270 
/* Return whether memfd usage is currently enabled on this pstream. */
bool pa_pstream_get_memfd(pa_pstream *p) {
    bool enabled;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    enabled = p->use_memfd;

    return enabled;
}
1277 
/* Request a switch to the srbchannel 'srb' (NULL to stop using one). The new
 * channel is recorded as pending; on a dead pstream the pending switch is
 * checked right away, otherwise do_write() is called. Passing the channel
 * that is already active is a no-op. */
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);

    if (p->srb == srb)
        return;

    /* Only one switch may be outstanding at a time; rapid successive
     * switches are not supported. */
    pa_assert(!p->is_srbpending);

    p->is_srbpending = true;
    p->srbpending = srb;

    /* Try to carry out the switch immediately. */
    if (!p->dead) {
        do_write(p);
        return;
    }

    check_srbpending(p);
}
1297