1e66f31c5Sopenharmony_ci/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2e66f31c5Sopenharmony_ci *
3e66f31c5Sopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy
4e66f31c5Sopenharmony_ci * of this software and associated documentation files (the "Software"), to
5e66f31c5Sopenharmony_ci * deal in the Software without restriction, including without limitation the
6e66f31c5Sopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7e66f31c5Sopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is
8e66f31c5Sopenharmony_ci * furnished to do so, subject to the following conditions:
9e66f31c5Sopenharmony_ci *
10e66f31c5Sopenharmony_ci * The above copyright notice and this permission notice shall be included in
11e66f31c5Sopenharmony_ci * all copies or substantial portions of the Software.
12e66f31c5Sopenharmony_ci *
13e66f31c5Sopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14e66f31c5Sopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15e66f31c5Sopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16e66f31c5Sopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17e66f31c5Sopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18e66f31c5Sopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19e66f31c5Sopenharmony_ci * IN THE SOFTWARE.
20e66f31c5Sopenharmony_ci */
21e66f31c5Sopenharmony_ci
22e66f31c5Sopenharmony_ci#include "uv-common.h"
23e66f31c5Sopenharmony_ci#include "uv_log.h"
24e66f31c5Sopenharmony_ci#include "uv_trace.h"
25e66f31c5Sopenharmony_ci
26e66f31c5Sopenharmony_ci#if !defined(_WIN32)
27e66f31c5Sopenharmony_ci# include "unix/internal.h"
28e66f31c5Sopenharmony_ci#else
29e66f31c5Sopenharmony_ci# include "win/internal.h"
30e66f31c5Sopenharmony_ci#endif
31e66f31c5Sopenharmony_ci
32e66f31c5Sopenharmony_ci#include <stdlib.h>
33e66f31c5Sopenharmony_ci#ifdef USE_FFRT
34e66f31c5Sopenharmony_ci#include <assert.h>
35e66f31c5Sopenharmony_ci#include "ffrt_inner.h"
36e66f31c5Sopenharmony_ci#endif
37e66f31c5Sopenharmony_ci#include <stdio.h>
38e66f31c5Sopenharmony_ci#ifdef ASYNC_STACKTRACE
39e66f31c5Sopenharmony_ci#include "dfx/async_stack/libuv_async_stack.h"
40e66f31c5Sopenharmony_ci#endif
41e66f31c5Sopenharmony_ci
42e66f31c5Sopenharmony_ci#define MAX_THREADPOOL_SIZE 1024
43e66f31c5Sopenharmony_ci#define UV_TRACE_NAME "UV_TRACE"
44e66f31c5Sopenharmony_ci
/* Write-locked by on_uv_loop_close() while it changes loop->magic; readers
 * take it through rdlock_closed_uv_loop_rwlock(). */
static uv_rwlock_t g_closed_uv_loop_rwlock;
/* One-time guard for init_once(); reset_once() restores it to UV_ONCE_INIT. */
static uv_once_t once = UV_ONCE_INIT;
/* Signalled when an item is appended to `wq`; workers wait on it with
 * `mutex` held. */
static uv_cond_t cond;
/* Held around every access to `wq`, `slow_io_pending_wq` and `idle_threads`
 * in worker() and post(). */
static uv_mutex_t mutex;
/* Number of workers currently blocked in uv_cond_wait(). */
static unsigned int idle_threads;
/* Size of the pool; uv__threadpool_cleanup() treats 0 as "nothing to do". */
static unsigned int nthreads;
/* Points at `default_threads`, or at a heap array when the pool is larger. */
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
/* Sentinel node: a thread that finds it at the head of its queue exits. */
static struct uv__queue exit_message;
/* Pending work items consumed by worker(). */
static struct uv__queue wq;
/* Sentinel node placed in `wq` to mean "run one item from
 * slow_io_pending_wq". */
