1cabdff1aSopenharmony_ci/*
2cabdff1aSopenharmony_ci * Copyright (c) 2014 Nicolas George
3cabdff1aSopenharmony_ci *
4cabdff1aSopenharmony_ci * This file is part of FFmpeg.
5cabdff1aSopenharmony_ci *
6cabdff1aSopenharmony_ci * FFmpeg is free software; you can redistribute it and/or
7cabdff1aSopenharmony_ci * modify it under the terms of the GNU Lesser General Public License
8cabdff1aSopenharmony_ci * as published by the Free Software Foundation; either
9cabdff1aSopenharmony_ci * version 2.1 of the License, or (at your option) any later version.
10cabdff1aSopenharmony_ci *
11cabdff1aSopenharmony_ci * FFmpeg is distributed in the hope that it will be useful,
12cabdff1aSopenharmony_ci * but WITHOUT ANY WARRANTY; without even the implied warranty of
13cabdff1aSopenharmony_ci * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14cabdff1aSopenharmony_ci * GNU Lesser General Public License for more details.
15cabdff1aSopenharmony_ci *
16cabdff1aSopenharmony_ci * You should have received a copy of the GNU Lesser General Public License
17cabdff1aSopenharmony_ci * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18cabdff1aSopenharmony_ci * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19cabdff1aSopenharmony_ci */
20cabdff1aSopenharmony_ci
21cabdff1aSopenharmony_ci#include <limits.h>
22cabdff1aSopenharmony_ci#include "fifo.h"
23cabdff1aSopenharmony_ci#include "mem.h"
24cabdff1aSopenharmony_ci#include "threadmessage.h"
25cabdff1aSopenharmony_ci#include "thread.h"
26cabdff1aSopenharmony_ci
/**
 * Message queue shared between threads.
 *
 * Without thread support the structure only carries a placeholder member.
 */
