1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22#include "uv-common.h" 23#include "uv_log.h" 24#include "uv_trace.h" 25 26#if !defined(_WIN32) 27# include "unix/internal.h" 28#else 29# include "win/internal.h" 30#endif 31 32#include <stdlib.h> 33#ifdef USE_FFRT 34#include <assert.h> 35#include "ffrt_inner.h" 36#endif 37#include <stdio.h> 38#ifdef ASYNC_STACKTRACE 39#include "dfx/async_stack/libuv_async_stack.h" 40#endif 41 42#define MAX_THREADPOOL_SIZE 1024 43#define UV_TRACE_NAME "UV_TRACE" 44 45static uv_rwlock_t g_closed_uv_loop_rwlock; 46static uv_once_t once = UV_ONCE_INIT; 47static uv_cond_t cond; 48static uv_mutex_t mutex; 49static unsigned int idle_threads; 50static unsigned int nthreads; 51static uv_thread_t* threads; 52static uv_thread_t default_threads[4]; 53static struct uv__queue exit_message; 54static struct uv__queue wq; 55static struct uv__queue run_slow_work_message; 56static struct uv__queue slow_io_pending_wq; 57 58 59#ifdef UV_STATISTIC 60#define MAX_DUMP_QUEUE_SIZE 200 61static uv_mutex_t dump_queue_mutex; 62static QUEUE dump_queue; 63static unsigned int dump_queue_size; 64 65static int statistic_idle; 66static uv_mutex_t statistic_mutex; 67static QUEUE statistic_works; 68static uv_cond_t dump_cond; 69static uv_thread_t dump_thread; 70 71static void uv_dump_worker(void* arg) { 72 struct uv__statistic_work* w; 73 struct uv__queue* q; 74 uv_sem_post((uv_sem_t*) arg); 75 arg = NULL; 76 uv_mutex_lock(&statistic_mutex); 77 for (;;) { 78 while (uv__queue_empty(&statistic_works)) { 79 statistic_idle = 1; 80 uv_cond_wait(&dump_cond, &statistic_mutex); 81 statistic_idle = 0; 82 } 83 q = uv__queue_head(&statistic_works); 84 if (q == &exit_message) { 85 uv_cond_signal(&dump_cond); 86 uv_mutex_unlock(&statistic_mutex); 87 break; 88 } 89 uv__queue_remove(q); 90 uv__queue_init(q); 91 uv_mutex_unlock(&statistic_mutex); 92 w = uv__queue_data(q, struct uv__statistic_work, wq); 93 w->work(w); 94 free(w); 95 uv_mutex_lock(&statistic_mutex); 96 } 97} 98 99static void post_statistic_work(struct uv__queue* q) { 100 uv_mutex_lock(&statistic_mutex); 101 uv__queue_insert_tail(&statistic_works, q); 102 if (statistic_idle) 103 uv_cond_signal(&dump_cond); 104 uv_mutex_unlock(&statistic_mutex); 105} 106 107static void uv__queue_work_info(struct uv__statistic_work *work) { 108 uv_mutex_lock(&dump_queue_mutex); 109 if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */ 110 struct uv__queue* q; 111 uv__queue_foreach(q, &dump_queue) { 112 struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq); 113 if (info->state == DONE_END) { 114 uv__queue_remove(q); 115 free(info); 116 dump_queue_size--; 117 } 118 } 119 if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { 120 abort(); /* too many works not done. */ 121 } 122 } 123 124 uv__queue_insert_head(&dump_queue, &work->info->wq); 125 dump_queue_size++; 126 uv_mutex_unlock(&dump_queue_mutex); 127} 128 129static void uv__update_work_info(struct uv__statistic_work *work) { 130 uv_mutex_lock(&dump_queue_mutex); 131 if (work != NULL && work->info != NULL) { 132 work->info->state = work->state; 133 switch (work->state) { 134 case WAITING: 135 work->info->queue_time = work->time; 136 break; 137 case WORK_EXECUTING: 138 work->info->execute_start_time = work->time; 139 break; 140 case WORK_END: 141 work->info->execute_end_time = work->time; 142 break; 143 case DONE_EXECUTING: 144 work->info->done_start_time = work->time; 145 break; 146 case DONE_END: 147 work->info->done_end_time = work->time; 148 break; 149 default: 150 break; 151 } 152 } 153 uv_mutex_unlock(&dump_queue_mutex); 154} 155 156 157// return the timestamp in millisecond 158static uint64_t uv__now_timestamp() { 159 uv_timeval64_t tv; 160 int r = uv_gettimeofday(&tv); 161 if (r != 0) { 162 return 0; 163 } 164 return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000; 165} 166 167 168static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) { 169 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work)); 170 if (dump_work == NULL) { 171 return; 172 } 173 dump_work->info = w->info; 174 dump_work->work = uv__update_work_info; 175 dump_work->time = uv__now_timestamp(); 176 dump_work->state = state; 177 uv__queue_init(&dump_work->wq); 178 post_statistic_work(&dump_work->wq); 179} 180 181 182static void init_work_dump_queue() 183{ 184 if (uv_mutex_init(&dump_queue_mutex)) 185 abort(); 186 uv_mutex_lock(&dump_queue_mutex); 187 uv__queue_init(&dump_queue); 188 dump_queue_size = 0; 189 uv_mutex_unlock(&dump_queue_mutex); 190 191 /* init dump thread */ 192 statistic_idle = 1; 193 if (uv_mutex_init(&statistic_mutex)) 194 abort(); 195 uv__queue_init(&statistic_works); 196 uv_sem_t sem; 197 if (uv_cond_init(&dump_cond)) 198 abort(); 199 if (uv_sem_init(&sem, 0)) 200 abort(); 201 if (uv_thread_create(&dump_thread, uv_dump_worker, &sem)) 202 abort(); 203 uv_sem_wait(&sem); 204 uv_sem_destroy(&sem); 205} 206 207 208void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) { 209 if (info == NULL) 210 return; 211 info->queue_time = 0; 212 info->state = WAITING; 213 info->execute_start_time = 0; 214 info->execute_end_time = 0; 215 info->done_start_time = 0; 216 info->done_end_time = 0; 217 info->work = w; 218 uv__queue_init(&info->wq); 219} 220 221 222void uv_queue_statics(struct uv_work_dump_info* info) { 223 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work)); 224 if (dump_work == NULL) { 225 abort(); 226 } 227 dump_work->info = info; 228 dump_work->work = uv__queue_work_info; 229 info->queue_time = uv__now_timestamp(); 230 dump_work->state = WAITING; 231 uv__queue_init(&dump_work->wq); 232 post_statistic_work(&dump_work->wq); 233} 234 235 236uv_worker_info_t* uv_dump_work_queue(int* size) { 237#ifdef UV_STATISTIC 238 uv_mutex_lock(&dump_queue_mutex); 239 if (uv__queue_empty(&dump_queue)) { 240 return NULL; 241 } 242 *size = dump_queue_size; 243 uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size); 244 struct uv__queue* q; 245 int i = 0; 246 uv__queue_foreach(q, &dump_queue) { 247 struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq); 248 dump_info[i].queue_time = info->queue_time; 249 dump_info[i].builtin_return_address[0] = info->builtin_return_address[0]; 250 dump_info[i].builtin_return_address[1] = info->builtin_return_address[1]; 251 dump_info[i].builtin_return_address[2] = info->builtin_return_address[2]; 252 switch (info->state) { 253 case WAITING: 254 strcpy(dump_info[i].state, "waiting"); 255 break; 256 case WORK_EXECUTING: 257 strcpy(dump_info[i].state, "work_executing"); 258 break; 259 case WORK_END: 260 strcpy(dump_info[i].state, "work_end"); 261 break; 262 case DONE_EXECUTING: 263 strcpy(dump_info[i].state, "done_executing"); 264 break; 265 case DONE_END: 266 strcpy(dump_info[i].state, "done_end"); 267 break; 268 default: 269 break; 270 } 271 dump_info[i].execute_start_time = info->execute_start_time; 272 dump_info[i].execute_end_time = info->execute_end_time; 273 dump_info[i].done_start_time = info->done_start_time; 274 dump_info[i].done_end_time = info->done_end_time; 275 ++i; 276 } 277 uv_mutex_unlock(&dump_queue_mutex); 278 return dump_info; 279#else 280 size = 0; 281 return NULL; 282#endif 283} 284#endif 285 286 287static void init_closed_uv_loop_rwlock_once(void) { 288 uv_rwlock_init(&g_closed_uv_loop_rwlock); 289} 290 291 292void rdlock_closed_uv_loop_rwlock(void) { 293 uv_rwlock_rdlock(&g_closed_uv_loop_rwlock); 294} 295 296 297void rdunlock_closed_uv_loop_rwlock(void) { 298 uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock); 299} 300 301 302int is_uv_loop_good_magic(const uv_loop_t* loop) { 303 if (loop->magic == UV_LOOP_MAGIC) { 304 return 1; 305 } 306 UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic); 307 return 0; 308} 309 310 311void on_uv_loop_close(uv_loop_t* loop) { 312 time_t t1, t2; 313 time(&t1); 314 uv_start_trace(UV_TRACE_TAG, "Get Write Lock"); 315 uv_rwlock_wrlock(&g_closed_uv_loop_rwlock); 316 uv_end_trace(UV_TRACE_TAG); 317 loop->magic = ~UV_LOOP_MAGIC; 318 uv_start_trace(UV_TRACE_TAG, "Release Write Lock"); 319 uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock); 320 uv_end_trace(UV_TRACE_TAG); 321 time(&t2); 322 UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1)); 323} 324 325 326static void uv__cancelled(struct uv__work* w) { 327 abort(); 328} 329 330 331#ifndef USE_FFRT 332static unsigned int slow_io_work_running; 333 334static unsigned int slow_work_thread_threshold(void) { 335 return (nthreads + 1) / 2; 336} 337 338 339/* To avoid deadlock with uv_cancel() it's crucial that the worker 340 * never holds the global mutex and the loop-local mutex at the same time. 341 */ 342static void worker(void* arg) { 343 struct uv__work* w; 344 struct uv__queue* q; 345 int is_slow_work; 346 347 uv_sem_post((uv_sem_t*) arg); 348 arg = NULL; 349 350 uv_mutex_lock(&mutex); 351 for (;;) { 352 /* `mutex` should always be locked at this point. */ 353 354 /* Keep waiting while either no work is present or only slow I/O 355 and we're at the threshold for that. */ 356 while (uv__queue_empty(&wq) || 357 (uv__queue_head(&wq) == &run_slow_work_message && 358 uv__queue_next(&run_slow_work_message) == &wq && 359 slow_io_work_running >= slow_work_thread_threshold())) { 360 idle_threads += 1; 361 uv_cond_wait(&cond, &mutex); 362 idle_threads -= 1; 363 } 364 365 q = uv__queue_head(&wq); 366 if (q == &exit_message) { 367 uv_cond_signal(&cond); 368 uv_mutex_unlock(&mutex); 369 break; 370 } 371 372 uv__queue_remove(q); 373 uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */ 374 375 is_slow_work = 0; 376 if (q == &run_slow_work_message) { 377 /* If we're at the slow I/O threshold, re-schedule until after all 378 other work in the queue is done. */ 379 if (slow_io_work_running >= slow_work_thread_threshold()) { 380 uv__queue_insert_tail(&wq, q); 381 continue; 382 } 383 384 /* If we encountered a request to run slow I/O work but there is none 385 to run, that means it's cancelled => Start over. */ 386 if (uv__queue_empty(&slow_io_pending_wq)) 387 continue; 388 389 is_slow_work = 1; 390 slow_io_work_running++; 391 392 q = uv__queue_head(&slow_io_pending_wq); 393 uv__queue_remove(q); 394 uv__queue_init(q); 395 396 /* If there is more slow I/O work, schedule it to be run as well. */ 397 if (!uv__queue_empty(&slow_io_pending_wq)) { 398 uv__queue_insert_tail(&wq, &run_slow_work_message); 399 if (idle_threads > 0) 400 uv_cond_signal(&cond); 401 } 402 } 403 404 uv_mutex_unlock(&mutex); 405 406 w = uv__queue_data(q, struct uv__work, wq); 407#ifdef UV_STATISTIC 408 uv__post_statistic_work(w, WORK_EXECUTING); 409#endif 410#ifdef ASYNC_STACKTRACE 411 uv_work_t* req = container_of(w, uv_work_t, work_req); 412 LibuvSetStackId((uint64_t)req->reserved[3]); 413#endif 414 w->work(w); 415#ifdef UV_STATISTIC 416 uv__post_statistic_work(w, WORK_END); 417#endif 418 uv_mutex_lock(&w->loop->wq_mutex); 419 w->work = NULL; /* Signal uv_cancel() that the work req is done 420 executing. */ 421 uv__queue_insert_tail(&w->loop->wq, &w->wq); 422 uv_async_send(&w->loop->wq_async); 423 uv_mutex_unlock(&w->loop->wq_mutex); 424 425 /* Lock `mutex` since that is expected at the start of the next 426 * iteration. */ 427 uv_mutex_lock(&mutex); 428 if (is_slow_work) { 429 /* `slow_io_work_running` is protected by `mutex`. */ 430 slow_io_work_running--; 431 } 432 } 433} 434#endif 435 436 437static void post(struct uv__queue* q, enum uv__work_kind kind) { 438 uv_mutex_lock(&mutex); 439 if (kind == UV__WORK_SLOW_IO) { 440 /* Insert into a separate queue. */ 441 uv__queue_insert_tail(&slow_io_pending_wq, q); 442 if (!uv__queue_empty(&run_slow_work_message)) { 443 /* Running slow I/O tasks is already scheduled => Nothing to do here. 444 The worker that runs said other task will schedule this one as well. */ 445 uv_mutex_unlock(&mutex); 446 return; 447 } 448 q = &run_slow_work_message; 449 } 450 451 uv__queue_insert_tail(&wq, q); 452 if (idle_threads > 0) 453 uv_cond_signal(&cond); 454 uv_mutex_unlock(&mutex); 455} 456 457 458#ifdef __MVS__ 459/* TODO(itodorov) - zos: revisit when Woz compiler is available. */ 460__attribute__((destructor)) 461#endif 462void uv__threadpool_cleanup(void) { 463 unsigned int i; 464 465 if (nthreads == 0) 466 return; 467 468#ifndef __MVS__ 469 /* TODO(gabylb) - zos: revisit when Woz compiler is available. */ 470 post(&exit_message, UV__WORK_CPU); 471#endif 472 473 for (i = 0; i < nthreads; i++) 474 if (uv_thread_join(threads + i)) 475 abort(); 476 477 if (threads != default_threads) 478 uv__free(threads); 479 480 uv_mutex_destroy(&mutex); 481 uv_cond_destroy(&cond); 482 483 threads = NULL; 484 nthreads = 0; 485#ifdef UV_STATISTIC 486 post_statistic_work(&exit_message); 487 uv_thread_join(dump_thread); 488 uv_mutex_destroy(&statistic_mutex); 489 uv_cond_destroy(&dump_cond); 490#endif 491} 492 493 494#ifndef USE_FFRT 495static void init_threads(void) { 496 uv_thread_options_t config; 497 unsigned int i; 498 const char* val; 499 uv_sem_t sem; 500 501 nthreads = ARRAY_SIZE(default_threads); 502 val = getenv("UV_THREADPOOL_SIZE"); 503 if (val != NULL) 504 nthreads = atoi(val); 505 if (nthreads == 0) 506 nthreads = 1; 507 if (nthreads > MAX_THREADPOOL_SIZE) 508 nthreads = MAX_THREADPOOL_SIZE; 509 510 threads = default_threads; 511 if (nthreads > ARRAY_SIZE(default_threads)) { 512 threads = uv__malloc(nthreads * sizeof(threads[0])); 513 if (threads == NULL) { 514 nthreads = ARRAY_SIZE(default_threads); 515 threads = default_threads; 516 } 517 } 518 519 if (uv_cond_init(&cond)) 520 abort(); 521 522 if (uv_mutex_init(&mutex)) 523 abort(); 524 525 uv__queue_init(&wq); 526 uv__queue_init(&slow_io_pending_wq); 527 uv__queue_init(&run_slow_work_message); 528 529 if (uv_sem_init(&sem, 0)) 530 abort(); 531 532 config.flags = UV_THREAD_HAS_STACK_SIZE; 533 config.stack_size = 8u << 20; /* 8 MB */ 534 535 for (i = 0; i < nthreads; i++) 536 if (uv_thread_create_ex(threads + i, &config, worker, &sem)) 537 abort(); 538 539 for (i = 0; i < nthreads; i++) 540 uv_sem_wait(&sem); 541 542 uv_sem_destroy(&sem); 543} 544 545 546#ifndef _WIN32 547static void reset_once(void) { 548 uv_once_t child_once = UV_ONCE_INIT; 549 memcpy(&once, &child_once, sizeof(child_once)); 550} 551#endif 552 553 554static void init_once(void) { 555#ifndef _WIN32 556 /* Re-initialize the threadpool after fork. 557 * Note that this discards the global mutex and condition as well 558 * as the work queue. 559 */ 560 if (pthread_atfork(NULL, NULL, &reset_once)) 561 abort(); 562#endif 563 init_closed_uv_loop_rwlock_once(); 564#ifdef UV_STATISTIC 565 init_work_dump_queue(); 566#endif 567 init_threads(); 568} 569 570 571void uv__work_submit(uv_loop_t* loop, 572 struct uv__work* w, 573 enum uv__work_kind kind, 574 void (*work)(struct uv__work* w), 575 void (*done)(struct uv__work* w, int status)) { 576 uv_once(&once, init_once); 577 w->loop = loop; 578 w->work = work; 579 w->done = done; 580 post(&w->wq, kind); 581} 582#endif 583 584 585#ifdef USE_FFRT 586static void uv__task_done_wrapper(void* work, int status) { 587 struct uv__work* w = (struct uv__work*)work; 588 w->done(w, status); 589} 590#endif 591 592 593/* TODO(bnoordhuis) teach libuv how to cancel file operations 594 * that go through io_uring instead of the thread pool. 595 */ 596static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { 597 int cancelled; 598 599 rdlock_closed_uv_loop_rwlock(); 600 if (!is_uv_loop_good_magic(w->loop)) { 601 rdunlock_closed_uv_loop_rwlock(); 602 return 0; 603 } 604 605#ifndef USE_FFRT 606 uv_mutex_lock(&mutex); 607 uv_mutex_lock(&w->loop->wq_mutex); 608 609 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL; 610 if (cancelled) 611 uv__queue_remove(&w->wq); 612 613 uv_mutex_unlock(&w->loop->wq_mutex); 614 uv_mutex_unlock(&mutex); 615#else 616 uv_mutex_lock(&w->loop->wq_mutex); 617 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL 618 && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]); 619 uv_mutex_unlock(&w->loop->wq_mutex); 620#endif 621 622 if (!cancelled) { 623 rdunlock_closed_uv_loop_rwlock(); 624 return UV_EBUSY; 625 } 626 627 w->work = uv__cancelled; 628 uv_mutex_lock(&loop->wq_mutex); 629#ifndef USE_FFRT 630 uv__queue_insert_tail(&loop->wq, &w->wq); 631 uv_async_send(&loop->wq_async); 632#else 633 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop); 634 int qos = (ffrt_qos_t)(intptr_t)req->reserved[0]; 635 636 if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) { 637 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0; 638 struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data - 639 (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS)); 640 addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos); 641 } else { 642 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq); 643 uv_async_send(&loop->wq_async); 644 } 645#endif 646 uv_mutex_unlock(&loop->wq_mutex); 647 rdunlock_closed_uv_loop_rwlock(); 648 649 return 0; 650} 651 652 653void uv__work_done(uv_async_t* handle) { 654 struct uv__work* w; 655 uv_loop_t* loop; 656 struct uv__queue* q; 657 struct uv__queue wq; 658 int err; 659 int nevents; 660 661 loop = container_of(handle, uv_loop_t, wq_async); 662 rdlock_closed_uv_loop_rwlock(); 663 if (!is_uv_loop_good_magic(loop)) { 664 rdunlock_closed_uv_loop_rwlock(); 665 return; 666 } 667 rdunlock_closed_uv_loop_rwlock(); 668 669 uv_mutex_lock(&loop->wq_mutex); 670#ifndef USE_FFRT 671 uv__queue_move(&loop->wq, &wq); 672#else 673 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop); 674 int i; 675 uv__queue_init(&wq); 676 for (i = 3; i >= 0; i--) { 677 if (!uv__queue_empty(&lfields->wq_sub[i])) { 678 uv__queue_append(&lfields->wq_sub[i], &wq); 679 } 680 } 681#endif 682 uv_mutex_unlock(&loop->wq_mutex); 683 684 nevents = 0; 685 uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME); 686 while (!uv__queue_empty(&wq)) { 687 q = uv__queue_head(&wq); 688 uv__queue_remove(q); 689 690 w = container_of(q, struct uv__work, wq); 691 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; 692#ifdef UV_STATISTIC 693 uv__post_statistic_work(w, DONE_EXECUTING); 694 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work)); 695 if (dump_work == NULL) { 696 UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno)); 697 break; 698 } 699 dump_work->info = w->info; 700 dump_work->work = uv__update_work_info; 701#endif 702#ifdef ASYNC_STACKTRACE 703 uv_work_t* req = container_of(w, uv_work_t, work_req); 704 LibuvSetStackId((uint64_t)req->reserved[3]); 705#endif 706 w->done(w, err); 707 nevents++; 708#ifdef UV_STATISTIC 709 dump_work->time = uv__now_timestamp(); 710 dump_work->state = DONE_END; 711 QUEUE_INIT(&dump_work->wq); 712 post_statistic_work(&dump_work->wq); 713#endif 714 } 715 uv_end_trace(UV_TRACE_TAG); 716 717 /* This check accomplishes 2 things: 718 * 1. Even if the queue was empty, the call to uv__work_done() should count 719 * as an event. Which will have been added by the event loop when 720 * calling this callback. 721 * 2. Prevents accidental wrap around in case nevents == 0 events == 0. 722 */ 723 if (nevents > 1) { 724 /* Subtract 1 to counter the call to uv__work_done(). */ 725 uv__metrics_inc_events(loop, nevents - 1); 726 if (uv__get_internal_fields(loop)->current_timeout == 0) 727 uv__metrics_inc_events_waiting(loop, nevents - 1); 728 } 729} 730 731 732static void uv__queue_work(struct uv__work* w) { 733 uv_work_t* req = container_of(w, uv_work_t, work_req); 734 735 req->work_cb(req); 736} 737 738 739static void uv__queue_done(struct uv__work* w, int err) { 740 uv_work_t* req; 741 742 if (w == NULL) { 743 UV_LOGE("uv_work_t is NULL"); 744 return; 745 } 746 747 req = container_of(w, uv_work_t, work_req); 748 uv__req_unregister(req->loop, req); 749 750 if (req->after_work_cb == NULL) 751 return; 752 753 req->after_work_cb(req, err); 754} 755 756 757#ifdef USE_FFRT 758void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos) 759{ 760 struct uv__work* w = (struct uv__work *)data; 761 uv_loop_t* loop = w->loop; 762#ifdef UV_STATISTIC 763 uv__post_statistic_work(w, WORK_EXECUTING); 764#endif 765 uv_work_t* req = container_of(w, uv_work_t, work_req); 766#ifdef ASYNC_STACKTRACE 767 LibuvSetStackId((uint64_t)req->reserved[3]); 768#endif 769 w->work(w); 770#ifdef UV_STATISTIC 771 uv__post_statistic_work(w, WORK_END); 772#endif 773 rdlock_closed_uv_loop_rwlock(); 774 if (loop->magic != UV_LOOP_MAGIC) { 775 rdunlock_closed_uv_loop_rwlock(); 776 UV_LOGE("uv_loop(%{public}zu:%{public}#x) in task(%p:%p) is invalid", 777 (size_t)loop, loop->magic, req->work_cb, req->after_work_cb); 778 return; 779 } 780 781 uv_mutex_lock(&loop->wq_mutex); 782 w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ 783 784 if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) { 785 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0; 786 struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data - 787 (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS)); 788 addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos); 789 } else { 790 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop); 791 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq); 792 uv_async_send(&loop->wq_async); 793 } 794 uv_mutex_unlock(&loop->wq_mutex); 795 rdunlock_closed_uv_loop_rwlock(); 796} 797 798static void init_once(void) 799{ 800 init_closed_uv_loop_rwlock_once(); 801 /* init uv work statics queue */ 802#ifdef UV_STATISTIC 803 init_work_dump_queue(); 804#endif 805 ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task); 806} 807 808 809/* ffrt uv__work_submit */ 810void uv__work_submit(uv_loop_t* loop, 811 uv_req_t* req, 812 struct uv__work* w, 813 enum uv__work_kind kind, 814 void (*work)(struct uv__work *w), 815 void (*done)(struct uv__work *w, int status)) { 816 uv_once(&once, init_once); 817 ffrt_task_attr_t attr; 818 ffrt_task_attr_init(&attr); 819 820 switch(kind) { 821 case UV__WORK_CPU: 822 ffrt_task_attr_set_qos(&attr, ffrt_qos_default); 823 break; 824 case UV__WORK_FAST_IO: 825 ffrt_task_attr_set_qos(&attr, ffrt_qos_default); 826 break; 827 case UV__WORK_SLOW_IO: 828 ffrt_task_attr_set_qos(&attr, ffrt_qos_background); 829 break; 830 default: 831#ifdef USE_OHOS_DFX 832 UV_LOGI("Unknown work kind"); 833#endif 834 return; 835 } 836 837 w->loop = loop; 838 w->work = work; 839 w->done = done; 840 841 req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); 842 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr); 843 ffrt_task_attr_destroy(&attr); 844} 845 846 847/* ffrt uv__work_submit */ 848void uv__work_submit_with_qos(uv_loop_t* loop, 849 uv_req_t* req, 850 struct uv__work* w, 851 ffrt_qos_t qos, 852 void (*work)(struct uv__work *w), 853 void (*done)(struct uv__work *w, int status)) { 854 uv_once(&once, init_once); 855 ffrt_task_attr_t attr; 856 ffrt_task_attr_init(&attr); 857 ffrt_task_attr_set_qos(&attr, qos); 858 859 w->loop = loop; 860 w->work = work; 861 w->done = done; 862 863 req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); 864 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr); 865 ffrt_task_attr_destroy(&attr); 866} 867#endif 868 869 870int uv_queue_work(uv_loop_t* loop, 871 uv_work_t* req, 872 uv_work_cb work_cb, 873 uv_after_work_cb after_work_cb) { 874 if (work_cb == NULL) 875 return UV_EINVAL; 876 877 uv__req_init(loop, req, UV_WORK); 878 req->loop = loop; 879 req->work_cb = work_cb; 880 req->after_work_cb = after_work_cb; 881 882#ifdef UV_STATISTIC 883 struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info)); 884 if (info == NULL) { 885 abort(); 886 } 887 uv_init_dump_info(info, &req->work_req); 888 info->builtin_return_address[0] = __builtin_return_address(0); 889 info->builtin_return_address[1] = __builtin_return_address(1); 890 info->builtin_return_address[2] = __builtin_return_address(2); 891 (req->work_req).info = info; 892#endif 893#ifdef ASYNC_STACKTRACE 894 req->reserved[3] = (void*)LibuvCollectAsyncStack(); 895#endif 896 uv__work_submit(loop, 897#ifdef USE_FFRT 898 (uv_req_t*)req, 899#endif 900 &req->work_req, 901 UV__WORK_CPU, 902 uv__queue_work, 903 uv__queue_done 904); 905#ifdef UV_STATISTIC 906 uv_queue_statics(info); 907#endif 908 return 0; 909} 910 911 912int uv_queue_work_with_qos(uv_loop_t* loop, 913 uv_work_t* req, 914 uv_work_cb work_cb, 915 uv_after_work_cb after_work_cb, 916 uv_qos_t qos) { 917#ifdef USE_FFRT 918 if (work_cb == NULL) 919 return UV_EINVAL; 920 921 STATIC_ASSERT(uv_qos_background == ffrt_qos_background); 922 STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility); 923 STATIC_ASSERT(uv_qos_default == ffrt_qos_default); 924 STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated); 925 if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated) { 926 return UV_EINVAL; 927 } 928 929 uv__req_init(loop, req, UV_WORK); 930 req->loop = loop; 931 req->work_cb = work_cb; 932 req->after_work_cb = after_work_cb; 933#ifdef UV_STATISTIC 934 struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info)); 935 if (info == NULL) { 936 abort(); 937 } 938 uv_init_dump_info(info, &req->work_req); 939 info->builtin_return_address[0] = __builtin_return_address(0); 940 info->builtin_return_address[1] = __builtin_return_address(1); 941 info->builtin_return_address[2] = __builtin_return_address(2); 942 (req->work_req).info = info; 943#endif 944 uv__work_submit_with_qos(loop, 945 (uv_req_t*)req, 946 &req->work_req, 947 (ffrt_qos_t)qos, 948 uv__queue_work, 949 uv__queue_done); 950#ifdef UV_STATISTIC 951 uv_queue_statics(info); 952#endif 953 return 0; 954#else 955 return uv_queue_work(loop, req, work_cb, after_work_cb); 956#endif 957} 958 959 960int uv_cancel(uv_req_t* req) { 961 struct uv__work* wreq; 962 uv_loop_t* loop; 963 964 switch (req->type) { 965 case UV_FS: 966 loop = ((uv_fs_t*) req)->loop; 967 wreq = &((uv_fs_t*) req)->work_req; 968 break; 969 case UV_GETADDRINFO: 970 loop = ((uv_getaddrinfo_t*) req)->loop; 971 wreq = &((uv_getaddrinfo_t*) req)->work_req; 972 break; 973 case UV_GETNAMEINFO: 974 loop = ((uv_getnameinfo_t*) req)->loop; 975 wreq = &((uv_getnameinfo_t*) req)->work_req; 976 break; 977 case UV_RANDOM: 978 loop = ((uv_random_t*) req)->loop; 979 wreq = &((uv_random_t*) req)->work_req; 980 break; 981 case UV_WORK: 982 loop = ((uv_work_t*) req)->loop; 983 wreq = &((uv_work_t*) req)->work_req; 984 break; 985 default: 986 return UV_EINVAL; 987 } 988 989 return uv__work_cancel(loop, req, wreq); 990} 991