1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2006 Lennart Poettering 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as 8 published by the Free Software Foundation; either version 2.1 of the 9 License, or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public 17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <unistd.h> 25#include <errno.h> 26 27#include <pulsecore/atomic.h> 28#include <pulsecore/log.h> 29#include <pulsecore/thread.h> 30#include <pulsecore/macro.h> 31#include <pulsecore/core-util.h> 32#include <pulse/xmalloc.h> 33 34#include "fdsem.h" 35 36/* For debugging purposes we can define _Y to put and extra thread 37 * yield between each operation. */ 38 39/* #define PROFILE */ 40 41#ifdef PROFILE 42#define _Y pa_thread_yield() 43#else 44#define _Y do { } while(0) 45#endif 46 47struct pa_shmasyncq { 48 pa_fdsem *read_fdsem, *write_fdsem; 49 pa_shmasyncq_data *data; 50}; 51 52static int is_power_of_two(unsigned size) { 53 return !(size & (size - 1)); 54} 55 56static int reduce(pa_shmasyncq *l, int value) { 57 return value & (unsigned) (l->n_elements - 1); 58} 59 60static pa_atomic_t* get_cell(pa_shmasyncq *l, unsigned i) { 61 pa_assert(i < l->data->n_elements); 62 63 return (pa_atomic_t*) ((uint8*t) l->data + PA_ALIGN(sizeof(pa_shmasyncq_data)) + i * (PA_ALIGN(sizeof(pa_atomic_t)) + PA_ALIGN(element_size))) 64} 65 66static void *get_cell_data(pa_atomic_t *a) { 67 return (uint8_t*) a + PA_ALIGN(sizeof(atomic_t)); 68} 69 70pa_shmasyncq *pa_shmasyncq_new(unsigned n_elements, size_t element_size, void *data, int fd[2]) { 71 pa_shmasyncq *l; 72 73 pa_assert(n_elements > 0); 74 pa_assert(is_power_of_two(n_elements)); 75 pa_assert(element_size > 0); 76 pa_assert(data); 77 pa_assert(fd); 78 79 l = pa_xnew(pa_shmasyncq, 1); 80 81 l->data = data; 82 memset(data, 0, PA_SHMASYNCQ_SIZE(n_elements, element_size)); 83 84 l->data->n_elements = n_elements; 85 l->data->element_size = element_size; 86 87 if (!(l->read_fdsem = pa_fdsem_new_shm(&d->read_fdsem_data))) { 88 pa_xfree(l); 89 return NULL; 90 } 91 fd[0] = pa_fdsem_get(l->read_fdsem); 92 93 if (!(l->write_fdsem = pa_fdsem_new(&d->write_fdsem_data, &fd[1]))) { 94 pa_fdsem_free(l->read_fdsem); 95 pa_xfree(l); 96 return NULL; 97 } 98 99 return l; 100} 101 102void pa_shmasyncq_free(pa_shmasyncq *l, pa_free_cb_t free_cb) { 103 pa_assert(l); 104 105 if (free_cb) { 106 void *p; 107 108 while ((p = pa_shmasyncq_pop(l, 0))) 109 free_cb(p); 110 } 111 112 pa_fdsem_free(l->read_fdsem); 113 pa_fdsem_free(l->write_fdsem); 114 pa_xfree(l); 115} 116 117int pa_shmasyncq_push(pa_shmasyncq*l, void *p, int wait) { 118 int idx; 119 pa_atomic_ptr_t *cells; 120 121 pa_assert(l); 122 pa_assert(p); 123 124 cells = PA_SHMASYNCQ_CELLS(l); 125 126 _Y; 127 idx = reduce(l, l->write_idx); 128 129 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { 130 131 if (!wait) 132 return -1; 133 134/* pa_log("sleeping on push"); */ 135 136 do { 137 pa_fdsem_wait(l->read_fdsem); 138 } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)); 139 } 140 141 _Y; 142 l->write_idx++; 143 144 pa_fdsem_post(l->write_fdsem); 145 146 return 0; 147} 148 149void* pa_shmasyncq_pop(pa_shmasyncq*l, int wait) { 150 int idx; 151 void *ret; 152 pa_atomic_ptr_t *cells; 153 154 pa_assert(l); 155 156 cells = PA_SHMASYNCQ_CELLS(l); 157 158 _Y; 159 idx = reduce(l, l->read_idx); 160 161 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { 162 163 if (!wait) 164 return NULL; 165 166/* pa_log("sleeping on pop"); */ 167 168 do { 169 pa_fdsem_wait(l->write_fdsem); 170 } while (!(ret = pa_atomic_ptr_load(&cells[idx]))); 171 } 172 173 pa_assert(ret); 174 175 /* Guaranteed to succeed if we only have a single reader */ 176 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL)); 177 178 _Y; 179 l->read_idx++; 180 181 pa_fdsem_post(l->read_fdsem); 182 183 return ret; 184} 185 186int pa_shmasyncq_get_fd(pa_shmasyncq *q) { 187 pa_assert(q); 188 189 return pa_fdsem_get(q->write_fdsem); 190} 191 192int pa_shmasyncq_before_poll(pa_shmasyncq *l) { 193 int idx; 194 pa_atomic_ptr_t *cells; 195 196 pa_assert(l); 197 198 cells = PA_SHMASYNCQ_CELLS(l); 199 200 _Y; 201 idx = reduce(l, l->read_idx); 202 203 for (;;) { 204 if (pa_atomic_ptr_load(&cells[idx])) 205 return -1; 206 207 if (pa_fdsem_before_poll(l->write_fdsem) >= 0) 208 return 0; 209 } 210 211 return 0; 212} 213 214void pa_shmasyncq_after_poll(pa_shmasyncq *l) { 215 pa_assert(l); 216 217 pa_fdsem_after_poll(l->write_fdsem); 218} 219