1/***
2  This file is part of PulseAudio.
3
4  Copyright 2004-2006 Lennart Poettering
5  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7  PulseAudio is free software; you can redistribute it and/or modify
8  it under the terms of the GNU Lesser General Public License as
9  published by the Free Software Foundation; either version 2.1 of the
10  License, or (at your option) any later version.
11
12  PulseAudio is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License for more details.
16
17  You should have received a copy of the GNU Lesser General Public
18  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19***/
20
21#ifdef HAVE_CONFIG_H
22#include <config.h>
23#endif
24
25#include <stdio.h>
26#include <stdlib.h>
27#include <unistd.h>
28
29#ifdef HAVE_NETINET_IN_H
30#include <netinet/in.h>
31#endif
32
33#include <pulse/xmalloc.h>
34
35#include <pulsecore/idxset.h>
36#include <pulsecore/socket.h>
37#include <pulsecore/queue.h>
38#include <pulsecore/log.h>
39#include <pulsecore/creds.h>
40#include <pulsecore/refcnt.h>
41#include <pulsecore/flist.h>
42#include <pulsecore/macro.h>
43
44#include "pstream.h"
45
/* We piggyback information if audio data blocks are stored in SHM on the seek mode.
 * All values below share the 32-bit FLAGS word of the frame descriptor: the
 * seek mode occupies the low byte (PA_FLAG_SEEKMASK), the SHM bits sit above it. */

/* Set on memblock frames whose payload is an SHM block reference rather than audio data */
#define PA_FLAG_SHMDATA     0x80000000LU
/* Set in addition to PA_FLAG_SHMDATA when the block's memfd SHM ID was registered on this pstream */
#define PA_FLAG_SHMDATA_MEMFD_BLOCK         0x20000000LU
/* Payload-less frame; the receiver hands the block ID to pa_memexport_process_release() */
#define PA_FLAG_SHMRELEASE  0x40000000LU
/* Payload-less frame; the receiver hands the block ID to pa_memimport_process_revoke() */
#define PA_FLAG_SHMREVOKE   0xC0000000LU
/* Bits checked on the read side to detect SHM-related frames */
#define PA_FLAG_SHMMASK     0xFF000000LU
/* Bits that carry the pa_seek_mode_t value of a memblock frame */
#define PA_FLAG_SEEKMASK    0x000000FFLU
/* Set together with PA_FLAG_SHMDATA when the exporting pool is remote-writable.
 * NOTE(review): this bit lies outside PA_FLAG_SHMMASK -- confirm that is intended. */
#define PA_FLAG_SHMWRITABLE 0x00800000LU
54
/* The sequence descriptor header consists of 5 32bit integers.
 * The writer stores every field with htonl() and the reader decodes it
 * with ntohl(), i.e. the fields travel in network byte order. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,    /* number of payload bytes following the descriptor */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,   /* (uint32_t) -1 for packet frames, the channel for memblock frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI, /* upper 32 bits of the memblock offset; block ID for release/revoke frames */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO, /* lower 32 bits of the memblock offset */
    PA_PSTREAM_DESCRIPTOR_FLAGS,     /* seek mode combined with the PA_FLAG_* bits */
    PA_PSTREAM_DESCRIPTOR_MAX        /* number of descriptor fields, not a field itself */
};
64
/* If we have an SHM block, this info follows the descriptor.
 * Like the descriptor, each entry is a 32bit integer written with htonl(). */
enum {
    PA_PSTREAM_SHM_BLOCKID, /* block ID obtained from pa_memexport_put() */
    PA_PSTREAM_SHM_SHMID,   /* SHM ID obtained from pa_memexport_put() */
    PA_PSTREAM_SHM_INDEX,   /* export offset plus the chunk's index */
    PA_PSTREAM_SHM_LENGTH,  /* length of the chunk */
    PA_PSTREAM_SHM_MAX      /* number of entries, not an entry itself */
};
73
/* Storage for one frame descriptor (PA_PSTREAM_DESCRIPTOR_MAX 32bit words) */
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes */
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))

/* Size of the write-side mini buffer. Frames whose descriptor plus payload
 * fit into it are assembled there and handed to the channel as one buffer. */
#define MINIBUF_SIZE (256)

/* To allow uploading a single sample in one frame, this value should be the
 * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
 * Incoming frames announcing a larger payload are rejected by the reader.
 */
#define FRAME_SIZE_MAX_ALLOW (1024*1024*16)

/* Free list used to recycle struct item_info allocations of the send queue */
PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
86
/* One entry of a pstream's send queue. 'type' selects which of the
 * member groups below is meaningful for the entry. */
struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,     /* control packet, see 'packet' */
        PA_PSTREAM_ITEM_MEMBLOCK,   /* audio data, see 'chunk' and friends */
        PA_PSTREAM_ITEM_SHMRELEASE, /* payload-less release frame, see 'block_id' */
        PA_PSTREAM_ITEM_SHMREVOKE   /* payload-less revoke frame, see 'block_id' */
    } type;

    /* packet info */
    pa_packet *packet;              /* referenced while queued, unreferenced in item_free() */
#ifdef HAVE_CREDS
    bool with_ancil_data;           /* true if 'ancil_data' holds a copy of caller-supplied data */
    pa_cmsg_ancil_data ancil_data;  /* credentials or fds to send along with the packet */
#endif

    /* memblock info */
    pa_memchunk chunk;              /* holds its own reference on chunk.memblock */
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};
111
/* Progress of the frame currently being received on one input channel */
struct pstream_read {
    pa_pstream_descriptor descriptor;       /* raw descriptor as received (network byte order) */
    pa_memblock *memblock;                  /* destination block for plain memblock frames */
    pa_packet *packet;                      /* destination packet for packet frames */
    uint32_t shm_info[PA_PSTREAM_SHM_MAX];  /* destination for the payload of SHM reference frames */
    void *data;                             /* payload destination when not reading into 'memblock' */
    size_t index;                           /* bytes of the current frame read so far, descriptor included */
};
120
/* A reference-counted packet stream: frames queued in 'send_queue' are
 * written to, and incoming frames are read from, an iochannel and
 * optionally an srbchannel. */
struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;        /* enabled by the send functions to trigger a read/write pass */
    pa_iochannel *io;
    pa_srbchannel *srb, *srbpending;    /* active srbchannel and the one waiting to replace it */
    bool is_srbpending;                 /* true while 'srbpending' still has to be switched in */

    pa_queue *send_queue;               /* queue of struct item_info awaiting transmission */

    bool dead;                          /* once set, the send functions drop their input */

    /* State of the frame currently being written */
    struct {
        /* The descriptor aliases the first bytes of the mini buffer, so a
         * small payload can be placed directly behind it. */
        union {
            uint8_t minibuf[MINIBUF_SIZE];
            pa_pstream_descriptor descriptor;
        };
        struct item_info* current;      /* item being written, NULL if none */
        void *data;                     /* packet payload, NULL for other item types */
        size_t index;                   /* bytes of the current frame written so far */
        int minibuf_validsize;          /* bytes of 'minibuf' to send, 0 if the mini buffer is unused */
        pa_memchunk memchunk;           /* payload of a memblock item sent by copy */
    } write;

    /* Separate read state for the iochannel and for the srbchannel */
    struct pstream_read readio, readsrb;

    /* @use_shm: beside copying the full audio data to the other
     * PA end, this pipe supports just sending references of the
     * same audio data blocks if they reside in a SHM pool.
     *
     * @use_memfd: pipe supports sending SHM memfd block references
     *
     * @registered_memfd_ids: registered memfd pools SHM IDs. Check
     * pa_pstream_register_memfd_mempool() for more information. */
    bool use_shm, use_memfd;
    bool non_registered_memfd_id_error_logged; /* limits the "non-registered memfd ID" error to one report */
    pa_idxset *registered_memfd_ids;

    pa_memimport *import;               /* created unconditionally in pa_pstream_new() */
    pa_memexport *export;

    pa_pstream_packet_cb_t receive_packet_callback;
    void *receive_packet_callback_userdata;

    pa_pstream_memblock_cb_t receive_memblock_callback;
    void *receive_memblock_callback_userdata;

    /* Invoked after a frame was written completely and nothing else is pending */
    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    /* Invoked when a read/write pass fails, right before the stream is unlinked */
    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    /* If set, called instead of queueing a revoke frame */
    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    /* If set, called instead of queueing a release frame */
    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    /* read_ancil_data: ancillary data collected while reading from 'io'.
     * write_ancil_data: points at the ancillary data of the item being written. */
    pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
    bool send_ancil_data_now;           /* true until the current item's ancillary data was sent */
