xref: /third_party/mesa3d/src/util/u_queue.c (revision bf215546)
/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "c11/threads.h"
#include "util/u_cpu_detect.h"
#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "u_process.h"

#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#endif


/* Define 256MB */
#define S_256MB (256 * 1024 * 1024)

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   list_inithead(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   list_add(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         list_del(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = p_atomic_read_relaxed(&fence->val);
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = p_atomic_read_relaxed(&fence->val);
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif
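
/* The futex path above relies on the three-state fence value (see
 * struct util_queue_fence in u_queue.h): 0 = signalled, 1 = unsignalled,
 * 2 = unsignalled with possible waiters. A minimal usage sketch, assuming
 * only the public fence wrappers (util_queue_fence_init/reset/wait/signal):
 *
 *    struct util_queue_fence fence;
 *    util_queue_fence_init(&fence);   // fences start out signalled
 *    util_queue_fence_reset(&fence);  // mark work as pending
 *    ...                              // a producer signals the fence when done
 *    util_queue_fence_wait(&fence);   // blocks in do_futex_fence_wait above
 */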

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif
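
/* Both implementations of _util_queue_fence_wait_timeout above take an
 * absolute timeout in the os_time_* time base. A minimal sketch of building
 * such a timeout from a relative delay (the 2 ms value is illustrative):
 *
 *    int64_t abs_timeout = os_time_get_nano() + 2 * 1000 * 1000;
 *    if (!_util_queue_fence_wait_timeout(&fence, abs_timeout)) {
 *       // timed out; the fence has not been signalled yet
 *    }
 */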

/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      uint32_t mask[UTIL_MAX_CPUS / 32];

      memset(mask, 0xff, sizeof(mask));

      util_set_current_thread_affinity(mask, NULL,
                                       util_get_cpu_caps()->num_cpu_mask_bits);
   }

#if defined(__linux__)
   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
      /* The nice() function can only set a maximum of 19. */
      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      if (job.job)
         queue->total_jobs_size -= job.job_size;
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, job.global_data, thread_index);
         if (job.fence)
            util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, job.global_data, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            if (queue->jobs[i].fence)
               util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}
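
/* The worker loop above consumes from the ring buffer that util_queue_add_job
 * fills. A sketch of the invariant it relies on, derived from the code:
 *
 *    pending jobs live in jobs[read_idx], jobs[read_idx + 1], ...,
 *    jobs[write_idx - 1], with all indices taken modulo max_jobs, and
 *    num_queued counts those slots, i.e.
 *    num_queued == (write_idx - read_idx + max_jobs) % max_jobs
 *    unless the ring is completely full (num_queued == max_jobs).
 */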

static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   input->queue = queue;
   input->thread_index = index;

   if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_BATCH gives the scheduler a hint that this is a latency
       * insensitive thread.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
#endif
   }
   return true;
}

void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
{
   num_threads = MIN2(num_threads, queue->max_threads);
   num_threads = MAX2(num_threads, 1);

   simple_mtx_lock(&queue->finish_lock);
   unsigned old_num_threads = queue->num_threads;

   if (num_threads == old_num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   if (num_threads < old_num_threads) {
      util_queue_kill_threads(queue, num_threads, true);
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   /* Create threads.
    *
    * We need to update num_threads first, because threads terminate
    * when thread_index >= num_threads.
    */
   queue->num_threads = num_threads;
   for (unsigned i = old_num_threads; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         queue->num_threads = i;
         break;
      }
   }
   simple_mtx_unlock(&queue->finish_lock);
}
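
/* A minimal usage sketch for the function above, assuming a queue created
 * with UTIL_QUEUE_INIT_SCALE_THREADS (so it starts with 1 thread) and an
 * illustrative maximum of 4 threads passed to util_queue_init:
 *
 *    util_queue_adjust_num_threads(queue, 4);  // grow towards max_threads
 *    util_queue_adjust_num_threads(queue, 1);  // later, shrink back to 1
 *
 * The requested count is clamped to [1, max_threads], so passing 0 or a
 * value larger than max_threads is safe.
 */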

bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags,
                void *global_data)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
               process_len, process_name, name);
   } else {
      snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ? 1 : num_threads;
   queue->max_jobs = max_jobs;
   queue->global_data = global_data;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) simple_mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < queue->num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* Zeroing the struct also allows util_queue_is_initialized to be used to
    * check for success. */
   memset(queue, 0, sizeof(*queue));
   return false;
}
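
/* A minimal end-to-end sketch of the API implemented in this file. The job
 * struct, callback and values are illustrative, not part of the real API:
 *
 *    struct my_job { int input; int result; };
 *
 *    static void my_execute(void *job, void *gdata, int thread_index)
 *    {
 *       struct my_job *j = job;
 *       j->result = j->input * 2;
 *    }
 *
 *    struct util_queue queue;
 *    struct util_queue_fence fence;
 *    struct my_job job = { .input = 21 };
 *
 *    if (util_queue_init(&queue, "demo", 32, 2, 0, NULL)) {
 *       util_queue_fence_init(&fence);
 *       util_queue_add_job(&queue, &job, &fence, my_execute, NULL, 0);
 *       util_queue_fence_wait(&fence);   // job.result == 42 afterwards
 *       util_queue_fence_destroy(&fence);
 *       util_queue_destroy(&queue);
 *    }
 */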

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked)
{
   unsigned i;

   /* Signal all threads to terminate. */
   if (!finish_locked)
      simple_mtx_lock(&queue->finish_lock);

   if (keep_num_threads >= queue->num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   mtx_lock(&queue->lock);
   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = keep_num_threads; i < old_num_threads; i++)
      thrd_join(queue->threads[i], NULL);

   if (!finish_locked)
      simple_mtx_unlock(&queue->finish_lock);
}

static void
util_queue_finish_execute(void *data, void *gdata, int num_thread)
{
   util_barrier *barrier = data;
   if (util_barrier_wait(barrier))
      util_barrier_destroy(barrier);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);

   /* This makes it safe to call on a queue that failed util_queue_init. */
   if (queue->head.next != NULL)
      remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   simple_mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup,
                   const size_t job_size)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be
       * short-lived as things are shutting down.
       */
      return;
   }

   if (fence)
      util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* Scale the number of threads up if there's already one job waiting. */
   if (queue->num_queued > 0 &&
       queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS &&
       execute != util_queue_finish_execute &&
       queue->num_threads < queue->max_threads) {
      util_queue_adjust_num_threads(queue, queue->num_threads + 1);
   }

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
          queue->total_jobs_size + job_size < S_256MB) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->global_data = queue->global_data;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   ptr->job_size = job_size;

   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
   queue->total_jobs_size += ptr->job_size;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}
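
/* Sketch of the callback contract used by util_queue_add_job above. Both
 * callbacks have the util_queue_execute_func signature; the names below are
 * illustrative:
 *
 *    static void my_execute(void *job, void *gdata, int thread_index)
 *    {
 *       // runs on a worker thread; thread_index >= 0
 *    }
 *
 *    static void my_cleanup(void *job, void *gdata, int thread_index)
 *    {
 *       // runs after my_execute on the same worker thread, or with
 *       // thread_index == -1 when dropped via util_queue_drop_job
 *       free(job);
 *    }
 *
 *    util_queue_add_job(queue, my_job, &fence, my_execute, my_cleanup,
 *                       sizeof(*my_job));
 *
 * job_size only feeds the total_jobs_size bookkeeping used by the
 * UTIL_QUEUE_INIT_RESIZE_IF_FULL / S_256MB check above; 0 is fine when that
 * flag is not used.
 */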

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}
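
/* A minimal sketch of the intended use of util_queue_drop_job, assuming an
 * illustrative object that owns both its queued job and the fence for it:
 *
 *    void my_object_destroy(struct my_object *obj)
 *    {
 *       // Either removes the still-queued job (and signals the fence) or
 *       // waits for the in-flight job to finish; in both cases the fence
 *       // is signalled on return, so obj can be freed safely.
 *       util_queue_drop_job(obj->queue, &obj->ready_fence);
 *       free(obj);
 *    }
 */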

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   simple_mtx_lock(&queue->finish_lock);

   /* The number of threads can be changed to 0, e.g. by the atexit handler. */
   if (!queue->num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i],
                         util_queue_finish_execute, NULL, 0);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   simple_mtx_unlock(&queue->finish_lock);

   free(fences);
}
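
/* A minimal usage sketch: util_queue_finish acts as a full flush, so code
 * that must observe the side effects of every previously added job can do
 * (job1/job2, the fences and my_execute are illustrative names):
 *
 *    util_queue_add_job(queue, job1, &fence1, my_execute, NULL, 0);
 *    util_queue_add_job(queue, job2, &fence2, my_execute, NULL, 0);
 *    util_queue_finish(queue);   // both jobs have completed here
 */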

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return util_thread_get_time_nano(queue->threads[thread_index]);
}