1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #include <sys/types.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <errno.h>
29
30 #include <pulse/xmalloc.h>
31 #include <pulse/timeval.h>
32
33 #include <pulsecore/poll.h>
34 #include <pulsecore/core-error.h>
35 #include <pulsecore/core-rtclock.h>
36 #include <pulsecore/macro.h>
37 #include <pulsecore/llist.h>
38 #include <pulsecore/flist.h>
39 #include <pulsecore/core-util.h>
40 #include <pulsecore/ratelimit.h>
41 #include <pulse/rtclock.h>
42
43 #include "rtpoll.h"
44 #include "time.h"
45
46 /* #define DEBUG_TIMING */
47
48 struct pa_rtpoll {
49 struct pollfd *pollfd, *pollfd2;
50 unsigned n_pollfd_alloc, n_pollfd_used;
51
52 struct timeval next_elapse;
53 bool timer_enabled:1;
54
55 bool scan_for_dead:1;
56 bool running:1;
57 bool rebuild_needed:1;
58 bool quit:1;
59 bool timer_elapsed:1;
60
61 #ifdef DEBUG_TIMING
62 pa_usec_t timestamp;
63 pa_usec_t slept, awake;
64 #endif
65
66 PA_LLIST_HEAD(pa_rtpoll_item, items);
67 };
68
69 struct pa_rtpoll_item {
70 pa_rtpoll *rtpoll;
71 bool dead;
72
73 pa_rtpoll_priority_t priority;
74
75 struct pollfd *pollfd;
76 unsigned n_pollfd;
77
78 int (*work_cb)(pa_rtpoll_item *i);
79 int (*before_cb)(pa_rtpoll_item *i);
80 void (*after_cb)(pa_rtpoll_item *i);
81 void *work_userdata;
82 void *before_userdata;
83 void *after_userdata;
84
85 PA_LLIST_FIELDS(pa_rtpoll_item);
86 };
87
88 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89
pa_rtpoll_new(void)90 pa_rtpoll *pa_rtpoll_new(void) {
91 pa_rtpoll *p;
92
93 p = pa_xnew0(pa_rtpoll, 1);
94
95 p->n_pollfd_alloc = 32;
96 p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
97 p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
98
99 #ifdef DEBUG_TIMING
100 p->timestamp = pa_rtclock_now();
101 #endif
102
103 return p;
104 }
105
rtpoll_rebuild(pa_rtpoll *p)106 static void rtpoll_rebuild(pa_rtpoll *p) {
107
108 struct pollfd *e, *t;
109 pa_rtpoll_item *i;
110 int ra = 0;
111
112 pa_assert(p);
113
114 p->rebuild_needed = false;
115
116 if (p->n_pollfd_used > p->n_pollfd_alloc) {
117 /* Hmm, we have to allocate some more space */
118 p->n_pollfd_alloc = p->n_pollfd_used * 2;
119 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
120 ra = 1;
121 }
122
123 e = p->pollfd2;
124
125 for (i = p->items; i; i = i->next) {
126
127 if (i->n_pollfd > 0) {
128 size_t l = i->n_pollfd * sizeof(struct pollfd);
129
130 if (i->pollfd)
131 memcpy(e, i->pollfd, l);
132 else
133 memset(e, 0, l);
134
135 i->pollfd = e;
136 } else
137 i->pollfd = NULL;
138
139 e += i->n_pollfd;
140 }
141
142 pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
143 t = p->pollfd;
144 p->pollfd = p->pollfd2;
145 p->pollfd2 = t;
146
147 if (ra)
148 p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
149 }
150
rtpoll_item_destroy(pa_rtpoll_item *i)151 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
152 pa_rtpoll *p;
153
154 pa_assert(i);
155
156 p = i->rtpoll;
157
158 PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
159
160 p->n_pollfd_used -= i->n_pollfd;
161
162 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
163 pa_xfree(i);
164
165 p->rebuild_needed = true;
166 }
167
pa_rtpoll_free(pa_rtpoll *p)168 void pa_rtpoll_free(pa_rtpoll *p) {
169 pa_assert(p);
170
171 while (p->items)
172 rtpoll_item_destroy(p->items);
173
174 pa_xfree(p->pollfd);
175 pa_xfree(p->pollfd2);
176
177 pa_xfree(p);
178 }
179
reset_revents(pa_rtpoll_item *i)180 static void reset_revents(pa_rtpoll_item *i) {
181 struct pollfd *f;
182 unsigned n;
183
184 pa_assert(i);
185
186 if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
187 return;
188
189 for (; n > 0; n--)
190 f[n-1].revents = 0;
191 }
192
reset_all_revents(pa_rtpoll *p)193 static void reset_all_revents(pa_rtpoll *p) {
194 pa_rtpoll_item *i;
195
196 pa_assert(p);
197
198 for (i = p->items; i; i = i->next) {
199
200 if (i->dead)
201 continue;
202
203 reset_revents(i);
204 }
205 }
206
pa_rtpoll_run(pa_rtpoll *p)207 int pa_rtpoll_run(pa_rtpoll *p) {
208 pa_rtpoll_item *i;
209 int r = 0;
210 struct timeval timeout;
211
212 pa_assert(p);
213 pa_assert(!p->running);
214
215 #ifdef DEBUG_TIMING
216 pa_log("rtpoll_run");
217 #endif
218
219 p->running = true;
220 p->timer_elapsed = false;
221
222 /* First, let's do some work */
223 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
224 int k;
225
226 if (i->dead)
227 continue;
228
229 if (!i->work_cb)
230 continue;
231
232 if (p->quit) {
233 #ifdef DEBUG_TIMING
234 pa_log("rtpoll finish");
235 #endif
236 goto finish;
237 }
238
239 if ((k = i->work_cb(i)) != 0) {
240 if (k < 0) {
241 r = k;
242 pa_log_error("Error %d in i->work_cb, goto finish", r);
243 }
244 #ifdef DEBUG_TIMING
245 pa_log("rtpoll finish");
246 #endif
247 goto finish;
248 }
249 }
250
251 /* Now let's prepare for entering the sleep */
252 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
253 int k = 0;
254
255 if (i->dead)
256 continue;
257
258 if (!i->before_cb)
259 continue;
260
261 if (p->quit || (k = i->before_cb(i)) != 0) {
262
263 /* Hmm, this one doesn't let us enter the poll, so rewind everything */
264
265 for (i = i->prev; i; i = i->prev) {
266
267 if (i->dead)
268 continue;
269
270 if (!i->after_cb)
271 continue;
272
273 i->after_cb(i);
274 }
275
276 if (k < 0) {
277 pa_log_error("Error %d in i->before_cb, goto finish", r);
278 r = k;
279 }
280 #ifdef DEBUG_TIMING
281 pa_log("rtpoll finish");
282 #endif
283 goto finish;
284 }
285 }
286
287 if (p->rebuild_needed)
288 rtpoll_rebuild(p);
289
290 pa_zero(timeout);
291
292 /* Calculate timeout */
293 if (!p->quit && p->timer_enabled) {
294 struct timeval now;
295 pa_rtclock_get(&now);
296
297 if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
298 pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
299 }
300
301 #ifdef DEBUG_TIMING
302 {
303 pa_usec_t now = pa_rtclock_now();
304 p->awake = now - p->timestamp;
305 p->timestamp = now;
306 if (!p->quit && p->timer_enabled)
307 pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
308 else if (p->quit)
309 pa_log("poll timeout is ZERO");
310 else
311 pa_log("poll timeout is FOREVER");
312 }
313 #endif
314
315 /* OK, now let's sleep */
316 #ifdef HAVE_PPOLL
317 {
318 struct timespec ts;
319 ts.tv_sec = timeout.tv_sec;
320 ts.tv_nsec = timeout.tv_usec * 1000;
321 r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL);
322 }
323 #else
324 r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
325 #endif
326
327 p->timer_elapsed = r == 0;
328
329 #ifdef DEBUG_TIMING
330 {
331 pa_usec_t now = pa_rtclock_now();
332 p->slept = now - p->timestamp;
333 p->timestamp = now;
334
335 pa_log("Process time %llu ms; sleep time %llu ms",
336 (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
337 (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
338 }
339 #endif
340
341 if (r < 0) {
342 if (errno == EAGAIN || errno == EINTR) {
343 r = 0;
344 } else {
345 pa_log_error("Error %d in ppoll, errno: %s", r, pa_cstrerror(errno));
346 pa_log_error("poll(): %s", pa_cstrerror(errno));
347 }
348
349 reset_all_revents(p);
350 }
351
352 /* Let's tell everyone that we left the sleep */
353 for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
354
355 if (i->dead)
356 continue;
357
358 if (!i->after_cb)
359 continue;
360
361 i->after_cb(i);
362 }
363
364 finish:
365
366 p->running = false;
367
368 if (p->scan_for_dead) {
369 pa_rtpoll_item *n;
370
371 p->scan_for_dead = false;
372
373 for (i = p->items; i; i = n) {
374 n = i->next;
375
376 if (i->dead)
377 rtpoll_item_destroy(i);
378 }
379 }
380
381 return r < 0 ? r : !p->quit;
382 }
383
pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec)384 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
385 pa_assert(p);
386
387 pa_timeval_store(&p->next_elapse, usec);
388 p->timer_enabled = true;
389 }
390
pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec)391 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
392 pa_assert(p);
393
394 /* Scheduling a timeout for more than an hour is very very suspicious */
395 pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
396
397 pa_rtclock_get(&p->next_elapse);
398 pa_timeval_add(&p->next_elapse, usec);
399 p->timer_enabled = true;
400 }
401
pa_rtpoll_set_timer_disabled(pa_rtpoll *p)402 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
403 pa_assert(p);
404
405 memset(&p->next_elapse, 0, sizeof(p->next_elapse));
406 p->timer_enabled = false;
407 }
408
pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds)409 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
410 pa_rtpoll_item *i, *j, *l = NULL;
411
412 pa_assert(p);
413
414 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
415 i = pa_xnew(pa_rtpoll_item, 1);
416
417 i->rtpoll = p;
418 i->dead = false;
419 i->n_pollfd = n_fds;
420 i->pollfd = NULL;
421 i->priority = prio;
422
423 i->work_userdata = NULL;
424 i->before_userdata = NULL;
425 i->work_userdata = NULL;
426 i->before_cb = NULL;
427 i->after_cb = NULL;
428 i->work_cb = NULL;
429
430 for (j = p->items; j; j = j->next) {
431 if (prio <= j->priority)
432 break;
433
434 l = j;
435 }
436
437 PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
438
439 if (n_fds > 0) {
440 p->rebuild_needed = 1;
441 p->n_pollfd_used += n_fds;
442 }
443
444 return i;
445 }
446
pa_rtpoll_item_free(pa_rtpoll_item *i)447 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
448 pa_assert(i);
449
450 if (i->rtpoll->running) {
451 i->dead = true;
452 i->rtpoll->scan_for_dead = true;
453 return;
454 }
455
456 rtpoll_item_destroy(i);
457 }
458
pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds)459 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
460 pa_assert(i);
461
462 if (i->n_pollfd > 0)
463 if (i->rtpoll->rebuild_needed)
464 rtpoll_rebuild(i->rtpoll);
465
466 if (n_fds)
467 *n_fds = i->n_pollfd;
468
469 return i->pollfd;
470 }
471
pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata)472 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata) {
473 pa_assert(i);
474 pa_assert(i->priority < PA_RTPOLL_NEVER);
475
476 i->before_cb = before_cb;
477 i->before_userdata = userdata;
478 }
479
pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata)480 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata) {
481 pa_assert(i);
482 pa_assert(i->priority < PA_RTPOLL_NEVER);
483
484 i->after_cb = after_cb;
485 i->after_userdata = userdata;
486 }
487
pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata)488 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata) {
489 pa_assert(i);
490 pa_assert(i->priority < PA_RTPOLL_NEVER);
491
492 i->work_cb = work_cb;
493 i->work_userdata = userdata;
494 }
495
pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i)496 void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i) {
497 pa_assert(i);
498
499 return i->work_userdata;
500 }
501
fdsem_before(pa_rtpoll_item *i)502 static int fdsem_before(pa_rtpoll_item *i) {
503
504 if (pa_fdsem_before_poll(i->before_userdata) < 0)
505 return 1; /* 1 means immediate restart of the loop */
506
507 return 0;
508 }
509
fdsem_after(pa_rtpoll_item *i)510 static void fdsem_after(pa_rtpoll_item *i) {
511 pa_assert(i);
512
513 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
514 pa_fdsem_after_poll(i->after_userdata);
515 }
516
pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f)517 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
518 pa_rtpoll_item *i;
519 struct pollfd *pollfd;
520
521 pa_assert(p);
522 pa_assert(f);
523
524 i = pa_rtpoll_item_new(p, prio, 1);
525
526 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
527
528 pollfd->fd = pa_fdsem_get(f);
529 pollfd->events = POLLIN;
530
531 pa_rtpoll_item_set_before_callback(i, fdsem_before, f);
532 pa_rtpoll_item_set_after_callback(i, fdsem_after, f);
533
534 return i;
535 }
536
asyncmsgq_read_before(pa_rtpoll_item *i)537 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
538 pa_assert(i);
539
540 if (pa_asyncmsgq_read_before_poll(i->before_userdata) < 0)
541 return 1; /* 1 means immediate restart of the loop */
542
543 return 0;
544 }
545
asyncmsgq_read_after(pa_rtpoll_item *i)546 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
547 pa_assert(i);
548
549 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
550 pa_asyncmsgq_read_after_poll(i->after_userdata);
551 }
552
asyncmsgq_read_work(pa_rtpoll_item *i)553 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
554 pa_msgobject *object;
555 int code;
556 void *data;
557 pa_memchunk chunk;
558 int64_t offset;
559
560 pa_assert(i);
561
562 if (pa_asyncmsgq_get(i->work_userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
563 int ret;
564
565 if (!object && code == PA_MESSAGE_SHUTDOWN) {
566 pa_asyncmsgq_done(i->work_userdata, 0);
567 /* Requests the loop to exit. Will cause the next iteration of
568 * pa_rtpoll_run() to return 0 */
569 i->rtpoll->quit = true;
570 return 1;
571 }
572
573 clock_t start = clock();
574 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
575 clock_t end = clock();
576 double deltatime = (double)(end - start) / CLOCKS_PER_SEC;
577 if (deltatime > 1.0) {
578 pa_log_error("code %d time out %f s", code, deltatime);
579 }
580 pa_asyncmsgq_done(i->work_userdata, ret);
581 return 1;
582 }
583
584 return 0;
585 }
586
pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q)587 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
588 pa_rtpoll_item *i;
589 struct pollfd *pollfd;
590
591 pa_assert(p);
592 pa_assert(q);
593
594 i = pa_rtpoll_item_new(p, prio, 1);
595
596 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
597 pollfd->fd = pa_asyncmsgq_read_fd(q);
598 pollfd->events = POLLIN;
599
600 pa_rtpoll_item_set_before_callback(i, asyncmsgq_read_before, q);
601 pa_rtpoll_item_set_after_callback(i, asyncmsgq_read_after, q);
602 pa_rtpoll_item_set_work_callback(i, asyncmsgq_read_work, q);
603
604 return i;
605 }
606
asyncmsgq_write_before(pa_rtpoll_item *i)607 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
608 pa_assert(i);
609
610 pa_asyncmsgq_write_before_poll(i->before_userdata);
611 return 0;
612 }
613
asyncmsgq_write_after(pa_rtpoll_item *i)614 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
615 pa_assert(i);
616
617 pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
618 pa_asyncmsgq_write_after_poll(i->after_userdata);
619 }
620
pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q)621 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
622 pa_rtpoll_item *i;
623 struct pollfd *pollfd;
624
625 pa_assert(p);
626 pa_assert(q);
627
628 i = pa_rtpoll_item_new(p, prio, 1);
629
630 pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
631 pollfd->fd = pa_asyncmsgq_write_fd(q);
632 pollfd->events = POLLIN;
633
634 pa_rtpoll_item_set_before_callback(i, asyncmsgq_write_before, q);
635 pa_rtpoll_item_set_after_callback(i, asyncmsgq_write_after, q);
636
637 return i;
638 }
639
pa_rtpoll_timer_elapsed(pa_rtpoll *p)640 bool pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
641 pa_assert(p);
642
643 return p->timer_elapsed;
644 }
645