1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22#include "uv.h" 23#include "internal.h" 24 25#include <assert.h> 26#include <string.h> 27#include <errno.h> 28#include <stdlib.h> 29#include <unistd.h> 30#if defined(__MVS__) 31#include <xti.h> 32#endif 33#include <sys/un.h> 34 35#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP) 36# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP 37#endif 38 39#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP) 40# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP 41#endif 42 43static void uv__udp_run_completed(uv_udp_t* handle); 44static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents); 45static void uv__udp_recvmsg(uv_udp_t* handle); 46static void uv__udp_sendmsg(uv_udp_t* handle); 47static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, 48 int domain, 49 unsigned int flags); 50 51 52void uv__udp_close(uv_udp_t* handle) { 53 uv__io_close(handle->loop, &handle->io_watcher); 54 uv__handle_stop(handle); 55 56 if (handle->io_watcher.fd != -1) { 57 uv__close(handle->io_watcher.fd); 58 handle->io_watcher.fd = -1; 59 } 60} 61 62 63void uv__udp_finish_close(uv_udp_t* handle) { 64 uv_udp_send_t* req; 65 struct uv__queue* q; 66 67 assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); 68 assert(handle->io_watcher.fd == -1); 69 70 while (!uv__queue_empty(&handle->write_queue)) { 71 q = uv__queue_head(&handle->write_queue); 72 uv__queue_remove(q); 73 74 req = uv__queue_data(q, uv_udp_send_t, queue); 75 req->status = UV_ECANCELED; 76 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 77 } 78 79 uv__udp_run_completed(handle); 80 81 assert(handle->send_queue_size == 0); 82 assert(handle->send_queue_count == 0); 83 84 /* Now tear down the handle. */ 85 handle->recv_cb = NULL; 86 handle->alloc_cb = NULL; 87 /* but _do not_ touch close_cb */ 88} 89 90 91static void uv__udp_run_completed(uv_udp_t* handle) { 92 uv_udp_send_t* req; 93 struct uv__queue* q; 94 95 assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING)); 96 handle->flags |= UV_HANDLE_UDP_PROCESSING; 97 98 while (!uv__queue_empty(&handle->write_completed_queue)) { 99 q = uv__queue_head(&handle->write_completed_queue); 100 uv__queue_remove(q); 101 102 req = uv__queue_data(q, uv_udp_send_t, queue); 103 uv__req_unregister(handle->loop, req); 104 105 handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs); 106 handle->send_queue_count--; 107 108 if (req->bufs != req->bufsml) 109 uv__free(req->bufs); 110 req->bufs = NULL; 111 112 if (req->send_cb == NULL) 113 continue; 114 115 /* req->status >= 0 == bytes written 116 * req->status < 0 == errno 117 */ 118 if (req->status >= 0) 119 req->send_cb(req, 0); 120 else 121 req->send_cb(req, req->status); 122 } 123 124 if (uv__queue_empty(&handle->write_queue)) { 125 /* Pending queue and completion queue empty, stop watcher. */ 126 uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT); 127 if (!uv__io_active(&handle->io_watcher, POLLIN)) 128 uv__handle_stop(handle); 129 } 130 131 handle->flags &= ~UV_HANDLE_UDP_PROCESSING; 132} 133 134 135static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { 136 uv_udp_t* handle; 137 138 handle = container_of(w, uv_udp_t, io_watcher); 139 assert(handle->type == UV_UDP); 140 141 if (revents & POLLIN) 142 uv__udp_recvmsg(handle); 143 144 if (revents & POLLOUT) { 145 uv__udp_sendmsg(handle); 146 uv__udp_run_completed(handle); 147 } 148} 149 150static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) { 151#if defined(__linux__) || defined(__FreeBSD__) 152 struct sockaddr_in6 peers[20]; 153 struct iovec iov[ARRAY_SIZE(peers)]; 154 struct mmsghdr msgs[ARRAY_SIZE(peers)]; 155 ssize_t nread; 156 uv_buf_t chunk_buf; 157 size_t chunks; 158 int flags; 159 size_t k; 160 161 /* prepare structures for recvmmsg */ 162 chunks = buf->len / UV__UDP_DGRAM_MAXSIZE; 163 if (chunks > ARRAY_SIZE(iov)) 164 chunks = ARRAY_SIZE(iov); 165 for (k = 0; k < chunks; ++k) { 166 iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE; 167 iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE; 168 memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr)); 169 msgs[k].msg_hdr.msg_iov = iov + k; 170 msgs[k].msg_hdr.msg_iovlen = 1; 171 msgs[k].msg_hdr.msg_name = peers + k; 172 msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]); 173 msgs[k].msg_hdr.msg_control = NULL; 174 msgs[k].msg_hdr.msg_controllen = 0; 175 msgs[k].msg_hdr.msg_flags = 0; 176 } 177 178 do 179 nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL); 180 while (nread == -1 && errno == EINTR); 181 182 if (nread < 1) { 183 if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK) 184 handle->recv_cb(handle, 0, buf, NULL, 0); 185 else 186 handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0); 187 } else { 188 /* pass each chunk to the application */ 189 for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) { 190 flags = UV_UDP_MMSG_CHUNK; 191 if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC) 192 flags |= UV_UDP_PARTIAL; 193 194 chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len); 195 handle->recv_cb(handle, 196 msgs[k].msg_len, 197 &chunk_buf, 198 msgs[k].msg_hdr.msg_name, 199 flags); 200 } 201 202 /* one last callback so the original buffer is freed */ 203 if (handle->recv_cb != NULL) 204 handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE); 205 } 206 return nread; 207#else /* __linux__ || ____FreeBSD__ */ 208 return UV_ENOSYS; 209#endif /* __linux__ || ____FreeBSD__ */ 210} 211 212static void uv__udp_recvmsg(uv_udp_t* handle) { 213 struct sockaddr_storage peer; 214 struct msghdr h; 215 ssize_t nread; 216 uv_buf_t buf; 217 int flags; 218 int count; 219 220 assert(handle->recv_cb != NULL); 221 assert(handle->alloc_cb != NULL); 222 223 /* Prevent loop starvation when the data comes in as fast as (or faster than) 224 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. 225 */ 226 count = 32; 227 228 do { 229 buf = uv_buf_init(NULL, 0); 230 handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf); 231 if (buf.base == NULL || buf.len == 0) { 232 handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0); 233 return; 234 } 235 assert(buf.base != NULL); 236 237 if (uv_udp_using_recvmmsg(handle)) { 238 nread = uv__udp_recvmmsg(handle, &buf); 239 if (nread > 0) 240 count -= nread; 241 continue; 242 } 243 244 memset(&h, 0, sizeof(h)); 245 memset(&peer, 0, sizeof(peer)); 246 h.msg_name = &peer; 247 h.msg_namelen = sizeof(peer); 248 h.msg_iov = (void*) &buf; 249 h.msg_iovlen = 1; 250 251 do { 252 nread = recvmsg(handle->io_watcher.fd, &h, 0); 253 } 254 while (nread == -1 && errno == EINTR); 255 256 if (nread == -1) { 257 if (errno == EAGAIN || errno == EWOULDBLOCK) 258 handle->recv_cb(handle, 0, &buf, NULL, 0); 259 else 260 handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0); 261 } 262 else { 263 flags = 0; 264 if (h.msg_flags & MSG_TRUNC) 265 flags |= UV_UDP_PARTIAL; 266 267 handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags); 268 } 269 count--; 270 } 271 /* recv_cb callback may decide to pause or close the handle */ 272 while (nread != -1 273 && count > 0 274 && handle->io_watcher.fd != -1 275 && handle->recv_cb != NULL); 276} 277 278static void uv__udp_sendmsg(uv_udp_t* handle) { 279#if defined(__linux__) || defined(__FreeBSD__) 280 uv_udp_send_t* req; 281 struct mmsghdr h[20]; 282 struct mmsghdr* p; 283 struct uv__queue* q; 284 ssize_t npkts; 285 size_t pkts; 286 size_t i; 287 288 if (uv__queue_empty(&handle->write_queue)) 289 return; 290 291write_queue_drain: 292 for (pkts = 0, q = uv__queue_head(&handle->write_queue); 293 pkts < ARRAY_SIZE(h) && q != &handle->write_queue; 294 ++pkts, q = uv__queue_head(q)) { 295 assert(q != NULL); 296 req = uv__queue_data(q, uv_udp_send_t, queue); 297 assert(req != NULL); 298 299 p = &h[pkts]; 300 memset(p, 0, sizeof(*p)); 301 if (req->addr.ss_family == AF_UNSPEC) { 302 p->msg_hdr.msg_name = NULL; 303 p->msg_hdr.msg_namelen = 0; 304 } else { 305 p->msg_hdr.msg_name = &req->addr; 306 if (req->addr.ss_family == AF_INET6) 307 p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6); 308 else if (req->addr.ss_family == AF_INET) 309 p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in); 310 else if (req->addr.ss_family == AF_UNIX) 311 p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un); 312 else { 313 assert(0 && "unsupported address family"); 314 abort(); 315 } 316 } 317 h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs; 318 h[pkts].msg_hdr.msg_iovlen = req->nbufs; 319 } 320 321 do 322 npkts = sendmmsg(handle->io_watcher.fd, h, pkts, 0); 323 while (npkts == -1 && errno == EINTR); 324 325 if (npkts < 1) { 326 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) 327 return; 328 for (i = 0, q = uv__queue_head(&handle->write_queue); 329 i < pkts && q != &handle->write_queue; 330 ++i, q = uv__queue_head(&handle->write_queue)) { 331 assert(q != NULL); 332 req = uv__queue_data(q, uv_udp_send_t, queue); 333 assert(req != NULL); 334 335 req->status = UV__ERR(errno); 336 uv__queue_remove(&req->queue); 337 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 338 } 339 uv__io_feed(handle->loop, &handle->io_watcher); 340 return; 341 } 342 343 /* Safety: npkts known to be >0 below. Hence cast from ssize_t 344 * to size_t safe. 345 */ 346 for (i = 0, q = uv__queue_head(&handle->write_queue); 347 i < (size_t)npkts && q != &handle->write_queue; 348 ++i, q = uv__queue_head(&handle->write_queue)) { 349 assert(q != NULL); 350 req = uv__queue_data(q, uv_udp_send_t, queue); 351 assert(req != NULL); 352 353 req->status = req->bufs[0].len; 354 355 /* Sending a datagram is an atomic operation: either all data 356 * is written or nothing is (and EMSGSIZE is raised). That is 357 * why we don't handle partial writes. Just pop the request 358 * off the write queue and onto the completed queue, done. 359 */ 360 uv__queue_remove(&req->queue); 361 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 362 } 363 364 /* couldn't batch everything, continue sending (jump to avoid stack growth) */ 365 if (!uv__queue_empty(&handle->write_queue)) 366 goto write_queue_drain; 367 uv__io_feed(handle->loop, &handle->io_watcher); 368#else /* __linux__ || ____FreeBSD__ */ 369 uv_udp_send_t* req; 370 struct msghdr h; 371 struct uv__queue* q; 372 ssize_t size; 373 374 while (!uv__queue_empty(&handle->write_queue)) { 375 q = uv__queue_head(&handle->write_queue); 376 assert(q != NULL); 377 378 req = uv__queue_data(q, uv_udp_send_t, queue); 379 assert(req != NULL); 380 381 memset(&h, 0, sizeof h); 382 if (req->addr.ss_family == AF_UNSPEC) { 383 h.msg_name = NULL; 384 h.msg_namelen = 0; 385 } else { 386 h.msg_name = &req->addr; 387 if (req->addr.ss_family == AF_INET6) 388 h.msg_namelen = sizeof(struct sockaddr_in6); 389 else if (req->addr.ss_family == AF_INET) 390 h.msg_namelen = sizeof(struct sockaddr_in); 391 else if (req->addr.ss_family == AF_UNIX) 392 h.msg_namelen = sizeof(struct sockaddr_un); 393 else { 394 assert(0 && "unsupported address family"); 395 abort(); 396 } 397 } 398 h.msg_iov = (struct iovec*) req->bufs; 399 h.msg_iovlen = req->nbufs; 400 401 do { 402 size = sendmsg(handle->io_watcher.fd, &h, 0); 403 } while (size == -1 && errno == EINTR); 404 405 if (size == -1) { 406 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) 407 break; 408 } 409 410 req->status = (size == -1 ? UV__ERR(errno) : size); 411 412 /* Sending a datagram is an atomic operation: either all data 413 * is written or nothing is (and EMSGSIZE is raised). That is 414 * why we don't handle partial writes. Just pop the request 415 * off the write queue and onto the completed queue, done. 416 */ 417 uv__queue_remove(&req->queue); 418 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 419 uv__io_feed(handle->loop, &handle->io_watcher); 420 } 421#endif /* __linux__ || ____FreeBSD__ */ 422} 423 424/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional 425 * refinements for programs that use multicast. 426 * 427 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that 428 * are different from the BSDs: it _shares_ the port rather than steal it 429 * from the current listener. While useful, it's not something we can emulate 430 * on other platforms so we don't enable it. 431 * 432 * zOS does not support getsockname with SO_REUSEPORT option when using 433 * AF_UNIX. 434 */ 435static int uv__set_reuse(int fd) { 436 int yes; 437 yes = 1; 438 439#if defined(SO_REUSEPORT) && defined(__MVS__) 440 struct sockaddr_in sockfd; 441 unsigned int sockfd_len = sizeof(sockfd); 442 if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1) 443 return UV__ERR(errno); 444 if (sockfd.sin_family == AF_UNIX) { 445 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) 446 return UV__ERR(errno); 447 } else { 448 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) 449 return UV__ERR(errno); 450 } 451#elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__) && \ 452 !defined(__sun__) 453 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) 454 return UV__ERR(errno); 455#else 456 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) 457 return UV__ERR(errno); 458#endif 459 460 return 0; 461} 462 463/* 464 * The Linux kernel suppresses some ICMP error messages by default for UDP 465 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP 466 * error reporting, hopefully resulting in faster failover to working name 467 * servers. 468 */ 469static int uv__set_recverr(int fd, sa_family_t ss_family) { 470#if defined(__linux__) 471 int yes; 472 473 yes = 1; 474 if (ss_family == AF_INET) { 475 if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes))) 476 return UV__ERR(errno); 477 } else if (ss_family == AF_INET6) { 478 if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes))) 479 return UV__ERR(errno); 480 } 481#endif 482 return 0; 483} 484 485 486int uv__udp_bind(uv_udp_t* handle, 487 const struct sockaddr* addr, 488 unsigned int addrlen, 489 unsigned int flags) { 490 int err; 491 int yes; 492 int fd; 493 494 /* Check for bad flags. */ 495 if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR)) 496 return UV_EINVAL; 497 498 /* Cannot set IPv6-only mode on non-IPv6 socket. */ 499 if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6) 500 return UV_EINVAL; 501 502 fd = handle->io_watcher.fd; 503 if (fd == -1) { 504 err = uv__socket(addr->sa_family, SOCK_DGRAM, 0); 505 if (err < 0) 506 return err; 507 fd = err; 508 handle->io_watcher.fd = fd; 509 } 510 511 if (flags & UV_UDP_LINUX_RECVERR) { 512 err = uv__set_recverr(fd, addr->sa_family); 513 if (err) 514 return err; 515 } 516 517 if (flags & UV_UDP_REUSEADDR) { 518 err = uv__set_reuse(fd); 519 if (err) 520 return err; 521 } 522 523 if (flags & UV_UDP_IPV6ONLY) { 524#ifdef IPV6_V6ONLY 525 yes = 1; 526 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) { 527 err = UV__ERR(errno); 528 return err; 529 } 530#else 531 err = UV_ENOTSUP; 532 return err; 533#endif 534 } 535 536 if (bind(fd, addr, addrlen)) { 537 err = UV__ERR(errno); 538 if (errno == EAFNOSUPPORT) 539 /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a 540 * socket created with AF_INET to an AF_INET6 address or vice versa. */ 541 err = UV_EINVAL; 542 return err; 543 } 544 545 if (addr->sa_family == AF_INET6) 546 handle->flags |= UV_HANDLE_IPV6; 547 548 handle->flags |= UV_HANDLE_BOUND; 549 return 0; 550} 551 552 553static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, 554 int domain, 555 unsigned int flags) { 556 union uv__sockaddr taddr; 557 socklen_t addrlen; 558 559 if (handle->io_watcher.fd != -1) 560 return 0; 561 562 switch (domain) { 563 case AF_INET: 564 { 565 struct sockaddr_in* addr = &taddr.in; 566 memset(addr, 0, sizeof *addr); 567 addr->sin_family = AF_INET; 568 addr->sin_addr.s_addr = INADDR_ANY; 569 addrlen = sizeof *addr; 570 break; 571 } 572 case AF_INET6: 573 { 574 struct sockaddr_in6* addr = &taddr.in6; 575 memset(addr, 0, sizeof *addr); 576 addr->sin6_family = AF_INET6; 577 addr->sin6_addr = in6addr_any; 578 addrlen = sizeof *addr; 579 break; 580 } 581 default: 582 assert(0 && "unsupported address family"); 583 abort(); 584 } 585 586 return uv__udp_bind(handle, &taddr.addr, addrlen, flags); 587} 588 589 590int uv__udp_connect(uv_udp_t* handle, 591 const struct sockaddr* addr, 592 unsigned int addrlen) { 593 int err; 594 595 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); 596 if (err) 597 return err; 598 599 do { 600 errno = 0; 601 err = connect(handle->io_watcher.fd, addr, addrlen); 602 } while (err == -1 && errno == EINTR); 603 604 if (err) 605 return UV__ERR(errno); 606 607 handle->flags |= UV_HANDLE_UDP_CONNECTED; 608 609 return 0; 610} 611 612/* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html 613 * Any of uv supported UNIXs kernel should be standardized, but the kernel 614 * implementation logic not same, let's use pseudocode to explain the udp 615 * disconnect behaviors: 616 * 617 * Predefined stubs for pseudocode: 618 * 1. sodisconnect: The function to perform the real udp disconnect 619 * 2. pru_connect: The function to perform the real udp connect 620 * 3. so: The kernel object match with socket fd 621 * 4. addr: The sockaddr parameter from user space 622 * 623 * BSDs: 624 * if(sodisconnect(so) == 0) { // udp disconnect succeed 625 * if (addr->sa_len != so->addr->sa_len) return EINVAL; 626 * if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT; 627 * pru_connect(so); 628 * } 629 * else return EISCONN; 630 * 631 * z/OS (same with Windows): 632 * if(addr->sa_len < so->addr->sa_len) return EINVAL; 633 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); 634 * 635 * AIX: 636 * if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version 637 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); 638 * 639 * Linux,Others: 640 * if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL; 641 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); 642 */ 643int uv__udp_disconnect(uv_udp_t* handle) { 644 int r; 645#if defined(__MVS__) 646 struct sockaddr_storage addr; 647#else 648 struct sockaddr addr; 649#endif 650 651 memset(&addr, 0, sizeof(addr)); 652 653#if defined(__MVS__) 654 addr.ss_family = AF_UNSPEC; 655#else 656 addr.sa_family = AF_UNSPEC; 657#endif 658 659 do { 660 errno = 0; 661#ifdef __PASE__ 662 /* On IBMi a connectionless transport socket can be disconnected by 663 * either setting the addr parameter to NULL or setting the 664 * addr_length parameter to zero, and issuing another connect(). 665 * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm 666 */ 667 r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0); 668#else 669 r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr)); 670#endif 671 } while (r == -1 && errno == EINTR); 672 673 if (r == -1) { 674#if defined(BSD) /* The macro BSD is from sys/param.h */ 675 if (errno != EAFNOSUPPORT && errno != EINVAL) 676 return UV__ERR(errno); 677#else 678 return UV__ERR(errno); 679#endif 680 } 681 682 handle->flags &= ~UV_HANDLE_UDP_CONNECTED; 683 return 0; 684} 685 686int uv__udp_send(uv_udp_send_t* req, 687 uv_udp_t* handle, 688 const uv_buf_t bufs[], 689 unsigned int nbufs, 690 const struct sockaddr* addr, 691 unsigned int addrlen, 692 uv_udp_send_cb send_cb) { 693 int err; 694 int empty_queue; 695 696 assert(nbufs > 0); 697 698 if (addr) { 699 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); 700 if (err) 701 return err; 702 } 703 704 /* It's legal for send_queue_count > 0 even when the write_queue is empty; 705 * it means there are error-state requests in the write_completed_queue that 706 * will touch up send_queue_size/count later. 707 */ 708 empty_queue = (handle->send_queue_count == 0); 709 710 uv__req_init(handle->loop, req, UV_UDP_SEND); 711 assert(addrlen <= sizeof(req->addr)); 712 if (addr == NULL) 713 req->addr.ss_family = AF_UNSPEC; 714 else 715 memcpy(&req->addr, addr, addrlen); 716 req->send_cb = send_cb; 717 req->handle = handle; 718 req->nbufs = nbufs; 719 720 req->bufs = req->bufsml; 721 if (nbufs > ARRAY_SIZE(req->bufsml)) 722 req->bufs = uv__malloc(nbufs * sizeof(bufs[0])); 723 724 if (req->bufs == NULL) { 725 uv__req_unregister(handle->loop, req); 726 return UV_ENOMEM; 727 } 728 729 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); 730 handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs); 731 handle->send_queue_count++; 732 uv__queue_insert_tail(&handle->write_queue, &req->queue); 733 uv__handle_start(handle); 734 735 if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) { 736 uv__udp_sendmsg(handle); 737 738 /* `uv__udp_sendmsg` may not be able to do non-blocking write straight 739 * away. In such cases the `io_watcher` has to be queued for asynchronous 740 * write. 741 */ 742 if (!uv__queue_empty(&handle->write_queue)) 743 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); 744 } else { 745 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); 746 } 747 748 return 0; 749} 750 751 752int uv__udp_try_send(uv_udp_t* handle, 753 const uv_buf_t bufs[], 754 unsigned int nbufs, 755 const struct sockaddr* addr, 756 unsigned int addrlen) { 757 int err; 758 struct msghdr h; 759 ssize_t size; 760 761 assert(nbufs > 0); 762 763 /* already sending a message */ 764 if (handle->send_queue_count != 0) 765 return UV_EAGAIN; 766 767 if (addr) { 768 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); 769 if (err) 770 return err; 771 } else { 772 assert(handle->flags & UV_HANDLE_UDP_CONNECTED); 773 } 774 775 memset(&h, 0, sizeof h); 776 h.msg_name = (struct sockaddr*) addr; 777 h.msg_namelen = addrlen; 778 h.msg_iov = (struct iovec*) bufs; 779 h.msg_iovlen = nbufs; 780 781 do { 782 size = sendmsg(handle->io_watcher.fd, &h, 0); 783 } while (size == -1 && errno == EINTR); 784 785 if (size == -1) { 786 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) 787 return UV_EAGAIN; 788 else 789 return UV__ERR(errno); 790 } 791 792 return size; 793} 794 795 796static int uv__udp_set_membership4(uv_udp_t* handle, 797 const struct sockaddr_in* multicast_addr, 798 const char* interface_addr, 799 uv_membership membership) { 800 struct ip_mreq mreq; 801 int optname; 802 int err; 803 804 memset(&mreq, 0, sizeof mreq); 805 806 if (interface_addr) { 807 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr); 808 if (err) 809 return err; 810 } else { 811 mreq.imr_interface.s_addr = htonl(INADDR_ANY); 812 } 813 814 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr; 815 816 switch (membership) { 817 case UV_JOIN_GROUP: 818 optname = IP_ADD_MEMBERSHIP; 819 break; 820 case UV_LEAVE_GROUP: 821 optname = IP_DROP_MEMBERSHIP; 822 break; 823 default: 824 return UV_EINVAL; 825 } 826 827 if (setsockopt(handle->io_watcher.fd, 828 IPPROTO_IP, 829 optname, 830 &mreq, 831 sizeof(mreq))) { 832#if defined(__MVS__) 833 if (errno == ENXIO) 834 return UV_ENODEV; 835#endif 836 return UV__ERR(errno); 837 } 838 839 return 0; 840} 841 842 843static int uv__udp_set_membership6(uv_udp_t* handle, 844 const struct sockaddr_in6* multicast_addr, 845 const char* interface_addr, 846 uv_membership membership) { 847 int optname; 848 struct ipv6_mreq mreq; 849 struct sockaddr_in6 addr6; 850 851 memset(&mreq, 0, sizeof mreq); 852 853 if (interface_addr) { 854 if (uv_ip6_addr(interface_addr, 0, &addr6)) 855 return UV_EINVAL; 856 mreq.ipv6mr_interface = addr6.sin6_scope_id; 857 } else { 858 mreq.ipv6mr_interface = 0; 859 } 860 861 mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr; 862 863 switch (membership) { 864 case UV_JOIN_GROUP: 865 optname = IPV6_ADD_MEMBERSHIP; 866 break; 867 case UV_LEAVE_GROUP: 868 optname = IPV6_DROP_MEMBERSHIP; 869 break; 870 default: 871 return UV_EINVAL; 872 } 873 874 if (setsockopt(handle->io_watcher.fd, 875 IPPROTO_IPV6, 876 optname, 877 &mreq, 878 sizeof(mreq))) { 879#if defined(__MVS__) 880 if (errno == ENXIO) 881 return UV_ENODEV; 882#endif 883 return UV__ERR(errno); 884 } 885 886 return 0; 887} 888 889 890#if !defined(__OpenBSD__) && \ 891 !defined(__NetBSD__) && \ 892 !defined(__ANDROID__) && \ 893 !defined(__DragonFly__) && \ 894 !defined(__QNX__) && \ 895 !defined(__GNU__) 896static int uv__udp_set_source_membership4(uv_udp_t* handle, 897 const struct sockaddr_in* multicast_addr, 898 const char* interface_addr, 899 const struct sockaddr_in* source_addr, 900 uv_membership membership) { 901 struct ip_mreq_source mreq; 902 int optname; 903 int err; 904 905 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR); 906 if (err) 907 return err; 908 909 memset(&mreq, 0, sizeof(mreq)); 910 911 if (interface_addr != NULL) { 912 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr); 913 if (err) 914 return err; 915 } else { 916 mreq.imr_interface.s_addr = htonl(INADDR_ANY); 917 } 918 919 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr; 920 mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr; 921 922 if (membership == UV_JOIN_GROUP) 923 optname = IP_ADD_SOURCE_MEMBERSHIP; 924 else if (membership == UV_LEAVE_GROUP) 925 optname = IP_DROP_SOURCE_MEMBERSHIP; 926 else 927 return UV_EINVAL; 928 929 if (setsockopt(handle->io_watcher.fd, 930 IPPROTO_IP, 931 optname, 932 &mreq, 933 sizeof(mreq))) { 934 return UV__ERR(errno); 935 } 936 937 return 0; 938} 939 940 941static int uv__udp_set_source_membership6(uv_udp_t* handle, 942 const struct sockaddr_in6* multicast_addr, 943 const char* interface_addr, 944 const struct sockaddr_in6* source_addr, 945 uv_membership membership) { 946 struct group_source_req mreq; 947 struct sockaddr_in6 addr6; 948 int optname; 949 int err; 950 951 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR); 952 if (err) 953 return err; 954 955 memset(&mreq, 0, sizeof(mreq)); 956 957 if (interface_addr != NULL) { 958 err = uv_ip6_addr(interface_addr, 0, &addr6); 959 if (err) 960 return err; 961 mreq.gsr_interface = addr6.sin6_scope_id; 962 } else { 963 mreq.gsr_interface = 0; 964 } 965 966 STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr)); 967 STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr)); 968 memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr)); 969 memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr)); 970 971 if (membership == UV_JOIN_GROUP) 972 optname = MCAST_JOIN_SOURCE_GROUP; 973 else if (membership == UV_LEAVE_GROUP) 974 optname = MCAST_LEAVE_SOURCE_GROUP; 975 else 976 return UV_EINVAL; 977 978 if (setsockopt(handle->io_watcher.fd, 979 IPPROTO_IPV6, 980 optname, 981 &mreq, 982 sizeof(mreq))) { 983 return UV__ERR(errno); 984 } 985 986 return 0; 987} 988#endif 989 990 991int uv__udp_init_ex(uv_loop_t* loop, 992 uv_udp_t* handle, 993 unsigned flags, 994 int domain) { 995 int fd; 996 997 fd = -1; 998 if (domain != AF_UNSPEC) { 999 fd = uv__socket(domain, SOCK_DGRAM, 0); 1000 if (fd < 0) 1001 return fd; 1002 } 1003 1004 uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP); 1005 handle->alloc_cb = NULL; 1006 handle->recv_cb = NULL; 1007 handle->send_queue_size = 0; 1008 handle->send_queue_count = 0; 1009 uv__io_init(&handle->io_watcher, uv__udp_io, fd); 1010 uv__queue_init(&handle->write_queue); 1011 uv__queue_init(&handle->write_completed_queue); 1012 1013 return 0; 1014} 1015 1016 1017int uv_udp_using_recvmmsg(const uv_udp_t* handle) { 1018#if defined(__linux__) || defined(__FreeBSD__) 1019 if (handle->flags & UV_HANDLE_UDP_RECVMMSG) 1020 return 1; 1021#endif 1022 return 0; 1023} 1024 1025 1026int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) { 1027 int err; 1028 1029 /* Check for already active socket. */ 1030 if (handle->io_watcher.fd != -1) 1031 return UV_EBUSY; 1032 1033 if (uv__fd_exists(handle->loop, sock)) 1034 return UV_EEXIST; 1035 1036 err = uv__nonblock(sock, 1); 1037 if (err) 1038 return err; 1039 1040 err = uv__set_reuse(sock); 1041 if (err) 1042 return err; 1043 1044 handle->io_watcher.fd = sock; 1045 if (uv__udp_is_connected(handle)) 1046 handle->flags |= UV_HANDLE_UDP_CONNECTED; 1047 1048 return 0; 1049} 1050 1051 1052int uv_udp_set_membership(uv_udp_t* handle, 1053 const char* multicast_addr, 1054 const char* interface_addr, 1055 uv_membership membership) { 1056 int err; 1057 struct sockaddr_in addr4; 1058 struct sockaddr_in6 addr6; 1059 1060 if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) { 1061 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR); 1062 if (err) 1063 return err; 1064 return uv__udp_set_membership4(handle, &addr4, interface_addr, membership); 1065 } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) { 1066 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR); 1067 if (err) 1068 return err; 1069 return uv__udp_set_membership6(handle, &addr6, interface_addr, membership); 1070 } else { 1071 return UV_EINVAL; 1072 } 1073} 1074 1075 1076int uv_udp_set_source_membership(uv_udp_t* handle, 1077 const char* multicast_addr, 1078 const char* interface_addr, 1079 const char* source_addr, 1080 uv_membership membership) { 1081#if !defined(__OpenBSD__) && \ 1082 !defined(__NetBSD__) && \ 1083 !defined(__ANDROID__) && \ 1084 !defined(__DragonFly__) && \ 1085 !defined(__QNX__) && \ 1086 !defined(__GNU__) 1087 int err; 1088 union uv__sockaddr mcast_addr; 1089 union uv__sockaddr src_addr; 1090 1091 err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in); 1092 if (err) { 1093 err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6); 1094 if (err) 1095 return err; 1096 err = uv_ip6_addr(source_addr, 0, &src_addr.in6); 1097 if (err) 1098 return err; 1099 return uv__udp_set_source_membership6(handle, 1100 &mcast_addr.in6, 1101 interface_addr, 1102 &src_addr.in6, 1103 membership); 1104 } 1105 1106 err = uv_ip4_addr(source_addr, 0, &src_addr.in); 1107 if (err) 1108 return err; 1109 return uv__udp_set_source_membership4(handle, 1110 &mcast_addr.in, 1111 interface_addr, 1112 &src_addr.in, 1113 membership); 1114#else 1115 return UV_ENOSYS; 1116#endif 1117} 1118 1119 1120static int uv__setsockopt(uv_udp_t* handle, 1121 int option4, 1122 int option6, 1123 const void* val, 1124 socklen_t size) { 1125 int r; 1126 1127 if (handle->flags & UV_HANDLE_IPV6) 1128 r = setsockopt(handle->io_watcher.fd, 1129 IPPROTO_IPV6, 1130 option6, 1131 val, 1132 size); 1133 else 1134 r = setsockopt(handle->io_watcher.fd, 1135 IPPROTO_IP, 1136 option4, 1137 val, 1138 size); 1139 if (r) 1140 return UV__ERR(errno); 1141 1142 return 0; 1143} 1144 1145static int uv__setsockopt_maybe_char(uv_udp_t* handle, 1146 int option4, 1147 int option6, 1148 int val) { 1149#if defined(__sun) || defined(_AIX) || defined(__MVS__) 1150 char arg = val; 1151#elif defined(__OpenBSD__) 1152 unsigned char arg = val; 1153#else 1154 int arg = val; 1155#endif 1156 1157 if (val < 0 || val > 255) 1158 return UV_EINVAL; 1159 1160 return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg)); 1161} 1162 1163 1164int uv_udp_set_broadcast(uv_udp_t* handle, int on) { 1165 if (setsockopt(handle->io_watcher.fd, 1166 SOL_SOCKET, 1167 SO_BROADCAST, 1168 &on, 1169 sizeof(on))) { 1170 return UV__ERR(errno); 1171 } 1172 1173 return 0; 1174} 1175 1176 1177int uv_udp_set_ttl(uv_udp_t* handle, int ttl) { 1178 if (ttl < 1 || ttl > 255) 1179 return UV_EINVAL; 1180 1181#if defined(__MVS__) 1182 if (!(handle->flags & UV_HANDLE_IPV6)) 1183 return UV_ENOTSUP; /* zOS does not support setting ttl for IPv4 */ 1184#endif 1185 1186/* 1187 * On Solaris and derivatives such as SmartOS, the length of socket options 1188 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, 1189 * so hardcode the size of these options on this platform, 1190 * and use the general uv__setsockopt_maybe_char call on other platforms. 1191 */ 1192#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1193 defined(__MVS__) || defined(__QNX__) 1194 1195 return uv__setsockopt(handle, 1196 IP_TTL, 1197 IPV6_UNICAST_HOPS, 1198 &ttl, 1199 sizeof(ttl)); 1200 1201#else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) || 1202 defined(__MVS__) || defined(__QNX__)) */ 1203 1204 return uv__setsockopt_maybe_char(handle, 1205 IP_TTL, 1206 IPV6_UNICAST_HOPS, 1207 ttl); 1208 1209#endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) || 1210 defined(__MVS__) || defined(__QNX__) */ 1211} 1212 1213 1214int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) { 1215/* 1216 * On Solaris and derivatives such as SmartOS, the length of socket options 1217 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for 1218 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case, 1219 * and use the general uv__setsockopt_maybe_char call otherwise. 1220 */ 1221#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1222 defined(__MVS__) || defined(__QNX__) 1223 if (handle->flags & UV_HANDLE_IPV6) 1224 return uv__setsockopt(handle, 1225 IP_MULTICAST_TTL, 1226 IPV6_MULTICAST_HOPS, 1227 &ttl, 1228 sizeof(ttl)); 1229#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1230 defined(__MVS__) || defined(__QNX__) */ 1231 1232 return uv__setsockopt_maybe_char(handle, 1233 IP_MULTICAST_TTL, 1234 IPV6_MULTICAST_HOPS, 1235 ttl); 1236} 1237 1238 1239int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) { 1240/* 1241 * On Solaris and derivatives such as SmartOS, the length of socket options 1242 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for 1243 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case, 1244 * and use the general uv__setsockopt_maybe_char call otherwise. 1245 */ 1246#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1247 defined(__MVS__) || defined(__QNX__) 1248 if (handle->flags & UV_HANDLE_IPV6) 1249 return uv__setsockopt(handle, 1250 IP_MULTICAST_LOOP, 1251 IPV6_MULTICAST_LOOP, 1252 &on, 1253 sizeof(on)); 1254#endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) || 1255 defined(__MVS__) || defined(__QNX__) */ 1256 1257 return uv__setsockopt_maybe_char(handle, 1258 IP_MULTICAST_LOOP, 1259 IPV6_MULTICAST_LOOP, 1260 on); 1261} 1262 1263int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) { 1264 struct sockaddr_storage addr_st; 1265 struct sockaddr_in* addr4; 1266 struct sockaddr_in6* addr6; 1267 1268 addr4 = (struct sockaddr_in*) &addr_st; 1269 addr6 = (struct sockaddr_in6*) &addr_st; 1270 1271 if (!interface_addr) { 1272 memset(&addr_st, 0, sizeof addr_st); 1273 if (handle->flags & UV_HANDLE_IPV6) { 1274 addr_st.ss_family = AF_INET6; 1275 addr6->sin6_scope_id = 0; 1276 } else { 1277 addr_st.ss_family = AF_INET; 1278 addr4->sin_addr.s_addr = htonl(INADDR_ANY); 1279 } 1280 } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) { 1281 /* nothing, address was parsed */ 1282 } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) { 1283 /* nothing, address was parsed */ 1284 } else { 1285 return UV_EINVAL; 1286 } 1287 1288 if (addr_st.ss_family == AF_INET) { 1289 if (setsockopt(handle->io_watcher.fd, 1290 IPPROTO_IP, 1291 IP_MULTICAST_IF, 1292 (void*) &addr4->sin_addr, 1293 sizeof(addr4->sin_addr)) == -1) { 1294 return UV__ERR(errno); 1295 } 1296 } else if (addr_st.ss_family == AF_INET6) { 1297 if (setsockopt(handle->io_watcher.fd, 1298 IPPROTO_IPV6, 1299 IPV6_MULTICAST_IF, 1300 &addr6->sin6_scope_id, 1301 sizeof(addr6->sin6_scope_id)) == -1) { 1302 return UV__ERR(errno); 1303 } 1304 } else { 1305 assert(0 && "unexpected address family"); 1306 abort(); 1307 } 1308 1309 return 0; 1310} 1311 1312int uv_udp_getpeername(const uv_udp_t* handle, 1313 struct sockaddr* name, 1314 int* namelen) { 1315 1316 return uv__getsockpeername((const uv_handle_t*) handle, 1317 getpeername, 1318 name, 1319 namelen); 1320} 1321 1322int uv_udp_getsockname(const uv_udp_t* handle, 1323 struct sockaddr* name, 1324 int* namelen) { 1325 1326 return uv__getsockpeername((const uv_handle_t*) handle, 1327 getsockname, 1328 name, 1329 namelen); 1330} 1331 1332 1333int uv__udp_recv_start(uv_udp_t* handle, 1334 uv_alloc_cb alloc_cb, 1335 uv_udp_recv_cb recv_cb) { 1336 int err; 1337 1338 if (alloc_cb == NULL || recv_cb == NULL) 1339 return UV_EINVAL; 1340 1341 if (uv__io_active(&handle->io_watcher, POLLIN)) 1342 return UV_EALREADY; /* FIXME(bnoordhuis) Should be UV_EBUSY. */ 1343 1344 err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0); 1345 if (err) 1346 return err; 1347 1348 handle->alloc_cb = alloc_cb; 1349 handle->recv_cb = recv_cb; 1350 1351 uv__io_start(handle->loop, &handle->io_watcher, POLLIN); 1352 uv__handle_start(handle); 1353 1354 return 0; 1355} 1356 1357 1358int uv__udp_recv_stop(uv_udp_t* handle) { 1359 uv__io_stop(handle->loop, &handle->io_watcher, POLLIN); 1360 1361 if (!uv__io_active(&handle->io_watcher, POLLOUT)) 1362 uv__handle_stop(handle); 1363 1364 handle->alloc_cb = NULL; 1365 handle->recv_cb = NULL; 1366 1367 return 0; 1368} 1369