1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2006 Lennart Poettering
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   Lesser General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public
17   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <unistd.h>
25 #include <errno.h>
26 
27 #include <pulse/xmalloc.h>
28 
29 #include <pulsecore/macro.h>
30 #include <pulsecore/log.h>
31 #include <pulsecore/semaphore.h>
32 #include <pulsecore/macro.h>
33 #include <pulsecore/mutex.h>
34 #include <pulsecore/flist.h>
35 
36 #include "asyncmsgq.h"
37 #define PA_SNPRINTF_STR_LENGTH 256
38 
39 PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
40 PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
41 
42 struct asyncmsgq_item {
43     int code;
44     pa_msgobject *object;
45     void *userdata;
46     pa_free_cb_t free_cb;
47     int64_t offset;
48     pa_memchunk memchunk;
49     pa_semaphore *semaphore;
50     int ret;
51 };
52 
53 struct pa_asyncmsgq {
54     PA_REFCNT_DECLARE;
55     pa_asyncq *asyncq;
56     pa_mutex *mutex; /* only for the writer side */
57 
58     struct asyncmsgq_item *current;
59 };
60 
pa_asyncmsgq_new(unsigned size)61 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
62     pa_asyncq *asyncq;
63     pa_asyncmsgq *a;
64 
65     asyncq = pa_asyncq_new(size);
66     if (!asyncq)
67         return NULL;
68 
69     a = pa_xnew(pa_asyncmsgq, 1);
70 
71     PA_REFCNT_INIT(a);
72     a->asyncq = asyncq;
73     pa_assert_se(a->mutex = pa_mutex_new(false, true));
74     a->current = NULL;
75 
76     return a;
77 }
78 
asyncmsgq_free(pa_asyncmsgq *a)79 static void asyncmsgq_free(pa_asyncmsgq *a) {
80     struct asyncmsgq_item *i;
81     pa_assert(a);
82 
83     while ((i = pa_asyncq_pop(a->asyncq, false))) {
84 
85         pa_assert(!i->semaphore);
86 
87         if (i->object)
88             pa_msgobject_unref(i->object);
89 
90         if (i->memchunk.memblock)
91             pa_memblock_unref(i->memchunk.memblock);
92 
93         if (i->free_cb)
94             i->free_cb(i->userdata);
95 
96         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
97             pa_xfree(i);
98     }
99 
100     pa_asyncq_free(a->asyncq, NULL);
101     pa_mutex_free(a->mutex);
102     pa_xfree(a);
103 }
104 
pa_asyncmsgq_ref(pa_asyncmsgq *q)105 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
106     pa_assert(PA_REFCNT_VALUE(q) > 0);
107 
108     PA_REFCNT_INC(q);
109     return q;
110 }
111 
pa_asyncmsgq_unref(pa_asyncmsgq* q)112 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
113     pa_assert(PA_REFCNT_VALUE(q) > 0);
114 
115     if (PA_REFCNT_DEC(q) <= 0)
116         asyncmsgq_free(q);
117 }
118 
pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb)119 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
120     struct asyncmsgq_item *i;
121     pa_assert(PA_REFCNT_VALUE(a) > 0);
122 
123     char t[PA_SNPRINTF_STR_LENGTH] = {0};
124     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq));
125     CallStart(t);
126 
127     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
128         i = pa_xnew(struct asyncmsgq_item, 1);
129 
130     i->code = code;
131     i->object = object ? pa_msgobject_ref(object) : NULL;
132     i->userdata = (void*) userdata;
133     i->free_cb = free_cb;
134     i->offset = offset;
135     if (chunk) {
136         pa_assert(chunk->memblock);
137         i->memchunk = *chunk;
138         pa_memblock_ref(i->memchunk.memblock);
139     } else
140         pa_memchunk_reset(&i->memchunk);
141     i->semaphore = NULL;
142 
143     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
144     pa_mutex_lock(a->mutex);
145     pa_asyncq_post(a->asyncq, i);
146     pa_mutex_unlock(a->mutex);
147     CallEnd();
148 }
149 
pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk)150 int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
151     struct asyncmsgq_item i;
152     pa_assert(PA_REFCNT_VALUE(a) > 0);
153 
154     char t[PA_SNPRINTF_STR_LENGTH] = {0};
155     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq));
156     CallStart(t);
157     i.code = code;
158     i.object = object;
159     i.userdata = (void*) userdata;
160     i.free_cb = NULL;
161     i.ret = -1;
162     i.offset = offset;
163     if (chunk) {
164         pa_assert(chunk->memblock);
165         i.memchunk = *chunk;
166     } else
167         pa_memchunk_reset(&i.memchunk);
168 
169     if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
170         i.semaphore = pa_semaphore_new(0);
171 
172     /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
173     pa_mutex_lock(a->mutex);
174     pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
175     pa_mutex_unlock(a->mutex);
176 
177     pa_semaphore_wait(i.semaphore);
178 
179     if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
180         pa_semaphore_free(i.semaphore);
181     CallEnd();
182     return i.ret;
183 }
184 
pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op)185 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
186     pa_assert(PA_REFCNT_VALUE(a) > 0);
187     pa_assert(!a->current);
188 
189     if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
190 /*         pa_log("failure"); */
191         return -1;
192     }
193 
194 /*     pa_log("success"); */
195 
196     char t[PA_SNPRINTF_STR_LENGTH] = {0};
197     pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] %u", a->current->code, PaAsyncqGetNumToRead(a->asyncq));
198     CallStart(t);
199     if (code)
200         *code = a->current->code;
201     if (userdata)
202         *userdata = a->current->userdata;
203     if (offset)
204         *offset = a->current->offset;
205     if (object) {
206         if ((*object = a->current->object))
207             pa_msgobject_assert_ref(*object);
208     }
209     if (chunk)
210         *chunk = a->current->memchunk;
211 
212 /*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
213 /*                  (void*) a, */
214 /*                  (void*) a->current->object, */
215 /*                  a->current->object ? a->current->object->parent.type_name : NULL, */
216 /*                  a->current->code, */
217 /*                  (void*) a->current->userdata, */
218 /*                  (unsigned long) a->current->memchunk.length); */
219 
220     CallEnd();
221     return 0;
222 }
223 
pa_asyncmsgq_done(pa_asyncmsgq *a, int ret)224 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
225     pa_assert(PA_REFCNT_VALUE(a) > 0);
226     pa_assert(a);
227     pa_assert(a->current);
228 
229     if (a->current->semaphore) {
230         a->current->ret = ret;
231         pa_semaphore_post(a->current->semaphore);
232     } else {
233 
234         if (a->current->free_cb)
235             a->current->free_cb(a->current->userdata);
236 
237         if (a->current->object)
238             pa_msgobject_unref(a->current->object);
239 
240         if (a->current->memchunk.memblock)
241             pa_memblock_unref(a->current->memchunk.memblock);
242 
243         if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
244             pa_xfree(a->current);
245     }
246 
247     a->current = NULL;
248 }
249 
pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code)250 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
251     int c;
252     pa_assert(PA_REFCNT_VALUE(a) > 0);
253 
254     pa_asyncmsgq_ref(a);
255 
256     do {
257         pa_msgobject *o;
258         void *data;
259         int64_t offset;
260         pa_memchunk chunk;
261         int ret;
262 
263         if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
264             return -1;
265 
266         ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
267         pa_asyncmsgq_done(a, ret);
268 
269     } while (c != code);
270 
271     pa_asyncmsgq_unref(a);
272 
273     return 0;
274 }
275 
pa_asyncmsgq_process_one(pa_asyncmsgq *a)276 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
277     pa_msgobject *object;
278     int code;
279     void *data;
280     pa_memchunk chunk;
281     int64_t offset;
282     int ret;
283 
284     pa_assert(PA_REFCNT_VALUE(a) > 0);
285 
286     if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
287         return 0;
288 
289     pa_asyncmsgq_ref(a);
290     ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
291     pa_asyncmsgq_done(a, ret);
292     pa_asyncmsgq_unref(a);
293 
294     return 1;
295 }
296 
pa_asyncmsgq_read_fd(pa_asyncmsgq *a)297 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
298     pa_assert(PA_REFCNT_VALUE(a) > 0);
299 
300     return pa_asyncq_read_fd(a->asyncq);
301 }
302 
pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a)303 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
304     pa_assert(PA_REFCNT_VALUE(a) > 0);
305 
306     return pa_asyncq_read_before_poll(a->asyncq);
307 }
308 
pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a)309 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
310     pa_assert(PA_REFCNT_VALUE(a) > 0);
311 
312     pa_asyncq_read_after_poll(a->asyncq);
313 }
314 
pa_asyncmsgq_write_fd(pa_asyncmsgq *a)315 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
316     pa_assert(PA_REFCNT_VALUE(a) > 0);
317 
318     return pa_asyncq_write_fd(a->asyncq);
319 }
320 
pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a)321 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
322     pa_assert(PA_REFCNT_VALUE(a) > 0);
323 
324     pa_asyncq_write_before_poll(a->asyncq);
325 }
326 
pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a)327 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
328     pa_assert(PA_REFCNT_VALUE(a) > 0);
329 
330     pa_asyncq_write_after_poll(a->asyncq);
331 }
332 
pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk)333 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
334 
335     if (object)
336         return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
337 
338     return 0;
339 }
340 
pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run)341 void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
342     pa_assert(PA_REFCNT_VALUE(a) > 0);
343 
344     for (;;) {
345         pa_msgobject *object;
346         int code;
347         void *data;
348         int64_t offset;
349         pa_memchunk chunk;
350         int ret;
351 
352         if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
353             return;
354 
355         if (!run) {
356             pa_asyncmsgq_done(a, -1);
357             continue;
358         }
359 
360         pa_asyncmsgq_ref(a);
361         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
362         pa_asyncmsgq_done(a, ret);
363         pa_asyncmsgq_unref(a);
364     }
365 }
366 
pa_asyncmsgq_dispatching(pa_asyncmsgq *a)367 bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
368     pa_assert(PA_REFCNT_VALUE(a) > 0);
369 
370     return !!a->current;
371 }
372