static struct uv__queue run_slow_work_message;
/* Slow I/O work items waiting to be picked up via run_slow_work_message. */
static struct uv__queue slow_io_pending_wq;
58e66f31c5Sopenharmony_ci
59e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
60e66f31c5Sopenharmony_ci#define MAX_DUMP_QUEUE_SIZE 200
61e66f31c5Sopenharmony_cistatic uv_mutex_t dump_queue_mutex;
62e66f31c5Sopenharmony_cistatic QUEUE dump_queue;
63e66f31c5Sopenharmony_cistatic unsigned int dump_queue_size;
64e66f31c5Sopenharmony_ci
65e66f31c5Sopenharmony_cistatic int statistic_idle;
66e66f31c5Sopenharmony_cistatic uv_mutex_t statistic_mutex;
67e66f31c5Sopenharmony_cistatic QUEUE statistic_works;
68e66f31c5Sopenharmony_cistatic uv_cond_t dump_cond;
69e66f31c5Sopenharmony_cistatic uv_thread_t dump_thread;
70e66f31c5Sopenharmony_ci
71e66f31c5Sopenharmony_cistatic void uv_dump_worker(void*  arg) {
72e66f31c5Sopenharmony_ci  struct uv__statistic_work* w;
73e66f31c5Sopenharmony_ci  struct uv__queue* q;
74e66f31c5Sopenharmony_ci  uv_sem_post((uv_sem_t*) arg);
75e66f31c5Sopenharmony_ci  arg = NULL;
76e66f31c5Sopenharmony_ci  uv_mutex_lock(&statistic_mutex);
77e66f31c5Sopenharmony_ci  for (;;) {
78e66f31c5Sopenharmony_ci    while (uv__queue_empty(&statistic_works)) {
79e66f31c5Sopenharmony_ci      statistic_idle = 1;
80e66f31c5Sopenharmony_ci      uv_cond_wait(&dump_cond, &statistic_mutex);
81e66f31c5Sopenharmony_ci      statistic_idle = 0;
82e66f31c5Sopenharmony_ci    }
83e66f31c5Sopenharmony_ci    q = uv__queue_head(&statistic_works);
84e66f31c5Sopenharmony_ci    if (q == &exit_message) {
85e66f31c5Sopenharmony_ci      uv_cond_signal(&dump_cond);
86e66f31c5Sopenharmony_ci      uv_mutex_unlock(&statistic_mutex);
87e66f31c5Sopenharmony_ci      break;
88e66f31c5Sopenharmony_ci    }
89e66f31c5Sopenharmony_ci    uv__queue_remove(q);
90e66f31c5Sopenharmony_ci    uv__queue_init(q);
91e66f31c5Sopenharmony_ci    uv_mutex_unlock(&statistic_mutex);
92e66f31c5Sopenharmony_ci    w = uv__queue_data(q, struct uv__statistic_work, wq);
93e66f31c5Sopenharmony_ci    w->work(w);
94e66f31c5Sopenharmony_ci    free(w);
95e66f31c5Sopenharmony_ci    uv_mutex_lock(&statistic_mutex);
96e66f31c5Sopenharmony_ci  }
97e66f31c5Sopenharmony_ci}
98e66f31c5Sopenharmony_ci
99e66f31c5Sopenharmony_cistatic void post_statistic_work(struct uv__queue* q) {
100e66f31c5Sopenharmony_ci  uv_mutex_lock(&statistic_mutex);
101e66f31c5Sopenharmony_ci  uv__queue_insert_tail(&statistic_works, q);
102e66f31c5Sopenharmony_ci  if (statistic_idle)
103e66f31c5Sopenharmony_ci    uv_cond_signal(&dump_cond);
104e66f31c5Sopenharmony_ci  uv_mutex_unlock(&statistic_mutex);
105e66f31c5Sopenharmony_ci}
106e66f31c5Sopenharmony_ci
107e66f31c5Sopenharmony_cistatic void uv__queue_work_info(struct uv__statistic_work *work) {
108e66f31c5Sopenharmony_ci  uv_mutex_lock(&dump_queue_mutex);
109e66f31c5Sopenharmony_ci  if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
110e66f31c5Sopenharmony_ci    struct uv__queue* q;
111e66f31c5Sopenharmony_ci    uv__queue_foreach(q, &dump_queue) {
112e66f31c5Sopenharmony_ci      struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
113e66f31c5Sopenharmony_ci      if (info->state == DONE_END) {
114e66f31c5Sopenharmony_ci        uv__queue_remove(q);
115e66f31c5Sopenharmony_ci        free(info);
116e66f31c5Sopenharmony_ci        dump_queue_size--;
117e66f31c5Sopenharmony_ci      }
118e66f31c5Sopenharmony_ci    }
119e66f31c5Sopenharmony_ci    if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
120e66f31c5Sopenharmony_ci      abort(); /* too many works not done. */
121e66f31c5Sopenharmony_ci    }
122e66f31c5Sopenharmony_ci  }
123e66f31c5Sopenharmony_ci
124e66f31c5Sopenharmony_ci  uv__queue_insert_head(&dump_queue,  &work->info->wq);
125e66f31c5Sopenharmony_ci  dump_queue_size++;
126e66f31c5Sopenharmony_ci  uv_mutex_unlock(&dump_queue_mutex);
127e66f31c5Sopenharmony_ci}
128e66f31c5Sopenharmony_ci
129e66f31c5Sopenharmony_cistatic void uv__update_work_info(struct uv__statistic_work *work) {
130e66f31c5Sopenharmony_ci  uv_mutex_lock(&dump_queue_mutex);
131e66f31c5Sopenharmony_ci  if (work != NULL && work->info != NULL) {
132e66f31c5Sopenharmony_ci    work->info->state = work->state;
133e66f31c5Sopenharmony_ci    switch (work->state) {
134e66f31c5Sopenharmony_ci      case WAITING:
135e66f31c5Sopenharmony_ci        work->info->queue_time = work->time;
136e66f31c5Sopenharmony_ci        break;
137e66f31c5Sopenharmony_ci      case WORK_EXECUTING:
138e66f31c5Sopenharmony_ci        work->info->execute_start_time = work->time;
139e66f31c5Sopenharmony_ci        break;
140e66f31c5Sopenharmony_ci      case WORK_END:
141e66f31c5Sopenharmony_ci        work->info->execute_end_time = work->time;
142e66f31c5Sopenharmony_ci        break;
143e66f31c5Sopenharmony_ci      case DONE_EXECUTING:
144e66f31c5Sopenharmony_ci        work->info->done_start_time = work->time;
145e66f31c5Sopenharmony_ci        break;
146e66f31c5Sopenharmony_ci      case DONE_END:
147e66f31c5Sopenharmony_ci        work->info->done_end_time = work->time;
148e66f31c5Sopenharmony_ci        break;
149e66f31c5Sopenharmony_ci      default:
150e66f31c5Sopenharmony_ci        break;
151e66f31c5Sopenharmony_ci    }
152e66f31c5Sopenharmony_ci  }
153e66f31c5Sopenharmony_ci  uv_mutex_unlock(&dump_queue_mutex);
154e66f31c5Sopenharmony_ci}
155e66f31c5Sopenharmony_ci
156e66f31c5Sopenharmony_ci
157e66f31c5Sopenharmony_ci// return the timestamp in millisecond
158e66f31c5Sopenharmony_cistatic uint64_t uv__now_timestamp() {
159e66f31c5Sopenharmony_ci  uv_timeval64_t tv;
160e66f31c5Sopenharmony_ci  int r = uv_gettimeofday(&tv);
161e66f31c5Sopenharmony_ci  if (r != 0) {
162e66f31c5Sopenharmony_ci    return 0;
163e66f31c5Sopenharmony_ci  }
164e66f31c5Sopenharmony_ci  return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
165e66f31c5Sopenharmony_ci}
166e66f31c5Sopenharmony_ci
167e66f31c5Sopenharmony_ci
168e66f31c5Sopenharmony_cistatic void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
169e66f31c5Sopenharmony_ci  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
170e66f31c5Sopenharmony_ci  if (dump_work == NULL) {
171e66f31c5Sopenharmony_ci    return;
172e66f31c5Sopenharmony_ci  }
173e66f31c5Sopenharmony_ci  dump_work->info = w->info;
174e66f31c5Sopenharmony_ci  dump_work->work = uv__update_work_info;
175e66f31c5Sopenharmony_ci  dump_work->time = uv__now_timestamp();
176e66f31c5Sopenharmony_ci  dump_work->state = state;
177e66f31c5Sopenharmony_ci  uv__queue_init(&dump_work->wq);
178e66f31c5Sopenharmony_ci  post_statistic_work(&dump_work->wq);
179e66f31c5Sopenharmony_ci}
180e66f31c5Sopenharmony_ci
181e66f31c5Sopenharmony_ci
182e66f31c5Sopenharmony_cistatic void init_work_dump_queue()
183e66f31c5Sopenharmony_ci{
184e66f31c5Sopenharmony_ci  if (uv_mutex_init(&dump_queue_mutex))
185e66f31c5Sopenharmony_ci    abort();
186e66f31c5Sopenharmony_ci  uv_mutex_lock(&dump_queue_mutex);
187e66f31c5Sopenharmony_ci  uv__queue_init(&dump_queue);
188e66f31c5Sopenharmony_ci  dump_queue_size = 0;
189e66f31c5Sopenharmony_ci  uv_mutex_unlock(&dump_queue_mutex);
190e66f31c5Sopenharmony_ci
191e66f31c5Sopenharmony_ci  /* init dump thread */
192e66f31c5Sopenharmony_ci  statistic_idle = 1;
193e66f31c5Sopenharmony_ci  if (uv_mutex_init(&statistic_mutex))
194e66f31c5Sopenharmony_ci    abort();
195e66f31c5Sopenharmony_ci  uv__queue_init(&statistic_works);
196e66f31c5Sopenharmony_ci  uv_sem_t sem;
197e66f31c5Sopenharmony_ci  if (uv_cond_init(&dump_cond))
198e66f31c5Sopenharmony_ci    abort();
199e66f31c5Sopenharmony_ci  if (uv_sem_init(&sem, 0))
200e66f31c5Sopenharmony_ci    abort();
201e66f31c5Sopenharmony_ci  if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
202e66f31c5Sopenharmony_ci      abort();
203e66f31c5Sopenharmony_ci  uv_sem_wait(&sem);
204e66f31c5Sopenharmony_ci  uv_sem_destroy(&sem);
205e66f31c5Sopenharmony_ci}
206e66f31c5Sopenharmony_ci
207e66f31c5Sopenharmony_ci
208e66f31c5Sopenharmony_civoid uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
209e66f31c5Sopenharmony_ci  if (info == NULL)
210e66f31c5Sopenharmony_ci    return;
211e66f31c5Sopenharmony_ci  info->queue_time = 0;
212e66f31c5Sopenharmony_ci  info->state = WAITING;
213e66f31c5Sopenharmony_ci  info->execute_start_time = 0;
214e66f31c5Sopenharmony_ci  info->execute_end_time = 0;
215e66f31c5Sopenharmony_ci  info->done_start_time = 0;
216e66f31c5Sopenharmony_ci  info->done_end_time = 0;
217e66f31c5Sopenharmony_ci  info->work = w;
218e66f31c5Sopenharmony_ci  uv__queue_init(&info->wq);
219e66f31c5Sopenharmony_ci}
220e66f31c5Sopenharmony_ci
221e66f31c5Sopenharmony_ci
222e66f31c5Sopenharmony_civoid uv_queue_statics(struct uv_work_dump_info* info) {
223e66f31c5Sopenharmony_ci  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
224e66f31c5Sopenharmony_ci  if (dump_work == NULL) {
225e66f31c5Sopenharmony_ci    abort();
226e66f31c5Sopenharmony_ci  }
227e66f31c5Sopenharmony_ci  dump_work->info = info;
228e66f31c5Sopenharmony_ci  dump_work->work = uv__queue_work_info;
229e66f31c5Sopenharmony_ci  info->queue_time = uv__now_timestamp();
230e66f31c5Sopenharmony_ci  dump_work->state = WAITING;
231e66f31c5Sopenharmony_ci  uv__queue_init(&dump_work->wq);
232e66f31c5Sopenharmony_ci  post_statistic_work(&dump_work->wq);
233e66f31c5Sopenharmony_ci}
234e66f31c5Sopenharmony_ci
235e66f31c5Sopenharmony_ci
236e66f31c5Sopenharmony_ciuv_worker_info_t* uv_dump_work_queue(int* size) {
237e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
238e66f31c5Sopenharmony_ci  uv_mutex_lock(&dump_queue_mutex);
239e66f31c5Sopenharmony_ci  if (uv__queue_empty(&dump_queue)) {
240e66f31c5Sopenharmony_ci    return NULL;
241e66f31c5Sopenharmony_ci  }
242e66f31c5Sopenharmony_ci  *size = dump_queue_size;
243e66f31c5Sopenharmony_ci  uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
244e66f31c5Sopenharmony_ci  struct uv__queue* q;
245e66f31c5Sopenharmony_ci  int i = 0;
246e66f31c5Sopenharmony_ci  uv__queue_foreach(q, &dump_queue) {
247e66f31c5Sopenharmony_ci    struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
248e66f31c5Sopenharmony_ci    dump_info[i].queue_time = info->queue_time;
249e66f31c5Sopenharmony_ci    dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
250e66f31c5Sopenharmony_ci    dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
251e66f31c5Sopenharmony_ci    dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
252e66f31c5Sopenharmony_ci    switch (info->state) {
253e66f31c5Sopenharmony_ci      case WAITING:
254e66f31c5Sopenharmony_ci        strcpy(dump_info[i].state, "waiting");
255e66f31c5Sopenharmony_ci        break;
256e66f31c5Sopenharmony_ci      case WORK_EXECUTING:
257e66f31c5Sopenharmony_ci        strcpy(dump_info[i].state, "work_executing");
258e66f31c5Sopenharmony_ci        break;
259e66f31c5Sopenharmony_ci      case WORK_END:
260e66f31c5Sopenharmony_ci        strcpy(dump_info[i].state, "work_end");
261e66f31c5Sopenharmony_ci        break;
262e66f31c5Sopenharmony_ci      case DONE_EXECUTING:
263e66f31c5Sopenharmony_ci        strcpy(dump_info[i].state, "done_executing");
264e66f31c5Sopenharmony_ci        break;
265e66f31c5Sopenharmony_ci      case DONE_END:
266e66f31c5Sopenharmony_ci        strcpy(dump_info[i].state, "done_end");
267e66f31c5Sopenharmony_ci        break;
268e66f31c5Sopenharmony_ci      default:
269e66f31c5Sopenharmony_ci        break;
270e66f31c5Sopenharmony_ci    }
271e66f31c5Sopenharmony_ci    dump_info[i].execute_start_time = info->execute_start_time;
272e66f31c5Sopenharmony_ci    dump_info[i].execute_end_time = info->execute_end_time;
273e66f31c5Sopenharmony_ci    dump_info[i].done_start_time = info->done_start_time;
274e66f31c5Sopenharmony_ci    dump_info[i].done_end_time = info->done_end_time;
275e66f31c5Sopenharmony_ci    ++i;
276e66f31c5Sopenharmony_ci  }
277e66f31c5Sopenharmony_ci  uv_mutex_unlock(&dump_queue_mutex);
278e66f31c5Sopenharmony_ci  return dump_info;
279e66f31c5Sopenharmony_ci#else
280e66f31c5Sopenharmony_ci  size = 0;
281e66f31c5Sopenharmony_ci  return NULL;
282e66f31c5Sopenharmony_ci#endif
283e66f31c5Sopenharmony_ci}
284e66f31c5Sopenharmony_ci#endif
285e66f31c5Sopenharmony_ci
286e66f31c5Sopenharmony_ci
287e66f31c5Sopenharmony_cistatic void init_closed_uv_loop_rwlock_once(void) {
288e66f31c5Sopenharmony_ci  uv_rwlock_init(&g_closed_uv_loop_rwlock);
289e66f31c5Sopenharmony_ci}
290e66f31c5Sopenharmony_ci
291e66f31c5Sopenharmony_ci
292e66f31c5Sopenharmony_civoid rdlock_closed_uv_loop_rwlock(void) {
293e66f31c5Sopenharmony_ci  uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
294e66f31c5Sopenharmony_ci}
295e66f31c5Sopenharmony_ci
296e66f31c5Sopenharmony_ci
297e66f31c5Sopenharmony_civoid rdunlock_closed_uv_loop_rwlock(void) {
298e66f31c5Sopenharmony_ci  uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
299e66f31c5Sopenharmony_ci}
300e66f31c5Sopenharmony_ci
301e66f31c5Sopenharmony_ci
302e66f31c5Sopenharmony_ciint is_uv_loop_good_magic(const uv_loop_t* loop) {
303e66f31c5Sopenharmony_ci  if (loop->magic == UV_LOOP_MAGIC) {
304e66f31c5Sopenharmony_ci    return 1;
305e66f31c5Sopenharmony_ci  }
306e66f31c5Sopenharmony_ci  UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
307e66f31c5Sopenharmony_ci  return 0;
308e66f31c5Sopenharmony_ci}
309e66f31c5Sopenharmony_ci
310e66f31c5Sopenharmony_ci
311e66f31c5Sopenharmony_civoid on_uv_loop_close(uv_loop_t* loop) {
312e66f31c5Sopenharmony_ci  time_t t1, t2;
313e66f31c5Sopenharmony_ci  time(&t1);
314e66f31c5Sopenharmony_ci  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
315e66f31c5Sopenharmony_ci  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
316e66f31c5Sopenharmony_ci  uv_end_trace(UV_TRACE_TAG);
317e66f31c5Sopenharmony_ci  loop->magic = ~UV_LOOP_MAGIC;
318e66f31c5Sopenharmony_ci  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
319e66f31c5Sopenharmony_ci  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
320e66f31c5Sopenharmony_ci  uv_end_trace(UV_TRACE_TAG);
321e66f31c5Sopenharmony_ci  time(&t2);
322e66f31c5Sopenharmony_ci  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1));
323e66f31c5Sopenharmony_ci}
324e66f31c5Sopenharmony_ci
325e66f31c5Sopenharmony_ci
/* Work callback installed on a cancelled request by uv__work_cancel(). It
 * aborts the process if it is ever invoked; `w` is not used. */
