162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0-only
262306a36Sopenharmony_ci/******************************************************************************
362306a36Sopenharmony_ci*******************************************************************************
462306a36Sopenharmony_ci**
562306a36Sopenharmony_ci**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
662306a36Sopenharmony_ci**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
762306a36Sopenharmony_ci**
862306a36Sopenharmony_ci**
962306a36Sopenharmony_ci*******************************************************************************
1062306a36Sopenharmony_ci******************************************************************************/
1162306a36Sopenharmony_ci
1262306a36Sopenharmony_ci/*
1362306a36Sopenharmony_ci * lowcomms.c
1462306a36Sopenharmony_ci *
1562306a36Sopenharmony_ci * This is the "low-level" comms layer.
1662306a36Sopenharmony_ci *
1762306a36Sopenharmony_ci * It is responsible for sending/receiving messages
1862306a36Sopenharmony_ci * from other nodes in the cluster.
1962306a36Sopenharmony_ci *
2062306a36Sopenharmony_ci * Cluster nodes are referred to by their nodeids. nodeids are
2162306a36Sopenharmony_ci * simply 32 bit numbers to the locking module - if they need to
2262306a36Sopenharmony_ci * be expanded for the cluster infrastructure then that is its
2362306a36Sopenharmony_ci * responsibility. It is this layer's
2462306a36Sopenharmony_ci * responsibility to resolve these into IP address or
2562306a36Sopenharmony_ci * whatever it needs for inter-node communication.
2662306a36Sopenharmony_ci *
2762306a36Sopenharmony_ci * The comms level is two kernel threads that deal mainly with
2862306a36Sopenharmony_ci * the receiving of messages from other nodes and passing them
2962306a36Sopenharmony_ci * up to the mid-level comms layer (which understands the
3062306a36Sopenharmony_ci * message format) for execution by the locking core, and
3162306a36Sopenharmony_ci * a send thread which does all the setting up of connections
3262306a36Sopenharmony_ci * to remote nodes and the sending of data. Threads are not allowed
3362306a36Sopenharmony_ci * to send their own data because it may cause them to wait in times
3462306a36Sopenharmony_ci * of high load. Also, this way, the sending thread can collect together
3562306a36Sopenharmony_ci * messages bound for one node and send them in one block.
3662306a36Sopenharmony_ci *
3762306a36Sopenharmony_ci * lowcomms will choose to use either TCP or SCTP as its transport layer
3862306a36Sopenharmony_ci * depending on the configuration variable 'protocol'. This should be set
3962306a36Sopenharmony_ci * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
4062306a36Sopenharmony_ci * cluster-wide mechanism as it must be the same on all nodes of the cluster
4162306a36Sopenharmony_ci * for the DLM to function.
4262306a36Sopenharmony_ci *
4362306a36Sopenharmony_ci */
4462306a36Sopenharmony_ci
4562306a36Sopenharmony_ci#include <asm/ioctls.h>
4662306a36Sopenharmony_ci#include <net/sock.h>
4762306a36Sopenharmony_ci#include <net/tcp.h>
4862306a36Sopenharmony_ci#include <linux/pagemap.h>
4962306a36Sopenharmony_ci#include <linux/file.h>
5062306a36Sopenharmony_ci#include <linux/mutex.h>
5162306a36Sopenharmony_ci#include <linux/sctp.h>
5262306a36Sopenharmony_ci#include <linux/slab.h>
5362306a36Sopenharmony_ci#include <net/sctp/sctp.h>
5462306a36Sopenharmony_ci#include <net/ipv6.h>
5562306a36Sopenharmony_ci
5662306a36Sopenharmony_ci#include <trace/events/dlm.h>
5762306a36Sopenharmony_ci#include <trace/events/sock.h>
5862306a36Sopenharmony_ci
5962306a36Sopenharmony_ci#include "dlm_internal.h"
6062306a36Sopenharmony_ci#include "lowcomms.h"
6162306a36Sopenharmony_ci#include "midcomms.h"
6262306a36Sopenharmony_ci#include "memory.h"
6362306a36Sopenharmony_ci#include "config.h"
6462306a36Sopenharmony_ci
/* Shutdown wait timeout: 5000 milliseconds converted to jiffies. */
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
/* 4 MiB. NOTE(review): presumably the socket receive buffer size that is
 * requested; the user of this constant is outside this section - confirm.
 */
#define NEEDED_RMEM (4*1024*1024)
6762306a36Sopenharmony_ci
/*
 * Per-node connection state. One instance exists per nodeid and is linked
 * into connection_hash via 'list' (see nodeid2con()).
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* this semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to hold the write lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanisms to synchronize
	 * io handling to call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	/* Bit numbers below are used with the atomic bitops on 'flags'. */
	unsigned long flags;
	/* while set, lowcomms_queue_swork() does not queue the send worker;
	 * cleared again by lowcomms_write_space()
	 */
#define CF_APP_LIMITED 0
	/* set when the receive worker gets queued */
#define CF_RECV_PENDING 1
	/* set when the send worker gets queued */
#define CF_SEND_PENDING 2
	/* set by lowcomms_data_ready() before queueing the receive worker */
#define CF_RECV_INTR 3
	/* while set, neither the send nor the receive worker is queued */
#define CF_IO_STOP 4
	/* NOTE(review): presumably marks a connection that is the 'othercon'
	 * of another one; not used in this part of the file - confirm.
	 */
#define CF_IS_OTHERCON 5
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;	/* linkage in connection_hash */
	/* due to some connect()/accept() races we currently have this
	 * crossover connection attempt, a second connection for one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to
	 * connect. The other side can connect but it will only be considered
	 * that the other side wants to have a reconnect.
	 *
	 * However changing to this behaviour will break backwards
	 * compatibility. In a DLM protocol major version upgrade we should
	 * remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	wait_queue_head_t shutdown_wait;
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;		/* accessed under addrs_lock */
	int addr_count;		/* number of valid entries in addr[] */
	int curr_addr_index;	/* index into addr[] used by nodeid_to_addr() */
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;	/* protects addr[], addr_count, curr_addr_index, mark */
	struct rcu_head rcu;
};
/* Fetch the connection stored in a struct sock's sk_user_data. */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
11562306a36Sopenharmony_ci
/* State of the listening socket: the socket itself plus the work item
 * that lowcomms_listen_data_ready() queues on io_workqueue.
 */
struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};
12062306a36Sopenharmony_ci
/*
 * Byte accounting helpers for a writequeue entry 'e' (pointer expression).
 * The argument is parenthesized so that expressions such as 'p + 1' expand
 * correctly.
 *
 * DLM_WQ_REMAIN_BYTES: PAGE_SIZE minus the entry's 'end' position.
 * DLM_WQ_LENGTH_BYTES: distance between the entry's 'end' and 'offset'.
 */
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - (e)->end)
#define DLM_WQ_LENGTH_BYTES(e) ((e)->end - (e)->offset)
12362306a36Sopenharmony_ci
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;	/* linkage in connection->writequeue */
	struct page *page;
	int offset;		/* see DLM_WQ_LENGTH_BYTES() */
	int len;		/* zero means nothing to send, see con_next_wq() */
	int end;		/* see DLM_WQ_REMAIN_BYTES() */
	int users;		/* non-zero while users still fill the buffer */
	bool dirty;
	struct connection *con;
	struct list_head msgs;	/* initialized by writequeue_entry_ctor() */
	struct kref ref;
};
13762306a36Sopenharmony_ci
/* A message handle, allocated from the "dlm_msg" slab cache.
 * NOTE(review): presumably one message placed inside a writequeue entry;
 * the code using these fields is outside this section - confirm.
 */
struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};
14962306a36Sopenharmony_ci
/* A buffer together with its length and a nodeid, linkable into a list.
 * NOTE(review): presumably queued on 'processqueue' for
 * process_dlm_messages(); the producer is outside this section - confirm.
 */
struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};
15762306a36Sopenharmony_ci
/* Table of transport specific settings and operations. The table in use is
 * referenced by the file-scope 'dlm_proto_ops' pointer.
 * NOTE(review): the file header names TCP and SCTP as the transports; the
 * concrete tables are outside this section - confirm.
 */
struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};
17162306a36Sopenharmony_ci
/* Saved socket callbacks. restore_callbacks() writes these back into a
 * struct sock and lowcomms_error_report() chains to the saved
 * sk_error_report.
 * NOTE(review): presumably captured from the listen socket; the code
 * filling this in is outside this section - confirm.
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;
17862306a36Sopenharmony_ci
/* Listening socket state; a non-NULL listen_con.sock is what
 * dlm_lowcomms_is_running() reports as "running".
 */
static struct listen_connection listen_con;
/* Local addresses; nodeid_to_addr() fails while dlm_local_count is zero and
 * uses the family of dlm_local_addr[0] to pick IPv4 or IPv6 handling.
 */
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

/* Hash of all connections, bucket chosen by nodeid_hash(). Insertions are
 * serialized by connections_lock; lookups walk the buckets with the RCU
 * list iterators inside connections_srcu read sections.
 */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

/* Operations table of the transport in use. */
static const struct dlm_proto_ops *dlm_proto_ops;
19262306a36Sopenharmony_ci
/* Status codes. NOTE(review): presumably the results of the send/receive
 * steps driven by the io workers; their users are outside this section -
 * confirm.
 */
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3

/* Work handlers defined later in the file. */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

/* Single work item running process_dlm_messages(), plus the list it is
 * associated with and the lock/flag that go with that list.
 */