#endif
};
188
189#ifdef HAVE_CREDS
190/*
 * Block transfers for memfd-backed SHM pools occur without passing the pool's
 * fd every time, thus minimizing overhead and avoiding fd leaks. A
193 * REGISTER_MEMFD_SHMID command is sent, with the pool's memfd fd, very early
194 * on. This command has an ID that uniquely identifies the pool in question.
195 * Further pool's block references can then be exclusively done using such ID;
196 * the fd can be safely closed – on both ends – afterwards.
197 *
198 * On the sending side of this command, we want to close the passed fds
199 * directly after being sent. Meanwhile we're only allowed to asynchronously
200 * schedule packet writes to the pstream, so the job of closing passed fds is
201 * left to the pstream's actual writing function do_write(): it knows the
202 * exact point in time where the fds are passed to the other end through
203 * iochannels and the sendmsg() system call.
204 *
205 * Nonetheless not all code paths in the system desire their socket-passed
206 * fds to be closed after the send. srbchannel needs the passed fds to still
207 * be open for further communication. System-wide global memfd-backed pools
208 * also require the passed fd to be open: they pass the same fd, with the same
209 * ID registration mechanism, for each newly connected client to the system.
210 *
 * So from all of the above, never close the ancillary fds on your own and
 * always call the method below instead. It takes care of closing the passed fds
 * _only if allowed_ by the code paths that originally created them to do so.
214 * Moreover, it is multiple-invocations safe: failure handlers can, and
215 * should, call it for passed fds cleanup without worrying too much about
216 * the system state.
217 */
218void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
219    if (ancil && ancil->nfd > 0 && ancil->close_fds_on_cleanup) {
220        int i;
221
222        pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
223
224        for (i = 0; i < ancil->nfd; i++)
225            if (ancil->fds[i] != -1) {
226                pa_assert_se(pa_close(ancil->fds[i]) == 0);
227                ancil->fds[i] = -1;
228            }
229
230        ancil->nfd = 0;
231        ancil->close_fds_on_cleanup = false;
232    }
233}
234#endif
235
/* Forward declarations: both functions are defined further down but are
 * already called by do_pstream_read_write(). */
