162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0 262306a36Sopenharmony_ci/* 362306a36Sopenharmony_ci * Copyright (c) 2015-2020, Oracle and/or its affiliates. 462306a36Sopenharmony_ci * 562306a36Sopenharmony_ci * Support for reverse-direction RPCs on RPC/RDMA. 662306a36Sopenharmony_ci */ 762306a36Sopenharmony_ci 862306a36Sopenharmony_ci#include <linux/sunrpc/xprt.h> 962306a36Sopenharmony_ci#include <linux/sunrpc/svc.h> 1062306a36Sopenharmony_ci#include <linux/sunrpc/svc_xprt.h> 1162306a36Sopenharmony_ci#include <linux/sunrpc/svc_rdma.h> 1262306a36Sopenharmony_ci 1362306a36Sopenharmony_ci#include "xprt_rdma.h" 1462306a36Sopenharmony_ci#include <trace/events/rpcrdma.h> 1562306a36Sopenharmony_ci 1662306a36Sopenharmony_ci#undef RPCRDMA_BACKCHANNEL_DEBUG 1762306a36Sopenharmony_ci 1862306a36Sopenharmony_ci/** 1962306a36Sopenharmony_ci * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests 2062306a36Sopenharmony_ci * @xprt: transport associated with these backchannel resources 2162306a36Sopenharmony_ci * @reqs: number of concurrent incoming requests to expect 2262306a36Sopenharmony_ci * 2362306a36Sopenharmony_ci * Returns 0 on success; otherwise a negative errno 2462306a36Sopenharmony_ci */ 2562306a36Sopenharmony_ciint xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) 2662306a36Sopenharmony_ci{ 2762306a36Sopenharmony_ci struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 2862306a36Sopenharmony_ci 2962306a36Sopenharmony_ci r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1; 3062306a36Sopenharmony_ci trace_xprtrdma_cb_setup(r_xprt, reqs); 3162306a36Sopenharmony_ci return 0; 3262306a36Sopenharmony_ci} 3362306a36Sopenharmony_ci 3462306a36Sopenharmony_ci/** 3562306a36Sopenharmony_ci * xprt_rdma_bc_maxpayload - Return maximum backchannel message size 3662306a36Sopenharmony_ci * @xprt: transport 3762306a36Sopenharmony_ci * 3862306a36Sopenharmony_ci * Returns maximum size, in bytes, of a backchannel message 3962306a36Sopenharmony_ci */ 4062306a36Sopenharmony_cisize_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) 4162306a36Sopenharmony_ci{ 4262306a36Sopenharmony_ci struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 4362306a36Sopenharmony_ci struct rpcrdma_ep *ep = r_xprt->rx_ep; 4462306a36Sopenharmony_ci size_t maxmsg; 4562306a36Sopenharmony_ci 4662306a36Sopenharmony_ci maxmsg = min_t(unsigned int, ep->re_inline_send, ep->re_inline_recv); 4762306a36Sopenharmony_ci maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE); 4862306a36Sopenharmony_ci return maxmsg - RPCRDMA_HDRLEN_MIN; 4962306a36Sopenharmony_ci} 5062306a36Sopenharmony_ci 5162306a36Sopenharmony_ciunsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt) 5262306a36Sopenharmony_ci{ 5362306a36Sopenharmony_ci return RPCRDMA_BACKWARD_WRS >> 1; 5462306a36Sopenharmony_ci} 5562306a36Sopenharmony_ci 5662306a36Sopenharmony_cistatic int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) 5762306a36Sopenharmony_ci{ 5862306a36Sopenharmony_ci struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); 5962306a36Sopenharmony_ci struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 6062306a36Sopenharmony_ci __be32 *p; 6162306a36Sopenharmony_ci 6262306a36Sopenharmony_ci rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); 6362306a36Sopenharmony_ci xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf, 6462306a36Sopenharmony_ci rdmab_data(req->rl_rdmabuf), rqst); 6562306a36Sopenharmony_ci 6662306a36Sopenharmony_ci p = xdr_reserve_space(&req->rl_stream, 28); 6762306a36Sopenharmony_ci if (unlikely(!p)) 6862306a36Sopenharmony_ci return -EIO; 6962306a36Sopenharmony_ci *p++ = rqst->rq_xid; 7062306a36Sopenharmony_ci *p++ = rpcrdma_version; 7162306a36Sopenharmony_ci *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); 7262306a36Sopenharmony_ci *p++ = rdma_msg; 7362306a36Sopenharmony_ci *p++ = xdr_zero; 7462306a36Sopenharmony_ci *p++ = xdr_zero; 7562306a36Sopenharmony_ci *p = xdr_zero; 7662306a36Sopenharmony_ci 7762306a36Sopenharmony_ci if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN, 7862306a36Sopenharmony_ci &rqst->rq_snd_buf, rpcrdma_noch_pullup)) 7962306a36Sopenharmony_ci return -EIO; 8062306a36Sopenharmony_ci 8162306a36Sopenharmony_ci trace_xprtrdma_cb_reply(r_xprt, rqst); 8262306a36Sopenharmony_ci return 0; 8362306a36Sopenharmony_ci} 8462306a36Sopenharmony_ci 8562306a36Sopenharmony_ci/** 8662306a36Sopenharmony_ci * xprt_rdma_bc_send_reply - marshal and send a backchannel reply 8762306a36Sopenharmony_ci * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf 8862306a36Sopenharmony_ci * 8962306a36Sopenharmony_ci * Caller holds the transport's write lock. 9062306a36Sopenharmony_ci * 9162306a36Sopenharmony_ci * Returns: 9262306a36Sopenharmony_ci * %0 if the RPC message has been sent 9362306a36Sopenharmony_ci * %-ENOTCONN if the caller should reconnect and call again 9462306a36Sopenharmony_ci * %-EIO if a permanent error occurred and the request was not 9562306a36Sopenharmony_ci * sent. Do not try to send this message again. 9662306a36Sopenharmony_ci */ 9762306a36Sopenharmony_ciint xprt_rdma_bc_send_reply(struct rpc_rqst *rqst) 9862306a36Sopenharmony_ci{ 9962306a36Sopenharmony_ci struct rpc_xprt *xprt = rqst->rq_xprt; 10062306a36Sopenharmony_ci struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 10162306a36Sopenharmony_ci struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 10262306a36Sopenharmony_ci int rc; 10362306a36Sopenharmony_ci 10462306a36Sopenharmony_ci if (!xprt_connected(xprt)) 10562306a36Sopenharmony_ci return -ENOTCONN; 10662306a36Sopenharmony_ci 10762306a36Sopenharmony_ci if (!xprt_request_get_cong(xprt, rqst)) 10862306a36Sopenharmony_ci return -EBADSLT; 10962306a36Sopenharmony_ci 11062306a36Sopenharmony_ci rc = rpcrdma_bc_marshal_reply(rqst); 11162306a36Sopenharmony_ci if (rc < 0) 11262306a36Sopenharmony_ci goto failed_marshal; 11362306a36Sopenharmony_ci 11462306a36Sopenharmony_ci if (frwr_send(r_xprt, req)) 11562306a36Sopenharmony_ci goto drop_connection; 11662306a36Sopenharmony_ci return 0; 11762306a36Sopenharmony_ci 11862306a36Sopenharmony_cifailed_marshal: 11962306a36Sopenharmony_ci if (rc != -ENOTCONN) 12062306a36Sopenharmony_ci return rc; 12162306a36Sopenharmony_cidrop_connection: 12262306a36Sopenharmony_ci xprt_rdma_close(xprt); 12362306a36Sopenharmony_ci return -ENOTCONN; 12462306a36Sopenharmony_ci} 12562306a36Sopenharmony_ci 12662306a36Sopenharmony_ci/** 12762306a36Sopenharmony_ci * xprt_rdma_bc_destroy - Release resources for handling backchannel requests 12862306a36Sopenharmony_ci * @xprt: transport associated with these backchannel resources 12962306a36Sopenharmony_ci * @reqs: number of incoming requests to destroy; ignored 13062306a36Sopenharmony_ci */ 13162306a36Sopenharmony_civoid xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) 13262306a36Sopenharmony_ci{ 13362306a36Sopenharmony_ci struct rpc_rqst *rqst, *tmp; 13462306a36Sopenharmony_ci 13562306a36Sopenharmony_ci spin_lock(&xprt->bc_pa_lock); 13662306a36Sopenharmony_ci list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 13762306a36Sopenharmony_ci list_del(&rqst->rq_bc_pa_list); 13862306a36Sopenharmony_ci spin_unlock(&xprt->bc_pa_lock); 13962306a36Sopenharmony_ci 14062306a36Sopenharmony_ci rpcrdma_req_destroy(rpcr_to_rdmar(rqst)); 14162306a36Sopenharmony_ci 14262306a36Sopenharmony_ci spin_lock(&xprt->bc_pa_lock); 14362306a36Sopenharmony_ci } 14462306a36Sopenharmony_ci spin_unlock(&xprt->bc_pa_lock); 14562306a36Sopenharmony_ci} 14662306a36Sopenharmony_ci 14762306a36Sopenharmony_ci/** 14862306a36Sopenharmony_ci * xprt_rdma_bc_free_rqst - Release a backchannel rqst 14962306a36Sopenharmony_ci * @rqst: request to release 15062306a36Sopenharmony_ci */ 15162306a36Sopenharmony_civoid xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) 15262306a36Sopenharmony_ci{ 15362306a36Sopenharmony_ci struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 15462306a36Sopenharmony_ci struct rpcrdma_rep *rep = req->rl_reply; 15562306a36Sopenharmony_ci struct rpc_xprt *xprt = rqst->rq_xprt; 15662306a36Sopenharmony_ci struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 15762306a36Sopenharmony_ci 15862306a36Sopenharmony_ci rpcrdma_rep_put(&r_xprt->rx_buf, rep); 15962306a36Sopenharmony_ci req->rl_reply = NULL; 16062306a36Sopenharmony_ci 16162306a36Sopenharmony_ci spin_lock(&xprt->bc_pa_lock); 16262306a36Sopenharmony_ci list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); 16362306a36Sopenharmony_ci spin_unlock(&xprt->bc_pa_lock); 16462306a36Sopenharmony_ci xprt_put(xprt); 16562306a36Sopenharmony_ci} 16662306a36Sopenharmony_ci 16762306a36Sopenharmony_cistatic struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt) 16862306a36Sopenharmony_ci{ 16962306a36Sopenharmony_ci struct rpc_xprt *xprt = &r_xprt->rx_xprt; 17062306a36Sopenharmony_ci struct rpcrdma_req *req; 17162306a36Sopenharmony_ci struct rpc_rqst *rqst; 17262306a36Sopenharmony_ci size_t size; 17362306a36Sopenharmony_ci 17462306a36Sopenharmony_ci spin_lock(&xprt->bc_pa_lock); 17562306a36Sopenharmony_ci rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst, 17662306a36Sopenharmony_ci rq_bc_pa_list); 17762306a36Sopenharmony_ci if (!rqst) 17862306a36Sopenharmony_ci goto create_req; 17962306a36Sopenharmony_ci list_del(&rqst->rq_bc_pa_list); 18062306a36Sopenharmony_ci spin_unlock(&xprt->bc_pa_lock); 18162306a36Sopenharmony_ci return rqst; 18262306a36Sopenharmony_ci 18362306a36Sopenharmony_cicreate_req: 18462306a36Sopenharmony_ci spin_unlock(&xprt->bc_pa_lock); 18562306a36Sopenharmony_ci 18662306a36Sopenharmony_ci /* Set a limit to prevent a remote from overrunning our resources. 18762306a36Sopenharmony_ci */ 18862306a36Sopenharmony_ci if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS) 18962306a36Sopenharmony_ci return NULL; 19062306a36Sopenharmony_ci 19162306a36Sopenharmony_ci size = min_t(size_t, r_xprt->rx_ep->re_inline_recv, PAGE_SIZE); 19262306a36Sopenharmony_ci req = rpcrdma_req_create(r_xprt, size); 19362306a36Sopenharmony_ci if (!req) 19462306a36Sopenharmony_ci return NULL; 19562306a36Sopenharmony_ci if (rpcrdma_req_setup(r_xprt, req)) { 19662306a36Sopenharmony_ci rpcrdma_req_destroy(req); 19762306a36Sopenharmony_ci return NULL; 19862306a36Sopenharmony_ci } 19962306a36Sopenharmony_ci 20062306a36Sopenharmony_ci xprt->bc_alloc_count++; 20162306a36Sopenharmony_ci rqst = &req->rl_slot; 20262306a36Sopenharmony_ci rqst->rq_xprt = xprt; 20362306a36Sopenharmony_ci __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); 20462306a36Sopenharmony_ci xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size); 20562306a36Sopenharmony_ci return rqst; 20662306a36Sopenharmony_ci} 20762306a36Sopenharmony_ci 20862306a36Sopenharmony_ci/** 20962306a36Sopenharmony_ci * rpcrdma_bc_receive_call - Handle a reverse-direction Call 21062306a36Sopenharmony_ci * @r_xprt: transport receiving the call 21162306a36Sopenharmony_ci * @rep: receive buffer containing the call 21262306a36Sopenharmony_ci * 21362306a36Sopenharmony_ci * Operational assumptions: 21462306a36Sopenharmony_ci * o Backchannel credits are ignored, just as the NFS server 21562306a36Sopenharmony_ci * forechannel currently does 21662306a36Sopenharmony_ci * o The ULP manages a replay cache (eg, NFSv4.1 sessions). 21762306a36Sopenharmony_ci * No replay detection is done at the transport level 21862306a36Sopenharmony_ci */ 21962306a36Sopenharmony_civoid rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, 22062306a36Sopenharmony_ci struct rpcrdma_rep *rep) 22162306a36Sopenharmony_ci{ 22262306a36Sopenharmony_ci struct rpc_xprt *xprt = &r_xprt->rx_xprt; 22362306a36Sopenharmony_ci struct svc_serv *bc_serv; 22462306a36Sopenharmony_ci struct rpcrdma_req *req; 22562306a36Sopenharmony_ci struct rpc_rqst *rqst; 22662306a36Sopenharmony_ci struct xdr_buf *buf; 22762306a36Sopenharmony_ci size_t size; 22862306a36Sopenharmony_ci __be32 *p; 22962306a36Sopenharmony_ci 23062306a36Sopenharmony_ci p = xdr_inline_decode(&rep->rr_stream, 0); 23162306a36Sopenharmony_ci size = xdr_stream_remaining(&rep->rr_stream); 23262306a36Sopenharmony_ci 23362306a36Sopenharmony_ci#ifdef RPCRDMA_BACKCHANNEL_DEBUG 23462306a36Sopenharmony_ci pr_info("RPC: %s: callback XID %08x, length=%u\n", 23562306a36Sopenharmony_ci __func__, be32_to_cpup(p), size); 23662306a36Sopenharmony_ci pr_info("RPC: %s: %*ph\n", __func__, size, p); 23762306a36Sopenharmony_ci#endif 23862306a36Sopenharmony_ci 23962306a36Sopenharmony_ci rqst = rpcrdma_bc_rqst_get(r_xprt); 24062306a36Sopenharmony_ci if (!rqst) 24162306a36Sopenharmony_ci goto out_overflow; 24262306a36Sopenharmony_ci 24362306a36Sopenharmony_ci rqst->rq_reply_bytes_recvd = 0; 24462306a36Sopenharmony_ci rqst->rq_xid = *p; 24562306a36Sopenharmony_ci 24662306a36Sopenharmony_ci rqst->rq_private_buf.len = size; 24762306a36Sopenharmony_ci 24862306a36Sopenharmony_ci buf = &rqst->rq_rcv_buf; 24962306a36Sopenharmony_ci memset(buf, 0, sizeof(*buf)); 25062306a36Sopenharmony_ci buf->head[0].iov_base = p; 25162306a36Sopenharmony_ci buf->head[0].iov_len = size; 25262306a36Sopenharmony_ci buf->len = size; 25362306a36Sopenharmony_ci 25462306a36Sopenharmony_ci /* The receive buffer has to be hooked to the rpcrdma_req 25562306a36Sopenharmony_ci * so that it is not released while the req is pointing 25662306a36Sopenharmony_ci * to its buffer, and so that it can be reposted after 25762306a36Sopenharmony_ci * the Upper Layer is done decoding it. 25862306a36Sopenharmony_ci */ 25962306a36Sopenharmony_ci req = rpcr_to_rdmar(rqst); 26062306a36Sopenharmony_ci req->rl_reply = rep; 26162306a36Sopenharmony_ci trace_xprtrdma_cb_call(r_xprt, rqst); 26262306a36Sopenharmony_ci 26362306a36Sopenharmony_ci /* Queue rqst for ULP's callback service */ 26462306a36Sopenharmony_ci bc_serv = xprt->bc_serv; 26562306a36Sopenharmony_ci xprt_get(xprt); 26662306a36Sopenharmony_ci spin_lock(&bc_serv->sv_cb_lock); 26762306a36Sopenharmony_ci list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); 26862306a36Sopenharmony_ci spin_unlock(&bc_serv->sv_cb_lock); 26962306a36Sopenharmony_ci 27062306a36Sopenharmony_ci wake_up(&bc_serv->sv_cb_waitq); 27162306a36Sopenharmony_ci 27262306a36Sopenharmony_ci r_xprt->rx_stats.bcall_count++; 27362306a36Sopenharmony_ci return; 27462306a36Sopenharmony_ci 27562306a36Sopenharmony_ciout_overflow: 27662306a36Sopenharmony_ci pr_warn("RPC/RDMA backchannel overflow\n"); 27762306a36Sopenharmony_ci xprt_force_disconnect(xprt); 27862306a36Sopenharmony_ci /* This receive buffer gets reposted automatically 27962306a36Sopenharmony_ci * when the connection is re-established. 28062306a36Sopenharmony_ci */ 28162306a36Sopenharmony_ci return; 28262306a36Sopenharmony_ci} 283