1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as
9   published by the Free Software Foundation; either version 2.1 of the
10   License, or (at your option) any later version.
11 
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   Lesser General Public License for more details.
16 
17   You should have received a copy of the GNU Lesser General Public
18   License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19 ***/
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #include <sys/types.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <errno.h>
29 
30 #include <pulse/xmalloc.h>
31 #include <pulse/timeval.h>
32 
33 #include <pulsecore/poll.h>
34 #include <pulsecore/core-error.h>
35 #include <pulsecore/core-rtclock.h>
36 #include <pulsecore/macro.h>
37 #include <pulsecore/llist.h>
38 #include <pulsecore/flist.h>
39 #include <pulsecore/core-util.h>
40 #include <pulsecore/ratelimit.h>
41 #include <pulse/rtclock.h>
42 
43 #include "rtpoll.h"
44 #include "time.h"
45 
46 /* #define DEBUG_TIMING */
47 
48 struct pa_rtpoll {
49     struct pollfd *pollfd, *pollfd2;
50     unsigned n_pollfd_alloc, n_pollfd_used;
51 
52     struct timeval next_elapse;
53     bool timer_enabled:1;
54 
55     bool scan_for_dead:1;
56     bool running:1;
57     bool rebuild_needed:1;
58     bool quit:1;
59     bool timer_elapsed:1;
60 
61 #ifdef DEBUG_TIMING
62     pa_usec_t timestamp;
63     pa_usec_t slept, awake;
64 #endif
65 
66     PA_LLIST_HEAD(pa_rtpoll_item, items);
67 };
68 
69 struct pa_rtpoll_item {
70     pa_rtpoll *rtpoll;
71     bool dead;
72 
73     pa_rtpoll_priority_t priority;
74 
75     struct pollfd *pollfd;
76     unsigned n_pollfd;
77 
78     int (*work_cb)(pa_rtpoll_item *i);
79     int (*before_cb)(pa_rtpoll_item *i);
80     void (*after_cb)(pa_rtpoll_item *i);
81     void *work_userdata;
82     void *before_userdata;
83     void *after_userdata;
84 
85     PA_LLIST_FIELDS(pa_rtpoll_item);
86 };
87 
88 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89 
pa_rtpoll_new(void)90 pa_rtpoll *pa_rtpoll_new(void) {
91     pa_rtpoll *p;
92 
93     p = pa_xnew0(pa_rtpoll, 1);
94 
95     p->n_pollfd_alloc = 32;
96     p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
97     p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
98 
99 #ifdef DEBUG_TIMING
100     p->timestamp = pa_rtclock_now();
101 #endif
102 
103     return p;
104 }
105 
rtpoll_rebuild(pa_rtpoll *p)106 static void rtpoll_rebuild(pa_rtpoll *p) {
107 
108     struct pollfd *e, *t;
109     pa_rtpoll_item *i;
110     int ra = 0;
111 
112     pa_assert(p);
113 
114     p->rebuild_needed = false;
115 
116     if (p->n_pollfd_used > p->n_pollfd_alloc) {
117         /* Hmm, we have to allocate some more space */
118         p->n_pollfd_alloc = p->n_pollfd_used * 2;
119         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
120         ra = 1;
121     }
122 
123     e = p->pollfd2;
124 
125     for (i = p->items; i; i = i->next) {
126 
127         if (i->n_pollfd > 0) {
128             size_t l = i->n_pollfd * sizeof(struct pollfd);
129 
130             if (i->pollfd)
131                 memcpy(e, i->pollfd, l);
132             else
133                 memset(e, 0, l);
134 
135             i->pollfd = e;
136         } else
137             i->pollfd = NULL;
138 
139         e += i->n_pollfd;
140     }
141 
142     pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
143     t = p->pollfd;
144     p->pollfd = p->pollfd2;
145     p->pollfd2 = t;
146 
147     if (ra)
148         p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
149 }
150 
rtpoll_item_destroy(pa_rtpoll_item *i)151 static void rtpoll_item_destroy(pa_rtpoll_item *i) {
152     pa_rtpoll *p;
153 
154     pa_assert(i);
155 
156     p = i->rtpoll;
157 
158     PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
159 
160     p->n_pollfd_used -= i->n_pollfd;
161 
162     if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
163         pa_xfree(i);
164 
165     p->rebuild_needed = true;
166 }
167 
pa_rtpoll_free(pa_rtpoll *p)168 void pa_rtpoll_free(pa_rtpoll *p) {
169     pa_assert(p);
170 
171     while (p->items)
172         rtpoll_item_destroy(p->items);
173 
174     pa_xfree(p->pollfd);
175     pa_xfree(p->pollfd2);
176 
177     pa_xfree(p);
178 }
179 
reset_revents(pa_rtpoll_item *i)180 static void reset_revents(pa_rtpoll_item *i) {
181     struct pollfd *f;
182     unsigned n;
183 
184     pa_assert(i);
185 
186     if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
187         return;
188 
189     for (; n > 0; n--)
190         f[n-1].revents = 0;
191 }
192 
reset_all_revents(pa_rtpoll *p)193 static void reset_all_revents(pa_rtpoll *p) {
194     pa_rtpoll_item *i;
195 
196     pa_assert(p);
197 
198     for (i = p->items; i; i = i->next) {
199 
200         if (i->dead)
201             continue;
202 
203         reset_revents(i);
204     }
205 }
206 
pa_rtpoll_run(pa_rtpoll *p)207 int pa_rtpoll_run(pa_rtpoll *p) {
208     pa_rtpoll_item *i;
209     int r = 0;
210     struct timeval timeout;
211 
212     pa_assert(p);
213     pa_assert(!p->running);
214 
215 #ifdef DEBUG_TIMING
216     pa_log("rtpoll_run");
217 #endif
218 
219     p->running = true;
220     p->timer_elapsed = false;
221 
222     /* First, let's do some work */
223     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
224         int k;
225 
226         if (i->dead)
227             continue;
228 
229         if (!i->work_cb)
230             continue;
231 
232         if (p->quit) {
233 #ifdef DEBUG_TIMING
234             pa_log("rtpoll finish");
235 #endif
236             goto finish;
237         }
238 
239         if ((k = i->work_cb(i)) != 0) {
240             if (k < 0) {
241                 r = k;
242                 pa_log_error("Error %d in i->work_cb, goto finish", r);
243             }
244 #ifdef DEBUG_TIMING
245             pa_log("rtpoll finish");
246 #endif
247             goto finish;
248         }
249     }
250 
251     /* Now let's prepare for entering the sleep */
252     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
253         int k = 0;
254 
255         if (i->dead)
256             continue;
257 
258         if (!i->before_cb)
259             continue;
260 
261         if (p->quit || (k = i->before_cb(i)) != 0) {
262 
263             /* Hmm, this one doesn't let us enter the poll, so rewind everything */
264 
265             for (i = i->prev; i; i = i->prev) {
266 
267                 if (i->dead)
268                     continue;
269 
270                 if (!i->after_cb)
271                     continue;
272 
273                 i->after_cb(i);
274             }
275 
276             if (k < 0) {
277                 pa_log_error("Error %d in i->before_cb, goto finish", r);
278                 r = k;
279             }
280 #ifdef DEBUG_TIMING
281             pa_log("rtpoll finish");
282 #endif
283             goto finish;
284         }
285     }
286 
287     if (p->rebuild_needed)
288         rtpoll_rebuild(p);
289 
290     pa_zero(timeout);
291 
292     /* Calculate timeout */
293     if (!p->quit && p->timer_enabled) {
294         struct timeval now;
295         pa_rtclock_get(&now);
296 
297         if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
298             pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
299     }
300 
301 #ifdef DEBUG_TIMING
302     {
303         pa_usec_t now = pa_rtclock_now();
304         p->awake = now - p->timestamp;
305         p->timestamp = now;
306         if (!p->quit && p->timer_enabled)
307             pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
308         else if (p->quit)
309             pa_log("poll timeout is ZERO");
310         else
311             pa_log("poll timeout is FOREVER");
312     }
313 #endif
314 
315     /* OK, now let's sleep */
316 #ifdef HAVE_PPOLL
317     {
318         struct timespec ts;
319         ts.tv_sec = timeout.tv_sec;
320         ts.tv_nsec = timeout.tv_usec * 1000;
321         r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL);
322     }
323 #else
324     r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
325 #endif
326 
327     p->timer_elapsed = r == 0;
328 
329 #ifdef DEBUG_TIMING
330     {
331         pa_usec_t now = pa_rtclock_now();
332         p->slept = now - p->timestamp;
333         p->timestamp = now;
334 
335         pa_log("Process time %llu ms; sleep time %llu ms",
336                (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
337                (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
338     }
339 #endif
340 
341     if (r < 0) {
342         if (errno == EAGAIN || errno == EINTR) {
343             r = 0;
344         } else {
345             pa_log_error("Error %d in ppoll, errno: %s", r, pa_cstrerror(errno));
346             pa_log_error("poll(): %s", pa_cstrerror(errno));
347         }
348 
349         reset_all_revents(p);
350     }
351 
352     /* Let's tell everyone that we left the sleep */
353     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
354 
355         if (i->dead)
356             continue;
357 
358         if (!i->after_cb)
359             continue;
360 
361         i->after_cb(i);
362     }
363 
364 finish:
365 
366     p->running = false;
367 
368     if (p->scan_for_dead) {
369         pa_rtpoll_item *n;
370 
371         p->scan_for_dead = false;
372 
373         for (i = p->items; i; i = n) {
374             n = i->next;
375 
376             if (i->dead)
377                 rtpoll_item_destroy(i);
378         }
379     }
380 
381     return r < 0 ? r : !p->quit;
382 }
383 
pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec)384 void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
385     pa_assert(p);
386 
387     pa_timeval_store(&p->next_elapse, usec);
388     p->timer_enabled = true;
389 }
390 
pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec)391 void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
392     pa_assert(p);
393 
394     /* Scheduling a timeout for more than an hour is very very suspicious */
395     pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
396 
397     pa_rtclock_get(&p->next_elapse);
398     pa_timeval_add(&p->next_elapse, usec);
399     p->timer_enabled = true;
400 }
401 
pa_rtpoll_set_timer_disabled(pa_rtpoll *p)402 void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
403     pa_assert(p);
404 
405     memset(&p->next_elapse, 0, sizeof(p->next_elapse));
406     p->timer_enabled = false;
407 }
408 
pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds)409 pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
410     pa_rtpoll_item *i, *j, *l = NULL;
411 
412     pa_assert(p);
413 
414     if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
415         i = pa_xnew(pa_rtpoll_item, 1);
416 
417     i->rtpoll = p;
418     i->dead = false;
419     i->n_pollfd = n_fds;
420     i->pollfd = NULL;
421     i->priority = prio;
422 
423     i->work_userdata = NULL;
424     i->before_userdata = NULL;
425     i->work_userdata = NULL;
426     i->before_cb = NULL;
427     i->after_cb = NULL;
428     i->work_cb = NULL;
429 
430     for (j = p->items; j; j = j->next) {
431         if (prio <= j->priority)
432             break;
433 
434         l = j;
435     }
436 
437     PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
438 
439     if (n_fds > 0) {
440         p->rebuild_needed = 1;
441         p->n_pollfd_used += n_fds;
442     }
443 
444     return i;
445 }
446 
pa_rtpoll_item_free(pa_rtpoll_item *i)447 void pa_rtpoll_item_free(pa_rtpoll_item *i) {
448     pa_assert(i);
449 
450     if (i->rtpoll->running) {
451         i->dead = true;
452         i->rtpoll->scan_for_dead = true;
453         return;
454     }
455 
456     rtpoll_item_destroy(i);
457 }
458 
pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds)459 struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
460     pa_assert(i);
461 
462     if (i->n_pollfd > 0)
463         if (i->rtpoll->rebuild_needed)
464             rtpoll_rebuild(i->rtpoll);
465 
466     if (n_fds)
467         *n_fds = i->n_pollfd;
468 
469     return i->pollfd;
470 }
471 
pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata)472 void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata) {
473     pa_assert(i);
474     pa_assert(i->priority < PA_RTPOLL_NEVER);
475 
476     i->before_cb = before_cb;
477     i->before_userdata = userdata;
478 }
479 
pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata)480 void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata) {
481     pa_assert(i);
482     pa_assert(i->priority < PA_RTPOLL_NEVER);
483 
484     i->after_cb = after_cb;
485     i->after_userdata = userdata;
486 }
487 
pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata)488 void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata) {
489     pa_assert(i);
490     pa_assert(i->priority < PA_RTPOLL_NEVER);
491 
492     i->work_cb = work_cb;
493     i->work_userdata = userdata;
494 }
495 
pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i)496 void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i) {
497     pa_assert(i);
498 
499     return i->work_userdata;
500 }
501 
fdsem_before(pa_rtpoll_item *i)502 static int fdsem_before(pa_rtpoll_item *i) {
503 
504     if (pa_fdsem_before_poll(i->before_userdata) < 0)
505         return 1; /* 1 means immediate restart of the loop */
506 
507     return 0;
508 }
509 
fdsem_after(pa_rtpoll_item *i)510 static void fdsem_after(pa_rtpoll_item *i) {
511     pa_assert(i);
512 
513     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
514     pa_fdsem_after_poll(i->after_userdata);
515 }
516 
pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f)517 pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
518     pa_rtpoll_item *i;
519     struct pollfd *pollfd;
520 
521     pa_assert(p);
522     pa_assert(f);
523 
524     i = pa_rtpoll_item_new(p, prio, 1);
525 
526     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
527 
528     pollfd->fd = pa_fdsem_get(f);
529     pollfd->events = POLLIN;
530 
531     pa_rtpoll_item_set_before_callback(i, fdsem_before, f);
532     pa_rtpoll_item_set_after_callback(i, fdsem_after, f);
533 
534     return i;
535 }
536 
asyncmsgq_read_before(pa_rtpoll_item *i)537 static int asyncmsgq_read_before(pa_rtpoll_item *i) {
538     pa_assert(i);
539 
540     if (pa_asyncmsgq_read_before_poll(i->before_userdata) < 0)
541         return 1; /* 1 means immediate restart of the loop */
542 
543     return 0;
544 }
545 
asyncmsgq_read_after(pa_rtpoll_item *i)546 static void asyncmsgq_read_after(pa_rtpoll_item *i) {
547     pa_assert(i);
548 
549     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
550     pa_asyncmsgq_read_after_poll(i->after_userdata);
551 }
552 
asyncmsgq_read_work(pa_rtpoll_item *i)553 static int asyncmsgq_read_work(pa_rtpoll_item *i) {
554     pa_msgobject *object;
555     int code;
556     void *data;
557     pa_memchunk chunk;
558     int64_t offset;
559 
560     pa_assert(i);
561 
562     if (pa_asyncmsgq_get(i->work_userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
563         int ret;
564 
565         if (!object && code == PA_MESSAGE_SHUTDOWN) {
566             pa_asyncmsgq_done(i->work_userdata, 0);
567             /* Requests the loop to exit. Will cause the next iteration of
568              * pa_rtpoll_run() to return 0 */
569             i->rtpoll->quit = true;
570             return 1;
571         }
572 
573         clock_t start = clock();
574         ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
575         clock_t end = clock();
576         double deltatime = (double)(end - start) / CLOCKS_PER_SEC;
577         if (deltatime > 1.0) {
578             pa_log_error("code %d time out %f s", code, deltatime);
579         }
580         pa_asyncmsgq_done(i->work_userdata, ret);
581         return 1;
582     }
583 
584     return 0;
585 }
586 
pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q)587 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
588     pa_rtpoll_item *i;
589     struct pollfd *pollfd;
590 
591     pa_assert(p);
592     pa_assert(q);
593 
594     i = pa_rtpoll_item_new(p, prio, 1);
595 
596     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
597     pollfd->fd = pa_asyncmsgq_read_fd(q);
598     pollfd->events = POLLIN;
599 
600     pa_rtpoll_item_set_before_callback(i, asyncmsgq_read_before, q);
601     pa_rtpoll_item_set_after_callback(i, asyncmsgq_read_after, q);
602     pa_rtpoll_item_set_work_callback(i, asyncmsgq_read_work, q);
603 
604     return i;
605 }
606 
asyncmsgq_write_before(pa_rtpoll_item *i)607 static int asyncmsgq_write_before(pa_rtpoll_item *i) {
608     pa_assert(i);
609 
610     pa_asyncmsgq_write_before_poll(i->before_userdata);
611     return 0;
612 }
613 
asyncmsgq_write_after(pa_rtpoll_item *i)614 static void asyncmsgq_write_after(pa_rtpoll_item *i) {
615     pa_assert(i);
616 
617     pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
618     pa_asyncmsgq_write_after_poll(i->after_userdata);
619 }
620 
pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q)621 pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
622     pa_rtpoll_item *i;
623     struct pollfd *pollfd;
624 
625     pa_assert(p);
626     pa_assert(q);
627 
628     i = pa_rtpoll_item_new(p, prio, 1);
629 
630     pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
631     pollfd->fd = pa_asyncmsgq_write_fd(q);
632     pollfd->events = POLLIN;
633 
634     pa_rtpoll_item_set_before_callback(i, asyncmsgq_write_before, q);
635     pa_rtpoll_item_set_after_callback(i, asyncmsgq_write_after, q);
636 
637     return i;
638 }
639 
pa_rtpoll_timer_elapsed(pa_rtpoll *p)640 bool pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
641     pa_assert(p);
642 
643     return p->timer_elapsed;
644 }
645