Lines Matching refs:queue

47 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
82 add_to_atexit_list(struct util_queue *queue)
87 list_add(&queue->head, &queue_list);
92 remove_from_atexit_list(struct util_queue *queue)
98 if (iter == queue) {
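
The first matches above are the atexit bookkeeping: every initialized queue links itself into a global queue_list, guarded by a mutex, so that a process-exit handler can kill all worker threads before global destructors run; remove_from_atexit_list() unlinks the queue again on destroy. A minimal standalone sketch of the same pattern with hypothetical names, using C11 threads:

   #include <stdlib.h>
   #include <threads.h>

   struct entry { struct entry *next; };     /* stand-in for a queue */

   static struct entry *exit_list;           /* global list of live queues */
   static mtx_t exit_list_mutex;
   static once_flag exit_list_once = ONCE_FLAG_INIT;

   static void kill_all_at_exit(void)        /* runs at process exit */
   {
      mtx_lock(&exit_list_mutex);
      for (struct entry *e = exit_list; e; e = e->next)
         ;                                   /* join this queue's workers here */
      mtx_unlock(&exit_list_mutex);
   }

   static void exit_list_init(void)
   {
      mtx_init(&exit_list_mutex, mtx_plain);
      atexit(kill_all_at_exit);
   }

   static void add_to_exit_list(struct entry *e)
   {
      call_once(&exit_list_once, exit_list_init);
      mtx_lock(&exit_list_mutex);
      e->next = exit_list;
      exit_list = e;
      mtx_unlock(&exit_list_mutex);
   }
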
242 struct util_queue *queue;
249 struct util_queue *queue = ((struct thread_input*)input)->queue;
254 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
267 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
273 if (strlen(queue->name) > 0) {
275 snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
282 mtx_lock(&queue->lock);
283 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
285 /* wait if the queue is empty */
286 while (thread_index < queue->num_threads && queue->num_queued == 0)
287 cnd_wait(&queue->has_queued_cond, &queue->lock);
290 if (thread_index >= queue->num_threads) {
291 mtx_unlock(&queue->lock);
295 job = queue->jobs[queue->read_idx];
296 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
297 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
299 queue->num_queued--;
300 cnd_signal(&queue->has_space_cond);
302 queue->total_jobs_size -= job.job_size;
303 mtx_unlock(&queue->lock);
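
Lines 282-303 are the core of util_queue_thread_func: each worker sleeps on has_queued_cond while the ring buffer is empty, pops jobs[read_idx], advances read_idx modulo max_jobs, decrements num_queued, and signals has_space_cond so one blocked producer can proceed; the job itself runs only after the lock is dropped. The thread_index >= queue->num_threads re-check on line 290 is how surplus workers notice they have been asked to exit. A standalone sketch of the same bounded-ring consumer with C11 threads (hypothetical types and names):

   #include <stdbool.h>
   #include <threads.h>

   #define MAX_JOBS 8

   struct ring {
      mtx_t lock;
      cnd_t has_queued, has_space;
      void (*jobs[MAX_JOBS])(void);          /* a job is just a callback here */
      unsigned read_idx, write_idx, num_queued;
      bool alive;
   };

   static int consumer(void *arg)
   {
      struct ring *r = arg;
      for (;;) {
         mtx_lock(&r->lock);
         while (r->alive && r->num_queued == 0)
            cnd_wait(&r->has_queued, &r->lock);  /* wait if the ring is empty */
         if (!r->alive) {                        /* cooperative exit check */
            mtx_unlock(&r->lock);
            return 0;
         }
         void (*job)(void) = r->jobs[r->read_idx];
         r->read_idx = (r->read_idx + 1) % MAX_JOBS;
         r->num_queued--;
         cnd_signal(&r->has_space);              /* wake one blocked producer */
         mtx_unlock(&r->lock);
         job();                                  /* execute outside the lock */
      }
   }

Running the job outside the lock is what lets several workers execute concurrently while producers keep enqueueing.
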
315 mtx_lock(&queue->lock);
316 if (queue->num_threads == 0) {
317 for (unsigned i = queue->read_idx; i != queue->write_idx;
318 i = (i + 1) % queue->max_jobs) {
319 if (queue->jobs[i].job) {
320 if (queue->jobs[i].fence)
321 util_queue_fence_signal(queue->jobs[i].fence);
322 queue->jobs[i].job = NULL;
325 queue->read_idx = queue->write_idx;
326 queue->num_queued = 0;
328 mtx_unlock(&queue->lock);
333 util_queue_create_thread(struct util_queue *queue, unsigned index)
337 input->queue = queue;
340 if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
345 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
356 pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
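
Lines 333-356 spawn one worker via u_thread_create() and, when the queue was created with UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY, demote it to the Linux SCHED_BATCH scheduling class. A hedged, Linux-only sketch of that demotion (SCHED_BATCH is a GNU extension and only accepts priority 0):

   #define _GNU_SOURCE                /* exposes SCHED_BATCH */
   #include <pthread.h>
   #include <sched.h>
   #include <string.h>

   static void demote_to_batch(pthread_t thread)
   {
      struct sched_param sched_param;
      memset(&sched_param, 0, sizeof(sched_param));
      sched_param.sched_priority = 0; /* SCHED_BATCH requires priority 0 */
      pthread_setschedparam(thread, SCHED_BATCH, &sched_param);
   }
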
363 util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
365 num_threads = MIN2(num_threads, queue->max_threads);
368 simple_mtx_lock(&queue->finish_lock);
369 unsigned old_num_threads = queue->num_threads;
372 simple_mtx_unlock(&queue->finish_lock);
377 util_queue_kill_threads(queue, num_threads, true);
378 simple_mtx_unlock(&queue->finish_lock);
387 queue->num_threads = num_threads;
389 if (!util_queue_create_thread(queue, i)) {
390 queue->num_threads = i;
394 simple_mtx_unlock(&queue->finish_lock);
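
util_queue_adjust_num_threads() (lines 363-394) clamps the request to max_threads, then either shrinks the pool by delegating to util_queue_kill_threads() with the new count as keep_num_threads, or grows it by spawning workers with util_queue_create_thread(), rolling num_threads back to the last successfully created thread on failure. The whole operation holds finish_lock so it cannot race with util_queue_finish(). A hypothetical caller that scales the pool with load:

   #include "util/u_queue.h"

   /* Hypothetical policy: one worker when idle, the full pool under load. */
   static void rebalance(struct util_queue *queue, bool busy)
   {
      util_queue_adjust_num_threads(queue, busy ? queue->max_threads : 1);
   }
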
398 util_queue_init(struct util_queue *queue,
417 const int max_chars = sizeof(queue->name) - 1;
426 memset(queue, 0, sizeof(*queue));
429 snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
432 snprintf(queue->name, sizeof(queue->name), "%s", name);
435 queue->flags = flags;
436 queue->max_threads = num_threads;
437 queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ? 1 : num_threads;
438 queue->max_jobs = max_jobs;
439 queue->global_data = global_data;
441 (void) mtx_init(&queue->lock, mtx_plain);
442 (void) simple_mtx_init(&queue->finish_lock, mtx_plain);
444 queue->num_queued = 0;
445 cnd_init(&queue->has_queued_cond);
446 cnd_init(&queue->has_space_cond);
448 queue->jobs = (struct util_queue_job*)
450 if (!queue->jobs)
453 queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
454 if (!queue->threads)
458 for (i = 0; i < queue->num_threads; i++) {
459 if (!util_queue_create_thread(queue, i)) {
465 queue->num_threads = i;
471 add_to_atexit_list(queue);
475 free(queue->threads);
477 if (queue->jobs) {
478 cnd_destroy(&queue->has_space_cond);
479 cnd_destroy(&queue->has_queued_cond);
480 mtx_destroy(&queue->lock);
481 free(queue->jobs);
484 memset(queue, 0, sizeof(*queue));
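
util_queue_init() (lines 398-484) names the queue (optionally prefixed with the process name), allocates the jobs ring and the threads array, spawns the initial workers (just one when UTIL_QUEUE_INIT_SCALE_THREADS is set), registers the queue on the atexit list, and on any failure tears down whatever was built and zeroes the struct. A minimal end-to-end usage sketch against this API; the job struct and callback are hypothetical, while the util_queue calls are the ones matched above:

   #include <stdbool.h>
   #include "util/u_queue.h"

   struct my_job {
      int input, result;
   };

   static void run_job(void *job, void *gdata, int thread_index)
   {
      struct my_job *j = job;
      (void)gdata; (void)thread_index;   /* unused in this sketch */
      j->result = j->input * 2;          /* stand-in for real work */
   }

   static bool example(void)
   {
      struct util_queue queue;
      struct util_queue_fence fence;
      struct my_job job = { .input = 21 };

      if (!util_queue_init(&queue, "example", /*max_jobs*/ 32,
                           /*num_threads*/ 4,
                           UTIL_QUEUE_INIT_RESIZE_IF_FULL,
                           /*global_data*/ NULL))
         return false;

      util_queue_fence_init(&fence);
      util_queue_add_job(&queue, &job, &fence, run_job, /*cleanup*/ NULL, 0);
      util_queue_fence_wait(&fence);     /* blocks until run_job has finished */

      util_queue_fence_destroy(&fence);
      util_queue_destroy(&queue);        /* joins workers, frees jobs/threads */
      return job.result == 42;
   }
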
489 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
496 simple_mtx_lock(&queue->finish_lock);
498 if (keep_num_threads >= queue->num_threads) {
499 simple_mtx_unlock(&queue->finish_lock);
503 mtx_lock(&queue->lock);
504 unsigned old_num_threads = queue->num_threads;
508 queue->num_threads = keep_num_threads;
509 cnd_broadcast(&queue->has_queued_cond);
510 mtx_unlock(&queue->lock);
513 thrd_join(queue->threads[i], NULL);
516 simple_mtx_unlock(&queue->finish_lock);
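
The shutdown in util_queue_kill_threads() (lines 489-516) is cooperative: under queue->lock it lowers num_threads to keep_num_threads, then broadcasts has_queued_cond, because every sleeping worker, not just one, must wake and re-evaluate the thread_index >= queue->num_threads test from line 290; the surplus workers unlock and return, and the caller joins them. The essential sequence, as a sketch with the same C11 primitives (q and keep are hypothetical locals, mirroring lines 503-513 above):

   mtx_lock(&q->lock);
   unsigned old_num_threads = q->num_threads;
   q->num_threads = keep;             /* workers re-check this after waking */
   cnd_broadcast(&q->has_queued);     /* wake *all* sleepers, not just one */
   mtx_unlock(&q->lock);

   for (unsigned i = keep; i < old_num_threads; i++)
      thrd_join(q->threads[i], NULL); /* reap only the surplus workers */
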
528 util_queue_destroy(struct util_queue *queue)
530 util_queue_kill_threads(queue, 0, false);
532 /* This makes it safe to call on a queue that failed util_queue_init. */
533 if (queue->head.next != NULL)
534 remove_from_atexit_list(queue);
536 cnd_destroy(&queue->has_space_cond);
537 cnd_destroy(&queue->has_queued_cond);
538 simple_mtx_destroy(&queue->finish_lock);
539 mtx_destroy(&queue->lock);
540 free(queue->jobs);
541 free(queue->threads);
545 util_queue_add_job(struct util_queue *queue,
554 mtx_lock(&queue->lock);
555 if (queue->num_threads == 0) {
556 mtx_unlock(&queue->lock);
566 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
569 if (queue->num_queued > 0 &&
570 queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS &&
572 queue->num_threads < queue->max_threads) {
573 util_queue_adjust_num_threads(queue, queue->num_threads + 1);
576 if (queue->num_queued == queue->max_jobs) {
577 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
578 queue->total_jobs_size + job_size < S_256MB) {
579 /* If the queue is full, make it larger to avoid waiting for a free
582 unsigned new_max_jobs = queue->max_jobs + 8;
590 unsigned i = queue->read_idx;
593 jobs[num_jobs++] = queue->jobs[i];
594 i = (i + 1) % queue->max_jobs;
595 } while (i != queue->write_idx);
597 assert(num_jobs == queue->num_queued);
599 free(queue->jobs);
600 queue->jobs = jobs;
601 queue->read_idx = 0;
602 queue->write_idx = num_jobs;
603 queue->max_jobs = new_max_jobs;
606 while (queue->num_queued == queue->max_jobs)
607 cnd_wait(&queue->has_space_cond, &queue->lock);
611 ptr = &queue->jobs[queue->write_idx];
614 ptr->global_data = queue->global_data;
620 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
621 queue->total_jobs_size += ptr->job_size;
623 queue->num_queued++;
624 cnd_signal(&queue->has_queued_cond);
625 mtx_unlock(&queue->lock);
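
util_queue_add_job() (lines 545-625) is the producer side. With UTIL_QUEUE_INIT_SCALE_THREADS it first spawns one more worker if jobs are already waiting and the pool is below max_threads (lines 569-573). If the ring is full it either blocks on has_space_cond or, when UTIL_QUEUE_INIT_RESIZE_IF_FULL is set and the total queued job size stays under 256 MB, grows the ring by 8 slots, copying the live entries into a fresh linear array so read_idx restarts at 0. Note the copy loop (lines 590-595) is a do/while: a full ring has read_idx == write_idx, so the emptiness test must come after the first copy. A standalone sketch of that grow step (struct job is hypothetical; the caller is assumed to hold the queue lock):

   #include <stdbool.h>
   #include <stdlib.h>

   struct job { void *data; };

   static bool grow_ring(struct job **jobs, unsigned *read_idx,
                         unsigned *write_idx, unsigned *max_jobs,
                         unsigned num_queued)
   {
      unsigned new_max = *max_jobs + 8;
      struct job *new_jobs = calloc(new_max, sizeof(*new_jobs));
      if (!new_jobs)
         return false;

      /* Copy the occupied slots in FIFO order, starting at read_idx.
       * Counting num_queued entries handles the full ring, where
       * read_idx == write_idx, just like the do/while in the original. */
      unsigned n = 0, i = *read_idx;
      while (n < num_queued) {
         new_jobs[n++] = (*jobs)[i];
         i = (i + 1) % *max_jobs;
      }

      free(*jobs);
      *jobs = new_jobs;
      *read_idx = 0;
      *write_idx = n;
      *max_jobs = new_max;
      return true;
   }
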
630 * the queue. If the job has started execution, the function waits for it to
639 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
646 mtx_lock(&queue->lock);
647 for (unsigned i = queue->read_idx; i != queue->write_idx;
648 i = (i + 1) % queue->max_jobs) {
649 if (queue->jobs[i].fence == fence) {
650 if (queue->jobs[i].cleanup)
651 queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);
654 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
659 mtx_unlock(&queue->lock);
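
util_queue_drop_job() (lines 639-659) cancels a pending job identified by its fence: it scans the live span of the ring, runs the job's cleanup callback with thread_index == -1 to mark "never executed", and memsets the slot to zero, leaving a hole with job == NULL that the worker loop treats as a no-op. Per the comment matched at line 630, if the job has already started executing, the function instead waits for it to complete. A hypothetical cancellation wrapper:

   #include "util/u_queue.h"

   /* Hypothetical: cancel a speculative job, or wait it out if it started. */
   static void cancel_job(struct util_queue *queue,
                          struct util_queue_fence *fence)
   {
      util_queue_drop_job(queue, fence);   /* fence is signaled either way */
   }
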
671 util_queue_finish(struct util_queue *queue)
680 simple_mtx_lock(&queue->finish_lock);
683 if (!queue->num_threads) {
684 simple_mtx_unlock(&queue->finish_lock);
688 fences = malloc(queue->num_threads * sizeof(*fences));
689 util_barrier_init(&barrier, queue->num_threads);
691 for (unsigned i = 0; i < queue->num_threads; ++i) {
693 util_queue_add_job(queue, &barrier, &fences[i],
697 for (unsigned i = 0; i < queue->num_threads; ++i) {
701 simple_mtx_unlock(&queue->finish_lock);
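
util_queue_finish() (lines 671-701) drains the queue with a barrier trick: it submits exactly one job per worker whose execute callback waits in the util_barrier initialized on line 689, so the function returns only once every worker has reached the barrier and therefore finished everything queued before the call; finish_lock keeps concurrent finishes and thread-count changes out. A fragment reusing run_job from the earlier sketch (jobs[] and fences[] are hypothetical arrays):

   /* Submit a burst of jobs, then wait for all of them at once. */
   for (unsigned i = 0; i < 100; i++)
      util_queue_add_job(&queue, &jobs[i], &fences[i], run_job, NULL, 0);
   util_queue_finish(&queue);   /* every job above has completed here */
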
707 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
710 if (thread_index >= queue->num_threads)
713 return util_thread_get_time_nano(queue->threads[thread_index]);
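
Finally, util_queue_get_thread_time_nano() (lines 707-713) exposes per-worker CPU time, with the guard on line 710 rejecting an out-of-range thread_index. A hypothetical profiling helper built on it:

   #include <stdint.h>
   #include "util/u_queue.h"

   /* Hypothetical: total CPU time consumed by the first n workers. */
   static int64_t total_worker_time_nano(struct util_queue *queue, unsigned n)
   {
      int64_t total = 0;
      for (unsigned i = 0; i < n; i++)
         total += util_queue_get_thread_time_nano(queue, i);
      return total;
   }
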