1
2/***
3  This file is part of PulseAudio.
4
5  Copyright 2006 Lennart Poettering
6
7  PulseAudio is free software; you can redistribute it and/or modify
8  it under the terms of the GNU Lesser General Public License as published
9  by the Free Software Foundation; either version 2.1 of the License,
10  or (at your option) any later version.
11
12  PulseAudio is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  General Public License for more details.
16
17  You should have received a copy of the GNU Lesser General Public License
18  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
19***/
20
21#ifdef HAVE_CONFIG_H
22#include <config.h>
23#endif
24
25#include <stdio.h>
26#include <sys/socket.h>
27#include <netinet/in.h>
28#include <errno.h>
29#include <string.h>
30#include <unistd.h>
31#include <math.h>
32
33#include <pulse/rtclock.h>
34#include <pulse/timeval.h>
35#include <pulse/xmalloc.h>
36
37#include <pulsecore/core-error.h>
38#include <pulsecore/module.h>
39#include <pulsecore/llist.h>
40#include <pulsecore/sink.h>
41#include <pulsecore/sink-input.h>
42#include <pulsecore/memblockq.h>
43#include <pulsecore/log.h>
44#include <pulsecore/core-rtclock.h>
45#include <pulsecore/core-util.h>
46#include <pulsecore/modargs.h>
47#include <pulsecore/namereg.h>
48#include <pulsecore/sample-util.h>
49#include <pulsecore/macro.h>
50#include <pulsecore/socket-util.h>
51#include <pulsecore/atomic.h>
52#include <pulsecore/once.h>
53#include <pulsecore/poll.h>
54#include <pulsecore/arpa-inet.h>
55
56#include "rtp.h"
57#include "sdp.h"
58#include "sap.h"
59
60PA_MODULE_AUTHOR("Lennart Poettering");
61PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
62PA_MODULE_VERSION(PACKAGE_VERSION);
63PA_MODULE_LOAD_ONCE(false);
64PA_MODULE_USAGE(
65        "sink=<name of the sink> "
66        "sap_address=<multicast address to listen on> "
67        "latency_msec=<latency in ms> "
68);
69
/* UDP port the SAP listener binds to (see pa__init()). */
#define SAP_PORT                9875
/* Address used when no sap_address= argument is given. */
#define DEFAULT_SAP_ADDRESS     "224.0.0.56"
/* Latency, in milliseconds, used when no latency_msec= argument is given. */
#define DEFAULT_LATENCY_MSEC    500
/* Size limit handed twice to pa_memblockq_new() in session_new(). */
#define MEMBLOCKQ_MAXLENGTH     (1024*1024*40)
/* session_new() refuses to create more sessions than this. */
#define MAX_SESSIONS            16
/* Seconds without an update after which check_death_event_cb() drops a
 * session; also the period of that timer. */
#define DEATH_TIMEOUT           20
/* Minimum time between two sample rate adjustments in rtpoll_work_cb(). */
#define RATE_UPDATE_INTERVAL    (5*PA_USEC_PER_SEC)
77
/* NULL-terminated list of argument keys handed to pa_modargs_new() in
 * pa__init(). */
