1/***
2  This file is part of PulseAudio.
3
4  Copyright 2014 David Henningsson, Canonical Ltd.
5
6  PulseAudio is free software; you can redistribute it and/or modify
7  it under the terms of the GNU Lesser General Public License as
8  published by the Free Software Foundation; either version 2.1 of the
9  License, or (at your option) any later version.
10
11  PulseAudio is distributed in the hope that it will be useful, but
12  WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  Lesser General Public License for more details.
15
16  You should have received a copy of the GNU Lesser General Public
17  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include "srbchannel.h"
25
26#include <pulsecore/atomic.h>
27#include <pulse/xmalloc.h>
28
29/* #define DEBUG_SRBCHANNEL */
30
31/* This ringbuffer might be useful in other contexts too, but
32 * right now it's only used inside the srbchannel, so let's keep it here
33 * for the time being. */
34typedef struct pa_ringbuffer pa_ringbuffer;
35
36struct pa_ringbuffer {
37    pa_atomic_t *count; /* amount of data in the buffer */
38    int capacity;
39    uint8_t *memory;
40    int readindex, writeindex;
41};
42
43static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
44    int c = pa_atomic_load(r->count);
45
46    if (r->readindex + c > r->capacity)
47        *count = r->capacity - r->readindex;
48    else
49        *count = c;
50
51    return r->memory + r->readindex;
52}
53
54/* Returns true only if the buffer was completely full before the drop. */
55static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
56    bool b = pa_atomic_sub(r->count, count) >= r->capacity;
57
58    r->readindex += count;
59    r->readindex %= r->capacity;
60
61    return b;
62}
63
64static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
65    int c = pa_atomic_load(r->count);
66
67    *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
68
69    return r->memory + r->writeindex;
70}
71
72static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
73    pa_atomic_add(r->count, count);
74    r->writeindex += count;
75    r->writeindex %= r->capacity;
76}
77
78struct pa_srbchannel {
79    pa_ringbuffer rb_read, rb_write;
80    pa_fdsem *sem_read, *sem_write;
81    pa_memblock *memblock;
82
83    void *cb_userdata;
84    pa_srbchannel_cb_t callback;
85
86    pa_io_event *read_event;
87    pa_defer_event *defer_event;
88    pa_mainloop_api *mainloop;
89};
90
/* We always listen to sem_read, and always signal on sem_write.
 *
 * This means we signal the same semaphore for two scenarios:
 * 1) We have written something to our send buffer, and want the other
 *    side to read it
 * 2) We have read something from our receive buffer that was previously
 *    completely full, and want the other side to continue writing
 */
