1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2006 Lennart Poettering 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as 8 published by the Free Software Foundation; either version 2.1 of the 9 License, or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public 17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <unistd.h> 25#include <errno.h> 26 27#include <pulse/xmalloc.h> 28 29#include <pulsecore/macro.h> 30#include <pulsecore/log.h> 31#include <pulsecore/semaphore.h> 32#include <pulsecore/macro.h> 33#include <pulsecore/mutex.h> 34#include <pulsecore/flist.h> 35 36#include "asyncmsgq.h" 37#define PA_SNPRINTF_STR_LENGTH 256 38 39PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree); 40PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free); 41 42struct asyncmsgq_item { 43 int code; 44 pa_msgobject *object; 45 void *userdata; 46 pa_free_cb_t free_cb; 47 int64_t offset; 48 pa_memchunk memchunk; 49 pa_semaphore *semaphore; 50 int ret; 51}; 52 53struct pa_asyncmsgq { 54 PA_REFCNT_DECLARE; 55 pa_asyncq *asyncq; 56 pa_mutex *mutex; /* only for the writer side */ 57 58 struct asyncmsgq_item *current; 59}; 60 61pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) { 62 pa_asyncq *asyncq; 63 pa_asyncmsgq *a; 64 65 asyncq = pa_asyncq_new(size); 66 if (!asyncq) 67 return NULL; 68 69 a = pa_xnew(pa_asyncmsgq, 1); 70 71 PA_REFCNT_INIT(a); 72 a->asyncq = asyncq; 73 pa_assert_se(a->mutex = pa_mutex_new(false, true)); 74 a->current = NULL; 75 76 return a; 77} 78 79static void asyncmsgq_free(pa_asyncmsgq *a) { 80 struct asyncmsgq_item *i; 81 pa_assert(a); 82 83 while ((i = pa_asyncq_pop(a->asyncq, false))) { 84 85 pa_assert(!i->semaphore); 86 87 if (i->object) 88 pa_msgobject_unref(i->object); 89 90 if (i->memchunk.memblock) 91 pa_memblock_unref(i->memchunk.memblock); 92 93 if (i->free_cb) 94 i->free_cb(i->userdata); 95 96 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0) 97 pa_xfree(i); 98 } 99 100 pa_asyncq_free(a->asyncq, NULL); 101 pa_mutex_free(a->mutex); 102 pa_xfree(a); 103} 104 105pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) { 106 pa_assert(PA_REFCNT_VALUE(q) > 0); 107 108 PA_REFCNT_INC(q); 109 return q; 110} 111 112void pa_asyncmsgq_unref(pa_asyncmsgq* q) { 113 pa_assert(PA_REFCNT_VALUE(q) > 0); 114 115 if (PA_REFCNT_DEC(q) <= 0) 116 asyncmsgq_free(q); 117} 118 119void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) { 120 struct asyncmsgq_item *i; 121 pa_assert(PA_REFCNT_VALUE(a) > 0); 122 123 char t[PA_SNPRINTF_STR_LENGTH] = {0}; 124 pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq)); 125 CallStart(t); 126 127 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq)))) 128 i = pa_xnew(struct asyncmsgq_item, 1); 129 130 i->code = code; 131 i->object = object ? pa_msgobject_ref(object) : NULL; 132 i->userdata = (void*) userdata; 133 i->free_cb = free_cb; 134 i->offset = offset; 135 if (chunk) { 136 pa_assert(chunk->memblock); 137 i->memchunk = *chunk; 138 pa_memblock_ref(i->memchunk.memblock); 139 } else 140 pa_memchunk_reset(&i->memchunk); 141 i->semaphore = NULL; 142 143 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ 144 pa_mutex_lock(a->mutex); 145 pa_asyncq_post(a->asyncq, i); 146 pa_mutex_unlock(a->mutex); 147 CallEnd(); 148} 149 150int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) { 151 struct asyncmsgq_item i; 152 pa_assert(PA_REFCNT_VALUE(a) > 0); 153 154 char t[PA_SNPRINTF_STR_LENGTH] = {0}; 155 pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq)); 156 CallStart(t); 157 i.code = code; 158 i.object = object; 159 i.userdata = (void*) userdata; 160 i.free_cb = NULL; 161 i.ret = -1; 162 i.offset = offset; 163 if (chunk) { 164 pa_assert(chunk->memblock); 165 i.memchunk = *chunk; 166 } else 167 pa_memchunk_reset(&i.memchunk); 168 169 if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores)))) 170 i.semaphore = pa_semaphore_new(0); 171 172 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ 173 pa_mutex_lock(a->mutex); 174 pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0); 175 pa_mutex_unlock(a->mutex); 176 177 pa_semaphore_wait(i.semaphore); 178 179 if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0) 180 pa_semaphore_free(i.semaphore); 181 CallEnd(); 182 return i.ret; 183} 184 185int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) { 186 pa_assert(PA_REFCNT_VALUE(a) > 0); 187 pa_assert(!a->current); 188 189 if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) { 190/* pa_log("failure"); */ 191 return -1; 192 } 193 194/* pa_log("success"); */ 195 196 char t[PA_SNPRINTF_STR_LENGTH] = {0}; 197 pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] %u", a->current->code, PaAsyncqGetNumToRead(a->asyncq)); 198 CallStart(t); 199 if (code) 200 *code = a->current->code; 201 if (userdata) 202 *userdata = a->current->userdata; 203 if (offset) 204 *offset = a->current->offset; 205 if (object) { 206 if ((*object = a->current->object)) 207 pa_msgobject_assert_ref(*object); 208 } 209 if (chunk) 210 *chunk = a->current->memchunk; 211 212/* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */ 213/* (void*) a, */ 214/* (void*) a->current->object, */ 215/* a->current->object ? a->current->object->parent.type_name : NULL, */ 216/* a->current->code, */ 217/* (void*) a->current->userdata, */ 218/* (unsigned long) a->current->memchunk.length); */ 219 220 CallEnd(); 221 return 0; 222} 223 224void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) { 225 pa_assert(PA_REFCNT_VALUE(a) > 0); 226 pa_assert(a); 227 pa_assert(a->current); 228 229 if (a->current->semaphore) { 230 a->current->ret = ret; 231 pa_semaphore_post(a->current->semaphore); 232 } else { 233 234 if (a->current->free_cb) 235 a->current->free_cb(a->current->userdata); 236 237 if (a->current->object) 238 pa_msgobject_unref(a->current->object); 239 240 if (a->current->memchunk.memblock) 241 pa_memblock_unref(a->current->memchunk.memblock); 242 243 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0) 244 pa_xfree(a->current); 245 } 246 247 a->current = NULL; 248} 249 250int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) { 251 int c; 252 pa_assert(PA_REFCNT_VALUE(a) > 0); 253 254 pa_asyncmsgq_ref(a); 255 256 do { 257 pa_msgobject *o; 258 void *data; 259 int64_t offset; 260 pa_memchunk chunk; 261 int ret; 262 263 if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0) 264 return -1; 265 266 ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk); 267 pa_asyncmsgq_done(a, ret); 268 269 } while (c != code); 270 271 pa_asyncmsgq_unref(a); 272 273 return 0; 274} 275 276int pa_asyncmsgq_process_one(pa_asyncmsgq *a) { 277 pa_msgobject *object; 278 int code; 279 void *data; 280 pa_memchunk chunk; 281 int64_t offset; 282 int ret; 283 284 pa_assert(PA_REFCNT_VALUE(a) > 0); 285 286 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0) 287 return 0; 288 289 pa_asyncmsgq_ref(a); 290 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 291 pa_asyncmsgq_done(a, ret); 292 pa_asyncmsgq_unref(a); 293 294 return 1; 295} 296 297int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) { 298 pa_assert(PA_REFCNT_VALUE(a) > 0); 299 300 return pa_asyncq_read_fd(a->asyncq); 301} 302 303int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) { 304 pa_assert(PA_REFCNT_VALUE(a) > 0); 305 306 return pa_asyncq_read_before_poll(a->asyncq); 307} 308 309void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) { 310 pa_assert(PA_REFCNT_VALUE(a) > 0); 311 312 pa_asyncq_read_after_poll(a->asyncq); 313} 314 315int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) { 316 pa_assert(PA_REFCNT_VALUE(a) > 0); 317 318 return pa_asyncq_write_fd(a->asyncq); 319} 320 321void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) { 322 pa_assert(PA_REFCNT_VALUE(a) > 0); 323 324 pa_asyncq_write_before_poll(a->asyncq); 325} 326 327void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) { 328 pa_assert(PA_REFCNT_VALUE(a) > 0); 329 330 pa_asyncq_write_after_poll(a->asyncq); 331} 332 333int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) { 334 335 if (object) 336 return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL); 337 338 return 0; 339} 340 341void pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) { 342 pa_assert(PA_REFCNT_VALUE(a) > 0); 343 344 for (;;) { 345 pa_msgobject *object; 346 int code; 347 void *data; 348 int64_t offset; 349 pa_memchunk chunk; 350 int ret; 351 352 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0) 353 return; 354 355 if (!run) { 356 pa_asyncmsgq_done(a, -1); 357 continue; 358 } 359 360 pa_asyncmsgq_ref(a); 361 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 362 pa_asyncmsgq_done(a, ret); 363 pa_asyncmsgq_unref(a); 364 } 365} 366 367bool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) { 368 pa_assert(PA_REFCNT_VALUE(a) > 0); 369 370 return !!a->current; 371} 372