Lines Matching refs:transport
5 * Client-side transport implementation for sockets.
16 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
179 * transport connection with the server. Some servers like to drop a TCP
186 * TCP idle timeout; client drops the transport socket if it is idle
470 xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
472 if (!transport->recv.copied) {
473 if (buf->head[0].iov_len >= transport->recv.offset)
475 &transport->recv.xid,
476 transport->recv.offset);
477 transport->recv.copied = transport->recv.offset;
482 xs_read_stream_request_done(struct sock_xprt *transport)
484 return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
488 xs_read_stream_check_eor(struct sock_xprt *transport,
491 if (xs_read_stream_request_done(transport))
496 xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
503 xs_read_header(transport, buf);
505 want = transport->recv.len - transport->recv.offset;
507 ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
508 transport->recv.copied + want,
509 transport->recv.copied,
511 transport->recv.offset += read;
512 transport->recv.copied += read;
515 if (transport->recv.offset == transport->recv.len)
516 xs_read_stream_check_eor(transport, msg);
543 xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
547 .iov_base = &transport->recv.fraghdr,
550 return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
555 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
557 struct rpc_xprt *xprt = &transport->xprt;
562 req = xprt_lookup_bc_request(xprt, transport->recv.xid);
567 if (transport->recv.copied && !req->rq_private_buf.len)
570 ret = xs_read_stream_request(transport, msg, flags, req);
572 xprt_complete_bc_request(req, transport->recv.copied);
574 req->rq_private_buf.len = transport->recv.copied;
580 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
587 xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
589 struct rpc_xprt *xprt = &transport->xprt;
595 req = xprt_lookup_rqst(xprt, transport->recv.xid);
596 if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
603 ret = xs_read_stream_request(transport, msg, flags, req);
607 xprt_complete_rqst(req->rq_task, transport->recv.copied);
609 req->rq_private_buf.len = transport->recv.copied;
617 xs_read_stream(struct sock_xprt *transport, int flags)
623 if (transport->recv.len == 0) {
624 want = xs_read_stream_headersize(transport->recv.copied != 0);
625 ret = xs_read_stream_header(transport, &msg, flags, want,
626 transport->recv.offset);
629 transport->recv.offset = ret;
630 if (transport->recv.offset != want)
631 return transport->recv.offset;
632 transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
634 transport->recv.offset -= sizeof(transport->recv.fraghdr);
638 switch (be32_to_cpu(transport->recv.calldir)) {
643 ret = xs_read_stream_call(transport, &msg, flags);
646 ret = xs_read_stream_reply(transport, &msg, flags);
649 transport->recv.calldir = cpu_to_be32(-1);
650 transport->recv.copied = -1;
655 if (transport->recv.offset < transport->recv.len) {
659 ret = xs_read_discard(transport->sock, &msg, flags,
660 transport->recv.len - transport->recv.offset);
663 transport->recv.offset += ret;
665 if (transport->recv.offset != transport->recv.len)
668 if (xs_read_stream_request_done(transport)) {
669 trace_xs_stream_read_request(transport);
670 transport->recv.copied = 0;
672 transport->recv.offset = 0;
673 transport->recv.len = 0;
679 static __poll_t xs_poll_socket(struct sock_xprt *transport)
681 return transport->sock->ops->poll(transport->file, transport->sock,
685 static bool xs_poll_socket_readable(struct sock_xprt *transport)
687 __poll_t events = xs_poll_socket(transport);
692 static void xs_poll_check_readable(struct sock_xprt *transport)
695 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
696 if (!xs_poll_socket_readable(transport))
698 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
699 queue_work(xprtiod_workqueue, &transport->recv_worker);
702 static void xs_stream_data_receive(struct sock_xprt *transport)
707 mutex_lock(&transport->recv_mutex);
708 if (transport->sock == NULL)
711 ret = xs_read_stream(transport, MSG_DONTWAIT);
718 kernel_sock_shutdown(transport->sock, SHUT_RDWR);
720 xs_poll_check_readable(transport);
722 mutex_unlock(&transport->recv_mutex);
723 trace_xs_stream_read_data(&transport->xprt, ret, read);
728 struct sock_xprt *transport =
732 xs_stream_data_receive(transport);
737 xs_stream_reset_connect(struct sock_xprt *transport)
739 transport->recv.offset = 0;
740 transport->recv.len = 0;
741 transport->recv.copied = 0;
742 transport->xmit.offset = 0;
746 xs_stream_start_connect(struct sock_xprt *transport)
748 transport->xprt.stat.connect_count++;
749 transport->xprt.stat.connect_start = jiffies;
757 * @transport: pointer to struct sock_xprt
760 static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
762 struct rpc_xprt *xprt = &transport->xprt;
763 struct sock *sk = transport->inet;
766 trace_rpc_socket_nospace(req, transport);
793 struct sock_xprt *transport =
795 struct sock *sk = transport->inet;
800 ret = xs_nospace(req, transport);
807 struct sock_xprt *transport =
809 struct sock *sk = transport->inet;
814 ret = xs_nospace(req, transport);
831 xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
833 return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
861 struct sock_xprt *transport =
873 if (xs_send_request_was_aborted(transport, req)) {
882 status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
883 transport->xmit.offset, rm, &sent);
885 __func__, xdr->len - transport->xmit.offset, status);
887 if (status == -EAGAIN && sock_writeable(transport->inet))
891 transport->xmit.offset += sent;
892 req->rq_bytes_sent = transport->xmit.offset;
894 req->rq_xmit_bytes_sent += transport->xmit.offset;
895 transport->xmit.offset = 0;
933 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
954 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);
963 if (status == -EAGAIN && sock_writeable(transport->inet))
1016 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1028 if (xs_send_request_was_aborted(transport, req)) {
1029 if (transport->sock != NULL)
1030 kernel_sock_shutdown(transport->sock, SHUT_RDWR);
1038 if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
1039 xs_tcp_set_socket_timeouts(xprt, transport->sock);
1046 status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
1047 transport->xmit.offset, rm, &sent);
1050 xdr->len - transport->xmit.offset, status);
1054 transport->xmit.offset += sent;
1055 req->rq_bytes_sent = transport->xmit.offset;
1057 req->rq_xmit_bytes_sent += transport->xmit.offset;
1058 transport->xmit.offset = 0;
1069 if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
1113 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
1115 transport->old_data_ready = sk->sk_data_ready;
1116 transport->old_state_change = sk->sk_state_change;
1117 transport->old_write_space = sk->sk_write_space;
1118 transport->old_error_report = sk->sk_error_report;
1121 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
1123 sk->sk_data_ready = transport->old_data_ready;
1124 sk->sk_state_change = transport->old_state_change;
1125 sk->sk_write_space = transport->old_write_space;
1126 sk->sk_error_report = transport->old_error_report;
1131 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1133 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
1134 clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
1135 clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
1136 clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
1139 static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
1141 set_bit(nr, &transport->sock_state);
1142 queue_work(xprtiod_workqueue, &transport->error_worker);
1163 struct sock_xprt *transport;
1170 transport = container_of(xprt, struct sock_xprt, xprt);
1171 transport->xprt_err = -sk->sk_err;
1172 if (transport->xprt_err == 0)
1175 xprt, -transport->xprt_err);
1176 trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err);
1180 xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
1185 static void xs_reset_transport(struct sock_xprt *transport)
1187 struct socket *sock = transport->sock;
1188 struct sock *sk = transport->inet;
1189 struct rpc_xprt *xprt = &transport->xprt;
1190 struct file *filp = transport->file;
1205 if (atomic_read(&transport->xprt.swapper))
1210 mutex_lock(&transport->recv_mutex);
1212 transport->inet = NULL;
1213 transport->sock = NULL;
1214 transport->file = NULL;
1218 xs_restore_old_callbacks(transport, sk);
1223 xs_stream_reset_connect(transport);
1224 mutex_unlock(&transport->recv_mutex);
1234 * @xprt: transport
1244 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1248 xs_reset_transport(transport);
1254 dprintk("RPC: injecting transport disconnect on xprt=%p\n",
1266 * xs_destroy - prepare to shutdown a transport
1267 * @xprt: doomed transport
1272 struct sock_xprt *transport = container_of(xprt,
1276 cancel_delayed_work_sync(&transport->connect_worker);
1278 cancel_work_sync(&transport->recv_worker);
1279 cancel_work_sync(&transport->error_worker);
1286 * @xprt: transport
1345 static void xs_udp_data_receive(struct sock_xprt *transport)
1351 mutex_lock(&transport->recv_mutex);
1352 sk = transport->inet;
1359 xs_udp_data_read_skb(&transport->xprt, sk, skb);
1363 xs_poll_check_readable(transport);
1365 mutex_unlock(&transport->recv_mutex);
1370 struct sock_xprt *transport =
1374 xs_udp_data_receive(transport);
1391 struct sock_xprt *transport = container_of(xprt,
1393 transport->old_data_ready(sk);
1399 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
1400 queue_work(xprtiod_workqueue, &transport->recv_worker);
1429 struct sock_xprt *transport;
1441 transport = container_of(xprt, struct sock_xprt, xprt);
1447 clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
1453 xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
1470 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
1488 &transport->sock_state))
1492 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
1501 struct sock_xprt *transport;
1510 transport = container_of(xprt, struct sock_xprt, xprt);
1516 xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
1566 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1567 struct sock *sk = transport->inet;
1569 if (transport->rcvsize) {
1571 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1573 if (transport->sndsize) {
1575 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1582 * @xprt: generic transport
1590 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1592 transport->sndsize = 0;
1594 transport->sndsize = sndsize + 1024;
1595 transport->rcvsize = 0;
1597 transport->rcvsize = rcvsize + 1024;
1603 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1604 * @xprt: controlling transport
1649 * @xprt: generic transport
1661 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
1663 if (transport->srcport == 0 && transport->xprt.reuseport)
1664 transport->srcport = xs_sock_getport(sock);
1667 static int xs_get_srcport(struct sock_xprt *transport)
1669 int port = transport->srcport;
1671 if (port == 0 && transport->xprt.resvport)
1683 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1685 if (transport->srcport != 0)
1686 transport->srcport = 0;
1687 if (!transport->xprt.resvport)
1693 static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1697 int port = xs_get_srcport(transport);
1702 * transport->xprt.resvport == 0), don't bind. Let the local
1712 * transport->xprt.resvport == 1) xs_get_srcport above will
1718 memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1722 transport->xprt.addrlen);
1724 if (transport->xprt.reuseport)
1725 transport->srcport = port;
1729 port = xs_next_srcport(transport, port);
1813 struct sock_xprt *transport, int family, int type,
1822 dprintk("RPC: can't create %d transport socket (%d).\n",
1831 err = xs_bind(transport, sock);
1840 transport->file = filp;
1850 struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1853 if (!transport->inet) {
1858 xs_save_old_callbacks(transport, sk);
1869 transport->sock = sock;
1870 transport->inet = sk;
1875 xs_stream_start_connect(transport);
1882 * @transport: socket transport to connect
1884 static int xs_local_setup_socket(struct sock_xprt *transport)
1886 struct rpc_xprt *xprt = &transport->xprt;
1895 "transport socket (%d).\n", -status);
1905 transport->file = filp;
1945 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1962 ret = xs_local_setup_socket(transport);
1975 struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1982 if (!transport->inet)
1985 sk_set_memalloc(transport->inet);
1989 * xs_enable_swap - Tag this transport as being used for swap.
1990 * @xprt: transport to tag
1992 * Take a reference to this transport on behalf of the rpc_clnt, and
2011 * xs_disable_swap - Untag this transport as being used for swap.
2012 * @xprt: transport to tag
2049 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2051 if (!transport->inet) {
2056 xs_save_old_callbacks(transport, sk);
2066 transport->sock = sock;
2067 transport->inet = sk;
2080 struct sock_xprt *transport =
2082 struct rpc_xprt *xprt = &transport->xprt;
2086 sock = xs_create_sock(xprt, transport,
2103 xprt_unlock_connect(xprt, transport);
2109 * @xprt: transport
2116 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2117 struct socket *sock = transport->sock;
2118 int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;
2129 xs_reset_transport(transport);
2136 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2146 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
2163 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2178 memcpy(&transport->tcp_timeout, &to,
2179 sizeof(transport->tcp_timeout));
2180 xprt->timeout = &transport->tcp_timeout;
2183 set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
2189 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2192 if (!transport->inet) {
2211 xs_save_old_callbacks(transport, sk);
2227 transport->sock = sock;
2228 transport->inet = sk;
2238 xs_stream_start_connect(transport);
2241 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
2245 xs_set_srcport(transport, sock);
2254 transport->srcport = 0;
2268 struct sock_xprt *transport =
2270 struct socket *sock = transport->sock;
2271 struct rpc_xprt *xprt = &transport->xprt;
2277 &transport->sock_state) ||
2279 xs_reset_transport(transport);
2280 sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family,
2312 set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state);
2315 xprt_unlock_connect(xprt, transport);
2340 xprt_unlock_connect(xprt, transport);
2346 * @xprt: pointer to transport structure
2360 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2363 WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
2365 if (transport->sock != NULL) {
2376 &transport->connect_worker,
2380 static void xs_wake_disconnect(struct sock_xprt *transport)
2382 if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
2383 xs_tcp_force_close(&transport->xprt);
2386 static void xs_wake_write(struct sock_xprt *transport)
2388 if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
2389 xprt_write_space(&transport->xprt);
2392 static void xs_wake_error(struct sock_xprt *transport)
2396 if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
2398 mutex_lock(&transport->recv_mutex);
2399 if (transport->sock == NULL)
2401 if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
2403 sockerr = xchg(&transport->xprt_err, 0);
2405 xprt_wake_pending_tasks(&transport->xprt, sockerr);
2407 mutex_unlock(&transport->recv_mutex);
2410 static void xs_wake_pending(struct sock_xprt *transport)
2412 if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
2413 xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
2418 struct sock_xprt *transport = container_of(work,
2421 xs_wake_disconnect(transport);
2422 xs_wake_write(transport);
2423 xs_wake_error(transport);
2424 xs_wake_pending(transport);
2464 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2468 transport->srcport,
2488 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2496 transport->srcport,
2556 struct sock_xprt *transport =
2567 err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent);
2800 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2801 * @args: rpc transport creation arguments
2803 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2808 struct sock_xprt *transport;
2816 transport = container_of(xprt, struct sock_xprt, xprt);
2828 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
2829 INIT_WORK(&transport->error_worker, xs_error_handle);
2830 INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
2867 * xs_setup_udp - Set up transport to use a UDP socket
2868 * @args: rpc transport creation arguments
2875 struct sock_xprt *transport;
2882 transport = container_of(xprt, struct sock_xprt, xprt);
2896 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
2897 INIT_WORK(&transport->error_worker, xs_error_handle);
2898 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
2943 * xs_setup_tcp - Set up transport to use a TCP socket
2944 * @args: rpc transport creation arguments
2951 struct sock_xprt *transport;
2962 transport = container_of(xprt, struct sock_xprt, xprt);
2978 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
2979 INIT_WORK(&transport->error_worker, xs_error_handle);
2980 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
3019 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
3020 * @args: rpc transport creation arguments
3027 struct sock_xprt *transport;
3035 transport = container_of(xprt, struct sock_xprt, xprt);
3078 transport->sock = bc_sock->sk_sock;
3079 transport->inet = bc_sock->sk_sk;