Lines Matching refs:stream

47   uv_stream_t* stream;
74 static void uv__write(uv_stream_t* stream);
75 static void uv__read(uv_stream_t* stream);
77 static void uv__write_callbacks(uv_stream_t* stream);
79 static void uv__drain(uv_stream_t* stream);
83 uv_stream_t* stream,
87 uv__handle_init(loop, (uv_handle_t*)stream, type);
88 stream->read_cb = NULL;
89 stream->alloc_cb = NULL;
90 stream->close_cb = NULL;
91 stream->connection_cb = NULL;
92 stream->connect_req = NULL;
93 stream->shutdown_req = NULL;
94 stream->accepted_fd = -1;
95 stream->queued_fds = NULL;
96 stream->delayed_error = 0;
97 uv__queue_init(&stream->write_queue);
98 uv__queue_init(&stream->write_completed_queue);
99 stream->write_queue_size = 0;
113 stream->select = NULL;
116 uv__io_init(&stream->io_watcher, uv__stream_io, -1);
120 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
126 s = stream->select;
147 uv_stream_t* stream;
155 stream = arg;
156 s = stream->select;
173 if (uv__io_active(&stream->io_watcher, POLLIN))
175 if (uv__io_active(&stream->io_watcher, POLLOUT))
228 assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
236 uv_stream_t* stream;
240 stream = s->stream;
242 /* Get and reset stream's events */
250 if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
251 uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
253 if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
254 uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
256 if (stream->flags & UV_HANDLE_CLOSING)
274 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
346 err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
365 s->stream = stream;
366 stream->select = s;
369 err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
376 s->stream = NULL;
377 stream->select = NULL;
403 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
408 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
412 stream->flags |= flags;
414 if (stream->type == UV_TCP) {
415 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
419 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
434 stream->io_watcher.fd = fd;
440 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
443 while (!uv__queue_empty(&stream->write_queue)) {
444 q = uv__queue_head(&stream->write_queue);
450 uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
455 void uv__stream_destroy(uv_stream_t* stream) {
456 assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
457 assert(stream->flags & UV_HANDLE_CLOSED);
459 if (stream->connect_req) {
460 uv__req_unregister(stream->loop, stream->connect_req);
461 stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
462 stream->connect_req = NULL;
465 uv__stream_flush_write_queue(stream, UV_ECANCELED);
466 uv__write_callbacks(stream);
467 uv__drain(stream);
469 assert(stream->write_queue_size == 0);
509 uv_stream_t* stream;
513 stream = container_of(w, uv_stream_t, io_watcher);
515 assert(stream->accepted_fd == -1);
516 assert(!(stream->flags & UV_HANDLE_CLOSING));
518 fd = uv__stream_fd(stream);
527 stream->accepted_fd = err;
528 stream->connection_cb(stream, 0);
530 if (stream->accepted_fd != -1)
532 uv__io_stop(loop, &stream->io_watcher, POLLIN);
601 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
603 if (uv__is_closing(stream)) {
606 switch (stream->type) {
608 err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
612 err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
620 uv__handle_start(stream);
626 static void uv__drain(uv_stream_t* stream) {
630 assert(uv__queue_empty(&stream->write_queue));
631 if (!(stream->flags & UV_HANDLE_CLOSING)) {
632 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
633 uv__stream_osx_interrupt_select(stream);
636 if (!uv__is_stream_shutting(stream))
639 req = stream->shutdown_req;
642 if ((stream->flags & UV_HANDLE_CLOSING) ||
643 !(stream->flags & UV_HANDLE_SHUT)) {
644 stream->shutdown_req = NULL;
645 uv__req_unregister(stream->loop, req);
648 if (stream->flags & UV_HANDLE_CLOSING)
649 /* The user destroyed the stream before we got to do the shutdown. */
651 else if (shutdown(uv__stream_fd(stream), SHUT_WR))
654 stream->flags |= UV_HANDLE_SHUT;
688 static int uv__write_req_update(uv_stream_t* stream,
694 assert(n <= stream->write_queue_size);
695 stream->write_queue_size -= n;
714 uv_stream_t* stream = req->handle;
734 uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
735 uv__io_feed(stream->loop, &stream->io_watcher);
753 static int uv__try_write(uv_stream_t* stream,
808 n = sendmsg(uv__stream_fd(stream), &msg, 0);
812 n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
839 static void uv__write(uv_stream_t* stream) {
845 assert(uv__stream_fd(stream) >= 0);
847 /* Prevent loop starvation when the consumer of this stream read as fast as
854 if (uv__queue_empty(&stream->write_queue))
857 q = uv__queue_head(&stream->write_queue);
859 assert(req->handle == stream);
861 n = uv__try_write(stream,
869 if (uv__write_req_update(stream, req, n)) {
879 /* If this is a blocking stream, try again. */
880 if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
884 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
887 uv__stream_osx_interrupt_select(stream);
895 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
896 uv__stream_osx_interrupt_select(stream);
900 static void uv__write_callbacks(uv_stream_t* stream) {
905 if (uv__queue_empty(&stream->write_completed_queue))
908 uv__queue_move(&stream->write_completed_queue, &pq);
915 uv__req_unregister(stream->loop, req);
918 stream->write_queue_size -= uv__write_req_size(req);
931 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
932 stream->flags |= UV_HANDLE_READ_EOF;
933 stream->flags &= ~UV_HANDLE_READING;
934 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
935 uv__handle_stop(stream);
936 uv__stream_osx_interrupt_select(stream);
937 stream->read_cb(stream, UV_EOF, buf);
941 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
945 queued_fds = stream->queued_fds;
954 stream->queued_fds = queued_fds;
970 stream->queued_fds = queued_fds;
980 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
1002 if (stream->accepted_fd != -1) {
1003 err = uv__stream_queue_fd(stream, fd);
1011 stream->accepted_fd = fd;
1020 static void uv__read(uv_stream_t* stream) {
1029 stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1036 is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1041 while (stream->read_cb
1042 && (stream->flags & UV_HANDLE_READING)
1044 assert(stream->alloc_cb != NULL);
1047 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1050 stream->read_cb(stream, UV_ENOBUFS, &buf);
1055 assert(uv__stream_fd(stream) >= 0);
1059 nread = read(uv__stream_fd(stream), buf.base, buf.len);
1074 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1083 if (stream->flags & UV_HANDLE_READING) {
1084 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1085 uv__stream_osx_interrupt_select(stream);
1087 stream->read_cb(stream, 0, &buf);
1089 } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1090 uv__stream_eof(stream, &buf);
1095 stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1096 stream->read_cb(stream, UV__ERR(errno), &buf);
1097 if (stream->flags & UV_HANDLE_READING) {
1098 stream->flags &= ~UV_HANDLE_READING;
1099 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1100 uv__handle_stop(stream);
1101 uv__stream_osx_interrupt_select(stream);
1106 uv__stream_eof(stream, &buf);
1113 err = uv__stream_recv_cmsg(stream, &msg);
1115 stream->read_cb(stream, err, &buf);
1132 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1133 err = uv__stream_recv_cmsg(stream, &msg);
1135 stream->read_cb(stream, err, &buf);
1143 stream->read_cb(stream, nread, &buf);
1147 stream->flags |= UV_HANDLE_READ_PARTIAL;
1155 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1156 assert(stream->type == UV_TCP ||
1157 stream->type == UV_TTY ||
1158 stream->type == UV_NAMED_PIPE);
1160 if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1161 stream->flags & UV_HANDLE_SHUT ||
1162 uv__is_stream_shutting(stream) ||
1163 uv__is_closing(stream)) {
1167 assert(uv__stream_fd(stream) >= 0);
1171 uv__req_init(stream->loop, req, UV_SHUTDOWN);
1172 req->handle = stream;
1174 stream->shutdown_req = req;
1175 stream->flags &= ~UV_HANDLE_WRITABLE;
1177 if (uv__queue_empty(&stream->write_queue))
1178 uv__io_feed(stream->loop, &stream->io_watcher);
1185 uv_stream_t* stream;
1187 stream = container_of(w, uv_stream_t, io_watcher);
1189 assert(stream->type == UV_TCP ||
1190 stream->type == UV_NAMED_PIPE ||
1191 stream->type == UV_TTY);
1192 assert(!(stream->flags & UV_HANDLE_CLOSING));
1194 if (stream->connect_req) {
1195 uv__stream_connect(stream);
1199 assert(uv__stream_fd(stream) >= 0);
1203 uv__read(stream);
1205 if (uv__stream_fd(stream) == -1)
1206 return; /* read_cb closed stream. */
1215 (stream->flags & UV_HANDLE_READING) &&
1216 (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1217 !(stream->flags & UV_HANDLE_READ_EOF)) {
1219 uv__stream_eof(stream, &buf);
1222 if (uv__stream_fd(stream) == -1)
1223 return; /* read_cb closed stream. */
1226 uv__write(stream);
1227 uv__write_callbacks(stream);
1230 if (uv__queue_empty(&stream->write_queue))
1231 uv__drain(stream);
1241 static void uv__stream_connect(uv_stream_t* stream) {
1243 uv_connect_t* req = stream->connect_req;
1246 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1249 if (stream->delayed_error) {
1254 error = stream->delayed_error;
1255 stream->delayed_error = 0;
1258 assert(uv__stream_fd(stream) >= 0);
1259 getsockopt(uv__stream_fd(stream),
1270 stream->connect_req = NULL;
1271 uv__req_unregister(stream->loop, req);
1273 if (error < 0 || uv__queue_empty(&stream->write_queue)) {
1274 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1280 if (uv__stream_fd(stream) == -1)
1284 uv__stream_flush_write_queue(stream, UV_ECANCELED);
1285 uv__write_callbacks(stream);
1290 static int uv__check_before_write(uv_stream_t* stream,
1294 assert((stream->type == UV_TCP ||
1295 stream->type == UV_NAMED_PIPE ||
1296 stream->type == UV_TTY) &&
1299 if (uv__stream_fd(stream) < 0)
1302 if (!(stream->flags & UV_HANDLE_WRITABLE))
1306 if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1329 uv_stream_t* stream,
1337 err = uv__check_before_write(stream, nbufs, send_handle);
1347 empty_queue = (stream->write_queue_size == 0);
1350 uv__req_init(stream->loop, req, UV_WRITE);
1352 req->handle = stream;
1367 stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1370 uv__queue_insert_tail(&stream->write_queue, &req->queue);
1376 if (stream->connect_req) {
1380 uv__write(stream);
1385 * if this assert fires then somehow the blocking stream isn't being
1388 assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
1389 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1390 uv__stream_osx_interrupt_select(stream);
1409 int uv_try_write(uv_stream_t* stream,
1412 return uv_try_write2(stream, bufs, nbufs, NULL);
1416 int uv_try_write2(uv_stream_t* stream,
1423 if (stream->connect_req != NULL || stream->write_queue_size != 0)
1426 err = uv__check_before_write(stream, nbufs, NULL);
1430 return uv__try_write(stream, bufs, nbufs, send_handle);
1434 int uv__read_start(uv_stream_t* stream,
1437 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1438 stream->type == UV_TTY);
1440 /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
1442 stream->flags |= UV_HANDLE_READING;
1443 stream->flags &= ~UV_HANDLE_READ_EOF;
1446 assert(uv__stream_fd(stream) >= 0);
1449 stream->read_cb = read_cb;
1450 stream->alloc_cb = alloc_cb;
1452 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1453 uv__handle_start(stream);
1454 uv__stream_osx_interrupt_select(stream);
1460 int uv_read_stop(uv_stream_t* stream) {
1461 if (!(stream->flags & UV_HANDLE_READING))
1464 stream->flags &= ~UV_HANDLE_READING;
1465 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1466 uv__handle_stop(stream);
1467 uv__stream_osx_interrupt_select(stream);
1469 stream->read_cb = NULL;
1470 stream->alloc_cb = NULL;
1475 int uv_is_readable(const uv_stream_t* stream) {
1476 return !!(stream->flags & UV_HANDLE_READABLE);
1480 int uv_is_writable(const uv_stream_t* stream) {
1481 return !!(stream->flags & UV_HANDLE_WRITABLE);