1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2006-2008 Lennart Poettering 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as 8 published by the Free Software Foundation; either version 2.1 of the 9 License, or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public 17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <unistd.h> 25#include <errno.h> 26 27#include <pulse/xmalloc.h> 28 29#include <pulsecore/atomic.h> 30#include <pulsecore/log.h> 31#include <pulsecore/thread.h> 32#include <pulsecore/macro.h> 33#include <pulsecore/core-util.h> 34#include <pulsecore/llist.h> 35#include <pulsecore/flist.h> 36#include <pulsecore/fdsem.h> 37 38#include "asyncq.h" 39 40#define ASYNCQ_SIZE 256 41 42/* For debugging purposes we can define _Y to put an extra thread 43 * yield between each operation. */ 44 45/* #define PROFILE */ 46 47#ifdef PROFILE 48#define _Y pa_thread_yield() 49#else 50#define _Y do { } while(0) 51#endif 52 53struct localq { 54 void *data; 55 PA_LLIST_FIELDS(struct localq); 56}; 57 58struct pa_asyncq { 59 unsigned size; 60 unsigned read_idx; 61 unsigned write_idx; 62 pa_fdsem *read_fdsem, *write_fdsem; 63 64 PA_LLIST_HEAD(struct localq, localq); 65 struct localq *last_localq; 66 bool waiting_for_post; 67}; 68 69PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree); 70 71#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) 72 73static unsigned reduce(pa_asyncq *l, unsigned value) { 74 return value & (unsigned) (l->size - 1); 75} 76 77unsigned PaAsyncqGetNumToRead(pa_asyncq *l) 78{ 79 return l->write_idx - l->read_idx; 80} 81 82pa_asyncq *pa_asyncq_new(unsigned size) { 83 pa_asyncq *l; 84 85 if (!size) 86 size = ASYNCQ_SIZE; 87 88 pa_assert(pa_is_power_of_two(size)); 89 90 l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size)); 91 92 l->size = size; 93 94 PA_LLIST_HEAD_INIT(struct localq, l->localq); 95 l->last_localq = NULL; 96 l->waiting_for_post = false; 97 98 if (!(l->read_fdsem = pa_fdsem_new())) { 99 pa_xfree(l); 100 return NULL; 101 } 102 103 if (!(l->write_fdsem = pa_fdsem_new())) { 104 pa_fdsem_free(l->read_fdsem); 105 pa_xfree(l); 106 return NULL; 107 } 108 109 return l; 110} 111 112void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { 113 struct localq *q; 114 pa_assert(l); 115 116 if (free_cb) { 117 void *p; 118 119 while ((p = pa_asyncq_pop(l, 0))) 120 free_cb(p); 121 } 122 123 while ((q = l->localq)) { 124 if (free_cb) 125 free_cb(q->data); 126 127 PA_LLIST_REMOVE(struct localq, l->localq, q); 128 129 if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) 130 pa_xfree(q); 131 } 132 133 pa_fdsem_free(l->read_fdsem); 134 pa_fdsem_free(l->write_fdsem); 135 pa_xfree(l); 136} 137 138static int push(pa_asyncq*l, void *p, bool wait_op) { 139 unsigned idx; 140 pa_atomic_ptr_t *cells; 141 142 pa_assert(l); 143 pa_assert(p); 144 145 cells = PA_ASYNCQ_CELLS(l); 146 147 _Y; 148 idx = reduce(l, l->write_idx); 149 150 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { 151 152 if (!wait_op) 153 return -1; 154 155/* pa_log("sleeping on push"); */ 156 157 do { 158 pa_fdsem_wait(l->read_fdsem); 159 } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)); 160 } 161 162 _Y; 163 l->write_idx++; 164 165 pa_fdsem_post(l->write_fdsem); 166 167 return 0; 168} 169 170static bool flush_postq(pa_asyncq *l, bool wait_op) { 171 struct localq *q; 172 173 pa_assert(l); 174 175 while ((q = l->last_localq)) { 176 177 if (push(l, q->data, wait_op) < 0) 178 return false; 179 180 l->last_localq = q->prev; 181 182 PA_LLIST_REMOVE(struct localq, l->localq, q); 183 184 if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) 185 pa_xfree(q); 186 } 187 188 return true; 189} 190 191int pa_asyncq_push(pa_asyncq*l, void *p, bool wait_op) { 192 pa_assert(l); 193 194 if (!flush_postq(l, wait_op)) 195 return -1; 196 197 return push(l, p, wait_op); 198} 199 200void pa_asyncq_post(pa_asyncq*l, void *p) { 201 struct localq *q; 202 203 pa_assert(l); 204 pa_assert(p); 205 206 if (flush_postq(l, false)) 207 if (pa_asyncq_push(l, p, false) >= 0) 208 return; 209 210 /* OK, we couldn't push anything in the queue. So let's queue it 211 * locally and push it later */ 212 213 if (pa_log_ratelimit(PA_LOG_WARN)) 214 pa_log_warn("q overrun, queuing locally"); 215 216 if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq)))) 217 q = pa_xnew(struct localq, 1); 218 219 q->data = p; 220 PA_LLIST_PREPEND(struct localq, l->localq, q); 221 222 if (!l->last_localq) 223 l->last_localq = q; 224 225 return; 226} 227 228void* pa_asyncq_pop(pa_asyncq*l, bool wait_op) { 229 unsigned idx; 230 void *ret; 231 pa_atomic_ptr_t *cells; 232 233 pa_assert(l); 234 235 cells = PA_ASYNCQ_CELLS(l); 236 237 _Y; 238 idx = reduce(l, l->read_idx); 239 240 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { 241 242 if (!wait_op) 243 return NULL; 244 245/* pa_log("sleeping on pop"); */ 246 247 do { 248 pa_fdsem_wait(l->write_fdsem); 249 } while (!(ret = pa_atomic_ptr_load(&cells[idx]))); 250 } 251 252 pa_assert(ret); 253 254 /* Guaranteed to succeed if we only have a single reader */ 255 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL)); 256 257 _Y; 258 l->read_idx++; 259 260 pa_fdsem_post(l->read_fdsem); 261 262 return ret; 263} 264 265int pa_asyncq_read_fd(pa_asyncq *q) { 266 pa_assert(q); 267 268 return pa_fdsem_get(q->write_fdsem); 269} 270 271int pa_asyncq_read_before_poll(pa_asyncq *l) { 272 unsigned idx; 273 pa_atomic_ptr_t *cells; 274 275 pa_assert(l); 276 277 cells = PA_ASYNCQ_CELLS(l); 278 279 _Y; 280 idx = reduce(l, l->read_idx); 281 282 for (;;) { 283 if (pa_atomic_ptr_load(&cells[idx])) 284 return -1; 285 286 if (pa_fdsem_before_poll(l->write_fdsem) >= 0) 287 return 0; 288 } 289} 290 291void pa_asyncq_read_after_poll(pa_asyncq *l) { 292 pa_assert(l); 293 294 pa_fdsem_after_poll(l->write_fdsem); 295} 296 297int pa_asyncq_write_fd(pa_asyncq *q) { 298 pa_assert(q); 299 300 return pa_fdsem_get(q->read_fdsem); 301} 302 303void pa_asyncq_write_before_poll(pa_asyncq *l) { 304 pa_assert(l); 305 306 for (;;) { 307 308 if (flush_postq(l, false)) 309 break; 310 311 if (pa_fdsem_before_poll(l->read_fdsem) >= 0) { 312 l->waiting_for_post = true; 313 break; 314 } 315 } 316} 317 318void pa_asyncq_write_after_poll(pa_asyncq *l) { 319 pa_assert(l); 320 321 if (l->waiting_for_post) { 322 pa_fdsem_after_poll(l->read_fdsem); 323 l->waiting_for_post = false; 324 } 325} 326