1cabdff1aSopenharmony_ci/*
2cabdff1aSopenharmony_ci * UDP prototype streaming system
3cabdff1aSopenharmony_ci * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4cabdff1aSopenharmony_ci *
5cabdff1aSopenharmony_ci * This file is part of FFmpeg.
6cabdff1aSopenharmony_ci *
7cabdff1aSopenharmony_ci * FFmpeg is free software; you can redistribute it and/or
8cabdff1aSopenharmony_ci * modify it under the terms of the GNU Lesser General Public
9cabdff1aSopenharmony_ci * License as published by the Free Software Foundation; either
10cabdff1aSopenharmony_ci * version 2.1 of the License, or (at your option) any later version.
11cabdff1aSopenharmony_ci *
12cabdff1aSopenharmony_ci * FFmpeg is distributed in the hope that it will be useful,
13cabdff1aSopenharmony_ci * but WITHOUT ANY WARRANTY; without even the implied warranty of
14cabdff1aSopenharmony_ci * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15cabdff1aSopenharmony_ci * Lesser General Public License for more details.
16cabdff1aSopenharmony_ci *
17cabdff1aSopenharmony_ci * You should have received a copy of the GNU Lesser General Public
18cabdff1aSopenharmony_ci * License along with FFmpeg; if not, write to the Free Software
19cabdff1aSopenharmony_ci * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20cabdff1aSopenharmony_ci */
21cabdff1aSopenharmony_ci
22cabdff1aSopenharmony_ci/**
23cabdff1aSopenharmony_ci * @file
24cabdff1aSopenharmony_ci * UDP protocol
25cabdff1aSopenharmony_ci */
26cabdff1aSopenharmony_ci
27cabdff1aSopenharmony_ci#define _DEFAULT_SOURCE
28cabdff1aSopenharmony_ci#define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
29cabdff1aSopenharmony_ci
30cabdff1aSopenharmony_ci#include "avformat.h"
31cabdff1aSopenharmony_ci#include "avio_internal.h"
32cabdff1aSopenharmony_ci#include "libavutil/avassert.h"
33cabdff1aSopenharmony_ci#include "libavutil/parseutils.h"
34cabdff1aSopenharmony_ci#include "libavutil/fifo.h"
35cabdff1aSopenharmony_ci#include "libavutil/intreadwrite.h"
36cabdff1aSopenharmony_ci#include "libavutil/avstring.h"
37cabdff1aSopenharmony_ci#include "libavutil/opt.h"
38cabdff1aSopenharmony_ci#include "libavutil/log.h"
39cabdff1aSopenharmony_ci#include "libavutil/time.h"
40cabdff1aSopenharmony_ci#include "internal.h"
41cabdff1aSopenharmony_ci#include "network.h"
42cabdff1aSopenharmony_ci#include "os_support.h"
43cabdff1aSopenharmony_ci#include "url.h"
44cabdff1aSopenharmony_ci#include "ip.h"
45cabdff1aSopenharmony_ci
46cabdff1aSopenharmony_ci#ifdef __APPLE__
47cabdff1aSopenharmony_ci#include "TargetConditionals.h"
48cabdff1aSopenharmony_ci#endif
49cabdff1aSopenharmony_ci
/* UDP-Lite socket option numbers. On many Linux systems udplite.h is missing
 * even though the kernel supports UDP-Lite, so fall back to local
 * definitions when the header is not available. */
#if HAVE_UDPLITE_H
#include "udplite.h"
#else
#define UDPLITE_SEND_CSCOV                               10
#define UDPLITE_RECV_CSCOV                               11
#endif

/* Protocol number passed to ff_socket() when UDP-Lite is requested. */
#ifndef IPPROTO_UDPLITE
#define IPPROTO_UDPLITE                                  136
#endif

/* With w32threads, force the HAVE_PTHREAD_CANCEL code paths on. */
#if HAVE_W32THREADS
#undef HAVE_PTHREAD_CANCEL
#define HAVE_PTHREAD_CANCEL 1
#endif

#if HAVE_PTHREAD_CANCEL
#include "libavutil/thread.h"
#endif

/* Map the ADD/DROP names onto JOIN/LEAVE where only the latter exist. */
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif

/* Size constants, in bytes.
 * NOTE(review): the TX/RX values look like default socket buffer sizes and
 * UDP_HEADER_SIZE like the UDP header length; their uses are outside this
 * part of the file - confirm. */
