1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * Permission is hereby granted, free of charge, to any person obtaining a copy 3 * of this software and associated documentation files (the "Software"), to 4 * deal in the Software without restriction, including without limitation the 5 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 6 * sell copies of the Software, and to permit persons to whom the Software is 7 * furnished to do so, subject to the following conditions: 8 * 9 * The above copyright notice and this permission notice shall be included in 10 * all copies or substantial portions of the Software. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 13 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 14 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 15 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 16 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 17 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 18 * IN THE SOFTWARE. 19 */ 20 21/* This file contains both the uv__async internal infrastructure and the 22 * user-facing uv_async_t functions. 23 */ 24 25#include "uv.h" 26#include "internal.h" 27#include "uv_log.h" 28 29#include <errno.h> 30#include <stdatomic.h> 31#include <stdio.h> /* snprintf() */ 32#include <assert.h> 33#include <stdlib.h> 34#include <string.h> 35#include <unistd.h> 36#include <sched.h> /* sched_yield() */ 37 38#ifdef __linux__ 39#include <sys/eventfd.h> 40#endif 41 42#ifdef USE_FFRT 43#include "ffrt.h" 44#include "c/executor_task.h" 45#endif 46 47static void uv__async_send(uv_async_t* handle); 48static int uv__async_start(uv_loop_t* loop); 49 50 51int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { 52 int err; 53 54 err = uv__async_start(loop); 55 if (err) 56 return err; 57 58 uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); 59 handle->async_cb = async_cb; 60 handle->pending = 0; 61 62 uv__queue_insert_tail(&loop->async_handles, &handle->queue); 63 uv__handle_start(handle); 64 65 return 0; 66} 67 68 69int uv_async_send(uv_async_t* handle) { 70 _Atomic int* pending; 71 72#ifdef USE_OHOS_DFX 73 if (handle == NULL) { 74 UV_LOGF("handle is NULL"); 75 return -1; 76 } 77#endif 78 79 pending = (_Atomic int*) &handle->pending; 80 81 82 /* Do a cheap read first. */ 83 if (atomic_load_explicit(pending, memory_order_relaxed) != 0) 84 return 0; 85 86 /* Wake up the other thread's event loop. */ 87 if (atomic_exchange(pending, 1) != 0) 88 return 0; 89 90 /* Wake up the other thread's event loop. */ 91 uv__async_send(handle); 92 return 0; 93} 94 95 96 97void uv__async_close(uv_async_t* handle) { 98 atomic_exchange((_Atomic int*) &handle->pending, 0); 99 uv__queue_remove(&handle->queue); 100 uv__handle_stop(handle); 101} 102 103 104static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { 105 char buf[1024]; 106 ssize_t r; 107 struct uv__queue queue; 108 struct uv__queue* q; 109 uv_async_t* h; 110 _Atomic int *pending; 111 112 assert(w == &loop->async_io_watcher); 113 114 for (;;) { 115 r = read(w->fd, buf, sizeof(buf)); 116 117 if (r == sizeof(buf)) 118 continue; 119 120 if (r != -1) 121 break; 122 123 if (errno == EAGAIN || errno == EWOULDBLOCK) 124 break; 125 126 if (errno == EINTR) 127 continue; 128 129#ifdef PRINT_ERRNO_ABORT 130 UV_ERRNO_ABORT("errno is %d, loop addr is %zu, fd is %d (%s:%s:%d)", 131 errno, (size_t)loop, w->fd, __FILE__, __func__, __LINE__); 132#else 133 abort(); 134#endif 135 } 136 137 uv__queue_move(&loop->async_handles, &queue); 138 while (!uv__queue_empty(&queue)) { 139 q = uv__queue_head(&queue); 140 h = uv__queue_data(q, uv_async_t, queue); 141 142 uv__queue_remove(q); 143 uv__queue_insert_tail(&loop->async_handles, q); 144 145 /* Atomically fetch and clear pending flag */ 146 pending = (_Atomic int*) &h->pending; 147 if (atomic_exchange(pending, 0) == 0) 148 continue; 149 150 if (h->async_cb == NULL) 151 continue; 152 153 h->async_cb(h); 154 } 155} 156 157 158static void uv__async_send(uv_async_t* handle) { 159 const void* buf; 160 ssize_t len; 161 int fd; 162 int r; 163 164 uv_loop_t* loop = handle->loop; 165 if (loop == NULL) { 166 UV_LOGF("loop is NULL"); 167 return; 168 } 169 170 buf = ""; 171 len = 1; 172 fd = loop->async_wfd; 173 174#if defined(__linux__) 175 if (fd == -1) { 176 static const uint64_t val = 1; 177 buf = &val; 178 len = sizeof(val); 179 fd = loop->async_io_watcher.fd; /* eventfd */ 180 } 181#endif 182 183 do 184 r = write(fd, buf, len); 185 while (r == -1 && errno == EINTR && atomic_load_explicit((_Atomic int*) &handle->pending, memory_order_relaxed) == 1); 186 187 if (r == len) 188 return; 189 190 if (r == -1) 191 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) 192 return; 193 194#ifdef PRINT_ERRNO_ABORT 195 UV_ERRNO_ABORT("errno is %d, loop addr is %zu, fd is %d (%s:%s:%d)", 196 errno, (size_t)loop, fd, __FILE__, __func__, __LINE__); 197#else 198 abort(); 199#endif 200} 201 202 203static int uv__async_start(uv_loop_t* loop) { 204 int pipefd[2]; 205 int err; 206 207 if (loop->async_io_watcher.fd != -1) 208 return 0; 209 210#ifdef __linux__ 211 err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); 212 if (err < 0) 213 return UV__ERR(errno); 214 215 pipefd[0] = err; 216 pipefd[1] = -1; 217#ifdef USE_OHOS_DFX 218 fdsan_exchange_owner_tag(pipefd[0], 0, uv__get_addr_tag((void *)&loop->async_io_watcher)); 219#endif 220#else 221 err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE); 222 if (err < 0) 223 return err; 224#endif 225 226 uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]); 227 uv__io_start(loop, &loop->async_io_watcher, POLLIN); 228 loop->async_wfd = pipefd[1]; 229 UV_LOGI("open:%{public}zu, pipefd[0]:%{public}d", (size_t)loop, pipefd[0]); 230 return 0; 231} 232 233 234int uv__async_fork(uv_loop_t* loop) { 235 if (loop->async_io_watcher.fd == -1) /* never started */ 236 return 0; 237 238 uv__async_stop(loop); 239 240 return uv__async_start(loop); 241} 242 243 244void uv__async_stop(uv_loop_t* loop) { 245 if (loop->async_io_watcher.fd == -1) 246 return; 247 248 if (loop->async_wfd != -1) { 249 if (loop->async_wfd != loop->async_io_watcher.fd) { 250 UV_LOGI("close:%{public}zu, async_wfd:%{public}d", (size_t)loop, loop->async_wfd); 251 uv__close(loop->async_wfd); 252 } 253 loop->async_wfd = -1; 254 } 255 256 uv__io_stop(loop, &loop->async_io_watcher, POLLIN); 257#ifdef USE_FFRT 258 if (ffrt_get_cur_task() != NULL) { 259 uv__epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, loop->async_io_watcher.fd, NULL); 260 } 261#endif 262 263#if defined(__linux__) && defined(USE_OHOS_DFX) 264 fdsan_close_with_tag(loop->async_io_watcher.fd, uv__get_addr_tag((void *)&loop->async_io_watcher)); 265#else 266 uv__close(loop->async_io_watcher.fd); 267#endif 268 UV_LOGI("close:%{public}zu, async_io_wfd:%{public}d", (size_t)loop, loop->async_io_watcher.fd); 269 loop->async_io_watcher.fd = -1; 270} 271 272