static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p, struct pstream_read *re);
238
239static void do_pstream_read_write(pa_pstream *p) {
240    pa_assert(p);
241    pa_assert(PA_REFCNT_VALUE(p) > 0);
242
243    pa_pstream_ref(p);
244
245    p->mainloop->defer_enable(p->defer_event, 0);
246
247    if (!p->dead && p->srb) {
248        int r = 0;
249
250        if(do_write(p) < 0)
251            goto fail;
252
253        while (!p->dead && r == 0) {
254            r = do_read(p, &p->readsrb);
255            if (r < 0)
256                goto fail;
257        }
258    }
259
260    if (!p->dead && pa_iochannel_is_readable(p->io)) {
261        if (do_read(p, &p->readio) < 0)
262            goto fail;
263    } else if (!p->dead && pa_iochannel_is_hungup(p->io))
264        goto fail;
265
266    while (!p->dead && pa_iochannel_is_writable(p->io)) {
267        int r = do_write(p);
268        if (r < 0)
269            goto fail;
270        if (r == 0)
271            break;
272    }
273
274    pa_pstream_unref(p);
275    return;
276
277fail:
278
279    if (p->die_callback)
280        p->die_callback(p, p->die_callback_userdata);
281
282    pa_pstream_unlink(p);
283    pa_pstream_unref(p);
284}
285
286static bool srb_callback(pa_srbchannel *srb, void *userdata) {
287    bool b;
288    pa_pstream *p = userdata;
289
290    pa_assert(p);
291    pa_assert(PA_REFCNT_VALUE(p) > 0);
292    pa_assert(p->srb == srb);
293
294    pa_pstream_ref(p);
295
296    do_pstream_read_write(p);
297
298    /* If either pstream or the srb is going away, return false.
299       We need to check this before p is destroyed. */
300    b = (PA_REFCNT_VALUE(p) > 1) && (p->srb == srb);
301    pa_pstream_unref(p);
302
303    return b;
304}
305
306static void io_callback(pa_iochannel*io, void *userdata) {
307    pa_pstream *p = userdata;
308
309    pa_assert(p);
310    pa_assert(PA_REFCNT_VALUE(p) > 0);
311    pa_assert(p->io == io);
312
313    do_pstream_read_write(p);
314}
315
316static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
317    pa_pstream *p = userdata;
318
319    pa_assert(p);
320    pa_assert(PA_REFCNT_VALUE(p) > 0);
321    pa_assert(p->defer_event == e);
322    pa_assert(p->mainloop == m);
323
324    do_pstream_read_write(p);
325}
326
/* Forward declaration: pa_pstream_new() passes this function to pa_memimport_new() */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
328
329pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
330    pa_pstream *p;
331
332    pa_assert(m);
333    pa_assert(io);
334    pa_assert(pool);
335
336    p = pa_xnew0(pa_pstream, 1);
337    PA_REFCNT_INIT(p);
338    p->io = io;
339    pa_iochannel_set_callback(io, io_callback, p);
340
341    p->mainloop = m;
342    p->defer_event = m->defer_new(m, defer_callback, p);
343    m->defer_enable(p->defer_event, 0);
344
345    p->send_queue = pa_queue_new();
346
347    p->mempool = pool;
348
349    /* We do importing unconditionally */
350    p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
351
352    pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
353    pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
354
355    return p;
356}
357
358/* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
359 * Check pa_pstream_register_memfd_mempool() for further info.
360 *
361 * Caller owns the passed @memfd_fd and must close it down when appropriate. */
362int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
363    int err = -1;
364
365    pa_assert(memfd_fd != -1);
366
367    if (!p->use_memfd) {
368        pa_log_warn("Received memfd ID registration request over a pipe "
369                    "that does not support memfds");
370        return err;
371    }
372
373    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
374        pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
375        return err;
376    }
377
378    if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
379        pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
380        return err;
381    }
382
383    pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
384    return 0;
385}
386
387static void item_free(void *item) {
388    struct item_info *i = item;
389    pa_assert(i);
390
391    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
392        pa_assert(i->chunk.memblock);
393        pa_memblock_unref(i->chunk.memblock);
394    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
395        pa_assert(i->packet);
396        pa_packet_unref(i->packet);
397    }
398
399#ifdef HAVE_CREDS
400    /* On error recovery paths, there might be lingering items
401     * on the pstream send queue and they are usually freed with
402     * a call to 'pa_queue_free(p->send_queue, item_free)'. Make
403     * sure we do not leak any fds in that case! */
404    if (i->with_ancil_data)
405        pa_cmsg_ancil_data_close_fds(&i->ancil_data);
406#endif
407
408    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
409        pa_xfree(i);
410}
411
412static void pstream_free(pa_pstream *p) {
413    pa_assert(p);
414
415    pa_pstream_unlink(p);
416
417    pa_queue_free(p->send_queue, item_free);
418
419    if (p->write.current)
420        item_free(p->write.current);
421
422    if (p->write.memchunk.memblock)
423        pa_memblock_unref(p->write.memchunk.memblock);
424
425    if (p->readsrb.memblock)
426        pa_memblock_unref(p->readsrb.memblock);
427
428    if (p->readsrb.packet)
429        pa_packet_unref(p->readsrb.packet);
430
431    if (p->readio.memblock)
432        pa_memblock_unref(p->readio.memblock);
433
434    if (p->readio.packet)
435        pa_packet_unref(p->readio.packet);
436
437    if (p->registered_memfd_ids)
438        pa_idxset_free(p->registered_memfd_ids, NULL);
439
440    pa_xfree(p);
441}
442
443void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
444    struct item_info *i;
445
446    pa_assert(p);
447    pa_assert(PA_REFCNT_VALUE(p) > 0);
448    pa_assert(packet);
449
450    if (p->dead) {
451#ifdef HAVE_CREDS
452        pa_cmsg_ancil_data_close_fds(ancil_data);
453#endif
454        return;
455    }
456
457    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
458        i = pa_xnew(struct item_info, 1);
459
460    i->type = PA_PSTREAM_ITEM_PACKET;
461    i->packet = pa_packet_ref(packet);
462
463#ifdef HAVE_CREDS
464    if ((i->with_ancil_data = !!ancil_data)) {
465        i->ancil_data = *ancil_data;
466        if (ancil_data->creds_valid)
467            pa_assert(ancil_data->nfd == 0);
468        else
469            pa_assert(ancil_data->nfd > 0);
470    }
471#endif
472
473    pa_queue_push(p->send_queue, i);
474    if (PaQueueGetLen(p->send_queue) >= 10) {  // 10 maybe have msg backlog
475        pa_log_warn("[MSG backlog]: PaQueueLen = %u", PaQueueGetLen(p->send_queue));
476    }
477
478    p->mainloop->defer_enable(p->defer_event, 1);
479}
480
481void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
482    size_t length, idx;
483    size_t bsm;
484
485    pa_assert(p);
486    pa_assert(PA_REFCNT_VALUE(p) > 0);
487    pa_assert(channel != (uint32_t) -1);
488    pa_assert(chunk);
489
490    if (p->dead)
491        return;
492
493    idx = 0;
494    length = chunk->length;
495
496    bsm = pa_mempool_block_size_max(p->mempool);
497
498    while (length > 0) {
499        struct item_info *i;
500        size_t n;
501
502        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
503            i = pa_xnew(struct item_info, 1);
504        i->type = PA_PSTREAM_ITEM_MEMBLOCK;
505
506        n = PA_MIN(length, bsm);
507        i->chunk.index = chunk->index + idx;
508        i->chunk.length = n;
509        i->chunk.memblock = pa_memblock_ref(chunk->memblock);
510
511        i->channel = channel;
512        i->offset = offset;
513        i->seek_mode = seek_mode;
514#ifdef HAVE_CREDS
515        i->with_ancil_data = false;
516#endif
517
518        pa_queue_push(p->send_queue, i);
519
520        idx += n;
521        length -= n;
522    }
523
524    p->mainloop->defer_enable(p->defer_event, 1);
525}
526
527void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
528    struct item_info *item;
529    pa_assert(p);
530    pa_assert(PA_REFCNT_VALUE(p) > 0);
531
532    if (p->dead)
533        return;
534
535/*     pa_log("Releasing block %u", block_id); */
536
537    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
538        item = pa_xnew(struct item_info, 1);
539    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
540    item->block_id = block_id;
541#ifdef HAVE_CREDS
542    item->with_ancil_data = false;
543#endif
544
545    pa_queue_push(p->send_queue, item);
546    p->mainloop->defer_enable(p->defer_event, 1);
547}
548
549/* might be called from thread context */
550static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
551    pa_pstream *p = userdata;
552
553    pa_assert(p);
554    pa_assert(PA_REFCNT_VALUE(p) > 0);
555
556    if (p->dead)
557        return;
558
559    if (p->release_callback)
560        p->release_callback(p, block_id, p->release_callback_userdata);
561    else
562        pa_pstream_send_release(p, block_id);
563}
564
565void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
566    struct item_info *item;
567    pa_assert(p);
568    pa_assert(PA_REFCNT_VALUE(p) > 0);
569
570    if (p->dead)
571        return;
572/*     pa_log("Revoking block %u", block_id); */
573
574    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
575        item = pa_xnew(struct item_info, 1);
576    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
577    item->block_id = block_id;
578#ifdef HAVE_CREDS
579    item->with_ancil_data = false;
580#endif
581
582    pa_queue_push(p->send_queue, item);
583    p->mainloop->defer_enable(p->defer_event, 1);
584}
585
586/* might be called from thread context */
587static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
588    pa_pstream *p = userdata;
589
590    pa_assert(p);
591    pa_assert(PA_REFCNT_VALUE(p) > 0);
592
593    if (p->revoke_callback)
594        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
595    else
596        pa_pstream_send_revoke(p, block_id);
597}
598
/* Pop the next item off the send queue and set up p->write for it.
 *
 * Leaves p->write.current NULL if the queue is empty. Otherwise the frame
 * descriptor is filled in and the payload is selected as one of:
 *   - p->write.minibuf (minibuf_validsize > 0): descriptor and payload are
 *     sent together as one buffer (small packets, SHM block references),
 *   - p->write.data: payload of a packet too large for the mini buffer,
 *   - p->write.memchunk: audio data that is copied over the channel,
 *   - nothing: release/revoke frames consist of the descriptor only.
 */
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;
    /* Reset the per-frame write state */
    p->write.index = 0;
    p->write.data = NULL;
    p->write.minibuf_validsize = 0;
    pa_memchunk_reset(&p->write.memchunk);

    /* Defaults: no payload, channel (uint32_t) -1, no offset, no flags.
     * The descriptor shares its storage with the start of the mini buffer. */
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
        size_t plen;

        pa_assert(p->write.current->packet);

        p->write.data = (void *) pa_packet_data(p->write.current->packet, &plen);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) plen);

        /* Small packets are copied right behind the descriptor so that
         * do_write() can send both from the mini buffer */
        if (plen <= MINIBUF_SIZE - PA_PSTREAM_DESCRIPTOR_SIZE) {
            memcpy(&p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE], p->write.data, plen);
            p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + plen;
        }

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        /* Descriptor-only frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        /* Descriptor-only frame; the block ID travels in OFFSET_HI */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        /* Stays true unless the block can be sent as an SHM reference */
        bool send_payload = true;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        /* The 64bit offset is split into two 32bit descriptor words */
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            pa_mem_type_t type;
            uint32_t block_id, shm_id;
            size_t offset, length;
            /* The SHM info words are placed directly behind the descriptor */
            uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
            size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
            pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
            pa_memexport *current_export;

            /* Blocks of the stream's own pool go through the stream's
             * export; blocks of any other pool get a temporary export
             * that is freed again further down */
            if (p->mempool == current_pool)
                pa_assert_se(current_export = p->export);
            else
                pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));

            if (pa_memexport_put(current_export,
                                 p->write.current->chunk.memblock,
                                 &type,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                /* POSIX SHM blocks are always sent by reference */
                if (type == PA_MEM_TYPE_SHARED_POSIX)
                    send_payload = false;

                /* memfd blocks are sent by reference only if their SHM ID
                 * was registered on this pstream beforehand */
                if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
                    if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
                        flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
                        send_payload = false;
                    } else {
                        /* Report the problem only once per pstream */
                        if (!p->non_registered_memfd_id_error_logged) {
                            pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
                            pa_log("Falling back to copying full block data over socket");
                            pa_log("There's a bug report about this: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/issues/824");
                            p->non_registered_memfd_id_error_logged = true;
                        }
                    }
                }

                if (send_payload) {
                    /* The data is copied after all, so undo the export right away */
                    pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
                } else {
                    flags |= PA_FLAG_SHMDATA;
                    if (pa_mempool_is_remote_writable(current_pool))
                        flags |= PA_FLAG_SHMWRITABLE;

                    shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                    shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                    shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                    shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                    /* The frame's payload is the SHM info, sent from the mini buffer */
                    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
                    p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
                }
            }
