// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2018 Oracle.  All rights reserved.
 *
 * Support for reverse-direction RPCs on RPC/RDMA (server-side).
 */
762306a36Sopenharmony_ci
862306a36Sopenharmony_ci#include <linux/sunrpc/svc_rdma.h>
962306a36Sopenharmony_ci
1062306a36Sopenharmony_ci#include "xprt_rdma.h"
1162306a36Sopenharmony_ci#include <trace/events/rpcrdma.h>
1262306a36Sopenharmony_ci
1362306a36Sopenharmony_ci/**
1462306a36Sopenharmony_ci * svc_rdma_handle_bc_reply - Process incoming backchannel Reply
1562306a36Sopenharmony_ci * @rqstp: resources for handling the Reply
1662306a36Sopenharmony_ci * @rctxt: Received message
1762306a36Sopenharmony_ci *
1862306a36Sopenharmony_ci */
1962306a36Sopenharmony_civoid svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
2062306a36Sopenharmony_ci			      struct svc_rdma_recv_ctxt *rctxt)
2162306a36Sopenharmony_ci{
2262306a36Sopenharmony_ci	struct svc_xprt *sxprt = rqstp->rq_xprt;
2362306a36Sopenharmony_ci	struct rpc_xprt *xprt = sxprt->xpt_bc_xprt;
2462306a36Sopenharmony_ci	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
2562306a36Sopenharmony_ci	struct xdr_buf *rcvbuf = &rqstp->rq_arg;
2662306a36Sopenharmony_ci	struct kvec *dst, *src = &rcvbuf->head[0];
2762306a36Sopenharmony_ci	__be32 *rdma_resp = rctxt->rc_recv_buf;
2862306a36Sopenharmony_ci	struct rpc_rqst *req;
2962306a36Sopenharmony_ci	u32 credits;
3062306a36Sopenharmony_ci
3162306a36Sopenharmony_ci	spin_lock(&xprt->queue_lock);
3262306a36Sopenharmony_ci	req = xprt_lookup_rqst(xprt, *rdma_resp);
3362306a36Sopenharmony_ci	if (!req)
3462306a36Sopenharmony_ci		goto out_unlock;
3562306a36Sopenharmony_ci
3662306a36Sopenharmony_ci	dst = &req->rq_private_buf.head[0];
3762306a36Sopenharmony_ci	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
3862306a36Sopenharmony_ci	if (dst->iov_len < src->iov_len)
3962306a36Sopenharmony_ci		goto out_unlock;
4062306a36Sopenharmony_ci	memcpy(dst->iov_base, src->iov_base, src->iov_len);
4162306a36Sopenharmony_ci	xprt_pin_rqst(req);
4262306a36Sopenharmony_ci	spin_unlock(&xprt->queue_lock);
4362306a36Sopenharmony_ci
4462306a36Sopenharmony_ci	credits = be32_to_cpup(rdma_resp + 2);
4562306a36Sopenharmony_ci	if (credits == 0)
4662306a36Sopenharmony_ci		credits = 1;	/* don't deadlock */
4762306a36Sopenharmony_ci	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
4862306a36Sopenharmony_ci		credits = r_xprt->rx_buf.rb_bc_max_requests;
4962306a36Sopenharmony_ci	spin_lock(&xprt->transport_lock);
5062306a36Sopenharmony_ci	xprt->cwnd = credits << RPC_CWNDSHIFT;
5162306a36Sopenharmony_ci	spin_unlock(&xprt->transport_lock);
5262306a36Sopenharmony_ci
5362306a36Sopenharmony_ci	spin_lock(&xprt->queue_lock);
5462306a36Sopenharmony_ci	xprt_complete_rqst(req->rq_task, rcvbuf->len);
5562306a36Sopenharmony_ci	xprt_unpin_rqst(req);
5662306a36Sopenharmony_ci	rcvbuf->len = 0;
5762306a36Sopenharmony_ci
5862306a36Sopenharmony_ciout_unlock:
5962306a36Sopenharmony_ci	spin_unlock(&xprt->queue_lock);
6062306a36Sopenharmony_ci}
6162306a36Sopenharmony_ci
6262306a36Sopenharmony_ci/* Send a reverse-direction RPC Call.
6362306a36Sopenharmony_ci *
6462306a36Sopenharmony_ci * Caller holds the connection's mutex and has already marshaled
6562306a36Sopenharmony_ci * the RPC/RDMA request.
6662306a36Sopenharmony_ci *
6762306a36Sopenharmony_ci * This is similar to svc_rdma_send_reply_msg, but takes a struct
6862306a36Sopenharmony_ci * rpc_rqst instead, does not support chunks, and avoids blocking
6962306a36Sopenharmony_ci * memory allocation.
7062306a36Sopenharmony_ci *
7162306a36Sopenharmony_ci * XXX: There is still an opportunity to block in svc_rdma_send()
7262306a36Sopenharmony_ci * if there are no SQ entries to post the Send. This may occur if
7362306a36Sopenharmony_ci * the adapter has a small maximum SQ depth.
7462306a36Sopenharmony_ci */
7562306a36Sopenharmony_cistatic int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
7662306a36Sopenharmony_ci			      struct rpc_rqst *rqst,
7762306a36Sopenharmony_ci			      struct svc_rdma_send_ctxt *sctxt)
7862306a36Sopenharmony_ci{
7962306a36Sopenharmony_ci	struct svc_rdma_recv_ctxt *rctxt;
8062306a36Sopenharmony_ci	int ret;
8162306a36Sopenharmony_ci
8262306a36Sopenharmony_ci	rctxt = svc_rdma_recv_ctxt_get(rdma);
8362306a36Sopenharmony_ci	if (!rctxt)
8462306a36Sopenharmony_ci		return -EIO;
8562306a36Sopenharmony_ci
8662306a36Sopenharmony_ci	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
8762306a36Sopenharmony_ci	svc_rdma_recv_ctxt_put(rdma, rctxt);
8862306a36Sopenharmony_ci	if (ret < 0)
8962306a36Sopenharmony_ci		return -EIO;
9062306a36Sopenharmony_ci
9162306a36Sopenharmony_ci	/* Bump page refcnt so Send completion doesn't release
9262306a36Sopenharmony_ci	 * the rq_buffer before all retransmits are complete.
9362306a36Sopenharmony_ci	 */
9462306a36Sopenharmony_ci	get_page(virt_to_page(rqst->rq_buffer));
9562306a36Sopenharmony_ci	sctxt->sc_send_wr.opcode = IB_WR_SEND;
9662306a36Sopenharmony_ci	return svc_rdma_send(rdma, sctxt);
9762306a36Sopenharmony_ci}
9862306a36Sopenharmony_ci
9962306a36Sopenharmony_ci/* Server-side transport endpoint wants a whole page for its send
10062306a36Sopenharmony_ci * buffer. The client RPC code constructs the RPC header in this
10162306a36Sopenharmony_ci * buffer before it invokes ->send_request.
10262306a36Sopenharmony_ci */
10362306a36Sopenharmony_cistatic int
10462306a36Sopenharmony_cixprt_rdma_bc_allocate(struct rpc_task *task)
10562306a36Sopenharmony_ci{
10662306a36Sopenharmony_ci	struct rpc_rqst *rqst = task->tk_rqstp;
10762306a36Sopenharmony_ci	size_t size = rqst->rq_callsize;
10862306a36Sopenharmony_ci	struct page *page;
10962306a36Sopenharmony_ci
11062306a36Sopenharmony_ci	if (size > PAGE_SIZE) {
11162306a36Sopenharmony_ci		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
11262306a36Sopenharmony_ci			  size);
11362306a36Sopenharmony_ci		return -EINVAL;
11462306a36Sopenharmony_ci	}
11562306a36Sopenharmony_ci
11662306a36Sopenharmony_ci	page = alloc_page(GFP_NOIO | __GFP_NOWARN);
11762306a36Sopenharmony_ci	if (!page)
11862306a36Sopenharmony_ci		return -ENOMEM;
11962306a36Sopenharmony_ci	rqst->rq_buffer = page_address(page);
12062306a36Sopenharmony_ci
12162306a36Sopenharmony_ci	rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, GFP_NOIO | __GFP_NOWARN);
12262306a36Sopenharmony_ci	if (!rqst->rq_rbuffer) {
12362306a36Sopenharmony_ci		put_page(page);
12462306a36Sopenharmony_ci		return -ENOMEM;
12562306a36Sopenharmony_ci	}
12662306a36Sopenharmony_ci	return 0;
12762306a36Sopenharmony_ci}
12862306a36Sopenharmony_ci
12962306a36Sopenharmony_cistatic void
13062306a36Sopenharmony_cixprt_rdma_bc_free(struct rpc_task *task)
13162306a36Sopenharmony_ci{
13262306a36Sopenharmony_ci	struct rpc_rqst *rqst = task->tk_rqstp;
13362306a36Sopenharmony_ci
13462306a36Sopenharmony_ci	put_page(virt_to_page(rqst->rq_buffer));
13562306a36Sopenharmony_ci	kfree(rqst->rq_rbuffer);
13662306a36Sopenharmony_ci}
13762306a36Sopenharmony_ci
13862306a36Sopenharmony_cistatic int
13962306a36Sopenharmony_cirpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
14062306a36Sopenharmony_ci{
14162306a36Sopenharmony_ci	struct rpc_xprt *xprt = rqst->rq_xprt;
14262306a36Sopenharmony_ci	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
14362306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *ctxt;
14462306a36Sopenharmony_ci	__be32 *p;
14562306a36Sopenharmony_ci	int rc;
14662306a36Sopenharmony_ci
14762306a36Sopenharmony_ci	ctxt = svc_rdma_send_ctxt_get(rdma);
14862306a36Sopenharmony_ci	if (!ctxt)
14962306a36Sopenharmony_ci		goto drop_connection;
15062306a36Sopenharmony_ci
15162306a36Sopenharmony_ci	p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
15262306a36Sopenharmony_ci	if (!p)
15362306a36Sopenharmony_ci		goto put_ctxt;
15462306a36Sopenharmony_ci	*p++ = rqst->rq_xid;
15562306a36Sopenharmony_ci	*p++ = rpcrdma_version;
15662306a36Sopenharmony_ci	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
15762306a36Sopenharmony_ci	*p++ = rdma_msg;
15862306a36Sopenharmony_ci	*p++ = xdr_zero;
15962306a36Sopenharmony_ci	*p++ = xdr_zero;
16062306a36Sopenharmony_ci	*p   = xdr_zero;
16162306a36Sopenharmony_ci
16262306a36Sopenharmony_ci	rqst->rq_xtime = ktime_get();
16362306a36Sopenharmony_ci	rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
16462306a36Sopenharmony_ci	if (rc)
16562306a36Sopenharmony_ci		goto put_ctxt;
16662306a36Sopenharmony_ci	return 0;
16762306a36Sopenharmony_ci
16862306a36Sopenharmony_ciput_ctxt:
16962306a36Sopenharmony_ci	svc_rdma_send_ctxt_put(rdma, ctxt);
17062306a36Sopenharmony_ci
17162306a36Sopenharmony_cidrop_connection:
17262306a36Sopenharmony_ci	return -ENOTCONN;
17362306a36Sopenharmony_ci}
17462306a36Sopenharmony_ci
17562306a36Sopenharmony_ci/**
17662306a36Sopenharmony_ci * xprt_rdma_bc_send_request - Send a reverse-direction Call
17762306a36Sopenharmony_ci * @rqst: rpc_rqst containing Call message to be sent
17862306a36Sopenharmony_ci *
17962306a36Sopenharmony_ci * Return values:
18062306a36Sopenharmony_ci *   %0 if the message was sent successfully
18162306a36Sopenharmony_ci *   %ENOTCONN if the message was not sent
18262306a36Sopenharmony_ci */
18362306a36Sopenharmony_cistatic int xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
18462306a36Sopenharmony_ci{
18562306a36Sopenharmony_ci	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
18662306a36Sopenharmony_ci	struct svcxprt_rdma *rdma =
18762306a36Sopenharmony_ci		container_of(sxprt, struct svcxprt_rdma, sc_xprt);
18862306a36Sopenharmony_ci	int ret;
18962306a36Sopenharmony_ci
19062306a36Sopenharmony_ci	if (test_bit(XPT_DEAD, &sxprt->xpt_flags))
19162306a36Sopenharmony_ci		return -ENOTCONN;
19262306a36Sopenharmony_ci
19362306a36Sopenharmony_ci	ret = rpcrdma_bc_send_request(rdma, rqst);
19462306a36Sopenharmony_ci	if (ret == -ENOTCONN)
19562306a36Sopenharmony_ci		svc_xprt_close(sxprt);
19662306a36Sopenharmony_ci	return ret;
19762306a36Sopenharmony_ci}
19862306a36Sopenharmony_ci
19962306a36Sopenharmony_cistatic void
20062306a36Sopenharmony_cixprt_rdma_bc_close(struct rpc_xprt *xprt)
20162306a36Sopenharmony_ci{
20262306a36Sopenharmony_ci	xprt_disconnect_done(xprt);
20362306a36Sopenharmony_ci	xprt->cwnd = RPC_CWNDSHIFT;
20462306a36Sopenharmony_ci}
20562306a36Sopenharmony_ci
/* ->destroy method: release the formatted address strings, then
 * free the rpc_xprt itself.
 */
