1/***
2  This file is part of PulseAudio.
3
4  Copyright 2006-2008 Lennart Poettering
5
6  PulseAudio is free software; you can redistribute it and/or modify
7  it under the terms of the GNU Lesser General Public License as
8  published by the Free Software Foundation; either version 2.1 of the
9  License, or (at your option) any later version.
10
11  PulseAudio is distributed in the hope that it will be useful, but
12  WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  Lesser General Public License for more details.
15
16  You should have received a copy of the GNU Lesser General Public
17  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include <unistd.h>
25#include <errno.h>
26
27#include <pulse/xmalloc.h>
28
29#include <pulsecore/atomic.h>
30#include <pulsecore/log.h>
31#include <pulsecore/thread.h>
32#include <pulsecore/macro.h>
33#include <pulsecore/core-util.h>
34#include <pulsecore/llist.h>
35#include <pulsecore/flist.h>
36#include <pulsecore/fdsem.h>
37
38#include "asyncq.h"
39
/* Number of cells used when pa_asyncq_new() is called with a size of 0.
 * Has to be a power of two, like every other queue size. */
#define ASYNCQ_SIZE 256

/* For debugging purposes, define PROFILE to make _Y insert an extra
 * thread yield between the individual queue operations. Without
 * PROFILE, _Y expands to an empty statement. */

/* #define PROFILE */

#ifndef PROFILE
#define _Y do { } while(0)
#else
#define _Y pa_thread_yield()
#endif
52
/* One node of the per-queue overflow list. pa_asyncq_post() creates a
 * node for every item it could not place in the queue right away;
 * flush_postq() pushes the item later and recycles the node. */
struct localq {
    void *data;                      /* The item waiting to be pushed */
    PA_LLIST_FIELDS(struct localq);  /* List linkage, see pulsecore/llist.h */
};
57
58struct pa_asyncq {
59    unsigned size;
60    unsigned read_idx;
61    unsigned write_idx;
62    pa_fdsem *read_fdsem, *write_fdsem;
63
64    PA_LLIST_HEAD(struct localq, localq);
65    struct localq *last_localq;
66    bool waiting_for_post;
67};
68
/* File-wide free list used to recycle struct localq nodes instead of
 * allocating a fresh one for every overflowing post. pa_xfree is handed
 * to the list as its release function.
 * NOTE(review): the 0 is presumably "use the default list size" - confirm
 * against pulsecore/flist.h. */
PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
70
/* Yield the address of the first cell of queue x. The cells are stored
 * in the same memory block as the header, starting at the aligned size
 * of struct pa_asyncq; pa_asyncq_new() sizes the allocation to match. */
