162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
262306a36Sopenharmony_ci/*
362306a36Sopenharmony_ci * Copyright (c) 2016-2018 Oracle. All rights reserved.
462306a36Sopenharmony_ci * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
562306a36Sopenharmony_ci * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
662306a36Sopenharmony_ci *
762306a36Sopenharmony_ci * This software is available to you under a choice of one of two
862306a36Sopenharmony_ci * licenses.  You may choose to be licensed under the terms of the GNU
962306a36Sopenharmony_ci * General Public License (GPL) Version 2, available from the file
1062306a36Sopenharmony_ci * COPYING in the main directory of this source tree, or the BSD-type
1162306a36Sopenharmony_ci * license below:
1262306a36Sopenharmony_ci *
1362306a36Sopenharmony_ci * Redistribution and use in source and binary forms, with or without
1462306a36Sopenharmony_ci * modification, are permitted provided that the following conditions
1562306a36Sopenharmony_ci * are met:
1662306a36Sopenharmony_ci *
1762306a36Sopenharmony_ci *      Redistributions of source code must retain the above copyright
1862306a36Sopenharmony_ci *      notice, this list of conditions and the following disclaimer.
1962306a36Sopenharmony_ci *
2062306a36Sopenharmony_ci *      Redistributions in binary form must reproduce the above
2162306a36Sopenharmony_ci *      copyright notice, this list of conditions and the following
2262306a36Sopenharmony_ci *      disclaimer in the documentation and/or other materials provided
2362306a36Sopenharmony_ci *      with the distribution.
2462306a36Sopenharmony_ci *
2562306a36Sopenharmony_ci *      Neither the name of the Network Appliance, Inc. nor the names of
2662306a36Sopenharmony_ci *      its contributors may be used to endorse or promote products
2762306a36Sopenharmony_ci *      derived from this software without specific prior written
2862306a36Sopenharmony_ci *      permission.
2962306a36Sopenharmony_ci *
3062306a36Sopenharmony_ci * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
3162306a36Sopenharmony_ci * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
3262306a36Sopenharmony_ci * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
3362306a36Sopenharmony_ci * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
3462306a36Sopenharmony_ci * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
3562306a36Sopenharmony_ci * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
3662306a36Sopenharmony_ci * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
3762306a36Sopenharmony_ci * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
3862306a36Sopenharmony_ci * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
3962306a36Sopenharmony_ci * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
4062306a36Sopenharmony_ci * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
4162306a36Sopenharmony_ci *
4262306a36Sopenharmony_ci * Author: Tom Tucker <tom@opengridcomputing.com>
4362306a36Sopenharmony_ci */
4462306a36Sopenharmony_ci
4562306a36Sopenharmony_ci/* Operation
4662306a36Sopenharmony_ci *
4762306a36Sopenharmony_ci * The main entry point is svc_rdma_sendto. This is called by the
4862306a36Sopenharmony_ci * RPC server when an RPC Reply is ready to be transmitted to a client.
4962306a36Sopenharmony_ci *
5062306a36Sopenharmony_ci * The passed-in svc_rqst contains a struct xdr_buf which holds an
5162306a36Sopenharmony_ci * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
5262306a36Sopenharmony_ci * transport header, post all Write WRs needed for this Reply, then post
5362306a36Sopenharmony_ci * a Send WR conveying the transport header and the RPC message itself to
5462306a36Sopenharmony_ci * the client.
5562306a36Sopenharmony_ci *
5662306a36Sopenharmony_ci * svc_rdma_sendto must fully transmit the Reply before returning, as
5762306a36Sopenharmony_ci * the svc_rqst will be recycled as soon as sendto returns. Remaining
5862306a36Sopenharmony_ci * resources referred to by the svc_rqst are also recycled at that time.
5962306a36Sopenharmony_ci * Therefore any resources that must remain longer must be detached
6062306a36Sopenharmony_ci * from the svc_rqst and released later.
6162306a36Sopenharmony_ci *
6262306a36Sopenharmony_ci * Page Management
6362306a36Sopenharmony_ci *
6462306a36Sopenharmony_ci * The I/O that performs Reply transmission is asynchronous, and may
6562306a36Sopenharmony_ci * complete well after sendto returns. Thus pages under I/O must be
6662306a36Sopenharmony_ci * removed from the svc_rqst before sendto returns.
6762306a36Sopenharmony_ci *
6862306a36Sopenharmony_ci * The logic here depends on Send Queue and completion ordering. Since
6962306a36Sopenharmony_ci * the Send WR is always posted last, it will always complete last. Thus
7062306a36Sopenharmony_ci * when it completes, it is guaranteed that all previous Write WRs have
7162306a36Sopenharmony_ci * also completed.
7262306a36Sopenharmony_ci *
7362306a36Sopenharmony_ci * Write WRs are constructed and posted. Each Write segment gets its own
7462306a36Sopenharmony_ci * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
7562306a36Sopenharmony_ci * DMA-unmap the pages under I/O for that Write segment. The Write
7662306a36Sopenharmony_ci * completion handler does not release any pages.
7762306a36Sopenharmony_ci *
7862306a36Sopenharmony_ci * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
7962306a36Sopenharmony_ci * The ownership of all of the Reply's pages are transferred into that
8062306a36Sopenharmony_ci * ctxt, the Send WR is posted, and sendto returns.
8162306a36Sopenharmony_ci *
8262306a36Sopenharmony_ci * The svc_rdma_send_ctxt is presented when the Send WR completes. The
8362306a36Sopenharmony_ci * Send completion handler finally releases the Reply's pages.
8462306a36Sopenharmony_ci *
8562306a36Sopenharmony_ci * This mechanism also assumes that completions on the transport's Send
8662306a36Sopenharmony_ci * Completion Queue do not run in parallel. Otherwise a Write completion
8762306a36Sopenharmony_ci * and Send completion running at the same time could release pages that
8862306a36Sopenharmony_ci * are still DMA-mapped.
8962306a36Sopenharmony_ci *
9062306a36Sopenharmony_ci * Error Handling
9162306a36Sopenharmony_ci *
9262306a36Sopenharmony_ci * - If the Send WR is posted successfully, it will either complete
9362306a36Sopenharmony_ci *   successfully, or get flushed. Either way, the Send completion
9462306a36Sopenharmony_ci *   handler releases the Reply's pages.
9562306a36Sopenharmony_ci * - If the Send WR cannot be not posted, the forward path releases
9662306a36Sopenharmony_ci *   the Reply's pages.
9762306a36Sopenharmony_ci *
9862306a36Sopenharmony_ci * This handles the case, without the use of page reference counting,
9962306a36Sopenharmony_ci * where two different Write segments send portions of the same page.
10062306a36Sopenharmony_ci */
10162306a36Sopenharmony_ci
10262306a36Sopenharmony_ci#include <linux/spinlock.h>
10362306a36Sopenharmony_ci#include <asm/unaligned.h>
10462306a36Sopenharmony_ci
10562306a36Sopenharmony_ci#include <rdma/ib_verbs.h>
10662306a36Sopenharmony_ci#include <rdma/rdma_cm.h>
10762306a36Sopenharmony_ci
10862306a36Sopenharmony_ci#include <linux/sunrpc/debug.h>
10962306a36Sopenharmony_ci#include <linux/sunrpc/svc_rdma.h>
11062306a36Sopenharmony_ci
11162306a36Sopenharmony_ci#include "xprt_rdma.h"
11262306a36Sopenharmony_ci#include <trace/events/rpcrdma.h>
11362306a36Sopenharmony_ci
11462306a36Sopenharmony_cistatic void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
11562306a36Sopenharmony_ci
11662306a36Sopenharmony_cistatic void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
11762306a36Sopenharmony_ci				   struct rpc_rdma_cid *cid)
11862306a36Sopenharmony_ci{
11962306a36Sopenharmony_ci	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
12062306a36Sopenharmony_ci	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
12162306a36Sopenharmony_ci}
12262306a36Sopenharmony_ci
12362306a36Sopenharmony_cistatic struct svc_rdma_send_ctxt *
12462306a36Sopenharmony_cisvc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
12562306a36Sopenharmony_ci{
12662306a36Sopenharmony_ci	int node = ibdev_to_node(rdma->sc_cm_id->device);
12762306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *ctxt;
12862306a36Sopenharmony_ci	dma_addr_t addr;
12962306a36Sopenharmony_ci	void *buffer;
13062306a36Sopenharmony_ci	int i;
13162306a36Sopenharmony_ci
13262306a36Sopenharmony_ci	ctxt = kmalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges),
13362306a36Sopenharmony_ci			    GFP_KERNEL, node);
13462306a36Sopenharmony_ci	if (!ctxt)
13562306a36Sopenharmony_ci		goto fail0;
13662306a36Sopenharmony_ci	buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
13762306a36Sopenharmony_ci	if (!buffer)
13862306a36Sopenharmony_ci		goto fail1;
13962306a36Sopenharmony_ci	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
14062306a36Sopenharmony_ci				 rdma->sc_max_req_size, DMA_TO_DEVICE);
14162306a36Sopenharmony_ci	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
14262306a36Sopenharmony_ci		goto fail2;
14362306a36Sopenharmony_ci
14462306a36Sopenharmony_ci	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
14562306a36Sopenharmony_ci
14662306a36Sopenharmony_ci	ctxt->sc_send_wr.next = NULL;
14762306a36Sopenharmony_ci	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
14862306a36Sopenharmony_ci	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
14962306a36Sopenharmony_ci	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
15062306a36Sopenharmony_ci	ctxt->sc_cqe.done = svc_rdma_wc_send;
15162306a36Sopenharmony_ci	ctxt->sc_xprt_buf = buffer;
15262306a36Sopenharmony_ci	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
15362306a36Sopenharmony_ci		     rdma->sc_max_req_size);
15462306a36Sopenharmony_ci	ctxt->sc_sges[0].addr = addr;
15562306a36Sopenharmony_ci
15662306a36Sopenharmony_ci	for (i = 0; i < rdma->sc_max_send_sges; i++)
15762306a36Sopenharmony_ci		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
15862306a36Sopenharmony_ci	return ctxt;
15962306a36Sopenharmony_ci
16062306a36Sopenharmony_cifail2:
16162306a36Sopenharmony_ci	kfree(buffer);
16262306a36Sopenharmony_cifail1:
16362306a36Sopenharmony_ci	kfree(ctxt);
16462306a36Sopenharmony_cifail0:
16562306a36Sopenharmony_ci	return NULL;
16662306a36Sopenharmony_ci}
16762306a36Sopenharmony_ci
16862306a36Sopenharmony_ci/**
16962306a36Sopenharmony_ci * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
17062306a36Sopenharmony_ci * @rdma: svcxprt_rdma being torn down
17162306a36Sopenharmony_ci *
17262306a36Sopenharmony_ci */
17362306a36Sopenharmony_civoid svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
17462306a36Sopenharmony_ci{
17562306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *ctxt;
17662306a36Sopenharmony_ci	struct llist_node *node;
17762306a36Sopenharmony_ci
17862306a36Sopenharmony_ci	while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
17962306a36Sopenharmony_ci		ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
18062306a36Sopenharmony_ci		ib_dma_unmap_single(rdma->sc_pd->device,
18162306a36Sopenharmony_ci				    ctxt->sc_sges[0].addr,
18262306a36Sopenharmony_ci				    rdma->sc_max_req_size,
18362306a36Sopenharmony_ci				    DMA_TO_DEVICE);
18462306a36Sopenharmony_ci		kfree(ctxt->sc_xprt_buf);
18562306a36Sopenharmony_ci		kfree(ctxt);
18662306a36Sopenharmony_ci	}
18762306a36Sopenharmony_ci}
18862306a36Sopenharmony_ci
18962306a36Sopenharmony_ci/**
19062306a36Sopenharmony_ci * svc_rdma_send_ctxt_get - Get a free send_ctxt
19162306a36Sopenharmony_ci * @rdma: controlling svcxprt_rdma
19262306a36Sopenharmony_ci *
19362306a36Sopenharmony_ci * Returns a ready-to-use send_ctxt, or NULL if none are
19462306a36Sopenharmony_ci * available and a fresh one cannot be allocated.
19562306a36Sopenharmony_ci */
19662306a36Sopenharmony_cistruct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
19762306a36Sopenharmony_ci{
19862306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *ctxt;
19962306a36Sopenharmony_ci	struct llist_node *node;
20062306a36Sopenharmony_ci
20162306a36Sopenharmony_ci	spin_lock(&rdma->sc_send_lock);
20262306a36Sopenharmony_ci	node = llist_del_first(&rdma->sc_send_ctxts);
20362306a36Sopenharmony_ci	if (!node)
20462306a36Sopenharmony_ci		goto out_empty;
20562306a36Sopenharmony_ci	ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
20662306a36Sopenharmony_ci	spin_unlock(&rdma->sc_send_lock);
20762306a36Sopenharmony_ci
20862306a36Sopenharmony_ciout:
20962306a36Sopenharmony_ci	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
21062306a36Sopenharmony_ci	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
21162306a36Sopenharmony_ci			ctxt->sc_xprt_buf, NULL);
21262306a36Sopenharmony_ci
21362306a36Sopenharmony_ci	ctxt->sc_send_wr.num_sge = 0;
21462306a36Sopenharmony_ci	ctxt->sc_cur_sge_no = 0;
21562306a36Sopenharmony_ci	ctxt->sc_page_count = 0;
21662306a36Sopenharmony_ci	return ctxt;
21762306a36Sopenharmony_ci
21862306a36Sopenharmony_ciout_empty:
21962306a36Sopenharmony_ci	spin_unlock(&rdma->sc_send_lock);
22062306a36Sopenharmony_ci	ctxt = svc_rdma_send_ctxt_alloc(rdma);
22162306a36Sopenharmony_ci	if (!ctxt)
22262306a36Sopenharmony_ci		return NULL;
22362306a36Sopenharmony_ci	goto out;
22462306a36Sopenharmony_ci}
22562306a36Sopenharmony_ci
22662306a36Sopenharmony_ci/**
22762306a36Sopenharmony_ci * svc_rdma_send_ctxt_put - Return send_ctxt to free list
22862306a36Sopenharmony_ci * @rdma: controlling svcxprt_rdma
22962306a36Sopenharmony_ci * @ctxt: object to return to the free list
23062306a36Sopenharmony_ci *
23162306a36Sopenharmony_ci * Pages left in sc_pages are DMA unmapped and released.
23262306a36Sopenharmony_ci */
23362306a36Sopenharmony_civoid svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
23462306a36Sopenharmony_ci			    struct svc_rdma_send_ctxt *ctxt)
23562306a36Sopenharmony_ci{
23662306a36Sopenharmony_ci	struct ib_device *device = rdma->sc_cm_id->device;
23762306a36Sopenharmony_ci	unsigned int i;
23862306a36Sopenharmony_ci
23962306a36Sopenharmony_ci	if (ctxt->sc_page_count)
24062306a36Sopenharmony_ci		release_pages(ctxt->sc_pages, ctxt->sc_page_count);
24162306a36Sopenharmony_ci
24262306a36Sopenharmony_ci	/* The first SGE contains the transport header, which
24362306a36Sopenharmony_ci	 * remains mapped until @ctxt is destroyed.
24462306a36Sopenharmony_ci	 */
24562306a36Sopenharmony_ci	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
24662306a36Sopenharmony_ci		ib_dma_unmap_page(device,
24762306a36Sopenharmony_ci				  ctxt->sc_sges[i].addr,
24862306a36Sopenharmony_ci				  ctxt->sc_sges[i].length,
24962306a36Sopenharmony_ci				  DMA_TO_DEVICE);
25062306a36Sopenharmony_ci		trace_svcrdma_dma_unmap_page(rdma,
25162306a36Sopenharmony_ci					     ctxt->sc_sges[i].addr,
25262306a36Sopenharmony_ci					     ctxt->sc_sges[i].length);
25362306a36Sopenharmony_ci	}
25462306a36Sopenharmony_ci
25562306a36Sopenharmony_ci	llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
25662306a36Sopenharmony_ci}
25762306a36Sopenharmony_ci
25862306a36Sopenharmony_ci/**
25962306a36Sopenharmony_ci * svc_rdma_wake_send_waiters - manage Send Queue accounting
26062306a36Sopenharmony_ci * @rdma: controlling transport
26162306a36Sopenharmony_ci * @avail: Number of additional SQEs that are now available
26262306a36Sopenharmony_ci *
26362306a36Sopenharmony_ci */
26462306a36Sopenharmony_civoid svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
26562306a36Sopenharmony_ci{
26662306a36Sopenharmony_ci	atomic_add(avail, &rdma->sc_sq_avail);
26762306a36Sopenharmony_ci	smp_mb__after_atomic();
26862306a36Sopenharmony_ci	if (unlikely(waitqueue_active(&rdma->sc_send_wait)))
26962306a36Sopenharmony_ci		wake_up(&rdma->sc_send_wait);
27062306a36Sopenharmony_ci}
27162306a36Sopenharmony_ci
27262306a36Sopenharmony_ci/**
27362306a36Sopenharmony_ci * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
27462306a36Sopenharmony_ci * @cq: Completion Queue context
27562306a36Sopenharmony_ci * @wc: Work Completion object
27662306a36Sopenharmony_ci *
27762306a36Sopenharmony_ci * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
27862306a36Sopenharmony_ci * the Send completion handler could be running.
27962306a36Sopenharmony_ci */
28062306a36Sopenharmony_cistatic void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
28162306a36Sopenharmony_ci{
28262306a36Sopenharmony_ci	struct svcxprt_rdma *rdma = cq->cq_context;
28362306a36Sopenharmony_ci	struct ib_cqe *cqe = wc->wr_cqe;
28462306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *ctxt =
28562306a36Sopenharmony_ci		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
28662306a36Sopenharmony_ci
28762306a36Sopenharmony_ci	svc_rdma_wake_send_waiters(rdma, 1);
28862306a36Sopenharmony_ci
28962306a36Sopenharmony_ci	if (unlikely(wc->status != IB_WC_SUCCESS))
29062306a36Sopenharmony_ci		goto flushed;
29162306a36Sopenharmony_ci
29262306a36Sopenharmony_ci	trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
29362306a36Sopenharmony_ci	svc_rdma_send_ctxt_put(rdma, ctxt);
29462306a36Sopenharmony_ci	return;
29562306a36Sopenharmony_ci
29662306a36Sopenharmony_ciflushed:
29762306a36Sopenharmony_ci	if (wc->status != IB_WC_WR_FLUSH_ERR)
29862306a36Sopenharmony_ci		trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid);
29962306a36Sopenharmony_ci	else
30062306a36Sopenharmony_ci		trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid);
30162306a36Sopenharmony_ci	svc_rdma_send_ctxt_put(rdma, ctxt);
30262306a36Sopenharmony_ci	svc_xprt_deferred_close(&rdma->sc_xprt);
30362306a36Sopenharmony_ci}
30462306a36Sopenharmony_ci
30562306a36Sopenharmony_ci/**
30662306a36Sopenharmony_ci * svc_rdma_send - Post a single Send WR
30762306a36Sopenharmony_ci * @rdma: transport on which to post the WR
30862306a36Sopenharmony_ci * @ctxt: send ctxt with a Send WR ready to post
30962306a36Sopenharmony_ci *
31062306a36Sopenharmony_ci * Returns zero if the Send WR was posted successfully. Otherwise, a
31162306a36Sopenharmony_ci * negative errno is returned.
31262306a36Sopenharmony_ci */
31362306a36Sopenharmony_ciint svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
31462306a36Sopenharmony_ci{
31562306a36Sopenharmony_ci	struct ib_send_wr *wr = &ctxt->sc_send_wr;
31662306a36Sopenharmony_ci	int ret;
31762306a36Sopenharmony_ci
31862306a36Sopenharmony_ci	might_sleep();
31962306a36Sopenharmony_ci
32062306a36Sopenharmony_ci	/* Sync the transport header buffer */
32162306a36Sopenharmony_ci	ib_dma_sync_single_for_device(rdma->sc_pd->device,
32262306a36Sopenharmony_ci				      wr->sg_list[0].addr,
32362306a36Sopenharmony_ci				      wr->sg_list[0].length,
32462306a36Sopenharmony_ci				      DMA_TO_DEVICE);
32562306a36Sopenharmony_ci
32662306a36Sopenharmony_ci	/* If the SQ is full, wait until an SQ entry is available */
32762306a36Sopenharmony_ci	while (1) {
32862306a36Sopenharmony_ci		if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
32962306a36Sopenharmony_ci			percpu_counter_inc(&svcrdma_stat_sq_starve);
33062306a36Sopenharmony_ci			trace_svcrdma_sq_full(rdma);
33162306a36Sopenharmony_ci			atomic_inc(&rdma->sc_sq_avail);
33262306a36Sopenharmony_ci			wait_event(rdma->sc_send_wait,
33362306a36Sopenharmony_ci				   atomic_read(&rdma->sc_sq_avail) > 1);
33462306a36Sopenharmony_ci			if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
33562306a36Sopenharmony_ci				return -ENOTCONN;
33662306a36Sopenharmony_ci			trace_svcrdma_sq_retry(rdma);
33762306a36Sopenharmony_ci			continue;
33862306a36Sopenharmony_ci		}
33962306a36Sopenharmony_ci
34062306a36Sopenharmony_ci		trace_svcrdma_post_send(ctxt);
34162306a36Sopenharmony_ci		ret = ib_post_send(rdma->sc_qp, wr, NULL);
34262306a36Sopenharmony_ci		if (ret)
34362306a36Sopenharmony_ci			break;
34462306a36Sopenharmony_ci		return 0;
34562306a36Sopenharmony_ci	}
34662306a36Sopenharmony_ci
34762306a36Sopenharmony_ci	trace_svcrdma_sq_post_err(rdma, ret);
34862306a36Sopenharmony_ci	svc_xprt_deferred_close(&rdma->sc_xprt);
34962306a36Sopenharmony_ci	wake_up(&rdma->sc_send_wait);
35062306a36Sopenharmony_ci	return ret;
35162306a36Sopenharmony_ci}
35262306a36Sopenharmony_ci
35362306a36Sopenharmony_ci/**
35462306a36Sopenharmony_ci * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
35562306a36Sopenharmony_ci * @sctxt: Send context for the RPC Reply
35662306a36Sopenharmony_ci *
35762306a36Sopenharmony_ci * Return values:
35862306a36Sopenharmony_ci *   On success, returns length in bytes of the Reply XDR buffer
35962306a36Sopenharmony_ci *   that was consumed by the Reply Read list
36062306a36Sopenharmony_ci *   %-EMSGSIZE on XDR buffer overflow
36162306a36Sopenharmony_ci */
36262306a36Sopenharmony_cistatic ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
36362306a36Sopenharmony_ci{
36462306a36Sopenharmony_ci	/* RPC-over-RDMA version 1 replies never have a Read list. */
36562306a36Sopenharmony_ci	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
36662306a36Sopenharmony_ci}
36762306a36Sopenharmony_ci
36862306a36Sopenharmony_ci/**
36962306a36Sopenharmony_ci * svc_rdma_encode_write_segment - Encode one Write segment
37062306a36Sopenharmony_ci * @sctxt: Send context for the RPC Reply
37162306a36Sopenharmony_ci * @chunk: Write chunk to push
37262306a36Sopenharmony_ci * @remaining: remaining bytes of the payload left in the Write chunk
37362306a36Sopenharmony_ci * @segno: which segment in the chunk
37462306a36Sopenharmony_ci *
37562306a36Sopenharmony_ci * Return values:
37662306a36Sopenharmony_ci *   On success, returns length in bytes of the Reply XDR buffer
37762306a36Sopenharmony_ci *   that was consumed by the Write segment, and updates @remaining
37862306a36Sopenharmony_ci *   %-EMSGSIZE on XDR buffer overflow
37962306a36Sopenharmony_ci */
38062306a36Sopenharmony_cistatic ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
38162306a36Sopenharmony_ci					     const struct svc_rdma_chunk *chunk,
38262306a36Sopenharmony_ci					     u32 *remaining, unsigned int segno)
38362306a36Sopenharmony_ci{
38462306a36Sopenharmony_ci	const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
38562306a36Sopenharmony_ci	const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
38662306a36Sopenharmony_ci	u32 length;
38762306a36Sopenharmony_ci	__be32 *p;
38862306a36Sopenharmony_ci
38962306a36Sopenharmony_ci	p = xdr_reserve_space(&sctxt->sc_stream, len);
39062306a36Sopenharmony_ci	if (!p)
39162306a36Sopenharmony_ci		return -EMSGSIZE;
39262306a36Sopenharmony_ci
39362306a36Sopenharmony_ci	length = min_t(u32, *remaining, segment->rs_length);
39462306a36Sopenharmony_ci	*remaining -= length;
39562306a36Sopenharmony_ci	xdr_encode_rdma_segment(p, segment->rs_handle, length,
39662306a36Sopenharmony_ci				segment->rs_offset);
39762306a36Sopenharmony_ci	trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
39862306a36Sopenharmony_ci				  segment->rs_offset);
39962306a36Sopenharmony_ci	return len;
40062306a36Sopenharmony_ci}
40162306a36Sopenharmony_ci
40262306a36Sopenharmony_ci/**
40362306a36Sopenharmony_ci * svc_rdma_encode_write_chunk - Encode one Write chunk
40462306a36Sopenharmony_ci * @sctxt: Send context for the RPC Reply
40562306a36Sopenharmony_ci * @chunk: Write chunk to push
40662306a36Sopenharmony_ci *
40762306a36Sopenharmony_ci * Copy a Write chunk from the Call transport header to the
40862306a36Sopenharmony_ci * Reply transport header. Update each segment's length field
40962306a36Sopenharmony_ci * to reflect the number of bytes written in that segment.
41062306a36Sopenharmony_ci *
41162306a36Sopenharmony_ci * Return values:
41262306a36Sopenharmony_ci *   On success, returns length in bytes of the Reply XDR buffer
41362306a36Sopenharmony_ci *   that was consumed by the Write chunk
41462306a36Sopenharmony_ci *   %-EMSGSIZE on XDR buffer overflow
41562306a36Sopenharmony_ci */
41662306a36Sopenharmony_cistatic ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
41762306a36Sopenharmony_ci					   const struct svc_rdma_chunk *chunk)
41862306a36Sopenharmony_ci{
41962306a36Sopenharmony_ci	u32 remaining = chunk->ch_payload_length;
42062306a36Sopenharmony_ci	unsigned int segno;
42162306a36Sopenharmony_ci	ssize_t len, ret;
42262306a36Sopenharmony_ci
42362306a36Sopenharmony_ci	len = 0;
42462306a36Sopenharmony_ci	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
42562306a36Sopenharmony_ci	if (ret < 0)
42662306a36Sopenharmony_ci		return ret;
42762306a36Sopenharmony_ci	len += ret;
42862306a36Sopenharmony_ci
42962306a36Sopenharmony_ci	ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
43062306a36Sopenharmony_ci	if (ret < 0)
43162306a36Sopenharmony_ci		return ret;
43262306a36Sopenharmony_ci	len += ret;
43362306a36Sopenharmony_ci
43462306a36Sopenharmony_ci	for (segno = 0; segno < chunk->ch_segcount; segno++) {
43562306a36Sopenharmony_ci		ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
43662306a36Sopenharmony_ci		if (ret < 0)
43762306a36Sopenharmony_ci			return ret;
43862306a36Sopenharmony_ci		len += ret;
43962306a36Sopenharmony_ci	}
44062306a36Sopenharmony_ci
44162306a36Sopenharmony_ci	return len;
44262306a36Sopenharmony_ci}
44362306a36Sopenharmony_ci
44462306a36Sopenharmony_ci/**
44562306a36Sopenharmony_ci * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
44662306a36Sopenharmony_ci * @rctxt: Reply context with information about the RPC Call
44762306a36Sopenharmony_ci * @sctxt: Send context for the RPC Reply
44862306a36Sopenharmony_ci *
44962306a36Sopenharmony_ci * Return values:
45062306a36Sopenharmony_ci *   On success, returns length in bytes of the Reply XDR buffer
45162306a36Sopenharmony_ci *   that was consumed by the Reply's Write list
45262306a36Sopenharmony_ci *   %-EMSGSIZE on XDR buffer overflow
45362306a36Sopenharmony_ci */
45462306a36Sopenharmony_cistatic ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
45562306a36Sopenharmony_ci					  struct svc_rdma_send_ctxt *sctxt)
45662306a36Sopenharmony_ci{
45762306a36Sopenharmony_ci	struct svc_rdma_chunk *chunk;
45862306a36Sopenharmony_ci	ssize_t len, ret;
45962306a36Sopenharmony_ci
46062306a36Sopenharmony_ci	len = 0;
46162306a36Sopenharmony_ci	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
46262306a36Sopenharmony_ci		ret = svc_rdma_encode_write_chunk(sctxt, chunk);
46362306a36Sopenharmony_ci		if (ret < 0)
46462306a36Sopenharmony_ci			return ret;
46562306a36Sopenharmony_ci		len += ret;
46662306a36Sopenharmony_ci	}
46762306a36Sopenharmony_ci
46862306a36Sopenharmony_ci	/* Terminate the Write list */
46962306a36Sopenharmony_ci	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
47062306a36Sopenharmony_ci	if (ret < 0)
47162306a36Sopenharmony_ci		return ret;
47262306a36Sopenharmony_ci
47362306a36Sopenharmony_ci	return len + ret;
47462306a36Sopenharmony_ci}
47562306a36Sopenharmony_ci
47662306a36Sopenharmony_ci/**
47762306a36Sopenharmony_ci * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
47862306a36Sopenharmony_ci * @rctxt: Reply context with information about the RPC Call
47962306a36Sopenharmony_ci * @sctxt: Send context for the RPC Reply
48062306a36Sopenharmony_ci * @length: size in bytes of the payload in the Reply chunk
48162306a36Sopenharmony_ci *
48262306a36Sopenharmony_ci * Return values:
48362306a36Sopenharmony_ci *   On success, returns length in bytes of the Reply XDR buffer
48462306a36Sopenharmony_ci *   that was consumed by the Reply's Reply chunk
48562306a36Sopenharmony_ci *   %-EMSGSIZE on XDR buffer overflow
48662306a36Sopenharmony_ci *   %-E2BIG if the RPC message is larger than the Reply chunk
48762306a36Sopenharmony_ci */
48862306a36Sopenharmony_cistatic ssize_t
48962306a36Sopenharmony_cisvc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
49062306a36Sopenharmony_ci			    struct svc_rdma_send_ctxt *sctxt,
49162306a36Sopenharmony_ci			    unsigned int length)
49262306a36Sopenharmony_ci{
49362306a36Sopenharmony_ci	struct svc_rdma_chunk *chunk;
49462306a36Sopenharmony_ci
49562306a36Sopenharmony_ci	if (pcl_is_empty(&rctxt->rc_reply_pcl))
49662306a36Sopenharmony_ci		return xdr_stream_encode_item_absent(&sctxt->sc_stream);
49762306a36Sopenharmony_ci
49862306a36Sopenharmony_ci	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
49962306a36Sopenharmony_ci	if (length > chunk->ch_length)
50062306a36Sopenharmony_ci		return -E2BIG;
50162306a36Sopenharmony_ci
50262306a36Sopenharmony_ci	chunk->ch_payload_length = length;
50362306a36Sopenharmony_ci	return svc_rdma_encode_write_chunk(sctxt, chunk);
50462306a36Sopenharmony_ci}
50562306a36Sopenharmony_ci
50662306a36Sopenharmony_cistruct svc_rdma_map_data {
50762306a36Sopenharmony_ci	struct svcxprt_rdma		*md_rdma;
50862306a36Sopenharmony_ci	struct svc_rdma_send_ctxt	*md_ctxt;
50962306a36Sopenharmony_ci};
51062306a36Sopenharmony_ci
51162306a36Sopenharmony_ci/**
51262306a36Sopenharmony_ci * svc_rdma_page_dma_map - DMA map one page
51362306a36Sopenharmony_ci * @data: pointer to arguments
51462306a36Sopenharmony_ci * @page: struct page to DMA map
51562306a36Sopenharmony_ci * @offset: offset into the page
51662306a36Sopenharmony_ci * @len: number of bytes to map
51762306a36Sopenharmony_ci *
51862306a36Sopenharmony_ci * Returns:
51962306a36Sopenharmony_ci *   %0 if DMA mapping was successful
52062306a36Sopenharmony_ci *   %-EIO if the page cannot be DMA mapped
52162306a36Sopenharmony_ci */
52262306a36Sopenharmony_cistatic int svc_rdma_page_dma_map(void *data, struct page *page,
52362306a36Sopenharmony_ci				 unsigned long offset, unsigned int len)
52462306a36Sopenharmony_ci{
52562306a36Sopenharmony_ci	struct svc_rdma_map_data *args = data;
52662306a36Sopenharmony_ci	struct svcxprt_rdma *rdma = args->md_rdma;
52762306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
52862306a36Sopenharmony_ci	struct ib_device *dev = rdma->sc_cm_id->device;
52962306a36Sopenharmony_ci	dma_addr_t dma_addr;
53062306a36Sopenharmony_ci
53162306a36Sopenharmony_ci	++ctxt->sc_cur_sge_no;
53262306a36Sopenharmony_ci
53362306a36Sopenharmony_ci	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
53462306a36Sopenharmony_ci	if (ib_dma_mapping_error(dev, dma_addr))
53562306a36Sopenharmony_ci		goto out_maperr;
53662306a36Sopenharmony_ci
53762306a36Sopenharmony_ci	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
53862306a36Sopenharmony_ci	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
53962306a36Sopenharmony_ci	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
54062306a36Sopenharmony_ci	ctxt->sc_send_wr.num_sge++;
54162306a36Sopenharmony_ci	return 0;
54262306a36Sopenharmony_ci
54362306a36Sopenharmony_ciout_maperr:
54462306a36Sopenharmony_ci	trace_svcrdma_dma_map_err(rdma, dma_addr, len);
54562306a36Sopenharmony_ci	return -EIO;
54662306a36Sopenharmony_ci}
54762306a36Sopenharmony_ci
54862306a36Sopenharmony_ci/**
54962306a36Sopenharmony_ci * svc_rdma_iov_dma_map - DMA map an iovec
55062306a36Sopenharmony_ci * @data: pointer to arguments
55162306a36Sopenharmony_ci * @iov: kvec to DMA map
55262306a36Sopenharmony_ci *
55362306a36Sopenharmony_ci * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
55462306a36Sopenharmony_ci * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
55562306a36Sopenharmony_ci *
55662306a36Sopenharmony_ci * Returns:
55762306a36Sopenharmony_ci *   %0 if DMA mapping was successful
55862306a36Sopenharmony_ci *   %-EIO if the iovec cannot be DMA mapped
55962306a36Sopenharmony_ci */
56062306a36Sopenharmony_cistatic int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
56162306a36Sopenharmony_ci{
56262306a36Sopenharmony_ci	if (!iov->iov_len)
56362306a36Sopenharmony_ci		return 0;
56462306a36Sopenharmony_ci	return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
56562306a36Sopenharmony_ci				     offset_in_page(iov->iov_base),
56662306a36Sopenharmony_ci				     iov->iov_len);
56762306a36Sopenharmony_ci}
56862306a36Sopenharmony_ci
56962306a36Sopenharmony_ci/**
57062306a36Sopenharmony_ci * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
57162306a36Sopenharmony_ci * @xdr: xdr_buf containing portion of an RPC message to transmit
57262306a36Sopenharmony_ci * @data: pointer to arguments
57362306a36Sopenharmony_ci *
57462306a36Sopenharmony_ci * Returns:
57562306a36Sopenharmony_ci *   %0 if DMA mapping was successful
57662306a36Sopenharmony_ci *   %-EIO if DMA mapping failed
57762306a36Sopenharmony_ci *
57862306a36Sopenharmony_ci * On failure, any DMA mappings that have been already done must be
57962306a36Sopenharmony_ci * unmapped by the caller.
58062306a36Sopenharmony_ci */
58162306a36Sopenharmony_cistatic int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
58262306a36Sopenharmony_ci{
58362306a36Sopenharmony_ci	unsigned int len, remaining;
58462306a36Sopenharmony_ci	unsigned long pageoff;
58562306a36Sopenharmony_ci	struct page **ppages;
58662306a36Sopenharmony_ci	int ret;
58762306a36Sopenharmony_ci
58862306a36Sopenharmony_ci	ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
58962306a36Sopenharmony_ci	if (ret < 0)
59062306a36Sopenharmony_ci		return ret;
59162306a36Sopenharmony_ci
59262306a36Sopenharmony_ci	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
59362306a36Sopenharmony_ci	pageoff = offset_in_page(xdr->page_base);
59462306a36Sopenharmony_ci	remaining = xdr->page_len;
59562306a36Sopenharmony_ci	while (remaining) {
59662306a36Sopenharmony_ci		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
59762306a36Sopenharmony_ci
59862306a36Sopenharmony_ci		ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
59962306a36Sopenharmony_ci		if (ret < 0)
60062306a36Sopenharmony_ci			return ret;
60162306a36Sopenharmony_ci
60262306a36Sopenharmony_ci		remaining -= len;
60362306a36Sopenharmony_ci		pageoff = 0;
60462306a36Sopenharmony_ci	}
60562306a36Sopenharmony_ci
60662306a36Sopenharmony_ci	ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
60762306a36Sopenharmony_ci	if (ret < 0)
60862306a36Sopenharmony_ci		return ret;
60962306a36Sopenharmony_ci
61062306a36Sopenharmony_ci	return xdr->len;
61162306a36Sopenharmony_ci}
61262306a36Sopenharmony_ci
61362306a36Sopenharmony_cistruct svc_rdma_pullup_data {
61462306a36Sopenharmony_ci	u8		*pd_dest;
61562306a36Sopenharmony_ci	unsigned int	pd_length;
61662306a36Sopenharmony_ci	unsigned int	pd_num_sges;
61762306a36Sopenharmony_ci};
61862306a36Sopenharmony_ci
61962306a36Sopenharmony_ci/**
62062306a36Sopenharmony_ci * svc_rdma_xb_count_sges - Count how many SGEs will be needed
62162306a36Sopenharmony_ci * @xdr: xdr_buf containing portion of an RPC message to transmit
62262306a36Sopenharmony_ci * @data: pointer to arguments
62362306a36Sopenharmony_ci *
62462306a36Sopenharmony_ci * Returns:
62562306a36Sopenharmony_ci *   Number of SGEs needed to Send the contents of @xdr inline
62662306a36Sopenharmony_ci */
62762306a36Sopenharmony_cistatic int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
62862306a36Sopenharmony_ci				  void *data)
62962306a36Sopenharmony_ci{
63062306a36Sopenharmony_ci	struct svc_rdma_pullup_data *args = data;
63162306a36Sopenharmony_ci	unsigned int remaining;
63262306a36Sopenharmony_ci	unsigned long offset;
63362306a36Sopenharmony_ci
63462306a36Sopenharmony_ci	if (xdr->head[0].iov_len)
63562306a36Sopenharmony_ci		++args->pd_num_sges;
63662306a36Sopenharmony_ci
63762306a36Sopenharmony_ci	offset = offset_in_page(xdr->page_base);
63862306a36Sopenharmony_ci	remaining = xdr->page_len;
63962306a36Sopenharmony_ci	while (remaining) {
64062306a36Sopenharmony_ci		++args->pd_num_sges;
64162306a36Sopenharmony_ci		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
64262306a36Sopenharmony_ci		offset = 0;
64362306a36Sopenharmony_ci	}
64462306a36Sopenharmony_ci
64562306a36Sopenharmony_ci	if (xdr->tail[0].iov_len)
64662306a36Sopenharmony_ci		++args->pd_num_sges;
64762306a36Sopenharmony_ci
64862306a36Sopenharmony_ci	args->pd_length += xdr->len;
64962306a36Sopenharmony_ci	return 0;
65062306a36Sopenharmony_ci}
65162306a36Sopenharmony_ci
65262306a36Sopenharmony_ci/**
65362306a36Sopenharmony_ci * svc_rdma_pull_up_needed - Determine whether to use pull-up
65462306a36Sopenharmony_ci * @rdma: controlling transport
65562306a36Sopenharmony_ci * @sctxt: send_ctxt for the Send WR
65662306a36Sopenharmony_ci * @rctxt: Write and Reply chunks provided by client
65762306a36Sopenharmony_ci * @xdr: xdr_buf containing RPC message to transmit
65862306a36Sopenharmony_ci *
65962306a36Sopenharmony_ci * Returns:
66062306a36Sopenharmony_ci *   %true if pull-up must be used
66162306a36Sopenharmony_ci *   %false otherwise
66262306a36Sopenharmony_ci */
66362306a36Sopenharmony_cistatic bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
66462306a36Sopenharmony_ci				    const struct svc_rdma_send_ctxt *sctxt,
66562306a36Sopenharmony_ci				    const struct svc_rdma_recv_ctxt *rctxt,
66662306a36Sopenharmony_ci				    const struct xdr_buf *xdr)
66762306a36Sopenharmony_ci{
66862306a36Sopenharmony_ci	/* Resources needed for the transport header */
66962306a36Sopenharmony_ci	struct svc_rdma_pullup_data args = {
67062306a36Sopenharmony_ci		.pd_length	= sctxt->sc_hdrbuf.len,
67162306a36Sopenharmony_ci		.pd_num_sges	= 1,
67262306a36Sopenharmony_ci	};
67362306a36Sopenharmony_ci	int ret;
67462306a36Sopenharmony_ci
67562306a36Sopenharmony_ci	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
67662306a36Sopenharmony_ci				      svc_rdma_xb_count_sges, &args);
67762306a36Sopenharmony_ci	if (ret < 0)
67862306a36Sopenharmony_ci		return false;
67962306a36Sopenharmony_ci
68062306a36Sopenharmony_ci	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
68162306a36Sopenharmony_ci		return true;
68262306a36Sopenharmony_ci	return args.pd_num_sges >= rdma->sc_max_send_sges;
68362306a36Sopenharmony_ci}
68462306a36Sopenharmony_ci
68562306a36Sopenharmony_ci/**
68662306a36Sopenharmony_ci * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
68762306a36Sopenharmony_ci * @xdr: xdr_buf containing portion of an RPC message to copy
68862306a36Sopenharmony_ci * @data: pointer to arguments
68962306a36Sopenharmony_ci *
69062306a36Sopenharmony_ci * Returns:
69162306a36Sopenharmony_ci *   Always zero.
69262306a36Sopenharmony_ci */
69362306a36Sopenharmony_cistatic int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
69462306a36Sopenharmony_ci				 void *data)
69562306a36Sopenharmony_ci{
69662306a36Sopenharmony_ci	struct svc_rdma_pullup_data *args = data;
69762306a36Sopenharmony_ci	unsigned int len, remaining;
69862306a36Sopenharmony_ci	unsigned long pageoff;
69962306a36Sopenharmony_ci	struct page **ppages;
70062306a36Sopenharmony_ci
70162306a36Sopenharmony_ci	if (xdr->head[0].iov_len) {
70262306a36Sopenharmony_ci		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
70362306a36Sopenharmony_ci		args->pd_dest += xdr->head[0].iov_len;
70462306a36Sopenharmony_ci	}
70562306a36Sopenharmony_ci
70662306a36Sopenharmony_ci	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
70762306a36Sopenharmony_ci	pageoff = offset_in_page(xdr->page_base);
70862306a36Sopenharmony_ci	remaining = xdr->page_len;
70962306a36Sopenharmony_ci	while (remaining) {
71062306a36Sopenharmony_ci		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
71162306a36Sopenharmony_ci		memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
71262306a36Sopenharmony_ci		remaining -= len;
71362306a36Sopenharmony_ci		args->pd_dest += len;
71462306a36Sopenharmony_ci		pageoff = 0;
71562306a36Sopenharmony_ci		ppages++;
71662306a36Sopenharmony_ci	}
71762306a36Sopenharmony_ci
71862306a36Sopenharmony_ci	if (xdr->tail[0].iov_len) {
71962306a36Sopenharmony_ci		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
72062306a36Sopenharmony_ci		args->pd_dest += xdr->tail[0].iov_len;
72162306a36Sopenharmony_ci	}
72262306a36Sopenharmony_ci
72362306a36Sopenharmony_ci	args->pd_length += xdr->len;
72462306a36Sopenharmony_ci	return 0;
72562306a36Sopenharmony_ci}
72662306a36Sopenharmony_ci
72762306a36Sopenharmony_ci/**
72862306a36Sopenharmony_ci * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
72962306a36Sopenharmony_ci * @rdma: controlling transport
73062306a36Sopenharmony_ci * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
73162306a36Sopenharmony_ci * @rctxt: Write and Reply chunks provided by client
73262306a36Sopenharmony_ci * @xdr: prepared xdr_buf containing RPC message
73362306a36Sopenharmony_ci *
73462306a36Sopenharmony_ci * The device is not capable of sending the reply directly.
73562306a36Sopenharmony_ci * Assemble the elements of @xdr into the transport header buffer.
73662306a36Sopenharmony_ci *
73762306a36Sopenharmony_ci * Assumptions:
73862306a36Sopenharmony_ci *  pull_up_needed has determined that @xdr will fit in the buffer.
73962306a36Sopenharmony_ci *
74062306a36Sopenharmony_ci * Returns:
74162306a36Sopenharmony_ci *   %0 if pull-up was successful
74262306a36Sopenharmony_ci *   %-EMSGSIZE if a buffer manipulation problem occurred
74362306a36Sopenharmony_ci */
74462306a36Sopenharmony_cistatic int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
74562306a36Sopenharmony_ci				      struct svc_rdma_send_ctxt *sctxt,
74662306a36Sopenharmony_ci				      const struct svc_rdma_recv_ctxt *rctxt,
74762306a36Sopenharmony_ci				      const struct xdr_buf *xdr)
74862306a36Sopenharmony_ci{
74962306a36Sopenharmony_ci	struct svc_rdma_pullup_data args = {
75062306a36Sopenharmony_ci		.pd_dest	= sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
75162306a36Sopenharmony_ci	};
75262306a36Sopenharmony_ci	int ret;
75362306a36Sopenharmony_ci
75462306a36Sopenharmony_ci	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
75562306a36Sopenharmony_ci				      svc_rdma_xb_linearize, &args);
75662306a36Sopenharmony_ci	if (ret < 0)
75762306a36Sopenharmony_ci		return ret;
75862306a36Sopenharmony_ci
75962306a36Sopenharmony_ci	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
76062306a36Sopenharmony_ci	trace_svcrdma_send_pullup(sctxt, args.pd_length);
76162306a36Sopenharmony_ci	return 0;
76262306a36Sopenharmony_ci}
76362306a36Sopenharmony_ci
76462306a36Sopenharmony_ci/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
76562306a36Sopenharmony_ci * @rdma: controlling transport
76662306a36Sopenharmony_ci * @sctxt: send_ctxt for the Send WR
76762306a36Sopenharmony_ci * @rctxt: Write and Reply chunks provided by client
76862306a36Sopenharmony_ci * @xdr: prepared xdr_buf containing RPC message
76962306a36Sopenharmony_ci *
77062306a36Sopenharmony_ci * Returns:
77162306a36Sopenharmony_ci *   %0 if DMA mapping was successful.
77262306a36Sopenharmony_ci *   %-EMSGSIZE if a buffer manipulation problem occurred
77362306a36Sopenharmony_ci *   %-EIO if DMA mapping failed
77462306a36Sopenharmony_ci *
77562306a36Sopenharmony_ci * The Send WR's num_sge field is set in all cases.
77662306a36Sopenharmony_ci */
77762306a36Sopenharmony_ciint svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
77862306a36Sopenharmony_ci			   struct svc_rdma_send_ctxt *sctxt,
77962306a36Sopenharmony_ci			   const struct svc_rdma_recv_ctxt *rctxt,
78062306a36Sopenharmony_ci			   const struct xdr_buf *xdr)
78162306a36Sopenharmony_ci{
78262306a36Sopenharmony_ci	struct svc_rdma_map_data args = {
78362306a36Sopenharmony_ci		.md_rdma	= rdma,
78462306a36Sopenharmony_ci		.md_ctxt	= sctxt,
78562306a36Sopenharmony_ci	};
78662306a36Sopenharmony_ci
78762306a36Sopenharmony_ci	/* Set up the (persistently-mapped) transport header SGE. */
78862306a36Sopenharmony_ci	sctxt->sc_send_wr.num_sge = 1;
78962306a36Sopenharmony_ci	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
79062306a36Sopenharmony_ci
79162306a36Sopenharmony_ci	/* If there is a Reply chunk, nothing follows the transport
79262306a36Sopenharmony_ci	 * header, and we're done here.
79362306a36Sopenharmony_ci	 */
79462306a36Sopenharmony_ci	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
79562306a36Sopenharmony_ci		return 0;
79662306a36Sopenharmony_ci
79762306a36Sopenharmony_ci	/* For pull-up, svc_rdma_send() will sync the transport header.
79862306a36Sopenharmony_ci	 * No additional DMA mapping is necessary.
79962306a36Sopenharmony_ci	 */
80062306a36Sopenharmony_ci	if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
80162306a36Sopenharmony_ci		return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
80262306a36Sopenharmony_ci
80362306a36Sopenharmony_ci	return pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
80462306a36Sopenharmony_ci				       svc_rdma_xb_dma_map, &args);
80562306a36Sopenharmony_ci}
80662306a36Sopenharmony_ci
80762306a36Sopenharmony_ci/* The svc_rqst and all resources it owns are released as soon as
80862306a36Sopenharmony_ci * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
80962306a36Sopenharmony_ci * so they are released by the Send completion handler.
81062306a36Sopenharmony_ci */
81162306a36Sopenharmony_cistatic void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
81262306a36Sopenharmony_ci				   struct svc_rdma_send_ctxt *ctxt)
81362306a36Sopenharmony_ci{
81462306a36Sopenharmony_ci	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
81562306a36Sopenharmony_ci
81662306a36Sopenharmony_ci	ctxt->sc_page_count += pages;
81762306a36Sopenharmony_ci	for (i = 0; i < pages; i++) {
81862306a36Sopenharmony_ci		ctxt->sc_pages[i] = rqstp->rq_respages[i];
81962306a36Sopenharmony_ci		rqstp->rq_respages[i] = NULL;
82062306a36Sopenharmony_ci	}
82162306a36Sopenharmony_ci
82262306a36Sopenharmony_ci	/* Prevent svc_xprt_release from releasing pages in rq_pages */
82362306a36Sopenharmony_ci	rqstp->rq_next_page = rqstp->rq_respages;
82462306a36Sopenharmony_ci}
82562306a36Sopenharmony_ci
82662306a36Sopenharmony_ci/* Prepare the portion of the RPC Reply that will be transmitted
82762306a36Sopenharmony_ci * via RDMA Send. The RPC-over-RDMA transport header is prepared
82862306a36Sopenharmony_ci * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
82962306a36Sopenharmony_ci *
83062306a36Sopenharmony_ci * Depending on whether a Write list or Reply chunk is present,
83162306a36Sopenharmony_ci * the server may send all, a portion of, or none of the xdr_buf.
83262306a36Sopenharmony_ci * In the latter case, only the transport header (sc_sges[0]) is
83362306a36Sopenharmony_ci * transmitted.
83462306a36Sopenharmony_ci *
83562306a36Sopenharmony_ci * RDMA Send is the last step of transmitting an RPC reply. Pages
83662306a36Sopenharmony_ci * involved in the earlier RDMA Writes are here transferred out
83762306a36Sopenharmony_ci * of the rqstp and into the sctxt's page array. These pages are
83862306a36Sopenharmony_ci * DMA unmapped by each Write completion, but the subsequent Send
83962306a36Sopenharmony_ci * completion finally releases these pages.
84062306a36Sopenharmony_ci *
84162306a36Sopenharmony_ci * Assumptions:
84262306a36Sopenharmony_ci * - The Reply's transport header will never be larger than a page.
84362306a36Sopenharmony_ci */
84462306a36Sopenharmony_cistatic int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
84562306a36Sopenharmony_ci				   struct svc_rdma_send_ctxt *sctxt,
84662306a36Sopenharmony_ci				   const struct svc_rdma_recv_ctxt *rctxt,
84762306a36Sopenharmony_ci				   struct svc_rqst *rqstp)
84862306a36Sopenharmony_ci{
84962306a36Sopenharmony_ci	int ret;
85062306a36Sopenharmony_ci
85162306a36Sopenharmony_ci	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
85262306a36Sopenharmony_ci	if (ret < 0)
85362306a36Sopenharmony_ci		return ret;
85462306a36Sopenharmony_ci
85562306a36Sopenharmony_ci	svc_rdma_save_io_pages(rqstp, sctxt);
85662306a36Sopenharmony_ci
85762306a36Sopenharmony_ci	if (rctxt->rc_inv_rkey) {
85862306a36Sopenharmony_ci		sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
85962306a36Sopenharmony_ci		sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
86062306a36Sopenharmony_ci	} else {
86162306a36Sopenharmony_ci		sctxt->sc_send_wr.opcode = IB_WR_SEND;
86262306a36Sopenharmony_ci	}
86362306a36Sopenharmony_ci
86462306a36Sopenharmony_ci	return svc_rdma_send(rdma, sctxt);
86562306a36Sopenharmony_ci}
86662306a36Sopenharmony_ci
86762306a36Sopenharmony_ci/**
86862306a36Sopenharmony_ci * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
86962306a36Sopenharmony_ci * @rdma: controlling transport context
87062306a36Sopenharmony_ci * @sctxt: Send context for the response
87162306a36Sopenharmony_ci * @rctxt: Receive context for incoming bad message
87262306a36Sopenharmony_ci * @status: negative errno indicating error that occurred
87362306a36Sopenharmony_ci *
87462306a36Sopenharmony_ci * Given the client-provided Read, Write, and Reply chunks, the
87562306a36Sopenharmony_ci * server was not able to parse the Call or form a complete Reply.
87662306a36Sopenharmony_ci * Return an RDMA_ERROR message so the client can retire the RPC
87762306a36Sopenharmony_ci * transaction.
87862306a36Sopenharmony_ci *
87962306a36Sopenharmony_ci * The caller does not have to release @sctxt. It is released by
88062306a36Sopenharmony_ci * Send completion, or by this function on error.
88162306a36Sopenharmony_ci */
88262306a36Sopenharmony_civoid svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
88362306a36Sopenharmony_ci			     struct svc_rdma_send_ctxt *sctxt,
88462306a36Sopenharmony_ci			     struct svc_rdma_recv_ctxt *rctxt,
88562306a36Sopenharmony_ci			     int status)
88662306a36Sopenharmony_ci{
88762306a36Sopenharmony_ci	__be32 *rdma_argp = rctxt->rc_recv_buf;
88862306a36Sopenharmony_ci	__be32 *p;
88962306a36Sopenharmony_ci
89062306a36Sopenharmony_ci	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
89162306a36Sopenharmony_ci	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
89262306a36Sopenharmony_ci			sctxt->sc_xprt_buf, NULL);
89362306a36Sopenharmony_ci
89462306a36Sopenharmony_ci	p = xdr_reserve_space(&sctxt->sc_stream,
89562306a36Sopenharmony_ci			      rpcrdma_fixed_maxsz * sizeof(*p));
89662306a36Sopenharmony_ci	if (!p)
89762306a36Sopenharmony_ci		goto put_ctxt;
89862306a36Sopenharmony_ci
89962306a36Sopenharmony_ci	*p++ = *rdma_argp;
90062306a36Sopenharmony_ci	*p++ = *(rdma_argp + 1);
90162306a36Sopenharmony_ci	*p++ = rdma->sc_fc_credits;
90262306a36Sopenharmony_ci	*p = rdma_error;
90362306a36Sopenharmony_ci
90462306a36Sopenharmony_ci	switch (status) {
90562306a36Sopenharmony_ci	case -EPROTONOSUPPORT:
90662306a36Sopenharmony_ci		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
90762306a36Sopenharmony_ci		if (!p)
90862306a36Sopenharmony_ci			goto put_ctxt;
90962306a36Sopenharmony_ci
91062306a36Sopenharmony_ci		*p++ = err_vers;
91162306a36Sopenharmony_ci		*p++ = rpcrdma_version;
91262306a36Sopenharmony_ci		*p = rpcrdma_version;
91362306a36Sopenharmony_ci		trace_svcrdma_err_vers(*rdma_argp);
91462306a36Sopenharmony_ci		break;
91562306a36Sopenharmony_ci	default:
91662306a36Sopenharmony_ci		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
91762306a36Sopenharmony_ci		if (!p)
91862306a36Sopenharmony_ci			goto put_ctxt;
91962306a36Sopenharmony_ci
92062306a36Sopenharmony_ci		*p = err_chunk;
92162306a36Sopenharmony_ci		trace_svcrdma_err_chunk(*rdma_argp);
92262306a36Sopenharmony_ci	}
92362306a36Sopenharmony_ci
92462306a36Sopenharmony_ci	/* Remote Invalidation is skipped for simplicity. */
92562306a36Sopenharmony_ci	sctxt->sc_send_wr.num_sge = 1;
92662306a36Sopenharmony_ci	sctxt->sc_send_wr.opcode = IB_WR_SEND;
92762306a36Sopenharmony_ci	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
92862306a36Sopenharmony_ci	if (svc_rdma_send(rdma, sctxt))
92962306a36Sopenharmony_ci		goto put_ctxt;
93062306a36Sopenharmony_ci	return;
93162306a36Sopenharmony_ci
93262306a36Sopenharmony_ciput_ctxt:
93362306a36Sopenharmony_ci	svc_rdma_send_ctxt_put(rdma, sctxt);
93462306a36Sopenharmony_ci}
93562306a36Sopenharmony_ci
93662306a36Sopenharmony_ci/**
93762306a36Sopenharmony_ci * svc_rdma_sendto - Transmit an RPC reply
93862306a36Sopenharmony_ci * @rqstp: processed RPC request, reply XDR already in ::rq_res
93962306a36Sopenharmony_ci *
94062306a36Sopenharmony_ci * Any resources still associated with @rqstp are released upon return.
94162306a36Sopenharmony_ci * If no reply message was possible, the connection is closed.
94262306a36Sopenharmony_ci *
94362306a36Sopenharmony_ci * Returns:
94462306a36Sopenharmony_ci *	%0 if an RPC reply has been successfully posted,
94562306a36Sopenharmony_ci *	%-ENOMEM if a resource shortage occurred (connection is lost),
94662306a36Sopenharmony_ci *	%-ENOTCONN if posting failed (connection is lost).
94762306a36Sopenharmony_ci */
94862306a36Sopenharmony_ciint svc_rdma_sendto(struct svc_rqst *rqstp)
94962306a36Sopenharmony_ci{
95062306a36Sopenharmony_ci	struct svc_xprt *xprt = rqstp->rq_xprt;
95162306a36Sopenharmony_ci	struct svcxprt_rdma *rdma =
95262306a36Sopenharmony_ci		container_of(xprt, struct svcxprt_rdma, sc_xprt);
95362306a36Sopenharmony_ci	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
95462306a36Sopenharmony_ci	__be32 *rdma_argp = rctxt->rc_recv_buf;
95562306a36Sopenharmony_ci	struct svc_rdma_send_ctxt *sctxt;
95662306a36Sopenharmony_ci	unsigned int rc_size;
95762306a36Sopenharmony_ci	__be32 *p;
95862306a36Sopenharmony_ci	int ret;
95962306a36Sopenharmony_ci
96062306a36Sopenharmony_ci	ret = -ENOTCONN;
96162306a36Sopenharmony_ci	if (svc_xprt_is_dead(xprt))
96262306a36Sopenharmony_ci		goto drop_connection;
96362306a36Sopenharmony_ci
96462306a36Sopenharmony_ci	ret = -ENOMEM;
96562306a36Sopenharmony_ci	sctxt = svc_rdma_send_ctxt_get(rdma);
96662306a36Sopenharmony_ci	if (!sctxt)
96762306a36Sopenharmony_ci		goto drop_connection;
96862306a36Sopenharmony_ci
96962306a36Sopenharmony_ci	ret = -EMSGSIZE;
97062306a36Sopenharmony_ci	p = xdr_reserve_space(&sctxt->sc_stream,
97162306a36Sopenharmony_ci			      rpcrdma_fixed_maxsz * sizeof(*p));
97262306a36Sopenharmony_ci	if (!p)
97362306a36Sopenharmony_ci		goto put_ctxt;
97462306a36Sopenharmony_ci
97562306a36Sopenharmony_ci	ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
97662306a36Sopenharmony_ci	if (ret < 0)
97762306a36Sopenharmony_ci		goto reply_chunk;
97862306a36Sopenharmony_ci	rc_size = ret;
97962306a36Sopenharmony_ci
98062306a36Sopenharmony_ci	*p++ = *rdma_argp;
98162306a36Sopenharmony_ci	*p++ = *(rdma_argp + 1);
98262306a36Sopenharmony_ci	*p++ = rdma->sc_fc_credits;
98362306a36Sopenharmony_ci	*p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
98462306a36Sopenharmony_ci
98562306a36Sopenharmony_ci	ret = svc_rdma_encode_read_list(sctxt);
98662306a36Sopenharmony_ci	if (ret < 0)
98762306a36Sopenharmony_ci		goto put_ctxt;
98862306a36Sopenharmony_ci	ret = svc_rdma_encode_write_list(rctxt, sctxt);
98962306a36Sopenharmony_ci	if (ret < 0)
99062306a36Sopenharmony_ci		goto put_ctxt;
99162306a36Sopenharmony_ci	ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size);
99262306a36Sopenharmony_ci	if (ret < 0)
99362306a36Sopenharmony_ci		goto put_ctxt;
99462306a36Sopenharmony_ci
99562306a36Sopenharmony_ci	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
99662306a36Sopenharmony_ci	if (ret < 0)
99762306a36Sopenharmony_ci		goto put_ctxt;
99862306a36Sopenharmony_ci	return 0;
99962306a36Sopenharmony_ci
100062306a36Sopenharmony_cireply_chunk:
100162306a36Sopenharmony_ci	if (ret != -E2BIG && ret != -EINVAL)
100262306a36Sopenharmony_ci		goto put_ctxt;
100362306a36Sopenharmony_ci
100462306a36Sopenharmony_ci	/* Send completion releases payload pages that were part
100562306a36Sopenharmony_ci	 * of previously posted RDMA Writes.
100662306a36Sopenharmony_ci	 */
100762306a36Sopenharmony_ci	svc_rdma_save_io_pages(rqstp, sctxt);
100862306a36Sopenharmony_ci	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
100962306a36Sopenharmony_ci	return 0;
101062306a36Sopenharmony_ci
101162306a36Sopenharmony_ciput_ctxt:
101262306a36Sopenharmony_ci	svc_rdma_send_ctxt_put(rdma, sctxt);
101362306a36Sopenharmony_cidrop_connection:
101462306a36Sopenharmony_ci	trace_svcrdma_send_err(rqstp, ret);
101562306a36Sopenharmony_ci	svc_xprt_deferred_close(&rdma->sc_xprt);
101662306a36Sopenharmony_ci	return -ENOTCONN;
101762306a36Sopenharmony_ci}
101862306a36Sopenharmony_ci
101962306a36Sopenharmony_ci/**
102062306a36Sopenharmony_ci * svc_rdma_result_payload - special processing for a result payload
102162306a36Sopenharmony_ci * @rqstp: svc_rqst to operate on
102262306a36Sopenharmony_ci * @offset: payload's byte offset in @xdr
102362306a36Sopenharmony_ci * @length: size of payload, in bytes
102462306a36Sopenharmony_ci *
102562306a36Sopenharmony_ci * Return values:
102662306a36Sopenharmony_ci *   %0 if successful or nothing needed to be done
102762306a36Sopenharmony_ci *   %-EMSGSIZE on XDR buffer overflow
102862306a36Sopenharmony_ci *   %-E2BIG if the payload was larger than the Write chunk
102962306a36Sopenharmony_ci *   %-EINVAL if client provided too many segments
103062306a36Sopenharmony_ci *   %-ENOMEM if rdma_rw context pool was exhausted
103162306a36Sopenharmony_ci *   %-ENOTCONN if posting failed (connection is lost)
103262306a36Sopenharmony_ci *   %-EIO if rdma_rw initialization failed (DMA mapping, etc)
103362306a36Sopenharmony_ci */
103462306a36Sopenharmony_ciint svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
103562306a36Sopenharmony_ci			    unsigned int length)
103662306a36Sopenharmony_ci{
103762306a36Sopenharmony_ci	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
103862306a36Sopenharmony_ci	struct svc_rdma_chunk *chunk;
103962306a36Sopenharmony_ci	struct svcxprt_rdma *rdma;
104062306a36Sopenharmony_ci	struct xdr_buf subbuf;
104162306a36Sopenharmony_ci	int ret;
104262306a36Sopenharmony_ci
104362306a36Sopenharmony_ci	chunk = rctxt->rc_cur_result_payload;
104462306a36Sopenharmony_ci	if (!length || !chunk)
104562306a36Sopenharmony_ci		return 0;
104662306a36Sopenharmony_ci	rctxt->rc_cur_result_payload =
104762306a36Sopenharmony_ci		pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
104862306a36Sopenharmony_ci	if (length > chunk->ch_length)
104962306a36Sopenharmony_ci		return -E2BIG;
105062306a36Sopenharmony_ci
105162306a36Sopenharmony_ci	chunk->ch_position = offset;
105262306a36Sopenharmony_ci	chunk->ch_payload_length = length;
105362306a36Sopenharmony_ci
105462306a36Sopenharmony_ci	if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
105562306a36Sopenharmony_ci		return -EMSGSIZE;
105662306a36Sopenharmony_ci
105762306a36Sopenharmony_ci	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
105862306a36Sopenharmony_ci	ret = svc_rdma_send_write_chunk(rdma, chunk, &subbuf);
105962306a36Sopenharmony_ci	if (ret < 0)
106062306a36Sopenharmony_ci		return ret;
106162306a36Sopenharmony_ci	return 0;
106262306a36Sopenharmony_ci}
1063