static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static LIST_HEAD(processqueue);
20662306a36Sopenharmony_ci
20762306a36Sopenharmony_cibool dlm_lowcomms_is_running(void)
20862306a36Sopenharmony_ci{
20962306a36Sopenharmony_ci	return !!listen_con.sock;
21062306a36Sopenharmony_ci}
21162306a36Sopenharmony_ci
21262306a36Sopenharmony_cistatic void lowcomms_queue_swork(struct connection *con)
21362306a36Sopenharmony_ci{
21462306a36Sopenharmony_ci	assert_spin_locked(&con->writequeue_lock);
21562306a36Sopenharmony_ci
21662306a36Sopenharmony_ci	if (!test_bit(CF_IO_STOP, &con->flags) &&
21762306a36Sopenharmony_ci	    !test_bit(CF_APP_LIMITED, &con->flags) &&
21862306a36Sopenharmony_ci	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
21962306a36Sopenharmony_ci		queue_work(io_workqueue, &con->swork);
22062306a36Sopenharmony_ci}
22162306a36Sopenharmony_ci
22262306a36Sopenharmony_cistatic void lowcomms_queue_rwork(struct connection *con)
22362306a36Sopenharmony_ci{
22462306a36Sopenharmony_ci#ifdef CONFIG_LOCKDEP
22562306a36Sopenharmony_ci	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
22662306a36Sopenharmony_ci#endif
22762306a36Sopenharmony_ci
22862306a36Sopenharmony_ci	if (!test_bit(CF_IO_STOP, &con->flags) &&
22962306a36Sopenharmony_ci	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
23062306a36Sopenharmony_ci		queue_work(io_workqueue, &con->rwork);
23162306a36Sopenharmony_ci}
23262306a36Sopenharmony_ci
23362306a36Sopenharmony_cistatic void writequeue_entry_ctor(void *data)
23462306a36Sopenharmony_ci{
23562306a36Sopenharmony_ci	struct writequeue_entry *entry = data;
23662306a36Sopenharmony_ci
23762306a36Sopenharmony_ci	INIT_LIST_HEAD(&entry->msgs);
23862306a36Sopenharmony_ci}
23962306a36Sopenharmony_ci
24062306a36Sopenharmony_cistruct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
24162306a36Sopenharmony_ci{
24262306a36Sopenharmony_ci	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
24362306a36Sopenharmony_ci				 0, 0, writequeue_entry_ctor);
24462306a36Sopenharmony_ci}
24562306a36Sopenharmony_ci
24662306a36Sopenharmony_cistruct kmem_cache *dlm_lowcomms_msg_cache_create(void)
24762306a36Sopenharmony_ci{
24862306a36Sopenharmony_ci	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
24962306a36Sopenharmony_ci}
25062306a36Sopenharmony_ci
25162306a36Sopenharmony_ci/* need to held writequeue_lock */
25262306a36Sopenharmony_cistatic struct writequeue_entry *con_next_wq(struct connection *con)
25362306a36Sopenharmony_ci{
25462306a36Sopenharmony_ci	struct writequeue_entry *e;
25562306a36Sopenharmony_ci
25662306a36Sopenharmony_ci	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
25762306a36Sopenharmony_ci				     list);
25862306a36Sopenharmony_ci	/* if len is zero nothing is to send, if there are users filling
25962306a36Sopenharmony_ci	 * buffers we wait until the users are done so we can send more.
26062306a36Sopenharmony_ci	 */
26162306a36Sopenharmony_ci	if (!e || e->users || e->len == 0)
26262306a36Sopenharmony_ci		return NULL;
26362306a36Sopenharmony_ci
26462306a36Sopenharmony_ci	return e;
26562306a36Sopenharmony_ci}
26662306a36Sopenharmony_ci
26762306a36Sopenharmony_cistatic struct connection *__find_con(int nodeid, int r)
26862306a36Sopenharmony_ci{
26962306a36Sopenharmony_ci	struct connection *con;
27062306a36Sopenharmony_ci
27162306a36Sopenharmony_ci	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
27262306a36Sopenharmony_ci		if (con->nodeid == nodeid)
27362306a36Sopenharmony_ci			return con;
27462306a36Sopenharmony_ci	}
27562306a36Sopenharmony_ci
27662306a36Sopenharmony_ci	return NULL;
27762306a36Sopenharmony_ci}
27862306a36Sopenharmony_ci
27962306a36Sopenharmony_cistatic void dlm_con_init(struct connection *con, int nodeid)
28062306a36Sopenharmony_ci{
28162306a36Sopenharmony_ci	con->nodeid = nodeid;
28262306a36Sopenharmony_ci	init_rwsem(&con->sock_lock);
28362306a36Sopenharmony_ci	INIT_LIST_HEAD(&con->writequeue);
28462306a36Sopenharmony_ci	spin_lock_init(&con->writequeue_lock);
28562306a36Sopenharmony_ci	INIT_WORK(&con->swork, process_send_sockets);
28662306a36Sopenharmony_ci	INIT_WORK(&con->rwork, process_recv_sockets);
28762306a36Sopenharmony_ci	spin_lock_init(&con->addrs_lock);
28862306a36Sopenharmony_ci	init_waitqueue_head(&con->shutdown_wait);
28962306a36Sopenharmony_ci}
29062306a36Sopenharmony_ci
29162306a36Sopenharmony_ci/*
29262306a36Sopenharmony_ci * If 'allocation' is zero then we don't attempt to create a new
29362306a36Sopenharmony_ci * connection structure for this node.
29462306a36Sopenharmony_ci */
29562306a36Sopenharmony_cistatic struct connection *nodeid2con(int nodeid, gfp_t alloc)
29662306a36Sopenharmony_ci{
29762306a36Sopenharmony_ci	struct connection *con, *tmp;
29862306a36Sopenharmony_ci	int r;
29962306a36Sopenharmony_ci
30062306a36Sopenharmony_ci	r = nodeid_hash(nodeid);
30162306a36Sopenharmony_ci	con = __find_con(nodeid, r);
30262306a36Sopenharmony_ci	if (con || !alloc)
30362306a36Sopenharmony_ci		return con;
30462306a36Sopenharmony_ci
30562306a36Sopenharmony_ci	con = kzalloc(sizeof(*con), alloc);
30662306a36Sopenharmony_ci	if (!con)
30762306a36Sopenharmony_ci		return NULL;
30862306a36Sopenharmony_ci
30962306a36Sopenharmony_ci	dlm_con_init(con, nodeid);
31062306a36Sopenharmony_ci
31162306a36Sopenharmony_ci	spin_lock(&connections_lock);
31262306a36Sopenharmony_ci	/* Because multiple workqueues/threads calls this function it can
31362306a36Sopenharmony_ci	 * race on multiple cpu's. Instead of locking hot path __find_con()
31462306a36Sopenharmony_ci	 * we just check in rare cases of recently added nodes again
31562306a36Sopenharmony_ci	 * under protection of connections_lock. If this is the case we
31662306a36Sopenharmony_ci	 * abort our connection creation and return the existing connection.
31762306a36Sopenharmony_ci	 */
31862306a36Sopenharmony_ci	tmp = __find_con(nodeid, r);
31962306a36Sopenharmony_ci	if (tmp) {
32062306a36Sopenharmony_ci		spin_unlock(&connections_lock);
32162306a36Sopenharmony_ci		kfree(con);
32262306a36Sopenharmony_ci		return tmp;
32362306a36Sopenharmony_ci	}
32462306a36Sopenharmony_ci
32562306a36Sopenharmony_ci	hlist_add_head_rcu(&con->list, &connection_hash[r]);
32662306a36Sopenharmony_ci	spin_unlock(&connections_lock);
32762306a36Sopenharmony_ci
32862306a36Sopenharmony_ci	return con;
32962306a36Sopenharmony_ci}
33062306a36Sopenharmony_ci
33162306a36Sopenharmony_cistatic int addr_compare(const struct sockaddr_storage *x,
33262306a36Sopenharmony_ci			const struct sockaddr_storage *y)
33362306a36Sopenharmony_ci{
33462306a36Sopenharmony_ci	switch (x->ss_family) {
33562306a36Sopenharmony_ci	case AF_INET: {
33662306a36Sopenharmony_ci		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
33762306a36Sopenharmony_ci		struct sockaddr_in *siny = (struct sockaddr_in *)y;
33862306a36Sopenharmony_ci		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
33962306a36Sopenharmony_ci			return 0;
34062306a36Sopenharmony_ci		if (sinx->sin_port != siny->sin_port)
34162306a36Sopenharmony_ci			return 0;
34262306a36Sopenharmony_ci		break;
34362306a36Sopenharmony_ci	}
34462306a36Sopenharmony_ci	case AF_INET6: {
34562306a36Sopenharmony_ci		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
34662306a36Sopenharmony_ci		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
34762306a36Sopenharmony_ci		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
34862306a36Sopenharmony_ci			return 0;
34962306a36Sopenharmony_ci		if (sinx->sin6_port != siny->sin6_port)
35062306a36Sopenharmony_ci			return 0;
35162306a36Sopenharmony_ci		break;
35262306a36Sopenharmony_ci	}
35362306a36Sopenharmony_ci	default:
35462306a36Sopenharmony_ci		return 0;
35562306a36Sopenharmony_ci	}
35662306a36Sopenharmony_ci	return 1;
35762306a36Sopenharmony_ci}
35862306a36Sopenharmony_ci
35962306a36Sopenharmony_cistatic int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
36062306a36Sopenharmony_ci			  struct sockaddr *sa_out, bool try_new_addr,
36162306a36Sopenharmony_ci			  unsigned int *mark)
36262306a36Sopenharmony_ci{
36362306a36Sopenharmony_ci	struct sockaddr_storage sas;
36462306a36Sopenharmony_ci	struct connection *con;
36562306a36Sopenharmony_ci	int idx;
36662306a36Sopenharmony_ci
36762306a36Sopenharmony_ci	if (!dlm_local_count)
36862306a36Sopenharmony_ci		return -1;
36962306a36Sopenharmony_ci
37062306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
37162306a36Sopenharmony_ci	con = nodeid2con(nodeid, 0);
37262306a36Sopenharmony_ci	if (!con) {
37362306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
37462306a36Sopenharmony_ci		return -ENOENT;
37562306a36Sopenharmony_ci	}
37662306a36Sopenharmony_ci
37762306a36Sopenharmony_ci	spin_lock(&con->addrs_lock);
37862306a36Sopenharmony_ci	if (!con->addr_count) {
37962306a36Sopenharmony_ci		spin_unlock(&con->addrs_lock);
38062306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
38162306a36Sopenharmony_ci		return -ENOENT;
38262306a36Sopenharmony_ci	}
38362306a36Sopenharmony_ci
38462306a36Sopenharmony_ci	memcpy(&sas, &con->addr[con->curr_addr_index],
38562306a36Sopenharmony_ci	       sizeof(struct sockaddr_storage));
38662306a36Sopenharmony_ci
38762306a36Sopenharmony_ci	if (try_new_addr) {
38862306a36Sopenharmony_ci		con->curr_addr_index++;
38962306a36Sopenharmony_ci		if (con->curr_addr_index == con->addr_count)
39062306a36Sopenharmony_ci			con->curr_addr_index = 0;
39162306a36Sopenharmony_ci	}
39262306a36Sopenharmony_ci
39362306a36Sopenharmony_ci	*mark = con->mark;
39462306a36Sopenharmony_ci	spin_unlock(&con->addrs_lock);
39562306a36Sopenharmony_ci
39662306a36Sopenharmony_ci	if (sas_out)
39762306a36Sopenharmony_ci		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
39862306a36Sopenharmony_ci
39962306a36Sopenharmony_ci	if (!sa_out) {
40062306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
40162306a36Sopenharmony_ci		return 0;
40262306a36Sopenharmony_ci	}
40362306a36Sopenharmony_ci
40462306a36Sopenharmony_ci	if (dlm_local_addr[0].ss_family == AF_INET) {
40562306a36Sopenharmony_ci		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
40662306a36Sopenharmony_ci		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
40762306a36Sopenharmony_ci		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
40862306a36Sopenharmony_ci	} else {
40962306a36Sopenharmony_ci		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
41062306a36Sopenharmony_ci		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
41162306a36Sopenharmony_ci		ret6->sin6_addr = in6->sin6_addr;
41262306a36Sopenharmony_ci	}
41362306a36Sopenharmony_ci
41462306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
41562306a36Sopenharmony_ci	return 0;
41662306a36Sopenharmony_ci}
41762306a36Sopenharmony_ci
41862306a36Sopenharmony_cistatic int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
41962306a36Sopenharmony_ci			  unsigned int *mark)
42062306a36Sopenharmony_ci{
42162306a36Sopenharmony_ci	struct connection *con;
42262306a36Sopenharmony_ci	int i, idx, addr_i;
42362306a36Sopenharmony_ci
42462306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
42562306a36Sopenharmony_ci	for (i = 0; i < CONN_HASH_SIZE; i++) {
42662306a36Sopenharmony_ci		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
42762306a36Sopenharmony_ci			WARN_ON_ONCE(!con->addr_count);
42862306a36Sopenharmony_ci
42962306a36Sopenharmony_ci			spin_lock(&con->addrs_lock);
43062306a36Sopenharmony_ci			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
43162306a36Sopenharmony_ci				if (addr_compare(&con->addr[addr_i], addr)) {
43262306a36Sopenharmony_ci					*nodeid = con->nodeid;
43362306a36Sopenharmony_ci					*mark = con->mark;
43462306a36Sopenharmony_ci					spin_unlock(&con->addrs_lock);
43562306a36Sopenharmony_ci					srcu_read_unlock(&connections_srcu, idx);
43662306a36Sopenharmony_ci					return 0;
43762306a36Sopenharmony_ci				}
43862306a36Sopenharmony_ci			}
43962306a36Sopenharmony_ci			spin_unlock(&con->addrs_lock);
44062306a36Sopenharmony_ci		}
44162306a36Sopenharmony_ci	}
44262306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
44362306a36Sopenharmony_ci
44462306a36Sopenharmony_ci	return -ENOENT;
44562306a36Sopenharmony_ci}
44662306a36Sopenharmony_ci
44762306a36Sopenharmony_cistatic bool dlm_lowcomms_con_has_addr(const struct connection *con,
44862306a36Sopenharmony_ci				      const struct sockaddr_storage *addr)
44962306a36Sopenharmony_ci{
45062306a36Sopenharmony_ci	int i;
45162306a36Sopenharmony_ci
45262306a36Sopenharmony_ci	for (i = 0; i < con->addr_count; i++) {
45362306a36Sopenharmony_ci		if (addr_compare(&con->addr[i], addr))
45462306a36Sopenharmony_ci			return true;
45562306a36Sopenharmony_ci	}
45662306a36Sopenharmony_ci
45762306a36Sopenharmony_ci	return false;
45862306a36Sopenharmony_ci}
45962306a36Sopenharmony_ci
46062306a36Sopenharmony_ciint dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
46162306a36Sopenharmony_ci{
46262306a36Sopenharmony_ci	struct connection *con;
46362306a36Sopenharmony_ci	bool ret, idx;
46462306a36Sopenharmony_ci
46562306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
46662306a36Sopenharmony_ci	con = nodeid2con(nodeid, GFP_NOFS);
46762306a36Sopenharmony_ci	if (!con) {
46862306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
46962306a36Sopenharmony_ci		return -ENOMEM;
47062306a36Sopenharmony_ci	}
47162306a36Sopenharmony_ci
47262306a36Sopenharmony_ci	spin_lock(&con->addrs_lock);
47362306a36Sopenharmony_ci	if (!con->addr_count) {
47462306a36Sopenharmony_ci		memcpy(&con->addr[0], addr, sizeof(*addr));
47562306a36Sopenharmony_ci		con->addr_count = 1;
47662306a36Sopenharmony_ci		con->mark = dlm_config.ci_mark;
47762306a36Sopenharmony_ci		spin_unlock(&con->addrs_lock);
47862306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
47962306a36Sopenharmony_ci		return 0;
48062306a36Sopenharmony_ci	}
48162306a36Sopenharmony_ci
48262306a36Sopenharmony_ci	ret = dlm_lowcomms_con_has_addr(con, addr);
48362306a36Sopenharmony_ci	if (ret) {
48462306a36Sopenharmony_ci		spin_unlock(&con->addrs_lock);
48562306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
48662306a36Sopenharmony_ci		return -EEXIST;
48762306a36Sopenharmony_ci	}
48862306a36Sopenharmony_ci
48962306a36Sopenharmony_ci	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
49062306a36Sopenharmony_ci		spin_unlock(&con->addrs_lock);
49162306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
49262306a36Sopenharmony_ci		return -ENOSPC;
49362306a36Sopenharmony_ci	}
49462306a36Sopenharmony_ci
49562306a36Sopenharmony_ci	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
49662306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
49762306a36Sopenharmony_ci	spin_unlock(&con->addrs_lock);
49862306a36Sopenharmony_ci	return 0;
49962306a36Sopenharmony_ci}
50062306a36Sopenharmony_ci
/* Data available on socket or listen socket received a connect.
 *
 * Installed as sk_data_ready by add_sock(). Marks the connection stored in
 * sk_user_data with CF_RECV_INTR and queues its receive worker.
 */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	trace_sk_data_ready(sk);

	set_bit(CF_RECV_INTR, &con->flags);
	lowcomms_queue_rwork(con);
}
51162306a36Sopenharmony_ci
/* Write space callback, installed as sk_write_space by add_sock().
 *
 * Clears SOCK_NOSPACE on the connection's socket. If the connection was
 * marked CF_APP_LIMITED, that mark is dropped together with one
 * sk_write_pending count and the SOCKWQ_ASYNC_NOSPACE bit. Finally the
 * send worker is queued (subject to the checks in lowcomms_queue_swork()).
 */
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	/* lowcomms_queue_swork() requires writequeue_lock to be held */
	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}
52762306a36Sopenharmony_ci
52862306a36Sopenharmony_cistatic void lowcomms_state_change(struct sock *sk)
52962306a36Sopenharmony_ci{
53062306a36Sopenharmony_ci	/* SCTP layer is not calling sk_data_ready when the connection
53162306a36Sopenharmony_ci	 * is done, so we catch the signal through here.
53262306a36Sopenharmony_ci	 */
53362306a36Sopenharmony_ci	if (sk->sk_shutdown == RCV_SHUTDOWN)
53462306a36Sopenharmony_ci		lowcomms_data_ready(sk);
53562306a36Sopenharmony_ci}
53662306a36Sopenharmony_ci
/* Data ready callback variant that queues the listen connection's work
 * item (listen_con.rwork) on io_workqueue.
 */
