1/***
2  This file is part of PulseAudio.
3
4  Copyright 2006 Lennart Poettering
5
6  PulseAudio is free software; you can redistribute it and/or modify
7  it under the terms of the GNU Lesser General Public License as
8  published by the Free Software Foundation; either version 2.1 of the
9  License, or (at your option) any later version.
10
11  PulseAudio is distributed in the hope that it will be useful, but
12  WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  Lesser General Public License for more details.
15
16  You should have received a copy of the GNU Lesser General Public
17  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include <unistd.h>
25#include <errno.h>
26
27#include <pulse/xmalloc.h>
28
29#include <pulsecore/macro.h>
30#include <pulsecore/log.h>
31#include <pulsecore/semaphore.h>
32#include <pulsecore/macro.h>
33#include <pulsecore/mutex.h>
34#include <pulsecore/flist.h>
35
36#include "asyncmsgq.h"
/* Size, in bytes, of the stack buffers that hold the trace labels
 * formatted with pa_snprintf() in the post/send/get paths of this file. */
#define PA_SNPRINTF_STR_LENGTH 256

/* Recycling list for struct asyncmsgq_item allocations: pa_asyncmsgq_post()
 * pops from it, pa_asyncmsgq_done() and asyncmsgq_free() push back to it.
 * NOTE(review): the third argument is presumably the destructor applied to
 * entries left on the list -- confirm against flist.h. */
PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
/* Recycling list for the semaphores pa_asyncmsgq_send() blocks on. */
PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
41
/* One queued message. Items created by pa_asyncmsgq_post() are heap (or
 * free-list) allocated and own references to their object and memblock;
 * items created by pa_asyncmsgq_send() live on the sender's stack and
 * take no references. */
struct asyncmsgq_item {
    int code;                /* message code handed to the object's process_msg() */
    pa_msgobject *object;    /* receiver of the message, may be NULL */
    void *userdata;          /* opaque pointer passed through to the receiver */
    pa_free_cb_t free_cb;    /* if set, called on userdata once a posted item is finished; NULL for sent items */
    int64_t offset;          /* passed through to the receiver unchanged */
    pa_memchunk memchunk;    /* optional payload; reset (unset) when the caller passed no chunk */
    pa_semaphore *semaphore; /* non-NULL only for pa_asyncmsgq_send(); posted by pa_asyncmsgq_done() to wake the sender */
    int ret;                 /* result stored by pa_asyncmsgq_done() for sent items, returned by pa_asyncmsgq_send() */
};
52
/* Reference-counted message queue layered on top of a pa_asyncq. */
struct pa_asyncmsgq {
    PA_REFCNT_DECLARE;
    pa_asyncq *asyncq; /* underlying queue carrying struct asyncmsgq_item pointers */
    pa_mutex *mutex; /* only for the writer side */

