1cabdff1aSopenharmony_ci/* 2cabdff1aSopenharmony_ci * Copyright (c) 2014 Nicolas George 3cabdff1aSopenharmony_ci * 4cabdff1aSopenharmony_ci * This file is part of FFmpeg. 5cabdff1aSopenharmony_ci * 6cabdff1aSopenharmony_ci * FFmpeg is free software; you can redistribute it and/or 7cabdff1aSopenharmony_ci * modify it under the terms of the GNU Lesser General Public License 8cabdff1aSopenharmony_ci * as published by the Free Software Foundation; either 9cabdff1aSopenharmony_ci * version 2.1 of the License, or (at your option) any later version. 10cabdff1aSopenharmony_ci * 11cabdff1aSopenharmony_ci * FFmpeg is distributed in the hope that it will be useful, 12cabdff1aSopenharmony_ci * but WITHOUT ANY WARRANTY; without even the implied warranty of 13cabdff1aSopenharmony_ci * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14cabdff1aSopenharmony_ci * GNU Lesser General Public License for more details. 15cabdff1aSopenharmony_ci * 16cabdff1aSopenharmony_ci * You should have received a copy of the GNU Lesser General Public License 17cabdff1aSopenharmony_ci * along with FFmpeg; if not, write to the Free Software Foundation, Inc., 18cabdff1aSopenharmony_ci * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 19cabdff1aSopenharmony_ci */ 20cabdff1aSopenharmony_ci 21cabdff1aSopenharmony_ci#include <limits.h> 22cabdff1aSopenharmony_ci#include "fifo.h" 23cabdff1aSopenharmony_ci#include "mem.h" 24cabdff1aSopenharmony_ci#include "threadmessage.h" 25cabdff1aSopenharmony_ci#include "thread.h" 26cabdff1aSopenharmony_ci 27cabdff1aSopenharmony_cistruct AVThreadMessageQueue { 28cabdff1aSopenharmony_ci#if HAVE_THREADS 29cabdff1aSopenharmony_ci AVFifo *fifo; 30cabdff1aSopenharmony_ci pthread_mutex_t lock; 31cabdff1aSopenharmony_ci pthread_cond_t cond_recv; 32cabdff1aSopenharmony_ci pthread_cond_t cond_send; 33cabdff1aSopenharmony_ci int err_send; 34cabdff1aSopenharmony_ci int err_recv; 35cabdff1aSopenharmony_ci unsigned elsize; 36cabdff1aSopenharmony_ci void (*free_func)(void *msg); 37cabdff1aSopenharmony_ci#else 38cabdff1aSopenharmony_ci int dummy; 39cabdff1aSopenharmony_ci#endif 40cabdff1aSopenharmony_ci}; 41cabdff1aSopenharmony_ci 42cabdff1aSopenharmony_ciint av_thread_message_queue_alloc(AVThreadMessageQueue **mq, 43cabdff1aSopenharmony_ci unsigned nelem, 44cabdff1aSopenharmony_ci unsigned elsize) 45cabdff1aSopenharmony_ci{ 46cabdff1aSopenharmony_ci#if HAVE_THREADS 47cabdff1aSopenharmony_ci AVThreadMessageQueue *rmq; 48cabdff1aSopenharmony_ci int ret = 0; 49cabdff1aSopenharmony_ci 50cabdff1aSopenharmony_ci if (nelem > INT_MAX / elsize) 51cabdff1aSopenharmony_ci return AVERROR(EINVAL); 52cabdff1aSopenharmony_ci if (!(rmq = av_mallocz(sizeof(*rmq)))) 53cabdff1aSopenharmony_ci return AVERROR(ENOMEM); 54cabdff1aSopenharmony_ci if ((ret = pthread_mutex_init(&rmq->lock, NULL))) { 55cabdff1aSopenharmony_ci av_free(rmq); 56cabdff1aSopenharmony_ci return AVERROR(ret); 57cabdff1aSopenharmony_ci } 58cabdff1aSopenharmony_ci if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) { 59cabdff1aSopenharmony_ci pthread_mutex_destroy(&rmq->lock); 60cabdff1aSopenharmony_ci av_free(rmq); 61cabdff1aSopenharmony_ci return AVERROR(ret); 62cabdff1aSopenharmony_ci } 63cabdff1aSopenharmony_ci if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) { 64cabdff1aSopenharmony_ci pthread_cond_destroy(&rmq->cond_recv); 65cabdff1aSopenharmony_ci pthread_mutex_destroy(&rmq->lock); 66cabdff1aSopenharmony_ci av_free(rmq); 67cabdff1aSopenharmony_ci return AVERROR(ret); 68cabdff1aSopenharmony_ci } 69cabdff1aSopenharmony_ci if (!(rmq->fifo = av_fifo_alloc2(nelem, elsize, 0))) { 70cabdff1aSopenharmony_ci pthread_cond_destroy(&rmq->cond_send); 71cabdff1aSopenharmony_ci pthread_cond_destroy(&rmq->cond_recv); 72cabdff1aSopenharmony_ci pthread_mutex_destroy(&rmq->lock); 73cabdff1aSopenharmony_ci av_free(rmq); 74cabdff1aSopenharmony_ci return AVERROR(ENOMEM); 75cabdff1aSopenharmony_ci } 76cabdff1aSopenharmony_ci rmq->elsize = elsize; 77cabdff1aSopenharmony_ci *mq = rmq; 78cabdff1aSopenharmony_ci return 0; 79cabdff1aSopenharmony_ci#else 80cabdff1aSopenharmony_ci *mq = NULL; 81cabdff1aSopenharmony_ci return AVERROR(ENOSYS); 82cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 83cabdff1aSopenharmony_ci} 84cabdff1aSopenharmony_ci 85cabdff1aSopenharmony_civoid av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, 86cabdff1aSopenharmony_ci void (*free_func)(void *msg)) 87cabdff1aSopenharmony_ci{ 88cabdff1aSopenharmony_ci#if HAVE_THREADS 89cabdff1aSopenharmony_ci mq->free_func = free_func; 90cabdff1aSopenharmony_ci#endif 91cabdff1aSopenharmony_ci} 92cabdff1aSopenharmony_ci 93cabdff1aSopenharmony_civoid av_thread_message_queue_free(AVThreadMessageQueue **mq) 94cabdff1aSopenharmony_ci{ 95cabdff1aSopenharmony_ci#if HAVE_THREADS 96cabdff1aSopenharmony_ci if (*mq) { 97cabdff1aSopenharmony_ci av_thread_message_flush(*mq); 98cabdff1aSopenharmony_ci av_fifo_freep2(&(*mq)->fifo); 99cabdff1aSopenharmony_ci pthread_cond_destroy(&(*mq)->cond_send); 100cabdff1aSopenharmony_ci pthread_cond_destroy(&(*mq)->cond_recv); 101cabdff1aSopenharmony_ci pthread_mutex_destroy(&(*mq)->lock); 102cabdff1aSopenharmony_ci av_freep(mq); 103cabdff1aSopenharmony_ci } 104cabdff1aSopenharmony_ci#endif 105cabdff1aSopenharmony_ci} 106cabdff1aSopenharmony_ci 107cabdff1aSopenharmony_ciint av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq) 108cabdff1aSopenharmony_ci{ 109cabdff1aSopenharmony_ci#if HAVE_THREADS 110cabdff1aSopenharmony_ci int ret; 111cabdff1aSopenharmony_ci pthread_mutex_lock(&mq->lock); 112cabdff1aSopenharmony_ci ret = av_fifo_can_read(mq->fifo); 113cabdff1aSopenharmony_ci pthread_mutex_unlock(&mq->lock); 114cabdff1aSopenharmony_ci return ret; 115cabdff1aSopenharmony_ci#else 116cabdff1aSopenharmony_ci return AVERROR(ENOSYS); 117cabdff1aSopenharmony_ci#endif 118cabdff1aSopenharmony_ci} 119cabdff1aSopenharmony_ci 120cabdff1aSopenharmony_ci#if HAVE_THREADS 121cabdff1aSopenharmony_ci 122cabdff1aSopenharmony_cistatic int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq, 123cabdff1aSopenharmony_ci void *msg, 124cabdff1aSopenharmony_ci unsigned flags) 125cabdff1aSopenharmony_ci{ 126cabdff1aSopenharmony_ci while (!mq->err_send && !av_fifo_can_write(mq->fifo)) { 127cabdff1aSopenharmony_ci if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) 128cabdff1aSopenharmony_ci return AVERROR(EAGAIN); 129cabdff1aSopenharmony_ci pthread_cond_wait(&mq->cond_send, &mq->lock); 130cabdff1aSopenharmony_ci } 131cabdff1aSopenharmony_ci if (mq->err_send) 132cabdff1aSopenharmony_ci return mq->err_send; 133cabdff1aSopenharmony_ci av_fifo_write(mq->fifo, msg, 1); 134cabdff1aSopenharmony_ci /* one message is sent, signal one receiver */ 135cabdff1aSopenharmony_ci pthread_cond_signal(&mq->cond_recv); 136cabdff1aSopenharmony_ci return 0; 137cabdff1aSopenharmony_ci} 138cabdff1aSopenharmony_ci 139cabdff1aSopenharmony_cistatic int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq, 140cabdff1aSopenharmony_ci void *msg, 141cabdff1aSopenharmony_ci unsigned flags) 142cabdff1aSopenharmony_ci{ 143cabdff1aSopenharmony_ci while (!mq->err_recv && !av_fifo_can_read(mq->fifo)) { 144cabdff1aSopenharmony_ci if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) 145cabdff1aSopenharmony_ci return AVERROR(EAGAIN); 146cabdff1aSopenharmony_ci pthread_cond_wait(&mq->cond_recv, &mq->lock); 147cabdff1aSopenharmony_ci } 148cabdff1aSopenharmony_ci if (!av_fifo_can_read(mq->fifo)) 149cabdff1aSopenharmony_ci return mq->err_recv; 150cabdff1aSopenharmony_ci av_fifo_read(mq->fifo, msg, 1); 151cabdff1aSopenharmony_ci /* one message space appeared, signal one sender */ 152cabdff1aSopenharmony_ci pthread_cond_signal(&mq->cond_send); 153cabdff1aSopenharmony_ci return 0; 154cabdff1aSopenharmony_ci} 155cabdff1aSopenharmony_ci 156cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 157cabdff1aSopenharmony_ci 158cabdff1aSopenharmony_ciint av_thread_message_queue_send(AVThreadMessageQueue *mq, 159cabdff1aSopenharmony_ci void *msg, 160cabdff1aSopenharmony_ci unsigned flags) 161cabdff1aSopenharmony_ci{ 162cabdff1aSopenharmony_ci#if HAVE_THREADS 163cabdff1aSopenharmony_ci int ret; 164cabdff1aSopenharmony_ci 165cabdff1aSopenharmony_ci pthread_mutex_lock(&mq->lock); 166cabdff1aSopenharmony_ci ret = av_thread_message_queue_send_locked(mq, msg, flags); 167cabdff1aSopenharmony_ci pthread_mutex_unlock(&mq->lock); 168cabdff1aSopenharmony_ci return ret; 169cabdff1aSopenharmony_ci#else 170cabdff1aSopenharmony_ci return AVERROR(ENOSYS); 171cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 172cabdff1aSopenharmony_ci} 173cabdff1aSopenharmony_ci 174cabdff1aSopenharmony_ciint av_thread_message_queue_recv(AVThreadMessageQueue *mq, 175cabdff1aSopenharmony_ci void *msg, 176cabdff1aSopenharmony_ci unsigned flags) 177cabdff1aSopenharmony_ci{ 178cabdff1aSopenharmony_ci#if HAVE_THREADS 179cabdff1aSopenharmony_ci int ret; 180cabdff1aSopenharmony_ci 181cabdff1aSopenharmony_ci pthread_mutex_lock(&mq->lock); 182cabdff1aSopenharmony_ci ret = av_thread_message_queue_recv_locked(mq, msg, flags); 183cabdff1aSopenharmony_ci pthread_mutex_unlock(&mq->lock); 184cabdff1aSopenharmony_ci return ret; 185cabdff1aSopenharmony_ci#else 186cabdff1aSopenharmony_ci return AVERROR(ENOSYS); 187cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 188cabdff1aSopenharmony_ci} 189cabdff1aSopenharmony_ci 190cabdff1aSopenharmony_civoid av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, 191cabdff1aSopenharmony_ci int err) 192cabdff1aSopenharmony_ci{ 193cabdff1aSopenharmony_ci#if HAVE_THREADS 194cabdff1aSopenharmony_ci pthread_mutex_lock(&mq->lock); 195cabdff1aSopenharmony_ci mq->err_send = err; 196cabdff1aSopenharmony_ci pthread_cond_broadcast(&mq->cond_send); 197cabdff1aSopenharmony_ci pthread_mutex_unlock(&mq->lock); 198cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 199cabdff1aSopenharmony_ci} 200cabdff1aSopenharmony_ci 201cabdff1aSopenharmony_civoid av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, 202cabdff1aSopenharmony_ci int err) 203cabdff1aSopenharmony_ci{ 204cabdff1aSopenharmony_ci#if HAVE_THREADS 205cabdff1aSopenharmony_ci pthread_mutex_lock(&mq->lock); 206cabdff1aSopenharmony_ci mq->err_recv = err; 207cabdff1aSopenharmony_ci pthread_cond_broadcast(&mq->cond_recv); 208cabdff1aSopenharmony_ci pthread_mutex_unlock(&mq->lock); 209cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 210cabdff1aSopenharmony_ci} 211cabdff1aSopenharmony_ci 212cabdff1aSopenharmony_ci#if HAVE_THREADS 213cabdff1aSopenharmony_cistatic int free_func_wrap(void *arg, void *buf, size_t *nb_elems) 214cabdff1aSopenharmony_ci{ 215cabdff1aSopenharmony_ci AVThreadMessageQueue *mq = arg; 216cabdff1aSopenharmony_ci uint8_t *msg = buf; 217cabdff1aSopenharmony_ci for (size_t i = 0; i < *nb_elems; i++) 218cabdff1aSopenharmony_ci mq->free_func(msg + i * mq->elsize); 219cabdff1aSopenharmony_ci return 0; 220cabdff1aSopenharmony_ci} 221cabdff1aSopenharmony_ci#endif 222cabdff1aSopenharmony_ci 223cabdff1aSopenharmony_civoid av_thread_message_flush(AVThreadMessageQueue *mq) 224cabdff1aSopenharmony_ci{ 225cabdff1aSopenharmony_ci#if HAVE_THREADS 226cabdff1aSopenharmony_ci size_t used; 227cabdff1aSopenharmony_ci 228cabdff1aSopenharmony_ci pthread_mutex_lock(&mq->lock); 229cabdff1aSopenharmony_ci used = av_fifo_can_read(mq->fifo); 230cabdff1aSopenharmony_ci if (mq->free_func) 231cabdff1aSopenharmony_ci av_fifo_read_to_cb(mq->fifo, free_func_wrap, mq, &used); 232cabdff1aSopenharmony_ci /* only the senders need to be notified since the queue is empty and there 233cabdff1aSopenharmony_ci * is nothing to read */ 234cabdff1aSopenharmony_ci pthread_cond_broadcast(&mq->cond_send); 235cabdff1aSopenharmony_ci pthread_mutex_unlock(&mq->lock); 236cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */ 237cabdff1aSopenharmony_ci} 238