xref: /kernel/linux/linux-6.6/fs/dlm/lowcomms.c (revision 62306a36)
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
7**
8**
9*******************************************************************************
10******************************************************************************/
11
12/*
13 * lowcomms.c
14 *
15 * This is the "low-level" comms layer.
16 *
17 * It is responsible for sending/receiving messages
18 * from other nodes in the cluster.
19 *
20 * Cluster nodes are referred to by their nodeids. nodeids are
21 * simply 32-bit numbers to the locking module - if they need to
22 * be expanded for the cluster infrastructure then that is its
23 * responsibility. It is this layer's responsibility to resolve
24 * these nodeids into IP addresses or whatever else it needs for
25 * inter-node communication.
26 *
27 * The comms level consists of workqueue items that deal mainly with
28 * the receiving of messages from other nodes and passing them
29 * up to the mid-level comms layer (which understands the
30 * message format) for execution by the locking core, and
31 * a send worker which does all the setting up of connections
32 * to remote nodes and the sending of data. Other threads are not
33 * allowed to send their own data because it may cause them to wait
34 * in times of high load. Also, this way, the sending worker can
35 * collect together messages bound for one node and send them in one block.
36 *
37 * lowcomms will choose to use either TCP or SCTP as its transport layer
38 * depending on the configuration variable 'protocol'. This should be set
39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
41 * for the DLM to function.
42 *
43 */
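/*
 * Illustrative sketch (assuming the standard dlm configfs layout from
 * fs/dlm/config.c, with configfs mounted at /sys/kernel/config): the
 * transport is typically selected cluster-wide before any lockspace is
 * created, e.g.:
 *
 *	echo 0 > /sys/kernel/config/dlm/cluster/protocol	# TCP (default)
 *	echo 1 > /sys/kernel/config/dlm/cluster/protocol	# SCTP
 */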
44
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <linux/pagemap.h>
49#include <linux/file.h>
50#include <linux/mutex.h>
51#include <linux/sctp.h>
52#include <linux/slab.h>
53#include <net/sctp/sctp.h>
54#include <net/ipv6.h>
55
56#include <trace/events/dlm.h>
57#include <trace/events/sock.h>
58
59#include "dlm_internal.h"
60#include "lowcomms.h"
61#include "midcomms.h"
62#include "memory.h"
63#include "config.h"
64
65#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
66#define NEEDED_RMEM (4*1024*1024)
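/* NEEDED_RMEM is the receive buffer size forced on SCTP sockets (see
 * dlm_sctp_sockopts()); DLM_SHUTDOWN_WAIT_TIMEOUT bounds how long
 * shutdown_connection() waits for the peer to close its end.
 */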
67
68struct connection {
69	struct socket *sock;	/* NULL if not connected */
70	uint32_t nodeid;	/* So we know who we are in the list */
71	/* this semaphore allows parallel recv/send when it is taken in
72	 * read mode. When we release a sock we need to hold the write lock.
73	 *
74	 * However this is locking code and not nice. Once we remove the
75	 * othercon handling we can look into another mechanism to
76	 * synchronize io handling and call sock_release() at the right time.
77	 */
78	struct rw_semaphore sock_lock;
79	unsigned long flags;
80#define CF_APP_LIMITED 0
81#define CF_RECV_PENDING 1
82#define CF_SEND_PENDING 2
83#define CF_RECV_INTR 3
84#define CF_IO_STOP 4
85#define CF_IS_OTHERCON 5
86	struct list_head writequeue;  /* List of outgoing writequeue_entries */
87	spinlock_t writequeue_lock;
88	int retries;
89	struct hlist_node list;
90	/* due to connect()/accept() races we can currently end up with a
91	 * second, crossed-over connection attempt for one node.
92	 *
93	 * The race could be avoided by introducing a connect rule, e.g.
94	 * only the side with our_nodeid > nodeid_to_connect is allowed
95	 * to connect; the other side's connect would only be taken as a
96	 * request for a reconnect.
97	 *
98	 * However changing to this behaviour would break backwards
99	 * compatibility; remove this in a DLM protocol major version upgrade!
100	 */
101	struct connection *othercon;
102	struct work_struct rwork; /* receive worker */
103	struct work_struct swork; /* send worker */
104	wait_queue_head_t shutdown_wait;
105	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
106	int rx_leftover;
107	int mark;
108	int addr_count;
109	int curr_addr_index;
110	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
111	spinlock_t addrs_lock;
112	struct rcu_head rcu;
113};
114#define sock2con(x) ((struct connection *)(x)->sk_user_data)
115
116struct listen_connection {
117	struct socket *sock;
118	struct work_struct rwork;
119};
120
121#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
122#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
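/* Layout of a writequeue entry page (illustrative):
 *
 *	0         offset               end          PAGE_SIZE
 *	| sent    | queued, unsent     | still free |
 *
 * DLM_WQ_LENGTH_BYTES() is the span still to transmit and
 * DLM_WQ_REMAIN_BYTES() the space left for appending new messages.
 */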
123
124/* An entry waiting to be sent */
125struct writequeue_entry {
126	struct list_head list;
127	struct page *page;
128	int offset;
129	int len;
130	int end;
131	int users;
132	bool dirty;
133	struct connection *con;
134	struct list_head msgs;
135	struct kref ref;
136};
137
138struct dlm_msg {
139	struct writequeue_entry *entry;
140	struct dlm_msg *orig_msg;
141	bool retransmit;
142	void *ppc;
143	int len;
144	int idx; /* new()/commit() idx exchange */
145
146	struct list_head list;
147	struct kref ref;
148};
149
150struct processqueue_entry {
151	unsigned char *buf;
152	int nodeid;
153	int buflen;
154
155	struct list_head list;
156};
157
158struct dlm_proto_ops {
159	bool try_new_addr;
160	const char *name;
161	int proto;
162
163	int (*connect)(struct connection *con, struct socket *sock,
164		       struct sockaddr *addr, int addr_len);
165	void (*sockopts)(struct socket *sock);
166	int (*bind)(struct socket *sock);
167	int (*listen_validate)(void);
168	void (*listen_sockopts)(struct socket *sock);
169	int (*listen_bind)(struct socket *sock);
170};
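/* instantiated below as dlm_tcp_ops and dlm_sctp_ops; one of them is
 * selected in dlm_lowcomms_start() based on dlm_config.ci_protocol.
 */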
171
172static struct listen_sock_callbacks {
173	void (*sk_error_report)(struct sock *);
174	void (*sk_data_ready)(struct sock *);
175	void (*sk_state_change)(struct sock *);
176	void (*sk_write_space)(struct sock *);
177} listen_sock;
178
179static struct listen_connection listen_con;
180static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
181static int dlm_local_count;
182
183/* Work queues */
184static struct workqueue_struct *io_workqueue;
185static struct workqueue_struct *process_workqueue;
186
187static struct hlist_head connection_hash[CONN_HASH_SIZE];
188static DEFINE_SPINLOCK(connections_lock);
189DEFINE_STATIC_SRCU(connections_srcu);
190
191static const struct dlm_proto_ops *dlm_proto_ops;
192
193#define DLM_IO_SUCCESS 0
194#define DLM_IO_END 1
195#define DLM_IO_EOF 2
196#define DLM_IO_RESCHED 3
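/* return codes of the io handlers (receive_from_sock(), send_to_sock()
 * and accept_from_sock()): DLM_IO_SUCCESS means progress was made and
 * the worker loops again, DLM_IO_END means there is nothing left to do
 * and the pending bit was cleared, DLM_IO_EOF means the peer closed the
 * socket and DLM_IO_RESCHED asks the worker to requeue itself.
 */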
197
198static void process_recv_sockets(struct work_struct *work);
199static void process_send_sockets(struct work_struct *work);
200static void process_dlm_messages(struct work_struct *work);
201
202static DECLARE_WORK(process_work, process_dlm_messages);
203static DEFINE_SPINLOCK(processqueue_lock);
204static bool process_dlm_messages_pending;
205static LIST_HEAD(processqueue);
206
207bool dlm_lowcomms_is_running(void)
208{
209	return !!listen_con.sock;
210}
211
212static void lowcomms_queue_swork(struct connection *con)
213{
214	assert_spin_locked(&con->writequeue_lock);
215
216	if (!test_bit(CF_IO_STOP, &con->flags) &&
217	    !test_bit(CF_APP_LIMITED, &con->flags) &&
218	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
219		queue_work(io_workqueue, &con->swork);
220}
221
222static void lowcomms_queue_rwork(struct connection *con)
223{
224#ifdef CONFIG_LOCKDEP
225	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
226#endif
227
228	if (!test_bit(CF_IO_STOP, &con->flags) &&
229	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
230		queue_work(io_workqueue, &con->rwork);
231}
232
233static void writequeue_entry_ctor(void *data)
234{
235	struct writequeue_entry *entry = data;
236
237	INIT_LIST_HEAD(&entry->msgs);
238}
239
240struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
241{
242	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
243				 0, 0, writequeue_entry_ctor);
244}
245
246struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
247{
248	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
249}
250
251/* must be called with writequeue_lock held */
252static struct writequeue_entry *con_next_wq(struct connection *con)
253{
254	struct writequeue_entry *e;
255
256	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
257				     list);
258	/* if len is zero there is nothing to send; if there are users filling
259	 * buffers we wait until the users are done so we can send more.
260	 */
261	if (!e || e->users || e->len == 0)
262		return NULL;
263
264	return e;
265}
266
267static struct connection *__find_con(int nodeid, int r)
268{
269	struct connection *con;
270
271	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
272		if (con->nodeid == nodeid)
273			return con;
274	}
275
276	return NULL;
277}
278
279static void dlm_con_init(struct connection *con, int nodeid)
280{
281	con->nodeid = nodeid;
282	init_rwsem(&con->sock_lock);
283	INIT_LIST_HEAD(&con->writequeue);
284	spin_lock_init(&con->writequeue_lock);
285	INIT_WORK(&con->swork, process_send_sockets);
286	INIT_WORK(&con->rwork, process_recv_sockets);
287	spin_lock_init(&con->addrs_lock);
288	init_waitqueue_head(&con->shutdown_wait);
289}
290
291/*
292 * If 'alloc' is zero then we don't attempt to create a new
293 * connection structure for this node.
294 */
295static struct connection *nodeid2con(int nodeid, gfp_t alloc)
296{
297	struct connection *con, *tmp;
298	int r;
299
300	r = nodeid_hash(nodeid);
301	con = __find_con(nodeid, r);
302	if (con || !alloc)
303		return con;
304
305	con = kzalloc(sizeof(*con), alloc);
306	if (!con)
307		return NULL;
308
309	dlm_con_init(con, nodeid);
310
311	spin_lock(&connections_lock);
312	/* Because multiple workqueues/threads call this function it can
313	 * race on multiple cpus. Instead of locking the hot path
314	 * __find_con() we just check again, for recently added nodes,
315	 * under protection of connections_lock. If a connection was added
316	 * meanwhile we abort our creation and return the existing one.
317	 */
318	tmp = __find_con(nodeid, r);
319	if (tmp) {
320		spin_unlock(&connections_lock);
321		kfree(con);
322		return tmp;
323	}
324
325	hlist_add_head_rcu(&con->list, &connection_hash[r]);
326	spin_unlock(&connections_lock);
327
328	return con;
329}
330
331static int addr_compare(const struct sockaddr_storage *x,
332			const struct sockaddr_storage *y)
333{
334	switch (x->ss_family) {
335	case AF_INET: {
336		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
337		struct sockaddr_in *siny = (struct sockaddr_in *)y;
338		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
339			return 0;
340		if (sinx->sin_port != siny->sin_port)
341			return 0;
342		break;
343	}
344	case AF_INET6: {
345		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
346		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
347		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
348			return 0;
349		if (sinx->sin6_port != siny->sin6_port)
350			return 0;
351		break;
352	}
353	default:
354		return 0;
355	}
356	return 1;
357}
358
359static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
360			  struct sockaddr *sa_out, bool try_new_addr,
361			  unsigned int *mark)
362{
363	struct sockaddr_storage sas;
364	struct connection *con;
365	int idx;
366
367	if (!dlm_local_count)
368		return -1;
369
370	idx = srcu_read_lock(&connections_srcu);
371	con = nodeid2con(nodeid, 0);
372	if (!con) {
373		srcu_read_unlock(&connections_srcu, idx);
374		return -ENOENT;
375	}
376
377	spin_lock(&con->addrs_lock);
378	if (!con->addr_count) {
379		spin_unlock(&con->addrs_lock);
380		srcu_read_unlock(&connections_srcu, idx);
381		return -ENOENT;
382	}
383
384	memcpy(&sas, &con->addr[con->curr_addr_index],
385	       sizeof(struct sockaddr_storage));
386
387	if (try_new_addr) {
388		con->curr_addr_index++;
389		if (con->curr_addr_index == con->addr_count)
390			con->curr_addr_index = 0;
391	}
392
393	*mark = con->mark;
394	spin_unlock(&con->addrs_lock);
395
396	if (sas_out)
397		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
398
399	if (!sa_out) {
400		srcu_read_unlock(&connections_srcu, idx);
401		return 0;
402	}
403
404	if (dlm_local_addr[0].ss_family == AF_INET) {
405		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
406		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
407		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
408	} else {
409		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
410		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
411		ret6->sin6_addr = in6->sin6_addr;
412	}
413
414	srcu_read_unlock(&connections_srcu, idx);
415	return 0;
416}
417
418static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
419			  unsigned int *mark)
420{
421	struct connection *con;
422	int i, idx, addr_i;
423
424	idx = srcu_read_lock(&connections_srcu);
425	for (i = 0; i < CONN_HASH_SIZE; i++) {
426		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
427			WARN_ON_ONCE(!con->addr_count);
428
429			spin_lock(&con->addrs_lock);
430			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
431				if (addr_compare(&con->addr[addr_i], addr)) {
432					*nodeid = con->nodeid;
433					*mark = con->mark;
434					spin_unlock(&con->addrs_lock);
435					srcu_read_unlock(&connections_srcu, idx);
436					return 0;
437				}
438			}
439			spin_unlock(&con->addrs_lock);
440		}
441	}
442	srcu_read_unlock(&connections_srcu, idx);
443
444	return -ENOENT;
445}
446
447static bool dlm_lowcomms_con_has_addr(const struct connection *con,
448				      const struct sockaddr_storage *addr)
449{
450	int i;
451
452	for (i = 0; i < con->addr_count; i++) {
453		if (addr_compare(&con->addr[i], addr))
454			return true;
455	}
456
457	return false;
458}
459
460int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
461{
462	struct connection *con;
463	bool ret;
	int idx;
464
465	idx = srcu_read_lock(&connections_srcu);
466	con = nodeid2con(nodeid, GFP_NOFS);
467	if (!con) {
468		srcu_read_unlock(&connections_srcu, idx);
469		return -ENOMEM;
470	}
471
472	spin_lock(&con->addrs_lock);
473	if (!con->addr_count) {
474		memcpy(&con->addr[0], addr, sizeof(*addr));
475		con->addr_count = 1;
476		con->mark = dlm_config.ci_mark;
477		spin_unlock(&con->addrs_lock);
478		srcu_read_unlock(&connections_srcu, idx);
479		return 0;
480	}
481
482	ret = dlm_lowcomms_con_has_addr(con, addr);
483	if (ret) {
484		spin_unlock(&con->addrs_lock);
485		srcu_read_unlock(&connections_srcu, idx);
486		return -EEXIST;
487	}
488
489	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
490		spin_unlock(&con->addrs_lock);
491		srcu_read_unlock(&connections_srcu, idx);
492		return -ENOSPC;
493	}
494
495	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
496	spin_unlock(&con->addrs_lock);
497	srcu_read_unlock(&connections_srcu, idx);
498	return 0;
499}
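/* Illustrative sketch (hypothetical in-kernel caller; real addresses
 * arrive from the cluster manager via configfs and reach this function
 * through the midcomms layer): registering an IPv4 address could look
 * like:
 *
 *	struct sockaddr_storage ss = { };
 *	struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
 *
 *	sin->sin_family = AF_INET;
 *	sin->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
 *	error = dlm_lowcomms_addr(nodeid, &ss, sizeof(ss));
 */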
500
501/* Data available on a socket, or a listen socket received a connect */
502static void lowcomms_data_ready(struct sock *sk)
503{
504	struct connection *con = sock2con(sk);
505
506	trace_sk_data_ready(sk);
507
508	set_bit(CF_RECV_INTR, &con->flags);
509	lowcomms_queue_rwork(con);
510}
511
512static void lowcomms_write_space(struct sock *sk)
513{
514	struct connection *con = sock2con(sk);
515
516	clear_bit(SOCK_NOSPACE, &con->sock->flags);
517
518	spin_lock_bh(&con->writequeue_lock);
519	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
520		con->sock->sk->sk_write_pending--;
521		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
522	}
523
524	lowcomms_queue_swork(con);
525	spin_unlock_bh(&con->writequeue_lock);
526}
527
528static void lowcomms_state_change(struct sock *sk)
529{
530	/* the SCTP layer does not call sk_data_ready when the connection
531	 * is done, so we catch the signal through here instead.
532	 */
533	if (sk->sk_shutdown == RCV_SHUTDOWN)
534		lowcomms_data_ready(sk);
535}
536
537static void lowcomms_listen_data_ready(struct sock *sk)
538{
539	trace_sk_data_ready(sk);
540
541	queue_work(io_workqueue, &listen_con.rwork);
542}
543
544int dlm_lowcomms_connect_node(int nodeid)
545{
546	struct connection *con;
547	int idx;
548
549	idx = srcu_read_lock(&connections_srcu);
550	con = nodeid2con(nodeid, 0);
551	if (WARN_ON_ONCE(!con)) {
552		srcu_read_unlock(&connections_srcu, idx);
553		return -ENOENT;
554	}
555
556	down_read(&con->sock_lock);
557	if (!con->sock) {
558		spin_lock_bh(&con->writequeue_lock);
559		lowcomms_queue_swork(con);
560		spin_unlock_bh(&con->writequeue_lock);
561	}
562	up_read(&con->sock_lock);
563	srcu_read_unlock(&connections_srcu, idx);
564
565	cond_resched();
566	return 0;
567}
568
569int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
570{
571	struct connection *con;
572	int idx;
573
574	idx = srcu_read_lock(&connections_srcu);
575	con = nodeid2con(nodeid, 0);
576	if (!con) {
577		srcu_read_unlock(&connections_srcu, idx);
578		return -ENOENT;
579	}
580
581	spin_lock(&con->addrs_lock);
582	con->mark = mark;
583	spin_unlock(&con->addrs_lock);
584	srcu_read_unlock(&connections_srcu, idx);
585	return 0;
586}
587
588static void lowcomms_error_report(struct sock *sk)
589{
590	struct connection *con = sock2con(sk);
591	struct inet_sock *inet;
592
593	inet = inet_sk(sk);
594	switch (sk->sk_family) {
595	case AF_INET:
596		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
597				   "sending to node %d at %pI4, dport %d, "
598				   "sk_err=%d/%d\n", dlm_our_nodeid(),
599				   con->nodeid, &inet->inet_daddr,
600				   ntohs(inet->inet_dport), sk->sk_err,
601				   READ_ONCE(sk->sk_err_soft));
602		break;
603#if IS_ENABLED(CONFIG_IPV6)
604	case AF_INET6:
605		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
606				   "sending to node %d at %pI6c, "
607				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
608				   con->nodeid, &sk->sk_v6_daddr,
609				   ntohs(inet->inet_dport), sk->sk_err,
610				   READ_ONCE(sk->sk_err_soft));
611		break;
612#endif
613	default:
614		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
615				   "invalid socket family %d set, "
616				   "sk_err=%d/%d\n", dlm_our_nodeid(),
617				   sk->sk_family, sk->sk_err,
618				   READ_ONCE(sk->sk_err_soft));
619		break;
620	}
621
622	dlm_midcomms_unack_msg_resend(con->nodeid);
623
624	listen_sock.sk_error_report(sk);
625}
626
627static void restore_callbacks(struct sock *sk)
628{
629#ifdef CONFIG_LOCKDEP
630	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
631#endif
632
633	sk->sk_user_data = NULL;
634	sk->sk_data_ready = listen_sock.sk_data_ready;
635	sk->sk_state_change = listen_sock.sk_state_change;
636	sk->sk_write_space = listen_sock.sk_write_space;
637	sk->sk_error_report = listen_sock.sk_error_report;
638}
639
640/* Make a socket active */
641static void add_sock(struct socket *sock, struct connection *con)
642{
643	struct sock *sk = sock->sk;
644
645	lock_sock(sk);
646	con->sock = sock;
647
648	sk->sk_user_data = con;
649	sk->sk_data_ready = lowcomms_data_ready;
650	sk->sk_write_space = lowcomms_write_space;
651	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
652		sk->sk_state_change = lowcomms_state_change;
653	sk->sk_allocation = GFP_NOFS;
654	sk->sk_use_task_frag = false;
655	sk->sk_error_report = lowcomms_error_report;
656	release_sock(sk);
657}
658
659/* Add the port number to an IPv6 or IPv4 sockaddr and return the
660   address length */
661static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
662			  int *addr_len)
663{
664	saddr->ss_family =  dlm_local_addr[0].ss_family;
665	if (saddr->ss_family == AF_INET) {
666		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
667		in4_addr->sin_port = cpu_to_be16(port);
668		*addr_len = sizeof(struct sockaddr_in);
669		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
670	} else {
671		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
672		in6_addr->sin6_port = cpu_to_be16(port);
673		*addr_len = sizeof(struct sockaddr_in6);
674	}
675	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
676}
677
678static void dlm_page_release(struct kref *kref)
679{
680	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
681						  ref);
682
683	__free_page(e->page);
684	dlm_free_writequeue(e);
685}
686
687static void dlm_msg_release(struct kref *kref)
688{
689	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
690
691	kref_put(&msg->entry->ref, dlm_page_release);
692	dlm_free_msg(msg);
693}
694
695static void free_entry(struct writequeue_entry *e)
696{
697	struct dlm_msg *msg, *tmp;
698
699	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
700		if (msg->orig_msg) {
701			msg->orig_msg->retransmit = false;
702			kref_put(&msg->orig_msg->ref, dlm_msg_release);
703		}
704
705		list_del(&msg->list);
706		kref_put(&msg->ref, dlm_msg_release);
707	}
708
709	list_del(&e->list);
710	kref_put(&e->ref, dlm_page_release);
711}
712
713static void dlm_close_sock(struct socket **sock)
714{
715	lock_sock((*sock)->sk);
716	restore_callbacks((*sock)->sk);
717	release_sock((*sock)->sk);
718
719	sock_release(*sock);
720	*sock = NULL;
721}
722
723static void allow_connection_io(struct connection *con)
724{
725	if (con->othercon)
726		clear_bit(CF_IO_STOP, &con->othercon->flags);
727	clear_bit(CF_IO_STOP, &con->flags);
728}
729
730static void stop_connection_io(struct connection *con)
731{
732	if (con->othercon)
733		stop_connection_io(con->othercon);
734
735	spin_lock_bh(&con->writequeue_lock);
736	set_bit(CF_IO_STOP, &con->flags);
737	spin_unlock_bh(&con->writequeue_lock);
738
739	down_write(&con->sock_lock);
740	if (con->sock) {
741		lock_sock(con->sock->sk);
742		restore_callbacks(con->sock->sk);
743		release_sock(con->sock->sk);
744	}
745	up_write(&con->sock_lock);
746
747	cancel_work_sync(&con->swork);
748	cancel_work_sync(&con->rwork);
749}
750
751/* Close a remote connection and tidy up */
752static void close_connection(struct connection *con, bool and_other)
753{
754	struct writequeue_entry *e;
755
756	if (con->othercon && and_other)
757		close_connection(con->othercon, false);
758
759	down_write(&con->sock_lock);
760	if (!con->sock) {
761		up_write(&con->sock_lock);
762		return;
763	}
764
765	dlm_close_sock(&con->sock);
766
767	/* if we sent a writequeue entry only half way, we drop the
768	 * whole entry on reconnection so that we do not start in the
769	 * middle of a msg, which would confuse the other end.
770	 *
771	 * we can always drop messages because of retransmits, but what we
772	 * cannot allow is to transmit half messages which may be processed
773	 * at the other side.
774	 *
775	 * our policy is to start from a clean state after a disconnect; we
776	 * don't know what was sent/received on the transport layer then.
777	 */
778	spin_lock_bh(&con->writequeue_lock);
779	if (!list_empty(&con->writequeue)) {
780		e = list_first_entry(&con->writequeue, struct writequeue_entry,
781				     list);
782		if (e->dirty)
783			free_entry(e);
784	}
785	spin_unlock_bh(&con->writequeue_lock);
786
787	con->rx_leftover = 0;
788	con->retries = 0;
789	clear_bit(CF_APP_LIMITED, &con->flags);
790	clear_bit(CF_RECV_PENDING, &con->flags);
791	clear_bit(CF_SEND_PENDING, &con->flags);
792	up_write(&con->sock_lock);
793}
794
795static void shutdown_connection(struct connection *con, bool and_other)
796{
797	int ret;
798
799	if (con->othercon && and_other)
800		shutdown_connection(con->othercon, false);
801
802	flush_workqueue(io_workqueue);
803	down_read(&con->sock_lock);
804	/* nothing to shutdown */
805	if (!con->sock) {
806		up_read(&con->sock_lock);
807		return;
808	}
809
810	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
811	up_read(&con->sock_lock);
812	if (ret) {
813		log_print("Connection %p failed to shutdown: %d will force close",
814			  con, ret);
815		goto force_close;
816	} else {
817		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
818					 DLM_SHUTDOWN_WAIT_TIMEOUT);
819		if (ret == 0) {
820			log_print("Connection %p shutdown timed out, will force close",
821				  con);
822			goto force_close;
823		}
824	}
825
826	return;
827
828force_close:
829	close_connection(con, false);
830}
831
832static struct processqueue_entry *new_processqueue_entry(int nodeid,
833							 int buflen)
834{
835	struct processqueue_entry *pentry;
836
837	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
838	if (!pentry)
839		return NULL;
840
841	pentry->buf = kmalloc(buflen, GFP_NOFS);
842	if (!pentry->buf) {
843		kfree(pentry);
844		return NULL;
845	}
846
847	pentry->nodeid = nodeid;
848	return pentry;
849}
850
851static void free_processqueue_entry(struct processqueue_entry *pentry)
852{
853	kfree(pentry->buf);
854	kfree(pentry);
855}
856
857struct dlm_processed_nodes {
858	int nodeid;
859
860	struct list_head list;
861};
862
863static void process_dlm_messages(struct work_struct *work)
864{
865	struct processqueue_entry *pentry;
866
867	spin_lock(&processqueue_lock);
868	pentry = list_first_entry_or_null(&processqueue,
869					  struct processqueue_entry, list);
870	if (WARN_ON_ONCE(!pentry)) {
871		process_dlm_messages_pending = false;
872		spin_unlock(&processqueue_lock);
873		return;
874	}
875
876	list_del(&pentry->list);
877	spin_unlock(&processqueue_lock);
878
879	for (;;) {
880		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
881					    pentry->buflen);
882		free_processqueue_entry(pentry);
883
884		spin_lock(&processqueue_lock);
885		pentry = list_first_entry_or_null(&processqueue,
886						  struct processqueue_entry, list);
887		if (!pentry) {
888			process_dlm_messages_pending = false;
889			spin_unlock(&processqueue_lock);
890			break;
891		}
892
893		list_del(&pentry->list);
894		spin_unlock(&processqueue_lock);
895	}
896}
897
898/* Data received from remote end */
899static int receive_from_sock(struct connection *con, int buflen)
900{
901	struct processqueue_entry *pentry;
902	int ret, buflen_real;
903	struct msghdr msg;
904	struct kvec iov;
905
906	pentry = new_processqueue_entry(con->nodeid, buflen);
907	if (!pentry)
908		return DLM_IO_RESCHED;
909
910	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
911
912	/* calculate the new buffer parameters, accounting for possible
913	 * leftover bytes from the last receive
914	 */
915	iov.iov_base = pentry->buf + con->rx_leftover;
916	iov.iov_len = buflen - con->rx_leftover;
917
918	memset(&msg, 0, sizeof(msg));
919	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
920	clear_bit(CF_RECV_INTR, &con->flags);
921again:
922	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
923			     msg.msg_flags);
924	trace_dlm_recv(con->nodeid, ret);
925	if (ret == -EAGAIN) {
926		lock_sock(con->sock->sk);
927		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
928			release_sock(con->sock->sk);
929			goto again;
930		}
931
932		clear_bit(CF_RECV_PENDING, &con->flags);
933		release_sock(con->sock->sk);
934		free_processqueue_entry(pentry);
935		return DLM_IO_END;
936	} else if (ret == 0) {
937		/* close will clear CF_RECV_PENDING */
938		free_processqueue_entry(pentry);
939		return DLM_IO_EOF;
940	} else if (ret < 0) {
941		free_processqueue_entry(pentry);
942		return ret;
943	}
944
945	/* new buflen according to bytes read and leftover from last receive */
946	buflen_real = ret + con->rx_leftover;
947	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
948					   buflen_real);
949	if (ret < 0) {
950		free_processqueue_entry(pentry);
951		return ret;
952	}
953
954	pentry->buflen = ret;
955
956	/* calculate the leftover bytes from processing and put them at
957	 * the start of the leftover buffer, so on the next receive we
958	 * have the full message at the start of the receive buffer.
959	 */
960	con->rx_leftover = buflen_real - ret;
961	memmove(con->rx_leftover_buf, pentry->buf + ret,
962		con->rx_leftover);
963
964	spin_lock(&processqueue_lock);
965	list_add_tail(&pentry->list, &processqueue);
966	if (!process_dlm_messages_pending) {
967		process_dlm_messages_pending = true;
968		queue_work(process_workqueue, &process_work);
969	}
970	spin_unlock(&processqueue_lock);
971
972	return DLM_IO_SUCCESS;
973}
974
975/* Listening socket is busy, accept a connection */
976static int accept_from_sock(void)
977{
978	struct sockaddr_storage peeraddr;
979	int len, idx, result, nodeid;
980	struct connection *newcon;
981	struct socket *newsock;
982	unsigned int mark;
983
984	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
985	if (result == -EAGAIN)
986		return DLM_IO_END;
987	else if (result < 0)
988		goto accept_err;
989
990	/* Get the connected socket's peer */
991	memset(&peeraddr, 0, sizeof(peeraddr));
992	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
993	if (len < 0) {
994		result = -ECONNABORTED;
995		goto accept_err;
996	}
997
998	/* Get the new node's NODEID */
999	make_sockaddr(&peeraddr, 0, &len);
1000	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
1001		switch (peeraddr.ss_family) {
1002		case AF_INET: {
1003			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
1004
1005			log_print("connect from non cluster IPv4 node %pI4",
1006				  &sin->sin_addr);
1007			break;
1008		}
1009#if IS_ENABLED(CONFIG_IPV6)
1010		case AF_INET6: {
1011			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
1012
1013			log_print("connect from non cluster IPv6 node %pI6c",
1014				  &sin6->sin6_addr);
1015			break;
1016		}
1017#endif
1018		default:
1019			log_print("invalid family from non cluster node");
1020			break;
1021		}
1022
1023		sock_release(newsock);
1024		return -1;
1025	}
1026
1027	log_print("got connection from %d", nodeid);
1028
1029	/*  Check to see if we already have a connection to this node. This
1030	 *  could happen if the two nodes initiate a connection at roughly
1031	 *  the same time and the connections cross on the wire.
1032	 *  In this case we store the incoming one in "othercon"
1033	 */
1034	idx = srcu_read_lock(&connections_srcu);
1035	newcon = nodeid2con(nodeid, 0);
1036	if (WARN_ON_ONCE(!newcon)) {
1037		srcu_read_unlock(&connections_srcu, idx);
1038		result = -ENOENT;
1039		goto accept_err;
1040	}
1041
1042	sock_set_mark(newsock->sk, mark);
1043
1044	down_write(&newcon->sock_lock);
1045	if (newcon->sock) {
1046		struct connection *othercon = newcon->othercon;
1047
1048		if (!othercon) {
1049			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
1050			if (!othercon) {
1051				log_print("failed to allocate incoming socket");
1052				up_write(&newcon->sock_lock);
1053				srcu_read_unlock(&connections_srcu, idx);
1054				result = -ENOMEM;
1055				goto accept_err;
1056			}
1057
1058			dlm_con_init(othercon, nodeid);
1059			lockdep_set_subclass(&othercon->sock_lock, 1);
1060			newcon->othercon = othercon;
1061			set_bit(CF_IS_OTHERCON, &othercon->flags);
1062		} else {
1063			/* close other sock con if we have something new */
1064			close_connection(othercon, false);
1065		}
1066
1067		down_write(&othercon->sock_lock);
1068		add_sock(newsock, othercon);
1069
1070		/* check if we received something while adding */
1071		lock_sock(othercon->sock->sk);
1072		lowcomms_queue_rwork(othercon);
1073		release_sock(othercon->sock->sk);
1074		up_write(&othercon->sock_lock);
1075	}
1076	else {
1077		/* accept copies the sk after we've saved the callbacks, so we
1078		   don't want to save them a second time or comm errors will
1079		   result in calling sk_error_report recursively. */
1080		add_sock(newsock, newcon);
1081
1082		/* check if we received something while adding */
1083		lock_sock(newcon->sock->sk);
1084		lowcomms_queue_rwork(newcon);
1085		release_sock(newcon->sock->sk);
1086	}
1087	up_write(&newcon->sock_lock);
1088	srcu_read_unlock(&connections_srcu, idx);
1089
1090	return DLM_IO_SUCCESS;
1091
1092accept_err:
1093	if (newsock)
1094		sock_release(newsock);
1095
1096	return result;
1097}
1098
1099/*
1100 * writequeue_entry_complete - try to delete and free write queue entry
1101 * @e: write queue entry to try to delete
1102 * @completed: bytes completed
1103 *
1104 * writequeue_lock must be held.
1105 */
1106static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1107{
1108	e->offset += completed;
1109	e->len -= completed;
1110	/* signal that the page was only partially transmitted */
1111	e->dirty = true;
1112
1113	if (e->len == 0 && e->users == 0)
1114		free_entry(e);
1115}
1116
1117/*
1118 * sctp_bind_addrs - bind a SCTP socket to all our addresses
1119 */
1120static int sctp_bind_addrs(struct socket *sock, uint16_t port)
1121{
1122	struct sockaddr_storage localaddr;
1123	struct sockaddr *addr = (struct sockaddr *)&localaddr;
1124	int i, addr_len, result = 0;
1125
1126	for (i = 0; i < dlm_local_count; i++) {
1127		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
1128		make_sockaddr(&localaddr, port, &addr_len);
1129
1130		if (!i)
1131			result = kernel_bind(sock, addr, addr_len);
1132		else
1133			result = sock_bind_add(sock->sk, addr, addr_len);
1134
1135		if (result < 0) {
1136			log_print("Can't bind to %d addr number %d, %d.\n",
1137				  port, i + 1, result);
1138			break;
1139		}
1140	}
1141	return result;
1142}
1143
1144/* Get local addresses */
1145static void init_local(void)
1146{
1147	struct sockaddr_storage sas;
1148	int i;
1149
1150	dlm_local_count = 0;
1151	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1152		if (dlm_our_addr(&sas, i))
1153			break;
1154
1155		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
1156	}
1157}
1158
1159static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1160{
1161	struct writequeue_entry *entry;
1162
1163	entry = dlm_allocate_writequeue();
1164	if (!entry)
1165		return NULL;
1166
1167	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
1168	if (!entry->page) {
1169		dlm_free_writequeue(entry);
1170		return NULL;
1171	}
1172
1173	entry->offset = 0;
1174	entry->len = 0;
1175	entry->end = 0;
1176	entry->dirty = false;
1177	entry->con = con;
1178	entry->users = 1;
1179	kref_init(&entry->ref);
1180	return entry;
1181}
1182
1183static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1184					     char **ppc, void (*cb)(void *data),
1185					     void *data)
1186{
1187	struct writequeue_entry *e;
1188
1189	spin_lock_bh(&con->writequeue_lock);
1190	if (!list_empty(&con->writequeue)) {
1191		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1192		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
1193			kref_get(&e->ref);
1194
1195			*ppc = page_address(e->page) + e->end;
1196			if (cb)
1197				cb(data);
1198
1199			e->end += len;
1200			e->users++;
1201			goto out;
1202		}
1203	}
1204
1205	e = new_writequeue_entry(con);
1206	if (!e)
1207		goto out;
1208
1209	kref_get(&e->ref);
1210	*ppc = page_address(e->page);
1211	e->end += len;
1212	if (cb)
1213		cb(data);
1214
1215	list_add_tail(&e->list, &con->writequeue);
1216
1217out:
1218	spin_unlock_bh(&con->writequeue_lock);
1219	return e;
1220};
1221
1222static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1223						gfp_t allocation, char **ppc,
1224						void (*cb)(void *data),
1225						void *data)
1226{
1227	struct writequeue_entry *e;
1228	struct dlm_msg *msg;
1229
1230	msg = dlm_allocate_msg(allocation);
1231	if (!msg)
1232		return NULL;
1233
1234	kref_init(&msg->ref);
1235
1236	e = new_wq_entry(con, len, ppc, cb, data);
1237	if (!e) {
1238		dlm_free_msg(msg);
1239		return NULL;
1240	}
1241
1242	msg->retransmit = false;
1243	msg->orig_msg = NULL;
1244	msg->ppc = *ppc;
1245	msg->len = len;
1246	msg->entry = e;
1247
1248	return msg;
1249}
1250
1251/* avoid a sparse false positive for connections_srcu; the unlock
1252 * happens in dlm_lowcomms_commit_msg(), which must be called on success
1253 */
1254#ifndef __CHECKER__
1255struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
1256				     char **ppc, void (*cb)(void *data),
1257				     void *data)
1258{
1259	struct connection *con;
1260	struct dlm_msg *msg;
1261	int idx;
1262
1263	if (len > DLM_MAX_SOCKET_BUFSIZE ||
1264	    len < sizeof(struct dlm_header)) {
1265		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
1266		log_print("failed to allocate a buffer of size %d", len);
1267		WARN_ON_ONCE(1);
1268		return NULL;
1269	}
1270
1271	idx = srcu_read_lock(&connections_srcu);
1272	con = nodeid2con(nodeid, 0);
1273	if (WARN_ON_ONCE(!con)) {
1274		srcu_read_unlock(&connections_srcu, idx);
1275		return NULL;
1276	}
1277
1278	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
1279	if (!msg) {
1280		srcu_read_unlock(&connections_srcu, idx);
1281		return NULL;
1282	}
1283
1284	/* for dlm_lowcomms_commit_msg() */
1285	kref_get(&msg->ref);
1286	/* we assume that on success commit must be called */
1287	msg->idx = idx;
1288	return msg;
1289}
1290#endif
1291
1292static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1293{
1294	struct writequeue_entry *e = msg->entry;
1295	struct connection *con = e->con;
1296	int users;
1297
1298	spin_lock_bh(&con->writequeue_lock);
1299	kref_get(&msg->ref);
1300	list_add(&msg->list, &e->msgs);
1301
1302	users = --e->users;
1303	if (users)
1304		goto out;
1305
1306	e->len = DLM_WQ_LENGTH_BYTES(e);
1307
1308	lowcomms_queue_swork(con);
1309
1310out:
1311	spin_unlock_bh(&con->writequeue_lock);
1312	return;
1313}
1314
1315/* avoid a sparse false positive for connections_srcu; the lock was
1316 * taken in dlm_lowcomms_new_msg
1317 */
1318#ifndef __CHECKER__
1319void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1320{
1321	_dlm_lowcomms_commit_msg(msg);
1322	srcu_read_unlock(&connections_srcu, msg->idx);
1323	/* because dlm_lowcomms_new_msg() */
1324	kref_put(&msg->ref, dlm_msg_release);
1325}
1326#endif
1327
1328void dlm_lowcomms_put_msg(struct dlm_msg *msg)
1329{
1330	kref_put(&msg->ref, dlm_msg_release);
1331}
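/* Illustrative lifecycle sketch (hypothetical caller; the midcomms
 * layer is the real user): reserve a buffer, fill it in place, commit
 * it for sending and drop the caller's reference:
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc, NULL, NULL);
 *	if (!msg)
 *		return -ENOMEM;
 *	memcpy(ppc, payload, len);
 *	dlm_lowcomms_commit_msg(msg);
 *	dlm_lowcomms_put_msg(msg);
 */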
1332
1333/* does not hold connections_srcu; used from lowcomms_error_report only */
1334int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1335{
1336	struct dlm_msg *msg_resend;
1337	char *ppc;
1338
1339	if (msg->retransmit)
1340		return 1;
1341
1342	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1343					      GFP_ATOMIC, &ppc, NULL, NULL);
1344	if (!msg_resend)
1345		return -ENOMEM;
1346
1347	msg->retransmit = true;
1348	kref_get(&msg->ref);
1349	msg_resend->orig_msg = msg;
1350
1351	memcpy(ppc, msg->ppc, msg->len);
1352	_dlm_lowcomms_commit_msg(msg_resend);
1353	dlm_lowcomms_put_msg(msg_resend);
1354
1355	return 0;
1356}
1357
1358/* Send a message */
1359static int send_to_sock(struct connection *con)
1360{
1361	struct writequeue_entry *e;
1362	struct bio_vec bvec;
1363	struct msghdr msg = {
1364		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
1365	};
1366	int len, offset, ret;
1367
1368	spin_lock_bh(&con->writequeue_lock);
1369	e = con_next_wq(con);
1370	if (!e) {
1371		clear_bit(CF_SEND_PENDING, &con->flags);
1372		spin_unlock_bh(&con->writequeue_lock);
1373		return DLM_IO_END;
1374	}
1375
1376	len = e->len;
1377	offset = e->offset;
1378	WARN_ON_ONCE(len == 0 && e->users == 0);
1379	spin_unlock_bh(&con->writequeue_lock);
1380
1381	bvec_set_page(&bvec, e->page, len, offset);
1382	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
1383	ret = sock_sendmsg(con->sock, &msg);
1384	trace_dlm_send(con->nodeid, ret);
1385	if (ret == -EAGAIN || ret == 0) {
1386		lock_sock(con->sock->sk);
1387		spin_lock_bh(&con->writequeue_lock);
1388		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1389		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1390			/* Notify TCP that we're limited by the
1391			 * application window size.
1392			 */
1393			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
1394			con->sock->sk->sk_write_pending++;
1395
1396			clear_bit(CF_SEND_PENDING, &con->flags);
1397			spin_unlock_bh(&con->writequeue_lock);
1398			release_sock(con->sock->sk);
1399
1400			/* wait for write_space() event */
1401			return DLM_IO_END;
1402		}
1403		spin_unlock_bh(&con->writequeue_lock);
1404		release_sock(con->sock->sk);
1405
1406		return DLM_IO_RESCHED;
1407	} else if (ret < 0) {
1408		return ret;
1409	}
1410
1411	spin_lock_bh(&con->writequeue_lock);
1412	writequeue_entry_complete(e, ret);
1413	spin_unlock_bh(&con->writequeue_lock);
1414
1415	return DLM_IO_SUCCESS;
1416}
1417
1418static void clean_one_writequeue(struct connection *con)
1419{
1420	struct writequeue_entry *e, *safe;
1421
1422	spin_lock_bh(&con->writequeue_lock);
1423	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1424		free_entry(e);
1425	}
1426	spin_unlock_bh(&con->writequeue_lock);
1427}
1428
1429static void connection_release(struct rcu_head *rcu)
1430{
1431	struct connection *con = container_of(rcu, struct connection, rcu);
1432
1433	WARN_ON_ONCE(!list_empty(&con->writequeue));
1434	WARN_ON_ONCE(con->sock);
1435	kfree(con);
1436}
1437
1438/* Called from recovery when it knows that a node has
1439   left the cluster */
1440int dlm_lowcomms_close(int nodeid)
1441{
1442	struct connection *con;
1443	int idx;
1444
1445	log_print("closing connection to node %d", nodeid);
1446
1447	idx = srcu_read_lock(&connections_srcu);
1448	con = nodeid2con(nodeid, 0);
1449	if (WARN_ON_ONCE(!con)) {
1450		srcu_read_unlock(&connections_srcu, idx);
1451		return -ENOENT;
1452	}
1453
1454	stop_connection_io(con);
1455	log_print("io handling for node: %d stopped", nodeid);
1456	close_connection(con, true);
1457
1458	spin_lock(&connections_lock);
1459	hlist_del_rcu(&con->list);
1460	spin_unlock(&connections_lock);
1461
1462	clean_one_writequeue(con);
1463	call_srcu(&connections_srcu, &con->rcu, connection_release);
1464	if (con->othercon) {
1465		clean_one_writequeue(con->othercon);
1466		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
1467	}
1468	srcu_read_unlock(&connections_srcu, idx);
1469
1470	/* for debugging we print when we are done, to compare with other
1471	 * messages in between. This function needs to be correctly
1472	 * synchronized with the io handling.
1473	 */
1474	log_print("closing connection to node %d done", nodeid);
1475
1476	return 0;
1477}
1478
1479/* Receive worker function */
1480static void process_recv_sockets(struct work_struct *work)
1481{
1482	struct connection *con = container_of(work, struct connection, rwork);
1483	int ret, buflen;
1484
1485	down_read(&con->sock_lock);
1486	if (!con->sock) {
1487		up_read(&con->sock_lock);
1488		return;
1489	}
1490
1491	buflen = READ_ONCE(dlm_config.ci_buffer_size);
1492	do {
1493		ret = receive_from_sock(con, buflen);
1494	} while (ret == DLM_IO_SUCCESS);
1495	up_read(&con->sock_lock);
1496
1497	switch (ret) {
1498	case DLM_IO_END:
1499		/* CF_RECV_PENDING cleared */
1500		break;
1501	case DLM_IO_EOF:
1502		close_connection(con, false);
1503		wake_up(&con->shutdown_wait);
1504		/* CF_RECV_PENDING cleared */
1505		break;
1506	case DLM_IO_RESCHED:
1507		cond_resched();
1508		queue_work(io_workqueue, &con->rwork);
1509		/* CF_RECV_PENDING not cleared */
1510		break;
1511	default:
1512		if (ret < 0) {
1513			if (test_bit(CF_IS_OTHERCON, &con->flags)) {
1514				close_connection(con, false);
1515			} else {
1516				spin_lock_bh(&con->writequeue_lock);
1517				lowcomms_queue_swork(con);
1518				spin_unlock_bh(&con->writequeue_lock);
1519			}
1520
1521			/* CF_RECV_PENDING cleared for othercon;
1522			 * we trigger the send queue if not already done,
1523			 * and process_send_sockets will handle it
1524			 */
1525			break;
1526		}
1527
1528		WARN_ON_ONCE(1);
1529		break;
1530	}
1531}
1532
1533static void process_listen_recv_socket(struct work_struct *work)
1534{
1535	int ret;
1536
1537	if (WARN_ON_ONCE(!listen_con.sock))
1538		return;
1539
1540	do {
1541		ret = accept_from_sock();
1542	} while (ret == DLM_IO_SUCCESS);
1543
1544	if (ret < 0)
1545		log_print("critical error accepting connection: %d", ret);
1546}
1547
1548static int dlm_connect(struct connection *con)
1549{
1550	struct sockaddr_storage addr;
1551	int result, addr_len;
1552	struct socket *sock;
1553	unsigned int mark;
1554
1555	memset(&addr, 0, sizeof(addr));
1556	result = nodeid_to_addr(con->nodeid, &addr, NULL,
1557				dlm_proto_ops->try_new_addr, &mark);
1558	if (result < 0) {
1559		log_print("no address for nodeid %d", con->nodeid);
1560		return result;
1561	}
1562
1563	/* Create a socket to communicate with */
1564	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
1565				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
1566	if (result < 0)
1567		return result;
1568
1569	sock_set_mark(sock->sk, mark);
1570	dlm_proto_ops->sockopts(sock);
1571
1572	result = dlm_proto_ops->bind(sock);
1573	if (result < 0) {
1574		sock_release(sock);
1575		return result;
1576	}
1577
1578	add_sock(sock, con);
1579
1580	log_print_ratelimited("connecting to %d", con->nodeid);
1581	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
1582	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
1583					addr_len);
1584	switch (result) {
1585	case -EINPROGRESS:
1586		/* not an error */
1587		fallthrough;
1588	case 0:
1589		break;
1590	default:
1591		if (result < 0)
1592			dlm_close_sock(&con->sock);
1593
1594		break;
1595	}
1596
1597	return result;
1598}
1599
1600/* Send worker function */
1601static void process_send_sockets(struct work_struct *work)
1602{
1603	struct connection *con = container_of(work, struct connection, swork);
1604	int ret;
1605
1606	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
1607
1608	down_read(&con->sock_lock);
1609	if (!con->sock) {
1610		up_read(&con->sock_lock);
1611		down_write(&con->sock_lock);
1612		if (!con->sock) {
1613			ret = dlm_connect(con);
1614			switch (ret) {
1615			case 0:
1616				break;
1617			case -EINPROGRESS:
1618				/* avoid spamming resched on connection;
1619				 * we could switch to a state_change
1620				 * event based mechanism once established
1621				 */
1622				msleep(100);
1623				break;
1624			default:
1625				/* CF_SEND_PENDING not cleared */
1626				up_write(&con->sock_lock);
1627				log_print("connect to node %d try %d error %d",
1628					  con->nodeid, con->retries++, ret);
1629				msleep(1000);
1630				/* For now we try forever to reconnect. In
1631				 * the future we should send an event to the
1632				 * cluster manager so it fences itself after a
1633				 * certain number of retries.
1634				 */
1635				queue_work(io_workqueue, &con->swork);
1636				return;
1637			}
1638		}
1639		downgrade_write(&con->sock_lock);
1640	}
1641
1642	do {
1643		ret = send_to_sock(con);
1644	} while (ret == DLM_IO_SUCCESS);
1645	up_read(&con->sock_lock);
1646
1647	switch (ret) {
1648	case DLM_IO_END:
1649		/* CF_SEND_PENDING cleared */
1650		break;
1651	case DLM_IO_RESCHED:
1652		/* CF_SEND_PENDING not cleared */
1653		cond_resched();
1654		queue_work(io_workqueue, &con->swork);
1655		break;
1656	default:
1657		if (ret < 0) {
1658			close_connection(con, false);
1659
1660			/* CF_SEND_PENDING cleared */
1661			spin_lock_bh(&con->writequeue_lock);
1662			lowcomms_queue_swork(con);
1663			spin_unlock_bh(&con->writequeue_lock);
1664			break;
1665		}
1666
1667		WARN_ON_ONCE(1);
1668		break;
1669	}
1670}
1671
1672static void work_stop(void)
1673{
1674	if (io_workqueue) {
1675		destroy_workqueue(io_workqueue);
1676		io_workqueue = NULL;
1677	}
1678
1679	if (process_workqueue) {
1680		destroy_workqueue(process_workqueue);
1681		process_workqueue = NULL;
1682	}
1683}
1684
1685static int work_start(void)
1686{
1687	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
1688				       WQ_UNBOUND, 0);
1689	if (!io_workqueue) {
1690		log_print("can't start dlm_io");
1691		return -ENOMEM;
1692	}
1693
1694	/* ordered dlm message processing queue,
1695	 * should be converted to a tasklet
1696	 */
1697	process_workqueue = alloc_ordered_workqueue("dlm_process",
1698						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
1699	if (!process_workqueue) {
1700		log_print("can't start dlm_process");
1701		destroy_workqueue(io_workqueue);
1702		io_workqueue = NULL;
1703		return -ENOMEM;
1704	}
1705
1706	return 0;
1707}
1708
1709void dlm_lowcomms_shutdown(void)
1710{
1711	struct connection *con;
1712	int i, idx;
1713
1714	/* stop lowcomms_listen_data_ready calls */
1715	lock_sock(listen_con.sock->sk);
1716	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
1717	release_sock(listen_con.sock->sk);
1718
1719	cancel_work_sync(&listen_con.rwork);
1720	dlm_close_sock(&listen_con.sock);
1721
1722	idx = srcu_read_lock(&connections_srcu);
1723	for (i = 0; i < CONN_HASH_SIZE; i++) {
1724		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1725			shutdown_connection(con, true);
1726			stop_connection_io(con);
1727			flush_workqueue(process_workqueue);
1728			close_connection(con, true);
1729
1730			clean_one_writequeue(con);
1731			if (con->othercon)
1732				clean_one_writequeue(con->othercon);
1733			allow_connection_io(con);
1734		}
1735	}
1736	srcu_read_unlock(&connections_srcu, idx);
1737}
1738
1739void dlm_lowcomms_stop(void)
1740{
1741	work_stop();
1742	dlm_proto_ops = NULL;
1743}
1744
1745static int dlm_listen_for_all(void)
1746{
1747	struct socket *sock;
1748	int result;
1749
1750	log_print("Using %s for communications",
1751		  dlm_proto_ops->name);
1752
1753	result = dlm_proto_ops->listen_validate();
1754	if (result < 0)
1755		return result;
1756
1757	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
1758				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
1759	if (result < 0) {
1760		log_print("Can't create comms socket: %d", result);
1761		return result;
1762	}
1763
1764	sock_set_mark(sock->sk, dlm_config.ci_mark);
1765	dlm_proto_ops->listen_sockopts(sock);
1766
1767	result = dlm_proto_ops->listen_bind(sock);
1768	if (result < 0)
1769		goto out;
1770
1771	lock_sock(sock->sk);
1772	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
1773	listen_sock.sk_write_space = sock->sk->sk_write_space;
1774	listen_sock.sk_error_report = sock->sk->sk_error_report;
1775	listen_sock.sk_state_change = sock->sk->sk_state_change;
1776
1777	listen_con.sock = sock;
1778
1779	sock->sk->sk_allocation = GFP_NOFS;
1780	sock->sk->sk_use_task_frag = false;
1781	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
1782	release_sock(sock->sk);
1783
1784	result = sock->ops->listen(sock, 128);
1785	if (result < 0) {
1786		dlm_close_sock(&listen_con.sock);
1787		return result;
1788	}
1789
1790	return 0;
1791
1792out:
1793	sock_release(sock);
1794	return result;
1795}
1796
1797static int dlm_tcp_bind(struct socket *sock)
1798{
1799	struct sockaddr_storage src_addr;
1800	int result, addr_len;
1801
1802	/* Bind to our cluster-known address when connecting, to
1803	 * avoid routing problems.
1804	 */
1805	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
1806	make_sockaddr(&src_addr, 0, &addr_len);
1807
1808	result = kernel_bind(sock, (struct sockaddr *)&src_addr,
1809			     addr_len);
1810	if (result < 0) {
1811		/* This *may* not indicate a critical error */
1812		log_print("could not bind for connect: %d", result);
1813	}
1814
1815	return 0;
1816}
1817
1818static int dlm_tcp_connect(struct connection *con, struct socket *sock,
1819			   struct sockaddr *addr, int addr_len)
1820{
1821	return kernel_connect(sock, addr, addr_len, O_NONBLOCK);
1822}
1823
1824static int dlm_tcp_listen_validate(void)
1825{
1826	/* We don't support multi-homed hosts */
1827	if (dlm_local_count > 1) {
1828		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
1829		return -EINVAL;
1830	}
1831
1832	return 0;
1833}
1834
1835static void dlm_tcp_sockopts(struct socket *sock)
1836{
1837	/* Turn off Nagle's algorithm */
1838	tcp_sock_set_nodelay(sock->sk);
1839}
1840
1841static void dlm_tcp_listen_sockopts(struct socket *sock)
1842{
1843	dlm_tcp_sockopts(sock);
1844	sock_set_reuseaddr(sock->sk);
1845}
1846
1847static int dlm_tcp_listen_bind(struct socket *sock)
1848{
1849	int addr_len;
1850
1851	/* Bind to our port */
1852	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
1853	return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0],
1854			   addr_len);
1855}
1856
1857static const struct dlm_proto_ops dlm_tcp_ops = {
1858	.name = "TCP",
1859	.proto = IPPROTO_TCP,
1860	.connect = dlm_tcp_connect,
1861	.sockopts = dlm_tcp_sockopts,
1862	.bind = dlm_tcp_bind,
1863	.listen_validate = dlm_tcp_listen_validate,
1864	.listen_sockopts = dlm_tcp_listen_sockopts,
1865	.listen_bind = dlm_tcp_listen_bind,
1866};
1867
1868static int dlm_sctp_bind(struct socket *sock)
1869{
1870	return sctp_bind_addrs(sock, 0);
1871}
1872
1873static int dlm_sctp_connect(struct connection *con, struct socket *sock,
1874			    struct sockaddr *addr, int addr_len)
1875{
1876	int ret;
1877
1878	/*
1879	 * Make the kernel_connect() call return within a bounded time.
1880	 * The O_NONBLOCK argument to connect() does not work here, so we
1881	 * set a send timeout and restore the default value afterwards.
1882	 */
1883	sock_set_sndtimeo(sock->sk, 5);
1884	ret = kernel_connect(sock, addr, addr_len, 0);
1885	sock_set_sndtimeo(sock->sk, 0);
1886	return ret;
1887}
1888
1889static int dlm_sctp_listen_validate(void)
1890{
1891	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
1892		log_print("SCTP is not enabled by this kernel");
1893		return -EOPNOTSUPP;
1894	}
1895
1896	request_module("sctp");
1897	return 0;
1898}
1899
1900static int dlm_sctp_bind_listen(struct socket *sock)
1901{
1902	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
1903}
1904
1905static void dlm_sctp_sockopts(struct socket *sock)
1906{
1907	/* Turn off Nagle's algorithm */
1908	sctp_sock_set_nodelay(sock->sk);
1909	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
1910}
1911
1912static const struct dlm_proto_ops dlm_sctp_ops = {
1913	.name = "SCTP",
1914	.proto = IPPROTO_SCTP,
1915	.try_new_addr = true,
1916	.connect = dlm_sctp_connect,
1917	.sockopts = dlm_sctp_sockopts,
1918	.bind = dlm_sctp_bind,
1919	.listen_validate = dlm_sctp_listen_validate,
1920	.listen_sockopts = dlm_sctp_sockopts,
1921	.listen_bind = dlm_sctp_bind_listen,
1922};
1923
1924int dlm_lowcomms_start(void)
1925{
1926	int error;
1927
1928	init_local();
1929	if (!dlm_local_count) {
1930		error = -ENOTCONN;
1931		log_print("no local IP address has been set");
1932		goto fail;
1933	}
1934
1935	error = work_start();
1936	if (error)
1937		goto fail;
1938
1939	/* Start listening */
1940	switch (dlm_config.ci_protocol) {
1941	case DLM_PROTO_TCP:
1942		dlm_proto_ops = &dlm_tcp_ops;
1943		break;
1944	case DLM_PROTO_SCTP:
1945		dlm_proto_ops = &dlm_sctp_ops;
1946		break;
1947	default:
1948		log_print("Invalid protocol identifier %d set",
1949			  dlm_config.ci_protocol);
1950		error = -EINVAL;
1951		goto fail_proto_ops;
1952	}
1953
1954	error = dlm_listen_for_all();
1955	if (error)
1956		goto fail_listen;
1957
1958	return 0;
1959
1960fail_listen:
1961	dlm_proto_ops = NULL;
1962fail_proto_ops:
1963	work_stop();
1964fail:
1965	return error;
1966}
1967
1968void dlm_lowcomms_init(void)
1969{
1970	int i;
1971
1972	for (i = 0; i < CONN_HASH_SIZE; i++)
1973		INIT_HLIST_HEAD(&connection_hash[i]);
1974
1975	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
1976}
1977
1978void dlm_lowcomms_exit(void)
1979{
1980	struct connection *con;
1981	int i, idx;
1982
1983	idx = srcu_read_lock(&connections_srcu);
1984	for (i = 0; i < CONN_HASH_SIZE; i++) {
1985		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1986			spin_lock(&connections_lock);
1987			hlist_del_rcu(&con->list);
1988			spin_unlock(&connections_lock);
1989
1990			if (con->othercon)
1991				call_srcu(&connections_srcu, &con->othercon->rcu,
1992					  connection_release);
1993			call_srcu(&connections_srcu, &con->rcu, connection_release);
1994		}
1995	}
1996	srcu_read_unlock(&connections_srcu, idx);
1997}
1998