#define UDP_TX_BUF_SIZE 32768
#define UDP_RX_BUF_SIZE 393216
#define UDP_MAX_PKT_SIZE 65536   /* sizes the UDPContext.tmp receive buffer */
#define UDP_HEADER_SIZE 8
82cabdff1aSopenharmony_ci
83cabdff1aSopenharmony_citypedef struct UDPContext {
84cabdff1aSopenharmony_ci    const AVClass *class;
85cabdff1aSopenharmony_ci    int udp_fd;
86cabdff1aSopenharmony_ci    int ttl;
87cabdff1aSopenharmony_ci    int udplite_coverage;
88cabdff1aSopenharmony_ci    int buffer_size;
89cabdff1aSopenharmony_ci    int pkt_size;
90cabdff1aSopenharmony_ci    int is_multicast;
91cabdff1aSopenharmony_ci    int is_broadcast;
92cabdff1aSopenharmony_ci    int local_port;
93cabdff1aSopenharmony_ci    int reuse_socket;
94cabdff1aSopenharmony_ci    int overrun_nonfatal;
95cabdff1aSopenharmony_ci    struct sockaddr_storage dest_addr;
96cabdff1aSopenharmony_ci    int dest_addr_len;
97cabdff1aSopenharmony_ci    int is_connected;
98cabdff1aSopenharmony_ci
99cabdff1aSopenharmony_ci    /* Circular Buffer variables for use in UDP receive code */
100cabdff1aSopenharmony_ci    int circular_buffer_size;
101cabdff1aSopenharmony_ci    AVFifo *fifo;
102cabdff1aSopenharmony_ci    int circular_buffer_error;
103cabdff1aSopenharmony_ci    int64_t bitrate; /* number of bits to send per second */
104cabdff1aSopenharmony_ci    int64_t burst_bits;
105cabdff1aSopenharmony_ci    int close_req;
106cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
107cabdff1aSopenharmony_ci    pthread_t circular_buffer_thread;
108cabdff1aSopenharmony_ci    pthread_mutex_t mutex;
109cabdff1aSopenharmony_ci    pthread_cond_t cond;
110cabdff1aSopenharmony_ci    int thread_started;
111cabdff1aSopenharmony_ci#endif
112cabdff1aSopenharmony_ci    uint8_t tmp[UDP_MAX_PKT_SIZE+4];
113cabdff1aSopenharmony_ci    int remaining_in_dg;
114cabdff1aSopenharmony_ci    char *localaddr;
115cabdff1aSopenharmony_ci    int timeout;
116cabdff1aSopenharmony_ci    struct sockaddr_storage local_addr_storage;
117cabdff1aSopenharmony_ci    char *sources;
118cabdff1aSopenharmony_ci    char *block;
119cabdff1aSopenharmony_ci    IPSourceFilters filters;
120cabdff1aSopenharmony_ci} UDPContext;
121cabdff1aSopenharmony_ci
122cabdff1aSopenharmony_ci#define OFFSET(x) offsetof(UDPContext, x)
123cabdff1aSopenharmony_ci#define D AV_OPT_FLAG_DECODING_PARAM
124cabdff1aSopenharmony_ci#define E AV_OPT_FLAG_ENCODING_PARAM
125cabdff1aSopenharmony_cistatic const AVOption options[] = {
126cabdff1aSopenharmony_ci    { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
127cabdff1aSopenharmony_ci    { "bitrate",        "Bits to send per second",                         OFFSET(bitrate),        AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
128cabdff1aSopenharmony_ci    { "burst_bits",     "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits),   AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
129cabdff1aSopenharmony_ci    { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
130cabdff1aSopenharmony_ci    { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
131cabdff1aSopenharmony_ci    { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
132cabdff1aSopenharmony_ci    { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
133cabdff1aSopenharmony_ci    { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
134cabdff1aSopenharmony_ci    { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       D|E },
135cabdff1aSopenharmony_ci    { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = D|E },
136cabdff1aSopenharmony_ci    { "broadcast", "explicitly allow or disallow broadcast destination",   OFFSET(is_broadcast),   AV_OPT_TYPE_BOOL,   { .i64 = 0  },     0, 1,       E },
137cabdff1aSopenharmony_ci    { "ttl",            "Time to live (multicast only)",                   OFFSET(ttl),            AV_OPT_TYPE_INT,    { .i64 = 16 },     0, 255,     E },
138cabdff1aSopenharmony_ci    { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_BOOL,   { .i64 =  0 },     0, 1,       .flags = D|E },
139cabdff1aSopenharmony_ci    { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
140cabdff1aSopenharmony_ci    { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
141cabdff1aSopenharmony_ci    { "timeout",        "set raise error timeout, in microseconds (only in read mode)",OFFSET(timeout),         AV_OPT_TYPE_INT,  {.i64 = 0}, 0, INT_MAX, D },
142cabdff1aSopenharmony_ci    { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
143cabdff1aSopenharmony_ci    { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
144cabdff1aSopenharmony_ci    { NULL }
145cabdff1aSopenharmony_ci};
146cabdff1aSopenharmony_ci
147cabdff1aSopenharmony_cistatic const AVClass udp_class = {
148cabdff1aSopenharmony_ci    .class_name = "udp",
149cabdff1aSopenharmony_ci    .item_name  = av_default_item_name,
150cabdff1aSopenharmony_ci    .option     = options,
151cabdff1aSopenharmony_ci    .version    = LIBAVUTIL_VERSION_INT,
152cabdff1aSopenharmony_ci};
153cabdff1aSopenharmony_ci
154cabdff1aSopenharmony_cistatic const AVClass udplite_context_class = {
155cabdff1aSopenharmony_ci    .class_name     = "udplite",
156cabdff1aSopenharmony_ci    .item_name      = av_default_item_name,
157cabdff1aSopenharmony_ci    .option         = options,
158cabdff1aSopenharmony_ci    .version        = LIBAVUTIL_VERSION_INT,
159cabdff1aSopenharmony_ci};
160cabdff1aSopenharmony_ci
161cabdff1aSopenharmony_cistatic int udp_set_multicast_ttl(int sockfd, int mcastTTL,
162cabdff1aSopenharmony_ci                                 struct sockaddr *addr,
163cabdff1aSopenharmony_ci                                 void *logctx)
164cabdff1aSopenharmony_ci{
165cabdff1aSopenharmony_ci    int protocol, cmd;
166cabdff1aSopenharmony_ci
167cabdff1aSopenharmony_ci    /* There is some confusion in the world whether IP_MULTICAST_TTL
168cabdff1aSopenharmony_ci     * takes a byte or an int as an argument.
169cabdff1aSopenharmony_ci     * BSD seems to indicate byte so we are going with that and use
170cabdff1aSopenharmony_ci     * int and fall back to byte to be safe */
171cabdff1aSopenharmony_ci    switch (addr->sa_family) {
172cabdff1aSopenharmony_ci#ifdef IP_MULTICAST_TTL
173cabdff1aSopenharmony_ci        case AF_INET:
174cabdff1aSopenharmony_ci            protocol = IPPROTO_IP;
175cabdff1aSopenharmony_ci            cmd      = IP_MULTICAST_TTL;
176cabdff1aSopenharmony_ci            break;
177cabdff1aSopenharmony_ci#endif
178cabdff1aSopenharmony_ci#ifdef IPV6_MULTICAST_HOPS
179cabdff1aSopenharmony_ci        case AF_INET6:
180cabdff1aSopenharmony_ci            protocol = IPPROTO_IPV6;
181cabdff1aSopenharmony_ci            cmd      = IPV6_MULTICAST_HOPS;
182cabdff1aSopenharmony_ci            break;
183cabdff1aSopenharmony_ci#endif
184cabdff1aSopenharmony_ci        default:
185cabdff1aSopenharmony_ci            return 0;
186cabdff1aSopenharmony_ci    }
187cabdff1aSopenharmony_ci
188cabdff1aSopenharmony_ci    if (setsockopt(sockfd, protocol, cmd, &mcastTTL, sizeof(mcastTTL)) < 0) {
189cabdff1aSopenharmony_ci        /* BSD compatibility */
190cabdff1aSopenharmony_ci        unsigned char ttl = (unsigned char) mcastTTL;
191cabdff1aSopenharmony_ci
192cabdff1aSopenharmony_ci        ff_log_net_error(logctx, AV_LOG_DEBUG, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
193cabdff1aSopenharmony_ci        if (setsockopt(sockfd, protocol, cmd, &ttl, sizeof(ttl)) < 0) {
194cabdff1aSopenharmony_ci            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
195cabdff1aSopenharmony_ci            return ff_neterrno();
196cabdff1aSopenharmony_ci        }
197cabdff1aSopenharmony_ci    }
198cabdff1aSopenharmony_ci
199cabdff1aSopenharmony_ci    return 0;
200cabdff1aSopenharmony_ci}
201cabdff1aSopenharmony_ci
202cabdff1aSopenharmony_cistatic int udp_join_multicast_group(int sockfd, struct sockaddr *addr,
203cabdff1aSopenharmony_ci                                    struct sockaddr *local_addr, void *logctx)
204cabdff1aSopenharmony_ci{
205cabdff1aSopenharmony_ci#ifdef IP_ADD_MEMBERSHIP
206cabdff1aSopenharmony_ci    if (addr->sa_family == AF_INET) {
207cabdff1aSopenharmony_ci        struct ip_mreq mreq;
208cabdff1aSopenharmony_ci
209cabdff1aSopenharmony_ci        mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
210cabdff1aSopenharmony_ci        if (local_addr)
211cabdff1aSopenharmony_ci            mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
212cabdff1aSopenharmony_ci        else
213cabdff1aSopenharmony_ci            mreq.imr_interface.s_addr = INADDR_ANY;
214cabdff1aSopenharmony_ci        if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
215cabdff1aSopenharmony_ci            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
216cabdff1aSopenharmony_ci            return ff_neterrno();
217cabdff1aSopenharmony_ci        }
218cabdff1aSopenharmony_ci    }
219cabdff1aSopenharmony_ci#endif
220cabdff1aSopenharmony_ci#if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
221cabdff1aSopenharmony_ci    if (addr->sa_family == AF_INET6) {
222cabdff1aSopenharmony_ci        struct ipv6_mreq mreq6;
223cabdff1aSopenharmony_ci
224cabdff1aSopenharmony_ci        memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
225cabdff1aSopenharmony_ci        //TODO: Interface index should be looked up from local_addr
226cabdff1aSopenharmony_ci        mreq6.ipv6mr_interface = 0;
227cabdff1aSopenharmony_ci        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
228cabdff1aSopenharmony_ci            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
229cabdff1aSopenharmony_ci            return ff_neterrno();
230cabdff1aSopenharmony_ci        }
231cabdff1aSopenharmony_ci    }
232cabdff1aSopenharmony_ci#endif
233cabdff1aSopenharmony_ci    return 0;
234cabdff1aSopenharmony_ci}
235cabdff1aSopenharmony_ci
236cabdff1aSopenharmony_cistatic int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,
237cabdff1aSopenharmony_ci                                     struct sockaddr *local_addr, void *logctx)
238cabdff1aSopenharmony_ci{
239cabdff1aSopenharmony_ci#ifdef IP_DROP_MEMBERSHIP
240cabdff1aSopenharmony_ci    if (addr->sa_family == AF_INET) {
241cabdff1aSopenharmony_ci        struct ip_mreq mreq;
242cabdff1aSopenharmony_ci
243cabdff1aSopenharmony_ci        mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
244cabdff1aSopenharmony_ci        if (local_addr)
245cabdff1aSopenharmony_ci            mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
246cabdff1aSopenharmony_ci        else
247cabdff1aSopenharmony_ci            mreq.imr_interface.s_addr = INADDR_ANY;
248cabdff1aSopenharmony_ci        if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
249cabdff1aSopenharmony_ci            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
250cabdff1aSopenharmony_ci            return -1;
251cabdff1aSopenharmony_ci        }
252cabdff1aSopenharmony_ci    }
253cabdff1aSopenharmony_ci#endif
254cabdff1aSopenharmony_ci#if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
255cabdff1aSopenharmony_ci    if (addr->sa_family == AF_INET6) {
256cabdff1aSopenharmony_ci        struct ipv6_mreq mreq6;
257cabdff1aSopenharmony_ci
258cabdff1aSopenharmony_ci        memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
259cabdff1aSopenharmony_ci        //TODO: Interface index should be looked up from local_addr
260cabdff1aSopenharmony_ci        mreq6.ipv6mr_interface = 0;
261cabdff1aSopenharmony_ci        if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
262cabdff1aSopenharmony_ci            ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
263cabdff1aSopenharmony_ci            return -1;
264cabdff1aSopenharmony_ci        }
265cabdff1aSopenharmony_ci    }
266cabdff1aSopenharmony_ci#endif
267cabdff1aSopenharmony_ci    return 0;
268cabdff1aSopenharmony_ci}
269cabdff1aSopenharmony_ci
270cabdff1aSopenharmony_cistatic int udp_set_multicast_sources(URLContext *h,
271cabdff1aSopenharmony_ci                                     int sockfd, struct sockaddr *addr,
272cabdff1aSopenharmony_ci                                     int addr_len, struct sockaddr_storage *local_addr,
273cabdff1aSopenharmony_ci                                     struct sockaddr_storage *sources,
274cabdff1aSopenharmony_ci                                     int nb_sources, int include)
275cabdff1aSopenharmony_ci{
276cabdff1aSopenharmony_ci    int i;
277cabdff1aSopenharmony_ci    if (addr->sa_family != AF_INET) {
278cabdff1aSopenharmony_ci#if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
279cabdff1aSopenharmony_ci        /* For IPv4 prefer the old approach, as that alone works reliably on
280cabdff1aSopenharmony_ci         * Windows and it also supports supplying the interface based on its
281cabdff1aSopenharmony_ci         * address. */
282cabdff1aSopenharmony_ci        int i;
283cabdff1aSopenharmony_ci        for (i = 0; i < nb_sources; i++) {
284cabdff1aSopenharmony_ci            struct group_source_req mreqs;
285cabdff1aSopenharmony_ci            int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
286cabdff1aSopenharmony_ci
287cabdff1aSopenharmony_ci            //TODO: Interface index should be looked up from local_addr
288cabdff1aSopenharmony_ci            mreqs.gsr_interface = 0;
289cabdff1aSopenharmony_ci            memcpy(&mreqs.gsr_group, addr, addr_len);
290cabdff1aSopenharmony_ci            memcpy(&mreqs.gsr_source, &sources[i], sizeof(*sources));
291cabdff1aSopenharmony_ci
292cabdff1aSopenharmony_ci            if (setsockopt(sockfd, level,
293cabdff1aSopenharmony_ci                           include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
294cabdff1aSopenharmony_ci                           (const void *)&mreqs, sizeof(mreqs)) < 0) {
295cabdff1aSopenharmony_ci                if (include)
296cabdff1aSopenharmony_ci                    ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
297cabdff1aSopenharmony_ci                else
298cabdff1aSopenharmony_ci                    ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
299cabdff1aSopenharmony_ci                return ff_neterrno();
300cabdff1aSopenharmony_ci            }
301cabdff1aSopenharmony_ci        }
302cabdff1aSopenharmony_ci        return 0;
303cabdff1aSopenharmony_ci#else
304cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR,
305cabdff1aSopenharmony_ci               "Setting multicast sources only supported for IPv4\n");
306cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
307cabdff1aSopenharmony_ci#endif
308cabdff1aSopenharmony_ci    }
309cabdff1aSopenharmony_ci#if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
310cabdff1aSopenharmony_ci    for (i = 0; i < nb_sources; i++) {
311cabdff1aSopenharmony_ci        struct ip_mreq_source mreqs;
312cabdff1aSopenharmony_ci        if (sources[i].ss_family != AF_INET) {
313cabdff1aSopenharmony_ci            av_log(h, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1);
314cabdff1aSopenharmony_ci            return AVERROR(EINVAL);
315cabdff1aSopenharmony_ci        }
316cabdff1aSopenharmony_ci
317cabdff1aSopenharmony_ci        mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
318cabdff1aSopenharmony_ci        if (local_addr)
319cabdff1aSopenharmony_ci            mreqs.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
320cabdff1aSopenharmony_ci        else
321cabdff1aSopenharmony_ci            mreqs.imr_interface.s_addr = INADDR_ANY;
322cabdff1aSopenharmony_ci        mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[i])->sin_addr.s_addr;
323cabdff1aSopenharmony_ci
324cabdff1aSopenharmony_ci        if (setsockopt(sockfd, IPPROTO_IP,
325cabdff1aSopenharmony_ci                       include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
326cabdff1aSopenharmony_ci                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
327cabdff1aSopenharmony_ci            if (include)
328cabdff1aSopenharmony_ci                ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
329cabdff1aSopenharmony_ci            else
330cabdff1aSopenharmony_ci                ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
331cabdff1aSopenharmony_ci            return ff_neterrno();
332cabdff1aSopenharmony_ci        }
333cabdff1aSopenharmony_ci    }
334cabdff1aSopenharmony_ci#else
335cabdff1aSopenharmony_ci    return AVERROR(ENOSYS);
336cabdff1aSopenharmony_ci#endif
337cabdff1aSopenharmony_ci    return 0;
338cabdff1aSopenharmony_ci}
339cabdff1aSopenharmony_cistatic int udp_set_url(URLContext *h,
340cabdff1aSopenharmony_ci                       struct sockaddr_storage *addr,
341cabdff1aSopenharmony_ci                       const char *hostname, int port)
342cabdff1aSopenharmony_ci{
343cabdff1aSopenharmony_ci    struct addrinfo *res0;
344cabdff1aSopenharmony_ci    int addr_len;
345cabdff1aSopenharmony_ci
346cabdff1aSopenharmony_ci    res0 = ff_ip_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
347cabdff1aSopenharmony_ci    if (!res0) return AVERROR(EIO);
348cabdff1aSopenharmony_ci    memcpy(addr, res0->ai_addr, res0->ai_addrlen);
349cabdff1aSopenharmony_ci    addr_len = res0->ai_addrlen;
350cabdff1aSopenharmony_ci    freeaddrinfo(res0);
351cabdff1aSopenharmony_ci
352cabdff1aSopenharmony_ci    return addr_len;
353cabdff1aSopenharmony_ci}
354cabdff1aSopenharmony_ci
355cabdff1aSopenharmony_cistatic int udp_socket_create(URLContext *h, struct sockaddr_storage *addr,
356cabdff1aSopenharmony_ci                             socklen_t *addr_len, const char *localaddr)
357cabdff1aSopenharmony_ci{
358cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
359cabdff1aSopenharmony_ci    int udp_fd = -1;
360cabdff1aSopenharmony_ci    struct addrinfo *res0, *res;
361cabdff1aSopenharmony_ci    int family = AF_UNSPEC;
362cabdff1aSopenharmony_ci
363cabdff1aSopenharmony_ci    if (((struct sockaddr *) &s->dest_addr)->sa_family)
364cabdff1aSopenharmony_ci        family = ((struct sockaddr *) &s->dest_addr)->sa_family;
365cabdff1aSopenharmony_ci    res0 = ff_ip_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
366cabdff1aSopenharmony_ci                            s->local_port,
367cabdff1aSopenharmony_ci                            SOCK_DGRAM, family, AI_PASSIVE);
368cabdff1aSopenharmony_ci    if (!res0)
369cabdff1aSopenharmony_ci        goto fail;
370cabdff1aSopenharmony_ci    for (res = res0; res; res=res->ai_next) {
371cabdff1aSopenharmony_ci        if (s->udplite_coverage)
372cabdff1aSopenharmony_ci            udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE, h);
373cabdff1aSopenharmony_ci        else
374cabdff1aSopenharmony_ci            udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0, h);
375cabdff1aSopenharmony_ci        if (udp_fd != -1) break;
376cabdff1aSopenharmony_ci        ff_log_net_error(h, AV_LOG_ERROR, "socket");
377cabdff1aSopenharmony_ci    }
378cabdff1aSopenharmony_ci
379cabdff1aSopenharmony_ci    if (udp_fd < 0)
380cabdff1aSopenharmony_ci        goto fail;
381cabdff1aSopenharmony_ci
382cabdff1aSopenharmony_ci    memcpy(addr, res->ai_addr, res->ai_addrlen);
383cabdff1aSopenharmony_ci    *addr_len = res->ai_addrlen;
384cabdff1aSopenharmony_ci
385cabdff1aSopenharmony_ci    freeaddrinfo(res0);
386cabdff1aSopenharmony_ci
387cabdff1aSopenharmony_ci    return udp_fd;
388cabdff1aSopenharmony_ci
389cabdff1aSopenharmony_ci fail:
390cabdff1aSopenharmony_ci    if (udp_fd >= 0)
391cabdff1aSopenharmony_ci        closesocket(udp_fd);
392cabdff1aSopenharmony_ci    if(res0)
393cabdff1aSopenharmony_ci        freeaddrinfo(res0);
394cabdff1aSopenharmony_ci    return -1;
395cabdff1aSopenharmony_ci}
396cabdff1aSopenharmony_ci
397cabdff1aSopenharmony_cistatic int udp_port(struct sockaddr_storage *addr, int addr_len)
398cabdff1aSopenharmony_ci{
399cabdff1aSopenharmony_ci    char sbuf[sizeof(int)*3+1];
400cabdff1aSopenharmony_ci    int error;
401cabdff1aSopenharmony_ci
402cabdff1aSopenharmony_ci    if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
403cabdff1aSopenharmony_ci        av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
404cabdff1aSopenharmony_ci        return -1;
405cabdff1aSopenharmony_ci    }
406cabdff1aSopenharmony_ci
407cabdff1aSopenharmony_ci    return strtol(sbuf, NULL, 10);
408cabdff1aSopenharmony_ci}
409cabdff1aSopenharmony_ci
410cabdff1aSopenharmony_ci
411cabdff1aSopenharmony_ci/**
412cabdff1aSopenharmony_ci * If no filename is given to av_open_input_file because you want to
413cabdff1aSopenharmony_ci * get the local port first, then you must call this function to set
414cabdff1aSopenharmony_ci * the remote server address.
415cabdff1aSopenharmony_ci *
416cabdff1aSopenharmony_ci * url syntax: udp://host:port[?option=val...]
417cabdff1aSopenharmony_ci * option: 'ttl=n'       : set the ttl value (for multicast only)
418cabdff1aSopenharmony_ci *         'localport=n' : set the local port
419cabdff1aSopenharmony_ci *         'pkt_size=n'  : set max packet size
420cabdff1aSopenharmony_ci *         'reuse=1'     : enable reusing the socket
421cabdff1aSopenharmony_ci *         'overrun_nonfatal=1': survive in case of circular buffer overrun
422cabdff1aSopenharmony_ci *
423cabdff1aSopenharmony_ci * @param h media file context
424cabdff1aSopenharmony_ci * @param uri of the remote server
425cabdff1aSopenharmony_ci * @return zero if no error.
426cabdff1aSopenharmony_ci */
427cabdff1aSopenharmony_ciint ff_udp_set_remote_url(URLContext *h, const char *uri)
428cabdff1aSopenharmony_ci{
429cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
430cabdff1aSopenharmony_ci    char hostname[256], buf[10];
431cabdff1aSopenharmony_ci    int port;
432cabdff1aSopenharmony_ci    const char *p;
433cabdff1aSopenharmony_ci
434cabdff1aSopenharmony_ci    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
435cabdff1aSopenharmony_ci
436cabdff1aSopenharmony_ci    /* set the destination address */
437cabdff1aSopenharmony_ci    s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port);
438cabdff1aSopenharmony_ci    if (s->dest_addr_len < 0) {
439cabdff1aSopenharmony_ci        return AVERROR(EIO);
440cabdff1aSopenharmony_ci    }
441cabdff1aSopenharmony_ci    s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
442cabdff1aSopenharmony_ci    p = strchr(uri, '?');
443cabdff1aSopenharmony_ci    if (p) {
444cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
445cabdff1aSopenharmony_ci            int was_connected = s->is_connected;
446cabdff1aSopenharmony_ci            s->is_connected = strtol(buf, NULL, 10);
447cabdff1aSopenharmony_ci            if (s->is_connected && !was_connected) {
448cabdff1aSopenharmony_ci                if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
449cabdff1aSopenharmony_ci                            s->dest_addr_len)) {
450cabdff1aSopenharmony_ci                    s->is_connected = 0;
451cabdff1aSopenharmony_ci                    ff_log_net_error(h, AV_LOG_ERROR, "connect");
452cabdff1aSopenharmony_ci                    return AVERROR(EIO);
453cabdff1aSopenharmony_ci                }
454cabdff1aSopenharmony_ci            }
455cabdff1aSopenharmony_ci        }
456cabdff1aSopenharmony_ci    }
457cabdff1aSopenharmony_ci
458cabdff1aSopenharmony_ci    return 0;
459cabdff1aSopenharmony_ci}
460cabdff1aSopenharmony_ci
461cabdff1aSopenharmony_ci/**
462cabdff1aSopenharmony_ci * Return the local port used by the UDP connection
463cabdff1aSopenharmony_ci * @param h media file context
464cabdff1aSopenharmony_ci * @return the local port number
465cabdff1aSopenharmony_ci */
466cabdff1aSopenharmony_ciint ff_udp_get_local_port(URLContext *h)
467cabdff1aSopenharmony_ci{
468cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
469cabdff1aSopenharmony_ci    return s->local_port;
470cabdff1aSopenharmony_ci}
471cabdff1aSopenharmony_ci
472cabdff1aSopenharmony_ci/**
473cabdff1aSopenharmony_ci * Return the udp file handle for select() usage to wait for several RTP
474cabdff1aSopenharmony_ci * streams at the same time.
475cabdff1aSopenharmony_ci * @param h media file context
476cabdff1aSopenharmony_ci */
477cabdff1aSopenharmony_cistatic int udp_get_file_handle(URLContext *h)
478cabdff1aSopenharmony_ci{
479cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
480cabdff1aSopenharmony_ci    return s->udp_fd;
481cabdff1aSopenharmony_ci}
482cabdff1aSopenharmony_ci
483cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
484cabdff1aSopenharmony_cistatic void *circular_buffer_task_rx( void *_URLContext)
485cabdff1aSopenharmony_ci{
486cabdff1aSopenharmony_ci    URLContext *h = _URLContext;
487cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
488cabdff1aSopenharmony_ci    int old_cancelstate;
489cabdff1aSopenharmony_ci
490cabdff1aSopenharmony_ci    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
491cabdff1aSopenharmony_ci    pthread_mutex_lock(&s->mutex);
492cabdff1aSopenharmony_ci    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
493cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
494cabdff1aSopenharmony_ci        s->circular_buffer_error = AVERROR(EIO);
495cabdff1aSopenharmony_ci        goto end;
496cabdff1aSopenharmony_ci    }
497cabdff1aSopenharmony_ci    while(1) {
498cabdff1aSopenharmony_ci        int len;
499cabdff1aSopenharmony_ci        struct sockaddr_storage addr;
500cabdff1aSopenharmony_ci        socklen_t addr_len = sizeof(addr);
501cabdff1aSopenharmony_ci
502cabdff1aSopenharmony_ci        pthread_mutex_unlock(&s->mutex);
503cabdff1aSopenharmony_ci        /* Blocking operations are always cancellation points;
504cabdff1aSopenharmony_ci           see "General Information" / "Thread Cancelation Overview"
505cabdff1aSopenharmony_ci           in Single Unix. */
506cabdff1aSopenharmony_ci        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
507cabdff1aSopenharmony_ci        len = recvfrom(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0, (struct sockaddr *)&addr, &addr_len);
508cabdff1aSopenharmony_ci        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
509cabdff1aSopenharmony_ci        pthread_mutex_lock(&s->mutex);
510cabdff1aSopenharmony_ci        if (len < 0) {
511cabdff1aSopenharmony_ci            if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
512cabdff1aSopenharmony_ci                s->circular_buffer_error = ff_neterrno();
513cabdff1aSopenharmony_ci                goto end;
514cabdff1aSopenharmony_ci            }
515cabdff1aSopenharmony_ci            continue;
516cabdff1aSopenharmony_ci        }
517cabdff1aSopenharmony_ci        if (ff_ip_check_source_lists(&addr, &s->filters))
518cabdff1aSopenharmony_ci            continue;
519cabdff1aSopenharmony_ci        AV_WL32(s->tmp, len);
520cabdff1aSopenharmony_ci
521cabdff1aSopenharmony_ci        if (av_fifo_can_write(s->fifo) < len + 4) {
522cabdff1aSopenharmony_ci            /* No Space left */
523cabdff1aSopenharmony_ci            if (s->overrun_nonfatal) {
524cabdff1aSopenharmony_ci                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
525cabdff1aSopenharmony_ci                        "Surviving due to overrun_nonfatal option\n");
526cabdff1aSopenharmony_ci                continue;
527cabdff1aSopenharmony_ci            } else {
528cabdff1aSopenharmony_ci                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
529cabdff1aSopenharmony_ci                        "To avoid, increase fifo_size URL option. "
530cabdff1aSopenharmony_ci                        "To survive in such case, use overrun_nonfatal option\n");
531cabdff1aSopenharmony_ci                s->circular_buffer_error = AVERROR(EIO);
532cabdff1aSopenharmony_ci                goto end;
533cabdff1aSopenharmony_ci            }
534cabdff1aSopenharmony_ci        }
535cabdff1aSopenharmony_ci        av_fifo_write(s->fifo, s->tmp, len + 4);
536cabdff1aSopenharmony_ci        pthread_cond_signal(&s->cond);
537cabdff1aSopenharmony_ci    }
538cabdff1aSopenharmony_ci
539cabdff1aSopenharmony_ciend:
540cabdff1aSopenharmony_ci    pthread_cond_signal(&s->cond);
541cabdff1aSopenharmony_ci    pthread_mutex_unlock(&s->mutex);
542cabdff1aSopenharmony_ci    return NULL;
543cabdff1aSopenharmony_ci}
544cabdff1aSopenharmony_ci
545cabdff1aSopenharmony_cistatic void *circular_buffer_task_tx( void *_URLContext)
546cabdff1aSopenharmony_ci{
547cabdff1aSopenharmony_ci    URLContext *h = _URLContext;
548cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
549cabdff1aSopenharmony_ci    int64_t target_timestamp = av_gettime_relative();
550cabdff1aSopenharmony_ci    int64_t start_timestamp = av_gettime_relative();
551cabdff1aSopenharmony_ci    int64_t sent_bits = 0;
552cabdff1aSopenharmony_ci    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
553cabdff1aSopenharmony_ci    int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
554cabdff1aSopenharmony_ci
555cabdff1aSopenharmony_ci    pthread_mutex_lock(&s->mutex);
556cabdff1aSopenharmony_ci
557cabdff1aSopenharmony_ci    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
558cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
559cabdff1aSopenharmony_ci        s->circular_buffer_error = AVERROR(EIO);
560cabdff1aSopenharmony_ci        goto end;
561cabdff1aSopenharmony_ci    }
562cabdff1aSopenharmony_ci
563cabdff1aSopenharmony_ci    for(;;) {
564cabdff1aSopenharmony_ci        int len;
565cabdff1aSopenharmony_ci        const uint8_t *p;
566cabdff1aSopenharmony_ci        uint8_t tmp[4];
567cabdff1aSopenharmony_ci        int64_t timestamp;
568cabdff1aSopenharmony_ci
569cabdff1aSopenharmony_ci        len = av_fifo_can_read(s->fifo);
570cabdff1aSopenharmony_ci
571cabdff1aSopenharmony_ci        while (len<4) {
572cabdff1aSopenharmony_ci            if (s->close_req)
573cabdff1aSopenharmony_ci                goto end;
574cabdff1aSopenharmony_ci            pthread_cond_wait(&s->cond, &s->mutex);
575cabdff1aSopenharmony_ci            len = av_fifo_can_read(s->fifo);
576cabdff1aSopenharmony_ci        }
577cabdff1aSopenharmony_ci
578cabdff1aSopenharmony_ci        av_fifo_read(s->fifo, tmp, 4);
579cabdff1aSopenharmony_ci        len = AV_RL32(tmp);
580cabdff1aSopenharmony_ci
581cabdff1aSopenharmony_ci        av_assert0(len >= 0);
582cabdff1aSopenharmony_ci        av_assert0(len <= sizeof(s->tmp));
583cabdff1aSopenharmony_ci
584cabdff1aSopenharmony_ci        av_fifo_read(s->fifo, s->tmp, len);
585cabdff1aSopenharmony_ci
586cabdff1aSopenharmony_ci        pthread_mutex_unlock(&s->mutex);
587cabdff1aSopenharmony_ci
588cabdff1aSopenharmony_ci        if (s->bitrate) {
589cabdff1aSopenharmony_ci            timestamp = av_gettime_relative();
590cabdff1aSopenharmony_ci            if (timestamp < target_timestamp) {
591cabdff1aSopenharmony_ci                int64_t delay = target_timestamp - timestamp;
592cabdff1aSopenharmony_ci                if (delay > max_delay) {
593cabdff1aSopenharmony_ci                    delay = max_delay;
594cabdff1aSopenharmony_ci                    start_timestamp = timestamp + delay;
595cabdff1aSopenharmony_ci                    sent_bits = 0;
596cabdff1aSopenharmony_ci                }
597cabdff1aSopenharmony_ci                av_usleep(delay);
598cabdff1aSopenharmony_ci            } else {
599cabdff1aSopenharmony_ci                if (timestamp - burst_interval > target_timestamp) {
600cabdff1aSopenharmony_ci                    start_timestamp = timestamp - burst_interval;
601cabdff1aSopenharmony_ci                    sent_bits = 0;
602cabdff1aSopenharmony_ci                }
603cabdff1aSopenharmony_ci            }
604cabdff1aSopenharmony_ci            sent_bits += len * 8;
605cabdff1aSopenharmony_ci            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
606cabdff1aSopenharmony_ci        }
607cabdff1aSopenharmony_ci
608cabdff1aSopenharmony_ci        p = s->tmp;
609cabdff1aSopenharmony_ci        while (len) {
610cabdff1aSopenharmony_ci            int ret;
611cabdff1aSopenharmony_ci            av_assert0(len > 0);
612cabdff1aSopenharmony_ci            if (!s->is_connected) {
613cabdff1aSopenharmony_ci                ret = sendto (s->udp_fd, p, len, 0,
614cabdff1aSopenharmony_ci                            (struct sockaddr *) &s->dest_addr,
615cabdff1aSopenharmony_ci                            s->dest_addr_len);
616cabdff1aSopenharmony_ci            } else
617cabdff1aSopenharmony_ci                ret = send(s->udp_fd, p, len, 0);
618cabdff1aSopenharmony_ci            if (ret >= 0) {
619cabdff1aSopenharmony_ci                len -= ret;
620cabdff1aSopenharmony_ci                p   += ret;
621cabdff1aSopenharmony_ci            } else {
622cabdff1aSopenharmony_ci                ret = ff_neterrno();
623cabdff1aSopenharmony_ci                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
624cabdff1aSopenharmony_ci                    pthread_mutex_lock(&s->mutex);
625cabdff1aSopenharmony_ci                    s->circular_buffer_error = ret;
626cabdff1aSopenharmony_ci                    pthread_mutex_unlock(&s->mutex);
627cabdff1aSopenharmony_ci                    return NULL;
628cabdff1aSopenharmony_ci                }
629cabdff1aSopenharmony_ci            }
630cabdff1aSopenharmony_ci        }
631cabdff1aSopenharmony_ci
632cabdff1aSopenharmony_ci        pthread_mutex_lock(&s->mutex);
633cabdff1aSopenharmony_ci    }
634cabdff1aSopenharmony_ci
635cabdff1aSopenharmony_ciend:
636cabdff1aSopenharmony_ci    pthread_mutex_unlock(&s->mutex);
637cabdff1aSopenharmony_ci    return NULL;
638cabdff1aSopenharmony_ci}
639cabdff1aSopenharmony_ci
640cabdff1aSopenharmony_ci
641cabdff1aSopenharmony_ci#endif
642cabdff1aSopenharmony_ci
643cabdff1aSopenharmony_ci/* put it in UDP context */
644cabdff1aSopenharmony_ci/* return non zero if error */
645cabdff1aSopenharmony_cistatic int udp_open(URLContext *h, const char *uri, int flags)
646cabdff1aSopenharmony_ci{
647cabdff1aSopenharmony_ci    char hostname[1024];
648cabdff1aSopenharmony_ci    int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
649cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
650cabdff1aSopenharmony_ci    int is_output;
651cabdff1aSopenharmony_ci    const char *p;
652cabdff1aSopenharmony_ci    char buf[256];
653cabdff1aSopenharmony_ci    struct sockaddr_storage my_addr;
654cabdff1aSopenharmony_ci    socklen_t len;
655cabdff1aSopenharmony_ci    int ret;
656cabdff1aSopenharmony_ci
657cabdff1aSopenharmony_ci    h->is_streamed = 1;
658cabdff1aSopenharmony_ci
659cabdff1aSopenharmony_ci    is_output = !(flags & AVIO_FLAG_READ);
660cabdff1aSopenharmony_ci    if (s->buffer_size < 0)
661cabdff1aSopenharmony_ci        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE;
662cabdff1aSopenharmony_ci
663cabdff1aSopenharmony_ci    if (s->sources) {
664cabdff1aSopenharmony_ci        if ((ret = ff_ip_parse_sources(h, s->sources, &s->filters)) < 0)
665cabdff1aSopenharmony_ci            goto fail;
666cabdff1aSopenharmony_ci    }
667cabdff1aSopenharmony_ci
668cabdff1aSopenharmony_ci    if (s->block) {
669cabdff1aSopenharmony_ci        if ((ret = ff_ip_parse_blocks(h, s->block, &s->filters)) < 0)
670cabdff1aSopenharmony_ci            goto fail;
671cabdff1aSopenharmony_ci    }
672cabdff1aSopenharmony_ci
673cabdff1aSopenharmony_ci    p = strchr(uri, '?');
674cabdff1aSopenharmony_ci    if (p) {
675cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
676cabdff1aSopenharmony_ci            char *endptr = NULL;
677cabdff1aSopenharmony_ci            s->reuse_socket = strtol(buf, &endptr, 10);
678cabdff1aSopenharmony_ci            /* assume if no digits were found it is a request to enable it */
679cabdff1aSopenharmony_ci            if (buf == endptr)
680cabdff1aSopenharmony_ci                s->reuse_socket = 1;
681cabdff1aSopenharmony_ci        }
682cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
683cabdff1aSopenharmony_ci            char *endptr = NULL;
684cabdff1aSopenharmony_ci            s->overrun_nonfatal = strtol(buf, &endptr, 10);
685cabdff1aSopenharmony_ci            /* assume if no digits were found it is a request to enable it */
686cabdff1aSopenharmony_ci            if (buf == endptr)
687cabdff1aSopenharmony_ci                s->overrun_nonfatal = 1;
688cabdff1aSopenharmony_ci            if (!HAVE_PTHREAD_CANCEL)
689cabdff1aSopenharmony_ci                av_log(h, AV_LOG_WARNING,
690cabdff1aSopenharmony_ci                       "'overrun_nonfatal' option was set but it is not supported "
691cabdff1aSopenharmony_ci                       "on this build (pthread support is required)\n");
692cabdff1aSopenharmony_ci        }
693cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
694cabdff1aSopenharmony_ci            s->ttl = strtol(buf, NULL, 10);
695cabdff1aSopenharmony_ci            if (s->ttl < 0 || s->ttl > 255) {
696cabdff1aSopenharmony_ci                av_log(h, AV_LOG_ERROR, "ttl(%d) should be in range [0,255]\n", s->ttl);
697cabdff1aSopenharmony_ci                ret = AVERROR(EINVAL);
698cabdff1aSopenharmony_ci                goto fail;
699cabdff1aSopenharmony_ci            }
700cabdff1aSopenharmony_ci        }
701cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
702cabdff1aSopenharmony_ci            s->udplite_coverage = strtol(buf, NULL, 10);
703cabdff1aSopenharmony_ci        }
704cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
705cabdff1aSopenharmony_ci            s->local_port = strtol(buf, NULL, 10);
706cabdff1aSopenharmony_ci        }
707cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
708cabdff1aSopenharmony_ci            s->pkt_size = strtol(buf, NULL, 10);
709cabdff1aSopenharmony_ci        }
710cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
711cabdff1aSopenharmony_ci            s->buffer_size = strtol(buf, NULL, 10);
712cabdff1aSopenharmony_ci        }
713cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
714cabdff1aSopenharmony_ci            s->is_connected = strtol(buf, NULL, 10);
715cabdff1aSopenharmony_ci        }
716cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
717cabdff1aSopenharmony_ci            dscp = strtol(buf, NULL, 10);
718cabdff1aSopenharmony_ci        }
719cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
720cabdff1aSopenharmony_ci            s->circular_buffer_size = strtol(buf, NULL, 10);
721cabdff1aSopenharmony_ci            if (!HAVE_PTHREAD_CANCEL)
722cabdff1aSopenharmony_ci                av_log(h, AV_LOG_WARNING,
723cabdff1aSopenharmony_ci                       "'circular_buffer_size' option was set but it is not supported "
724cabdff1aSopenharmony_ci                       "on this build (pthread support is required)\n");
725cabdff1aSopenharmony_ci        }
726cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
727cabdff1aSopenharmony_ci            s->bitrate = strtoll(buf, NULL, 10);
728cabdff1aSopenharmony_ci            if (!HAVE_PTHREAD_CANCEL)
729cabdff1aSopenharmony_ci                av_log(h, AV_LOG_WARNING,
730cabdff1aSopenharmony_ci                       "'bitrate' option was set but it is not supported "
731cabdff1aSopenharmony_ci                       "on this build (pthread support is required)\n");
732cabdff1aSopenharmony_ci        }
733cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
734cabdff1aSopenharmony_ci            s->burst_bits = strtoll(buf, NULL, 10);
735cabdff1aSopenharmony_ci        }
736cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
737cabdff1aSopenharmony_ci            av_freep(&s->localaddr);
738cabdff1aSopenharmony_ci            s->localaddr = av_strdup(buf);
739cabdff1aSopenharmony_ci        }
740cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
741cabdff1aSopenharmony_ci            if ((ret = ff_ip_parse_sources(h, buf, &s->filters)) < 0)
742cabdff1aSopenharmony_ci                goto fail;
743cabdff1aSopenharmony_ci        }
744cabdff1aSopenharmony_ci        if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
745cabdff1aSopenharmony_ci            if ((ret = ff_ip_parse_blocks(h, buf, &s->filters)) < 0)
746cabdff1aSopenharmony_ci                goto fail;
747cabdff1aSopenharmony_ci        }
748cabdff1aSopenharmony_ci        if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
749cabdff1aSopenharmony_ci            s->timeout = strtol(buf, NULL, 10);
750cabdff1aSopenharmony_ci        if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
751cabdff1aSopenharmony_ci            s->is_broadcast = strtol(buf, NULL, 10);
752cabdff1aSopenharmony_ci    }
753cabdff1aSopenharmony_ci    /* handling needed to support options picking from both AVOption and URL */
754cabdff1aSopenharmony_ci    s->circular_buffer_size *= 188;
755cabdff1aSopenharmony_ci    if (flags & AVIO_FLAG_WRITE) {
756cabdff1aSopenharmony_ci        h->max_packet_size = s->pkt_size;
757cabdff1aSopenharmony_ci    } else {
758cabdff1aSopenharmony_ci        h->max_packet_size = UDP_MAX_PKT_SIZE;
759cabdff1aSopenharmony_ci    }
760cabdff1aSopenharmony_ci    h->rw_timeout = s->timeout;
761cabdff1aSopenharmony_ci
762cabdff1aSopenharmony_ci    /* fill the dest addr */
763cabdff1aSopenharmony_ci    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
764cabdff1aSopenharmony_ci
765cabdff1aSopenharmony_ci    /* XXX: fix av_url_split */
766cabdff1aSopenharmony_ci    if (hostname[0] == '\0' || hostname[0] == '?') {
767cabdff1aSopenharmony_ci        /* only accepts null hostname if input */
768cabdff1aSopenharmony_ci        if (!(flags & AVIO_FLAG_READ)) {
769cabdff1aSopenharmony_ci            ret = AVERROR(EINVAL);
770cabdff1aSopenharmony_ci            goto fail;
771cabdff1aSopenharmony_ci        }
772cabdff1aSopenharmony_ci    } else {
773cabdff1aSopenharmony_ci        if ((ret = ff_udp_set_remote_url(h, uri)) < 0)
774cabdff1aSopenharmony_ci            goto fail;
775cabdff1aSopenharmony_ci    }
776cabdff1aSopenharmony_ci
777cabdff1aSopenharmony_ci    if ((s->is_multicast || s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
778cabdff1aSopenharmony_ci        s->local_port = port;
779cabdff1aSopenharmony_ci
780cabdff1aSopenharmony_ci    udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr);
781cabdff1aSopenharmony_ci    if (udp_fd < 0) {
782cabdff1aSopenharmony_ci        ret = AVERROR(EIO);
783cabdff1aSopenharmony_ci        goto fail;
784cabdff1aSopenharmony_ci    }
785cabdff1aSopenharmony_ci
786cabdff1aSopenharmony_ci    s->local_addr_storage=my_addr; //store for future multicast join
787cabdff1aSopenharmony_ci
788cabdff1aSopenharmony_ci    /* Follow the requested reuse option, unless it's multicast in which
789cabdff1aSopenharmony_ci     * case enable reuse unless explicitly disabled.
790cabdff1aSopenharmony_ci     */
791cabdff1aSopenharmony_ci    if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
792cabdff1aSopenharmony_ci        s->reuse_socket = 1;
793cabdff1aSopenharmony_ci        if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) {
794cabdff1aSopenharmony_ci            ret = ff_neterrno();
795cabdff1aSopenharmony_ci            goto fail;
796cabdff1aSopenharmony_ci        }
797cabdff1aSopenharmony_ci    }
798cabdff1aSopenharmony_ci
799cabdff1aSopenharmony_ci    if (s->is_broadcast) {
800cabdff1aSopenharmony_ci#ifdef SO_BROADCAST
801cabdff1aSopenharmony_ci        if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) {
802cabdff1aSopenharmony_ci            ret = ff_neterrno();
803cabdff1aSopenharmony_ci            goto fail;
804cabdff1aSopenharmony_ci        }
805cabdff1aSopenharmony_ci#else
806cabdff1aSopenharmony_ci        ret = AVERROR(ENOSYS);
807cabdff1aSopenharmony_ci        goto fail;
808cabdff1aSopenharmony_ci#endif
809cabdff1aSopenharmony_ci    }
810cabdff1aSopenharmony_ci
811cabdff1aSopenharmony_ci    /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
812cabdff1aSopenharmony_ci     * The receiver coverage has to be less than or equal to the sender coverage.
813cabdff1aSopenharmony_ci     * Otherwise, the receiver will drop all packets.
814cabdff1aSopenharmony_ci     */
815cabdff1aSopenharmony_ci    if (s->udplite_coverage) {
816cabdff1aSopenharmony_ci        if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
817cabdff1aSopenharmony_ci            av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
818cabdff1aSopenharmony_ci
819cabdff1aSopenharmony_ci        if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
820cabdff1aSopenharmony_ci            av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
821cabdff1aSopenharmony_ci    }
822cabdff1aSopenharmony_ci
823cabdff1aSopenharmony_ci    if (dscp >= 0) {
824cabdff1aSopenharmony_ci        dscp <<= 2;
825cabdff1aSopenharmony_ci        if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) {
826cabdff1aSopenharmony_ci            ret = ff_neterrno();
827cabdff1aSopenharmony_ci            goto fail;
828cabdff1aSopenharmony_ci        }
829cabdff1aSopenharmony_ci    }
830cabdff1aSopenharmony_ci
831cabdff1aSopenharmony_ci    /* If multicast, try binding the multicast address first, to avoid
832cabdff1aSopenharmony_ci     * receiving UDP packets from other sources aimed at the same UDP
833cabdff1aSopenharmony_ci     * port. This fails on windows. This makes sending to the same address
834cabdff1aSopenharmony_ci     * using sendto() fail, so only do it if we're opened in read-only mode. */
835cabdff1aSopenharmony_ci    if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) {
836cabdff1aSopenharmony_ci        bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
837cabdff1aSopenharmony_ci    }
838cabdff1aSopenharmony_ci    /* bind to the local address if not multicast or if the multicast
839cabdff1aSopenharmony_ci     * bind failed */
840cabdff1aSopenharmony_ci    /* the bind is needed to give a port to the socket now */
841cabdff1aSopenharmony_ci    if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
842cabdff1aSopenharmony_ci        ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
843cabdff1aSopenharmony_ci        ret = ff_neterrno();
844cabdff1aSopenharmony_ci        goto fail;
845cabdff1aSopenharmony_ci    }
846cabdff1aSopenharmony_ci
847cabdff1aSopenharmony_ci    len = sizeof(my_addr);
848cabdff1aSopenharmony_ci    getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
849cabdff1aSopenharmony_ci    s->local_port = udp_port(&my_addr, len);
850cabdff1aSopenharmony_ci
851cabdff1aSopenharmony_ci    if (s->is_multicast) {
852cabdff1aSopenharmony_ci        if (h->flags & AVIO_FLAG_WRITE) {
853cabdff1aSopenharmony_ci            /* output */
854cabdff1aSopenharmony_ci            if ((ret = udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr, h)) < 0)
855cabdff1aSopenharmony_ci                goto fail;
856cabdff1aSopenharmony_ci        }
857cabdff1aSopenharmony_ci        if (h->flags & AVIO_FLAG_READ) {
858cabdff1aSopenharmony_ci            /* input */
859cabdff1aSopenharmony_ci            if (s->filters.nb_include_addrs) {
860cabdff1aSopenharmony_ci                if ((ret = udp_set_multicast_sources(h, udp_fd,
861cabdff1aSopenharmony_ci                                              (struct sockaddr *)&s->dest_addr,
862cabdff1aSopenharmony_ci                                              s->dest_addr_len, &s->local_addr_storage,
863cabdff1aSopenharmony_ci                                              s->filters.include_addrs,
864cabdff1aSopenharmony_ci                                              s->filters.nb_include_addrs, 1)) < 0)
865cabdff1aSopenharmony_ci                    goto fail;
866cabdff1aSopenharmony_ci            } else {
867cabdff1aSopenharmony_ci                if ((ret = udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,
868cabdff1aSopenharmony_ci                                                    (struct sockaddr *)&s->local_addr_storage, h)) < 0)
869cabdff1aSopenharmony_ci                    goto fail;
870cabdff1aSopenharmony_ci            }
871cabdff1aSopenharmony_ci            if (s->filters.nb_exclude_addrs) {
872cabdff1aSopenharmony_ci                if ((ret = udp_set_multicast_sources(h, udp_fd,
873cabdff1aSopenharmony_ci                                              (struct sockaddr *)&s->dest_addr,
874cabdff1aSopenharmony_ci                                              s->dest_addr_len, &s->local_addr_storage,
875cabdff1aSopenharmony_ci                                              s->filters.exclude_addrs,
876cabdff1aSopenharmony_ci                                              s->filters.nb_exclude_addrs, 0)) < 0)
877cabdff1aSopenharmony_ci                    goto fail;
878cabdff1aSopenharmony_ci            }
879cabdff1aSopenharmony_ci        }
880cabdff1aSopenharmony_ci    }
881cabdff1aSopenharmony_ci
882cabdff1aSopenharmony_ci    if (is_output) {
883cabdff1aSopenharmony_ci        /* limit the tx buf size to limit latency */
884cabdff1aSopenharmony_ci        tmp = s->buffer_size;
885cabdff1aSopenharmony_ci        if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
886cabdff1aSopenharmony_ci            ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
887cabdff1aSopenharmony_ci            ret = ff_neterrno();
888cabdff1aSopenharmony_ci            goto fail;
889cabdff1aSopenharmony_ci        }
890cabdff1aSopenharmony_ci    } else {
891cabdff1aSopenharmony_ci        /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
892cabdff1aSopenharmony_ci        tmp = s->buffer_size;
893cabdff1aSopenharmony_ci        if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
894cabdff1aSopenharmony_ci            ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
895cabdff1aSopenharmony_ci        }
896cabdff1aSopenharmony_ci        len = sizeof(tmp);
897cabdff1aSopenharmony_ci        if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
898cabdff1aSopenharmony_ci            ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
899cabdff1aSopenharmony_ci        } else {
900cabdff1aSopenharmony_ci            av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
901cabdff1aSopenharmony_ci            if(tmp < s->buffer_size)
902cabdff1aSopenharmony_ci                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s->buffer_size, tmp);
903cabdff1aSopenharmony_ci        }
904cabdff1aSopenharmony_ci
905cabdff1aSopenharmony_ci        /* make the socket non-blocking */
906cabdff1aSopenharmony_ci        ff_socket_nonblock(udp_fd, 1);
907cabdff1aSopenharmony_ci    }
908cabdff1aSopenharmony_ci    if (s->is_connected) {
909cabdff1aSopenharmony_ci        if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
910cabdff1aSopenharmony_ci            ff_log_net_error(h, AV_LOG_ERROR, "connect");
911cabdff1aSopenharmony_ci            ret = ff_neterrno();
912cabdff1aSopenharmony_ci            goto fail;
913cabdff1aSopenharmony_ci        }
914cabdff1aSopenharmony_ci    }
915cabdff1aSopenharmony_ci
916cabdff1aSopenharmony_ci    s->udp_fd = udp_fd;
917cabdff1aSopenharmony_ci
918cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
919cabdff1aSopenharmony_ci    /*
920cabdff1aSopenharmony_ci      Create thread in case of:
921cabdff1aSopenharmony_ci      1. Input and circular_buffer_size is set
922cabdff1aSopenharmony_ci      2. Output and bitrate and circular_buffer_size is set
923cabdff1aSopenharmony_ci    */
924cabdff1aSopenharmony_ci
925cabdff1aSopenharmony_ci    if (is_output && s->bitrate && !s->circular_buffer_size) {
926cabdff1aSopenharmony_ci        /* Warn user in case of 'circular_buffer_size' is not set */
927cabdff1aSopenharmony_ci        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
928cabdff1aSopenharmony_ci    }
929cabdff1aSopenharmony_ci
930cabdff1aSopenharmony_ci    if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
931cabdff1aSopenharmony_ci        /* start the task going */
932cabdff1aSopenharmony_ci        s->fifo = av_fifo_alloc2(s->circular_buffer_size, 1, 0);
933cabdff1aSopenharmony_ci        if (!s->fifo) {
934cabdff1aSopenharmony_ci            ret = AVERROR(ENOMEM);
935cabdff1aSopenharmony_ci            goto fail;
936cabdff1aSopenharmony_ci        }
937cabdff1aSopenharmony_ci        ret = pthread_mutex_init(&s->mutex, NULL);
938cabdff1aSopenharmony_ci        if (ret != 0) {
939cabdff1aSopenharmony_ci            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
940cabdff1aSopenharmony_ci            ret = AVERROR(ret);
941cabdff1aSopenharmony_ci            goto fail;
942cabdff1aSopenharmony_ci        }
943cabdff1aSopenharmony_ci        ret = pthread_cond_init(&s->cond, NULL);
944cabdff1aSopenharmony_ci        if (ret != 0) {
945cabdff1aSopenharmony_ci            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
946cabdff1aSopenharmony_ci            ret = AVERROR(ret);
947cabdff1aSopenharmony_ci            goto cond_fail;
948cabdff1aSopenharmony_ci        }
949cabdff1aSopenharmony_ci        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
950cabdff1aSopenharmony_ci        if (ret != 0) {
951cabdff1aSopenharmony_ci            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
952cabdff1aSopenharmony_ci            ret = AVERROR(ret);
953cabdff1aSopenharmony_ci            goto thread_fail;
954cabdff1aSopenharmony_ci        }
955cabdff1aSopenharmony_ci        s->thread_started = 1;
956cabdff1aSopenharmony_ci    }
957cabdff1aSopenharmony_ci#endif
958cabdff1aSopenharmony_ci
959cabdff1aSopenharmony_ci    return 0;
960cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
961cabdff1aSopenharmony_ci thread_fail:
962cabdff1aSopenharmony_ci    pthread_cond_destroy(&s->cond);
963cabdff1aSopenharmony_ci cond_fail:
964cabdff1aSopenharmony_ci    pthread_mutex_destroy(&s->mutex);
965cabdff1aSopenharmony_ci#endif
966cabdff1aSopenharmony_ci fail:
967cabdff1aSopenharmony_ci    if (udp_fd >= 0)
968cabdff1aSopenharmony_ci        closesocket(udp_fd);
969cabdff1aSopenharmony_ci    av_fifo_freep2(&s->fifo);
970cabdff1aSopenharmony_ci    ff_ip_reset_filters(&s->filters);
971cabdff1aSopenharmony_ci    return ret;
972cabdff1aSopenharmony_ci}
973cabdff1aSopenharmony_ci
974cabdff1aSopenharmony_cistatic int udplite_open(URLContext *h, const char *uri, int flags)
975cabdff1aSopenharmony_ci{
976cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
977cabdff1aSopenharmony_ci
978cabdff1aSopenharmony_ci    // set default checksum coverage
979cabdff1aSopenharmony_ci    s->udplite_coverage = UDP_HEADER_SIZE;
980cabdff1aSopenharmony_ci
981cabdff1aSopenharmony_ci    return udp_open(h, uri, flags);
982cabdff1aSopenharmony_ci}
983cabdff1aSopenharmony_ci
984cabdff1aSopenharmony_cistatic int udp_read(URLContext *h, uint8_t *buf, int size)
985cabdff1aSopenharmony_ci{
986cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
987cabdff1aSopenharmony_ci    int ret;
988cabdff1aSopenharmony_ci    struct sockaddr_storage addr;
989cabdff1aSopenharmony_ci    socklen_t addr_len = sizeof(addr);
990cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
991cabdff1aSopenharmony_ci    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
992cabdff1aSopenharmony_ci
993cabdff1aSopenharmony_ci    if (s->fifo) {
994cabdff1aSopenharmony_ci        pthread_mutex_lock(&s->mutex);
995cabdff1aSopenharmony_ci        do {
996cabdff1aSopenharmony_ci            avail = av_fifo_can_read(s->fifo);
997cabdff1aSopenharmony_ci            if (avail) { // >=size) {
998cabdff1aSopenharmony_ci                uint8_t tmp[4];
999cabdff1aSopenharmony_ci
1000cabdff1aSopenharmony_ci                av_fifo_read(s->fifo, tmp, 4);
1001cabdff1aSopenharmony_ci                avail = AV_RL32(tmp);
1002cabdff1aSopenharmony_ci                if(avail > size){
1003cabdff1aSopenharmony_ci                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
1004cabdff1aSopenharmony_ci                    avail = size;
1005cabdff1aSopenharmony_ci                }
1006cabdff1aSopenharmony_ci
1007cabdff1aSopenharmony_ci                av_fifo_read(s->fifo, buf, avail);
1008cabdff1aSopenharmony_ci                av_fifo_drain2(s->fifo, AV_RL32(tmp) - avail);
1009cabdff1aSopenharmony_ci                pthread_mutex_unlock(&s->mutex);
1010cabdff1aSopenharmony_ci                return avail;
1011cabdff1aSopenharmony_ci            } else if(s->circular_buffer_error){
1012cabdff1aSopenharmony_ci                int err = s->circular_buffer_error;
1013cabdff1aSopenharmony_ci                pthread_mutex_unlock(&s->mutex);
1014cabdff1aSopenharmony_ci                return err;
1015cabdff1aSopenharmony_ci            } else if(nonblock) {
1016cabdff1aSopenharmony_ci                pthread_mutex_unlock(&s->mutex);
1017cabdff1aSopenharmony_ci                return AVERROR(EAGAIN);
1018cabdff1aSopenharmony_ci            } else {
1019cabdff1aSopenharmony_ci                /* FIXME: using the monotonic clock would be better,
1020cabdff1aSopenharmony_ci                   but it does not exist on all supported platforms. */
1021cabdff1aSopenharmony_ci                int64_t t = av_gettime() + 100000;
1022cabdff1aSopenharmony_ci                struct timespec tv = { .tv_sec  =  t / 1000000,
1023cabdff1aSopenharmony_ci                                       .tv_nsec = (t % 1000000) * 1000 };
1024cabdff1aSopenharmony_ci                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
1025cabdff1aSopenharmony_ci                if (err) {
1026cabdff1aSopenharmony_ci                    pthread_mutex_unlock(&s->mutex);
1027cabdff1aSopenharmony_ci                    return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
1028cabdff1aSopenharmony_ci                }
1029cabdff1aSopenharmony_ci                nonblock = 1;
1030cabdff1aSopenharmony_ci            }
1031cabdff1aSopenharmony_ci        } while(1);
1032cabdff1aSopenharmony_ci    }
1033cabdff1aSopenharmony_ci#endif
1034cabdff1aSopenharmony_ci
1035cabdff1aSopenharmony_ci    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1036cabdff1aSopenharmony_ci        ret = ff_network_wait_fd(s->udp_fd, 0);
1037cabdff1aSopenharmony_ci        if (ret < 0)
1038cabdff1aSopenharmony_ci            return ret;
1039cabdff1aSopenharmony_ci    }
1040cabdff1aSopenharmony_ci    ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&addr, &addr_len);
1041cabdff1aSopenharmony_ci    if (ret < 0)
1042cabdff1aSopenharmony_ci        return ff_neterrno();
1043cabdff1aSopenharmony_ci    if (ff_ip_check_source_lists(&addr, &s->filters))
1044cabdff1aSopenharmony_ci        return AVERROR(EINTR);
1045cabdff1aSopenharmony_ci    return ret;
1046cabdff1aSopenharmony_ci}
1047cabdff1aSopenharmony_ci
1048cabdff1aSopenharmony_cistatic int udp_write(URLContext *h, const uint8_t *buf, int size)
1049cabdff1aSopenharmony_ci{
1050cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
1051cabdff1aSopenharmony_ci    int ret;
1052cabdff1aSopenharmony_ci
1053cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
1054cabdff1aSopenharmony_ci    if (s->fifo) {
1055cabdff1aSopenharmony_ci        uint8_t tmp[4];
1056cabdff1aSopenharmony_ci
1057cabdff1aSopenharmony_ci        pthread_mutex_lock(&s->mutex);
1058cabdff1aSopenharmony_ci
1059cabdff1aSopenharmony_ci        /*
1060cabdff1aSopenharmony_ci          Return error if last tx failed.
1061cabdff1aSopenharmony_ci          Here we can't know on which packet error was, but it needs to know that error exists.
1062cabdff1aSopenharmony_ci        */
1063cabdff1aSopenharmony_ci        if (s->circular_buffer_error<0) {
1064cabdff1aSopenharmony_ci            int err = s->circular_buffer_error;
1065cabdff1aSopenharmony_ci            pthread_mutex_unlock(&s->mutex);
1066cabdff1aSopenharmony_ci            return err;
1067cabdff1aSopenharmony_ci        }
1068cabdff1aSopenharmony_ci
1069cabdff1aSopenharmony_ci        if (av_fifo_can_write(s->fifo) < size + 4) {
1070cabdff1aSopenharmony_ci            /* What about a partial packet tx ? */
1071cabdff1aSopenharmony_ci            pthread_mutex_unlock(&s->mutex);
1072cabdff1aSopenharmony_ci            return AVERROR(ENOMEM);
1073cabdff1aSopenharmony_ci        }
1074cabdff1aSopenharmony_ci        AV_WL32(tmp, size);
1075cabdff1aSopenharmony_ci        av_fifo_write(s->fifo, tmp, 4); /* size of packet */
1076cabdff1aSopenharmony_ci        av_fifo_write(s->fifo, buf, size); /* the data */
1077cabdff1aSopenharmony_ci        pthread_cond_signal(&s->cond);
1078cabdff1aSopenharmony_ci        pthread_mutex_unlock(&s->mutex);
1079cabdff1aSopenharmony_ci        return size;
1080cabdff1aSopenharmony_ci    }
1081cabdff1aSopenharmony_ci#endif
1082cabdff1aSopenharmony_ci    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1083cabdff1aSopenharmony_ci        ret = ff_network_wait_fd(s->udp_fd, 1);
1084cabdff1aSopenharmony_ci        if (ret < 0)
1085cabdff1aSopenharmony_ci            return ret;
1086cabdff1aSopenharmony_ci    }
1087cabdff1aSopenharmony_ci
1088cabdff1aSopenharmony_ci    if (!s->is_connected) {
1089cabdff1aSopenharmony_ci        ret = sendto (s->udp_fd, buf, size, 0,
1090cabdff1aSopenharmony_ci                      (struct sockaddr *) &s->dest_addr,
1091cabdff1aSopenharmony_ci                      s->dest_addr_len);
1092cabdff1aSopenharmony_ci    } else
1093cabdff1aSopenharmony_ci        ret = send(s->udp_fd, buf, size, 0);
1094cabdff1aSopenharmony_ci
1095cabdff1aSopenharmony_ci    return ret < 0 ? ff_neterrno() : ret;
1096cabdff1aSopenharmony_ci}
1097cabdff1aSopenharmony_ci
1098cabdff1aSopenharmony_cistatic int udp_close(URLContext *h)
1099cabdff1aSopenharmony_ci{
1100cabdff1aSopenharmony_ci    UDPContext *s = h->priv_data;
1101cabdff1aSopenharmony_ci
1102cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
1103cabdff1aSopenharmony_ci    // Request close once writing is finished
1104cabdff1aSopenharmony_ci    if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
1105cabdff1aSopenharmony_ci        pthread_mutex_lock(&s->mutex);
1106cabdff1aSopenharmony_ci        s->close_req = 1;
1107cabdff1aSopenharmony_ci        pthread_cond_signal(&s->cond);
1108cabdff1aSopenharmony_ci        pthread_mutex_unlock(&s->mutex);
1109cabdff1aSopenharmony_ci    }
1110cabdff1aSopenharmony_ci#endif
1111cabdff1aSopenharmony_ci
1112cabdff1aSopenharmony_ci    if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
1113cabdff1aSopenharmony_ci        udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,
1114cabdff1aSopenharmony_ci                                  (struct sockaddr *)&s->local_addr_storage, h);
1115cabdff1aSopenharmony_ci#if HAVE_PTHREAD_CANCEL
1116cabdff1aSopenharmony_ci    if (s->thread_started) {
1117cabdff1aSopenharmony_ci        int ret;
1118cabdff1aSopenharmony_ci        // Cancel only read, as write has been signaled as success to the user
1119cabdff1aSopenharmony_ci        if (h->flags & AVIO_FLAG_READ) {
1120cabdff1aSopenharmony_ci#ifdef _WIN32
1121cabdff1aSopenharmony_ci            /* recvfrom() is not a cancellation point for win32, so we shutdown
1122cabdff1aSopenharmony_ci             * the socket and abort pending IO, subsequent recvfrom() calls
1123cabdff1aSopenharmony_ci             * will fail with WSAESHUTDOWN causing the thread to exit. */
1124cabdff1aSopenharmony_ci            shutdown(s->udp_fd, SD_RECEIVE);
1125cabdff1aSopenharmony_ci            CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL);
1126cabdff1aSopenharmony_ci#else
1127cabdff1aSopenharmony_ci            pthread_cancel(s->circular_buffer_thread);
1128cabdff1aSopenharmony_ci#endif
1129cabdff1aSopenharmony_ci        }
1130cabdff1aSopenharmony_ci        ret = pthread_join(s->circular_buffer_thread, NULL);
1131cabdff1aSopenharmony_ci        if (ret != 0)
1132cabdff1aSopenharmony_ci            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
1133cabdff1aSopenharmony_ci        pthread_mutex_destroy(&s->mutex);
1134cabdff1aSopenharmony_ci        pthread_cond_destroy(&s->cond);
1135cabdff1aSopenharmony_ci    }
1136cabdff1aSopenharmony_ci#endif
1137cabdff1aSopenharmony_ci    closesocket(s->udp_fd);
1138cabdff1aSopenharmony_ci    av_fifo_freep2(&s->fifo);
1139cabdff1aSopenharmony_ci    ff_ip_reset_filters(&s->filters);
1140cabdff1aSopenharmony_ci    return 0;
1141cabdff1aSopenharmony_ci}
1142cabdff1aSopenharmony_ci
1143cabdff1aSopenharmony_ciconst URLProtocol ff_udp_protocol = {
1144cabdff1aSopenharmony_ci    .name                = "udp",
1145cabdff1aSopenharmony_ci    .url_open            = udp_open,
1146cabdff1aSopenharmony_ci    .url_read            = udp_read,
1147cabdff1aSopenharmony_ci    .url_write           = udp_write,
1148cabdff1aSopenharmony_ci    .url_close           = udp_close,
1149cabdff1aSopenharmony_ci    .url_get_file_handle = udp_get_file_handle,
1150cabdff1aSopenharmony_ci    .priv_data_size      = sizeof(UDPContext),
1151cabdff1aSopenharmony_ci    .priv_data_class     = &udp_class,
1152cabdff1aSopenharmony_ci    .flags               = URL_PROTOCOL_FLAG_NETWORK,
1153cabdff1aSopenharmony_ci};
1154cabdff1aSopenharmony_ci
1155cabdff1aSopenharmony_ciconst URLProtocol ff_udplite_protocol = {
1156cabdff1aSopenharmony_ci    .name                = "udplite",
1157cabdff1aSopenharmony_ci    .url_open            = udplite_open,
1158cabdff1aSopenharmony_ci    .url_read            = udp_read,
1159cabdff1aSopenharmony_ci    .url_write           = udp_write,
1160cabdff1aSopenharmony_ci    .url_close           = udp_close,
1161cabdff1aSopenharmony_ci    .url_get_file_handle = udp_get_file_handle,
1162cabdff1aSopenharmony_ci    .priv_data_size      = sizeof(UDPContext),
1163cabdff1aSopenharmony_ci    .priv_data_class     = &udplite_context_class,
1164cabdff1aSopenharmony_ci    .flags               = URL_PROTOCOL_FLAG_NETWORK,
1165cabdff1aSopenharmony_ci};
1166