1cabdff1aSopenharmony_ci/*
2cabdff1aSopenharmony_ci * Input async protocol.
3cabdff1aSopenharmony_ci * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4cabdff1aSopenharmony_ci *
5cabdff1aSopenharmony_ci * This file is part of FFmpeg.
6cabdff1aSopenharmony_ci *
7cabdff1aSopenharmony_ci * FFmpeg is free software; you can redistribute it and/or
8cabdff1aSopenharmony_ci * modify it under the terms of the GNU Lesser General Public
9cabdff1aSopenharmony_ci * License as published by the Free Software Foundation; either
10cabdff1aSopenharmony_ci * version 2.1 of the License, or (at your option) any later version.
11cabdff1aSopenharmony_ci *
12cabdff1aSopenharmony_ci * FFmpeg is distributed in the hope that it will be useful,
13cabdff1aSopenharmony_ci * but WITHOUT ANY WARRANTY; without even the implied warranty of
14cabdff1aSopenharmony_ci * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15cabdff1aSopenharmony_ci * Lesser General Public License for more details.
16cabdff1aSopenharmony_ci *
17cabdff1aSopenharmony_ci * You should have received a copy of the GNU Lesser General Public
18cabdff1aSopenharmony_ci * License along with FFmpeg; if not, write to the Free Software
19cabdff1aSopenharmony_ci * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20cabdff1aSopenharmony_ci *
21cabdff1aSopenharmony_ci * Based on libavformat/cache.c by Michael Niedermayer
22cabdff1aSopenharmony_ci */
23cabdff1aSopenharmony_ci
/**
 * @TODO
 *      support timeouts
 *      support working together with concatdec and hls
 */