static void lowcomms_listen_data_ready(struct sock *sk)
{
	trace_sk_data_ready(sk);

	queue_work(io_workqueue, &listen_con.rwork);
}
54362306a36Sopenharmony_ci
54462306a36Sopenharmony_ciint dlm_lowcomms_connect_node(int nodeid)
54562306a36Sopenharmony_ci{
54662306a36Sopenharmony_ci	struct connection *con;
54762306a36Sopenharmony_ci	int idx;
54862306a36Sopenharmony_ci
54962306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
55062306a36Sopenharmony_ci	con = nodeid2con(nodeid, 0);
55162306a36Sopenharmony_ci	if (WARN_ON_ONCE(!con)) {
55262306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
55362306a36Sopenharmony_ci		return -ENOENT;
55462306a36Sopenharmony_ci	}
55562306a36Sopenharmony_ci
55662306a36Sopenharmony_ci	down_read(&con->sock_lock);
55762306a36Sopenharmony_ci	if (!con->sock) {
55862306a36Sopenharmony_ci		spin_lock_bh(&con->writequeue_lock);
55962306a36Sopenharmony_ci		lowcomms_queue_swork(con);
56062306a36Sopenharmony_ci		spin_unlock_bh(&con->writequeue_lock);
56162306a36Sopenharmony_ci	}
56262306a36Sopenharmony_ci	up_read(&con->sock_lock);
56362306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
56462306a36Sopenharmony_ci
56562306a36Sopenharmony_ci	cond_resched();
56662306a36Sopenharmony_ci	return 0;
56762306a36Sopenharmony_ci}
56862306a36Sopenharmony_ci
56962306a36Sopenharmony_ciint dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
57062306a36Sopenharmony_ci{
57162306a36Sopenharmony_ci	struct connection *con;
57262306a36Sopenharmony_ci	int idx;
57362306a36Sopenharmony_ci
57462306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
57562306a36Sopenharmony_ci	con = nodeid2con(nodeid, 0);
57662306a36Sopenharmony_ci	if (!con) {
57762306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
57862306a36Sopenharmony_ci		return -ENOENT;
57962306a36Sopenharmony_ci	}
58062306a36Sopenharmony_ci
58162306a36Sopenharmony_ci	spin_lock(&con->addrs_lock);
58262306a36Sopenharmony_ci	con->mark = mark;
58362306a36Sopenharmony_ci	spin_unlock(&con->addrs_lock);
58462306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
58562306a36Sopenharmony_ci	return 0;
58662306a36Sopenharmony_ci}
58762306a36Sopenharmony_ci
/* Socket error callback.
 *
 * Logs a rate limited error that names the peer according to the socket
 * family (IPv4, IPv6 when CONFIG_IPV6 is enabled, otherwise an "invalid
 * socket family" message), asks midcomms to resend unacknowledged messages
 * for the connection's node and finally chains to the saved
 * sk_error_report callback in listen_sock.
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	/* hand over to the callback that was saved in listen_sock */
	listen_sock.sk_error_report(sk);
}
62662306a36Sopenharmony_ci
62762306a36Sopenharmony_cistatic void restore_callbacks(struct sock *sk)
62862306a36Sopenharmony_ci{
62962306a36Sopenharmony_ci#ifdef CONFIG_LOCKDEP
63062306a36Sopenharmony_ci	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
63162306a36Sopenharmony_ci#endif
63262306a36Sopenharmony_ci
63362306a36Sopenharmony_ci	sk->sk_user_data = NULL;
63462306a36Sopenharmony_ci	sk->sk_data_ready = listen_sock.sk_data_ready;
63562306a36Sopenharmony_ci	sk->sk_state_change = listen_sock.sk_state_change;
63662306a36Sopenharmony_ci	sk->sk_write_space = listen_sock.sk_write_space;
63762306a36Sopenharmony_ci	sk->sk_error_report = listen_sock.sk_error_report;
63862306a36Sopenharmony_ci}
63962306a36Sopenharmony_ci
64062306a36Sopenharmony_ci/* Make a socket active */
64162306a36Sopenharmony_cistatic void add_sock(struct socket *sock, struct connection *con)
64262306a36Sopenharmony_ci{
64362306a36Sopenharmony_ci	struct sock *sk = sock->sk;
64462306a36Sopenharmony_ci
64562306a36Sopenharmony_ci	lock_sock(sk);
64662306a36Sopenharmony_ci	con->sock = sock;
64762306a36Sopenharmony_ci
64862306a36Sopenharmony_ci	sk->sk_user_data = con;
64962306a36Sopenharmony_ci	sk->sk_data_ready = lowcomms_data_ready;
65062306a36Sopenharmony_ci	sk->sk_write_space = lowcomms_write_space;
65162306a36Sopenharmony_ci	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
65262306a36Sopenharmony_ci		sk->sk_state_change = lowcomms_state_change;
65362306a36Sopenharmony_ci	sk->sk_allocation = GFP_NOFS;
65462306a36Sopenharmony_ci	sk->sk_use_task_frag = false;
65562306a36Sopenharmony_ci	sk->sk_error_report = lowcomms_error_report;
65662306a36Sopenharmony_ci	release_sock(sk);
65762306a36Sopenharmony_ci}
65862306a36Sopenharmony_ci
65962306a36Sopenharmony_ci/* Add the port number to an IPv6 or 4 sockaddr and return the address
66062306a36Sopenharmony_ci   length */
66162306a36Sopenharmony_cistatic void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
66262306a36Sopenharmony_ci			  int *addr_len)
66362306a36Sopenharmony_ci{
66462306a36Sopenharmony_ci	saddr->ss_family =  dlm_local_addr[0].ss_family;
66562306a36Sopenharmony_ci	if (saddr->ss_family == AF_INET) {
66662306a36Sopenharmony_ci		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
66762306a36Sopenharmony_ci		in4_addr->sin_port = cpu_to_be16(port);
66862306a36Sopenharmony_ci		*addr_len = sizeof(struct sockaddr_in);
66962306a36Sopenharmony_ci		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
67062306a36Sopenharmony_ci	} else {
67162306a36Sopenharmony_ci		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
67262306a36Sopenharmony_ci		in6_addr->sin6_port = cpu_to_be16(port);
67362306a36Sopenharmony_ci		*addr_len = sizeof(struct sockaddr_in6);
67462306a36Sopenharmony_ci	}
67562306a36Sopenharmony_ci	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
67662306a36Sopenharmony_ci}
67762306a36Sopenharmony_ci
67862306a36Sopenharmony_cistatic void dlm_page_release(struct kref *kref)
67962306a36Sopenharmony_ci{
68062306a36Sopenharmony_ci	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
68162306a36Sopenharmony_ci						  ref);
68262306a36Sopenharmony_ci
68362306a36Sopenharmony_ci	__free_page(e->page);
68462306a36Sopenharmony_ci	dlm_free_writequeue(e);
68562306a36Sopenharmony_ci}
68662306a36Sopenharmony_ci
68762306a36Sopenharmony_cistatic void dlm_msg_release(struct kref *kref)
68862306a36Sopenharmony_ci{
68962306a36Sopenharmony_ci	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
69062306a36Sopenharmony_ci
69162306a36Sopenharmony_ci	kref_put(&msg->entry->ref, dlm_page_release);
69262306a36Sopenharmony_ci	dlm_free_msg(msg);
69362306a36Sopenharmony_ci}
69462306a36Sopenharmony_ci
69562306a36Sopenharmony_cistatic void free_entry(struct writequeue_entry *e)
69662306a36Sopenharmony_ci{
69762306a36Sopenharmony_ci	struct dlm_msg *msg, *tmp;
69862306a36Sopenharmony_ci
69962306a36Sopenharmony_ci	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
70062306a36Sopenharmony_ci		if (msg->orig_msg) {
70162306a36Sopenharmony_ci			msg->orig_msg->retransmit = false;
70262306a36Sopenharmony_ci			kref_put(&msg->orig_msg->ref, dlm_msg_release);
70362306a36Sopenharmony_ci		}
70462306a36Sopenharmony_ci
70562306a36Sopenharmony_ci		list_del(&msg->list);
70662306a36Sopenharmony_ci		kref_put(&msg->ref, dlm_msg_release);
70762306a36Sopenharmony_ci	}
70862306a36Sopenharmony_ci
70962306a36Sopenharmony_ci	list_del(&e->list);
71062306a36Sopenharmony_ci	kref_put(&e->ref, dlm_page_release);
71162306a36Sopenharmony_ci}
71262306a36Sopenharmony_ci
71362306a36Sopenharmony_cistatic void dlm_close_sock(struct socket **sock)
71462306a36Sopenharmony_ci{
71562306a36Sopenharmony_ci	lock_sock((*sock)->sk);
71662306a36Sopenharmony_ci	restore_callbacks((*sock)->sk);
71762306a36Sopenharmony_ci	release_sock((*sock)->sk);
71862306a36Sopenharmony_ci
71962306a36Sopenharmony_ci	sock_release(*sock);
72062306a36Sopenharmony_ci	*sock = NULL;
72162306a36Sopenharmony_ci}
72262306a36Sopenharmony_ci
72362306a36Sopenharmony_cistatic void allow_connection_io(struct connection *con)
72462306a36Sopenharmony_ci{
72562306a36Sopenharmony_ci	if (con->othercon)
72662306a36Sopenharmony_ci		clear_bit(CF_IO_STOP, &con->othercon->flags);
72762306a36Sopenharmony_ci	clear_bit(CF_IO_STOP, &con->flags);
72862306a36Sopenharmony_ci}
72962306a36Sopenharmony_ci
73062306a36Sopenharmony_cistatic void stop_connection_io(struct connection *con)
73162306a36Sopenharmony_ci{
73262306a36Sopenharmony_ci	if (con->othercon)
73362306a36Sopenharmony_ci		stop_connection_io(con->othercon);
73462306a36Sopenharmony_ci
73562306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
73662306a36Sopenharmony_ci	set_bit(CF_IO_STOP, &con->flags);
73762306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
73862306a36Sopenharmony_ci
73962306a36Sopenharmony_ci	down_write(&con->sock_lock);
74062306a36Sopenharmony_ci	if (con->sock) {
74162306a36Sopenharmony_ci		lock_sock(con->sock->sk);
74262306a36Sopenharmony_ci		restore_callbacks(con->sock->sk);
74362306a36Sopenharmony_ci		release_sock(con->sock->sk);
74462306a36Sopenharmony_ci	}
74562306a36Sopenharmony_ci	up_write(&con->sock_lock);
74662306a36Sopenharmony_ci
74762306a36Sopenharmony_ci	cancel_work_sync(&con->swork);
74862306a36Sopenharmony_ci	cancel_work_sync(&con->rwork);
74962306a36Sopenharmony_ci}
75062306a36Sopenharmony_ci
75162306a36Sopenharmony_ci/* Close a remote connection and tidy up */
75262306a36Sopenharmony_cistatic void close_connection(struct connection *con, bool and_other)
75362306a36Sopenharmony_ci{
75462306a36Sopenharmony_ci	struct writequeue_entry *e;
75562306a36Sopenharmony_ci
75662306a36Sopenharmony_ci	if (con->othercon && and_other)
75762306a36Sopenharmony_ci		close_connection(con->othercon, false);
75862306a36Sopenharmony_ci
75962306a36Sopenharmony_ci	down_write(&con->sock_lock);
76062306a36Sopenharmony_ci	if (!con->sock) {
76162306a36Sopenharmony_ci		up_write(&con->sock_lock);
76262306a36Sopenharmony_ci		return;
76362306a36Sopenharmony_ci	}
76462306a36Sopenharmony_ci
76562306a36Sopenharmony_ci	dlm_close_sock(&con->sock);
76662306a36Sopenharmony_ci
76762306a36Sopenharmony_ci	/* if we send a writequeue entry only a half way, we drop the
76862306a36Sopenharmony_ci	 * whole entry because reconnection and that we not start of the
76962306a36Sopenharmony_ci	 * middle of a msg which will confuse the other end.
77062306a36Sopenharmony_ci	 *
77162306a36Sopenharmony_ci	 * we can always drop messages because retransmits, but what we
77262306a36Sopenharmony_ci	 * cannot allow is to transmit half messages which may be processed
77362306a36Sopenharmony_ci	 * at the other side.
77462306a36Sopenharmony_ci	 *
77562306a36Sopenharmony_ci	 * our policy is to start on a clean state when disconnects, we don't
77662306a36Sopenharmony_ci	 * know what's send/received on transport layer in this case.
77762306a36Sopenharmony_ci	 */
77862306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
77962306a36Sopenharmony_ci	if (!list_empty(&con->writequeue)) {
78062306a36Sopenharmony_ci		e = list_first_entry(&con->writequeue, struct writequeue_entry,
78162306a36Sopenharmony_ci				     list);
78262306a36Sopenharmony_ci		if (e->dirty)
78362306a36Sopenharmony_ci			free_entry(e);
78462306a36Sopenharmony_ci	}
78562306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
78662306a36Sopenharmony_ci
78762306a36Sopenharmony_ci	con->rx_leftover = 0;
78862306a36Sopenharmony_ci	con->retries = 0;
78962306a36Sopenharmony_ci	clear_bit(CF_APP_LIMITED, &con->flags);
79062306a36Sopenharmony_ci	clear_bit(CF_RECV_PENDING, &con->flags);
79162306a36Sopenharmony_ci	clear_bit(CF_SEND_PENDING, &con->flags);
79262306a36Sopenharmony_ci	up_write(&con->sock_lock);
79362306a36Sopenharmony_ci}
79462306a36Sopenharmony_ci
79562306a36Sopenharmony_cistatic void shutdown_connection(struct connection *con, bool and_other)
79662306a36Sopenharmony_ci{
79762306a36Sopenharmony_ci	int ret;
79862306a36Sopenharmony_ci
79962306a36Sopenharmony_ci	if (con->othercon && and_other)
80062306a36Sopenharmony_ci		shutdown_connection(con->othercon, false);
80162306a36Sopenharmony_ci
80262306a36Sopenharmony_ci	flush_workqueue(io_workqueue);
80362306a36Sopenharmony_ci	down_read(&con->sock_lock);
80462306a36Sopenharmony_ci	/* nothing to shutdown */
80562306a36Sopenharmony_ci	if (!con->sock) {
80662306a36Sopenharmony_ci		up_read(&con->sock_lock);
80762306a36Sopenharmony_ci		return;
80862306a36Sopenharmony_ci	}
80962306a36Sopenharmony_ci
81062306a36Sopenharmony_ci	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
81162306a36Sopenharmony_ci	up_read(&con->sock_lock);
81262306a36Sopenharmony_ci	if (ret) {
81362306a36Sopenharmony_ci		log_print("Connection %p failed to shutdown: %d will force close",
81462306a36Sopenharmony_ci			  con, ret);
81562306a36Sopenharmony_ci		goto force_close;
81662306a36Sopenharmony_ci	} else {
81762306a36Sopenharmony_ci		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
81862306a36Sopenharmony_ci					 DLM_SHUTDOWN_WAIT_TIMEOUT);
81962306a36Sopenharmony_ci		if (ret == 0) {
82062306a36Sopenharmony_ci			log_print("Connection %p shutdown timed out, will force close",
82162306a36Sopenharmony_ci				  con);
82262306a36Sopenharmony_ci			goto force_close;
82362306a36Sopenharmony_ci		}
82462306a36Sopenharmony_ci	}
82562306a36Sopenharmony_ci
82662306a36Sopenharmony_ci	return;
82762306a36Sopenharmony_ci
82862306a36Sopenharmony_ciforce_close:
82962306a36Sopenharmony_ci	close_connection(con, false);
83062306a36Sopenharmony_ci}
83162306a36Sopenharmony_ci
83262306a36Sopenharmony_cistatic struct processqueue_entry *new_processqueue_entry(int nodeid,
83362306a36Sopenharmony_ci							 int buflen)
83462306a36Sopenharmony_ci{
83562306a36Sopenharmony_ci	struct processqueue_entry *pentry;
83662306a36Sopenharmony_ci
83762306a36Sopenharmony_ci	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
83862306a36Sopenharmony_ci	if (!pentry)
83962306a36Sopenharmony_ci		return NULL;
84062306a36Sopenharmony_ci
84162306a36Sopenharmony_ci	pentry->buf = kmalloc(buflen, GFP_NOFS);
84262306a36Sopenharmony_ci	if (!pentry->buf) {
84362306a36Sopenharmony_ci		kfree(pentry);
84462306a36Sopenharmony_ci		return NULL;
84562306a36Sopenharmony_ci	}
84662306a36Sopenharmony_ci
84762306a36Sopenharmony_ci	pentry->nodeid = nodeid;
84862306a36Sopenharmony_ci	return pentry;
84962306a36Sopenharmony_ci}
85062306a36Sopenharmony_ci
85162306a36Sopenharmony_cistatic void free_processqueue_entry(struct processqueue_entry *pentry)
85262306a36Sopenharmony_ci{
85362306a36Sopenharmony_ci	kfree(pentry->buf);
85462306a36Sopenharmony_ci	kfree(pentry);
85562306a36Sopenharmony_ci}
85662306a36Sopenharmony_ci
85762306a36Sopenharmony_cistruct dlm_processed_nodes {
85862306a36Sopenharmony_ci	int nodeid;
85962306a36Sopenharmony_ci
86062306a36Sopenharmony_ci	struct list_head list;
86162306a36Sopenharmony_ci};
86262306a36Sopenharmony_ci
86362306a36Sopenharmony_cistatic void process_dlm_messages(struct work_struct *work)
86462306a36Sopenharmony_ci{
86562306a36Sopenharmony_ci	struct processqueue_entry *pentry;
86662306a36Sopenharmony_ci
86762306a36Sopenharmony_ci	spin_lock(&processqueue_lock);
86862306a36Sopenharmony_ci	pentry = list_first_entry_or_null(&processqueue,
86962306a36Sopenharmony_ci					  struct processqueue_entry, list);
87062306a36Sopenharmony_ci	if (WARN_ON_ONCE(!pentry)) {
87162306a36Sopenharmony_ci		process_dlm_messages_pending = false;
87262306a36Sopenharmony_ci		spin_unlock(&processqueue_lock);
87362306a36Sopenharmony_ci		return;
87462306a36Sopenharmony_ci	}
87562306a36Sopenharmony_ci
87662306a36Sopenharmony_ci	list_del(&pentry->list);
87762306a36Sopenharmony_ci	spin_unlock(&processqueue_lock);
87862306a36Sopenharmony_ci
87962306a36Sopenharmony_ci	for (;;) {
88062306a36Sopenharmony_ci		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
88162306a36Sopenharmony_ci					    pentry->buflen);
88262306a36Sopenharmony_ci		free_processqueue_entry(pentry);
88362306a36Sopenharmony_ci
88462306a36Sopenharmony_ci		spin_lock(&processqueue_lock);
88562306a36Sopenharmony_ci		pentry = list_first_entry_or_null(&processqueue,
88662306a36Sopenharmony_ci						  struct processqueue_entry, list);
88762306a36Sopenharmony_ci		if (!pentry) {
88862306a36Sopenharmony_ci			process_dlm_messages_pending = false;
88962306a36Sopenharmony_ci			spin_unlock(&processqueue_lock);
89062306a36Sopenharmony_ci			break;
89162306a36Sopenharmony_ci		}
89262306a36Sopenharmony_ci
89362306a36Sopenharmony_ci		list_del(&pentry->list);
89462306a36Sopenharmony_ci		spin_unlock(&processqueue_lock);
89562306a36Sopenharmony_ci	}
89662306a36Sopenharmony_ci}
89762306a36Sopenharmony_ci
89862306a36Sopenharmony_ci/* Data received from remote end */
89962306a36Sopenharmony_cistatic int receive_from_sock(struct connection *con, int buflen)
90062306a36Sopenharmony_ci{
90162306a36Sopenharmony_ci	struct processqueue_entry *pentry;
90262306a36Sopenharmony_ci	int ret, buflen_real;
90362306a36Sopenharmony_ci	struct msghdr msg;
90462306a36Sopenharmony_ci	struct kvec iov;
90562306a36Sopenharmony_ci
90662306a36Sopenharmony_ci	pentry = new_processqueue_entry(con->nodeid, buflen);
90762306a36Sopenharmony_ci	if (!pentry)
90862306a36Sopenharmony_ci		return DLM_IO_RESCHED;
90962306a36Sopenharmony_ci
91062306a36Sopenharmony_ci	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
91162306a36Sopenharmony_ci
91262306a36Sopenharmony_ci	/* calculate new buffer parameter regarding last receive and
91362306a36Sopenharmony_ci	 * possible leftover bytes
91462306a36Sopenharmony_ci	 */
91562306a36Sopenharmony_ci	iov.iov_base = pentry->buf + con->rx_leftover;
91662306a36Sopenharmony_ci	iov.iov_len = buflen - con->rx_leftover;
91762306a36Sopenharmony_ci
91862306a36Sopenharmony_ci	memset(&msg, 0, sizeof(msg));
91962306a36Sopenharmony_ci	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
92062306a36Sopenharmony_ci	clear_bit(CF_RECV_INTR, &con->flags);
92162306a36Sopenharmony_ciagain:
92262306a36Sopenharmony_ci	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
92362306a36Sopenharmony_ci			     msg.msg_flags);
92462306a36Sopenharmony_ci	trace_dlm_recv(con->nodeid, ret);
92562306a36Sopenharmony_ci	if (ret == -EAGAIN) {
92662306a36Sopenharmony_ci		lock_sock(con->sock->sk);
92762306a36Sopenharmony_ci		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
92862306a36Sopenharmony_ci			release_sock(con->sock->sk);
92962306a36Sopenharmony_ci			goto again;
93062306a36Sopenharmony_ci		}
93162306a36Sopenharmony_ci
93262306a36Sopenharmony_ci		clear_bit(CF_RECV_PENDING, &con->flags);
93362306a36Sopenharmony_ci		release_sock(con->sock->sk);
93462306a36Sopenharmony_ci		free_processqueue_entry(pentry);
93562306a36Sopenharmony_ci		return DLM_IO_END;
93662306a36Sopenharmony_ci	} else if (ret == 0) {
93762306a36Sopenharmony_ci		/* close will clear CF_RECV_PENDING */
93862306a36Sopenharmony_ci		free_processqueue_entry(pentry);
93962306a36Sopenharmony_ci		return DLM_IO_EOF;
94062306a36Sopenharmony_ci	} else if (ret < 0) {
94162306a36Sopenharmony_ci		free_processqueue_entry(pentry);
94262306a36Sopenharmony_ci		return ret;
94362306a36Sopenharmony_ci	}
94462306a36Sopenharmony_ci
94562306a36Sopenharmony_ci	/* new buflen according readed bytes and leftover from last receive */
94662306a36Sopenharmony_ci	buflen_real = ret + con->rx_leftover;
94762306a36Sopenharmony_ci	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
94862306a36Sopenharmony_ci					   buflen_real);
94962306a36Sopenharmony_ci	if (ret < 0) {
95062306a36Sopenharmony_ci		free_processqueue_entry(pentry);
95162306a36Sopenharmony_ci		return ret;
95262306a36Sopenharmony_ci	}
95362306a36Sopenharmony_ci
95462306a36Sopenharmony_ci	pentry->buflen = ret;
95562306a36Sopenharmony_ci
95662306a36Sopenharmony_ci	/* calculate leftover bytes from process and put it into begin of
95762306a36Sopenharmony_ci	 * the receive buffer, so next receive we have the full message
95862306a36Sopenharmony_ci	 * at the start address of the receive buffer.
95962306a36Sopenharmony_ci	 */
96062306a36Sopenharmony_ci	con->rx_leftover = buflen_real - ret;
96162306a36Sopenharmony_ci	memmove(con->rx_leftover_buf, pentry->buf + ret,
96262306a36Sopenharmony_ci		con->rx_leftover);
96362306a36Sopenharmony_ci
96462306a36Sopenharmony_ci	spin_lock(&processqueue_lock);
96562306a36Sopenharmony_ci	list_add_tail(&pentry->list, &processqueue);
96662306a36Sopenharmony_ci	if (!process_dlm_messages_pending) {
96762306a36Sopenharmony_ci		process_dlm_messages_pending = true;
96862306a36Sopenharmony_ci		queue_work(process_workqueue, &process_work);
96962306a36Sopenharmony_ci	}
97062306a36Sopenharmony_ci	spin_unlock(&processqueue_lock);
97162306a36Sopenharmony_ci
97262306a36Sopenharmony_ci	return DLM_IO_SUCCESS;
97362306a36Sopenharmony_ci}
97462306a36Sopenharmony_ci
97562306a36Sopenharmony_ci/* Listening socket is busy, accept a connection */
97662306a36Sopenharmony_cistatic int accept_from_sock(void)
97762306a36Sopenharmony_ci{
97862306a36Sopenharmony_ci	struct sockaddr_storage peeraddr;
97962306a36Sopenharmony_ci	int len, idx, result, nodeid;
98062306a36Sopenharmony_ci	struct connection *newcon;
98162306a36Sopenharmony_ci	struct socket *newsock;
98262306a36Sopenharmony_ci	unsigned int mark;
98362306a36Sopenharmony_ci
98462306a36Sopenharmony_ci	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
98562306a36Sopenharmony_ci	if (result == -EAGAIN)
98662306a36Sopenharmony_ci		return DLM_IO_END;
98762306a36Sopenharmony_ci	else if (result < 0)
98862306a36Sopenharmony_ci		goto accept_err;
98962306a36Sopenharmony_ci
99062306a36Sopenharmony_ci	/* Get the connected socket's peer */
99162306a36Sopenharmony_ci	memset(&peeraddr, 0, sizeof(peeraddr));
99262306a36Sopenharmony_ci	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
99362306a36Sopenharmony_ci	if (len < 0) {
99462306a36Sopenharmony_ci		result = -ECONNABORTED;
99562306a36Sopenharmony_ci		goto accept_err;
99662306a36Sopenharmony_ci	}
99762306a36Sopenharmony_ci
99862306a36Sopenharmony_ci	/* Get the new node's NODEID */
99962306a36Sopenharmony_ci	make_sockaddr(&peeraddr, 0, &len);
100062306a36Sopenharmony_ci	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
100162306a36Sopenharmony_ci		switch (peeraddr.ss_family) {
100262306a36Sopenharmony_ci		case AF_INET: {
100362306a36Sopenharmony_ci			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
100462306a36Sopenharmony_ci
100562306a36Sopenharmony_ci			log_print("connect from non cluster IPv4 node %pI4",
100662306a36Sopenharmony_ci				  &sin->sin_addr);
100762306a36Sopenharmony_ci			break;
100862306a36Sopenharmony_ci		}
100962306a36Sopenharmony_ci#if IS_ENABLED(CONFIG_IPV6)
101062306a36Sopenharmony_ci		case AF_INET6: {
101162306a36Sopenharmony_ci			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
101262306a36Sopenharmony_ci
101362306a36Sopenharmony_ci			log_print("connect from non cluster IPv6 node %pI6c",
101462306a36Sopenharmony_ci				  &sin6->sin6_addr);
101562306a36Sopenharmony_ci			break;
101662306a36Sopenharmony_ci		}
101762306a36Sopenharmony_ci#endif
101862306a36Sopenharmony_ci		default:
101962306a36Sopenharmony_ci			log_print("invalid family from non cluster node");
102062306a36Sopenharmony_ci			break;
102162306a36Sopenharmony_ci		}
102262306a36Sopenharmony_ci
102362306a36Sopenharmony_ci		sock_release(newsock);
102462306a36Sopenharmony_ci		return -1;
102562306a36Sopenharmony_ci	}
102662306a36Sopenharmony_ci
102762306a36Sopenharmony_ci	log_print("got connection from %d", nodeid);
102862306a36Sopenharmony_ci
102962306a36Sopenharmony_ci	/*  Check to see if we already have a connection to this node. This
103062306a36Sopenharmony_ci	 *  could happen if the two nodes initiate a connection at roughly
103162306a36Sopenharmony_ci	 *  the same time and the connections cross on the wire.
103262306a36Sopenharmony_ci	 *  In this case we store the incoming one in "othercon"
103362306a36Sopenharmony_ci	 */
103462306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
103562306a36Sopenharmony_ci	newcon = nodeid2con(nodeid, 0);
103662306a36Sopenharmony_ci	if (WARN_ON_ONCE(!newcon)) {
103762306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
103862306a36Sopenharmony_ci		result = -ENOENT;
103962306a36Sopenharmony_ci		goto accept_err;
104062306a36Sopenharmony_ci	}
104162306a36Sopenharmony_ci
104262306a36Sopenharmony_ci	sock_set_mark(newsock->sk, mark);
104362306a36Sopenharmony_ci
104462306a36Sopenharmony_ci	down_write(&newcon->sock_lock);
104562306a36Sopenharmony_ci	if (newcon->sock) {
104662306a36Sopenharmony_ci		struct connection *othercon = newcon->othercon;
104762306a36Sopenharmony_ci
104862306a36Sopenharmony_ci		if (!othercon) {
104962306a36Sopenharmony_ci			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
105062306a36Sopenharmony_ci			if (!othercon) {
105162306a36Sopenharmony_ci				log_print("failed to allocate incoming socket");
105262306a36Sopenharmony_ci				up_write(&newcon->sock_lock);
105362306a36Sopenharmony_ci				srcu_read_unlock(&connections_srcu, idx);
105462306a36Sopenharmony_ci				result = -ENOMEM;
105562306a36Sopenharmony_ci				goto accept_err;
105662306a36Sopenharmony_ci			}
105762306a36Sopenharmony_ci
105862306a36Sopenharmony_ci			dlm_con_init(othercon, nodeid);
105962306a36Sopenharmony_ci			lockdep_set_subclass(&othercon->sock_lock, 1);
106062306a36Sopenharmony_ci			newcon->othercon = othercon;
106162306a36Sopenharmony_ci			set_bit(CF_IS_OTHERCON, &othercon->flags);
106262306a36Sopenharmony_ci		} else {
106362306a36Sopenharmony_ci			/* close other sock con if we have something new */
106462306a36Sopenharmony_ci			close_connection(othercon, false);
106562306a36Sopenharmony_ci		}
106662306a36Sopenharmony_ci
106762306a36Sopenharmony_ci		down_write(&othercon->sock_lock);
106862306a36Sopenharmony_ci		add_sock(newsock, othercon);
106962306a36Sopenharmony_ci
107062306a36Sopenharmony_ci		/* check if we receved something while adding */
107162306a36Sopenharmony_ci		lock_sock(othercon->sock->sk);
107262306a36Sopenharmony_ci		lowcomms_queue_rwork(othercon);
107362306a36Sopenharmony_ci		release_sock(othercon->sock->sk);
107462306a36Sopenharmony_ci		up_write(&othercon->sock_lock);
107562306a36Sopenharmony_ci	}
107662306a36Sopenharmony_ci	else {
107762306a36Sopenharmony_ci		/* accept copies the sk after we've saved the callbacks, so we
107862306a36Sopenharmony_ci		   don't want to save them a second time or comm errors will
107962306a36Sopenharmony_ci		   result in calling sk_error_report recursively. */
108062306a36Sopenharmony_ci		add_sock(newsock, newcon);
108162306a36Sopenharmony_ci
108262306a36Sopenharmony_ci		/* check if we receved something while adding */
108362306a36Sopenharmony_ci		lock_sock(newcon->sock->sk);
108462306a36Sopenharmony_ci		lowcomms_queue_rwork(newcon);
108562306a36Sopenharmony_ci		release_sock(newcon->sock->sk);
108662306a36Sopenharmony_ci	}
108762306a36Sopenharmony_ci	up_write(&newcon->sock_lock);
108862306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
108962306a36Sopenharmony_ci
109062306a36Sopenharmony_ci	return DLM_IO_SUCCESS;
109162306a36Sopenharmony_ci
109262306a36Sopenharmony_ciaccept_err:
109362306a36Sopenharmony_ci	if (newsock)
109462306a36Sopenharmony_ci		sock_release(newsock);
109562306a36Sopenharmony_ci
109662306a36Sopenharmony_ci	return result;
109762306a36Sopenharmony_ci}
109862306a36Sopenharmony_ci
109962306a36Sopenharmony_ci/*
110062306a36Sopenharmony_ci * writequeue_entry_complete - try to delete and free write queue entry
110162306a36Sopenharmony_ci * @e: write queue entry to try to delete
110262306a36Sopenharmony_ci * @completed: bytes completed
110362306a36Sopenharmony_ci *
110462306a36Sopenharmony_ci * writequeue_lock must be held.
110562306a36Sopenharmony_ci */
110662306a36Sopenharmony_cistatic void writequeue_entry_complete(struct writequeue_entry *e, int completed)
110762306a36Sopenharmony_ci{
110862306a36Sopenharmony_ci	e->offset += completed;
110962306a36Sopenharmony_ci	e->len -= completed;
111062306a36Sopenharmony_ci	/* signal that page was half way transmitted */
111162306a36Sopenharmony_ci	e->dirty = true;
111262306a36Sopenharmony_ci
111362306a36Sopenharmony_ci	if (e->len == 0 && e->users == 0)
111462306a36Sopenharmony_ci		free_entry(e);
111562306a36Sopenharmony_ci}
111662306a36Sopenharmony_ci
111762306a36Sopenharmony_ci/*
111862306a36Sopenharmony_ci * sctp_bind_addrs - bind a SCTP socket to all our addresses
111962306a36Sopenharmony_ci */
112062306a36Sopenharmony_cistatic int sctp_bind_addrs(struct socket *sock, uint16_t port)
112162306a36Sopenharmony_ci{
112262306a36Sopenharmony_ci	struct sockaddr_storage localaddr;
112362306a36Sopenharmony_ci	struct sockaddr *addr = (struct sockaddr *)&localaddr;
112462306a36Sopenharmony_ci	int i, addr_len, result = 0;
112562306a36Sopenharmony_ci
112662306a36Sopenharmony_ci	for (i = 0; i < dlm_local_count; i++) {
112762306a36Sopenharmony_ci		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
112862306a36Sopenharmony_ci		make_sockaddr(&localaddr, port, &addr_len);
112962306a36Sopenharmony_ci
113062306a36Sopenharmony_ci		if (!i)
113162306a36Sopenharmony_ci			result = kernel_bind(sock, addr, addr_len);
113262306a36Sopenharmony_ci		else
113362306a36Sopenharmony_ci			result = sock_bind_add(sock->sk, addr, addr_len);
113462306a36Sopenharmony_ci
113562306a36Sopenharmony_ci		if (result < 0) {
113662306a36Sopenharmony_ci			log_print("Can't bind to %d addr number %d, %d.\n",
113762306a36Sopenharmony_ci				  port, i + 1, result);
113862306a36Sopenharmony_ci			break;
113962306a36Sopenharmony_ci		}
114062306a36Sopenharmony_ci	}
114162306a36Sopenharmony_ci	return result;
114262306a36Sopenharmony_ci}
114362306a36Sopenharmony_ci
114462306a36Sopenharmony_ci/* Get local addresses */
114562306a36Sopenharmony_cistatic void init_local(void)
114662306a36Sopenharmony_ci{
114762306a36Sopenharmony_ci	struct sockaddr_storage sas;
114862306a36Sopenharmony_ci	int i;
114962306a36Sopenharmony_ci
115062306a36Sopenharmony_ci	dlm_local_count = 0;
115162306a36Sopenharmony_ci	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
115262306a36Sopenharmony_ci		if (dlm_our_addr(&sas, i))
115362306a36Sopenharmony_ci			break;
115462306a36Sopenharmony_ci
115562306a36Sopenharmony_ci		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
115662306a36Sopenharmony_ci	}
115762306a36Sopenharmony_ci}
115862306a36Sopenharmony_ci
115962306a36Sopenharmony_cistatic struct writequeue_entry *new_writequeue_entry(struct connection *con)
116062306a36Sopenharmony_ci{
116162306a36Sopenharmony_ci	struct writequeue_entry *entry;
116262306a36Sopenharmony_ci
116362306a36Sopenharmony_ci	entry = dlm_allocate_writequeue();
116462306a36Sopenharmony_ci	if (!entry)
116562306a36Sopenharmony_ci		return NULL;
116662306a36Sopenharmony_ci
116762306a36Sopenharmony_ci	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
116862306a36Sopenharmony_ci	if (!entry->page) {
116962306a36Sopenharmony_ci		dlm_free_writequeue(entry);
117062306a36Sopenharmony_ci		return NULL;
117162306a36Sopenharmony_ci	}
117262306a36Sopenharmony_ci
117362306a36Sopenharmony_ci	entry->offset = 0;
117462306a36Sopenharmony_ci	entry->len = 0;
117562306a36Sopenharmony_ci	entry->end = 0;
117662306a36Sopenharmony_ci	entry->dirty = false;
117762306a36Sopenharmony_ci	entry->con = con;
117862306a36Sopenharmony_ci	entry->users = 1;
117962306a36Sopenharmony_ci	kref_init(&entry->ref);
118062306a36Sopenharmony_ci	return entry;
118162306a36Sopenharmony_ci}
118262306a36Sopenharmony_ci
118362306a36Sopenharmony_cistatic struct writequeue_entry *new_wq_entry(struct connection *con, int len,
118462306a36Sopenharmony_ci					     char **ppc, void (*cb)(void *data),
118562306a36Sopenharmony_ci					     void *data)
118662306a36Sopenharmony_ci{
118762306a36Sopenharmony_ci	struct writequeue_entry *e;
118862306a36Sopenharmony_ci
118962306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
119062306a36Sopenharmony_ci	if (!list_empty(&con->writequeue)) {
119162306a36Sopenharmony_ci		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
119262306a36Sopenharmony_ci		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
119362306a36Sopenharmony_ci			kref_get(&e->ref);
119462306a36Sopenharmony_ci
119562306a36Sopenharmony_ci			*ppc = page_address(e->page) + e->end;
119662306a36Sopenharmony_ci			if (cb)
119762306a36Sopenharmony_ci				cb(data);
119862306a36Sopenharmony_ci
119962306a36Sopenharmony_ci			e->end += len;
120062306a36Sopenharmony_ci			e->users++;
120162306a36Sopenharmony_ci			goto out;
120262306a36Sopenharmony_ci		}
120362306a36Sopenharmony_ci	}
120462306a36Sopenharmony_ci
120562306a36Sopenharmony_ci	e = new_writequeue_entry(con);
120662306a36Sopenharmony_ci	if (!e)
120762306a36Sopenharmony_ci		goto out;
120862306a36Sopenharmony_ci
120962306a36Sopenharmony_ci	kref_get(&e->ref);
121062306a36Sopenharmony_ci	*ppc = page_address(e->page);
121162306a36Sopenharmony_ci	e->end += len;
121262306a36Sopenharmony_ci	if (cb)
121362306a36Sopenharmony_ci		cb(data);
121462306a36Sopenharmony_ci
121562306a36Sopenharmony_ci	list_add_tail(&e->list, &con->writequeue);
121662306a36Sopenharmony_ci
121762306a36Sopenharmony_ciout:
121862306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
121962306a36Sopenharmony_ci	return e;
122062306a36Sopenharmony_ci};
122162306a36Sopenharmony_ci
122262306a36Sopenharmony_cistatic struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
122362306a36Sopenharmony_ci						gfp_t allocation, char **ppc,
122462306a36Sopenharmony_ci						void (*cb)(void *data),
122562306a36Sopenharmony_ci						void *data)
122662306a36Sopenharmony_ci{
122762306a36Sopenharmony_ci	struct writequeue_entry *e;
122862306a36Sopenharmony_ci	struct dlm_msg *msg;
122962306a36Sopenharmony_ci
123062306a36Sopenharmony_ci	msg = dlm_allocate_msg(allocation);
123162306a36Sopenharmony_ci	if (!msg)
123262306a36Sopenharmony_ci		return NULL;
123362306a36Sopenharmony_ci
123462306a36Sopenharmony_ci	kref_init(&msg->ref);
123562306a36Sopenharmony_ci
123662306a36Sopenharmony_ci	e = new_wq_entry(con, len, ppc, cb, data);
123762306a36Sopenharmony_ci	if (!e) {
123862306a36Sopenharmony_ci		dlm_free_msg(msg);
123962306a36Sopenharmony_ci		return NULL;
124062306a36Sopenharmony_ci	}
124162306a36Sopenharmony_ci
124262306a36Sopenharmony_ci	msg->retransmit = false;
124362306a36Sopenharmony_ci	msg->orig_msg = NULL;
124462306a36Sopenharmony_ci	msg->ppc = *ppc;
124562306a36Sopenharmony_ci	msg->len = len;
124662306a36Sopenharmony_ci	msg->entry = e;
124762306a36Sopenharmony_ci
124862306a36Sopenharmony_ci	return msg;
124962306a36Sopenharmony_ci}
125062306a36Sopenharmony_ci
/* Avoid a sparse false positive for connections_srcu: on success the
 * read lock is intentionally left held here and is released in
 * dlm_lowcomms_commit_msg(), which the caller must then call.
 */
