xref: /third_party/libuv/src/unix/udp.c (revision e66f31c5)
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#if defined(__MVS__)
#include <xti.h>
#endif
#include <sys/un.h>

#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif

static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags);


void uv__udp_close(uv_udp_t* handle) {
  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  if (handle->io_watcher.fd != -1) {
    uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }
}


void uv__udp_finish_close(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct uv__queue* q;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  while (!uv__queue_empty(&handle->write_queue)) {
    q = uv__queue_head(&handle->write_queue);
    uv__queue_remove(q);

    req = uv__queue_data(q, uv_udp_send_t, queue);
    req->status = UV_ECANCELED;
    uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
  }

  uv__udp_run_completed(handle);

  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Now tear down the handle. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
  /* but _do not_ touch close_cb */
}


static void uv__udp_run_completed(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct uv__queue* q;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  while (!uv__queue_empty(&handle->write_completed_queue)) {
    q = uv__queue_head(&handle->write_completed_queue);
    uv__queue_remove(q);

    req = uv__queue_data(q, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop, req);

    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    handle->send_queue_count--;

    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;

    if (req->send_cb == NULL)
      continue;

    /* req->status >= 0 == bytes written
     * req->status <  0 == errno
     */
    if (req->status >= 0)
      req->send_cb(req, 0);
    else
      req->send_cb(req, req->status);
  }

  if (uv__queue_empty(&handle->write_queue)) {
    /* Pending queue and completion queue empty, stop watcher. */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}


static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* handle;

  handle = container_of(w, uv_udp_t, io_watcher);
  assert(handle->type == UV_UDP);

  if (revents & POLLIN)
    uv__udp_recvmsg(handle);

  if (revents & POLLOUT) {
    uv__udp_sendmsg(handle);
    uv__udp_run_completed(handle);
  }
}

static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
#if defined(__linux__) || defined(__FreeBSD__)
  struct sockaddr_in6 peers[20];
  struct iovec iov[ARRAY_SIZE(peers)];
  struct mmsghdr msgs[ARRAY_SIZE(peers)];
  ssize_t nread;
  uv_buf_t chunk_buf;
  size_t chunks;
  int flags;
  size_t k;

  /* prepare structures for recvmmsg */
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
    msgs[k].msg_hdr.msg_iov = iov + k;
    msgs[k].msg_hdr.msg_iovlen = 1;
    msgs[k].msg_hdr.msg_name = peers + k;
    msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
    msgs[k].msg_hdr.msg_control = NULL;
    msgs[k].msg_hdr.msg_controllen = 0;
    msgs[k].msg_hdr.msg_flags = 0;
  }

  do
    nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
  while (nread == -1 && errno == EINTR);

  if (nread < 1) {
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }

    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
  }
  return nread;
#else  /* __linux__ || __FreeBSD__ */
  return UV_ENOSYS;
#endif  /* __linux__ || __FreeBSD__ */
}
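
/* Illustrative sketch (not part of libuv): how an application can opt in to
 * the recvmmsg() batching path above.  It passes UV_UDP_RECVMMSG to
 * uv_udp_init_ex(), hands out a buffer big enough for several 64 KiB
 * datagram slots from its alloc callback, and frees that buffer only when
 * the UV_UDP_MMSG_CHUNK flag is absent.  Callback names and the slot count
 * are assumptions for the example; the block is compiled out so it does not
 * affect the build.
 */
#if 0
static void example_alloc(uv_handle_t* handle, size_t suggested, uv_buf_t* buf) {
  /* Eight slots of the suggested (64 KiB) size let uv__udp_recvmmsg() read
   * up to eight datagrams per recvmmsg() call.
   */
  buf->base = malloc(8 * suggested);
  buf->len = buf->base == NULL ? 0 : 8 * suggested;
}

static void example_recv(uv_udp_t* handle,
                         ssize_t nread,
                         const uv_buf_t* buf,
                         const struct sockaddr* addr,
                         unsigned flags) {
  if (nread > 0 && addr != NULL) {
    /* Process the datagram at buf->base (nread bytes, sender in addr).
     * UV_UDP_PARTIAL in flags means the datagram was truncated.
     */
  }

  /* UV_UDP_MMSG_CHUNK buffers are slices of the big allocation; free the
   * allocation only when that flag is absent (final or non-mmsg callback).
   */
  if (!(flags & UV_UDP_MMSG_CHUNK))
    free(buf->base);
}

static int example_start(uv_loop_t* loop, uv_udp_t* udp) {
  int err;

  err = uv_udp_init_ex(loop, udp, AF_INET | UV_UDP_RECVMMSG);
  if (err != 0)
    return err;
  return uv_udp_recv_start(udp, example_alloc, example_recv);
}
#endif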

static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage peer;
  struct msghdr h;
  ssize_t nread;
  uv_buf_t buf;
  int flags;
  int count;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    if (buf.base == NULL || buf.len == 0) {
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

    if (uv_udp_using_recvmmsg(handle)) {
      nread = uv__udp_recvmmsg(handle, &buf);
      if (nread > 0)
        count -= nread;
      continue;
    }

    memset(&h, 0, sizeof(h));
    memset(&peer, 0, sizeof(peer));
    h.msg_name = &peer;
    h.msg_namelen = sizeof(peer);
    h.msg_iov = (void*) &buf;
    h.msg_iovlen = 1;

    do {
      nread = recvmsg(handle->io_watcher.fd, &h, 0);
    }
    while (nread == -1 && errno == EINTR);

    if (nread == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        handle->recv_cb(handle, 0, &buf, NULL, 0);
      else
        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }
    else {
      flags = 0;
      if (h.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
    }
    count--;
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && count > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}

static void uv__udp_sendmsg(uv_udp_t* handle) {
#if defined(__linux__) || defined(__FreeBSD__)
  uv_udp_send_t* req;
  struct mmsghdr h[20];
  struct mmsghdr* p;
  struct uv__queue* q;
  ssize_t npkts;
  size_t pkts;
  size_t i;

  if (uv__queue_empty(&handle->write_queue))
    return;

write_queue_drain:
  for (pkts = 0, q = uv__queue_head(&handle->write_queue);
       pkts < ARRAY_SIZE(h) && q != &handle->write_queue;
       ++pkts, q = uv__queue_head(q)) {
    assert(q != NULL);
    req = uv__queue_data(q, uv_udp_send_t, queue);
    assert(req != NULL);

    p = &h[pkts];
    memset(p, 0, sizeof(*p));
    if (req->addr.ss_family == AF_UNSPEC) {
      p->msg_hdr.msg_name = NULL;
      p->msg_hdr.msg_namelen = 0;
    } else {
      p->msg_hdr.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
    h[pkts].msg_hdr.msg_iovlen = req->nbufs;
  }

  do
    npkts = sendmmsg(handle->io_watcher.fd, h, pkts, 0);
  while (npkts == -1 && errno == EINTR);

  if (npkts < 1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return;
    for (i = 0, q = uv__queue_head(&handle->write_queue);
         i < pkts && q != &handle->write_queue;
         ++i, q = uv__queue_head(&handle->write_queue)) {
      assert(q != NULL);
      req = uv__queue_data(q, uv_udp_send_t, queue);
      assert(req != NULL);

      req->status = UV__ERR(errno);
      uv__queue_remove(&req->queue);
      uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
    }
    uv__io_feed(handle->loop, &handle->io_watcher);
    return;
  }

  /* Safety: npkts is known to be > 0 here, so the cast from ssize_t to
   * size_t is safe.
   */
  for (i = 0, q = uv__queue_head(&handle->write_queue);
       i < (size_t)npkts && q != &handle->write_queue;
       ++i, q = uv__queue_head(&handle->write_queue)) {
    assert(q != NULL);
    req = uv__queue_data(q, uv_udp_send_t, queue);
    assert(req != NULL);

    req->status = req->bufs[0].len;

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    uv__queue_remove(&req->queue);
    uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
  }

  /* couldn't batch everything, continue sending (jump to avoid stack growth) */
  if (!uv__queue_empty(&handle->write_queue))
    goto write_queue_drain;
  uv__io_feed(handle->loop, &handle->io_watcher);
#else  /* __linux__ || __FreeBSD__ */
  uv_udp_send_t* req;
  struct msghdr h;
  struct uv__queue* q;
  ssize_t size;

  while (!uv__queue_empty(&handle->write_queue)) {
    q = uv__queue_head(&handle->write_queue);
    assert(q != NULL);

    req = uv__queue_data(q, uv_udp_send_t, queue);
    assert(req != NULL);

    memset(&h, 0, sizeof h);
    if (req->addr.ss_family == AF_UNSPEC) {
      h.msg_name = NULL;
      h.msg_namelen = 0;
    } else {
      h.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        h.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        h.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        h.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h.msg_iov = (struct iovec*) req->bufs;
    h.msg_iovlen = req->nbufs;

    do {
      size = sendmsg(handle->io_watcher.fd, &h, 0);
    } while (size == -1 && errno == EINTR);

    if (size == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
        break;
    }

    req->status = (size == -1 ? UV__ERR(errno) : size);

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    uv__queue_remove(&req->queue);
    uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
    uv__io_feed(handle->loop, &handle->io_watcher);
  }
#endif  /* __linux__ || __FreeBSD__ */
}

/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
 * refinements for programs that use multicast.
 *
 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
 * are different from the BSDs: it _shares_ the port rather than stealing it
 * from the current listener.  While useful, it's not something we can emulate
 * on other platforms so we don't enable it.
 *
 * zOS does not support getsockname with SO_REUSEPORT option when using
 * AF_UNIX.
 */
static int uv__set_reuse(int fd) {
  int yes;
  yes = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in sockfd;
  unsigned int sockfd_len = sizeof(sockfd);
  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
      return UV__ERR(errno);
  if (sockfd.sin_family == AF_UNIX) {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  } else {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
      return UV__ERR(errno);
  }
#elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__) && \
      !defined(__sun__)
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
    return UV__ERR(errno);
#else
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
    return UV__ERR(errno);
#endif

  return 0;
}

/*
 * The Linux kernel suppresses some ICMP error messages by default for UDP
 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
 * error reporting, hopefully resulting in faster failover to working name
 * servers.
 */
static int uv__set_recverr(int fd, sa_family_t ss_family) {
#if defined(__linux__)
  int yes;

  yes = 1;
  if (ss_family == AF_INET) {
    if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  } else if (ss_family == AF_INET6) {
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  }
#endif
  return 0;
}


int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int yes;
  int fd;

  /* Check for bad flags. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR))
    return UV_EINVAL;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  fd = handle->io_watcher.fd;
  if (fd == -1) {
    err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (err < 0)
      return err;
    fd = err;
    handle->io_watcher.fd = fd;
  }

  if (flags & UV_UDP_LINUX_RECVERR) {
    err = uv__set_recverr(fd, addr->sa_family);
    if (err)
      return err;
  }

  if (flags & UV_UDP_REUSEADDR) {
    err = uv__set_reuse(fd);
    if (err)
      return err;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    yes = 1;
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
      err = UV__ERR(errno);
      return err;
    }
#else
    err = UV_ENOTSUP;
    return err;
#endif
  }

  if (bind(fd, addr, addrlen)) {
    err = UV__ERR(errno);
    if (errno == EAFNOSUPPORT)
      /* OSX, other BSDs and SunOS fail with EAFNOSUPPORT when binding a
       * socket created with AF_INET to an AF_INET6 address or vice versa. */
      err = UV_EINVAL;
    return err;
  }

  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}
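
/* Illustrative sketch (not part of libuv): binding through the public
 * uv_udp_bind() wrapper, which forwards the flags checked above.  The
 * address and port are placeholders; compiled out so it does not affect
 * the build.
 */
#if 0
static int example_bind(uv_udp_t* udp) {
  struct sockaddr_in6 addr;
  int err;

  err = uv_ip6_addr("::", 9999, &addr);
  if (err != 0)
    return err;

  /* UV_UDP_REUSEADDR maps to uv__set_reuse(); UV_UDP_IPV6ONLY sets
   * IPV6_V6ONLY; UV_UDP_LINUX_RECVERR enables IP_RECVERR on Linux.
   */
  return uv_udp_bind(udp,
                     (const struct sockaddr*) &addr,
                     UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR);
}
#endif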


static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union uv__sockaddr taddr;
  socklen_t addrlen;

  if (handle->io_watcher.fd != -1)
    return 0;

  switch (domain) {
  case AF_INET:
  {
    struct sockaddr_in* addr = &taddr.in;
    memset(addr, 0, sizeof *addr);
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = INADDR_ANY;
    addrlen = sizeof *addr;
    break;
  }
  case AF_INET6:
  {
    struct sockaddr_in6* addr = &taddr.in6;
    memset(addr, 0, sizeof *addr);
    addr->sin6_family = AF_INET6;
    addr->sin6_addr = in6addr_any;
    addrlen = sizeof *addr;
    break;
  }
  default:
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
}


int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int err;

  err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (err)
    return err;

  do {
    errno = 0;
    err = connect(handle->io_watcher.fd, addr, addrlen);
  } while (err == -1 && errno == EINTR);

  if (err)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
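
/* Illustrative sketch (not part of libuv): associating a handle with a fixed
 * peer via the public uv_udp_connect(), after which uv_udp_send() and
 * uv_udp_try_send() may be called with a NULL address.  The peer address is
 * a placeholder; compiled out so it does not affect the build.
 */
#if 0
static int example_connect(uv_udp_t* udp) {
  struct sockaddr_in peer;
  int err;

  err = uv_ip4_addr("192.0.2.1", 53, &peer);
  if (err != 0)
    return err;

  /* Implicitly binds to 0.0.0.0:0 first (uv__udp_maybe_deferred_bind). */
  return uv_udp_connect(udp, (const struct sockaddr*) &peer);
}
#endif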

/* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
 * All of the UNIX kernels that libuv supports follow the same standard, but
 * their implementations differ, so pseudocode is used below to describe the
 * UDP disconnect behaviors:
 *
 * Predefined stubs for the pseudocode:
 *   1. sodisconnect: the function that performs the real UDP disconnect
 *   2. pru_connect: the function that performs the real UDP connect
 *   3. so: the kernel object matching the socket fd
 *   4. addr: the sockaddr parameter from user space
 *
 * BSDs:
 *   if (sodisconnect(so) == 0) { // udp disconnect succeeded
 *     if (addr->sa_len != so->addr->sa_len) return EINVAL;
 *     if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
 *     pru_connect(so);
 *   }
 *   else return EISCONN;
 *
 * z/OS (same as Windows):
 *   if (addr->sa_len < so->addr->sa_len) return EINVAL;
 *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
 *
 * AIX:
 *   if (addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
 *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
 *
 * Linux, others:
 *   if (addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
 *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
 */
int uv__udp_disconnect(uv_udp_t* handle) {
    int r;
#if defined(__MVS__)
    struct sockaddr_storage addr;
#else
    struct sockaddr addr;
#endif

    memset(&addr, 0, sizeof(addr));

#if defined(__MVS__)
    addr.ss_family = AF_UNSPEC;
#else
    addr.sa_family = AF_UNSPEC;
#endif

    do {
      errno = 0;
#ifdef __PASE__
      /* On IBMi a connectionless transport socket can be disconnected by
       * either setting the addr parameter to NULL or setting the
       * addr_length parameter to zero, and issuing another connect().
       * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
       */
      r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
#else
      r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr));
#endif
    } while (r == -1 && errno == EINTR);

    if (r == -1) {
#if defined(BSD)  /* The macro BSD is from sys/param.h */
      if (errno != EAFNOSUPPORT && errno != EINVAL)
        return UV__ERR(errno);
#else
      return UV__ERR(errno);
#endif
    }

    handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
    return 0;
}
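
/* Illustrative sketch (not part of libuv): at the public API a connected
 * handle is disconnected by calling uv_udp_connect() with a NULL address,
 * which routes to uv__udp_disconnect() above.  Compiled out so it does not
 * affect the build.
 */
#if 0
static int example_disconnect(uv_udp_t* udp) {
  /* Returns UV_ENOTCONN if the handle was not connected. */
  return uv_udp_connect(udp, NULL);
}
#endif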

int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int err;
  int empty_queue;

  assert(nbufs > 0);

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  empty_queue = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->addr));
  if (addr == NULL)
    req->addr.ss_family = AF_UNSPEC;
  else
    memcpy(&req->addr, addr, addrlen);
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL) {
    uv__req_unregister(handle->loop, req);
    return UV_ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  uv__queue_insert_tail(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
     * away. In such cases the `io_watcher` has to be queued for asynchronous
     * write.
     */
    if (!uv__queue_empty(&handle->write_queue))
      uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  } else {
    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  }

  return 0;
}
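
/* Illustrative sketch (not part of libuv): queueing a datagram with the
 * public uv_udp_send().  The request and the memory behind the buffers must
 * stay valid until the send callback runs.  Names and the destination are
 * placeholders; compiled out so it does not affect the build.
 */
#if 0
static void example_on_send(uv_udp_send_t* req, int status) {
  /* status is 0 on success or a UV_* error (UV_ECANCELED if the handle was
   * closed before the request was written).
   */
  free(req);
}

static int example_send(uv_udp_t* udp) {
  static char payload[] = "ping";
  struct sockaddr_in dest;
  uv_udp_send_t* req;
  uv_buf_t buf;
  int err;

  err = uv_ip4_addr("192.0.2.1", 9999, &dest);
  if (err != 0)
    return err;

  req = malloc(sizeof(*req));
  if (req == NULL)
    return UV_ENOMEM;

  buf = uv_buf_init(payload, sizeof(payload) - 1);
  err = uv_udp_send(req, udp, &buf, 1,
                    (const struct sockaddr*) &dest, example_on_send);
  if (err != 0)
    free(req);
  return err;
}
#endif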


int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int err;
  struct msghdr h;
  ssize_t size;

  assert(nbufs > 0);

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  } else {
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  }

  memset(&h, 0, sizeof h);
  h.msg_name = (struct sockaddr*) addr;
  h.msg_namelen = addrlen;
  h.msg_iov = (struct iovec*) bufs;
  h.msg_iovlen = nbufs;

  do {
    size = sendmsg(handle->io_watcher.fd, &h, 0);
  } while (size == -1 && errno == EINTR);

  if (size == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return UV_EAGAIN;
    else
      return UV__ERR(errno);
  }

  return size;
}
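
/* Illustrative sketch (not part of libuv): uv_udp_try_send() is the
 * synchronous counterpart of uv_udp_send(); it fails with UV_EAGAIN instead
 * of queueing when the datagram cannot be written immediately, so callers
 * typically fall back to uv_udp_send() in that case.  Compiled out so it
 * does not affect the build.
 */
#if 0
static int example_try_send(uv_udp_t* udp, const struct sockaddr* dest) {
  static char payload[] = "ping";
  uv_buf_t buf;
  int r;

  buf = uv_buf_init(payload, sizeof(payload) - 1);
  r = uv_udp_try_send(udp, &buf, 1, dest);
  if (r == UV_EAGAIN) {
    /* Socket not writable right now (or sends already queued); fall back
     * to an asynchronous uv_udp_send() here.
     */
  }
  return r;  /* >= 0: bytes sent, < 0: UV_* error */
}
#endif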


static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq mreq;
  int optname;
  int err;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IP_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IP_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}


static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  int optname;
  struct ipv6_mreq mreq;
  struct sockaddr_in6 addr6;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    if (uv_ip6_addr(interface_addr, 0, &addr6))
      return UV_EINVAL;
    mreq.ipv6mr_interface = addr6.sin6_scope_id;
  } else {
    mreq.ipv6mr_interface = 0;
  }

  mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IPV6_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IPV6_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}


#if !defined(__OpenBSD__) &&                                        \
    !defined(__NetBSD__) &&                                         \
    !defined(__ANDROID__) &&                                        \
    !defined(__DragonFly__) &&                                      \
    !defined(__QNX__) &&                                            \
    !defined(__GNU__)
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source mreq;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_SOURCE_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_SOURCE_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}


static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req mreq;
  struct sockaddr_in6 addr6;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_ip6_addr(interface_addr, 0, &addr6);
    if (err)
      return err;
    mreq.gsr_interface = addr6.sin6_scope_id;
  } else {
    mreq.gsr_interface = 0;
  }

  STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
  STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
  memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));

  if (membership == UV_JOIN_GROUP)
    optname = MCAST_JOIN_SOURCE_GROUP;
  else if (membership == UV_LEAVE_GROUP)
    optname = MCAST_LEAVE_SOURCE_GROUP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
#endif


int uv__udp_init_ex(uv_loop_t* loop,
                    uv_udp_t* handle,
                    unsigned flags,
                    int domain) {
  int fd;

  fd = -1;
  if (domain != AF_UNSPEC) {
    fd = uv__socket(domain, SOCK_DGRAM, 0);
    if (fd < 0)
      return fd;
  }

  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
  handle->send_queue_size = 0;
  handle->send_queue_count = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, fd);
  uv__queue_init(&handle->write_queue);
  uv__queue_init(&handle->write_completed_queue);

  return 0;
}


int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
#if defined(__linux__) || defined(__FreeBSD__)
  if (handle->flags & UV_HANDLE_UDP_RECVMMSG)
    return 1;
#endif
  return 0;
}


int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int err;

  /* Check for already active socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  err = uv__nonblock(sock, 1);
  if (err)
    return err;

  err = uv__set_reuse(sock);
  if (err)
    return err;

  handle->io_watcher.fd = sock;
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
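
/* Illustrative sketch (not part of libuv): adopting an existing datagram
 * socket with uv_udp_open().  The descriptor is switched to non-blocking
 * mode by the call above; the caller remains responsible for having created
 * it with a domain and type libuv supports.  Compiled out so it does not
 * affect the build.
 */
#if 0
static int example_adopt_fd(uv_loop_t* loop, uv_udp_t* udp) {
  int fd;
  int err;

  fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1)
    return UV__ERR(errno);

  err = uv_udp_init(loop, udp);
  if (err == 0)
    err = uv_udp_open(udp, fd);
  if (err != 0)
    close(fd);  /* the handle does not own the fd on failure */
  return err;
}
#endif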


int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  int err;
  struct sockaddr_in addr4;
  struct sockaddr_in6 addr6;

  if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
    err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (err)
      return err;
    return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
  } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
    err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
    if (err)
      return err;
    return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
  } else {
    return UV_EINVAL;
  }
}
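
/* Illustrative sketch (not part of libuv): joining an any-source multicast
 * group.  Passing NULL as the interface address lets the kernel pick the
 * interface (INADDR_ANY / interface index 0 above).  The group address is a
 * placeholder; compiled out so it does not affect the build.
 */
#if 0
static int example_join_group(uv_udp_t* udp) {
  /* If the handle was not bound yet it is implicitly bound to 0.0.0.0:0
   * with UV_UDP_REUSEADDR; in practice one binds to the group port first.
   */
  return uv_udp_set_membership(udp, "239.255.0.1", NULL, UV_JOIN_GROUP);
}
#endif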


int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) &&                                        \
    !defined(__NetBSD__) &&                                         \
    !defined(__ANDROID__) &&                                        \
    !defined(__DragonFly__) &&                                      \
    !defined(__QNX__) &&                                            \
    !defined(__GNU__)
  int err;
  union uv__sockaddr mcast_addr;
  union uv__sockaddr src_addr;

  err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
  if (err) {
    err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
    if (err)
      return err;
    err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
    if (err)
      return err;
    return uv__udp_set_source_membership6(handle,
                                          &mcast_addr.in6,
                                          interface_addr,
                                          &src_addr.in6,
                                          membership);
  }

  err = uv_ip4_addr(source_addr, 0, &src_addr.in);
  if (err)
    return err;
  return uv__udp_set_source_membership4(handle,
                                        &mcast_addr.in,
                                        interface_addr,
                                        &src_addr.in,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}
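
/* Illustrative sketch (not part of libuv): a source-specific join, which
 * only accepts datagrams for the group that originate from the given source
 * address.  Addresses are placeholders; compiled out so it does not affect
 * the build.
 */
#if 0
static int example_join_ssm(uv_udp_t* udp) {
  return uv_udp_set_source_membership(udp,
                                      "232.1.1.1",    /* group */
                                      NULL,           /* any interface */
                                      "192.0.2.10",   /* accepted source */
                                      UV_JOIN_GROUP);
}
#endif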


static int uv__setsockopt(uv_udp_t* handle,
                         int option4,
                         int option6,
                         const void* val,
                         socklen_t size) {
  int r;

  if (handle->flags & UV_HANDLE_IPV6)
    r = setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   option6,
                   val,
                   size);
  else
    r = setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   option4,
                   val,
                   size);
  if (r)
    return UV__ERR(errno);

  return 0;
}

static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char arg = val;
#elif defined(__OpenBSD__)
  unsigned char arg = val;
#else
  int arg = val;
#endif

  if (val < 0 || val > 255)
    return UV_EINVAL;

  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
}


int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  if (setsockopt(handle->io_watcher.fd,
                 SOL_SOCKET,
                 SO_BROADCAST,
                 &on,
                 sizeof(on))) {
    return UV__ERR(errno);
  }

  return 0;
}


int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (ttl < 1 || ttl > 255)
    return UV_EINVAL;

#if defined(__MVS__)
  if (!(handle->flags & UV_HANDLE_IPV6))
    return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
#endif

/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
 * so hardcode the size of these options on this platform,
 * and use the general uv__setsockopt_maybe_char call on other platforms.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)

  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));

#else /* !(defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
           defined(__MVS__) || defined(__QNX__)) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);

#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__) */
}


int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}


int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}

int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
  struct sockaddr_storage addr_st;
  struct sockaddr_in* addr4;
  struct sockaddr_in6* addr6;

  addr4 = (struct sockaddr_in*) &addr_st;
  addr6 = (struct sockaddr_in6*) &addr_st;

  if (!interface_addr) {
    memset(&addr_st, 0, sizeof addr_st);
    if (handle->flags & UV_HANDLE_IPV6) {
      addr_st.ss_family = AF_INET6;
      addr6->sin6_scope_id = 0;
    } else {
      addr_st.ss_family = AF_INET;
      addr4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
    /* nothing, address was parsed */
  } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
    /* nothing, address was parsed */
  } else {
    return UV_EINVAL;
  }

  if (addr_st.ss_family == AF_INET) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   IP_MULTICAST_IF,
                   (void*) &addr4->sin_addr,
                   sizeof(addr4->sin_addr)) == -1) {
      return UV__ERR(errno);
    }
  } else if (addr_st.ss_family == AF_INET6) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   IPV6_MULTICAST_IF,
                   &addr6->sin6_scope_id,
                   sizeof(addr6->sin6_scope_id)) == -1) {
      return UV__ERR(errno);
    }
  } else {
    assert(0 && "unexpected address family");
    abort();
  }

  return 0;
}
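
/* Illustrative sketch (not part of libuv): typical multicast sender setup
 * using the option setters above: keep datagrams on the local network
 * (TTL 1), suppress local loopback, and pin the outgoing interface.  The
 * interface address is a placeholder; compiled out so it does not affect
 * the build.
 */
#if 0
static int example_multicast_opts(uv_udp_t* udp) {
  int err;

  err = uv_udp_set_multicast_ttl(udp, 1);
  if (err == 0)
    err = uv_udp_set_multicast_loop(udp, 0);
  if (err == 0)
    err = uv_udp_set_multicast_interface(udp, "192.0.2.2");
  return err;
}
#endif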

int uv_udp_getpeername(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getpeername,
                             name,
                             namelen);
}

int uv_udp_getsockname(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getsockname,
                             name,
                             namelen);
}
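
/* Illustrative sketch (not part of libuv): after binding to port 0, the
 * actual port picked by the kernel can be read back with
 * uv_udp_getsockname().  Compiled out so it does not affect the build.
 */
#if 0
static int example_bound_port(const uv_udp_t* udp, int* port) {
  struct sockaddr_storage ss;
  int len;
  int err;

  len = sizeof(ss);
  err = uv_udp_getsockname(udp, (struct sockaddr*) &ss, &len);
  if (err != 0)
    return err;

  if (ss.ss_family == AF_INET)
    *port = ntohs(((struct sockaddr_in*) &ss)->sin_port);
  else
    *port = ntohs(((struct sockaddr_in6*) &ss)->sin6_port);
  return 0;
}
#endif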


int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int err;

  if (alloc_cb == NULL || recv_cb == NULL)
    return UV_EINVAL;

  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (err)
    return err;

  handle->alloc_cb = alloc_cb;
  handle->recv_cb = recv_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}
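
/* Illustrative sketch (not part of libuv): the plain (non-mmsg) receive path
 * through the public wrappers of the functions above.  Each callback owns
 * the buffer it was handed and must free it; nread == 0 with addr == NULL
 * just means "nothing to read".  Callback names are assumptions; compiled
 * out so it does not affect the build.
 */
#if 0
static void example_simple_alloc(uv_handle_t* handle,
                                 size_t suggested_size,
                                 uv_buf_t* buf) {
  buf->base = malloc(suggested_size);
  buf->len = buf->base == NULL ? 0 : suggested_size;
}

static void example_simple_recv(uv_udp_t* udp,
                                ssize_t nread,
                                const uv_buf_t* buf,
                                const struct sockaddr* addr,
                                unsigned flags) {
  if (nread > 0 && addr != NULL) {
    /* Process the datagram; then stop reading, just as an example. */
    uv_udp_recv_stop(udp);
  }
  free(buf->base);
}

static int example_recv_start(uv_udp_t* udp) {
  return uv_udp_recv_start(udp, example_simple_alloc, example_simple_recv);
}
#endif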


int uv__udp_recv_stop(uv_udp_t* handle) {
  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  if (!uv__io_active(&handle->io_watcher, POLLOUT))
    uv__handle_stop(handle);

  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;

  return 0;
}
1369