// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's responsibility to resolve
 * these into IP addresses or whatever it needs for inter-node
 * communication.
 *
 * The comms level consists of two kernel threads: a receive thread
 * that deals mainly with receiving messages from other nodes and
 * passing them up to the mid-level comms layer (which understands
 * the message format) for execution by the locking core, and a send
 * thread which does all the setting up of connections to remote
 * nodes and the sending of data. Other threads are not allowed to
 * send their own data because that could make them block under high
 * load; routing everything through the send thread also lets it
 * collect together messages bound for one node and send them in one
 * block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
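/*
 * A sketch of setting 'protocol', assuming the configfs layout created by
 * fs/dlm/config.c (in practice dlm_controld writes this as part of the
 * cluster-wide configuration rather than an admin doing it by hand):
 *
 *	echo 0 > /sys/kernel/config/dlm/cluster/protocol	(TCP, default)
 *	echo 1 > /sys/kernel/config/dlm/cluster/protocol	(SCTP)
 */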

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
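	/* The CF_* values below are bit numbers for the flags word above;
	 * they are used with set_bit()/test_bit() and friends, not as
	 * bit masks.
	 */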
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
	unsigned char *rx_buf;
	int rx_buflen;
	int rx_leftover;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);


/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}
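/* e.g. with CONN_HASH_SIZE 32, nodeid 35 hashes to bucket 35 & 31 = 3 */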

static struct connection *__find_con(int nodeid)
{
	int r, idx;
	struct connection *con;

	r = nodeid_hash(nodeid);

	idx = srcu_read_lock(&connections_srcu);
	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid) {
			srcu_read_unlock(&connections_srcu, idx);
			return con;
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf) {
		kfree(con);
		return NULL;
	}

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	r = nodeid_hash(nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple CPUs. Instead of locking the hot path
	 * __find_con() we just re-check, in the rare case of a recently
	 * added node, under the protection of connections_lock. If another
	 * thread won the race we abort our connection creation and return
	 * the existing connection.
	 */
	tmp = __find_con(nodeid);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i, idx;
	struct connection *con;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
			conn_func(con);
	}
	srcu_read_unlock(&connections_srcu, idx);
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

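/* Compare two sockaddrs for equality: returns 1 on a match, 0 otherwise.
 * Both ports are stored in network byte order, so they can be compared
 * without conversion.
 */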
static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}
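/* When try_new_addr is set, successive calls rotate curr_addr_index, so
 * repeated connect attempts cycle through all known addresses of a
 * multi-homed peer (used by the SCTP connect path; TCP passes false).
 */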

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}
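/* A sketch of how this is driven (describing the caller, not this file):
 * fs/dlm/config.c calls dlm_lowcomms_addr() once for each address that
 * userspace (typically dlm_controld) configures for a comm node, e.g. one
 * IPv4 and one IPv6 address for a dual-stack peer.
 */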

/* Data is available on a socket, or a listening socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	read_unlock_bh(&sk->sk_callback_lock);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (!con)
		goto out;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void lowcomms_state_change(struct sock *sk)
{
	/* The SCTP layer does not call sk_data_ready when the connection
	 * is done, so we catch the signal through here. It also does not
	 * switch the socket state when entering shutdown, so we skip the
	 * write-space callback in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con;
	void (*orig_report)(struct sock *) = NULL;
	struct inet_sock *inet;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con == NULL)
		goto out;

	orig_report = listen_sock.sk_error_report;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		goto out;
	}
out:
	read_unlock_bh(&sk->sk_callback_lock);
	if (orig_report)
		orig_report(sk);
}

/* Note: sk_callback_lock must be locked before calling this function. */
static void save_listen_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	listen_sock.sk_data_ready = sk->sk_data_ready;
	listen_sock.sk_state_change = sk->sk_state_change;
	listen_sock.sk_write_space = sk->sk_write_space;
	listen_sock.sk_error_report = sk->sk_error_report;
}

static void restore_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	con->sock = sock;

	sk->sk_user_data = con;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Add the port number to an IPv4 or IPv6 sockaddr and store the address
   length in *addr_len */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
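/* e.g. make_sockaddr(&sas, dlm_config.ci_tcp_port, &len) on an IPv4-only
 * cluster sets sin_port = cpu_to_be16(port) (21064 by default) and
 * len = sizeof(struct sockaddr_in), zeroing the remaining bytes.
 */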

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		restore_callbacks(con->sock);
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, tx, rx);
	}

	con->rx_leftover = 0;
	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}

static void shutdown_connection(struct connection *con)
{
	int ret;

	flush_work(&con->swork);

	mutex_lock(&con->sock_mutex);
	/* nothing to shutdown */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}
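/* The graceful shutdown sequence: send SHUT_WR, then wait for the peer to
 * drain the socket and close it; our receive path then sees EOF, clears
 * CF_SHUTDOWN and wakes shutdown_wait (see receive_from_sock()). If that
 * takes longer than DLM_SHUTDOWN_WAIT_TIMEOUT (10s) we force-close.
 */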

static void dlm_tcp_shutdown(struct connection *con)
{
	if (con->othercon)
		shutdown_connection(con->othercon);
	shutdown_connection(con);
}

static int con_realloc_receive_buf(struct connection *con, int newlen)
{
	unsigned char *newbuf;

	newbuf = kmalloc(newlen, GFP_NOFS);
	if (!newbuf)
		return -ENOMEM;

	/* copy any leftover from last receive */
	if (con->rx_leftover)
		memmove(newbuf, con->rx_buf, con->rx_leftover);

	/* swap to new buffer space */
	kfree(con->rx_buf);
	con->rx_buflen = newlen;
	con->rx_buf = newbuf;

	return 0;
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int call_again_soon = 0;
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->nodeid == 0) {
		ret = -EINVAL;
		goto out_close;
	}

	/* realloc the receive buffer if the configured buffer size changed */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	/* set up the receive vector to start after any leftover bytes kept
	 * from the last receive
	 */
	iov.iov_base = con->rx_buf + con->rx_leftover;
	iov.iov_len = con->rx_buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	if (ret <= 0)
		goto out_close;
	else if (ret == iov.iov_len)
		call_again_soon = 1;

	/* the new buflen is the bytes just read plus the leftover from the
	 * last receive
	 */
	buflen = ret + con->rx_leftover;
	ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
	if (ret < 0)
		goto out_close;

	/* move any unprocessed bytes to the beginning of the receive
	 * buffer, so that after the next receive the full message sits
	 * at the start address of the receive buffer.
	 */
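	/* e.g. if buflen is 4096 and dlm_process_incoming_buffer() consumed
	 * 4000 bytes of complete messages, the trailing 96 bytes of a
	 * partial message become rx_leftover and are moved to rx_buf[0].
	 */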
	con->rx_leftover = buflen - ret;
	if (con->rx_leftover) {
		memmove(con->rx_buf, con->rx_buf + ret,
			con->rx_leftover);
		call_again_soon = 1;
	}

	if (call_again_soon)
		goto out_resched;

	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		/* Reconnect when there is something to send */
		close_connection(con, false, true, false);
		if (ret == 0) {
			log_print("connection %p got EOF from %d",
				  con, con->nodeid);
			/* handle a tcp graceful shutdown from the peer */
			clear_bit(CF_SHUTDOWN, &con->flags);
			wake_up(&con->shutdown_wait);
			/* signal the receive worker to break out of its loop */
			ret = -1;
		}
	}
	return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;
	unsigned int mark;

	if (!dlm_allow_conn) {
		return -1;
	}

	mutex_lock_nested(&con->sock_mutex, 0);

	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return -ENOTCONN;
	}

	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	dlm_comm_mark(nodeid, &mark);
	sock_set_mark(newsock->sk, mark);

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}

			othercon->rx_buflen = dlm_config.ci_buffer_size;
			othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
			if (!othercon->rx_buf) {
				mutex_unlock(&newcon->sock_mutex);
				kfree(othercon);
				log_print("failed to allocate incoming socket receive buffer");
				result = -ENOMEM;
				goto accept_err;
			}

			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_LIST_HEAD(&othercon->writequeue);
			spin_lock_init(&othercon->writequeue_lock);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			init_waitqueue_head(&othercon->shutdown_wait);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close the other sock con if we have something new */
			close_connection(othercon, false, true, false);
		}

		mutex_lock_nested(&othercon->sock_mutex, 2);
		newcon->othercon = othercon;
		add_sock(newsock, othercon);
		addcon = othercon;
		mutex_unlock(&othercon->sock_mutex);
	} else {
		newcon->rx_action = receive_from_sock;
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	if (newsock)
		sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;

	if (e->len == 0 && e->users == 0) {
		list_del(&e->list);
		free_entry(e);
	}
}
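/* An entry's users count is raised by dlm_lowcomms_get_buffer() and
 * dropped by dlm_lowcomms_commit_buffer(); an entry is freed only after
 * every reservation has been committed and all of its bytes sent.
 */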

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(con->sock, addr, addr_len);
		else
			result = sock_bind_add(con->sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);

	con->rx_action = receive_from_sock;
	con->connect_action = sctp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() return within the specified time,
	 * since the O_NONBLOCK argument to connect() does not work here.
	 * Afterwards, restore the default (blocking) send timeout.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				   0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	con->shutdown_action = dlm_tcp_shutdown;
	add_sock(sock, con);

	/* Bind to our cluster-known address before connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	sock_set_reuseaddr(sock->sk);

	write_lock_bh(&sock->sk->sk_callback_lock);
	sock->sk->sk_user_data = con;
	save_listen_callbacks(sock);
	con->rx_action = accept_from_sock;
	con->connect_action = tcp_connect_to_sock;
	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	sock_set_keepalive(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

static void deinit_local(void)
{
	int i;

	for (i = 0; i < dlm_local_count; i++)
		kfree(dlm_local_addr[i]);
}

/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	int result = -EINVAL;
	struct connection *con = nodeid2con(0, GFP_NOFS);

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
	sock_set_mark(sock->sk, dlm_config.ci_mark);
	sctp_sock_set_nodelay(sock->sk);

	write_lock_bh(&sock->sk->sk_callback_lock);
	/* Init con struct */
	sock->sk->sk_user_data = con;
	save_listen_callbacks(sock);
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = accept_from_sock;
	con->connect_action = sctp_connect_to_sock;

	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
		goto create_delsock;

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	} else {
		result = -EADDRINUSE;
	}

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
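/* Typical caller pattern (a sketch; the real user is the midcomms layer
 * and the names below are illustrative only):
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, msglen, GFP_NOFS, &p);
 *	if (!mh)
 *		return -ENOMEM;
 *	memcpy(p, msg, msglen);
 *	dlm_lowcomms_commit_buffer(mh);
 *
 * The commit wakes the send worker, which transmits the page contents via
 * send_to_sock().
 */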

/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	/* Requeue the send work. When the send worker runs again, it will
	   try a new connection, then call this function again. */
	queue_work(send_workqueue, &con->swork);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	clear_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock == NULL) /* not mutex protected so check it inside too */
		con->connect_action(con);
	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}

static void work_stop(void)
{
	if (recv_workqueue)
		destroy_workqueue(recv_workqueue);
	if (send_workqueue)
		destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}
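/* Both workqueues use WQ_MEM_RECLAIM so that a rescuer thread guarantees
 * forward progress even when the DLM is used by a filesystem under memory
 * pressure - the same reason the allocations in this file use GFP_NOFS.
 */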

static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		write_lock_bh(&con->sock->sk->sk_callback_lock);
		con->sock->sk->sk_user_data = NULL;
		write_unlock_bh(&con->sock->sk->sk_callback_lock);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}

static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}

static void shutdown_conn(struct connection *con)
{
	if (con->shutdown_action)
		con->shutdown_action(con);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_rcu(&con->othercon->rcu, connection_release);
	}
	clean_one_writequeue(con);
	call_rcu(&con->rcu, connection_release);
}

static void work_flush(void)
{
	int ok, idx;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		idx = srcu_read_lock(&connections_srcu);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
		srcu_read_unlock(&connections_srcu, idx);
	} while (!ok);
}

void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	dlm_allow_conn = 0;

	if (recv_workqueue)
		flush_workqueue(recv_workqueue);
	if (send_workqueue)
		flush_workqueue(send_workqueue);

	foreach_conn(shutdown_conn);
	work_flush();
	foreach_conn(free_conn);
	work_stop();
	deinit_local();
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con)
		free_conn(con);
fail:
	return error;
}

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}