1/* 2 * Copyright © 2016 Advanced Micro Devices, Inc. 3 * All Rights Reserved. 4 * 5 * Permission is hereby granted, free of charge, to any person obtaining 6 * a copy of this software and associated documentation files (the 7 * "Software"), to deal in the Software without restriction, including 8 * without limitation the rights to use, copy, modify, merge, publish, 9 * distribute, sub license, and/or sell copies of the Software, and to 10 * permit persons to whom the Software is furnished to do so, subject to 11 * the following conditions: 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS 17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20 * USE OR OTHER DEALINGS IN THE SOFTWARE. 21 * 22 * The above copyright notice and this permission notice (including the 23 * next paragraph) shall be included in all copies or substantial portions 24 * of the Software. 25 */ 26 27#include "u_queue.h" 28 29#include "c11/threads.h" 30#include "util/u_cpu_detect.h" 31#include "util/os_time.h" 32#include "util/u_string.h" 33#include "util/u_thread.h" 34#include "u_process.h" 35 36#if defined(__linux__) 37#include <sys/time.h> 38#include <sys/resource.h> 39#include <sys/syscall.h> 40#endif 41 42 43/* Define 256MB */ 44#define S_256MB (256 * 1024 * 1024) 45 46static void 47util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 48 bool finish_locked); 49 50/**************************************************************************** 51 * Wait for all queues to assert idle when exit() is called. 52 * 53 * Otherwise, C++ static variable destructors can be called while threads 54 * are using the static variables. 55 */ 56 57static once_flag atexit_once_flag = ONCE_FLAG_INIT; 58static struct list_head queue_list; 59static mtx_t exit_mutex = _MTX_INITIALIZER_NP; 60 61static void 62atexit_handler(void) 63{ 64 struct util_queue *iter; 65 66 mtx_lock(&exit_mutex); 67 /* Wait for all queues to assert idle. */ 68 LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { 69 util_queue_kill_threads(iter, 0, false); 70 } 71 mtx_unlock(&exit_mutex); 72} 73 74static void 75global_init(void) 76{ 77 list_inithead(&queue_list); 78 atexit(atexit_handler); 79} 80 81static void 82add_to_atexit_list(struct util_queue *queue) 83{ 84 call_once(&atexit_once_flag, global_init); 85 86 mtx_lock(&exit_mutex); 87 list_add(&queue->head, &queue_list); 88 mtx_unlock(&exit_mutex); 89} 90 91static void 92remove_from_atexit_list(struct util_queue *queue) 93{ 94 struct util_queue *iter, *tmp; 95 96 mtx_lock(&exit_mutex); 97 LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) { 98 if (iter == queue) { 99 list_del(&iter->head); 100 break; 101 } 102 } 103 mtx_unlock(&exit_mutex); 104} 105 106/**************************************************************************** 107 * util_queue_fence 108 */ 109 110#ifdef UTIL_QUEUE_FENCE_FUTEX 111static bool 112do_futex_fence_wait(struct util_queue_fence *fence, 113 bool timeout, int64_t abs_timeout) 114{ 115 uint32_t v = p_atomic_read_relaxed(&fence->val); 116 struct timespec ts; 117 ts.tv_sec = abs_timeout / (1000*1000*1000); 118 ts.tv_nsec = abs_timeout % (1000*1000*1000); 119 120 while (v != 0) { 121 if (v != 2) { 122 v = p_atomic_cmpxchg(&fence->val, 1, 2); 123 if (v == 0) 124 return true; 125 } 126 127 int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL); 128 if (timeout && r < 0) { 129 if (errno == ETIMEDOUT) 130 return false; 131 } 132 133 v = p_atomic_read_relaxed(&fence->val); 134 } 135 136 return true; 137} 138 139void 140_util_queue_fence_wait(struct util_queue_fence *fence) 141{ 142 do_futex_fence_wait(fence, false, 0); 143} 144 145bool 146_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 147 int64_t abs_timeout) 148{ 149 return do_futex_fence_wait(fence, true, abs_timeout); 150} 151 152#endif 153 154#ifdef UTIL_QUEUE_FENCE_STANDARD 155void 156util_queue_fence_signal(struct util_queue_fence *fence) 157{ 158 mtx_lock(&fence->mutex); 159 fence->signalled = true; 160 cnd_broadcast(&fence->cond); 161 mtx_unlock(&fence->mutex); 162} 163 164void 165_util_queue_fence_wait(struct util_queue_fence *fence) 166{ 167 mtx_lock(&fence->mutex); 168 while (!fence->signalled) 169 cnd_wait(&fence->cond, &fence->mutex); 170 mtx_unlock(&fence->mutex); 171} 172 173bool 174_util_queue_fence_wait_timeout(struct util_queue_fence *fence, 175 int64_t abs_timeout) 176{ 177 /* This terrible hack is made necessary by the fact that we really want an 178 * internal interface consistent with os_time_*, but cnd_timedwait is spec'd 179 * to be relative to the TIME_UTC clock. 180 */ 181 int64_t rel = abs_timeout - os_time_get_nano(); 182 183 if (rel > 0) { 184 struct timespec ts; 185 186 timespec_get(&ts, TIME_UTC); 187 188 ts.tv_sec += abs_timeout / (1000*1000*1000); 189 ts.tv_nsec += abs_timeout % (1000*1000*1000); 190 if (ts.tv_nsec >= (1000*1000*1000)) { 191 ts.tv_sec++; 192 ts.tv_nsec -= (1000*1000*1000); 193 } 194 195 mtx_lock(&fence->mutex); 196 while (!fence->signalled) { 197 if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success) 198 break; 199 } 200 mtx_unlock(&fence->mutex); 201 } 202 203 return fence->signalled; 204} 205 206void 207util_queue_fence_init(struct util_queue_fence *fence) 208{ 209 memset(fence, 0, sizeof(*fence)); 210 (void) mtx_init(&fence->mutex, mtx_plain); 211 cnd_init(&fence->cond); 212 fence->signalled = true; 213} 214 215void 216util_queue_fence_destroy(struct util_queue_fence *fence) 217{ 218 assert(fence->signalled); 219 220 /* Ensure that another thread is not in the middle of 221 * util_queue_fence_signal (having set the fence to signalled but still 222 * holding the fence mutex). 223 * 224 * A common contract between threads is that as soon as a fence is signalled 225 * by thread A, thread B is allowed to destroy it. Since 226 * util_queue_fence_is_signalled does not lock the fence mutex (for 227 * performance reasons), we must do so here. 228 */ 229 mtx_lock(&fence->mutex); 230 mtx_unlock(&fence->mutex); 231 232 cnd_destroy(&fence->cond); 233 mtx_destroy(&fence->mutex); 234} 235#endif 236 237/**************************************************************************** 238 * util_queue implementation 239 */ 240 241struct thread_input { 242 struct util_queue *queue; 243 int thread_index; 244}; 245 246static int 247util_queue_thread_func(void *input) 248{ 249 struct util_queue *queue = ((struct thread_input*)input)->queue; 250 int thread_index = ((struct thread_input*)input)->thread_index; 251 252 free(input); 253 254 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) { 255 /* Don't inherit the thread affinity from the parent thread. 256 * Set the full mask. 257 */ 258 uint32_t mask[UTIL_MAX_CPUS / 32]; 259 260 memset(mask, 0xff, sizeof(mask)); 261 262 util_set_current_thread_affinity(mask, NULL, 263 util_get_cpu_caps()->num_cpu_mask_bits); 264 } 265 266#if defined(__linux__) 267 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { 268 /* The nice() function can only set a maximum of 19. */ 269 setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19); 270 } 271#endif 272 273 if (strlen(queue->name) > 0) { 274 char name[16]; 275 snprintf(name, sizeof(name), "%s%i", queue->name, thread_index); 276 u_thread_setname(name); 277 } 278 279 while (1) { 280 struct util_queue_job job; 281 282 mtx_lock(&queue->lock); 283 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 284 285 /* wait if the queue is empty */ 286 while (thread_index < queue->num_threads && queue->num_queued == 0) 287 cnd_wait(&queue->has_queued_cond, &queue->lock); 288 289 /* only kill threads that are above "num_threads" */ 290 if (thread_index >= queue->num_threads) { 291 mtx_unlock(&queue->lock); 292 break; 293 } 294 295 job = queue->jobs[queue->read_idx]; 296 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); 297 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; 298 299 queue->num_queued--; 300 cnd_signal(&queue->has_space_cond); 301 if (job.job) 302 queue->total_jobs_size -= job.job_size; 303 mtx_unlock(&queue->lock); 304 305 if (job.job) { 306 job.execute(job.job, job.global_data, thread_index); 307 if (job.fence) 308 util_queue_fence_signal(job.fence); 309 if (job.cleanup) 310 job.cleanup(job.job, job.global_data, thread_index); 311 } 312 } 313 314 /* signal remaining jobs if all threads are being terminated */ 315 mtx_lock(&queue->lock); 316 if (queue->num_threads == 0) { 317 for (unsigned i = queue->read_idx; i != queue->write_idx; 318 i = (i + 1) % queue->max_jobs) { 319 if (queue->jobs[i].job) { 320 if (queue->jobs[i].fence) 321 util_queue_fence_signal(queue->jobs[i].fence); 322 queue->jobs[i].job = NULL; 323 } 324 } 325 queue->read_idx = queue->write_idx; 326 queue->num_queued = 0; 327 } 328 mtx_unlock(&queue->lock); 329 return 0; 330} 331 332static bool 333util_queue_create_thread(struct util_queue *queue, unsigned index) 334{ 335 struct thread_input *input = 336 (struct thread_input *) malloc(sizeof(struct thread_input)); 337 input->queue = queue; 338 input->thread_index = index; 339 340 if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) { 341 free(input); 342 return false; 343 } 344 345 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { 346#if defined(__linux__) && defined(SCHED_BATCH) 347 struct sched_param sched_param = {0}; 348 349 /* The nice() function can only set a maximum of 19. 350 * SCHED_BATCH gives the scheduler a hint that this is a latency 351 * insensitive thread. 352 * 353 * Note that Linux only allows decreasing the priority. The original 354 * priority can't be restored. 355 */ 356 pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param); 357#endif 358 } 359 return true; 360} 361 362void 363util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads) 364{ 365 num_threads = MIN2(num_threads, queue->max_threads); 366 num_threads = MAX2(num_threads, 1); 367 368 simple_mtx_lock(&queue->finish_lock); 369 unsigned old_num_threads = queue->num_threads; 370 371 if (num_threads == old_num_threads) { 372 simple_mtx_unlock(&queue->finish_lock); 373 return; 374 } 375 376 if (num_threads < old_num_threads) { 377 util_queue_kill_threads(queue, num_threads, true); 378 simple_mtx_unlock(&queue->finish_lock); 379 return; 380 } 381 382 /* Create threads. 383 * 384 * We need to update num_threads first, because threads terminate 385 * when thread_index < num_threads. 386 */ 387 queue->num_threads = num_threads; 388 for (unsigned i = old_num_threads; i < num_threads; i++) { 389 if (!util_queue_create_thread(queue, i)) { 390 queue->num_threads = i; 391 break; 392 } 393 } 394 simple_mtx_unlock(&queue->finish_lock); 395} 396 397bool 398util_queue_init(struct util_queue *queue, 399 const char *name, 400 unsigned max_jobs, 401 unsigned num_threads, 402 unsigned flags, 403 void *global_data) 404{ 405 unsigned i; 406 407 /* Form the thread name from process_name and name, limited to 13 408 * characters. Characters 14-15 are reserved for the thread number. 409 * Character 16 should be 0. Final form: "process:name12" 410 * 411 * If name is too long, it's truncated. If any space is left, the process 412 * name fills it. 413 */ 414 const char *process_name = util_get_process_name(); 415 int process_len = process_name ? strlen(process_name) : 0; 416 int name_len = strlen(name); 417 const int max_chars = sizeof(queue->name) - 1; 418 419 name_len = MIN2(name_len, max_chars); 420 421 /* See if there is any space left for the process name, reserve 1 for 422 * the colon. */ 423 process_len = MIN2(process_len, max_chars - name_len - 1); 424 process_len = MAX2(process_len, 0); 425 426 memset(queue, 0, sizeof(*queue)); 427 428 if (process_len) { 429 snprintf(queue->name, sizeof(queue->name), "%.*s:%s", 430 process_len, process_name, name); 431 } else { 432 snprintf(queue->name, sizeof(queue->name), "%s", name); 433 } 434 435 queue->flags = flags; 436 queue->max_threads = num_threads; 437 queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ? 1 : num_threads; 438 queue->max_jobs = max_jobs; 439 queue->global_data = global_data; 440 441 (void) mtx_init(&queue->lock, mtx_plain); 442 (void) simple_mtx_init(&queue->finish_lock, mtx_plain); 443 444 queue->num_queued = 0; 445 cnd_init(&queue->has_queued_cond); 446 cnd_init(&queue->has_space_cond); 447 448 queue->jobs = (struct util_queue_job*) 449 calloc(max_jobs, sizeof(struct util_queue_job)); 450 if (!queue->jobs) 451 goto fail; 452 453 queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t)); 454 if (!queue->threads) 455 goto fail; 456 457 /* start threads */ 458 for (i = 0; i < queue->num_threads; i++) { 459 if (!util_queue_create_thread(queue, i)) { 460 if (i == 0) { 461 /* no threads created, fail */ 462 goto fail; 463 } else { 464 /* at least one thread created, so use it */ 465 queue->num_threads = i; 466 break; 467 } 468 } 469 } 470 471 add_to_atexit_list(queue); 472 return true; 473 474fail: 475 free(queue->threads); 476 477 if (queue->jobs) { 478 cnd_destroy(&queue->has_space_cond); 479 cnd_destroy(&queue->has_queued_cond); 480 mtx_destroy(&queue->lock); 481 free(queue->jobs); 482 } 483 /* also util_queue_is_initialized can be used to check for success */ 484 memset(queue, 0, sizeof(*queue)); 485 return false; 486} 487 488static void 489util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, 490 bool finish_locked) 491{ 492 unsigned i; 493 494 /* Signal all threads to terminate. */ 495 if (!finish_locked) 496 simple_mtx_lock(&queue->finish_lock); 497 498 if (keep_num_threads >= queue->num_threads) { 499 simple_mtx_unlock(&queue->finish_lock); 500 return; 501 } 502 503 mtx_lock(&queue->lock); 504 unsigned old_num_threads = queue->num_threads; 505 /* Setting num_threads is what causes the threads to terminate. 506 * Then cnd_broadcast wakes them up and they will exit their function. 507 */ 508 queue->num_threads = keep_num_threads; 509 cnd_broadcast(&queue->has_queued_cond); 510 mtx_unlock(&queue->lock); 511 512 for (i = keep_num_threads; i < old_num_threads; i++) 513 thrd_join(queue->threads[i], NULL); 514 515 if (!finish_locked) 516 simple_mtx_unlock(&queue->finish_lock); 517} 518 519static void 520util_queue_finish_execute(void *data, void *gdata, int num_thread) 521{ 522 util_barrier *barrier = data; 523 if (util_barrier_wait(barrier)) 524 util_barrier_destroy(barrier); 525} 526 527void 528util_queue_destroy(struct util_queue *queue) 529{ 530 util_queue_kill_threads(queue, 0, false); 531 532 /* This makes it safe to call on a queue that failed util_queue_init. */ 533 if (queue->head.next != NULL) 534 remove_from_atexit_list(queue); 535 536 cnd_destroy(&queue->has_space_cond); 537 cnd_destroy(&queue->has_queued_cond); 538 simple_mtx_destroy(&queue->finish_lock); 539 mtx_destroy(&queue->lock); 540 free(queue->jobs); 541 free(queue->threads); 542} 543 544void 545util_queue_add_job(struct util_queue *queue, 546 void *job, 547 struct util_queue_fence *fence, 548 util_queue_execute_func execute, 549 util_queue_execute_func cleanup, 550 const size_t job_size) 551{ 552 struct util_queue_job *ptr; 553 554 mtx_lock(&queue->lock); 555 if (queue->num_threads == 0) { 556 mtx_unlock(&queue->lock); 557 /* well no good option here, but any leaks will be 558 * short-lived as things are shutting down.. 559 */ 560 return; 561 } 562 563 if (fence) 564 util_queue_fence_reset(fence); 565 566 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 567 568 /* Scale the number of threads up if there's already one job waiting. */ 569 if (queue->num_queued > 0 && 570 queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS && 571 execute != util_queue_finish_execute && 572 queue->num_threads < queue->max_threads) { 573 util_queue_adjust_num_threads(queue, queue->num_threads + 1); 574 } 575 576 if (queue->num_queued == queue->max_jobs) { 577 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL && 578 queue->total_jobs_size + job_size < S_256MB) { 579 /* If the queue is full, make it larger to avoid waiting for a free 580 * slot. 581 */ 582 unsigned new_max_jobs = queue->max_jobs + 8; 583 struct util_queue_job *jobs = 584 (struct util_queue_job*)calloc(new_max_jobs, 585 sizeof(struct util_queue_job)); 586 assert(jobs); 587 588 /* Copy all queued jobs into the new list. */ 589 unsigned num_jobs = 0; 590 unsigned i = queue->read_idx; 591 592 do { 593 jobs[num_jobs++] = queue->jobs[i]; 594 i = (i + 1) % queue->max_jobs; 595 } while (i != queue->write_idx); 596 597 assert(num_jobs == queue->num_queued); 598 599 free(queue->jobs); 600 queue->jobs = jobs; 601 queue->read_idx = 0; 602 queue->write_idx = num_jobs; 603 queue->max_jobs = new_max_jobs; 604 } else { 605 /* Wait until there is a free slot. */ 606 while (queue->num_queued == queue->max_jobs) 607 cnd_wait(&queue->has_space_cond, &queue->lock); 608 } 609 } 610 611 ptr = &queue->jobs[queue->write_idx]; 612 assert(ptr->job == NULL); 613 ptr->job = job; 614 ptr->global_data = queue->global_data; 615 ptr->fence = fence; 616 ptr->execute = execute; 617 ptr->cleanup = cleanup; 618 ptr->job_size = job_size; 619 620 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; 621 queue->total_jobs_size += ptr->job_size; 622 623 queue->num_queued++; 624 cnd_signal(&queue->has_queued_cond); 625 mtx_unlock(&queue->lock); 626} 627 628/** 629 * Remove a queued job. If the job hasn't started execution, it's removed from 630 * the queue. If the job has started execution, the function waits for it to 631 * complete. 632 * 633 * In all cases, the fence is signalled when the function returns. 634 * 635 * The function can be used when destroying an object associated with the job 636 * when you don't care about the job completion state. 637 */ 638void 639util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) 640{ 641 bool removed = false; 642 643 if (util_queue_fence_is_signalled(fence)) 644 return; 645 646 mtx_lock(&queue->lock); 647 for (unsigned i = queue->read_idx; i != queue->write_idx; 648 i = (i + 1) % queue->max_jobs) { 649 if (queue->jobs[i].fence == fence) { 650 if (queue->jobs[i].cleanup) 651 queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1); 652 653 /* Just clear it. The threads will treat as a no-op job. */ 654 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i])); 655 removed = true; 656 break; 657 } 658 } 659 mtx_unlock(&queue->lock); 660 661 if (removed) 662 util_queue_fence_signal(fence); 663 else 664 util_queue_fence_wait(fence); 665} 666 667/** 668 * Wait until all previously added jobs have completed. 669 */ 670void 671util_queue_finish(struct util_queue *queue) 672{ 673 util_barrier barrier; 674 struct util_queue_fence *fences; 675 676 /* If 2 threads were adding jobs for 2 different barries at the same time, 677 * a deadlock would happen, because 1 barrier requires that all threads 678 * wait for it exclusively. 679 */ 680 simple_mtx_lock(&queue->finish_lock); 681 682 /* The number of threads can be changed to 0, e.g. by the atexit handler. */ 683 if (!queue->num_threads) { 684 simple_mtx_unlock(&queue->finish_lock); 685 return; 686 } 687 688 fences = malloc(queue->num_threads * sizeof(*fences)); 689 util_barrier_init(&barrier, queue->num_threads); 690 691 for (unsigned i = 0; i < queue->num_threads; ++i) { 692 util_queue_fence_init(&fences[i]); 693 util_queue_add_job(queue, &barrier, &fences[i], 694 util_queue_finish_execute, NULL, 0); 695 } 696 697 for (unsigned i = 0; i < queue->num_threads; ++i) { 698 util_queue_fence_wait(&fences[i]); 699 util_queue_fence_destroy(&fences[i]); 700 } 701 simple_mtx_unlock(&queue->finish_lock); 702 703 free(fences); 704} 705 706int64_t 707util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) 708{ 709 /* Allow some flexibility by not raising an error. */ 710 if (thread_index >= queue->num_threads) 711 return 0; 712 713 return util_thread_get_time_nano(queue->threads[thread_index]); 714} 715