xref: /kernel/linux/linux-6.6/net/rxrpc/txbuf.c (revision 62306a36)
1// SPDX-License-Identifier: GPL-2.0-or-later
2/* RxRPC Tx data buffering.
3 *
4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
5 * Written by David Howells (dhowells@redhat.com)
6 */
7
8#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10#include <linux/slab.h>
11#include "ar-internal.h"
12
13static atomic_t rxrpc_txbuf_debug_ids;
14atomic_t rxrpc_nr_txbuf;
15
16/*
17 * Allocate and partially initialise an I/O request structure.
18 */
19struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
20				      gfp_t gfp)
21{
22	struct rxrpc_txbuf *txb;
23
24	txb = kmalloc(sizeof(*txb), gfp);
25	if (txb) {
26		INIT_LIST_HEAD(&txb->call_link);
27		INIT_LIST_HEAD(&txb->tx_link);
28		refcount_set(&txb->ref, 1);
29		txb->call_debug_id	= call->debug_id;
30		txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
31		txb->space		= sizeof(txb->data);
32		txb->len		= 0;
33		txb->offset		= 0;
34		txb->flags		= 0;
35		txb->ack_why		= 0;
36		txb->seq		= call->tx_prepared + 1;
37		txb->wire.epoch		= htonl(call->conn->proto.epoch);
38		txb->wire.cid		= htonl(call->cid);
39		txb->wire.callNumber	= htonl(call->call_id);
40		txb->wire.seq		= htonl(txb->seq);
41		txb->wire.type		= packet_type;
42		txb->wire.flags		= call->conn->out_clientflag;
43		txb->wire.userStatus	= 0;
44		txb->wire.securityIndex	= call->security_ix;
45		txb->wire._rsvd		= 0;
46		txb->wire.serviceId	= htons(call->dest_srx.srx_service);
47
48		trace_rxrpc_txbuf(txb->debug_id,
49				  txb->call_debug_id, txb->seq, 1,
50				  packet_type == RXRPC_PACKET_TYPE_DATA ?
51				  rxrpc_txbuf_alloc_data :
52				  rxrpc_txbuf_alloc_ack);
53		atomic_inc(&rxrpc_nr_txbuf);
54	}
55
56	return txb;
57}
58
59void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
60{
61	int r;
62
63	__refcount_inc(&txb->ref, &r);
64	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what);
65}
66
67void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
68{
69	int r = refcount_read(&txb->ref);
70
71	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what);
72}
73
74static void rxrpc_free_txbuf(struct rcu_head *rcu)
75{
76	struct rxrpc_txbuf *txb = container_of(rcu, struct rxrpc_txbuf, rcu);
77
78	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
79			  rxrpc_txbuf_free);
80	kfree(txb);
81	atomic_dec(&rxrpc_nr_txbuf);
82}
83
84void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
85{
86	unsigned int debug_id, call_debug_id;
87	rxrpc_seq_t seq;
88	bool dead;
89	int r;
90
91	if (txb) {
92		debug_id = txb->debug_id;
93		call_debug_id = txb->call_debug_id;
94		seq = txb->seq;
95		dead = __refcount_dec_and_test(&txb->ref, &r);
96		trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what);
97		if (dead)
98			call_rcu(&txb->rcu, rxrpc_free_txbuf);
99	}
100}
101
102/*
103 * Shrink the transmit buffer.
104 */
105void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
106{
107	struct rxrpc_txbuf *txb;
108	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
109	bool wake = false;
110
111	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
112
113	while ((txb = list_first_entry_or_null(&call->tx_buffer,
114					       struct rxrpc_txbuf, call_link))) {
115		hard_ack = smp_load_acquire(&call->acks_hard_ack);
116		if (before(hard_ack, txb->seq))
117			break;
118
119		if (txb->seq != call->tx_bottom + 1)
120			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
121		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
122		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
123		list_del_rcu(&txb->call_link);
124
125		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
126
127		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
128		if (after(call->acks_hard_ack, call->tx_bottom + 128))
129			wake = true;
130	}
131
132	if (wake)
133		wake_up(&call->waitq);
134}
135