/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "c11/threads.h"
#include "util/u_cpu_detect.h"
#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "u_process.h"

#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#endif


/* Define 256MB */
#define S_256MB (256 * 1024 * 1024)

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   list_inithead(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   list_add(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         list_del(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = p_atomic_read_relaxed(&fence->val);
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

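      /* Hedged note (inferred from the wait path here, not from the signal
       * path, which lives in u_queue.h): the value 2 means "unsignalled,
       * with a possible waiter", so the signalling side knows a futex wake
       * is needed.  futex_wait() only sleeps if the value is still 2; a
       * value of 0 means the fence has been signalled.
       */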
      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = p_atomic_read_relaxed(&fence->val);
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
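   /* abs_timeout is a deadline on the os_time_get_nano() clock, so turn it
    * into a remaining duration first and re-apply that duration to the
    * current TIME_UTC time below.
    */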
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif

/****************************************************************************
 * util_queue implementation
 */

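/* Typical usage (illustrative sketch only; "my_job", "execute_cb" and
 * "cleanup_cb" are caller-provided placeholders, not part of this file):
 *
 *    struct util_queue queue;
 *    struct util_queue_fence fence;
 *
 *    util_queue_init(&queue, "ex", 32, 4, 0, NULL);
 *    util_queue_fence_init(&fence);
 *    util_queue_add_job(&queue, my_job, &fence, execute_cb, cleanup_cb, 0);
 *    util_queue_fence_wait(&fence);    (blocks until execute_cb has run)
 *    util_queue_fence_destroy(&fence);
 *    util_queue_destroy(&queue);
 */
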
struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      uint32_t mask[UTIL_MAX_CPUS / 32];

      memset(mask, 0xff, sizeof(mask));

      util_set_current_thread_affinity(mask, NULL,
                                       util_get_cpu_caps()->num_cpu_mask_bits);
   }

#if defined(__linux__)
   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
      /* The nice() function can only set a maximum of 19. */
      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      if (job.job)
         queue->total_jobs_size -= job.job_size;
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, job.global_data, thread_index);
         if (job.fence)
            util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, job.global_data, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            if (queue->jobs[i].fence)
               util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}

static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   input->queue = queue;
   input->thread_index = index;

   if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_BATCH gives the scheduler a hint that this is a latency
       * insensitive thread.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
#endif
   }
   return true;
}

void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
{
   num_threads = MIN2(num_threads, queue->max_threads);
   num_threads = MAX2(num_threads, 1);

   simple_mtx_lock(&queue->finish_lock);
   unsigned old_num_threads = queue->num_threads;

   if (num_threads == old_num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   if (num_threads < old_num_threads) {
      util_queue_kill_threads(queue, num_threads, true);
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   /* Create threads.
    *
    * We need to update num_threads first, because a thread terminates
    * as soon as its thread_index >= num_threads.
    */
   queue->num_threads = num_threads;
   for (unsigned i = old_num_threads; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         queue->num_threads = i;
         break;
      }
   }
   simple_mtx_unlock(&queue->finish_lock);
}

bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags,
                void *global_data)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
               process_len, process_name, name);
   } else {
      snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ?
                        1 : num_threads;
   queue->max_jobs = max_jobs;
   queue->global_data = global_data;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) simple_mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < queue->num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* also util_queue_is_initialized can be used to check for success */
   memset(queue, 0, sizeof(*queue));
   return false;
}

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked)
{
   unsigned i;

   /* Signal all threads to terminate. */
   if (!finish_locked)
      simple_mtx_lock(&queue->finish_lock);

   if (keep_num_threads >= queue->num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   mtx_lock(&queue->lock);
   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = keep_num_threads; i < old_num_threads; i++)
      thrd_join(queue->threads[i], NULL);

   if (!finish_locked)
      simple_mtx_unlock(&queue->finish_lock);
}

static void
util_queue_finish_execute(void *data, void *gdata, int num_thread)
{
   util_barrier *barrier = data;
   if (util_barrier_wait(barrier))
      util_barrier_destroy(barrier);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);

   /* This makes it safe to call on a queue that failed util_queue_init. */
   if (queue->head.next != NULL)
      remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   simple_mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup,
                   const size_t job_size)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      mtx_unlock(&queue->lock);
      /* well no good option here, but any leaks will be
       * short-lived as things are shutting down..
       */
      return;
   }

   if (fence)
      util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* Scale the number of threads up if there's already one job waiting. */
   if (queue->num_queued > 0 &&
       queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS &&
       execute != util_queue_finish_execute &&
       queue->num_threads < queue->max_threads) {
      util_queue_adjust_num_threads(queue, queue->num_threads + 1);
   }

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
          queue->total_jobs_size + job_size < S_256MB) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->global_data = queue->global_data;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   ptr->job_size = job_size;

   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
   queue->total_jobs_size += ptr->job_size;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   simple_mtx_lock(&queue->finish_lock);

   /* The number of threads can be changed to 0, e.g. by the atexit handler. */
   if (!queue->num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

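   /* Submit one barrier job per thread.  Each thread blocks in
    * util_queue_finish_execute until every thread has reached the barrier,
    * which can only happen after each thread has drained the jobs queued
    * ahead of its barrier job, so all previously added work has finished.
    */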
   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i],
                         util_queue_finish_execute, NULL, 0);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   simple_mtx_unlock(&queue->finish_lock);

   free(fences);
}

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return util_thread_get_time_nano(queue->threads[thread_index]);
}