Lines Matching refs:transport

5  * Client-side transport implementation for sockets.
16 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
65 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock);
181 * transport connection with the server. Some servers like to drop a TCP
188 * TCP idle timeout; client drops the transport socket if it is idle
528 xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
530 if (!transport->recv.copied) {
531 if (buf->head[0].iov_len >= transport->recv.offset)
533 &transport->recv.xid,
534 transport->recv.offset);
535 transport->recv.copied = transport->recv.offset;
540 xs_read_stream_request_done(struct sock_xprt *transport)
542 return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
546 xs_read_stream_check_eor(struct sock_xprt *transport,
549 if (xs_read_stream_request_done(transport))
554 xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
561 xs_read_header(transport, buf);
563 want = transport->recv.len - transport->recv.offset;
565 ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
566 transport->recv.copied + want,
567 transport->recv.copied,
569 transport->recv.offset += read;
570 transport->recv.copied += read;
573 if (transport->recv.offset == transport->recv.len)
574 xs_read_stream_check_eor(transport, msg);
601 xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
605 .iov_base = &transport->recv.fraghdr,
608 return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
613 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
615 struct rpc_xprt *xprt = &transport->xprt;
619 /* Is this transport associated with the backchannel? */
624 req = xprt_lookup_bc_request(xprt, transport->recv.xid);
629 if (transport->recv.copied && !req->rq_private_buf.len)
632 ret = xs_read_stream_request(transport, msg, flags, req);
634 xprt_complete_bc_request(req, transport->recv.copied);
636 req->rq_private_buf.len = transport->recv.copied;
642 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
649 xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
651 struct rpc_xprt *xprt = &transport->xprt;
657 req = xprt_lookup_rqst(xprt, transport->recv.xid);
658 if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
665 ret = xs_read_stream_request(transport, msg, flags, req);
669 xprt_complete_rqst(req->rq_task, transport->recv.copied);
671 req->rq_private_buf.len = transport->recv.copied;
679 xs_read_stream(struct sock_xprt *transport, int flags)
685 if (transport->recv.len == 0) {
686 want = xs_read_stream_headersize(transport->recv.copied != 0);
687 ret = xs_read_stream_header(transport, &msg, flags, want,
688 transport->recv.offset);
691 transport->recv.offset = ret;
692 if (transport->recv.offset != want)
693 return transport->recv.offset;
694 transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
696 transport->recv.offset -= sizeof(transport->recv.fraghdr);
700 switch (be32_to_cpu(transport->recv.calldir)) {
705 ret = xs_read_stream_call(transport, &msg, flags);
708 ret = xs_read_stream_reply(transport, &msg, flags);
711 transport->recv.calldir = cpu_to_be32(-1);
712 transport->recv.copied = -1;
717 if (transport->recv.offset < transport->recv.len) {
721 ret = xs_read_discard(transport->sock, &msg, flags,
722 transport->recv.len - transport->recv.offset);
725 transport->recv.offset += ret;
727 if (transport->recv.offset != transport->recv.len)
730 if (xs_read_stream_request_done(transport)) {
731 trace_xs_stream_read_request(transport);
732 transport->recv.copied = 0;
734 transport->recv.offset = 0;
735 transport->recv.len = 0;
741 static __poll_t xs_poll_socket(struct sock_xprt *transport)
743 return transport->sock->ops->poll(transport->file, transport->sock,
747 static bool xs_poll_socket_readable(struct sock_xprt *transport)
749 __poll_t events = xs_poll_socket(transport);
754 static void xs_poll_check_readable(struct sock_xprt *transport)
757 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
758 if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state))
760 if (!xs_poll_socket_readable(transport))
762 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
763 queue_work(xprtiod_workqueue, &transport->recv_worker);
766 static void xs_stream_data_receive(struct sock_xprt *transport)
771 mutex_lock(&transport->recv_mutex);
772 if (transport->sock == NULL)
775 ret = xs_read_stream(transport, MSG_DONTWAIT);
782 kernel_sock_shutdown(transport->sock, SHUT_RDWR);
784 xprt_wake_pending_tasks(&transport->xprt, -EACCES);
786 xs_poll_check_readable(transport);
788 mutex_unlock(&transport->recv_mutex);
789 trace_xs_stream_read_data(&transport->xprt, ret, read);
794 struct sock_xprt *transport =
798 xs_stream_data_receive(transport);
803 xs_stream_reset_connect(struct sock_xprt *transport)
805 transport->recv.offset = 0;
806 transport->recv.len = 0;
807 transport->recv.copied = 0;
808 transport->xmit.offset = 0;
812 xs_stream_start_connect(struct sock_xprt *transport)
814 transport->xprt.stat.connect_count++;
815 transport->xprt.stat.connect_start = jiffies;
823 * @transport: pointer to struct sock_xprt
826 static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
828 struct rpc_xprt *xprt = &transport->xprt;
829 struct sock *sk = transport->inet;
832 trace_rpc_socket_nospace(req, transport);
840 set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
853 struct sock_xprt *transport =
855 struct sock *sk = transport->inet;
860 ret = xs_nospace(req, transport);
867 struct sock_xprt *transport =
869 struct sock *sk = transport->inet;
876 ret = xs_nospace(req, transport);
891 xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
893 return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
921 struct sock_xprt *transport =
934 if (xs_send_request_was_aborted(transport, req)) {
942 vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;
945 status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
946 transport->xmit.offset, rm, &sent);
948 __func__, xdr->len - transport->xmit.offset, status);
951 transport->xmit.offset += sent;
952 req->rq_bytes_sent = transport->xmit.offset;
954 req->rq_xmit_bytes_sent += transport->xmit.offset;
955 transport->xmit.offset = 0;
992 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1016 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);
1025 if (status == -EAGAIN && sock_writeable(transport->inet))
1078 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1090 if (xs_send_request_was_aborted(transport, req)) {
1091 if (transport->sock != NULL)
1092 kernel_sock_shutdown(transport->sock, SHUT_RDWR);
1095 if (!transport->inet)
1102 if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
1103 xs_tcp_set_socket_timeouts(xprt, transport->sock);
1105 xs_set_srcport(transport, transport->sock);
1111 tcp_sock_set_cork(transport->inet, true);
1113 vm_wait = sk_stream_is_writeable(transport->inet) ? true : false;
1116 status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
1117 transport->xmit.offset, rm, &sent);
1120 xdr->len - transport->xmit.offset, status);
1124 transport->xmit.offset += sent;
1125 req->rq_bytes_sent = transport->xmit.offset;
1127 req->rq_xmit_bytes_sent += transport->xmit.offset;
1128 transport->xmit.offset = 0;
1130 tcp_sock_set_cork(transport->inet, false);
1164 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
1166 transport->old_data_ready = sk->sk_data_ready;
1167 transport->old_state_change = sk->sk_state_change;
1168 transport->old_write_space = sk->sk_write_space;
1169 transport->old_error_report = sk->sk_error_report;
1172 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
1174 sk->sk_data_ready = transport->old_data_ready;
1175 sk->sk_state_change = transport->old_state_change;
1176 sk->sk_write_space = transport->old_write_space;
1177 sk->sk_error_report = transport->old_error_report;
1182 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1184 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
1185 clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
1186 clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
1187 clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
1188 clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
1191 static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
1193 set_bit(nr, &transport->sock_state);
1194 queue_work(xprtiod_workqueue, &transport->error_worker);
1216 struct sock_xprt *transport;
1222 transport = container_of(xprt, struct sock_xprt, xprt);
1223 transport->xprt_err = -sk->sk_err;
1224 if (transport->xprt_err == 0)
1227 xprt, -transport->xprt_err);
1228 trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err);
1232 xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
1235 static void xs_reset_transport(struct sock_xprt *transport)
1237 struct socket *sock = transport->sock;
1238 struct sock *sk = transport->inet;
1239 struct rpc_xprt *xprt = &transport->xprt;
1240 struct file *filp = transport->file;
1255 if (atomic_read(&transport->xprt.swapper))
1262 mutex_lock(&transport->recv_mutex);
1264 transport->inet = NULL;
1265 transport->sock = NULL;
1266 transport->file = NULL;
1270 xs_restore_old_callbacks(transport, sk);
1274 xs_stream_reset_connect(transport);
1276 mutex_unlock(&transport->recv_mutex);
1286 * @xprt: transport
1296 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1300 if (transport->sock)
1301 tls_handshake_close(transport->sock);
1302 xs_reset_transport(transport);
1308 dprintk("RPC: injecting transport disconnect on xprt=%p\n",
1320 * xs_destroy - prepare to shutdown a transport
1321 * @xprt: doomed transport
1326 struct sock_xprt *transport = container_of(xprt,
1330 cancel_delayed_work_sync(&transport->connect_worker);
1332 cancel_work_sync(&transport->recv_worker);
1333 cancel_work_sync(&transport->error_worker);
1340 * @xprt: transport
1399 static void xs_udp_data_receive(struct sock_xprt *transport)
1405 mutex_lock(&transport->recv_mutex);
1406 sk = transport->inet;
1413 xs_udp_data_read_skb(&transport->xprt, sk, skb);
1417 xs_poll_check_readable(transport);
1419 mutex_unlock(&transport->recv_mutex);
1424 struct sock_xprt *transport =
1428 xs_udp_data_receive(transport);
1445 struct sock_xprt *transport = container_of(xprt,
1450 transport->old_data_ready(sk);
1452 if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state))
1460 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
1461 queue_work(xprtiod_workqueue, &transport->recv_worker);
1489 struct sock_xprt *transport;
1493 transport = container_of(xprt, struct sock_xprt, xprt);
1497 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
1509 struct sock_xprt *transport;
1520 transport = container_of(xprt, struct sock_xprt, xprt);
1526 clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
1532 xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
1549 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
1567 &transport->sock_state))
1571 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
1577 struct sock_xprt *transport;
1586 transport = container_of(xprt, struct sock_xprt, xprt);
1587 if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state))
1589 xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
1629 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1630 struct sock *sk = transport->inet;
1632 if (transport->rcvsize) {
1634 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1636 if (transport->sndsize) {
1638 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1645 * @xprt: generic transport
1653 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1655 transport->sndsize = 0;
1657 transport->sndsize = sndsize + 1024;
1658 transport->rcvsize = 0;
1660 transport->rcvsize = rcvsize + 1024;
1666 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1667 * @xprt: controlling transport
1712 * @xprt: generic transport
1724 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
1726 if (transport->srcport == 0 && transport->xprt.reuseport)
1727 transport->srcport = xs_sock_getport(sock);
1730 static int xs_get_srcport(struct sock_xprt *transport)
1732 int port = transport->srcport;
1734 if (port == 0 && transport->xprt.resvport)
1769 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1771 if (transport->srcport != 0)
1772 transport->srcport = 0;
1773 if (!transport->xprt.resvport)
1779 static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1783 int port = xs_get_srcport(transport);
1788 * transport->xprt.resvport == 0), don't bind. Let the local
1798 * transport->xprt.resvport == 1) xs_get_srcport above will
1804 memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1808 transport->xprt.addrlen);
1810 if (transport->xprt.reuseport)
1811 transport->srcport = port;
1815 port = xs_next_srcport(transport, port);
1899 struct sock_xprt *transport, int family, int type,
1908 dprintk("RPC: can't create %d transport socket (%d).\n",
1917 err = xs_bind(transport, sock);
1926 transport->file = filp;
1936 struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1939 if (!transport->inet) {
1944 xs_save_old_callbacks(transport, sk);
1956 transport->sock = sock;
1957 transport->inet = sk;
1962 xs_stream_start_connect(transport);
1969 * @transport: socket transport to connect
1971 static int xs_local_setup_socket(struct sock_xprt *transport)
1973 struct rpc_xprt *xprt = &transport->xprt;
1982 "transport socket (%d).\n", -status);
1992 transport->file = filp;
2032 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2035 if (transport->file)
2051 ret = xs_local_setup_socket(transport);
2070 struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
2077 if (!transport->inet)
2080 sk_set_memalloc(transport->inet);
2084 * xs_enable_swap - Tag this transport as being used for swap.
2085 * @xprt: transport to tag
2087 * Take a reference to this transport on behalf of the rpc_clnt, and
2104 * xs_disable_swap - Untag this transport as being used for swap.
2105 * @xprt: transport to tag
2140 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2142 if (!transport->inet) {
2147 xs_save_old_callbacks(transport, sk);
2157 transport->sock = sock;
2158 transport->inet = sk;
2171 struct sock_xprt *transport =
2173 struct rpc_xprt *xprt = &transport->xprt;
2180 sock = xs_create_sock(xprt, transport,
2197 xprt_unlock_connect(xprt, transport);
2204 * @xprt: transport
2211 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2212 struct socket *sock = transport->sock;
2213 int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;
2232 xs_reset_transport(transport);
2239 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2253 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
2279 struct sock_xprt *transport =
2290 memcpy(&transport->tcp_timeout, &to, sizeof(transport->tcp_timeout));
2291 xprt->timeout = &transport->tcp_timeout;
2299 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2306 set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
2312 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2314 if (!transport->inet) {
2334 xs_save_old_callbacks(transport, sk);
2349 transport->sock = sock;
2350 transport->inet = sk;
2360 xs_stream_start_connect(transport);
2363 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
2375 struct sock_xprt *transport =
2377 struct socket *sock = transport->sock;
2378 struct rpc_xprt *xprt = &transport->xprt;
2388 &transport->sock_state) ||
2390 xs_reset_transport(transport);
2391 sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family,
2414 set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state);
2422 transport->srcport = 0;
2451 xprt_unlock_connect(xprt, transport);
2724 * @xprt: pointer to transport structure
2738 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2741 WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
2743 if (transport->sock != NULL) {
2753 transport->clnt = task->tk_client;
2755 &transport->connect_worker,
2759 static void xs_wake_disconnect(struct sock_xprt *transport)
2761 if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
2762 xs_tcp_force_close(&transport->xprt);
2765 static void xs_wake_write(struct sock_xprt *transport)
2767 if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
2768 xprt_write_space(&transport->xprt);
2771 static void xs_wake_error(struct sock_xprt *transport)
2775 if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
2777 mutex_lock(&transport->recv_mutex);
2778 if (transport->sock == NULL)
2780 if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
2782 sockerr = xchg(&transport->xprt_err, 0);
2784 xprt_wake_pending_tasks(&transport->xprt, sockerr);
2786 mutex_unlock(&transport->recv_mutex);
2789 static void xs_wake_pending(struct sock_xprt *transport)
2791 if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
2792 xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
2797 struct sock_xprt *transport = container_of(work,
2800 xs_wake_disconnect(transport);
2801 xs_wake_write(transport);
2802 xs_wake_error(transport);
2803 xs_wake_pending(transport);
2843 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2847 transport->srcport,
2867 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2875 transport->srcport,
2935 struct sock_xprt *transport =
2949 err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent);
3186 * xs_setup_local - Set up transport to use an AF_LOCAL socket
3187 * @args: rpc transport creation arguments
3189 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
3194 struct sock_xprt *transport;
3202 transport = container_of(xprt, struct sock_xprt, xprt);
3215 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
3216 INIT_WORK(&transport->error_worker, xs_error_handle);
3217 INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
3254 * xs_setup_udp - Set up transport to use a UDP socket
3255 * @args: rpc transport creation arguments
3262 struct sock_xprt *transport;
3269 transport = container_of(xprt, struct sock_xprt, xprt);
3284 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
3285 INIT_WORK(&transport->error_worker, xs_error_handle);
3286 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
3331 * xs_setup_tcp - Set up transport to use a TCP socket
3332 * @args: rpc transport creation arguments
3339 struct sock_xprt *transport;
3350 transport = container_of(xprt, struct sock_xprt, xprt);
3372 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
3373 INIT_WORK(&transport->error_worker, xs_error_handle);
3374 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
3413 * xs_setup_tcp_tls - Set up transport to use a TCP with TLS
3414 * @args: rpc transport creation arguments
3421 struct sock_xprt *transport;
3432 transport = container_of(xprt, struct sock_xprt, xprt);
3449 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
3450 INIT_WORK(&transport->error_worker, xs_error_handle);
3456 INIT_DELAYED_WORK(&transport->connect_worker,
3501 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
3502 * @args: rpc transport creation arguments
3509 struct sock_xprt *transport;
3517 transport = container_of(xprt, struct sock_xprt, xprt);
3561 transport->sock = bc_sock->sk_sock;
3562 transport->inet = bc_sock->sk_sk;