153a5a1b3Sopenharmony_ci/*** 253a5a1b3Sopenharmony_ci This file is part of PulseAudio. 353a5a1b3Sopenharmony_ci 453a5a1b3Sopenharmony_ci Copyright 2006-2008 Lennart Poettering 553a5a1b3Sopenharmony_ci 653a5a1b3Sopenharmony_ci PulseAudio is free software; you can redistribute it and/or modify 753a5a1b3Sopenharmony_ci it under the terms of the GNU Lesser General Public License as 853a5a1b3Sopenharmony_ci published by the Free Software Foundation; either version 2.1 of the 953a5a1b3Sopenharmony_ci License, or (at your option) any later version. 1053a5a1b3Sopenharmony_ci 1153a5a1b3Sopenharmony_ci PulseAudio is distributed in the hope that it will be useful, but 1253a5a1b3Sopenharmony_ci WITHOUT ANY WARRANTY; without even the implied warranty of 1353a5a1b3Sopenharmony_ci MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1453a5a1b3Sopenharmony_ci Lesser General Public License for more details. 1553a5a1b3Sopenharmony_ci 1653a5a1b3Sopenharmony_ci You should have received a copy of the GNU Lesser General Public 1753a5a1b3Sopenharmony_ci License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 1853a5a1b3Sopenharmony_ci***/ 1953a5a1b3Sopenharmony_ci 2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H 2153a5a1b3Sopenharmony_ci#include <config.h> 2253a5a1b3Sopenharmony_ci#endif 2353a5a1b3Sopenharmony_ci 2453a5a1b3Sopenharmony_ci#include <unistd.h> 2553a5a1b3Sopenharmony_ci#include <errno.h> 2653a5a1b3Sopenharmony_ci 2753a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h> 2853a5a1b3Sopenharmony_ci 2953a5a1b3Sopenharmony_ci#include <pulsecore/atomic.h> 3053a5a1b3Sopenharmony_ci#include <pulsecore/log.h> 3153a5a1b3Sopenharmony_ci#include <pulsecore/thread.h> 3253a5a1b3Sopenharmony_ci#include <pulsecore/macro.h> 3353a5a1b3Sopenharmony_ci#include <pulsecore/core-util.h> 3453a5a1b3Sopenharmony_ci#include <pulsecore/llist.h> 3553a5a1b3Sopenharmony_ci#include <pulsecore/flist.h> 3653a5a1b3Sopenharmony_ci#include <pulsecore/fdsem.h> 3753a5a1b3Sopenharmony_ci 3853a5a1b3Sopenharmony_ci#include "asyncq.h" 3953a5a1b3Sopenharmony_ci 4053a5a1b3Sopenharmony_ci#define ASYNCQ_SIZE 256 4153a5a1b3Sopenharmony_ci 4253a5a1b3Sopenharmony_ci/* For debugging purposes we can define _Y to put an extra thread 4353a5a1b3Sopenharmony_ci * yield between each operation. */ 4453a5a1b3Sopenharmony_ci 4553a5a1b3Sopenharmony_ci/* #define PROFILE */ 4653a5a1b3Sopenharmony_ci 4753a5a1b3Sopenharmony_ci#ifdef PROFILE 4853a5a1b3Sopenharmony_ci#define _Y pa_thread_yield() 4953a5a1b3Sopenharmony_ci#else 5053a5a1b3Sopenharmony_ci#define _Y do { } while(0) 5153a5a1b3Sopenharmony_ci#endif 5253a5a1b3Sopenharmony_ci 5353a5a1b3Sopenharmony_cistruct localq { 5453a5a1b3Sopenharmony_ci void *data; 5553a5a1b3Sopenharmony_ci PA_LLIST_FIELDS(struct localq); 5653a5a1b3Sopenharmony_ci}; 5753a5a1b3Sopenharmony_ci 5853a5a1b3Sopenharmony_cistruct pa_asyncq { 5953a5a1b3Sopenharmony_ci unsigned size; 6053a5a1b3Sopenharmony_ci unsigned read_idx; 6153a5a1b3Sopenharmony_ci unsigned write_idx; 6253a5a1b3Sopenharmony_ci pa_fdsem *read_fdsem, *write_fdsem; 6353a5a1b3Sopenharmony_ci 6453a5a1b3Sopenharmony_ci PA_LLIST_HEAD(struct localq, localq); 6553a5a1b3Sopenharmony_ci struct localq *last_localq; 6653a5a1b3Sopenharmony_ci bool waiting_for_post; 6753a5a1b3Sopenharmony_ci}; 6853a5a1b3Sopenharmony_ci 6953a5a1b3Sopenharmony_ciPA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree); 7053a5a1b3Sopenharmony_ci 7153a5a1b3Sopenharmony_ci#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) 7253a5a1b3Sopenharmony_ci 7353a5a1b3Sopenharmony_cistatic unsigned reduce(pa_asyncq *l, unsigned value) { 7453a5a1b3Sopenharmony_ci return value & (unsigned) (l->size - 1); 7553a5a1b3Sopenharmony_ci} 7653a5a1b3Sopenharmony_ci 7753a5a1b3Sopenharmony_ciunsigned PaAsyncqGetNumToRead(pa_asyncq *l) 7853a5a1b3Sopenharmony_ci{ 7953a5a1b3Sopenharmony_ci return l->write_idx - l->read_idx; 8053a5a1b3Sopenharmony_ci} 8153a5a1b3Sopenharmony_ci 8253a5a1b3Sopenharmony_cipa_asyncq *pa_asyncq_new(unsigned size) { 8353a5a1b3Sopenharmony_ci pa_asyncq *l; 8453a5a1b3Sopenharmony_ci 8553a5a1b3Sopenharmony_ci if (!size) 8653a5a1b3Sopenharmony_ci size = ASYNCQ_SIZE; 8753a5a1b3Sopenharmony_ci 8853a5a1b3Sopenharmony_ci pa_assert(pa_is_power_of_two(size)); 8953a5a1b3Sopenharmony_ci 9053a5a1b3Sopenharmony_ci l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size)); 9153a5a1b3Sopenharmony_ci 9253a5a1b3Sopenharmony_ci l->size = size; 9353a5a1b3Sopenharmony_ci 9453a5a1b3Sopenharmony_ci PA_LLIST_HEAD_INIT(struct localq, l->localq); 9553a5a1b3Sopenharmony_ci l->last_localq = NULL; 9653a5a1b3Sopenharmony_ci l->waiting_for_post = false; 9753a5a1b3Sopenharmony_ci 9853a5a1b3Sopenharmony_ci if (!(l->read_fdsem = pa_fdsem_new())) { 9953a5a1b3Sopenharmony_ci pa_xfree(l); 10053a5a1b3Sopenharmony_ci return NULL; 10153a5a1b3Sopenharmony_ci } 10253a5a1b3Sopenharmony_ci 10353a5a1b3Sopenharmony_ci if (!(l->write_fdsem = pa_fdsem_new())) { 10453a5a1b3Sopenharmony_ci pa_fdsem_free(l->read_fdsem); 10553a5a1b3Sopenharmony_ci pa_xfree(l); 10653a5a1b3Sopenharmony_ci return NULL; 10753a5a1b3Sopenharmony_ci } 10853a5a1b3Sopenharmony_ci 10953a5a1b3Sopenharmony_ci return l; 11053a5a1b3Sopenharmony_ci} 11153a5a1b3Sopenharmony_ci 11253a5a1b3Sopenharmony_civoid pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { 11353a5a1b3Sopenharmony_ci struct localq *q; 11453a5a1b3Sopenharmony_ci pa_assert(l); 11553a5a1b3Sopenharmony_ci 11653a5a1b3Sopenharmony_ci if (free_cb) { 11753a5a1b3Sopenharmony_ci void *p; 11853a5a1b3Sopenharmony_ci 11953a5a1b3Sopenharmony_ci while ((p = pa_asyncq_pop(l, 0))) 12053a5a1b3Sopenharmony_ci free_cb(p); 12153a5a1b3Sopenharmony_ci } 12253a5a1b3Sopenharmony_ci 12353a5a1b3Sopenharmony_ci while ((q = l->localq)) { 12453a5a1b3Sopenharmony_ci if (free_cb) 12553a5a1b3Sopenharmony_ci free_cb(q->data); 12653a5a1b3Sopenharmony_ci 12753a5a1b3Sopenharmony_ci PA_LLIST_REMOVE(struct localq, l->localq, q); 12853a5a1b3Sopenharmony_ci 12953a5a1b3Sopenharmony_ci if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) 13053a5a1b3Sopenharmony_ci pa_xfree(q); 13153a5a1b3Sopenharmony_ci } 13253a5a1b3Sopenharmony_ci 13353a5a1b3Sopenharmony_ci pa_fdsem_free(l->read_fdsem); 13453a5a1b3Sopenharmony_ci pa_fdsem_free(l->write_fdsem); 13553a5a1b3Sopenharmony_ci pa_xfree(l); 13653a5a1b3Sopenharmony_ci} 13753a5a1b3Sopenharmony_ci 13853a5a1b3Sopenharmony_cistatic int push(pa_asyncq*l, void *p, bool wait_op) { 13953a5a1b3Sopenharmony_ci unsigned idx; 14053a5a1b3Sopenharmony_ci pa_atomic_ptr_t *cells; 14153a5a1b3Sopenharmony_ci 14253a5a1b3Sopenharmony_ci pa_assert(l); 14353a5a1b3Sopenharmony_ci pa_assert(p); 14453a5a1b3Sopenharmony_ci 14553a5a1b3Sopenharmony_ci cells = PA_ASYNCQ_CELLS(l); 14653a5a1b3Sopenharmony_ci 14753a5a1b3Sopenharmony_ci _Y; 14853a5a1b3Sopenharmony_ci idx = reduce(l, l->write_idx); 14953a5a1b3Sopenharmony_ci 15053a5a1b3Sopenharmony_ci if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { 15153a5a1b3Sopenharmony_ci 15253a5a1b3Sopenharmony_ci if (!wait_op) 15353a5a1b3Sopenharmony_ci return -1; 15453a5a1b3Sopenharmony_ci 15553a5a1b3Sopenharmony_ci/* pa_log("sleeping on push"); */ 15653a5a1b3Sopenharmony_ci 15753a5a1b3Sopenharmony_ci do { 15853a5a1b3Sopenharmony_ci pa_fdsem_wait(l->read_fdsem); 15953a5a1b3Sopenharmony_ci } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)); 16053a5a1b3Sopenharmony_ci } 16153a5a1b3Sopenharmony_ci 16253a5a1b3Sopenharmony_ci _Y; 16353a5a1b3Sopenharmony_ci l->write_idx++; 16453a5a1b3Sopenharmony_ci 16553a5a1b3Sopenharmony_ci pa_fdsem_post(l->write_fdsem); 16653a5a1b3Sopenharmony_ci 16753a5a1b3Sopenharmony_ci return 0; 16853a5a1b3Sopenharmony_ci} 16953a5a1b3Sopenharmony_ci 17053a5a1b3Sopenharmony_cistatic bool flush_postq(pa_asyncq *l, bool wait_op) { 17153a5a1b3Sopenharmony_ci struct localq *q; 17253a5a1b3Sopenharmony_ci 17353a5a1b3Sopenharmony_ci pa_assert(l); 17453a5a1b3Sopenharmony_ci 17553a5a1b3Sopenharmony_ci while ((q = l->last_localq)) { 17653a5a1b3Sopenharmony_ci 17753a5a1b3Sopenharmony_ci if (push(l, q->data, wait_op) < 0) 17853a5a1b3Sopenharmony_ci return false; 17953a5a1b3Sopenharmony_ci 18053a5a1b3Sopenharmony_ci l->last_localq = q->prev; 18153a5a1b3Sopenharmony_ci 18253a5a1b3Sopenharmony_ci PA_LLIST_REMOVE(struct localq, l->localq, q); 18353a5a1b3Sopenharmony_ci 18453a5a1b3Sopenharmony_ci if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) 18553a5a1b3Sopenharmony_ci pa_xfree(q); 18653a5a1b3Sopenharmony_ci } 18753a5a1b3Sopenharmony_ci 18853a5a1b3Sopenharmony_ci return true; 18953a5a1b3Sopenharmony_ci} 19053a5a1b3Sopenharmony_ci 19153a5a1b3Sopenharmony_ciint pa_asyncq_push(pa_asyncq*l, void *p, bool wait_op) { 19253a5a1b3Sopenharmony_ci pa_assert(l); 19353a5a1b3Sopenharmony_ci 19453a5a1b3Sopenharmony_ci if (!flush_postq(l, wait_op)) 19553a5a1b3Sopenharmony_ci return -1; 19653a5a1b3Sopenharmony_ci 19753a5a1b3Sopenharmony_ci return push(l, p, wait_op); 19853a5a1b3Sopenharmony_ci} 19953a5a1b3Sopenharmony_ci 20053a5a1b3Sopenharmony_civoid pa_asyncq_post(pa_asyncq*l, void *p) { 20153a5a1b3Sopenharmony_ci struct localq *q; 20253a5a1b3Sopenharmony_ci 20353a5a1b3Sopenharmony_ci pa_assert(l); 20453a5a1b3Sopenharmony_ci pa_assert(p); 20553a5a1b3Sopenharmony_ci 20653a5a1b3Sopenharmony_ci if (flush_postq(l, false)) 20753a5a1b3Sopenharmony_ci if (pa_asyncq_push(l, p, false) >= 0) 20853a5a1b3Sopenharmony_ci return; 20953a5a1b3Sopenharmony_ci 21053a5a1b3Sopenharmony_ci /* OK, we couldn't push anything in the queue. So let's queue it 21153a5a1b3Sopenharmony_ci * locally and push it later */ 21253a5a1b3Sopenharmony_ci 21353a5a1b3Sopenharmony_ci if (pa_log_ratelimit(PA_LOG_WARN)) 21453a5a1b3Sopenharmony_ci pa_log_warn("q overrun, queuing locally"); 21553a5a1b3Sopenharmony_ci 21653a5a1b3Sopenharmony_ci if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq)))) 21753a5a1b3Sopenharmony_ci q = pa_xnew(struct localq, 1); 21853a5a1b3Sopenharmony_ci 21953a5a1b3Sopenharmony_ci q->data = p; 22053a5a1b3Sopenharmony_ci PA_LLIST_PREPEND(struct localq, l->localq, q); 22153a5a1b3Sopenharmony_ci 22253a5a1b3Sopenharmony_ci if (!l->last_localq) 22353a5a1b3Sopenharmony_ci l->last_localq = q; 22453a5a1b3Sopenharmony_ci 22553a5a1b3Sopenharmony_ci return; 22653a5a1b3Sopenharmony_ci} 22753a5a1b3Sopenharmony_ci 22853a5a1b3Sopenharmony_civoid* pa_asyncq_pop(pa_asyncq*l, bool wait_op) { 22953a5a1b3Sopenharmony_ci unsigned idx; 23053a5a1b3Sopenharmony_ci void *ret; 23153a5a1b3Sopenharmony_ci pa_atomic_ptr_t *cells; 23253a5a1b3Sopenharmony_ci 23353a5a1b3Sopenharmony_ci pa_assert(l); 23453a5a1b3Sopenharmony_ci 23553a5a1b3Sopenharmony_ci cells = PA_ASYNCQ_CELLS(l); 23653a5a1b3Sopenharmony_ci 23753a5a1b3Sopenharmony_ci _Y; 23853a5a1b3Sopenharmony_ci idx = reduce(l, l->read_idx); 23953a5a1b3Sopenharmony_ci 24053a5a1b3Sopenharmony_ci if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { 24153a5a1b3Sopenharmony_ci 24253a5a1b3Sopenharmony_ci if (!wait_op) 24353a5a1b3Sopenharmony_ci return NULL; 24453a5a1b3Sopenharmony_ci 24553a5a1b3Sopenharmony_ci/* pa_log("sleeping on pop"); */ 24653a5a1b3Sopenharmony_ci 24753a5a1b3Sopenharmony_ci do { 24853a5a1b3Sopenharmony_ci pa_fdsem_wait(l->write_fdsem); 24953a5a1b3Sopenharmony_ci } while (!(ret = pa_atomic_ptr_load(&cells[idx]))); 25053a5a1b3Sopenharmony_ci } 25153a5a1b3Sopenharmony_ci 25253a5a1b3Sopenharmony_ci pa_assert(ret); 25353a5a1b3Sopenharmony_ci 25453a5a1b3Sopenharmony_ci /* Guaranteed to succeed if we only have a single reader */ 25553a5a1b3Sopenharmony_ci pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL)); 25653a5a1b3Sopenharmony_ci 25753a5a1b3Sopenharmony_ci _Y; 25853a5a1b3Sopenharmony_ci l->read_idx++; 25953a5a1b3Sopenharmony_ci 26053a5a1b3Sopenharmony_ci pa_fdsem_post(l->read_fdsem); 26153a5a1b3Sopenharmony_ci 26253a5a1b3Sopenharmony_ci return ret; 26353a5a1b3Sopenharmony_ci} 26453a5a1b3Sopenharmony_ci 26553a5a1b3Sopenharmony_ciint pa_asyncq_read_fd(pa_asyncq *q) { 26653a5a1b3Sopenharmony_ci pa_assert(q); 26753a5a1b3Sopenharmony_ci 26853a5a1b3Sopenharmony_ci return pa_fdsem_get(q->write_fdsem); 26953a5a1b3Sopenharmony_ci} 27053a5a1b3Sopenharmony_ci 27153a5a1b3Sopenharmony_ciint pa_asyncq_read_before_poll(pa_asyncq *l) { 27253a5a1b3Sopenharmony_ci unsigned idx; 27353a5a1b3Sopenharmony_ci pa_atomic_ptr_t *cells; 27453a5a1b3Sopenharmony_ci 27553a5a1b3Sopenharmony_ci pa_assert(l); 27653a5a1b3Sopenharmony_ci 27753a5a1b3Sopenharmony_ci cells = PA_ASYNCQ_CELLS(l); 27853a5a1b3Sopenharmony_ci 27953a5a1b3Sopenharmony_ci _Y; 28053a5a1b3Sopenharmony_ci idx = reduce(l, l->read_idx); 28153a5a1b3Sopenharmony_ci 28253a5a1b3Sopenharmony_ci for (;;) { 28353a5a1b3Sopenharmony_ci if (pa_atomic_ptr_load(&cells[idx])) 28453a5a1b3Sopenharmony_ci return -1; 28553a5a1b3Sopenharmony_ci 28653a5a1b3Sopenharmony_ci if (pa_fdsem_before_poll(l->write_fdsem) >= 0) 28753a5a1b3Sopenharmony_ci return 0; 28853a5a1b3Sopenharmony_ci } 28953a5a1b3Sopenharmony_ci} 29053a5a1b3Sopenharmony_ci 29153a5a1b3Sopenharmony_civoid pa_asyncq_read_after_poll(pa_asyncq *l) { 29253a5a1b3Sopenharmony_ci pa_assert(l); 29353a5a1b3Sopenharmony_ci 29453a5a1b3Sopenharmony_ci pa_fdsem_after_poll(l->write_fdsem); 29553a5a1b3Sopenharmony_ci} 29653a5a1b3Sopenharmony_ci 29753a5a1b3Sopenharmony_ciint pa_asyncq_write_fd(pa_asyncq *q) { 29853a5a1b3Sopenharmony_ci pa_assert(q); 29953a5a1b3Sopenharmony_ci 30053a5a1b3Sopenharmony_ci return pa_fdsem_get(q->read_fdsem); 30153a5a1b3Sopenharmony_ci} 30253a5a1b3Sopenharmony_ci 30353a5a1b3Sopenharmony_civoid pa_asyncq_write_before_poll(pa_asyncq *l) { 30453a5a1b3Sopenharmony_ci pa_assert(l); 30553a5a1b3Sopenharmony_ci 30653a5a1b3Sopenharmony_ci for (;;) { 30753a5a1b3Sopenharmony_ci 30853a5a1b3Sopenharmony_ci if (flush_postq(l, false)) 30953a5a1b3Sopenharmony_ci break; 31053a5a1b3Sopenharmony_ci 31153a5a1b3Sopenharmony_ci if (pa_fdsem_before_poll(l->read_fdsem) >= 0) { 31253a5a1b3Sopenharmony_ci l->waiting_for_post = true; 31353a5a1b3Sopenharmony_ci break; 31453a5a1b3Sopenharmony_ci } 31553a5a1b3Sopenharmony_ci } 31653a5a1b3Sopenharmony_ci} 31753a5a1b3Sopenharmony_ci 31853a5a1b3Sopenharmony_civoid pa_asyncq_write_after_poll(pa_asyncq *l) { 31953a5a1b3Sopenharmony_ci pa_assert(l); 32053a5a1b3Sopenharmony_ci 32153a5a1b3Sopenharmony_ci if (l->waiting_for_post) { 32253a5a1b3Sopenharmony_ci pa_fdsem_after_poll(l->read_fdsem); 32353a5a1b3Sopenharmony_ci l->waiting_for_post = false; 32453a5a1b3Sopenharmony_ci } 32553a5a1b3Sopenharmony_ci} 326