125462306a36Sopenharmony_ci#ifndef __CHECKER__
125562306a36Sopenharmony_cistruct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
125662306a36Sopenharmony_ci				     char **ppc, void (*cb)(void *data),
125762306a36Sopenharmony_ci				     void *data)
125862306a36Sopenharmony_ci{
125962306a36Sopenharmony_ci	struct connection *con;
126062306a36Sopenharmony_ci	struct dlm_msg *msg;
126162306a36Sopenharmony_ci	int idx;
126262306a36Sopenharmony_ci
126362306a36Sopenharmony_ci	if (len > DLM_MAX_SOCKET_BUFSIZE ||
126462306a36Sopenharmony_ci	    len < sizeof(struct dlm_header)) {
126562306a36Sopenharmony_ci		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
126662306a36Sopenharmony_ci		log_print("failed to allocate a buffer of size %d", len);
126762306a36Sopenharmony_ci		WARN_ON_ONCE(1);
126862306a36Sopenharmony_ci		return NULL;
126962306a36Sopenharmony_ci	}
127062306a36Sopenharmony_ci
127162306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
127262306a36Sopenharmony_ci	con = nodeid2con(nodeid, 0);
127362306a36Sopenharmony_ci	if (WARN_ON_ONCE(!con)) {
127462306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
127562306a36Sopenharmony_ci		return NULL;
127662306a36Sopenharmony_ci	}
127762306a36Sopenharmony_ci
127862306a36Sopenharmony_ci	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
127962306a36Sopenharmony_ci	if (!msg) {
128062306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
128162306a36Sopenharmony_ci		return NULL;
128262306a36Sopenharmony_ci	}
128362306a36Sopenharmony_ci
128462306a36Sopenharmony_ci	/* for dlm_lowcomms_commit_msg() */
128562306a36Sopenharmony_ci	kref_get(&msg->ref);
128662306a36Sopenharmony_ci	/* we assume if successful commit must called */
128762306a36Sopenharmony_ci	msg->idx = idx;
128862306a36Sopenharmony_ci	return msg;
128962306a36Sopenharmony_ci}
129062306a36Sopenharmony_ci#endif
129162306a36Sopenharmony_ci
129262306a36Sopenharmony_cistatic void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
129362306a36Sopenharmony_ci{
129462306a36Sopenharmony_ci	struct writequeue_entry *e = msg->entry;
129562306a36Sopenharmony_ci	struct connection *con = e->con;
129662306a36Sopenharmony_ci	int users;
129762306a36Sopenharmony_ci
129862306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
129962306a36Sopenharmony_ci	kref_get(&msg->ref);
130062306a36Sopenharmony_ci	list_add(&msg->list, &e->msgs);
130162306a36Sopenharmony_ci
130262306a36Sopenharmony_ci	users = --e->users;
130362306a36Sopenharmony_ci	if (users)
130462306a36Sopenharmony_ci		goto out;
130562306a36Sopenharmony_ci
130662306a36Sopenharmony_ci	e->len = DLM_WQ_LENGTH_BYTES(e);
130762306a36Sopenharmony_ci
130862306a36Sopenharmony_ci	lowcomms_queue_swork(con);
130962306a36Sopenharmony_ci
131062306a36Sopenharmony_ciout:
131162306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
131262306a36Sopenharmony_ci	return;
131362306a36Sopenharmony_ci}
131462306a36Sopenharmony_ci
/* Avoid a sparse false positive for connections_srcu: the read lock
 * released here was taken in dlm_lowcomms_new_msg().
 */
