162306a36Sopenharmony_ci/*
262306a36Sopenharmony_ci * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
362306a36Sopenharmony_ci *
462306a36Sopenharmony_ci * This software is available to you under a choice of one of two
562306a36Sopenharmony_ci * licenses.  You may choose to be licensed under the terms of the GNU
662306a36Sopenharmony_ci * General Public License (GPL) Version 2, available from the file
762306a36Sopenharmony_ci * COPYING in the main directory of this source tree, or the
862306a36Sopenharmony_ci * OpenIB.org BSD license below:
962306a36Sopenharmony_ci *
1062306a36Sopenharmony_ci *     Redistribution and use in source and binary forms, with or
1162306a36Sopenharmony_ci *     without modification, are permitted provided that the following
1262306a36Sopenharmony_ci *     conditions are met:
1362306a36Sopenharmony_ci *
1462306a36Sopenharmony_ci *      - Redistributions of source code must retain the above
1562306a36Sopenharmony_ci *        copyright notice, this list of conditions and the following
1662306a36Sopenharmony_ci *        disclaimer.
1762306a36Sopenharmony_ci *
1862306a36Sopenharmony_ci *      - Redistributions in binary form must reproduce the above
1962306a36Sopenharmony_ci *        copyright notice, this list of conditions and the following
2062306a36Sopenharmony_ci *        disclaimer in the documentation and/or other materials
2162306a36Sopenharmony_ci *        provided with the distribution.
2262306a36Sopenharmony_ci *
2362306a36Sopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
2462306a36Sopenharmony_ci * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
2562306a36Sopenharmony_ci * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
2662306a36Sopenharmony_ci * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
2762306a36Sopenharmony_ci * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
2862306a36Sopenharmony_ci * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
2962306a36Sopenharmony_ci * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
3062306a36Sopenharmony_ci * SOFTWARE.
3162306a36Sopenharmony_ci *
3262306a36Sopenharmony_ci */
3362306a36Sopenharmony_ci#include <linux/kernel.h>
3462306a36Sopenharmony_ci#include <linux/moduleparam.h>
3562306a36Sopenharmony_ci#include <linux/gfp.h>
3662306a36Sopenharmony_ci#include <net/sock.h>
3762306a36Sopenharmony_ci#include <linux/in.h>
3862306a36Sopenharmony_ci#include <linux/list.h>
3962306a36Sopenharmony_ci#include <linux/ratelimit.h>
4062306a36Sopenharmony_ci#include <linux/export.h>
4162306a36Sopenharmony_ci#include <linux/sizes.h>
4262306a36Sopenharmony_ci
4362306a36Sopenharmony_ci#include "rds.h"
4462306a36Sopenharmony_ci
/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlockup
 * watchdog will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);	/* 0444: read-only via sysfs */
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

/* Defined below; called from rds_send_xmit() to drop flushed messages. */
static void rds_send_remove_from_sock(struct list_head *messages, int status);
6062306a36Sopenharmony_ci
/*
 * Reset the send state.  Callers must ensure that this doesn't race with
 * rds_send_xmit().
 *
 * Drops any message that was mid-transmit, zeroes all partial-transmit
 * cursors, restores the unacked packet/byte budgets, and moves every
 * message on the retransmit list back onto the send queue flagged as a
 * retransmission.
 */
void rds_send_path_reset(struct rds_conn_path *cp)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (cp->cp_xmit_rm) {
		rm = cp->cp_xmit_rm;
		cp->cp_xmit_rm = NULL;
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory */
		rds_message_unmapped(rm);
		rds_message_put(rm);
	}

	/* Partial-transmit state: the next message starts from scratch. */
	cp->cp_xmit_sg = 0;
	cp->cp_xmit_hdr_off = 0;
	cp->cp_xmit_data_off = 0;
	cp->cp_xmit_atomic_sent = 0;
	cp->cp_xmit_rdma_sent = 0;
	cp->cp_xmit_data_sent = 0;

	cp->cp_conn->c_map_queued = 0;

	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&cp->cp_lock, flags);
	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
	spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_path_reset);
10362306a36Sopenharmony_ci
10462306a36Sopenharmony_cistatic int acquire_in_xmit(struct rds_conn_path *cp)
10562306a36Sopenharmony_ci{
10662306a36Sopenharmony_ci	return test_and_set_bit_lock(RDS_IN_XMIT, &cp->cp_flags) == 0;
10762306a36Sopenharmony_ci}
10862306a36Sopenharmony_ci
/* Drop the RDS_IN_XMIT transmit ownership taken via acquire_in_xmit()
 * and wake any sleepers on the path's waitqueue.  clear_bit_unlock()
 * provides the release ordering that pairs with the acquire above, and
 * must happen before the wakeup check.
 */
static void release_in_xmit(struct rds_conn_path *cp)
{
	clear_bit_unlock(RDS_IN_XMIT, &cp->cp_flags);
	/*
	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
	 * hot path and finding waiters is very rare.  We don't want to walk
	 * the system-wide hashed waitqueue buckets in the fast path only to
	 * almost never find waiters.
	 */
	if (waitqueue_active(&cp->cp_waitq))
		wake_up_all(&cp->cp_waitq);
}
12162306a36Sopenharmony_ci
/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 *   Pro:
 *      - tx queueing is a simple fifo list
 *      - reassembly is optional and easily done by transports per conn
 *      - no per flow rx lookup at all, straight to the socket
 *      - less per-frag memory and wire overhead
 *   Con:
 *      - queued acks can be delayed behind large messages
 *   Depends:
 *      - small message latency is higher behind queued large messages
 *      - large message latency isn't starved by intervening small sends
 *
 * Returns 0 when we stopped with nothing left to do (queue drained, path
 * down, or another sender raced in), and a negative errno when transmit
 * must be retried later: -ENOMEM on RDS_IN_XMIT contention, -ENETUNREACH
 * when the connection is being destroyed, -EAGAIN when we appear stuck on
 * a single message, or an error from the transport's xmit callbacks.
 */
