1/*** 2 This file is part of PulseAudio. 3 4 Copyright 2004-2006 Lennart Poettering 5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB 6 7 PulseAudio is free software; you can redistribute it and/or modify 8 it under the terms of the GNU Lesser General Public License as 9 published by the Free Software Foundation; either version 2.1 of the 10 License, or (at your option) any later version. 11 12 PulseAudio is distributed in the hope that it will be useful, but 13 WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15 Lesser General Public License for more details. 16 17 You should have received a copy of the GNU Lesser General Public 18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 19***/ 20 21#ifdef HAVE_CONFIG_H 22#include <config.h> 23#endif 24 25#include <sys/types.h> 26#include <stdio.h> 27#include <string.h> 28#include <errno.h> 29 30#include <pulse/xmalloc.h> 31#include <pulse/timeval.h> 32 33#include <pulsecore/poll.h> 34#include <pulsecore/core-error.h> 35#include <pulsecore/core-rtclock.h> 36#include <pulsecore/macro.h> 37#include <pulsecore/llist.h> 38#include <pulsecore/flist.h> 39#include <pulsecore/core-util.h> 40#include <pulsecore/ratelimit.h> 41#include <pulse/rtclock.h> 42 43#include "rtpoll.h" 44#include "time.h" 45 46/* #define DEBUG_TIMING */ 47 48struct pa_rtpoll { 49 struct pollfd *pollfd, *pollfd2; 50 unsigned n_pollfd_alloc, n_pollfd_used; 51 52 struct timeval next_elapse; 53 bool timer_enabled:1; 54 55 bool scan_for_dead:1; 56 bool running:1; 57 bool rebuild_needed:1; 58 bool quit:1; 59 bool timer_elapsed:1; 60 61#ifdef DEBUG_TIMING 62 pa_usec_t timestamp; 63 pa_usec_t slept, awake; 64#endif 65 66 PA_LLIST_HEAD(pa_rtpoll_item, items); 67}; 68 69struct pa_rtpoll_item { 70 pa_rtpoll *rtpoll; 71 bool dead; 72 73 pa_rtpoll_priority_t priority; 74 75 struct pollfd *pollfd; 76 unsigned n_pollfd; 77 78 int (*work_cb)(pa_rtpoll_item *i); 79 int (*before_cb)(pa_rtpoll_item *i); 80 void (*after_cb)(pa_rtpoll_item *i); 81 void *work_userdata; 82 void *before_userdata; 83 void *after_userdata; 84 85 PA_LLIST_FIELDS(pa_rtpoll_item); 86}; 87 88PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree); 89 90pa_rtpoll *pa_rtpoll_new(void) { 91 pa_rtpoll *p; 92 93 p = pa_xnew0(pa_rtpoll, 1); 94 95 p->n_pollfd_alloc = 32; 96 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc); 97 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc); 98 99#ifdef DEBUG_TIMING 100 p->timestamp = pa_rtclock_now(); 101#endif 102 103 return p; 104} 105 106static void rtpoll_rebuild(pa_rtpoll *p) { 107 108 struct pollfd *e, *t; 109 pa_rtpoll_item *i; 110 int ra = 0; 111 112 pa_assert(p); 113 114 p->rebuild_needed = false; 115 116 if (p->n_pollfd_used > p->n_pollfd_alloc) { 117 /* Hmm, we have to allocate some more space */ 118 p->n_pollfd_alloc = p->n_pollfd_used * 2; 119 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd)); 120 ra = 1; 121 } 122 123 e = p->pollfd2; 124 125 for (i = p->items; i; i = i->next) { 126 127 if (i->n_pollfd > 0) { 128 size_t l = i->n_pollfd * sizeof(struct pollfd); 129 130 if (i->pollfd) 131 memcpy(e, i->pollfd, l); 132 else 133 memset(e, 0, l); 134 135 i->pollfd = e; 136 } else 137 i->pollfd = NULL; 138 139 e += i->n_pollfd; 140 } 141 142 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used); 143 t = p->pollfd; 144 p->pollfd = p->pollfd2; 145 p->pollfd2 = t; 146 147 if (ra) 148 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd)); 149} 150 151static void rtpoll_item_destroy(pa_rtpoll_item *i) { 152 pa_rtpoll *p; 153 154 pa_assert(i); 155 156 p = i->rtpoll; 157 158 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i); 159 160 p->n_pollfd_used -= i->n_pollfd; 161 162 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0) 163 pa_xfree(i); 164 165 p->rebuild_needed = true; 166} 167 168void pa_rtpoll_free(pa_rtpoll *p) { 169 pa_assert(p); 170 171 while (p->items) 172 rtpoll_item_destroy(p->items); 173 174 pa_xfree(p->pollfd); 175 pa_xfree(p->pollfd2); 176 177 pa_xfree(p); 178} 179 180static void reset_revents(pa_rtpoll_item *i) { 181 struct pollfd *f; 182 unsigned n; 183 184 pa_assert(i); 185 186 if (!(f = pa_rtpoll_item_get_pollfd(i, &n))) 187 return; 188 189 for (; n > 0; n--) 190 f[n-1].revents = 0; 191} 192 193static void reset_all_revents(pa_rtpoll *p) { 194 pa_rtpoll_item *i; 195 196 pa_assert(p); 197 198 for (i = p->items; i; i = i->next) { 199 200 if (i->dead) 201 continue; 202 203 reset_revents(i); 204 } 205} 206 207int pa_rtpoll_run(pa_rtpoll *p) { 208 pa_rtpoll_item *i; 209 int r = 0; 210 struct timeval timeout; 211 212 pa_assert(p); 213 pa_assert(!p->running); 214 215#ifdef DEBUG_TIMING 216 pa_log("rtpoll_run"); 217#endif 218 219 p->running = true; 220 p->timer_elapsed = false; 221 222 /* First, let's do some work */ 223 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { 224 int k; 225 226 if (i->dead) 227 continue; 228 229 if (!i->work_cb) 230 continue; 231 232 if (p->quit) { 233#ifdef DEBUG_TIMING 234 pa_log("rtpoll finish"); 235#endif 236 goto finish; 237 } 238 239 if ((k = i->work_cb(i)) != 0) { 240 if (k < 0) { 241 r = k; 242 pa_log_error("Error %d in i->work_cb, goto finish", r); 243 } 244#ifdef DEBUG_TIMING 245 pa_log("rtpoll finish"); 246#endif 247 goto finish; 248 } 249 } 250 251 /* Now let's prepare for entering the sleep */ 252 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { 253 int k = 0; 254 255 if (i->dead) 256 continue; 257 258 if (!i->before_cb) 259 continue; 260 261 if (p->quit || (k = i->before_cb(i)) != 0) { 262 263 /* Hmm, this one doesn't let us enter the poll, so rewind everything */ 264 265 for (i = i->prev; i; i = i->prev) { 266 267 if (i->dead) 268 continue; 269 270 if (!i->after_cb) 271 continue; 272 273 i->after_cb(i); 274 } 275 276 if (k < 0) { 277 pa_log_error("Error %d in i->before_cb, goto finish", r); 278 r = k; 279 } 280#ifdef DEBUG_TIMING 281 pa_log("rtpoll finish"); 282#endif 283 goto finish; 284 } 285 } 286 287 if (p->rebuild_needed) 288 rtpoll_rebuild(p); 289 290 pa_zero(timeout); 291 292 /* Calculate timeout */ 293 if (!p->quit && p->timer_enabled) { 294 struct timeval now; 295 pa_rtclock_get(&now); 296 297 if (pa_timeval_cmp(&p->next_elapse, &now) > 0) 298 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now)); 299 } 300 301#ifdef DEBUG_TIMING 302 { 303 pa_usec_t now = pa_rtclock_now(); 304 p->awake = now - p->timestamp; 305 p->timestamp = now; 306 if (!p->quit && p->timer_enabled) 307 pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000))); 308 else if (p->quit) 309 pa_log("poll timeout is ZERO"); 310 else 311 pa_log("poll timeout is FOREVER"); 312 } 313#endif 314 315 /* OK, now let's sleep */ 316#ifdef HAVE_PPOLL 317 { 318 struct timespec ts; 319 ts.tv_sec = timeout.tv_sec; 320 ts.tv_nsec = timeout.tv_usec * 1000; 321 r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL); 322 } 323#else 324 r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1); 325#endif 326 327 p->timer_elapsed = r == 0; 328 329#ifdef DEBUG_TIMING 330 { 331 pa_usec_t now = pa_rtclock_now(); 332 p->slept = now - p->timestamp; 333 p->timestamp = now; 334 335 pa_log("Process time %llu ms; sleep time %llu ms", 336 (unsigned long long) (p->awake / PA_USEC_PER_MSEC), 337 (unsigned long long) (p->slept / PA_USEC_PER_MSEC)); 338 } 339#endif 340 341 if (r < 0) { 342 if (errno == EAGAIN || errno == EINTR) { 343 r = 0; 344 } else { 345 pa_log_error("Error %d in ppoll, errno: %s", r, pa_cstrerror(errno)); 346 pa_log_error("poll(): %s", pa_cstrerror(errno)); 347 } 348 349 reset_all_revents(p); 350 } 351 352 /* Let's tell everyone that we left the sleep */ 353 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) { 354 355 if (i->dead) 356 continue; 357 358 if (!i->after_cb) 359 continue; 360 361 i->after_cb(i); 362 } 363 364finish: 365 366 p->running = false; 367 368 if (p->scan_for_dead) { 369 pa_rtpoll_item *n; 370 371 p->scan_for_dead = false; 372 373 for (i = p->items; i; i = n) { 374 n = i->next; 375 376 if (i->dead) 377 rtpoll_item_destroy(i); 378 } 379 } 380 381 return r < 0 ? r : !p->quit; 382} 383 384void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) { 385 pa_assert(p); 386 387 pa_timeval_store(&p->next_elapse, usec); 388 p->timer_enabled = true; 389} 390 391void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) { 392 pa_assert(p); 393 394 /* Scheduling a timeout for more than an hour is very very suspicious */ 395 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL); 396 397 pa_rtclock_get(&p->next_elapse); 398 pa_timeval_add(&p->next_elapse, usec); 399 p->timer_enabled = true; 400} 401 402void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) { 403 pa_assert(p); 404 405 memset(&p->next_elapse, 0, sizeof(p->next_elapse)); 406 p->timer_enabled = false; 407} 408 409pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) { 410 pa_rtpoll_item *i, *j, *l = NULL; 411 412 pa_assert(p); 413 414 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items)))) 415 i = pa_xnew(pa_rtpoll_item, 1); 416 417 i->rtpoll = p; 418 i->dead = false; 419 i->n_pollfd = n_fds; 420 i->pollfd = NULL; 421 i->priority = prio; 422 423 i->work_userdata = NULL; 424 i->before_userdata = NULL; 425 i->work_userdata = NULL; 426 i->before_cb = NULL; 427 i->after_cb = NULL; 428 i->work_cb = NULL; 429 430 for (j = p->items; j; j = j->next) { 431 if (prio <= j->priority) 432 break; 433 434 l = j; 435 } 436 437 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i); 438 439 if (n_fds > 0) { 440 p->rebuild_needed = 1; 441 p->n_pollfd_used += n_fds; 442 } 443 444 return i; 445} 446 447void pa_rtpoll_item_free(pa_rtpoll_item *i) { 448 pa_assert(i); 449 450 if (i->rtpoll->running) { 451 i->dead = true; 452 i->rtpoll->scan_for_dead = true; 453 return; 454 } 455 456 rtpoll_item_destroy(i); 457} 458 459struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) { 460 pa_assert(i); 461 462 if (i->n_pollfd > 0) 463 if (i->rtpoll->rebuild_needed) 464 rtpoll_rebuild(i->rtpoll); 465 466 if (n_fds) 467 *n_fds = i->n_pollfd; 468 469 return i->pollfd; 470} 471 472void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata) { 473 pa_assert(i); 474 pa_assert(i->priority < PA_RTPOLL_NEVER); 475 476 i->before_cb = before_cb; 477 i->before_userdata = userdata; 478} 479 480void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata) { 481 pa_assert(i); 482 pa_assert(i->priority < PA_RTPOLL_NEVER); 483 484 i->after_cb = after_cb; 485 i->after_userdata = userdata; 486} 487 488void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata) { 489 pa_assert(i); 490 pa_assert(i->priority < PA_RTPOLL_NEVER); 491 492 i->work_cb = work_cb; 493 i->work_userdata = userdata; 494} 495 496void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i) { 497 pa_assert(i); 498 499 return i->work_userdata; 500} 501 502static int fdsem_before(pa_rtpoll_item *i) { 503 504 if (pa_fdsem_before_poll(i->before_userdata) < 0) 505 return 1; /* 1 means immediate restart of the loop */ 506 507 return 0; 508} 509 510static void fdsem_after(pa_rtpoll_item *i) { 511 pa_assert(i); 512 513 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0); 514 pa_fdsem_after_poll(i->after_userdata); 515} 516 517pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) { 518 pa_rtpoll_item *i; 519 struct pollfd *pollfd; 520 521 pa_assert(p); 522 pa_assert(f); 523 524 i = pa_rtpoll_item_new(p, prio, 1); 525 526 pollfd = pa_rtpoll_item_get_pollfd(i, NULL); 527 528 pollfd->fd = pa_fdsem_get(f); 529 pollfd->events = POLLIN; 530 531 pa_rtpoll_item_set_before_callback(i, fdsem_before, f); 532 pa_rtpoll_item_set_after_callback(i, fdsem_after, f); 533 534 return i; 535} 536 537static int asyncmsgq_read_before(pa_rtpoll_item *i) { 538 pa_assert(i); 539 540 if (pa_asyncmsgq_read_before_poll(i->before_userdata) < 0) 541 return 1; /* 1 means immediate restart of the loop */ 542 543 return 0; 544} 545 546static void asyncmsgq_read_after(pa_rtpoll_item *i) { 547 pa_assert(i); 548 549 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0); 550 pa_asyncmsgq_read_after_poll(i->after_userdata); 551} 552 553static int asyncmsgq_read_work(pa_rtpoll_item *i) { 554 pa_msgobject *object; 555 int code; 556 void *data; 557 pa_memchunk chunk; 558 int64_t offset; 559 560 pa_assert(i); 561 562 if (pa_asyncmsgq_get(i->work_userdata, &object, &code, &data, &offset, &chunk, 0) == 0) { 563 int ret; 564 565 if (!object && code == PA_MESSAGE_SHUTDOWN) { 566 pa_asyncmsgq_done(i->work_userdata, 0); 567 /* Requests the loop to exit. Will cause the next iteration of 568 * pa_rtpoll_run() to return 0 */ 569 i->rtpoll->quit = true; 570 return 1; 571 } 572 573 clock_t start = clock(); 574 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); 575 clock_t end = clock(); 576 double deltatime = (double)(end - start) / CLOCKS_PER_SEC; 577 if (deltatime > 1.0) { 578 pa_log_error("code %d time out %f s", code, deltatime); 579 } 580 pa_asyncmsgq_done(i->work_userdata, ret); 581 return 1; 582 } 583 584 return 0; 585} 586 587pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { 588 pa_rtpoll_item *i; 589 struct pollfd *pollfd; 590 591 pa_assert(p); 592 pa_assert(q); 593 594 i = pa_rtpoll_item_new(p, prio, 1); 595 596 pollfd = pa_rtpoll_item_get_pollfd(i, NULL); 597 pollfd->fd = pa_asyncmsgq_read_fd(q); 598 pollfd->events = POLLIN; 599 600 pa_rtpoll_item_set_before_callback(i, asyncmsgq_read_before, q); 601 pa_rtpoll_item_set_after_callback(i, asyncmsgq_read_after, q); 602 pa_rtpoll_item_set_work_callback(i, asyncmsgq_read_work, q); 603 604 return i; 605} 606 607static int asyncmsgq_write_before(pa_rtpoll_item *i) { 608 pa_assert(i); 609 610 pa_asyncmsgq_write_before_poll(i->before_userdata); 611 return 0; 612} 613 614static void asyncmsgq_write_after(pa_rtpoll_item *i) { 615 pa_assert(i); 616 617 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0); 618 pa_asyncmsgq_write_after_poll(i->after_userdata); 619} 620 621pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { 622 pa_rtpoll_item *i; 623 struct pollfd *pollfd; 624 625 pa_assert(p); 626 pa_assert(q); 627 628 i = pa_rtpoll_item_new(p, prio, 1); 629 630 pollfd = pa_rtpoll_item_get_pollfd(i, NULL); 631 pollfd->fd = pa_asyncmsgq_write_fd(q); 632 pollfd->events = POLLIN; 633 634 pa_rtpoll_item_set_before_callback(i, asyncmsgq_write_before, q); 635 pa_rtpoll_item_set_after_callback(i, asyncmsgq_write_after, q); 636 637 return i; 638} 639 640bool pa_rtpoll_timer_elapsed(pa_rtpoll *p) { 641 pa_assert(p); 642 643 return p->timer_elapsed; 644} 645