153a5a1b3Sopenharmony_ci/***
253a5a1b3Sopenharmony_ci  This file is part of PulseAudio.
353a5a1b3Sopenharmony_ci
453a5a1b3Sopenharmony_ci  Copyright 2006 Lennart Poettering
553a5a1b3Sopenharmony_ci
653a5a1b3Sopenharmony_ci  PulseAudio is free software; you can redistribute it and/or modify
753a5a1b3Sopenharmony_ci  it under the terms of the GNU Lesser General Public License as
853a5a1b3Sopenharmony_ci  published by the Free Software Foundation; either version 2.1 of the
953a5a1b3Sopenharmony_ci  License, or (at your option) any later version.
1053a5a1b3Sopenharmony_ci
1153a5a1b3Sopenharmony_ci  PulseAudio is distributed in the hope that it will be useful, but
1253a5a1b3Sopenharmony_ci  WITHOUT ANY WARRANTY; without even the implied warranty of
1353a5a1b3Sopenharmony_ci  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1453a5a1b3Sopenharmony_ci  Lesser General Public License for more details.
1553a5a1b3Sopenharmony_ci
1653a5a1b3Sopenharmony_ci  You should have received a copy of the GNU Lesser General Public
1753a5a1b3Sopenharmony_ci  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
1853a5a1b3Sopenharmony_ci***/
1953a5a1b3Sopenharmony_ci
2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H
2153a5a1b3Sopenharmony_ci#include <config.h>
2253a5a1b3Sopenharmony_ci#endif
2353a5a1b3Sopenharmony_ci
2453a5a1b3Sopenharmony_ci#include <unistd.h>
2553a5a1b3Sopenharmony_ci#include <errno.h>
2653a5a1b3Sopenharmony_ci
2753a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h>
2853a5a1b3Sopenharmony_ci
2953a5a1b3Sopenharmony_ci#include <pulsecore/macro.h>
3053a5a1b3Sopenharmony_ci#include <pulsecore/log.h>
3153a5a1b3Sopenharmony_ci#include <pulsecore/semaphore.h>
3253a5a1b3Sopenharmony_ci#include <pulsecore/macro.h>
3353a5a1b3Sopenharmony_ci#include <pulsecore/mutex.h>
3453a5a1b3Sopenharmony_ci#include <pulsecore/flist.h>
3553a5a1b3Sopenharmony_ci
3653a5a1b3Sopenharmony_ci#include "asyncmsgq.h"
3753a5a1b3Sopenharmony_ci#define PA_SNPRINTF_STR_LENGTH 256
3853a5a1b3Sopenharmony_ci
3953a5a1b3Sopenharmony_ciPA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
4053a5a1b3Sopenharmony_ciPA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
4153a5a1b3Sopenharmony_ci
4253a5a1b3Sopenharmony_cistruct asyncmsgq_item {
4353a5a1b3Sopenharmony_ci    int code;
4453a5a1b3Sopenharmony_ci    pa_msgobject *object;
4553a5a1b3Sopenharmony_ci    void *userdata;
4653a5a1b3Sopenharmony_ci    pa_free_cb_t free_cb;
4753a5a1b3Sopenharmony_ci    int64_t offset;
4853a5a1b3Sopenharmony_ci    pa_memchunk memchunk;
4953a5a1b3Sopenharmony_ci    pa_semaphore *semaphore;
5053a5a1b3Sopenharmony_ci    int ret;
5153a5a1b3Sopenharmony_ci};
5253a5a1b3Sopenharmony_ci
5353a5a1b3Sopenharmony_cistruct pa_asyncmsgq {
5453a5a1b3Sopenharmony_ci    PA_REFCNT_DECLARE;
5553a5a1b3Sopenharmony_ci    pa_asyncq *asyncq;
5653a5a1b3Sopenharmony_ci    pa_mutex *mutex; /* only for the writer side */
5753a5a1b3Sopenharmony_ci
5853a5a1b3Sopenharmony_ci    struct asyncmsgq_item *current;
5953a5a1b3Sopenharmony_ci};
6053a5a1b3Sopenharmony_ci
6153a5a1b3Sopenharmony_cipa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
6253a5a1b3Sopenharmony_ci    pa_asyncq *asyncq;
6353a5a1b3Sopenharmony_ci    pa_asyncmsgq *a;
6453a5a1b3Sopenharmony_ci
6553a5a1b3Sopenharmony_ci    asyncq = pa_asyncq_new(size);
6653a5a1b3Sopenharmony_ci    if (!asyncq)
6753a5a1b3Sopenharmony_ci        return NULL;
6853a5a1b3Sopenharmony_ci
6953a5a1b3Sopenharmony_ci    a = pa_xnew(pa_asyncmsgq, 1);
7053a5a1b3Sopenharmony_ci
7153a5a1b3Sopenharmony_ci    PA_REFCNT_INIT(a);
7253a5a1b3Sopenharmony_ci    a->asyncq = asyncq;
7353a5a1b3Sopenharmony_ci    pa_assert_se(a->mutex = pa_mutex_new(false, true));
7453a5a1b3Sopenharmony_ci    a->current = NULL;
7553a5a1b3Sopenharmony_ci
7653a5a1b3Sopenharmony_ci    return a;
7753a5a1b3Sopenharmony_ci}
7853a5a1b3Sopenharmony_ci
7953a5a1b3Sopenharmony_cistatic void asyncmsgq_free(pa_asyncmsgq *a) {
8053a5a1b3Sopenharmony_ci    struct asyncmsgq_item *i;
8153a5a1b3Sopenharmony_ci    pa_assert(a);
8253a5a1b3Sopenharmony_ci
8353a5a1b3Sopenharmony_ci    while ((i = pa_asyncq_pop(a->asyncq, false))) {
8453a5a1b3Sopenharmony_ci
8553a5a1b3Sopenharmony_ci        pa_assert(!i->semaphore);
8653a5a1b3Sopenharmony_ci
8753a5a1b3Sopenharmony_ci        if (i->object)
8853a5a1b3Sopenharmony_ci            pa_msgobject_unref(i->object);
8953a5a1b3Sopenharmony_ci
9053a5a1b3Sopenharmony_ci        if (i->memchunk.memblock)
9153a5a1b3Sopenharmony_ci            pa_memblock_unref(i->memchunk.memblock);
9253a5a1b3Sopenharmony_ci
9353a5a1b3Sopenharmony_ci        if (i->free_cb)
9453a5a1b3Sopenharmony_ci            i->free_cb(i->userdata);
9553a5a1b3Sopenharmony_ci
9653a5a1b3Sopenharmony_ci        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
9753a5a1b3Sopenharmony_ci            pa_xfree(i);
9853a5a1b3Sopenharmony_ci    }
9953a5a1b3Sopenharmony_ci
10053a5a1b3Sopenharmony_ci    pa_asyncq_free(a->asyncq, NULL);
10153a5a1b3Sopenharmony_ci    pa_mutex_free(a->mutex);
10253a5a1b3Sopenharmony_ci    pa_xfree(a);
10353a5a1b3Sopenharmony_ci}
10453a5a1b3Sopenharmony_ci
10553a5a1b3Sopenharmony_cipa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
10653a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(q) > 0);
10753a5a1b3Sopenharmony_ci
10853a5a1b3Sopenharmony_ci    PA_REFCNT_INC(q);
10953a5a1b3Sopenharmony_ci    return q;
11053a5a1b3Sopenharmony_ci}
11153a5a1b3Sopenharmony_ci
11253a5a1b3Sopenharmony_civoid pa_asyncmsgq_unref(pa_asyncmsgq* q) {
11353a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(q) > 0);
11453a5a1b3Sopenharmony_ci
11553a5a1b3Sopenharmony_ci    if (PA_REFCNT_DEC(q) <= 0)
11653a5a1b3Sopenharmony_ci        asyncmsgq_free(q);
11753a5a1b3Sopenharmony_ci}
11853a5a1b3Sopenharmony_ci
11953a5a1b3Sopenharmony_civoid pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
12053a5a1b3Sopenharmony_ci    struct asyncmsgq_item *i;
12153a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
12253a5a1b3Sopenharmony_ci
12353a5a1b3Sopenharmony_ci    char t[PA_SNPRINTF_STR_LENGTH] = {0};
12453a5a1b3Sopenharmony_ci    pa_snprintf(t, sizeof(t), "pa_asyncmsgq_post[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq));
12553a5a1b3Sopenharmony_ci    CallStart(t);
12653a5a1b3Sopenharmony_ci
12753a5a1b3Sopenharmony_ci    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
12853a5a1b3Sopenharmony_ci        i = pa_xnew(struct asyncmsgq_item, 1);
12953a5a1b3Sopenharmony_ci
13053a5a1b3Sopenharmony_ci    i->code = code;
13153a5a1b3Sopenharmony_ci    i->object = object ? pa_msgobject_ref(object) : NULL;
13253a5a1b3Sopenharmony_ci    i->userdata = (void*) userdata;
13353a5a1b3Sopenharmony_ci    i->free_cb = free_cb;
13453a5a1b3Sopenharmony_ci    i->offset = offset;
13553a5a1b3Sopenharmony_ci    if (chunk) {
13653a5a1b3Sopenharmony_ci        pa_assert(chunk->memblock);
13753a5a1b3Sopenharmony_ci        i->memchunk = *chunk;
13853a5a1b3Sopenharmony_ci        pa_memblock_ref(i->memchunk.memblock);
13953a5a1b3Sopenharmony_ci    } else
14053a5a1b3Sopenharmony_ci        pa_memchunk_reset(&i->memchunk);
14153a5a1b3Sopenharmony_ci    i->semaphore = NULL;
14253a5a1b3Sopenharmony_ci
14353a5a1b3Sopenharmony_ci    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
14453a5a1b3Sopenharmony_ci    pa_mutex_lock(a->mutex);
14553a5a1b3Sopenharmony_ci    pa_asyncq_post(a->asyncq, i);
14653a5a1b3Sopenharmony_ci    pa_mutex_unlock(a->mutex);
14753a5a1b3Sopenharmony_ci    CallEnd();
14853a5a1b3Sopenharmony_ci}
14953a5a1b3Sopenharmony_ci
15053a5a1b3Sopenharmony_ciint pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
15153a5a1b3Sopenharmony_ci    struct asyncmsgq_item i;
15253a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
15353a5a1b3Sopenharmony_ci
15453a5a1b3Sopenharmony_ci    char t[PA_SNPRINTF_STR_LENGTH] = {0};
15553a5a1b3Sopenharmony_ci    pa_snprintf(t, sizeof(t), "pa_asyncmsgq_send[%d] %u", code, PaAsyncqGetNumToRead(a->asyncq));
15653a5a1b3Sopenharmony_ci    CallStart(t);
15753a5a1b3Sopenharmony_ci    i.code = code;
15853a5a1b3Sopenharmony_ci    i.object = object;
15953a5a1b3Sopenharmony_ci    i.userdata = (void*) userdata;
16053a5a1b3Sopenharmony_ci    i.free_cb = NULL;
16153a5a1b3Sopenharmony_ci    i.ret = -1;
16253a5a1b3Sopenharmony_ci    i.offset = offset;
16353a5a1b3Sopenharmony_ci    if (chunk) {
16453a5a1b3Sopenharmony_ci        pa_assert(chunk->memblock);
16553a5a1b3Sopenharmony_ci        i.memchunk = *chunk;
16653a5a1b3Sopenharmony_ci    } else
16753a5a1b3Sopenharmony_ci        pa_memchunk_reset(&i.memchunk);
16853a5a1b3Sopenharmony_ci
16953a5a1b3Sopenharmony_ci    if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
17053a5a1b3Sopenharmony_ci        i.semaphore = pa_semaphore_new(0);
17153a5a1b3Sopenharmony_ci
17253a5a1b3Sopenharmony_ci    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
17353a5a1b3Sopenharmony_ci    pa_mutex_lock(a->mutex);
17453a5a1b3Sopenharmony_ci    pa_assert_se(pa_asyncq_push(a->asyncq, &i, true) == 0);
17553a5a1b3Sopenharmony_ci    pa_mutex_unlock(a->mutex);
17653a5a1b3Sopenharmony_ci
17753a5a1b3Sopenharmony_ci    pa_semaphore_wait(i.semaphore);
17853a5a1b3Sopenharmony_ci
17953a5a1b3Sopenharmony_ci    if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
18053a5a1b3Sopenharmony_ci        pa_semaphore_free(i.semaphore);
18153a5a1b3Sopenharmony_ci    CallEnd();
18253a5a1b3Sopenharmony_ci    return i.ret;
18353a5a1b3Sopenharmony_ci}
18453a5a1b3Sopenharmony_ci
18553a5a1b3Sopenharmony_ciint pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, bool wait_op) {
18653a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
18753a5a1b3Sopenharmony_ci    pa_assert(!a->current);
18853a5a1b3Sopenharmony_ci
18953a5a1b3Sopenharmony_ci    if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
19053a5a1b3Sopenharmony_ci/*         pa_log("failure"); */
19153a5a1b3Sopenharmony_ci        return -1;
19253a5a1b3Sopenharmony_ci    }
19353a5a1b3Sopenharmony_ci
19453a5a1b3Sopenharmony_ci/*     pa_log("success"); */
19553a5a1b3Sopenharmony_ci
19653a5a1b3Sopenharmony_ci    char t[PA_SNPRINTF_STR_LENGTH] = {0};
19753a5a1b3Sopenharmony_ci    pa_snprintf(t, sizeof(t), "pa_asyncmsgq_get[%d] %u", a->current->code, PaAsyncqGetNumToRead(a->asyncq));
19853a5a1b3Sopenharmony_ci    CallStart(t);
19953a5a1b3Sopenharmony_ci    if (code)
20053a5a1b3Sopenharmony_ci        *code = a->current->code;
20153a5a1b3Sopenharmony_ci    if (userdata)
20253a5a1b3Sopenharmony_ci        *userdata = a->current->userdata;
20353a5a1b3Sopenharmony_ci    if (offset)
20453a5a1b3Sopenharmony_ci        *offset = a->current->offset;
20553a5a1b3Sopenharmony_ci    if (object) {
20653a5a1b3Sopenharmony_ci        if ((*object = a->current->object))
20753a5a1b3Sopenharmony_ci            pa_msgobject_assert_ref(*object);
20853a5a1b3Sopenharmony_ci    }
20953a5a1b3Sopenharmony_ci    if (chunk)
21053a5a1b3Sopenharmony_ci        *chunk = a->current->memchunk;
21153a5a1b3Sopenharmony_ci
21253a5a1b3Sopenharmony_ci/*     pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
21353a5a1b3Sopenharmony_ci/*                  (void*) a, */
21453a5a1b3Sopenharmony_ci/*                  (void*) a->current->object, */
21553a5a1b3Sopenharmony_ci/*                  a->current->object ? a->current->object->parent.type_name : NULL, */
21653a5a1b3Sopenharmony_ci/*                  a->current->code, */
21753a5a1b3Sopenharmony_ci/*                  (void*) a->current->userdata, */
21853a5a1b3Sopenharmony_ci/*                  (unsigned long) a->current->memchunk.length); */
21953a5a1b3Sopenharmony_ci
22053a5a1b3Sopenharmony_ci    CallEnd();
22153a5a1b3Sopenharmony_ci    return 0;
22253a5a1b3Sopenharmony_ci}
22353a5a1b3Sopenharmony_ci
22453a5a1b3Sopenharmony_civoid pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
22553a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
22653a5a1b3Sopenharmony_ci    pa_assert(a);
22753a5a1b3Sopenharmony_ci    pa_assert(a->current);
22853a5a1b3Sopenharmony_ci
22953a5a1b3Sopenharmony_ci    if (a->current->semaphore) {
23053a5a1b3Sopenharmony_ci        a->current->ret = ret;
23153a5a1b3Sopenharmony_ci        pa_semaphore_post(a->current->semaphore);
23253a5a1b3Sopenharmony_ci    } else {
23353a5a1b3Sopenharmony_ci
23453a5a1b3Sopenharmony_ci        if (a->current->free_cb)
23553a5a1b3Sopenharmony_ci            a->current->free_cb(a->current->userdata);
23653a5a1b3Sopenharmony_ci
23753a5a1b3Sopenharmony_ci        if (a->current->object)
23853a5a1b3Sopenharmony_ci            pa_msgobject_unref(a->current->object);
23953a5a1b3Sopenharmony_ci
24053a5a1b3Sopenharmony_ci        if (a->current->memchunk.memblock)
24153a5a1b3Sopenharmony_ci            pa_memblock_unref(a->current->memchunk.memblock);
24253a5a1b3Sopenharmony_ci
24353a5a1b3Sopenharmony_ci        if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
24453a5a1b3Sopenharmony_ci            pa_xfree(a->current);
24553a5a1b3Sopenharmony_ci    }
24653a5a1b3Sopenharmony_ci
24753a5a1b3Sopenharmony_ci    a->current = NULL;
24853a5a1b3Sopenharmony_ci}
24953a5a1b3Sopenharmony_ci
25053a5a1b3Sopenharmony_ciint pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
25153a5a1b3Sopenharmony_ci    int c;
25253a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
25353a5a1b3Sopenharmony_ci
25453a5a1b3Sopenharmony_ci    pa_asyncmsgq_ref(a);
25553a5a1b3Sopenharmony_ci
25653a5a1b3Sopenharmony_ci    do {
25753a5a1b3Sopenharmony_ci        pa_msgobject *o;
25853a5a1b3Sopenharmony_ci        void *data;
25953a5a1b3Sopenharmony_ci        int64_t offset;
26053a5a1b3Sopenharmony_ci        pa_memchunk chunk;
26153a5a1b3Sopenharmony_ci        int ret;
26253a5a1b3Sopenharmony_ci
26353a5a1b3Sopenharmony_ci        if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, true) < 0)
26453a5a1b3Sopenharmony_ci            return -1;
26553a5a1b3Sopenharmony_ci
26653a5a1b3Sopenharmony_ci        ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
26753a5a1b3Sopenharmony_ci        pa_asyncmsgq_done(a, ret);
26853a5a1b3Sopenharmony_ci
26953a5a1b3Sopenharmony_ci    } while (c != code);
27053a5a1b3Sopenharmony_ci
27153a5a1b3Sopenharmony_ci    pa_asyncmsgq_unref(a);
27253a5a1b3Sopenharmony_ci
27353a5a1b3Sopenharmony_ci    return 0;
27453a5a1b3Sopenharmony_ci}
27553a5a1b3Sopenharmony_ci
27653a5a1b3Sopenharmony_ciint pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
27753a5a1b3Sopenharmony_ci    pa_msgobject *object;
27853a5a1b3Sopenharmony_ci    int code;
27953a5a1b3Sopenharmony_ci    void *data;
28053a5a1b3Sopenharmony_ci    pa_memchunk chunk;
28153a5a1b3Sopenharmony_ci    int64_t offset;
28253a5a1b3Sopenharmony_ci    int ret;
28353a5a1b3Sopenharmony_ci
28453a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
28553a5a1b3Sopenharmony_ci
28653a5a1b3Sopenharmony_ci    if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
28753a5a1b3Sopenharmony_ci        return 0;
28853a5a1b3Sopenharmony_ci
28953a5a1b3Sopenharmony_ci    pa_asyncmsgq_ref(a);
29053a5a1b3Sopenharmony_ci    ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
29153a5a1b3Sopenharmony_ci    pa_asyncmsgq_done(a, ret);
29253a5a1b3Sopenharmony_ci    pa_asyncmsgq_unref(a);
29353a5a1b3Sopenharmony_ci
29453a5a1b3Sopenharmony_ci    return 1;
29553a5a1b3Sopenharmony_ci}
29653a5a1b3Sopenharmony_ci
29753a5a1b3Sopenharmony_ciint pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
29853a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
29953a5a1b3Sopenharmony_ci
30053a5a1b3Sopenharmony_ci    return pa_asyncq_read_fd(a->asyncq);
30153a5a1b3Sopenharmony_ci}
30253a5a1b3Sopenharmony_ci
30353a5a1b3Sopenharmony_ciint pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
30453a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
30553a5a1b3Sopenharmony_ci
30653a5a1b3Sopenharmony_ci    return pa_asyncq_read_before_poll(a->asyncq);
30753a5a1b3Sopenharmony_ci}
30853a5a1b3Sopenharmony_ci
30953a5a1b3Sopenharmony_civoid pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
31053a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
31153a5a1b3Sopenharmony_ci
31253a5a1b3Sopenharmony_ci    pa_asyncq_read_after_poll(a->asyncq);
31353a5a1b3Sopenharmony_ci}
31453a5a1b3Sopenharmony_ci
31553a5a1b3Sopenharmony_ciint pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
31653a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
31753a5a1b3Sopenharmony_ci
31853a5a1b3Sopenharmony_ci    return pa_asyncq_write_fd(a->asyncq);
31953a5a1b3Sopenharmony_ci}
32053a5a1b3Sopenharmony_ci
32153a5a1b3Sopenharmony_civoid pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
32253a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
32353a5a1b3Sopenharmony_ci
32453a5a1b3Sopenharmony_ci    pa_asyncq_write_before_poll(a->asyncq);
32553a5a1b3Sopenharmony_ci}
32653a5a1b3Sopenharmony_ci
32753a5a1b3Sopenharmony_civoid pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
32853a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
32953a5a1b3Sopenharmony_ci
33053a5a1b3Sopenharmony_ci    pa_asyncq_write_after_poll(a->asyncq);
33153a5a1b3Sopenharmony_ci}
33253a5a1b3Sopenharmony_ci
33353a5a1b3Sopenharmony_ciint pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
33453a5a1b3Sopenharmony_ci
33553a5a1b3Sopenharmony_ci    if (object)
33653a5a1b3Sopenharmony_ci        return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
33753a5a1b3Sopenharmony_ci
33853a5a1b3Sopenharmony_ci    return 0;
33953a5a1b3Sopenharmony_ci}
34053a5a1b3Sopenharmony_ci
34153a5a1b3Sopenharmony_civoid pa_asyncmsgq_flush(pa_asyncmsgq *a, bool run) {
34253a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
34353a5a1b3Sopenharmony_ci
34453a5a1b3Sopenharmony_ci    for (;;) {
34553a5a1b3Sopenharmony_ci        pa_msgobject *object;
34653a5a1b3Sopenharmony_ci        int code;
34753a5a1b3Sopenharmony_ci        void *data;
34853a5a1b3Sopenharmony_ci        int64_t offset;
34953a5a1b3Sopenharmony_ci        pa_memchunk chunk;
35053a5a1b3Sopenharmony_ci        int ret;
35153a5a1b3Sopenharmony_ci
35253a5a1b3Sopenharmony_ci        if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, false) < 0)
35353a5a1b3Sopenharmony_ci            return;
35453a5a1b3Sopenharmony_ci
35553a5a1b3Sopenharmony_ci        if (!run) {
35653a5a1b3Sopenharmony_ci            pa_asyncmsgq_done(a, -1);
35753a5a1b3Sopenharmony_ci            continue;
35853a5a1b3Sopenharmony_ci        }
35953a5a1b3Sopenharmony_ci
36053a5a1b3Sopenharmony_ci        pa_asyncmsgq_ref(a);
36153a5a1b3Sopenharmony_ci        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
36253a5a1b3Sopenharmony_ci        pa_asyncmsgq_done(a, ret);
36353a5a1b3Sopenharmony_ci        pa_asyncmsgq_unref(a);
36453a5a1b3Sopenharmony_ci    }
36553a5a1b3Sopenharmony_ci}
36653a5a1b3Sopenharmony_ci
36753a5a1b3Sopenharmony_cibool pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
36853a5a1b3Sopenharmony_ci    pa_assert(PA_REFCNT_VALUE(a) > 0);
36953a5a1b3Sopenharmony_ci
37053a5a1b3Sopenharmony_ci    return !!a->current;
37153a5a1b3Sopenharmony_ci}
372