153a5a1b3Sopenharmony_ci/*** 253a5a1b3Sopenharmony_ci This file is part of PulseAudio. 353a5a1b3Sopenharmony_ci 453a5a1b3Sopenharmony_ci Copyright 2006 Lennart Poettering 553a5a1b3Sopenharmony_ci 653a5a1b3Sopenharmony_ci PulseAudio is free software; you can redistribute it and/or modify 753a5a1b3Sopenharmony_ci it under the terms of the GNU Lesser General Public License as 853a5a1b3Sopenharmony_ci published by the Free Software Foundation; either version 2.1 of the 953a5a1b3Sopenharmony_ci License, or (at your option) any later version. 1053a5a1b3Sopenharmony_ci 1153a5a1b3Sopenharmony_ci PulseAudio is distributed in the hope that it will be useful, but 1253a5a1b3Sopenharmony_ci WITHOUT ANY WARRANTY; without even the implied warranty of 1353a5a1b3Sopenharmony_ci MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1453a5a1b3Sopenharmony_ci Lesser General Public License for more details. 1553a5a1b3Sopenharmony_ci 1653a5a1b3Sopenharmony_ci You should have received a copy of the GNU Lesser General Public 1753a5a1b3Sopenharmony_ci License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 1853a5a1b3Sopenharmony_ci***/ 1953a5a1b3Sopenharmony_ci 2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H 2153a5a1b3Sopenharmony_ci#include <config.h> 2253a5a1b3Sopenharmony_ci#endif 2353a5a1b3Sopenharmony_ci 2453a5a1b3Sopenharmony_ci#include <unistd.h> 2553a5a1b3Sopenharmony_ci#include <errno.h> 2653a5a1b3Sopenharmony_ci 2753a5a1b3Sopenharmony_ci#include <pulsecore/thread.h> 2853a5a1b3Sopenharmony_ci#include <pulsecore/semaphore.h> 2953a5a1b3Sopenharmony_ci#include <pulsecore/macro.h> 3053a5a1b3Sopenharmony_ci 3153a5a1b3Sopenharmony_ci#include <pulse/mainloop-api.h> 3253a5a1b3Sopenharmony_ci 3353a5a1b3Sopenharmony_ci#include "thread-mq.h" 3453a5a1b3Sopenharmony_ci 3553a5a1b3Sopenharmony_ciPA_STATIC_TLS_DECLARE_NO_FREE(thread_mq); 3653a5a1b3Sopenharmony_ci 3753a5a1b3Sopenharmony_cistatic void asyncmsgq_read_cb(pa_mainloop_api *api, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 3853a5a1b3Sopenharmony_ci pa_thread_mq *q = userdata; 3953a5a1b3Sopenharmony_ci pa_asyncmsgq *aq; 4053a5a1b3Sopenharmony_ci 4153a5a1b3Sopenharmony_ci pa_assert(events == PA_IO_EVENT_INPUT); 4253a5a1b3Sopenharmony_ci 4353a5a1b3Sopenharmony_ci if (pa_asyncmsgq_read_fd(q->outq) == fd) 4453a5a1b3Sopenharmony_ci pa_asyncmsgq_ref(aq = q->outq); 4553a5a1b3Sopenharmony_ci else if (pa_asyncmsgq_read_fd(q->inq) == fd) 4653a5a1b3Sopenharmony_ci pa_asyncmsgq_ref(aq = q->inq); 4753a5a1b3Sopenharmony_ci else 4853a5a1b3Sopenharmony_ci pa_assert_not_reached(); 4953a5a1b3Sopenharmony_ci 5053a5a1b3Sopenharmony_ci pa_asyncmsgq_read_after_poll(aq); 5153a5a1b3Sopenharmony_ci 5253a5a1b3Sopenharmony_ci for (;;) { 5353a5a1b3Sopenharmony_ci pa_msgobject *object; 5453a5a1b3Sopenharmony_ci int code; 5553a5a1b3Sopenharmony_ci void *data; 5653a5a1b3Sopenharmony_ci int64_t offset; 5753a5a1b3Sopenharmony_ci pa_memchunk chunk; 5853a5a1b3Sopenharmony_ci 5953a5a1b3Sopenharmony_ci /* Check whether there is a message for us to process */ 6053a5a1b3Sopenharmony_ci while (pa_asyncmsgq_get(aq, &object, &code, &data, &offset, &chunk, 0) >= 0) { 6153a5a1b3Sopenharmony_ci int ret; 6253a5a1b3Sopenharmony_ci 6353a5a1b3Sopenharmony_ci if (!object && code == PA_MESSAGE_SHUTDOWN) { 6453a5a1b3Sopenharmony_ci pa_asyncmsgq_done(aq, 0); 6553a5a1b3Sopenharmony_ci api->quit(api, 0); 6653a5a1b3Sopenharmony_ci break; 6753a5a1b3Sopenharmony_ci } 6853a5a1b3Sopenharmony_ci 6953a5a1b3Sopenharmony_ci ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 7053a5a1b3Sopenharmony_ci pa_asyncmsgq_done(aq, ret); 7153a5a1b3Sopenharmony_ci } 7253a5a1b3Sopenharmony_ci 7353a5a1b3Sopenharmony_ci if (pa_asyncmsgq_read_before_poll(aq) == 0) 7453a5a1b3Sopenharmony_ci break; 7553a5a1b3Sopenharmony_ci } 7653a5a1b3Sopenharmony_ci 7753a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(aq); 7853a5a1b3Sopenharmony_ci} 7953a5a1b3Sopenharmony_ci 8053a5a1b3Sopenharmony_cistatic void asyncmsgq_write_inq_cb(pa_mainloop_api *api, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 8153a5a1b3Sopenharmony_ci pa_thread_mq *q = userdata; 8253a5a1b3Sopenharmony_ci 8353a5a1b3Sopenharmony_ci pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd); 8453a5a1b3Sopenharmony_ci pa_assert(events == PA_IO_EVENT_INPUT); 8553a5a1b3Sopenharmony_ci 8653a5a1b3Sopenharmony_ci pa_asyncmsgq_write_after_poll(q->inq); 8753a5a1b3Sopenharmony_ci pa_asyncmsgq_write_before_poll(q->inq); 8853a5a1b3Sopenharmony_ci} 8953a5a1b3Sopenharmony_ci 9053a5a1b3Sopenharmony_cistatic void asyncmsgq_write_outq_cb(pa_mainloop_api *api, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 9153a5a1b3Sopenharmony_ci pa_thread_mq *q = userdata; 9253a5a1b3Sopenharmony_ci 9353a5a1b3Sopenharmony_ci pa_assert(pa_asyncmsgq_write_fd(q->outq) == fd); 9453a5a1b3Sopenharmony_ci pa_assert(events == PA_IO_EVENT_INPUT); 9553a5a1b3Sopenharmony_ci 9653a5a1b3Sopenharmony_ci pa_asyncmsgq_write_after_poll(q->outq); 9753a5a1b3Sopenharmony_ci pa_asyncmsgq_write_before_poll(q->outq); 9853a5a1b3Sopenharmony_ci} 9953a5a1b3Sopenharmony_ci 10053a5a1b3Sopenharmony_ciint pa_thread_mq_init_thread_mainloop(pa_thread_mq *q, pa_mainloop_api *main_mainloop, pa_mainloop_api *thread_mainloop) { 10153a5a1b3Sopenharmony_ci pa_assert(q); 10253a5a1b3Sopenharmony_ci pa_assert(main_mainloop); 10353a5a1b3Sopenharmony_ci pa_assert(thread_mainloop); 10453a5a1b3Sopenharmony_ci 10553a5a1b3Sopenharmony_ci pa_zero(*q); 10653a5a1b3Sopenharmony_ci 10753a5a1b3Sopenharmony_ci q->inq = pa_asyncmsgq_new(0); 10853a5a1b3Sopenharmony_ci if (!q->inq) 10953a5a1b3Sopenharmony_ci goto fail; 11053a5a1b3Sopenharmony_ci 11153a5a1b3Sopenharmony_ci q->outq = pa_asyncmsgq_new(0); 11253a5a1b3Sopenharmony_ci if (!q->outq) 11353a5a1b3Sopenharmony_ci goto fail; 11453a5a1b3Sopenharmony_ci 11553a5a1b3Sopenharmony_ci q->main_mainloop = main_mainloop; 11653a5a1b3Sopenharmony_ci q->thread_mainloop = thread_mainloop; 11753a5a1b3Sopenharmony_ci 11853a5a1b3Sopenharmony_ci pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); 11953a5a1b3Sopenharmony_ci pa_asyncmsgq_write_before_poll(q->outq); 12053a5a1b3Sopenharmony_ci pa_assert_se(q->read_main_event = main_mainloop->io_new(main_mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); 12153a5a1b3Sopenharmony_ci pa_assert_se(q->write_thread_event = thread_mainloop->io_new(thread_mainloop, pa_asyncmsgq_write_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_write_outq_cb, q)); 12253a5a1b3Sopenharmony_ci 12353a5a1b3Sopenharmony_ci pa_asyncmsgq_read_before_poll(q->inq); 12453a5a1b3Sopenharmony_ci pa_asyncmsgq_write_before_poll(q->inq); 12553a5a1b3Sopenharmony_ci pa_assert_se(q->read_thread_event = thread_mainloop->io_new(thread_mainloop, pa_asyncmsgq_read_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); 12653a5a1b3Sopenharmony_ci pa_assert_se(q->write_main_event = main_mainloop->io_new(main_mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_inq_cb, q)); 12753a5a1b3Sopenharmony_ci 12853a5a1b3Sopenharmony_ci return 0; 12953a5a1b3Sopenharmony_ci 13053a5a1b3Sopenharmony_cifail: 13153a5a1b3Sopenharmony_ci pa_thread_mq_done(q); 13253a5a1b3Sopenharmony_ci 13353a5a1b3Sopenharmony_ci return -1; 13453a5a1b3Sopenharmony_ci} 13553a5a1b3Sopenharmony_ci 13653a5a1b3Sopenharmony_ciint pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) { 13753a5a1b3Sopenharmony_ci pa_assert(q); 13853a5a1b3Sopenharmony_ci pa_assert(mainloop); 13953a5a1b3Sopenharmony_ci 14053a5a1b3Sopenharmony_ci pa_zero(*q); 14153a5a1b3Sopenharmony_ci 14253a5a1b3Sopenharmony_ci q->main_mainloop = mainloop; 14353a5a1b3Sopenharmony_ci q->thread_mainloop = NULL; 14453a5a1b3Sopenharmony_ci 14553a5a1b3Sopenharmony_ci q->inq = pa_asyncmsgq_new(0); 14653a5a1b3Sopenharmony_ci if (!q->inq) 14753a5a1b3Sopenharmony_ci goto fail; 14853a5a1b3Sopenharmony_ci 14953a5a1b3Sopenharmony_ci q->outq = pa_asyncmsgq_new(0); 15053a5a1b3Sopenharmony_ci if (!q->outq) 15153a5a1b3Sopenharmony_ci goto fail; 15253a5a1b3Sopenharmony_ci 15353a5a1b3Sopenharmony_ci pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); 15453a5a1b3Sopenharmony_ci pa_assert_se(q->read_main_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); 15553a5a1b3Sopenharmony_ci 15653a5a1b3Sopenharmony_ci pa_asyncmsgq_write_before_poll(q->inq); 15753a5a1b3Sopenharmony_ci pa_assert_se(q->write_main_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_inq_cb, q)); 15853a5a1b3Sopenharmony_ci 15953a5a1b3Sopenharmony_ci pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq); 16053a5a1b3Sopenharmony_ci pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq); 16153a5a1b3Sopenharmony_ci 16253a5a1b3Sopenharmony_ci return 0; 16353a5a1b3Sopenharmony_ci 16453a5a1b3Sopenharmony_cifail: 16553a5a1b3Sopenharmony_ci pa_thread_mq_done(q); 16653a5a1b3Sopenharmony_ci 16753a5a1b3Sopenharmony_ci return -1; 16853a5a1b3Sopenharmony_ci} 16953a5a1b3Sopenharmony_ci 17053a5a1b3Sopenharmony_civoid pa_thread_mq_done(pa_thread_mq *q) { 17153a5a1b3Sopenharmony_ci pa_assert(q); 17253a5a1b3Sopenharmony_ci 17353a5a1b3Sopenharmony_ci /* Since we are called from main context we can be sure that the 17453a5a1b3Sopenharmony_ci * inq is empty. However, the outq might still contain messages 17553a5a1b3Sopenharmony_ci * for the main loop, which we need to dispatch (e.g. release 17653a5a1b3Sopenharmony_ci * msgs, other stuff). Hence do so if we aren't currently 17753a5a1b3Sopenharmony_ci * dispatching anyway. */ 17853a5a1b3Sopenharmony_ci 17953a5a1b3Sopenharmony_ci if (q->outq && !pa_asyncmsgq_dispatching(q->outq)) { 18053a5a1b3Sopenharmony_ci /* Flushing the asyncmsgq can cause arbitrarily callbacks to run, 18153a5a1b3Sopenharmony_ci potentially causing recursion into pa_thread_mq_done again. */ 18253a5a1b3Sopenharmony_ci pa_asyncmsgq *z = q->outq; 18353a5a1b3Sopenharmony_ci pa_asyncmsgq_ref(z); 18453a5a1b3Sopenharmony_ci pa_asyncmsgq_flush(z, true); 18553a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(z); 18653a5a1b3Sopenharmony_ci } 18753a5a1b3Sopenharmony_ci 18853a5a1b3Sopenharmony_ci if (q->main_mainloop) { 18953a5a1b3Sopenharmony_ci if (q->read_main_event) 19053a5a1b3Sopenharmony_ci q->main_mainloop->io_free(q->read_main_event); 19153a5a1b3Sopenharmony_ci if (q->write_main_event) 19253a5a1b3Sopenharmony_ci q->main_mainloop->io_free(q->write_main_event); 19353a5a1b3Sopenharmony_ci q->read_main_event = q->write_main_event = NULL; 19453a5a1b3Sopenharmony_ci } 19553a5a1b3Sopenharmony_ci 19653a5a1b3Sopenharmony_ci if (q->thread_mainloop) { 19753a5a1b3Sopenharmony_ci if (q->read_thread_event) 19853a5a1b3Sopenharmony_ci q->thread_mainloop->io_free(q->read_thread_event); 19953a5a1b3Sopenharmony_ci if (q->write_thread_event) 20053a5a1b3Sopenharmony_ci q->thread_mainloop->io_free(q->write_thread_event); 20153a5a1b3Sopenharmony_ci q->read_thread_event = q->write_thread_event = NULL; 20253a5a1b3Sopenharmony_ci } 20353a5a1b3Sopenharmony_ci 20453a5a1b3Sopenharmony_ci if (q->inq) 20553a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(q->inq); 20653a5a1b3Sopenharmony_ci if (q->outq) 20753a5a1b3Sopenharmony_ci pa_asyncmsgq_unref(q->outq); 20853a5a1b3Sopenharmony_ci q->inq = q->outq = NULL; 20953a5a1b3Sopenharmony_ci 21053a5a1b3Sopenharmony_ci q->main_mainloop = NULL; 21153a5a1b3Sopenharmony_ci q->thread_mainloop = NULL; 21253a5a1b3Sopenharmony_ci} 21353a5a1b3Sopenharmony_ci 21453a5a1b3Sopenharmony_civoid pa_thread_mq_install(pa_thread_mq *q) { 21553a5a1b3Sopenharmony_ci pa_assert(q); 21653a5a1b3Sopenharmony_ci 21753a5a1b3Sopenharmony_ci pa_assert(!(PA_STATIC_TLS_GET(thread_mq))); 21853a5a1b3Sopenharmony_ci PA_STATIC_TLS_SET(thread_mq, q); 21953a5a1b3Sopenharmony_ci} 22053a5a1b3Sopenharmony_ci 22153a5a1b3Sopenharmony_cipa_thread_mq *pa_thread_mq_get(void) { 22253a5a1b3Sopenharmony_ci return PA_STATIC_TLS_GET(thread_mq); 22353a5a1b3Sopenharmony_ci} 224