131862306a36Sopenharmony_ci#ifndef __CHECKER__
131962306a36Sopenharmony_civoid dlm_lowcomms_commit_msg(struct dlm_msg *msg)
132062306a36Sopenharmony_ci{
132162306a36Sopenharmony_ci	_dlm_lowcomms_commit_msg(msg);
132262306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, msg->idx);
132362306a36Sopenharmony_ci	/* because dlm_lowcomms_new_msg() */
132462306a36Sopenharmony_ci	kref_put(&msg->ref, dlm_msg_release);
132562306a36Sopenharmony_ci}
132662306a36Sopenharmony_ci#endif
132762306a36Sopenharmony_ci
132862306a36Sopenharmony_civoid dlm_lowcomms_put_msg(struct dlm_msg *msg)
132962306a36Sopenharmony_ci{
133062306a36Sopenharmony_ci	kref_put(&msg->ref, dlm_msg_release);
133162306a36Sopenharmony_ci}
133262306a36Sopenharmony_ci
133362306a36Sopenharmony_ci/* does not held connections_srcu, usage lowcomms_error_report only */
133462306a36Sopenharmony_ciint dlm_lowcomms_resend_msg(struct dlm_msg *msg)
133562306a36Sopenharmony_ci{
133662306a36Sopenharmony_ci	struct dlm_msg *msg_resend;
133762306a36Sopenharmony_ci	char *ppc;
133862306a36Sopenharmony_ci
133962306a36Sopenharmony_ci	if (msg->retransmit)
134062306a36Sopenharmony_ci		return 1;
134162306a36Sopenharmony_ci
134262306a36Sopenharmony_ci	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
134362306a36Sopenharmony_ci					      GFP_ATOMIC, &ppc, NULL, NULL);
134462306a36Sopenharmony_ci	if (!msg_resend)
134562306a36Sopenharmony_ci		return -ENOMEM;
134662306a36Sopenharmony_ci
134762306a36Sopenharmony_ci	msg->retransmit = true;
134862306a36Sopenharmony_ci	kref_get(&msg->ref);
134962306a36Sopenharmony_ci	msg_resend->orig_msg = msg;
135062306a36Sopenharmony_ci
135162306a36Sopenharmony_ci	memcpy(ppc, msg->ppc, msg->len);
135262306a36Sopenharmony_ci	_dlm_lowcomms_commit_msg(msg_resend);
135362306a36Sopenharmony_ci	dlm_lowcomms_put_msg(msg_resend);
135462306a36Sopenharmony_ci
135562306a36Sopenharmony_ci	return 0;
135662306a36Sopenharmony_ci}
135762306a36Sopenharmony_ci
135862306a36Sopenharmony_ci/* Send a message */
135962306a36Sopenharmony_cistatic int send_to_sock(struct connection *con)
136062306a36Sopenharmony_ci{
136162306a36Sopenharmony_ci	struct writequeue_entry *e;
136262306a36Sopenharmony_ci	struct bio_vec bvec;
136362306a36Sopenharmony_ci	struct msghdr msg = {
136462306a36Sopenharmony_ci		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
136562306a36Sopenharmony_ci	};
136662306a36Sopenharmony_ci	int len, offset, ret;
136762306a36Sopenharmony_ci
136862306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
136962306a36Sopenharmony_ci	e = con_next_wq(con);
137062306a36Sopenharmony_ci	if (!e) {
137162306a36Sopenharmony_ci		clear_bit(CF_SEND_PENDING, &con->flags);
137262306a36Sopenharmony_ci		spin_unlock_bh(&con->writequeue_lock);
137362306a36Sopenharmony_ci		return DLM_IO_END;
137462306a36Sopenharmony_ci	}
137562306a36Sopenharmony_ci
137662306a36Sopenharmony_ci	len = e->len;
137762306a36Sopenharmony_ci	offset = e->offset;
137862306a36Sopenharmony_ci	WARN_ON_ONCE(len == 0 && e->users == 0);
137962306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
138062306a36Sopenharmony_ci
138162306a36Sopenharmony_ci	bvec_set_page(&bvec, e->page, len, offset);
138262306a36Sopenharmony_ci	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
138362306a36Sopenharmony_ci	ret = sock_sendmsg(con->sock, &msg);
138462306a36Sopenharmony_ci	trace_dlm_send(con->nodeid, ret);
138562306a36Sopenharmony_ci	if (ret == -EAGAIN || ret == 0) {
138662306a36Sopenharmony_ci		lock_sock(con->sock->sk);
138762306a36Sopenharmony_ci		spin_lock_bh(&con->writequeue_lock);
138862306a36Sopenharmony_ci		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
138962306a36Sopenharmony_ci		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
139062306a36Sopenharmony_ci			/* Notify TCP that we're limited by the
139162306a36Sopenharmony_ci			 * application window size.
139262306a36Sopenharmony_ci			 */
139362306a36Sopenharmony_ci			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
139462306a36Sopenharmony_ci			con->sock->sk->sk_write_pending++;
139562306a36Sopenharmony_ci
139662306a36Sopenharmony_ci			clear_bit(CF_SEND_PENDING, &con->flags);
139762306a36Sopenharmony_ci			spin_unlock_bh(&con->writequeue_lock);
139862306a36Sopenharmony_ci			release_sock(con->sock->sk);
139962306a36Sopenharmony_ci
140062306a36Sopenharmony_ci			/* wait for write_space() event */
140162306a36Sopenharmony_ci			return DLM_IO_END;
140262306a36Sopenharmony_ci		}
140362306a36Sopenharmony_ci		spin_unlock_bh(&con->writequeue_lock);
140462306a36Sopenharmony_ci		release_sock(con->sock->sk);
140562306a36Sopenharmony_ci
140662306a36Sopenharmony_ci		return DLM_IO_RESCHED;
140762306a36Sopenharmony_ci	} else if (ret < 0) {
140862306a36Sopenharmony_ci		return ret;
140962306a36Sopenharmony_ci	}
141062306a36Sopenharmony_ci
141162306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
141262306a36Sopenharmony_ci	writequeue_entry_complete(e, ret);
141362306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
141462306a36Sopenharmony_ci
141562306a36Sopenharmony_ci	return DLM_IO_SUCCESS;
141662306a36Sopenharmony_ci}
141762306a36Sopenharmony_ci
141862306a36Sopenharmony_cistatic void clean_one_writequeue(struct connection *con)
141962306a36Sopenharmony_ci{
142062306a36Sopenharmony_ci	struct writequeue_entry *e, *safe;
142162306a36Sopenharmony_ci
142262306a36Sopenharmony_ci	spin_lock_bh(&con->writequeue_lock);
142362306a36Sopenharmony_ci	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
142462306a36Sopenharmony_ci		free_entry(e);
142562306a36Sopenharmony_ci	}
142662306a36Sopenharmony_ci	spin_unlock_bh(&con->writequeue_lock);
142762306a36Sopenharmony_ci}
142862306a36Sopenharmony_ci
142962306a36Sopenharmony_cistatic void connection_release(struct rcu_head *rcu)
143062306a36Sopenharmony_ci{
143162306a36Sopenharmony_ci	struct connection *con = container_of(rcu, struct connection, rcu);
143262306a36Sopenharmony_ci
143362306a36Sopenharmony_ci	WARN_ON_ONCE(!list_empty(&con->writequeue));
143462306a36Sopenharmony_ci	WARN_ON_ONCE(con->sock);
143562306a36Sopenharmony_ci	kfree(con);
143662306a36Sopenharmony_ci}
143762306a36Sopenharmony_ci
143862306a36Sopenharmony_ci/* Called from recovery when it knows that a node has
143962306a36Sopenharmony_ci   left the cluster */
144062306a36Sopenharmony_ciint dlm_lowcomms_close(int nodeid)
144162306a36Sopenharmony_ci{
144262306a36Sopenharmony_ci	struct connection *con;
144362306a36Sopenharmony_ci	int idx;
144462306a36Sopenharmony_ci
144562306a36Sopenharmony_ci	log_print("closing connection to node %d", nodeid);
144662306a36Sopenharmony_ci
144762306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
144862306a36Sopenharmony_ci	con = nodeid2con(nodeid, 0);
144962306a36Sopenharmony_ci	if (WARN_ON_ONCE(!con)) {
145062306a36Sopenharmony_ci		srcu_read_unlock(&connections_srcu, idx);
145162306a36Sopenharmony_ci		return -ENOENT;
145262306a36Sopenharmony_ci	}
145362306a36Sopenharmony_ci
145462306a36Sopenharmony_ci	stop_connection_io(con);
145562306a36Sopenharmony_ci	log_print("io handling for node: %d stopped", nodeid);
145662306a36Sopenharmony_ci	close_connection(con, true);
145762306a36Sopenharmony_ci
145862306a36Sopenharmony_ci	spin_lock(&connections_lock);
145962306a36Sopenharmony_ci	hlist_del_rcu(&con->list);
146062306a36Sopenharmony_ci	spin_unlock(&connections_lock);
146162306a36Sopenharmony_ci
146262306a36Sopenharmony_ci	clean_one_writequeue(con);
146362306a36Sopenharmony_ci	call_srcu(&connections_srcu, &con->rcu, connection_release);
146462306a36Sopenharmony_ci	if (con->othercon) {
146562306a36Sopenharmony_ci		clean_one_writequeue(con->othercon);
146662306a36Sopenharmony_ci		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
146762306a36Sopenharmony_ci	}
146862306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
146962306a36Sopenharmony_ci
147062306a36Sopenharmony_ci	/* for debugging we print when we are done to compare with other
147162306a36Sopenharmony_ci	 * messages in between. This function need to be correctly synchronized
147262306a36Sopenharmony_ci	 * with io handling
147362306a36Sopenharmony_ci	 */
147462306a36Sopenharmony_ci	log_print("closing connection to node %d done", nodeid);
147562306a36Sopenharmony_ci
147662306a36Sopenharmony_ci	return 0;
147762306a36Sopenharmony_ci}
147862306a36Sopenharmony_ci
147962306a36Sopenharmony_ci/* Receive worker function */
148062306a36Sopenharmony_cistatic void process_recv_sockets(struct work_struct *work)
148162306a36Sopenharmony_ci{
148262306a36Sopenharmony_ci	struct connection *con = container_of(work, struct connection, rwork);
148362306a36Sopenharmony_ci	int ret, buflen;
148462306a36Sopenharmony_ci
148562306a36Sopenharmony_ci	down_read(&con->sock_lock);
148662306a36Sopenharmony_ci	if (!con->sock) {
148762306a36Sopenharmony_ci		up_read(&con->sock_lock);
148862306a36Sopenharmony_ci		return;
148962306a36Sopenharmony_ci	}
149062306a36Sopenharmony_ci
149162306a36Sopenharmony_ci	buflen = READ_ONCE(dlm_config.ci_buffer_size);
149262306a36Sopenharmony_ci	do {
149362306a36Sopenharmony_ci		ret = receive_from_sock(con, buflen);
149462306a36Sopenharmony_ci	} while (ret == DLM_IO_SUCCESS);
149562306a36Sopenharmony_ci	up_read(&con->sock_lock);
149662306a36Sopenharmony_ci
149762306a36Sopenharmony_ci	switch (ret) {
149862306a36Sopenharmony_ci	case DLM_IO_END:
149962306a36Sopenharmony_ci		/* CF_RECV_PENDING cleared */
150062306a36Sopenharmony_ci		break;
150162306a36Sopenharmony_ci	case DLM_IO_EOF:
150262306a36Sopenharmony_ci		close_connection(con, false);
150362306a36Sopenharmony_ci		wake_up(&con->shutdown_wait);
150462306a36Sopenharmony_ci		/* CF_RECV_PENDING cleared */
150562306a36Sopenharmony_ci		break;
150662306a36Sopenharmony_ci	case DLM_IO_RESCHED:
150762306a36Sopenharmony_ci		cond_resched();
150862306a36Sopenharmony_ci		queue_work(io_workqueue, &con->rwork);
150962306a36Sopenharmony_ci		/* CF_RECV_PENDING not cleared */
151062306a36Sopenharmony_ci		break;
151162306a36Sopenharmony_ci	default:
151262306a36Sopenharmony_ci		if (ret < 0) {
151362306a36Sopenharmony_ci			if (test_bit(CF_IS_OTHERCON, &con->flags)) {
151462306a36Sopenharmony_ci				close_connection(con, false);
151562306a36Sopenharmony_ci			} else {
151662306a36Sopenharmony_ci				spin_lock_bh(&con->writequeue_lock);
151762306a36Sopenharmony_ci				lowcomms_queue_swork(con);
151862306a36Sopenharmony_ci				spin_unlock_bh(&con->writequeue_lock);
151962306a36Sopenharmony_ci			}
152062306a36Sopenharmony_ci
152162306a36Sopenharmony_ci			/* CF_RECV_PENDING cleared for othercon
152262306a36Sopenharmony_ci			 * we trigger send queue if not already done
152362306a36Sopenharmony_ci			 * and process_send_sockets will handle it
152462306a36Sopenharmony_ci			 */
152562306a36Sopenharmony_ci			break;
152662306a36Sopenharmony_ci		}
152762306a36Sopenharmony_ci
152862306a36Sopenharmony_ci		WARN_ON_ONCE(1);
152962306a36Sopenharmony_ci		break;
153062306a36Sopenharmony_ci	}
153162306a36Sopenharmony_ci}
153262306a36Sopenharmony_ci
153362306a36Sopenharmony_cistatic void process_listen_recv_socket(struct work_struct *work)
153462306a36Sopenharmony_ci{
153562306a36Sopenharmony_ci	int ret;
153662306a36Sopenharmony_ci
153762306a36Sopenharmony_ci	if (WARN_ON_ONCE(!listen_con.sock))
153862306a36Sopenharmony_ci		return;
153962306a36Sopenharmony_ci
154062306a36Sopenharmony_ci	do {
154162306a36Sopenharmony_ci		ret = accept_from_sock();
154262306a36Sopenharmony_ci	} while (ret == DLM_IO_SUCCESS);
154362306a36Sopenharmony_ci
154462306a36Sopenharmony_ci	if (ret < 0)
154562306a36Sopenharmony_ci		log_print("critical error accepting connection: %d", ret);
154662306a36Sopenharmony_ci}
154762306a36Sopenharmony_ci
154862306a36Sopenharmony_cistatic int dlm_connect(struct connection *con)
154962306a36Sopenharmony_ci{
155062306a36Sopenharmony_ci	struct sockaddr_storage addr;
155162306a36Sopenharmony_ci	int result, addr_len;
155262306a36Sopenharmony_ci	struct socket *sock;
155362306a36Sopenharmony_ci	unsigned int mark;
155462306a36Sopenharmony_ci
155562306a36Sopenharmony_ci	memset(&addr, 0, sizeof(addr));
155662306a36Sopenharmony_ci	result = nodeid_to_addr(con->nodeid, &addr, NULL,
155762306a36Sopenharmony_ci				dlm_proto_ops->try_new_addr, &mark);
155862306a36Sopenharmony_ci	if (result < 0) {
155962306a36Sopenharmony_ci		log_print("no address for nodeid %d", con->nodeid);
156062306a36Sopenharmony_ci		return result;
156162306a36Sopenharmony_ci	}
156262306a36Sopenharmony_ci
156362306a36Sopenharmony_ci	/* Create a socket to communicate with */
156462306a36Sopenharmony_ci	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
156562306a36Sopenharmony_ci				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
156662306a36Sopenharmony_ci	if (result < 0)
156762306a36Sopenharmony_ci		return result;
156862306a36Sopenharmony_ci
156962306a36Sopenharmony_ci	sock_set_mark(sock->sk, mark);
157062306a36Sopenharmony_ci	dlm_proto_ops->sockopts(sock);
157162306a36Sopenharmony_ci
157262306a36Sopenharmony_ci	result = dlm_proto_ops->bind(sock);
157362306a36Sopenharmony_ci	if (result < 0) {
157462306a36Sopenharmony_ci		sock_release(sock);
157562306a36Sopenharmony_ci		return result;
157662306a36Sopenharmony_ci	}
157762306a36Sopenharmony_ci
157862306a36Sopenharmony_ci	add_sock(sock, con);
157962306a36Sopenharmony_ci
158062306a36Sopenharmony_ci	log_print_ratelimited("connecting to %d", con->nodeid);
158162306a36Sopenharmony_ci	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
158262306a36Sopenharmony_ci	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
158362306a36Sopenharmony_ci					addr_len);
158462306a36Sopenharmony_ci	switch (result) {
158562306a36Sopenharmony_ci	case -EINPROGRESS:
158662306a36Sopenharmony_ci		/* not an error */
158762306a36Sopenharmony_ci		fallthrough;
158862306a36Sopenharmony_ci	case 0:
158962306a36Sopenharmony_ci		break;
159062306a36Sopenharmony_ci	default:
159162306a36Sopenharmony_ci		if (result < 0)
159262306a36Sopenharmony_ci			dlm_close_sock(&con->sock);
159362306a36Sopenharmony_ci
159462306a36Sopenharmony_ci		break;
159562306a36Sopenharmony_ci	}
159662306a36Sopenharmony_ci
159762306a36Sopenharmony_ci	return result;
159862306a36Sopenharmony_ci}
159962306a36Sopenharmony_ci
160062306a36Sopenharmony_ci/* Send worker function */
160162306a36Sopenharmony_cistatic void process_send_sockets(struct work_struct *work)
160262306a36Sopenharmony_ci{
160362306a36Sopenharmony_ci	struct connection *con = container_of(work, struct connection, swork);
160462306a36Sopenharmony_ci	int ret;
160562306a36Sopenharmony_ci
160662306a36Sopenharmony_ci	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
160762306a36Sopenharmony_ci
160862306a36Sopenharmony_ci	down_read(&con->sock_lock);
160962306a36Sopenharmony_ci	if (!con->sock) {
161062306a36Sopenharmony_ci		up_read(&con->sock_lock);
161162306a36Sopenharmony_ci		down_write(&con->sock_lock);
161262306a36Sopenharmony_ci		if (!con->sock) {
161362306a36Sopenharmony_ci			ret = dlm_connect(con);
161462306a36Sopenharmony_ci			switch (ret) {
161562306a36Sopenharmony_ci			case 0:
161662306a36Sopenharmony_ci				break;
161762306a36Sopenharmony_ci			case -EINPROGRESS:
161862306a36Sopenharmony_ci				/* avoid spamming resched on connection
161962306a36Sopenharmony_ci				 * we might can switch to a state_change
162062306a36Sopenharmony_ci				 * event based mechanism if established
162162306a36Sopenharmony_ci				 */
162262306a36Sopenharmony_ci				msleep(100);
162362306a36Sopenharmony_ci				break;
162462306a36Sopenharmony_ci			default:
162562306a36Sopenharmony_ci				/* CF_SEND_PENDING not cleared */
162662306a36Sopenharmony_ci				up_write(&con->sock_lock);
162762306a36Sopenharmony_ci				log_print("connect to node %d try %d error %d",
162862306a36Sopenharmony_ci					  con->nodeid, con->retries++, ret);
162962306a36Sopenharmony_ci				msleep(1000);
163062306a36Sopenharmony_ci				/* For now we try forever to reconnect. In
163162306a36Sopenharmony_ci				 * future we should send a event to cluster
163262306a36Sopenharmony_ci				 * manager to fence itself after certain amount
163362306a36Sopenharmony_ci				 * of retries.
163462306a36Sopenharmony_ci				 */
163562306a36Sopenharmony_ci				queue_work(io_workqueue, &con->swork);
163662306a36Sopenharmony_ci				return;
163762306a36Sopenharmony_ci			}
163862306a36Sopenharmony_ci		}
163962306a36Sopenharmony_ci		downgrade_write(&con->sock_lock);
164062306a36Sopenharmony_ci	}
164162306a36Sopenharmony_ci
164262306a36Sopenharmony_ci	do {
164362306a36Sopenharmony_ci		ret = send_to_sock(con);
164462306a36Sopenharmony_ci	} while (ret == DLM_IO_SUCCESS);
164562306a36Sopenharmony_ci	up_read(&con->sock_lock);
164662306a36Sopenharmony_ci
164762306a36Sopenharmony_ci	switch (ret) {
164862306a36Sopenharmony_ci	case DLM_IO_END:
164962306a36Sopenharmony_ci		/* CF_SEND_PENDING cleared */
165062306a36Sopenharmony_ci		break;
165162306a36Sopenharmony_ci	case DLM_IO_RESCHED:
165262306a36Sopenharmony_ci		/* CF_SEND_PENDING not cleared */
165362306a36Sopenharmony_ci		cond_resched();
165462306a36Sopenharmony_ci		queue_work(io_workqueue, &con->swork);
165562306a36Sopenharmony_ci		break;
165662306a36Sopenharmony_ci	default:
165762306a36Sopenharmony_ci		if (ret < 0) {
165862306a36Sopenharmony_ci			close_connection(con, false);
165962306a36Sopenharmony_ci
166062306a36Sopenharmony_ci			/* CF_SEND_PENDING cleared */
166162306a36Sopenharmony_ci			spin_lock_bh(&con->writequeue_lock);
166262306a36Sopenharmony_ci			lowcomms_queue_swork(con);
166362306a36Sopenharmony_ci			spin_unlock_bh(&con->writequeue_lock);
166462306a36Sopenharmony_ci			break;
166562306a36Sopenharmony_ci		}
166662306a36Sopenharmony_ci
166762306a36Sopenharmony_ci		WARN_ON_ONCE(1);
166862306a36Sopenharmony_ci		break;
166962306a36Sopenharmony_ci	}
167062306a36Sopenharmony_ci}
167162306a36Sopenharmony_ci
167262306a36Sopenharmony_cistatic void work_stop(void)
167362306a36Sopenharmony_ci{
167462306a36Sopenharmony_ci	if (io_workqueue) {
167562306a36Sopenharmony_ci		destroy_workqueue(io_workqueue);
167662306a36Sopenharmony_ci		io_workqueue = NULL;
167762306a36Sopenharmony_ci	}
167862306a36Sopenharmony_ci
167962306a36Sopenharmony_ci	if (process_workqueue) {
168062306a36Sopenharmony_ci		destroy_workqueue(process_workqueue);
168162306a36Sopenharmony_ci		process_workqueue = NULL;
168262306a36Sopenharmony_ci	}
168362306a36Sopenharmony_ci}
168462306a36Sopenharmony_ci
168562306a36Sopenharmony_cistatic int work_start(void)
168662306a36Sopenharmony_ci{
168762306a36Sopenharmony_ci	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
168862306a36Sopenharmony_ci				       WQ_UNBOUND, 0);
168962306a36Sopenharmony_ci	if (!io_workqueue) {
169062306a36Sopenharmony_ci		log_print("can't start dlm_io");
169162306a36Sopenharmony_ci		return -ENOMEM;
169262306a36Sopenharmony_ci	}
169362306a36Sopenharmony_ci
169462306a36Sopenharmony_ci	/* ordered dlm message process queue,
169562306a36Sopenharmony_ci	 * should be converted to a tasklet
169662306a36Sopenharmony_ci	 */
169762306a36Sopenharmony_ci	process_workqueue = alloc_ordered_workqueue("dlm_process",
169862306a36Sopenharmony_ci						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
169962306a36Sopenharmony_ci	if (!process_workqueue) {
170062306a36Sopenharmony_ci		log_print("can't start dlm_process");
170162306a36Sopenharmony_ci		destroy_workqueue(io_workqueue);
170262306a36Sopenharmony_ci		io_workqueue = NULL;
170362306a36Sopenharmony_ci		return -ENOMEM;
170462306a36Sopenharmony_ci	}
170562306a36Sopenharmony_ci
170662306a36Sopenharmony_ci	return 0;
170762306a36Sopenharmony_ci}
170862306a36Sopenharmony_ci
170962306a36Sopenharmony_civoid dlm_lowcomms_shutdown(void)
171062306a36Sopenharmony_ci{
171162306a36Sopenharmony_ci	struct connection *con;
171262306a36Sopenharmony_ci	int i, idx;
171362306a36Sopenharmony_ci
171462306a36Sopenharmony_ci	/* stop lowcomms_listen_data_ready calls */
171562306a36Sopenharmony_ci	lock_sock(listen_con.sock->sk);
171662306a36Sopenharmony_ci	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
171762306a36Sopenharmony_ci	release_sock(listen_con.sock->sk);
171862306a36Sopenharmony_ci
171962306a36Sopenharmony_ci	cancel_work_sync(&listen_con.rwork);
172062306a36Sopenharmony_ci	dlm_close_sock(&listen_con.sock);
172162306a36Sopenharmony_ci
172262306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
172362306a36Sopenharmony_ci	for (i = 0; i < CONN_HASH_SIZE; i++) {
172462306a36Sopenharmony_ci		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
172562306a36Sopenharmony_ci			shutdown_connection(con, true);
172662306a36Sopenharmony_ci			stop_connection_io(con);
172762306a36Sopenharmony_ci			flush_workqueue(process_workqueue);
172862306a36Sopenharmony_ci			close_connection(con, true);
172962306a36Sopenharmony_ci
173062306a36Sopenharmony_ci			clean_one_writequeue(con);
173162306a36Sopenharmony_ci			if (con->othercon)
173262306a36Sopenharmony_ci				clean_one_writequeue(con->othercon);
173362306a36Sopenharmony_ci			allow_connection_io(con);
173462306a36Sopenharmony_ci		}
173562306a36Sopenharmony_ci	}
173662306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
173762306a36Sopenharmony_ci}
173862306a36Sopenharmony_ci
173962306a36Sopenharmony_civoid dlm_lowcomms_stop(void)
174062306a36Sopenharmony_ci{
174162306a36Sopenharmony_ci	work_stop();
174262306a36Sopenharmony_ci	dlm_proto_ops = NULL;
174362306a36Sopenharmony_ci}
174462306a36Sopenharmony_ci
174562306a36Sopenharmony_cistatic int dlm_listen_for_all(void)
174662306a36Sopenharmony_ci{
174762306a36Sopenharmony_ci	struct socket *sock;
174862306a36Sopenharmony_ci	int result;
174962306a36Sopenharmony_ci
175062306a36Sopenharmony_ci	log_print("Using %s for communications",
175162306a36Sopenharmony_ci		  dlm_proto_ops->name);
175262306a36Sopenharmony_ci
175362306a36Sopenharmony_ci	result = dlm_proto_ops->listen_validate();
175462306a36Sopenharmony_ci	if (result < 0)
175562306a36Sopenharmony_ci		return result;
175662306a36Sopenharmony_ci
175762306a36Sopenharmony_ci	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
175862306a36Sopenharmony_ci				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
175962306a36Sopenharmony_ci	if (result < 0) {
176062306a36Sopenharmony_ci		log_print("Can't create comms socket: %d", result);
176162306a36Sopenharmony_ci		return result;
176262306a36Sopenharmony_ci	}
176362306a36Sopenharmony_ci
176462306a36Sopenharmony_ci	sock_set_mark(sock->sk, dlm_config.ci_mark);
176562306a36Sopenharmony_ci	dlm_proto_ops->listen_sockopts(sock);
176662306a36Sopenharmony_ci
176762306a36Sopenharmony_ci	result = dlm_proto_ops->listen_bind(sock);
176862306a36Sopenharmony_ci	if (result < 0)
176962306a36Sopenharmony_ci		goto out;
177062306a36Sopenharmony_ci
177162306a36Sopenharmony_ci	lock_sock(sock->sk);
177262306a36Sopenharmony_ci	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
177362306a36Sopenharmony_ci	listen_sock.sk_write_space = sock->sk->sk_write_space;
177462306a36Sopenharmony_ci	listen_sock.sk_error_report = sock->sk->sk_error_report;
177562306a36Sopenharmony_ci	listen_sock.sk_state_change = sock->sk->sk_state_change;
177662306a36Sopenharmony_ci
177762306a36Sopenharmony_ci	listen_con.sock = sock;
177862306a36Sopenharmony_ci
177962306a36Sopenharmony_ci	sock->sk->sk_allocation = GFP_NOFS;
178062306a36Sopenharmony_ci	sock->sk->sk_use_task_frag = false;
178162306a36Sopenharmony_ci	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
178262306a36Sopenharmony_ci	release_sock(sock->sk);
178362306a36Sopenharmony_ci
178462306a36Sopenharmony_ci	result = sock->ops->listen(sock, 128);
178562306a36Sopenharmony_ci	if (result < 0) {
178662306a36Sopenharmony_ci		dlm_close_sock(&listen_con.sock);
178762306a36Sopenharmony_ci		return result;
178862306a36Sopenharmony_ci	}
178962306a36Sopenharmony_ci
179062306a36Sopenharmony_ci	return 0;
179162306a36Sopenharmony_ci
179262306a36Sopenharmony_ciout:
179362306a36Sopenharmony_ci	sock_release(sock);
179462306a36Sopenharmony_ci	return result;
179562306a36Sopenharmony_ci}
179662306a36Sopenharmony_ci
179762306a36Sopenharmony_cistatic int dlm_tcp_bind(struct socket *sock)
179862306a36Sopenharmony_ci{
179962306a36Sopenharmony_ci	struct sockaddr_storage src_addr;
180062306a36Sopenharmony_ci	int result, addr_len;
180162306a36Sopenharmony_ci
180262306a36Sopenharmony_ci	/* Bind to our cluster-known address connecting to avoid
180362306a36Sopenharmony_ci	 * routing problems.
180462306a36Sopenharmony_ci	 */
180562306a36Sopenharmony_ci	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
180662306a36Sopenharmony_ci	make_sockaddr(&src_addr, 0, &addr_len);
180762306a36Sopenharmony_ci
180862306a36Sopenharmony_ci	result = kernel_bind(sock, (struct sockaddr *)&src_addr,
180962306a36Sopenharmony_ci			     addr_len);
181062306a36Sopenharmony_ci	if (result < 0) {
181162306a36Sopenharmony_ci		/* This *may* not indicate a critical error */
181262306a36Sopenharmony_ci		log_print("could not bind for connect: %d", result);
181362306a36Sopenharmony_ci	}
181462306a36Sopenharmony_ci
181562306a36Sopenharmony_ci	return 0;
181662306a36Sopenharmony_ci}
181762306a36Sopenharmony_ci
181862306a36Sopenharmony_cistatic int dlm_tcp_connect(struct connection *con, struct socket *sock,
181962306a36Sopenharmony_ci			   struct sockaddr *addr, int addr_len)
182062306a36Sopenharmony_ci{
182162306a36Sopenharmony_ci	return kernel_connect(sock, addr, addr_len, O_NONBLOCK);
182262306a36Sopenharmony_ci}
182362306a36Sopenharmony_ci
182462306a36Sopenharmony_cistatic int dlm_tcp_listen_validate(void)
182562306a36Sopenharmony_ci{
182662306a36Sopenharmony_ci	/* We don't support multi-homed hosts */
182762306a36Sopenharmony_ci	if (dlm_local_count > 1) {
182862306a36Sopenharmony_ci		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
182962306a36Sopenharmony_ci		return -EINVAL;
183062306a36Sopenharmony_ci	}
183162306a36Sopenharmony_ci
183262306a36Sopenharmony_ci	return 0;
183362306a36Sopenharmony_ci}
183462306a36Sopenharmony_ci
183562306a36Sopenharmony_cistatic void dlm_tcp_sockopts(struct socket *sock)
183662306a36Sopenharmony_ci{
183762306a36Sopenharmony_ci	/* Turn off Nagle's algorithm */
183862306a36Sopenharmony_ci	tcp_sock_set_nodelay(sock->sk);
183962306a36Sopenharmony_ci}
184062306a36Sopenharmony_ci
184162306a36Sopenharmony_cistatic void dlm_tcp_listen_sockopts(struct socket *sock)
184262306a36Sopenharmony_ci{
184362306a36Sopenharmony_ci	dlm_tcp_sockopts(sock);
184462306a36Sopenharmony_ci	sock_set_reuseaddr(sock->sk);
184562306a36Sopenharmony_ci}
184662306a36Sopenharmony_ci
184762306a36Sopenharmony_cistatic int dlm_tcp_listen_bind(struct socket *sock)
184862306a36Sopenharmony_ci{
184962306a36Sopenharmony_ci	int addr_len;
185062306a36Sopenharmony_ci
185162306a36Sopenharmony_ci	/* Bind to our port */
185262306a36Sopenharmony_ci	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
185362306a36Sopenharmony_ci	return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0],
185462306a36Sopenharmony_ci			   addr_len);
185562306a36Sopenharmony_ci}
185662306a36Sopenharmony_ci
185762306a36Sopenharmony_cistatic const struct dlm_proto_ops dlm_tcp_ops = {
185862306a36Sopenharmony_ci	.name = "TCP",
185962306a36Sopenharmony_ci	.proto = IPPROTO_TCP,
186062306a36Sopenharmony_ci	.connect = dlm_tcp_connect,
186162306a36Sopenharmony_ci	.sockopts = dlm_tcp_sockopts,
186262306a36Sopenharmony_ci	.bind = dlm_tcp_bind,
186362306a36Sopenharmony_ci	.listen_validate = dlm_tcp_listen_validate,
186462306a36Sopenharmony_ci	.listen_sockopts = dlm_tcp_listen_sockopts,
186562306a36Sopenharmony_ci	.listen_bind = dlm_tcp_listen_bind,
186662306a36Sopenharmony_ci};
186762306a36Sopenharmony_ci
/* SCTP bind op for outgoing sockets: bind the local addresses with
 * port 0 through sctp_bind_addrs() and return its result.
 */
