// SPDX-License-Identifier: GPL-2.0-or-later
/* Client connection-specific management code.
 *
 * Copyright (C) 2016, 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * Client connections need to be cached for a little while after they've made a
 * call so as to handle retransmitted DATA packets in case the server didn't
 * receive the final ACK or terminating ABORT we sent it.
 *
 * There is one flag of relevance to the cache:
 *
 *  (1) DONT_REUSE - The connection should be discarded as soon as possible and
 *      should not be reused.  This is set when an exclusive connection is used
 *      or a call ID counter overflows.
 *
 * The caching state may only be changed if the cache lock is held.
 *
 * There are two idle client connection expiry durations.  If the total number
 * of connections is below the reap threshold, we use the normal duration; if
 * it's above, we use the fast duration.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/timer.h>
#include <linux/sched/signal.h>

#include "ar-internal.h"

__read_mostly unsigned int rxrpc_reap_client_connections = 900;
__read_mostly unsigned long rxrpc_conn_idle_client_expiry = 2 * 60 * HZ;
__read_mostly unsigned long rxrpc_conn_idle_client_fast_expiry = 2 * HZ;
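
/*
 * Rough sketch of how the tunables above interact (illustrative only; the
 * actual reaping is done in rxrpc_discard_expired_client_conns() below):
 *
 *	nr_conns = atomic_read(&rxnet->nr_client_conns);
 *	if (nr_conns > rxrpc_reap_client_connections)
 *		expiry = rxrpc_conn_idle_client_fast_expiry;	(2 seconds)
 *	else
 *		expiry = rxrpc_conn_idle_client_expiry;		(2 minutes)
 *
 * so an idle connection normally lingers for two minutes, but only two
 * seconds once the cache is over the reap threshold of 900 connections.
 */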

/*
 * We use machine-unique IDs for our client connections.
 */
DEFINE_IDR(rxrpc_client_conn_ids);
static DEFINE_SPINLOCK(rxrpc_conn_id_lock);

static void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);

/*
 * Get a connection ID and epoch for a client connection from the global pool.
 * The connection struct pointer is then recorded in the idr radix tree.  The
 * epoch doesn't change until the client is rebooted (or, at least, unless the
 * module is unloaded).
 */
static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
					  gfp_t gfp)
{
	struct rxrpc_net *rxnet = conn->params.local->rxnet;
	int id;

	_enter("");

	idr_preload(gfp);
	spin_lock(&rxrpc_conn_id_lock);

	id = idr_alloc_cyclic(&rxrpc_client_conn_ids, conn,
			      1, 0x40000000, GFP_NOWAIT);
	if (id < 0)
		goto error;

	spin_unlock(&rxrpc_conn_id_lock);
	idr_preload_end();

	conn->proto.epoch = rxnet->epoch;
	conn->proto.cid = id << RXRPC_CIDSHIFT;
	set_bit(RXRPC_CONN_HAS_IDR, &conn->flags);
	_leave(" [CID %x]", conn->proto.cid);
	return 0;

error:
	spin_unlock(&rxrpc_conn_id_lock);
	idr_preload_end();
	_leave(" = %d", id);
	return id;
}
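
/*
 * Worked example of the CID layout (illustrative only, assuming
 * RXRPC_CIDSHIFT is ilog2(RXRPC_MAXCALLS), i.e. 2 for four channels per
 * connection): the IDR index forms the upper bits of the connection ID and
 * the channel number the lower bits, so for id 5:
 *
 *	conn->proto.cid	= 5 << RXRPC_CIDSHIFT;		(0x14)
 *	call->cid	= conn->proto.cid | channel;	(0x14..0x17)
 *
 * One connection can thus multiplex up to RXRPC_MAXCALLS concurrent calls,
 * one per channel (see rxrpc_activate_one_channel() below).
 */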

/*
 * Release a connection ID for a client connection from the global pool.
 */
static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
{
	if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
		spin_lock(&rxrpc_conn_id_lock);
		idr_remove(&rxrpc_client_conn_ids,
			   conn->proto.cid >> RXRPC_CIDSHIFT);
		spin_unlock(&rxrpc_conn_id_lock);
	}
}

/*
 * Destroy the client connection ID tree.
 */
void rxrpc_destroy_client_conn_ids(void)
{
	struct rxrpc_connection *conn;
	int id;

	if (!idr_is_empty(&rxrpc_client_conn_ids)) {
		idr_for_each_entry(&rxrpc_client_conn_ids, conn, id) {
			pr_err("AF_RXRPC: Leaked client conn %p {%d}\n",
			       conn, refcount_read(&conn->ref));
		}
		BUG();
	}

	idr_destroy(&rxrpc_client_conn_ids);
}

/*
 * Allocate a connection bundle.
 */
static struct rxrpc_bundle *rxrpc_alloc_bundle(struct rxrpc_conn_parameters *cp,
					       gfp_t gfp)
{
	struct rxrpc_bundle *bundle;

	bundle = kzalloc(sizeof(*bundle), gfp);
	if (bundle) {
		bundle->params = *cp;
		rxrpc_get_peer(bundle->params.peer);
		refcount_set(&bundle->ref, 1);
		atomic_set(&bundle->active, 1);
		spin_lock_init(&bundle->channel_lock);
		INIT_LIST_HEAD(&bundle->waiting_calls);
	}
	return bundle;
}

struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *bundle)
{
	refcount_inc(&bundle->ref);
	return bundle;
}

static void rxrpc_free_bundle(struct rxrpc_bundle *bundle)
{
	rxrpc_put_peer(bundle->params.peer);
	kfree(bundle);
}

void rxrpc_put_bundle(struct rxrpc_bundle *bundle)
{
	unsigned int d = bundle->debug_id;
	bool dead;
	int r;

	dead = __refcount_dec_and_test(&bundle->ref, &r);

	_debug("PUT B=%x %d", d, r - 1);
	if (dead)
		rxrpc_free_bundle(bundle);
}

/*
 * Allocate a client connection.
 */
static struct rxrpc_connection *
rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
{
	struct rxrpc_connection *conn;
	struct rxrpc_net *rxnet = bundle->params.local->rxnet;
	int ret;

	_enter("");

	conn = rxrpc_alloc_connection(gfp);
	if (!conn) {
		_leave(" = -ENOMEM");
		return ERR_PTR(-ENOMEM);
	}

	refcount_set(&conn->ref, 1);
	conn->bundle		= bundle;
	conn->params		= bundle->params;
	conn->out_clientflag	= RXRPC_CLIENT_INITIATED;
	conn->state		= RXRPC_CONN_CLIENT;
	conn->service_id	= conn->params.service_id;

	ret = rxrpc_get_client_connection_id(conn, gfp);
	if (ret < 0)
		goto error_0;

	ret = rxrpc_init_client_conn_security(conn);
	if (ret < 0)
		goto error_1;

	ret = conn->security->prime_packet_security(conn);
	if (ret < 0)
		goto error_2;

	atomic_inc(&rxnet->nr_conns);
	write_lock(&rxnet->conn_lock);
	list_add_tail(&conn->proc_link, &rxnet->conn_proc_list);
	write_unlock(&rxnet->conn_lock);

	rxrpc_get_bundle(bundle);
	rxrpc_get_peer(conn->params.peer);
	rxrpc_get_local(conn->params.local);
	key_get(conn->params.key);

	trace_rxrpc_conn(conn->debug_id, rxrpc_conn_new_client,
			 refcount_read(&conn->ref),
			 __builtin_return_address(0));

	atomic_inc(&rxnet->nr_client_conns);
	trace_rxrpc_client(conn, -1, rxrpc_client_alloc);
	_leave(" = %p", conn);
	return conn;

error_2:
	conn->security->clear(conn);
error_1:
	rxrpc_put_client_connection_id(conn);
error_0:
	kfree(conn);
	_leave(" = %d", ret);
	return ERR_PTR(ret);
}

/*
 * Determine if a connection may be reused.
 */
static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_net *rxnet;
	int id_cursor, id, distance, limit;

	if (!conn)
		goto dont_reuse;

	rxnet = conn->params.local->rxnet;
	if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
		goto dont_reuse;

	if (conn->state != RXRPC_CONN_CLIENT ||
	    conn->proto.epoch != rxnet->epoch)
		goto mark_dont_reuse;

	/* The IDR tree gets very expensive on memory if the connection IDs are
	 * widely scattered throughout the number space, so we shall want to
	 * kill off connections that, say, have an ID more than about four
	 * times the maximum number of client conns away from the current
	 * allocation point to try and keep the IDs concentrated.
	 */
	id_cursor = idr_get_cursor(&rxrpc_client_conn_ids);
	id = conn->proto.cid >> RXRPC_CIDSHIFT;
	distance = id - id_cursor;
	if (distance < 0)
		distance = -distance;
	limit = max_t(unsigned long, atomic_read(&rxnet->nr_conns) * 4, 1024);
	if (distance > limit)
		goto mark_dont_reuse;

	return true;

mark_dont_reuse:
	set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
dont_reuse:
	return false;
}
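
/*
 * Worked example of the distance heuristic above (numbers illustrative):
 * with 300 conns, limit = max(300 * 4, 1024) = 1200.  If the IDR cursor has
 * advanced to 5000 and this connection's ID is 3500, the distance is 1500,
 * which exceeds 1200, so the connection is marked DONT_REUSE and will be
 * torn down, letting the IDR tree shrink back around the allocation point.
 */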

/*
 * Look up the conn bundle that matches the connection parameters, adding it if
 * it doesn't yet exist.
 */
static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_conn_parameters *cp,
						 gfp_t gfp)
{
	static atomic_t rxrpc_bundle_id;
	struct rxrpc_bundle *bundle, *candidate;
	struct rxrpc_local *local = cp->local;
	struct rb_node *p, **pp, *parent;
	long diff;

	_enter("{%px,%x,%u,%u}",
	       cp->peer, key_serial(cp->key), cp->security_level, cp->upgrade);

	if (cp->exclusive)
		return rxrpc_alloc_bundle(cp, gfp);

	/* First, see if the bundle is already there. */
	_debug("search 1");
	spin_lock(&local->client_bundles_lock);
	p = local->client_bundles.rb_node;
	while (p) {
		bundle = rb_entry(p, struct rxrpc_bundle, local_node);

#define cmp(X) ((long)bundle->params.X - (long)cp->X)
		diff = (cmp(peer) ?:
			cmp(key) ?:
			cmp(security_level) ?:
			cmp(upgrade));
#undef cmp
		if (diff < 0)
			p = p->rb_left;
		else if (diff > 0)
			p = p->rb_right;
		else
			goto found_bundle;
	}
	spin_unlock(&local->client_bundles_lock);
	_debug("not found");

	/* It wasn't.  We need to add one. */
	candidate = rxrpc_alloc_bundle(cp, gfp);
	if (!candidate)
		return NULL;

	_debug("search 2");
	spin_lock(&local->client_bundles_lock);
	pp = &local->client_bundles.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		bundle = rb_entry(parent, struct rxrpc_bundle, local_node);

#define cmp(X) ((long)bundle->params.X - (long)cp->X)
		diff = (cmp(peer) ?:
			cmp(key) ?:
			cmp(security_level) ?:
			cmp(upgrade));
#undef cmp
		if (diff < 0)
			pp = &(*pp)->rb_left;
		else if (diff > 0)
			pp = &(*pp)->rb_right;
		else
			goto found_bundle_free;
	}

	_debug("new bundle");
	candidate->debug_id = atomic_inc_return(&rxrpc_bundle_id);
	rb_link_node(&candidate->local_node, parent, pp);
	rb_insert_color(&candidate->local_node, &local->client_bundles);
	rxrpc_get_bundle(candidate);
	spin_unlock(&local->client_bundles_lock);
	_leave(" = %u [new]", candidate->debug_id);
	return candidate;

found_bundle_free:
	rxrpc_free_bundle(candidate);
found_bundle:
	rxrpc_get_bundle(bundle);
	atomic_inc(&bundle->active);
	spin_unlock(&local->client_bundles_lock);
	_leave(" = %u [found]", bundle->debug_id);
	return bundle;
}
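
/*
 * Note on the cmp() chains above: the ?: chain evaluates to the first
 * non-zero comparison, giving a lexicographic ordering over the tuple
 * (peer, key, security_level, upgrade) - much as if it were written:
 *
 *	diff = cmp(peer);
 *	if (diff == 0)
 *		diff = cmp(key);
 *	...
 *
 * so two bundles compare equal only if all four parameters match.
 */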

/*
 * Create or find a client bundle to use for a call.
 *
 * If we return with a connection, the call will be on its waiting list.  It's
 * left to the caller to assign a channel and wake up the call.
 */
static struct rxrpc_bundle *rxrpc_prep_call(struct rxrpc_sock *rx,
					    struct rxrpc_call *call,
					    struct rxrpc_conn_parameters *cp,
					    struct sockaddr_rxrpc *srx,
					    gfp_t gfp)
{
	struct rxrpc_bundle *bundle;

	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);

	cp->peer = rxrpc_lookup_peer(rx, cp->local, srx, gfp);
	if (!cp->peer)
		goto error;

	call->cong_cwnd = cp->peer->cong_cwnd;
	if (call->cong_cwnd >= call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
	else
		call->cong_mode = RXRPC_CALL_SLOW_START;
	if (cp->upgrade)
		__set_bit(RXRPC_CALL_UPGRADE, &call->flags);

	/* Find the client connection bundle. */
	bundle = rxrpc_look_up_bundle(cp, gfp);
	if (!bundle)
		goto error;

	/* Get this call queued.  Someone else may activate it whilst we're
	 * lining up a new connection, but that's fine.
	 */
	spin_lock(&bundle->channel_lock);
	list_add_tail(&call->chan_wait_link, &bundle->waiting_calls);
	spin_unlock(&bundle->channel_lock);

	_leave(" = [B=%x]", bundle->debug_id);
	return bundle;

error:
	_leave(" = -ENOMEM");
	return ERR_PTR(-ENOMEM);
}

/*
 * Allocate a new connection and add it into a bundle.
 */
static void rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, gfp_t gfp)
	__releases(bundle->channel_lock)
{
	struct rxrpc_connection *candidate = NULL, *old = NULL;
	bool conflict;
	int i;

	_enter("");

	conflict = bundle->alloc_conn;
	if (!conflict)
		bundle->alloc_conn = true;
	spin_unlock(&bundle->channel_lock);
	if (conflict) {
		_leave(" [conf]");
		return;
	}

	candidate = rxrpc_alloc_client_connection(bundle, gfp);

	spin_lock(&bundle->channel_lock);
	bundle->alloc_conn = false;

	if (IS_ERR(candidate)) {
		bundle->alloc_error = PTR_ERR(candidate);
		spin_unlock(&bundle->channel_lock);
		_leave(" [err %ld]", PTR_ERR(candidate));
		return;
	}

	bundle->alloc_error = 0;

	for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
		unsigned int shift = i * RXRPC_MAXCALLS;
		int j;

		old = bundle->conns[i];
		if (!rxrpc_may_reuse_conn(old)) {
			if (old)
				trace_rxrpc_client(old, -1, rxrpc_client_replace);
			candidate->bundle_shift = shift;
			atomic_inc(&bundle->active);
			bundle->conns[i] = candidate;
			for (j = 0; j < RXRPC_MAXCALLS; j++)
				set_bit(shift + j, &bundle->avail_chans);
			candidate = NULL;
			break;
		}

		old = NULL;
	}

	spin_unlock(&bundle->channel_lock);

	if (candidate) {
		_debug("discard C=%x", candidate->debug_id);
		trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate);
		rxrpc_put_connection(candidate);
	}

	rxrpc_put_connection(old);
	_leave("");
}
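
/*
 * The alloc_conn flag above implements a simple single-allocator scheme:
 * the first caller to find it clear sets it and drops the lock to do the
 * (possibly sleeping) allocation; concurrent callers see it set and back
 * off, relying on the winner to populate a slot in bundle->conns[] and to
 * record any failure in ->alloc_error for the waiters to pick up.
 */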

/*
 * Add a connection to a bundle if there are no usable connections or we have
 * connections waiting for extra capacity.
 */
static void rxrpc_maybe_add_conn(struct rxrpc_bundle *bundle, gfp_t gfp)
{
	struct rxrpc_call *call;
	int i, usable;

	_enter("");

	spin_lock(&bundle->channel_lock);

	/* See if there are any usable connections. */
	usable = 0;
	for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
		if (rxrpc_may_reuse_conn(bundle->conns[i]))
			usable++;

	if (!usable && !list_empty(&bundle->waiting_calls)) {
		call = list_first_entry(&bundle->waiting_calls,
					struct rxrpc_call, chan_wait_link);
		if (test_bit(RXRPC_CALL_UPGRADE, &call->flags))
			bundle->try_upgrade = true;
	}

	if (!usable)
		goto alloc_conn;

	if (!bundle->avail_chans &&
	    !bundle->try_upgrade &&
	    !list_empty(&bundle->waiting_calls) &&
	    usable < ARRAY_SIZE(bundle->conns))
		goto alloc_conn;

	spin_unlock(&bundle->channel_lock);
	_leave("");
	return;

alloc_conn:
	return rxrpc_add_conn_to_bundle(bundle, gfp);
}

/*
 * Assign a channel to the call at the front of the queue and wake the call up.
 * We don't increment the callNumber counter until this number has been exposed
 * to the world.
 */
static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
				       unsigned int channel)
{
	struct rxrpc_channel *chan = &conn->channels[channel];
	struct rxrpc_bundle *bundle = conn->bundle;
	struct rxrpc_call *call = list_entry(bundle->waiting_calls.next,
					     struct rxrpc_call, chan_wait_link);
	u32 call_id = chan->call_counter + 1;

	_enter("C=%x,%u", conn->debug_id, channel);

	trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);

	/* Cancel the final ACK on the previous call if it hasn't been sent yet
	 * as the DATA packet will implicitly ACK it.
	 */
	clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
	clear_bit(conn->bundle_shift + channel, &bundle->avail_chans);

	rxrpc_see_call(call);
	list_del_init(&call->chan_wait_link);
	call->peer	= rxrpc_get_peer(conn->params.peer);
	call->conn	= rxrpc_get_connection(conn);
	call->cid	= conn->proto.cid | channel;
	call->call_id	= call_id;
	call->security	= conn->security;
	call->security_ix = conn->security_ix;
	call->service_id = conn->service_id;

	trace_rxrpc_connect_call(call);
	_net("CONNECT call %08x:%08x as call %d on conn %d",
	     call->cid, call->call_id, call->debug_id, conn->debug_id);

	write_lock_bh(&call->state_lock);
	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
	write_unlock_bh(&call->state_lock);

	/* Paired with the read barrier in rxrpc_connect_call().  This orders
	 * cid and epoch in the connection wrt to call_id without the need to
	 * take the channel_lock.
	 *
	 * We provisionally assign a callNumber at this point, but we don't
	 * confirm it until the call is about to be exposed.
	 *
	 * TODO: Pair with a barrier in the data_ready handler when that looks
	 * at the call ID through a connection channel.
	 */
	smp_wmb();

	chan->call_id		= call_id;
	chan->call_debug_id	= call->debug_id;
	rcu_assign_pointer(chan->call, call);
	wake_up(&call->waitq);
}
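
/*
 * Sketch of what the code above publishes (illustrative): conceptually, a
 * call on the wire is identified by the triple
 *
 *	{ conn->proto.epoch, call->cid, call->call_id }
 *
 * where call->cid embeds both the connection ID and the channel number.
 * The smp_wmb(), paired with the smp_rmb() in rxrpc_connect_call(), is
 * intended to ensure that a reader observing the new chan->call_id also
 * observes the cid and epoch it belongs to.
 */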

/*
 * Remove a connection from the idle list if it's on it.
 */
static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
{
	struct rxrpc_net *rxnet = bundle->params.local->rxnet;
	bool drop_ref;

	if (!list_empty(&conn->cache_link)) {
		drop_ref = false;
		spin_lock(&rxnet->client_conn_cache_lock);
		if (!list_empty(&conn->cache_link)) {
			list_del_init(&conn->cache_link);
			drop_ref = true;
		}
		spin_unlock(&rxnet->client_conn_cache_lock);
		if (drop_ref)
			rxrpc_put_connection(conn);
	}
}

/*
 * Assign channels and callNumbers to waiting calls with channel_lock
 * held by caller.
 */
static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
{
	struct rxrpc_connection *conn;
	unsigned long avail, mask;
	unsigned int channel, slot;

	if (bundle->try_upgrade)
		mask = 1;
	else
		mask = ULONG_MAX;

	while (!list_empty(&bundle->waiting_calls)) {
		avail = bundle->avail_chans & mask;
		if (!avail)
			break;
		channel = __ffs(avail);
		clear_bit(channel, &bundle->avail_chans);

		slot = channel / RXRPC_MAXCALLS;
		conn = bundle->conns[slot];
		if (!conn)
			break;

		if (bundle->try_upgrade)
			set_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags);
		rxrpc_unidle_conn(bundle, conn);

		channel &= (RXRPC_MAXCALLS - 1);
		conn->act_chans	|= 1 << channel;
		rxrpc_activate_one_channel(conn, channel);
	}
}
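
/*
 * Layout of ->avail_chans assumed by the arithmetic above: each bundle slot
 * contributes RXRPC_MAXCALLS consecutive bits, so for a given bit:
 *
 *	bit	= slot * RXRPC_MAXCALLS + channel
 *	slot	= bit / RXRPC_MAXCALLS		(which conns[] entry)
 *	channel	= bit & (RXRPC_MAXCALLS - 1)	(channel on that conn)
 *
 * Thus __ffs() picks the lowest free channel of the lowest-numbered
 * connection, and mask == 1 during an upgrade probe confines activation to
 * channel 0 of conns[0].
 */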

/*
 * Assign channels and callNumbers to waiting calls.
 */
static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
{
	_enter("B=%x", bundle->debug_id);

	trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);

	if (!bundle->avail_chans)
		return;

	spin_lock(&bundle->channel_lock);
	rxrpc_activate_channels_locked(bundle);
	spin_unlock(&bundle->channel_lock);
	_leave("");
}

/*
 * Wait for a callNumber and a channel to be granted to a call.
 */
static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle,
				  struct rxrpc_call *call, gfp_t gfp)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret = 0;

	_enter("%d", call->debug_id);

	if (!gfpflags_allow_blocking(gfp)) {
		rxrpc_maybe_add_conn(bundle, gfp);
		rxrpc_activate_channels(bundle);
		ret = bundle->alloc_error ?: -EAGAIN;
		goto out;
	}

	add_wait_queue_exclusive(&call->waitq, &myself);
	for (;;) {
		rxrpc_maybe_add_conn(bundle, gfp);
		rxrpc_activate_channels(bundle);
		ret = bundle->alloc_error;
		if (ret < 0)
			break;

		switch (call->interruptibility) {
		case RXRPC_INTERRUPTIBLE:
		case RXRPC_PREINTERRUPTIBLE:
			set_current_state(TASK_INTERRUPTIBLE);
			break;
		case RXRPC_UNINTERRUPTIBLE:
		default:
			set_current_state(TASK_UNINTERRUPTIBLE);
			break;
		}
		if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_AWAIT_CONN)
			break;
		if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
		     call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
		    signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		schedule();
	}
	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

out:
	_leave(" = %d", ret);
	return ret;
}

/*
 * find a connection for a call
 * - called in process context with IRQs enabled
 */
int rxrpc_connect_call(struct rxrpc_sock *rx,
		       struct rxrpc_call *call,
		       struct rxrpc_conn_parameters *cp,
		       struct sockaddr_rxrpc *srx,
		       gfp_t gfp)
{
	struct rxrpc_bundle *bundle;
	struct rxrpc_net *rxnet = cp->local->rxnet;
	int ret = 0;

	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);

	rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);

	bundle = rxrpc_prep_call(rx, call, cp, srx, gfp);
	if (IS_ERR(bundle)) {
		ret = PTR_ERR(bundle);
		goto out;
	}

	if (call->state == RXRPC_CALL_CLIENT_AWAIT_CONN) {
		ret = rxrpc_wait_for_channel(bundle, call, gfp);
		if (ret < 0)
			goto wait_failed;
	}

granted_channel:
	/* Paired with the write barrier in rxrpc_activate_one_channel(). */
	smp_rmb();

out_put_bundle:
	rxrpc_deactivate_bundle(bundle);
	rxrpc_put_bundle(bundle);
out:
	_leave(" = %d", ret);
	return ret;

wait_failed:
	spin_lock(&bundle->channel_lock);
	list_del_init(&call->chan_wait_link);
	spin_unlock(&bundle->channel_lock);

	if (call->state != RXRPC_CALL_CLIENT_AWAIT_CONN) {
		ret = 0;
		goto granted_channel;
	}

	trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed);
	rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
	rxrpc_disconnect_client_call(bundle, call);
	goto out_put_bundle;
}

/*
 * Note that a call, and thus a connection, is about to be exposed to the
 * world.
 */
void rxrpc_expose_client_call(struct rxrpc_call *call)
{
	unsigned int channel = call->cid & RXRPC_CHANNELMASK;
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_channel *chan = &conn->channels[channel];

	if (!test_and_set_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		/* Mark the call ID as being used.  If the callNumber counter
		 * exceeds ~2 billion, we kill the connection after its
		 * outstanding calls have finished so that the counter doesn't
		 * wrap.
		 */
		chan->call_counter++;
		if (chan->call_counter >= INT_MAX)
			set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
		trace_rxrpc_client(conn, channel, rxrpc_client_exposed);
	}
}

/*
 * Set the reap timer.
 */
static void rxrpc_set_client_reap_timer(struct rxrpc_net *rxnet)
{
	if (!rxnet->kill_all_client_conns) {
		unsigned long now = jiffies;
		unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;

		if (rxnet->live)
			timer_reduce(&rxnet->client_conn_reap_timer, reap_at);
	}
}

/*
 * Disconnect a client call.
 */
void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call *call)
{
	struct rxrpc_connection *conn;
	struct rxrpc_channel *chan = NULL;
	struct rxrpc_net *rxnet = bundle->params.local->rxnet;
	unsigned int channel;
	bool may_reuse;
	u32 cid;

	_enter("c=%x", call->debug_id);

	spin_lock(&bundle->channel_lock);
	set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);

	/* Calls that have never actually been assigned a channel can simply be
	 * discarded.
	 */
	conn = call->conn;
	if (!conn) {
		_debug("call is waiting");
		ASSERTCMP(call->call_id, ==, 0);
		ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
		list_del_init(&call->chan_wait_link);
		goto out;
	}

	cid = call->cid;
	channel = cid & RXRPC_CHANNELMASK;
	chan = &conn->channels[channel];
	trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect);

	if (rcu_access_pointer(chan->call) != call) {
		spin_unlock(&bundle->channel_lock);
		BUG();
	}

	may_reuse = rxrpc_may_reuse_conn(conn);

	/* If a client call was exposed to the world, we save the result for
	 * retransmission.
	 *
	 * We use a barrier here so that the call number and abort code can be
	 * read without needing to take a lock.
	 *
	 * TODO: Make the incoming packet handler check this and handle
	 * terminal retransmission without requiring access to the call.
	 */
	if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		_debug("exposed %u,%u", call->call_id, call->abort_code);
		__rxrpc_disconnect_call(conn, call);

		if (test_and_clear_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) {
			trace_rxrpc_client(conn, channel, rxrpc_client_to_active);
			bundle->try_upgrade = false;
			if (may_reuse)
				rxrpc_activate_channels_locked(bundle);
		}
	}

	/* See if we can pass the channel directly to another call. */
	if (may_reuse && !list_empty(&bundle->waiting_calls)) {
		trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
		rxrpc_activate_one_channel(conn, channel);
		goto out;
	}

	/* Schedule the final ACK to be transmitted in a short while so that it
	 * can be skipped if we find a follow-on call.  The first DATA packet
	 * of the follow on call will implicitly ACK this call.
	 */
	if (call->completion == RXRPC_CALL_SUCCEEDED &&
	    test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		unsigned long final_ack_at = jiffies + 2;

		WRITE_ONCE(chan->final_ack_at, final_ack_at);
		smp_wmb(); /* vs rxrpc_process_delayed_final_acks() */
		set_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
		rxrpc_reduce_conn_timer(conn, final_ack_at);
	}

	/* Deactivate the channel. */
	rcu_assign_pointer(chan->call, NULL);
	set_bit(conn->bundle_shift + channel, &conn->bundle->avail_chans);
	conn->act_chans	&= ~(1 << channel);

	/* If no channels remain active, then put the connection on the idle
	 * list for a short while.  Give it a ref to stop it going away if it
	 * becomes unbundled.
	 */
	if (!conn->act_chans) {
		trace_rxrpc_client(conn, channel, rxrpc_client_to_idle);
		conn->idle_timestamp = jiffies;

		rxrpc_get_connection(conn);
		spin_lock(&rxnet->client_conn_cache_lock);
		list_move_tail(&conn->cache_link, &rxnet->idle_client_conns);
		spin_unlock(&rxnet->client_conn_cache_lock);

		rxrpc_set_client_reap_timer(rxnet);
	}

out:
	spin_unlock(&bundle->channel_lock);
	_leave("");
	return;
}
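
/*
 * Timeline sketch of the final-ACK optimisation above (timings
 * illustrative): a call completes at jiffy T, so chan->final_ack_at is set
 * to T + 2 and the conn timer is reduced to that point.  If another call is
 * activated on the channel before T + 2, the FINAL_ACK flag is cleared in
 * rxrpc_activate_one_channel() and the new call's first DATA packet
 * implicitly ACKs the old call; otherwise an explicit final ACK goes out
 * when the timer expires (see rxrpc_process_delayed_final_acks()).
 */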

/*
 * Remove a connection from a bundle.
 */
static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_bundle *bundle = conn->bundle;
	unsigned int bindex;
	bool need_drop = false;
	int i;

	_enter("C=%x", conn->debug_id);

	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
		rxrpc_process_delayed_final_acks(conn, true);

	spin_lock(&bundle->channel_lock);
	bindex = conn->bundle_shift / RXRPC_MAXCALLS;
	if (bundle->conns[bindex] == conn) {
		_debug("clear slot %u", bindex);
		bundle->conns[bindex] = NULL;
		for (i = 0; i < RXRPC_MAXCALLS; i++)
			clear_bit(conn->bundle_shift + i, &bundle->avail_chans);
		need_drop = true;
	}
	spin_unlock(&bundle->channel_lock);

	if (need_drop) {
		rxrpc_deactivate_bundle(bundle);
		rxrpc_put_connection(conn);
	}
}

/*
 * Drop the active count on a bundle.
 */
static void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle)
{
	struct rxrpc_local *local = bundle->params.local;
	bool need_put = false;

	if (atomic_dec_and_lock(&bundle->active, &local->client_bundles_lock)) {
		if (!bundle->params.exclusive) {
			_debug("erase bundle");
			rb_erase(&bundle->local_node, &local->client_bundles);
			need_put = true;
		}

		spin_unlock(&local->client_bundles_lock);
		if (need_put)
			rxrpc_put_bundle(bundle);
	}
}

/*
 * Clean up a dead client connection.
 */
static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_local *local = conn->params.local;
	struct rxrpc_net *rxnet = local->rxnet;

	_enter("C=%x", conn->debug_id);

	trace_rxrpc_client(conn, -1, rxrpc_client_cleanup);
	atomic_dec(&rxnet->nr_client_conns);

	rxrpc_put_client_connection_id(conn);
	rxrpc_kill_connection(conn);
}
/*
 * Release a reference to a client connection, cleaning it up when the last
 * reference is dropped.
 */
void rxrpc_put_client_conn(struct rxrpc_connection *conn)
{
	const void *here = __builtin_return_address(0);
	unsigned int debug_id = conn->debug_id;
	bool dead;
	int r;

	dead = __refcount_dec_and_test(&conn->ref, &r);
	trace_rxrpc_conn(debug_id, rxrpc_conn_put_client, r - 1, here);
	if (dead)
		rxrpc_kill_client_conn(conn);
}

/*
 * Discard expired client connections from the idle list.  Each conn in the
 * idle list has been exposed and holds an extra ref because of that.
 *
 * This may be called from conn setup or from a work item so cannot be
 * considered non-reentrant.
 */
void rxrpc_discard_expired_client_conns(struct work_struct *work)
{
	struct rxrpc_connection *conn;
	struct rxrpc_net *rxnet =
		container_of(work, struct rxrpc_net, client_conn_reaper);
	unsigned long expiry, conn_expires_at, now;
	unsigned int nr_conns;

	_enter("");

	if (list_empty(&rxnet->idle_client_conns)) {
		_leave(" [empty]");
		return;
	}

	/* Don't double up on the discarding */
	if (!spin_trylock(&rxnet->client_conn_discard_lock)) {
		_leave(" [already]");
		return;
	}

	/* We keep an estimate of what the number of conns ought to be after
	 * we've discarded some so that we don't overdo the discarding.
	 */
	nr_conns = atomic_read(&rxnet->nr_client_conns);

next:
	spin_lock(&rxnet->client_conn_cache_lock);

	if (list_empty(&rxnet->idle_client_conns))
		goto out;

	conn = list_entry(rxnet->idle_client_conns.next,
			  struct rxrpc_connection, cache_link);

	if (!rxnet->kill_all_client_conns) {
		/* If the number of connections is over the reap limit, we
		 * expedite discard by reducing the expiry timeout.  We must,
		 * however, have at least a short grace period to be able to do
		 * final-ACK or ABORT retransmission.
		 */
		expiry = rxrpc_conn_idle_client_expiry;
		if (nr_conns > rxrpc_reap_client_connections)
			expiry = rxrpc_conn_idle_client_fast_expiry;
		if (conn->params.local->service_closed)
			expiry = rxrpc_closed_conn_expiry * HZ;

		conn_expires_at = conn->idle_timestamp + expiry;

		now = READ_ONCE(jiffies);
		if (time_after(conn_expires_at, now))
			goto not_yet_expired;
	}

	trace_rxrpc_client(conn, -1, rxrpc_client_discard);
	list_del_init(&conn->cache_link);

	spin_unlock(&rxnet->client_conn_cache_lock);

	rxrpc_unbundle_conn(conn);
	rxrpc_put_connection(conn); /* Drop the ->cache_link ref */

	nr_conns--;
	goto next;

not_yet_expired:
	/* The connection at the front of the queue hasn't yet expired, so
	 * schedule the work item for that point if we discarded something.
	 *
	 * We don't worry if the work item is already scheduled - it can look
	 * after rescheduling itself at a later time.  We could cancel it, but
	 * then things get messier.
	 */
	_debug("not yet");
	if (!rxnet->kill_all_client_conns)
		timer_reduce(&rxnet->client_conn_reap_timer, conn_expires_at);

out:
	spin_unlock(&rxnet->client_conn_cache_lock);
	spin_unlock(&rxnet->client_conn_discard_lock);
	_leave("");
}

/*
 * Preemptively destroy all the client connection records rather than waiting
 * for them to time out.
 */
void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
{
	_enter("");

	spin_lock(&rxnet->client_conn_cache_lock);
	rxnet->kill_all_client_conns = true;
	spin_unlock(&rxnet->client_conn_cache_lock);

	del_timer_sync(&rxnet->client_conn_reap_timer);

	if (!rxrpc_queue_work(&rxnet->client_conn_reaper))
		_debug("destroy: queue failed");

	_leave("");
}

/*
 * Clean up the client connections on a local endpoint.
 */
void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
{
	struct rxrpc_connection *conn, *tmp;
	struct rxrpc_net *rxnet = local->rxnet;
	LIST_HEAD(graveyard);

	_enter("");

	spin_lock(&rxnet->client_conn_cache_lock);

	list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
				 cache_link) {
		if (conn->params.local == local) {
			trace_rxrpc_client(conn, -1, rxrpc_client_discard);
			list_move(&conn->cache_link, &graveyard);
		}
	}

	spin_unlock(&rxnet->client_conn_cache_lock);

	while (!list_empty(&graveyard)) {
		conn = list_entry(graveyard.next,
				  struct rxrpc_connection, cache_link);
		list_del_init(&conn->cache_link);
		rxrpc_unbundle_conn(conn);
		rxrpc_put_connection(conn);
	}

	_leave(" [culled]");
}