int rds_send_xmit(struct rds_conn_path *cp)
{
	struct rds_connection *conn = cp->cp_conn;
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	LIST_HEAD(to_be_dropped);
	int batch_count;
	unsigned long send_gen = 0;
	int same_rm = 0;

restart:
	batch_count = 0;

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!acquire_in_xmit(cp)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (rds_destroy_pending(cp->cp_conn)) {
		release_in_xmit(cp);
		ret = -ENETUNREACH; /* dont requeue send work */
		goto out;
	}

	/*
	 * we record the send generation after doing the xmit acquire.
	 * if someone else manages to jump in and do some work, we'll use
	 * this to avoid a goto restart farther down.
	 *
	 * The acquire_in_xmit() check above ensures that only one
	 * caller can increment c_send_gen at any time.
	 */
	send_gen = READ_ONCE(cp->cp_send_gen) + 1;
	WRITE_ONCE(cp->cp_send_gen, send_gen);

	/*
	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
	 * we do the opposite to avoid races.
	 */
	if (!rds_conn_path_up(cp)) {
		release_in_xmit(cp);
		ret = 0;
		goto out;
	}

	if (conn->c_trans->xmit_path_prepare)
		conn->c_trans->xmit_path_prepare(cp);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (1) {

		rm = cp->cp_xmit_rm;

		/* If we keep coming back around with the same in-flight
		 * message and never complete it, give up with -EAGAIN
		 * rather than spinning here forever.
		 */
		if (!rm) {
			same_rm = 0;
		} else {
			same_rm++;
			if (same_rm >= 4096) {
				rds_stats_inc(s_send_stuck_rm);
				ret = -EAGAIN;
				break;
			}
		}

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;
			rm->m_inc.i_conn_path = cp;
			rm->m_inc.i_conn = cp->cp_conn;

			cp->cp_xmit_rm = rm;
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * cp_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem.. rds_send_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			batch_count++;

			/* we want to process as big a batch as we can, but
			 * we also want to avoid softlockups.  If we've been
			 * through a lot of messages, lets back off and see
			 * if anyone else jumps in
			 */
			if (batch_count >= send_batch_count)
				goto over_batch;

			spin_lock_irqsave(&cp->cp_lock, flags);

			if (!list_empty(&cp->cp_send_queue)) {
				rm = list_entry(cp->cp_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item,
					       &cp->cp_retrans);
			}

			spin_unlock_irqrestore(&cp->cp_lock, flags);

			if (!rm)
				break;

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
			    (rm->rdma.op_active &&
			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
				spin_lock_irqsave(&cp->cp_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&cp->cp_lock, flags);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (cp->cp_unacked_packets == 0 ||
			    cp->cp_unacked_bytes < len) {
				set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				cp->cp_unacked_packets =
					rds_sysctl_max_unacked_packets;
				cp->cp_unacked_bytes =
					rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				cp->cp_unacked_bytes -= len;
				cp->cp_unacked_packets--;
			}

			cp->cp_xmit_rm = rm;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_rdma_sent = 1;

		}

		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_atomic_sent = 1;

		}

		/*
		 * A number of cases require an RDS header to be sent
		 * even if there is no data.
		 * We permit 0-byte sends; rds-ping depends on this.
		 * However, if there are exclusively attached silent ops,
		 * we skip the hdr/data send, to enable silent operation.
		 */
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}

		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
			rm->m_final_op = &rm->data;

			ret = conn->c_trans->xmit(conn, rm,
						  cp->cp_xmit_hdr_off,
						  cp->cp_xmit_sg,
						  cp->cp_xmit_data_off);
			if (ret <= 0)
				break;

			/* ret is the byte count the transport accepted:
			 * credit the header first, then advance through the
			 * scatterlist, so a partial send resumes mid-message.
			 */
			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    cp->cp_xmit_hdr_off);
				cp->cp_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[cp->cp_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      cp->cp_xmit_data_off);
				cp->cp_xmit_data_off += tmp;
				ret -= tmp;
				if (cp->cp_xmit_data_off == sg->length) {
					cp->cp_xmit_data_off = 0;
					sg++;
					cp->cp_xmit_sg++;
					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
					       rm->data.op_nents);
				}
			}

			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
			    (cp->cp_xmit_sg == rm->data.op_nents))
				cp->cp_xmit_data_sent = 1;
		}

		/*
		 * A rm will only take multiple times through this loop
		 * if there is a data op. Thus, if the data is sent (or there was
		 * none), then we're done with the rm.
		 */
		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
			cp->cp_xmit_rm = NULL;
			cp->cp_xmit_sg = 0;
			cp->cp_xmit_hdr_off = 0;
			cp->cp_xmit_data_off = 0;
			cp->cp_xmit_rdma_sent = 0;
			cp->cp_xmit_atomic_sent = 0;
			cp->cp_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

over_batch:
	if (conn->c_trans->xmit_path_complete)
		conn->c_trans->xmit_path_complete(cp);
	release_in_xmit(cp);

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped)) {
		/* irqs on here, so we can put(), unlike above */
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}

	/*
	 * Other senders can queue a message after we last test the send queue
	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
	 * not try and send their newly queued message.  We need to check the
	 * send queue after having cleared RDS_IN_XMIT so that their message
	 * doesn't get stuck on the send queue.
	 *
	 * If the transport cannot continue (i.e ret != 0), then it must
	 * call us when more room is available, such as from the tx
	 * completion handler.
	 *
	 * We have an extra generation check here so that if someone manages
	 * to jump in after our release_in_xmit, we'll see that they have done
	 * some work and we will skip our goto
	 */
	if (ret == 0) {
		bool raced;

		smp_mb();
		raced = send_gen != READ_ONCE(cp->cp_send_gen);

		if ((test_bit(0, &conn->c_map_queued) ||
		    !list_empty(&cp->cp_send_queue)) && !raced) {
			if (batch_count < send_batch_count)
				goto restart;
			rcu_read_lock();
			if (rds_destroy_pending(cp->cp_conn))
				ret = -ENETUNREACH;
			else
				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
			rcu_read_unlock();
		} else if (raced) {
			rds_stats_inc(s_send_lock_queue_raced);
		}
	}
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);
47162306a36Sopenharmony_ci
47262306a36Sopenharmony_cistatic void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
47362306a36Sopenharmony_ci{
47462306a36Sopenharmony_ci	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
47562306a36Sopenharmony_ci
47662306a36Sopenharmony_ci	assert_spin_locked(&rs->rs_lock);
47762306a36Sopenharmony_ci
47862306a36Sopenharmony_ci	BUG_ON(rs->rs_snd_bytes < len);
47962306a36Sopenharmony_ci	rs->rs_snd_bytes -= len;
48062306a36Sopenharmony_ci
48162306a36Sopenharmony_ci	if (rs->rs_snd_bytes == 0)
48262306a36Sopenharmony_ci		rds_stats_inc(s_send_queue_empty);
48362306a36Sopenharmony_ci}
48462306a36Sopenharmony_ci
48562306a36Sopenharmony_cistatic inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
48662306a36Sopenharmony_ci				    is_acked_func is_acked)
48762306a36Sopenharmony_ci{
48862306a36Sopenharmony_ci	if (is_acked)
48962306a36Sopenharmony_ci		return is_acked(rm, ack);
49062306a36Sopenharmony_ci	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
49162306a36Sopenharmony_ci}
49262306a36Sopenharmony_ci
/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 *
 * If the message's RDMA op was armed for notification, detach its
 * notifier, record @status in it and queue it on the owning socket's
 * notify queue, then wake the socket's sleepers.
 *
 * Lock order: rm->m_rs_lock (irqsave) nests outside rs->rs_lock.  The
 * socket is pinned with sock_hold() while m_rs_lock is still held so it
 * cannot go away before the wakeup at the bottom.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	/* Only notify while the message is still attached to a socket. */
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		/* Ownership of the notifier has moved to the socket. */
		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