struct AVThreadMessageQueue {
#if HAVE_THREADS
    /* FIFO holding the queued messages */
    AVFifo *fifo;
    /* held while the FIFO or the error codes are accessed */
    pthread_mutex_t lock;
    /* receivers wait on this; signalled on send and when err_recv is set */
    pthread_cond_t cond_recv;
    /* senders wait on this; signalled on receive, flush and when err_send is set */
    pthread_cond_t cond_send;
    /* when non-zero, returned by send operations */
    int err_send;
    /* when non-zero, returned by receive operations once the FIFO is empty */
    int err_recv;
    /* size in bytes of one message */
    unsigned elsize;
    /* optional callback invoked on each message discarded by a flush */
    void (*free_func)(void *msg);
#else
    int dummy;
#endif
};
41cabdff1aSopenharmony_ci
42cabdff1aSopenharmony_ciint av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
43cabdff1aSopenharmony_ci                                  unsigned nelem,
44cabdff1aSopenharmony_ci                                  unsigned elsize)
45cabdff1aSopenharmony_ci{
46cabdff1aSopenharmony_ci#if HAVE_THREADS
47cabdff1aSopenharmony_ci    AVThreadMessageQueue *rmq;
48cabdff1aSopenharmony_ci    int ret = 0;
49cabdff1aSopenharmony_ci
50cabdff1aSopenharmony_ci    if (nelem > INT_MAX / elsize)
51cabdff1aSopenharmony_ci        return AVERROR(EINVAL);
52cabdff1aSopenharmony_ci    if (!(rmq = av_mallocz(sizeof(*rmq))))
53cabdff1aSopenharmony_ci        return AVERROR(ENOMEM);
54cabdff1aSopenharmony_ci    if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
55cabdff1aSopenharmony_ci        av_free(rmq);
56cabdff1aSopenharmony_ci        return AVERROR(ret);
57cabdff1aSopenharmony_ci    }
58cabdff1aSopenharmony_ci    if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
59cabdff1aSopenharmony_ci        pthread_mutex_destroy(&rmq->lock);
60cabdff1aSopenharmony_ci        av_free(rmq);
61cabdff1aSopenharmony_ci        return AVERROR(ret);
62cabdff1aSopenharmony_ci    }
63cabdff1aSopenharmony_ci    if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
64cabdff1aSopenharmony_ci        pthread_cond_destroy(&rmq->cond_recv);
65cabdff1aSopenharmony_ci        pthread_mutex_destroy(&rmq->lock);
66cabdff1aSopenharmony_ci        av_free(rmq);
67cabdff1aSopenharmony_ci        return AVERROR(ret);
68cabdff1aSopenharmony_ci    }
69cabdff1aSopenharmony_ci    if (!(rmq->fifo = av_fifo_alloc2(nelem, elsize, 0))) {
70cabdff1aSopenharmony_ci        pthread_cond_destroy(&rmq->cond_send);
71cabdff1aSopenharmony_ci        pthread_cond_destroy(&rmq->cond_recv);
72cabdff1aSopenharmony_ci        pthread_mutex_destroy(&rmq->lock);
73cabdff1aSopenharmony_ci        av_free(rmq);
74cabdff1aSopenharmony_ci        return AVERROR(ENOMEM);
75cabdff1aSopenharmony_ci    }
76cabdff1aSopenharmony_ci    rmq->elsize = elsize;
77cabdff1aSopenharmony_ci    *mq = rmq;
78cabdff1aSopenharmony_ci    return 0;
79cabdff1aSopenharmony_ci#else
80cabdff1aSopenharmony_ci    *mq = NULL;
81cabdff1aSopenharmony_ci    return AVERROR(ENOSYS);
82cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
83cabdff1aSopenharmony_ci}
84cabdff1aSopenharmony_ci
85cabdff1aSopenharmony_civoid av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq,
86cabdff1aSopenharmony_ci                                           void (*free_func)(void *msg))
87cabdff1aSopenharmony_ci{
88cabdff1aSopenharmony_ci#if HAVE_THREADS
89cabdff1aSopenharmony_ci    mq->free_func = free_func;
90cabdff1aSopenharmony_ci#endif
91cabdff1aSopenharmony_ci}
92cabdff1aSopenharmony_ci
93cabdff1aSopenharmony_civoid av_thread_message_queue_free(AVThreadMessageQueue **mq)
94cabdff1aSopenharmony_ci{
95cabdff1aSopenharmony_ci#if HAVE_THREADS
96cabdff1aSopenharmony_ci    if (*mq) {
97cabdff1aSopenharmony_ci        av_thread_message_flush(*mq);
98cabdff1aSopenharmony_ci        av_fifo_freep2(&(*mq)->fifo);
99cabdff1aSopenharmony_ci        pthread_cond_destroy(&(*mq)->cond_send);
100cabdff1aSopenharmony_ci        pthread_cond_destroy(&(*mq)->cond_recv);
101cabdff1aSopenharmony_ci        pthread_mutex_destroy(&(*mq)->lock);
102cabdff1aSopenharmony_ci        av_freep(mq);
103cabdff1aSopenharmony_ci    }
104cabdff1aSopenharmony_ci#endif
105cabdff1aSopenharmony_ci}
106cabdff1aSopenharmony_ci
107cabdff1aSopenharmony_ciint av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
108cabdff1aSopenharmony_ci{
109cabdff1aSopenharmony_ci#if HAVE_THREADS
110cabdff1aSopenharmony_ci    int ret;
111cabdff1aSopenharmony_ci    pthread_mutex_lock(&mq->lock);
112cabdff1aSopenharmony_ci    ret = av_fifo_can_read(mq->fifo);
113cabdff1aSopenharmony_ci    pthread_mutex_unlock(&mq->lock);
114cabdff1aSopenharmony_ci    return ret;
115cabdff1aSopenharmony_ci#else
116cabdff1aSopenharmony_ci    return AVERROR(ENOSYS);
117cabdff1aSopenharmony_ci#endif
118cabdff1aSopenharmony_ci}
119cabdff1aSopenharmony_ci
120cabdff1aSopenharmony_ci#if HAVE_THREADS
121cabdff1aSopenharmony_ci
122cabdff1aSopenharmony_cistatic int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
123cabdff1aSopenharmony_ci                                               void *msg,
124cabdff1aSopenharmony_ci                                               unsigned flags)
125cabdff1aSopenharmony_ci{
126cabdff1aSopenharmony_ci    while (!mq->err_send && !av_fifo_can_write(mq->fifo)) {
127cabdff1aSopenharmony_ci        if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
128cabdff1aSopenharmony_ci            return AVERROR(EAGAIN);
129cabdff1aSopenharmony_ci        pthread_cond_wait(&mq->cond_send, &mq->lock);
130cabdff1aSopenharmony_ci    }
131cabdff1aSopenharmony_ci    if (mq->err_send)
132cabdff1aSopenharmony_ci        return mq->err_send;
133cabdff1aSopenharmony_ci    av_fifo_write(mq->fifo, msg, 1);
134cabdff1aSopenharmony_ci    /* one message is sent, signal one receiver */
135cabdff1aSopenharmony_ci    pthread_cond_signal(&mq->cond_recv);
136cabdff1aSopenharmony_ci    return 0;
137cabdff1aSopenharmony_ci}
138cabdff1aSopenharmony_ci
139cabdff1aSopenharmony_cistatic int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
140cabdff1aSopenharmony_ci                                               void *msg,
141cabdff1aSopenharmony_ci                                               unsigned flags)
142cabdff1aSopenharmony_ci{
143cabdff1aSopenharmony_ci    while (!mq->err_recv && !av_fifo_can_read(mq->fifo)) {
144cabdff1aSopenharmony_ci        if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
145cabdff1aSopenharmony_ci            return AVERROR(EAGAIN);
146cabdff1aSopenharmony_ci        pthread_cond_wait(&mq->cond_recv, &mq->lock);
147cabdff1aSopenharmony_ci    }
148cabdff1aSopenharmony_ci    if (!av_fifo_can_read(mq->fifo))
149cabdff1aSopenharmony_ci        return mq->err_recv;
150cabdff1aSopenharmony_ci    av_fifo_read(mq->fifo, msg, 1);
151cabdff1aSopenharmony_ci    /* one message space appeared, signal one sender */
152cabdff1aSopenharmony_ci    pthread_cond_signal(&mq->cond_send);
153cabdff1aSopenharmony_ci    return 0;
154cabdff1aSopenharmony_ci}
155cabdff1aSopenharmony_ci
156cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
157cabdff1aSopenharmony_ci
158cabdff1aSopenharmony_ciint av_thread_message_queue_send(AVThreadMessageQueue *mq,
159cabdff1aSopenharmony_ci                                 void *msg,
160cabdff1aSopenharmony_ci                                 unsigned flags)
161cabdff1aSopenharmony_ci{
162cabdff1aSopenharmony_ci#if HAVE_THREADS
163cabdff1aSopenharmony_ci    int ret;
164cabdff1aSopenharmony_ci
165cabdff1aSopenharmony_ci    pthread_mutex_lock(&mq->lock);
166cabdff1aSopenharmony_ci    ret = av_thread_message_queue_send_locked(mq, msg, flags);
167cabdff1aSopenharmony_ci    pthread_mutex_unlock(&mq->lock);
168cabdff1aSopenharmony_ci    return ret;
169cabdff1aSopenharmony_ci#else
170cabdff1aSopenharmony_ci    return AVERROR(ENOSYS);
171cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
172cabdff1aSopenharmony_ci}
173cabdff1aSopenharmony_ci
174cabdff1aSopenharmony_ciint av_thread_message_queue_recv(AVThreadMessageQueue *mq,
175cabdff1aSopenharmony_ci                                 void *msg,
176cabdff1aSopenharmony_ci                                 unsigned flags)
177cabdff1aSopenharmony_ci{
178cabdff1aSopenharmony_ci#if HAVE_THREADS
179cabdff1aSopenharmony_ci    int ret;
180cabdff1aSopenharmony_ci
181cabdff1aSopenharmony_ci    pthread_mutex_lock(&mq->lock);
182cabdff1aSopenharmony_ci    ret = av_thread_message_queue_recv_locked(mq, msg, flags);
183cabdff1aSopenharmony_ci    pthread_mutex_unlock(&mq->lock);
184cabdff1aSopenharmony_ci    return ret;
185cabdff1aSopenharmony_ci#else
186cabdff1aSopenharmony_ci    return AVERROR(ENOSYS);
187cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
188cabdff1aSopenharmony_ci}
189cabdff1aSopenharmony_ci
190cabdff1aSopenharmony_civoid av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
191cabdff1aSopenharmony_ci                                          int err)
192cabdff1aSopenharmony_ci{
193cabdff1aSopenharmony_ci#if HAVE_THREADS
194cabdff1aSopenharmony_ci    pthread_mutex_lock(&mq->lock);
195cabdff1aSopenharmony_ci    mq->err_send = err;
196cabdff1aSopenharmony_ci    pthread_cond_broadcast(&mq->cond_send);
197cabdff1aSopenharmony_ci    pthread_mutex_unlock(&mq->lock);
198cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
199cabdff1aSopenharmony_ci}
200cabdff1aSopenharmony_ci
201cabdff1aSopenharmony_civoid av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
202cabdff1aSopenharmony_ci                                          int err)
203cabdff1aSopenharmony_ci{
204cabdff1aSopenharmony_ci#if HAVE_THREADS
205cabdff1aSopenharmony_ci    pthread_mutex_lock(&mq->lock);
206cabdff1aSopenharmony_ci    mq->err_recv = err;
207cabdff1aSopenharmony_ci    pthread_cond_broadcast(&mq->cond_recv);
208cabdff1aSopenharmony_ci    pthread_mutex_unlock(&mq->lock);
209cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
210cabdff1aSopenharmony_ci}
211cabdff1aSopenharmony_ci
212cabdff1aSopenharmony_ci#if HAVE_THREADS
213cabdff1aSopenharmony_cistatic int free_func_wrap(void *arg, void *buf, size_t *nb_elems)
214cabdff1aSopenharmony_ci{
215cabdff1aSopenharmony_ci    AVThreadMessageQueue *mq = arg;
216cabdff1aSopenharmony_ci    uint8_t *msg = buf;
217cabdff1aSopenharmony_ci    for (size_t i = 0; i < *nb_elems; i++)
218cabdff1aSopenharmony_ci        mq->free_func(msg + i * mq->elsize);
219cabdff1aSopenharmony_ci    return 0;
220cabdff1aSopenharmony_ci}
221cabdff1aSopenharmony_ci#endif
222cabdff1aSopenharmony_ci
223cabdff1aSopenharmony_civoid av_thread_message_flush(AVThreadMessageQueue *mq)
224cabdff1aSopenharmony_ci{
225cabdff1aSopenharmony_ci#if HAVE_THREADS
226cabdff1aSopenharmony_ci    size_t used;
227cabdff1aSopenharmony_ci
228cabdff1aSopenharmony_ci    pthread_mutex_lock(&mq->lock);
229cabdff1aSopenharmony_ci    used = av_fifo_can_read(mq->fifo);
230cabdff1aSopenharmony_ci    if (mq->free_func)
231cabdff1aSopenharmony_ci        av_fifo_read_to_cb(mq->fifo, free_func_wrap, mq, &used);
232cabdff1aSopenharmony_ci    /* only the senders need to be notified since the queue is empty and there
233cabdff1aSopenharmony_ci     * is nothing to read */
234cabdff1aSopenharmony_ci    pthread_cond_broadcast(&mq->cond_send);
235cabdff1aSopenharmony_ci    pthread_mutex_unlock(&mq->lock);
236cabdff1aSopenharmony_ci#endif /* HAVE_THREADS */
237cabdff1aSopenharmony_ci}
238