/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "c11/threads.h"
#include "util/u_cpu_detect.h"
#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "u_process.h"

#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#endif


/* Cap on the total size of queued job data, used when deciding whether to
 * grow the job ring in util_queue_add_job(). */
#define S_256MB (256 * 1024 * 1024)

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   list_inithead(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   list_add(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         list_del(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
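/* The futex-based fence encodes its state in fence->val: 0 means signalled,
 * 1 means unsignalled with no waiters, and 2 means unsignalled with possible
 * waiters. A waiter upgrades 1 -> 2 before sleeping, which lets the
 * signalling side skip the futex wake when nobody is waiting.
 */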
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = p_atomic_read_relaxed(&fence->val);
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = p_atomic_read_relaxed(&fence->val);
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif

/****************************************************************************
 * util_queue implementation
 */

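/* Start-up argument for each worker thread; allocated by
 * util_queue_create_thread and freed by the thread itself.
 */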
struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

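/* Main loop of every worker thread: pop jobs in FIFO order and execute them.
 * A thread exits once its thread_index is no longer below queue->num_threads
 * (see util_queue_kill_threads).
 */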
static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      uint32_t mask[UTIL_MAX_CPUS / 32];

      memset(mask, 0xff, sizeof(mask));

      util_set_current_thread_affinity(mask, NULL,
                                       util_get_cpu_caps()->num_cpu_mask_bits);
   }

#if defined(__linux__)
   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
      /* The nice() function can only set a maximum of 19. */
      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      if (job.job)
         queue->total_jobs_size -= job.job_size;
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, job.global_data, thread_index);
         if (job.fence)
            util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, job.global_data, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            if (queue->jobs[i].fence)
               util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}

static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   input->queue = queue;
   input->thread_index = index;

   if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_BATCH gives the scheduler a hint that this is a latency
       * insensitive thread.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
#endif
   }
   return true;
}

void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
{
   num_threads = MIN2(num_threads, queue->max_threads);
   num_threads = MAX2(num_threads, 1);

   simple_mtx_lock(&queue->finish_lock);
   unsigned old_num_threads = queue->num_threads;

   if (num_threads == old_num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   if (num_threads < old_num_threads) {
      util_queue_kill_threads(queue, num_threads, true);
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   /* Create threads.
    *
    * We need to update num_threads first, because a newly created thread
    * terminates as soon as its thread_index >= num_threads.
    */
   queue->num_threads = num_threads;
   for (unsigned i = old_num_threads; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         queue->num_threads = i;
         break;
      }
   }
   simple_mtx_unlock(&queue->finish_lock);
}

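/**
 * Initialize a queue with a ring of "max_jobs" job slots and up to
 * "num_threads" worker threads. With UTIL_QUEUE_INIT_SCALE_THREADS only one
 * thread is started here; util_queue_add_job adds more on demand.
 * "global_data" is passed to every job's execute/cleanup callback.
 *
 * Returns false and leaves the queue zeroed on failure.
 */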
bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags,
                void *global_data)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
               process_len, process_name, name);
   } else {
      snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = (flags & UTIL_QUEUE_INIT_SCALE_THREADS) ? 1 : num_threads;
   queue->max_jobs = max_jobs;
   queue->global_data = global_data;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) simple_mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < queue->num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* Zero the struct so that util_queue_is_initialized() can also be used to
    * check for success. */
   memset(queue, 0, sizeof(*queue));
   return false;
}

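/**
 * Terminate and join all worker threads with an index >= keep_num_threads.
 * "finish_locked" says whether the caller already holds finish_lock.
 */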
static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool finish_locked)
{
   unsigned i;

   /* Signal all threads to terminate. */
   if (!finish_locked)
      simple_mtx_lock(&queue->finish_lock);

   if (keep_num_threads >= queue->num_threads) {
      /* Nothing to do; only unlock the mutex if we locked it above. */
      if (!finish_locked)
         simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   mtx_lock(&queue->lock);
   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = keep_num_threads; i < old_num_threads; i++)
      thrd_join(queue->threads[i], NULL);

   if (!finish_locked)
      simple_mtx_unlock(&queue->finish_lock);
}

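/* Job used by util_queue_finish: every worker thread waits at the shared
 * barrier, and the one thread for which util_barrier_wait returns true
 * destroys it.
 */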
static void
util_queue_finish_execute(void *data, void *gdata, int num_thread)
{
   util_barrier *barrier = data;
   if (util_barrier_wait(barrier))
      util_barrier_destroy(barrier);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);

   /* This makes it safe to call on a queue that failed util_queue_init. */
   if (queue->head.next != NULL)
      remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   simple_mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

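/**
 * Add a job to the queue. If "fence" is non-NULL, it's reset here and
 * signalled after the job has been executed. If the ring is full, the call
 * either grows it (with UTIL_QUEUE_INIT_RESIZE_IF_FULL, while the total size
 * of queued job data stays below S_256MB) or blocks until a slot frees up.
 */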
void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup,
                   const size_t job_size)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      mtx_unlock(&queue->lock);
      /* There's no good option here, but any leaks will be short-lived,
       * since things are shutting down.
       */
      return;
   }

   if (fence)
      util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* Scale the number of threads up if there's already one job waiting. */
   if (queue->num_queued > 0 &&
       queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS &&
       execute != util_queue_finish_execute &&
       queue->num_threads < queue->max_threads) {
      util_queue_adjust_num_threads(queue, queue->num_threads + 1);
   }

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
          queue->total_jobs_size + job_size < S_256MB) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->global_data = queue->global_data;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   ptr->job_size = job_size;

   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
   queue->total_jobs_size += ptr->job_size;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   simple_mtx_lock(&queue->finish_lock);

   /* The number of threads can be changed to 0, e.g. by the atexit handler. */
   if (!queue->num_threads) {
      simple_mtx_unlock(&queue->finish_lock);
      return;
   }

   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i],
                         util_queue_finish_execute, NULL, 0);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   simple_mtx_unlock(&queue->finish_lock);

   free(fences);
}

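/**
 * Return the running time accumulated by the given worker thread in
 * nanoseconds, or 0 if thread_index is out of range.
 */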
int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return util_thread_get_time_nano(queue->threads[thread_index]);
}