53162306a36Sopenharmony_ci
/*
 * Just like above, except looks at atomic op: if the message's atomic op
 * was armed for notification, detach its notifier, record @status in it
 * and queue it on the owning socket's notify queue, then wake the socket.
 *
 * Same lock order as rds_rdma_send_complete(): rm->m_rs_lock (irqsave)
 * nests outside rs->rs_lock, with the socket held across the unlock via
 * sock_hold()/sock_put().
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	/* Only notify while the message is still attached to a socket. */
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		/* Ownership of the notifier has moved to the socket. */
		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
56762306a36Sopenharmony_ci
56862306a36Sopenharmony_ci/*
56962306a36Sopenharmony_ci * This is the same as rds_rdma_send_complete except we
57062306a36Sopenharmony_ci * don't do any locking - we have all the ingredients (message,
57162306a36Sopenharmony_ci * socket, socket lock) and can just move the notifier.
57262306a36Sopenharmony_ci */
57362306a36Sopenharmony_cistatic inline void
57462306a36Sopenharmony_ci__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
57562306a36Sopenharmony_ci{
57662306a36Sopenharmony_ci	struct rm_rdma_op *ro;
57762306a36Sopenharmony_ci	struct rm_atomic_op *ao;
57862306a36Sopenharmony_ci
57962306a36Sopenharmony_ci	ro = &rm->rdma;
58062306a36Sopenharmony_ci	if (ro->op_active && ro->op_notify && ro->op_notifier) {
58162306a36Sopenharmony_ci		ro->op_notifier->n_status = status;
58262306a36Sopenharmony_ci		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
58362306a36Sopenharmony_ci		ro->op_notifier = NULL;
58462306a36Sopenharmony_ci	}
58562306a36Sopenharmony_ci
58662306a36Sopenharmony_ci	ao = &rm->atomic;
58762306a36Sopenharmony_ci	if (ao->op_active && ao->op_notify && ao->op_notifier) {
58862306a36Sopenharmony_ci		ao->op_notifier->n_status = status;
58962306a36Sopenharmony_ci		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
59062306a36Sopenharmony_ci		ao->op_notifier = NULL;
59162306a36Sopenharmony_ci	}
59262306a36Sopenharmony_ci
59362306a36Sopenharmony_ci	/* No need to wake the app - caller does this */
59462306a36Sopenharmony_ci}
59562306a36Sopenharmony_ci
/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of if it found
 * the messages on the socket list or not.
 */
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		/* Cache the socket across loop iterations: only swap the
		 * held sock reference when this message belongs to a
		 * different socket than the previous one did. */
		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			if (rs)
				sock_hold(rds_rs_to_sk(rs));
		}
		if (!rs)
			goto unlock_and_drop;
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			/* Queue a completion notification when the app asked
			 * for one (op_notify), or on failure if it enabled
			 * RDS_RECVERR (op_recverr && status != 0). */
			if (ro->op_active && ro->op_notifier &&
			       (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
						&rs->rs_notify_queue);
				/* don't overwrite an earlier error status */
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		/* drop the caller's list reference... */
		rds_message_put(rm);
		/* ...and the socket list's reference if we cleared the flag */
		if (was_on_sock)
			rds_message_put(rm);
	}

	/* release the reference held on the last socket we touched */
	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
67662306a36Sopenharmony_ci
/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 */
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
			      is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&cp->cp_lock, flags);

	/* Collect acked messages off the front of the retrans queue;
	 * stop at the first unacked one (the queue is in send order). */
	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_atomic();

	spin_unlock_irqrestore(&cp->cp_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
71262306a36Sopenharmony_ci
/* Non-multipath ack processing: always operates on path 0. */
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	/* mp-capable transports must call rds_send_path_drop_acked() per path */
	WARN_ON(conn->c_trans->t_mp_capable);
	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);