static void xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);

	xprt_free(xprt);
}
21262306a36Sopenharmony_ci
21362306a36Sopenharmony_cistatic const struct rpc_xprt_ops xprt_rdma_bc_procs = {
21462306a36Sopenharmony_ci	.reserve_xprt		= xprt_reserve_xprt_cong,
21562306a36Sopenharmony_ci	.release_xprt		= xprt_release_xprt_cong,
21662306a36Sopenharmony_ci	.alloc_slot		= xprt_alloc_slot,
21762306a36Sopenharmony_ci	.free_slot		= xprt_free_slot,
21862306a36Sopenharmony_ci	.release_request	= xprt_release_rqst_cong,
21962306a36Sopenharmony_ci	.buf_alloc		= xprt_rdma_bc_allocate,
22062306a36Sopenharmony_ci	.buf_free		= xprt_rdma_bc_free,
22162306a36Sopenharmony_ci	.send_request		= xprt_rdma_bc_send_request,
22262306a36Sopenharmony_ci	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
22362306a36Sopenharmony_ci	.close			= xprt_rdma_bc_close,
22462306a36Sopenharmony_ci	.destroy		= xprt_rdma_bc_put,
22562306a36Sopenharmony_ci	.print_stats		= xprt_rdma_print_stats
22662306a36Sopenharmony_ci};
22762306a36Sopenharmony_ci
22862306a36Sopenharmony_cistatic const struct rpc_timeout xprt_rdma_bc_timeout = {
22962306a36Sopenharmony_ci	.to_initval = 60 * HZ,
23062306a36Sopenharmony_ci	.to_maxval = 60 * HZ,
23162306a36Sopenharmony_ci};
23262306a36Sopenharmony_ci
23362306a36Sopenharmony_ci/* It shouldn't matter if the number of backchannel session slots
23462306a36Sopenharmony_ci * doesn't match the number of RPC/RDMA credits. That just means
23562306a36Sopenharmony_ci * one or the other will have extra slots that aren't used.
23662306a36Sopenharmony_ci */
23762306a36Sopenharmony_cistatic struct rpc_xprt *
23862306a36Sopenharmony_cixprt_setup_rdma_bc(struct xprt_create *args)
23962306a36Sopenharmony_ci{
24062306a36Sopenharmony_ci	struct rpc_xprt *xprt;
24162306a36Sopenharmony_ci	struct rpcrdma_xprt *new_xprt;
24262306a36Sopenharmony_ci
24362306a36Sopenharmony_ci	if (args->addrlen > sizeof(xprt->addr))
24462306a36Sopenharmony_ci		return ERR_PTR(-EBADF);
24562306a36Sopenharmony_ci
24662306a36Sopenharmony_ci	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
24762306a36Sopenharmony_ci			  RPCRDMA_MAX_BC_REQUESTS,
24862306a36Sopenharmony_ci			  RPCRDMA_MAX_BC_REQUESTS);
24962306a36Sopenharmony_ci	if (!xprt)
25062306a36Sopenharmony_ci		return ERR_PTR(-ENOMEM);
25162306a36Sopenharmony_ci
25262306a36Sopenharmony_ci	xprt->timeout = &xprt_rdma_bc_timeout;
25362306a36Sopenharmony_ci	xprt_set_bound(xprt);
25462306a36Sopenharmony_ci	xprt_set_connected(xprt);
25562306a36Sopenharmony_ci	xprt->bind_timeout = 0;
25662306a36Sopenharmony_ci	xprt->reestablish_timeout = 0;
25762306a36Sopenharmony_ci	xprt->idle_timeout = 0;
25862306a36Sopenharmony_ci
25962306a36Sopenharmony_ci	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
26062306a36Sopenharmony_ci	xprt->ops = &xprt_rdma_bc_procs;
26162306a36Sopenharmony_ci
26262306a36Sopenharmony_ci	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
26362306a36Sopenharmony_ci	xprt->addrlen = args->addrlen;
26462306a36Sopenharmony_ci	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
26562306a36Sopenharmony_ci	xprt->resvport = 0;
26662306a36Sopenharmony_ci
26762306a36Sopenharmony_ci	xprt->max_payload = xprt_rdma_max_inline_read;
26862306a36Sopenharmony_ci
26962306a36Sopenharmony_ci	new_xprt = rpcx_to_rdmax(xprt);
27062306a36Sopenharmony_ci	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
27162306a36Sopenharmony_ci
27262306a36Sopenharmony_ci	xprt_get(xprt);
27362306a36Sopenharmony_ci	args->bc_xprt->xpt_bc_xprt = xprt;
27462306a36Sopenharmony_ci	xprt->bc_xprt = args->bc_xprt;
27562306a36Sopenharmony_ci
27662306a36Sopenharmony_ci	/* Final put for backchannel xprt is in __svc_rdma_free */
27762306a36Sopenharmony_ci	xprt_get(xprt);
27862306a36Sopenharmony_ci	return xprt;
27962306a36Sopenharmony_ci}
28062306a36Sopenharmony_ci
28162306a36Sopenharmony_cistruct xprt_class xprt_rdma_bc = {
28262306a36Sopenharmony_ci	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
28362306a36Sopenharmony_ci	.name			= "rdma backchannel",
28462306a36Sopenharmony_ci	.owner			= THIS_MODULE,
28562306a36Sopenharmony_ci	.ident			= XPRT_TRANSPORT_BC_RDMA,
28662306a36Sopenharmony_ci	.setup			= xprt_setup_rdma_bc,
28762306a36Sopenharmony_ci};
288