153a5a1b3Sopenharmony_ci/***
253a5a1b3Sopenharmony_ci  This file is part of PulseAudio.
353a5a1b3Sopenharmony_ci
453a5a1b3Sopenharmony_ci  Copyright 2004-2006 Lennart Poettering
553a5a1b3Sopenharmony_ci
653a5a1b3Sopenharmony_ci  PulseAudio is free software; you can redistribute it and/or modify
753a5a1b3Sopenharmony_ci  it under the terms of the GNU Lesser General Public License as published
853a5a1b3Sopenharmony_ci  by the Free Software Foundation; either version 2.1 of the License,
953a5a1b3Sopenharmony_ci  or (at your option) any later version.
1053a5a1b3Sopenharmony_ci
1153a5a1b3Sopenharmony_ci  PulseAudio is distributed in the hope that it will be useful, but
1253a5a1b3Sopenharmony_ci  WITHOUT ANY WARRANTY; without even the implied warranty of
1353a5a1b3Sopenharmony_ci  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
1453a5a1b3Sopenharmony_ci  General Public License for more details.
1553a5a1b3Sopenharmony_ci
1653a5a1b3Sopenharmony_ci  You should have received a copy of the GNU Lesser General Public License
1753a5a1b3Sopenharmony_ci  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
1853a5a1b3Sopenharmony_ci***/
1953a5a1b3Sopenharmony_ci
2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H
2153a5a1b3Sopenharmony_ci#include <config.h>
2253a5a1b3Sopenharmony_ci#endif
2353a5a1b3Sopenharmony_ci
2453a5a1b3Sopenharmony_ci#include <stdlib.h>
2553a5a1b3Sopenharmony_ci#include <sys/stat.h>
2653a5a1b3Sopenharmony_ci#include <stdio.h>
2753a5a1b3Sopenharmony_ci#include <errno.h>
2853a5a1b3Sopenharmony_ci#include <fcntl.h>
2953a5a1b3Sopenharmony_ci#include <unistd.h>
3053a5a1b3Sopenharmony_ci#include <sys/ioctl.h>
3153a5a1b3Sopenharmony_ci
3253a5a1b3Sopenharmony_ci#ifdef HAVE_SYS_FILIO_H
3353a5a1b3Sopenharmony_ci#include <sys/filio.h>
3453a5a1b3Sopenharmony_ci#endif
3553a5a1b3Sopenharmony_ci
3653a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h>
3753a5a1b3Sopenharmony_ci#include <pulse/timeval.h>
3853a5a1b3Sopenharmony_ci#include <pulse/util.h>
3953a5a1b3Sopenharmony_ci#include <pulse/rtclock.h>
4053a5a1b3Sopenharmony_ci
4153a5a1b3Sopenharmony_ci#include <pulsecore/core-error.h>
4253a5a1b3Sopenharmony_ci#include <pulsecore/sink.h>
4353a5a1b3Sopenharmony_ci#include <pulsecore/module.h>
4453a5a1b3Sopenharmony_ci#include <pulsecore/core-util.h>
4553a5a1b3Sopenharmony_ci#include <pulsecore/modargs.h>
4653a5a1b3Sopenharmony_ci#include <pulsecore/log.h>
4753a5a1b3Sopenharmony_ci#include <pulsecore/thread.h>
4853a5a1b3Sopenharmony_ci#include <pulsecore/thread-mq.h>
4953a5a1b3Sopenharmony_ci#include <pulsecore/rtpoll.h>
5053a5a1b3Sopenharmony_ci#include <pulsecore/poll.h>
5153a5a1b3Sopenharmony_ci
5253a5a1b3Sopenharmony_ciPA_MODULE_AUTHOR("Lennart Poettering");
5353a5a1b3Sopenharmony_ciPA_MODULE_DESCRIPTION("UNIX pipe sink");
5453a5a1b3Sopenharmony_ciPA_MODULE_VERSION(PACKAGE_VERSION);
5553a5a1b3Sopenharmony_ciPA_MODULE_LOAD_ONCE(false);
5653a5a1b3Sopenharmony_ciPA_MODULE_USAGE(
5753a5a1b3Sopenharmony_ci        "sink_name=<name for the sink> "
5853a5a1b3Sopenharmony_ci        "sink_properties=<properties for the sink> "
5953a5a1b3Sopenharmony_ci        "file=<path of the FIFO> "
6053a5a1b3Sopenharmony_ci        "format=<sample format> "
6153a5a1b3Sopenharmony_ci        "rate=<sample rate> "
6253a5a1b3Sopenharmony_ci        "channels=<number of channels> "
6353a5a1b3Sopenharmony_ci        "channel_map=<channel map> "
6453a5a1b3Sopenharmony_ci        "use_system_clock_for_timing=<yes or no> "
6553a5a1b3Sopenharmony_ci);
6653a5a1b3Sopenharmony_ci
6753a5a1b3Sopenharmony_ci#define DEFAULT_FILE_NAME "fifo_output"
6853a5a1b3Sopenharmony_ci#define DEFAULT_SINK_NAME "fifo_output"
6953a5a1b3Sopenharmony_ci
7053a5a1b3Sopenharmony_cistruct userdata {
7153a5a1b3Sopenharmony_ci    pa_core *core;
7253a5a1b3Sopenharmony_ci    pa_module *module;
7353a5a1b3Sopenharmony_ci    pa_sink *sink;
7453a5a1b3Sopenharmony_ci
7553a5a1b3Sopenharmony_ci    pa_thread *thread;
7653a5a1b3Sopenharmony_ci    pa_thread_mq thread_mq;
7753a5a1b3Sopenharmony_ci    pa_rtpoll *rtpoll;
7853a5a1b3Sopenharmony_ci
7953a5a1b3Sopenharmony_ci    char *filename;
8053a5a1b3Sopenharmony_ci    int fd;
8153a5a1b3Sopenharmony_ci    bool do_unlink_fifo;
8253a5a1b3Sopenharmony_ci    size_t buffer_size;
8353a5a1b3Sopenharmony_ci    size_t bytes_dropped;
8453a5a1b3Sopenharmony_ci    bool fifo_error;
8553a5a1b3Sopenharmony_ci
8653a5a1b3Sopenharmony_ci    pa_memchunk memchunk;
8753a5a1b3Sopenharmony_ci
8853a5a1b3Sopenharmony_ci    pa_rtpoll_item *rtpoll_item;
8953a5a1b3Sopenharmony_ci
9053a5a1b3Sopenharmony_ci    int write_type;
9153a5a1b3Sopenharmony_ci    pa_usec_t block_usec;
9253a5a1b3Sopenharmony_ci    pa_usec_t timestamp;
9353a5a1b3Sopenharmony_ci
9453a5a1b3Sopenharmony_ci    bool use_system_clock_for_timing;
9553a5a1b3Sopenharmony_ci};
9653a5a1b3Sopenharmony_ci
9753a5a1b3Sopenharmony_cistatic const char* const valid_modargs[] = {
9853a5a1b3Sopenharmony_ci    "sink_name",
9953a5a1b3Sopenharmony_ci    "sink_properties",
10053a5a1b3Sopenharmony_ci    "file",
10153a5a1b3Sopenharmony_ci    "format",
10253a5a1b3Sopenharmony_ci    "rate",
10353a5a1b3Sopenharmony_ci    "channels",
10453a5a1b3Sopenharmony_ci    "channel_map",
10553a5a1b3Sopenharmony_ci    "use_system_clock_for_timing",
10653a5a1b3Sopenharmony_ci    NULL
10753a5a1b3Sopenharmony_ci};
10853a5a1b3Sopenharmony_ci
10953a5a1b3Sopenharmony_cistatic int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
11053a5a1b3Sopenharmony_ci    struct userdata *u = PA_SINK(o)->userdata;
11153a5a1b3Sopenharmony_ci
11253a5a1b3Sopenharmony_ci    switch (code) {
11353a5a1b3Sopenharmony_ci        case PA_SINK_MESSAGE_GET_LATENCY:
11453a5a1b3Sopenharmony_ci            if (u->use_system_clock_for_timing) {
11553a5a1b3Sopenharmony_ci                pa_usec_t now;
11653a5a1b3Sopenharmony_ci                now = pa_rtclock_now();
11753a5a1b3Sopenharmony_ci                *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
11853a5a1b3Sopenharmony_ci            } else {
11953a5a1b3Sopenharmony_ci                size_t n = 0;
12053a5a1b3Sopenharmony_ci
12153a5a1b3Sopenharmony_ci#ifdef FIONREAD
12253a5a1b3Sopenharmony_ci                int l;
12353a5a1b3Sopenharmony_ci
12453a5a1b3Sopenharmony_ci                if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0)
12553a5a1b3Sopenharmony_ci                    n = (size_t) l;
12653a5a1b3Sopenharmony_ci#endif
12753a5a1b3Sopenharmony_ci
12853a5a1b3Sopenharmony_ci                n += u->memchunk.length;
12953a5a1b3Sopenharmony_ci
13053a5a1b3Sopenharmony_ci                *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec);
13153a5a1b3Sopenharmony_ci            }
13253a5a1b3Sopenharmony_ci            return 0;
13353a5a1b3Sopenharmony_ci    }
13453a5a1b3Sopenharmony_ci
13553a5a1b3Sopenharmony_ci    return pa_sink_process_msg(o, code, data, offset, chunk);
13653a5a1b3Sopenharmony_ci}
13753a5a1b3Sopenharmony_ci
13853a5a1b3Sopenharmony_ci/* Called from the IO thread. */
13953a5a1b3Sopenharmony_cistatic int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
14053a5a1b3Sopenharmony_ci    struct userdata *u;
14153a5a1b3Sopenharmony_ci
14253a5a1b3Sopenharmony_ci    pa_assert(s);
14353a5a1b3Sopenharmony_ci    pa_assert_se(u = s->userdata);
14453a5a1b3Sopenharmony_ci
14553a5a1b3Sopenharmony_ci    if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) {
14653a5a1b3Sopenharmony_ci        if (PA_SINK_IS_OPENED(new_state))
14753a5a1b3Sopenharmony_ci            u->timestamp = pa_rtclock_now();
14853a5a1b3Sopenharmony_ci    } else if (PA_SINK_IS_OPENED(s->thread_info.state)) {
14953a5a1b3Sopenharmony_ci        if (new_state == PA_SINK_SUSPENDED) {
15053a5a1b3Sopenharmony_ci            /* Clear potential FIFO error flag */
15153a5a1b3Sopenharmony_ci            u->fifo_error = false;
15253a5a1b3Sopenharmony_ci
15353a5a1b3Sopenharmony_ci            /* Continuously dropping data (clear counter on entering suspended state. */
15453a5a1b3Sopenharmony_ci            if (u->bytes_dropped != 0) {
15553a5a1b3Sopenharmony_ci                pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped);
15653a5a1b3Sopenharmony_ci                u->bytes_dropped = 0;
15753a5a1b3Sopenharmony_ci            }
15853a5a1b3Sopenharmony_ci        }
15953a5a1b3Sopenharmony_ci    }
16053a5a1b3Sopenharmony_ci
16153a5a1b3Sopenharmony_ci    return 0;
16253a5a1b3Sopenharmony_ci}
16353a5a1b3Sopenharmony_ci
16453a5a1b3Sopenharmony_cistatic void sink_update_requested_latency_cb(pa_sink *s) {
16553a5a1b3Sopenharmony_ci    struct userdata *u;
16653a5a1b3Sopenharmony_ci    size_t nbytes;
16753a5a1b3Sopenharmony_ci
16853a5a1b3Sopenharmony_ci    pa_sink_assert_ref(s);
16953a5a1b3Sopenharmony_ci    pa_assert_se(u = s->userdata);
17053a5a1b3Sopenharmony_ci
17153a5a1b3Sopenharmony_ci    u->block_usec = pa_sink_get_requested_latency_within_thread(s);
17253a5a1b3Sopenharmony_ci
17353a5a1b3Sopenharmony_ci    if (u->block_usec == (pa_usec_t) -1)
17453a5a1b3Sopenharmony_ci        u->block_usec = s->thread_info.max_latency;
17553a5a1b3Sopenharmony_ci
17653a5a1b3Sopenharmony_ci    nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
17753a5a1b3Sopenharmony_ci    pa_sink_set_max_request_within_thread(s, nbytes);
17853a5a1b3Sopenharmony_ci}
17953a5a1b3Sopenharmony_ci
18053a5a1b3Sopenharmony_cistatic ssize_t pipe_sink_write(struct userdata *u, pa_memchunk *pchunk) {
18153a5a1b3Sopenharmony_ci    size_t index, length;
18253a5a1b3Sopenharmony_ci    ssize_t count = 0;
18353a5a1b3Sopenharmony_ci    void *p;
18453a5a1b3Sopenharmony_ci
18553a5a1b3Sopenharmony_ci    pa_assert(u);
18653a5a1b3Sopenharmony_ci    pa_assert(pchunk);
18753a5a1b3Sopenharmony_ci
18853a5a1b3Sopenharmony_ci    index = pchunk->index;
18953a5a1b3Sopenharmony_ci    length = pchunk->length;
19053a5a1b3Sopenharmony_ci    p = pa_memblock_acquire(pchunk->memblock);
19153a5a1b3Sopenharmony_ci
19253a5a1b3Sopenharmony_ci    for (;;) {
19353a5a1b3Sopenharmony_ci        ssize_t l;
19453a5a1b3Sopenharmony_ci
19553a5a1b3Sopenharmony_ci        l = pa_write(u->fd, (uint8_t*) p + index, length, &u->write_type);
19653a5a1b3Sopenharmony_ci
19753a5a1b3Sopenharmony_ci        pa_assert(l != 0);
19853a5a1b3Sopenharmony_ci
19953a5a1b3Sopenharmony_ci        if (l < 0) {
20053a5a1b3Sopenharmony_ci            if (errno == EAGAIN)
20153a5a1b3Sopenharmony_ci                break;
20253a5a1b3Sopenharmony_ci
20353a5a1b3Sopenharmony_ci            if (!u->fifo_error) {
20453a5a1b3Sopenharmony_ci                pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
20553a5a1b3Sopenharmony_ci                u->fifo_error = true;
20653a5a1b3Sopenharmony_ci            }
20753a5a1b3Sopenharmony_ci            count = -1 - count;
20853a5a1b3Sopenharmony_ci            break;
20953a5a1b3Sopenharmony_ci        } else {
21053a5a1b3Sopenharmony_ci            if (u->fifo_error) {
21153a5a1b3Sopenharmony_ci                pa_log_debug("Recovered from FIFO error");
21253a5a1b3Sopenharmony_ci                u->fifo_error = false;
21353a5a1b3Sopenharmony_ci            }
21453a5a1b3Sopenharmony_ci            count += l;
21553a5a1b3Sopenharmony_ci            index += l;
21653a5a1b3Sopenharmony_ci            length -= l;
21753a5a1b3Sopenharmony_ci
21853a5a1b3Sopenharmony_ci            if (length <= 0) {
21953a5a1b3Sopenharmony_ci                break;
22053a5a1b3Sopenharmony_ci            }
22153a5a1b3Sopenharmony_ci        }
22253a5a1b3Sopenharmony_ci    }
22353a5a1b3Sopenharmony_ci
22453a5a1b3Sopenharmony_ci    pa_memblock_release(pchunk->memblock);
22553a5a1b3Sopenharmony_ci
22653a5a1b3Sopenharmony_ci    return count;
22753a5a1b3Sopenharmony_ci}
22853a5a1b3Sopenharmony_ci
22953a5a1b3Sopenharmony_cistatic void process_render_use_timing(struct userdata *u, pa_usec_t now) {
23053a5a1b3Sopenharmony_ci    size_t dropped = 0;
23153a5a1b3Sopenharmony_ci    size_t consumed = 0;
23253a5a1b3Sopenharmony_ci
23353a5a1b3Sopenharmony_ci    pa_assert(u);
23453a5a1b3Sopenharmony_ci
23553a5a1b3Sopenharmony_ci    /* Fill the buffer up the latency size */
23653a5a1b3Sopenharmony_ci    while (u->timestamp < now + u->block_usec) {
23753a5a1b3Sopenharmony_ci        ssize_t written = 0;
23853a5a1b3Sopenharmony_ci        pa_memchunk chunk;
23953a5a1b3Sopenharmony_ci
24053a5a1b3Sopenharmony_ci        pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk);
24153a5a1b3Sopenharmony_ci
24253a5a1b3Sopenharmony_ci        pa_assert(chunk.length > 0);
24353a5a1b3Sopenharmony_ci
24453a5a1b3Sopenharmony_ci        if ((written = pipe_sink_write(u, &chunk)) < 0)
24553a5a1b3Sopenharmony_ci            written = -1 - written;
24653a5a1b3Sopenharmony_ci
24753a5a1b3Sopenharmony_ci        pa_memblock_unref(chunk.memblock);
24853a5a1b3Sopenharmony_ci
24953a5a1b3Sopenharmony_ci        u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
25053a5a1b3Sopenharmony_ci
25153a5a1b3Sopenharmony_ci        dropped = chunk.length - written;
25253a5a1b3Sopenharmony_ci
25353a5a1b3Sopenharmony_ci        if (u->bytes_dropped != 0 && dropped != chunk.length) {
25453a5a1b3Sopenharmony_ci            pa_log_debug("Pipe-sink continuously dropped %zu bytes", u->bytes_dropped);
25553a5a1b3Sopenharmony_ci            u->bytes_dropped = 0;
25653a5a1b3Sopenharmony_ci        }
25753a5a1b3Sopenharmony_ci
25853a5a1b3Sopenharmony_ci        if (u->bytes_dropped == 0 && dropped != 0)
25953a5a1b3Sopenharmony_ci            pa_log_debug("Pipe-sink just dropped %zu bytes", dropped);
26053a5a1b3Sopenharmony_ci
26153a5a1b3Sopenharmony_ci        u->bytes_dropped += dropped;
26253a5a1b3Sopenharmony_ci
26353a5a1b3Sopenharmony_ci        consumed += chunk.length;
26453a5a1b3Sopenharmony_ci
26553a5a1b3Sopenharmony_ci        if (consumed >= u->sink->thread_info.max_request)
26653a5a1b3Sopenharmony_ci            break;
26753a5a1b3Sopenharmony_ci    }
26853a5a1b3Sopenharmony_ci}
26953a5a1b3Sopenharmony_ci
27053a5a1b3Sopenharmony_cistatic int process_render(struct userdata *u) {
27153a5a1b3Sopenharmony_ci    pa_assert(u);
27253a5a1b3Sopenharmony_ci
27353a5a1b3Sopenharmony_ci    if (u->memchunk.length <= 0)
27453a5a1b3Sopenharmony_ci        pa_sink_render(u->sink, u->buffer_size, &u->memchunk);
27553a5a1b3Sopenharmony_ci
27653a5a1b3Sopenharmony_ci    pa_assert(u->memchunk.length > 0);
27753a5a1b3Sopenharmony_ci
27853a5a1b3Sopenharmony_ci    for (;;) {
27953a5a1b3Sopenharmony_ci        ssize_t l;
28053a5a1b3Sopenharmony_ci        void *p;
28153a5a1b3Sopenharmony_ci
28253a5a1b3Sopenharmony_ci        p = pa_memblock_acquire(u->memchunk.memblock);
28353a5a1b3Sopenharmony_ci        l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &u->write_type);
28453a5a1b3Sopenharmony_ci        pa_memblock_release(u->memchunk.memblock);
28553a5a1b3Sopenharmony_ci
28653a5a1b3Sopenharmony_ci        pa_assert(l != 0);
28753a5a1b3Sopenharmony_ci
28853a5a1b3Sopenharmony_ci        if (l < 0) {
28953a5a1b3Sopenharmony_ci
29053a5a1b3Sopenharmony_ci            if (errno == EAGAIN)
29153a5a1b3Sopenharmony_ci                return 0;
29253a5a1b3Sopenharmony_ci            else {
29353a5a1b3Sopenharmony_ci                pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
29453a5a1b3Sopenharmony_ci                return -1;
29553a5a1b3Sopenharmony_ci            }
29653a5a1b3Sopenharmony_ci
29753a5a1b3Sopenharmony_ci        } else {
29853a5a1b3Sopenharmony_ci
29953a5a1b3Sopenharmony_ci            u->memchunk.index += (size_t) l;
30053a5a1b3Sopenharmony_ci            u->memchunk.length -= (size_t) l;
30153a5a1b3Sopenharmony_ci
30253a5a1b3Sopenharmony_ci            if (u->memchunk.length <= 0) {
30353a5a1b3Sopenharmony_ci                pa_memblock_unref(u->memchunk.memblock);
30453a5a1b3Sopenharmony_ci                pa_memchunk_reset(&u->memchunk);
30553a5a1b3Sopenharmony_ci            }
30653a5a1b3Sopenharmony_ci        }
30753a5a1b3Sopenharmony_ci
30853a5a1b3Sopenharmony_ci        return 0;
30953a5a1b3Sopenharmony_ci    }
31053a5a1b3Sopenharmony_ci}
31153a5a1b3Sopenharmony_ci
31253a5a1b3Sopenharmony_cistatic void thread_func_use_timing(void *userdata) {
31353a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
31453a5a1b3Sopenharmony_ci
31553a5a1b3Sopenharmony_ci    pa_assert(u);
31653a5a1b3Sopenharmony_ci
31753a5a1b3Sopenharmony_ci    pa_log_debug("Thread (use timing) starting up");
31853a5a1b3Sopenharmony_ci
31953a5a1b3Sopenharmony_ci    pa_thread_mq_install(&u->thread_mq);
32053a5a1b3Sopenharmony_ci
32153a5a1b3Sopenharmony_ci    u->timestamp = pa_rtclock_now();
32253a5a1b3Sopenharmony_ci
32353a5a1b3Sopenharmony_ci    for (;;) {
32453a5a1b3Sopenharmony_ci        pa_usec_t now = 0;
32553a5a1b3Sopenharmony_ci        int ret;
32653a5a1b3Sopenharmony_ci
32753a5a1b3Sopenharmony_ci        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
32853a5a1b3Sopenharmony_ci            now = pa_rtclock_now();
32953a5a1b3Sopenharmony_ci
33053a5a1b3Sopenharmony_ci        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
33153a5a1b3Sopenharmony_ci            pa_sink_process_rewind(u->sink, 0);
33253a5a1b3Sopenharmony_ci
33353a5a1b3Sopenharmony_ci        /* Render some data and write it to the fifo */
33453a5a1b3Sopenharmony_ci        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
33553a5a1b3Sopenharmony_ci            if (u->timestamp <= now)
33653a5a1b3Sopenharmony_ci                process_render_use_timing(u, now);
33753a5a1b3Sopenharmony_ci
33853a5a1b3Sopenharmony_ci            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
33953a5a1b3Sopenharmony_ci        } else
34053a5a1b3Sopenharmony_ci            pa_rtpoll_set_timer_disabled(u->rtpoll);
34153a5a1b3Sopenharmony_ci
34253a5a1b3Sopenharmony_ci        /* Hmm, nothing to do. Let's sleep */
34353a5a1b3Sopenharmony_ci        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
34453a5a1b3Sopenharmony_ci            goto fail;
34553a5a1b3Sopenharmony_ci
34653a5a1b3Sopenharmony_ci        if (ret == 0)
34753a5a1b3Sopenharmony_ci            goto finish;
34853a5a1b3Sopenharmony_ci    }
34953a5a1b3Sopenharmony_ci
35053a5a1b3Sopenharmony_cifail:
35153a5a1b3Sopenharmony_ci    /* If this was no regular exit from the loop we have to continue
35253a5a1b3Sopenharmony_ci     * processing messages until we received PA_MESSAGE_SHUTDOWN */
35353a5a1b3Sopenharmony_ci    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
35453a5a1b3Sopenharmony_ci    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
35553a5a1b3Sopenharmony_ci
35653a5a1b3Sopenharmony_cifinish:
35753a5a1b3Sopenharmony_ci    pa_log_debug("Thread (use timing) shutting down");
35853a5a1b3Sopenharmony_ci}
35953a5a1b3Sopenharmony_ci
36053a5a1b3Sopenharmony_cistatic void thread_func(void *userdata) {
36153a5a1b3Sopenharmony_ci    struct userdata *u = userdata;
36253a5a1b3Sopenharmony_ci
36353a5a1b3Sopenharmony_ci    pa_assert(u);
36453a5a1b3Sopenharmony_ci
36553a5a1b3Sopenharmony_ci    pa_log_debug("Thread starting up");
36653a5a1b3Sopenharmony_ci
36753a5a1b3Sopenharmony_ci    pa_thread_mq_install(&u->thread_mq);
36853a5a1b3Sopenharmony_ci
36953a5a1b3Sopenharmony_ci    for (;;) {
37053a5a1b3Sopenharmony_ci        struct pollfd *pollfd;
37153a5a1b3Sopenharmony_ci        int ret;
37253a5a1b3Sopenharmony_ci
37353a5a1b3Sopenharmony_ci        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
37453a5a1b3Sopenharmony_ci
37553a5a1b3Sopenharmony_ci        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
37653a5a1b3Sopenharmony_ci            pa_sink_process_rewind(u->sink, 0);
37753a5a1b3Sopenharmony_ci
37853a5a1b3Sopenharmony_ci        /* Render some data and write it to the fifo */
37953a5a1b3Sopenharmony_ci        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
38053a5a1b3Sopenharmony_ci            if (pollfd->revents) {
38153a5a1b3Sopenharmony_ci                if (process_render(u) < 0)
38253a5a1b3Sopenharmony_ci                    goto fail;
38353a5a1b3Sopenharmony_ci
38453a5a1b3Sopenharmony_ci                pollfd->revents = 0;
38553a5a1b3Sopenharmony_ci            }
38653a5a1b3Sopenharmony_ci        }
38753a5a1b3Sopenharmony_ci
38853a5a1b3Sopenharmony_ci        /* Hmm, nothing to do. Let's sleep */
38953a5a1b3Sopenharmony_ci        pollfd->events = (short) (u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0);
39053a5a1b3Sopenharmony_ci
39153a5a1b3Sopenharmony_ci        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
39253a5a1b3Sopenharmony_ci            goto fail;
39353a5a1b3Sopenharmony_ci
39453a5a1b3Sopenharmony_ci        if (ret == 0)
39553a5a1b3Sopenharmony_ci            goto finish;
39653a5a1b3Sopenharmony_ci
39753a5a1b3Sopenharmony_ci        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
39853a5a1b3Sopenharmony_ci
39953a5a1b3Sopenharmony_ci        if (pollfd->revents & ~POLLOUT) {
40053a5a1b3Sopenharmony_ci            pa_log("FIFO shutdown.");
40153a5a1b3Sopenharmony_ci            goto fail;
40253a5a1b3Sopenharmony_ci        }
40353a5a1b3Sopenharmony_ci    }
40453a5a1b3Sopenharmony_ci
40553a5a1b3Sopenharmony_cifail:
40653a5a1b3Sopenharmony_ci    /* If this was no regular exit from the loop we have to continue
40753a5a1b3Sopenharmony_ci     * processing messages until we received PA_MESSAGE_SHUTDOWN */
40853a5a1b3Sopenharmony_ci    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
40953a5a1b3Sopenharmony_ci    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
41053a5a1b3Sopenharmony_ci
41153a5a1b3Sopenharmony_cifinish:
41253a5a1b3Sopenharmony_ci    pa_log_debug("Thread shutting down");
41353a5a1b3Sopenharmony_ci}
41453a5a1b3Sopenharmony_ci
41553a5a1b3Sopenharmony_ciint pa__init(pa_module *m) {
41653a5a1b3Sopenharmony_ci    struct userdata *u;
41753a5a1b3Sopenharmony_ci    struct stat st;
41853a5a1b3Sopenharmony_ci    pa_sample_spec ss;
41953a5a1b3Sopenharmony_ci    pa_channel_map map;
42053a5a1b3Sopenharmony_ci    pa_modargs *ma;
42153a5a1b3Sopenharmony_ci    struct pollfd *pollfd;
42253a5a1b3Sopenharmony_ci    pa_sink_new_data data;
42353a5a1b3Sopenharmony_ci    pa_thread_func_t thread_routine;
42453a5a1b3Sopenharmony_ci
42553a5a1b3Sopenharmony_ci    pa_assert(m);
42653a5a1b3Sopenharmony_ci
42753a5a1b3Sopenharmony_ci    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
42853a5a1b3Sopenharmony_ci        pa_log("Failed to parse module arguments.");
42953a5a1b3Sopenharmony_ci        goto fail;
43053a5a1b3Sopenharmony_ci    }
43153a5a1b3Sopenharmony_ci
43253a5a1b3Sopenharmony_ci    ss = m->core->default_sample_spec;
43353a5a1b3Sopenharmony_ci    map = m->core->default_channel_map;
43453a5a1b3Sopenharmony_ci    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
43553a5a1b3Sopenharmony_ci        pa_log("Invalid sample format specification or channel map");
43653a5a1b3Sopenharmony_ci        goto fail;
43753a5a1b3Sopenharmony_ci    }
43853a5a1b3Sopenharmony_ci
43953a5a1b3Sopenharmony_ci    u = pa_xnew0(struct userdata, 1);
44053a5a1b3Sopenharmony_ci    u->core = m->core;
44153a5a1b3Sopenharmony_ci    u->module = m;
44253a5a1b3Sopenharmony_ci    m->userdata = u;
44353a5a1b3Sopenharmony_ci    pa_memchunk_reset(&u->memchunk);
44453a5a1b3Sopenharmony_ci    u->rtpoll = pa_rtpoll_new();
44553a5a1b3Sopenharmony_ci
44653a5a1b3Sopenharmony_ci    if (pa_modargs_get_value_boolean(ma, "use_system_clock_for_timing", &u->use_system_clock_for_timing) < 0) {
44753a5a1b3Sopenharmony_ci        pa_log("Failed to parse use_system_clock_for_timing argument.");
44853a5a1b3Sopenharmony_ci        goto fail;
44953a5a1b3Sopenharmony_ci    }
45053a5a1b3Sopenharmony_ci
45153a5a1b3Sopenharmony_ci    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
45253a5a1b3Sopenharmony_ci        pa_log("pa_thread_mq_init() failed.");
45353a5a1b3Sopenharmony_ci        goto fail;
45453a5a1b3Sopenharmony_ci    }
45553a5a1b3Sopenharmony_ci
45653a5a1b3Sopenharmony_ci    u->write_type = 0;
45753a5a1b3Sopenharmony_ci
45853a5a1b3Sopenharmony_ci    u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
45953a5a1b3Sopenharmony_ci    u->do_unlink_fifo = false;
46053a5a1b3Sopenharmony_ci
46153a5a1b3Sopenharmony_ci    if (mkfifo(u->filename, 0666) < 0) {
46253a5a1b3Sopenharmony_ci        if (errno != EEXIST) {
46353a5a1b3Sopenharmony_ci            pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno));
46453a5a1b3Sopenharmony_ci            goto fail;
46553a5a1b3Sopenharmony_ci        }
46653a5a1b3Sopenharmony_ci    } else {
46753a5a1b3Sopenharmony_ci        u->do_unlink_fifo = true;
46853a5a1b3Sopenharmony_ci
46953a5a1b3Sopenharmony_ci        /* Our umask is 077, so the pipe won't be created with the requested
47053a5a1b3Sopenharmony_ci         * permissions. Let's fix the permissions with chmod(). */
47153a5a1b3Sopenharmony_ci        if (chmod(u->filename, 0666) < 0)
47253a5a1b3Sopenharmony_ci            pa_log_warn("chomd('%s'): %s", u->filename, pa_cstrerror(errno));
47353a5a1b3Sopenharmony_ci    }
47453a5a1b3Sopenharmony_ci
47553a5a1b3Sopenharmony_ci    if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) {
47653a5a1b3Sopenharmony_ci        pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno));
47753a5a1b3Sopenharmony_ci        goto fail;
47853a5a1b3Sopenharmony_ci    }
47953a5a1b3Sopenharmony_ci
48053a5a1b3Sopenharmony_ci    pa_make_fd_nonblock(u->fd);
48153a5a1b3Sopenharmony_ci
48253a5a1b3Sopenharmony_ci    if (fstat(u->fd, &st) < 0) {
48353a5a1b3Sopenharmony_ci        pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno));
48453a5a1b3Sopenharmony_ci        goto fail;
48553a5a1b3Sopenharmony_ci    }
48653a5a1b3Sopenharmony_ci
48753a5a1b3Sopenharmony_ci    if (!S_ISFIFO(st.st_mode)) {
48853a5a1b3Sopenharmony_ci        pa_log("'%s' is not a FIFO.", u->filename);
48953a5a1b3Sopenharmony_ci        goto fail;
49053a5a1b3Sopenharmony_ci    }
49153a5a1b3Sopenharmony_ci
49253a5a1b3Sopenharmony_ci    pa_sink_new_data_init(&data);
49353a5a1b3Sopenharmony_ci    data.driver = __FILE__;
49453a5a1b3Sopenharmony_ci    data.module = m;
49553a5a1b3Sopenharmony_ci    pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
49653a5a1b3Sopenharmony_ci    pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->filename);
49753a5a1b3Sopenharmony_ci    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Unix FIFO sink %s", u->filename);
49853a5a1b3Sopenharmony_ci    pa_sink_new_data_set_sample_spec(&data, &ss);
49953a5a1b3Sopenharmony_ci    pa_sink_new_data_set_channel_map(&data, &map);
50053a5a1b3Sopenharmony_ci
50153a5a1b3Sopenharmony_ci    if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
50253a5a1b3Sopenharmony_ci        pa_log("Invalid properties");
50353a5a1b3Sopenharmony_ci        pa_sink_new_data_done(&data);
50453a5a1b3Sopenharmony_ci        goto fail;
50553a5a1b3Sopenharmony_ci    }
50653a5a1b3Sopenharmony_ci
50753a5a1b3Sopenharmony_ci    if (u->use_system_clock_for_timing)
50853a5a1b3Sopenharmony_ci        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
50953a5a1b3Sopenharmony_ci    else
51053a5a1b3Sopenharmony_ci        u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
51153a5a1b3Sopenharmony_ci    pa_sink_new_data_done(&data);
51253a5a1b3Sopenharmony_ci
51353a5a1b3Sopenharmony_ci    if (!u->sink) {
51453a5a1b3Sopenharmony_ci        pa_log("Failed to create sink.");
51553a5a1b3Sopenharmony_ci        goto fail;
51653a5a1b3Sopenharmony_ci    }
51753a5a1b3Sopenharmony_ci
51853a5a1b3Sopenharmony_ci    u->sink->parent.process_msg = sink_process_msg;
51953a5a1b3Sopenharmony_ci    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
52053a5a1b3Sopenharmony_ci    if (u->use_system_clock_for_timing)
52153a5a1b3Sopenharmony_ci        u->sink->update_requested_latency = sink_update_requested_latency_cb;
52253a5a1b3Sopenharmony_ci    u->sink->userdata = u;
52353a5a1b3Sopenharmony_ci
52453a5a1b3Sopenharmony_ci    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
52553a5a1b3Sopenharmony_ci    pa_sink_set_rtpoll(u->sink, u->rtpoll);
52653a5a1b3Sopenharmony_ci
52753a5a1b3Sopenharmony_ci    u->bytes_dropped = 0;
52853a5a1b3Sopenharmony_ci    u->fifo_error = false;
52953a5a1b3Sopenharmony_ci    u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink->sample_spec);
53053a5a1b3Sopenharmony_ci    if (u->use_system_clock_for_timing) {
53153a5a1b3Sopenharmony_ci        u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);
53253a5a1b3Sopenharmony_ci        pa_sink_set_latency_range(u->sink, 0, u->block_usec);
53353a5a1b3Sopenharmony_ci        thread_routine = thread_func_use_timing;
53453a5a1b3Sopenharmony_ci    } else {
53553a5a1b3Sopenharmony_ci        pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec));
53653a5a1b3Sopenharmony_ci        thread_routine = thread_func;
53753a5a1b3Sopenharmony_ci    }
53853a5a1b3Sopenharmony_ci    pa_sink_set_max_request(u->sink, u->buffer_size);
53953a5a1b3Sopenharmony_ci
54053a5a1b3Sopenharmony_ci    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
54153a5a1b3Sopenharmony_ci    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
54253a5a1b3Sopenharmony_ci    pollfd->fd = u->fd;
54353a5a1b3Sopenharmony_ci    pollfd->events = pollfd->revents = 0;
54453a5a1b3Sopenharmony_ci
54553a5a1b3Sopenharmony_ci    if (!(u->thread = pa_thread_new("pipe-sink", thread_routine, u))) {
54653a5a1b3Sopenharmony_ci        pa_log("Failed to create thread.");
54753a5a1b3Sopenharmony_ci        goto fail;
54853a5a1b3Sopenharmony_ci    }
54953a5a1b3Sopenharmony_ci
55053a5a1b3Sopenharmony_ci    pa_sink_put(u->sink);
55153a5a1b3Sopenharmony_ci
55253a5a1b3Sopenharmony_ci    pa_modargs_free(ma);
55353a5a1b3Sopenharmony_ci
55453a5a1b3Sopenharmony_ci    return 0;
55553a5a1b3Sopenharmony_ci
55653a5a1b3Sopenharmony_cifail:
55753a5a1b3Sopenharmony_ci    if (ma)
55853a5a1b3Sopenharmony_ci        pa_modargs_free(ma);
55953a5a1b3Sopenharmony_ci
56053a5a1b3Sopenharmony_ci    pa__done(m);
56153a5a1b3Sopenharmony_ci
56253a5a1b3Sopenharmony_ci    return -1;
56353a5a1b3Sopenharmony_ci}
56453a5a1b3Sopenharmony_ci
56553a5a1b3Sopenharmony_ciint pa__get_n_used(pa_module *m) {
56653a5a1b3Sopenharmony_ci    struct userdata *u;
56753a5a1b3Sopenharmony_ci
56853a5a1b3Sopenharmony_ci    pa_assert(m);
56953a5a1b3Sopenharmony_ci    pa_assert_se(u = m->userdata);
57053a5a1b3Sopenharmony_ci
57153a5a1b3Sopenharmony_ci    return pa_sink_linked_by(u->sink);
57253a5a1b3Sopenharmony_ci}
57353a5a1b3Sopenharmony_ci
57453a5a1b3Sopenharmony_civoid pa__done(pa_module *m) {
57553a5a1b3Sopenharmony_ci    struct userdata *u;
57653a5a1b3Sopenharmony_ci
57753a5a1b3Sopenharmony_ci    pa_assert(m);
57853a5a1b3Sopenharmony_ci
57953a5a1b3Sopenharmony_ci    if (!(u = m->userdata))
58053a5a1b3Sopenharmony_ci        return;
58153a5a1b3Sopenharmony_ci
58253a5a1b3Sopenharmony_ci    if (u->sink)
58353a5a1b3Sopenharmony_ci        pa_sink_unlink(u->sink);
58453a5a1b3Sopenharmony_ci
58553a5a1b3Sopenharmony_ci    if (u->thread) {
58653a5a1b3Sopenharmony_ci        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
58753a5a1b3Sopenharmony_ci        pa_thread_free(u->thread);
58853a5a1b3Sopenharmony_ci    }
58953a5a1b3Sopenharmony_ci
59053a5a1b3Sopenharmony_ci    pa_thread_mq_done(&u->thread_mq);
59153a5a1b3Sopenharmony_ci
59253a5a1b3Sopenharmony_ci    if (u->sink)
59353a5a1b3Sopenharmony_ci        pa_sink_unref(u->sink);
59453a5a1b3Sopenharmony_ci
59553a5a1b3Sopenharmony_ci    if (u->memchunk.memblock)
59653a5a1b3Sopenharmony_ci        pa_memblock_unref(u->memchunk.memblock);
59753a5a1b3Sopenharmony_ci
59853a5a1b3Sopenharmony_ci    if (u->rtpoll_item)
59953a5a1b3Sopenharmony_ci        pa_rtpoll_item_free(u->rtpoll_item);
60053a5a1b3Sopenharmony_ci
60153a5a1b3Sopenharmony_ci    if (u->rtpoll)
60253a5a1b3Sopenharmony_ci        pa_rtpoll_free(u->rtpoll);
60353a5a1b3Sopenharmony_ci
60453a5a1b3Sopenharmony_ci    if (u->filename) {
60553a5a1b3Sopenharmony_ci        if (u->do_unlink_fifo)
60653a5a1b3Sopenharmony_ci            unlink(u->filename);
60753a5a1b3Sopenharmony_ci        pa_xfree(u->filename);
60853a5a1b3Sopenharmony_ci    }
60953a5a1b3Sopenharmony_ci
61053a5a1b3Sopenharmony_ci    if (u->fd >= 0)
61153a5a1b3Sopenharmony_ci        pa_assert_se(pa_close(u->fd) == 0);
61253a5a1b3Sopenharmony_ci
61353a5a1b3Sopenharmony_ci    pa_xfree(u);
61453a5a1b3Sopenharmony_ci}
615