72062306a36Sopenharmony_ci
/* Cancel any pending sends on @rs, optionally restricted to the peer
 * address/port in @dest (NULL cancels everything).  Completion notifiers
 * for the dropped messages are queued with status RDS_RDMA_CANCELED.
 */
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	struct rds_conn_path *cp;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		/* if a destination filter was given, skip non-matches */
		if (dest &&
		    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
		     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_atomic();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;
		if (conn->c_trans->t_mp_capable)
			cp = rm->m_inc.i_conn_path;
		else
			cp = &conn->c_path[0];

		spin_lock_irqsave(&cp->cp_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&cp->cp_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&cp->cp_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	/* Second pass: drop the list references taken above and wait for
	 * each message's other users to finish before completing it. */
	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);
		rds_message_wait(rm);

		/* just in case the code above skipped this message
		 * because RDS_MSG_ON_CONN wasn't set, run it again here
		 * taking m_rs_lock is the only thing that keeps us
		 * from racing with ack processing.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}
}
81162306a36Sopenharmony_ci
/*
 * we only want this to fire once so we use the callers 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 *
 * Returns nonzero (and sets *queued) if the message was placed on both
 * the socket's send queue and the connection path's send queue; returns
 * 0 when the socket's send buffer is full.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_conn_path *cp,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	/* already queued on an earlier attempt - nothing to do */
	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		/* socket takes a message ref and a sock ref via rm->m_rs */
		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		sock_hold(rds_rs_to_sk(rs));
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		   trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rm->m_inc.i_conn_path = cp;
		rds_message_addref(rm);

		/* second ref is for the conn path's send queue */
		spin_lock(&cp->cp_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&cp->cp_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}
88362306a36Sopenharmony_ci
88462306a36Sopenharmony_ci/*
88562306a36Sopenharmony_ci * rds_message is getting to be quite complicated, and we'd like to allocate
88662306a36Sopenharmony_ci * it all in one go. This figures out how big it needs to be up front.
88762306a36Sopenharmony_ci */
88862306a36Sopenharmony_cistatic int rds_rm_size(struct msghdr *msg, int num_sgs,
88962306a36Sopenharmony_ci		       struct rds_iov_vector_arr *vct)
89062306a36Sopenharmony_ci{
89162306a36Sopenharmony_ci	struct cmsghdr *cmsg;
89262306a36Sopenharmony_ci	int size = 0;
89362306a36Sopenharmony_ci	int cmsg_groups = 0;
89462306a36Sopenharmony_ci	int retval;
89562306a36Sopenharmony_ci	bool zcopy_cookie = false;
89662306a36Sopenharmony_ci	struct rds_iov_vector *iov, *tmp_iov;
89762306a36Sopenharmony_ci
89862306a36Sopenharmony_ci	if (num_sgs < 0)
89962306a36Sopenharmony_ci		return -EINVAL;
90062306a36Sopenharmony_ci
90162306a36Sopenharmony_ci	for_each_cmsghdr(cmsg, msg) {
90262306a36Sopenharmony_ci		if (!CMSG_OK(msg, cmsg))
90362306a36Sopenharmony_ci			return -EINVAL;
90462306a36Sopenharmony_ci
90562306a36Sopenharmony_ci		if (cmsg->cmsg_level != SOL_RDS)
90662306a36Sopenharmony_ci			continue;
90762306a36Sopenharmony_ci
90862306a36Sopenharmony_ci		switch (cmsg->cmsg_type) {
90962306a36Sopenharmony_ci		case RDS_CMSG_RDMA_ARGS:
91062306a36Sopenharmony_ci			if (vct->indx >= vct->len) {
91162306a36Sopenharmony_ci				vct->len += vct->incr;
91262306a36Sopenharmony_ci				tmp_iov =
91362306a36Sopenharmony_ci					krealloc(vct->vec,
91462306a36Sopenharmony_ci						 vct->len *
91562306a36Sopenharmony_ci						 sizeof(struct rds_iov_vector),
91662306a36Sopenharmony_ci						 GFP_KERNEL);
91762306a36Sopenharmony_ci				if (!tmp_iov) {
91862306a36Sopenharmony_ci					vct->len -= vct->incr;
91962306a36Sopenharmony_ci					return -ENOMEM;
92062306a36Sopenharmony_ci				}
92162306a36Sopenharmony_ci				vct->vec = tmp_iov;
92262306a36Sopenharmony_ci			}
92362306a36Sopenharmony_ci			iov = &vct->vec[vct->indx];
92462306a36Sopenharmony_ci			memset(iov, 0, sizeof(struct rds_iov_vector));
92562306a36Sopenharmony_ci			vct->indx++;
92662306a36Sopenharmony_ci			cmsg_groups |= 1;
92762306a36Sopenharmony_ci			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
92862306a36Sopenharmony_ci			if (retval < 0)
92962306a36Sopenharmony_ci				return retval;
93062306a36Sopenharmony_ci			size += retval;
93162306a36Sopenharmony_ci
93262306a36Sopenharmony_ci			break;
93362306a36Sopenharmony_ci
93462306a36Sopenharmony_ci		case RDS_CMSG_ZCOPY_COOKIE:
93562306a36Sopenharmony_ci			zcopy_cookie = true;
93662306a36Sopenharmony_ci			fallthrough;
93762306a36Sopenharmony_ci
93862306a36Sopenharmony_ci		case RDS_CMSG_RDMA_DEST:
93962306a36Sopenharmony_ci		case RDS_CMSG_RDMA_MAP:
94062306a36Sopenharmony_ci			cmsg_groups |= 2;
94162306a36Sopenharmony_ci			/* these are valid but do no add any size */
94262306a36Sopenharmony_ci			break;
94362306a36Sopenharmony_ci
94462306a36Sopenharmony_ci		case RDS_CMSG_ATOMIC_CSWP:
94562306a36Sopenharmony_ci		case RDS_CMSG_ATOMIC_FADD:
94662306a36Sopenharmony_ci		case RDS_CMSG_MASKED_ATOMIC_CSWP:
94762306a36Sopenharmony_ci		case RDS_CMSG_MASKED_ATOMIC_FADD:
94862306a36Sopenharmony_ci			cmsg_groups |= 1;
94962306a36Sopenharmony_ci			size += sizeof(struct scatterlist);
95062306a36Sopenharmony_ci			break;
95162306a36Sopenharmony_ci
95262306a36Sopenharmony_ci		default:
95362306a36Sopenharmony_ci			return -EINVAL;
95462306a36Sopenharmony_ci		}
95562306a36Sopenharmony_ci
95662306a36Sopenharmony_ci	}
95762306a36Sopenharmony_ci
95862306a36Sopenharmony_ci	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
95962306a36Sopenharmony_ci		return -EINVAL;
96062306a36Sopenharmony_ci
96162306a36Sopenharmony_ci	size += num_sgs * sizeof(struct scatterlist);
96262306a36Sopenharmony_ci
96362306a36Sopenharmony_ci	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
96462306a36Sopenharmony_ci	if (cmsg_groups == 3)
96562306a36Sopenharmony_ci		return -EINVAL;
96662306a36Sopenharmony_ci
96762306a36Sopenharmony_ci	return size;
96862306a36Sopenharmony_ci}
96962306a36Sopenharmony_ci
97062306a36Sopenharmony_cistatic int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
97162306a36Sopenharmony_ci			  struct cmsghdr *cmsg)
97262306a36Sopenharmony_ci{
97362306a36Sopenharmony_ci	u32 *cookie;
97462306a36Sopenharmony_ci
97562306a36Sopenharmony_ci	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
97662306a36Sopenharmony_ci	    !rm->data.op_mmp_znotifier)
97762306a36Sopenharmony_ci		return -EINVAL;
97862306a36Sopenharmony_ci	cookie = CMSG_DATA(cmsg);
97962306a36Sopenharmony_ci	rm->data.op_mmp_znotifier->z_cookie = *cookie;
98062306a36Sopenharmony_ci	return 0;
98162306a36Sopenharmony_ci}
98262306a36Sopenharmony_ci
/* Walk the control messages attached to a sendmsg() call and apply each
 * one to the message being built.  Sets *allocated_mr when an MR was
 * created so the caller can tear it down on a later failure.  Returns 0
 * or a negative errno from the first failing cmsg handler.
 */
static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr,
			 struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int ret = 0, ind = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			/* consumes the iov vectors pre-parsed by rds_rm_size() */
			if (ind >= vct->indx)
				return -ENOMEM;
			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
			ind++;
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			else if (ret == -ENODEV)
				/* Accommodate the get_mr() case which can fail
				 * if connection isn't established yet.
				 */
				ret = -EAGAIN;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			ret = rds_cmsg_zcopy(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}
104362306a36Sopenharmony_ci
/* Pick the multipath connection path index for this socket's sends.
 * Before the path count is negotiated (c_npaths == 0) a ping is sent to
 * kick off the handshake; blocking sockets then wait for it, while
 * nonblocking ones fall back to path 0.
 */
static int rds_send_mprds_hash(struct rds_sock *rs,
			       struct rds_connection *conn, int nonblock)
{
	int hash;