static const char* const valid_modargs[] = {
    "sink", "sap_address", "latency_msec",
    NULL
};
84
85struct session {
86    struct userdata *userdata;
87    PA_LLIST_FIELDS(struct session);
88
89    pa_sink_input *sink_input;
90    pa_memblockq *memblockq;
91
92    bool first_packet;
93    uint32_t offset;
94
95    struct pa_sdp_info sdp_info;
96
97    pa_rtp_context *rtp_context;
98
99    pa_rtpoll_item *rtpoll_item;
100
101    pa_atomic_t timestamp;
102
103    pa_usec_t intended_latency;
104    pa_usec_t sink_latency;
105
106    unsigned int base_rate;
107    pa_usec_t last_rate_update;
108    pa_usec_t last_latency;
109    double estimated_rate;
110    double avg_estimated_rate;
111};
112
113struct userdata {
114    pa_module *module;
115    pa_core *core;
116
117    pa_sap_context sap_context;
118    pa_io_event* sap_event;
119
120    pa_time_event *check_death_event;
121
122    char *sink_name;
123
124    PA_LLIST_HEAD(struct session, sessions);
125    pa_hashmap *by_origin;
126    int n_sessions;
127
128    pa_usec_t latency;
129};
130
/* Forward declaration; the definition follows session_new() further down. */
static void session_free(struct session *s);
132
133/* Called from I/O thread context */
134static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135    struct session *s = PA_SINK_INPUT(o)->userdata;
136
137    switch (code) {
138        case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139            *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141            /* Fall through, the default handler will add in the extra
142             * latency added by the resampler */
143            break;
144    }
145
146    return pa_sink_input_process_msg(o, code, data, offset, chunk);
147}
148
149/* Called from I/O thread context */
150static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151    struct session *s;
152    pa_sink_input_assert_ref(i);
153    pa_assert_se(s = i->userdata);
154
155    if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156        return -1;
157
158    pa_memblockq_drop(s->memblockq, chunk->length);
159
160    return 0;
161}
162
163/* Called from I/O thread context */
164static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165    struct session *s;
166
167    pa_sink_input_assert_ref(i);
168    pa_assert_se(s = i->userdata);
169
170    pa_memblockq_rewind(s->memblockq, nbytes);
171}
172
173/* Called from I/O thread context */
174static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175    struct session *s;
176
177    pa_sink_input_assert_ref(i);
178    pa_assert_se(s = i->userdata);
179
180    pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181}
182
183/* Called from main context */
184static void sink_input_kill(pa_sink_input* i) {
185    struct session *s;
186    pa_sink_input_assert_ref(i);
187    pa_assert_se(s = i->userdata);
188
189    pa_hashmap_remove_and_free(s->userdata->by_origin, s->sdp_info.origin);
190}
191
192/* Called from IO context */
193static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
194    struct session *s;
195    pa_sink_input_assert_ref(i);
196    pa_assert_se(s = i->userdata);
197
198    if (b)
199        pa_memblockq_flush_read(s->memblockq);
200    else
201        s->first_packet = false;
202}
203
204/* Called from I/O thread context */
205static int rtpoll_work_cb(pa_rtpoll_item *i) {
206    pa_memchunk chunk;
207    uint32_t timestamp;
208    int64_t k, j, delta;
209    struct timeval now = { 0, 0 };
210    struct session *s;
211    struct pollfd *p;
212
213    pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
214
215    p = pa_rtpoll_item_get_pollfd(i, NULL);
216
217    if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218        pa_log("poll() signalled bad revents.");
219        return -1;
220    }
221
222    if ((p->revents & POLLIN) == 0)
223        return 0;
224
225    p->revents = 0;
226
227    if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
228        return 0;
229
230    if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231        pa_memblock_unref(chunk.memblock);
232        return 0;
233    }
234
235    if (!s->first_packet) {
236        s->first_packet = true;
237        s->offset = timestamp;
238    }
239
240    /* Check whether there was a timestamp overflow */
241    k = (int64_t) timestamp - (int64_t) s->offset;
242    j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
243
244    if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
245        delta = k;
246    else
247        delta = j;
248
249    pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
250            true);
251
252    if (now.tv_sec == 0) {
253        PA_ONCE_BEGIN {
254            pa_log_warn("Using artificial time instead of timestamp");
255        } PA_ONCE_END;
256        pa_rtclock_get(&now);
257    } else
258        pa_rtclock_from_wallclock(&now);
259
260    if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
261        pa_log_warn("Queue overrun");
262        pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
263    }
264
265/*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
266
267    pa_memblock_unref(chunk.memblock);
268
269    /* The next timestamp we expect */
270    s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
271
272    pa_atomic_store(&s->timestamp, (int) now.tv_sec);
273
274    if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
275        pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
276        uint32_t current_rate = s->sink_input->sample_spec.rate;
277        uint32_t new_rate;
278        double estimated_rate, alpha = 0.02;
279
280        pa_log_debug("Updating sample rate");
281
282        wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
283        ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
284
285        pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
286
287        sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink, false);
288        sink_delay += pa_resampler_get_delay_usec(s->sink_input->thread_info.resampler);
289        render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
290
291        if (ri > render_delay+sink_delay)
292            ri -= render_delay+sink_delay;
293        else
294            ri = 0;
295
296        if (wi < ri)
297            latency = 0;
298        else
299            latency = wi - ri;
300
301        pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
302
303        /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
304         * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
305         * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
306         *                                           T
307         *                                 R̂ = ─────────────── Rⁿ .                             (1)
308         *                                     T - (Lⁿ - Lⁿ⁻ⁱ)
309         *
310         * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
311         * is correct).  But there is also the requirement to keep the buffer at a predefined target
312         * latency L̂.  So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
313         * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
314         * aT the latency is reduced from Lⁿ to L̂.  This strategy translates to the requirements
315         *            ₐ      R̂ - Rⁿ⁺ʲ                            a-j+1         j-1
316         *            Σ  T ────────── = L̂ - Lⁿ    with    Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
317         *           ʲ⁼ⁱ        R̂                                  a            a
318         * Solving for Rⁿ⁺ⁱ gives
319         *                                     T - ²∕ₐ₊₁(L̂ - Lⁿ)
320         *                              Rⁿ⁺ⁱ = ───────────────── R̂ .                            (2)
321         *                                            T
322         * In the code below a = 7 is used.
323         *
324         * Equation (1) is not directly used in (2), but instead an exponentially weighted average
325         * of the estimated rate R̂ is used.  This average R̅ is defined as
326         *                                R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
327         * Because it is difficult to find a fixed value for the coefficient α such that the
328         * averaging is without significant lag but oscillations are filtered out, a heuristic is
329         * used.  When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
330         * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
331         */
332        estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
333        if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
334          double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
335          alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
336        }
337        s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
338        s->estimated_rate = estimated_rate;
339        pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz  (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
340        new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
341        s->last_latency = latency;
342
343        if (new_rate < (uint32_t) (s->base_rate*0.8) || new_rate > (uint32_t) (s->base_rate*1.25)) {
344            pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", s->base_rate, new_rate);
345            new_rate = s->base_rate;
346        } else {
347            if (s->base_rate < new_rate + 20 && new_rate < s->base_rate + 20)
348                new_rate = s->base_rate;
349            /* Do the adjustment in small steps; 2‰ can be considered inaudible */
350            if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
351                pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
352                new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
353            }
354        }
355        s->sink_input->sample_spec.rate = new_rate;
356
357        pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
358
359        pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
360
361        pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
362
363        s->last_rate_update = pa_timeval_load(&now);
364    }
365
366    if (pa_memblockq_is_readable(s->memblockq) &&
367        s->sink_input->thread_info.underrun_for > 0) {
368        pa_log_debug("Requesting rewind due to end of underrun");
369        pa_sink_input_request_rewind(s->sink_input,
370                                     (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
371                                     false, true, false);
372    }
373
374    return 1;
375}
376
377/* Called from I/O thread context */
378static void sink_input_attach(pa_sink_input *i) {
379    struct session *s;
380
381    pa_sink_input_assert_ref(i);
382    pa_assert_se(s = i->userdata);
383
384    pa_assert(!s->rtpoll_item);
385    s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
386
387    pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
388}
389
390/* Called from I/O thread context */
391static void sink_input_detach(pa_sink_input *i) {
392    struct session *s;
393    pa_sink_input_assert_ref(i);
394    pa_assert_se(s = i->userdata);
395
396    pa_assert(s->rtpoll_item);
397    pa_rtpoll_item_free(s->rtpoll_item);
398    s->rtpoll_item = NULL;
399}
400
401static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
402    int af, fd = -1, r, one;
403
404    pa_assert(sa);
405    pa_assert(salen > 0);
406
407    af = sa->sa_family;
408    if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
409        pa_log("Failed to create socket: %s", pa_cstrerror(errno));
410        goto fail;
411    }
412
413    pa_make_udp_socket_low_delay(fd);
414
415#ifdef SO_TIMESTAMP
416    one = 1;
417    if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
418        pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
419        goto fail;
420    }
421#else
422    pa_log("SO_TIMESTAMP unsupported on this platform");
423    goto fail;
424#endif
425
426    one = 1;
427    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
428        pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
429        goto fail;
430    }
431
432    r = 0;
433    if (af == AF_INET) {
434        /* IPv4 multicast addresses are in the 224.0.0.0-239.255.255.255 range */
435        static const uint32_t ipv4_mcast_mask = 0xe0000000;
436
437        if ((ntohl(((const struct sockaddr_in*) sa)->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) {
438            struct ip_mreq mr4;
439            memset(&mr4, 0, sizeof(mr4));
440            mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
441            r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
442        }
443#ifdef HAVE_IPV6
444    } else if (af == AF_INET6) {
445        /* IPv6 multicast addresses have 255 as the most significant byte */
446        if (((const struct sockaddr_in6*) sa)->sin6_addr.s6_addr[0] == 0xff) {
447            struct ipv6_mreq mr6;
448            memset(&mr6, 0, sizeof(mr6));
449            mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
450            r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
451        }
452#endif
453    } else
454        pa_assert_not_reached();
455
456    if (r < 0) {
457        pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
458        goto fail;
459    }
460
461    if (bind(fd, sa, salen) < 0) {
462        pa_log("bind() failed: %s", pa_cstrerror(errno));
463        goto fail;
464    }
465
466    return fd;
467
468fail:
469    if (fd >= 0)
470        close(fd);
471
472    return -1;
473}
474
475static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
476    struct session *s = NULL;
477    pa_sink *sink;
478    int fd = -1;
479    pa_memchunk silence;
480    pa_sink_input_new_data data;
481    struct timeval now;
482
483    pa_assert(u);
484    pa_assert(sdp_info);
485
486    if (u->n_sessions >= MAX_SESSIONS) {
487        pa_log("Session limit reached.");
488        goto fail;
489    }
490
491    if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
492        pa_log("Sink does not exist.");
493        goto fail;
494    }
495
496    pa_rtclock_get(&now);
497
498    s = pa_xnew0(struct session, 1);
499    s->userdata = u;
500    s->first_packet = false;
501    s->sdp_info = *sdp_info;
502    s->rtpoll_item = NULL;
503    s->intended_latency = u->latency;
504    s->last_rate_update = pa_timeval_load(&now);
505    s->last_latency = u->latency;
506    pa_atomic_store(&s->timestamp, (int) now.tv_sec);
507
508    if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
509        goto fail;
510
511    pa_sink_input_new_data_init(&data);
512    pa_sink_input_new_data_set_sink(&data, sink, false, true);
513    data.driver = __FILE__;
514    pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
515    pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
516                     "RTP Stream%s%s%s",
517                     sdp_info->session_name ? " (" : "",
518                     sdp_info->session_name ? sdp_info->session_name : "",
519                     sdp_info->session_name ? ")" : "");
520
521    if (sdp_info->session_name)
522        pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
523    pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
524    pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
525    data.module = u->module;
526    pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
527    data.flags = PA_SINK_INPUT_VARIABLE_RATE;
528
529    pa_sink_input_new(&s->sink_input, u->module->core, &data);
530    pa_sink_input_new_data_done(&data);
531
532    if (!s->sink_input) {
533        pa_log("Failed to create sink input.");
534        goto fail;
535    }
536
537    s->base_rate = (double) s->sink_input->sample_spec.rate;
538    s->estimated_rate = (double) s->sink_input->sample_spec.rate;
539    s->avg_estimated_rate = (double) s->sink_input->sample_spec.rate;
540
541    s->sink_input->userdata = s;
542
543    s->sink_input->parent.process_msg = sink_input_process_msg;
544    s->sink_input->pop = sink_input_pop_cb;
545    s->sink_input->process_rewind = sink_input_process_rewind_cb;
546    s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
547    s->sink_input->kill = sink_input_kill;
548    s->sink_input->attach = sink_input_attach;
549    s->sink_input->detach = sink_input_detach;
550    s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
551
552    pa_sink_input_get_silence(s->sink_input, &silence);
553
554    s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
555
556    if (s->intended_latency < s->sink_latency*2)
557        s->intended_latency = s->sink_latency*2;
558
559    s->memblockq = pa_memblockq_new(
560            "module-rtp-recv memblockq",
561            0,
562            MEMBLOCKQ_MAXLENGTH,
563            MEMBLOCKQ_MAXLENGTH,
564            &s->sink_input->sample_spec,
565            pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
566            0,
567            0,
568            &silence);
569
570    pa_memblock_unref(silence.memblock);
571
572    if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec, sdp_info->enable_opus)))
573        goto fail;
574
575    pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
576    u->n_sessions++;
577    PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
578
579    pa_sink_input_put(s->sink_input);
580
581    pa_log_info("New session '%s'", s->sdp_info.session_name);
582
583    return s;
584
585fail:
586    pa_xfree(s);
587
588    if (fd >= 0)
589        pa_close(fd);
590
591    return NULL;
592}
593
594static void session_free(struct session *s) {
595    pa_assert(s);
596
597    pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
598
599    pa_sink_input_unlink(s->sink_input);
600    pa_sink_input_unref(s->sink_input);
601
602    PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
603    pa_assert(s->userdata->n_sessions >= 1);
604    s->userdata->n_sessions--;
605
606    pa_memblockq_free(s->memblockq);
607    pa_sdp_info_destroy(&s->sdp_info);
608    pa_rtp_context_free(s->rtp_context);
609
610    pa_xfree(s);
611}
612
613static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
614    struct userdata *u = userdata;
615    bool goodbye = false;
616    pa_sdp_info info;
617    struct session *s;
618
619    pa_assert(m);
620    pa_assert(e);
621    pa_assert(u);
622    pa_assert(fd == u->sap_context.fd);
623    pa_assert(flags == PA_IO_EVENT_INPUT);
624
625    if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
626        return;
627
628    if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
629        return;
630
631    if (goodbye) {
632        pa_hashmap_remove_and_free(u->by_origin, info.origin);
633        pa_sdp_info_destroy(&info);
634    } else {
635
636        if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
637            if (!session_new(u, &info))
638                pa_sdp_info_destroy(&info);
639
640        } else {
641            struct timeval now;
642            pa_rtclock_get(&now);
643            pa_atomic_store(&s->timestamp, (int) now.tv_sec);
644
645            pa_sdp_info_destroy(&info);
646        }
647    }
648}
649
650static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
651    struct session *s, *n;
652    struct userdata *u = userdata;
653    struct timeval now;
654
655    pa_assert(m);
656    pa_assert(t);
657    pa_assert(u);
658
659    pa_rtclock_get(&now);
660
661    pa_log_debug("Checking for dead streams ...");
662
663    for (s = u->sessions; s; s = n) {
664        int k;
665        n = s->next;
666
667        k = pa_atomic_load(&s->timestamp);
668
669        if (k + DEATH_TIMEOUT < now.tv_sec)
670            pa_hashmap_remove_and_free(u->by_origin, s->sdp_info.origin);
671    }
672
673    /* Restart timer */
674    pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
675}
676
677int pa__init(pa_module*m) {
678    struct userdata *u;
679    pa_modargs *ma = NULL;
680    struct sockaddr_in sa4;
681#ifdef HAVE_IPV6
682    struct sockaddr_in6 sa6;
683#endif
684    struct sockaddr *sa;
685    socklen_t salen;
686    const char *sap_address;
687    uint32_t latency_msec;
688    int fd = -1;
689
690    pa_assert(m);
691
692    if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
693        pa_log("failed to parse module arguments");
694        goto fail;
695    }
696
697    sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
698
699    if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
700        sa4.sin_family = AF_INET;
701        sa4.sin_port = htons(SAP_PORT);
702        sa = (struct sockaddr*) &sa4;
703        salen = sizeof(sa4);
704#ifdef HAVE_IPV6
705    } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
706        sa6.sin6_family = AF_INET6;
707        sa6.sin6_port = htons(SAP_PORT);
708        sa = (struct sockaddr*) &sa6;
709        salen = sizeof(sa6);
710#endif
711    } else {
712        pa_log("Invalid SAP address '%s'", sap_address);
713        goto fail;
714    }
715
716    latency_msec = DEFAULT_LATENCY_MSEC;
717    if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 300000) {
718        pa_log("Invalid latency specification");
719        goto fail;
720    }
721
722    if ((fd = mcast_socket(sa, salen)) < 0)
723        goto fail;
724
725    m->userdata = u = pa_xnew(struct userdata, 1);
726    u->module = m;
727    u->core = m->core;
728    u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
729    u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
730
731    u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
732    pa_sap_context_init_recv(&u->sap_context, fd);
733
734    PA_LLIST_HEAD_INIT(struct session, u->sessions);
735    u->n_sessions = 0;
736    u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
737
738    u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
739
740    pa_modargs_free(ma);
741
742    return 0;
743
744fail:
745    if (ma)
746        pa_modargs_free(ma);
747
748    if (fd >= 0)
749        pa_close(fd);
750
751    return -1;
752}
753
754void pa__done(pa_module*m) {
755    struct userdata *u;
756
757    pa_assert(m);
758
759    if (!(u = m->userdata))
760        return;
761
762    if (u->sap_event)
763        m->core->mainloop->io_free(u->sap_event);
764
765    if (u->check_death_event)
766        m->core->mainloop->time_free(u->check_death_event);
767
768    pa_sap_context_destroy(&u->sap_context);
769
770    if (u->by_origin)
771        pa_hashmap_free(u->by_origin);
772
773    pa_xfree(u->sink_name);
774    pa_xfree(u);
775}
776