153a5a1b3Sopenharmony_ci/*** 253a5a1b3Sopenharmony_ci This file is part of PulseAudio. 353a5a1b3Sopenharmony_ci 453a5a1b3Sopenharmony_ci Copyright 2004-2006 Lennart Poettering 553a5a1b3Sopenharmony_ci 653a5a1b3Sopenharmony_ci PulseAudio is free software; you can redistribute it and/or modify 753a5a1b3Sopenharmony_ci it under the terms of the GNU Lesser General Public License as published 853a5a1b3Sopenharmony_ci by the Free Software Foundation; either version 2.1 of the License, 953a5a1b3Sopenharmony_ci or (at your option) any later version. 1053a5a1b3Sopenharmony_ci 1153a5a1b3Sopenharmony_ci PulseAudio is distributed in the hope that it will be useful, but 1253a5a1b3Sopenharmony_ci WITHOUT ANY WARRANTY; without even the implied warranty of 1353a5a1b3Sopenharmony_ci MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 1453a5a1b3Sopenharmony_ci General Public License for more details. 1553a5a1b3Sopenharmony_ci 1653a5a1b3Sopenharmony_ci You should have received a copy of the GNU Lesser General Public License 1753a5a1b3Sopenharmony_ci along with PulseAudio; if not, see <http://www.gnu.org/licenses/>. 1853a5a1b3Sopenharmony_ci***/ 1953a5a1b3Sopenharmony_ci 2053a5a1b3Sopenharmony_ci#ifdef HAVE_CONFIG_H 2153a5a1b3Sopenharmony_ci#include <config.h> 2253a5a1b3Sopenharmony_ci#endif 2353a5a1b3Sopenharmony_ci 2453a5a1b3Sopenharmony_ci#include <stdlib.h> 2553a5a1b3Sopenharmony_ci#include <sys/stat.h> 2653a5a1b3Sopenharmony_ci#include <stdio.h> 2753a5a1b3Sopenharmony_ci#include <errno.h> 2853a5a1b3Sopenharmony_ci#include <fcntl.h> 2953a5a1b3Sopenharmony_ci#include <unistd.h> 3053a5a1b3Sopenharmony_ci#include <sys/ioctl.h> 3153a5a1b3Sopenharmony_ci 3253a5a1b3Sopenharmony_ci#ifdef HAVE_SYS_FILIO_H 3353a5a1b3Sopenharmony_ci#include <sys/filio.h> 3453a5a1b3Sopenharmony_ci#endif 3553a5a1b3Sopenharmony_ci 3653a5a1b3Sopenharmony_ci#include <pulse/xmalloc.h> 3753a5a1b3Sopenharmony_ci#include <pulse/timeval.h> 3853a5a1b3Sopenharmony_ci#include <pulse/util.h> 3953a5a1b3Sopenharmony_ci#include <pulse/rtclock.h> 4053a5a1b3Sopenharmony_ci 4153a5a1b3Sopenharmony_ci#include <pulsecore/core-error.h> 4253a5a1b3Sopenharmony_ci#include <pulsecore/sink.h> 4353a5a1b3Sopenharmony_ci#include <pulsecore/module.h> 4453a5a1b3Sopenharmony_ci#include <pulsecore/core-util.h> 4553a5a1b3Sopenharmony_ci#include <pulsecore/modargs.h> 4653a5a1b3Sopenharmony_ci#include <pulsecore/log.h> 4753a5a1b3Sopenharmony_ci#include <pulsecore/thread.h> 4853a5a1b3Sopenharmony_ci#include <pulsecore/thread-mq.h> 4953a5a1b3Sopenharmony_ci#include <pulsecore/rtpoll.h> 5053a5a1b3Sopenharmony_ci#include <pulsecore/poll.h> 5153a5a1b3Sopenharmony_ci 5253a5a1b3Sopenharmony_ciPA_MODULE_AUTHOR("Lennart Poettering"); 5353a5a1b3Sopenharmony_ciPA_MODULE_DESCRIPTION("UNIX pipe sink"); 5453a5a1b3Sopenharmony_ciPA_MODULE_VERSION(PACKAGE_VERSION); 5553a5a1b3Sopenharmony_ciPA_MODULE_LOAD_ONCE(false); 5653a5a1b3Sopenharmony_ciPA_MODULE_USAGE( 5753a5a1b3Sopenharmony_ci "sink_name=<name for the sink> " 5853a5a1b3Sopenharmony_ci "sink_properties=<properties for the sink> " 5953a5a1b3Sopenharmony_ci "file=<path of the FIFO> " 6053a5a1b3Sopenharmony_ci "format=<sample format> " 6153a5a1b3Sopenharmony_ci "rate=<sample rate> " 6253a5a1b3Sopenharmony_ci "channels=<number of channels> " 6353a5a1b3Sopenharmony_ci "channel_map=<channel map> " 6453a5a1b3Sopenharmony_ci "use_system_clock_for_timing=<yes or no> " 6553a5a1b3Sopenharmony_ci); 6653a5a1b3Sopenharmony_ci 6753a5a1b3Sopenharmony_ci#define DEFAULT_FILE_NAME "fifo_output" 6853a5a1b3Sopenharmony_ci#define DEFAULT_SINK_NAME "fifo_output" 6953a5a1b3Sopenharmony_ci 7053a5a1b3Sopenharmony_cistruct userdata { 7153a5a1b3Sopenharmony_ci pa_core *core; 7253a5a1b3Sopenharmony_ci pa_module *module; 7353a5a1b3Sopenharmony_ci pa_sink *sink; 7453a5a1b3Sopenharmony_ci 7553a5a1b3Sopenharmony_ci pa_thread *thread; 7653a5a1b3Sopenharmony_ci pa_thread_mq thread_mq; 7753a5a1b3Sopenharmony_ci pa_rtpoll *rtpoll; 7853a5a1b3Sopenharmony_ci 7953a5a1b3Sopenharmony_ci char *filename; 8053a5a1b3Sopenharmony_ci int fd; 8153a5a1b3Sopenharmony_ci bool do_unlink_fifo; 8253a5a1b3Sopenharmony_ci size_t buffer_size; 8353a5a1b3Sopenharmony_ci size_t bytes_dropped; 8453a5a1b3Sopenharmony_ci bool fifo_error; 8553a5a1b3Sopenharmony_ci 8653a5a1b3Sopenharmony_ci pa_memchunk memchunk; 8753a5a1b3Sopenharmony_ci 8853a5a1b3Sopenharmony_ci pa_rtpoll_item *rtpoll_item; 8953a5a1b3Sopenharmony_ci 9053a5a1b3Sopenharmony_ci int write_type; 9153a5a1b3Sopenharmony_ci pa_usec_t block_usec; 9253a5a1b3Sopenharmony_ci pa_usec_t timestamp; 9353a5a1b3Sopenharmony_ci 9453a5a1b3Sopenharmony_ci bool use_system_clock_for_timing; 9553a5a1b3Sopenharmony_ci}; 9653a5a1b3Sopenharmony_ci 9753a5a1b3Sopenharmony_cistatic const char* const valid_modargs[] = { 9853a5a1b3Sopenharmony_ci "sink_name", 9953a5a1b3Sopenharmony_ci "sink_properties", 10053a5a1b3Sopenharmony_ci "file", 10153a5a1b3Sopenharmony_ci "format", 10253a5a1b3Sopenharmony_ci "rate", 10353a5a1b3Sopenharmony_ci "channels", 10453a5a1b3Sopenharmony_ci "channel_map", 10553a5a1b3Sopenharmony_ci "use_system_clock_for_timing", 10653a5a1b3Sopenharmony_ci NULL 10753a5a1b3Sopenharmony_ci}; 10853a5a1b3Sopenharmony_ci 10953a5a1b3Sopenharmony_cistatic int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { 11053a5a1b3Sopenharmony_ci struct userdata *u = PA_SINK(o)->userdata; 11153a5a1b3Sopenharmony_ci 11253a5a1b3Sopenharmony_ci switch (code) { 11353a5a1b3Sopenharmony_ci case PA_SINK_MESSAGE_GET_LATENCY: 11453a5a1b3Sopenharmony_ci if (u->use_system_clock_for_timing) { 11553a5a1b3Sopenharmony_ci pa_usec_t now; 11653a5a1b3Sopenharmony_ci now = pa_rtclock_now(); 11753a5a1b3Sopenharmony_ci *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now; 11853a5a1b3Sopenharmony_ci } else { 11953a5a1b3Sopenharmony_ci size_t n = 0; 12053a5a1b3Sopenharmony_ci 12153a5a1b3Sopenharmony_ci#ifdef FIONREAD 12253a5a1b3Sopenharmony_ci int l; 12353a5a1b3Sopenharmony_ci 12453a5a1b3Sopenharmony_ci if (ioctl(u->fd, FIONREAD, &l) >= 0 && l > 0) 12553a5a1b3Sopenharmony_ci n = (size_t) l; 12653a5a1b3Sopenharmony_ci#endif 12753a5a1b3Sopenharmony_ci 12853a5a1b3Sopenharmony_ci n += u->memchunk.length; 12953a5a1b3Sopenharmony_ci 13053a5a1b3Sopenharmony_ci *((int64_t*) data) = pa_bytes_to_usec(n, &u->sink->sample_spec); 13153a5a1b3Sopenharmony_ci } 13253a5a1b3Sopenharmony_ci return 0; 13353a5a1b3Sopenharmony_ci } 13453a5a1b3Sopenharmony_ci 13553a5a1b3Sopenharmony_ci return pa_sink_process_msg(o, code, data, offset, chunk); 13653a5a1b3Sopenharmony_ci} 13753a5a1b3Sopenharmony_ci 13853a5a1b3Sopenharmony_ci/* Called from the IO thread. */ 13953a5a1b3Sopenharmony_cistatic int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) { 14053a5a1b3Sopenharmony_ci struct userdata *u; 14153a5a1b3Sopenharmony_ci 14253a5a1b3Sopenharmony_ci pa_assert(s); 14353a5a1b3Sopenharmony_ci pa_assert_se(u = s->userdata); 14453a5a1b3Sopenharmony_ci 14553a5a1b3Sopenharmony_ci if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) { 14653a5a1b3Sopenharmony_ci if (PA_SINK_IS_OPENED(new_state)) 14753a5a1b3Sopenharmony_ci u->timestamp = pa_rtclock_now(); 14853a5a1b3Sopenharmony_ci } else if (PA_SINK_IS_OPENED(s->thread_info.state)) { 14953a5a1b3Sopenharmony_ci if (new_state == PA_SINK_SUSPENDED) { 15053a5a1b3Sopenharmony_ci /* Clear potential FIFO error flag */ 15153a5a1b3Sopenharmony_ci u->fifo_error = false; 15253a5a1b3Sopenharmony_ci 15353a5a1b3Sopenharmony_ci /* Continuously dropping data (clear counter on entering suspended state. */ 15453a5a1b3Sopenharmony_ci if (u->bytes_dropped != 0) { 15553a5a1b3Sopenharmony_ci pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped); 15653a5a1b3Sopenharmony_ci u->bytes_dropped = 0; 15753a5a1b3Sopenharmony_ci } 15853a5a1b3Sopenharmony_ci } 15953a5a1b3Sopenharmony_ci } 16053a5a1b3Sopenharmony_ci 16153a5a1b3Sopenharmony_ci return 0; 16253a5a1b3Sopenharmony_ci} 16353a5a1b3Sopenharmony_ci 16453a5a1b3Sopenharmony_cistatic void sink_update_requested_latency_cb(pa_sink *s) { 16553a5a1b3Sopenharmony_ci struct userdata *u; 16653a5a1b3Sopenharmony_ci size_t nbytes; 16753a5a1b3Sopenharmony_ci 16853a5a1b3Sopenharmony_ci pa_sink_assert_ref(s); 16953a5a1b3Sopenharmony_ci pa_assert_se(u = s->userdata); 17053a5a1b3Sopenharmony_ci 17153a5a1b3Sopenharmony_ci u->block_usec = pa_sink_get_requested_latency_within_thread(s); 17253a5a1b3Sopenharmony_ci 17353a5a1b3Sopenharmony_ci if (u->block_usec == (pa_usec_t) -1) 17453a5a1b3Sopenharmony_ci u->block_usec = s->thread_info.max_latency; 17553a5a1b3Sopenharmony_ci 17653a5a1b3Sopenharmony_ci nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); 17753a5a1b3Sopenharmony_ci pa_sink_set_max_request_within_thread(s, nbytes); 17853a5a1b3Sopenharmony_ci} 17953a5a1b3Sopenharmony_ci 18053a5a1b3Sopenharmony_cistatic ssize_t pipe_sink_write(struct userdata *u, pa_memchunk *pchunk) { 18153a5a1b3Sopenharmony_ci size_t index, length; 18253a5a1b3Sopenharmony_ci ssize_t count = 0; 18353a5a1b3Sopenharmony_ci void *p; 18453a5a1b3Sopenharmony_ci 18553a5a1b3Sopenharmony_ci pa_assert(u); 18653a5a1b3Sopenharmony_ci pa_assert(pchunk); 18753a5a1b3Sopenharmony_ci 18853a5a1b3Sopenharmony_ci index = pchunk->index; 18953a5a1b3Sopenharmony_ci length = pchunk->length; 19053a5a1b3Sopenharmony_ci p = pa_memblock_acquire(pchunk->memblock); 19153a5a1b3Sopenharmony_ci 19253a5a1b3Sopenharmony_ci for (;;) { 19353a5a1b3Sopenharmony_ci ssize_t l; 19453a5a1b3Sopenharmony_ci 19553a5a1b3Sopenharmony_ci l = pa_write(u->fd, (uint8_t*) p + index, length, &u->write_type); 19653a5a1b3Sopenharmony_ci 19753a5a1b3Sopenharmony_ci pa_assert(l != 0); 19853a5a1b3Sopenharmony_ci 19953a5a1b3Sopenharmony_ci if (l < 0) { 20053a5a1b3Sopenharmony_ci if (errno == EAGAIN) 20153a5a1b3Sopenharmony_ci break; 20253a5a1b3Sopenharmony_ci 20353a5a1b3Sopenharmony_ci if (!u->fifo_error) { 20453a5a1b3Sopenharmony_ci pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); 20553a5a1b3Sopenharmony_ci u->fifo_error = true; 20653a5a1b3Sopenharmony_ci } 20753a5a1b3Sopenharmony_ci count = -1 - count; 20853a5a1b3Sopenharmony_ci break; 20953a5a1b3Sopenharmony_ci } else { 21053a5a1b3Sopenharmony_ci if (u->fifo_error) { 21153a5a1b3Sopenharmony_ci pa_log_debug("Recovered from FIFO error"); 21253a5a1b3Sopenharmony_ci u->fifo_error = false; 21353a5a1b3Sopenharmony_ci } 21453a5a1b3Sopenharmony_ci count += l; 21553a5a1b3Sopenharmony_ci index += l; 21653a5a1b3Sopenharmony_ci length -= l; 21753a5a1b3Sopenharmony_ci 21853a5a1b3Sopenharmony_ci if (length <= 0) { 21953a5a1b3Sopenharmony_ci break; 22053a5a1b3Sopenharmony_ci } 22153a5a1b3Sopenharmony_ci } 22253a5a1b3Sopenharmony_ci } 22353a5a1b3Sopenharmony_ci 22453a5a1b3Sopenharmony_ci pa_memblock_release(pchunk->memblock); 22553a5a1b3Sopenharmony_ci 22653a5a1b3Sopenharmony_ci return count; 22753a5a1b3Sopenharmony_ci} 22853a5a1b3Sopenharmony_ci 22953a5a1b3Sopenharmony_cistatic void process_render_use_timing(struct userdata *u, pa_usec_t now) { 23053a5a1b3Sopenharmony_ci size_t dropped = 0; 23153a5a1b3Sopenharmony_ci size_t consumed = 0; 23253a5a1b3Sopenharmony_ci 23353a5a1b3Sopenharmony_ci pa_assert(u); 23453a5a1b3Sopenharmony_ci 23553a5a1b3Sopenharmony_ci /* Fill the buffer up the latency size */ 23653a5a1b3Sopenharmony_ci while (u->timestamp < now + u->block_usec) { 23753a5a1b3Sopenharmony_ci ssize_t written = 0; 23853a5a1b3Sopenharmony_ci pa_memchunk chunk; 23953a5a1b3Sopenharmony_ci 24053a5a1b3Sopenharmony_ci pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk); 24153a5a1b3Sopenharmony_ci 24253a5a1b3Sopenharmony_ci pa_assert(chunk.length > 0); 24353a5a1b3Sopenharmony_ci 24453a5a1b3Sopenharmony_ci if ((written = pipe_sink_write(u, &chunk)) < 0) 24553a5a1b3Sopenharmony_ci written = -1 - written; 24653a5a1b3Sopenharmony_ci 24753a5a1b3Sopenharmony_ci pa_memblock_unref(chunk.memblock); 24853a5a1b3Sopenharmony_ci 24953a5a1b3Sopenharmony_ci u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec); 25053a5a1b3Sopenharmony_ci 25153a5a1b3Sopenharmony_ci dropped = chunk.length - written; 25253a5a1b3Sopenharmony_ci 25353a5a1b3Sopenharmony_ci if (u->bytes_dropped != 0 && dropped != chunk.length) { 25453a5a1b3Sopenharmony_ci pa_log_debug("Pipe-sink continuously dropped %zu bytes", u->bytes_dropped); 25553a5a1b3Sopenharmony_ci u->bytes_dropped = 0; 25653a5a1b3Sopenharmony_ci } 25753a5a1b3Sopenharmony_ci 25853a5a1b3Sopenharmony_ci if (u->bytes_dropped == 0 && dropped != 0) 25953a5a1b3Sopenharmony_ci pa_log_debug("Pipe-sink just dropped %zu bytes", dropped); 26053a5a1b3Sopenharmony_ci 26153a5a1b3Sopenharmony_ci u->bytes_dropped += dropped; 26253a5a1b3Sopenharmony_ci 26353a5a1b3Sopenharmony_ci consumed += chunk.length; 26453a5a1b3Sopenharmony_ci 26553a5a1b3Sopenharmony_ci if (consumed >= u->sink->thread_info.max_request) 26653a5a1b3Sopenharmony_ci break; 26753a5a1b3Sopenharmony_ci } 26853a5a1b3Sopenharmony_ci} 26953a5a1b3Sopenharmony_ci 27053a5a1b3Sopenharmony_cistatic int process_render(struct userdata *u) { 27153a5a1b3Sopenharmony_ci pa_assert(u); 27253a5a1b3Sopenharmony_ci 27353a5a1b3Sopenharmony_ci if (u->memchunk.length <= 0) 27453a5a1b3Sopenharmony_ci pa_sink_render(u->sink, u->buffer_size, &u->memchunk); 27553a5a1b3Sopenharmony_ci 27653a5a1b3Sopenharmony_ci pa_assert(u->memchunk.length > 0); 27753a5a1b3Sopenharmony_ci 27853a5a1b3Sopenharmony_ci for (;;) { 27953a5a1b3Sopenharmony_ci ssize_t l; 28053a5a1b3Sopenharmony_ci void *p; 28153a5a1b3Sopenharmony_ci 28253a5a1b3Sopenharmony_ci p = pa_memblock_acquire(u->memchunk.memblock); 28353a5a1b3Sopenharmony_ci l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &u->write_type); 28453a5a1b3Sopenharmony_ci pa_memblock_release(u->memchunk.memblock); 28553a5a1b3Sopenharmony_ci 28653a5a1b3Sopenharmony_ci pa_assert(l != 0); 28753a5a1b3Sopenharmony_ci 28853a5a1b3Sopenharmony_ci if (l < 0) { 28953a5a1b3Sopenharmony_ci 29053a5a1b3Sopenharmony_ci if (errno == EAGAIN) 29153a5a1b3Sopenharmony_ci return 0; 29253a5a1b3Sopenharmony_ci else { 29353a5a1b3Sopenharmony_ci pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); 29453a5a1b3Sopenharmony_ci return -1; 29553a5a1b3Sopenharmony_ci } 29653a5a1b3Sopenharmony_ci 29753a5a1b3Sopenharmony_ci } else { 29853a5a1b3Sopenharmony_ci 29953a5a1b3Sopenharmony_ci u->memchunk.index += (size_t) l; 30053a5a1b3Sopenharmony_ci u->memchunk.length -= (size_t) l; 30153a5a1b3Sopenharmony_ci 30253a5a1b3Sopenharmony_ci if (u->memchunk.length <= 0) { 30353a5a1b3Sopenharmony_ci pa_memblock_unref(u->memchunk.memblock); 30453a5a1b3Sopenharmony_ci pa_memchunk_reset(&u->memchunk); 30553a5a1b3Sopenharmony_ci } 30653a5a1b3Sopenharmony_ci } 30753a5a1b3Sopenharmony_ci 30853a5a1b3Sopenharmony_ci return 0; 30953a5a1b3Sopenharmony_ci } 31053a5a1b3Sopenharmony_ci} 31153a5a1b3Sopenharmony_ci 31253a5a1b3Sopenharmony_cistatic void thread_func_use_timing(void *userdata) { 31353a5a1b3Sopenharmony_ci struct userdata *u = userdata; 31453a5a1b3Sopenharmony_ci 31553a5a1b3Sopenharmony_ci pa_assert(u); 31653a5a1b3Sopenharmony_ci 31753a5a1b3Sopenharmony_ci pa_log_debug("Thread (use timing) starting up"); 31853a5a1b3Sopenharmony_ci 31953a5a1b3Sopenharmony_ci pa_thread_mq_install(&u->thread_mq); 32053a5a1b3Sopenharmony_ci 32153a5a1b3Sopenharmony_ci u->timestamp = pa_rtclock_now(); 32253a5a1b3Sopenharmony_ci 32353a5a1b3Sopenharmony_ci for (;;) { 32453a5a1b3Sopenharmony_ci pa_usec_t now = 0; 32553a5a1b3Sopenharmony_ci int ret; 32653a5a1b3Sopenharmony_ci 32753a5a1b3Sopenharmony_ci if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) 32853a5a1b3Sopenharmony_ci now = pa_rtclock_now(); 32953a5a1b3Sopenharmony_ci 33053a5a1b3Sopenharmony_ci if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) 33153a5a1b3Sopenharmony_ci pa_sink_process_rewind(u->sink, 0); 33253a5a1b3Sopenharmony_ci 33353a5a1b3Sopenharmony_ci /* Render some data and write it to the fifo */ 33453a5a1b3Sopenharmony_ci if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { 33553a5a1b3Sopenharmony_ci if (u->timestamp <= now) 33653a5a1b3Sopenharmony_ci process_render_use_timing(u, now); 33753a5a1b3Sopenharmony_ci 33853a5a1b3Sopenharmony_ci pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp); 33953a5a1b3Sopenharmony_ci } else 34053a5a1b3Sopenharmony_ci pa_rtpoll_set_timer_disabled(u->rtpoll); 34153a5a1b3Sopenharmony_ci 34253a5a1b3Sopenharmony_ci /* Hmm, nothing to do. Let's sleep */ 34353a5a1b3Sopenharmony_ci if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) 34453a5a1b3Sopenharmony_ci goto fail; 34553a5a1b3Sopenharmony_ci 34653a5a1b3Sopenharmony_ci if (ret == 0) 34753a5a1b3Sopenharmony_ci goto finish; 34853a5a1b3Sopenharmony_ci } 34953a5a1b3Sopenharmony_ci 35053a5a1b3Sopenharmony_cifail: 35153a5a1b3Sopenharmony_ci /* If this was no regular exit from the loop we have to continue 35253a5a1b3Sopenharmony_ci * processing messages until we received PA_MESSAGE_SHUTDOWN */ 35353a5a1b3Sopenharmony_ci pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); 35453a5a1b3Sopenharmony_ci pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); 35553a5a1b3Sopenharmony_ci 35653a5a1b3Sopenharmony_cifinish: 35753a5a1b3Sopenharmony_ci pa_log_debug("Thread (use timing) shutting down"); 35853a5a1b3Sopenharmony_ci} 35953a5a1b3Sopenharmony_ci 36053a5a1b3Sopenharmony_cistatic void thread_func(void *userdata) { 36153a5a1b3Sopenharmony_ci struct userdata *u = userdata; 36253a5a1b3Sopenharmony_ci 36353a5a1b3Sopenharmony_ci pa_assert(u); 36453a5a1b3Sopenharmony_ci 36553a5a1b3Sopenharmony_ci pa_log_debug("Thread starting up"); 36653a5a1b3Sopenharmony_ci 36753a5a1b3Sopenharmony_ci pa_thread_mq_install(&u->thread_mq); 36853a5a1b3Sopenharmony_ci 36953a5a1b3Sopenharmony_ci for (;;) { 37053a5a1b3Sopenharmony_ci struct pollfd *pollfd; 37153a5a1b3Sopenharmony_ci int ret; 37253a5a1b3Sopenharmony_ci 37353a5a1b3Sopenharmony_ci pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); 37453a5a1b3Sopenharmony_ci 37553a5a1b3Sopenharmony_ci if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) 37653a5a1b3Sopenharmony_ci pa_sink_process_rewind(u->sink, 0); 37753a5a1b3Sopenharmony_ci 37853a5a1b3Sopenharmony_ci /* Render some data and write it to the fifo */ 37953a5a1b3Sopenharmony_ci if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { 38053a5a1b3Sopenharmony_ci if (pollfd->revents) { 38153a5a1b3Sopenharmony_ci if (process_render(u) < 0) 38253a5a1b3Sopenharmony_ci goto fail; 38353a5a1b3Sopenharmony_ci 38453a5a1b3Sopenharmony_ci pollfd->revents = 0; 38553a5a1b3Sopenharmony_ci } 38653a5a1b3Sopenharmony_ci } 38753a5a1b3Sopenharmony_ci 38853a5a1b3Sopenharmony_ci /* Hmm, nothing to do. Let's sleep */ 38953a5a1b3Sopenharmony_ci pollfd->events = (short) (u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0); 39053a5a1b3Sopenharmony_ci 39153a5a1b3Sopenharmony_ci if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) 39253a5a1b3Sopenharmony_ci goto fail; 39353a5a1b3Sopenharmony_ci 39453a5a1b3Sopenharmony_ci if (ret == 0) 39553a5a1b3Sopenharmony_ci goto finish; 39653a5a1b3Sopenharmony_ci 39753a5a1b3Sopenharmony_ci pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); 39853a5a1b3Sopenharmony_ci 39953a5a1b3Sopenharmony_ci if (pollfd->revents & ~POLLOUT) { 40053a5a1b3Sopenharmony_ci pa_log("FIFO shutdown."); 40153a5a1b3Sopenharmony_ci goto fail; 40253a5a1b3Sopenharmony_ci } 40353a5a1b3Sopenharmony_ci } 40453a5a1b3Sopenharmony_ci 40553a5a1b3Sopenharmony_cifail: 40653a5a1b3Sopenharmony_ci /* If this was no regular exit from the loop we have to continue 40753a5a1b3Sopenharmony_ci * processing messages until we received PA_MESSAGE_SHUTDOWN */ 40853a5a1b3Sopenharmony_ci pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); 40953a5a1b3Sopenharmony_ci pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); 41053a5a1b3Sopenharmony_ci 41153a5a1b3Sopenharmony_cifinish: 41253a5a1b3Sopenharmony_ci pa_log_debug("Thread shutting down"); 41353a5a1b3Sopenharmony_ci} 41453a5a1b3Sopenharmony_ci 41553a5a1b3Sopenharmony_ciint pa__init(pa_module *m) { 41653a5a1b3Sopenharmony_ci struct userdata *u; 41753a5a1b3Sopenharmony_ci struct stat st; 41853a5a1b3Sopenharmony_ci pa_sample_spec ss; 41953a5a1b3Sopenharmony_ci pa_channel_map map; 42053a5a1b3Sopenharmony_ci pa_modargs *ma; 42153a5a1b3Sopenharmony_ci struct pollfd *pollfd; 42253a5a1b3Sopenharmony_ci pa_sink_new_data data; 42353a5a1b3Sopenharmony_ci pa_thread_func_t thread_routine; 42453a5a1b3Sopenharmony_ci 42553a5a1b3Sopenharmony_ci pa_assert(m); 42653a5a1b3Sopenharmony_ci 42753a5a1b3Sopenharmony_ci if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { 42853a5a1b3Sopenharmony_ci pa_log("Failed to parse module arguments."); 42953a5a1b3Sopenharmony_ci goto fail; 43053a5a1b3Sopenharmony_ci } 43153a5a1b3Sopenharmony_ci 43253a5a1b3Sopenharmony_ci ss = m->core->default_sample_spec; 43353a5a1b3Sopenharmony_ci map = m->core->default_channel_map; 43453a5a1b3Sopenharmony_ci if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) { 43553a5a1b3Sopenharmony_ci pa_log("Invalid sample format specification or channel map"); 43653a5a1b3Sopenharmony_ci goto fail; 43753a5a1b3Sopenharmony_ci } 43853a5a1b3Sopenharmony_ci 43953a5a1b3Sopenharmony_ci u = pa_xnew0(struct userdata, 1); 44053a5a1b3Sopenharmony_ci u->core = m->core; 44153a5a1b3Sopenharmony_ci u->module = m; 44253a5a1b3Sopenharmony_ci m->userdata = u; 44353a5a1b3Sopenharmony_ci pa_memchunk_reset(&u->memchunk); 44453a5a1b3Sopenharmony_ci u->rtpoll = pa_rtpoll_new(); 44553a5a1b3Sopenharmony_ci 44653a5a1b3Sopenharmony_ci if (pa_modargs_get_value_boolean(ma, "use_system_clock_for_timing", &u->use_system_clock_for_timing) < 0) { 44753a5a1b3Sopenharmony_ci pa_log("Failed to parse use_system_clock_for_timing argument."); 44853a5a1b3Sopenharmony_ci goto fail; 44953a5a1b3Sopenharmony_ci } 45053a5a1b3Sopenharmony_ci 45153a5a1b3Sopenharmony_ci if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) { 45253a5a1b3Sopenharmony_ci pa_log("pa_thread_mq_init() failed."); 45353a5a1b3Sopenharmony_ci goto fail; 45453a5a1b3Sopenharmony_ci } 45553a5a1b3Sopenharmony_ci 45653a5a1b3Sopenharmony_ci u->write_type = 0; 45753a5a1b3Sopenharmony_ci 45853a5a1b3Sopenharmony_ci u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); 45953a5a1b3Sopenharmony_ci u->do_unlink_fifo = false; 46053a5a1b3Sopenharmony_ci 46153a5a1b3Sopenharmony_ci if (mkfifo(u->filename, 0666) < 0) { 46253a5a1b3Sopenharmony_ci if (errno != EEXIST) { 46353a5a1b3Sopenharmony_ci pa_log("mkfifo('%s'): %s", u->filename, pa_cstrerror(errno)); 46453a5a1b3Sopenharmony_ci goto fail; 46553a5a1b3Sopenharmony_ci } 46653a5a1b3Sopenharmony_ci } else { 46753a5a1b3Sopenharmony_ci u->do_unlink_fifo = true; 46853a5a1b3Sopenharmony_ci 46953a5a1b3Sopenharmony_ci /* Our umask is 077, so the pipe won't be created with the requested 47053a5a1b3Sopenharmony_ci * permissions. Let's fix the permissions with chmod(). */ 47153a5a1b3Sopenharmony_ci if (chmod(u->filename, 0666) < 0) 47253a5a1b3Sopenharmony_ci pa_log_warn("chomd('%s'): %s", u->filename, pa_cstrerror(errno)); 47353a5a1b3Sopenharmony_ci } 47453a5a1b3Sopenharmony_ci 47553a5a1b3Sopenharmony_ci if ((u->fd = pa_open_cloexec(u->filename, O_RDWR, 0)) < 0) { 47653a5a1b3Sopenharmony_ci pa_log("open('%s'): %s", u->filename, pa_cstrerror(errno)); 47753a5a1b3Sopenharmony_ci goto fail; 47853a5a1b3Sopenharmony_ci } 47953a5a1b3Sopenharmony_ci 48053a5a1b3Sopenharmony_ci pa_make_fd_nonblock(u->fd); 48153a5a1b3Sopenharmony_ci 48253a5a1b3Sopenharmony_ci if (fstat(u->fd, &st) < 0) { 48353a5a1b3Sopenharmony_ci pa_log("fstat('%s'): %s", u->filename, pa_cstrerror(errno)); 48453a5a1b3Sopenharmony_ci goto fail; 48553a5a1b3Sopenharmony_ci } 48653a5a1b3Sopenharmony_ci 48753a5a1b3Sopenharmony_ci if (!S_ISFIFO(st.st_mode)) { 48853a5a1b3Sopenharmony_ci pa_log("'%s' is not a FIFO.", u->filename); 48953a5a1b3Sopenharmony_ci goto fail; 49053a5a1b3Sopenharmony_ci } 49153a5a1b3Sopenharmony_ci 49253a5a1b3Sopenharmony_ci pa_sink_new_data_init(&data); 49353a5a1b3Sopenharmony_ci data.driver = __FILE__; 49453a5a1b3Sopenharmony_ci data.module = m; 49553a5a1b3Sopenharmony_ci pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME)); 49653a5a1b3Sopenharmony_ci pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->filename); 49753a5a1b3Sopenharmony_ci pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Unix FIFO sink %s", u->filename); 49853a5a1b3Sopenharmony_ci pa_sink_new_data_set_sample_spec(&data, &ss); 49953a5a1b3Sopenharmony_ci pa_sink_new_data_set_channel_map(&data, &map); 50053a5a1b3Sopenharmony_ci 50153a5a1b3Sopenharmony_ci if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) { 50253a5a1b3Sopenharmony_ci pa_log("Invalid properties"); 50353a5a1b3Sopenharmony_ci pa_sink_new_data_done(&data); 50453a5a1b3Sopenharmony_ci goto fail; 50553a5a1b3Sopenharmony_ci } 50653a5a1b3Sopenharmony_ci 50753a5a1b3Sopenharmony_ci if (u->use_system_clock_for_timing) 50853a5a1b3Sopenharmony_ci u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY); 50953a5a1b3Sopenharmony_ci else 51053a5a1b3Sopenharmony_ci u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY); 51153a5a1b3Sopenharmony_ci pa_sink_new_data_done(&data); 51253a5a1b3Sopenharmony_ci 51353a5a1b3Sopenharmony_ci if (!u->sink) { 51453a5a1b3Sopenharmony_ci pa_log("Failed to create sink."); 51553a5a1b3Sopenharmony_ci goto fail; 51653a5a1b3Sopenharmony_ci } 51753a5a1b3Sopenharmony_ci 51853a5a1b3Sopenharmony_ci u->sink->parent.process_msg = sink_process_msg; 51953a5a1b3Sopenharmony_ci u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; 52053a5a1b3Sopenharmony_ci if (u->use_system_clock_for_timing) 52153a5a1b3Sopenharmony_ci u->sink->update_requested_latency = sink_update_requested_latency_cb; 52253a5a1b3Sopenharmony_ci u->sink->userdata = u; 52353a5a1b3Sopenharmony_ci 52453a5a1b3Sopenharmony_ci pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); 52553a5a1b3Sopenharmony_ci pa_sink_set_rtpoll(u->sink, u->rtpoll); 52653a5a1b3Sopenharmony_ci 52753a5a1b3Sopenharmony_ci u->bytes_dropped = 0; 52853a5a1b3Sopenharmony_ci u->fifo_error = false; 52953a5a1b3Sopenharmony_ci u->buffer_size = pa_frame_align(pa_pipe_buf(u->fd), &u->sink->sample_spec); 53053a5a1b3Sopenharmony_ci if (u->use_system_clock_for_timing) { 53153a5a1b3Sopenharmony_ci u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec); 53253a5a1b3Sopenharmony_ci pa_sink_set_latency_range(u->sink, 0, u->block_usec); 53353a5a1b3Sopenharmony_ci thread_routine = thread_func_use_timing; 53453a5a1b3Sopenharmony_ci } else { 53553a5a1b3Sopenharmony_ci pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec)); 53653a5a1b3Sopenharmony_ci thread_routine = thread_func; 53753a5a1b3Sopenharmony_ci } 53853a5a1b3Sopenharmony_ci pa_sink_set_max_request(u->sink, u->buffer_size); 53953a5a1b3Sopenharmony_ci 54053a5a1b3Sopenharmony_ci u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); 54153a5a1b3Sopenharmony_ci pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); 54253a5a1b3Sopenharmony_ci pollfd->fd = u->fd; 54353a5a1b3Sopenharmony_ci pollfd->events = pollfd->revents = 0; 54453a5a1b3Sopenharmony_ci 54553a5a1b3Sopenharmony_ci if (!(u->thread = pa_thread_new("pipe-sink", thread_routine, u))) { 54653a5a1b3Sopenharmony_ci pa_log("Failed to create thread."); 54753a5a1b3Sopenharmony_ci goto fail; 54853a5a1b3Sopenharmony_ci } 54953a5a1b3Sopenharmony_ci 55053a5a1b3Sopenharmony_ci pa_sink_put(u->sink); 55153a5a1b3Sopenharmony_ci 55253a5a1b3Sopenharmony_ci pa_modargs_free(ma); 55353a5a1b3Sopenharmony_ci 55453a5a1b3Sopenharmony_ci return 0; 55553a5a1b3Sopenharmony_ci 55653a5a1b3Sopenharmony_cifail: 55753a5a1b3Sopenharmony_ci if (ma) 55853a5a1b3Sopenharmony_ci pa_modargs_free(ma); 55953a5a1b3Sopenharmony_ci 56053a5a1b3Sopenharmony_ci pa__done(m); 56153a5a1b3Sopenharmony_ci 56253a5a1b3Sopenharmony_ci return -1; 56353a5a1b3Sopenharmony_ci} 56453a5a1b3Sopenharmony_ci 56553a5a1b3Sopenharmony_ciint pa__get_n_used(pa_module *m) { 56653a5a1b3Sopenharmony_ci struct userdata *u; 56753a5a1b3Sopenharmony_ci 56853a5a1b3Sopenharmony_ci pa_assert(m); 56953a5a1b3Sopenharmony_ci pa_assert_se(u = m->userdata); 57053a5a1b3Sopenharmony_ci 57153a5a1b3Sopenharmony_ci return pa_sink_linked_by(u->sink); 57253a5a1b3Sopenharmony_ci} 57353a5a1b3Sopenharmony_ci 57453a5a1b3Sopenharmony_civoid pa__done(pa_module *m) { 57553a5a1b3Sopenharmony_ci struct userdata *u; 57653a5a1b3Sopenharmony_ci 57753a5a1b3Sopenharmony_ci pa_assert(m); 57853a5a1b3Sopenharmony_ci 57953a5a1b3Sopenharmony_ci if (!(u = m->userdata)) 58053a5a1b3Sopenharmony_ci return; 58153a5a1b3Sopenharmony_ci 58253a5a1b3Sopenharmony_ci if (u->sink) 58353a5a1b3Sopenharmony_ci pa_sink_unlink(u->sink); 58453a5a1b3Sopenharmony_ci 58553a5a1b3Sopenharmony_ci if (u->thread) { 58653a5a1b3Sopenharmony_ci pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); 58753a5a1b3Sopenharmony_ci pa_thread_free(u->thread); 58853a5a1b3Sopenharmony_ci } 58953a5a1b3Sopenharmony_ci 59053a5a1b3Sopenharmony_ci pa_thread_mq_done(&u->thread_mq); 59153a5a1b3Sopenharmony_ci 59253a5a1b3Sopenharmony_ci if (u->sink) 59353a5a1b3Sopenharmony_ci pa_sink_unref(u->sink); 59453a5a1b3Sopenharmony_ci 59553a5a1b3Sopenharmony_ci if (u->memchunk.memblock) 59653a5a1b3Sopenharmony_ci pa_memblock_unref(u->memchunk.memblock); 59753a5a1b3Sopenharmony_ci 59853a5a1b3Sopenharmony_ci if (u->rtpoll_item) 59953a5a1b3Sopenharmony_ci pa_rtpoll_item_free(u->rtpoll_item); 60053a5a1b3Sopenharmony_ci 60153a5a1b3Sopenharmony_ci if (u->rtpoll) 60253a5a1b3Sopenharmony_ci pa_rtpoll_free(u->rtpoll); 60353a5a1b3Sopenharmony_ci 60453a5a1b3Sopenharmony_ci if (u->filename) { 60553a5a1b3Sopenharmony_ci if (u->do_unlink_fifo) 60653a5a1b3Sopenharmony_ci unlink(u->filename); 60753a5a1b3Sopenharmony_ci pa_xfree(u->filename); 60853a5a1b3Sopenharmony_ci } 60953a5a1b3Sopenharmony_ci 61053a5a1b3Sopenharmony_ci if (u->fd >= 0) 61153a5a1b3Sopenharmony_ci pa_assert_se(pa_close(u->fd) == 0); 61253a5a1b3Sopenharmony_ci 61353a5a1b3Sopenharmony_ci pa_xfree(u); 61453a5a1b3Sopenharmony_ci} 615