1/***
2  This file is part of PulseAudio.
3
4  Copyright 2006 Lennart Poettering
5
6  PulseAudio is free software; you can redistribute it and/or modify
7  it under the terms of the GNU Lesser General Public License as
8  published by the Free Software Foundation; either version 2.1 of the
9  License, or (at your option) any later version.
10
11  PulseAudio is distributed in the hope that it will be useful, but
12  WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  Lesser General Public License for more details.
15
16  You should have received a copy of the GNU Lesser General Public
17  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include <unistd.h>
25#include <errno.h>
26
27#include <pulsecore/atomic.h>
28#include <pulsecore/log.h>
29#include <pulsecore/thread.h>
30#include <pulsecore/macro.h>
31#include <pulsecore/core-util.h>
32#include <pulse/xmalloc.h>
33
34#include "fdsem.h"
35
/* Debugging aid: when PROFILE is defined, _Y expands to a thread
 * yield, so that an extra yield can be placed between each queue
 * operation. Without PROFILE it expands to an empty statement. */

/* #define PROFILE */

#if defined(PROFILE)
#define _Y pa_thread_yield()
#else
#define _Y do { } while (0)
#endif
46
47struct pa_shmasyncq {
48    pa_fdsem *read_fdsem, *write_fdsem;
49    pa_shmasyncq_data *data;
50};
51
/* Returns non-zero if and only if size is a power of two.
 *
 * Zero is rejected explicitly: 0 & (0 - 1) is also 0, so the bit
 * trick on its own would wrongly report 0 as a power of two. */
static int is_power_of_two(unsigned size) {
    return size != 0 && (size & (size - 1)) == 0;
}
55
56static int reduce(pa_shmasyncq *l, int value) {
57    return value & (unsigned) (l->n_elements - 1);
58}
59
60static pa_atomic_t* get_cell(pa_shmasyncq *l, unsigned i) {
61    pa_assert(i < l->data->n_elements);
62
63    return (pa_atomic_t*) ((uint8*t) l->data + PA_ALIGN(sizeof(pa_shmasyncq_data)) + i * (PA_ALIGN(sizeof(pa_atomic_t)) + PA_ALIGN(element_size)))
64}
65
66static void *get_cell_data(pa_atomic_t *a) {
67    return (uint8_t*) a + PA_ALIGN(sizeof(atomic_t));
68}
69
70pa_shmasyncq *pa_shmasyncq_new(unsigned n_elements, size_t element_size, void *data, int fd[2]) {
71    pa_shmasyncq *l;
72
73    pa_assert(n_elements > 0);
74    pa_assert(is_power_of_two(n_elements));
75    pa_assert(element_size > 0);
76    pa_assert(data);
77    pa_assert(fd);
78
79    l = pa_xnew(pa_shmasyncq, 1);
80
81    l->data = data;
82    memset(data, 0, PA_SHMASYNCQ_SIZE(n_elements, element_size));
83
84    l->data->n_elements = n_elements;
85    l->data->element_size = element_size;
86
87    if (!(l->read_fdsem = pa_fdsem_new_shm(&d->read_fdsem_data))) {
88        pa_xfree(l);
89        return NULL;
90    }
91    fd[0] = pa_fdsem_get(l->read_fdsem);
92
93    if (!(l->write_fdsem = pa_fdsem_new(&d->write_fdsem_data, &fd[1]))) {
94        pa_fdsem_free(l->read_fdsem);
95        pa_xfree(l);
96        return NULL;
97    }
98
99    return l;
100}
101
102void pa_shmasyncq_free(pa_shmasyncq *l, pa_free_cb_t free_cb) {
103    pa_assert(l);
104
105    if (free_cb) {
106        void *p;
107
108        while ((p = pa_shmasyncq_pop(l, 0)))
109            free_cb(p);
110    }
111
112    pa_fdsem_free(l->read_fdsem);
113    pa_fdsem_free(l->write_fdsem);
114    pa_xfree(l);
115}
116
117int pa_shmasyncq_push(pa_shmasyncq*l, void *p, int wait) {
118    int idx;
119    pa_atomic_ptr_t *cells;
120
121    pa_assert(l);
122    pa_assert(p);
123
124    cells = PA_SHMASYNCQ_CELLS(l);
125
126    _Y;
127    idx = reduce(l, l->write_idx);
128
129    if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
130
131        if (!wait)
132            return -1;
133
134/*         pa_log("sleeping on push"); */
135
136        do {
137            pa_fdsem_wait(l->read_fdsem);
138        } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
139    }
140
141    _Y;
142    l->write_idx++;
143
144    pa_fdsem_post(l->write_fdsem);
145
146    return 0;
147}
148
149void* pa_shmasyncq_pop(pa_shmasyncq*l, int wait) {
150    int idx;
151    void *ret;
152    pa_atomic_ptr_t *cells;
153
154    pa_assert(l);
155
156    cells = PA_SHMASYNCQ_CELLS(l);
157
158    _Y;
159    idx = reduce(l, l->read_idx);
160
161    if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
162
163        if (!wait)
164            return NULL;
165
166/*         pa_log("sleeping on pop"); */
167
168        do {
169            pa_fdsem_wait(l->write_fdsem);
170        } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
171    }
172
173    pa_assert(ret);
174
175    /* Guaranteed to succeed if we only have a single reader */
176    pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
177
178    _Y;
179    l->read_idx++;
180
181    pa_fdsem_post(l->read_fdsem);
182
183    return ret;
184}
185
186int pa_shmasyncq_get_fd(pa_shmasyncq *q) {
187    pa_assert(q);
188
189    return pa_fdsem_get(q->write_fdsem);
190}
191
192int pa_shmasyncq_before_poll(pa_shmasyncq *l) {
193    int idx;
194    pa_atomic_ptr_t *cells;
195
196    pa_assert(l);
197
198    cells = PA_SHMASYNCQ_CELLS(l);
199
200    _Y;
201    idx = reduce(l, l->read_idx);
202
203    for (;;) {
204        if (pa_atomic_ptr_load(&cells[idx]))
205            return -1;
206
207        if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
208            return 0;
209    }
210
211    return 0;
212}
213
214void pa_shmasyncq_after_poll(pa_shmasyncq *l) {
215    pa_assert(l);
216
217    pa_fdsem_after_poll(l->write_fdsem);
218}
219