static int dlm_sctp_bind(struct socket *sock)
{
	int rc = sctp_bind_addrs(sock, 0);

	return rc;
}
187262306a36Sopenharmony_ci
187362306a36Sopenharmony_cistatic int dlm_sctp_connect(struct connection *con, struct socket *sock,
187462306a36Sopenharmony_ci			    struct sockaddr *addr, int addr_len)
187562306a36Sopenharmony_ci{
187662306a36Sopenharmony_ci	int ret;
187762306a36Sopenharmony_ci
187862306a36Sopenharmony_ci	/*
187962306a36Sopenharmony_ci	 * Make kernel_connect() function return in specified time,
188062306a36Sopenharmony_ci	 * since O_NONBLOCK argument in connect() function does not work here,
188162306a36Sopenharmony_ci	 * then, we should restore the default value of this attribute.
188262306a36Sopenharmony_ci	 */
188362306a36Sopenharmony_ci	sock_set_sndtimeo(sock->sk, 5);
188462306a36Sopenharmony_ci	ret = kernel_connect(sock, addr, addr_len, 0);
188562306a36Sopenharmony_ci	sock_set_sndtimeo(sock->sk, 0);
188662306a36Sopenharmony_ci	return ret;
188762306a36Sopenharmony_ci}
188862306a36Sopenharmony_ci
188962306a36Sopenharmony_cistatic int dlm_sctp_listen_validate(void)
189062306a36Sopenharmony_ci{
189162306a36Sopenharmony_ci	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
189262306a36Sopenharmony_ci		log_print("SCTP is not enabled by this kernel");
189362306a36Sopenharmony_ci		return -EOPNOTSUPP;
189462306a36Sopenharmony_ci	}
189562306a36Sopenharmony_ci
189662306a36Sopenharmony_ci	request_module("sctp");
189762306a36Sopenharmony_ci	return 0;
189862306a36Sopenharmony_ci}
189962306a36Sopenharmony_ci
190062306a36Sopenharmony_cistatic int dlm_sctp_bind_listen(struct socket *sock)
190162306a36Sopenharmony_ci{
190262306a36Sopenharmony_ci	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
190362306a36Sopenharmony_ci}
190462306a36Sopenharmony_ci
190562306a36Sopenharmony_cistatic void dlm_sctp_sockopts(struct socket *sock)
190662306a36Sopenharmony_ci{
190762306a36Sopenharmony_ci	/* Turn off Nagle's algorithm */
190862306a36Sopenharmony_ci	sctp_sock_set_nodelay(sock->sk);
190962306a36Sopenharmony_ci	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
191062306a36Sopenharmony_ci}
191162306a36Sopenharmony_ci
191262306a36Sopenharmony_cistatic const struct dlm_proto_ops dlm_sctp_ops = {
191362306a36Sopenharmony_ci	.name = "SCTP",
191462306a36Sopenharmony_ci	.proto = IPPROTO_SCTP,
191562306a36Sopenharmony_ci	.try_new_addr = true,
191662306a36Sopenharmony_ci	.connect = dlm_sctp_connect,
191762306a36Sopenharmony_ci	.sockopts = dlm_sctp_sockopts,
191862306a36Sopenharmony_ci	.bind = dlm_sctp_bind,
191962306a36Sopenharmony_ci	.listen_validate = dlm_sctp_listen_validate,
192062306a36Sopenharmony_ci	.listen_sockopts = dlm_sctp_sockopts,
192162306a36Sopenharmony_ci	.listen_bind = dlm_sctp_bind_listen,
192262306a36Sopenharmony_ci};
192362306a36Sopenharmony_ci
192462306a36Sopenharmony_ciint dlm_lowcomms_start(void)
192562306a36Sopenharmony_ci{
192662306a36Sopenharmony_ci	int error;
192762306a36Sopenharmony_ci
192862306a36Sopenharmony_ci	init_local();
192962306a36Sopenharmony_ci	if (!dlm_local_count) {
193062306a36Sopenharmony_ci		error = -ENOTCONN;
193162306a36Sopenharmony_ci		log_print("no local IP address has been set");
193262306a36Sopenharmony_ci		goto fail;
193362306a36Sopenharmony_ci	}
193462306a36Sopenharmony_ci
193562306a36Sopenharmony_ci	error = work_start();
193662306a36Sopenharmony_ci	if (error)
193762306a36Sopenharmony_ci		goto fail;
193862306a36Sopenharmony_ci
193962306a36Sopenharmony_ci	/* Start listening */
194062306a36Sopenharmony_ci	switch (dlm_config.ci_protocol) {
194162306a36Sopenharmony_ci	case DLM_PROTO_TCP:
194262306a36Sopenharmony_ci		dlm_proto_ops = &dlm_tcp_ops;
194362306a36Sopenharmony_ci		break;
194462306a36Sopenharmony_ci	case DLM_PROTO_SCTP:
194562306a36Sopenharmony_ci		dlm_proto_ops = &dlm_sctp_ops;
194662306a36Sopenharmony_ci		break;
194762306a36Sopenharmony_ci	default:
194862306a36Sopenharmony_ci		log_print("Invalid protocol identifier %d set",
194962306a36Sopenharmony_ci			  dlm_config.ci_protocol);
195062306a36Sopenharmony_ci		error = -EINVAL;
195162306a36Sopenharmony_ci		goto fail_proto_ops;
195262306a36Sopenharmony_ci	}
195362306a36Sopenharmony_ci
195462306a36Sopenharmony_ci	error = dlm_listen_for_all();
195562306a36Sopenharmony_ci	if (error)
195662306a36Sopenharmony_ci		goto fail_listen;
195762306a36Sopenharmony_ci
195862306a36Sopenharmony_ci	return 0;
195962306a36Sopenharmony_ci
196062306a36Sopenharmony_cifail_listen:
196162306a36Sopenharmony_ci	dlm_proto_ops = NULL;
196262306a36Sopenharmony_cifail_proto_ops:
196362306a36Sopenharmony_ci	work_stop();
196462306a36Sopenharmony_cifail:
196562306a36Sopenharmony_ci	return error;
196662306a36Sopenharmony_ci}
196762306a36Sopenharmony_ci
196862306a36Sopenharmony_civoid dlm_lowcomms_init(void)
196962306a36Sopenharmony_ci{
197062306a36Sopenharmony_ci	int i;
197162306a36Sopenharmony_ci
197262306a36Sopenharmony_ci	for (i = 0; i < CONN_HASH_SIZE; i++)
197362306a36Sopenharmony_ci		INIT_HLIST_HEAD(&connection_hash[i]);
197462306a36Sopenharmony_ci
197562306a36Sopenharmony_ci	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
197662306a36Sopenharmony_ci}
197762306a36Sopenharmony_ci
197862306a36Sopenharmony_civoid dlm_lowcomms_exit(void)
197962306a36Sopenharmony_ci{
198062306a36Sopenharmony_ci	struct connection *con;
198162306a36Sopenharmony_ci	int i, idx;
198262306a36Sopenharmony_ci
198362306a36Sopenharmony_ci	idx = srcu_read_lock(&connections_srcu);
198462306a36Sopenharmony_ci	for (i = 0; i < CONN_HASH_SIZE; i++) {
198562306a36Sopenharmony_ci		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
198662306a36Sopenharmony_ci			spin_lock(&connections_lock);
198762306a36Sopenharmony_ci			hlist_del_rcu(&con->list);
198862306a36Sopenharmony_ci			spin_unlock(&connections_lock);
198962306a36Sopenharmony_ci
199062306a36Sopenharmony_ci			if (con->othercon)
199162306a36Sopenharmony_ci				call_srcu(&connections_srcu, &con->othercon->rcu,
199262306a36Sopenharmony_ci					  connection_release);
199362306a36Sopenharmony_ci			call_srcu(&connections_srcu, &con->rcu, connection_release);
199462306a36Sopenharmony_ci		}
199562306a36Sopenharmony_ci	}
199662306a36Sopenharmony_ci	srcu_read_unlock(&connections_srcu, idx);
199762306a36Sopenharmony_ci}
1998