1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2006 Lennart Poettering 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as 8 published by the Free Software Foundation; either version 2.1 of the 9 License, or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public 17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <unistd.h> 25#include <errno.h> 26 27#include <pulsecore/thread.h> 28#include <pulsecore/semaphore.h> 29#include <pulsecore/macro.h> 30 31#include <pulse/mainloop-api.h> 32 33#include "thread-mq.h" 34 35PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq); 36 37static void asyncmsgq_read_cb(pa_mainloop_api *api, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 38 pa_thread_mq *q = userdata; 39 pa_asyncmsgq *aq; 40 41 pa_assert(events == PA_IO_EVENT_INPUT); 42 43 if (pa_asyncmsgq_read_fd(q->outq) == fd) 44 pa_asyncmsgq_ref(aq = q->outq); 45 else if (pa_asyncmsgq_read_fd(q->inq) == fd) 46 pa_asyncmsgq_ref(aq = q->inq); 47 else 48 pa_assert_not_reached(); 49 50 pa_asyncmsgq_read_after_poll(aq); 51 52 for (;;) { 53 pa_msgobject *object; 54 int code; 55 void *data; 56 int64_t offset; 57 pa_memchunk chunk; 58 59 /* Check whether there is a message for us to process */ 60 while (pa_asyncmsgq_get(aq, &object, &code, &data, &offset, &chunk, 0) >= 0) { 61 int ret; 62 63 if (!object && code == PA_MESSAGE_SHUTDOWN) { 64 pa_asyncmsgq_done(aq, 0); 65 api->quit(api, 0); 66 break; 67 } 68 69 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 70 pa_asyncmsgq_done(aq, ret); 71 } 72 73 if (pa_asyncmsgq_read_before_poll(aq) == 0) 74 break; 75 } 76 77 pa_asyncmsgq_unref(aq); 78} 79 80static void asyncmsgq_write_inq_cb(pa_mainloop_api *api, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 81 pa_thread_mq *q = userdata; 82 83 pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd); 84 pa_assert(events == PA_IO_EVENT_INPUT); 85 86 pa_asyncmsgq_write_after_poll(q->inq); 87 pa_asyncmsgq_write_before_poll(q->inq); 88} 89 90static void asyncmsgq_write_outq_cb(pa_mainloop_api *api, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 91 pa_thread_mq *q = userdata; 92 93 pa_assert(pa_asyncmsgq_write_fd(q->outq) == fd); 94 pa_assert(events == PA_IO_EVENT_INPUT); 95 96 pa_asyncmsgq_write_after_poll(q->outq); 97 pa_asyncmsgq_write_before_poll(q->outq); 98} 99 100int pa_thread_mq_init_thread_mainloop(pa_thread_mq *q, pa_mainloop_api *main_mainloop, pa_mainloop_api *thread_mainloop) { 101 pa_assert(q); 102 pa_assert(main_mainloop); 103 pa_assert(thread_mainloop); 104 105 pa_zero(*q); 106 107 q->inq = pa_asyncmsgq_new(0); 108 if (!q->inq) 109 goto fail; 110 111 q->outq = pa_asyncmsgq_new(0); 112 if (!q->outq) 113 goto fail; 114 115 q->main_mainloop = main_mainloop; 116 q->thread_mainloop = thread_mainloop; 117 118 pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); 119 pa_asyncmsgq_write_before_poll(q->outq); 120 pa_assert_se(q->read_main_event = main_mainloop->io_new(main_mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); 121 pa_assert_se(q->write_thread_event = thread_mainloop->io_new(thread_mainloop, pa_asyncmsgq_write_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_write_outq_cb, q)); 122 123 pa_asyncmsgq_read_before_poll(q->inq); 124 pa_asyncmsgq_write_before_poll(q->inq); 125 pa_assert_se(q->read_thread_event = thread_mainloop->io_new(thread_mainloop, pa_asyncmsgq_read_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); 126 pa_assert_se(q->write_main_event = main_mainloop->io_new(main_mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_inq_cb, q)); 127 128 return 0; 129 130fail: 131 pa_thread_mq_done(q); 132 133 return -1; 134} 135 136int pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) { 137 pa_assert(q); 138 pa_assert(mainloop); 139 140 pa_zero(*q); 141 142 q->main_mainloop = mainloop; 143 q->thread_mainloop = NULL; 144 145 q->inq = pa_asyncmsgq_new(0); 146 if (!q->inq) 147 goto fail; 148 149 q->outq = pa_asyncmsgq_new(0); 150 if (!q->outq) 151 goto fail; 152 153 pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); 154 pa_assert_se(q->read_main_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); 155 156 pa_asyncmsgq_write_before_poll(q->inq); 157 pa_assert_se(q->write_main_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_inq_cb, q)); 158 159 pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq); 160 pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq); 161 162 return 0; 163 164fail: 165 pa_thread_mq_done(q); 166 167 return -1; 168} 169 170void pa_thread_mq_done(pa_thread_mq *q) { 171 pa_assert(q); 172 173 /* Since we are called from main context we can be sure that the 174 * inq is empty. However, the outq might still contain messages 175 * for the main loop, which we need to dispatch (e.g. release 176 * msgs, other stuff). Hence do so if we aren't currently 177 * dispatching anyway. */ 178 179 if (q->outq && !pa_asyncmsgq_dispatching(q->outq)) { 180 /* Flushing the asyncmsgq can cause arbitrarily callbacks to run, 181 potentially causing recursion into pa_thread_mq_done again. */ 182 pa_asyncmsgq *z = q->outq; 183 pa_asyncmsgq_ref(z); 184 pa_asyncmsgq_flush(z, true); 185 pa_asyncmsgq_unref(z); 186 } 187 188 if (q->main_mainloop) { 189 if (q->read_main_event) 190 q->main_mainloop->io_free(q->read_main_event); 191 if (q->write_main_event) 192 q->main_mainloop->io_free(q->write_main_event); 193 q->read_main_event = q->write_main_event = NULL; 194 } 195 196 if (q->thread_mainloop) { 197 if (q->read_thread_event) 198 q->thread_mainloop->io_free(q->read_thread_event); 199 if (q->write_thread_event) 200 q->thread_mainloop->io_free(q->write_thread_event); 201 q->read_thread_event = q->write_thread_event = NULL; 202 } 203 204 if (q->inq) 205 pa_asyncmsgq_unref(q->inq); 206 if (q->outq) 207 pa_asyncmsgq_unref(q->outq); 208 q->inq = q->outq = NULL; 209 210 q->main_mainloop = NULL; 211 q->thread_mainloop = NULL; 212} 213 214void pa_thread_mq_install(pa_thread_mq *q) { 215 pa_assert(q); 216 217 pa_assert(!(PA_STATIC_TLS_GET(thread_mq))); 218 PA_STATIC_TLS_SET(thread_mq, q); 219} 220 221pa_thread_mq *pa_thread_mq_get(void) { 222 return PA_STATIC_TLS_GET(thread_mq); 223} 224