xref: /third_party/node/deps/uv/src/unix/stream.c (revision 1cb0ef41)
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted, open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify the select() thread about the state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else  /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif  /* !defined(__APPLE__) */
}
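
/* The wakeup above is the classic self-pipe pattern, here built on a
 * socketpair. A minimal standalone sketch (hypothetical fds, error handling
 * elided):
 *
 *   int fds[2];
 *   socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
 *   // Thread A blocks in select() with fds[1] in its read set.
 *   // Thread B calls write(fds[0], "x", 1); fds[1] becomes readable,
 *   // select() returns, and thread A re-checks its state.
 */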


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Drain the socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to post the semaphore here, otherwise `select()`
   * might be called before the actual `uv__read()`, leading to a blocking
   * syscall.
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X.
   * Run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for the io watcher and for interrupting the select() loop.
   * NOTE: do it ahead of the malloc below so we allocate enough space for
   * the fd_sets.
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;
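
  /* Worked example of the sizing above, assuming NBBY == 8: with
   * max_fd == 40, ROUND_UP(41, 32) == 64 bits, so sread_sz == 64 / 8 == 8
   * bytes. Each fd_set is just large enough for bits 0..max_fd, rounded up
   * to a whole 32-bit word.
   */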

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;
  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}


/* Implements a best-effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}
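
/* Outside of libuv, the same spare-fd pattern looks roughly like this
 * (a minimal sketch with hypothetical names; error handling elided):
 *
 *   int reserve_fd = open("/dev/null", O_RDONLY | O_CLOEXEC);  // at startup
 *
 *   if (accept(server_fd, NULL, NULL) == -1 && errno == EMFILE) {
 *     close(reserve_fd);                          // free one fd slot
 *     int c;
 *     while ((c = accept(server_fd, NULL, NULL)) >= 0)
 *       close(c);                                 // shed pending clients
 *     reserve_fd = open("/dev/null", O_RDONLY | O_CLOEXEC);    // re-arm
 *   }
 */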


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(). */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift the rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}
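
/* Typical public-API usage of the listen/accept path above, from the
 * caller's side (a minimal sketch; on_alloc, on_read and on_close are
 * user-supplied callbacks, error handling elided):
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (status == 0 && uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, on_alloc, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 *
 *   uv_listen((uv_stream_t*) &server, SOMAXCONN, on_connection);
 */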


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;
  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }
  switch (stream->type) {
  case UV_TCP:
    err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!(stream->flags & UV_HANDLE_SHUTTING))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}
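
/* Worked example of the bookkeeping above: suppose req has two buffers of
 * lengths 5 and 3 and the kernel accepted n == 6 bytes. The first pass
 * consumes all of buf[0] and advances to buf[1]; the second consumes one
 * byte, leaving buf[1] with base += 1 and len == 2. write_index ends up at
 * 1 and the function returns 0: the request isn't finished yet.
 */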


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif  /* __APPLE__ */

  return UV__ERR(errno);
}

static void uv__write(uv_stream_t* stream) {
  QUEUE* q;
  uv_write_t* req;
  ssize_t n;

  assert(uv__stream_fd(stream) >= 0);

  for (;;) {
    if (QUEUE_EMPTY(&stream->write_queue))
      return;

    q = QUEUE_HEAD(&stream->write_queue);
    req = QUEUE_DATA(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        return;  /* TODO(bnoordhuis) Start trying to write the next request. */
      }
    } else if (n != UV_EAGAIN)
      break;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify the select() thread about the state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call the callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
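    /* Note: sizeof(*queued_fds) already accounts for one element of the
     * trailing fds array, hence the queue_size - 1 in the allocation below
     * (and in the realloc further down).
     */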
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                              sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal, sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put the fd in the queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


#if defined(__PASE__)
/* On IBM i PASE the control message length cannot exceed 256 bytes. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
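
/* Worked out, assuming 4-byte ints: UV__CMSG_FD_SIZE is 64 * 4 == 256 bytes
 * (60 * 4 == 240 on PASE), and the cmsg_space buffer in uv__read() below is
 * CMSG_SPACE(256) bytes: the payload plus the cmsghdr header, rounded up for
 * alignment.
 */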

static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
          cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count the available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already have an accepted fd, queue this one */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close the rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}


#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* The user indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. The user should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize the request. The `shutdown(2)` call will always be deferred
   * until `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (QUEUE_EMPTY(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}
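
/* Typical public-API usage of the shutdown path above (a minimal sketch;
 * the requests and callbacks are user-supplied). Because shutdown(2) is
 * deferred to uv__drain(), queued writes finish first:
 *
 *   static void on_shutdown(uv_shutdown_t* req, int status) {
 *     uv_close((uv_handle_t*) req->handle, on_close);
 *   }
 *
 *   uv_write(&write_req, stream, bufs, 1, on_write);   // still in flight
 *   uv_shutdown(&shutdown_req, stream, on_shutdown);   // runs after the write
 */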


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick - which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * Blocking streams should never have anything in the queue.
     * If this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
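
/* Illustration of the lifetime rule above (a minimal sketch): the bytes that
 * bufs[i].base points at must stay alive until the write callback runs, but
 * the uv_buf_t array itself is copied by uv_write2() and may live on the
 * stack:
 *
 *   static char payload[] = "hello";            // outlives the request
 *   uv_buf_t bufs[1];                           // stack-allocated is fine
 *   bufs[0] = uv_buf_init(payload, sizeof(payload) - 1);
 *   uv_write(&req, stream, bufs, 1, on_write);  // req and on_write are
 *                                               // user-supplied
 */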


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}


int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  err = uv__check_before_write(stream, nbufs, NULL);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the state of the stream - it
   * just expresses the desired state of the user. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}
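
/* The alloc_cb/read_cb contract used above, from the caller's side (a
 * minimal sketch; on_alloc, on_read and consume() are hypothetical):
 *
 *   static void on_alloc(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     *buf = uv_buf_init(malloc(suggested), suggested);  // NULL => UV_ENOBUFS
 *   }
 *
 *   static void on_read(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread > 0)
 *       consume(buf->base, nread);
 *     else if (nread < 0)            // UV_EOF or an error code
 *       uv_close((uv_handle_t*) s, NULL);
 *     free(buf->base);               // nread == 0 is a no-op read, still free
 *   }
 *
 *   uv_read_start(stream, on_alloc, on_read);
 */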


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate the select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* There's no need to check the file descriptor: uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}