1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2014 David Henningsson, Canonical Ltd. 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as 8 published by the Free Software Foundation; either version 2.1 of the 9 License, or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public 17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include "srbchannel.h" 25 26#include <pulsecore/atomic.h> 27#include <pulse/xmalloc.h> 28 29/* #define DEBUG_SRBCHANNEL */ 30 31/* This ringbuffer might be useful in other contexts too, but 32 * right now it's only used inside the srbchannel, so let's keep it here 33 * for the time being. */ 34typedef struct pa_ringbuffer pa_ringbuffer; 35 36struct pa_ringbuffer { 37 pa_atomic_t *count; /* amount of data in the buffer */ 38 int capacity; 39 uint8_t *memory; 40 int readindex, writeindex; 41}; 42 43static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) { 44 int c = pa_atomic_load(r->count); 45 46 if (r->readindex + c > r->capacity) 47 *count = r->capacity - r->readindex; 48 else 49 *count = c; 50 51 return r->memory + r->readindex; 52} 53 54/* Returns true only if the buffer was completely full before the drop. */ 55static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) { 56 bool b = pa_atomic_sub(r->count, count) >= r->capacity; 57 58 r->readindex += count; 59 r->readindex %= r->capacity; 60 61 return b; 62} 63 64static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) { 65 int c = pa_atomic_load(r->count); 66 67 *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c); 68 69 return r->memory + r->writeindex; 70} 71 72static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) { 73 pa_atomic_add(r->count, count); 74 r->writeindex += count; 75 r->writeindex %= r->capacity; 76} 77 78struct pa_srbchannel { 79 pa_ringbuffer rb_read, rb_write; 80 pa_fdsem *sem_read, *sem_write; 81 pa_memblock *memblock; 82 83 void *cb_userdata; 84 pa_srbchannel_cb_t callback; 85 86 pa_io_event *read_event; 87 pa_defer_event *defer_event; 88 pa_mainloop_api *mainloop; 89}; 90 91/* We always listen to sem_read, and always signal on sem_write. 92 * 93 * This means we signal the same semaphore for two scenarios: 94 * 1) We have written something to our send buffer, and want the other 95 * side to read it 96 * 2) We have read something from our receive buffer that was previously 97 * completely full, and want the other side to continue writing 98*/ 99 100size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) { 101 size_t written = 0; 102 103 while (l > 0) { 104 int towrite; 105 void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite); 106 107 if ((size_t) towrite > l) 108 towrite = l; 109 110 if (towrite == 0) { 111#ifdef DEBUG_SRBCHANNEL 112 pa_log("srbchannel output buffer full"); 113#endif 114 break; 115 } 116 117 memcpy(ptr, data, towrite); 118 pa_ringbuffer_end_write(&sr->rb_write, towrite); 119 written += towrite; 120 data = (uint8_t*) data + towrite; 121 l -= towrite; 122 } 123#ifdef DEBUG_SRBCHANNEL 124 pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written); 125#endif 126 127 pa_fdsem_post(sr->sem_write); 128 return written; 129} 130 131size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) { 132 size_t isread = 0; 133 134 while (l > 0) { 135 int toread; 136 void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread); 137 138 if ((size_t) toread > l) 139 toread = l; 140 141 if (toread == 0) 142 break; 143 144 memcpy(data, ptr, toread); 145 146 if (pa_ringbuffer_drop(&sr->rb_read, toread)) { 147#ifdef DEBUG_SRBCHANNEL 148 pa_log("Read from full output buffer, signalling fdsem"); 149#endif 150 pa_fdsem_post(sr->sem_write); 151 } 152 153 isread += toread; 154 data = (uint8_t*) data + toread; 155 l -= toread; 156 } 157 158#ifdef DEBUG_SRBCHANNEL 159 pa_log("Read %d bytes from srbchannel", (int) isread); 160#endif 161 162 return isread; 163} 164 165/* This is the memory layout of the ringbuffer shm block. It is followed by 166 read and write ringbuffer memory. */ 167struct srbheader { 168 pa_atomic_t read_count; 169 pa_atomic_t write_count; 170 171 pa_fdsem_data read_semdata; 172 pa_fdsem_data write_semdata; 173 174 int capacity; 175 int readbuf_offset; 176 int writebuf_offset; 177 178 /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */ 179}; 180 181static void srbchannel_rwloop(pa_srbchannel* sr) { 182 do { 183#ifdef DEBUG_SRBCHANNEL 184 int q; 185 pa_ringbuffer_peek(&sr->rb_read, &q); 186 pa_log("In rw loop from srbchannel, before callback, count = %d", q); 187#endif 188 189 if (sr->callback) { 190 if (!sr->callback(sr, sr->cb_userdata)) { 191#ifdef DEBUG_SRBCHANNEL 192 pa_log("Aborting read loop from srbchannel"); 193#endif 194 return; 195 } 196 } 197 198#ifdef DEBUG_SRBCHANNEL 199 pa_ringbuffer_peek(&sr->rb_read, &q); 200 pa_log("In rw loop from srbchannel, after callback, count = %d", q); 201#endif 202 203 } while (pa_fdsem_before_poll(sr->sem_read) < 0); 204} 205 206static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) { 207 pa_srbchannel* sr = userdata; 208 209 pa_fdsem_after_poll(sr->sem_read); 210 srbchannel_rwloop(sr); 211} 212 213static void defer_cb(pa_mainloop_api *m, pa_defer_event *e, void *userdata) { 214 pa_srbchannel* sr = userdata; 215 216#ifdef DEBUG_SRBCHANNEL 217 pa_log("Calling rw loop from deferred event"); 218#endif 219 220 m->defer_enable(e, 0); 221 srbchannel_rwloop(sr); 222} 223 224pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) { 225 int capacity; 226 int readfd; 227 struct srbheader *srh; 228 229 pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel)); 230 sr->mainloop = m; 231 sr->memblock = pa_memblock_new_pool(p, -1); 232 if (!sr->memblock) 233 goto fail; 234 235 srh = pa_memblock_acquire(sr->memblock); 236 pa_zero(*srh); 237 238 sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh)); 239 srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh; 240 241 capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2; 242 243 sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity); 244 srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh; 245 246 capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset); 247 248 pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes", 249 (int) pa_memblock_get_length(sr->memblock), capacity); 250 251 srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity; 252 253 sr->rb_read.count = &srh->read_count; 254 sr->rb_write.count = &srh->write_count; 255 256 sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata); 257 if (!sr->sem_read) 258 goto fail; 259 260 sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata); 261 if (!sr->sem_write) 262 goto fail; 263 264 readfd = pa_fdsem_get(sr->sem_read); 265 266#ifdef DEBUG_SRBCHANNEL 267 pa_log("Enabling io event on fd %d", readfd); 268#endif 269 270 sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr); 271 m->io_enable(sr->read_event, PA_IO_EVENT_INPUT); 272 273 return sr; 274 275fail: 276 pa_srbchannel_free(sr); 277 278 return NULL; 279} 280 281static void pa_srbchannel_swap(pa_srbchannel *sr) { 282 pa_srbchannel temp = *sr; 283 284 sr->sem_read = temp.sem_write; 285 sr->sem_write = temp.sem_read; 286 sr->rb_read = temp.rb_write; 287 sr->rb_write = temp.rb_read; 288} 289 290pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t) 291{ 292 int temp; 293 struct srbheader *srh; 294 pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel)); 295 296 sr->mainloop = m; 297 sr->memblock = t->memblock; 298 pa_memblock_ref(sr->memblock); 299 srh = pa_memblock_acquire(sr->memblock); 300 301 sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity; 302 sr->rb_read.count = &srh->read_count; 303 sr->rb_write.count = &srh->write_count; 304 305 sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset; 306 sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset; 307 308 sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd); 309 if (!sr->sem_read) 310 goto fail; 311 312 sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd); 313 if (!sr->sem_write) 314 goto fail; 315 316 pa_srbchannel_swap(sr); 317 temp = t->readfd; t->readfd = t->writefd; t->writefd = temp; 318 319#ifdef DEBUG_SRBCHANNEL 320 pa_log("Enabling io event on fd %d", t->readfd); 321#endif 322 323 sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr); 324 m->io_enable(sr->read_event, PA_IO_EVENT_INPUT); 325 326 return sr; 327 328fail: 329 pa_srbchannel_free(sr); 330 331 return NULL; 332} 333 334void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) { 335 t->memblock = sr->memblock; 336 t->readfd = pa_fdsem_get(sr->sem_read); 337 t->writefd = pa_fdsem_get(sr->sem_write); 338} 339 340void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) { 341 if (sr->callback) 342 pa_fdsem_after_poll(sr->sem_read); 343 344 sr->callback = callback; 345 sr->cb_userdata = userdata; 346 347 if (sr->callback) { 348 /* If there are events to be read already in the ringbuffer, we will not get any IO event for that, 349 because that's how pa_fdsem works. Therefore check the ringbuffer in a defer event instead. */ 350 if (!sr->defer_event) 351 sr->defer_event = sr->mainloop->defer_new(sr->mainloop, defer_cb, sr); 352 sr->mainloop->defer_enable(sr->defer_event, 1); 353 } 354} 355 356void pa_srbchannel_free(pa_srbchannel *sr) 357{ 358#ifdef DEBUG_SRBCHANNEL 359 pa_log("Freeing srbchannel"); 360#endif 361 pa_assert(sr); 362 363 if (sr->defer_event) 364 sr->mainloop->defer_free(sr->defer_event); 365 if (sr->read_event) 366 sr->mainloop->io_free(sr->read_event); 367 368 if (sr->sem_read) 369 pa_fdsem_free(sr->sem_read); 370 if (sr->sem_write) 371 pa_fdsem_free(sr->sem_write); 372 373 if (sr->memblock) { 374 pa_memblock_release(sr->memblock); 375 pa_memblock_unref(sr->memblock); 376 } 377 378 pa_xfree(sr); 379} 380