	/* c_npaths == 0 means the path count isn't known yet; hash over
	 * the maximum so the chosen slot stays stable after negotiation */
	if (conn->c_npaths == 0)
		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
	else
		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
	if (conn->c_npaths == 0 && hash != 0) {
		rds_send_ping(conn, 0);

		/* The underlying connection is not up yet.  Need to wait
		 * until it is up to be sure that the non-zero c_path can be
		 * used.  But if we are interrupted, we have to use the zero
		 * c_path in case the connection ends up being non-MP capable.
		 */
		if (conn->c_npaths == 0) {
			/* Cannot wait for the connection be made, so just use
			 * the base c_path.
			 */
			if (nonblock)
				return 0;
			if (wait_event_interruptible(conn->c_hs_waitq,
						     conn->c_npaths != 0))
				hash = 0;
		}
		/* peer turned out to support only a single path */
		if (conn->c_npaths == 1)
			hash = 0;
	}
	return hash;
}
107662306a36Sopenharmony_ci
107762306a36Sopenharmony_cistatic int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
107862306a36Sopenharmony_ci{
107962306a36Sopenharmony_ci	struct rds_rdma_args *args;
108062306a36Sopenharmony_ci	struct cmsghdr *cmsg;
108162306a36Sopenharmony_ci
108262306a36Sopenharmony_ci	for_each_cmsghdr(cmsg, msg) {
108362306a36Sopenharmony_ci		if (!CMSG_OK(msg, cmsg))
108462306a36Sopenharmony_ci			return -EINVAL;
108562306a36Sopenharmony_ci
108662306a36Sopenharmony_ci		if (cmsg->cmsg_level != SOL_RDS)
108762306a36Sopenharmony_ci			continue;
108862306a36Sopenharmony_ci
108962306a36Sopenharmony_ci		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
109062306a36Sopenharmony_ci			if (cmsg->cmsg_len <
109162306a36Sopenharmony_ci			    CMSG_LEN(sizeof(struct rds_rdma_args)))
109262306a36Sopenharmony_ci				return -EINVAL;
109362306a36Sopenharmony_ci			args = CMSG_DATA(cmsg);
109462306a36Sopenharmony_ci			*rdma_bytes += args->remote_vec.bytes;
109562306a36Sopenharmony_ci		}
109662306a36Sopenharmony_ci	}
109762306a36Sopenharmony_ci	return 0;
109862306a36Sopenharmony_ci}
109962306a36Sopenharmony_ci
110062306a36Sopenharmony_ciint rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
110162306a36Sopenharmony_ci{
110262306a36Sopenharmony_ci	struct sock *sk = sock->sk;
110362306a36Sopenharmony_ci	struct rds_sock *rs = rds_sk_to_rs(sk);
110462306a36Sopenharmony_ci	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
110562306a36Sopenharmony_ci	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
110662306a36Sopenharmony_ci	__be16 dport;
110762306a36Sopenharmony_ci	struct rds_message *rm = NULL;
110862306a36Sopenharmony_ci	struct rds_connection *conn;
110962306a36Sopenharmony_ci	int ret = 0;
111062306a36Sopenharmony_ci	int queued = 0, allocated_mr = 0;
111162306a36Sopenharmony_ci	int nonblock = msg->msg_flags & MSG_DONTWAIT;
111262306a36Sopenharmony_ci	long timeo = sock_sndtimeo(sk, nonblock);
111362306a36Sopenharmony_ci	struct rds_conn_path *cpath;
111462306a36Sopenharmony_ci	struct in6_addr daddr;
111562306a36Sopenharmony_ci	__u32 scope_id = 0;
111662306a36Sopenharmony_ci	size_t rdma_payload_len = 0;
111762306a36Sopenharmony_ci	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
111862306a36Sopenharmony_ci		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
111962306a36Sopenharmony_ci	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
112062306a36Sopenharmony_ci	int namelen;
112162306a36Sopenharmony_ci	struct rds_iov_vector_arr vct;
112262306a36Sopenharmony_ci	int ind;
112362306a36Sopenharmony_ci
112462306a36Sopenharmony_ci	memset(&vct, 0, sizeof(vct));
112562306a36Sopenharmony_ci
112662306a36Sopenharmony_ci	/* expect 1 RDMA CMSG per rds_sendmsg. can still grow if more needed. */
112762306a36Sopenharmony_ci	vct.incr = 1;
112862306a36Sopenharmony_ci
112962306a36Sopenharmony_ci	/* Mirror Linux UDP mirror of BSD error message compatibility */
113062306a36Sopenharmony_ci	/* XXX: Perhaps MSG_MORE someday */
113162306a36Sopenharmony_ci	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
113262306a36Sopenharmony_ci		ret = -EOPNOTSUPP;
113362306a36Sopenharmony_ci		goto out;
113462306a36Sopenharmony_ci	}
113562306a36Sopenharmony_ci
113662306a36Sopenharmony_ci	namelen = msg->msg_namelen;
113762306a36Sopenharmony_ci	if (namelen != 0) {
113862306a36Sopenharmony_ci		if (namelen < sizeof(*usin)) {
113962306a36Sopenharmony_ci			ret = -EINVAL;
114062306a36Sopenharmony_ci			goto out;
114162306a36Sopenharmony_ci		}
114262306a36Sopenharmony_ci		switch (usin->sin_family) {
114362306a36Sopenharmony_ci		case AF_INET:
114462306a36Sopenharmony_ci			if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
114562306a36Sopenharmony_ci			    usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
114662306a36Sopenharmony_ci			    ipv4_is_multicast(usin->sin_addr.s_addr)) {
114762306a36Sopenharmony_ci				ret = -EINVAL;
114862306a36Sopenharmony_ci				goto out;
114962306a36Sopenharmony_ci			}
115062306a36Sopenharmony_ci			ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
115162306a36Sopenharmony_ci			dport = usin->sin_port;
115262306a36Sopenharmony_ci			break;
115362306a36Sopenharmony_ci
115462306a36Sopenharmony_ci#if IS_ENABLED(CONFIG_IPV6)
115562306a36Sopenharmony_ci		case AF_INET6: {
115662306a36Sopenharmony_ci			int addr_type;
115762306a36Sopenharmony_ci
115862306a36Sopenharmony_ci			if (namelen < sizeof(*sin6)) {
115962306a36Sopenharmony_ci				ret = -EINVAL;
116062306a36Sopenharmony_ci				goto out;
116162306a36Sopenharmony_ci			}
116262306a36Sopenharmony_ci			addr_type = ipv6_addr_type(&sin6->sin6_addr);
116362306a36Sopenharmony_ci			if (!(addr_type & IPV6_ADDR_UNICAST)) {
116462306a36Sopenharmony_ci				__be32 addr4;
116562306a36Sopenharmony_ci
116662306a36Sopenharmony_ci				if (!(addr_type & IPV6_ADDR_MAPPED)) {
116762306a36Sopenharmony_ci					ret = -EINVAL;
116862306a36Sopenharmony_ci					goto out;
116962306a36Sopenharmony_ci				}
117062306a36Sopenharmony_ci
117162306a36Sopenharmony_ci				/* It is a mapped address.  Need to do some
117262306a36Sopenharmony_ci				 * sanity checks.
117362306a36Sopenharmony_ci				 */
117462306a36Sopenharmony_ci				addr4 = sin6->sin6_addr.s6_addr32[3];
117562306a36Sopenharmony_ci				if (addr4 == htonl(INADDR_ANY) ||
117662306a36Sopenharmony_ci				    addr4 == htonl(INADDR_BROADCAST) ||
117762306a36Sopenharmony_ci				    ipv4_is_multicast(addr4)) {
117862306a36Sopenharmony_ci					ret = -EINVAL;
117962306a36Sopenharmony_ci					goto out;
118062306a36Sopenharmony_ci				}
118162306a36Sopenharmony_ci			}
118262306a36Sopenharmony_ci			if (addr_type & IPV6_ADDR_LINKLOCAL) {
118362306a36Sopenharmony_ci				if (sin6->sin6_scope_id == 0) {
118462306a36Sopenharmony_ci					ret = -EINVAL;
118562306a36Sopenharmony_ci					goto out;
118662306a36Sopenharmony_ci				}
118762306a36Sopenharmony_ci				scope_id = sin6->sin6_scope_id;
118862306a36Sopenharmony_ci			}
118962306a36Sopenharmony_ci
119062306a36Sopenharmony_ci			daddr = sin6->sin6_addr;
119162306a36Sopenharmony_ci			dport = sin6->sin6_port;
119262306a36Sopenharmony_ci			break;
119362306a36Sopenharmony_ci		}
119462306a36Sopenharmony_ci#endif
119562306a36Sopenharmony_ci
119662306a36Sopenharmony_ci		default:
119762306a36Sopenharmony_ci			ret = -EINVAL;
119862306a36Sopenharmony_ci			goto out;
119962306a36Sopenharmony_ci		}
120062306a36Sopenharmony_ci	} else {
120162306a36Sopenharmony_ci		/* We only care about consistency with ->connect() */
120262306a36Sopenharmony_ci		lock_sock(sk);
120362306a36Sopenharmony_ci		daddr = rs->rs_conn_addr;
120462306a36Sopenharmony_ci		dport = rs->rs_conn_port;
120562306a36Sopenharmony_ci		scope_id = rs->rs_bound_scope_id;
120662306a36Sopenharmony_ci		release_sock(sk);
120762306a36Sopenharmony_ci	}
120862306a36Sopenharmony_ci
120962306a36Sopenharmony_ci	lock_sock(sk);
121062306a36Sopenharmony_ci	if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
121162306a36Sopenharmony_ci		release_sock(sk);
121262306a36Sopenharmony_ci		ret = -ENOTCONN;
121362306a36Sopenharmony_ci		goto out;
121462306a36Sopenharmony_ci	} else if (namelen != 0) {
121562306a36Sopenharmony_ci		/* Cannot send to an IPv4 address using an IPv6 source
121662306a36Sopenharmony_ci		 * address and cannot send to an IPv6 address using an
121762306a36Sopenharmony_ci		 * IPv4 source address.
121862306a36Sopenharmony_ci		 */
121962306a36Sopenharmony_ci		if (ipv6_addr_v4mapped(&daddr) ^
122062306a36Sopenharmony_ci		    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
122162306a36Sopenharmony_ci			release_sock(sk);
122262306a36Sopenharmony_ci			ret = -EOPNOTSUPP;
122362306a36Sopenharmony_ci			goto out;
122462306a36Sopenharmony_ci		}
122562306a36Sopenharmony_ci		/* If the socket is already bound to a link local address,
122662306a36Sopenharmony_ci		 * it can only send to peers on the same link.  But allow
122762306a36Sopenharmony_ci		 * communicating between link local and non-link local address.
122862306a36Sopenharmony_ci		 */
122962306a36Sopenharmony_ci		if (scope_id != rs->rs_bound_scope_id) {
123062306a36Sopenharmony_ci			if (!scope_id) {
123162306a36Sopenharmony_ci				scope_id = rs->rs_bound_scope_id;
123262306a36Sopenharmony_ci			} else if (rs->rs_bound_scope_id) {
123362306a36Sopenharmony_ci				release_sock(sk);
123462306a36Sopenharmony_ci				ret = -EINVAL;
123562306a36Sopenharmony_ci				goto out;
123662306a36Sopenharmony_ci			}
123762306a36Sopenharmony_ci		}
123862306a36Sopenharmony_ci	}
123962306a36Sopenharmony_ci	release_sock(sk);
124062306a36Sopenharmony_ci
124162306a36Sopenharmony_ci	ret = rds_rdma_bytes(msg, &rdma_payload_len);
124262306a36Sopenharmony_ci	if (ret)
124362306a36Sopenharmony_ci		goto out;
124462306a36Sopenharmony_ci
124562306a36Sopenharmony_ci	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
124662306a36Sopenharmony_ci		ret = -EMSGSIZE;
124762306a36Sopenharmony_ci		goto out;
124862306a36Sopenharmony_ci	}
124962306a36Sopenharmony_ci
125062306a36Sopenharmony_ci	if (payload_len > rds_sk_sndbuf(rs)) {
125162306a36Sopenharmony_ci		ret = -EMSGSIZE;
125262306a36Sopenharmony_ci		goto out;
125362306a36Sopenharmony_ci	}
125462306a36Sopenharmony_ci
125562306a36Sopenharmony_ci	if (zcopy) {
125662306a36Sopenharmony_ci		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
125762306a36Sopenharmony_ci			ret = -EOPNOTSUPP;
125862306a36Sopenharmony_ci			goto out;
125962306a36Sopenharmony_ci		}
126062306a36Sopenharmony_ci		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
126162306a36Sopenharmony_ci	}
126262306a36Sopenharmony_ci	/* size of rm including all sgs */
126362306a36Sopenharmony_ci	ret = rds_rm_size(msg, num_sgs, &vct);
126462306a36Sopenharmony_ci	if (ret < 0)
126562306a36Sopenharmony_ci		goto out;
126662306a36Sopenharmony_ci
126762306a36Sopenharmony_ci	rm = rds_message_alloc(ret, GFP_KERNEL);
126862306a36Sopenharmony_ci	if (!rm) {
126962306a36Sopenharmony_ci		ret = -ENOMEM;
127062306a36Sopenharmony_ci		goto out;
127162306a36Sopenharmony_ci	}
127262306a36Sopenharmony_ci
127362306a36Sopenharmony_ci	/* Attach data to the rm */
127462306a36Sopenharmony_ci	if (payload_len) {
127562306a36Sopenharmony_ci		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
127662306a36Sopenharmony_ci		if (IS_ERR(rm->data.op_sg)) {
127762306a36Sopenharmony_ci			ret = PTR_ERR(rm->data.op_sg);
127862306a36Sopenharmony_ci			goto out;
127962306a36Sopenharmony_ci		}
128062306a36Sopenharmony_ci		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
128162306a36Sopenharmony_ci		if (ret)
128262306a36Sopenharmony_ci			goto out;
128362306a36Sopenharmony_ci	}
128462306a36Sopenharmony_ci	rm->data.op_active = 1;
128562306a36Sopenharmony_ci
128662306a36Sopenharmony_ci	rm->m_daddr = daddr;
128762306a36Sopenharmony_ci
128862306a36Sopenharmony_ci	/* rds_conn_create has a spinlock that runs with IRQ off.
128962306a36Sopenharmony_ci	 * Caching the conn in the socket helps a lot. */
129062306a36Sopenharmony_ci	if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
129162306a36Sopenharmony_ci	    rs->rs_tos == rs->rs_conn->c_tos) {
129262306a36Sopenharmony_ci		conn = rs->rs_conn;
129362306a36Sopenharmony_ci	} else {
129462306a36Sopenharmony_ci		conn = rds_conn_create_outgoing(sock_net(sock->sk),
129562306a36Sopenharmony_ci						&rs->rs_bound_addr, &daddr,
129662306a36Sopenharmony_ci						rs->rs_transport, rs->rs_tos,
129762306a36Sopenharmony_ci						sock->sk->sk_allocation,
129862306a36Sopenharmony_ci						scope_id);
129962306a36Sopenharmony_ci		if (IS_ERR(conn)) {
130062306a36Sopenharmony_ci			ret = PTR_ERR(conn);
130162306a36Sopenharmony_ci			goto out;
130262306a36Sopenharmony_ci		}
130362306a36Sopenharmony_ci		rs->rs_conn = conn;
130462306a36Sopenharmony_ci	}
130562306a36Sopenharmony_ci
130662306a36Sopenharmony_ci	if (conn->c_trans->t_mp_capable)
130762306a36Sopenharmony_ci		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
130862306a36Sopenharmony_ci	else
130962306a36Sopenharmony_ci		cpath = &conn->c_path[0];
131062306a36Sopenharmony_ci
131162306a36Sopenharmony_ci	rm->m_conn_path = cpath;
131262306a36Sopenharmony_ci
131362306a36Sopenharmony_ci	/* Parse any control messages the user may have included. */
131462306a36Sopenharmony_ci	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
131562306a36Sopenharmony_ci	if (ret)
131662306a36Sopenharmony_ci		goto out;
131762306a36Sopenharmony_ci
131862306a36Sopenharmony_ci	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
131962306a36Sopenharmony_ci		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
132062306a36Sopenharmony_ci			       &rm->rdma, conn->c_trans->xmit_rdma);
132162306a36Sopenharmony_ci		ret = -EOPNOTSUPP;
132262306a36Sopenharmony_ci		goto out;
132362306a36Sopenharmony_ci	}
132462306a36Sopenharmony_ci
132562306a36Sopenharmony_ci	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
132662306a36Sopenharmony_ci		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
132762306a36Sopenharmony_ci			       &rm->atomic, conn->c_trans->xmit_atomic);
132862306a36Sopenharmony_ci		ret = -EOPNOTSUPP;
132962306a36Sopenharmony_ci		goto out;
133062306a36Sopenharmony_ci	}
133162306a36Sopenharmony_ci
133262306a36Sopenharmony_ci	if (rds_destroy_pending(conn)) {
133362306a36Sopenharmony_ci		ret = -EAGAIN;
133462306a36Sopenharmony_ci		goto out;
133562306a36Sopenharmony_ci	}
133662306a36Sopenharmony_ci
133762306a36Sopenharmony_ci	if (rds_conn_path_down(cpath))
133862306a36Sopenharmony_ci		rds_check_all_paths(conn);
133962306a36Sopenharmony_ci
134062306a36Sopenharmony_ci	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
134162306a36Sopenharmony_ci	if (ret) {
134262306a36Sopenharmony_ci		rs->rs_seen_congestion = 1;
134362306a36Sopenharmony_ci		goto out;
134462306a36Sopenharmony_ci	}
134562306a36Sopenharmony_ci	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
134662306a36Sopenharmony_ci				  dport, &queued)) {
134762306a36Sopenharmony_ci		rds_stats_inc(s_send_queue_full);
134862306a36Sopenharmony_ci
134962306a36Sopenharmony_ci		if (nonblock) {
135062306a36Sopenharmony_ci			ret = -EAGAIN;
135162306a36Sopenharmony_ci			goto out;
135262306a36Sopenharmony_ci		}
135362306a36Sopenharmony_ci
135462306a36Sopenharmony_ci		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
135562306a36Sopenharmony_ci					rds_send_queue_rm(rs, conn, cpath, rm,
135662306a36Sopenharmony_ci							  rs->rs_bound_port,
135762306a36Sopenharmony_ci							  dport,
135862306a36Sopenharmony_ci							  &queued),
135962306a36Sopenharmony_ci					timeo);
136062306a36Sopenharmony_ci		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
136162306a36Sopenharmony_ci		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
136262306a36Sopenharmony_ci			continue;
136362306a36Sopenharmony_ci
136462306a36Sopenharmony_ci		ret = timeo;
136562306a36Sopenharmony_ci		if (ret == 0)
136662306a36Sopenharmony_ci			ret = -ETIMEDOUT;
136762306a36Sopenharmony_ci		goto out;
136862306a36Sopenharmony_ci	}
136962306a36Sopenharmony_ci
137062306a36Sopenharmony_ci	/*
137162306a36Sopenharmony_ci	 * By now we've committed to the send.  We reuse rds_send_worker()
137262306a36Sopenharmony_ci	 * to retry sends in the rds thread if the transport asks us to.
137362306a36Sopenharmony_ci	 */
137462306a36Sopenharmony_ci	rds_stats_inc(s_send_queued);
137562306a36Sopenharmony_ci
137662306a36Sopenharmony_ci	ret = rds_send_xmit(cpath);
137762306a36Sopenharmony_ci	if (ret == -ENOMEM || ret == -EAGAIN) {
137862306a36Sopenharmony_ci		ret = 0;
137962306a36Sopenharmony_ci		rcu_read_lock();
138062306a36Sopenharmony_ci		if (rds_destroy_pending(cpath->cp_conn))
138162306a36Sopenharmony_ci			ret = -ENETUNREACH;
138262306a36Sopenharmony_ci		else
138362306a36Sopenharmony_ci			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
138462306a36Sopenharmony_ci		rcu_read_unlock();
138562306a36Sopenharmony_ci	}
138662306a36Sopenharmony_ci	if (ret)
138762306a36Sopenharmony_ci		goto out;
138862306a36Sopenharmony_ci	rds_message_put(rm);
138962306a36Sopenharmony_ci
139062306a36Sopenharmony_ci	for (ind = 0; ind < vct.indx; ind++)
139162306a36Sopenharmony_ci		kfree(vct.vec[ind].iov);
139262306a36Sopenharmony_ci	kfree(vct.vec);
139362306a36Sopenharmony_ci
139462306a36Sopenharmony_ci	return payload_len;
139562306a36Sopenharmony_ci
139662306a36Sopenharmony_ciout:
139762306a36Sopenharmony_ci	for (ind = 0; ind < vct.indx; ind++)
139862306a36Sopenharmony_ci		kfree(vct.vec[ind].iov);
139962306a36Sopenharmony_ci	kfree(vct.vec);
140062306a36Sopenharmony_ci
140162306a36Sopenharmony_ci	/* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
140262306a36Sopenharmony_ci	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
140362306a36Sopenharmony_ci	 * or in any other way, we need to destroy the MR again */
140462306a36Sopenharmony_ci	if (allocated_mr)
140562306a36Sopenharmony_ci		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
140662306a36Sopenharmony_ci
140762306a36Sopenharmony_ci	if (rm)
140862306a36Sopenharmony_ci		rds_message_put(rm);
140962306a36Sopenharmony_ci	return ret;
141062306a36Sopenharmony_ci}
141162306a36Sopenharmony_ci
141262306a36Sopenharmony_ci/*
141362306a36Sopenharmony_ci * send out a probe. Can be shared by rds_send_ping,
141462306a36Sopenharmony_ci * rds_send_pong, rds_send_hb.
141562306a36Sopenharmony_ci * rds_send_hb should use h_flags
141662306a36Sopenharmony_ci *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
141762306a36Sopenharmony_ci * or
141862306a36Sopenharmony_ci *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
141962306a36Sopenharmony_ci */
142062306a36Sopenharmony_cistatic int
142162306a36Sopenharmony_cirds_send_probe(struct rds_conn_path *cp, __be16 sport,
142262306a36Sopenharmony_ci	       __be16 dport, u8 h_flags)
142362306a36Sopenharmony_ci{
142462306a36Sopenharmony_ci	struct rds_message *rm;
142562306a36Sopenharmony_ci	unsigned long flags;
142662306a36Sopenharmony_ci	int ret = 0;
142762306a36Sopenharmony_ci
142862306a36Sopenharmony_ci	rm = rds_message_alloc(0, GFP_ATOMIC);
142962306a36Sopenharmony_ci	if (!rm) {
143062306a36Sopenharmony_ci		ret = -ENOMEM;
143162306a36Sopenharmony_ci		goto out;
143262306a36Sopenharmony_ci	}
143362306a36Sopenharmony_ci
143462306a36Sopenharmony_ci	rm->m_daddr = cp->cp_conn->c_faddr;
143562306a36Sopenharmony_ci	rm->data.op_active = 1;
143662306a36Sopenharmony_ci
143762306a36Sopenharmony_ci	rds_conn_path_connect_if_down(cp);
143862306a36Sopenharmony_ci
143962306a36Sopenharmony_ci	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
144062306a36Sopenharmony_ci	if (ret)
144162306a36Sopenharmony_ci		goto out;
144262306a36Sopenharmony_ci
144362306a36Sopenharmony_ci	spin_lock_irqsave(&cp->cp_lock, flags);
144462306a36Sopenharmony_ci	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
144562306a36Sopenharmony_ci	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
144662306a36Sopenharmony_ci	rds_message_addref(rm);
144762306a36Sopenharmony_ci	rm->m_inc.i_conn = cp->cp_conn;
144862306a36Sopenharmony_ci	rm->m_inc.i_conn_path = cp;
144962306a36Sopenharmony_ci
145062306a36Sopenharmony_ci	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
145162306a36Sopenharmony_ci				    cp->cp_next_tx_seq);
145262306a36Sopenharmony_ci	rm->m_inc.i_hdr.h_flags |= h_flags;
145362306a36Sopenharmony_ci	cp->cp_next_tx_seq++;
145462306a36Sopenharmony_ci
145562306a36Sopenharmony_ci	if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
145662306a36Sopenharmony_ci	    cp->cp_conn->c_trans->t_mp_capable) {
145762306a36Sopenharmony_ci		u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
145862306a36Sopenharmony_ci		u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
145962306a36Sopenharmony_ci
146062306a36Sopenharmony_ci		rds_message_add_extension(&rm->m_inc.i_hdr,
146162306a36Sopenharmony_ci					  RDS_EXTHDR_NPATHS, &npaths,
146262306a36Sopenharmony_ci					  sizeof(npaths));
146362306a36Sopenharmony_ci		rds_message_add_extension(&rm->m_inc.i_hdr,
146462306a36Sopenharmony_ci					  RDS_EXTHDR_GEN_NUM,
146562306a36Sopenharmony_ci					  &my_gen_num,
146662306a36Sopenharmony_ci					  sizeof(u32));
146762306a36Sopenharmony_ci	}
146862306a36Sopenharmony_ci	spin_unlock_irqrestore(&cp->cp_lock, flags);
146962306a36Sopenharmony_ci
147062306a36Sopenharmony_ci	rds_stats_inc(s_send_queued);
147162306a36Sopenharmony_ci	rds_stats_inc(s_send_pong);
147262306a36Sopenharmony_ci
147362306a36Sopenharmony_ci	/* schedule the send work on rds_wq */
147462306a36Sopenharmony_ci	rcu_read_lock();
147562306a36Sopenharmony_ci	if (!rds_destroy_pending(cp->cp_conn))
147662306a36Sopenharmony_ci		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
147762306a36Sopenharmony_ci	rcu_read_unlock();
147862306a36Sopenharmony_ci
147962306a36Sopenharmony_ci	rds_message_put(rm);
148062306a36Sopenharmony_ci	return 0;
148162306a36Sopenharmony_ci
148262306a36Sopenharmony_ciout:
148362306a36Sopenharmony_ci	if (rm)
148462306a36Sopenharmony_ci		rds_message_put(rm);
148562306a36Sopenharmony_ci	return ret;
148662306a36Sopenharmony_ci}
148762306a36Sopenharmony_ci
148862306a36Sopenharmony_ciint
148962306a36Sopenharmony_cirds_send_pong(struct rds_conn_path *cp, __be16 dport)
149062306a36Sopenharmony_ci{
149162306a36Sopenharmony_ci	return rds_send_probe(cp, 0, dport, 0);
149262306a36Sopenharmony_ci}
149362306a36Sopenharmony_ci
149462306a36Sopenharmony_civoid
149562306a36Sopenharmony_cirds_send_ping(struct rds_connection *conn, int cp_index)
149662306a36Sopenharmony_ci{
149762306a36Sopenharmony_ci	unsigned long flags;
149862306a36Sopenharmony_ci	struct rds_conn_path *cp = &conn->c_path[cp_index];
149962306a36Sopenharmony_ci
150062306a36Sopenharmony_ci	spin_lock_irqsave(&cp->cp_lock, flags);
150162306a36Sopenharmony_ci	if (conn->c_ping_triggered) {
150262306a36Sopenharmony_ci		spin_unlock_irqrestore(&cp->cp_lock, flags);
150362306a36Sopenharmony_ci		return;
150462306a36Sopenharmony_ci	}
150562306a36Sopenharmony_ci	conn->c_ping_triggered = 1;
150662306a36Sopenharmony_ci	spin_unlock_irqrestore(&cp->cp_lock, flags);
150762306a36Sopenharmony_ci	rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
150862306a36Sopenharmony_ci}
150962306a36Sopenharmony_ciEXPORT_SYMBOL_GPL(rds_send_ping);
1510