1/* 2 * Copyright (c) 2014 Nicolas George 3 * 4 * This file is part of FFmpeg. 5 * 6 * FFmpeg is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public License 8 * as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * FFmpeg is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public License 17 * along with FFmpeg; if not, write to the Free Software Foundation, Inc., 18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 19 */ 20 21#include <limits.h> 22#include "fifo.h" 23#include "mem.h" 24#include "threadmessage.h" 25#include "thread.h" 26 27struct AVThreadMessageQueue { 28#if HAVE_THREADS 29 AVFifo *fifo; 30 pthread_mutex_t lock; 31 pthread_cond_t cond_recv; 32 pthread_cond_t cond_send; 33 int err_send; 34 int err_recv; 35 unsigned elsize; 36 void (*free_func)(void *msg); 37#else 38 int dummy; 39#endif 40}; 41 42int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, 43 unsigned nelem, 44 unsigned elsize) 45{ 46#if HAVE_THREADS 47 AVThreadMessageQueue *rmq; 48 int ret = 0; 49 50 if (nelem > INT_MAX / elsize) 51 return AVERROR(EINVAL); 52 if (!(rmq = av_mallocz(sizeof(*rmq)))) 53 return AVERROR(ENOMEM); 54 if ((ret = pthread_mutex_init(&rmq->lock, NULL))) { 55 av_free(rmq); 56 return AVERROR(ret); 57 } 58 if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) { 59 pthread_mutex_destroy(&rmq->lock); 60 av_free(rmq); 61 return AVERROR(ret); 62 } 63 if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) { 64 pthread_cond_destroy(&rmq->cond_recv); 65 pthread_mutex_destroy(&rmq->lock); 66 av_free(rmq); 67 return AVERROR(ret); 68 } 69 if (!(rmq->fifo = av_fifo_alloc2(nelem, elsize, 0))) { 70 pthread_cond_destroy(&rmq->cond_send); 71 pthread_cond_destroy(&rmq->cond_recv); 72 pthread_mutex_destroy(&rmq->lock); 73 av_free(rmq); 74 return AVERROR(ENOMEM); 75 } 76 rmq->elsize = elsize; 77 *mq = rmq; 78 return 0; 79#else 80 *mq = NULL; 81 return AVERROR(ENOSYS); 82#endif /* HAVE_THREADS */ 83} 84 85void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, 86 void (*free_func)(void *msg)) 87{ 88#if HAVE_THREADS 89 mq->free_func = free_func; 90#endif 91} 92 93void av_thread_message_queue_free(AVThreadMessageQueue **mq) 94{ 95#if HAVE_THREADS 96 if (*mq) { 97 av_thread_message_flush(*mq); 98 av_fifo_freep2(&(*mq)->fifo); 99 pthread_cond_destroy(&(*mq)->cond_send); 100 pthread_cond_destroy(&(*mq)->cond_recv); 101 pthread_mutex_destroy(&(*mq)->lock); 102 av_freep(mq); 103 } 104#endif 105} 106 107int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq) 108{ 109#if HAVE_THREADS 110 int ret; 111 pthread_mutex_lock(&mq->lock); 112 ret = av_fifo_can_read(mq->fifo); 113 pthread_mutex_unlock(&mq->lock); 114 return ret; 115#else 116 return AVERROR(ENOSYS); 117#endif 118} 119 120#if HAVE_THREADS 121 122static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq, 123 void *msg, 124 unsigned flags) 125{ 126 while (!mq->err_send && !av_fifo_can_write(mq->fifo)) { 127 if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) 128 return AVERROR(EAGAIN); 129 pthread_cond_wait(&mq->cond_send, &mq->lock); 130 } 131 if (mq->err_send) 132 return mq->err_send; 133 av_fifo_write(mq->fifo, msg, 1); 134 /* one message is sent, signal one receiver */ 135 pthread_cond_signal(&mq->cond_recv); 136 return 0; 137} 138 139static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq, 140 void *msg, 141 unsigned flags) 142{ 143 while (!mq->err_recv && !av_fifo_can_read(mq->fifo)) { 144 if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) 145 return AVERROR(EAGAIN); 146 pthread_cond_wait(&mq->cond_recv, &mq->lock); 147 } 148 if (!av_fifo_can_read(mq->fifo)) 149 return mq->err_recv; 150 av_fifo_read(mq->fifo, msg, 1); 151 /* one message space appeared, signal one sender */ 152 pthread_cond_signal(&mq->cond_send); 153 return 0; 154} 155 156#endif /* HAVE_THREADS */ 157 158int av_thread_message_queue_send(AVThreadMessageQueue *mq, 159 void *msg, 160 unsigned flags) 161{ 162#if HAVE_THREADS 163 int ret; 164 165 pthread_mutex_lock(&mq->lock); 166 ret = av_thread_message_queue_send_locked(mq, msg, flags); 167 pthread_mutex_unlock(&mq->lock); 168 return ret; 169#else 170 return AVERROR(ENOSYS); 171#endif /* HAVE_THREADS */ 172} 173 174int av_thread_message_queue_recv(AVThreadMessageQueue *mq, 175 void *msg, 176 unsigned flags) 177{ 178#if HAVE_THREADS 179 int ret; 180 181 pthread_mutex_lock(&mq->lock); 182 ret = av_thread_message_queue_recv_locked(mq, msg, flags); 183 pthread_mutex_unlock(&mq->lock); 184 return ret; 185#else 186 return AVERROR(ENOSYS); 187#endif /* HAVE_THREADS */ 188} 189 190void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, 191 int err) 192{ 193#if HAVE_THREADS 194 pthread_mutex_lock(&mq->lock); 195 mq->err_send = err; 196 pthread_cond_broadcast(&mq->cond_send); 197 pthread_mutex_unlock(&mq->lock); 198#endif /* HAVE_THREADS */ 199} 200 201void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, 202 int err) 203{ 204#if HAVE_THREADS 205 pthread_mutex_lock(&mq->lock); 206 mq->err_recv = err; 207 pthread_cond_broadcast(&mq->cond_recv); 208 pthread_mutex_unlock(&mq->lock); 209#endif /* HAVE_THREADS */ 210} 211 212#if HAVE_THREADS 213static int free_func_wrap(void *arg, void *buf, size_t *nb_elems) 214{ 215 AVThreadMessageQueue *mq = arg; 216 uint8_t *msg = buf; 217 for (size_t i = 0; i < *nb_elems; i++) 218 mq->free_func(msg + i * mq->elsize); 219 return 0; 220} 221#endif 222 223void av_thread_message_flush(AVThreadMessageQueue *mq) 224{ 225#if HAVE_THREADS 226 size_t used; 227 228 pthread_mutex_lock(&mq->lock); 229 used = av_fifo_can_read(mq->fifo); 230 if (mq->free_func) 231 av_fifo_read_to_cb(mq->fifo, free_func_wrap, mq, &used); 232 /* only the senders need to be notified since the queue is empty and there 233 * is nothing to read */ 234 pthread_cond_broadcast(&mq->cond_send); 235 pthread_mutex_unlock(&mq->lock); 236#endif /* HAVE_THREADS */ 237} 238