/*             else */
/*                 FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
/*                 pa_log_warn("Failed to export memory block."); */

            if (current_export != p->export)
                pa_memexport_free(current_export);
            /* presumably drops the reference handed out by
             * pa_memblock_get_pool() above -- TODO confirm */
            pa_mempool_unref(current_pool);
        }

        if (send_payload) {
            /* Copy path: p->write.memchunk takes its own reference on the
             * memblock; do_write() drops it once the frame is complete */
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    /* Remember whether the first write of this frame has to carry the
     * item's ancillary data */
    if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
        p->write_ancil_data = &p->write.current->ancil_data;
#endif
}
732
733static void check_srbpending(pa_pstream *p) {
734    if (!p->is_srbpending)
735        return;
736
737    if (p->srb)
738        pa_srbchannel_free(p->srb);
739
740    p->srb = p->srbpending;
741    p->is_srbpending = false;
742
743    if (p->srb)
744        pa_srbchannel_set_callback(p->srb, srb_callback, p);
745}
746
/* Write (part of) the current frame to the channel, fetching the next item
 * from the send queue first if no frame is in progress.
 *
 * Returns 1 if the channel accepted everything that was offered, 0 if the
 * send queue is empty or the write was only partial, and -1 on failure. */
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }

    /* Select the buffer to write from, based on how far we got (write.index) */
    if (p->write.minibuf_validsize > 0) {
        /* Descriptor and payload both live in the mini buffer */
        d = p->write.minibuf + p->write.index;
        l = p->write.minibuf_validsize - p->write.index;
    } else if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Still sending the descriptor */
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        /* Descriptor is out, continue with the payload */
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            /* The memblock stays acquired until after the write below */
            d = pa_memblock_acquire_chunk(&p->write.memchunk);
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    /* Ancillary data always goes through the iochannel, even if an
     * srbchannel is active */
    if (p->send_ancil_data_now) {
        if (p->write_ancil_data->creds_valid) {
            pa_assert(p->write_ancil_data->nfd == 0);
            if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
                goto fail;
        }
        else
            if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
                goto fail;

        /* The fds were passed on; close our copies if we are allowed to */
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
        p->send_ancil_data_now = false;
    } else