#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
72
73static unsigned reduce(pa_asyncq *l, unsigned value) {
74    return value & (unsigned) (l->size - 1);
75}
76
77unsigned PaAsyncqGetNumToRead(pa_asyncq *l)
78{
79    return l->write_idx - l->read_idx;
80}
81
82pa_asyncq *pa_asyncq_new(unsigned size) {
83    pa_asyncq *l;
84
85    if (!size)
86        size = ASYNCQ_SIZE;
87
88    pa_assert(pa_is_power_of_two(size));
89
90    l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
91
92    l->size = size;
93
94    PA_LLIST_HEAD_INIT(struct localq, l->localq);
95    l->last_localq = NULL;
96    l->waiting_for_post = false;
97
98    if (!(l->read_fdsem = pa_fdsem_new())) {
99        pa_xfree(l);
100        return NULL;
101    }
102
103    if (!(l->write_fdsem = pa_fdsem_new())) {
104        pa_fdsem_free(l->read_fdsem);
105        pa_xfree(l);
106        return NULL;
107    }
108
109    return l;
110}
111
112void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
113    struct localq *q;
114    pa_assert(l);
115
116    if (free_cb) {
117        void *p;
118
119        while ((p = pa_asyncq_pop(l, 0)))
120            free_cb(p);
121    }
122
123    while ((q = l->localq)) {
124        if (free_cb)
125            free_cb(q->data);
126
127        PA_LLIST_REMOVE(struct localq, l->localq, q);
128
129        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
130            pa_xfree(q);
131    }
132
133    pa_fdsem_free(l->read_fdsem);
134    pa_fdsem_free(l->write_fdsem);
135    pa_xfree(l);
136}
137
138static int push(pa_asyncq*l, void *p, bool wait_op) {
139    unsigned idx;
140    pa_atomic_ptr_t *cells;
141
142    pa_assert(l);
143    pa_assert(p);
144
145    cells = PA_ASYNCQ_CELLS(l);
146
147    _Y;
148    idx = reduce(l, l->write_idx);
149
150    if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
151
152        if (!wait_op)
153            return -1;
154
155/*         pa_log("sleeping on push"); */
156
157        do {
158            pa_fdsem_wait(l->read_fdsem);
159        } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
160    }
161
162    _Y;
163    l->write_idx++;
164
165    pa_fdsem_post(l->write_fdsem);
166
167    return 0;
168}
169
170static bool flush_postq(pa_asyncq *l, bool wait_op) {
171    struct localq *q;
172
173    pa_assert(l);
174
175    while ((q = l->last_localq)) {
176
177        if (push(l, q->data, wait_op) < 0)
178            return false;
179
180        l->last_localq = q->prev;
181
182        PA_LLIST_REMOVE(struct localq, l->localq, q);
183
184        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
185            pa_xfree(q);
186    }
187
188    return true;
189}
190
191int pa_asyncq_push(pa_asyncq*l, void *p, bool wait_op) {
192    pa_assert(l);
193
194    if (!flush_postq(l, wait_op))
195        return -1;
196
197    return push(l, p, wait_op);
198}
199
200void pa_asyncq_post(pa_asyncq*l, void *p) {
201    struct localq *q;
202
203    pa_assert(l);
204    pa_assert(p);
205
206    if (flush_postq(l, false))
207        if (pa_asyncq_push(l, p, false) >= 0)
208            return;
209
210    /* OK, we couldn't push anything in the queue. So let's queue it
211     * locally and push it later */
212
213    if (pa_log_ratelimit(PA_LOG_WARN))
214        pa_log_warn("q overrun, queuing locally");
215
216    if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
217        q = pa_xnew(struct localq, 1);
218
219    q->data = p;
220    PA_LLIST_PREPEND(struct localq, l->localq, q);
221
222    if (!l->last_localq)
223        l->last_localq = q;
224
225    return;
226}
227
228void* pa_asyncq_pop(pa_asyncq*l, bool wait_op) {
229    unsigned idx;
230    void *ret;
231    pa_atomic_ptr_t *cells;
232
233    pa_assert(l);
234
235    cells = PA_ASYNCQ_CELLS(l);
236
237    _Y;
238    idx = reduce(l, l->read_idx);
239
240    if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
241
242        if (!wait_op)
243            return NULL;
244
245/*         pa_log("sleeping on pop"); */
246
247        do {
248            pa_fdsem_wait(l->write_fdsem);
249        } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
250    }
251
252    pa_assert(ret);
253
254    /* Guaranteed to succeed if we only have a single reader */
255    pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
256
257    _Y;
258    l->read_idx++;
259
260    pa_fdsem_post(l->read_fdsem);
261
262    return ret;
263}
264
265int pa_asyncq_read_fd(pa_asyncq *q) {
266    pa_assert(q);
267
268    return pa_fdsem_get(q->write_fdsem);
269}
270
271int pa_asyncq_read_before_poll(pa_asyncq *l) {
272    unsigned idx;
273    pa_atomic_ptr_t *cells;
274
275    pa_assert(l);
276
277    cells = PA_ASYNCQ_CELLS(l);
278
279    _Y;
280    idx = reduce(l, l->read_idx);
281
282    for (;;) {
283        if (pa_atomic_ptr_load(&cells[idx]))
284            return -1;
285
286        if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
287            return 0;
288    }
289}
290
291void pa_asyncq_read_after_poll(pa_asyncq *l) {
292    pa_assert(l);
293
294    pa_fdsem_after_poll(l->write_fdsem);
295}
296
297int pa_asyncq_write_fd(pa_asyncq *q) {
298    pa_assert(q);
299
300    return pa_fdsem_get(q->read_fdsem);
301}
302
303void pa_asyncq_write_before_poll(pa_asyncq *l) {
304    pa_assert(l);
305
306    for (;;) {
307
308        if (flush_postq(l, false))
309            break;
310
311        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
312            l->waiting_for_post = true;
313            break;
314        }
315    }
316}
317
318void pa_asyncq_write_after_poll(pa_asyncq *l) {
319    pa_assert(l);
320
321    if (l->waiting_for_post) {
322        pa_fdsem_after_poll(l->read_fdsem);
323        l->waiting_for_post = false;
324    }
325}
326