// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}
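/* A rough worked example, assuming the usual constant values
 * (RPCRDMA_HDRLEN_MIN of 28 bytes, rpcrdma_readchunk_maxsz of 6 XDR
 * words, rpcrdma_segment_maxsz of 4 XDR words; see the rpcrdma
 * header definitions for the authoritative values): with
 * maxsegs = 8, the largest Call header is
 *
 *	28 + 8 * 6 * 4 + (4 + 4 * 4 + 4) = 244 bytes,
 *
 * which is the amount reserved out of re_inline_send before any
 * RPC Call payload is considered for inline transmission.
 */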
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}

/**
 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 * @ep: endpoint to initialize
 *
 * The max_inline fields contain the maximum size of an RPC message
 * so the marshaling code doesn't have to repeat this calculation
 * for every RPC.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
	unsigned int maxsegs = ep->re_max_rdma_segs;

	ep->re_max_inline_send =
		ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
	ep->re_max_inline_recv =
		ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPC-over-RDMA
 * header plus the RPC call fit under the transport's inline limit.
 * If the combined call message size exceeds that limit, the client
 * must use a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count, remaining, offset;

	if (xdr->len > ep->re_max_inline_send)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = RPCRDMA_MIN_SEND_SGES;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > ep->re_attr.cap.max_send_sge)
				return false;
		}
	}

	return true;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds the
 * inline threshold, the client must provide a Write list or a Reply
 * chunk for this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}

/* The client is required to provide a Reply chunk if the maximum
 * size of the non-payload part of the RPC Reply is larger than
 * the inline threshold.
 */
static bool
rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
			  const struct rpc_rqst *rqst)
{
	const struct xdr_buf *buf = &rqst->rq_rcv_buf;

	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
		r_xprt->rx_ep->re_max_inline_recv;
}
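/* Putting the checks above together (illustrative numbers only;
 * the actual limits come from connection setup): with a 4KB inline
 * threshold, a GETATTR-sized Reply fits inline and needs no chunks
 * at all; a 64KB READ result does not fit, but its non-payload
 * results do, so only a Write chunk is required; and a Reply whose
 * non-payload portion alone exceeds the threshold must be returned
 * in a Reply chunk.
 */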
/* ACL likes to be lazy in allocating pages. For TCP, these
 * pages can be allocated during receive processing. Not true
 * for RDMA, which must always provision receive buffers
 * up front.
 */
static noinline int
rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
{
	struct page **ppages;
	int len;

	len = buf->page_len;
	ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
	while (len > 0) {
		if (!*ppages)
			*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
		if (!*ppages)
			return -ENOBUFS;
		ppages++;
		len -= PAGE_SIZE;
	}

	return 0;
}

/* Split @vec on page boundaries into SGEs. FMR registers pages, not
 * a byte range. Other modes coalesce these SGEs into a single MR
 * when they can.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	u32 remaining, page_offset;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining) {
		seg->mr_page = NULL;
		seg->mr_offset = base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg->mr_len;
		base += seg->mr_len;
		++seg;
		++(*n);
		page_offset = 0;
	}
	return seg;
}

/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		seg->mr_page = *ppages;
		seg->mr_offset = (char *)page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
		goto out;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks. The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}
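/* A concrete (hypothetical) illustration of the conversion above,
 * assuming 4KB pages: an xdr_buf whose head holds 148 bytes that do
 * not cross a page boundary, whose page list carries 8192 bytes
 * starting at page offset 0, and whose tail is empty converts into
 * three rpcrdma_mr_seg entries when pos == 0: one for the head kvec
 * and one for each payload page. The registration code may later
 * coalesce these page-sized segments into a single MR.
 */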
static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;			/* Item present */
	xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
				mr->mr_offset);
	return 0;
}

static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
						 struct rpcrdma_req *req,
						 struct rpcrdma_mr_seg *seg,
						 int nsegs, bool writing,
						 struct rpcrdma_mr **mr)
{
	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
	if (!*mr) {
		*mr = rpcrdma_mr_get(r_xprt);
		if (!*mr)
			goto out_getmr_err;
		trace_xprtrdma_mr_get(req);
		(*mr)->mr_req = req;
	}

	rpcrdma_mr_push(*mr, &req->rl_registered);
	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);

out_getmr_err:
	trace_xprtrdma_nomrs(req);
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	rpcrdma_mrs_refresh(r_xprt);
	return ERR_PTR(-EAGAIN);
}

/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct rpc_rqst *rqst,
				    enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
		goto done;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}
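/* Worked example of the Read list layout, assuming the whole chunk
 * is registered as a single MR: a 12KB call payload starting at XDR
 * position P is emitted as
 *
 *	1 | P | handle | length | offset-hi | offset-lo | 0
 *
 * that is, one "item present" discriminator, one read segment
 * (position plus HLOO), and the terminating "item absent" word
 * written at the done: label, for seven XDR words in total. If the
 * FRWR code splits the payload across several MRs, each MR adds
 * another six-word segment carrying the same position P.
 */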
/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req,
				     struct rpc_rqst *rqst,
				     enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech)
		goto done;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
				      struct rpcrdma_req *req,
				      struct rpc_rqst *rqst,
				      enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		if (xdr_stream_encode_item_absent(xdr) < 0)
			return -EMSGSIZE;
		return 0;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

static void rpcrdma_sendctx_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);
	struct rpcrdma_rep *rep = req->rl_reply;

	rpcrdma_complete_rqst(rep);
	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}

/**
 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 * @sc: sendctx containing SGEs to unmap
 *
 */
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
	struct ib_sge *sge;

	if (!sc->sc_unmap_count)
		return;

	/* The first two SGEs contain the transport header and
	 * the inline buffer. These are always left mapped so
	 * they can be cheaply re-used.
	 */
	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
	     ++sge, --sc->sc_unmap_count)
		ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
				  DMA_TO_DEVICE);

	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}

/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req, u32 len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
}

/* The head iovec is straightforward, as it is usually already
 * DMA-mapped. Sync the content that has changed.
 */
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
		return false;

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
	return true;
}

/* If there is a page list present, DMA map and prepare an
 * SGE for each page to be sent.
 */
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
				     struct xdr_buf *xdr)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	unsigned int page_base, len, remaining;
	struct page **ppages;
	struct ib_sge *sge;

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		sge = &sc->sc_sges[req->rl_wr.num_sge++];
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
					    page_base, len, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
			goto out_mapping_err;

		sge->length = len;
		sge->lkey = rdmab_lkey(rb);

		sc->sc_unmap_count++;
		ppages++;
		remaining -= len;
		page_base = 0;
	}

	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* The tail iovec may include an XDR pad for the page list,
 * as well as additional content, and may not reside in the
 * same page as the head iovec.
 */
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
				     struct xdr_buf *xdr,
				     unsigned int page_base, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct page *page = virt_to_page(xdr->tail[0].iov_base);

	sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
				    DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
		goto out_mapping_err;

	sge->length = len;
	sge->lkey = rdmab_lkey(rb);
	++sc->sc_unmap_count;
	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* Copy the tail to the end of the head buffer.
 */
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned char *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len + xdr->page_len;
	memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
	r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}

/* Copy pagelist content into the head buffer.
 */
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned int len, page_base, remaining;
	struct page **ppages;
	unsigned char *src, *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len;
	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		src = page_address(*ppages);
		src += page_base;
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		memcpy(dst, src, len);
		r_xprt->rx_stats.pullup_copy_count += len;

		ppages++;
		dst += len;
		remaining -= len;
		page_base = 0;
	}
}

/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 * When the head, pagelist, and tail are small, a pull-up copy
 * is considerably less costly than DMA mapping the components
 * of @xdr.
 *
 * Assumptions:
 *  - the caller has already verified that the total length
 *    of the RPC Call body will fit into @rl_sendbuf.
 */
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	if (unlikely(xdr->tail[0].iov_len))
		rpcrdma_pullup_tail_iov(r_xprt, req, xdr);

	if (unlikely(xdr->page_len))
		rpcrdma_pullup_pagelist(r_xprt, req, xdr);

	/* The whole RPC message resides in the head iovec now */
	return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}

static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	struct kvec *tail = &xdr->tail[0];

	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;
	if (xdr->page_len)
		if (!rpcrdma_prepare_pagelist(req, xdr))
			return false;
	if (tail->iov_len)
		if (!rpcrdma_prepare_tail_iov(req, xdr,
					      offset_in_page(tail->iov_base),
					      tail->iov_len))
			return false;

	if (req->rl_sendctx->sc_unmap_count)
		kref_get(&req->rl_kref);
	return true;
}

static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
				   struct rpcrdma_req *req,
				   struct xdr_buf *xdr)
{
	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here.
	 */

	/* Do not include the tail if it is only an XDR pad */
	if (xdr->tail[0].iov_len > 3) {
		unsigned int page_base, len;

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() adds a pad at the beginning of
		 * the tail iovec. Force the tail's non-pad content to
		 * land at the next XDR position in the Send message.
		 */
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;
		page_base += len & 3;
		len -= len & 3;
		if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
			return false;
		kref_get(&req->rl_kref);
	}

	return true;
}

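/* Roughly how the helpers above combine (a summary of the code, not
 * a normative statement of the protocol): every Send carries the
 * transport header SGE first; rpcrdma_noch_pullup then adds one SGE
 * covering the pulled-up rl_sendbuf, rpcrdma_noch_mapped adds an SGE
 * for the head plus one per payload page plus an optional tail SGE,
 * and rpcrdma_readch adds only the head (and any non-pad tail)
 * because the payload pages travel in the Read chunk. Only the SGEs
 * mapped here, from sc_sges[2] onward, are unmapped at Send
 * completion.
 */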
/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, u32 hdrlen,
				     struct xdr_buf *xdr,
				     enum rpcrdma_chunktype rtype)
{
	int ret;

	ret = -EAGAIN;
	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
	if (!req->rl_sendctx)
		goto out_nosc;
	req->rl_sendctx->sc_unmap_count = 0;
	req->rl_sendctx->sc_req = req;
	kref_init(&req->rl_kref);
	req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
	req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
	req->rl_wr.num_sge = 0;
	req->rl_wr.opcode = IB_WR_SEND;

	rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);

	ret = -EIO;
	switch (rtype) {
	case rpcrdma_noch_pullup:
		if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_noch_mapped:
		if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_readch:
		if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_areadch:
		break;
	default:
		goto out_unmap;
	}

	return 0;

out_unmap:
	rpcrdma_sendctx_unmap(req->rl_sendctx);
out_nosc:
	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
	return ret;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	struct xdr_buf *buf = &rqst->rq_snd_buf;
	bool ddp_allowed;
	__be32 *p;
	int ret;

	if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
		ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
		if (ret)
			return ret;
	}

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
			rqst);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = r_xprt->rx_buf.rb_max_requests;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
				&rqst->rq_cred->cr_auth->au_flags);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
		 rpcrdma_nonpayload_inline(r_xprt, rqst))
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
			rpcrdma_noch_pullup : rpcrdma_noch_mapped;
	} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}
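	/* To make the two decisions above concrete (examples only;
	 * the actual cutoffs depend on the negotiated inline
	 * thresholds): a small GETATTR goes out as RDMA_MSG with no
	 * chunks at all; a large WRITE stays RDMA_MSG but moves its
	 * payload into a Read chunk; a large READ expects its result
	 * in a Write chunk; and a call whose non-payload arguments
	 * alone exceed the inline limit is sent as RDMA_NOMSG with a
	 * Position Zero Read chunk carrying the whole message.
	 */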

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
					buf, rtype);
	if (ret)
		goto out_err;

	trace_xprtrdma_marshal(req, rtype, wtype);
	return 0;

out_err:
	trace_xprtrdma_marshal_failed(rqst, ret);
	r_xprt->rx_stats.failed_marshal_count++;
	frwr_reset(req);
	return ret;
}

static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
					 struct rpcrdma_buffer *buf,
					 u32 grant)
{
	buf->rb_credits = grant;
	xprt->cwnd = grant << RPC_CWNDSHIFT;
}

static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
	spin_unlock(&xprt->transport_lock);
}
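/* A quick numeric sketch of the conversion above, assuming the
 * generic RPC_CWNDSHIFT scaling used by the SunRPC congestion code:
 * a credit grant of 32 from the server becomes a congestion window
 * of 32 << RPC_CWNDSHIFT, which allows up to 32 RPC Calls to be
 * outstanding on this connection at once.
 */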
/**
 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
 * @r_xprt: controlling transport instance
 *
 * Prepare @r_xprt for the next connection by reinitializing
 * its credit grant to one (see RFC 8166, Section 3.3.3).
 */
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	xprt->cong = 0;
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
	spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	if (fixup_copy_count)
		trace_xprtrdma_fixup(rqst, fixup_copy_count);
	return fixup_copy_count;
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (rep->rr_proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* No bc service. */
	if (xprt->bc_serv == NULL)
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		goto out_short;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;

out_short:
	pr_warn("RPC/RDMA short backward direction call\n");
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	u32 handle;
	u64 offset;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	xdr_decode_rdma_segment(p, &handle, length, &offset);
	trace_xprtrdma_decode_seg(handle, *length, offset);
	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	return 0;
}
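/* For example (values invented for illustration): a Write chunk
 * whose segment count is 2, with segment lengths of 4096 and 2048
 * bytes, decodes to *length = 6144, the total number of bytes the
 * server placed via RDMA Write. The reply handlers below add this
 * total to total_rdma_reply and use it to locate the end of the
 * RDMA-placed data.
 */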
/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(xdr_item_is_present(p)))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (xdr_item_is_absent(p))
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (xdr_item_is_present(p))
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}

static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		dprintk("RPC:       %s: server reports "
			"version error (%u-%u), xid %08x\n", __func__,
			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
			be32_to_cpu(rep->rr_xid));
		break;
	case err_chunk:
		dprintk("RPC:       %s: server reports "
			"header decoding error, xid %08x\n", __func__,
			be32_to_cpu(rep->rr_xid));
		break;
	default:
		dprintk("RPC:       %s: server reports "
			"unrecognized error %d, xid %08x\n", __func__,
			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
	}

	return -EIO;
}
/* Perform XID lookup, reconstruction of the RPC reply, and
 * RPC completion while holding the transport lock to ensure
 * the rep, rqst, and rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	int status;

	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);
	return;

out_badheader:
	trace_xprtrdma_reply_hdr(rep);
	r_xprt->rx_stats.bad_reply_count++;
	rqst->rq_task->tk_status = status;
	status = 0;
	goto out;
}

static void rpcrdma_reply_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);

	rpcrdma_complete_rqst(req->rl_reply);
}
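/* A note on rl_kref, summarizing the code above rather than adding
 * new rules: rpcrdma_prepare_send_sges() initializes the kref to
 * one, held on behalf of the Receive path, and takes a second
 * reference only when extra Send SGEs were DMA-mapped. That second
 * reference is dropped by rpcrdma_sendctx_unmap() at Send
 * completion, while the first is dropped by the reply handler (or
 * by LocalInv completion when MRs must be invalidated first). The
 * RPC is completed by whichever put drops the count to zero, so a
 * reply is never handed up while its Send buffers are still mapped
 * for DMA.
 */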
/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	u32 credits;
	__be32 *p;

	/* Any data means we had a useful conversation, so
	 * we don't need to delay the next reconnect.
	 */
	if (xprt->reestablish_timeout)
		xprt->reestablish_timeout = 0;

	/* Fixed transport header fields */
	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base, NULL);
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	credits = be32_to_cpu(*p++);
	rep->rr_proc = *p++;

	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock(&xprt->queue_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);

	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_ep->re_max_requests)
		credits = r_xprt->rx_ep->re_max_requests;
	rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
			   false);
	if (buf->rb_credits != credits)
		rpcrdma_update_cwnd(r_xprt, credits);

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply) {
		trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
		rpcrdma_recv_buffer_put(req->rl_reply);
	}
	req->rl_reply = rep;
	rep->rr_rqst = rqst;

	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);

	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	if (!list_empty(&req->rl_registered))
		frwr_unmap_async(r_xprt, req);
		/* LocalInv completion will complete the RPC */
	else
		kref_put(&req->rl_kref, rpcrdma_reply_done);
	return;

out_badversion:
	trace_xprtrdma_reply_vers(rep);
	goto out;

out_norqst:
	spin_unlock(&xprt->queue_lock);
	trace_xprtrdma_reply_rqst(rep);
	goto out;

out_shortreply:
	trace_xprtrdma_reply_short(rep);

out:
	rpcrdma_recv_buffer_put(rep);
}