1/***
2  This file is part of PulseAudio.
3
4  Copyright 2004-2006 Lennart Poettering
5  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7  PulseAudio is free software; you can redistribute it and/or modify
8  it under the terms of the GNU Lesser General Public License as
9  published by the Free Software Foundation; either version 2.1 of the
10  License, or (at your option) any later version.
11
12  PulseAudio is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License for more details.
16
17  You should have received a copy of the GNU Lesser General Public
18  License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19***/
20
21#ifdef HAVE_CONFIG_H
22#include <config.h>
23#endif
24
25#include <sys/types.h>
26#include <stdio.h>
27#include <string.h>
28#include <errno.h>
29
30#include <pulse/xmalloc.h>
31#include <pulse/timeval.h>
32
33#include <pulsecore/poll.h>
34#include <pulsecore/core-error.h>
35#include <pulsecore/core-rtclock.h>
36#include <pulsecore/macro.h>
37#include <pulsecore/llist.h>
38#include <pulsecore/flist.h>
39#include <pulsecore/core-util.h>
40#include <pulsecore/ratelimit.h>
41#include <pulse/rtclock.h>
42
43#include "rtpoll.h"
44#include "time.h"
45
46/* #define DEBUG_TIMING */
47
48struct pa_rtpoll {
49    struct pollfd *pollfd, *pollfd2;
50    unsigned n_pollfd_alloc, n_pollfd_used;
51
52    struct timeval next_elapse;
53    bool timer_enabled:1;
54
55    bool scan_for_dead:1;
56    bool running:1;
57    bool rebuild_needed:1;
58    bool quit:1;
59    bool timer_elapsed:1;
60
61#ifdef DEBUG_TIMING
62    pa_usec_t timestamp;
63    pa_usec_t slept, awake;
64#endif
65
66    PA_LLIST_HEAD(pa_rtpoll_item, items);
67};
68
69struct pa_rtpoll_item {
70    pa_rtpoll *rtpoll;
71    bool dead;
72
73    pa_rtpoll_priority_t priority;
74
75    struct pollfd *pollfd;
76    unsigned n_pollfd;
77
78    int (*work_cb)(pa_rtpoll_item *i);
79    int (*before_cb)(pa_rtpoll_item *i);
80    void (*after_cb)(pa_rtpoll_item *i);
81    void *work_userdata;
82    void *before_userdata;
83    void *after_userdata;
84
85    PA_LLIST_FIELDS(pa_rtpoll_item);
86};
87
88PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
89
90pa_rtpoll *pa_rtpoll_new(void) {
91    pa_rtpoll *p;
92
93    p = pa_xnew0(pa_rtpoll, 1);
94
95    p->n_pollfd_alloc = 32;
96    p->pollfd = pa_xnew(struct pollfd, p->n_pollfd_alloc);
97    p->pollfd2 = pa_xnew(struct pollfd, p->n_pollfd_alloc);
98
99#ifdef DEBUG_TIMING
100    p->timestamp = pa_rtclock_now();
101#endif
102
103    return p;
104}
105
106static void rtpoll_rebuild(pa_rtpoll *p) {
107
108    struct pollfd *e, *t;
109    pa_rtpoll_item *i;
110    int ra = 0;
111
112    pa_assert(p);
113
114    p->rebuild_needed = false;
115
116    if (p->n_pollfd_used > p->n_pollfd_alloc) {
117        /* Hmm, we have to allocate some more space */
118        p->n_pollfd_alloc = p->n_pollfd_used * 2;
119        p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
120        ra = 1;
121    }
122
123    e = p->pollfd2;
124
125    for (i = p->items; i; i = i->next) {
126
127        if (i->n_pollfd > 0) {
128            size_t l = i->n_pollfd * sizeof(struct pollfd);
129
130            if (i->pollfd)
131                memcpy(e, i->pollfd, l);
132            else
133                memset(e, 0, l);
134
135            i->pollfd = e;
136        } else
137            i->pollfd = NULL;
138
139        e += i->n_pollfd;
140    }
141
142    pa_assert((unsigned) (e - p->pollfd2) == p->n_pollfd_used);
143    t = p->pollfd;
144    p->pollfd = p->pollfd2;
145    p->pollfd2 = t;
146
147    if (ra)
148        p->pollfd2 = pa_xrealloc(p->pollfd2, p->n_pollfd_alloc * sizeof(struct pollfd));
149}
150
151static void rtpoll_item_destroy(pa_rtpoll_item *i) {
152    pa_rtpoll *p;
153
154    pa_assert(i);
155
156    p = i->rtpoll;
157
158    PA_LLIST_REMOVE(pa_rtpoll_item, p->items, i);
159
160    p->n_pollfd_used -= i->n_pollfd;
161
162    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
163        pa_xfree(i);
164
165    p->rebuild_needed = true;
166}
167
168void pa_rtpoll_free(pa_rtpoll *p) {
169    pa_assert(p);
170
171    while (p->items)
172        rtpoll_item_destroy(p->items);
173
174    pa_xfree(p->pollfd);
175    pa_xfree(p->pollfd2);
176
177    pa_xfree(p);
178}
179
180static void reset_revents(pa_rtpoll_item *i) {
181    struct pollfd *f;
182    unsigned n;
183
184    pa_assert(i);
185
186    if (!(f = pa_rtpoll_item_get_pollfd(i, &n)))
187        return;
188
189    for (; n > 0; n--)
190        f[n-1].revents = 0;
191}
192
193static void reset_all_revents(pa_rtpoll *p) {
194    pa_rtpoll_item *i;
195
196    pa_assert(p);
197
198    for (i = p->items; i; i = i->next) {
199
200        if (i->dead)
201            continue;
202
203        reset_revents(i);
204    }
205}
206
207int pa_rtpoll_run(pa_rtpoll *p) {
208    pa_rtpoll_item *i;
209    int r = 0;
210    struct timeval timeout;
211
212    pa_assert(p);
213    pa_assert(!p->running);
214
215#ifdef DEBUG_TIMING
216    pa_log("rtpoll_run");
217#endif
218
219    p->running = true;
220    p->timer_elapsed = false;
221
222    /* First, let's do some work */
223    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
224        int k;
225
226        if (i->dead)
227            continue;
228
229        if (!i->work_cb)
230            continue;
231
232        if (p->quit) {
233#ifdef DEBUG_TIMING
234            pa_log("rtpoll finish");
235#endif
236            goto finish;
237        }
238
239        if ((k = i->work_cb(i)) != 0) {
240            if (k < 0) {
241                r = k;
242                pa_log_error("Error %d in i->work_cb, goto finish", r);
243            }
244#ifdef DEBUG_TIMING
245            pa_log("rtpoll finish");
246#endif
247            goto finish;
248        }
249    }
250
251    /* Now let's prepare for entering the sleep */
252    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
253        int k = 0;
254
255        if (i->dead)
256            continue;
257
258        if (!i->before_cb)
259            continue;
260
261        if (p->quit || (k = i->before_cb(i)) != 0) {
262
263            /* Hmm, this one doesn't let us enter the poll, so rewind everything */
264
265            for (i = i->prev; i; i = i->prev) {
266
267                if (i->dead)
268                    continue;
269
270                if (!i->after_cb)
271                    continue;
272
273                i->after_cb(i);
274            }
275
276            if (k < 0) {
277                pa_log_error("Error %d in i->before_cb, goto finish", r);
278                r = k;
279            }
280#ifdef DEBUG_TIMING
281            pa_log("rtpoll finish");
282#endif
283            goto finish;
284        }
285    }
286
287    if (p->rebuild_needed)
288        rtpoll_rebuild(p);
289
290    pa_zero(timeout);
291
292    /* Calculate timeout */
293    if (!p->quit && p->timer_enabled) {
294        struct timeval now;
295        pa_rtclock_get(&now);
296
297        if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
298            pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
299    }
300
301#ifdef DEBUG_TIMING
302    {
303        pa_usec_t now = pa_rtclock_now();
304        p->awake = now - p->timestamp;
305        p->timestamp = now;
306        if (!p->quit && p->timer_enabled)
307            pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
308        else if (p->quit)
309            pa_log("poll timeout is ZERO");
310        else
311            pa_log("poll timeout is FOREVER");
312    }
313#endif
314
315    /* OK, now let's sleep */
316#ifdef HAVE_PPOLL
317    {
318        struct timespec ts;
319        ts.tv_sec = timeout.tv_sec;
320        ts.tv_nsec = timeout.tv_usec * 1000;
321        r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL);
322    }
323#else
324    r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
325#endif
326
327    p->timer_elapsed = r == 0;
328
329#ifdef DEBUG_TIMING
330    {
331        pa_usec_t now = pa_rtclock_now();
332        p->slept = now - p->timestamp;
333        p->timestamp = now;
334
335        pa_log("Process time %llu ms; sleep time %llu ms",
336               (unsigned long long) (p->awake / PA_USEC_PER_MSEC),
337               (unsigned long long) (p->slept / PA_USEC_PER_MSEC));
338    }
339#endif
340
341    if (r < 0) {
342        if (errno == EAGAIN || errno == EINTR) {
343            r = 0;
344        } else {
345            pa_log_error("Error %d in ppoll, errno: %s", r, pa_cstrerror(errno));
346            pa_log_error("poll(): %s", pa_cstrerror(errno));
347        }
348
349        reset_all_revents(p);
350    }
351
352    /* Let's tell everyone that we left the sleep */
353    for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
354
355        if (i->dead)
356            continue;
357
358        if (!i->after_cb)
359            continue;
360
361        i->after_cb(i);
362    }
363
364finish:
365
366    p->running = false;
367
368    if (p->scan_for_dead) {
369        pa_rtpoll_item *n;
370
371        p->scan_for_dead = false;
372
373        for (i = p->items; i; i = n) {
374            n = i->next;
375
376            if (i->dead)
377                rtpoll_item_destroy(i);
378        }
379    }
380
381    return r < 0 ? r : !p->quit;
382}
383
384void pa_rtpoll_set_timer_absolute(pa_rtpoll *p, pa_usec_t usec) {
385    pa_assert(p);
386
387    pa_timeval_store(&p->next_elapse, usec);
388    p->timer_enabled = true;
389}
390
391void pa_rtpoll_set_timer_relative(pa_rtpoll *p, pa_usec_t usec) {
392    pa_assert(p);
393
394    /* Scheduling a timeout for more than an hour is very very suspicious */
395    pa_assert(usec <= PA_USEC_PER_SEC*60ULL*60ULL);
396
397    pa_rtclock_get(&p->next_elapse);
398    pa_timeval_add(&p->next_elapse, usec);
399    p->timer_enabled = true;
400}
401
402void pa_rtpoll_set_timer_disabled(pa_rtpoll *p) {
403    pa_assert(p);
404
405    memset(&p->next_elapse, 0, sizeof(p->next_elapse));
406    p->timer_enabled = false;
407}
408
409pa_rtpoll_item *pa_rtpoll_item_new(pa_rtpoll *p, pa_rtpoll_priority_t prio, unsigned n_fds) {
410    pa_rtpoll_item *i, *j, *l = NULL;
411
412    pa_assert(p);
413
414    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
415        i = pa_xnew(pa_rtpoll_item, 1);
416
417    i->rtpoll = p;
418    i->dead = false;
419    i->n_pollfd = n_fds;
420    i->pollfd = NULL;
421    i->priority = prio;
422
423    i->work_userdata = NULL;
424    i->before_userdata = NULL;
425    i->work_userdata = NULL;
426    i->before_cb = NULL;
427    i->after_cb = NULL;
428    i->work_cb = NULL;
429
430    for (j = p->items; j; j = j->next) {
431        if (prio <= j->priority)
432            break;
433
434        l = j;
435    }
436
437    PA_LLIST_INSERT_AFTER(pa_rtpoll_item, p->items, j ? j->prev : l, i);
438
439    if (n_fds > 0) {
440        p->rebuild_needed = 1;
441        p->n_pollfd_used += n_fds;
442    }
443
444    return i;
445}
446
447void pa_rtpoll_item_free(pa_rtpoll_item *i) {
448    pa_assert(i);
449
450    if (i->rtpoll->running) {
451        i->dead = true;
452        i->rtpoll->scan_for_dead = true;
453        return;
454    }
455
456    rtpoll_item_destroy(i);
457}
458
459struct pollfd *pa_rtpoll_item_get_pollfd(pa_rtpoll_item *i, unsigned *n_fds) {
460    pa_assert(i);
461
462    if (i->n_pollfd > 0)
463        if (i->rtpoll->rebuild_needed)
464            rtpoll_rebuild(i->rtpoll);
465
466    if (n_fds)
467        *n_fds = i->n_pollfd;
468
469    return i->pollfd;
470}
471
472void pa_rtpoll_item_set_before_callback(pa_rtpoll_item *i, int (*before_cb)(pa_rtpoll_item *i), void *userdata) {
473    pa_assert(i);
474    pa_assert(i->priority < PA_RTPOLL_NEVER);
475
476    i->before_cb = before_cb;
477    i->before_userdata = userdata;
478}
479
480void pa_rtpoll_item_set_after_callback(pa_rtpoll_item *i, void (*after_cb)(pa_rtpoll_item *i), void *userdata) {
481    pa_assert(i);
482    pa_assert(i->priority < PA_RTPOLL_NEVER);
483
484    i->after_cb = after_cb;
485    i->after_userdata = userdata;
486}
487
488void pa_rtpoll_item_set_work_callback(pa_rtpoll_item *i, int (*work_cb)(pa_rtpoll_item *i), void *userdata) {
489    pa_assert(i);
490    pa_assert(i->priority < PA_RTPOLL_NEVER);
491
492    i->work_cb = work_cb;
493    i->work_userdata = userdata;
494}
495
496void* pa_rtpoll_item_get_work_userdata(pa_rtpoll_item *i) {
497    pa_assert(i);
498
499    return i->work_userdata;
500}
501
502static int fdsem_before(pa_rtpoll_item *i) {
503
504    if (pa_fdsem_before_poll(i->before_userdata) < 0)
505        return 1; /* 1 means immediate restart of the loop */
506
507    return 0;
508}
509
510static void fdsem_after(pa_rtpoll_item *i) {
511    pa_assert(i);
512
513    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
514    pa_fdsem_after_poll(i->after_userdata);
515}
516
517pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *f) {
518    pa_rtpoll_item *i;
519    struct pollfd *pollfd;
520
521    pa_assert(p);
522    pa_assert(f);
523
524    i = pa_rtpoll_item_new(p, prio, 1);
525
526    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
527
528    pollfd->fd = pa_fdsem_get(f);
529    pollfd->events = POLLIN;
530
531    pa_rtpoll_item_set_before_callback(i, fdsem_before, f);
532    pa_rtpoll_item_set_after_callback(i, fdsem_after, f);
533
534    return i;
535}
536
537static int asyncmsgq_read_before(pa_rtpoll_item *i) {
538    pa_assert(i);
539
540    if (pa_asyncmsgq_read_before_poll(i->before_userdata) < 0)
541        return 1; /* 1 means immediate restart of the loop */
542
543    return 0;
544}
545
546static void asyncmsgq_read_after(pa_rtpoll_item *i) {
547    pa_assert(i);
548
549    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
550    pa_asyncmsgq_read_after_poll(i->after_userdata);
551}
552
553static int asyncmsgq_read_work(pa_rtpoll_item *i) {
554    pa_msgobject *object;
555    int code;
556    void *data;
557    pa_memchunk chunk;
558    int64_t offset;
559
560    pa_assert(i);
561
562    if (pa_asyncmsgq_get(i->work_userdata, &object, &code, &data, &offset, &chunk, 0) == 0) {
563        int ret;
564
565        if (!object && code == PA_MESSAGE_SHUTDOWN) {
566            pa_asyncmsgq_done(i->work_userdata, 0);
567            /* Requests the loop to exit. Will cause the next iteration of
568             * pa_rtpoll_run() to return 0 */
569            i->rtpoll->quit = true;
570            return 1;
571        }
572
573        clock_t start = clock();
574        ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
575        clock_t end = clock();
576        double deltatime = (double)(end - start) / CLOCKS_PER_SEC;
577        if (deltatime > 1.0) {
578            pa_log_error("code %d time out %f s", code, deltatime);
579        }
580        pa_asyncmsgq_done(i->work_userdata, ret);
581        return 1;
582    }
583
584    return 0;
585}
586
587pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
588    pa_rtpoll_item *i;
589    struct pollfd *pollfd;
590
591    pa_assert(p);
592    pa_assert(q);
593
594    i = pa_rtpoll_item_new(p, prio, 1);
595
596    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
597    pollfd->fd = pa_asyncmsgq_read_fd(q);
598    pollfd->events = POLLIN;
599
600    pa_rtpoll_item_set_before_callback(i, asyncmsgq_read_before, q);
601    pa_rtpoll_item_set_after_callback(i, asyncmsgq_read_after, q);
602    pa_rtpoll_item_set_work_callback(i, asyncmsgq_read_work, q);
603
604    return i;
605}
606
607static int asyncmsgq_write_before(pa_rtpoll_item *i) {
608    pa_assert(i);
609
610    pa_asyncmsgq_write_before_poll(i->before_userdata);
611    return 0;
612}
613
614static void asyncmsgq_write_after(pa_rtpoll_item *i) {
615    pa_assert(i);
616
617    pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
618    pa_asyncmsgq_write_after_poll(i->after_userdata);
619}
620
621pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
622    pa_rtpoll_item *i;
623    struct pollfd *pollfd;
624
625    pa_assert(p);
626    pa_assert(q);
627
628    i = pa_rtpoll_item_new(p, prio, 1);
629
630    pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
631    pollfd->fd = pa_asyncmsgq_write_fd(q);
632    pollfd->events = POLLIN;
633
634    pa_rtpoll_item_set_before_callback(i, asyncmsgq_write_before, q);
635    pa_rtpoll_item_set_after_callback(i, asyncmsgq_write_after, q);
636
637    return i;
638}
639
640bool pa_rtpoll_timer_elapsed(pa_rtpoll *p) {
641    pa_assert(p);
642
643    return p->timer_elapsed;
644}
645