#endif
    /* Note: with HAVE_CREDS this 'if' is the else-branch of the block above */
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

    /* Frame complete: descriptor plus the announced payload length were written */
    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    /* 1 only if the channel took everything we offered */
    return (size_t) r == l ? 1 : 0;

fail:
#ifdef HAVE_CREDS
    /* Do not leak fds that could not be sent */
    if (p->send_ancil_data_now)
        pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}
839
840static void memblock_complete(pa_pstream *p, struct pstream_read *re) {
841    pa_memchunk chunk;
842    int64_t offset;
843
844    if (!p->receive_memblock_callback)
845        return;
846
847    chunk.memblock = re->memblock;
848    chunk.index = 0;
849    chunk.length = re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
850
851    offset = (int64_t) (
852             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
853             (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
854
855    p->receive_memblock_callback(
856        p,
857        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
858        offset,
859        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
860        &chunk,
861        p->receive_memblock_callback_userdata);
862}
863
864static int do_read(pa_pstream *p, struct pstream_read *re) {
865    void *d;
866    size_t l;
867    ssize_t r;
868    pa_memblock *release_memblock = NULL;
869    pa_assert(p);
870    pa_assert(PA_REFCNT_VALUE(p) > 0);
871
872    if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
873        d = (uint8_t*) re->descriptor + re->index;
874        l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
875    } else {
876        pa_assert(re->data || re->memblock);
877
878        if (re->data)
879            d = re->data;
880        else {
881            d = pa_memblock_acquire(re->memblock);
882            release_memblock = re->memblock;
883        }
884
885        d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
886        l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
887    }
888
889    if (re == &p->readsrb) {
890        r = pa_srbchannel_read(p->srb, d, l);
891        if (r == 0) {
892            if (release_memblock)
893                pa_memblock_release(release_memblock);
894            return 1;
895        }
896    }
897    else
898#ifdef HAVE_CREDS
899    {
900        pa_cmsg_ancil_data b;
901
902        if ((r = pa_iochannel_read_with_ancil_data(p->io, d, l, &b)) <= 0)
903            goto fail;
904
905        if (b.creds_valid) {
906            p->read_ancil_data.creds_valid = true;
907            p->read_ancil_data.creds = b.creds;
908        }
909        if (b.nfd > 0) {
910            pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
911            p->read_ancil_data.nfd = b.nfd;
912            memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
913            p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
914        }
915    }
916#else
917    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
918        goto fail;
919#endif
920
921    if (release_memblock)
922        pa_memblock_release(release_memblock);
923
924    re->index += (size_t) r;
925
926    if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
927        uint32_t flags, length, channel;
928        /* Reading of frame descriptor complete */
929
930        flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
931
932        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
933            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
934            return -1;
935        }
936
937        if (flags == PA_FLAG_SHMRELEASE) {
938
939            /* This is a SHM memblock release frame with no payload */
940
941/*             pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
942
943            pa_assert(p->export);
944            pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
945
946            goto frame_done;
947
948        } else if (flags == PA_FLAG_SHMREVOKE) {
949
950            /* This is a SHM memblock revoke frame with no payload */
951
952/*             pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
953
954            pa_assert(p->import);
955            pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
956
957            goto frame_done;
958        }
959
960        length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
961
962        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
963            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
964            return -1;
965        }
966
967        pa_assert(!re->packet && !re->memblock);
968
969        channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
970
971        if (channel == (uint32_t) -1) {
972            size_t plen;
973
974            if (flags != 0) {
975                pa_log_warn("Received packet frame with invalid flags value.");
976                return -1;
977            }
978
979            /* Frame is a packet frame */
980            re->packet = pa_packet_new(length);
981            re->data = (void *) pa_packet_data(re->packet, &plen);
982
983        } else {
984
985            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
986                pa_log_warn("Received memblock frame with invalid seek mode.");
987                return -1;
988            }
989
990            if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {
991
992                if (length != sizeof(re->shm_info)) {
993                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
994                    return -1;
995                }
996
997                /* Frame is a memblock frame referencing an SHM memblock */
998                re->data = re->shm_info;
999
1000            } else if ((flags & PA_FLAG_SHMMASK) == 0) {
1001
1002                /* Frame is a memblock frame */
1003
1004                re->memblock = pa_memblock_new(p->mempool, length);
1005                re->data = NULL;
1006            } else {
1007
1008                pa_log_warn("Received memblock frame with invalid flags value.");
1009                return -1;
1010            }
1011        }
1012
1013    } else if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
1014        /* Frame complete */
1015
1016        if (re->memblock) {
1017            memblock_complete(p, re);
1018
1019            /* This was a memblock frame. We can unref the memblock now */
1020            pa_memblock_unref(re->memblock);
1021
1022        } else if (re->packet) {
1023
1024            if (p->receive_packet_callback)
1025#ifdef HAVE_CREDS
1026                p->receive_packet_callback(p, re->packet, &p->read_ancil_data, p->receive_packet_callback_userdata);
1027#else
1028                p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
1029#endif
1030
1031            pa_packet_unref(re->packet);
1032        } else {
1033            pa_memblock *b = NULL;
1034            uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
1035            uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
1036            pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
1037                                 PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;
1038
1039            pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
1040            pa_assert(p->import);
1041
1042            if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
1043                !pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
1044
1045                if (pa_log_ratelimit(PA_LOG_ERROR))
1046                    pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);
1047
1048            } else if (!(b = pa_memimport_get(p->import,
1049                                              type,
1050                                              ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
1051                                              shm_id,
1052                                              ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
1053                                              ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
1054                                              !!(flags & PA_FLAG_SHMWRITABLE)))) {
1055
1056                if (pa_log_ratelimit(PA_LOG_DEBUG))
1057                    pa_log_debug("Failed to import memory block.");
1058            }
1059
1060            if (p->receive_memblock_callback) {
1061                int64_t offset;
1062                pa_memchunk chunk;
1063
1064                chunk.memblock = b;
1065                chunk.index = 0;
1066                chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
1067
1068                offset = (int64_t) (
1069                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
1070                        (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
1071
1072                p->receive_memblock_callback(
1073                        p,
1074                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
1075                        offset,
1076                        ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
1077                        &chunk,
1078                        p->receive_memblock_callback_userdata);
1079            }
1080
1081            if (b)
1082                pa_memblock_unref(b);
1083        }
1084
1085        goto frame_done;
1086    }
1087
1088    return 0;
1089
1090frame_done:
1091    re->memblock = NULL;
1092    re->packet = NULL;
1093    re->index = 0;
1094    re->data = NULL;
1095
1096#ifdef HAVE_CREDS
1097    /* FIXME: Close received ancillary data fds if the pstream's
1098     * receive_packet_callback did not do so.
1099     *
1100     * Malicious clients can attach fds to unknown commands, or attach them
1101     * to commands that does not expect fds. By doing so, server will reach
1102     * its open fd limit and future clients' SHM transfers will always fail.
1103     */
1104    p->read_ancil_data.creds_valid = false;
1105    p->read_ancil_data.nfd = 0;
1106#endif
1107
1108    return 0;
1109
1110fail:
1111    if (release_memblock)
1112        pa_memblock_release(release_memblock);
1113
1114    return -1;
1115}
1116
1117void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
1118    pa_assert(p);
1119    pa_assert(PA_REFCNT_VALUE(p) > 0);
1120
1121    p->die_callback = cb;
1122    p->die_callback_userdata = userdata;
1123}
1124
1125void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
1126    pa_assert(p);
1127    pa_assert(PA_REFCNT_VALUE(p) > 0);
1128
1129    p->drain_callback = cb;
1130    p->drain_callback_userdata = userdata;
1131}
1132
1133void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
1134    pa_assert(p);
1135    pa_assert(PA_REFCNT_VALUE(p) > 0);
1136
1137    p->receive_packet_callback = cb;
1138    p->receive_packet_callback_userdata = userdata;
1139}
1140
1141void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
1142    pa_assert(p);
1143    pa_assert(PA_REFCNT_VALUE(p) > 0);
1144
1145    p->receive_memblock_callback = cb;
1146    p->receive_memblock_callback_userdata = userdata;
1147}
1148
1149void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
1150    pa_assert(p);
1151    pa_assert(PA_REFCNT_VALUE(p) > 0);
1152
1153    p->release_callback = cb;
1154    p->release_callback_userdata = userdata;
1155}
1156
1157void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
1158    pa_assert(p);
1159    pa_assert(PA_REFCNT_VALUE(p) > 0);
1160
1161    p->revoke_callback = cb;
1162    p->revoke_callback_userdata = userdata;
1163}
1164
1165bool pa_pstream_is_pending(pa_pstream *p) {
1166    bool b;
1167
1168    pa_assert(p);
1169    pa_assert(PA_REFCNT_VALUE(p) > 0);
1170
1171    if (p->dead)
1172        b = false;
1173    else
1174        b = p->write.current || !pa_queue_isempty(p->send_queue);
1175
1176    return b;
1177}
1178
1179void pa_pstream_unref(pa_pstream*p) {
1180    pa_assert(p);
1181    pa_assert(PA_REFCNT_VALUE(p) > 0);
1182
1183    if (PA_REFCNT_DEC(p) <= 0)
1184        pstream_free(p);
1185}
1186
1187pa_pstream* pa_pstream_ref(pa_pstream*p) {
1188    pa_assert(p);
1189    pa_assert(PA_REFCNT_VALUE(p) > 0);
1190
1191    PA_REFCNT_INC(p);
1192    return p;
1193}
1194
1195void pa_pstream_unlink(pa_pstream *p) {
1196    pa_assert(p);
1197
1198    if (p->dead)
1199        return;
1200
1201    p->dead = true;
1202
1203    while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
1204        pa_pstream_set_srbchannel(p, NULL);
1205
1206    if (p->import) {
1207        pa_memimport_free(p->import);
1208        p->import = NULL;
1209    }
1210
1211    if (p->export) {
1212        pa_memexport_free(p->export);
1213        p->export = NULL;
1214    }
1215
1216    if (p->io) {
1217        pa_iochannel_free(p->io);
1218        p->io = NULL;
1219    }
1220
1221    if (p->defer_event) {
1222        p->mainloop->defer_free(p->defer_event);
1223        p->defer_event = NULL;
1224    }
1225
1226    p->die_callback = NULL;
1227    p->drain_callback = NULL;
1228    p->receive_packet_callback = NULL;
1229    p->receive_memblock_callback = NULL;
1230}
1231
1232void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
1233    pa_assert(p);
1234    pa_assert(PA_REFCNT_VALUE(p) > 0);
1235
1236    p->use_shm = enable;
1237
1238    if (enable) {
1239
1240        if (!p->export)
1241            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1242
1243    } else {
1244
1245        if (p->export) {
1246            pa_memexport_free(p->export);
1247            p->export = NULL;
1248        }
1249    }
1250}
1251
1252void pa_pstream_enable_memfd(pa_pstream *p) {
1253    pa_assert(p);
1254    pa_assert(PA_REFCNT_VALUE(p) > 0);
1255    pa_assert(p->use_shm);
1256
1257    p->use_memfd = true;
1258
1259    if (!p->registered_memfd_ids) {
1260        p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
1261    }
1262}
1263
1264bool pa_pstream_get_shm(pa_pstream *p) {
1265    pa_assert(p);
1266    pa_assert(PA_REFCNT_VALUE(p) > 0);
1267
1268    return p->use_shm;
1269}
1270
1271bool pa_pstream_get_memfd(pa_pstream *p) {
1272    pa_assert(p);
1273    pa_assert(PA_REFCNT_VALUE(p) > 0);
1274
1275    return p->use_memfd;
1276}
1277
1278void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
1279    pa_assert(p);
1280    pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
1281
1282    if (srb == p->srb)
1283        return;
1284
1285    /* We can't handle quick switches between srbchannels. */
1286    pa_assert(!p->is_srbpending);
1287
1288    p->srbpending = srb;
1289    p->is_srbpending = true;
1290
1291    /* Switch immediately, if possible. */
1292    if (p->dead)
1293        check_srbpending(p);
1294    else
1295        do_write(p);
1296}
1297