    /* Item handed out by pa_asyncmsgq_get() and not yet completed with
     * pa_asyncmsgq_done(); NULL when no message is being processed. */
    struct asyncmsgq_item *current;
};
60
61pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
62    pa_asyncq *asyncq;
63    pa_asyncmsgq *a;
64
65    asyncq = pa_asyncq_new(size);
66    if (!asyncq)
67        return NULL;
68
69    a = pa_xnew(pa_asyncmsgq, 1);
70
71    PA_REFCNT_INIT(a);
72    a->asyncq = asyncq;
73    pa_assert_se(a->mutex = pa_mutex_new(false, true));
74    a->current = NULL;
75
76    return a;
77}
78
79static void asyncmsgq_free(pa_asyncmsgq *a) {
80    struct asyncmsgq_item *i;
81    pa_assert(a);
82
83    while ((i = pa_asyncq_pop(a->asyncq, false))) {
84
85        pa_assert(!i->semaphore);
86
87        if (i->object)
88            pa_msgobject_unref(i->object);
89
90        if (i->memchunk.memblock)
91            pa_memblock_unref(i->memchunk.memblock);
92
93        if (i->free_cb)
94            i->free_cb(i->userdata);
95
96        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
97            pa_xfree(i);
98    }
99
100    pa_asyncq_free(a->asyncq, NULL);
101    pa_mutex_free(a->mutex);
102    pa_xfree(a);
103}
104
105pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
106    pa_assert(PA_REFCNT_VALUE(q) > 0);
107
108    PA_REFCNT_INC(q);
109    return q;
110}
111
112void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
113    pa_assert(PA_REFCNT_VALUE(q) > 0);
114
115    if (PA_REFCNT_DEC(q) <= 0)
116        asyncmsgq_free(q);
117}
118
119void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
120    struct asyncmsgq_item *i;
121    pa_assert(PA_REFCNT_VALUE(a) > 0);
122
123    char t[PA_SNPRINTF_STR_LENGTH] = {0};
124    pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq));
125    CallStart(t);
126
127    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
128        i = pa_xnew(struct asyncmsgq_item, 1);
129
130    i->code = code;
131    i->object = object ? pa_msgobject_ref(object) : NULL;
132    i->userdata = (void*) userdata;
133    i->free_cb = free_cb;
134    i->offset = offset;
135    if (chunk) {
136        pa_assert(chunk->memblock);
137        i->memchunk = *chunk;
138        pa_memblock_ref(i->memchunk.memblock);
139    } else
140        pa_memchunk_reset(&i->memchunk);
141    i->semaphore = NULL;
142
143    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
144    pa_mutex_lock(a->mutex);
145    pa_asyncq_post(a->asyncq, i);
146    pa_mutex_unlock(a->mutex);
147    CallEnd();
148}
149
150int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
151    struct asyncmsgq_item i;
152    pa_assert(PA_REFCNT_VALUE(a) > 0);
153
154    char t[PA_SNPRINTF_STR_LENGTH] = {0};
155    pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq));
156    CallStart(t);
157    i.code = code;
158    i.object = object;
159    i.userdata = (void*) userdata;
160    i.free_cb = NULL;
161    i.ret = -1;
162    i.offset = offset;
163    if (chunk) {
164        pa_assert(chunk->memblock);
165        i.memchunk = *chunk;
166    } else
167        pa_memchunk_reset(&i.memchunk);
168
169    if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
170        i.semaphore = pa_semaphore_new(0);
171
172    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
173    pa_mutex_lock(a->mutex);
174    pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
175    pa_mutex_unlock(a->mutex);
176
177    pa_semaphore_wait(i.semaphore);
178
179    if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
180        pa_semaphore_free(i.semaphore);
181    CallEnd();
182    return i.ret;
183}
184
185int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
186    pa_assert(PA_REFCNT_VALUE(a) > 0);
187    pa_assert(!a->current);
188
189    if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
190/*         pa_log("failure"); */
191        return -1;
192    }
193
194/*     pa_log("success"); */
195
196    char t[PA_SNPRINTF_STR_LENGTH] = {0};
197    pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] %u", a->current->code, PaAsyncqGetNumToRead(a->asyncq));
198    CallStart(t);
199    if (code)
200        *code = a->current->code;
201    if (userdata)
202        *userdata = a->current->userdata;
203    if (offset)
204        *offset = a->current->offset;
205    if (object) {
206        if ((*object = a->current->object))
207            pa_msgobject_assert_ref(*object);
208    }
209    if (chunk)
210        *chunk = a->current->memchunk;
211
212/*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
213/*                  (void*) a, */
214/*                  (void*) a->current->object, */
215/*                  a->current->object ? a->current->object->parent.type_name : NULL, */
216/*                  a->current->code, */
217/*                  (void*) a->current->userdata, */
218/*                  (unsigned long) a->current->memchunk.length); */
219
220    CallEnd();
221    return 0;
222}
223
224void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
225    pa_assert(PA_REFCNT_VALUE(a) > 0);
226    pa_assert(a);
227    pa_assert(a->current);
228
229    if (a->current->semaphore) {
230        a->current->ret = ret;
231        pa_semaphore_post(a->current->semaphore);
232    } else {
233
234        if (a->current->free_cb)
235            a->current->free_cb(a->current->userdata);
236
237        if (a->current->object)
238            pa_msgobject_unref(a->current->object);
239
240        if (a->current->memchunk.memblock)
241            pa_memblock_unref(a->current->memchunk.memblock);
242
243        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
244            pa_xfree(a->current);
245    }
246
247    a->current = NULL;
248}
249
250int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
251    int c;
252    pa_assert(PA_REFCNT_VALUE(a) > 0);
253
254    pa_asyncmsgq_ref(a);
255
256    do {
257        pa_msgobject *o;
258        void *data;
259        int64_t offset;
260        pa_memchunk chunk;
261        int ret;
262
263        if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
264            return -1;
265
266        ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
267        pa_asyncmsgq_done(a, ret);
268
269    } while (c != code);
270
271    pa_asyncmsgq_unref(a);
272
273    return 0;
274}
275
276int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
277    pa_msgobject *object;
278    int code;
279    void *data;
280    pa_memchunk chunk;
281    int64_t offset;
282    int ret;
283
284    pa_assert(PA_REFCNT_VALUE(a) > 0);
285
286    if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
287        return 0;
288
289    pa_asyncmsgq_ref(a);
290    ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
291    pa_asyncmsgq_done(a, ret);
292    pa_asyncmsgq_unref(a);
293
294    return 1;
295}
296
297int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
298    pa_assert(PA_REFCNT_VALUE(a) > 0);
299
300    return pa_asyncq_read_fd(a->asyncq);
301}
302
303int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
304    pa_assert(PA_REFCNT_VALUE(a) > 0);
305
306    return pa_asyncq_read_before_poll(a->asyncq);
307}
308
309void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
310    pa_assert(PA_REFCNT_VALUE(a) > 0);
311
312    pa_asyncq_read_after_poll(a->asyncq);
313}
314
315int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
316    pa_assert(PA_REFCNT_VALUE(a) > 0);
317
318    return pa_asyncq_write_fd(a->asyncq);
319}
320
321void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
322    pa_assert(PA_REFCNT_VALUE(a) > 0);
323
324    pa_asyncq_write_before_poll(a->asyncq);
325}
326
327void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
328    pa_assert(PA_REFCNT_VALUE(a) > 0);
329
330    pa_asyncq_write_after_poll(a->asyncq);
331}
332
333int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
334
335    if (object)
336        return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
337
338    return 0;
339}
340
341void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
342    pa_assert(PA_REFCNT_VALUE(a) > 0);
343
344    for (;;) {
345        pa_msgobject *object;
346        int code;
347        void *data;
348        int64_t offset;
349        pa_memchunk chunk;
350        int ret;
351
352        if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
353            return;
354
355        if (!run) {
356            pa_asyncmsgq_done(a, -1);
357            continue;
358        }
359
360        pa_asyncmsgq_ref(a);
361        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
362        pa_asyncmsgq_done(a, ret);
363        pa_asyncmsgq_unref(a);
364    }
365}
366
367bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
368    pa_assert(PA_REFCNT_VALUE(a) > 0);
369
370    return !!a->current;
371}
372