99
100size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
101    size_t written = 0;
102
103    while (l > 0) {
104        int towrite;
105        void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
106
107        if ((size_t) towrite > l)
108            towrite = l;
109
110        if (towrite == 0) {
111#ifdef DEBUG_SRBCHANNEL
112            pa_log("srbchannel output buffer full");
113#endif
114            break;
115        }
116
117        memcpy(ptr, data, towrite);
118        pa_ringbuffer_end_write(&sr->rb_write, towrite);
119        written += towrite;
120        data = (uint8_t*) data + towrite;
121        l -= towrite;
122    }
123#ifdef DEBUG_SRBCHANNEL
124    pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
125#endif
126
127    pa_fdsem_post(sr->sem_write);
128    return written;
129}
130
131size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
132    size_t isread = 0;
133
134    while (l > 0) {
135        int toread;
136        void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
137
138        if ((size_t) toread > l)
139            toread = l;
140
141        if (toread == 0)
142            break;
143
144        memcpy(data, ptr, toread);
145
146        if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
147#ifdef DEBUG_SRBCHANNEL
148            pa_log("Read from full output buffer, signalling fdsem");
149#endif
150            pa_fdsem_post(sr->sem_write);
151        }
152
153        isread += toread;
154        data = (uint8_t*) data + toread;
155        l -= toread;
156    }
157
158#ifdef DEBUG_SRBCHANNEL
159    pa_log("Read %d bytes from srbchannel", (int) isread);
160#endif
161
162    return isread;
163}
164
165/* This is the memory layout of the ringbuffer shm block. It is followed by
166   read and write ringbuffer memory. */
167struct srbheader {
168    pa_atomic_t read_count;
169    pa_atomic_t write_count;
170
171    pa_fdsem_data read_semdata;
172    pa_fdsem_data write_semdata;
173
174    int capacity;
175    int readbuf_offset;
176    int writebuf_offset;
177
178    /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
179};
180
181static void srbchannel_rwloop(pa_srbchannel* sr) {
182    do {
183#ifdef DEBUG_SRBCHANNEL
184        int q;
185        pa_ringbuffer_peek(&sr->rb_read, &q);
186        pa_log("In rw loop from srbchannel, before callback, count = %d", q);
187#endif
188
189        if (sr->callback) {
190            if (!sr->callback(sr, sr->cb_userdata)) {
191#ifdef DEBUG_SRBCHANNEL
192                pa_log("Aborting read loop from srbchannel");
193#endif
194                return;
195            }
196        }
197
198#ifdef DEBUG_SRBCHANNEL
199        pa_ringbuffer_peek(&sr->rb_read, &q);
200        pa_log("In rw loop from srbchannel, after callback, count = %d", q);
201#endif
202
203    } while (pa_fdsem_before_poll(sr->sem_read) < 0);
204}
205
206static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
207    pa_srbchannel* sr = userdata;
208
209    pa_fdsem_after_poll(sr->sem_read);
210    srbchannel_rwloop(sr);
211}
212
213static void defer_cb(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
214    pa_srbchannel* sr = userdata;
215
216#ifdef DEBUG_SRBCHANNEL
217    pa_log("Calling rw loop from deferred event");
218#endif
219
220    m->defer_enable(e, 0);
221    srbchannel_rwloop(sr);
222}
223
224pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
225    int capacity;
226    int readfd;
227    struct srbheader *srh;
228
229    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
230    sr->mainloop = m;
231    sr->memblock = pa_memblock_new_pool(p, -1);
232    if (!sr->memblock)
233        goto fail;
234
235    srh = pa_memblock_acquire(sr->memblock);
236    pa_zero(*srh);
237
238    sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
239    srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
240
241    capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
242
243    sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
244    srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
245
246    capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
247
248    pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
249        (int) pa_memblock_get_length(sr->memblock), capacity);
250
251    srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
252
253    sr->rb_read.count = &srh->read_count;
254    sr->rb_write.count = &srh->write_count;
255
256    sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata);
257    if (!sr->sem_read)
258        goto fail;
259
260    sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata);
261    if (!sr->sem_write)
262        goto fail;
263
264    readfd = pa_fdsem_get(sr->sem_read);
265
266#ifdef DEBUG_SRBCHANNEL
267    pa_log("Enabling io event on fd %d", readfd);
268#endif
269
270    sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
271    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
272
273    return sr;
274
275fail:
276    pa_srbchannel_free(sr);
277
278    return NULL;
279}
280
281static void pa_srbchannel_swap(pa_srbchannel *sr) {
282    pa_srbchannel temp = *sr;
283
284    sr->sem_read = temp.sem_write;
285    sr->sem_write = temp.sem_read;
286    sr->rb_read = temp.rb_write;
287    sr->rb_write = temp.rb_read;
288}
289
290pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t)
291{
292    int temp;
293    struct srbheader *srh;
294    pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
295
296    sr->mainloop = m;
297    sr->memblock = t->memblock;
298    pa_memblock_ref(sr->memblock);
299    srh = pa_memblock_acquire(sr->memblock);
300
301    sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
302    sr->rb_read.count = &srh->read_count;
303    sr->rb_write.count = &srh->write_count;
304
305    sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
306    sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
307
308    sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
309    if (!sr->sem_read)
310        goto fail;
311
312    sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
313    if (!sr->sem_write)
314        goto fail;
315
316    pa_srbchannel_swap(sr);
317    temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
318
319#ifdef DEBUG_SRBCHANNEL
320    pa_log("Enabling io event on fd %d", t->readfd);
321#endif
322
323    sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
324    m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
325
326    return sr;
327
328fail:
329    pa_srbchannel_free(sr);
330
331    return NULL;
332}
333
334void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) {
335    t->memblock = sr->memblock;
336    t->readfd = pa_fdsem_get(sr->sem_read);
337    t->writefd = pa_fdsem_get(sr->sem_write);
338}
339
340void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
341    if (sr->callback)
342        pa_fdsem_after_poll(sr->sem_read);
343
344    sr->callback = callback;
345    sr->cb_userdata = userdata;
346
347    if (sr->callback) {
348        /* If there are events to be read already in the ringbuffer, we will not get any IO event for that,
349           because that's how pa_fdsem works. Therefore check the ringbuffer in a defer event instead. */
350        if (!sr->defer_event)
351            sr->defer_event = sr->mainloop->defer_new(sr->mainloop, defer_cb, sr);
352        sr->mainloop->defer_enable(sr->defer_event, 1);
353    }
354}
355
356void pa_srbchannel_free(pa_srbchannel *sr)
357{
358#ifdef DEBUG_SRBCHANNEL
359    pa_log("Freeing srbchannel");
360#endif
361    pa_assert(sr);
362
363    if (sr->defer_event)
364        sr->mainloop->defer_free(sr->defer_event);
365    if (sr->read_event)
366        sr->mainloop->io_free(sr->read_event);
367
368    if (sr->sem_read)
369        pa_fdsem_free(sr->sem_read);
370    if (sr->sem_write)
371        pa_fdsem_free(sr->sem_write);
372
373    if (sr->memblock) {
374        pa_memblock_release(sr->memblock);
375        pa_memblock_unref(sr->memblock);
376    }
377
378    pa_xfree(sr);
379}
380