xref: /third_party/libuv/src/threadpool.c (revision e66f31c5)
1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include "uv-common.h"
23#include "uv_log.h"
24#include "uv_trace.h"
25
26#if !defined(_WIN32)
27# include "unix/internal.h"
28#else
29# include "win/internal.h"
30#endif
31
32#include <stdlib.h>
33#ifdef USE_FFRT
34#include <assert.h>
35#include "ffrt_inner.h"
36#endif
37#include <stdio.h>
38#ifdef ASYNC_STACKTRACE
39#include "dfx/async_stack/libuv_async_stack.h"
40#endif
41
42#define MAX_THREADPOOL_SIZE 1024
43#define UV_TRACE_NAME "UV_TRACE"
44
45static uv_rwlock_t g_closed_uv_loop_rwlock;
46static uv_once_t once = UV_ONCE_INIT;
47static uv_cond_t cond;
48static uv_mutex_t mutex;
49static unsigned int idle_threads;
50static unsigned int nthreads;
51static uv_thread_t* threads;
52static uv_thread_t default_threads[4];
53static struct uv__queue exit_message;
54static struct uv__queue wq;
55static struct uv__queue run_slow_work_message;
56static struct uv__queue slow_io_pending_wq;
57
58
59#ifdef UV_STATISTIC
60#define MAX_DUMP_QUEUE_SIZE 200
61static uv_mutex_t dump_queue_mutex;
62static QUEUE dump_queue;
63static unsigned int dump_queue_size;
64
65static int statistic_idle;
66static uv_mutex_t statistic_mutex;
67static QUEUE statistic_works;
68static uv_cond_t dump_cond;
69static uv_thread_t dump_thread;
70
71static void uv_dump_worker(void*  arg) {
72  struct uv__statistic_work* w;
73  struct uv__queue* q;
74  uv_sem_post((uv_sem_t*) arg);
75  arg = NULL;
76  uv_mutex_lock(&statistic_mutex);
77  for (;;) {
78    while (uv__queue_empty(&statistic_works)) {
79      statistic_idle = 1;
80      uv_cond_wait(&dump_cond, &statistic_mutex);
81      statistic_idle = 0;
82    }
83    q = uv__queue_head(&statistic_works);
84    if (q == &exit_message) {
85      uv_cond_signal(&dump_cond);
86      uv_mutex_unlock(&statistic_mutex);
87      break;
88    }
89    uv__queue_remove(q);
90    uv__queue_init(q);
91    uv_mutex_unlock(&statistic_mutex);
92    w = uv__queue_data(q, struct uv__statistic_work, wq);
93    w->work(w);
94    free(w);
95    uv_mutex_lock(&statistic_mutex);
96  }
97}
98
99static void post_statistic_work(struct uv__queue* q) {
100  uv_mutex_lock(&statistic_mutex);
101  uv__queue_insert_tail(&statistic_works, q);
102  if (statistic_idle)
103    uv_cond_signal(&dump_cond);
104  uv_mutex_unlock(&statistic_mutex);
105}
106
107static void uv__queue_work_info(struct uv__statistic_work *work) {
108  uv_mutex_lock(&dump_queue_mutex);
109  if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
110    struct uv__queue* q;
111    uv__queue_foreach(q, &dump_queue) {
112      struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
113      if (info->state == DONE_END) {
114        uv__queue_remove(q);
115        free(info);
116        dump_queue_size--;
117      }
118    }
119    if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
120      abort(); /* too many works not done. */
121    }
122  }
123
124  uv__queue_insert_head(&dump_queue,  &work->info->wq);
125  dump_queue_size++;
126  uv_mutex_unlock(&dump_queue_mutex);
127}
128
129static void uv__update_work_info(struct uv__statistic_work *work) {
130  uv_mutex_lock(&dump_queue_mutex);
131  if (work != NULL && work->info != NULL) {
132    work->info->state = work->state;
133    switch (work->state) {
134      case WAITING:
135        work->info->queue_time = work->time;
136        break;
137      case WORK_EXECUTING:
138        work->info->execute_start_time = work->time;
139        break;
140      case WORK_END:
141        work->info->execute_end_time = work->time;
142        break;
143      case DONE_EXECUTING:
144        work->info->done_start_time = work->time;
145        break;
146      case DONE_END:
147        work->info->done_end_time = work->time;
148        break;
149      default:
150        break;
151    }
152  }
153  uv_mutex_unlock(&dump_queue_mutex);
154}
155
156
157// return the timestamp in millisecond
158static uint64_t uv__now_timestamp() {
159  uv_timeval64_t tv;
160  int r = uv_gettimeofday(&tv);
161  if (r != 0) {
162    return 0;
163  }
164  return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
165}
166
167
168static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
169  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
170  if (dump_work == NULL) {
171    return;
172  }
173  dump_work->info = w->info;
174  dump_work->work = uv__update_work_info;
175  dump_work->time = uv__now_timestamp();
176  dump_work->state = state;
177  uv__queue_init(&dump_work->wq);
178  post_statistic_work(&dump_work->wq);
179}
180
181
182static void init_work_dump_queue()
183{
184  if (uv_mutex_init(&dump_queue_mutex))
185    abort();
186  uv_mutex_lock(&dump_queue_mutex);
187  uv__queue_init(&dump_queue);
188  dump_queue_size = 0;
189  uv_mutex_unlock(&dump_queue_mutex);
190
191  /* init dump thread */
192  statistic_idle = 1;
193  if (uv_mutex_init(&statistic_mutex))
194    abort();
195  uv__queue_init(&statistic_works);
196  uv_sem_t sem;
197  if (uv_cond_init(&dump_cond))
198    abort();
199  if (uv_sem_init(&sem, 0))
200    abort();
201  if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
202      abort();
203  uv_sem_wait(&sem);
204  uv_sem_destroy(&sem);
205}
206
207
208void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
209  if (info == NULL)
210    return;
211  info->queue_time = 0;
212  info->state = WAITING;
213  info->execute_start_time = 0;
214  info->execute_end_time = 0;
215  info->done_start_time = 0;
216  info->done_end_time = 0;
217  info->work = w;
218  uv__queue_init(&info->wq);
219}
220
221
222void uv_queue_statics(struct uv_work_dump_info* info) {
223  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
224  if (dump_work == NULL) {
225    abort();
226  }
227  dump_work->info = info;
228  dump_work->work = uv__queue_work_info;
229  info->queue_time = uv__now_timestamp();
230  dump_work->state = WAITING;
231  uv__queue_init(&dump_work->wq);
232  post_statistic_work(&dump_work->wq);
233}
234
235
236uv_worker_info_t* uv_dump_work_queue(int* size) {
237#ifdef UV_STATISTIC
238  uv_mutex_lock(&dump_queue_mutex);
239  if (uv__queue_empty(&dump_queue)) {
240    return NULL;
241  }
242  *size = dump_queue_size;
243  uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
244  struct uv__queue* q;
245  int i = 0;
246  uv__queue_foreach(q, &dump_queue) {
247    struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
248    dump_info[i].queue_time = info->queue_time;
249    dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
250    dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
251    dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
252    switch (info->state) {
253      case WAITING:
254        strcpy(dump_info[i].state, "waiting");
255        break;
256      case WORK_EXECUTING:
257        strcpy(dump_info[i].state, "work_executing");
258        break;
259      case WORK_END:
260        strcpy(dump_info[i].state, "work_end");
261        break;
262      case DONE_EXECUTING:
263        strcpy(dump_info[i].state, "done_executing");
264        break;
265      case DONE_END:
266        strcpy(dump_info[i].state, "done_end");
267        break;
268      default:
269        break;
270    }
271    dump_info[i].execute_start_time = info->execute_start_time;
272    dump_info[i].execute_end_time = info->execute_end_time;
273    dump_info[i].done_start_time = info->done_start_time;
274    dump_info[i].done_end_time = info->done_end_time;
275    ++i;
276  }
277  uv_mutex_unlock(&dump_queue_mutex);
278  return dump_info;
279#else
280  size = 0;
281  return NULL;
282#endif
283}
284#endif
285
286
287static void init_closed_uv_loop_rwlock_once(void) {
288  uv_rwlock_init(&g_closed_uv_loop_rwlock);
289}
290
291
292void rdlock_closed_uv_loop_rwlock(void) {
293  uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
294}
295
296
297void rdunlock_closed_uv_loop_rwlock(void) {
298  uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
299}
300
301
302int is_uv_loop_good_magic(const uv_loop_t* loop) {
303  if (loop->magic == UV_LOOP_MAGIC) {
304    return 1;
305  }
306  UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
307  return 0;
308}
309
310
311void on_uv_loop_close(uv_loop_t* loop) {
312  time_t t1, t2;
313  time(&t1);
314  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
315  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
316  uv_end_trace(UV_TRACE_TAG);
317  loop->magic = ~UV_LOOP_MAGIC;
318  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
319  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
320  uv_end_trace(UV_TRACE_TAG);
321  time(&t2);
322  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1));
323}
324
325
326static void uv__cancelled(struct uv__work* w) {
327  abort();
328}
329
330
331#ifndef USE_FFRT
332static unsigned int slow_io_work_running;
333
334static unsigned int slow_work_thread_threshold(void) {
335  return (nthreads + 1) / 2;
336}
337
338
339/* To avoid deadlock with uv_cancel() it's crucial that the worker
340 * never holds the global mutex and the loop-local mutex at the same time.
341 */
342static void worker(void* arg) {
343  struct uv__work* w;
344  struct uv__queue* q;
345  int is_slow_work;
346
347  uv_sem_post((uv_sem_t*) arg);
348  arg = NULL;
349
350  uv_mutex_lock(&mutex);
351  for (;;) {
352    /* `mutex` should always be locked at this point. */
353
354    /* Keep waiting while either no work is present or only slow I/O
355       and we're at the threshold for that. */
356    while (uv__queue_empty(&wq) ||
357           (uv__queue_head(&wq) == &run_slow_work_message &&
358            uv__queue_next(&run_slow_work_message) == &wq &&
359            slow_io_work_running >= slow_work_thread_threshold())) {
360      idle_threads += 1;
361      uv_cond_wait(&cond, &mutex);
362      idle_threads -= 1;
363    }
364
365    q = uv__queue_head(&wq);
366    if (q == &exit_message) {
367      uv_cond_signal(&cond);
368      uv_mutex_unlock(&mutex);
369      break;
370    }
371
372    uv__queue_remove(q);
373    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */
374
375    is_slow_work = 0;
376    if (q == &run_slow_work_message) {
377      /* If we're at the slow I/O threshold, re-schedule until after all
378         other work in the queue is done. */
379      if (slow_io_work_running >= slow_work_thread_threshold()) {
380        uv__queue_insert_tail(&wq, q);
381        continue;
382      }
383
384      /* If we encountered a request to run slow I/O work but there is none
385         to run, that means it's cancelled => Start over. */
386      if (uv__queue_empty(&slow_io_pending_wq))
387        continue;
388
389      is_slow_work = 1;
390      slow_io_work_running++;
391
392      q = uv__queue_head(&slow_io_pending_wq);
393      uv__queue_remove(q);
394      uv__queue_init(q);
395
396      /* If there is more slow I/O work, schedule it to be run as well. */
397      if (!uv__queue_empty(&slow_io_pending_wq)) {
398        uv__queue_insert_tail(&wq, &run_slow_work_message);
399        if (idle_threads > 0)
400          uv_cond_signal(&cond);
401      }
402    }
403
404    uv_mutex_unlock(&mutex);
405
406    w = uv__queue_data(q, struct uv__work, wq);
407#ifdef UV_STATISTIC
408    uv__post_statistic_work(w, WORK_EXECUTING);
409#endif
410#ifdef ASYNC_STACKTRACE
411    uv_work_t* req = container_of(w, uv_work_t, work_req);
412    LibuvSetStackId((uint64_t)req->reserved[3]);
413#endif
414    w->work(w);
415#ifdef UV_STATISTIC
416    uv__post_statistic_work(w, WORK_END);
417#endif
418    uv_mutex_lock(&w->loop->wq_mutex);
419    w->work = NULL;  /* Signal uv_cancel() that the work req is done
420                        executing. */
421    uv__queue_insert_tail(&w->loop->wq, &w->wq);
422    uv_async_send(&w->loop->wq_async);
423    uv_mutex_unlock(&w->loop->wq_mutex);
424
425    /* Lock `mutex` since that is expected at the start of the next
426     * iteration. */
427    uv_mutex_lock(&mutex);
428    if (is_slow_work) {
429      /* `slow_io_work_running` is protected by `mutex`. */
430      slow_io_work_running--;
431    }
432  }
433}
434#endif
435
436
437static void post(struct uv__queue* q, enum uv__work_kind kind) {
438  uv_mutex_lock(&mutex);
439  if (kind == UV__WORK_SLOW_IO) {
440    /* Insert into a separate queue. */
441    uv__queue_insert_tail(&slow_io_pending_wq, q);
442    if (!uv__queue_empty(&run_slow_work_message)) {
443      /* Running slow I/O tasks is already scheduled => Nothing to do here.
444         The worker that runs said other task will schedule this one as well. */
445      uv_mutex_unlock(&mutex);
446      return;
447    }
448    q = &run_slow_work_message;
449  }
450
451  uv__queue_insert_tail(&wq, q);
452  if (idle_threads > 0)
453    uv_cond_signal(&cond);
454  uv_mutex_unlock(&mutex);
455}
456
457
458#ifdef __MVS__
459/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
460__attribute__((destructor))
461#endif
462void uv__threadpool_cleanup(void) {
463  unsigned int i;
464
465  if (nthreads == 0)
466    return;
467
468#ifndef __MVS__
469  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
470  post(&exit_message, UV__WORK_CPU);
471#endif
472
473  for (i = 0; i < nthreads; i++)
474    if (uv_thread_join(threads + i))
475      abort();
476
477  if (threads != default_threads)
478    uv__free(threads);
479
480  uv_mutex_destroy(&mutex);
481  uv_cond_destroy(&cond);
482
483  threads = NULL;
484  nthreads = 0;
485#ifdef UV_STATISTIC
486  post_statistic_work(&exit_message);
487  uv_thread_join(dump_thread);
488  uv_mutex_destroy(&statistic_mutex);
489  uv_cond_destroy(&dump_cond);
490#endif
491}
492
493
494#ifndef USE_FFRT
495static void init_threads(void) {
496  uv_thread_options_t config;
497  unsigned int i;
498  const char* val;
499  uv_sem_t sem;
500
501  nthreads = ARRAY_SIZE(default_threads);
502  val = getenv("UV_THREADPOOL_SIZE");
503  if (val != NULL)
504    nthreads = atoi(val);
505  if (nthreads == 0)
506    nthreads = 1;
507  if (nthreads > MAX_THREADPOOL_SIZE)
508    nthreads = MAX_THREADPOOL_SIZE;
509
510  threads = default_threads;
511  if (nthreads > ARRAY_SIZE(default_threads)) {
512    threads = uv__malloc(nthreads * sizeof(threads[0]));
513    if (threads == NULL) {
514      nthreads = ARRAY_SIZE(default_threads);
515      threads = default_threads;
516    }
517  }
518
519  if (uv_cond_init(&cond))
520    abort();
521
522  if (uv_mutex_init(&mutex))
523    abort();
524
525  uv__queue_init(&wq);
526  uv__queue_init(&slow_io_pending_wq);
527  uv__queue_init(&run_slow_work_message);
528
529  if (uv_sem_init(&sem, 0))
530    abort();
531
532  config.flags = UV_THREAD_HAS_STACK_SIZE;
533  config.stack_size = 8u << 20;  /* 8 MB */
534
535  for (i = 0; i < nthreads; i++)
536    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
537      abort();
538
539  for (i = 0; i < nthreads; i++)
540    uv_sem_wait(&sem);
541
542  uv_sem_destroy(&sem);
543}
544
545
546#ifndef _WIN32
547static void reset_once(void) {
548  uv_once_t child_once = UV_ONCE_INIT;
549  memcpy(&once, &child_once, sizeof(child_once));
550}
551#endif
552
553
554static void init_once(void) {
555#ifndef _WIN32
556  /* Re-initialize the threadpool after fork.
557   * Note that this discards the global mutex and condition as well
558   * as the work queue.
559   */
560  if (pthread_atfork(NULL, NULL, &reset_once))
561    abort();
562#endif
563  init_closed_uv_loop_rwlock_once();
564#ifdef UV_STATISTIC
565  init_work_dump_queue();
566#endif
567  init_threads();
568}
569
570
571void uv__work_submit(uv_loop_t* loop,
572                     struct uv__work* w,
573                     enum uv__work_kind kind,
574                     void (*work)(struct uv__work* w),
575                     void (*done)(struct uv__work* w, int status)) {
576  uv_once(&once, init_once);
577  w->loop = loop;
578  w->work = work;
579  w->done = done;
580  post(&w->wq, kind);
581}
582#endif
583
584
585#ifdef USE_FFRT
586static void uv__task_done_wrapper(void* work, int status) {
587  struct uv__work* w = (struct uv__work*)work;
588  w->done(w, status);
589}
590#endif
591
592
593/* TODO(bnoordhuis) teach libuv how to cancel file operations
594 * that go through io_uring instead of the thread pool.
595 */
596static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
597  int cancelled;
598
599  rdlock_closed_uv_loop_rwlock();
600  if (!is_uv_loop_good_magic(w->loop)) {
601    rdunlock_closed_uv_loop_rwlock();
602    return 0;
603  }
604
605#ifndef USE_FFRT
606  uv_mutex_lock(&mutex);
607  uv_mutex_lock(&w->loop->wq_mutex);
608
609  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
610  if (cancelled)
611    uv__queue_remove(&w->wq);
612
613  uv_mutex_unlock(&w->loop->wq_mutex);
614  uv_mutex_unlock(&mutex);
615#else
616  uv_mutex_lock(&w->loop->wq_mutex);
617  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
618    && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
619  uv_mutex_unlock(&w->loop->wq_mutex);
620#endif
621
622  if (!cancelled) {
623    rdunlock_closed_uv_loop_rwlock();
624    return UV_EBUSY;
625  }
626
627  w->work = uv__cancelled;
628  uv_mutex_lock(&loop->wq_mutex);
629#ifndef USE_FFRT
630  uv__queue_insert_tail(&loop->wq, &w->wq);
631  uv_async_send(&loop->wq_async);
632#else
633  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
634  int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];
635
636  if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
637    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
638    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
639      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
640    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
641  } else {
642    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
643    uv_async_send(&loop->wq_async);
644  }
645#endif
646  uv_mutex_unlock(&loop->wq_mutex);
647  rdunlock_closed_uv_loop_rwlock();
648
649  return 0;
650}
651
652
653void uv__work_done(uv_async_t* handle) {
654  struct uv__work* w;
655  uv_loop_t* loop;
656  struct uv__queue* q;
657  struct uv__queue wq;
658  int err;
659  int nevents;
660
661  loop = container_of(handle, uv_loop_t, wq_async);
662  rdlock_closed_uv_loop_rwlock();
663  if (!is_uv_loop_good_magic(loop)) {
664    rdunlock_closed_uv_loop_rwlock();
665    return;
666  }
667  rdunlock_closed_uv_loop_rwlock();
668
669  uv_mutex_lock(&loop->wq_mutex);
670#ifndef USE_FFRT
671  uv__queue_move(&loop->wq, &wq);
672#else
673  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
674  int i;
675  uv__queue_init(&wq);
676  for (i = 3; i >= 0; i--) {
677    if (!uv__queue_empty(&lfields->wq_sub[i])) {
678      uv__queue_append(&lfields->wq_sub[i], &wq);
679    }
680  }
681#endif
682  uv_mutex_unlock(&loop->wq_mutex);
683
684  nevents = 0;
685  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
686  while (!uv__queue_empty(&wq)) {
687    q = uv__queue_head(&wq);
688    uv__queue_remove(q);
689
690    w = container_of(q, struct uv__work, wq);
691    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
692#ifdef UV_STATISTIC
693    uv__post_statistic_work(w, DONE_EXECUTING);
694    struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
695    if (dump_work == NULL) {
696      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
697      break;
698    }
699    dump_work->info = w->info;
700    dump_work->work = uv__update_work_info;
701#endif
702#ifdef ASYNC_STACKTRACE
703    uv_work_t* req = container_of(w, uv_work_t, work_req);
704    LibuvSetStackId((uint64_t)req->reserved[3]);
705#endif
706    w->done(w, err);
707    nevents++;
708#ifdef UV_STATISTIC
709    dump_work->time = uv__now_timestamp();
710    dump_work->state = DONE_END;
711    QUEUE_INIT(&dump_work->wq);
712    post_statistic_work(&dump_work->wq);
713#endif
714  }
715  uv_end_trace(UV_TRACE_TAG);
716
717  /* This check accomplishes 2 things:
718   * 1. Even if the queue was empty, the call to uv__work_done() should count
719   *    as an event. Which will have been added by the event loop when
720   *    calling this callback.
721   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
722   */
723  if (nevents > 1) {
724    /* Subtract 1 to counter the call to uv__work_done(). */
725    uv__metrics_inc_events(loop, nevents - 1);
726    if (uv__get_internal_fields(loop)->current_timeout == 0)
727      uv__metrics_inc_events_waiting(loop, nevents - 1);
728  }
729}
730
731
732static void uv__queue_work(struct uv__work* w) {
733  uv_work_t* req = container_of(w, uv_work_t, work_req);
734
735  req->work_cb(req);
736}
737
738
739static void uv__queue_done(struct uv__work* w, int err) {
740  uv_work_t* req;
741
742  if (w == NULL) {
743    UV_LOGE("uv_work_t is NULL");
744    return;
745  }
746
747  req = container_of(w, uv_work_t, work_req);
748  uv__req_unregister(req->loop, req);
749
750  if (req->after_work_cb == NULL)
751    return;
752
753  req->after_work_cb(req, err);
754}
755
756
757#ifdef USE_FFRT
758void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
759{
760  struct uv__work* w = (struct uv__work *)data;
761  uv_loop_t* loop = w->loop;
762#ifdef UV_STATISTIC
763  uv__post_statistic_work(w, WORK_EXECUTING);
764#endif
765  uv_work_t* req = container_of(w, uv_work_t, work_req);
766#ifdef ASYNC_STACKTRACE
767  LibuvSetStackId((uint64_t)req->reserved[3]);
768#endif
769  w->work(w);
770#ifdef UV_STATISTIC
771  uv__post_statistic_work(w, WORK_END);
772#endif
773  rdlock_closed_uv_loop_rwlock();
774  if (loop->magic != UV_LOOP_MAGIC) {
775    rdunlock_closed_uv_loop_rwlock();
776    UV_LOGE("uv_loop(%{public}zu:%{public}#x) in task(%p:%p) is invalid",
777            (size_t)loop, loop->magic, req->work_cb, req->after_work_cb);
778    return;
779  }
780
781  uv_mutex_lock(&loop->wq_mutex);
782  w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */
783
784  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
785    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
786    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
787      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
788    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
789  } else {
790    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
791    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
792    uv_async_send(&loop->wq_async);
793  }
794  uv_mutex_unlock(&loop->wq_mutex);
795  rdunlock_closed_uv_loop_rwlock();
796}
797
798static void init_once(void)
799{
800  init_closed_uv_loop_rwlock_once();
801  /* init uv work statics queue */
802#ifdef UV_STATISTIC
803  init_work_dump_queue();
804#endif
805  ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
806}
807
808
809/* ffrt uv__work_submit */
810void uv__work_submit(uv_loop_t* loop,
811                     uv_req_t* req,
812                     struct uv__work* w,
813                     enum uv__work_kind kind,
814                     void (*work)(struct uv__work *w),
815                     void (*done)(struct uv__work *w, int status)) {
816  uv_once(&once, init_once);
817  ffrt_task_attr_t attr;
818  ffrt_task_attr_init(&attr);
819
820  switch(kind) {
821    case UV__WORK_CPU:
822      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
823      break;
824    case UV__WORK_FAST_IO:
825      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
826      break;
827    case UV__WORK_SLOW_IO:
828      ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
829      break;
830    default:
831#ifdef USE_OHOS_DFX
832      UV_LOGI("Unknown work kind");
833#endif
834      return;
835  }
836
837  w->loop = loop;
838  w->work = work;
839  w->done = done;
840
841  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
842  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
843  ffrt_task_attr_destroy(&attr);
844}
845
846
847/* ffrt uv__work_submit */
848void uv__work_submit_with_qos(uv_loop_t* loop,
849                     uv_req_t* req,
850                     struct uv__work* w,
851                     ffrt_qos_t qos,
852                     void (*work)(struct uv__work *w),
853                     void (*done)(struct uv__work *w, int status)) {
854    uv_once(&once, init_once);
855    ffrt_task_attr_t attr;
856    ffrt_task_attr_init(&attr);
857    ffrt_task_attr_set_qos(&attr, qos);
858
859    w->loop = loop;
860    w->work = work;
861    w->done = done;
862
863    req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
864    ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
865    ffrt_task_attr_destroy(&attr);
866}
867#endif
868
869
870int uv_queue_work(uv_loop_t* loop,
871                  uv_work_t* req,
872                  uv_work_cb work_cb,
873                  uv_after_work_cb after_work_cb) {
874  if (work_cb == NULL)
875    return UV_EINVAL;
876
877  uv__req_init(loop, req, UV_WORK);
878  req->loop = loop;
879  req->work_cb = work_cb;
880  req->after_work_cb = after_work_cb;
881
882#ifdef UV_STATISTIC
883  struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
884  if (info == NULL) {
885    abort();
886  }
887  uv_init_dump_info(info, &req->work_req);
888  info->builtin_return_address[0] = __builtin_return_address(0);
889  info->builtin_return_address[1] = __builtin_return_address(1);
890  info->builtin_return_address[2] = __builtin_return_address(2);
891  (req->work_req).info = info;
892#endif
893#ifdef ASYNC_STACKTRACE
894  req->reserved[3] = (void*)LibuvCollectAsyncStack();
895#endif
896  uv__work_submit(loop,
897#ifdef USE_FFRT
898                  (uv_req_t*)req,
899#endif
900                  &req->work_req,
901                  UV__WORK_CPU,
902                  uv__queue_work,
903                  uv__queue_done
904);
905#ifdef UV_STATISTIC
906  uv_queue_statics(info);
907#endif
908  return 0;
909}
910
911
912int uv_queue_work_with_qos(uv_loop_t* loop,
913                  uv_work_t* req,
914                  uv_work_cb work_cb,
915                  uv_after_work_cb after_work_cb,
916                  uv_qos_t qos) {
917#ifdef USE_FFRT
918  if (work_cb == NULL)
919    return UV_EINVAL;
920
921  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
922  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
923  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
924  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
925  if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated) {
926    return UV_EINVAL;
927  }
928
929  uv__req_init(loop, req, UV_WORK);
930  req->loop = loop;
931  req->work_cb = work_cb;
932  req->after_work_cb = after_work_cb;
933#ifdef UV_STATISTIC
934  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
935  if (info == NULL) {
936    abort();
937  }
938  uv_init_dump_info(info, &req->work_req);
939  info->builtin_return_address[0] = __builtin_return_address(0);
940  info->builtin_return_address[1] = __builtin_return_address(1);
941  info->builtin_return_address[2] = __builtin_return_address(2);
942  (req->work_req).info = info;
943#endif
944  uv__work_submit_with_qos(loop,
945                  (uv_req_t*)req,
946                  &req->work_req,
947                  (ffrt_qos_t)qos,
948                  uv__queue_work,
949                  uv__queue_done);
950#ifdef UV_STATISTIC
951  uv_queue_statics(info);
952#endif
953  return 0;
954#else
955  return uv_queue_work(loop, req, work_cb, after_work_cb);
956#endif
957}
958
959
960int uv_cancel(uv_req_t* req) {
961  struct uv__work* wreq;
962  uv_loop_t* loop;
963
964  switch (req->type) {
965  case UV_FS:
966    loop =  ((uv_fs_t*) req)->loop;
967    wreq = &((uv_fs_t*) req)->work_req;
968    break;
969  case UV_GETADDRINFO:
970    loop =  ((uv_getaddrinfo_t*) req)->loop;
971    wreq = &((uv_getaddrinfo_t*) req)->work_req;
972    break;
973  case UV_GETNAMEINFO:
974    loop = ((uv_getnameinfo_t*) req)->loop;
975    wreq = &((uv_getnameinfo_t*) req)->work_req;
976    break;
977  case UV_RANDOM:
978    loop = ((uv_random_t*) req)->loop;
979    wreq = &((uv_random_t*) req)->work_req;
980    break;
981  case UV_WORK:
982    loop =  ((uv_work_t*) req)->loop;
983    wreq = &((uv_work_t*) req)->work_req;
984    break;
985  default:
986    return UV_EINVAL;
987  }
988
989  return uv__work_cancel(loop, req, wreq);
990}
991