153a5a1b3Sopenharmony_ci/*** 253a5a1b3Sopenharmony_ci This file is part of PulseAudio. 353a5a1b3Sopenharmony_ci 453a5a1b3Sopenharmony_ci Copyright 2006 Lennart Poettering 553a5a1b3Sopenharmony_ci 653a5a1b3Sopenharmony_ci PulseAudio is free software; you can redistribute it and/or modify 753a5a1b3Sopenharmony_ci it under the terms of the GNU Lesser General Public License as 853a5a1b3Sopenharmony_ci published by the Free Software Foundation; either version 2.1 of the 953a5a1b3Sopenharmony_ci License, or (at your option) any later version. 1053a5a1b3Sopenharmony_ci 1153a5a1b3Sopenharmony_ci PulseAudio is distributed in the hope that it will be useful, but 1253a5a1b3Sopenharmony_ci WITHOUT ANY WARRANTY; without even the implied warranty of 1353a5a1b3Sopenharmony_ci MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1453a5a1b3Sopenharmony_ci Lesser General Public License for more details. 1553a5a1b3Sopenharmony_ci 1653a5a1b3Sopenharmony_ci You should have received a copy of the GNU Lesser General Public 1753a5a1b3Sopenharmony_ci License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 1853a5a1b3Sopenharmony_ci***/ 1953a5a1b3Sopenharmony_ci 2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H 2153a5a1b3Sopenharmony_ci#include <config.h> 2253a5a1b3Sopenharmony_ci#endif 2353a5a1b3Sopenharmony_ci 2453a5a1b3Sopenharmony_ci#include <unistd.h> 2553a5a1b3Sopenharmony_ci#include <errno.h> 2653a5a1b3Sopenharmony_ci 2753a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h> 2853a5a1b3Sopenharmony_ci 2953a5a1b3Sopenharmony_ci#include <pulsecore/macro.h> 3053a5a1b3Sopenharmony_ci#include <pulsecore/log.h> 3153a5a1b3Sopenharmony_ci#include <pulsecore/semaphore.h> 3253a5a1b3Sopenharmony_ci#include <pulsecore/macro.h> 3353a5a1b3Sopenharmony_ci#include <pulsecore/mutex.h> 3453a5a1b3Sopenharmony_ci#include <pulsecore/flist.h> 3553a5a1b3Sopenharmony_ci 3653a5a1b3Sopenharmony_ci#include "asyncmsgq.h" 3753a5a1b3Sopenharmony_ci#define PA_SNPRINTF_STR_LENGTH 256 3853a5a1b3Sopenharmony_ci 3953a5a1b3Sopenharmony_ciPA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree); 4053a5a1b3Sopenharmony_ciPA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free); 4153a5a1b3Sopenharmony_ci 4253a5a1b3Sopenharmony_cistruct asyncmsgq_item { 4353a5a1b3Sopenharmony_ci int code; 4453a5a1b3Sopenharmony_ci pa_msgobject *object; 4553a5a1b3Sopenharmony_ci void *userdata; 4653a5a1b3Sopenharmony_ci pa_free_cb_t free_cb; 4753a5a1b3Sopenharmony_ci int64_t offset; 4853a5a1b3Sopenharmony_ci pa_memchunk memchunk; 4953a5a1b3Sopenharmony_ci pa_semaphore *semaphore; 5053a5a1b3Sopenharmony_ci int ret; 5153a5a1b3Sopenharmony_ci}; 5253a5a1b3Sopenharmony_ci 5353a5a1b3Sopenharmony_cistruct pa_asyncmsgq { 5453a5a1b3Sopenharmony_ci PA_REFCNT_DECLARE; 5553a5a1b3Sopenharmony_ci pa_asyncq *asyncq; 5653a5a1b3Sopenharmony_ci pa_mutex *mutex; /* only for the writer side */ 5753a5a1b3Sopenharmony_ci 5853a5a1b3Sopenharmony_ci struct asyncmsgq_item *current; 5953a5a1b3Sopenharmony_ci}; 6053a5a1b3Sopenharmony_ci 6153a5a1b3Sopenharmony_cipa_asyncmsgq *pa_asyncmsgq_new(unsigned size) { 6253a5a1b3Sopenharmony_ci pa_asyncq *asyncq; 6353a5a1b3Sopenharmony_ci pa_asyncmsgq *a; 6453a5a1b3Sopenharmony_ci 6553a5a1b3Sopenharmony_ci asyncq = pa_asyncq_new(size); 6653a5a1b3Sopenharmony_ci if (!asyncq) 6753a5a1b3Sopenharmony_ci return NULL; 6853a5a1b3Sopenharmony_ci 6953a5a1b3Sopenharmony_ci a = pa_xnew(pa_asyncmsgq, 1); 7053a5a1b3Sopenharmony_ci 7153a5a1b3Sopenharmony_ci PA_REFCNT_INIT(a); 7253a5a1b3Sopenharmony_ci a->asyncq = asyncq; 7353a5a1b3Sopenharmony_ci pa_assert_se(a->mutex = pa_mutex_new(false, true)); 7453a5a1b3Sopenharmony_ci a->current = NULL; 7553a5a1b3Sopenharmony_ci 7653a5a1b3Sopenharmony_ci return a; 7753a5a1b3Sopenharmony_ci} 7853a5a1b3Sopenharmony_ci 7953a5a1b3Sopenharmony_cistatic void asyncmsgq_free(pa_asyncmsgq *a) { 8053a5a1b3Sopenharmony_ci struct asyncmsgq_item *i; 8153a5a1b3Sopenharmony_ci pa_assert(a); 8253a5a1b3Sopenharmony_ci 8353a5a1b3Sopenharmony_ci while ((i = pa_asyncq_pop(a->asyncq, false))) { 8453a5a1b3Sopenharmony_ci 8553a5a1b3Sopenharmony_ci pa_assert(!i->semaphore); 8653a5a1b3Sopenharmony_ci 8753a5a1b3Sopenharmony_ci if (i->object) 8853a5a1b3Sopenharmony_ci pa_msgobject_unref(i->object); 8953a5a1b3Sopenharmony_ci 9053a5a1b3Sopenharmony_ci if (i->memchunk.memblock) 9153a5a1b3Sopenharmony_ci pa_memblock_unref(i->memchunk.memblock); 9253a5a1b3Sopenharmony_ci 9353a5a1b3Sopenharmony_ci if (i->free_cb) 9453a5a1b3Sopenharmony_ci i->free_cb(i->userdata); 9553a5a1b3Sopenharmony_ci 9653a5a1b3Sopenharmony_ci if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0) 9753a5a1b3Sopenharmony_ci pa_xfree(i); 9853a5a1b3Sopenharmony_ci } 9953a5a1b3Sopenharmony_ci 10053a5a1b3Sopenharmony_ci pa_asyncq_free(a->asyncq, NULL); 10153a5a1b3Sopenharmony_ci pa_mutex_free(a->mutex); 10253a5a1b3Sopenharmony_ci pa_xfree(a); 10353a5a1b3Sopenharmony_ci} 10453a5a1b3Sopenharmony_ci 10553a5a1b3Sopenharmony_cipa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) { 10653a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(q) > 0); 10753a5a1b3Sopenharmony_ci 10853a5a1b3Sopenharmony_ci PA_REFCNT_INC(q); 10953a5a1b3Sopenharmony_ci return q; 11053a5a1b3Sopenharmony_ci} 11153a5a1b3Sopenharmony_ci 11253a5a1b3Sopenharmony_civoid pa_asyncmsgq_unref(pa_asyncmsgq* q) { 11353a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(q) > 0); 11453a5a1b3Sopenharmony_ci 11553a5a1b3Sopenharmony_ci if (PA_REFCNT_DEC(q) <= 0) 11653a5a1b3Sopenharmony_ci asyncmsgq_free(q); 11753a5a1b3Sopenharmony_ci} 11853a5a1b3Sopenharmony_ci 11953a5a1b3Sopenharmony_civoid pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) { 12053a5a1b3Sopenharmony_ci struct asyncmsgq_item *i; 12153a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 12253a5a1b3Sopenharmony_ci 12353a5a1b3Sopenharmony_ci char t[PA_SNPRINTF_STR_LENGTH] = {0}; 12453a5a1b3Sopenharmony_ci pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq)); 12553a5a1b3Sopenharmony_ci CallStart(t); 12653a5a1b3Sopenharmony_ci 12753a5a1b3Sopenharmony_ci if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq)))) 12853a5a1b3Sopenharmony_ci i = pa_xnew(struct asyncmsgq_item, 1); 12953a5a1b3Sopenharmony_ci 13053a5a1b3Sopenharmony_ci i->code = code; 13153a5a1b3Sopenharmony_ci i->object = object ? pa_msgobject_ref(object) : NULL; 13253a5a1b3Sopenharmony_ci i->userdata = (void*) userdata; 13353a5a1b3Sopenharmony_ci i->free_cb = free_cb; 13453a5a1b3Sopenharmony_ci i->offset = offset; 13553a5a1b3Sopenharmony_ci if (chunk) { 13653a5a1b3Sopenharmony_ci pa_assert(chunk->memblock); 13753a5a1b3Sopenharmony_ci i->memchunk = *chunk; 13853a5a1b3Sopenharmony_ci pa_memblock_ref(i->memchunk.memblock); 13953a5a1b3Sopenharmony_ci } else 14053a5a1b3Sopenharmony_ci pa_memchunk_reset(&i->memchunk); 14153a5a1b3Sopenharmony_ci i->semaphore = NULL; 14253a5a1b3Sopenharmony_ci 14353a5a1b3Sopenharmony_ci /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ 14453a5a1b3Sopenharmony_ci pa_mutex_lock(a->mutex); 14553a5a1b3Sopenharmony_ci pa_asyncq_post(a->asyncq, i); 14653a5a1b3Sopenharmony_ci pa_mutex_unlock(a->mutex); 14753a5a1b3Sopenharmony_ci CallEnd(); 14853a5a1b3Sopenharmony_ci} 14953a5a1b3Sopenharmony_ci 15053a5a1b3Sopenharmony_ciint pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) { 15153a5a1b3Sopenharmony_ci struct asyncmsgq_item i; 15253a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 15353a5a1b3Sopenharmony_ci 15453a5a1b3Sopenharmony_ci char t[PA_SNPRINTF_STR_LENGTH] = {0}; 15553a5a1b3Sopenharmony_ci pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq)); 15653a5a1b3Sopenharmony_ci CallStart(t); 15753a5a1b3Sopenharmony_ci i.code = code; 15853a5a1b3Sopenharmony_ci i.object = object; 15953a5a1b3Sopenharmony_ci i.userdata = (void*) userdata; 16053a5a1b3Sopenharmony_ci i.free_cb = NULL; 16153a5a1b3Sopenharmony_ci i.ret = -1; 16253a5a1b3Sopenharmony_ci i.offset = offset; 16353a5a1b3Sopenharmony_ci if (chunk) { 16453a5a1b3Sopenharmony_ci pa_assert(chunk->memblock); 16553a5a1b3Sopenharmony_ci i.memchunk = *chunk; 16653a5a1b3Sopenharmony_ci } else 16753a5a1b3Sopenharmony_ci pa_memchunk_reset(&i.memchunk); 16853a5a1b3Sopenharmony_ci 16953a5a1b3Sopenharmony_ci if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores)))) 17053a5a1b3Sopenharmony_ci i.semaphore = pa_semaphore_new(0); 17153a5a1b3Sopenharmony_ci 17253a5a1b3Sopenharmony_ci /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ 17353a5a1b3Sopenharmony_ci pa_mutex_lock(a->mutex); 17453a5a1b3Sopenharmony_ci pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0); 17553a5a1b3Sopenharmony_ci pa_mutex_unlock(a->mutex); 17653a5a1b3Sopenharmony_ci 17753a5a1b3Sopenharmony_ci pa_semaphore_wait(i.semaphore); 17853a5a1b3Sopenharmony_ci 17953a5a1b3Sopenharmony_ci if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0) 18053a5a1b3Sopenharmony_ci pa_semaphore_free(i.semaphore); 18153a5a1b3Sopenharmony_ci CallEnd(); 18253a5a1b3Sopenharmony_ci return i.ret; 18353a5a1b3Sopenharmony_ci} 18453a5a1b3Sopenharmony_ci 18553a5a1b3Sopenharmony_ciint pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) { 18653a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 18753a5a1b3Sopenharmony_ci pa_assert(!a->current); 18853a5a1b3Sopenharmony_ci 18953a5a1b3Sopenharmony_ci if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) { 19053a5a1b3Sopenharmony_ci/* pa_log("failure"); */ 19153a5a1b3Sopenharmony_ci return -1; 19253a5a1b3Sopenharmony_ci } 19353a5a1b3Sopenharmony_ci 19453a5a1b3Sopenharmony_ci/* pa_log("success"); */ 19553a5a1b3Sopenharmony_ci 19653a5a1b3Sopenharmony_ci char t[PA_SNPRINTF_STR_LENGTH] = {0}; 19753a5a1b3Sopenharmony_ci pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] %u", a->current->code, PaAsyncqGetNumToRead(a->asyncq)); 19853a5a1b3Sopenharmony_ci CallStart(t); 19953a5a1b3Sopenharmony_ci if (code) 20053a5a1b3Sopenharmony_ci *code = a->current->code; 20153a5a1b3Sopenharmony_ci if (userdata) 20253a5a1b3Sopenharmony_ci *userdata = a->current->userdata; 20353a5a1b3Sopenharmony_ci if (offset) 20453a5a1b3Sopenharmony_ci *offset = a->current->offset; 20553a5a1b3Sopenharmony_ci if (object) { 20653a5a1b3Sopenharmony_ci if ((*object = a->current->object)) 20753a5a1b3Sopenharmony_ci pa_msgobject_assert_ref(*object); 20853a5a1b3Sopenharmony_ci } 20953a5a1b3Sopenharmony_ci if (chunk) 21053a5a1b3Sopenharmony_ci *chunk = a->current->memchunk; 21153a5a1b3Sopenharmony_ci 21253a5a1b3Sopenharmony_ci/* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */ 21353a5a1b3Sopenharmony_ci/* (void*) a, */ 21453a5a1b3Sopenharmony_ci/* (void*) a->current->object, */ 21553a5a1b3Sopenharmony_ci/* a->current->object ? a->current->object->parent.type_name : NULL, */ 21653a5a1b3Sopenharmony_ci/* a->current->code, */ 21753a5a1b3Sopenharmony_ci/* (void*) a->current->userdata, */ 21853a5a1b3Sopenharmony_ci/* (unsigned long) a->current->memchunk.length); */ 21953a5a1b3Sopenharmony_ci 22053a5a1b3Sopenharmony_ci CallEnd(); 22153a5a1b3Sopenharmony_ci return 0; 22253a5a1b3Sopenharmony_ci} 22353a5a1b3Sopenharmony_ci 22453a5a1b3Sopenharmony_civoid pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) { 22553a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 22653a5a1b3Sopenharmony_ci pa_assert(a); 22753a5a1b3Sopenharmony_ci pa_assert(a->current); 22853a5a1b3Sopenharmony_ci 22953a5a1b3Sopenharmony_ci if (a->current->semaphore) { 23053a5a1b3Sopenharmony_ci a->current->ret = ret; 23153a5a1b3Sopenharmony_ci pa_semaphore_post(a->current->semaphore); 23253a5a1b3Sopenharmony_ci } else { 23353a5a1b3Sopenharmony_ci 23453a5a1b3Sopenharmony_ci if (a->current->free_cb) 23553a5a1b3Sopenharmony_ci a->current->free_cb(a->current->userdata); 23653a5a1b3Sopenharmony_ci 23753a5a1b3Sopenharmony_ci if (a->current->object) 23853a5a1b3Sopenharmony_ci pa_msgobject_unref(a->current->object); 23953a5a1b3Sopenharmony_ci 24053a5a1b3Sopenharmony_ci if (a->current->memchunk.memblock) 24153a5a1b3Sopenharmony_ci pa_memblock_unref(a->current->memchunk.memblock); 24253a5a1b3Sopenharmony_ci 24353a5a1b3Sopenharmony_ci if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0) 24453a5a1b3Sopenharmony_ci pa_xfree(a->current); 24553a5a1b3Sopenharmony_ci } 24653a5a1b3Sopenharmony_ci 24753a5a1b3Sopenharmony_ci a->current = NULL; 24853a5a1b3Sopenharmony_ci} 24953a5a1b3Sopenharmony_ci 25053a5a1b3Sopenharmony_ciint pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) { 25153a5a1b3Sopenharmony_ci int c; 25253a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 25353a5a1b3Sopenharmony_ci 25453a5a1b3Sopenharmony_ci pa_asyncmsgq_ref(a); 25553a5a1b3Sopenharmony_ci 25653a5a1b3Sopenharmony_ci do { 25753a5a1b3Sopenharmony_ci pa_msgobject *o; 25853a5a1b3Sopenharmony_ci void *data; 25953a5a1b3Sopenharmony_ci int64_t offset; 26053a5a1b3Sopenharmony_ci pa_memchunk chunk; 26153a5a1b3Sopenharmony_ci int ret; 26253a5a1b3Sopenharmony_ci 26353a5a1b3Sopenharmony_ci if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0) 26453a5a1b3Sopenharmony_ci return -1; 26553a5a1b3Sopenharmony_ci 26653a5a1b3Sopenharmony_ci ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk); 26753a5a1b3Sopenharmony_ci pa_asyncmsgq_done(a, ret); 26853a5a1b3Sopenharmony_ci 26953a5a1b3Sopenharmony_ci } while (c != code); 27053a5a1b3Sopenharmony_ci 27153a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(a); 27253a5a1b3Sopenharmony_ci 27353a5a1b3Sopenharmony_ci return 0; 27453a5a1b3Sopenharmony_ci} 27553a5a1b3Sopenharmony_ci 27653a5a1b3Sopenharmony_ciint pa_asyncmsgq_process_one(pa_asyncmsgq *a) { 27753a5a1b3Sopenharmony_ci pa_msgobject *object; 27853a5a1b3Sopenharmony_ci int code; 27953a5a1b3Sopenharmony_ci void *data; 28053a5a1b3Sopenharmony_ci pa_memchunk chunk; 28153a5a1b3Sopenharmony_ci int64_t offset; 28253a5a1b3Sopenharmony_ci int ret; 28353a5a1b3Sopenharmony_ci 28453a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 28553a5a1b3Sopenharmony_ci 28653a5a1b3Sopenharmony_ci if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0) 28753a5a1b3Sopenharmony_ci return 0; 28853a5a1b3Sopenharmony_ci 28953a5a1b3Sopenharmony_ci pa_asyncmsgq_ref(a); 29053a5a1b3Sopenharmony_ci ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 29153a5a1b3Sopenharmony_ci pa_asyncmsgq_done(a, ret); 29253a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(a); 29353a5a1b3Sopenharmony_ci 29453a5a1b3Sopenharmony_ci return 1; 29553a5a1b3Sopenharmony_ci} 29653a5a1b3Sopenharmony_ci 29753a5a1b3Sopenharmony_ciint pa_asyncmsgq_read_fd(pa_asyncmsgq *a) { 29853a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 29953a5a1b3Sopenharmony_ci 30053a5a1b3Sopenharmony_ci return pa_asyncq_read_fd(a->asyncq); 30153a5a1b3Sopenharmony_ci} 30253a5a1b3Sopenharmony_ci 30353a5a1b3Sopenharmony_ciint pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) { 30453a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 30553a5a1b3Sopenharmony_ci 30653a5a1b3Sopenharmony_ci return pa_asyncq_read_before_poll(a->asyncq); 30753a5a1b3Sopenharmony_ci} 30853a5a1b3Sopenharmony_ci 30953a5a1b3Sopenharmony_civoid pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) { 31053a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 31153a5a1b3Sopenharmony_ci 31253a5a1b3Sopenharmony_ci pa_asyncq_read_after_poll(a->asyncq); 31353a5a1b3Sopenharmony_ci} 31453a5a1b3Sopenharmony_ci 31553a5a1b3Sopenharmony_ciint pa_asyncmsgq_write_fd(pa_asyncmsgq *a) { 31653a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 31753a5a1b3Sopenharmony_ci 31853a5a1b3Sopenharmony_ci return pa_asyncq_write_fd(a->asyncq); 31953a5a1b3Sopenharmony_ci} 32053a5a1b3Sopenharmony_ci 32153a5a1b3Sopenharmony_civoid pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) { 32253a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 32353a5a1b3Sopenharmony_ci 32453a5a1b3Sopenharmony_ci pa_asyncq_write_before_poll(a->asyncq); 32553a5a1b3Sopenharmony_ci} 32653a5a1b3Sopenharmony_ci 32753a5a1b3Sopenharmony_civoid pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) { 32853a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 32953a5a1b3Sopenharmony_ci 33053a5a1b3Sopenharmony_ci pa_asyncq_write_after_poll(a->asyncq); 33153a5a1b3Sopenharmony_ci} 33253a5a1b3Sopenharmony_ci 33353a5a1b3Sopenharmony_ciint pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) { 33453a5a1b3Sopenharmony_ci 33553a5a1b3Sopenharmony_ci if (object) 33653a5a1b3Sopenharmony_ci return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL); 33753a5a1b3Sopenharmony_ci 33853a5a1b3Sopenharmony_ci return 0; 33953a5a1b3Sopenharmony_ci} 34053a5a1b3Sopenharmony_ci 34153a5a1b3Sopenharmony_civoid pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) { 34253a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 34353a5a1b3Sopenharmony_ci 34453a5a1b3Sopenharmony_ci for (;;) { 34553a5a1b3Sopenharmony_ci pa_msgobject *object; 34653a5a1b3Sopenharmony_ci int code; 34753a5a1b3Sopenharmony_ci void *data; 34853a5a1b3Sopenharmony_ci int64_t offset; 34953a5a1b3Sopenharmony_ci pa_memchunk chunk; 35053a5a1b3Sopenharmony_ci int ret; 35153a5a1b3Sopenharmony_ci 35253a5a1b3Sopenharmony_ci if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0) 35353a5a1b3Sopenharmony_ci return; 35453a5a1b3Sopenharmony_ci 35553a5a1b3Sopenharmony_ci if (!run) { 35653a5a1b3Sopenharmony_ci pa_asyncmsgq_done(a, -1); 35753a5a1b3Sopenharmony_ci continue; 35853a5a1b3Sopenharmony_ci } 35953a5a1b3Sopenharmony_ci 36053a5a1b3Sopenharmony_ci pa_asyncmsgq_ref(a); 36153a5a1b3Sopenharmony_ci ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 36253a5a1b3Sopenharmony_ci pa_asyncmsgq_done(a, ret); 36353a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(a); 36453a5a1b3Sopenharmony_ci } 36553a5a1b3Sopenharmony_ci} 36653a5a1b3Sopenharmony_ci 36753a5a1b3Sopenharmony_cibool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) { 36853a5a1b3Sopenharmony_ci pa_assert(PA_REFCNT_VALUE(a) > 0); 36953a5a1b3Sopenharmony_ci 37053a5a1b3Sopenharmony_ci return !!a->current; 37153a5a1b3Sopenharmony_ci} 372