static void uv__cancelled(struct uv__work* w) {
  (void) w;
  abort();
}
329e66f31c5Sopenharmony_ci
330e66f31c5Sopenharmony_ci
331e66f31c5Sopenharmony_ci#ifndef USE_FFRT
332e66f31c5Sopenharmony_cistatic unsigned int slow_io_work_running;
333e66f31c5Sopenharmony_ci
334e66f31c5Sopenharmony_cistatic unsigned int slow_work_thread_threshold(void) {
335e66f31c5Sopenharmony_ci  return (nthreads + 1) / 2;
336e66f31c5Sopenharmony_ci}
337e66f31c5Sopenharmony_ci
338e66f31c5Sopenharmony_ci
339e66f31c5Sopenharmony_ci/* To avoid deadlock with uv_cancel() it's crucial that the worker
340e66f31c5Sopenharmony_ci * never holds the global mutex and the loop-local mutex at the same time.
341e66f31c5Sopenharmony_ci */
342e66f31c5Sopenharmony_cistatic void worker(void* arg) {
343e66f31c5Sopenharmony_ci  struct uv__work* w;
344e66f31c5Sopenharmony_ci  struct uv__queue* q;
345e66f31c5Sopenharmony_ci  int is_slow_work;
346e66f31c5Sopenharmony_ci
347e66f31c5Sopenharmony_ci  uv_sem_post((uv_sem_t*) arg);
348e66f31c5Sopenharmony_ci  arg = NULL;
349e66f31c5Sopenharmony_ci
350e66f31c5Sopenharmony_ci  uv_mutex_lock(&mutex);
351e66f31c5Sopenharmony_ci  for (;;) {
352e66f31c5Sopenharmony_ci    /* `mutex` should always be locked at this point. */
353e66f31c5Sopenharmony_ci
354e66f31c5Sopenharmony_ci    /* Keep waiting while either no work is present or only slow I/O
355e66f31c5Sopenharmony_ci       and we're at the threshold for that. */
356e66f31c5Sopenharmony_ci    while (uv__queue_empty(&wq) ||
357e66f31c5Sopenharmony_ci           (uv__queue_head(&wq) == &run_slow_work_message &&
358e66f31c5Sopenharmony_ci            uv__queue_next(&run_slow_work_message) == &wq &&
359e66f31c5Sopenharmony_ci            slow_io_work_running >= slow_work_thread_threshold())) {
360e66f31c5Sopenharmony_ci      idle_threads += 1;
361e66f31c5Sopenharmony_ci      uv_cond_wait(&cond, &mutex);
362e66f31c5Sopenharmony_ci      idle_threads -= 1;
363e66f31c5Sopenharmony_ci    }
364e66f31c5Sopenharmony_ci
365e66f31c5Sopenharmony_ci    q = uv__queue_head(&wq);
366e66f31c5Sopenharmony_ci    if (q == &exit_message) {
367e66f31c5Sopenharmony_ci      uv_cond_signal(&cond);
368e66f31c5Sopenharmony_ci      uv_mutex_unlock(&mutex);
369e66f31c5Sopenharmony_ci      break;
370e66f31c5Sopenharmony_ci    }
371e66f31c5Sopenharmony_ci
372e66f31c5Sopenharmony_ci    uv__queue_remove(q);
373e66f31c5Sopenharmony_ci    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */
374e66f31c5Sopenharmony_ci
375e66f31c5Sopenharmony_ci    is_slow_work = 0;
376e66f31c5Sopenharmony_ci    if (q == &run_slow_work_message) {
377e66f31c5Sopenharmony_ci      /* If we're at the slow I/O threshold, re-schedule until after all
378e66f31c5Sopenharmony_ci         other work in the queue is done. */
379e66f31c5Sopenharmony_ci      if (slow_io_work_running >= slow_work_thread_threshold()) {
380e66f31c5Sopenharmony_ci        uv__queue_insert_tail(&wq, q);
381e66f31c5Sopenharmony_ci        continue;
382e66f31c5Sopenharmony_ci      }
383e66f31c5Sopenharmony_ci
384e66f31c5Sopenharmony_ci      /* If we encountered a request to run slow I/O work but there is none
385e66f31c5Sopenharmony_ci         to run, that means it's cancelled => Start over. */
386e66f31c5Sopenharmony_ci      if (uv__queue_empty(&slow_io_pending_wq))
387e66f31c5Sopenharmony_ci        continue;
388e66f31c5Sopenharmony_ci
389e66f31c5Sopenharmony_ci      is_slow_work = 1;
390e66f31c5Sopenharmony_ci      slow_io_work_running++;
391e66f31c5Sopenharmony_ci
392e66f31c5Sopenharmony_ci      q = uv__queue_head(&slow_io_pending_wq);
393e66f31c5Sopenharmony_ci      uv__queue_remove(q);
394e66f31c5Sopenharmony_ci      uv__queue_init(q);
395e66f31c5Sopenharmony_ci
396e66f31c5Sopenharmony_ci      /* If there is more slow I/O work, schedule it to be run as well. */
397e66f31c5Sopenharmony_ci      if (!uv__queue_empty(&slow_io_pending_wq)) {
398e66f31c5Sopenharmony_ci        uv__queue_insert_tail(&wq, &run_slow_work_message);
399e66f31c5Sopenharmony_ci        if (idle_threads > 0)
400e66f31c5Sopenharmony_ci          uv_cond_signal(&cond);
401e66f31c5Sopenharmony_ci      }
402e66f31c5Sopenharmony_ci    }
403e66f31c5Sopenharmony_ci
404e66f31c5Sopenharmony_ci    uv_mutex_unlock(&mutex);
405e66f31c5Sopenharmony_ci
406e66f31c5Sopenharmony_ci    w = uv__queue_data(q, struct uv__work, wq);
407e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
408e66f31c5Sopenharmony_ci    uv__post_statistic_work(w, WORK_EXECUTING);
409e66f31c5Sopenharmony_ci#endif
410e66f31c5Sopenharmony_ci#ifdef ASYNC_STACKTRACE
411e66f31c5Sopenharmony_ci    uv_work_t* req = container_of(w, uv_work_t, work_req);
412e66f31c5Sopenharmony_ci    LibuvSetStackId((uint64_t)req->reserved[3]);
413e66f31c5Sopenharmony_ci#endif
414e66f31c5Sopenharmony_ci    w->work(w);
415e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
416e66f31c5Sopenharmony_ci    uv__post_statistic_work(w, WORK_END);
417e66f31c5Sopenharmony_ci#endif
418e66f31c5Sopenharmony_ci    uv_mutex_lock(&w->loop->wq_mutex);
419e66f31c5Sopenharmony_ci    w->work = NULL;  /* Signal uv_cancel() that the work req is done
420e66f31c5Sopenharmony_ci                        executing. */
421e66f31c5Sopenharmony_ci    uv__queue_insert_tail(&w->loop->wq, &w->wq);
422e66f31c5Sopenharmony_ci    uv_async_send(&w->loop->wq_async);
423e66f31c5Sopenharmony_ci    uv_mutex_unlock(&w->loop->wq_mutex);
424e66f31c5Sopenharmony_ci
425e66f31c5Sopenharmony_ci    /* Lock `mutex` since that is expected at the start of the next
426e66f31c5Sopenharmony_ci     * iteration. */
427e66f31c5Sopenharmony_ci    uv_mutex_lock(&mutex);
428e66f31c5Sopenharmony_ci    if (is_slow_work) {
429e66f31c5Sopenharmony_ci      /* `slow_io_work_running` is protected by `mutex`. */
430e66f31c5Sopenharmony_ci      slow_io_work_running--;
431e66f31c5Sopenharmony_ci    }
432e66f31c5Sopenharmony_ci  }
433e66f31c5Sopenharmony_ci}
434e66f31c5Sopenharmony_ci#endif
435e66f31c5Sopenharmony_ci
436e66f31c5Sopenharmony_ci
437e66f31c5Sopenharmony_cistatic void post(struct uv__queue* q, enum uv__work_kind kind) {
438e66f31c5Sopenharmony_ci  uv_mutex_lock(&mutex);
439e66f31c5Sopenharmony_ci  if (kind == UV__WORK_SLOW_IO) {
440e66f31c5Sopenharmony_ci    /* Insert into a separate queue. */
441e66f31c5Sopenharmony_ci    uv__queue_insert_tail(&slow_io_pending_wq, q);
442e66f31c5Sopenharmony_ci    if (!uv__queue_empty(&run_slow_work_message)) {
443e66f31c5Sopenharmony_ci      /* Running slow I/O tasks is already scheduled => Nothing to do here.
444e66f31c5Sopenharmony_ci         The worker that runs said other task will schedule this one as well. */
445e66f31c5Sopenharmony_ci      uv_mutex_unlock(&mutex);
446e66f31c5Sopenharmony_ci      return;
447e66f31c5Sopenharmony_ci    }
448e66f31c5Sopenharmony_ci    q = &run_slow_work_message;
449e66f31c5Sopenharmony_ci  }
450e66f31c5Sopenharmony_ci
451e66f31c5Sopenharmony_ci  uv__queue_insert_tail(&wq, q);
452e66f31c5Sopenharmony_ci  if (idle_threads > 0)
453e66f31c5Sopenharmony_ci    uv_cond_signal(&cond);
454e66f31c5Sopenharmony_ci  uv_mutex_unlock(&mutex);
455e66f31c5Sopenharmony_ci}
456e66f31c5Sopenharmony_ci
457e66f31c5Sopenharmony_ci
458e66f31c5Sopenharmony_ci#ifdef __MVS__
459e66f31c5Sopenharmony_ci/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
460e66f31c5Sopenharmony_ci__attribute__((destructor))
461e66f31c5Sopenharmony_ci#endif
462e66f31c5Sopenharmony_civoid uv__threadpool_cleanup(void) {
463e66f31c5Sopenharmony_ci  unsigned int i;
464e66f31c5Sopenharmony_ci
465e66f31c5Sopenharmony_ci  if (nthreads == 0)
466e66f31c5Sopenharmony_ci    return;
467e66f31c5Sopenharmony_ci
468e66f31c5Sopenharmony_ci#ifndef __MVS__
469e66f31c5Sopenharmony_ci  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
470e66f31c5Sopenharmony_ci  post(&exit_message, UV__WORK_CPU);
471e66f31c5Sopenharmony_ci#endif
472e66f31c5Sopenharmony_ci
473e66f31c5Sopenharmony_ci  for (i = 0; i < nthreads; i++)
474e66f31c5Sopenharmony_ci    if (uv_thread_join(threads + i))
475e66f31c5Sopenharmony_ci      abort();
476e66f31c5Sopenharmony_ci
477e66f31c5Sopenharmony_ci  if (threads != default_threads)
478e66f31c5Sopenharmony_ci    uv__free(threads);
479e66f31c5Sopenharmony_ci
480e66f31c5Sopenharmony_ci  uv_mutex_destroy(&mutex);
481e66f31c5Sopenharmony_ci  uv_cond_destroy(&cond);
482e66f31c5Sopenharmony_ci
483e66f31c5Sopenharmony_ci  threads = NULL;
484e66f31c5Sopenharmony_ci  nthreads = 0;
485e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
486e66f31c5Sopenharmony_ci  post_statistic_work(&exit_message);
487e66f31c5Sopenharmony_ci  uv_thread_join(dump_thread);
488e66f31c5Sopenharmony_ci  uv_mutex_destroy(&statistic_mutex);
489e66f31c5Sopenharmony_ci  uv_cond_destroy(&dump_cond);
490e66f31c5Sopenharmony_ci#endif
491e66f31c5Sopenharmony_ci}
492e66f31c5Sopenharmony_ci
493e66f31c5Sopenharmony_ci
494e66f31c5Sopenharmony_ci#ifndef USE_FFRT
495e66f31c5Sopenharmony_cistatic void init_threads(void) {
496e66f31c5Sopenharmony_ci  uv_thread_options_t config;
497e66f31c5Sopenharmony_ci  unsigned int i;
498e66f31c5Sopenharmony_ci  const char* val;
499e66f31c5Sopenharmony_ci  uv_sem_t sem;
500e66f31c5Sopenharmony_ci
501e66f31c5Sopenharmony_ci  nthreads = ARRAY_SIZE(default_threads);
502e66f31c5Sopenharmony_ci  val = getenv("UV_THREADPOOL_SIZE");
503e66f31c5Sopenharmony_ci  if (val != NULL)
504e66f31c5Sopenharmony_ci    nthreads = atoi(val);
505e66f31c5Sopenharmony_ci  if (nthreads == 0)
506e66f31c5Sopenharmony_ci    nthreads = 1;
507e66f31c5Sopenharmony_ci  if (nthreads > MAX_THREADPOOL_SIZE)
508e66f31c5Sopenharmony_ci    nthreads = MAX_THREADPOOL_SIZE;
509e66f31c5Sopenharmony_ci
510e66f31c5Sopenharmony_ci  threads = default_threads;
511e66f31c5Sopenharmony_ci  if (nthreads > ARRAY_SIZE(default_threads)) {
512e66f31c5Sopenharmony_ci    threads = uv__malloc(nthreads * sizeof(threads[0]));
513e66f31c5Sopenharmony_ci    if (threads == NULL) {
514e66f31c5Sopenharmony_ci      nthreads = ARRAY_SIZE(default_threads);
515e66f31c5Sopenharmony_ci      threads = default_threads;
516e66f31c5Sopenharmony_ci    }
517e66f31c5Sopenharmony_ci  }
518e66f31c5Sopenharmony_ci
519e66f31c5Sopenharmony_ci  if (uv_cond_init(&cond))
520e66f31c5Sopenharmony_ci    abort();
521e66f31c5Sopenharmony_ci
522e66f31c5Sopenharmony_ci  if (uv_mutex_init(&mutex))
523e66f31c5Sopenharmony_ci    abort();
524e66f31c5Sopenharmony_ci
525e66f31c5Sopenharmony_ci  uv__queue_init(&wq);
526e66f31c5Sopenharmony_ci  uv__queue_init(&slow_io_pending_wq);
527e66f31c5Sopenharmony_ci  uv__queue_init(&run_slow_work_message);
528e66f31c5Sopenharmony_ci
529e66f31c5Sopenharmony_ci  if (uv_sem_init(&sem, 0))
530e66f31c5Sopenharmony_ci    abort();
531e66f31c5Sopenharmony_ci
532e66f31c5Sopenharmony_ci  config.flags = UV_THREAD_HAS_STACK_SIZE;
533e66f31c5Sopenharmony_ci  config.stack_size = 8u << 20;  /* 8 MB */
534e66f31c5Sopenharmony_ci
535e66f31c5Sopenharmony_ci  for (i = 0; i < nthreads; i++)
536e66f31c5Sopenharmony_ci    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
537e66f31c5Sopenharmony_ci      abort();
538e66f31c5Sopenharmony_ci
539e66f31c5Sopenharmony_ci  for (i = 0; i < nthreads; i++)
540e66f31c5Sopenharmony_ci    uv_sem_wait(&sem);
541e66f31c5Sopenharmony_ci
542e66f31c5Sopenharmony_ci  uv_sem_destroy(&sem);
543e66f31c5Sopenharmony_ci}
544e66f31c5Sopenharmony_ci
545e66f31c5Sopenharmony_ci
546e66f31c5Sopenharmony_ci#ifndef _WIN32
547e66f31c5Sopenharmony_cistatic void reset_once(void) {
548e66f31c5Sopenharmony_ci  uv_once_t child_once = UV_ONCE_INIT;
549e66f31c5Sopenharmony_ci  memcpy(&once, &child_once, sizeof(child_once));
550e66f31c5Sopenharmony_ci}
551e66f31c5Sopenharmony_ci#endif
552e66f31c5Sopenharmony_ci
553e66f31c5Sopenharmony_ci
554e66f31c5Sopenharmony_cistatic void init_once(void) {
555e66f31c5Sopenharmony_ci#ifndef _WIN32
556e66f31c5Sopenharmony_ci  /* Re-initialize the threadpool after fork.
557e66f31c5Sopenharmony_ci   * Note that this discards the global mutex and condition as well
558e66f31c5Sopenharmony_ci   * as the work queue.
559e66f31c5Sopenharmony_ci   */
560e66f31c5Sopenharmony_ci  if (pthread_atfork(NULL, NULL, &reset_once))
561e66f31c5Sopenharmony_ci    abort();
562e66f31c5Sopenharmony_ci#endif
563e66f31c5Sopenharmony_ci  init_closed_uv_loop_rwlock_once();
564e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
565e66f31c5Sopenharmony_ci  init_work_dump_queue();
566e66f31c5Sopenharmony_ci#endif
567e66f31c5Sopenharmony_ci  init_threads();
568e66f31c5Sopenharmony_ci}
569e66f31c5Sopenharmony_ci
570e66f31c5Sopenharmony_ci
571e66f31c5Sopenharmony_civoid uv__work_submit(uv_loop_t* loop,
572e66f31c5Sopenharmony_ci                     struct uv__work* w,
573e66f31c5Sopenharmony_ci                     enum uv__work_kind kind,
574e66f31c5Sopenharmony_ci                     void (*work)(struct uv__work* w),
575e66f31c5Sopenharmony_ci                     void (*done)(struct uv__work* w, int status)) {
576e66f31c5Sopenharmony_ci  uv_once(&once, init_once);
577e66f31c5Sopenharmony_ci  w->loop = loop;
578e66f31c5Sopenharmony_ci  w->work = work;
579e66f31c5Sopenharmony_ci  w->done = done;
580e66f31c5Sopenharmony_ci  post(&w->wq, kind);
581e66f31c5Sopenharmony_ci}
582e66f31c5Sopenharmony_ci#endif
583e66f31c5Sopenharmony_ci
584e66f31c5Sopenharmony_ci
585e66f31c5Sopenharmony_ci#ifdef USE_FFRT
586e66f31c5Sopenharmony_cistatic void uv__task_done_wrapper(void* work, int status) {
587e66f31c5Sopenharmony_ci  struct uv__work* w = (struct uv__work*)work;
588e66f31c5Sopenharmony_ci  w->done(w, status);
589e66f31c5Sopenharmony_ci}
590e66f31c5Sopenharmony_ci#endif
591e66f31c5Sopenharmony_ci
592e66f31c5Sopenharmony_ci
593e66f31c5Sopenharmony_ci/* TODO(bnoordhuis) teach libuv how to cancel file operations
594e66f31c5Sopenharmony_ci * that go through io_uring instead of the thread pool.
595e66f31c5Sopenharmony_ci */
596e66f31c5Sopenharmony_cistatic int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
597e66f31c5Sopenharmony_ci  int cancelled;
598e66f31c5Sopenharmony_ci
599e66f31c5Sopenharmony_ci  rdlock_closed_uv_loop_rwlock();
600e66f31c5Sopenharmony_ci  if (!is_uv_loop_good_magic(w->loop)) {
601e66f31c5Sopenharmony_ci    rdunlock_closed_uv_loop_rwlock();
602e66f31c5Sopenharmony_ci    return 0;
603e66f31c5Sopenharmony_ci  }
604e66f31c5Sopenharmony_ci
605e66f31c5Sopenharmony_ci#ifndef USE_FFRT
606e66f31c5Sopenharmony_ci  uv_mutex_lock(&mutex);
607e66f31c5Sopenharmony_ci  uv_mutex_lock(&w->loop->wq_mutex);
608e66f31c5Sopenharmony_ci
609e66f31c5Sopenharmony_ci  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
610e66f31c5Sopenharmony_ci  if (cancelled)
611e66f31c5Sopenharmony_ci    uv__queue_remove(&w->wq);
612e66f31c5Sopenharmony_ci
613e66f31c5Sopenharmony_ci  uv_mutex_unlock(&w->loop->wq_mutex);
614e66f31c5Sopenharmony_ci  uv_mutex_unlock(&mutex);
615e66f31c5Sopenharmony_ci#else
616e66f31c5Sopenharmony_ci  uv_mutex_lock(&w->loop->wq_mutex);
617e66f31c5Sopenharmony_ci  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
618e66f31c5Sopenharmony_ci    && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
619e66f31c5Sopenharmony_ci  uv_mutex_unlock(&w->loop->wq_mutex);
620e66f31c5Sopenharmony_ci#endif
621e66f31c5Sopenharmony_ci
622e66f31c5Sopenharmony_ci  if (!cancelled) {
623e66f31c5Sopenharmony_ci    rdunlock_closed_uv_loop_rwlock();
624e66f31c5Sopenharmony_ci    return UV_EBUSY;
625e66f31c5Sopenharmony_ci  }
626e66f31c5Sopenharmony_ci
627e66f31c5Sopenharmony_ci  w->work = uv__cancelled;
628e66f31c5Sopenharmony_ci  uv_mutex_lock(&loop->wq_mutex);
629e66f31c5Sopenharmony_ci#ifndef USE_FFRT
630e66f31c5Sopenharmony_ci  uv__queue_insert_tail(&loop->wq, &w->wq);
631e66f31c5Sopenharmony_ci  uv_async_send(&loop->wq_async);
632e66f31c5Sopenharmony_ci#else
633e66f31c5Sopenharmony_ci  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
634e66f31c5Sopenharmony_ci  int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];
635e66f31c5Sopenharmony_ci
636e66f31c5Sopenharmony_ci  if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
637e66f31c5Sopenharmony_ci    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
638e66f31c5Sopenharmony_ci    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
639e66f31c5Sopenharmony_ci      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
640e66f31c5Sopenharmony_ci    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
641e66f31c5Sopenharmony_ci  } else {
642e66f31c5Sopenharmony_ci    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
643e66f31c5Sopenharmony_ci    uv_async_send(&loop->wq_async);
644e66f31c5Sopenharmony_ci  }
645e66f31c5Sopenharmony_ci#endif
646e66f31c5Sopenharmony_ci  uv_mutex_unlock(&loop->wq_mutex);
647e66f31c5Sopenharmony_ci  rdunlock_closed_uv_loop_rwlock();
648e66f31c5Sopenharmony_ci
649e66f31c5Sopenharmony_ci  return 0;
650e66f31c5Sopenharmony_ci}
651e66f31c5Sopenharmony_ci
652e66f31c5Sopenharmony_ci
653e66f31c5Sopenharmony_civoid uv__work_done(uv_async_t* handle) {
654e66f31c5Sopenharmony_ci  struct uv__work* w;
655e66f31c5Sopenharmony_ci  uv_loop_t* loop;
656e66f31c5Sopenharmony_ci  struct uv__queue* q;
657e66f31c5Sopenharmony_ci  struct uv__queue wq;
658e66f31c5Sopenharmony_ci  int err;
659e66f31c5Sopenharmony_ci  int nevents;
660e66f31c5Sopenharmony_ci
661e66f31c5Sopenharmony_ci  loop = container_of(handle, uv_loop_t, wq_async);
662e66f31c5Sopenharmony_ci  rdlock_closed_uv_loop_rwlock();
663e66f31c5Sopenharmony_ci  if (!is_uv_loop_good_magic(loop)) {
664e66f31c5Sopenharmony_ci    rdunlock_closed_uv_loop_rwlock();
665e66f31c5Sopenharmony_ci    return;
666e66f31c5Sopenharmony_ci  }
667e66f31c5Sopenharmony_ci  rdunlock_closed_uv_loop_rwlock();
668e66f31c5Sopenharmony_ci
669e66f31c5Sopenharmony_ci  uv_mutex_lock(&loop->wq_mutex);
670e66f31c5Sopenharmony_ci#ifndef USE_FFRT
671e66f31c5Sopenharmony_ci  uv__queue_move(&loop->wq, &wq);
672e66f31c5Sopenharmony_ci#else
673e66f31c5Sopenharmony_ci  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
674e66f31c5Sopenharmony_ci  int i;
675e66f31c5Sopenharmony_ci  uv__queue_init(&wq);
676e66f31c5Sopenharmony_ci  for (i = 3; i >= 0; i--) {
677e66f31c5Sopenharmony_ci    if (!uv__queue_empty(&lfields->wq_sub[i])) {
678e66f31c5Sopenharmony_ci      uv__queue_append(&lfields->wq_sub[i], &wq);
679e66f31c5Sopenharmony_ci    }
680e66f31c5Sopenharmony_ci  }
681e66f31c5Sopenharmony_ci#endif
682e66f31c5Sopenharmony_ci  uv_mutex_unlock(&loop->wq_mutex);
683e66f31c5Sopenharmony_ci
684e66f31c5Sopenharmony_ci  nevents = 0;
685e66f31c5Sopenharmony_ci  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
686e66f31c5Sopenharmony_ci  while (!uv__queue_empty(&wq)) {
687e66f31c5Sopenharmony_ci    q = uv__queue_head(&wq);
688e66f31c5Sopenharmony_ci    uv__queue_remove(q);
689e66f31c5Sopenharmony_ci
690e66f31c5Sopenharmony_ci    w = container_of(q, struct uv__work, wq);
691e66f31c5Sopenharmony_ci    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
692e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
693e66f31c5Sopenharmony_ci    uv__post_statistic_work(w, DONE_EXECUTING);
694e66f31c5Sopenharmony_ci    struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
695e66f31c5Sopenharmony_ci    if (dump_work == NULL) {
696e66f31c5Sopenharmony_ci      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
697e66f31c5Sopenharmony_ci      break;
698e66f31c5Sopenharmony_ci    }
699e66f31c5Sopenharmony_ci    dump_work->info = w->info;
700e66f31c5Sopenharmony_ci    dump_work->work = uv__update_work_info;
701e66f31c5Sopenharmony_ci#endif
702e66f31c5Sopenharmony_ci#ifdef ASYNC_STACKTRACE
703e66f31c5Sopenharmony_ci    uv_work_t* req = container_of(w, uv_work_t, work_req);
704e66f31c5Sopenharmony_ci    LibuvSetStackId((uint64_t)req->reserved[3]);
705e66f31c5Sopenharmony_ci#endif
706e66f31c5Sopenharmony_ci    w->done(w, err);
707e66f31c5Sopenharmony_ci    nevents++;
708e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
709e66f31c5Sopenharmony_ci    dump_work->time = uv__now_timestamp();
710e66f31c5Sopenharmony_ci    dump_work->state = DONE_END;
711e66f31c5Sopenharmony_ci    QUEUE_INIT(&dump_work->wq);
712e66f31c5Sopenharmony_ci    post_statistic_work(&dump_work->wq);
713e66f31c5Sopenharmony_ci#endif
714e66f31c5Sopenharmony_ci  }
715e66f31c5Sopenharmony_ci  uv_end_trace(UV_TRACE_TAG);
716e66f31c5Sopenharmony_ci
717e66f31c5Sopenharmony_ci  /* This check accomplishes 2 things:
718e66f31c5Sopenharmony_ci   * 1. Even if the queue was empty, the call to uv__work_done() should count
719e66f31c5Sopenharmony_ci   *    as an event. Which will have been added by the event loop when
720e66f31c5Sopenharmony_ci   *    calling this callback.
721e66f31c5Sopenharmony_ci   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
722e66f31c5Sopenharmony_ci   */
723e66f31c5Sopenharmony_ci  if (nevents > 1) {
724e66f31c5Sopenharmony_ci    /* Subtract 1 to counter the call to uv__work_done(). */
725e66f31c5Sopenharmony_ci    uv__metrics_inc_events(loop, nevents - 1);
726e66f31c5Sopenharmony_ci    if (uv__get_internal_fields(loop)->current_timeout == 0)
727e66f31c5Sopenharmony_ci      uv__metrics_inc_events_waiting(loop, nevents - 1);
728e66f31c5Sopenharmony_ci  }
729e66f31c5Sopenharmony_ci}
730e66f31c5Sopenharmony_ci
731e66f31c5Sopenharmony_ci
732e66f31c5Sopenharmony_cistatic void uv__queue_work(struct uv__work* w) {
733e66f31c5Sopenharmony_ci  uv_work_t* req = container_of(w, uv_work_t, work_req);
734e66f31c5Sopenharmony_ci
735e66f31c5Sopenharmony_ci  req->work_cb(req);
736e66f31c5Sopenharmony_ci}
737e66f31c5Sopenharmony_ci
738e66f31c5Sopenharmony_ci
739e66f31c5Sopenharmony_cistatic void uv__queue_done(struct uv__work* w, int err) {
740e66f31c5Sopenharmony_ci  uv_work_t* req;
741e66f31c5Sopenharmony_ci
742e66f31c5Sopenharmony_ci  if (w == NULL) {
743e66f31c5Sopenharmony_ci    UV_LOGE("uv_work_t is NULL");
744e66f31c5Sopenharmony_ci    return;
745e66f31c5Sopenharmony_ci  }
746e66f31c5Sopenharmony_ci
747e66f31c5Sopenharmony_ci  req = container_of(w, uv_work_t, work_req);
748e66f31c5Sopenharmony_ci  uv__req_unregister(req->loop, req);
749e66f31c5Sopenharmony_ci
750e66f31c5Sopenharmony_ci  if (req->after_work_cb == NULL)
751e66f31c5Sopenharmony_ci    return;
752e66f31c5Sopenharmony_ci
753e66f31c5Sopenharmony_ci  req->after_work_cb(req, err);
754e66f31c5Sopenharmony_ci}
755e66f31c5Sopenharmony_ci
756e66f31c5Sopenharmony_ci
757e66f31c5Sopenharmony_ci#ifdef USE_FFRT
758e66f31c5Sopenharmony_civoid uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
759e66f31c5Sopenharmony_ci{
760e66f31c5Sopenharmony_ci  struct uv__work* w = (struct uv__work *)data;
761e66f31c5Sopenharmony_ci  uv_loop_t* loop = w->loop;
762e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
763e66f31c5Sopenharmony_ci  uv__post_statistic_work(w, WORK_EXECUTING);
764e66f31c5Sopenharmony_ci#endif
765e66f31c5Sopenharmony_ci  uv_work_t* req = container_of(w, uv_work_t, work_req);
766e66f31c5Sopenharmony_ci#ifdef ASYNC_STACKTRACE
767e66f31c5Sopenharmony_ci  LibuvSetStackId((uint64_t)req->reserved[3]);
768e66f31c5Sopenharmony_ci#endif
769e66f31c5Sopenharmony_ci  w->work(w);
770e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
771e66f31c5Sopenharmony_ci  uv__post_statistic_work(w, WORK_END);
772e66f31c5Sopenharmony_ci#endif
773e66f31c5Sopenharmony_ci  rdlock_closed_uv_loop_rwlock();
774e66f31c5Sopenharmony_ci  if (loop->magic != UV_LOOP_MAGIC) {
775e66f31c5Sopenharmony_ci    rdunlock_closed_uv_loop_rwlock();
776e66f31c5Sopenharmony_ci    UV_LOGE("uv_loop(%{public}zu:%{public}#x) in task(%p:%p) is invalid",
777e66f31c5Sopenharmony_ci            (size_t)loop, loop->magic, req->work_cb, req->after_work_cb);
778e66f31c5Sopenharmony_ci    return;
779e66f31c5Sopenharmony_ci  }
780e66f31c5Sopenharmony_ci
781e66f31c5Sopenharmony_ci  uv_mutex_lock(&loop->wq_mutex);
782e66f31c5Sopenharmony_ci  w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */
783e66f31c5Sopenharmony_ci
784e66f31c5Sopenharmony_ci  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
785e66f31c5Sopenharmony_ci    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
786e66f31c5Sopenharmony_ci    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
787e66f31c5Sopenharmony_ci      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
788e66f31c5Sopenharmony_ci    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
789e66f31c5Sopenharmony_ci  } else {
790e66f31c5Sopenharmony_ci    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
791e66f31c5Sopenharmony_ci    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
792e66f31c5Sopenharmony_ci    uv_async_send(&loop->wq_async);
793e66f31c5Sopenharmony_ci  }
794e66f31c5Sopenharmony_ci  uv_mutex_unlock(&loop->wq_mutex);
795e66f31c5Sopenharmony_ci  rdunlock_closed_uv_loop_rwlock();
796e66f31c5Sopenharmony_ci}
797e66f31c5Sopenharmony_ci
798e66f31c5Sopenharmony_cistatic void init_once(void)
799e66f31c5Sopenharmony_ci{
800e66f31c5Sopenharmony_ci  init_closed_uv_loop_rwlock_once();
801e66f31c5Sopenharmony_ci  /* init uv work statics queue */
802e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
803e66f31c5Sopenharmony_ci  init_work_dump_queue();
804e66f31c5Sopenharmony_ci#endif
805e66f31c5Sopenharmony_ci  ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
806e66f31c5Sopenharmony_ci}
807e66f31c5Sopenharmony_ci
808e66f31c5Sopenharmony_ci
809e66f31c5Sopenharmony_ci/* ffrt uv__work_submit */
810e66f31c5Sopenharmony_civoid uv__work_submit(uv_loop_t* loop,
811e66f31c5Sopenharmony_ci                     uv_req_t* req,
812e66f31c5Sopenharmony_ci                     struct uv__work* w,
813e66f31c5Sopenharmony_ci                     enum uv__work_kind kind,
814e66f31c5Sopenharmony_ci                     void (*work)(struct uv__work *w),
815e66f31c5Sopenharmony_ci                     void (*done)(struct uv__work *w, int status)) {
816e66f31c5Sopenharmony_ci  uv_once(&once, init_once);
817e66f31c5Sopenharmony_ci  ffrt_task_attr_t attr;
818e66f31c5Sopenharmony_ci  ffrt_task_attr_init(&attr);
819e66f31c5Sopenharmony_ci
820e66f31c5Sopenharmony_ci  switch(kind) {
821e66f31c5Sopenharmony_ci    case UV__WORK_CPU:
822e66f31c5Sopenharmony_ci      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
823e66f31c5Sopenharmony_ci      break;
824e66f31c5Sopenharmony_ci    case UV__WORK_FAST_IO:
825e66f31c5Sopenharmony_ci      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
826e66f31c5Sopenharmony_ci      break;
827e66f31c5Sopenharmony_ci    case UV__WORK_SLOW_IO:
828e66f31c5Sopenharmony_ci      ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
829e66f31c5Sopenharmony_ci      break;
830e66f31c5Sopenharmony_ci    default:
831e66f31c5Sopenharmony_ci#ifdef USE_OHOS_DFX
832e66f31c5Sopenharmony_ci      UV_LOGI("Unknown work kind");
833e66f31c5Sopenharmony_ci#endif
834e66f31c5Sopenharmony_ci      return;
835e66f31c5Sopenharmony_ci  }
836e66f31c5Sopenharmony_ci
837e66f31c5Sopenharmony_ci  w->loop = loop;
838e66f31c5Sopenharmony_ci  w->work = work;
839e66f31c5Sopenharmony_ci  w->done = done;
840e66f31c5Sopenharmony_ci
841e66f31c5Sopenharmony_ci  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
842e66f31c5Sopenharmony_ci  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
843e66f31c5Sopenharmony_ci  ffrt_task_attr_destroy(&attr);
844e66f31c5Sopenharmony_ci}
845e66f31c5Sopenharmony_ci
846e66f31c5Sopenharmony_ci
847e66f31c5Sopenharmony_ci/* ffrt uv__work_submit */
848e66f31c5Sopenharmony_civoid uv__work_submit_with_qos(uv_loop_t* loop,
849e66f31c5Sopenharmony_ci                     uv_req_t* req,
850e66f31c5Sopenharmony_ci                     struct uv__work* w,
851e66f31c5Sopenharmony_ci                     ffrt_qos_t qos,
852e66f31c5Sopenharmony_ci                     void (*work)(struct uv__work *w),
853e66f31c5Sopenharmony_ci                     void (*done)(struct uv__work *w, int status)) {
854e66f31c5Sopenharmony_ci    uv_once(&once, init_once);
855e66f31c5Sopenharmony_ci    ffrt_task_attr_t attr;
856e66f31c5Sopenharmony_ci    ffrt_task_attr_init(&attr);
857e66f31c5Sopenharmony_ci    ffrt_task_attr_set_qos(&attr, qos);
858e66f31c5Sopenharmony_ci
859e66f31c5Sopenharmony_ci    w->loop = loop;
860e66f31c5Sopenharmony_ci    w->work = work;
861e66f31c5Sopenharmony_ci    w->done = done;
862e66f31c5Sopenharmony_ci
863e66f31c5Sopenharmony_ci    req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
864e66f31c5Sopenharmony_ci    ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
865e66f31c5Sopenharmony_ci    ffrt_task_attr_destroy(&attr);
866e66f31c5Sopenharmony_ci}
867e66f31c5Sopenharmony_ci#endif
868e66f31c5Sopenharmony_ci
869e66f31c5Sopenharmony_ci
870e66f31c5Sopenharmony_ciint uv_queue_work(uv_loop_t* loop,
871e66f31c5Sopenharmony_ci                  uv_work_t* req,
872e66f31c5Sopenharmony_ci                  uv_work_cb work_cb,
873e66f31c5Sopenharmony_ci                  uv_after_work_cb after_work_cb) {
874e66f31c5Sopenharmony_ci  if (work_cb == NULL)
875e66f31c5Sopenharmony_ci    return UV_EINVAL;
876e66f31c5Sopenharmony_ci
877e66f31c5Sopenharmony_ci  uv__req_init(loop, req, UV_WORK);
878e66f31c5Sopenharmony_ci  req->loop = loop;
879e66f31c5Sopenharmony_ci  req->work_cb = work_cb;
880e66f31c5Sopenharmony_ci  req->after_work_cb = after_work_cb;
881e66f31c5Sopenharmony_ci
882e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
883e66f31c5Sopenharmony_ci  struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
884e66f31c5Sopenharmony_ci  if (info == NULL) {
885e66f31c5Sopenharmony_ci    abort();
886e66f31c5Sopenharmony_ci  }
887e66f31c5Sopenharmony_ci  uv_init_dump_info(info, &req->work_req);
888e66f31c5Sopenharmony_ci  info->builtin_return_address[0] = __builtin_return_address(0);
889e66f31c5Sopenharmony_ci  info->builtin_return_address[1] = __builtin_return_address(1);
890e66f31c5Sopenharmony_ci  info->builtin_return_address[2] = __builtin_return_address(2);
891e66f31c5Sopenharmony_ci  (req->work_req).info = info;
892e66f31c5Sopenharmony_ci#endif
893e66f31c5Sopenharmony_ci#ifdef ASYNC_STACKTRACE
894e66f31c5Sopenharmony_ci  req->reserved[3] = (void*)LibuvCollectAsyncStack();
895e66f31c5Sopenharmony_ci#endif
896e66f31c5Sopenharmony_ci  uv__work_submit(loop,
897e66f31c5Sopenharmony_ci#ifdef USE_FFRT
898e66f31c5Sopenharmony_ci                  (uv_req_t*)req,
899e66f31c5Sopenharmony_ci#endif
900e66f31c5Sopenharmony_ci                  &req->work_req,
901e66f31c5Sopenharmony_ci                  UV__WORK_CPU,
902e66f31c5Sopenharmony_ci                  uv__queue_work,
903e66f31c5Sopenharmony_ci                  uv__queue_done
904e66f31c5Sopenharmony_ci);
905e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
906e66f31c5Sopenharmony_ci  uv_queue_statics(info);
907e66f31c5Sopenharmony_ci#endif
908e66f31c5Sopenharmony_ci  return 0;
909e66f31c5Sopenharmony_ci}
910e66f31c5Sopenharmony_ci
911e66f31c5Sopenharmony_ci
912e66f31c5Sopenharmony_ciint uv_queue_work_with_qos(uv_loop_t* loop,
913e66f31c5Sopenharmony_ci                  uv_work_t* req,
914e66f31c5Sopenharmony_ci                  uv_work_cb work_cb,
915e66f31c5Sopenharmony_ci                  uv_after_work_cb after_work_cb,
916e66f31c5Sopenharmony_ci                  uv_qos_t qos) {
917e66f31c5Sopenharmony_ci#ifdef USE_FFRT
918e66f31c5Sopenharmony_ci  if (work_cb == NULL)
919e66f31c5Sopenharmony_ci    return UV_EINVAL;
920e66f31c5Sopenharmony_ci
921e66f31c5Sopenharmony_ci  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
922e66f31c5Sopenharmony_ci  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
923e66f31c5Sopenharmony_ci  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
924e66f31c5Sopenharmony_ci  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
925e66f31c5Sopenharmony_ci  if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated) {
926e66f31c5Sopenharmony_ci    return UV_EINVAL;
927e66f31c5Sopenharmony_ci  }
928e66f31c5Sopenharmony_ci
929e66f31c5Sopenharmony_ci  uv__req_init(loop, req, UV_WORK);
930e66f31c5Sopenharmony_ci  req->loop = loop;
931e66f31c5Sopenharmony_ci  req->work_cb = work_cb;
932e66f31c5Sopenharmony_ci  req->after_work_cb = after_work_cb;
933e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
934e66f31c5Sopenharmony_ci  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
935e66f31c5Sopenharmony_ci  if (info == NULL) {
936e66f31c5Sopenharmony_ci    abort();
937e66f31c5Sopenharmony_ci  }
938e66f31c5Sopenharmony_ci  uv_init_dump_info(info, &req->work_req);
939e66f31c5Sopenharmony_ci  info->builtin_return_address[0] = __builtin_return_address(0);
940e66f31c5Sopenharmony_ci  info->builtin_return_address[1] = __builtin_return_address(1);
941e66f31c5Sopenharmony_ci  info->builtin_return_address[2] = __builtin_return_address(2);
942e66f31c5Sopenharmony_ci  (req->work_req).info = info;
943e66f31c5Sopenharmony_ci#endif
944e66f31c5Sopenharmony_ci  uv__work_submit_with_qos(loop,
945e66f31c5Sopenharmony_ci                  (uv_req_t*)req,
946e66f31c5Sopenharmony_ci                  &req->work_req,
947e66f31c5Sopenharmony_ci                  (ffrt_qos_t)qos,
948e66f31c5Sopenharmony_ci                  uv__queue_work,
949e66f31c5Sopenharmony_ci                  uv__queue_done);
950e66f31c5Sopenharmony_ci#ifdef UV_STATISTIC
951e66f31c5Sopenharmony_ci  uv_queue_statics(info);
952e66f31c5Sopenharmony_ci#endif
953e66f31c5Sopenharmony_ci  return 0;
954e66f31c5Sopenharmony_ci#else
955e66f31c5Sopenharmony_ci  return uv_queue_work(loop, req, work_cb, after_work_cb);
956e66f31c5Sopenharmony_ci#endif
957e66f31c5Sopenharmony_ci}
958e66f31c5Sopenharmony_ci
959e66f31c5Sopenharmony_ci
960e66f31c5Sopenharmony_ciint uv_cancel(uv_req_t* req) {
961e66f31c5Sopenharmony_ci  struct uv__work* wreq;
962e66f31c5Sopenharmony_ci  uv_loop_t* loop;
963e66f31c5Sopenharmony_ci
964e66f31c5Sopenharmony_ci  switch (req->type) {
965e66f31c5Sopenharmony_ci  case UV_FS:
966e66f31c5Sopenharmony_ci    loop =  ((uv_fs_t*) req)->loop;
967e66f31c5Sopenharmony_ci    wreq = &((uv_fs_t*) req)->work_req;
968e66f31c5Sopenharmony_ci    break;
969e66f31c5Sopenharmony_ci  case UV_GETADDRINFO:
970e66f31c5Sopenharmony_ci    loop =  ((uv_getaddrinfo_t*) req)->loop;
971e66f31c5Sopenharmony_ci    wreq = &((uv_getaddrinfo_t*) req)->work_req;
972e66f31c5Sopenharmony_ci    break;
973e66f31c5Sopenharmony_ci  case UV_GETNAMEINFO:
974e66f31c5Sopenharmony_ci    loop = ((uv_getnameinfo_t*) req)->loop;
975e66f31c5Sopenharmony_ci    wreq = &((uv_getnameinfo_t*) req)->work_req;
976e66f31c5Sopenharmony_ci    break;
977e66f31c5Sopenharmony_ci  case UV_RANDOM:
978e66f31c5Sopenharmony_ci    loop = ((uv_random_t*) req)->loop;
979e66f31c5Sopenharmony_ci    wreq = &((uv_random_t*) req)->work_req;
980e66f31c5Sopenharmony_ci    break;
981e66f31c5Sopenharmony_ci  case UV_WORK:
982e66f31c5Sopenharmony_ci    loop =  ((uv_work_t*) req)->loop;
983e66f31c5Sopenharmony_ci    wreq = &((uv_work_t*) req)->work_req;
984e66f31c5Sopenharmony_ci    break;
985e66f31c5Sopenharmony_ci  default:
986e66f31c5Sopenharmony_ci    return UV_EINVAL;
987e66f31c5Sopenharmony_ci  }
988e66f31c5Sopenharmony_ci
989e66f31c5Sopenharmony_ci  return uv__work_cancel(loop, req, wreq);
990e66f31c5Sopenharmony_ci}
991