1/* 2 * This file is part of FFmpeg. 3 * 4 * FFmpeg is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU Lesser General Public 6 * License as published by the Free Software Foundation; either 7 * version 2.1 of the License, or (at your option) any later version. 8 * 9 * FFmpeg is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * Lesser General Public License for more details. 13 * 14 * You should have received a copy of the GNU Lesser General Public 15 * License along with FFmpeg; if not, write to the Free Software 16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 17 */ 18 19#include <stdatomic.h> 20#include "cpu.h" 21#include "internal.h" 22#include "slicethread.h" 23#include "mem.h" 24#include "thread.h" 25#include "avassert.h" 26 27#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS 28 29typedef struct WorkerContext { 30 AVSliceThread *ctx; 31 pthread_mutex_t mutex; 32 pthread_cond_t cond; 33 pthread_t thread; 34 int done; 35} WorkerContext; 36 37struct AVSliceThread { 38 WorkerContext *workers; 39 int nb_threads; 40 int nb_active_threads; 41 int nb_jobs; 42 43 atomic_uint first_job; 44 atomic_uint current_job; 45 pthread_mutex_t done_mutex; 46 pthread_cond_t done_cond; 47 int done; 48 int finished; 49 50 void *priv; 51 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads); 52 void (*main_func)(void *priv); 53}; 54 55static int run_jobs(AVSliceThread *ctx) 56{ 57 unsigned nb_jobs = ctx->nb_jobs; 58 unsigned nb_active_threads = ctx->nb_active_threads; 59 unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel); 60 unsigned current_job = first_job; 61 62 do { 63 ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads); 64 } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs); 65 66 return current_job == nb_jobs + nb_active_threads - 1; 67} 68 69static void *attribute_align_arg thread_worker(void *v) 70{ 71 WorkerContext *w = v; 72 AVSliceThread *ctx = w->ctx; 73 74 pthread_mutex_lock(&w->mutex); 75 pthread_cond_signal(&w->cond); 76 77 while (1) { 78 w->done = 1; 79 while (w->done) 80 pthread_cond_wait(&w->cond, &w->mutex); 81 82 if (ctx->finished) { 83 pthread_mutex_unlock(&w->mutex); 84 return NULL; 85 } 86 87 if (run_jobs(ctx)) { 88 pthread_mutex_lock(&ctx->done_mutex); 89 ctx->done = 1; 90 pthread_cond_signal(&ctx->done_cond); 91 pthread_mutex_unlock(&ctx->done_mutex); 92 } 93 } 94} 95 96int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, 97 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), 98 void (*main_func)(void *priv), 99 int nb_threads) 100{ 101 AVSliceThread *ctx; 102 int nb_workers, i; 103 104 av_assert0(nb_threads >= 0); 105 if (!nb_threads) { 106 int nb_cpus = av_cpu_count(); 107 if (nb_cpus > 1) 108 nb_threads = nb_cpus + 1; 109 else 110 nb_threads = 1; 111 } 112 113 nb_workers = nb_threads; 114 if (!main_func) 115 nb_workers--; 116 117 *pctx = ctx = av_mallocz(sizeof(*ctx)); 118 if (!ctx) 119 return AVERROR(ENOMEM); 120 121 if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) { 122 av_freep(pctx); 123 return AVERROR(ENOMEM); 124 } 125 126 ctx->priv = priv; 127 ctx->worker_func = worker_func; 128 ctx->main_func = main_func; 129 ctx->nb_threads = nb_threads; 130 ctx->nb_active_threads = 0; 131 ctx->nb_jobs = 0; 132 ctx->finished = 0; 133 134 atomic_init(&ctx->first_job, 0); 135 atomic_init(&ctx->current_job, 0); 136 pthread_mutex_init(&ctx->done_mutex, NULL); 137 pthread_cond_init(&ctx->done_cond, NULL); 138 ctx->done = 0; 139 140 for (i = 0; i < nb_workers; i++) { 141 WorkerContext *w = &ctx->workers[i]; 142 int ret; 143 w->ctx = ctx; 144 pthread_mutex_init(&w->mutex, NULL); 145 pthread_cond_init(&w->cond, NULL); 146 pthread_mutex_lock(&w->mutex); 147 w->done = 0; 148 149 if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) { 150 ctx->nb_threads = main_func ? i : i + 1; 151 pthread_mutex_unlock(&w->mutex); 152 pthread_cond_destroy(&w->cond); 153 pthread_mutex_destroy(&w->mutex); 154 avpriv_slicethread_free(pctx); 155 return AVERROR(ret); 156 } 157 158 while (!w->done) 159 pthread_cond_wait(&w->cond, &w->mutex); 160 pthread_mutex_unlock(&w->mutex); 161 } 162 163 return nb_threads; 164} 165 166void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) 167{ 168 int nb_workers, i, is_last = 0; 169 170 av_assert0(nb_jobs > 0); 171 ctx->nb_jobs = nb_jobs; 172 ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads); 173 atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed); 174 atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed); 175 nb_workers = ctx->nb_active_threads; 176 if (!ctx->main_func || !execute_main) 177 nb_workers--; 178 179 for (i = 0; i < nb_workers; i++) { 180 WorkerContext *w = &ctx->workers[i]; 181 pthread_mutex_lock(&w->mutex); 182 w->done = 0; 183 pthread_cond_signal(&w->cond); 184 pthread_mutex_unlock(&w->mutex); 185 } 186 187 if (ctx->main_func && execute_main) 188 ctx->main_func(ctx->priv); 189 else 190 is_last = run_jobs(ctx); 191 192 if (!is_last) { 193 pthread_mutex_lock(&ctx->done_mutex); 194 while (!ctx->done) 195 pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex); 196 ctx->done = 0; 197 pthread_mutex_unlock(&ctx->done_mutex); 198 } 199} 200 201void avpriv_slicethread_free(AVSliceThread **pctx) 202{ 203 AVSliceThread *ctx; 204 int nb_workers, i; 205 206 if (!pctx || !*pctx) 207 return; 208 209 ctx = *pctx; 210 nb_workers = ctx->nb_threads; 211 if (!ctx->main_func) 212 nb_workers--; 213 214 ctx->finished = 1; 215 for (i = 0; i < nb_workers; i++) { 216 WorkerContext *w = &ctx->workers[i]; 217 pthread_mutex_lock(&w->mutex); 218 w->done = 0; 219 pthread_cond_signal(&w->cond); 220 pthread_mutex_unlock(&w->mutex); 221 } 222 223 for (i = 0; i < nb_workers; i++) { 224 WorkerContext *w = &ctx->workers[i]; 225 pthread_join(w->thread, NULL); 226 pthread_cond_destroy(&w->cond); 227 pthread_mutex_destroy(&w->mutex); 228 } 229 230 pthread_cond_destroy(&ctx->done_cond); 231 pthread_mutex_destroy(&ctx->done_mutex); 232 av_freep(&ctx->workers); 233 av_freep(pctx); 234} 235 236#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */ 237 238int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, 239 void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), 240 void (*main_func)(void *priv), 241 int nb_threads) 242{ 243 *pctx = NULL; 244 return AVERROR(ENOSYS); 245} 246 247void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) 248{ 249 av_assert0(0); 250} 251 252void avpriv_slicethread_free(AVSliceThread **pctx) 253{ 254 av_assert0(!pctx || !*pctx); 255} 256 257#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */ 258