1/***
2  This file is part of PulseAudio.
3
4  Copyright 2004-2006 Lennart Poettering
5
6  PulseAudio is free software; you can redistribute it and/or modify
7  it under the terms of the GNU Lesser General Public License as published
8  by the Free Software Foundation; either version 2.1 of the License,
9  or (at your option) any later version.
10
11  PulseAudio is distributed in the hope that it will be useful, but
12  WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  General Public License for more details.
15
16  You should have received a copy of the GNU Lesser General Public License
17  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18***/
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24#include <errno.h>
25#include <stdio.h>
26#include <stdlib.h>
27#include <string.h>
28
29#include <pulse/xmalloc.h>
30
31#include <pulsecore/socket.h>
32#include <pulsecore/core-error.h>
33#include <pulsecore/core-util.h>
34#include <pulsecore/log.h>
35#include <pulsecore/macro.h>
36#include <pulsecore/refcnt.h>
37
38#include "ioline.h"
39
/* Upper bound, in bytes, on the data kept in the read or the write buffer. */
#define BUFFER_LIMIT (64 * 1024)
/* Amount of free buffer space, in bytes, do_read() tries to provide per read. */
#define READ_SIZE 1024
42
43struct pa_ioline {
44    PA_REFCNT_DECLARE;
45
46    pa_iochannel *io;
47    pa_defer_event *defer_event;
48    pa_mainloop_api *mainloop;
49
50    char *wbuf;
51    size_t wbuf_length, wbuf_index, wbuf_valid_length;
52
53    char *rbuf;
54    size_t rbuf_length, rbuf_index, rbuf_valid_length;
55
56    pa_ioline_cb_t callback;
57    void *userdata;
58
59    pa_ioline_drain_cb_t drain_callback;
60    void *drain_userdata;
61
62    bool dead:1;
63    bool defer_close:1;
64};
65
66static void io_callback(pa_iochannel*io, void *userdata);
67static void defer_callback(pa_mainloop_api*m, pa_defer_event*e, void *userdata);
68
69pa_ioline* pa_ioline_new(pa_iochannel *io) {
70    pa_ioline *l;
71    pa_assert(io);
72
73    l = pa_xnew(pa_ioline, 1);
74    PA_REFCNT_INIT(l);
75    l->io = io;
76
77    l->wbuf = NULL;
78    l->wbuf_length = l->wbuf_index = l->wbuf_valid_length = 0;
79
80    l->rbuf = NULL;
81    l->rbuf_length = l->rbuf_index = l->rbuf_valid_length = 0;
82
83    l->callback = NULL;
84    l->userdata = NULL;
85
86    l->drain_callback = NULL;
87    l->drain_userdata = NULL;
88
89    l->mainloop = pa_iochannel_get_mainloop_api(io);
90
91    l->defer_event = l->mainloop->defer_new(l->mainloop, defer_callback, l);
92    l->mainloop->defer_enable(l->defer_event, 0);
93
94    l->dead = false;
95    l->defer_close = false;
96
97    pa_iochannel_set_callback(io, io_callback, l);
98
99    return l;
100}
101
102static void ioline_free(pa_ioline *l) {
103    pa_assert(l);
104
105    if (l->io)
106        pa_iochannel_free(l->io);
107
108    if (l->defer_event)
109        l->mainloop->defer_free(l->defer_event);
110
111    pa_xfree(l->wbuf);
112    pa_xfree(l->rbuf);
113    pa_xfree(l);
114}
115
116void pa_ioline_unref(pa_ioline *l) {
117    pa_assert(l);
118    pa_assert(PA_REFCNT_VALUE(l) >= 1);
119
120    if (PA_REFCNT_DEC(l) <= 0)
121        ioline_free(l);
122}
123
124pa_ioline* pa_ioline_ref(pa_ioline *l) {
125    pa_assert(l);
126    pa_assert(PA_REFCNT_VALUE(l) >= 1);
127
128    PA_REFCNT_INC(l);
129    return l;
130}
131
132void pa_ioline_close(pa_ioline *l) {
133    pa_assert(l);
134    pa_assert(PA_REFCNT_VALUE(l) >= 1);
135
136    l->dead = true;
137
138    if (l->io) {
139        pa_iochannel_free(l->io);
140        l->io = NULL;
141    }
142
143    if (l->defer_event) {
144        l->mainloop->defer_free(l->defer_event);
145        l->defer_event = NULL;
146    }
147
148    if (l->callback)
149        l->callback = NULL;
150}
151
152void pa_ioline_puts(pa_ioline *l, const char *c) {
153    size_t len;
154
155    pa_assert(l);
156    pa_assert(PA_REFCNT_VALUE(l) >= 1);
157    pa_assert(c);
158
159    if (l->dead)
160        return;
161
162    len = strlen(c);
163    if (len > BUFFER_LIMIT - l->wbuf_valid_length)
164        len = BUFFER_LIMIT - l->wbuf_valid_length;
165
166    if (len) {
167        pa_assert(l->wbuf_length >= l->wbuf_valid_length);
168
169        /* In case the allocated buffer is too small, enlarge it. */
170        if (l->wbuf_valid_length + len > l->wbuf_length) {
171            size_t n = l->wbuf_valid_length+len;
172            char *new = pa_xnew(char, (unsigned) n);
173
174            if (l->wbuf) {
175                memcpy(new, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
176                pa_xfree(l->wbuf);
177            }
178
179            l->wbuf = new;
180            l->wbuf_length = n;
181            l->wbuf_index = 0;
182        } else if (l->wbuf_index + l->wbuf_valid_length + len > l->wbuf_length) {
183
184            /* In case the allocated buffer fits, but the current index is too far from the start, move it to the front. */
185            memmove(l->wbuf, l->wbuf+l->wbuf_index, l->wbuf_valid_length);
186            l->wbuf_index = 0;
187        }
188
189        pa_assert(l->wbuf_index + l->wbuf_valid_length + len <= l->wbuf_length);
190
191        /* Append the new string */
192        memcpy(l->wbuf + l->wbuf_index + l->wbuf_valid_length, c, len);
193        l->wbuf_valid_length += len;
194
195        l->mainloop->defer_enable(l->defer_event, 1);
196    }
197}
198
199void pa_ioline_set_callback(pa_ioline*l, pa_ioline_cb_t callback, void *userdata) {
200    pa_assert(l);
201    pa_assert(PA_REFCNT_VALUE(l) >= 1);
202
203    if (l->dead)
204        return;
205
206    l->callback = callback;
207    l->userdata = userdata;
208}
209
210void pa_ioline_set_drain_callback(pa_ioline*l, pa_ioline_drain_cb_t callback, void *userdata) {
211    pa_assert(l);
212    pa_assert(PA_REFCNT_VALUE(l) >= 1);
213
214    if (l->dead)
215        return;
216
217    l->drain_callback = callback;
218    l->drain_userdata = userdata;
219}
220
221static void failure(pa_ioline *l, bool process_leftover) {
222    pa_assert(l);
223    pa_assert(PA_REFCNT_VALUE(l) >= 1);
224    pa_assert(!l->dead);
225
226    if (process_leftover && l->rbuf_valid_length > 0) {
227        /* Pass the last missing bit to the client */
228
229        if (l->callback) {
230            char *p = pa_xstrndup(l->rbuf+l->rbuf_index, l->rbuf_valid_length);
231            l->callback(l, p, l->userdata);
232            pa_xfree(p);
233        }
234    }
235
236    if (l->callback) {
237        l->callback(l, NULL, l->userdata);
238        l->callback = NULL;
239    }
240
241    pa_ioline_close(l);
242}
243
244static void scan_for_lines(pa_ioline *l, size_t skip) {
245    pa_assert(l);
246    pa_assert(PA_REFCNT_VALUE(l) >= 1);
247    pa_assert(skip < l->rbuf_valid_length);
248
249    while (!l->dead && l->rbuf_valid_length > skip) {
250        char *e, *p;
251        size_t m;
252
253        if (!(e = memchr(l->rbuf + l->rbuf_index + skip, '\n', l->rbuf_valid_length - skip)))
254            break;
255
256        *e = 0;
257
258        p = l->rbuf + l->rbuf_index;
259        m = strlen(p);
260
261        l->rbuf_index += m+1;
262        l->rbuf_valid_length -= m+1;
263
264        /* A shortcut for the next time */
265        if (l->rbuf_valid_length == 0)
266            l->rbuf_index = 0;
267
268        if (l->callback)
269            l->callback(l, pa_strip_nl(p), l->userdata);
270
271        skip = 0;
272    }
273
274    /* If the buffer became too large and still no newline was found, drop it. */
275    if (l->rbuf_valid_length >= BUFFER_LIMIT)
276        l->rbuf_index = l->rbuf_valid_length = 0;
277}
278
279static int do_write(pa_ioline *l);
280
281static int do_read(pa_ioline *l) {
282    pa_assert(l);
283    pa_assert(PA_REFCNT_VALUE(l) >= 1);
284
285    while (l->io && !l->dead && pa_iochannel_is_readable(l->io)) {
286        ssize_t r;
287        size_t len;
288
289        len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
290
291        /* Check if we have to enlarge the read buffer */
292        if (len < READ_SIZE) {
293            size_t n = l->rbuf_valid_length+READ_SIZE;
294
295            if (n >= BUFFER_LIMIT)
296                n = BUFFER_LIMIT;
297
298            if (l->rbuf_length >= n) {
299                /* The current buffer is large enough, let's just move the data to the front */
300                if (l->rbuf_valid_length)
301                    memmove(l->rbuf, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
302            } else {
303                /* Enlarge the buffer */
304                char *new = pa_xnew(char, (unsigned) n);
305                if (l->rbuf_valid_length)
306                    memcpy(new, l->rbuf+l->rbuf_index, l->rbuf_valid_length);
307                pa_xfree(l->rbuf);
308                l->rbuf = new;
309                l->rbuf_length = n;
310            }
311
312            l->rbuf_index = 0;
313        }
314
315        len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length;
316
317        pa_assert(len >= READ_SIZE);
318
319        /* Read some data */
320        if ((r = pa_iochannel_read(l->io, l->rbuf+l->rbuf_index+l->rbuf_valid_length, len)) <= 0) {
321
322            if (r < 0 && errno == EAGAIN)
323                return 0;
324
325            if (r < 0 && errno != ECONNRESET) {
326                pa_log("read(): %s", pa_cstrerror(errno));
327                failure(l, false);
328            } else
329                failure(l, true);
330
331            return -1;
332        }
333
334        l->rbuf_valid_length += (size_t) r;
335
336        /* Look if a line has been terminated in the newly read data */
337        scan_for_lines(l, l->rbuf_valid_length - (size_t) r);
338    }
339
340    return 0;
341}
342
343/* Try to flush the buffer */
344static int do_write(pa_ioline *l) {
345    ssize_t r;
346
347    pa_assert(l);
348    pa_assert(PA_REFCNT_VALUE(l) >= 1);
349
350    while (l->io && !l->dead && pa_iochannel_is_writable(l->io) && l->wbuf_valid_length > 0) {
351
352        if ((r = pa_iochannel_write(l->io, l->wbuf+l->wbuf_index, l->wbuf_valid_length)) < 0) {
353
354            if (errno != EPIPE)
355                pa_log("write(): %s", pa_cstrerror(errno));
356
357            failure(l, false);
358
359            return -1;
360        }
361
362        l->wbuf_index += (size_t) r;
363        l->wbuf_valid_length -= (size_t) r;
364
365        /* A shortcut for the next time */
366        if (l->wbuf_valid_length == 0)
367            l->wbuf_index = 0;
368    }
369
370    if (l->wbuf_valid_length <= 0 && l->drain_callback)
371        l->drain_callback(l, l->drain_userdata);
372
373    return 0;
374}
375
376/* Try to flush read/write data */
377static void do_work(pa_ioline *l) {
378    pa_assert(l);
379    pa_assert(PA_REFCNT_VALUE(l) >= 1);
380
381    pa_ioline_ref(l);
382
383    l->mainloop->defer_enable(l->defer_event, 0);
384
385    if (!l->dead)
386        do_read(l);
387
388    if (!l->dead)
389        do_write(l);
390
391    if (l->defer_close && !l->wbuf_valid_length)
392        failure(l, true);
393
394    pa_ioline_unref(l);
395}
396
397static void io_callback(pa_iochannel*io, void *userdata) {
398    pa_ioline *l = userdata;
399
400    pa_assert(io);
401    pa_assert(l);
402    pa_assert(PA_REFCNT_VALUE(l) >= 1);
403
404    do_work(l);
405}
406
407static void defer_callback(pa_mainloop_api*m, pa_defer_event*e, void *userdata) {
408    pa_ioline *l = userdata;
409
410    pa_assert(l);
411    pa_assert(PA_REFCNT_VALUE(l) >= 1);
412    pa_assert(l->mainloop == m);
413    pa_assert(l->defer_event == e);
414
415    do_work(l);
416}
417
418void pa_ioline_defer_close(pa_ioline *l) {
419    pa_assert(l);
420    pa_assert(PA_REFCNT_VALUE(l) >= 1);
421
422    l->defer_close = true;
423
424    if (!l->wbuf_valid_length)
425        l->mainloop->defer_enable(l->defer_event, 1);
426}
427
428void pa_ioline_printf(pa_ioline *l, const char *format, ...) {
429    char *t;
430    va_list ap;
431
432    pa_assert(l);
433    pa_assert(PA_REFCNT_VALUE(l) >= 1);
434
435    va_start(ap, format);
436    t = pa_vsprintf_malloc(format, ap);
437    va_end(ap);
438
439    pa_ioline_puts(l, t);
440    pa_xfree(t);
441}
442
443pa_iochannel* pa_ioline_detach_iochannel(pa_ioline *l) {
444    pa_iochannel *r;
445
446    pa_assert(l);
447
448    if (!l->io)
449        return NULL;
450
451    r = l->io;
452    l->io = NULL;
453
454    pa_iochannel_set_callback(r, NULL, NULL);
455
456    return r;
457}
458
459bool pa_ioline_is_drained(pa_ioline *l) {
460    pa_assert(l);
461
462    return l->wbuf_valid_length <= 0;
463}
464