29cabdff1aSopenharmony_ci
30cabdff1aSopenharmony_ci#include "libavutil/avassert.h"
31cabdff1aSopenharmony_ci#include "libavutil/avstring.h"
32cabdff1aSopenharmony_ci#include "libavutil/error.h"
33cabdff1aSopenharmony_ci#include "libavutil/fifo.h"
34cabdff1aSopenharmony_ci#include "libavutil/log.h"
35cabdff1aSopenharmony_ci#include "libavutil/opt.h"
36cabdff1aSopenharmony_ci#include "libavutil/thread.h"
37cabdff1aSopenharmony_ci#include "url.h"
38cabdff1aSopenharmony_ci#include <stdint.h>
39cabdff1aSopenharmony_ci
40cabdff1aSopenharmony_ci#if HAVE_UNISTD_H
41cabdff1aSopenharmony_ci#include <unistd.h>
42cabdff1aSopenharmony_ci#endif
43cabdff1aSopenharmony_ci
44cabdff1aSopenharmony_ci#define BUFFER_CAPACITY         (4 * 1024 * 1024)
45cabdff1aSopenharmony_ci#define READ_BACK_CAPACITY      (4 * 1024 * 1024)
46cabdff1aSopenharmony_ci#define SHORT_SEEK_THRESHOLD    (256 * 1024)
47cabdff1aSopenharmony_ci
48cabdff1aSopenharmony_citypedef struct RingBuffer
49cabdff1aSopenharmony_ci{
50cabdff1aSopenharmony_ci    AVFifo       *fifo;
51cabdff1aSopenharmony_ci    int           read_back_capacity;
52cabdff1aSopenharmony_ci
53cabdff1aSopenharmony_ci    int           read_pos;
54cabdff1aSopenharmony_ci} RingBuffer;
55cabdff1aSopenharmony_ci
56cabdff1aSopenharmony_citypedef struct Context {
57cabdff1aSopenharmony_ci    AVClass        *class;
58cabdff1aSopenharmony_ci    URLContext     *inner;
59cabdff1aSopenharmony_ci
60cabdff1aSopenharmony_ci    int             seek_request;
61cabdff1aSopenharmony_ci    int64_t         seek_pos;
62cabdff1aSopenharmony_ci    int             seek_whence;
63cabdff1aSopenharmony_ci    int             seek_completed;
64cabdff1aSopenharmony_ci    int64_t         seek_ret;
65cabdff1aSopenharmony_ci
66cabdff1aSopenharmony_ci    int             inner_io_error;
67cabdff1aSopenharmony_ci    int             io_error;
68cabdff1aSopenharmony_ci    int             io_eof_reached;
69cabdff1aSopenharmony_ci
70cabdff1aSopenharmony_ci    int64_t         logical_pos;
71cabdff1aSopenharmony_ci    int64_t         logical_size;
72cabdff1aSopenharmony_ci    RingBuffer      ring;
73cabdff1aSopenharmony_ci
74cabdff1aSopenharmony_ci    pthread_cond_t  cond_wakeup_main;
75cabdff1aSopenharmony_ci    pthread_cond_t  cond_wakeup_background;
76cabdff1aSopenharmony_ci    pthread_mutex_t mutex;
77cabdff1aSopenharmony_ci    pthread_t       async_buffer_thread;
78cabdff1aSopenharmony_ci
79cabdff1aSopenharmony_ci    int             abort_request;
80cabdff1aSopenharmony_ci    AVIOInterruptCB interrupt_callback;
81cabdff1aSopenharmony_ci} Context;
82cabdff1aSopenharmony_ci
83cabdff1aSopenharmony_cistatic int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
84cabdff1aSopenharmony_ci{
85cabdff1aSopenharmony_ci    memset(ring, 0, sizeof(RingBuffer));
86cabdff1aSopenharmony_ci    ring->fifo = av_fifo_alloc2(capacity + read_back_capacity, 1, 0);
87cabdff1aSopenharmony_ci    if (!ring->fifo)
88cabdff1aSopenharmony_ci        return AVERROR(ENOMEM);
89cabdff1aSopenharmony_ci
90cabdff1aSopenharmony_ci    ring->read_back_capacity = read_back_capacity;
91cabdff1aSopenharmony_ci    return 0;
92cabdff1aSopenharmony_ci}
93cabdff1aSopenharmony_ci
94cabdff1aSopenharmony_cistatic void ring_destroy(RingBuffer *ring)
95cabdff1aSopenharmony_ci{
96cabdff1aSopenharmony_ci    av_fifo_freep2(&ring->fifo);
97cabdff1aSopenharmony_ci}
98cabdff1aSopenharmony_ci
99cabdff1aSopenharmony_cistatic void ring_reset(RingBuffer *ring)
100cabdff1aSopenharmony_ci{
101cabdff1aSopenharmony_ci    av_fifo_reset2(ring->fifo);
102cabdff1aSopenharmony_ci    ring->read_pos = 0;
103cabdff1aSopenharmony_ci}
104cabdff1aSopenharmony_ci
105cabdff1aSopenharmony_cistatic int ring_size(RingBuffer *ring)
106cabdff1aSopenharmony_ci{
107cabdff1aSopenharmony_ci    return av_fifo_can_read(ring->fifo) - ring->read_pos;
108cabdff1aSopenharmony_ci}
109cabdff1aSopenharmony_ci
110cabdff1aSopenharmony_cistatic int ring_space(RingBuffer *ring)
111cabdff1aSopenharmony_ci{
112cabdff1aSopenharmony_ci    return av_fifo_can_write(ring->fifo);
113cabdff1aSopenharmony_ci}
114cabdff1aSopenharmony_ci
115cabdff1aSopenharmony_cistatic int ring_read(RingBuffer *ring, void *dest, int buf_size)
116cabdff1aSopenharmony_ci{
117cabdff1aSopenharmony_ci    int ret = 0;
118cabdff1aSopenharmony_ci
119cabdff1aSopenharmony_ci    av_assert2(buf_size <= ring_size(ring));
120cabdff1aSopenharmony_ci    if (dest)
121cabdff1aSopenharmony_ci        ret = av_fifo_peek(ring->fifo, dest, buf_size, ring->read_pos);
122cabdff1aSopenharmony_ci    ring->read_pos += buf_size;
123cabdff1aSopenharmony_ci
124cabdff1aSopenharmony_ci    if (ring->read_pos > ring->read_back_capacity) {
125cabdff1aSopenharmony_ci        av_fifo_drain2(ring->fifo, ring->read_pos - ring->read_back_capacity);
126cabdff1aSopenharmony_ci        ring->read_pos = ring->read_back_capacity;
127cabdff1aSopenharmony_ci    }
128cabdff1aSopenharmony_ci
129cabdff1aSopenharmony_ci    return ret;
130cabdff1aSopenharmony_ci}
131cabdff1aSopenharmony_ci
132cabdff1aSopenharmony_cistatic int wrapped_url_read(void *src, void *dst, size_t *size)
133cabdff1aSopenharmony_ci{
134cabdff1aSopenharmony_ci    URLContext *h   = src;
135cabdff1aSopenharmony_ci    Context    *c   = h->priv_data;
136cabdff1aSopenharmony_ci    int         ret;
137cabdff1aSopenharmony_ci
138cabdff1aSopenharmony_ci    ret = ffurl_read(c->inner, dst, *size);
139cabdff1aSopenharmony_ci    *size             = ret > 0 ? ret : 0;
140cabdff1aSopenharmony_ci    c->inner_io_error = ret < 0 ? ret : 0;
141cabdff1aSopenharmony_ci
142cabdff1aSopenharmony_ci    return c->inner_io_error;
143cabdff1aSopenharmony_ci}
144cabdff1aSopenharmony_ci
145cabdff1aSopenharmony_cistatic int ring_write(RingBuffer *ring, URLContext *h, size_t size)
146cabdff1aSopenharmony_ci{
147cabdff1aSopenharmony_ci    int ret;
148cabdff1aSopenharmony_ci
149cabdff1aSopenharmony_ci    av_assert2(size <= ring_space(ring));
150cabdff1aSopenharmony_ci    ret = av_fifo_write_from_cb(ring->fifo, wrapped_url_read, h, &size);
151cabdff1aSopenharmony_ci    if (ret < 0)
152cabdff1aSopenharmony_ci        return ret;
153cabdff1aSopenharmony_ci
154cabdff1aSopenharmony_ci    return size;
155cabdff1aSopenharmony_ci}
156cabdff1aSopenharmony_ci
157cabdff1aSopenharmony_cistatic int ring_size_of_read_back(RingBuffer *ring)
158cabdff1aSopenharmony_ci{
159cabdff1aSopenharmony_ci    return ring->read_pos;
160cabdff1aSopenharmony_ci}
161cabdff1aSopenharmony_ci
162cabdff1aSopenharmony_cistatic int ring_drain(RingBuffer *ring, int offset)
163cabdff1aSopenharmony_ci{
164cabdff1aSopenharmony_ci    av_assert2(offset >= -ring_size_of_read_back(ring));
165cabdff1aSopenharmony_ci    av_assert2(offset <= ring_size(ring));
166cabdff1aSopenharmony_ci    ring->read_pos += offset;
167cabdff1aSopenharmony_ci    return 0;
168cabdff1aSopenharmony_ci}
169cabdff1aSopenharmony_ci
170cabdff1aSopenharmony_cistatic int async_check_interrupt(void *arg)
171cabdff1aSopenharmony_ci{
172cabdff1aSopenharmony_ci    URLContext *h   = arg;
173cabdff1aSopenharmony_ci    Context    *c   = h->priv_data;
174cabdff1aSopenharmony_ci
175cabdff1aSopenharmony_ci    if (c->abort_request)
176cabdff1aSopenharmony_ci        return 1;
177cabdff1aSopenharmony_ci
178cabdff1aSopenharmony_ci    if (ff_check_interrupt(&c->interrupt_callback))
179cabdff1aSopenharmony_ci        c->abort_request = 1;
180cabdff1aSopenharmony_ci
181cabdff1aSopenharmony_ci    return c->abort_request;
182cabdff1aSopenharmony_ci}
183cabdff1aSopenharmony_ci
184cabdff1aSopenharmony_cistatic void *async_buffer_task(void *arg)
185cabdff1aSopenharmony_ci{
186cabdff1aSopenharmony_ci    URLContext   *h    = arg;
187cabdff1aSopenharmony_ci    Context      *c    = h->priv_data;
188cabdff1aSopenharmony_ci    RingBuffer   *ring = &c->ring;
189cabdff1aSopenharmony_ci    int           ret  = 0;
190cabdff1aSopenharmony_ci    int64_t       seek_ret;
191cabdff1aSopenharmony_ci
192cabdff1aSopenharmony_ci    while (1) {
193cabdff1aSopenharmony_ci        int fifo_space, to_copy;
194cabdff1aSopenharmony_ci
195cabdff1aSopenharmony_ci        pthread_mutex_lock(&c->mutex);
196cabdff1aSopenharmony_ci        if (async_check_interrupt(h)) {
197cabdff1aSopenharmony_ci            c->io_eof_reached = 1;
198cabdff1aSopenharmony_ci            c->io_error       = AVERROR_EXIT;
199cabdff1aSopenharmony_ci            pthread_cond_signal(&c->cond_wakeup_main);
200cabdff1aSopenharmony_ci            pthread_mutex_unlock(&c->mutex);
201cabdff1aSopenharmony_ci            break;
202cabdff1aSopenharmony_ci        }
203cabdff1aSopenharmony_ci
204cabdff1aSopenharmony_ci        if (c->seek_request) {
205cabdff1aSopenharmony_ci            seek_ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
206cabdff1aSopenharmony_ci            if (seek_ret >= 0) {
207cabdff1aSopenharmony_ci                c->io_eof_reached = 0;
208cabdff1aSopenharmony_ci                c->io_error       = 0;
209cabdff1aSopenharmony_ci                ring_reset(ring);
210cabdff1aSopenharmony_ci            }
211cabdff1aSopenharmony_ci
212cabdff1aSopenharmony_ci            c->seek_completed = 1;
213cabdff1aSopenharmony_ci            c->seek_ret       = seek_ret;
214cabdff1aSopenharmony_ci            c->seek_request   = 0;
215cabdff1aSopenharmony_ci
216cabdff1aSopenharmony_ci
217cabdff1aSopenharmony_ci            pthread_cond_signal(&c->cond_wakeup_main);
218cabdff1aSopenharmony_ci            pthread_mutex_unlock(&c->mutex);
219cabdff1aSopenharmony_ci            continue;
220cabdff1aSopenharmony_ci        }
221cabdff1aSopenharmony_ci
222cabdff1aSopenharmony_ci        fifo_space = ring_space(ring);
223cabdff1aSopenharmony_ci        if (c->io_eof_reached || fifo_space <= 0) {
224cabdff1aSopenharmony_ci            pthread_cond_signal(&c->cond_wakeup_main);
225cabdff1aSopenharmony_ci            pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
226cabdff1aSopenharmony_ci            pthread_mutex_unlock(&c->mutex);
227cabdff1aSopenharmony_ci            continue;
228cabdff1aSopenharmony_ci        }
229cabdff1aSopenharmony_ci        pthread_mutex_unlock(&c->mutex);
230cabdff1aSopenharmony_ci
231cabdff1aSopenharmony_ci        to_copy = FFMIN(4096, fifo_space);
232cabdff1aSopenharmony_ci        ret = ring_write(ring, h, to_copy);
233cabdff1aSopenharmony_ci
234cabdff1aSopenharmony_ci        pthread_mutex_lock(&c->mutex);
235cabdff1aSopenharmony_ci        if (ret <= 0) {
236cabdff1aSopenharmony_ci            c->io_eof_reached = 1;
237cabdff1aSopenharmony_ci            if (c->inner_io_error < 0)
238cabdff1aSopenharmony_ci                c->io_error = c->inner_io_error;
239cabdff1aSopenharmony_ci        }
240cabdff1aSopenharmony_ci
241cabdff1aSopenharmony_ci        pthread_cond_signal(&c->cond_wakeup_main);
242cabdff1aSopenharmony_ci        pthread_mutex_unlock(&c->mutex);
243cabdff1aSopenharmony_ci    }
244cabdff1aSopenharmony_ci
245cabdff1aSopenharmony_ci    return NULL;
246cabdff1aSopenharmony_ci}
247cabdff1aSopenharmony_ci
248cabdff1aSopenharmony_cistatic int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
249cabdff1aSopenharmony_ci{
250cabdff1aSopenharmony_ci    Context         *c = h->priv_data;
251cabdff1aSopenharmony_ci    int              ret;
252cabdff1aSopenharmony_ci    AVIOInterruptCB  interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
253cabdff1aSopenharmony_ci
254cabdff1aSopenharmony_ci    av_strstart(arg, "async:", &arg);
255cabdff1aSopenharmony_ci
256cabdff1aSopenharmony_ci    ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
257cabdff1aSopenharmony_ci    if (ret < 0)
258cabdff1aSopenharmony_ci        goto fifo_fail;
259cabdff1aSopenharmony_ci
260cabdff1aSopenharmony_ci    /* wrap interrupt callback */
261cabdff1aSopenharmony_ci    c->interrupt_callback = h->interrupt_callback;
262cabdff1aSopenharmony_ci    ret = ffurl_open_whitelist(&c->inner, arg, flags, &interrupt_callback, options, h->protocol_whitelist, h->protocol_blacklist, h);
263cabdff1aSopenharmony_ci    if (ret != 0) {
264cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
265cabdff1aSopenharmony_ci        goto url_fail;
266cabdff1aSopenharmony_ci    }
267cabdff1aSopenharmony_ci
268cabdff1aSopenharmony_ci    c->logical_size = ffurl_size(c->inner);
269cabdff1aSopenharmony_ci    h->is_streamed  = c->inner->is_streamed;
270cabdff1aSopenharmony_ci
271cabdff1aSopenharmony_ci    ret = pthread_mutex_init(&c->mutex, NULL);
272cabdff1aSopenharmony_ci    if (ret != 0) {
273cabdff1aSopenharmony_ci        ret = AVERROR(ret);
274cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
275cabdff1aSopenharmony_ci        goto mutex_fail;
276cabdff1aSopenharmony_ci    }
277cabdff1aSopenharmony_ci
278cabdff1aSopenharmony_ci    ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
279cabdff1aSopenharmony_ci    if (ret != 0) {
280cabdff1aSopenharmony_ci        ret = AVERROR(ret);
281cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
282cabdff1aSopenharmony_ci        goto cond_wakeup_main_fail;
283cabdff1aSopenharmony_ci    }
284cabdff1aSopenharmony_ci
285cabdff1aSopenharmony_ci    ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
286cabdff1aSopenharmony_ci    if (ret != 0) {
287cabdff1aSopenharmony_ci        ret = AVERROR(ret);
288cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
289cabdff1aSopenharmony_ci        goto cond_wakeup_background_fail;
290cabdff1aSopenharmony_ci    }
291cabdff1aSopenharmony_ci
292cabdff1aSopenharmony_ci    ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
293cabdff1aSopenharmony_ci    if (ret) {
294cabdff1aSopenharmony_ci        ret = AVERROR(ret);
295cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
296cabdff1aSopenharmony_ci        goto thread_fail;
297cabdff1aSopenharmony_ci    }
298cabdff1aSopenharmony_ci
299cabdff1aSopenharmony_ci    return 0;
300cabdff1aSopenharmony_ci
301cabdff1aSopenharmony_cithread_fail:
302cabdff1aSopenharmony_ci    pthread_cond_destroy(&c->cond_wakeup_background);
303cabdff1aSopenharmony_cicond_wakeup_background_fail:
304cabdff1aSopenharmony_ci    pthread_cond_destroy(&c->cond_wakeup_main);
305cabdff1aSopenharmony_cicond_wakeup_main_fail:
306cabdff1aSopenharmony_ci    pthread_mutex_destroy(&c->mutex);
307cabdff1aSopenharmony_cimutex_fail:
308cabdff1aSopenharmony_ci    ffurl_closep(&c->inner);
309cabdff1aSopenharmony_ciurl_fail:
310cabdff1aSopenharmony_ci    ring_destroy(&c->ring);
311cabdff1aSopenharmony_cififo_fail:
312cabdff1aSopenharmony_ci    return ret;
313cabdff1aSopenharmony_ci}
314cabdff1aSopenharmony_ci
315cabdff1aSopenharmony_cistatic int async_close(URLContext *h)
316cabdff1aSopenharmony_ci{
317cabdff1aSopenharmony_ci    Context *c = h->priv_data;
318cabdff1aSopenharmony_ci    int      ret;
319cabdff1aSopenharmony_ci
320cabdff1aSopenharmony_ci    pthread_mutex_lock(&c->mutex);
321cabdff1aSopenharmony_ci    c->abort_request = 1;
322cabdff1aSopenharmony_ci    pthread_cond_signal(&c->cond_wakeup_background);
323cabdff1aSopenharmony_ci    pthread_mutex_unlock(&c->mutex);
324cabdff1aSopenharmony_ci
325cabdff1aSopenharmony_ci    ret = pthread_join(c->async_buffer_thread, NULL);
326cabdff1aSopenharmony_ci    if (ret != 0)
327cabdff1aSopenharmony_ci        av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret));
328cabdff1aSopenharmony_ci
329cabdff1aSopenharmony_ci    pthread_cond_destroy(&c->cond_wakeup_background);
330cabdff1aSopenharmony_ci    pthread_cond_destroy(&c->cond_wakeup_main);
331cabdff1aSopenharmony_ci    pthread_mutex_destroy(&c->mutex);
332cabdff1aSopenharmony_ci    ffurl_closep(&c->inner);
333cabdff1aSopenharmony_ci    ring_destroy(&c->ring);
334cabdff1aSopenharmony_ci
335cabdff1aSopenharmony_ci    return 0;
336cabdff1aSopenharmony_ci}
337cabdff1aSopenharmony_ci
338cabdff1aSopenharmony_cistatic int async_read_internal(URLContext *h, void *dest, int size)
339cabdff1aSopenharmony_ci{
340cabdff1aSopenharmony_ci    Context      *c       = h->priv_data;
341cabdff1aSopenharmony_ci    RingBuffer   *ring    = &c->ring;
342cabdff1aSopenharmony_ci    int     read_complete = !dest;
343cabdff1aSopenharmony_ci    int           to_read = size;
344cabdff1aSopenharmony_ci    int           ret     = 0;
345cabdff1aSopenharmony_ci
346cabdff1aSopenharmony_ci    pthread_mutex_lock(&c->mutex);
347cabdff1aSopenharmony_ci
348cabdff1aSopenharmony_ci    while (to_read > 0) {
349cabdff1aSopenharmony_ci        int fifo_size, to_copy;
350cabdff1aSopenharmony_ci        if (async_check_interrupt(h)) {
351cabdff1aSopenharmony_ci            ret = AVERROR_EXIT;
352cabdff1aSopenharmony_ci            break;
353cabdff1aSopenharmony_ci        }
354cabdff1aSopenharmony_ci        fifo_size = ring_size(ring);
355cabdff1aSopenharmony_ci        to_copy   = FFMIN(to_read, fifo_size);
356cabdff1aSopenharmony_ci        if (to_copy > 0) {
357cabdff1aSopenharmony_ci            ring_read(ring, dest, to_copy);
358cabdff1aSopenharmony_ci            if (dest)
359cabdff1aSopenharmony_ci                dest = (uint8_t *)dest + to_copy;
360cabdff1aSopenharmony_ci            c->logical_pos += to_copy;
361cabdff1aSopenharmony_ci            to_read        -= to_copy;
362cabdff1aSopenharmony_ci            ret             = size - to_read;
363cabdff1aSopenharmony_ci
364cabdff1aSopenharmony_ci            if (to_read <= 0 || !read_complete)
365cabdff1aSopenharmony_ci                break;
366cabdff1aSopenharmony_ci        } else if (c->io_eof_reached) {
367cabdff1aSopenharmony_ci            if (ret <= 0) {
368cabdff1aSopenharmony_ci                if (c->io_error)
369cabdff1aSopenharmony_ci                    ret = c->io_error;
370cabdff1aSopenharmony_ci                else
371cabdff1aSopenharmony_ci                    ret = AVERROR_EOF;
372cabdff1aSopenharmony_ci            }
373cabdff1aSopenharmony_ci            break;
374cabdff1aSopenharmony_ci        }
375cabdff1aSopenharmony_ci        pthread_cond_signal(&c->cond_wakeup_background);
376cabdff1aSopenharmony_ci        pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
377cabdff1aSopenharmony_ci    }
378cabdff1aSopenharmony_ci
379cabdff1aSopenharmony_ci    pthread_cond_signal(&c->cond_wakeup_background);
380cabdff1aSopenharmony_ci    pthread_mutex_unlock(&c->mutex);
381cabdff1aSopenharmony_ci
382cabdff1aSopenharmony_ci    return ret;
383cabdff1aSopenharmony_ci}
384cabdff1aSopenharmony_ci
385cabdff1aSopenharmony_cistatic int async_read(URLContext *h, unsigned char *buf, int size)
386cabdff1aSopenharmony_ci{
387cabdff1aSopenharmony_ci    return async_read_internal(h, buf, size);
388cabdff1aSopenharmony_ci}
389cabdff1aSopenharmony_ci
390cabdff1aSopenharmony_cistatic int64_t async_seek(URLContext *h, int64_t pos, int whence)
391cabdff1aSopenharmony_ci{
392cabdff1aSopenharmony_ci    Context      *c    = h->priv_data;
393cabdff1aSopenharmony_ci    RingBuffer   *ring = &c->ring;
394cabdff1aSopenharmony_ci    int64_t       ret;
395cabdff1aSopenharmony_ci    int64_t       new_logical_pos;
396cabdff1aSopenharmony_ci    int fifo_size;
397cabdff1aSopenharmony_ci    int fifo_size_of_read_back;
398cabdff1aSopenharmony_ci
399cabdff1aSopenharmony_ci    if (whence == AVSEEK_SIZE) {
400cabdff1aSopenharmony_ci        av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
401cabdff1aSopenharmony_ci        return c->logical_size;
402cabdff1aSopenharmony_ci    } else if (whence == SEEK_CUR) {
403cabdff1aSopenharmony_ci        av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
404cabdff1aSopenharmony_ci        new_logical_pos = pos + c->logical_pos;
405cabdff1aSopenharmony_ci    } else if (whence == SEEK_SET){
406cabdff1aSopenharmony_ci        av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
407cabdff1aSopenharmony_ci        new_logical_pos = pos;
408cabdff1aSopenharmony_ci    } else {
409cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
410cabdff1aSopenharmony_ci    }
411cabdff1aSopenharmony_ci    if (new_logical_pos < 0)
412cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
413cabdff1aSopenharmony_ci
414cabdff1aSopenharmony_ci    fifo_size = ring_size(ring);
415cabdff1aSopenharmony_ci    fifo_size_of_read_back = ring_size_of_read_back(ring);
416cabdff1aSopenharmony_ci    if (new_logical_pos == c->logical_pos) {
417cabdff1aSopenharmony_ci        /* current position */
418cabdff1aSopenharmony_ci        return c->logical_pos;
419cabdff1aSopenharmony_ci    } else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) &&
420cabdff1aSopenharmony_ci               (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
421cabdff1aSopenharmony_ci        int pos_delta = (int)(new_logical_pos - c->logical_pos);
422cabdff1aSopenharmony_ci        /* fast seek */
423cabdff1aSopenharmony_ci        av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
424cabdff1aSopenharmony_ci                new_logical_pos, (int)c->logical_pos,
425cabdff1aSopenharmony_ci                (int)(new_logical_pos - c->logical_pos), fifo_size);
426cabdff1aSopenharmony_ci
427cabdff1aSopenharmony_ci        if (pos_delta > 0) {
428cabdff1aSopenharmony_ci            // fast seek forwards
429cabdff1aSopenharmony_ci            async_read_internal(h, NULL, pos_delta);
430cabdff1aSopenharmony_ci        } else {
431cabdff1aSopenharmony_ci            // fast seek backwards
432cabdff1aSopenharmony_ci            ring_drain(ring, pos_delta);
433cabdff1aSopenharmony_ci            c->logical_pos = new_logical_pos;
434cabdff1aSopenharmony_ci        }
435cabdff1aSopenharmony_ci
436cabdff1aSopenharmony_ci        return c->logical_pos;
437cabdff1aSopenharmony_ci    } else if (c->logical_size <= 0) {
438cabdff1aSopenharmony_ci        /* can not seek */
439cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
440cabdff1aSopenharmony_ci    } else if (new_logical_pos > c->logical_size) {
441cabdff1aSopenharmony_ci        /* beyond end */
442cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
443cabdff1aSopenharmony_ci    }
444cabdff1aSopenharmony_ci
445cabdff1aSopenharmony_ci    pthread_mutex_lock(&c->mutex);
446cabdff1aSopenharmony_ci
447cabdff1aSopenharmony_ci    c->seek_request   = 1;
448cabdff1aSopenharmony_ci    c->seek_pos       = new_logical_pos;
449cabdff1aSopenharmony_ci    c->seek_whence    = SEEK_SET;
450cabdff1aSopenharmony_ci    c->seek_completed = 0;
451cabdff1aSopenharmony_ci    c->seek_ret       = 0;
452cabdff1aSopenharmony_ci
453cabdff1aSopenharmony_ci    while (1) {
454cabdff1aSopenharmony_ci        if (async_check_interrupt(h)) {
455cabdff1aSopenharmony_ci            ret = AVERROR_EXIT;
456cabdff1aSopenharmony_ci            break;
457cabdff1aSopenharmony_ci        }
458cabdff1aSopenharmony_ci        if (c->seek_completed) {
459cabdff1aSopenharmony_ci            if (c->seek_ret >= 0)
460cabdff1aSopenharmony_ci                c->logical_pos  = c->seek_ret;
461cabdff1aSopenharmony_ci            ret = c->seek_ret;
462cabdff1aSopenharmony_ci            break;
463cabdff1aSopenharmony_ci        }
464cabdff1aSopenharmony_ci        pthread_cond_signal(&c->cond_wakeup_background);
465cabdff1aSopenharmony_ci        pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
466cabdff1aSopenharmony_ci    }
467cabdff1aSopenharmony_ci
468cabdff1aSopenharmony_ci    pthread_mutex_unlock(&c->mutex);
469cabdff1aSopenharmony_ci
470cabdff1aSopenharmony_ci    return ret;
471cabdff1aSopenharmony_ci}
472cabdff1aSopenharmony_ci
473cabdff1aSopenharmony_ci#define OFFSET(x) offsetof(Context, x)
474cabdff1aSopenharmony_ci#define D AV_OPT_FLAG_DECODING_PARAM
475cabdff1aSopenharmony_ci
476cabdff1aSopenharmony_cistatic const AVOption options[] = {
477cabdff1aSopenharmony_ci    {NULL},
478cabdff1aSopenharmony_ci};
479cabdff1aSopenharmony_ci
480cabdff1aSopenharmony_ci#undef D
481cabdff1aSopenharmony_ci#undef OFFSET
482cabdff1aSopenharmony_ci
483cabdff1aSopenharmony_cistatic const AVClass async_context_class = {
484cabdff1aSopenharmony_ci    .class_name = "Async",
485cabdff1aSopenharmony_ci    .item_name  = av_default_item_name,
486cabdff1aSopenharmony_ci    .option     = options,
487cabdff1aSopenharmony_ci    .version    = LIBAVUTIL_VERSION_INT,
488cabdff1aSopenharmony_ci};
489cabdff1aSopenharmony_ci
490cabdff1aSopenharmony_ciconst URLProtocol ff_async_protocol = {
491cabdff1aSopenharmony_ci    .name                = "async",
492cabdff1aSopenharmony_ci    .url_open2           = async_open,
493cabdff1aSopenharmony_ci    .url_read            = async_read,
494cabdff1aSopenharmony_ci    .url_seek            = async_seek,
495cabdff1aSopenharmony_ci    .url_close           = async_close,
496cabdff1aSopenharmony_ci    .priv_data_size      = sizeof(Context),
497cabdff1aSopenharmony_ci    .priv_data_class     = &async_context_class,
498cabdff1aSopenharmony_ci};
499cabdff1aSopenharmony_ci
500cabdff1aSopenharmony_ci#if 0
501cabdff1aSopenharmony_ci
502cabdff1aSopenharmony_ci#define TEST_SEEK_POS    (1536)
503cabdff1aSopenharmony_ci#define TEST_STREAM_SIZE (2048)
504cabdff1aSopenharmony_ci
505cabdff1aSopenharmony_citypedef struct TestContext {
506cabdff1aSopenharmony_ci    AVClass        *class;
507cabdff1aSopenharmony_ci    int64_t         logical_pos;
508cabdff1aSopenharmony_ci    int64_t         logical_size;
509cabdff1aSopenharmony_ci
510cabdff1aSopenharmony_ci    /* options */
511cabdff1aSopenharmony_ci    int             opt_read_error;
512cabdff1aSopenharmony_ci} TestContext;
513cabdff1aSopenharmony_ci
514cabdff1aSopenharmony_cistatic int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
515cabdff1aSopenharmony_ci{
516cabdff1aSopenharmony_ci    TestContext *c = h->priv_data;
517cabdff1aSopenharmony_ci    c->logical_pos  = 0;
518cabdff1aSopenharmony_ci    c->logical_size = TEST_STREAM_SIZE;
519cabdff1aSopenharmony_ci    return 0;
520cabdff1aSopenharmony_ci}
521cabdff1aSopenharmony_ci
522cabdff1aSopenharmony_cistatic int async_test_close(URLContext *h)
523cabdff1aSopenharmony_ci{
524cabdff1aSopenharmony_ci    return 0;
525cabdff1aSopenharmony_ci}
526cabdff1aSopenharmony_ci
527cabdff1aSopenharmony_cistatic int async_test_read(URLContext *h, unsigned char *buf, int size)
528cabdff1aSopenharmony_ci{
529cabdff1aSopenharmony_ci    TestContext *c = h->priv_data;
530cabdff1aSopenharmony_ci    int          i;
531cabdff1aSopenharmony_ci    int          read_len = 0;
532cabdff1aSopenharmony_ci
533cabdff1aSopenharmony_ci    if (c->opt_read_error)
534cabdff1aSopenharmony_ci        return c->opt_read_error;
535cabdff1aSopenharmony_ci
536cabdff1aSopenharmony_ci    if (c->logical_pos >= c->logical_size)
537cabdff1aSopenharmony_ci        return AVERROR_EOF;
538cabdff1aSopenharmony_ci
539cabdff1aSopenharmony_ci    for (i = 0; i < size; ++i) {
540cabdff1aSopenharmony_ci        buf[i] = c->logical_pos & 0xFF;
541cabdff1aSopenharmony_ci
542cabdff1aSopenharmony_ci        c->logical_pos++;
543cabdff1aSopenharmony_ci        read_len++;
544cabdff1aSopenharmony_ci
545cabdff1aSopenharmony_ci        if (c->logical_pos >= c->logical_size)
546cabdff1aSopenharmony_ci            break;
547cabdff1aSopenharmony_ci    }
548cabdff1aSopenharmony_ci
549cabdff1aSopenharmony_ci    return read_len;
550cabdff1aSopenharmony_ci}
551cabdff1aSopenharmony_ci
552cabdff1aSopenharmony_cistatic int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
553cabdff1aSopenharmony_ci{
554cabdff1aSopenharmony_ci    TestContext *c = h->priv_data;
555cabdff1aSopenharmony_ci    int64_t      new_logical_pos;
556cabdff1aSopenharmony_ci
557cabdff1aSopenharmony_ci    if (whence == AVSEEK_SIZE) {
558cabdff1aSopenharmony_ci        return c->logical_size;
559cabdff1aSopenharmony_ci    } else if (whence == SEEK_CUR) {
560cabdff1aSopenharmony_ci        new_logical_pos = pos + c->logical_pos;
561cabdff1aSopenharmony_ci    } else if (whence == SEEK_SET){
562cabdff1aSopenharmony_ci        new_logical_pos = pos;
563cabdff1aSopenharmony_ci    } else {
564cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
565cabdff1aSopenharmony_ci    }
566cabdff1aSopenharmony_ci    if (new_logical_pos < 0)
567cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
568cabdff1aSopenharmony_ci
569cabdff1aSopenharmony_ci    c->logical_pos = new_logical_pos;
570cabdff1aSopenharmony_ci    return new_logical_pos;
571cabdff1aSopenharmony_ci}
572cabdff1aSopenharmony_ci
573cabdff1aSopenharmony_ci#define OFFSET(x) offsetof(TestContext, x)
574cabdff1aSopenharmony_ci#define D AV_OPT_FLAG_DECODING_PARAM
575cabdff1aSopenharmony_ci
576cabdff1aSopenharmony_cistatic const AVOption async_test_options[] = {
577cabdff1aSopenharmony_ci    { "async-test-read-error",      "cause read fail",
578cabdff1aSopenharmony_ci        OFFSET(opt_read_error),     AV_OPT_TYPE_INT, { .i64 = 0 }, INT_MIN, INT_MAX, .flags = D },
579cabdff1aSopenharmony_ci    {NULL},
580cabdff1aSopenharmony_ci};
581cabdff1aSopenharmony_ci
582cabdff1aSopenharmony_ci#undef D
583cabdff1aSopenharmony_ci#undef OFFSET
584cabdff1aSopenharmony_ci
585cabdff1aSopenharmony_cistatic const AVClass async_test_context_class = {
586cabdff1aSopenharmony_ci    .class_name = "Async-Test",
587cabdff1aSopenharmony_ci    .item_name  = av_default_item_name,
588cabdff1aSopenharmony_ci    .option     = async_test_options,
589cabdff1aSopenharmony_ci    .version    = LIBAVUTIL_VERSION_INT,
590cabdff1aSopenharmony_ci};
591cabdff1aSopenharmony_ci
592cabdff1aSopenharmony_ciconst URLProtocol ff_async_test_protocol = {
593cabdff1aSopenharmony_ci    .name                = "async-test",
594cabdff1aSopenharmony_ci    .url_open2           = async_test_open,
595cabdff1aSopenharmony_ci    .url_read            = async_test_read,
596cabdff1aSopenharmony_ci    .url_seek            = async_test_seek,
597cabdff1aSopenharmony_ci    .url_close           = async_test_close,
598cabdff1aSopenharmony_ci    .priv_data_size      = sizeof(TestContext),
599cabdff1aSopenharmony_ci    .priv_data_class     = &async_test_context_class,
600cabdff1aSopenharmony_ci};
601cabdff1aSopenharmony_ci
602cabdff1aSopenharmony_ciint main(void)
603cabdff1aSopenharmony_ci{
604cabdff1aSopenharmony_ci    URLContext   *h = NULL;
605cabdff1aSopenharmony_ci    int           i;
606cabdff1aSopenharmony_ci    int           ret;
607cabdff1aSopenharmony_ci    int64_t       size;
608cabdff1aSopenharmony_ci    int64_t       pos;
609cabdff1aSopenharmony_ci    int64_t       read_len;
610cabdff1aSopenharmony_ci    unsigned char buf[4096];
611cabdff1aSopenharmony_ci    AVDictionary *opts = NULL;
612cabdff1aSopenharmony_ci
613cabdff1aSopenharmony_ci    ffurl_register_protocol(&ff_async_protocol);
614cabdff1aSopenharmony_ci    ffurl_register_protocol(&ff_async_test_protocol);
615cabdff1aSopenharmony_ci
616cabdff1aSopenharmony_ci    /*
617cabdff1aSopenharmony_ci     * test normal read
618cabdff1aSopenharmony_ci     */
619cabdff1aSopenharmony_ci    ret = ffurl_open_whitelist(&h, "async:async-test:", AVIO_FLAG_READ,
620cabdff1aSopenharmony_ci                               NULL, NULL, NULL, NULL, NULL);
621cabdff1aSopenharmony_ci    printf("open: %d\n", ret);
622cabdff1aSopenharmony_ci
623cabdff1aSopenharmony_ci    size = ffurl_size(h);
624cabdff1aSopenharmony_ci    printf("size: %"PRId64"\n", size);
625cabdff1aSopenharmony_ci
626cabdff1aSopenharmony_ci    pos = ffurl_seek(h, 0, SEEK_CUR);
627cabdff1aSopenharmony_ci    read_len = 0;
628cabdff1aSopenharmony_ci    while (1) {
629cabdff1aSopenharmony_ci        ret = ffurl_read(h, buf, sizeof(buf));
630cabdff1aSopenharmony_ci        if (ret == AVERROR_EOF) {
631cabdff1aSopenharmony_ci            printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
632cabdff1aSopenharmony_ci            break;
633cabdff1aSopenharmony_ci        }
634cabdff1aSopenharmony_ci        else if (ret == 0)
635cabdff1aSopenharmony_ci            break;
636cabdff1aSopenharmony_ci        else if (ret < 0) {
637cabdff1aSopenharmony_ci            printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
638cabdff1aSopenharmony_ci            goto fail;
639cabdff1aSopenharmony_ci        } else {
640cabdff1aSopenharmony_ci            for (i = 0; i < ret; ++i) {
641cabdff1aSopenharmony_ci                if (buf[i] != (pos & 0xFF)) {
642cabdff1aSopenharmony_ci                    printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
643cabdff1aSopenharmony_ci                           (int)buf[i], (int)(pos & 0xFF), pos);
644cabdff1aSopenharmony_ci                    break;
645cabdff1aSopenharmony_ci                }
646cabdff1aSopenharmony_ci                pos++;
647cabdff1aSopenharmony_ci            }
648cabdff1aSopenharmony_ci        }
649cabdff1aSopenharmony_ci
650cabdff1aSopenharmony_ci        read_len += ret;
651cabdff1aSopenharmony_ci    }
652cabdff1aSopenharmony_ci    printf("read: %"PRId64"\n", read_len);
653cabdff1aSopenharmony_ci
654cabdff1aSopenharmony_ci    /*
655cabdff1aSopenharmony_ci     * test normal seek
656cabdff1aSopenharmony_ci     */
657cabdff1aSopenharmony_ci    ret = ffurl_read(h, buf, 1);
658cabdff1aSopenharmony_ci    printf("read: %d\n", ret);
659cabdff1aSopenharmony_ci
660cabdff1aSopenharmony_ci    pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
661cabdff1aSopenharmony_ci    printf("seek: %"PRId64"\n", pos);
662cabdff1aSopenharmony_ci
663cabdff1aSopenharmony_ci    read_len = 0;
664cabdff1aSopenharmony_ci    while (1) {
665cabdff1aSopenharmony_ci        ret = ffurl_read(h, buf, sizeof(buf));
666cabdff1aSopenharmony_ci        if (ret == AVERROR_EOF)
667cabdff1aSopenharmony_ci            break;
668cabdff1aSopenharmony_ci        else if (ret == 0)
669cabdff1aSopenharmony_ci            break;
670cabdff1aSopenharmony_ci        else if (ret < 0) {
671cabdff1aSopenharmony_ci            printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
672cabdff1aSopenharmony_ci            goto fail;
673cabdff1aSopenharmony_ci        } else {
674cabdff1aSopenharmony_ci            for (i = 0; i < ret; ++i) {
675cabdff1aSopenharmony_ci                if (buf[i] != (pos & 0xFF)) {
676cabdff1aSopenharmony_ci                    printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
677cabdff1aSopenharmony_ci                           (int)buf[i], (int)(pos & 0xFF), pos);
678cabdff1aSopenharmony_ci                    break;
679cabdff1aSopenharmony_ci                }
680cabdff1aSopenharmony_ci                pos++;
681cabdff1aSopenharmony_ci            }
682cabdff1aSopenharmony_ci        }
683cabdff1aSopenharmony_ci
684cabdff1aSopenharmony_ci        read_len += ret;
685cabdff1aSopenharmony_ci    }
686cabdff1aSopenharmony_ci    printf("read: %"PRId64"\n", read_len);
687cabdff1aSopenharmony_ci
688cabdff1aSopenharmony_ci    ret = ffurl_read(h, buf, 1);
689cabdff1aSopenharmony_ci    printf("read: %d\n", ret);
690cabdff1aSopenharmony_ci
691cabdff1aSopenharmony_ci    /*
692cabdff1aSopenharmony_ci     * test read error
693cabdff1aSopenharmony_ci     */
694cabdff1aSopenharmony_ci    ffurl_close(h);
695cabdff1aSopenharmony_ci    av_dict_set_int(&opts, "async-test-read-error", -10000, 0);
696cabdff1aSopenharmony_ci    ret = ffurl_open_whitelist(&h, "async:async-test:", AVIO_FLAG_READ,
697cabdff1aSopenharmony_ci                               NULL, &opts, NULL, NULL, NULL);
698cabdff1aSopenharmony_ci    printf("open: %d\n", ret);
699cabdff1aSopenharmony_ci
700cabdff1aSopenharmony_ci    ret = ffurl_read(h, buf, 1);
701cabdff1aSopenharmony_ci    printf("read: %d\n", ret);
702cabdff1aSopenharmony_ci
703cabdff1aSopenharmony_cifail:
704cabdff1aSopenharmony_ci    av_dict_free(&opts);
705cabdff1aSopenharmony_ci    ffurl_close(h);
706cabdff1aSopenharmony_ci    return 0;
707cabdff1aSopenharmony_ci}
708cabdff1aSopenharmony_ci
709cabdff1aSopenharmony_ci#endif
710