1/*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19#include <stdatomic.h>
20#include "cpu.h"
21#include "internal.h"
22#include "slicethread.h"
23#include "mem.h"
24#include "thread.h"
25#include "avassert.h"
26
27#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
28
29typedef struct WorkerContext {
30    AVSliceThread   *ctx;
31    pthread_mutex_t mutex;
32    pthread_cond_t  cond;
33    pthread_t       thread;
34    int             done;
35} WorkerContext;
36
37struct AVSliceThread {
38    WorkerContext   *workers;
39    int             nb_threads;
40    int             nb_active_threads;
41    int             nb_jobs;
42
43    atomic_uint     first_job;
44    atomic_uint     current_job;
45    pthread_mutex_t done_mutex;
46    pthread_cond_t  done_cond;
47    int             done;
48    int             finished;
49
50    void            *priv;
51    void            (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
52    void            (*main_func)(void *priv);
53};
54
55static int run_jobs(AVSliceThread *ctx)
56{
57    unsigned nb_jobs    = ctx->nb_jobs;
58    unsigned nb_active_threads = ctx->nb_active_threads;
59    unsigned first_job    = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
60    unsigned current_job  = first_job;
61
62    do {
63        ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
64    } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
65
66    return current_job == nb_jobs + nb_active_threads - 1;
67}
68
69static void *attribute_align_arg thread_worker(void *v)
70{
71    WorkerContext *w = v;
72    AVSliceThread *ctx = w->ctx;
73
74    pthread_mutex_lock(&w->mutex);
75    pthread_cond_signal(&w->cond);
76
77    while (1) {
78        w->done = 1;
79        while (w->done)
80            pthread_cond_wait(&w->cond, &w->mutex);
81
82        if (ctx->finished) {
83            pthread_mutex_unlock(&w->mutex);
84            return NULL;
85        }
86
87        if (run_jobs(ctx)) {
88            pthread_mutex_lock(&ctx->done_mutex);
89            ctx->done = 1;
90            pthread_cond_signal(&ctx->done_cond);
91            pthread_mutex_unlock(&ctx->done_mutex);
92        }
93    }
94}
95
96int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
97                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
98                              void (*main_func)(void *priv),
99                              int nb_threads)
100{
101    AVSliceThread *ctx;
102    int nb_workers, i;
103
104    av_assert0(nb_threads >= 0);
105    if (!nb_threads) {
106        int nb_cpus = av_cpu_count();
107        if (nb_cpus > 1)
108            nb_threads = nb_cpus + 1;
109        else
110            nb_threads = 1;
111    }
112
113    nb_workers = nb_threads;
114    if (!main_func)
115        nb_workers--;
116
117    *pctx = ctx = av_mallocz(sizeof(*ctx));
118    if (!ctx)
119        return AVERROR(ENOMEM);
120
121    if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
122        av_freep(pctx);
123        return AVERROR(ENOMEM);
124    }
125
126    ctx->priv        = priv;
127    ctx->worker_func = worker_func;
128    ctx->main_func   = main_func;
129    ctx->nb_threads  = nb_threads;
130    ctx->nb_active_threads = 0;
131    ctx->nb_jobs     = 0;
132    ctx->finished    = 0;
133
134    atomic_init(&ctx->first_job, 0);
135    atomic_init(&ctx->current_job, 0);
136    pthread_mutex_init(&ctx->done_mutex, NULL);
137    pthread_cond_init(&ctx->done_cond, NULL);
138    ctx->done        = 0;
139
140    for (i = 0; i < nb_workers; i++) {
141        WorkerContext *w = &ctx->workers[i];
142        int ret;
143        w->ctx = ctx;
144        pthread_mutex_init(&w->mutex, NULL);
145        pthread_cond_init(&w->cond, NULL);
146        pthread_mutex_lock(&w->mutex);
147        w->done = 0;
148
149        if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
150            ctx->nb_threads = main_func ? i : i + 1;
151            pthread_mutex_unlock(&w->mutex);
152            pthread_cond_destroy(&w->cond);
153            pthread_mutex_destroy(&w->mutex);
154            avpriv_slicethread_free(pctx);
155            return AVERROR(ret);
156        }
157
158        while (!w->done)
159            pthread_cond_wait(&w->cond, &w->mutex);
160        pthread_mutex_unlock(&w->mutex);
161    }
162
163    return nb_threads;
164}
165
166void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
167{
168    int nb_workers, i, is_last = 0;
169
170    av_assert0(nb_jobs > 0);
171    ctx->nb_jobs           = nb_jobs;
172    ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
173    atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
174    atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
175    nb_workers             = ctx->nb_active_threads;
176    if (!ctx->main_func || !execute_main)
177        nb_workers--;
178
179    for (i = 0; i < nb_workers; i++) {
180        WorkerContext *w = &ctx->workers[i];
181        pthread_mutex_lock(&w->mutex);
182        w->done = 0;
183        pthread_cond_signal(&w->cond);
184        pthread_mutex_unlock(&w->mutex);
185    }
186
187    if (ctx->main_func && execute_main)
188        ctx->main_func(ctx->priv);
189    else
190        is_last = run_jobs(ctx);
191
192    if (!is_last) {
193        pthread_mutex_lock(&ctx->done_mutex);
194        while (!ctx->done)
195            pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
196        ctx->done = 0;
197        pthread_mutex_unlock(&ctx->done_mutex);
198    }
199}
200
201void avpriv_slicethread_free(AVSliceThread **pctx)
202{
203    AVSliceThread *ctx;
204    int nb_workers, i;
205
206    if (!pctx || !*pctx)
207        return;
208
209    ctx = *pctx;
210    nb_workers = ctx->nb_threads;
211    if (!ctx->main_func)
212        nb_workers--;
213
214    ctx->finished = 1;
215    for (i = 0; i < nb_workers; i++) {
216        WorkerContext *w = &ctx->workers[i];
217        pthread_mutex_lock(&w->mutex);
218        w->done = 0;
219        pthread_cond_signal(&w->cond);
220        pthread_mutex_unlock(&w->mutex);
221    }
222
223    for (i = 0; i < nb_workers; i++) {
224        WorkerContext *w = &ctx->workers[i];
225        pthread_join(w->thread, NULL);
226        pthread_cond_destroy(&w->cond);
227        pthread_mutex_destroy(&w->mutex);
228    }
229
230    pthread_cond_destroy(&ctx->done_cond);
231    pthread_mutex_destroy(&ctx->done_mutex);
232    av_freep(&ctx->workers);
233    av_freep(pctx);
234}
235
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
237
238int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
239                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
240                              void (*main_func)(void *priv),
241                              int nb_threads)
242{
243    *pctx = NULL;
244    return AVERROR(ENOSYS);
245}
246
247void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
248{
249    av_assert0(0);
250}
251
/*
 * Fallback used when no threading backend is available. The stub creator
 * always stores NULL into *pctx, so the only acceptable inputs here are a
 * NULL pctx or a pointer to NULL; anything else trips the assertion.
 */
void avpriv_slicethread_free(AVSliceThread **pctx)
{
    av_assert0(!pctx || !*pctx);
}
256
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
258