xref: /kernel/linux/linux-5.10/net/rxrpc/output.c (revision 8c2ecf20)
// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC packet transmission
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */
7
8#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10#include <linux/net.h>
11#include <linux/gfp.h>
12#include <linux/skbuff.h>
13#include <linux/export.h>
14#include <net/sock.h>
15#include <net/af_rxrpc.h>
16#include "ar-internal.h"
17
18struct rxrpc_ack_buffer {
19	struct rxrpc_wire_header whdr;
20	struct rxrpc_ackpacket ack;
21	u8 acks[255];
22	u8 pad[3];
23	struct rxrpc_ackinfo ackinfo;
24};
25
26struct rxrpc_abort_buffer {
27	struct rxrpc_wire_header whdr;
28	__be32 abort_code;
29};
30
/*
 * Payload of the VERSION keepalive packet.  The string is empty, so the
 * sizeof() used when transmitting it yields a single NUL byte.
 */
static const char rxrpc_keepalive_string[] = "";
32
33/*
34 * Increase Tx backoff on transmission failure and clear it on success.
35 */
36static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
37{
38	if (ret < 0) {
39		u16 tx_backoff = READ_ONCE(call->tx_backoff);
40
41		if (tx_backoff < HZ)
42			WRITE_ONCE(call->tx_backoff, tx_backoff + 1);
43	} else {
44		WRITE_ONCE(call->tx_backoff, 0);
45	}
46}
47
48/*
49 * Arrange for a keepalive ping a certain time after we last transmitted.  This
50 * lets the far side know we're still interested in this call and helps keep
51 * the route through any intervening firewall open.
52 *
53 * Receiving a response to the ping will prevent the ->expect_rx_by timer from
54 * expiring.
55 */
56static void rxrpc_set_keepalive(struct rxrpc_call *call)
57{
58	unsigned long now = jiffies, keepalive_at = call->next_rx_timo / 6;
59
60	keepalive_at += now;
61	WRITE_ONCE(call->keepalive_at, keepalive_at);
62	rxrpc_reduce_call_timer(call, keepalive_at, now,
63				rxrpc_timer_set_for_keepalive);
64}
65
66/*
67 * Fill out an ACK packet.
68 */
69static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
70				 struct rxrpc_call *call,
71				 struct rxrpc_ack_buffer *pkt,
72				 rxrpc_seq_t *_hard_ack,
73				 rxrpc_seq_t *_top,
74				 u8 reason)
75{
76	rxrpc_serial_t serial;
77	unsigned int tmp;
78	rxrpc_seq_t hard_ack, top, seq;
79	int ix;
80	u32 mtu, jmax;
81	u8 *ackp = pkt->acks;
82
83	tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
84	tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
85	if (!tmp && (reason == RXRPC_ACK_DELAY ||
86		     reason == RXRPC_ACK_IDLE))
87		return 0;
88
89	/* Barrier against rxrpc_input_data(). */
90	serial = call->ackr_serial;
91	hard_ack = READ_ONCE(call->rx_hard_ack);
92	top = smp_load_acquire(&call->rx_top);
93	*_hard_ack = hard_ack;
94	*_top = top;
95
96	pkt->ack.bufferSpace	= htons(0);
97	pkt->ack.maxSkew	= htons(0);
98	pkt->ack.firstPacket	= htonl(hard_ack + 1);
99	pkt->ack.previousPacket	= htonl(call->ackr_highest_seq);
100	pkt->ack.serial		= htonl(serial);
101	pkt->ack.reason		= reason;
102	pkt->ack.nAcks		= top - hard_ack;
103
104	if (reason == RXRPC_ACK_PING)
105		pkt->whdr.flags |= RXRPC_REQUEST_ACK;
106
107	if (after(top, hard_ack)) {
108		seq = hard_ack + 1;
109		do {
110			ix = seq & RXRPC_RXTX_BUFF_MASK;
111			if (call->rxtx_buffer[ix])
112				*ackp++ = RXRPC_ACK_TYPE_ACK;
113			else
114				*ackp++ = RXRPC_ACK_TYPE_NACK;
115			seq++;
116		} while (before_eq(seq, top));
117	}
118
119	mtu = conn->params.peer->if_mtu;
120	mtu -= conn->params.peer->hdrsize;
121	jmax = (call->nr_jumbo_bad > 3) ? 1 : rxrpc_rx_jumbo_max;
122	pkt->ackinfo.rxMTU	= htonl(rxrpc_rx_mtu);
123	pkt->ackinfo.maxMTU	= htonl(mtu);
124	pkt->ackinfo.rwind	= htonl(call->rx_winsize);
125	pkt->ackinfo.jumbo_max	= htonl(jmax);
126
127	*ackp++ = 0;
128	*ackp++ = 0;
129	*ackp++ = 0;
130	return top - hard_ack + 3;
131}
132
133/*
134 * Record the beginning of an RTT probe.
135 */
136static int rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
137				 enum rxrpc_rtt_tx_trace why)
138{
139	unsigned long avail = call->rtt_avail;
140	int rtt_slot = 9;
141
142	if (!(avail & RXRPC_CALL_RTT_AVAIL_MASK))
143		goto no_slot;
144
145	rtt_slot = __ffs(avail & RXRPC_CALL_RTT_AVAIL_MASK);
146	if (!test_and_clear_bit(rtt_slot, &call->rtt_avail))
147		goto no_slot;
148
149	call->rtt_serial[rtt_slot] = serial;
150	call->rtt_sent_at[rtt_slot] = ktime_get_real();
151	smp_wmb(); /* Write data before avail bit */
152	set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
153
154	trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
155	return rtt_slot;
156
157no_slot:
158	trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
159	return -1;
160}
161
162/*
163 * Cancel an RTT probe.
164 */
165static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
166				   rxrpc_serial_t serial, int rtt_slot)
167{
168	if (rtt_slot != -1) {
169		clear_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
170		smp_wmb(); /* Clear pending bit before setting slot */
171		set_bit(rtt_slot, &call->rtt_avail);
172		trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_cancel, rtt_slot, serial);
173	}
174}
175
176/*
177 * Send an ACK call packet.
178 */
179int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
180			  rxrpc_serial_t *_serial)
181{
182	struct rxrpc_connection *conn;
183	struct rxrpc_ack_buffer *pkt;
184	struct msghdr msg;
185	struct kvec iov[2];
186	rxrpc_serial_t serial;
187	rxrpc_seq_t hard_ack, top;
188	size_t len, n;
189	int ret, rtt_slot = -1;
190	u8 reason;
191
192	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
193		return -ECONNRESET;
194
195	pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
196	if (!pkt)
197		return -ENOMEM;
198
199	conn = call->conn;
200
201	msg.msg_name	= &call->peer->srx.transport;
202	msg.msg_namelen	= call->peer->srx.transport_len;
203	msg.msg_control	= NULL;
204	msg.msg_controllen = 0;
205	msg.msg_flags	= 0;
206
207	pkt->whdr.epoch		= htonl(conn->proto.epoch);
208	pkt->whdr.cid		= htonl(call->cid);
209	pkt->whdr.callNumber	= htonl(call->call_id);
210	pkt->whdr.seq		= 0;
211	pkt->whdr.type		= RXRPC_PACKET_TYPE_ACK;
212	pkt->whdr.flags		= RXRPC_SLOW_START_OK | conn->out_clientflag;
213	pkt->whdr.userStatus	= 0;
214	pkt->whdr.securityIndex	= call->security_ix;
215	pkt->whdr._rsvd		= 0;
216	pkt->whdr.serviceId	= htons(call->service_id);
217
218	spin_lock_bh(&call->lock);
219	if (ping) {
220		reason = RXRPC_ACK_PING;
221	} else {
222		reason = call->ackr_reason;
223		if (!call->ackr_reason) {
224			spin_unlock_bh(&call->lock);
225			ret = 0;
226			goto out;
227		}
228		call->ackr_reason = 0;
229	}
230	n = rxrpc_fill_out_ack(conn, call, pkt, &hard_ack, &top, reason);
231
232	spin_unlock_bh(&call->lock);
233	if (n == 0) {
234		kfree(pkt);
235		return 0;
236	}
237
238	iov[0].iov_base	= pkt;
239	iov[0].iov_len	= sizeof(pkt->whdr) + sizeof(pkt->ack) + n;
240	iov[1].iov_base = &pkt->ackinfo;
241	iov[1].iov_len	= sizeof(pkt->ackinfo);
242	len = iov[0].iov_len + iov[1].iov_len;
243
244	serial = atomic_inc_return(&conn->serial);
245	pkt->whdr.serial = htonl(serial);
246	trace_rxrpc_tx_ack(call->debug_id, serial,
247			   ntohl(pkt->ack.firstPacket),
248			   ntohl(pkt->ack.serial),
249			   pkt->ack.reason, pkt->ack.nAcks);
250	if (_serial)
251		*_serial = serial;
252
253	if (ping)
254		rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_ping);
255
256	ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 2, len);
257	conn->params.peer->last_tx_at = ktime_get_seconds();
258	if (ret < 0)
259		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
260				    rxrpc_tx_point_call_ack);
261	else
262		trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr,
263				      rxrpc_tx_point_call_ack);
264	rxrpc_tx_backoff(call, ret);
265
266	if (call->state < RXRPC_CALL_COMPLETE) {
267		if (ret < 0) {
268			rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
269			rxrpc_propose_ACK(call, pkt->ack.reason,
270					  ntohl(pkt->ack.serial),
271					  false, true,
272					  rxrpc_propose_ack_retry_tx);
273		}
274
275		rxrpc_set_keepalive(call);
276	}
277
278out:
279	kfree(pkt);
280	return ret;
281}
282
283/*
284 * Send an ABORT call packet.
285 */
286int rxrpc_send_abort_packet(struct rxrpc_call *call)
287{
288	struct rxrpc_connection *conn;
289	struct rxrpc_abort_buffer pkt;
290	struct msghdr msg;
291	struct kvec iov[1];
292	rxrpc_serial_t serial;
293	int ret;
294
295	/* Don't bother sending aborts for a client call once the server has
296	 * hard-ACK'd all of its request data.  After that point, we're not
297	 * going to stop the operation proceeding, and whilst we might limit
298	 * the reply, it's not worth it if we can send a new call on the same
299	 * channel instead, thereby closing off this call.
300	 */
301	if (rxrpc_is_client_call(call) &&
302	    test_bit(RXRPC_CALL_TX_LAST, &call->flags))
303		return 0;
304
305	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
306		return -ECONNRESET;
307
308	conn = call->conn;
309
310	msg.msg_name	= &call->peer->srx.transport;
311	msg.msg_namelen	= call->peer->srx.transport_len;
312	msg.msg_control	= NULL;
313	msg.msg_controllen = 0;
314	msg.msg_flags	= 0;
315
316	pkt.whdr.epoch		= htonl(conn->proto.epoch);
317	pkt.whdr.cid		= htonl(call->cid);
318	pkt.whdr.callNumber	= htonl(call->call_id);
319	pkt.whdr.seq		= 0;
320	pkt.whdr.type		= RXRPC_PACKET_TYPE_ABORT;
321	pkt.whdr.flags		= conn->out_clientflag;
322	pkt.whdr.userStatus	= 0;
323	pkt.whdr.securityIndex	= call->security_ix;
324	pkt.whdr._rsvd		= 0;
325	pkt.whdr.serviceId	= htons(call->service_id);
326	pkt.abort_code		= htonl(call->abort_code);
327
328	iov[0].iov_base	= &pkt;
329	iov[0].iov_len	= sizeof(pkt);
330
331	serial = atomic_inc_return(&conn->serial);
332	pkt.whdr.serial = htonl(serial);
333
334	ret = kernel_sendmsg(conn->params.local->socket,
335			     &msg, iov, 1, sizeof(pkt));
336	conn->params.peer->last_tx_at = ktime_get_seconds();
337	if (ret < 0)
338		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
339				    rxrpc_tx_point_call_abort);
340	else
341		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
342				      rxrpc_tx_point_call_abort);
343	rxrpc_tx_backoff(call, ret);
344	return ret;
345}
346
347/*
348 * send a packet through the transport endpoint
349 */
350int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
351			   bool retrans)
352{
353	struct rxrpc_connection *conn = call->conn;
354	struct rxrpc_wire_header whdr;
355	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
356	struct msghdr msg;
357	struct kvec iov[2];
358	rxrpc_serial_t serial;
359	size_t len;
360	int ret, rtt_slot = -1;
361
362	_enter(",{%d}", skb->len);
363
364	if (hlist_unhashed(&call->error_link)) {
365		spin_lock_bh(&call->peer->lock);
366		hlist_add_head_rcu(&call->error_link, &call->peer->error_targets);
367		spin_unlock_bh(&call->peer->lock);
368	}
369
370	/* Each transmission of a Tx packet needs a new serial number */
371	serial = atomic_inc_return(&conn->serial);
372
373	whdr.epoch	= htonl(conn->proto.epoch);
374	whdr.cid	= htonl(call->cid);
375	whdr.callNumber	= htonl(call->call_id);
376	whdr.seq	= htonl(sp->hdr.seq);
377	whdr.serial	= htonl(serial);
378	whdr.type	= RXRPC_PACKET_TYPE_DATA;
379	whdr.flags	= sp->hdr.flags;
380	whdr.userStatus	= 0;
381	whdr.securityIndex = call->security_ix;
382	whdr._rsvd	= htons(sp->hdr._rsvd);
383	whdr.serviceId	= htons(call->service_id);
384
385	if (test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) &&
386	    sp->hdr.seq == 1)
387		whdr.userStatus	= RXRPC_USERSTATUS_SERVICE_UPGRADE;
388
389	iov[0].iov_base = &whdr;
390	iov[0].iov_len = sizeof(whdr);
391	iov[1].iov_base = skb->head;
392	iov[1].iov_len = skb->len;
393	len = iov[0].iov_len + iov[1].iov_len;
394
395	msg.msg_name = &call->peer->srx.transport;
396	msg.msg_namelen = call->peer->srx.transport_len;
397	msg.msg_control = NULL;
398	msg.msg_controllen = 0;
399	msg.msg_flags = 0;
400
401	/* If our RTT cache needs working on, request an ACK.  Also request
402	 * ACKs if a DATA packet appears to have been lost.
403	 *
404	 * However, we mustn't request an ACK on the last reply packet of a
405	 * service call, lest OpenAFS incorrectly send us an ACK with some
406	 * soft-ACKs in it and then never follow up with a proper hard ACK.
407	 */
408	if ((!(sp->hdr.flags & RXRPC_LAST_PACKET) ||
409	     rxrpc_to_server(sp)
410	     ) &&
411	    (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events) ||
412	     retrans ||
413	     call->cong_mode == RXRPC_CALL_SLOW_START ||
414	     (call->peer->rtt_count < 3 && sp->hdr.seq & 1) ||
415	     ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
416			  ktime_get_real())))
417		whdr.flags |= RXRPC_REQUEST_ACK;
418
419	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
420		static int lose;
421		if ((lose++ & 7) == 7) {
422			ret = 0;
423			trace_rxrpc_tx_data(call, sp->hdr.seq, serial,
424					    whdr.flags, retrans, true);
425			goto done;
426		}
427	}
428
429	trace_rxrpc_tx_data(call, sp->hdr.seq, serial, whdr.flags, retrans,
430			    false);
431
432	/* send the packet with the don't fragment bit set if we currently
433	 * think it's small enough */
434	if (iov[1].iov_len >= call->peer->maxdata)
435		goto send_fragmentable;
436
437	down_read(&conn->params.local->defrag_sem);
438
439	sp->hdr.serial = serial;
440	smp_wmb(); /* Set serial before timestamp */
441	skb->tstamp = ktime_get_real();
442	if (whdr.flags & RXRPC_REQUEST_ACK)
443		rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_data);
444
445	/* send the packet by UDP
446	 * - returns -EMSGSIZE if UDP would have to fragment the packet
447	 *   to go out of the interface
448	 *   - in which case, we'll have processed the ICMP error
449	 *     message and update the peer record
450	 */
451	ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 2, len);
452	conn->params.peer->last_tx_at = ktime_get_seconds();
453
454	up_read(&conn->params.local->defrag_sem);
455	if (ret < 0) {
456		rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
457		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
458				    rxrpc_tx_point_call_data_nofrag);
459	} else {
460		trace_rxrpc_tx_packet(call->debug_id, &whdr,
461				      rxrpc_tx_point_call_data_nofrag);
462	}
463
464	rxrpc_tx_backoff(call, ret);
465	if (ret == -EMSGSIZE)
466		goto send_fragmentable;
467
468done:
469	if (ret >= 0) {
470		if (whdr.flags & RXRPC_REQUEST_ACK) {
471			call->peer->rtt_last_req = skb->tstamp;
472			if (call->peer->rtt_count > 1) {
473				unsigned long nowj = jiffies, ack_lost_at;
474
475				ack_lost_at = rxrpc_get_rto_backoff(call->peer, false);
476				ack_lost_at += nowj;
477				WRITE_ONCE(call->ack_lost_at, ack_lost_at);
478				rxrpc_reduce_call_timer(call, ack_lost_at, nowj,
479							rxrpc_timer_set_for_lost_ack);
480			}
481		}
482
483		if (sp->hdr.seq == 1 &&
484		    !test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER,
485				      &call->flags)) {
486			unsigned long nowj = jiffies, expect_rx_by;
487
488			expect_rx_by = nowj + call->next_rx_timo;
489			WRITE_ONCE(call->expect_rx_by, expect_rx_by);
490			rxrpc_reduce_call_timer(call, expect_rx_by, nowj,
491						rxrpc_timer_set_for_normal);
492		}
493
494		rxrpc_set_keepalive(call);
495	} else {
496		/* Cancel the call if the initial transmission fails,
497		 * particularly if that's due to network routing issues that
498		 * aren't going away anytime soon.  The layer above can arrange
499		 * the retransmission.
500		 */
501		if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags))
502			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
503						  RX_USER_ABORT, ret);
504	}
505
506	_leave(" = %d [%u]", ret, call->peer->maxdata);
507	return ret;
508
509send_fragmentable:
510	/* attempt to send this message with fragmentation enabled */
511	_debug("send fragment");
512
513	down_write(&conn->params.local->defrag_sem);
514
515	sp->hdr.serial = serial;
516	smp_wmb(); /* Set serial before timestamp */
517	skb->tstamp = ktime_get_real();
518	if (whdr.flags & RXRPC_REQUEST_ACK)
519		rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_data);
520
521	switch (conn->params.local->srx.transport.family) {
522	case AF_INET6:
523	case AF_INET:
524		ip_sock_set_mtu_discover(conn->params.local->socket->sk,
525				IP_PMTUDISC_DONT);
526		ret = kernel_sendmsg(conn->params.local->socket, &msg,
527				     iov, 2, len);
528		conn->params.peer->last_tx_at = ktime_get_seconds();
529
530		ip_sock_set_mtu_discover(conn->params.local->socket->sk,
531				IP_PMTUDISC_DO);
532		break;
533
534	default:
535		BUG();
536	}
537
538	if (ret < 0) {
539		rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
540		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
541				    rxrpc_tx_point_call_data_frag);
542	} else {
543		trace_rxrpc_tx_packet(call->debug_id, &whdr,
544				      rxrpc_tx_point_call_data_frag);
545	}
546	rxrpc_tx_backoff(call, ret);
547
548	up_write(&conn->params.local->defrag_sem);
549	goto done;
550}
551
552/*
553 * reject packets through the local endpoint
554 */
555void rxrpc_reject_packets(struct rxrpc_local *local)
556{
557	struct sockaddr_rxrpc srx;
558	struct rxrpc_skb_priv *sp;
559	struct rxrpc_wire_header whdr;
560	struct sk_buff *skb;
561	struct msghdr msg;
562	struct kvec iov[2];
563	size_t size;
564	__be32 code;
565	int ret, ioc;
566
567	_enter("%d", local->debug_id);
568
569	iov[0].iov_base = &whdr;
570	iov[0].iov_len = sizeof(whdr);
571	iov[1].iov_base = &code;
572	iov[1].iov_len = sizeof(code);
573
574	msg.msg_name = &srx.transport;
575	msg.msg_control = NULL;
576	msg.msg_controllen = 0;
577	msg.msg_flags = 0;
578
579	memset(&whdr, 0, sizeof(whdr));
580
581	while ((skb = skb_dequeue(&local->reject_queue))) {
582		rxrpc_see_skb(skb, rxrpc_skb_seen);
583		sp = rxrpc_skb(skb);
584
585		switch (skb->mark) {
586		case RXRPC_SKB_MARK_REJECT_BUSY:
587			whdr.type = RXRPC_PACKET_TYPE_BUSY;
588			size = sizeof(whdr);
589			ioc = 1;
590			break;
591		case RXRPC_SKB_MARK_REJECT_ABORT:
592			whdr.type = RXRPC_PACKET_TYPE_ABORT;
593			code = htonl(skb->priority);
594			size = sizeof(whdr) + sizeof(code);
595			ioc = 2;
596			break;
597		default:
598			rxrpc_free_skb(skb, rxrpc_skb_freed);
599			continue;
600		}
601
602		if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
603			msg.msg_namelen = srx.transport_len;
604
605			whdr.epoch	= htonl(sp->hdr.epoch);
606			whdr.cid	= htonl(sp->hdr.cid);
607			whdr.callNumber	= htonl(sp->hdr.callNumber);
608			whdr.serviceId	= htons(sp->hdr.serviceId);
609			whdr.flags	= sp->hdr.flags;
610			whdr.flags	^= RXRPC_CLIENT_INITIATED;
611			whdr.flags	&= RXRPC_CLIENT_INITIATED;
612
613			ret = kernel_sendmsg(local->socket, &msg,
614					     iov, ioc, size);
615			if (ret < 0)
616				trace_rxrpc_tx_fail(local->debug_id, 0, ret,
617						    rxrpc_tx_point_reject);
618			else
619				trace_rxrpc_tx_packet(local->debug_id, &whdr,
620						      rxrpc_tx_point_reject);
621		}
622
623		rxrpc_free_skb(skb, rxrpc_skb_freed);
624	}
625
626	_leave("");
627}
628
629/*
630 * Send a VERSION reply to a peer as a keepalive.
631 */
632void rxrpc_send_keepalive(struct rxrpc_peer *peer)
633{
634	struct rxrpc_wire_header whdr;
635	struct msghdr msg;
636	struct kvec iov[2];
637	size_t len;
638	int ret;
639
640	_enter("");
641
642	msg.msg_name	= &peer->srx.transport;
643	msg.msg_namelen	= peer->srx.transport_len;
644	msg.msg_control	= NULL;
645	msg.msg_controllen = 0;
646	msg.msg_flags	= 0;
647
648	whdr.epoch	= htonl(peer->local->rxnet->epoch);
649	whdr.cid	= 0;
650	whdr.callNumber	= 0;
651	whdr.seq	= 0;
652	whdr.serial	= 0;
653	whdr.type	= RXRPC_PACKET_TYPE_VERSION; /* Not client-initiated */
654	whdr.flags	= RXRPC_LAST_PACKET;
655	whdr.userStatus	= 0;
656	whdr.securityIndex = 0;
657	whdr._rsvd	= 0;
658	whdr.serviceId	= 0;
659
660	iov[0].iov_base	= &whdr;
661	iov[0].iov_len	= sizeof(whdr);
662	iov[1].iov_base	= (char *)rxrpc_keepalive_string;
663	iov[1].iov_len	= sizeof(rxrpc_keepalive_string);
664
665	len = iov[0].iov_len + iov[1].iov_len;
666
667	_proto("Tx VERSION (keepalive)");
668
669	ret = kernel_sendmsg(peer->local->socket, &msg, iov, 2, len);
670	if (ret < 0)
671		trace_rxrpc_tx_fail(peer->debug_id, 0, ret,
672				    rxrpc_tx_point_version_keepalive);
673	else
674		trace_rxrpc_tx_packet(peer->debug_id, &whdr,
675				      rxrpc_tx_point_version_keepalive);
676
677	peer->last_tx_at = ktime_get_seconds();
678	_leave("");
679}
680