1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2004-2006 Lennart Poettering 5 6 PulseAudio is free software; you can redistribute it and/or modify 7 it under the terms of the GNU Lesser General Public License as published 8 by the Free Software Foundation; either version 2.1 of the License, 9 or (at your option) any later version. 10 11 PulseAudio is distributed in the hope that it will be useful, but 12 WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 18***/ 19 20#ifdef HAVE_CONFIG_H 21#include <config.h> 22#endif 23 24#include <errno.h> 25#include <stdio.h> 26#include <stdlib.h> 27#include <string.h> 28 29#include <pulse/xmalloc.h> 30 31#include <pulsecore/socket.h> 32#include <pulsecore/core-error.h> 33#include <pulsecore/core-util.h> 34#include <pulsecore/log.h> 35#include <pulsecore/macro.h> 36#include <pulsecore/refcnt.h> 37 38#include "ioline.h" 39 40#define BUFFER_LIMIT (64*1024) 41#define READ_SIZE (1024) 42 43struct pa_ioline { 44 PA_REFCNT_DECLARE; 45 46 pa_iochannel *io; 47 pa_defer_event *defer_event; 48 pa_mainloop_api *mainloop; 49 50 char *wbuf; 51 size_t wbuf_length, wbuf_index, wbuf_valid_length; 52 53 char *rbuf; 54 size_t rbuf_length, rbuf_index, rbuf_valid_length; 55 56 pa_ioline_cb_t callback; 57 void *userdata; 58 59 pa_ioline_drain_cb_t drain_callback; 60 void *drain_userdata; 61 62 bool dead:1; 63 bool defer_close:1; 64}; 65 66static void io_callback(pa_iochannel*io, void *userdata); 67static void defer_callback(pa_mainloop_api*m, pa_defer_event*e, void *userdata); 68 69pa_ioline* pa_ioline_new(pa_iochannel *io) { 70 pa_ioline *l; 71 pa_assert(io); 72 73 l = pa_xnew(pa_ioline, 1); 74 PA_REFCNT_INIT(l); 75 l->io = io; 76 77 l->wbuf = NULL; 78 l->wbuf_length = l->wbuf_index = l->wbuf_valid_length = 0; 79 80 l->rbuf = NULL; 81 l->rbuf_length = l->rbuf_index = l->rbuf_valid_length = 0; 82 83 l->callback = NULL; 84 l->userdata = NULL; 85 86 l->drain_callback = NULL; 87 l->drain_userdata = NULL; 88 89 l->mainloop = pa_iochannel_get_mainloop_api(io); 90 91 l->defer_event = l->mainloop->defer_new(l->mainloop, defer_callback, l); 92 l->mainloop->defer_enable(l->defer_event, 0); 93 94 l->dead = false; 95 l->defer_close = false; 96 97 pa_iochannel_set_callback(io, io_callback, l); 98 99 return l; 100} 101 102static void ioline_free(pa_ioline *l) { 103 pa_assert(l); 104 105 if (l->io) 106 pa_iochannel_free(l->io); 107 108 if (l->defer_event) 109 l->mainloop->defer_free(l->defer_event); 110 111 pa_xfree(l->wbuf); 112 pa_xfree(l->rbuf); 113 pa_xfree(l); 114} 115 116void pa_ioline_unref(pa_ioline *l) { 117 pa_assert(l); 118 pa_assert(PA_REFCNT_VALUE(l) >= 1); 119 120 if (PA_REFCNT_DEC(l) <= 0) 121 ioline_free(l); 122} 123 124pa_ioline* pa_ioline_ref(pa_ioline *l) { 125 pa_assert(l); 126 pa_assert(PA_REFCNT_VALUE(l) >= 1); 127 128 PA_REFCNT_INC(l); 129 return l; 130} 131 132void pa_ioline_close(pa_ioline *l) { 133 pa_assert(l); 134 pa_assert(PA_REFCNT_VALUE(l) >= 1); 135 136 l->dead = true; 137 138 if (l->io) { 139 pa_iochannel_free(l->io); 140 l->io = NULL; 141 } 142 143 if (l->defer_event) { 144 l->mainloop->defer_free(l->defer_event); 145 l->defer_event = NULL; 146 } 147 148 if (l->callback) 149 l->callback = NULL; 150} 151 152void pa_ioline_puts(pa_ioline *l, const char *c) { 153 size_t len; 154 155 pa_assert(l); 156 pa_assert(PA_REFCNT_VALUE(l) >= 1); 157 pa_assert(c); 158 159 if (l->dead) 160 return; 161 162 len = strlen(c); 163 if (len > BUFFER_LIMIT - l->wbuf_valid_length) 164 len = BUFFER_LIMIT - l->wbuf_valid_length; 165 166 if (len) { 167 pa_assert(l->wbuf_length >= l->wbuf_valid_length); 168 169 /* In case the allocated buffer is too small, enlarge it. */ 170 if (l->wbuf_valid_length + len > l->wbuf_length) { 171 size_t n = l->wbuf_valid_length+len; 172 char *new = pa_xnew(char, (unsigned) n); 173 174 if (l->wbuf) { 175 memcpy(new, l->wbuf+l->wbuf_index, l->wbuf_valid_length); 176 pa_xfree(l->wbuf); 177 } 178 179 l->wbuf = new; 180 l->wbuf_length = n; 181 l->wbuf_index = 0; 182 } else if (l->wbuf_index + l->wbuf_valid_length + len > l->wbuf_length) { 183 184 /* In case the allocated buffer fits, but the current index is too far from the start, move it to the front. */ 185 memmove(l->wbuf, l->wbuf+l->wbuf_index, l->wbuf_valid_length); 186 l->wbuf_index = 0; 187 } 188 189 pa_assert(l->wbuf_index + l->wbuf_valid_length + len <= l->wbuf_length); 190 191 /* Append the new string */ 192 memcpy(l->wbuf + l->wbuf_index + l->wbuf_valid_length, c, len); 193 l->wbuf_valid_length += len; 194 195 l->mainloop->defer_enable(l->defer_event, 1); 196 } 197} 198 199void pa_ioline_set_callback(pa_ioline*l, pa_ioline_cb_t callback, void *userdata) { 200 pa_assert(l); 201 pa_assert(PA_REFCNT_VALUE(l) >= 1); 202 203 if (l->dead) 204 return; 205 206 l->callback = callback; 207 l->userdata = userdata; 208} 209 210void pa_ioline_set_drain_callback(pa_ioline*l, pa_ioline_drain_cb_t callback, void *userdata) { 211 pa_assert(l); 212 pa_assert(PA_REFCNT_VALUE(l) >= 1); 213 214 if (l->dead) 215 return; 216 217 l->drain_callback = callback; 218 l->drain_userdata = userdata; 219} 220 221static void failure(pa_ioline *l, bool process_leftover) { 222 pa_assert(l); 223 pa_assert(PA_REFCNT_VALUE(l) >= 1); 224 pa_assert(!l->dead); 225 226 if (process_leftover && l->rbuf_valid_length > 0) { 227 /* Pass the last missing bit to the client */ 228 229 if (l->callback) { 230 char *p = pa_xstrndup(l->rbuf+l->rbuf_index, l->rbuf_valid_length); 231 l->callback(l, p, l->userdata); 232 pa_xfree(p); 233 } 234 } 235 236 if (l->callback) { 237 l->callback(l, NULL, l->userdata); 238 l->callback = NULL; 239 } 240 241 pa_ioline_close(l); 242} 243 244static void scan_for_lines(pa_ioline *l, size_t skip) { 245 pa_assert(l); 246 pa_assert(PA_REFCNT_VALUE(l) >= 1); 247 pa_assert(skip < l->rbuf_valid_length); 248 249 while (!l->dead && l->rbuf_valid_length > skip) { 250 char *e, *p; 251 size_t m; 252 253 if (!(e = memchr(l->rbuf + l->rbuf_index + skip, '\n', l->rbuf_valid_length - skip))) 254 break; 255 256 *e = 0; 257 258 p = l->rbuf + l->rbuf_index; 259 m = strlen(p); 260 261 l->rbuf_index += m+1; 262 l->rbuf_valid_length -= m+1; 263 264 /* A shortcut for the next time */ 265 if (l->rbuf_valid_length == 0) 266 l->rbuf_index = 0; 267 268 if (l->callback) 269 l->callback(l, pa_strip_nl(p), l->userdata); 270 271 skip = 0; 272 } 273 274 /* If the buffer became too large and still no newline was found, drop it. */ 275 if (l->rbuf_valid_length >= BUFFER_LIMIT) 276 l->rbuf_index = l->rbuf_valid_length = 0; 277} 278 279static int do_write(pa_ioline *l); 280 281static int do_read(pa_ioline *l) { 282 pa_assert(l); 283 pa_assert(PA_REFCNT_VALUE(l) >= 1); 284 285 while (l->io && !l->dead && pa_iochannel_is_readable(l->io)) { 286 ssize_t r; 287 size_t len; 288 289 len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length; 290 291 /* Check if we have to enlarge the read buffer */ 292 if (len < READ_SIZE) { 293 size_t n = l->rbuf_valid_length+READ_SIZE; 294 295 if (n >= BUFFER_LIMIT) 296 n = BUFFER_LIMIT; 297 298 if (l->rbuf_length >= n) { 299 /* The current buffer is large enough, let's just move the data to the front */ 300 if (l->rbuf_valid_length) 301 memmove(l->rbuf, l->rbuf+l->rbuf_index, l->rbuf_valid_length); 302 } else { 303 /* Enlarge the buffer */ 304 char *new = pa_xnew(char, (unsigned) n); 305 if (l->rbuf_valid_length) 306 memcpy(new, l->rbuf+l->rbuf_index, l->rbuf_valid_length); 307 pa_xfree(l->rbuf); 308 l->rbuf = new; 309 l->rbuf_length = n; 310 } 311 312 l->rbuf_index = 0; 313 } 314 315 len = l->rbuf_length - l->rbuf_index - l->rbuf_valid_length; 316 317 pa_assert(len >= READ_SIZE); 318 319 /* Read some data */ 320 if ((r = pa_iochannel_read(l->io, l->rbuf+l->rbuf_index+l->rbuf_valid_length, len)) <= 0) { 321 322 if (r < 0 && errno == EAGAIN) 323 return 0; 324 325 if (r < 0 && errno != ECONNRESET) { 326 pa_log("read(): %s", pa_cstrerror(errno)); 327 failure(l, false); 328 } else 329 failure(l, true); 330 331 return -1; 332 } 333 334 l->rbuf_valid_length += (size_t) r; 335 336 /* Look if a line has been terminated in the newly read data */ 337 scan_for_lines(l, l->rbuf_valid_length - (size_t) r); 338 } 339 340 return 0; 341} 342 343/* Try to flush the buffer */ 344static int do_write(pa_ioline *l) { 345 ssize_t r; 346 347 pa_assert(l); 348 pa_assert(PA_REFCNT_VALUE(l) >= 1); 349 350 while (l->io && !l->dead && pa_iochannel_is_writable(l->io) && l->wbuf_valid_length > 0) { 351 352 if ((r = pa_iochannel_write(l->io, l->wbuf+l->wbuf_index, l->wbuf_valid_length)) < 0) { 353 354 if (errno != EPIPE) 355 pa_log("write(): %s", pa_cstrerror(errno)); 356 357 failure(l, false); 358 359 return -1; 360 } 361 362 l->wbuf_index += (size_t) r; 363 l->wbuf_valid_length -= (size_t) r; 364 365 /* A shortcut for the next time */ 366 if (l->wbuf_valid_length == 0) 367 l->wbuf_index = 0; 368 } 369 370 if (l->wbuf_valid_length <= 0 && l->drain_callback) 371 l->drain_callback(l, l->drain_userdata); 372 373 return 0; 374} 375 376/* Try to flush read/write data */ 377static void do_work(pa_ioline *l) { 378 pa_assert(l); 379 pa_assert(PA_REFCNT_VALUE(l) >= 1); 380 381 pa_ioline_ref(l); 382 383 l->mainloop->defer_enable(l->defer_event, 0); 384 385 if (!l->dead) 386 do_read(l); 387 388 if (!l->dead) 389 do_write(l); 390 391 if (l->defer_close && !l->wbuf_valid_length) 392 failure(l, true); 393 394 pa_ioline_unref(l); 395} 396 397static void io_callback(pa_iochannel*io, void *userdata) { 398 pa_ioline *l = userdata; 399 400 pa_assert(io); 401 pa_assert(l); 402 pa_assert(PA_REFCNT_VALUE(l) >= 1); 403 404 do_work(l); 405} 406 407static void defer_callback(pa_mainloop_api*m, pa_defer_event*e, void *userdata) { 408 pa_ioline *l = userdata; 409 410 pa_assert(l); 411 pa_assert(PA_REFCNT_VALUE(l) >= 1); 412 pa_assert(l->mainloop == m); 413 pa_assert(l->defer_event == e); 414 415 do_work(l); 416} 417 418void pa_ioline_defer_close(pa_ioline *l) { 419 pa_assert(l); 420 pa_assert(PA_REFCNT_VALUE(l) >= 1); 421 422 l->defer_close = true; 423 424 if (!l->wbuf_valid_length) 425 l->mainloop->defer_enable(l->defer_event, 1); 426} 427 428void pa_ioline_printf(pa_ioline *l, const char *format, ...) { 429 char *t; 430 va_list ap; 431 432 pa_assert(l); 433 pa_assert(PA_REFCNT_VALUE(l) >= 1); 434 435 va_start(ap, format); 436 t = pa_vsprintf_malloc(format, ap); 437 va_end(ap); 438 439 pa_ioline_puts(l, t); 440 pa_xfree(t); 441} 442 443pa_iochannel* pa_ioline_detach_iochannel(pa_ioline *l) { 444 pa_iochannel *r; 445 446 pa_assert(l); 447 448 if (!l->io) 449 return NULL; 450 451 r = l->io; 452 l->io = NULL; 453 454 pa_iochannel_set_callback(r, NULL, NULL); 455 456 return r; 457} 458 459bool pa_ioline_is_drained(pa_ioline *l) { 460 pa_assert(l); 461 462 return l->wbuf_valid_length <= 0; 463} 464