Lines Matching refs:stream

47   uv_stream_t* stream;
64 static void uv__write(uv_stream_t* stream);
65 static void uv__read(uv_stream_t* stream);
67 static void uv__write_callbacks(uv_stream_t* stream);
69 static void uv__drain(uv_stream_t* stream);
73 uv_stream_t* stream,
77 uv__handle_init(loop, (uv_handle_t*)stream, type);
78 stream->read_cb = NULL;
79 stream->alloc_cb = NULL;
80 stream->close_cb = NULL;
81 stream->connection_cb = NULL;
82 stream->connect_req = NULL;
83 stream->shutdown_req = NULL;
84 stream->accepted_fd = -1;
85 stream->queued_fds = NULL;
86 stream->delayed_error = 0;
87 QUEUE_INIT(&stream->write_queue);
88 QUEUE_INIT(&stream->write_completed_queue);
89 stream->write_queue_size = 0;
103 stream->select = NULL;
106 uv__io_init(&stream->io_watcher, uv__stream_io, -1);
110 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
116 s = stream->select;
137 uv_stream_t* stream;
145 stream = arg;
146 s = stream->select;
163 if (uv__io_active(&stream->io_watcher, POLLIN))
165 if (uv__io_active(&stream->io_watcher, POLLOUT))
218 assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
226 uv_stream_t* stream;
230 stream = s->stream;
232 /* Get and reset stream's events */
240 if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
241 uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
243 if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
244 uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
246 if (stream->flags & UV_HANDLE_CLOSING)
264 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
336 err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
355 s->stream = stream;
356 stream->select = s;
359 err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
366 s->stream = NULL;
367 stream->select = NULL;
393 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
398 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
402 stream->flags |= flags;
404 if (stream->type == UV_TCP) {
405 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
409 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
424 stream->io_watcher.fd = fd;
430 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
433 while (!QUEUE_EMPTY(&stream->write_queue)) {
434 q = QUEUE_HEAD(&stream->write_queue);
440 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
445 void uv__stream_destroy(uv_stream_t* stream) {
446 assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
447 assert(stream->flags & UV_HANDLE_CLOSED);
449 if (stream->connect_req) {
450 uv__req_unregister(stream->loop, stream->connect_req);
451 stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
452 stream->connect_req = NULL;
455 uv__stream_flush_write_queue(stream, UV_ECANCELED);
456 uv__write_callbacks(stream);
457 uv__drain(stream);
459 assert(stream->write_queue_size == 0);
506 uv_stream_t* stream;
509 stream = container_of(w, uv_stream_t, io_watcher);
511 assert(stream->accepted_fd == -1);
512 assert(!(stream->flags & UV_HANDLE_CLOSING));
514 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
519 while (uv__stream_fd(stream) != -1) {
520 assert(stream->accepted_fd == -1);
527 err = uv__accept(uv__stream_fd(stream));
536 err = uv__emfile_trick(loop, uv__stream_fd(stream));
541 stream->connection_cb(stream, err);
546 stream->accepted_fd = err;
547 stream->connection_cb(stream, 0);
549 if (stream->accepted_fd != -1) {
551 uv__io_stop(loop, &stream->io_watcher, POLLIN);
555 if (stream->type == UV_TCP &&
556 (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
633 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
635 if (uv__is_closing(stream)) {
638 switch (stream->type) {
640 err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
644 err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
652 uv__handle_start(stream);
658 static void uv__drain(uv_stream_t* stream) {
662 assert(QUEUE_EMPTY(&stream->write_queue));
663 if (!(stream->flags & UV_HANDLE_CLOSING)) {
664 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
665 uv__stream_osx_interrupt_select(stream);
668 if (!(stream->flags & UV_HANDLE_SHUTTING))
671 req = stream->shutdown_req;
674 if ((stream->flags & UV_HANDLE_CLOSING) ||
675 !(stream->flags & UV_HANDLE_SHUT)) {
676 stream->shutdown_req = NULL;
677 stream->flags &= ~UV_HANDLE_SHUTTING;
678 uv__req_unregister(stream->loop, req);
681 if (stream->flags & UV_HANDLE_CLOSING)
682 /* The user destroyed the stream before we got to do the shutdown. */
684 else if (shutdown(uv__stream_fd(stream), SHUT_WR))
687 stream->flags |= UV_HANDLE_SHUT;
721 static int uv__write_req_update(uv_stream_t* stream,
727 assert(n <= stream->write_queue_size);
728 stream->write_queue_size -= n;
747 uv_stream_t* stream = req->handle;
767 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
768 uv__io_feed(stream->loop, &stream->io_watcher);
786 static int uv__try_write(uv_stream_t* stream,
852 n = sendmsg(uv__stream_fd(stream), &msg, 0);
856 n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
883 static void uv__write(uv_stream_t* stream) {
888 assert(uv__stream_fd(stream) >= 0);
891 if (QUEUE_EMPTY(&stream->write_queue))
894 q = QUEUE_HEAD(&stream->write_queue);
896 assert(req->handle == stream);
898 n = uv__try_write(stream,
906 if (uv__write_req_update(stream, req, n)) {
913 /* If this is a blocking stream, try again. */
914 if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
918 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
921 uv__stream_osx_interrupt_select(stream);
928 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
929 uv__stream_osx_interrupt_select(stream);
933 static void uv__write_callbacks(uv_stream_t* stream) {
938 if (QUEUE_EMPTY(&stream->write_completed_queue))
941 QUEUE_MOVE(&stream->write_completed_queue, &pq);
948 uv__req_unregister(stream->loop, req);
951 stream->write_queue_size -= uv__write_req_size(req);
964 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
965 stream->flags |= UV_HANDLE_READ_EOF;
966 stream->flags &= ~UV_HANDLE_READING;
967 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
968 uv__handle_stop(stream);
969 uv__stream_osx_interrupt_select(stream);
970 stream->read_cb(stream, UV_EOF, buf);
974 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
978 queued_fds = stream->queued_fds;
987 stream->queued_fds = queued_fds;
1003 stream->queued_fds = queued_fds;
1022 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
1054 if (stream->accepted_fd != -1) {
1055 err = uv__stream_queue_fd(stream, pi[i]);
1063 stream->accepted_fd = pi[i];
1078 static void uv__read(uv_stream_t* stream) {
1087 stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1094 is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1099 while (stream->read_cb
1100 && (stream->flags & UV_HANDLE_READING)
1102 assert(stream->alloc_cb != NULL);
1105 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1108 stream->read_cb(stream, UV_ENOBUFS, &buf);
1113 assert(uv__stream_fd(stream) >= 0);
1117 nread = read(uv__stream_fd(stream), buf.base, buf.len);
1132 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1141 if (stream->flags & UV_HANDLE_READING) {
1142 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1143 uv__stream_osx_interrupt_select(stream);
1145 stream->read_cb(stream, 0, &buf);
1147 } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1148 uv__stream_eof(stream, &buf);
1153 stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1154 stream->read_cb(stream, UV__ERR(errno), &buf);
1155 if (stream->flags & UV_HANDLE_READING) {
1156 stream->flags &= ~UV_HANDLE_READING;
1157 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1158 uv__handle_stop(stream);
1159 uv__stream_osx_interrupt_select(stream);
1164 uv__stream_eof(stream, &buf);
1171 err = uv__stream_recv_cmsg(stream, &msg);
1173 stream->read_cb(stream, err, &buf);
1190 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1191 err = uv__stream_recv_cmsg(stream, &msg);
1193 stream->read_cb(stream, err, &buf);
1201 stream->read_cb(stream, nread, &buf);
1205 stream->flags |= UV_HANDLE_READ_PARTIAL;
1221 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1222 assert(stream->type == UV_TCP ||
1223 stream->type == UV_TTY ||
1224 stream->type == UV_NAMED_PIPE);
1226 if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1227 stream->flags & UV_HANDLE_SHUT ||
1228 stream->flags & UV_HANDLE_SHUTTING ||
1229 uv__is_closing(stream)) {
1233 assert(uv__stream_fd(stream) >= 0);
1237 uv__req_init(stream->loop, req, UV_SHUTDOWN);
1238 req->handle = stream;
1240 stream->shutdown_req = req;
1241 stream->flags |= UV_HANDLE_SHUTTING;
1242 stream->flags &= ~UV_HANDLE_WRITABLE;
1244 if (QUEUE_EMPTY(&stream->write_queue))
1245 uv__io_feed(stream->loop, &stream->io_watcher);
1252 uv_stream_t* stream;
1254 stream = container_of(w, uv_stream_t, io_watcher);
1256 assert(stream->type == UV_TCP ||
1257 stream->type == UV_NAMED_PIPE ||
1258 stream->type == UV_TTY);
1259 assert(!(stream->flags & UV_HANDLE_CLOSING));
1261 if (stream->connect_req) {
1262 uv__stream_connect(stream);
1266 assert(uv__stream_fd(stream) >= 0);
1270 uv__read(stream);
1272 if (uv__stream_fd(stream) == -1)
1273 return; /* read_cb closed stream. */
1282 (stream->flags & UV_HANDLE_READING) &&
1283 (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1284 !(stream->flags & UV_HANDLE_READ_EOF)) {
1286 uv__stream_eof(stream, &buf);
1289 if (uv__stream_fd(stream) == -1)
1290 return; /* read_cb closed stream. */
1293 uv__write(stream);
1294 uv__write_callbacks(stream);
1297 if (QUEUE_EMPTY(&stream->write_queue))
1298 uv__drain(stream);
1308 static void uv__stream_connect(uv_stream_t* stream) {
1310 uv_connect_t* req = stream->connect_req;
1313 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1316 if (stream->delayed_error) {
1321 error = stream->delayed_error;
1322 stream->delayed_error = 0;
1325 assert(uv__stream_fd(stream) >= 0);
1326 getsockopt(uv__stream_fd(stream),
1337 stream->connect_req = NULL;
1338 uv__req_unregister(stream->loop, req);
1340 if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
1341 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1347 if (uv__stream_fd(stream) == -1)
1351 uv__stream_flush_write_queue(stream, UV_ECANCELED);
1352 uv__write_callbacks(stream);
1357 static int uv__check_before_write(uv_stream_t* stream,
1361 assert((stream->type == UV_TCP ||
1362 stream->type == UV_NAMED_PIPE ||
1363 stream->type == UV_TTY) &&
1366 if (uv__stream_fd(stream) < 0)
1369 if (!(stream->flags & UV_HANDLE_WRITABLE))
1373 if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1396 uv_stream_t* stream,
1404 err = uv__check_before_write(stream, nbufs, send_handle);
1414 empty_queue = (stream->write_queue_size == 0);
1417 uv__req_init(stream->loop, req, UV_WRITE);
1419 req->handle = stream;
1434 stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1437 QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
1443 if (stream->connect_req) {
1447 uv__write(stream);
1452 * if this assert fires then somehow the blocking stream isn't being
1455 assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
1456 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1457 uv__stream_osx_interrupt_select(stream);
1476 int uv_try_write(uv_stream_t* stream,
1479 return uv_try_write2(stream, bufs, nbufs, NULL);
1483 int uv_try_write2(uv_stream_t* stream,
1490 if (stream->connect_req != NULL || stream->write_queue_size != 0)
1493 err = uv__check_before_write(stream, nbufs, NULL);
1497 return uv__try_write(stream, bufs, nbufs, send_handle);
1501 int uv__read_start(uv_stream_t* stream,
1504 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1505 stream->type == UV_TTY);
1507 /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
1509 stream->flags |= UV_HANDLE_READING;
1510 stream->flags &= ~UV_HANDLE_READ_EOF;
1513 assert(uv__stream_fd(stream) >= 0);
1516 stream->read_cb = read_cb;
1517 stream->alloc_cb = alloc_cb;
1519 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1520 uv__handle_start(stream);
1521 uv__stream_osx_interrupt_select(stream);
1527 int uv_read_stop(uv_stream_t* stream) {
1528 if (!(stream->flags & UV_HANDLE_READING))
1531 stream->flags &= ~UV_HANDLE_READING;
1532 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1533 uv__handle_stop(stream);
1534 uv__stream_osx_interrupt_select(stream);
1536 stream->read_cb = NULL;
1537 stream->alloc_cb = NULL;
1542 int uv_is_readable(const uv_stream_t* stream) {
1543 return !!(stream->flags & UV_HANDLE_READABLE);
1547 int uv_is_writable(const uv_stream_t* stream) {
1548 return !!(stream->flags & UV_HANDLE_WRITABLE);