1cabdff1aSopenharmony_ci/* 2cabdff1aSopenharmony_ci * Input async protocol. 3cabdff1aSopenharmony_ci * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com> 4cabdff1aSopenharmony_ci * 5cabdff1aSopenharmony_ci * This file is part of FFmpeg. 6cabdff1aSopenharmony_ci * 7cabdff1aSopenharmony_ci * FFmpeg is free software; you can redistribute it and/or 8cabdff1aSopenharmony_ci * modify it under the terms of the GNU Lesser General Public 9cabdff1aSopenharmony_ci * License as published by the Free Software Foundation; either 10cabdff1aSopenharmony_ci * version 2.1 of the License, or (at your option) any later version. 11cabdff1aSopenharmony_ci * 12cabdff1aSopenharmony_ci * FFmpeg is distributed in the hope that it will be useful, 13cabdff1aSopenharmony_ci * but WITHOUT ANY WARRANTY; without even the implied warranty of 14cabdff1aSopenharmony_ci * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15cabdff1aSopenharmony_ci * Lesser General Public License for more details. 16cabdff1aSopenharmony_ci * 17cabdff1aSopenharmony_ci * You should have received a copy of the GNU Lesser General Public 18cabdff1aSopenharmony_ci * License along with FFmpeg; if not, write to the Free Software 19cabdff1aSopenharmony_ci * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 20cabdff1aSopenharmony_ci * 21cabdff1aSopenharmony_ci * Based on libavformat/cache.c by Michael Niedermayer 22cabdff1aSopenharmony_ci */ 23cabdff1aSopenharmony_ci 24cabdff1aSopenharmony_ci /** 25cabdff1aSopenharmony_ci * @TODO 26cabdff1aSopenharmony_ci * support timeout 27cabdff1aSopenharmony_ci * support work with concatdec, hls 28cabdff1aSopenharmony_ci */ 29cabdff1aSopenharmony_ci 30cabdff1aSopenharmony_ci#include "libavutil/avassert.h" 31cabdff1aSopenharmony_ci#include "libavutil/avstring.h" 32cabdff1aSopenharmony_ci#include "libavutil/error.h" 33cabdff1aSopenharmony_ci#include "libavutil/fifo.h" 34cabdff1aSopenharmony_ci#include "libavutil/log.h" 35cabdff1aSopenharmony_ci#include "libavutil/opt.h" 36cabdff1aSopenharmony_ci#include "libavutil/thread.h" 37cabdff1aSopenharmony_ci#include "url.h" 38cabdff1aSopenharmony_ci#include <stdint.h> 39cabdff1aSopenharmony_ci 40cabdff1aSopenharmony_ci#if HAVE_UNISTD_H 41cabdff1aSopenharmony_ci#include <unistd.h> 42cabdff1aSopenharmony_ci#endif 43cabdff1aSopenharmony_ci 44cabdff1aSopenharmony_ci#define BUFFER_CAPACITY (4 * 1024 * 1024) 45cabdff1aSopenharmony_ci#define READ_BACK_CAPACITY (4 * 1024 * 1024) 46cabdff1aSopenharmony_ci#define SHORT_SEEK_THRESHOLD (256 * 1024) 47cabdff1aSopenharmony_ci 48cabdff1aSopenharmony_citypedef struct RingBuffer 49cabdff1aSopenharmony_ci{ 50cabdff1aSopenharmony_ci AVFifo *fifo; 51cabdff1aSopenharmony_ci int read_back_capacity; 52cabdff1aSopenharmony_ci 53cabdff1aSopenharmony_ci int read_pos; 54cabdff1aSopenharmony_ci} RingBuffer; 55cabdff1aSopenharmony_ci 56cabdff1aSopenharmony_citypedef struct Context { 57cabdff1aSopenharmony_ci AVClass *class; 58cabdff1aSopenharmony_ci URLContext *inner; 59cabdff1aSopenharmony_ci 60cabdff1aSopenharmony_ci int seek_request; 61cabdff1aSopenharmony_ci int64_t seek_pos; 62cabdff1aSopenharmony_ci int seek_whence; 63cabdff1aSopenharmony_ci int seek_completed; 64cabdff1aSopenharmony_ci int64_t seek_ret; 65cabdff1aSopenharmony_ci 66cabdff1aSopenharmony_ci int inner_io_error; 67cabdff1aSopenharmony_ci int io_error; 68cabdff1aSopenharmony_ci int io_eof_reached; 69cabdff1aSopenharmony_ci 70cabdff1aSopenharmony_ci int64_t logical_pos; 71cabdff1aSopenharmony_ci int64_t logical_size; 72cabdff1aSopenharmony_ci RingBuffer ring; 73cabdff1aSopenharmony_ci 74cabdff1aSopenharmony_ci pthread_cond_t cond_wakeup_main; 75cabdff1aSopenharmony_ci pthread_cond_t cond_wakeup_background; 76cabdff1aSopenharmony_ci pthread_mutex_t mutex; 77cabdff1aSopenharmony_ci pthread_t async_buffer_thread; 78cabdff1aSopenharmony_ci 79cabdff1aSopenharmony_ci int abort_request; 80cabdff1aSopenharmony_ci AVIOInterruptCB interrupt_callback; 81cabdff1aSopenharmony_ci} Context; 82cabdff1aSopenharmony_ci 83cabdff1aSopenharmony_cistatic int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity) 84cabdff1aSopenharmony_ci{ 85cabdff1aSopenharmony_ci memset(ring, 0, sizeof(RingBuffer)); 86cabdff1aSopenharmony_ci ring->fifo = av_fifo_alloc2(capacity + read_back_capacity, 1, 0); 87cabdff1aSopenharmony_ci if (!ring->fifo) 88cabdff1aSopenharmony_ci return AVERROR(ENOMEM); 89cabdff1aSopenharmony_ci 90cabdff1aSopenharmony_ci ring->read_back_capacity = read_back_capacity; 91cabdff1aSopenharmony_ci return 0; 92cabdff1aSopenharmony_ci} 93cabdff1aSopenharmony_ci 94cabdff1aSopenharmony_cistatic void ring_destroy(RingBuffer *ring) 95cabdff1aSopenharmony_ci{ 96cabdff1aSopenharmony_ci av_fifo_freep2(&ring->fifo); 97cabdff1aSopenharmony_ci} 98cabdff1aSopenharmony_ci 99cabdff1aSopenharmony_cistatic void ring_reset(RingBuffer *ring) 100cabdff1aSopenharmony_ci{ 101cabdff1aSopenharmony_ci av_fifo_reset2(ring->fifo); 102cabdff1aSopenharmony_ci ring->read_pos = 0; 103cabdff1aSopenharmony_ci} 104cabdff1aSopenharmony_ci 105cabdff1aSopenharmony_cistatic int ring_size(RingBuffer *ring) 106cabdff1aSopenharmony_ci{ 107cabdff1aSopenharmony_ci return av_fifo_can_read(ring->fifo) - ring->read_pos; 108cabdff1aSopenharmony_ci} 109cabdff1aSopenharmony_ci 110cabdff1aSopenharmony_cistatic int ring_space(RingBuffer *ring) 111cabdff1aSopenharmony_ci{ 112cabdff1aSopenharmony_ci return av_fifo_can_write(ring->fifo); 113cabdff1aSopenharmony_ci} 114cabdff1aSopenharmony_ci 115cabdff1aSopenharmony_cistatic int ring_read(RingBuffer *ring, void *dest, int buf_size) 116cabdff1aSopenharmony_ci{ 117cabdff1aSopenharmony_ci int ret = 0; 118cabdff1aSopenharmony_ci 119cabdff1aSopenharmony_ci av_assert2(buf_size <= ring_size(ring)); 120cabdff1aSopenharmony_ci if (dest) 121cabdff1aSopenharmony_ci ret = av_fifo_peek(ring->fifo, dest, buf_size, ring->read_pos); 122cabdff1aSopenharmony_ci ring->read_pos += buf_size; 123cabdff1aSopenharmony_ci 124cabdff1aSopenharmony_ci if (ring->read_pos > ring->read_back_capacity) { 125cabdff1aSopenharmony_ci av_fifo_drain2(ring->fifo, ring->read_pos - ring->read_back_capacity); 126cabdff1aSopenharmony_ci ring->read_pos = ring->read_back_capacity; 127cabdff1aSopenharmony_ci } 128cabdff1aSopenharmony_ci 129cabdff1aSopenharmony_ci return ret; 130cabdff1aSopenharmony_ci} 131cabdff1aSopenharmony_ci 132cabdff1aSopenharmony_cistatic int wrapped_url_read(void *src, void *dst, size_t *size) 133cabdff1aSopenharmony_ci{ 134cabdff1aSopenharmony_ci URLContext *h = src; 135cabdff1aSopenharmony_ci Context *c = h->priv_data; 136cabdff1aSopenharmony_ci int ret; 137cabdff1aSopenharmony_ci 138cabdff1aSopenharmony_ci ret = ffurl_read(c->inner, dst, *size); 139cabdff1aSopenharmony_ci *size = ret > 0 ? ret : 0; 140cabdff1aSopenharmony_ci c->inner_io_error = ret < 0 ? ret : 0; 141cabdff1aSopenharmony_ci 142cabdff1aSopenharmony_ci return c->inner_io_error; 143cabdff1aSopenharmony_ci} 144cabdff1aSopenharmony_ci 145cabdff1aSopenharmony_cistatic int ring_write(RingBuffer *ring, URLContext *h, size_t size) 146cabdff1aSopenharmony_ci{ 147cabdff1aSopenharmony_ci int ret; 148cabdff1aSopenharmony_ci 149cabdff1aSopenharmony_ci av_assert2(size <= ring_space(ring)); 150cabdff1aSopenharmony_ci ret = av_fifo_write_from_cb(ring->fifo, wrapped_url_read, h, &size); 151cabdff1aSopenharmony_ci if (ret < 0) 152cabdff1aSopenharmony_ci return ret; 153cabdff1aSopenharmony_ci 154cabdff1aSopenharmony_ci return size; 155cabdff1aSopenharmony_ci} 156cabdff1aSopenharmony_ci 157cabdff1aSopenharmony_cistatic int ring_size_of_read_back(RingBuffer *ring) 158cabdff1aSopenharmony_ci{ 159cabdff1aSopenharmony_ci return ring->read_pos; 160cabdff1aSopenharmony_ci} 161cabdff1aSopenharmony_ci 162cabdff1aSopenharmony_cistatic int ring_drain(RingBuffer *ring, int offset) 163cabdff1aSopenharmony_ci{ 164cabdff1aSopenharmony_ci av_assert2(offset >= -ring_size_of_read_back(ring)); 165cabdff1aSopenharmony_ci av_assert2(offset <= ring_size(ring)); 166cabdff1aSopenharmony_ci ring->read_pos += offset; 167cabdff1aSopenharmony_ci return 0; 168cabdff1aSopenharmony_ci} 169cabdff1aSopenharmony_ci 170cabdff1aSopenharmony_cistatic int async_check_interrupt(void *arg) 171cabdff1aSopenharmony_ci{ 172cabdff1aSopenharmony_ci URLContext *h = arg; 173cabdff1aSopenharmony_ci Context *c = h->priv_data; 174cabdff1aSopenharmony_ci 175cabdff1aSopenharmony_ci if (c->abort_request) 176cabdff1aSopenharmony_ci return 1; 177cabdff1aSopenharmony_ci 178cabdff1aSopenharmony_ci if (ff_check_interrupt(&c->interrupt_callback)) 179cabdff1aSopenharmony_ci c->abort_request = 1; 180cabdff1aSopenharmony_ci 181cabdff1aSopenharmony_ci return c->abort_request; 182cabdff1aSopenharmony_ci} 183cabdff1aSopenharmony_ci 184cabdff1aSopenharmony_cistatic void *async_buffer_task(void *arg) 185cabdff1aSopenharmony_ci{ 186cabdff1aSopenharmony_ci URLContext *h = arg; 187cabdff1aSopenharmony_ci Context *c = h->priv_data; 188cabdff1aSopenharmony_ci RingBuffer *ring = &c->ring; 189cabdff1aSopenharmony_ci int ret = 0; 190cabdff1aSopenharmony_ci int64_t seek_ret; 191cabdff1aSopenharmony_ci 192cabdff1aSopenharmony_ci while (1) { 193cabdff1aSopenharmony_ci int fifo_space, to_copy; 194cabdff1aSopenharmony_ci 195cabdff1aSopenharmony_ci pthread_mutex_lock(&c->mutex); 196cabdff1aSopenharmony_ci if (async_check_interrupt(h)) { 197cabdff1aSopenharmony_ci c->io_eof_reached = 1; 198cabdff1aSopenharmony_ci c->io_error = AVERROR_EXIT; 199cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_main); 200cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 201cabdff1aSopenharmony_ci break; 202cabdff1aSopenharmony_ci } 203cabdff1aSopenharmony_ci 204cabdff1aSopenharmony_ci if (c->seek_request) { 205cabdff1aSopenharmony_ci seek_ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence); 206cabdff1aSopenharmony_ci if (seek_ret >= 0) { 207cabdff1aSopenharmony_ci c->io_eof_reached = 0; 208cabdff1aSopenharmony_ci c->io_error = 0; 209cabdff1aSopenharmony_ci ring_reset(ring); 210cabdff1aSopenharmony_ci } 211cabdff1aSopenharmony_ci 212cabdff1aSopenharmony_ci c->seek_completed = 1; 213cabdff1aSopenharmony_ci c->seek_ret = seek_ret; 214cabdff1aSopenharmony_ci c->seek_request = 0; 215cabdff1aSopenharmony_ci 216cabdff1aSopenharmony_ci 217cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_main); 218cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 219cabdff1aSopenharmony_ci continue; 220cabdff1aSopenharmony_ci } 221cabdff1aSopenharmony_ci 222cabdff1aSopenharmony_ci fifo_space = ring_space(ring); 223cabdff1aSopenharmony_ci if (c->io_eof_reached || fifo_space <= 0) { 224cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_main); 225cabdff1aSopenharmony_ci pthread_cond_wait(&c->cond_wakeup_background, &c->mutex); 226cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 227cabdff1aSopenharmony_ci continue; 228cabdff1aSopenharmony_ci } 229cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 230cabdff1aSopenharmony_ci 231cabdff1aSopenharmony_ci to_copy = FFMIN(4096, fifo_space); 232cabdff1aSopenharmony_ci ret = ring_write(ring, h, to_copy); 233cabdff1aSopenharmony_ci 234cabdff1aSopenharmony_ci pthread_mutex_lock(&c->mutex); 235cabdff1aSopenharmony_ci if (ret <= 0) { 236cabdff1aSopenharmony_ci c->io_eof_reached = 1; 237cabdff1aSopenharmony_ci if (c->inner_io_error < 0) 238cabdff1aSopenharmony_ci c->io_error = c->inner_io_error; 239cabdff1aSopenharmony_ci } 240cabdff1aSopenharmony_ci 241cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_main); 242cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 243cabdff1aSopenharmony_ci } 244cabdff1aSopenharmony_ci 245cabdff1aSopenharmony_ci return NULL; 246cabdff1aSopenharmony_ci} 247cabdff1aSopenharmony_ci 248cabdff1aSopenharmony_cistatic int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options) 249cabdff1aSopenharmony_ci{ 250cabdff1aSopenharmony_ci Context *c = h->priv_data; 251cabdff1aSopenharmony_ci int ret; 252cabdff1aSopenharmony_ci AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h}; 253cabdff1aSopenharmony_ci 254cabdff1aSopenharmony_ci av_strstart(arg, "async:", &arg); 255cabdff1aSopenharmony_ci 256cabdff1aSopenharmony_ci ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY); 257cabdff1aSopenharmony_ci if (ret < 0) 258cabdff1aSopenharmony_ci goto fifo_fail; 259cabdff1aSopenharmony_ci 260cabdff1aSopenharmony_ci /* wrap interrupt callback */ 261cabdff1aSopenharmony_ci c->interrupt_callback = h->interrupt_callback; 262cabdff1aSopenharmony_ci ret = ffurl_open_whitelist(&c->inner, arg, flags, &interrupt_callback, options, h->protocol_whitelist, h->protocol_blacklist, h); 263cabdff1aSopenharmony_ci if (ret != 0) { 264cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg); 265cabdff1aSopenharmony_ci goto url_fail; 266cabdff1aSopenharmony_ci } 267cabdff1aSopenharmony_ci 268cabdff1aSopenharmony_ci c->logical_size = ffurl_size(c->inner); 269cabdff1aSopenharmony_ci h->is_streamed = c->inner->is_streamed; 270cabdff1aSopenharmony_ci 271cabdff1aSopenharmony_ci ret = pthread_mutex_init(&c->mutex, NULL); 272cabdff1aSopenharmony_ci if (ret != 0) { 273cabdff1aSopenharmony_ci ret = AVERROR(ret); 274cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret)); 275cabdff1aSopenharmony_ci goto mutex_fail; 276cabdff1aSopenharmony_ci } 277cabdff1aSopenharmony_ci 278cabdff1aSopenharmony_ci ret = pthread_cond_init(&c->cond_wakeup_main, NULL); 279cabdff1aSopenharmony_ci if (ret != 0) { 280cabdff1aSopenharmony_ci ret = AVERROR(ret); 281cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret)); 282cabdff1aSopenharmony_ci goto cond_wakeup_main_fail; 283cabdff1aSopenharmony_ci } 284cabdff1aSopenharmony_ci 285cabdff1aSopenharmony_ci ret = pthread_cond_init(&c->cond_wakeup_background, NULL); 286cabdff1aSopenharmony_ci if (ret != 0) { 287cabdff1aSopenharmony_ci ret = AVERROR(ret); 288cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret)); 289cabdff1aSopenharmony_ci goto cond_wakeup_background_fail; 290cabdff1aSopenharmony_ci } 291cabdff1aSopenharmony_ci 292cabdff1aSopenharmony_ci ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h); 293cabdff1aSopenharmony_ci if (ret) { 294cabdff1aSopenharmony_ci ret = AVERROR(ret); 295cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret)); 296cabdff1aSopenharmony_ci goto thread_fail; 297cabdff1aSopenharmony_ci } 298cabdff1aSopenharmony_ci 299cabdff1aSopenharmony_ci return 0; 300cabdff1aSopenharmony_ci 301cabdff1aSopenharmony_cithread_fail: 302cabdff1aSopenharmony_ci pthread_cond_destroy(&c->cond_wakeup_background); 303cabdff1aSopenharmony_cicond_wakeup_background_fail: 304cabdff1aSopenharmony_ci pthread_cond_destroy(&c->cond_wakeup_main); 305cabdff1aSopenharmony_cicond_wakeup_main_fail: 306cabdff1aSopenharmony_ci pthread_mutex_destroy(&c->mutex); 307cabdff1aSopenharmony_cimutex_fail: 308cabdff1aSopenharmony_ci ffurl_closep(&c->inner); 309cabdff1aSopenharmony_ciurl_fail: 310cabdff1aSopenharmony_ci ring_destroy(&c->ring); 311cabdff1aSopenharmony_cififo_fail: 312cabdff1aSopenharmony_ci return ret; 313cabdff1aSopenharmony_ci} 314cabdff1aSopenharmony_ci 315cabdff1aSopenharmony_cistatic int async_close(URLContext *h) 316cabdff1aSopenharmony_ci{ 317cabdff1aSopenharmony_ci Context *c = h->priv_data; 318cabdff1aSopenharmony_ci int ret; 319cabdff1aSopenharmony_ci 320cabdff1aSopenharmony_ci pthread_mutex_lock(&c->mutex); 321cabdff1aSopenharmony_ci c->abort_request = 1; 322cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_background); 323cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 324cabdff1aSopenharmony_ci 325cabdff1aSopenharmony_ci ret = pthread_join(c->async_buffer_thread, NULL); 326cabdff1aSopenharmony_ci if (ret != 0) 327cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret)); 328cabdff1aSopenharmony_ci 329cabdff1aSopenharmony_ci pthread_cond_destroy(&c->cond_wakeup_background); 330cabdff1aSopenharmony_ci pthread_cond_destroy(&c->cond_wakeup_main); 331cabdff1aSopenharmony_ci pthread_mutex_destroy(&c->mutex); 332cabdff1aSopenharmony_ci ffurl_closep(&c->inner); 333cabdff1aSopenharmony_ci ring_destroy(&c->ring); 334cabdff1aSopenharmony_ci 335cabdff1aSopenharmony_ci return 0; 336cabdff1aSopenharmony_ci} 337cabdff1aSopenharmony_ci 338cabdff1aSopenharmony_cistatic int async_read_internal(URLContext *h, void *dest, int size) 339cabdff1aSopenharmony_ci{ 340cabdff1aSopenharmony_ci Context *c = h->priv_data; 341cabdff1aSopenharmony_ci RingBuffer *ring = &c->ring; 342cabdff1aSopenharmony_ci int read_complete = !dest; 343cabdff1aSopenharmony_ci int to_read = size; 344cabdff1aSopenharmony_ci int ret = 0; 345cabdff1aSopenharmony_ci 346cabdff1aSopenharmony_ci pthread_mutex_lock(&c->mutex); 347cabdff1aSopenharmony_ci 348cabdff1aSopenharmony_ci while (to_read > 0) { 349cabdff1aSopenharmony_ci int fifo_size, to_copy; 350cabdff1aSopenharmony_ci if (async_check_interrupt(h)) { 351cabdff1aSopenharmony_ci ret = AVERROR_EXIT; 352cabdff1aSopenharmony_ci break; 353cabdff1aSopenharmony_ci } 354cabdff1aSopenharmony_ci fifo_size = ring_size(ring); 355cabdff1aSopenharmony_ci to_copy = FFMIN(to_read, fifo_size); 356cabdff1aSopenharmony_ci if (to_copy > 0) { 357cabdff1aSopenharmony_ci ring_read(ring, dest, to_copy); 358cabdff1aSopenharmony_ci if (dest) 359cabdff1aSopenharmony_ci dest = (uint8_t *)dest + to_copy; 360cabdff1aSopenharmony_ci c->logical_pos += to_copy; 361cabdff1aSopenharmony_ci to_read -= to_copy; 362cabdff1aSopenharmony_ci ret = size - to_read; 363cabdff1aSopenharmony_ci 364cabdff1aSopenharmony_ci if (to_read <= 0 || !read_complete) 365cabdff1aSopenharmony_ci break; 366cabdff1aSopenharmony_ci } else if (c->io_eof_reached) { 367cabdff1aSopenharmony_ci if (ret <= 0) { 368cabdff1aSopenharmony_ci if (c->io_error) 369cabdff1aSopenharmony_ci ret = c->io_error; 370cabdff1aSopenharmony_ci else 371cabdff1aSopenharmony_ci ret = AVERROR_EOF; 372cabdff1aSopenharmony_ci } 373cabdff1aSopenharmony_ci break; 374cabdff1aSopenharmony_ci } 375cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_background); 376cabdff1aSopenharmony_ci pthread_cond_wait(&c->cond_wakeup_main, &c->mutex); 377cabdff1aSopenharmony_ci } 378cabdff1aSopenharmony_ci 379cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_background); 380cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 381cabdff1aSopenharmony_ci 382cabdff1aSopenharmony_ci return ret; 383cabdff1aSopenharmony_ci} 384cabdff1aSopenharmony_ci 385cabdff1aSopenharmony_cistatic int async_read(URLContext *h, unsigned char *buf, int size) 386cabdff1aSopenharmony_ci{ 387cabdff1aSopenharmony_ci return async_read_internal(h, buf, size); 388cabdff1aSopenharmony_ci} 389cabdff1aSopenharmony_ci 390cabdff1aSopenharmony_cistatic int64_t async_seek(URLContext *h, int64_t pos, int whence) 391cabdff1aSopenharmony_ci{ 392cabdff1aSopenharmony_ci Context *c = h->priv_data; 393cabdff1aSopenharmony_ci RingBuffer *ring = &c->ring; 394cabdff1aSopenharmony_ci int64_t ret; 395cabdff1aSopenharmony_ci int64_t new_logical_pos; 396cabdff1aSopenharmony_ci int fifo_size; 397cabdff1aSopenharmony_ci int fifo_size_of_read_back; 398cabdff1aSopenharmony_ci 399cabdff1aSopenharmony_ci if (whence == AVSEEK_SIZE) { 400cabdff1aSopenharmony_ci av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size); 401cabdff1aSopenharmony_ci return c->logical_size; 402cabdff1aSopenharmony_ci } else if (whence == SEEK_CUR) { 403cabdff1aSopenharmony_ci av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos); 404cabdff1aSopenharmony_ci new_logical_pos = pos + c->logical_pos; 405cabdff1aSopenharmony_ci } else if (whence == SEEK_SET){ 406cabdff1aSopenharmony_ci av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos); 407cabdff1aSopenharmony_ci new_logical_pos = pos; 408cabdff1aSopenharmony_ci } else { 409cabdff1aSopenharmony_ci return AVERROR(EINVAL); 410cabdff1aSopenharmony_ci } 411cabdff1aSopenharmony_ci if (new_logical_pos < 0) 412cabdff1aSopenharmony_ci return AVERROR(EINVAL); 413cabdff1aSopenharmony_ci 414cabdff1aSopenharmony_ci fifo_size = ring_size(ring); 415cabdff1aSopenharmony_ci fifo_size_of_read_back = ring_size_of_read_back(ring); 416cabdff1aSopenharmony_ci if (new_logical_pos == c->logical_pos) { 417cabdff1aSopenharmony_ci /* current position */ 418cabdff1aSopenharmony_ci return c->logical_pos; 419cabdff1aSopenharmony_ci } else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) && 420cabdff1aSopenharmony_ci (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) { 421cabdff1aSopenharmony_ci int pos_delta = (int)(new_logical_pos - c->logical_pos); 422cabdff1aSopenharmony_ci /* fast seek */ 423cabdff1aSopenharmony_ci av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n", 424cabdff1aSopenharmony_ci new_logical_pos, (int)c->logical_pos, 425cabdff1aSopenharmony_ci (int)(new_logical_pos - c->logical_pos), fifo_size); 426cabdff1aSopenharmony_ci 427cabdff1aSopenharmony_ci if (pos_delta > 0) { 428cabdff1aSopenharmony_ci // fast seek forwards 429cabdff1aSopenharmony_ci async_read_internal(h, NULL, pos_delta); 430cabdff1aSopenharmony_ci } else { 431cabdff1aSopenharmony_ci // fast seek backwards 432cabdff1aSopenharmony_ci ring_drain(ring, pos_delta); 433cabdff1aSopenharmony_ci c->logical_pos = new_logical_pos; 434cabdff1aSopenharmony_ci } 435cabdff1aSopenharmony_ci 436cabdff1aSopenharmony_ci return c->logical_pos; 437cabdff1aSopenharmony_ci } else if (c->logical_size <= 0) { 438cabdff1aSopenharmony_ci /* can not seek */ 439cabdff1aSopenharmony_ci return AVERROR(EINVAL); 440cabdff1aSopenharmony_ci } else if (new_logical_pos > c->logical_size) { 441cabdff1aSopenharmony_ci /* beyond end */ 442cabdff1aSopenharmony_ci return AVERROR(EINVAL); 443cabdff1aSopenharmony_ci } 444cabdff1aSopenharmony_ci 445cabdff1aSopenharmony_ci pthread_mutex_lock(&c->mutex); 446cabdff1aSopenharmony_ci 447cabdff1aSopenharmony_ci c->seek_request = 1; 448cabdff1aSopenharmony_ci c->seek_pos = new_logical_pos; 449cabdff1aSopenharmony_ci c->seek_whence = SEEK_SET; 450cabdff1aSopenharmony_ci c->seek_completed = 0; 451cabdff1aSopenharmony_ci c->seek_ret = 0; 452cabdff1aSopenharmony_ci 453cabdff1aSopenharmony_ci while (1) { 454cabdff1aSopenharmony_ci if (async_check_interrupt(h)) { 455cabdff1aSopenharmony_ci ret = AVERROR_EXIT; 456cabdff1aSopenharmony_ci break; 457cabdff1aSopenharmony_ci } 458cabdff1aSopenharmony_ci if (c->seek_completed) { 459cabdff1aSopenharmony_ci if (c->seek_ret >= 0) 460cabdff1aSopenharmony_ci c->logical_pos = c->seek_ret; 461cabdff1aSopenharmony_ci ret = c->seek_ret; 462cabdff1aSopenharmony_ci break; 463cabdff1aSopenharmony_ci } 464cabdff1aSopenharmony_ci pthread_cond_signal(&c->cond_wakeup_background); 465cabdff1aSopenharmony_ci pthread_cond_wait(&c->cond_wakeup_main, &c->mutex); 466cabdff1aSopenharmony_ci } 467cabdff1aSopenharmony_ci 468cabdff1aSopenharmony_ci pthread_mutex_unlock(&c->mutex); 469cabdff1aSopenharmony_ci 470cabdff1aSopenharmony_ci return ret; 471cabdff1aSopenharmony_ci} 472cabdff1aSopenharmony_ci 473cabdff1aSopenharmony_ci#define OFFSET(x) offsetof(Context, x) 474cabdff1aSopenharmony_ci#define D AV_OPT_FLAG_DECODING_PARAM 475cabdff1aSopenharmony_ci 476cabdff1aSopenharmony_cistatic const AVOption options[] = { 477cabdff1aSopenharmony_ci {NULL}, 478cabdff1aSopenharmony_ci}; 479cabdff1aSopenharmony_ci 480cabdff1aSopenharmony_ci#undef D 481cabdff1aSopenharmony_ci#undef OFFSET 482cabdff1aSopenharmony_ci 483cabdff1aSopenharmony_cistatic const AVClass async_context_class = { 484cabdff1aSopenharmony_ci .class_name = "Async", 485cabdff1aSopenharmony_ci .item_name = av_default_item_name, 486cabdff1aSopenharmony_ci .option = options, 487cabdff1aSopenharmony_ci .version = LIBAVUTIL_VERSION_INT, 488cabdff1aSopenharmony_ci}; 489cabdff1aSopenharmony_ci 490cabdff1aSopenharmony_ciconst URLProtocol ff_async_protocol = { 491cabdff1aSopenharmony_ci .name = "async", 492cabdff1aSopenharmony_ci .url_open2 = async_open, 493cabdff1aSopenharmony_ci .url_read = async_read, 494cabdff1aSopenharmony_ci .url_seek = async_seek, 495cabdff1aSopenharmony_ci .url_close = async_close, 496cabdff1aSopenharmony_ci .priv_data_size = sizeof(Context), 497cabdff1aSopenharmony_ci .priv_data_class = &async_context_class, 498cabdff1aSopenharmony_ci}; 499cabdff1aSopenharmony_ci 500cabdff1aSopenharmony_ci#if 0 501cabdff1aSopenharmony_ci 502cabdff1aSopenharmony_ci#define TEST_SEEK_POS (1536) 503cabdff1aSopenharmony_ci#define TEST_STREAM_SIZE (2048) 504cabdff1aSopenharmony_ci 505cabdff1aSopenharmony_citypedef struct TestContext { 506cabdff1aSopenharmony_ci AVClass *class; 507cabdff1aSopenharmony_ci int64_t logical_pos; 508cabdff1aSopenharmony_ci int64_t logical_size; 509cabdff1aSopenharmony_ci 510cabdff1aSopenharmony_ci /* options */ 511cabdff1aSopenharmony_ci int opt_read_error; 512cabdff1aSopenharmony_ci} TestContext; 513cabdff1aSopenharmony_ci 514cabdff1aSopenharmony_cistatic int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options) 515cabdff1aSopenharmony_ci{ 516cabdff1aSopenharmony_ci TestContext *c = h->priv_data; 517cabdff1aSopenharmony_ci c->logical_pos = 0; 518cabdff1aSopenharmony_ci c->logical_size = TEST_STREAM_SIZE; 519cabdff1aSopenharmony_ci return 0; 520cabdff1aSopenharmony_ci} 521cabdff1aSopenharmony_ci 522cabdff1aSopenharmony_cistatic int async_test_close(URLContext *h) 523cabdff1aSopenharmony_ci{ 524cabdff1aSopenharmony_ci return 0; 525cabdff1aSopenharmony_ci} 526cabdff1aSopenharmony_ci 527cabdff1aSopenharmony_cistatic int async_test_read(URLContext *h, unsigned char *buf, int size) 528cabdff1aSopenharmony_ci{ 529cabdff1aSopenharmony_ci TestContext *c = h->priv_data; 530cabdff1aSopenharmony_ci int i; 531cabdff1aSopenharmony_ci int read_len = 0; 532cabdff1aSopenharmony_ci 533cabdff1aSopenharmony_ci if (c->opt_read_error) 534cabdff1aSopenharmony_ci return c->opt_read_error; 535cabdff1aSopenharmony_ci 536cabdff1aSopenharmony_ci if (c->logical_pos >= c->logical_size) 537cabdff1aSopenharmony_ci return AVERROR_EOF; 538cabdff1aSopenharmony_ci 539cabdff1aSopenharmony_ci for (i = 0; i < size; ++i) { 540cabdff1aSopenharmony_ci buf[i] = c->logical_pos & 0xFF; 541cabdff1aSopenharmony_ci 542cabdff1aSopenharmony_ci c->logical_pos++; 543cabdff1aSopenharmony_ci read_len++; 544cabdff1aSopenharmony_ci 545cabdff1aSopenharmony_ci if (c->logical_pos >= c->logical_size) 546cabdff1aSopenharmony_ci break; 547cabdff1aSopenharmony_ci } 548cabdff1aSopenharmony_ci 549cabdff1aSopenharmony_ci return read_len; 550cabdff1aSopenharmony_ci} 551cabdff1aSopenharmony_ci 552cabdff1aSopenharmony_cistatic int64_t async_test_seek(URLContext *h, int64_t pos, int whence) 553cabdff1aSopenharmony_ci{ 554cabdff1aSopenharmony_ci TestContext *c = h->priv_data; 555cabdff1aSopenharmony_ci int64_t new_logical_pos; 556cabdff1aSopenharmony_ci 557cabdff1aSopenharmony_ci if (whence == AVSEEK_SIZE) { 558cabdff1aSopenharmony_ci return c->logical_size; 559cabdff1aSopenharmony_ci } else if (whence == SEEK_CUR) { 560cabdff1aSopenharmony_ci new_logical_pos = pos + c->logical_pos; 561cabdff1aSopenharmony_ci } else if (whence == SEEK_SET){ 562cabdff1aSopenharmony_ci new_logical_pos = pos; 563cabdff1aSopenharmony_ci } else { 564cabdff1aSopenharmony_ci return AVERROR(EINVAL); 565cabdff1aSopenharmony_ci } 566cabdff1aSopenharmony_ci if (new_logical_pos < 0) 567cabdff1aSopenharmony_ci return AVERROR(EINVAL); 568cabdff1aSopenharmony_ci 569cabdff1aSopenharmony_ci c->logical_pos = new_logical_pos; 570cabdff1aSopenharmony_ci return new_logical_pos; 571cabdff1aSopenharmony_ci} 572cabdff1aSopenharmony_ci 573cabdff1aSopenharmony_ci#define OFFSET(x) offsetof(TestContext, x) 574cabdff1aSopenharmony_ci#define D AV_OPT_FLAG_DECODING_PARAM 575cabdff1aSopenharmony_ci 576cabdff1aSopenharmony_cistatic const AVOption async_test_options[] = { 577cabdff1aSopenharmony_ci { "async-test-read-error", "cause read fail", 578cabdff1aSopenharmony_ci OFFSET(opt_read_error), AV_OPT_TYPE_INT, { .i64 = 0 }, INT_MIN, INT_MAX, .flags = D }, 579cabdff1aSopenharmony_ci {NULL}, 580cabdff1aSopenharmony_ci}; 581cabdff1aSopenharmony_ci 582cabdff1aSopenharmony_ci#undef D 583cabdff1aSopenharmony_ci#undef OFFSET 584cabdff1aSopenharmony_ci 585cabdff1aSopenharmony_cistatic const AVClass async_test_context_class = { 586cabdff1aSopenharmony_ci .class_name = "Async-Test", 587cabdff1aSopenharmony_ci .item_name = av_default_item_name, 588cabdff1aSopenharmony_ci .option = async_test_options, 589cabdff1aSopenharmony_ci .version = LIBAVUTIL_VERSION_INT, 590cabdff1aSopenharmony_ci}; 591cabdff1aSopenharmony_ci 592cabdff1aSopenharmony_ciconst URLProtocol ff_async_test_protocol = { 593cabdff1aSopenharmony_ci .name = "async-test", 594cabdff1aSopenharmony_ci .url_open2 = async_test_open, 595cabdff1aSopenharmony_ci .url_read = async_test_read, 596cabdff1aSopenharmony_ci .url_seek = async_test_seek, 597cabdff1aSopenharmony_ci .url_close = async_test_close, 598cabdff1aSopenharmony_ci .priv_data_size = sizeof(TestContext), 599cabdff1aSopenharmony_ci .priv_data_class = &async_test_context_class, 600cabdff1aSopenharmony_ci}; 601cabdff1aSopenharmony_ci 602cabdff1aSopenharmony_ciint main(void) 603cabdff1aSopenharmony_ci{ 604cabdff1aSopenharmony_ci URLContext *h = NULL; 605cabdff1aSopenharmony_ci int i; 606cabdff1aSopenharmony_ci int ret; 607cabdff1aSopenharmony_ci int64_t size; 608cabdff1aSopenharmony_ci int64_t pos; 609cabdff1aSopenharmony_ci int64_t read_len; 610cabdff1aSopenharmony_ci unsigned char buf[4096]; 611cabdff1aSopenharmony_ci AVDictionary *opts = NULL; 612cabdff1aSopenharmony_ci 613cabdff1aSopenharmony_ci ffurl_register_protocol(&ff_async_protocol); 614cabdff1aSopenharmony_ci ffurl_register_protocol(&ff_async_test_protocol); 615cabdff1aSopenharmony_ci 616cabdff1aSopenharmony_ci /* 617cabdff1aSopenharmony_ci * test normal read 618cabdff1aSopenharmony_ci */ 619cabdff1aSopenharmony_ci ret = ffurl_open_whitelist(&h, "async:async-test:", AVIO_FLAG_READ, 620cabdff1aSopenharmony_ci NULL, NULL, NULL, NULL, NULL); 621cabdff1aSopenharmony_ci printf("open: %d\n", ret); 622cabdff1aSopenharmony_ci 623cabdff1aSopenharmony_ci size = ffurl_size(h); 624cabdff1aSopenharmony_ci printf("size: %"PRId64"\n", size); 625cabdff1aSopenharmony_ci 626cabdff1aSopenharmony_ci pos = ffurl_seek(h, 0, SEEK_CUR); 627cabdff1aSopenharmony_ci read_len = 0; 628cabdff1aSopenharmony_ci while (1) { 629cabdff1aSopenharmony_ci ret = ffurl_read(h, buf, sizeof(buf)); 630cabdff1aSopenharmony_ci if (ret == AVERROR_EOF) { 631cabdff1aSopenharmony_ci printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR)); 632cabdff1aSopenharmony_ci break; 633cabdff1aSopenharmony_ci } 634cabdff1aSopenharmony_ci else if (ret == 0) 635cabdff1aSopenharmony_ci break; 636cabdff1aSopenharmony_ci else if (ret < 0) { 637cabdff1aSopenharmony_ci printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR)); 638cabdff1aSopenharmony_ci goto fail; 639cabdff1aSopenharmony_ci } else { 640cabdff1aSopenharmony_ci for (i = 0; i < ret; ++i) { 641cabdff1aSopenharmony_ci if (buf[i] != (pos & 0xFF)) { 642cabdff1aSopenharmony_ci printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n", 643cabdff1aSopenharmony_ci (int)buf[i], (int)(pos & 0xFF), pos); 644cabdff1aSopenharmony_ci break; 645cabdff1aSopenharmony_ci } 646cabdff1aSopenharmony_ci pos++; 647cabdff1aSopenharmony_ci } 648cabdff1aSopenharmony_ci } 649cabdff1aSopenharmony_ci 650cabdff1aSopenharmony_ci read_len += ret; 651cabdff1aSopenharmony_ci } 652cabdff1aSopenharmony_ci printf("read: %"PRId64"\n", read_len); 653cabdff1aSopenharmony_ci 654cabdff1aSopenharmony_ci /* 655cabdff1aSopenharmony_ci * test normal seek 656cabdff1aSopenharmony_ci */ 657cabdff1aSopenharmony_ci ret = ffurl_read(h, buf, 1); 658cabdff1aSopenharmony_ci printf("read: %d\n", ret); 659cabdff1aSopenharmony_ci 660cabdff1aSopenharmony_ci pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET); 661cabdff1aSopenharmony_ci printf("seek: %"PRId64"\n", pos); 662cabdff1aSopenharmony_ci 663cabdff1aSopenharmony_ci read_len = 0; 664cabdff1aSopenharmony_ci while (1) { 665cabdff1aSopenharmony_ci ret = ffurl_read(h, buf, sizeof(buf)); 666cabdff1aSopenharmony_ci if (ret == AVERROR_EOF) 667cabdff1aSopenharmony_ci break; 668cabdff1aSopenharmony_ci else if (ret == 0) 669cabdff1aSopenharmony_ci break; 670cabdff1aSopenharmony_ci else if (ret < 0) { 671cabdff1aSopenharmony_ci printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR)); 672cabdff1aSopenharmony_ci goto fail; 673cabdff1aSopenharmony_ci } else { 674cabdff1aSopenharmony_ci for (i = 0; i < ret; ++i) { 675cabdff1aSopenharmony_ci if (buf[i] != (pos & 0xFF)) { 676cabdff1aSopenharmony_ci printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n", 677cabdff1aSopenharmony_ci (int)buf[i], (int)(pos & 0xFF), pos); 678cabdff1aSopenharmony_ci break; 679cabdff1aSopenharmony_ci } 680cabdff1aSopenharmony_ci pos++; 681cabdff1aSopenharmony_ci } 682cabdff1aSopenharmony_ci } 683cabdff1aSopenharmony_ci 684cabdff1aSopenharmony_ci read_len += ret; 685cabdff1aSopenharmony_ci } 686cabdff1aSopenharmony_ci printf("read: %"PRId64"\n", read_len); 687cabdff1aSopenharmony_ci 688cabdff1aSopenharmony_ci ret = ffurl_read(h, buf, 1); 689cabdff1aSopenharmony_ci printf("read: %d\n", ret); 690cabdff1aSopenharmony_ci 691cabdff1aSopenharmony_ci /* 692cabdff1aSopenharmony_ci * test read error 693cabdff1aSopenharmony_ci */ 694cabdff1aSopenharmony_ci ffurl_close(h); 695cabdff1aSopenharmony_ci av_dict_set_int(&opts, "async-test-read-error", -10000, 0); 696cabdff1aSopenharmony_ci ret = ffurl_open_whitelist(&h, "async:async-test:", AVIO_FLAG_READ, 697cabdff1aSopenharmony_ci NULL, &opts, NULL, NULL, NULL); 698cabdff1aSopenharmony_ci printf("open: %d\n", ret); 699cabdff1aSopenharmony_ci 700cabdff1aSopenharmony_ci ret = ffurl_read(h, buf, 1); 701cabdff1aSopenharmony_ci printf("read: %d\n", ret); 702cabdff1aSopenharmony_ci 703cabdff1aSopenharmony_cifail: 704cabdff1aSopenharmony_ci av_dict_free(&opts); 705cabdff1aSopenharmony_ci ffurl_close(h); 706cabdff1aSopenharmony_ci return 0; 707cabdff1aSopenharmony_ci} 708cabdff1aSopenharmony_ci 709cabdff1aSopenharmony_ci#endif 710