xref: /third_party/libuv/src/unix/stream.c (revision e66f31c5)
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */

union uv__cmsg {
  struct cmsghdr hdr;
  /* This cannot be larger because of the IBMi PASE limitation that
   * the total size of control messages cannot exceed 256 bytes.
   */
  char pad[256];
};

STATIC_ASSERT(256 == sizeof(union uv__cmsg));
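
/* A minimal sizing sketch (not part of the original source): the union aligns
 * the 256-byte pad like a struct cmsghdr, so it can serve directly as a
 * msg_control buffer. Passing a single file descriptor, as uv__try_write()
 * does below, needs CMSG_SPACE(sizeof(int)) bytes, well within the pad:
 *
 *   union uv__cmsg cmsg;
 *   msg.msg_control = &cmsg.hdr;
 *   msg.msg_controllen = CMSG_SPACE(sizeof(int));
 */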

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  uv__queue_init(&stream->write_queue);
  uv__queue_init(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted, open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;
  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else  /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif  /* !defined(__APPLE__) */
}


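/* Overview of the macOS select() workaround below (a descriptive sketch):
 * a helper thread select()s on the real fd plus one end of a socketpair.
 * When the fd is ready, it publishes the event mask through s->events, wakes
 * the loop with uv_async_send(), then blocks on async_sem until
 * uv__stream_osx_select_cb() has run - this stops select() from being
 * re-entered before the pending I/O is consumed. The loop thread, in turn,
 * writes a byte to the socketpair (uv__stream_osx_interrupt_select())
 * whenever the watched event mask changes.
 */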
#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to post the semaphore here; otherwise `select()`
   * might be called before the actual `uv__read()`, leading to a blocking
   * syscall.
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on macOS, so
   * run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */
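
/* On success, uv__stream_try_select() has swapped the caller's fd: the
 * stream's io_watcher now polls fake_fd (one end of the socketpair) while
 * the helper thread select()s on the real fd, which is kept in s->fd.
 * uv___stream_fd() at the bottom of this file returns the real fd again so
 * read(2) and write(2) keep operating on the actual device.
 */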


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO: use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  struct uv__queue* q;
  while (!uv__queue_empty(&stream->write_queue)) {
    q = uv__queue_head(&stream->write_queue);
    uv__queue_remove(q);

    req = uv__queue_data(q, uv_write_t, queue);
    req->error = error;

    uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;
  int fd;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  fd = uv__stream_fd(stream);
  err = uv__accept(fd);

  if (err == UV_EMFILE || err == UV_ENFILE)
    err = uv__emfile_trick(loop, fd);  /* Shed load. */

  if (err < 0)
    return;

  stream->accepted_fd = err;
  stream->connection_cb(stream, 0);

  if (stream->accepted_fd != -1)
    /* The user hasn't yet called uv_accept(). */
    uv__io_stop(loop, &stream->io_watcher, POLLIN);
}


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;
  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }
  switch (stream->type) {
  case UV_TCP:
    err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}
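
/* A minimal usage sketch (callback names are hypothetical): a typical
 * connection_cb accepts the pending connection into a fresh handle and
 * starts reading.
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *
 *     if (status < 0)
 *       return;
 *
 *     client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, on_alloc, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 *
 * Note how uv__server_io() above stops polling for POLLIN while accepted_fd
 * is still set, which is what makes deferring the uv_accept() call safe.
 */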


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(uv__queue_empty(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!uv__is_stream_shutting(stream))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  uv__queue_remove(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    union uv__cmsg cmsg;

    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&cmsg, 0, sizeof(cmsg));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &cmsg.hdr;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg.hdr.cmsg_level = SOL_SOCKET;
    cmsg.hdr.cmsg_type = SCM_RIGHTS;
    cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send));
    memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send));

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif  /* __APPLE__ */

  return UV__ERR(errno);
}
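
/* A note on the descriptor-passing path above (a sketch of the mechanism,
 * not additional behavior): sendmsg() with a SOL_SOCKET/SCM_RIGHTS control
 * message duplicates fd_to_send into the receiving process, so the sender's
 * copy can be closed independently once the write completes. The control
 * message rides along with the payload bytes of the same sendmsg() call,
 * which is why uv__write() below clears req->send_handle after the first
 * successful (even partial) write.
 */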

static void uv__write(uv_stream_t* stream) {
  struct uv__queue* q;
  uv_write_t* req;
  ssize_t n;
  int count;

  assert(uv__stream_fd(stream) >= 0);

  /* Prevent loop starvation when the consumer of this stream reads as fast as
   * (or faster than) we can write it. This `count` mechanism does not need to
   * change even if we switch to edge-triggered I/O.
   */
  count = 32;

  for (;;) {
    if (uv__queue_empty(&stream->write_queue))
      return;

    q = uv__queue_head(&stream->write_queue);
    req = uv__queue_data(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        if (count-- > 0)
          continue; /* Start trying to write the next request. */

        return;
      }
    } else if (n != UV_EAGAIN)
      goto error;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify select() thread about state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

error:
  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  struct uv__queue* q;
  struct uv__queue pq;

  if (uv__queue_empty(&stream->write_completed_queue))
    return;

  uv__queue_move(&stream->write_completed_queue, &pq);

  while (!uv__queue_empty(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = uv__queue_head(&pq);
    req = uv__queue_data(q, uv_write_t, queue);
    uv__queue_remove(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                              sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}
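
/* Sizing note (inferred from the arithmetic above): allocations reserve
 * (queue_size - 1) * sizeof(*queued_fds->fds) bytes on top of
 * sizeof(*queued_fds), which implies uv__stream_queued_fds_t already embeds
 * one trailing fd slot. With queue_size = 8 and 4-byte fds, the first
 * allocation is sizeof(*queued_fds) + 28 bytes.
 */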


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;
  int fd;
  int err;
  size_t i;
  size_t count;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
          cmsg->cmsg_type);
      continue;
    }

    assert(cmsg->cmsg_len >= CMSG_LEN(0));
    count = cmsg->cmsg_len - CMSG_LEN(0);
    assert(count % sizeof(fd) == 0);
    count /= sizeof(fd);

    for (i = 0; i < count; i++) {
      memcpy(&fd, (char*) CMSG_DATA(cmsg) + i * sizeof(fd), sizeof(fd));
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, fd);
        if (err != 0) {
          /* Close the rest, re-reading each remaining fd before closing it. */
          for (; i < count; i++) {
            memcpy(&fd, (char*) CMSG_DATA(cmsg) + i * sizeof(fd), sizeof(fd));
            uv__close(fd);
          }
          return err;
        }
      } else {
        stream->accepted_fd = fd;
      }
    }
  }

  return 0;
}


static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  union uv__cmsg cmsg;
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg);
      msg.msg_control = &cmsg.hdr;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer; there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      uv__is_stream_shutting(stream) ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request. The `shutdown(2)` call will always be deferred until
   * `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (uv__queue_empty(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}
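
/* Usage sketch (hypothetical names): a graceful close queues the shutdown
 * right after the last write; uv__drain() runs it once write_queue empties.
 *
 *   static void on_shutdown(uv_shutdown_t* req, int status) {
 *     uv_close((uv_handle_t*) req->handle, on_close);
 *     free(req);
 *   }
 *
 *   uv_shutdown_t* req = malloc(sizeof(*req));
 *   uv_write(&write_req, stream, &buf, 1, on_write);  // last pending write
 *   uv_shutdown(req, stream, on_shutdown);            // defers SHUT_WR
 */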


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (uv__queue_empty(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over differences between Unices, errors that were reported
     * synchronously on the first connect can be delayed until the next
     * tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || uv__queue_empty(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  uv__queue_init(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  uv__queue_insert_tail(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}


/* The memory pointed to by the buffers must remain valid until the callback
 * is called. This is not required for the uv_buf_t array itself, which is
 * copied.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
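
/* Usage sketch (hypothetical callback names): the uv_buf_t array may live on
 * the stack because uv_write2() copies it, but the bytes it points at must
 * stay valid until on_write fires (string literals trivially do).
 *
 *   static void on_write(uv_write_t* req, int status) {
 *     if (status < 0)
 *       fprintf(stderr, "write error: %s\n", uv_strerror(status));
 *     free(req);
 *   }
 *
 *   uv_write_t* req = malloc(sizeof(*req));
 *   uv_buf_t buf = uv_buf_init((char*) "hello", 5);
 *   uv_write(req, stream, &buf, 1, on_write);
 */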


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}


int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  err = uv__check_before_write(stream, nbufs, NULL);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}
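
/* Semantics sketch: uv_try_write() returns the number of bytes written
 * (possibly fewer than requested), or UV_EAGAIN when the write would block
 * or data is already queued. A common pattern is to fall back to uv_write()
 * for whatever remains (hypothetical variable names):
 *
 *   int n = uv_try_write(stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     n = 0;                        // nothing written, queue it all
 *   else if (n < 0)
 *     return n;                     // hard error
 *   if ((size_t) n < buf.len) {
 *     uv_buf_t rest = uv_buf_init(buf.base + n, buf.len - n);
 *     return uv_write(req, stream, &rest, 1, on_write);
 *   }
 */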


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the actual state of the
   * stream - it just expresses the desired state of the user. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors.  Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* We don't need to check the file descriptor; uv__nonblock() will fail
   * with EBADF if it isn't valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}
