1// SPDX-License-Identifier: GPL-2.0-or-later
2/*
3   drbd_receiver.c
4
5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11 */
12
13
14#include <linux/module.h>
15
16#include <linux/uaccess.h>
17#include <net/sock.h>
18
19#include <linux/drbd.h>
20#include <linux/fs.h>
21#include <linux/file.h>
22#include <linux/in.h>
23#include <linux/mm.h>
24#include <linux/memcontrol.h>
25#include <linux/mm_inline.h>
26#include <linux/slab.h>
27#include <uapi/linux/sched/types.h>
28#include <linux/sched/signal.h>
29#include <linux/pkt_sched.h>
30#define __KERNEL_SYSCALLS__
31#include <linux/unistd.h>
32#include <linux/vmalloc.h>
33#include <linux/random.h>
34#include <linux/string.h>
35#include <linux/scatterlist.h>
36#include <linux/part_stat.h>
37#include "drbd_int.h"
38#include "drbd_protocol.h"
39#include "drbd_req.h"
40#include "drbd_vli.h"
41
42#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
43
44struct packet_info {
45	enum drbd_packet cmd;
46	unsigned int size;
47	unsigned int vnr;
48	void *data;
49};
50
51enum finish_epoch {
52	FE_STILL_LIVE,
53	FE_DESTROYED,
54	FE_RECYCLED,
55};
56
57static int drbd_do_features(struct drbd_connection *connection);
58static int drbd_do_auth(struct drbd_connection *connection);
59static int drbd_disconnected(struct drbd_peer_device *);
60static void conn_wait_active_ee_empty(struct drbd_connection *connection);
61static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
62static int e_end_block(struct drbd_work *, int);
63
64
65#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
66
67/*
68 * some helper functions to deal with single linked page lists,
69 * page->private being our "next" pointer.
70 */
71
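/*
 * Illustrative sketch (not compiled): a chain a -> b -> c is built by
 * storing the successor in page->private, e.g.
 *
 *	set_page_private(a, (unsigned long)b);
 *	set_page_private(b, (unsigned long)c);
 *	set_page_private(c, 0);		(end-of-chain marker)
 *
 * and is walked with the page_chain_next()/page_chain_for_each() helpers
 * used by the functions below.
 */
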
72/* If at least n pages are linked at head, get n pages off.
73 * Otherwise, don't modify head, and return NULL.
74 * Locking is the responsibility of the caller.
75 */
76static struct page *page_chain_del(struct page **head, int n)
77{
78	struct page *page;
79	struct page *tmp;
80
81	BUG_ON(!n);
82	BUG_ON(!head);
83
84	page = *head;
85
86	if (!page)
87		return NULL;
88
89	while (page) {
90		tmp = page_chain_next(page);
91		if (--n == 0)
92			break; /* found sufficient pages */
93		if (tmp == NULL)
94			/* insufficient pages, don't use any of them. */
95			return NULL;
96		page = tmp;
97	}
98
99	/* add end of list marker for the returned list */
100	set_page_private(page, 0);
101	/* actual return value, and adjustment of head */
102	page = *head;
103	*head = tmp;
104	return page;
105}
106
107/* may be used outside of locks to find the tail of a (usually short)
108 * "private" page chain, before adding it back to a global chain head
109 * with page_chain_add() under a spinlock. */
110static struct page *page_chain_tail(struct page *page, int *len)
111{
112	struct page *tmp;
113	int i = 1;
114	while ((tmp = page_chain_next(page)))
115		++i, page = tmp;
116	if (len)
117		*len = i;
118	return page;
119}
120
121static int page_chain_free(struct page *page)
122{
123	struct page *tmp;
124	int i = 0;
125	page_chain_for_each_safe(page, tmp) {
126		put_page(page);
127		++i;
128	}
129	return i;
130}
131
132static void page_chain_add(struct page **head,
133		struct page *chain_first, struct page *chain_last)
134{
135#if 1
136	struct page *tmp;
137	tmp = page_chain_tail(chain_first, NULL);
138	BUG_ON(tmp != chain_last);
139#endif
140
141	/* add chain to head */
142	set_page_private(chain_last, (unsigned long)*head);
143	*head = chain_first;
144}
145
146static struct page *__drbd_alloc_pages(struct drbd_device *device,
147				       unsigned int number)
148{
149	struct page *page = NULL;
150	struct page *tmp = NULL;
151	unsigned int i = 0;
152
153	/* Yes, testing drbd_pp_vacant outside the lock is racy.
154	 * So what. It saves a spin_lock. */
155	if (drbd_pp_vacant >= number) {
156		spin_lock(&drbd_pp_lock);
157		page = page_chain_del(&drbd_pp_pool, number);
158		if (page)
159			drbd_pp_vacant -= number;
160		spin_unlock(&drbd_pp_lock);
161		if (page)
162			return page;
163	}
164
165	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
166	 * "criss-cross" setup, that might cause write-out on some other DRBD,
167	 * which in turn might block on the other node at this very place.  */
168	for (i = 0; i < number; i++) {
169		tmp = alloc_page(GFP_TRY);
170		if (!tmp)
171			break;
172		set_page_private(tmp, (unsigned long)page);
173		page = tmp;
174	}
175
176	if (i == number)
177		return page;
178
179	/* Not enough pages immediately available this time.
180	 * No need to jump around here, drbd_alloc_pages will retry this
181	 * function "soon". */
182	if (page) {
183		tmp = page_chain_tail(page, NULL);
184		spin_lock(&drbd_pp_lock);
185		page_chain_add(&drbd_pp_pool, page, tmp);
186		drbd_pp_vacant += i;
187		spin_unlock(&drbd_pp_lock);
188	}
189	return NULL;
190}
191
192static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
193					   struct list_head *to_be_freed)
194{
195	struct drbd_peer_request *peer_req, *tmp;
196
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list. */
201
202	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
203		if (drbd_peer_req_has_active_page(peer_req))
204			break;
205		list_move(&peer_req->w.list, to_be_freed);
206	}
207}
208
209static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
210{
211	LIST_HEAD(reclaimed);
212	struct drbd_peer_request *peer_req, *t;
213
214	spin_lock_irq(&device->resource->req_lock);
215	reclaim_finished_net_peer_reqs(device, &reclaimed);
216	spin_unlock_irq(&device->resource->req_lock);
217	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
218		drbd_free_net_peer_req(device, peer_req);
219}
220
221static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
222{
223	struct drbd_peer_device *peer_device;
224	int vnr;
225
226	rcu_read_lock();
227	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
228		struct drbd_device *device = peer_device->device;
229		if (!atomic_read(&device->pp_in_use_by_net))
230			continue;
231
232		kref_get(&device->kref);
233		rcu_read_unlock();
234		drbd_reclaim_net_peer_reqs(device);
235		kref_put(&device->kref, drbd_destroy_device);
236		rcu_read_lock();
237	}
238	rcu_read_unlock();
239}
240
241/**
242 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device.
244 * @number:	number of pages requested
245 * @retry:	whether to retry, if not enough pages are available right now
246 *
247 * Tries to allocate number pages, first from our own page pool, then from
248 * the kernel.
249 * Possibly retry until DRBD frees sufficient pages somewhere else.
250 *
251 * If this allocation would exceed the max_buffers setting, we throttle
252 * allocation (schedule_timeout) to give the system some room to breathe.
253 *
254 * We do not use max-buffers as hard limit, because it could lead to
255 * congestion and further to a distributed deadlock during online-verify or
256 * (checksum based) resync, if the max-buffers, socket buffer sizes and
257 * resync-rate settings are mis-configured.
258 *
259 * Returns a page chain linked via page->private.
260 */
261struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
262			      bool retry)
263{
264	struct drbd_device *device = peer_device->device;
265	struct page *page = NULL;
266	struct net_conf *nc;
267	DEFINE_WAIT(wait);
268	unsigned int mxb;
269
270	rcu_read_lock();
271	nc = rcu_dereference(peer_device->connection->net_conf);
272	mxb = nc ? nc->max_buffers : 1000000;
273	rcu_read_unlock();
274
275	if (atomic_read(&device->pp_in_use) < mxb)
276		page = __drbd_alloc_pages(device, number);
277
278	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
280	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
281		drbd_reclaim_net_peer_reqs(device);
282
283	while (page == NULL) {
284		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
285
286		drbd_reclaim_net_peer_reqs(device);
287
288		if (atomic_read(&device->pp_in_use) < mxb) {
289			page = __drbd_alloc_pages(device, number);
290			if (page)
291				break;
292		}
293
294		if (!retry)
295			break;
296
297		if (signal_pending(current)) {
298			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
299			break;
300		}
301
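		/* If the timeout expired without us being woken up, stop
		 * treating max-buffers as a hard limit (see the comment
		 * above) to avoid a potential distributed deadlock. */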
302		if (schedule_timeout(HZ/10) == 0)
303			mxb = UINT_MAX;
304	}
305	finish_wait(&drbd_pp_wait, &wait);
306
307	if (page)
308		atomic_add(number, &device->pp_in_use);
309	return page;
310}
311
312/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&resource->req_lock) section;
314 * Either links the page chain back to the global pool,
315 * or returns all pages to the system. */
316static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
317{
318	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
319	int i;
320
321	if (page == NULL)
322		return;
323
324	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
325		i = page_chain_free(page);
326	else {
327		struct page *tmp;
328		tmp = page_chain_tail(page, &i);
329		spin_lock(&drbd_pp_lock);
330		page_chain_add(&drbd_pp_pool, page, tmp);
331		drbd_pp_vacant += i;
332		spin_unlock(&drbd_pp_lock);
333	}
334	i = atomic_sub_return(i, a);
335	if (i < 0)
336		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
337			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
338	wake_up(&drbd_pp_wait);
339}
340
341/*
342You need to hold the req_lock:
343 _drbd_wait_ee_list_empty()
344
345You must not have the req_lock:
346 drbd_free_peer_req()
347 drbd_alloc_peer_req()
348 drbd_free_peer_reqs()
349 drbd_ee_fix_bhs()
350 drbd_finish_peer_reqs()
351 drbd_clear_done_ee()
352 drbd_wait_ee_list_empty()
353*/
354
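/*
 * Illustrative sketch of that convention: _drbd_wait_ee_list_empty() expects
 * the caller to hold req_lock, while the drbd_wait_ee_list_empty() wrapper
 * defined below takes and releases it itself, i.e.
 *
 *	spin_lock_irq(&device->resource->req_lock);
 *	_drbd_wait_ee_list_empty(device, &device->active_ee);
 *	spin_unlock_irq(&device->resource->req_lock);
 *
 * is equivalent to
 *
 *	drbd_wait_ee_list_empty(device, &device->active_ee);
 */
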
355/* normal: payload_size == request size (bi_size)
356 * w_same: payload_size == logical_block_size
357 * trim: payload_size == 0 */
358struct drbd_peer_request *
359drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
360		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
361{
362	struct drbd_device *device = peer_device->device;
363	struct drbd_peer_request *peer_req;
364	struct page *page = NULL;
365	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
366
367	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
368		return NULL;
369
370	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
371	if (!peer_req) {
372		if (!(gfp_mask & __GFP_NOWARN))
373			drbd_err(device, "%s: allocation failed\n", __func__);
374		return NULL;
375	}
376
377	if (nr_pages) {
378		page = drbd_alloc_pages(peer_device, nr_pages,
379					gfpflags_allow_blocking(gfp_mask));
380		if (!page)
381			goto fail;
382	}
383
384	memset(peer_req, 0, sizeof(*peer_req));
385	INIT_LIST_HEAD(&peer_req->w.list);
386	drbd_clear_interval(&peer_req->i);
387	peer_req->i.size = request_size;
388	peer_req->i.sector = sector;
389	peer_req->submit_jif = jiffies;
390	peer_req->peer_device = peer_device;
391	peer_req->pages = page;
392	/*
393	 * The block_id is opaque to the receiver.  It is not endianness
394	 * converted, and sent back to the sender unchanged.
395	 */
396	peer_req->block_id = id;
397
398	return peer_req;
399
400 fail:
401	mempool_free(peer_req, &drbd_ee_mempool);
402	return NULL;
403}
404
405void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
406		       int is_net)
407{
408	might_sleep();
409	if (peer_req->flags & EE_HAS_DIGEST)
410		kfree(peer_req->digest);
411	drbd_free_pages(device, peer_req->pages, is_net);
412	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
413	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
414	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
415		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
416		drbd_al_complete_io(device, &peer_req->i);
417	}
418	mempool_free(peer_req, &drbd_ee_mempool);
419}
420
421int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
422{
423	LIST_HEAD(work_list);
424	struct drbd_peer_request *peer_req, *t;
425	int count = 0;
426	int is_net = list == &device->net_ee;
427
428	spin_lock_irq(&device->resource->req_lock);
429	list_splice_init(list, &work_list);
430	spin_unlock_irq(&device->resource->req_lock);
431
432	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
433		__drbd_free_peer_req(device, peer_req, is_net);
434		count++;
435	}
436	return count;
437}
438
439/*
440 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
441 */
442static int drbd_finish_peer_reqs(struct drbd_device *device)
443{
444	LIST_HEAD(work_list);
445	LIST_HEAD(reclaimed);
446	struct drbd_peer_request *peer_req, *t;
447	int err = 0;
448
449	spin_lock_irq(&device->resource->req_lock);
450	reclaim_finished_net_peer_reqs(device, &reclaimed);
451	list_splice_init(&device->done_ee, &work_list);
452	spin_unlock_irq(&device->resource->req_lock);
453
454	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
455		drbd_free_net_peer_req(device, peer_req);
456
457	/* possible callbacks here:
458	 * e_end_block, and e_end_resync_block, e_send_superseded.
459	 * all ignore the last argument.
460	 */
461	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
462		int err2;
463
464		/* list_del not necessary, next/prev members not touched */
465		err2 = peer_req->w.cb(&peer_req->w, !!err);
466		if (!err)
467			err = err2;
468		drbd_free_peer_req(device, peer_req);
469	}
470	wake_up(&device->ee_wait);
471
472	return err;
473}
474
475static void _drbd_wait_ee_list_empty(struct drbd_device *device,
476				     struct list_head *head)
477{
478	DEFINE_WAIT(wait);
479
480	/* avoids spin_lock/unlock
481	 * and calling prepare_to_wait in the fast path */
482	while (!list_empty(head)) {
483		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
484		spin_unlock_irq(&device->resource->req_lock);
485		io_schedule();
486		finish_wait(&device->ee_wait, &wait);
487		spin_lock_irq(&device->resource->req_lock);
488	}
489}
490
491static void drbd_wait_ee_list_empty(struct drbd_device *device,
492				    struct list_head *head)
493{
494	spin_lock_irq(&device->resource->req_lock);
495	_drbd_wait_ee_list_empty(device, head);
496	spin_unlock_irq(&device->resource->req_lock);
497}
498
499static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
500{
501	struct kvec iov = {
502		.iov_base = buf,
503		.iov_len = size,
504	};
505	struct msghdr msg = {
506		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
507	};
508	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
509	return sock_recvmsg(sock, &msg, msg.msg_flags);
510}
511
512static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
513{
514	int rv;
515
516	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
517
518	if (rv < 0) {
519		if (rv == -ECONNRESET)
520			drbd_info(connection, "sock was reset by peer\n");
521		else if (rv != -ERESTARTSYS)
522			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
523	} else if (rv == 0) {
524		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
525			long t;
526			rcu_read_lock();
527			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
528			rcu_read_unlock();
529
530			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
531
532			if (t)
533				goto out;
534		}
535		drbd_info(connection, "sock was shut down by peer\n");
536	}
537
538	if (rv != size)
539		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
540
541out:
542	return rv;
543}
544
545static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
546{
547	int err;
548
549	err = drbd_recv(connection, buf, size);
550	if (err != size) {
551		if (err >= 0)
552			err = -EIO;
553	} else
554		err = 0;
555	return err;
556}
557
558static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
559{
560	int err;
561
562	err = drbd_recv_all(connection, buf, size);
563	if (err && !signal_pending(current))
564		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
565	return err;
566}
567
568/* quoting tcp(7):
569 *   On individual connections, the socket buffer size must be set prior to the
570 *   listen(2) or connect(2) calls in order to have it take effect.
571 * This is our wrapper to do so.
572 */
573static void drbd_setbufsize(struct socket *sock, unsigned int snd,
574		unsigned int rcv)
575{
576	/* open coded SO_SNDBUF, SO_RCVBUF */
577	if (snd) {
578		sock->sk->sk_sndbuf = snd;
579		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
580	}
581	if (rcv) {
582		sock->sk->sk_rcvbuf = rcv;
583		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
584	}
585}
586
587static struct socket *drbd_try_connect(struct drbd_connection *connection)
588{
589	const char *what;
590	struct socket *sock;
591	struct sockaddr_in6 src_in6;
592	struct sockaddr_in6 peer_in6;
593	struct net_conf *nc;
594	int err, peer_addr_len, my_addr_len;
595	int sndbuf_size, rcvbuf_size, connect_int;
596	int disconnect_on_error = 1;
597
598	rcu_read_lock();
599	nc = rcu_dereference(connection->net_conf);
600	if (!nc) {
601		rcu_read_unlock();
602		return NULL;
603	}
604	sndbuf_size = nc->sndbuf_size;
605	rcvbuf_size = nc->rcvbuf_size;
606	connect_int = nc->connect_int;
607	rcu_read_unlock();
608
609	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
610	memcpy(&src_in6, &connection->my_addr, my_addr_len);
611
612	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
613		src_in6.sin6_port = 0;
614	else
615		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
616
617	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
618	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
619
620	what = "sock_create_kern";
621	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
622			       SOCK_STREAM, IPPROTO_TCP, &sock);
623	if (err < 0) {
624		sock = NULL;
625		goto out;
626	}
627
628	sock->sk->sk_rcvtimeo =
629	sock->sk->sk_sndtimeo = connect_int * HZ;
630	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
631
632       /* explicitly bind to the configured IP as source IP
633	*  for the outgoing connections.
634	*  This is needed for multihomed hosts and to be
635	*  able to use lo: interfaces for drbd.
636	* Make sure to use 0 as port number, so linux selects
637	*  a free one dynamically.
638	*/
639	what = "bind before connect";
640	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
641	if (err < 0)
642		goto out;
643
644	/* connect may fail, peer not yet available.
645	 * stay C_WF_CONNECTION, don't go Disconnecting! */
646	disconnect_on_error = 0;
647	what = "connect";
648	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
649
650out:
651	if (err < 0) {
652		if (sock) {
653			sock_release(sock);
654			sock = NULL;
655		}
656		switch (-err) {
657			/* timeout, busy, signal pending */
658		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
659		case EINTR: case ERESTARTSYS:
660			/* peer not (yet) available, network problem */
661		case ECONNREFUSED: case ENETUNREACH:
662		case EHOSTDOWN:    case EHOSTUNREACH:
663			disconnect_on_error = 0;
664			break;
665		default:
666			drbd_err(connection, "%s failed, err = %d\n", what, err);
667		}
668		if (disconnect_on_error)
669			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
670	}
671
672	return sock;
673}
674
675struct accept_wait_data {
676	struct drbd_connection *connection;
677	struct socket *s_listen;
678	struct completion door_bell;
679	void (*original_sk_state_change)(struct sock *sk);
680
681};
682
683static void drbd_incoming_connection(struct sock *sk)
684{
685	struct accept_wait_data *ad = sk->sk_user_data;
686	void (*state_change)(struct sock *sk);
687
688	state_change = ad->original_sk_state_change;
689	if (sk->sk_state == TCP_ESTABLISHED)
690		complete(&ad->door_bell);
691	state_change(sk);
692}
693
694static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
695{
696	int err, sndbuf_size, rcvbuf_size, my_addr_len;
697	struct sockaddr_in6 my_addr;
698	struct socket *s_listen;
699	struct net_conf *nc;
700	const char *what;
701
702	rcu_read_lock();
703	nc = rcu_dereference(connection->net_conf);
704	if (!nc) {
705		rcu_read_unlock();
706		return -EIO;
707	}
708	sndbuf_size = nc->sndbuf_size;
709	rcvbuf_size = nc->rcvbuf_size;
710	rcu_read_unlock();
711
712	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
713	memcpy(&my_addr, &connection->my_addr, my_addr_len);
714
715	what = "sock_create_kern";
716	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
717			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
718	if (err) {
719		s_listen = NULL;
720		goto out;
721	}
722
723	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
724	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
725
726	what = "bind before listen";
727	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
728	if (err < 0)
729		goto out;
730
731	ad->s_listen = s_listen;
732	write_lock_bh(&s_listen->sk->sk_callback_lock);
733	ad->original_sk_state_change = s_listen->sk->sk_state_change;
734	s_listen->sk->sk_state_change = drbd_incoming_connection;
735	s_listen->sk->sk_user_data = ad;
736	write_unlock_bh(&s_listen->sk->sk_callback_lock);
737
738	what = "listen";
739	err = s_listen->ops->listen(s_listen, 5);
740	if (err < 0)
741		goto out;
742
743	return 0;
744out:
745	if (s_listen)
746		sock_release(s_listen);
747	if (err < 0) {
748		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
749			drbd_err(connection, "%s failed, err = %d\n", what, err);
750			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
751		}
752	}
753
754	return -EIO;
755}
756
757static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
758{
759	write_lock_bh(&sk->sk_callback_lock);
760	sk->sk_state_change = ad->original_sk_state_change;
761	sk->sk_user_data = NULL;
762	write_unlock_bh(&sk->sk_callback_lock);
763}
764
765static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
766{
767	int timeo, connect_int, err = 0;
768	struct socket *s_estab = NULL;
769	struct net_conf *nc;
770
771	rcu_read_lock();
772	nc = rcu_dereference(connection->net_conf);
773	if (!nc) {
774		rcu_read_unlock();
775		return NULL;
776	}
777	connect_int = nc->connect_int;
778	rcu_read_unlock();
779
780	timeo = connect_int * HZ;
	/* 28.5% random jitter: +/- timeo/7, i.e. roughly 14.3% in either direction */
782	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
783
784	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
785	if (err <= 0)
786		return NULL;
787
788	err = kernel_accept(ad->s_listen, &s_estab, 0);
789	if (err < 0) {
790		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
791			drbd_err(connection, "accept failed, err = %d\n", err);
792			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
793		}
794	}
795
796	if (s_estab)
797		unregister_state_change(s_estab->sk, ad);
798
799	return s_estab;
800}
801
802static int decode_header(struct drbd_connection *, void *, struct packet_info *);
803
804static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
805			     enum drbd_packet cmd)
806{
807	if (!conn_prepare_command(connection, sock))
808		return -EIO;
809	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
810}
811
812static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
813{
814	unsigned int header_size = drbd_header_size(connection);
815	struct packet_info pi;
816	struct net_conf *nc;
817	int err;
818
819	rcu_read_lock();
820	nc = rcu_dereference(connection->net_conf);
821	if (!nc) {
822		rcu_read_unlock();
823		return -EIO;
824	}
825	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
826	rcu_read_unlock();
827
828	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
829	if (err != header_size) {
830		if (err >= 0)
831			err = -EIO;
832		return err;
833	}
834	err = decode_header(connection, connection->data.rbuf, &pi);
835	if (err)
836		return err;
837	return pi.cmd;
838}
839
840/**
841 * drbd_socket_okay() - Free the socket if its connection is not okay
842 * @sock:	pointer to the pointer to the socket.
843 */
844static bool drbd_socket_okay(struct socket **sock)
845{
846	int rr;
847	char tb[4];
848
849	if (!*sock)
850		return false;
851
852	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
853
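	/* rr > 0 means data is queued on the socket; -EAGAIN just means
	 * nothing is readable right now, but the connection is still alive.
	 * A return of 0 means the peer performed an orderly shutdown; any
	 * other error means the socket is unusable: release it either way. */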
854	if (rr > 0 || rr == -EAGAIN) {
855		return true;
856	} else {
857		sock_release(*sock);
858		*sock = NULL;
859		return false;
860	}
861}
862
863static bool connection_established(struct drbd_connection *connection,
864				   struct socket **sock1,
865				   struct socket **sock2)
866{
867	struct net_conf *nc;
868	int timeout;
869	bool ok;
870
871	if (!*sock1 || !*sock2)
872		return false;
873
874	rcu_read_lock();
875	nc = rcu_dereference(connection->net_conf);
876	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
877	rcu_read_unlock();
878	schedule_timeout_interruptible(timeout);
879
880	ok = drbd_socket_okay(sock1);
881	ok = drbd_socket_okay(sock2) && ok;
882
883	return ok;
884}
885
886/* Gets called if a connection is established, or if a new minor gets created
887   in a connection */
888int drbd_connected(struct drbd_peer_device *peer_device)
889{
890	struct drbd_device *device = peer_device->device;
891	int err;
892
893	atomic_set(&device->packet_seq, 0);
894	device->peer_seq = 0;
895
896	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
897		&peer_device->connection->cstate_mutex :
898		&device->own_state_mutex;
899
900	err = drbd_send_sync_param(peer_device);
901	if (!err)
902		err = drbd_send_sizes(peer_device, 0, 0);
903	if (!err)
904		err = drbd_send_uuids(peer_device);
905	if (!err)
906		err = drbd_send_current_state(peer_device);
907	clear_bit(USE_DEGR_WFC_T, &device->flags);
908	clear_bit(RESIZE_PENDING, &device->flags);
909	atomic_set(&device->ap_in_flight, 0);
910	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
911	return err;
912}
913
914/*
915 * return values:
916 *   1 yes, we have a valid connection
917 *   0 oops, did not work out, please try again
918 *  -1 peer talks different language,
919 *     no point in trying again, please go standalone.
920 *  -2 We do not have a network config...
921 */
922static int conn_connect(struct drbd_connection *connection)
923{
924	struct drbd_socket sock, msock;
925	struct drbd_peer_device *peer_device;
926	struct net_conf *nc;
927	int vnr, timeout, h;
928	bool discard_my_data, ok;
929	enum drbd_state_rv rv;
930	struct accept_wait_data ad = {
931		.connection = connection,
932		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
933	};
934
935	clear_bit(DISCONNECT_SENT, &connection->flags);
936	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
937		return -2;
938
939	mutex_init(&sock.mutex);
940	sock.sbuf = connection->data.sbuf;
941	sock.rbuf = connection->data.rbuf;
942	sock.socket = NULL;
943	mutex_init(&msock.mutex);
944	msock.sbuf = connection->meta.sbuf;
945	msock.rbuf = connection->meta.rbuf;
946	msock.socket = NULL;
947
948	/* Assume that the peer only understands protocol 80 until we know better.  */
949	connection->agreed_pro_version = 80;
950
951	if (prepare_listen_socket(connection, &ad))
952		return 0;
953
954	do {
955		struct socket *s;
956
957		s = drbd_try_connect(connection);
958		if (s) {
959			if (!sock.socket) {
960				sock.socket = s;
961				send_first_packet(connection, &sock, P_INITIAL_DATA);
962			} else if (!msock.socket) {
963				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
964				msock.socket = s;
965				send_first_packet(connection, &msock, P_INITIAL_META);
966			} else {
967				drbd_err(connection, "Logic error in conn_connect()\n");
968				goto out_release_sockets;
969			}
970		}
971
972		if (connection_established(connection, &sock.socket, &msock.socket))
973			break;
974
975retry:
976		s = drbd_wait_for_connect(connection, &ad);
977		if (s) {
978			int fp = receive_first_packet(connection, s);
979			drbd_socket_okay(&sock.socket);
980			drbd_socket_okay(&msock.socket);
981			switch (fp) {
982			case P_INITIAL_DATA:
983				if (sock.socket) {
984					drbd_warn(connection, "initial packet S crossed\n");
985					sock_release(sock.socket);
986					sock.socket = s;
987					goto randomize;
988				}
989				sock.socket = s;
990				break;
991			case P_INITIAL_META:
992				set_bit(RESOLVE_CONFLICTS, &connection->flags);
993				if (msock.socket) {
994					drbd_warn(connection, "initial packet M crossed\n");
995					sock_release(msock.socket);
996					msock.socket = s;
997					goto randomize;
998				}
999				msock.socket = s;
1000				break;
1001			default:
1002				drbd_warn(connection, "Error receiving initial packet\n");
1003				sock_release(s);
1004randomize:
1005				if (prandom_u32() & 1)
1006					goto retry;
1007			}
1008		}
1009
1010		if (connection->cstate <= C_DISCONNECTING)
1011			goto out_release_sockets;
1012		if (signal_pending(current)) {
1013			flush_signals(current);
1014			smp_rmb();
1015			if (get_t_state(&connection->receiver) == EXITING)
1016				goto out_release_sockets;
1017		}
1018
1019		ok = connection_established(connection, &sock.socket, &msock.socket);
1020	} while (!ok);
1021
1022	if (ad.s_listen)
1023		sock_release(ad.s_listen);
1024
1025	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1026	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1027
1028	sock.socket->sk->sk_allocation = GFP_NOIO;
1029	msock.socket->sk->sk_allocation = GFP_NOIO;
1030
1031	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1032	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1033
1034	/* NOT YET ...
1035	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1036	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1037	 * first set it to the P_CONNECTION_FEATURES timeout,
1038	 * which we set to 4x the configured ping_timeout. */
1039	rcu_read_lock();
1040	nc = rcu_dereference(connection->net_conf);
1041
1042	sock.socket->sk->sk_sndtimeo =
1043	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1044
1045	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1046	timeout = nc->timeout * HZ / 10;
1047	discard_my_data = nc->discard_my_data;
1048	rcu_read_unlock();
1049
1050	msock.socket->sk->sk_sndtimeo = timeout;
1051
1052	/* we don't want delays.
1053	 * we use TCP_CORK where appropriate, though */
1054	tcp_sock_set_nodelay(sock.socket->sk);
1055	tcp_sock_set_nodelay(msock.socket->sk);
1056
1057	connection->data.socket = sock.socket;
1058	connection->meta.socket = msock.socket;
1059	connection->last_received = jiffies;
1060
1061	h = drbd_do_features(connection);
1062	if (h <= 0)
1063		return h;
1064
1065	if (connection->cram_hmac_tfm) {
1066		/* drbd_request_state(device, NS(conn, WFAuth)); */
1067		switch (drbd_do_auth(connection)) {
1068		case -1:
1069			drbd_err(connection, "Authentication of peer failed\n");
1070			return -1;
1071		case 0:
1072			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1073			return 0;
1074		}
1075	}
1076
1077	connection->data.socket->sk->sk_sndtimeo = timeout;
1078	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1079
1080	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1081		return -1;
1082
1083	/* Prevent a race between resync-handshake and
1084	 * being promoted to Primary.
1085	 *
1086	 * Grab and release the state mutex, so we know that any current
1087	 * drbd_set_role() is finished, and any incoming drbd_set_role
1088	 * will see the STATE_SENT flag, and wait for it to be cleared.
1089	 */
1090	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1091		mutex_lock(peer_device->device->state_mutex);
1092
1093	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1094	spin_lock_irq(&connection->resource->req_lock);
1095	set_bit(STATE_SENT, &connection->flags);
1096	spin_unlock_irq(&connection->resource->req_lock);
1097
1098	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1099		mutex_unlock(peer_device->device->state_mutex);
1100
1101	rcu_read_lock();
1102	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1103		struct drbd_device *device = peer_device->device;
1104		kref_get(&device->kref);
1105		rcu_read_unlock();
1106
1107		if (discard_my_data)
1108			set_bit(DISCARD_MY_DATA, &device->flags);
1109		else
1110			clear_bit(DISCARD_MY_DATA, &device->flags);
1111
1112		drbd_connected(peer_device);
1113		kref_put(&device->kref, drbd_destroy_device);
1114		rcu_read_lock();
1115	}
1116	rcu_read_unlock();
1117
1118	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1119	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1120		clear_bit(STATE_SENT, &connection->flags);
1121		return 0;
1122	}
1123
1124	drbd_thread_start(&connection->ack_receiver);
1125	/* opencoded create_singlethread_workqueue(),
1126	 * to be able to use format string arguments */
1127	connection->ack_sender =
1128		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1129	if (!connection->ack_sender) {
1130		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1131		return 0;
1132	}
1133
1134	mutex_lock(&connection->resource->conf_update);
1135	/* The discard_my_data flag is a single-shot modifier to the next
1136	 * connection attempt, the handshake of which is now well underway.
1137	 * No need for rcu style copying of the whole struct
1138	 * just to clear a single value. */
1139	connection->net_conf->discard_my_data = 0;
1140	mutex_unlock(&connection->resource->conf_update);
1141
1142	return h;
1143
1144out_release_sockets:
1145	if (ad.s_listen)
1146		sock_release(ad.s_listen);
1147	if (sock.socket)
1148		sock_release(sock.socket);
1149	if (msock.socket)
1150		sock_release(msock.socket);
1151	return -1;
1152}
1153
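/*
 * On-the-wire header layouts handled by decode_header() below, as implied by
 * the conversions in that function (all fields big endian; the struct
 * definitions in drbd_protocol.h are authoritative):
 *
 *	p_header80:  u32 magic (DRBD_MAGIC),     u16 command, u16 length
 *	p_header95:  u16 magic (DRBD_MAGIC_BIG), u16 command, u32 length
 *	p_header100: u32 magic (DRBD_MAGIC_100), u16 volume,  u16 command,
 *	             u32 length, plus a pad field that must be zero
 */
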
1154static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1155{
1156	unsigned int header_size = drbd_header_size(connection);
1157
1158	if (header_size == sizeof(struct p_header100) &&
1159	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1160		struct p_header100 *h = header;
1161		if (h->pad != 0) {
1162			drbd_err(connection, "Header padding is not zero\n");
1163			return -EINVAL;
1164		}
1165		pi->vnr = be16_to_cpu(h->volume);
1166		pi->cmd = be16_to_cpu(h->command);
1167		pi->size = be32_to_cpu(h->length);
1168	} else if (header_size == sizeof(struct p_header95) &&
1169		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1170		struct p_header95 *h = header;
1171		pi->cmd = be16_to_cpu(h->command);
1172		pi->size = be32_to_cpu(h->length);
1173		pi->vnr = 0;
1174	} else if (header_size == sizeof(struct p_header80) &&
1175		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1176		struct p_header80 *h = header;
1177		pi->cmd = be16_to_cpu(h->command);
1178		pi->size = be16_to_cpu(h->length);
1179		pi->vnr = 0;
1180	} else {
1181		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1182			 be32_to_cpu(*(__be32 *)header),
1183			 connection->agreed_pro_version);
1184		return -EINVAL;
1185	}
1186	pi->data = header + header_size;
1187	return 0;
1188}
1189
1190static void drbd_unplug_all_devices(struct drbd_connection *connection)
1191{
1192	if (current->plug == &connection->receiver_plug) {
1193		blk_finish_plug(&connection->receiver_plug);
1194		blk_start_plug(&connection->receiver_plug);
1195	} /* else: maybe just schedule() ?? */
1196}
1197
1198static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1199{
1200	void *buffer = connection->data.rbuf;
1201	int err;
1202
1203	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1204	if (err)
1205		return err;
1206
1207	err = decode_header(connection, buffer, pi);
1208	connection->last_received = jiffies;
1209
1210	return err;
1211}
1212
1213static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1214{
1215	void *buffer = connection->data.rbuf;
1216	unsigned int size = drbd_header_size(connection);
1217	int err;
1218
1219	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1220	if (err != size) {
1221		/* If we have nothing in the receive buffer now, to reduce
1222		 * application latency, try to drain the backend queues as
1223		 * quickly as possible, and let remote TCP know what we have
1224		 * received so far. */
1225		if (err == -EAGAIN) {
1226			tcp_sock_set_quickack(connection->data.socket->sk, 2);
1227			drbd_unplug_all_devices(connection);
1228		}
1229		if (err > 0) {
1230			buffer += err;
1231			size -= err;
1232		}
1233		err = drbd_recv_all_warn(connection, buffer, size);
1234		if (err)
1235			return err;
1236	}
1237
1238	err = decode_header(connection, connection->data.rbuf, pi);
1239	connection->last_received = jiffies;
1240
1241	return err;
1242}
1243/* This is blkdev_issue_flush, but asynchronous.
1244 * We want to submit to all component volumes in parallel,
1245 * then wait for all completions.
1246 */
1247struct issue_flush_context {
1248	atomic_t pending;
1249	int error;
1250	struct completion done;
1251};
1252struct one_flush_context {
1253	struct drbd_device *device;
1254	struct issue_flush_context *ctx;
1255};
1256
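/*
 * Completion counting pattern used below: ctx->pending starts at 1 as a bias
 * held by the issuer, submit_one_flush() takes one reference per submitted
 * bio, one_flush_endio() drops one per completion, and drbd_flush() finally
 * drops the bias.  ctx->done can therefore only complete after the last
 * flush has been submitted *and* completed.
 */
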
1257static void one_flush_endio(struct bio *bio)
1258{
1259	struct one_flush_context *octx = bio->bi_private;
1260	struct drbd_device *device = octx->device;
1261	struct issue_flush_context *ctx = octx->ctx;
1262
1263	if (bio->bi_status) {
1264		ctx->error = blk_status_to_errno(bio->bi_status);
1265		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1266	}
1267	kfree(octx);
1268	bio_put(bio);
1269
1270	clear_bit(FLUSH_PENDING, &device->flags);
1271	put_ldev(device);
1272	kref_put(&device->kref, drbd_destroy_device);
1273
1274	if (atomic_dec_and_test(&ctx->pending))
1275		complete(&ctx->done);
1276}
1277
1278static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1279{
1280	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1281	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1282	if (!bio || !octx) {
1283		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1284		/* FIXME: what else can I do now?  disconnecting or detaching
1285		 * really does not help to improve the state of the world, either.
1286		 */
1287		kfree(octx);
1288		if (bio)
1289			bio_put(bio);
1290
1291		ctx->error = -ENOMEM;
1292		put_ldev(device);
1293		kref_put(&device->kref, drbd_destroy_device);
1294		return;
1295	}
1296
1297	octx->device = device;
1298	octx->ctx = ctx;
1299	bio_set_dev(bio, device->ldev->backing_bdev);
1300	bio->bi_private = octx;
1301	bio->bi_end_io = one_flush_endio;
1302	bio->bi_opf = REQ_OP_WRITE | REQ_PREFLUSH;
1303
1304	device->flush_jif = jiffies;
1305	set_bit(FLUSH_PENDING, &device->flags);
1306	atomic_inc(&ctx->pending);
1307	submit_bio(bio);
1308}
1309
1310static void drbd_flush(struct drbd_connection *connection)
1311{
1312	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1313		struct drbd_peer_device *peer_device;
1314		struct issue_flush_context ctx;
1315		int vnr;
1316
1317		atomic_set(&ctx.pending, 1);
1318		ctx.error = 0;
1319		init_completion(&ctx.done);
1320
1321		rcu_read_lock();
1322		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1323			struct drbd_device *device = peer_device->device;
1324
1325			if (!get_ldev(device))
1326				continue;
1327			kref_get(&device->kref);
1328			rcu_read_unlock();
1329
1330			submit_one_flush(device, &ctx);
1331
1332			rcu_read_lock();
1333		}
1334		rcu_read_unlock();
1335
1336		/* Do we want to add a timeout,
1337		 * if disk-timeout is set? */
1338		if (!atomic_dec_and_test(&ctx.pending))
1339			wait_for_completion(&ctx.done);
1340
1341		if (ctx.error) {
1342			/* would rather check on EOPNOTSUPP, but that is not reliable.
1343			 * don't try again for ANY return value != 0
1344			 * if (rv == -EOPNOTSUPP) */
1345			/* Any error is already reported by bio_endio callback. */
1346			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1347		}
1348	}
1349}
1350
1351/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it
 * @connection:	DRBD connection.
1354 * @epoch:	Epoch object.
1355 * @ev:		Epoch event.
1356 */
1357static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1358					       struct drbd_epoch *epoch,
1359					       enum epoch_event ev)
1360{
1361	int epoch_size;
1362	struct drbd_epoch *next_epoch;
1363	enum finish_epoch rv = FE_STILL_LIVE;
1364
1365	spin_lock(&connection->epoch_lock);
1366	do {
1367		next_epoch = NULL;
1368
1369		epoch_size = atomic_read(&epoch->epoch_size);
1370
1371		switch (ev & ~EV_CLEANUP) {
1372		case EV_PUT:
1373			atomic_dec(&epoch->active);
1374			break;
1375		case EV_GOT_BARRIER_NR:
1376			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1377			break;
1378		case EV_BECAME_LAST:
			/* nothing to do */
1380			break;
1381		}
1382
1383		if (epoch_size != 0 &&
1384		    atomic_read(&epoch->active) == 0 &&
1385		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1386			if (!(ev & EV_CLEANUP)) {
1387				spin_unlock(&connection->epoch_lock);
1388				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1389				spin_lock(&connection->epoch_lock);
1390			}
1391#if 0
1392			/* FIXME: dec unacked on connection, once we have
1393			 * something to count pending connection packets in. */
1394			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1395				dec_unacked(epoch->connection);
1396#endif
1397
1398			if (connection->current_epoch != epoch) {
1399				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1400				list_del(&epoch->list);
1401				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1402				connection->epochs--;
1403				kfree(epoch);
1404
1405				if (rv == FE_STILL_LIVE)
1406					rv = FE_DESTROYED;
1407			} else {
1408				epoch->flags = 0;
1409				atomic_set(&epoch->epoch_size, 0);
1410				/* atomic_set(&epoch->active, 0); is already zero */
1411				if (rv == FE_STILL_LIVE)
1412					rv = FE_RECYCLED;
1413			}
1414		}
1415
1416		if (!next_epoch)
1417			break;
1418
1419		epoch = next_epoch;
1420	} while (1);
1421
1422	spin_unlock(&connection->epoch_lock);
1423
1424	return rv;
1425}
1426
1427static enum write_ordering_e
1428max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1429{
1430	struct disk_conf *dc;
1431
1432	dc = rcu_dereference(bdev->disk_conf);
1433
1434	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1435		wo = WO_DRAIN_IO;
1436	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1437		wo = WO_NONE;
1438
1439	return wo;
1440}
1441
1442/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:	DRBD resource.
 * @bdev:	backing device whose limits to additionally consider, or NULL.
 * @wo:		Write ordering method to try.
1446 */
1447void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1448			      enum write_ordering_e wo)
1449{
1450	struct drbd_device *device;
1451	enum write_ordering_e pwo;
1452	int vnr;
1453	static char *write_ordering_str[] = {
1454		[WO_NONE] = "none",
1455		[WO_DRAIN_IO] = "drain",
1456		[WO_BDEV_FLUSH] = "flush",
1457	};
1458
1459	pwo = resource->write_ordering;
1460	if (wo != WO_BDEV_FLUSH)
1461		wo = min(pwo, wo);
1462	rcu_read_lock();
1463	idr_for_each_entry(&resource->devices, device, vnr) {
1464		if (get_ldev(device)) {
1465			wo = max_allowed_wo(device->ldev, wo);
1466			if (device->ldev == bdev)
1467				bdev = NULL;
1468			put_ldev(device);
1469		}
1470	}
1471
1472	if (bdev)
1473		wo = max_allowed_wo(bdev, wo);
1474
1475	rcu_read_unlock();
1476
1477	resource->write_ordering = wo;
1478	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1479		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1480}
1481
1482/*
1483 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1484 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1485 * will directly go to fallback mode, submitting normal writes, and
1486 * never even try to UNMAP.
1487 *
1488 * And dm-thin does not do this (yet), mostly because in general it has
1489 * to assume that "skip_block_zeroing" is set.  See also:
1490 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1491 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1492 *
1493 * We *may* ignore the discard-zeroes-data setting, if so configured.
1494 *
1495 * Assumption is that this "discard_zeroes_data=0" is only because the backend
1496 * may ignore partial unaligned discards.
1497 *
1498 * LVM/DM thin as of at least
1499 *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1500 *   Library version: 1.02.93-RHEL7 (2015-01-28)
1501 *   Driver version:  4.29.0
1502 * still behaves this way.
1503 *
1504 * For unaligned (wrt. alignment and granularity) or too small discards,
1505 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1506 * but discard all the aligned full chunks.
1507 *
1508 * At least for LVM/DM thin, with skip_block_zeroing=false,
1509 * the result is effectively "discard_zeroes_data=1".
1510 */
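/*
 * Worked example for the splitting below (hypothetical numbers, assuming a
 * 4 KiB discard granularity, i.e. granularity = 8 sectors, alignment = 0,
 * and a sufficiently large max_discard_sectors):
 *
 *	start = 1003, nr_sectors = 50
 *	-> zero-out the 5 unaligned head sectors	(1003 .. 1007)
 *	-> discard the 40 aligned sectors		(1008 .. 1047)
 *	-> zero-out the 5 unaligned tail sectors	(1048 .. 1052)
 */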
1511/* flags: EE_TRIM|EE_ZEROOUT */
1512int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1513{
1514	struct block_device *bdev = device->ldev->backing_bdev;
1515	struct request_queue *q = bdev_get_queue(bdev);
1516	sector_t tmp, nr;
1517	unsigned int max_discard_sectors, granularity;
1518	int alignment;
1519	int err = 0;
1520
1521	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1522		goto zero_out;
1523
1524	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1525	granularity = max(q->limits.discard_granularity >> 9, 1U);
1526	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1527
1528	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1529	max_discard_sectors -= max_discard_sectors % granularity;
1530	if (unlikely(!max_discard_sectors))
1531		goto zero_out;
1532
1533	if (nr_sectors < granularity)
1534		goto zero_out;
1535
1536	tmp = start;
1537	if (sector_div(tmp, granularity) != alignment) {
1538		if (nr_sectors < 2*granularity)
1539			goto zero_out;
1540		/* start + gran - (start + gran - align) % gran */
1541		tmp = start + granularity - alignment;
1542		tmp = start + granularity - sector_div(tmp, granularity);
1543
1544		nr = tmp - start;
1545		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1546		 * layers are below us, some may have smaller granularity */
1547		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1548		nr_sectors -= nr;
1549		start = tmp;
1550	}
1551	while (nr_sectors >= max_discard_sectors) {
1552		err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1553		nr_sectors -= max_discard_sectors;
1554		start += max_discard_sectors;
1555	}
1556	if (nr_sectors) {
1557		/* max_discard_sectors is unsigned int (and a multiple of
1558		 * granularity, we made sure of that above already);
1559		 * nr is < max_discard_sectors;
1560		 * I don't need sector_div here, even though nr is sector_t */
1561		nr = nr_sectors;
1562		nr -= (unsigned int)nr % granularity;
1563		if (nr) {
1564			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1565			nr_sectors -= nr;
1566			start += nr;
1567		}
1568	}
1569 zero_out:
1570	if (nr_sectors) {
1571		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1572				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1573	}
1574	return err != 0;
1575}
1576
1577static bool can_do_reliable_discards(struct drbd_device *device)
1578{
1579	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1580	struct disk_conf *dc;
1581	bool can_do;
1582
1583	if (!blk_queue_discard(q))
1584		return false;
1585
1586	rcu_read_lock();
1587	dc = rcu_dereference(device->ldev->disk_conf);
1588	can_do = dc->discard_zeroes_if_aligned;
1589	rcu_read_unlock();
1590	return can_do;
1591}
1592
1593static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1594{
1595	/* If the backend cannot discard, or does not guarantee
1596	 * read-back zeroes in discarded ranges, we fall back to
1597	 * zero-out.  Unless configuration specifically requested
1598	 * otherwise. */
1599	if (!can_do_reliable_discards(device))
1600		peer_req->flags |= EE_ZEROOUT;
1601
1602	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1603	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1604		peer_req->flags |= EE_WAS_ERROR;
1605	drbd_endio_write_sec_final(peer_req);
1606}
1607
1608static void drbd_issue_peer_wsame(struct drbd_device *device,
1609				  struct drbd_peer_request *peer_req)
1610{
1611	struct block_device *bdev = device->ldev->backing_bdev;
1612	sector_t s = peer_req->i.sector;
1613	sector_t nr = peer_req->i.size >> 9;
1614	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1615		peer_req->flags |= EE_WAS_ERROR;
1616	drbd_endio_write_sec_final(peer_req);
1617}
1618
1619
1620/**
1621 * drbd_submit_peer_request()
1622 * @device:	DRBD device.
1623 * @peer_req:	peer request
 * @op:		REQ_OP_* operation for the resulting bios
 * @op_flags:	additional REQ_* flags, see bio->bi_opf
 * @fault_type:	DRBD_FAULT_* class used for optional fault injection
1625 *
1626 * May spread the pages to multiple bios,
1627 * depending on bio_add_page restrictions.
1628 *
1629 * Returns 0 if all bios have been submitted,
1630 * -ENOMEM if we could not allocate enough bios,
1631 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1632 *  single page to an empty bio (which should never happen and likely indicates
1633 *  that the lower level IO stack is in some way broken). This has been observed
1634 *  on certain Xen deployments.
1635 */
1636/* TODO allocate from our own bio_set. */
1637int drbd_submit_peer_request(struct drbd_device *device,
1638			     struct drbd_peer_request *peer_req,
1639			     const unsigned op, const unsigned op_flags,
1640			     const int fault_type)
1641{
1642	struct bio *bios = NULL;
1643	struct bio *bio;
1644	struct page *page = peer_req->pages;
1645	sector_t sector = peer_req->i.sector;
1646	unsigned data_size = peer_req->i.size;
1647	unsigned n_bios = 0;
1648	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1649	int err = -ENOMEM;
1650
1651	/* TRIM/DISCARD: for now, always use the helper function
1652	 * blkdev_issue_zeroout(..., discard=true).
1653	 * It's synchronous, but it does the right thing wrt. bio splitting.
1654	 * Correctness first, performance later.  Next step is to code an
1655	 * asynchronous variant of the same.
1656	 */
1657	if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1658		/* wait for all pending IO completions, before we start
1659		 * zeroing things out. */
1660		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1661		/* add it to the active list now,
1662		 * so we can find it to present it in debugfs */
1663		peer_req->submit_jif = jiffies;
1664		peer_req->flags |= EE_SUBMITTED;
1665
1666		/* If this was a resync request from receive_rs_deallocated(),
1667		 * it is already on the sync_ee list */
1668		if (list_empty(&peer_req->w.list)) {
1669			spin_lock_irq(&device->resource->req_lock);
1670			list_add_tail(&peer_req->w.list, &device->active_ee);
1671			spin_unlock_irq(&device->resource->req_lock);
1672		}
1673
1674		if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1675			drbd_issue_peer_discard_or_zero_out(device, peer_req);
1676		else /* EE_WRITE_SAME */
1677			drbd_issue_peer_wsame(device, peer_req);
1678		return 0;
1679	}
1680
1681	/* In most cases, we will only need one bio.  But in case the lower
1682	 * level restrictions happen to be different at this offset on this
1683	 * side than those of the sending peer, we may need to submit the
1684	 * request in more than one bio.
1685	 *
1686	 * Plain bio_alloc is good enough here, this is no DRBD internally
1687	 * generated bio, but a bio allocated on behalf of the peer.
1688	 */
1689next_bio:
1690	bio = bio_alloc(GFP_NOIO, nr_pages);
1691	if (!bio) {
1692		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1693		goto fail;
1694	}
1695	/* > peer_req->i.sector, unless this is the first bio */
1696	bio->bi_iter.bi_sector = sector;
1697	bio_set_dev(bio, device->ldev->backing_bdev);
1698	bio_set_op_attrs(bio, op, op_flags);
1699	bio->bi_private = peer_req;
1700	bio->bi_end_io = drbd_peer_request_endio;
1701
1702	bio->bi_next = bios;
1703	bios = bio;
1704	++n_bios;
1705
1706	page_chain_for_each(page) {
1707		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1708		if (!bio_add_page(bio, page, len, 0))
1709			goto next_bio;
1710		data_size -= len;
1711		sector += len >> 9;
1712		--nr_pages;
1713	}
1714	D_ASSERT(device, data_size == 0);
1715	D_ASSERT(device, page == NULL);
1716
1717	atomic_set(&peer_req->pending_bios, n_bios);
1718	/* for debugfs: update timestamp, mark as submitted */
1719	peer_req->submit_jif = jiffies;
1720	peer_req->flags |= EE_SUBMITTED;
1721	do {
1722		bio = bios;
1723		bios = bios->bi_next;
1724		bio->bi_next = NULL;
1725
1726		drbd_submit_bio_noacct(device, fault_type, bio);
1727	} while (bios);
1728	return 0;
1729
1730fail:
1731	while (bios) {
1732		bio = bios;
1733		bios = bios->bi_next;
1734		bio_put(bio);
1735	}
1736	return err;
1737}
1738
1739static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1740					     struct drbd_peer_request *peer_req)
1741{
1742	struct drbd_interval *i = &peer_req->i;
1743
1744	drbd_remove_interval(&device->write_requests, i);
1745	drbd_clear_interval(i);
1746
1747	/* Wake up any processes waiting for this peer request to complete.  */
1748	if (i->waiting)
1749		wake_up(&device->misc_wait);
1750}
1751
1752static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1753{
1754	struct drbd_peer_device *peer_device;
1755	int vnr;
1756
1757	rcu_read_lock();
1758	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1759		struct drbd_device *device = peer_device->device;
1760
1761		kref_get(&device->kref);
1762		rcu_read_unlock();
1763		drbd_wait_ee_list_empty(device, &device->active_ee);
1764		kref_put(&device->kref, drbd_destroy_device);
1765		rcu_read_lock();
1766	}
1767	rcu_read_unlock();
1768}
1769
1770static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1771{
1772	int rv;
1773	struct p_barrier *p = pi->data;
1774	struct drbd_epoch *epoch;
1775
1776	/* FIXME these are unacked on connection,
1777	 * not a specific (peer)device.
1778	 */
1779	connection->current_epoch->barrier_nr = p->barrier;
1780	connection->current_epoch->connection = connection;
1781	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1782
1783	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1784	 * the activity log, which means it would not be resynced in case the
1785	 * R_PRIMARY crashes now.
1786	 * Therefore we must send the barrier_ack after the barrier request was
1787	 * completed. */
1788	switch (connection->resource->write_ordering) {
1789	case WO_NONE:
1790		if (rv == FE_RECYCLED)
1791			return 0;
1792
1793		/* receiver context, in the writeout path of the other node.
1794		 * avoid potential distributed deadlock */
1795		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1796		if (epoch)
1797			break;
1798		else
1799			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1800		fallthrough;
1801
1802	case WO_BDEV_FLUSH:
1803	case WO_DRAIN_IO:
1804		conn_wait_active_ee_empty(connection);
1805		drbd_flush(connection);
1806
1807		if (atomic_read(&connection->current_epoch->epoch_size)) {
1808			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1809			if (epoch)
1810				break;
1811		}
1812
1813		return 0;
1814	default:
1815		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1816			 connection->resource->write_ordering);
1817		return -EIO;
1818	}
1819
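	/* Only reached with a freshly allocated epoch (one of the breaks above):
	 * initialize it and publish it under epoch_lock below, or free it again
	 * if the current epoch emptied in the meantime. */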
1820	epoch->flags = 0;
1821	atomic_set(&epoch->epoch_size, 0);
1822	atomic_set(&epoch->active, 0);
1823
1824	spin_lock(&connection->epoch_lock);
1825	if (atomic_read(&connection->current_epoch->epoch_size)) {
1826		list_add(&epoch->list, &connection->current_epoch->list);
1827		connection->current_epoch = epoch;
1828		connection->epochs++;
1829	} else {
1830		/* The current_epoch got recycled while we allocated this one... */
1831		kfree(epoch);
1832	}
1833	spin_unlock(&connection->epoch_lock);
1834
1835	return 0;
1836}
1837
1838/* quick wrapper in case payload size != request_size (write same) */
1839static void drbd_csum_ee_size(struct crypto_shash *h,
1840			      struct drbd_peer_request *r, void *d,
1841			      unsigned int payload_size)
1842{
1843	unsigned int tmp = r->i.size;
1844	r->i.size = payload_size;
1845	drbd_csum_ee(h, r, d);
1846	r->i.size = tmp;
1847}
1848
1849/* used from receive_RSDataReply (recv_resync_read)
1850 * and from receive_Data.
1851 * data_size: actual payload ("data in")
1852 * 	for normal writes that is bi_size.
1853 * 	for discards, that is zero.
1854 * 	for write same, it is logical_block_size.
1855 * both trim and write same have the bi_size ("data len to be affected")
1856 * as extra argument in the packet header.
1857 */
1858static struct drbd_peer_request *
1859read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1860	      struct packet_info *pi) __must_hold(local)
1861{
1862	struct drbd_device *device = peer_device->device;
1863	const sector_t capacity = get_capacity(device->vdisk);
1864	struct drbd_peer_request *peer_req;
1865	struct page *page;
1866	int digest_size, err;
1867	unsigned int data_size = pi->size, ds;
1868	void *dig_in = peer_device->connection->int_dig_in;
1869	void *dig_vv = peer_device->connection->int_dig_vv;
1870	unsigned long *data;
1871	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1872	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1873	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1874
1875	digest_size = 0;
1876	if (!trim && peer_device->connection->peer_integrity_tfm) {
1877		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1878		/*
1879		 * FIXME: Receive the incoming digest into the receive buffer
1880		 *	  here, together with its struct p_data?
1881		 */
1882		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1883		if (err)
1884			return NULL;
1885		data_size -= digest_size;
1886	}
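	/* From here on, data_size refers to the payload only; the optional
	 * integrity digest (received above) and the packet header have already
	 * been consumed from the socket. */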
1887
1888	/* assume request_size == data_size, but special case trim and wsame. */
1889	ds = data_size;
1890	if (trim) {
1891		if (!expect(data_size == 0))
1892			return NULL;
1893		ds = be32_to_cpu(trim->size);
1894	} else if (zeroes) {
1895		if (!expect(data_size == 0))
1896			return NULL;
1897		ds = be32_to_cpu(zeroes->size);
1898	} else if (wsame) {
1899		if (data_size != queue_logical_block_size(device->rq_queue)) {
1900			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1901				data_size, queue_logical_block_size(device->rq_queue));
1902			return NULL;
1903		}
1904		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1905			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1906				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1907			return NULL;
1908		}
1909		ds = be32_to_cpu(wsame->size);
1910	}
1911
1912	if (!expect(IS_ALIGNED(ds, 512)))
1913		return NULL;
1914	if (trim || wsame || zeroes) {
1915		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1916			return NULL;
1917	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1918		return NULL;
1919
1920	/* even though we trust our peer,
1921	 * we sometimes have to double check. */
1922	if (sector + (ds>>9) > capacity) {
1923		drbd_err(device, "request from peer beyond end of local disk: "
1924			"capacity: %llus < sector: %llus + size: %u\n",
1925			(unsigned long long)capacity,
1926			(unsigned long long)sector, ds);
1927		return NULL;
1928	}
1929
1930	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1931	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1932	 * which in turn might block on the other node at this very place.  */
1933	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1934	if (!peer_req)
1935		return NULL;
1936
1937	peer_req->flags |= EE_WRITE;
1938	if (trim) {
1939		peer_req->flags |= EE_TRIM;
1940		return peer_req;
1941	}
1942	if (zeroes) {
1943		peer_req->flags |= EE_ZEROOUT;
1944		return peer_req;
1945	}
1946	if (wsame)
1947		peer_req->flags |= EE_WRITE_SAME;
1948
1949	/* receive payload size bytes into page chain */
1950	ds = data_size;
1951	page = peer_req->pages;
1952	page_chain_for_each(page) {
1953		unsigned len = min_t(int, ds, PAGE_SIZE);
1954		data = kmap(page);
1955		err = drbd_recv_all_warn(peer_device->connection, data, len);
1956		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1957			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1958			data[0] = data[0] ^ (unsigned long)-1;
1959		}
1960		kunmap(page);
1961		if (err) {
1962			drbd_free_peer_req(device, peer_req);
1963			return NULL;
1964		}
1965		ds -= len;
1966	}
1967
1968	if (digest_size) {
1969		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1970		if (memcmp(dig_in, dig_vv, digest_size)) {
1971			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1972				(unsigned long long)sector, data_size);
1973			drbd_free_peer_req(device, peer_req);
1974			return NULL;
1975		}
1976	}
1977	device->recv_cnt += data_size >> 9;
1978	return peer_req;
1979}
1980
1981/* drbd_drain_block() just takes a data block
1982 * out of the socket input buffer, and discards it.
1983 */
1984static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1985{
1986	struct page *page;
1987	int err = 0;
1988	void *data;
1989
1990	if (!data_size)
1991		return 0;
1992
1993	page = drbd_alloc_pages(peer_device, 1, 1);
1994
1995	data = kmap(page);
1996	while (data_size) {
1997		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1998
1999		err = drbd_recv_all_warn(peer_device->connection, data, len);
2000		if (err)
2001			break;
2002		data_size -= len;
2003	}
2004	kunmap(page);
2005	drbd_free_pages(peer_device->device, page, 0);
2006	return err;
2007}
2008
2009static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2010			   sector_t sector, int data_size)
2011{
2012	struct bio_vec bvec;
2013	struct bvec_iter iter;
2014	struct bio *bio;
2015	int digest_size, err, expect;
2016	void *dig_in = peer_device->connection->int_dig_in;
2017	void *dig_vv = peer_device->connection->int_dig_vv;
2018
2019	digest_size = 0;
2020	if (peer_device->connection->peer_integrity_tfm) {
2021		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2022		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2023		if (err)
2024			return err;
2025		data_size -= digest_size;
2026	}
2027
2028	/* optimistically update recv_cnt.  if receiving fails below,
2029	 * we disconnect anyways, and counters will be reset. */
2030	peer_device->device->recv_cnt += data_size>>9;
2031
2032	bio = req->master_bio;
2033	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2034
2035	bio_for_each_segment(bvec, bio, iter) {
2036		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2037		expect = min_t(int, data_size, bvec.bv_len);
2038		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2039		kunmap(bvec.bv_page);
2040		if (err)
2041			return err;
2042		data_size -= expect;
2043	}
2044
2045	if (digest_size) {
2046		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2047		if (memcmp(dig_in, dig_vv, digest_size)) {
2048			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2049			return -EINVAL;
2050		}
2051	}
2052
2053	D_ASSERT(peer_device->device, data_size == 0);
2054	return 0;
2055}
2056
2057/*
2058 * e_end_resync_block() is called in ack_sender context via
2059 * drbd_finish_peer_reqs().
2060 */
2061static int e_end_resync_block(struct drbd_work *w, int unused)
2062{
2063	struct drbd_peer_request *peer_req =
2064		container_of(w, struct drbd_peer_request, w);
2065	struct drbd_peer_device *peer_device = peer_req->peer_device;
2066	struct drbd_device *device = peer_device->device;
2067	sector_t sector = peer_req->i.sector;
2068	int err;
2069
2070	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2071
2072	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2073		drbd_set_in_sync(device, sector, peer_req->i.size);
2074		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2075	} else {
2076		/* Record failure to sync */
2077		drbd_rs_failed_io(device, sector, peer_req->i.size);
2078
2079		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2080	}
2081	dec_unacked(device);
2082
2083	return err;
2084}
2085
2086static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2087			    struct packet_info *pi) __releases(local)
2088{
2089	struct drbd_device *device = peer_device->device;
2090	struct drbd_peer_request *peer_req;
2091
2092	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2093	if (!peer_req)
2094		goto fail;
2095
2096	dec_rs_pending(device);
2097
2098	inc_unacked(device);
2099	/* corresponding dec_unacked() in e_end_resync_block()
2100	 * respectively in _drbd_clear_done_ee */
2101
2102	peer_req->w.cb = e_end_resync_block;
2103	peer_req->submit_jif = jiffies;
2104
2105	spin_lock_irq(&device->resource->req_lock);
2106	list_add_tail(&peer_req->w.list, &device->sync_ee);
2107	spin_unlock_irq(&device->resource->req_lock);
2108
2109	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2110	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2111				     DRBD_FAULT_RS_WR) == 0)
2112		return 0;
2113
2114	/* don't care for the reason here */
2115	drbd_err(device, "submit failed, triggering re-connect\n");
2116	spin_lock_irq(&device->resource->req_lock);
2117	list_del(&peer_req->w.list);
2118	spin_unlock_irq(&device->resource->req_lock);
2119
2120	drbd_free_peer_req(device, peer_req);
2121fail:
2122	put_ldev(device);
2123	return -EIO;
2124}
2125
2126static struct drbd_request *
2127find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2128	     sector_t sector, bool missing_ok, const char *func)
2129{
2130	struct drbd_request *req;
2131
2132	/* Request object according to our peer */
2133	req = (struct drbd_request *)(unsigned long)id;
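	/* The peer echoes back the block_id we sent, which is (a cast of) our
	 * request pointer; drbd_contains_interval() below checks that it still
	 * refers to a known interval before it is trusted any further. */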
2134	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2135		return req;
2136	if (!missing_ok) {
2137		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2138			(unsigned long)id, (unsigned long long)sector);
2139	}
2140	return NULL;
2141}
2142
2143static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2144{
2145	struct drbd_peer_device *peer_device;
2146	struct drbd_device *device;
2147	struct drbd_request *req;
2148	sector_t sector;
2149	int err;
2150	struct p_data *p = pi->data;
2151
2152	peer_device = conn_peer_device(connection, pi->vnr);
2153	if (!peer_device)
2154		return -EIO;
2155	device = peer_device->device;
2156
2157	sector = be64_to_cpu(p->sector);
2158
2159	spin_lock_irq(&device->resource->req_lock);
2160	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2161	spin_unlock_irq(&device->resource->req_lock);
2162	if (unlikely(!req))
2163		return -EIO;
2164
2165	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2166	 * special casing it there for the various failure cases.
2167	 * still no race with drbd_fail_pending_reads */
2168	err = recv_dless_read(peer_device, req, sector, pi->size);
2169	if (!err)
2170		req_mod(req, DATA_RECEIVED);
2171	/* else: nothing. handled from drbd_disconnect...
2172	 * I don't think we may complete this just yet
2173	 * in case we are "on-disconnect: freeze" */
2174
2175	return err;
2176}
2177
2178static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2179{
2180	struct drbd_peer_device *peer_device;
2181	struct drbd_device *device;
2182	sector_t sector;
2183	int err;
2184	struct p_data *p = pi->data;
2185
2186	peer_device = conn_peer_device(connection, pi->vnr);
2187	if (!peer_device)
2188		return -EIO;
2189	device = peer_device->device;
2190
2191	sector = be64_to_cpu(p->sector);
2192	D_ASSERT(device, p->block_id == ID_SYNCER);
2193
2194	if (get_ldev(device)) {
2195		/* data is submitted to disk within recv_resync_read.
2196		 * corresponding put_ldev done below on error,
2197		 * or in drbd_peer_request_endio. */
2198		err = recv_resync_read(peer_device, sector, pi);
2199	} else {
2200		if (__ratelimit(&drbd_ratelimit_state))
2201			drbd_err(device, "Can not write resync data to local disk.\n");
2202
2203		err = drbd_drain_block(peer_device, pi->size);
2204
2205		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2206	}
2207
2208	atomic_add(pi->size >> 9, &device->rs_sect_in);
2209
2210	return err;
2211}
2212
2213static void restart_conflicting_writes(struct drbd_device *device,
2214				       sector_t sector, int size)
2215{
2216	struct drbd_interval *i;
2217	struct drbd_request *req;
2218
2219	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2220		if (!i->local)
2221			continue;
2222		req = container_of(i, struct drbd_request, i);
2223		if (req->rq_state & RQ_LOCAL_PENDING ||
2224		    !(req->rq_state & RQ_POSTPONED))
2225			continue;
2226		/* as it is RQ_POSTPONED, this will cause it to
2227		 * be queued on the retry workqueue. */
2228		__req_mod(req, CONFLICT_RESOLVED, NULL);
2229	}
2230}
2231
2232/*
2233 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2234 */
2235static int e_end_block(struct drbd_work *w, int cancel)
2236{
2237	struct drbd_peer_request *peer_req =
2238		container_of(w, struct drbd_peer_request, w);
2239	struct drbd_peer_device *peer_device = peer_req->peer_device;
2240	struct drbd_device *device = peer_device->device;
2241	sector_t sector = peer_req->i.sector;
2242	int err = 0, pcmd;
2243
2244	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2245		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
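			/* While we are between C_SYNC_SOURCE and C_PAUSED_SYNC_T and the
			 * peer allowed it (EE_MAY_SET_IN_SYNC), reply with P_RS_WRITE_ACK,
			 * which also lets us mark the range in sync below; otherwise a
			 * plain P_WRITE_ACK is enough. */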
2246			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2247				device->state.conn <= C_PAUSED_SYNC_T &&
2248				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2249				P_RS_WRITE_ACK : P_WRITE_ACK;
2250			err = drbd_send_ack(peer_device, pcmd, peer_req);
2251			if (pcmd == P_RS_WRITE_ACK)
2252				drbd_set_in_sync(device, sector, peer_req->i.size);
2253		} else {
2254			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2255			/* we expect it to be marked out of sync anyways...
2256			 * maybe assert this?  */
2257		}
2258		dec_unacked(device);
2259	}
2260
2261	/* we delete from the conflict detection hash _after_ we sent out the
2262	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2263	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2264		spin_lock_irq(&device->resource->req_lock);
2265		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2266		drbd_remove_epoch_entry_interval(device, peer_req);
2267		if (peer_req->flags & EE_RESTART_REQUESTS)
2268			restart_conflicting_writes(device, sector, peer_req->i.size);
2269		spin_unlock_irq(&device->resource->req_lock);
2270	} else
2271		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2272
2273	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2274
2275	return err;
2276}
2277
2278static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2279{
2280	struct drbd_peer_request *peer_req =
2281		container_of(w, struct drbd_peer_request, w);
2282	struct drbd_peer_device *peer_device = peer_req->peer_device;
2283	int err;
2284
2285	err = drbd_send_ack(peer_device, ack, peer_req);
2286	dec_unacked(peer_device->device);
2287
2288	return err;
2289}
2290
2291static int e_send_superseded(struct drbd_work *w, int unused)
2292{
2293	return e_send_ack(w, P_SUPERSEDED);
2294}
2295
2296static int e_send_retry_write(struct drbd_work *w, int unused)
2297{
2298	struct drbd_peer_request *peer_req =
2299		container_of(w, struct drbd_peer_request, w);
2300	struct drbd_connection *connection = peer_req->peer_device->connection;
2301
2302	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2303			     P_RETRY_WRITE : P_SUPERSEDED);
2304}
2305
2306static bool seq_greater(u32 a, u32 b)
2307{
2308	/*
2309	 * We assume 32-bit wrap-around here.
2310	 * For 24-bit wrap-around, we would have to shift:
2311	 *  a <<= 8; b <<= 8;
2312	 */
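	/* Illustrative example: a == 0x00000002, b == 0xfffffffe (taken just
	 * before the counter wrapped): (s32)a - (s32)b == 2 - (-2) == 4 > 0,
	 * so a is correctly considered the newer sequence number. */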
2313	return (s32)a - (s32)b > 0;
2314}
2315
2316static u32 seq_max(u32 a, u32 b)
2317{
2318	return seq_greater(a, b) ? a : b;
2319}
2320
2321static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2322{
2323	struct drbd_device *device = peer_device->device;
2324	unsigned int newest_peer_seq;
2325
2326	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2327		spin_lock(&device->peer_seq_lock);
2328		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2329		device->peer_seq = newest_peer_seq;
2330		spin_unlock(&device->peer_seq_lock);
2331		/* wake up only if we actually changed device->peer_seq */
2332		if (peer_seq == newest_peer_seq)
2333			wake_up(&device->seq_wait);
2334	}
2335}
2336
2337static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2338{
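	/* s1/s2 are start sectors, l1/l2 are lengths in bytes (hence the >>9).
	 * Two ranges do not overlap iff one ends at or before the other starts.
	 * Example: s1=0, l1=4096 (sectors 0..7) and s2=8, l2=512 (sector 8) do
	 * not overlap; with s2=7 they would. */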
2339	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2340}
2341
2342/* maybe change sync_ee into interval trees as well? */
2343static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2344{
2345	struct drbd_peer_request *rs_req;
2346	bool rv = false;
2347
2348	spin_lock_irq(&device->resource->req_lock);
2349	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2350		if (overlaps(peer_req->i.sector, peer_req->i.size,
2351			     rs_req->i.sector, rs_req->i.size)) {
2352			rv = true;
2353			break;
2354		}
2355	}
2356	spin_unlock_irq(&device->resource->req_lock);
2357
2358	return rv;
2359}
2360
2361/* Called from receive_Data.
2362 * Synchronize packets on sock with packets on msock.
2363 *
2364 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2365 * packet traveling on msock, they are still processed in the order they have
2366 * been sent.
2367 *
2368 * Note: we don't care for Ack packets overtaking P_DATA packets.
2369 *
2370 * In case packet_seq is larger than device->peer_seq number, there are
2371 * outstanding packets on the msock. We wait for them to arrive.
2372 * In case we are the logically next packet, we update device->peer_seq
2373 * ourselves. Correctly handles 32bit wrap around.
2374 *
2375 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
2376 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2377 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2378 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2379 *
2380 * returns 0 if we may process the packet,
2381 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2382static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2383{
2384	struct drbd_device *device = peer_device->device;
2385	DEFINE_WAIT(wait);
2386	long timeout;
2387	int ret = 0, tp;
2388
2389	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2390		return 0;
2391
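	/* The loop below lets a packet with sequence number N through as soon as
	 * device->peer_seq has caught up to N - 1; until then there are still
	 * outstanding packets on the msock (see the comment above) and, if
	 * two_primaries is enabled, we sleep bounded by ping_timeo. */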
2392	spin_lock(&device->peer_seq_lock);
2393	for (;;) {
2394		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2395			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2396			break;
2397		}
2398
2399		if (signal_pending(current)) {
2400			ret = -ERESTARTSYS;
2401			break;
2402		}
2403
2404		rcu_read_lock();
2405		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2406		rcu_read_unlock();
2407
2408		if (!tp)
2409			break;
2410
2411		/* Only need to wait if two_primaries is enabled */
2412		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2413		spin_unlock(&device->peer_seq_lock);
2414		rcu_read_lock();
2415		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2416		rcu_read_unlock();
2417		timeout = schedule_timeout(timeout);
2418		spin_lock(&device->peer_seq_lock);
2419		if (!timeout) {
2420			ret = -ETIMEDOUT;
2421			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2422			break;
2423		}
2424	}
2425	spin_unlock(&device->peer_seq_lock);
2426	finish_wait(&device->seq_wait, &wait);
2427	return ret;
2428}
2429
2430/* see also bio_flags_to_wire()
2431 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2432 * flags and back. We may replicate to other kernel versions. */
2433static unsigned long wire_flags_to_bio_flags(u32 dpf)
2434{
2435	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2436		(dpf & DP_FUA ? REQ_FUA : 0) |
2437		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2438}
2439
2440static unsigned long wire_flags_to_bio_op(u32 dpf)
2441{
2442	if (dpf & DP_ZEROES)
2443		return REQ_OP_WRITE_ZEROES;
2444	if (dpf & DP_DISCARD)
2445		return REQ_OP_DISCARD;
2446	if (dpf & DP_WSAME)
2447		return REQ_OP_WRITE_SAME;
2448	else
2449		return REQ_OP_WRITE;
2450}
2451
2452static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2453				    unsigned int size)
2454{
2455	struct drbd_interval *i;
2456
2457    repeat:
2458	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2459		struct drbd_request *req;
2460		struct bio_and_error m;
2461
2462		if (!i->local)
2463			continue;
2464		req = container_of(i, struct drbd_request, i);
2465		if (!(req->rq_state & RQ_POSTPONED))
2466			continue;
2467		req->rq_state &= ~RQ_POSTPONED;
2468		__req_mod(req, NEG_ACKED, &m);
2469		spin_unlock_irq(&device->resource->req_lock);
2470		if (m.bio)
2471			complete_master_bio(device, &m);
2472		spin_lock_irq(&device->resource->req_lock);
2473		goto repeat;
2474	}
2475}
2476
2477static int handle_write_conflicts(struct drbd_device *device,
2478				  struct drbd_peer_request *peer_req)
2479{
2480	struct drbd_connection *connection = peer_req->peer_device->connection;
2481	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2482	sector_t sector = peer_req->i.sector;
2483	const unsigned int size = peer_req->i.size;
2484	struct drbd_interval *i;
2485	bool equal;
2486	int err;
2487
2488	/*
2489	 * Inserting the peer request into the write_requests tree will prevent
2490	 * new conflicting local requests from being added.
2491	 */
2492	drbd_insert_interval(&device->write_requests, &peer_req->i);
2493
2494    repeat:
2495	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2496		if (i == &peer_req->i)
2497			continue;
2498		if (i->completed)
2499			continue;
2500
2501		if (!i->local) {
2502			/*
2503			 * Our peer has sent a conflicting remote request; this
2504			 * should not happen in a two-node setup.  Wait for the
2505			 * earlier peer request to complete.
2506			 */
2507			err = drbd_wait_misc(device, i);
2508			if (err)
2509				goto out;
2510			goto repeat;
2511		}
2512
2513		equal = i->sector == sector && i->size == size;
2514		if (resolve_conflicts) {
2515			/*
2516			 * If the peer request is fully contained within the
2517			 * overlapping request, it can be considered overwritten
2518			 * and thus superseded; otherwise, it will be retried
2519			 * once all overlapping requests have completed.
2520			 */
2521			bool superseded = i->sector <= sector && i->sector +
2522				       (i->size >> 9) >= sector + (size >> 9);
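			/* Example: if the local request i covers sectors 100..131 and the
			 * peer write covers sectors 104..111, the peer write is fully
			 * contained and therefore superseded; a partial overlap is not. */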
2523
2524			if (!equal)
2525				drbd_alert(device, "Concurrent writes detected: "
2526					       "local=%llus +%u, remote=%llus +%u, "
2527					       "assuming %s came first\n",
2528					  (unsigned long long)i->sector, i->size,
2529					  (unsigned long long)sector, size,
2530					  superseded ? "local" : "remote");
2531
2532			peer_req->w.cb = superseded ? e_send_superseded :
2533						   e_send_retry_write;
2534			list_add_tail(&peer_req->w.list, &device->done_ee);
2535			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2536
2537			err = -ENOENT;
2538			goto out;
2539		} else {
2540			struct drbd_request *req =
2541				container_of(i, struct drbd_request, i);
2542
2543			if (!equal)
2544				drbd_alert(device, "Concurrent writes detected: "
2545					       "local=%llus +%u, remote=%llus +%u\n",
2546					  (unsigned long long)i->sector, i->size,
2547					  (unsigned long long)sector, size);
2548
2549			if (req->rq_state & RQ_LOCAL_PENDING ||
2550			    !(req->rq_state & RQ_POSTPONED)) {
2551				/*
2552				 * Wait for the node with the discard flag to
2553				 * decide if this request has been superseded
2554				 * or needs to be retried.
2555				 * Requests that have been superseded will
2556				 * disappear from the write_requests tree.
2557				 *
2558				 * In addition, wait for the conflicting
2559				 * request to finish locally before submitting
2560				 * the conflicting peer request.
2561				 */
2562				err = drbd_wait_misc(device, &req->i);
2563				if (err) {
2564					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2565					fail_postponed_requests(device, sector, size);
2566					goto out;
2567				}
2568				goto repeat;
2569			}
2570			/*
2571			 * Remember to restart the conflicting requests after
2572			 * the new peer request has completed.
2573			 */
2574			peer_req->flags |= EE_RESTART_REQUESTS;
2575		}
2576	}
2577	err = 0;
2578
2579    out:
2580	if (err)
2581		drbd_remove_epoch_entry_interval(device, peer_req);
2582	return err;
2583}
2584
2585/* mirrored write */
2586static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2587{
2588	struct drbd_peer_device *peer_device;
2589	struct drbd_device *device;
2590	struct net_conf *nc;
2591	sector_t sector;
2592	struct drbd_peer_request *peer_req;
2593	struct p_data *p = pi->data;
2594	u32 peer_seq = be32_to_cpu(p->seq_num);
2595	int op, op_flags;
2596	u32 dp_flags;
2597	int err, tp;
2598
2599	peer_device = conn_peer_device(connection, pi->vnr);
2600	if (!peer_device)
2601		return -EIO;
2602	device = peer_device->device;
2603
2604	if (!get_ldev(device)) {
2605		int err2;
2606
2607		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2608		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2609		atomic_inc(&connection->current_epoch->epoch_size);
2610		err2 = drbd_drain_block(peer_device, pi->size);
2611		if (!err)
2612			err = err2;
2613		return err;
2614	}
2615
2616	/*
2617	 * Corresponding put_ldev done either below (on various errors), or in
2618	 * drbd_peer_request_endio, if we successfully submit the data at the
2619	 * end of this function.
2620	 */
2621
2622	sector = be64_to_cpu(p->sector);
2623	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2624	if (!peer_req) {
2625		put_ldev(device);
2626		return -EIO;
2627	}
2628
2629	peer_req->w.cb = e_end_block;
2630	peer_req->submit_jif = jiffies;
2631	peer_req->flags |= EE_APPLICATION;
2632
2633	dp_flags = be32_to_cpu(p->dp_flags);
2634	op = wire_flags_to_bio_op(dp_flags);
2635	op_flags = wire_flags_to_bio_flags(dp_flags);
2636	if (pi->cmd == P_TRIM) {
2637		D_ASSERT(peer_device, peer_req->i.size > 0);
2638		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2639		D_ASSERT(peer_device, peer_req->pages == NULL);
2640		/* need to play safe: an older DRBD sender
2641		 * may mean zero-out while sending P_TRIM. */
2642		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2643			peer_req->flags |= EE_ZEROOUT;
2644	} else if (pi->cmd == P_ZEROES) {
2645		D_ASSERT(peer_device, peer_req->i.size > 0);
2646		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2647		D_ASSERT(peer_device, peer_req->pages == NULL);
2648		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2649		if (dp_flags & DP_DISCARD)
2650			peer_req->flags |= EE_TRIM;
2651	} else if (peer_req->pages == NULL) {
2652		D_ASSERT(device, peer_req->i.size == 0);
2653		D_ASSERT(device, dp_flags & DP_FLUSH);
2654	}
2655
2656	if (dp_flags & DP_MAY_SET_IN_SYNC)
2657		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2658
2659	spin_lock(&connection->epoch_lock);
2660	peer_req->epoch = connection->current_epoch;
2661	atomic_inc(&peer_req->epoch->epoch_size);
2662	atomic_inc(&peer_req->epoch->active);
2663	spin_unlock(&connection->epoch_lock);
2664
2665	rcu_read_lock();
2666	nc = rcu_dereference(peer_device->connection->net_conf);
2667	tp = nc->two_primaries;
2668	if (peer_device->connection->agreed_pro_version < 100) {
2669		switch (nc->wire_protocol) {
2670		case DRBD_PROT_C:
2671			dp_flags |= DP_SEND_WRITE_ACK;
2672			break;
2673		case DRBD_PROT_B:
2674			dp_flags |= DP_SEND_RECEIVE_ACK;
2675			break;
2676		}
2677	}
2678	rcu_read_unlock();
2679
2680	if (dp_flags & DP_SEND_WRITE_ACK) {
2681		peer_req->flags |= EE_SEND_WRITE_ACK;
2682		inc_unacked(device);
2683		/* corresponding dec_unacked() in e_end_block()
2684	 * respectively in _drbd_clear_done_ee */
2685	}
2686
2687	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2688		/* I really don't like it that the receiver thread
2689		 * sends on the msock, but anyways */
2690		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2691	}
2692
2693	if (tp) {
2694		/* two primaries implies protocol C */
2695		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2696		peer_req->flags |= EE_IN_INTERVAL_TREE;
2697		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2698		if (err)
2699			goto out_interrupted;
2700		spin_lock_irq(&device->resource->req_lock);
2701		err = handle_write_conflicts(device, peer_req);
2702		if (err) {
2703			spin_unlock_irq(&device->resource->req_lock);
2704			if (err == -ENOENT) {
2705				put_ldev(device);
2706				return 0;
2707			}
2708			goto out_interrupted;
2709		}
2710	} else {
2711		update_peer_seq(peer_device, peer_seq);
2712		spin_lock_irq(&device->resource->req_lock);
2713	}
2714	/* TRIM and WRITE_SAME are processed synchronously:
2715	 * drbd_submit_peer_request() waits for all pending requests,
2716	 * i.e. for active_ee to become empty;
2717	 * better not add ourselves here. */
2718	if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2719		list_add_tail(&peer_req->w.list, &device->active_ee);
2720	spin_unlock_irq(&device->resource->req_lock);
2721
2722	if (device->state.conn == C_SYNC_TARGET)
2723		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2724
2725	if (device->state.pdsk < D_INCONSISTENT) {
2726		/* In case we have the only disk of the cluster: mark this range out of sync and cover the write with the activity log. */
2727		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2728		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2729		drbd_al_begin_io(device, &peer_req->i);
2730		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2731	}
2732
2733	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2734				       DRBD_FAULT_DT_WR);
2735	if (!err)
2736		return 0;
2737
2738	/* don't care for the reason here */
2739	drbd_err(device, "submit failed, triggering re-connect\n");
2740	spin_lock_irq(&device->resource->req_lock);
2741	list_del(&peer_req->w.list);
2742	drbd_remove_epoch_entry_interval(device, peer_req);
2743	spin_unlock_irq(&device->resource->req_lock);
2744	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2745		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2746		drbd_al_complete_io(device, &peer_req->i);
2747	}
2748
2749out_interrupted:
2750	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2751	put_ldev(device);
2752	drbd_free_peer_req(device, peer_req);
2753	return err;
2754}
2755
2756/* We may throttle resync, if the lower device seems to be busy,
2757 * and current sync rate is above c_min_rate.
2758 *
2759 * To decide whether or not the lower device is busy, we use a scheme similar
2760 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2761 * amount (more than 64 sectors) of activity that we cannot account for with
2762 * our own resync activity, the device obviously is "busy".
2763 *
2764 * The sync rate considered here uses only the most recent two step marks,
2765 * to have a short time average so we can react faster.
2766 */
2767bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2768		bool throttle_if_app_is_waiting)
2769{
2770	struct lc_element *tmp;
2771	bool throttle = drbd_rs_c_min_rate_throttle(device);
2772
2773	if (!throttle || throttle_if_app_is_waiting)
2774		return throttle;
2775
2776	spin_lock_irq(&device->al_lock);
2777	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2778	if (tmp) {
2779		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2780		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2781			throttle = false;
2782		/* Do not slow down if app IO is already waiting for this extent,
2783		 * and our progress is necessary for application IO to complete. */
2784	}
2785	spin_unlock_irq(&device->al_lock);
2786
2787	return throttle;
2788}
2789
2790bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2791{
2792	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2793	unsigned long db, dt, dbdt;
2794	unsigned int c_min_rate;
2795	int curr_events;
2796
2797	rcu_read_lock();
2798	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2799	rcu_read_unlock();
2800
2801	/* feature disabled? */
2802	if (c_min_rate == 0)
2803		return false;
2804
2805	curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2806			atomic_read(&device->rs_sect_ev);
2807
2808	if (atomic_read(&device->ap_actlog_cnt)
2809	    || curr_events - device->rs_last_events > 64) {
2810		unsigned long rs_left;
2811		int i;
2812
2813		device->rs_last_events = curr_events;
2814
2815		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2816		 * approx. */
2817		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2818
2819		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2820			rs_left = device->ov_left;
2821		else
2822			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2823
2824		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2825		if (!dt)
2826			dt++;
2827		db = device->rs_mark_left[i] - rs_left;
2828		dbdt = Bit2KB(db/dt);
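		/* Illustrative example (assuming the usual 4 KiB of storage per
		 * bitmap bit, so Bit2KB() multiplies by 4): db = 3000 bits cleared
		 * over dt = 2 seconds gives dbdt = 6000 KiB/s; we throttle only if
		 * that exceeds the configured c_min_rate (in KiB/s). */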
2829
2830		if (dbdt > c_min_rate)
2831			return true;
2832	}
2833	return false;
2834}
2835
2836static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2837{
2838	struct drbd_peer_device *peer_device;
2839	struct drbd_device *device;
2840	sector_t sector;
2841	sector_t capacity;
2842	struct drbd_peer_request *peer_req;
2843	struct digest_info *di = NULL;
2844	int size, verb;
2845	unsigned int fault_type;
2846	struct p_block_req *p =	pi->data;
2847
2848	peer_device = conn_peer_device(connection, pi->vnr);
2849	if (!peer_device)
2850		return -EIO;
2851	device = peer_device->device;
2852	capacity = get_capacity(device->vdisk);
2853
2854	sector = be64_to_cpu(p->sector);
2855	size   = be32_to_cpu(p->blksize);
2856
2857	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2858		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2859				(unsigned long long)sector, size);
2860		return -EINVAL;
2861	}
2862	if (sector + (size>>9) > capacity) {
2863		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2864				(unsigned long long)sector, size);
2865		return -EINVAL;
2866	}
2867
2868	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2869		verb = 1;
2870		switch (pi->cmd) {
2871		case P_DATA_REQUEST:
2872			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2873			break;
2874		case P_RS_THIN_REQ:
2875		case P_RS_DATA_REQUEST:
2876		case P_CSUM_RS_REQUEST:
2877		case P_OV_REQUEST:
2878			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2879			break;
2880		case P_OV_REPLY:
2881			verb = 0;
2882			dec_rs_pending(device);
2883			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2884			break;
2885		default:
2886			BUG();
2887		}
2888		if (verb && __ratelimit(&drbd_ratelimit_state))
2889			drbd_err(device, "Can not satisfy peer's read request, "
2890			    "no local data.\n");
2891
2892		/* drain a possibly present payload */
2893		return drbd_drain_block(peer_device, pi->size);
2894	}
2895
2896	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2897	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2898	 * which in turn might block on the other node at this very place.  */
2899	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2900			size, GFP_NOIO);
2901	if (!peer_req) {
2902		put_ldev(device);
2903		return -ENOMEM;
2904	}
2905
2906	switch (pi->cmd) {
2907	case P_DATA_REQUEST:
2908		peer_req->w.cb = w_e_end_data_req;
2909		fault_type = DRBD_FAULT_DT_RD;
2910		/* application IO, don't drbd_rs_begin_io */
2911		peer_req->flags |= EE_APPLICATION;
2912		goto submit;
2913
2914	case P_RS_THIN_REQ:
2915		/* If at some point in the future we have a smart way to
2916		   find out if this data block is completely deallocated,
2917		   then we would do something smarter here than reading
2918		   the block... */
2919		peer_req->flags |= EE_RS_THIN_REQ;
2920		fallthrough;
2921	case P_RS_DATA_REQUEST:
2922		peer_req->w.cb = w_e_end_rsdata_req;
2923		fault_type = DRBD_FAULT_RS_RD;
2924		/* used in the sector offset progress display */
2925		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2926		break;
2927
2928	case P_OV_REPLY:
2929	case P_CSUM_RS_REQUEST:
2930		fault_type = DRBD_FAULT_RS_RD;
2931		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2932		if (!di)
2933			goto out_free_e;
2934
2935		di->digest_size = pi->size;
2936		di->digest = (((char *)di)+sizeof(struct digest_info));
2937
2938		peer_req->digest = di;
2939		peer_req->flags |= EE_HAS_DIGEST;
2940
2941		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2942			goto out_free_e;
2943
2944		if (pi->cmd == P_CSUM_RS_REQUEST) {
2945			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2946			peer_req->w.cb = w_e_end_csum_rs_req;
2947			/* used in the sector offset progress display */
2948			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2949			/* remember to report stats in drbd_resync_finished */
2950			device->use_csums = true;
2951		} else if (pi->cmd == P_OV_REPLY) {
2952			/* track progress, we may need to throttle */
2953			atomic_add(size >> 9, &device->rs_sect_in);
2954			peer_req->w.cb = w_e_end_ov_reply;
2955			dec_rs_pending(device);
2956			/* drbd_rs_begin_io done when we sent this request,
2957			 * but accounting still needs to be done. */
2958			goto submit_for_resync;
2959		}
2960		break;
2961
2962	case P_OV_REQUEST:
2963		if (device->ov_start_sector == ~(sector_t)0 &&
2964		    peer_device->connection->agreed_pro_version >= 90) {
2965			unsigned long now = jiffies;
2966			int i;
2967			device->ov_start_sector = sector;
2968			device->ov_position = sector;
2969			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2970			device->rs_total = device->ov_left;
2971			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2972				device->rs_mark_left[i] = device->ov_left;
2973				device->rs_mark_time[i] = now;
2974			}
2975			drbd_info(device, "Online Verify start sector: %llu\n",
2976					(unsigned long long)sector);
2977		}
2978		peer_req->w.cb = w_e_end_ov_req;
2979		fault_type = DRBD_FAULT_RS_RD;
2980		break;
2981
2982	default:
2983		BUG();
2984	}
2985
2986	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2987	 * wrt the receiver, but it is not as straightforward as it may seem.
2988	 * Various places in the resync start and stop logic assume resync
2989 * requests are processed in order; requeuing this on the worker thread
2990 * would introduce a bunch of new code for synchronization between threads.
2991	 *
2992	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2993	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2994	 * for application writes for the same time.  For now, just throttle
2995	 * here, where the rest of the code expects the receiver to sleep for
2996	 * a while, anyways.
2997	 */
2998
2999	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
3000	 * this defers syncer requests for some time, before letting at least
3001 * one request through.  The resync controller on the receiving side
3002	 * will adapt to the incoming rate accordingly.
3003	 *
3004	 * We cannot throttle here if remote is Primary/SyncTarget:
3005	 * we would also throttle its application reads.
3006	 * In that case, throttling is done on the SyncTarget only.
3007	 */
3008
3009	/* Even though this may be a resync request, we do add to "read_ee";
3010	 * "sync_ee" is only used for resync WRITEs.
3011	 * Add to list early, so debugfs can find this request
3012	 * even if we have to sleep below. */
3013	spin_lock_irq(&device->resource->req_lock);
3014	list_add_tail(&peer_req->w.list, &device->read_ee);
3015	spin_unlock_irq(&device->resource->req_lock);
3016
3017	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3018	if (device->state.peer != R_PRIMARY
3019	&& drbd_rs_should_slow_down(device, sector, false))
3020		schedule_timeout_uninterruptible(HZ/10);
3021	update_receiver_timing_details(connection, drbd_rs_begin_io);
3022	if (drbd_rs_begin_io(device, sector))
3023		goto out_free_e;
3024
3025submit_for_resync:
3026	atomic_add(size >> 9, &device->rs_sect_ev);
3027
3028submit:
3029	update_receiver_timing_details(connection, drbd_submit_peer_request);
3030	inc_unacked(device);
3031	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3032				     fault_type) == 0)
3033		return 0;
3034
3035	/* don't care for the reason here */
3036	drbd_err(device, "submit failed, triggering re-connect\n");
3037
3038out_free_e:
3039	spin_lock_irq(&device->resource->req_lock);
3040	list_del(&peer_req->w.list);
3041	spin_unlock_irq(&device->resource->req_lock);
3042	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
3043
3044	put_ldev(device);
3045	drbd_free_peer_req(device, peer_req);
3046	return -EIO;
3047}
3048
3049/**
3050 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3051 */
3052static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3053{
3054	struct drbd_device *device = peer_device->device;
3055	int self, peer, rv = -100;
3056	unsigned long ch_self, ch_peer;
3057	enum drbd_after_sb_p after_sb_0p;
3058
3059	self = device->ldev->md.uuid[UI_BITMAP] & 1;
3060	peer = device->p_uuid[UI_BITMAP] & 1;
3061
3062	ch_peer = device->p_uuid[UI_SIZE];
3063	ch_self = device->comm_bm_set;
3064
3065	rcu_read_lock();
3066	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3067	rcu_read_unlock();
3068	switch (after_sb_0p) {
3069	case ASB_CONSENSUS:
3070	case ASB_DISCARD_SECONDARY:
3071	case ASB_CALL_HELPER:
3072	case ASB_VIOLENTLY:
3073		drbd_err(device, "Configuration error.\n");
3074		break;
3075	case ASB_DISCONNECT:
3076		break;
3077	case ASB_DISCARD_YOUNGER_PRI:
3078		if (self == 0 && peer == 1) {
3079			rv = -1;
3080			break;
3081		}
3082		if (self == 1 && peer == 0) {
3083			rv =  1;
3084			break;
3085		}
3086		fallthrough;	/* to one of the other strategies */
3087	case ASB_DISCARD_OLDER_PRI:
3088		if (self == 0 && peer == 1) {
3089			rv = 1;
3090			break;
3091		}
3092		if (self == 1 && peer == 0) {
3093			rv = -1;
3094			break;
3095		}
3096		/* Else fall through to one of the other strategies... */
3097		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3098		     "Using discard-least-changes instead\n");
3099		fallthrough;
3100	case ASB_DISCARD_ZERO_CHG:
3101		if (ch_peer == 0 && ch_self == 0) {
3102			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3103				? -1 : 1;
3104			break;
3105		} else {
3106			if (ch_peer == 0) { rv =  1; break; }
3107			if (ch_self == 0) { rv = -1; break; }
3108		}
3109		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3110			break;
3111		fallthrough;
3112	case ASB_DISCARD_LEAST_CHG:
3113		if	(ch_self < ch_peer)
3114			rv = -1;
3115		else if (ch_self > ch_peer)
3116			rv =  1;
3117		else /* ( ch_self == ch_peer ) */
3118		     /* Well, then use something else. */
3119			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3120				? -1 : 1;
3121		break;
3122	case ASB_DISCARD_LOCAL:
3123		rv = -1;
3124		break;
3125	case ASB_DISCARD_REMOTE:
3126		rv =  1;
3127	}
3128
3129	return rv;
3130}
3131
3132/**
3133 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3134 */
3135static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3136{
3137	struct drbd_device *device = peer_device->device;
3138	int hg, rv = -100;
3139	enum drbd_after_sb_p after_sb_1p;
3140
3141	rcu_read_lock();
3142	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3143	rcu_read_unlock();
3144	switch (after_sb_1p) {
3145	case ASB_DISCARD_YOUNGER_PRI:
3146	case ASB_DISCARD_OLDER_PRI:
3147	case ASB_DISCARD_LEAST_CHG:
3148	case ASB_DISCARD_LOCAL:
3149	case ASB_DISCARD_REMOTE:
3150	case ASB_DISCARD_ZERO_CHG:
3151		drbd_err(device, "Configuration error.\n");
3152		break;
3153	case ASB_DISCONNECT:
3154		break;
3155	case ASB_CONSENSUS:
3156		hg = drbd_asb_recover_0p(peer_device);
3157		if (hg == -1 && device->state.role == R_SECONDARY)
3158			rv = hg;
3159		if (hg == 1  && device->state.role == R_PRIMARY)
3160			rv = hg;
3161		break;
3162	case ASB_VIOLENTLY:
3163		rv = drbd_asb_recover_0p(peer_device);
3164		break;
3165	case ASB_DISCARD_SECONDARY:
3166		return device->state.role == R_PRIMARY ? 1 : -1;
3167	case ASB_CALL_HELPER:
3168		hg = drbd_asb_recover_0p(peer_device);
3169		if (hg == -1 && device->state.role == R_PRIMARY) {
3170			enum drbd_state_rv rv2;
3171
3172			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3173			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3174			  * we do not need to wait for the after state change work either. */
3175			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3176			if (rv2 != SS_SUCCESS) {
3177				drbd_khelper(device, "pri-lost-after-sb");
3178			} else {
3179				drbd_warn(device, "Successfully gave up primary role.\n");
3180				rv = hg;
3181			}
3182		} else
3183			rv = hg;
3184	}
3185
3186	return rv;
3187}
3188
3189/**
3190 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3191 */
3192static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3193{
3194	struct drbd_device *device = peer_device->device;
3195	int hg, rv = -100;
3196	enum drbd_after_sb_p after_sb_2p;
3197
3198	rcu_read_lock();
3199	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3200	rcu_read_unlock();
3201	switch (after_sb_2p) {
3202	case ASB_DISCARD_YOUNGER_PRI:
3203	case ASB_DISCARD_OLDER_PRI:
3204	case ASB_DISCARD_LEAST_CHG:
3205	case ASB_DISCARD_LOCAL:
3206	case ASB_DISCARD_REMOTE:
3207	case ASB_CONSENSUS:
3208	case ASB_DISCARD_SECONDARY:
3209	case ASB_DISCARD_ZERO_CHG:
3210		drbd_err(device, "Configuration error.\n");
3211		break;
3212	case ASB_VIOLENTLY:
3213		rv = drbd_asb_recover_0p(peer_device);
3214		break;
3215	case ASB_DISCONNECT:
3216		break;
3217	case ASB_CALL_HELPER:
3218		hg = drbd_asb_recover_0p(peer_device);
3219		if (hg == -1) {
3220			enum drbd_state_rv rv2;
3221
3222			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3223			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3224			  * we do not need to wait for the after state change work either. */
3225			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3226			if (rv2 != SS_SUCCESS) {
3227				drbd_khelper(device, "pri-lost-after-sb");
3228			} else {
3229				drbd_warn(device, "Successfully gave up primary role.\n");
3230				rv = hg;
3231			}
3232		} else
3233			rv = hg;
3234	}
3235
3236	return rv;
3237}
3238
3239static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3240			   u64 bits, u64 flags)
3241{
3242	if (!uuid) {
3243		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3244		return;
3245	}
3246	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3247	     text,
3248	     (unsigned long long)uuid[UI_CURRENT],
3249	     (unsigned long long)uuid[UI_BITMAP],
3250	     (unsigned long long)uuid[UI_HISTORY_START],
3251	     (unsigned long long)uuid[UI_HISTORY_END],
3252	     (unsigned long long)bits,
3253	     (unsigned long long)flags);
3254}
3255
3256/*
3257  100	after split brain try auto recover
3258    2	C_SYNC_SOURCE set BitMap
3259    1	C_SYNC_SOURCE use BitMap
3260    0	no Sync
3261   -1	C_SYNC_TARGET use BitMap
3262   -2	C_SYNC_TARGET set BitMap
3263 -100	after split brain, disconnect
3264-1000	unrelated data
3265-1091   requires proto 91
3266-1096   requires proto 96
3267 */
3268
3269static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3270{
3271	struct drbd_peer_device *const peer_device = first_peer_device(device);
3272	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3273	u64 self, peer;
3274	int i, j;
3275
3276	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3277	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
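	/* The lowest bit of each UUID is used as a flag (see for example
	 * drbd_asb_recover_0p() above, which looks at uuid[UI_BITMAP] & 1), so
	 * mask it out before comparing the UUID values themselves. */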
3278
3279	*rule_nr = 10;
3280	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3281		return 0;
3282
3283	*rule_nr = 20;
3284	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3285	     peer != UUID_JUST_CREATED)
3286		return -2;
3287
3288	*rule_nr = 30;
3289	if (self != UUID_JUST_CREATED &&
3290	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3291		return 2;
3292
3293	if (self == peer) {
3294		int rct, dc; /* roles at crash time */
3295
3296		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3297
3298			if (connection->agreed_pro_version < 91)
3299				return -1091;
3300
3301			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3302			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3303				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3304				drbd_uuid_move_history(device);
3305				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3306				device->ldev->md.uuid[UI_BITMAP] = 0;
3307
3308				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3309					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3310				*rule_nr = 34;
3311			} else {
3312				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3313				*rule_nr = 36;
3314			}
3315
3316			return 1;
3317		}
3318
3319		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3320
3321			if (connection->agreed_pro_version < 91)
3322				return -1091;
3323
3324			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3325			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3326				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3327
3328				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3329				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3330				device->p_uuid[UI_BITMAP] = 0UL;
3331
3332				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3333				*rule_nr = 35;
3334			} else {
3335				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3336				*rule_nr = 37;
3337			}
3338
3339			return -1;
3340		}
3341
3342		/* Common power [off|failure] */
3343		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3344			(device->p_uuid[UI_FLAGS] & 2);
3345		/* lowest bit is set when we were primary,
3346		 * next bit (weight 2) is set when peer was primary */
3347		*rule_nr = 40;
3348
3349		/* Neither has the "crashed primary" flag set,
3350		 * only a replication link hiccup. */
3351		if (rct == 0)
3352			return 0;
3353
3354		/* Current UUID equal and no bitmap uuid; does not necessarily
3355		 * mean this was a "simultaneous hard crash", maybe IO was
3356		 * frozen, so no UUID-bump happened.
3357		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3358		 * for "new-enough" peer DRBD version. */
3359		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3360			*rule_nr = 41;
3361			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3362				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3363				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3364			}
3365			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3366				/* At least one has the "crashed primary" bit set,
3367				 * both are primary now, but neither has rotated its UUIDs?
3368				 * "Can not happen." */
3369				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3370				return -100;
3371			}
3372			if (device->state.role == R_PRIMARY)
3373				return 1;
3374			return -1;
3375		}
3376
3377		/* Both are secondary.
3378		 * Really looks like recovery from simultaneous hard crash.
3379		 * Check which had been primary before, and arbitrate. */
3380		switch (rct) {
3381		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3382		case 1: /*  self_pri && !peer_pri */ return 1;
3383		case 2: /* !self_pri &&  peer_pri */ return -1;
3384		case 3: /*  self_pri &&  peer_pri */
3385			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3386			return dc ? -1 : 1;
3387		}
3388	}
3389
3390	*rule_nr = 50;
3391	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3392	if (self == peer)
3393		return -1;
3394
3395	*rule_nr = 51;
3396	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3397	if (self == peer) {
3398		if (connection->agreed_pro_version < 96 ?
3399		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3400		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3401		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3402			/* The last P_SYNC_UUID did not get through. Undo the modifications
3403			   of the peer's UUIDs made by the last start of resync as sync source. */
3404
3405			if (connection->agreed_pro_version < 91)
3406				return -1091;
3407
3408			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3409			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3410
3411			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3412			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3413
3414			return -1;
3415		}
3416	}
3417
3418	*rule_nr = 60;
3419	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3420	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3421		peer = device->p_uuid[i] & ~((u64)1);
3422		if (self == peer)
3423			return -2;
3424	}
3425
3426	*rule_nr = 70;
3427	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3428	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3429	if (self == peer)
3430		return 1;
3431
3432	*rule_nr = 71;
3433	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3434	if (self == peer) {
3435		if (connection->agreed_pro_version < 96 ?
3436		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3437		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3438		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3439			/* The last P_SYNC_UUID did not get through. Undo the modifications
3440			   of our UUIDs made by the last start of resync as sync source. */
3441
3442			if (connection->agreed_pro_version < 91)
3443				return -1091;
3444
3445			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3446			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3447
3448			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3449			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3450				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3451
3452			return 1;
3453		}
3454	}
3455
3456
3457	*rule_nr = 80;
3458	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3459	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3460		self = device->ldev->md.uuid[i] & ~((u64)1);
3461		if (self == peer)
3462			return 2;
3463	}
3464
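	/* Rules 90/100: identical non-zero bitmap UUIDs (90), or a common
	 * ancestor found only in the history slots (100).  The data is related,
	 * but this is a split brain; the caller resolves it via the after-sb
	 * policies or drops the connection. */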
3465	*rule_nr = 90;
3466	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3467	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3468	if (self == peer && self != ((u64)0))
3469		return 100;
3470
3471	*rule_nr = 100;
3472	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3473		self = device->ldev->md.uuid[i] & ~((u64)1);
3474		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3475			peer = device->p_uuid[j] & ~((u64)1);
3476			if (self == peer)
3477				return -100;
3478		}
3479	}
3480
3481	return -1000;
3482}
3483
3484/* drbd_sync_handshake() returns the new conn state on success, or
3485   C_MASK on failure.
3486 */
3487static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3488					   enum drbd_role peer_role,
3489					   enum drbd_disk_state peer_disk) __must_hold(local)
3490{
3491	struct drbd_device *device = peer_device->device;
3492	enum drbd_conns rv = C_MASK;
3493	enum drbd_disk_state mydisk;
3494	struct net_conf *nc;
3495	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3496
3497	mydisk = device->state.disk;
3498	if (mydisk == D_NEGOTIATING)
3499		mydisk = device->new_state_tmp.disk;
3500
3501	drbd_info(device, "drbd_sync_handshake:\n");
3502
3503	spin_lock_irq(&device->ldev->md.uuid_lock);
3504	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3505	drbd_uuid_dump(device, "peer", device->p_uuid,
3506		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3507
3508	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3509	spin_unlock_irq(&device->ldev->md.uuid_lock);
3510
3511	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3512
3513	if (hg == -1000) {
3514		drbd_alert(device, "Unrelated data, aborting!\n");
3515		return C_MASK;
3516	}
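	/* Large negative return values from drbd_uuid_compare() encode a missing
	 * peer capability.  Below -0x10000, the low byte of the magnitude is the
	 * required protocol version and bits 8..15 are the required feature
	 * flags; otherwise, below -1000, the magnitude is 1000 plus the required
	 * protocol version. */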
3517	if (hg < -0x10000) {
3518		int proto, fflags;
3519		hg = -hg;
3520		proto = hg & 0xff;
3521		fflags = (hg >> 8) & 0xff;
3522		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3523					proto, fflags);
3524		return C_MASK;
3525	}
3526	if (hg < -1000) {
3527		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3528		return C_MASK;
3529	}
3530
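	/* A disk that is Inconsistent on exactly one side overrides the UUID
	 * verdict: the consistent side becomes sync source.  If the UUID result
	 * was an unresolved split brain (-100) or already asked for a full sync
	 * (|hg| == 2), keep that by doubling hg, since |hg| >= 2 triggers a full
	 * sync further down. */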
3531	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3532	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3533		int f = (hg == -100) || abs(hg) == 2;
3534		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3535		if (f)
3536			hg = hg*2;
3537		drbd_info(device, "Becoming sync %s due to disk states.\n",
3538		     hg > 0 ? "source" : "target");
3539	}
3540
3541	if (abs(hg) == 100)
3542		drbd_khelper(device, "initial-split-brain");
3543
3544	rcu_read_lock();
3545	nc = rcu_dereference(peer_device->connection->net_conf);
3546	always_asbp = nc->always_asbp;
3547	rr_conflict = nc->rr_conflict;
3548	tentative = nc->tentative;
3549	rcu_read_unlock();
3550
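	/* abs(hg) == 100 means split brain.  hg == 100 is always handed to the
	 * after-sb recovery policies; hg == -100 only if always-asbp is set.
	 * Which policy applies depends on how many of the two nodes are
	 * currently Primary. */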
3551	if (hg == 100 || (hg == -100 && always_asbp)) {
3552		int pcount = (device->state.role == R_PRIMARY)
3553			   + (peer_role == R_PRIMARY);
3554		int forced = (hg == -100);
3555
3556		switch (pcount) {
3557		case 0:
3558			hg = drbd_asb_recover_0p(peer_device);
3559			break;
3560		case 1:
3561			hg = drbd_asb_recover_1p(peer_device);
3562			break;
3563		case 2:
3564			hg = drbd_asb_recover_2p(peer_device);
3565			break;
3566		}
3567		if (abs(hg) < 100) {
3568			drbd_warn(device, "Split-Brain detected, %d primaries, "
3569			     "automatically solved. Sync from %s node\n",
3570			     pcount, (hg < 0) ? "peer" : "this");
3571			if (forced) {
3572				drbd_warn(device, "Doing a full sync, since"
3573				     " UUIDs were ambiguous.\n");
3574				hg = hg*2;
3575			}
3576		}
3577	}
3578
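	/* Still unresolved: honor an explicit discard-my-data request, but only
	 * if exactly one side asked for it.  Bit 0 of the peer's UI_FLAGS
	 * mirrors the peer's discard-my-data setting. */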
3579	if (hg == -100) {
3580		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3581			hg = -1;
3582		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3583			hg = 1;
3584
3585		if (abs(hg) < 100)
3586			drbd_warn(device, "Split-Brain detected, manually solved. "
3587			     "Sync from %s node\n",
3588			     (hg < 0) ? "peer" : "this");
3589	}
3590
3591	if (hg == -100) {
3592		/* FIXME this log message is not correct if we end up here
3593		 * after an attempted attach on a diskless node.
3594		 * We just refuse to attach -- well, we drop the "connection"
3595		 * to that disk, in a way... */
3596		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3597		drbd_khelper(device, "split-brain");
3598		return C_MASK;
3599	}
3600
3601	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3602		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3603		return C_MASK;
3604	}
3605
3606	if (hg < 0 && /* by intention we do not use mydisk here. */
3607	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3608		switch (rr_conflict) {
3609		case ASB_CALL_HELPER:
3610			drbd_khelper(device, "pri-lost");
3611			fallthrough;
3612		case ASB_DISCONNECT:
3613			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3614			return C_MASK;
3615		case ASB_VIOLENTLY:
3616			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3617			     " assumption\n");
3618		}
3619	}
3620
3621	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3622		if (hg == 0)
3623			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3624		else
3625			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3626				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3627				 abs(hg) >= 2 ? "full" : "bit-map based");
3628		return C_MASK;
3629	}
3630
3631	if (abs(hg) >= 2) {
3632		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3633		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3634					BM_LOCKED_SET_ALLOWED))
3635			return C_MASK;
3636	}
3637
3638	if (hg > 0) { /* become sync source. */
3639		rv = C_WF_BITMAP_S;
3640	} else if (hg < 0) { /* become sync target */
3641		rv = C_WF_BITMAP_T;
3642	} else {
3643		rv = C_CONNECTED;
3644		if (drbd_bm_total_weight(device)) {
3645			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3646			     drbd_bm_total_weight(device));
3647		}
3648	}
3649
3650	return rv;
3651}
3652
3653static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3654{
3655	/* The peer's ASB_DISCARD_REMOTE maps to our ASB_DISCARD_LOCAL; that pairing is valid. */
3656	if (peer == ASB_DISCARD_REMOTE)
3657		return ASB_DISCARD_LOCAL;
3658
3659	/* Any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid. */
3660	if (peer == ASB_DISCARD_LOCAL)
3661		return ASB_DISCARD_REMOTE;
3662
3663	/* everything else is valid if they are equal on both sides. */
3664	return peer;
3665}
3666
3667static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3668{
3669	struct p_protocol *p = pi->data;
3670	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3671	int p_proto, p_discard_my_data, p_two_primaries, cf;
3672	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3673	char integrity_alg[SHARED_SECRET_MAX] = "";
3674	struct crypto_shash *peer_integrity_tfm = NULL;
3675	void *int_dig_in = NULL, *int_dig_vv = NULL;
3676
3677	p_proto		= be32_to_cpu(p->protocol);
3678	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3679	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3680	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3681	p_two_primaries = be32_to_cpu(p->two_primaries);
3682	cf		= be32_to_cpu(p->conn_flags);
3683	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3684
3685	if (connection->agreed_pro_version >= 87) {
3686		int err;
3687
3688		if (pi->size > sizeof(integrity_alg))
3689			return -EIO;
3690		err = drbd_recv_all(connection, integrity_alg, pi->size);
3691		if (err)
3692			return err;
3693		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3694	}
3695
3696	if (pi->cmd != P_PROTOCOL_UPDATE) {
3697		clear_bit(CONN_DRY_RUN, &connection->flags);
3698
3699		if (cf & CF_DRY_RUN)
3700			set_bit(CONN_DRY_RUN, &connection->flags);
3701
3702		rcu_read_lock();
3703		nc = rcu_dereference(connection->net_conf);
3704
3705		if (p_proto != nc->wire_protocol) {
3706			drbd_err(connection, "incompatible %s settings\n", "protocol");
3707			goto disconnect_rcu_unlock;
3708		}
3709
3710		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3711			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3712			goto disconnect_rcu_unlock;
3713		}
3714
3715		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3716			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3717			goto disconnect_rcu_unlock;
3718		}
3719
3720		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3721			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3722			goto disconnect_rcu_unlock;
3723		}
3724
3725		if (p_discard_my_data && nc->discard_my_data) {
3726			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3727			goto disconnect_rcu_unlock;
3728		}
3729
3730		if (p_two_primaries != nc->two_primaries) {
3731			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3732			goto disconnect_rcu_unlock;
3733		}
3734
3735		if (strcmp(integrity_alg, nc->integrity_alg)) {
3736			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3737			goto disconnect_rcu_unlock;
3738		}
3739
3740		rcu_read_unlock();
3741	}
3742
3743	if (integrity_alg[0]) {
3744		int hash_size;
3745
3746		/*
3747		 * We can only change the peer data integrity algorithm
3748		 * here.  Changing our own data integrity algorithm
3749		 * requires that we send a P_PROTOCOL_UPDATE packet at
3750		 * the same time; otherwise, the peer has no way to
3751		 * tell between which packets the algorithm should
3752		 * change.
3753		 */
3754
3755		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3756		if (IS_ERR(peer_integrity_tfm)) {
3757			peer_integrity_tfm = NULL;
3758			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3759				 integrity_alg);
3760			goto disconnect;
3761		}
3762
3763		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3764		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3765		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3766		if (!(int_dig_in && int_dig_vv)) {
3767			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3768			goto disconnect;
3769		}
3770	}
3771
3772	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3773	if (!new_net_conf) {
3774		drbd_err(connection, "Allocation of new net_conf failed\n");
3775		goto disconnect;
3776	}
3777
3778	mutex_lock(&connection->data.mutex);
3779	mutex_lock(&connection->resource->conf_update);
3780	old_net_conf = connection->net_conf;
3781	*new_net_conf = *old_net_conf;
3782
3783	new_net_conf->wire_protocol = p_proto;
3784	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3785	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3786	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3787	new_net_conf->two_primaries = p_two_primaries;
3788
3789	rcu_assign_pointer(connection->net_conf, new_net_conf);
3790	mutex_unlock(&connection->resource->conf_update);
3791	mutex_unlock(&connection->data.mutex);
3792
3793	crypto_free_shash(connection->peer_integrity_tfm);
3794	kfree(connection->int_dig_in);
3795	kfree(connection->int_dig_vv);
3796	connection->peer_integrity_tfm = peer_integrity_tfm;
3797	connection->int_dig_in = int_dig_in;
3798	connection->int_dig_vv = int_dig_vv;
3799
3800	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3801		drbd_info(connection, "peer data-integrity-alg: %s\n",
3802			  integrity_alg[0] ? integrity_alg : "(none)");
3803
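	/* Wait for readers that may still hold a reference to the old net_conf
	 * before freeing it (RCU publish, then retire). */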
3804	synchronize_rcu();
3805	kfree(old_net_conf);
3806	return 0;
3807
3808disconnect_rcu_unlock:
3809	rcu_read_unlock();
3810disconnect:
3811	crypto_free_shash(peer_integrity_tfm);
3812	kfree(int_dig_in);
3813	kfree(int_dig_vv);
3814	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3815	return -EIO;
3816}
3817
3818/* helper function
3819 * input: alg name, feature name
3820 * return: NULL (alg name was "")
3821 *         ERR_PTR(error) if something goes wrong
3822 *         or the crypto hash ptr, if it worked out ok. */
3823static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3824		const struct drbd_device *device,
3825		const char *alg, const char *name)
3826{
3827	struct crypto_shash *tfm;
3828
3829	if (!alg[0])
3830		return NULL;
3831
3832	tfm = crypto_alloc_shash(alg, 0, 0);
3833	if (IS_ERR(tfm)) {
3834		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3835			alg, name, PTR_ERR(tfm));
3836		return tfm;
3837	}
3838	return tfm;
3839}
3840
3841static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3842{
3843	void *buffer = connection->data.rbuf;
3844	int size = pi->size;
3845
3846	while (size) {
3847		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3848		s = drbd_recv(connection, buffer, s);
3849		if (s <= 0) {
3850			if (s < 0)
3851				return s;
3852			break;
3853		}
3854		size -= s;
3855	}
3856	if (size)
3857		return -EIO;
3858	return 0;
3859}
3860
3861/*
3862 * config_unknown_volume  -  device configuration command for unknown volume
3863 *
3864 * When a device is added to an existing connection, the node on which the
3865 * device is added first will send configuration commands to its peer but the
3866 * peer will not know about the device yet.  It will warn and ignore these
3867 * commands.  Once the device is added on the second node, the second node will
3868 * send the same device configuration commands, but in the other direction.
3869 *
3870 * (We can also end up here if drbd is misconfigured.)
3871 */
3872static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3873{
3874	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3875		  cmdname(pi->cmd), pi->vnr);
3876	return ignore_remaining_packet(connection, pi);
3877}
3878
3879static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3880{
3881	struct drbd_peer_device *peer_device;
3882	struct drbd_device *device;
3883	struct p_rs_param_95 *p;
3884	unsigned int header_size, data_size, exp_max_sz;
3885	struct crypto_shash *verify_tfm = NULL;
3886	struct crypto_shash *csums_tfm = NULL;
3887	struct net_conf *old_net_conf, *new_net_conf = NULL;
3888	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3889	const int apv = connection->agreed_pro_version;
3890	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3891	unsigned int fifo_size = 0;
3892	int err;
3893
3894	peer_device = conn_peer_device(connection, pi->vnr);
3895	if (!peer_device)
3896		return config_unknown_volume(connection, pi);
3897	device = peer_device->device;
3898
3899	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3900		    : apv == 88 ? sizeof(struct p_rs_param)
3901					+ SHARED_SECRET_MAX
3902		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3903		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3904
3905	if (pi->size > exp_max_sz) {
3906		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3907		    pi->size, exp_max_sz);
3908		return -EIO;
3909	}
3910
3911	if (apv <= 88) {
3912		header_size = sizeof(struct p_rs_param);
3913		data_size = pi->size - header_size;
3914	} else if (apv <= 94) {
3915		header_size = sizeof(struct p_rs_param_89);
3916		data_size = pi->size - header_size;
3917		D_ASSERT(device, data_size == 0);
3918	} else {
3919		header_size = sizeof(struct p_rs_param_95);
3920		data_size = pi->size - header_size;
3921		D_ASSERT(device, data_size == 0);
3922	}
3923
3924	/* initialize verify_alg and csums_alg */
3925	p = pi->data;
3926	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3927
3928	err = drbd_recv_all(peer_device->connection, p, header_size);
3929	if (err)
3930		return err;
3931
3932	mutex_lock(&connection->resource->conf_update);
3933	old_net_conf = peer_device->connection->net_conf;
3934	if (get_ldev(device)) {
3935		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3936		if (!new_disk_conf) {
3937			put_ldev(device);
3938			mutex_unlock(&connection->resource->conf_update);
3939			drbd_err(device, "Allocation of new disk_conf failed\n");
3940			return -ENOMEM;
3941		}
3942
3943		old_disk_conf = device->ldev->disk_conf;
3944		*new_disk_conf = *old_disk_conf;
3945
3946		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3947	}
3948
3949	if (apv >= 88) {
3950		if (apv == 88) {
3951			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3952				drbd_err(device, "verify-alg of wrong size, "
3953					"peer wants %u, accepting only up to %u bytes\n",
3954					data_size, SHARED_SECRET_MAX);
3955				err = -EIO;
3956				goto reconnect;
3957			}
3958
3959			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3960			if (err)
3961				goto reconnect;
3962			/* we expect NUL terminated string */
3963			/* but just in case someone tries to be evil */
3964			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3965			p->verify_alg[data_size-1] = 0;
3966
3967		} else /* apv >= 89 */ {
3968			/* we still expect NUL terminated strings */
3969			/* but just in case someone tries to be evil */
3970			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3971			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3972			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3973			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3974		}
3975
3976		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3977			if (device->state.conn == C_WF_REPORT_PARAMS) {
3978				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3979				    old_net_conf->verify_alg, p->verify_alg);
3980				goto disconnect;
3981			}
3982			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3983					p->verify_alg, "verify-alg");
3984			if (IS_ERR(verify_tfm)) {
3985				verify_tfm = NULL;
3986				goto disconnect;
3987			}
3988		}
3989
3990		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3991			if (device->state.conn == C_WF_REPORT_PARAMS) {
3992				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3993				    old_net_conf->csums_alg, p->csums_alg);
3994				goto disconnect;
3995			}
3996			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3997					p->csums_alg, "csums-alg");
3998			if (IS_ERR(csums_tfm)) {
3999				csums_tfm = NULL;
4000				goto disconnect;
4001			}
4002		}
4003
4004		if (apv > 94 && new_disk_conf) {
4005			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4006			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4007			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4008			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4009
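			/* Size the plan fifo to span the configured plan-ahead window:
			 * c_plan_ahead is in 0.1s units and SLEEP_TIME is the resync
			 * controller's sampling period. */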
4010			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4011			if (fifo_size != device->rs_plan_s->size) {
4012				new_plan = fifo_alloc(fifo_size);
4013				if (!new_plan) {
4014					drbd_err(device, "kmalloc of fifo_buffer failed\n");
4015					put_ldev(device);
4016					goto disconnect;
4017				}
4018			}
4019		}
4020
4021		if (verify_tfm || csums_tfm) {
4022			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4023			if (!new_net_conf) {
4024				drbd_err(device, "Allocation of new net_conf failed\n");
4025				goto disconnect;
4026			}
4027
4028			*new_net_conf = *old_net_conf;
4029
4030			if (verify_tfm) {
4031				strcpy(new_net_conf->verify_alg, p->verify_alg);
4032				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4033				crypto_free_shash(peer_device->connection->verify_tfm);
4034				peer_device->connection->verify_tfm = verify_tfm;
4035				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4036			}
4037			if (csums_tfm) {
4038				strcpy(new_net_conf->csums_alg, p->csums_alg);
4039				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4040				crypto_free_shash(peer_device->connection->csums_tfm);
4041				peer_device->connection->csums_tfm = csums_tfm;
4042				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4043			}
4044			rcu_assign_pointer(connection->net_conf, new_net_conf);
4045		}
4046	}
4047
4048	if (new_disk_conf) {
4049		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4050		put_ldev(device);
4051	}
4052
4053	if (new_plan) {
4054		old_plan = device->rs_plan_s;
4055		rcu_assign_pointer(device->rs_plan_s, new_plan);
4056	}
4057
4058	mutex_unlock(&connection->resource->conf_update);
4059	synchronize_rcu();
4060	if (new_net_conf)
4061		kfree(old_net_conf);
4062	kfree(old_disk_conf);
4063	kfree(old_plan);
4064
4065	return 0;
4066
4067reconnect:
4068	if (new_disk_conf) {
4069		put_ldev(device);
4070		kfree(new_disk_conf);
4071	}
4072	mutex_unlock(&connection->resource->conf_update);
4073	return -EIO;
4074
4075disconnect:
4076	kfree(new_plan);
4077	if (new_disk_conf) {
4078		put_ldev(device);
4079		kfree(new_disk_conf);
4080	}
4081	mutex_unlock(&connection->resource->conf_update);
4082	/* just for completeness: actually not needed,
4083	 * as this is not reached if csums_tfm was ok. */
4084	crypto_free_shash(csums_tfm);
4085	/* but free the verify_tfm again, if csums_tfm did not work out */
4086	crypto_free_shash(verify_tfm);
4087	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4088	return -EIO;
4089}
4090
4091/* warn if the arguments differ by more than 12.5% */
4092static void warn_if_differ_considerably(struct drbd_device *device,
4093	const char *s, sector_t a, sector_t b)
4094{
4095	sector_t d;
4096	if (a == 0 || b == 0)
4097		return;
4098	d = (a > b) ? (a - b) : (b - a);
4099	if (d > (a>>3) || d > (b>>3))
4100		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4101		     (unsigned long long)a, (unsigned long long)b);
4102}
4103
4104static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4105{
4106	struct drbd_peer_device *peer_device;
4107	struct drbd_device *device;
4108	struct p_sizes *p = pi->data;
4109	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4110	enum determine_dev_size dd = DS_UNCHANGED;
4111	sector_t p_size, p_usize, p_csize, my_usize;
4112	sector_t new_size, cur_size;
4113	int ldsc = 0; /* local disk size changed */
4114	enum dds_flags ddsf;
4115
4116	peer_device = conn_peer_device(connection, pi->vnr);
4117	if (!peer_device)
4118		return config_unknown_volume(connection, pi);
4119	device = peer_device->device;
4120	cur_size = get_capacity(device->vdisk);
4121
4122	p_size = be64_to_cpu(p->d_size);
4123	p_usize = be64_to_cpu(p->u_size);
4124	p_csize = be64_to_cpu(p->c_size);
4125
4126	/* just store the peer's disk size for now.
4127	 * we still need to figure out whether we accept that. */
4128	device->p_size = p_size;
4129
4130	if (get_ldev(device)) {
4131		rcu_read_lock();
4132		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4133		rcu_read_unlock();
4134
4135		warn_if_differ_considerably(device, "lower level device sizes",
4136			   p_size, drbd_get_max_capacity(device->ldev));
4137		warn_if_differ_considerably(device, "user requested size",
4138					    p_usize, my_usize);
4139
4140		/* if this is the first connect, or an otherwise expected
4141		 * param exchange, choose the minimum */
4142		if (device->state.conn == C_WF_REPORT_PARAMS)
4143			p_usize = min_not_zero(my_usize, p_usize);
4144
4145		/* Never shrink a device with usable data during connect,
4146		 * or "attach" on the peer.
4147		 * But allow online shrinking if we are connected. */
4148		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4149		if (new_size < cur_size &&
4150		    device->state.disk >= D_OUTDATED &&
4151		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4152			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4153					(unsigned long long)new_size, (unsigned long long)cur_size);
4154			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4155			put_ldev(device);
4156			return -EIO;
4157		}
4158
4159		if (my_usize != p_usize) {
4160			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4161
4162			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4163			if (!new_disk_conf) {
4164				drbd_err(device, "Allocation of new disk_conf failed\n");
4165				put_ldev(device);
4166				return -ENOMEM;
4167			}
4168
4169			mutex_lock(&connection->resource->conf_update);
4170			old_disk_conf = device->ldev->disk_conf;
4171			*new_disk_conf = *old_disk_conf;
4172			new_disk_conf->disk_size = p_usize;
4173
4174			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4175			mutex_unlock(&connection->resource->conf_update);
4176			synchronize_rcu();
4177			kfree(old_disk_conf);
4178
4179			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4180				 (unsigned long)p_usize, (unsigned long)my_usize);
4181		}
4182
4183		put_ldev(device);
4184	}
4185
4186	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4187	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4188	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4189	   drbd_reconsider_queue_parameters(), we can be sure that after
4190	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4191
4192	ddsf = be16_to_cpu(p->dds_flags);
4193	if (get_ldev(device)) {
4194		drbd_reconsider_queue_parameters(device, device->ldev, o);
4195		dd = drbd_determine_dev_size(device, ddsf, NULL);
4196		put_ldev(device);
4197		if (dd == DS_ERROR)
4198			return -EIO;
4199		drbd_md_sync(device);
4200	} else {
4201		/*
4202		 * I am diskless, need to accept the peer's *current* size.
4203		 * I must NOT accept the peer's backing disk size,
4204		 * it may have been larger than mine all along...
4205		 *
4206		 * At this point, the peer knows more about my disk, or at
4207		 * least about what we last agreed upon, than myself.
4208		 * So if his c_size is less than his d_size, the most likely
4209		 * reason is that *my* d_size was smaller last time we checked.
4210		 *
4211		 * However, if he sends a zero current size,
4212		 * take his (user-capped or) backing disk size anyways.
4213		 *
4214		 * Unless of course he does not have a disk himself.
4215		 * In which case we ignore this completely.
4216		 */
4217		sector_t new_size = p_csize ?: p_usize ?: p_size;
4218		drbd_reconsider_queue_parameters(device, NULL, o);
4219		if (new_size == 0) {
4220			/* Ignore, the peer does not know anything yet. */
4221		} else if (new_size == cur_size) {
4222			/* nothing to do */
4223		} else if (cur_size != 0 && p_size == 0) {
4224			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4225					(unsigned long long)new_size, (unsigned long long)cur_size);
4226		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4227			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4228					(unsigned long long)new_size, (unsigned long long)cur_size);
4229			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4230			return -EIO;
4231		} else {
4232			/* I believe the peer, if
4233			 *  - I don't have a current size myself
4234			 *  - we agree on the size anyways
4235			 *  - I do have a current size, am Secondary,
4236			 *    and he has the only disk
4237			 *  - I do have a current size, am Primary,
4238			 *    and he has the only disk,
4239			 *    which is larger than my current size
4240			 */
4241			drbd_set_my_capacity(device, new_size);
4242		}
4243	}
4244
4245	if (get_ldev(device)) {
4246		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4247			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4248			ldsc = 1;
4249		}
4250
4251		put_ldev(device);
4252	}
4253
4254	if (device->state.conn > C_WF_REPORT_PARAMS) {
4255		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4256		    ldsc) {
4257			/* we have different sizes, probably peer
4258			 * needs to know my new size... */
4259			drbd_send_sizes(peer_device, 0, ddsf);
4260		}
4261		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4262		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4263			if (device->state.pdsk >= D_INCONSISTENT &&
4264			    device->state.disk >= D_INCONSISTENT) {
4265				if (ddsf & DDSF_NO_RESYNC)
4266					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4267				else
4268					resync_after_online_grow(device);
4269			} else
4270				set_bit(RESYNC_AFTER_NEG, &device->flags);
4271		}
4272	}
4273
4274	return 0;
4275}
4276
4277static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4278{
4279	struct drbd_peer_device *peer_device;
4280	struct drbd_device *device;
4281	struct p_uuids *p = pi->data;
4282	u64 *p_uuid;
4283	int i, updated_uuids = 0;
4284
4285	peer_device = conn_peer_device(connection, pi->vnr);
4286	if (!peer_device)
4287		return config_unknown_volume(connection, pi);
4288	device = peer_device->device;
4289
4290	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4291	if (!p_uuid) {
4292		drbd_err(device, "kmalloc of p_uuid failed\n");
4293		return -ENOMEM;
4294	}
4295
4296	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4297		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4298
4299	kfree(device->p_uuid);
4300	device->p_uuid = p_uuid;
4301
4302	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4303	    device->state.disk < D_INCONSISTENT &&
4304	    device->state.role == R_PRIMARY &&
4305	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4306		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4307		    (unsigned long long)device->ed_uuid);
4308		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4309		return -EIO;
4310	}
4311
4312	if (get_ldev(device)) {
4313		int skip_initial_sync =
4314			device->state.conn == C_CONNECTED &&
4315			peer_device->connection->agreed_pro_version >= 90 &&
4316			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4317			(p_uuid[UI_FLAGS] & 8);
4318		if (skip_initial_sync) {
4319			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4320			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4321					"clear_n_write from receive_uuids",
4322					BM_LOCKED_TEST_ALLOWED);
4323			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4324			_drbd_uuid_set(device, UI_BITMAP, 0);
4325			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4326					CS_VERBOSE, NULL);
4327			drbd_md_sync(device);
4328			updated_uuids = 1;
4329		}
4330		put_ldev(device);
4331	} else if (device->state.disk < D_INCONSISTENT &&
4332		   device->state.role == R_PRIMARY) {
4333		/* I am a diskless primary, the peer just created a new current UUID
4334		   for me. */
4335		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4336	}
4337
4338	/* Before we test the disk state, we should wait until a possibly
4339	   ongoing cluster-wide state change has finished. That is important if
4340	   we are primary and are detaching from our disk. We need to see the
4341	   new disk state... */
4342	mutex_lock(device->state_mutex);
4343	mutex_unlock(device->state_mutex);
4344	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4345		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4346
4347	if (updated_uuids)
4348		drbd_print_uuids(device, "receiver updated UUIDs to");
4349
4350	return 0;
4351}
4352
4353/**
4354 * convert_state() - Converts the peer's view of the cluster state to our point of view
4355 * @ps:		The state as seen by the peer.
4356 */
4357static union drbd_state convert_state(union drbd_state ps)
4358{
4359	union drbd_state ms;
4360
4361	static enum drbd_conns c_tab[] = {
4362		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4363		[C_CONNECTED] = C_CONNECTED,
4364
4365		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4366		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4367		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4368		[C_VERIFY_S]       = C_VERIFY_T,
4369		[C_MASK]   = C_MASK,
4370	};
4371
4372	ms.i = ps.i;
4373
4374	ms.conn = c_tab[ps.conn];
4375	ms.peer = ps.role;
4376	ms.role = ps.peer;
4377	ms.pdsk = ps.disk;
4378	ms.disk = ps.pdsk;
4379	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4380
4381	return ms;
4382}
4383
4384static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4385{
4386	struct drbd_peer_device *peer_device;
4387	struct drbd_device *device;
4388	struct p_req_state *p = pi->data;
4389	union drbd_state mask, val;
4390	enum drbd_state_rv rv;
4391
4392	peer_device = conn_peer_device(connection, pi->vnr);
4393	if (!peer_device)
4394		return -EIO;
4395	device = peer_device->device;
4396
4397	mask.i = be32_to_cpu(p->mask);
4398	val.i = be32_to_cpu(p->val);
4399
4400	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4401	    mutex_is_locked(device->state_mutex)) {
4402		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4403		return 0;
4404	}
4405
4406	mask = convert_state(mask);
4407	val = convert_state(val);
4408
4409	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4410	drbd_send_sr_reply(peer_device, rv);
4411
4412	drbd_md_sync(device);
4413
4414	return 0;
4415}
4416
4417static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4418{
4419	struct p_req_state *p = pi->data;
4420	union drbd_state mask, val;
4421	enum drbd_state_rv rv;
4422
4423	mask.i = be32_to_cpu(p->mask);
4424	val.i = be32_to_cpu(p->val);
4425
4426	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4427	    mutex_is_locked(&connection->cstate_mutex)) {
4428		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4429		return 0;
4430	}
4431
4432	mask = convert_state(mask);
4433	val = convert_state(val);
4434
4435	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4436	conn_send_sr_reply(connection, rv);
4437
4438	return 0;
4439}
4440
4441static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4442{
4443	struct drbd_peer_device *peer_device;
4444	struct drbd_device *device;
4445	struct p_state *p = pi->data;
4446	union drbd_state os, ns, peer_state;
4447	enum drbd_disk_state real_peer_disk;
4448	enum chg_state_flags cs_flags;
4449	int rv;
4450
4451	peer_device = conn_peer_device(connection, pi->vnr);
4452	if (!peer_device)
4453		return config_unknown_volume(connection, pi);
4454	device = peer_device->device;
4455
4456	peer_state.i = be32_to_cpu(p->state);
4457
4458	real_peer_disk = peer_state.disk;
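	/* A peer that is still D_NEGOTIATING has no settled disk state yet; bit 2
	 * of the UUID flags it sent tells us whether its disk is actually
	 * Inconsistent or merely Consistent. */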
4459	if (peer_state.disk == D_NEGOTIATING) {
4460		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4461		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4462	}
4463
4464	spin_lock_irq(&device->resource->req_lock);
4465 retry:
4466	os = ns = drbd_read_state(device);
4467	spin_unlock_irq(&device->resource->req_lock);
4468
4469	/* If some other part of the code (ack_receiver thread, timeout)
4470	 * already decided to close the connection again,
4471	 * we must not "re-establish" it here. */
4472	if (os.conn <= C_TEAR_DOWN)
4473		return -ECONNRESET;
4474
4475	/* If this is the "end of sync" confirmation, usually the peer disk
4476	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4477	 * set) resync started in PausedSyncT, or if the timing of pause-/
4478	 * unpause-sync events has been "just right", the peer disk may
4479	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4480	 */
4481	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4482	    real_peer_disk == D_UP_TO_DATE &&
4483	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4484		/* If we are (becoming) SyncSource, but peer is still in sync
4485		 * preparation, ignore its uptodate-ness to avoid flapping, it
4486		 * will change to inconsistent once the peer reaches active
4487		 * syncing states.
4488		 * It may have changed syncer-paused flags, however, so we
4489		 * cannot ignore this completely. */
4490		if (peer_state.conn > C_CONNECTED &&
4491		    peer_state.conn < C_SYNC_SOURCE)
4492			real_peer_disk = D_INCONSISTENT;
4493
4494		/* if peer_state changes to connected at the same time,
4495		 * it explicitly notifies us that it finished resync.
4496		 * Maybe we should finish it up, too? */
4497		else if (os.conn >= C_SYNC_SOURCE &&
4498			 peer_state.conn == C_CONNECTED) {
4499			if (drbd_bm_total_weight(device) <= device->rs_failed)
4500				drbd_resync_finished(device);
4501			return 0;
4502		}
4503	}
4504
4505	/* explicit verify finished notification, stop sector reached. */
4506	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4507	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4508		ov_out_of_sync_print(device);
4509		drbd_resync_finished(device);
4510		return 0;
4511	}
4512
4513	/* peer says his disk is inconsistent, while we think it is uptodate,
4514	 * and this happens while the peer still thinks we have a sync going on,
4515	 * but we think we are already done with the sync.
4516	 * We ignore this to avoid flapping pdsk.
4517	 * This should not happen, if the peer is a recent version of drbd. */
4518	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4519	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4520		real_peer_disk = D_UP_TO_DATE;
4521
4522	if (ns.conn == C_WF_REPORT_PARAMS)
4523		ns.conn = C_CONNECTED;
4524
4525	if (peer_state.conn == C_AHEAD)
4526		ns.conn = C_BEHIND;
4527
4528	/* TODO:
4529	 * if (primary and diskless and peer uuid != effective uuid)
4530	 *     abort attach on peer;
4531	 *
4532	 * If this node does not have good data, was already connected, but
4533	 * the peer did a late attach only now, trying to "negotiate" with me,
4534	 * AND I am currently Primary, possibly frozen, with some specific
4535	 * "effective" uuid, this should never be reached, really, because
4536	 * we first send the uuids, then the current state.
4537	 *
4538	 * In this scenario, we already dropped the connection hard
4539	 * when we received the unsuitable uuids (see receive_uuids()).
4540	 *
4541	 * Should we want to change this, that is: not drop the connection in
4542	 * receive_uuids() already, then we would need to add a branch here
4543	 * that aborts the attach of "unsuitable uuids" on the peer in case
4544	 * this node is currently Diskless Primary.
4545	 */
4546
4547	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4548	    get_ldev_if_state(device, D_NEGOTIATING)) {
4549		int cr; /* consider resync */
4550
4551		/* if we established a new connection */
4552		cr  = (os.conn < C_CONNECTED);
4553		/* if we had an established connection
4554		 * and one of the nodes newly attaches a disk */
4555		cr |= (os.conn == C_CONNECTED &&
4556		       (peer_state.disk == D_NEGOTIATING ||
4557			os.disk == D_NEGOTIATING));
4558		/* if we have both been inconsistent, and the peer has been
4559		 * forced to be UpToDate with --force */
4560		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4561		/* if we had been plain connected, and the admin requested to
4562		 * start a sync by "invalidate" or "invalidate-remote" */
4563		cr |= (os.conn == C_CONNECTED &&
4564				(peer_state.conn >= C_STARTING_SYNC_S &&
4565				 peer_state.conn <= C_WF_BITMAP_T));
4566
4567		if (cr)
4568			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4569
4570		put_ldev(device);
4571		if (ns.conn == C_MASK) {
4572			ns.conn = C_CONNECTED;
4573			if (device->state.disk == D_NEGOTIATING) {
4574				drbd_force_state(device, NS(disk, D_FAILED));
4575			} else if (peer_state.disk == D_NEGOTIATING) {
4576				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4577				peer_state.disk = D_DISKLESS;
4578				real_peer_disk = D_DISKLESS;
4579			} else {
4580				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4581					return -EIO;
4582				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4583				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4584				return -EIO;
4585			}
4586		}
4587	}
4588
4589	spin_lock_irq(&device->resource->req_lock);
4590	if (os.i != drbd_read_state(device).i)
4591		goto retry;
4592	clear_bit(CONSIDER_RESYNC, &device->flags);
4593	ns.peer = peer_state.role;
4594	ns.pdsk = real_peer_disk;
4595	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4596	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4597		ns.disk = device->new_state_tmp.disk;
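	/* Only the transition that actually establishes the connection may be a
	 * normal (non-forced) state change; everything else is applied with
	 * CS_HARD. */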
4598	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4599	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4600	    test_bit(NEW_CUR_UUID, &device->flags)) {
4601		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4602		   for temporary network outages! */
4603		spin_unlock_irq(&device->resource->req_lock);
4604		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4605		tl_clear(peer_device->connection);
4606		drbd_uuid_new_current(device);
4607		clear_bit(NEW_CUR_UUID, &device->flags);
4608		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4609		return -EIO;
4610	}
4611	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4612	ns = drbd_read_state(device);
4613	spin_unlock_irq(&device->resource->req_lock);
4614
4615	if (rv < SS_SUCCESS) {
4616		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4617		return -EIO;
4618	}
4619
4620	if (os.conn > C_WF_REPORT_PARAMS) {
4621		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4622		    peer_state.disk != D_NEGOTIATING ) {
4623			/* we want resync, peer has not yet decided to sync... */
4624			/* Nowadays only used when forcing a node into primary role and
4625			   setting its disk to UpToDate with that */
4626			drbd_send_uuids(peer_device);
4627			drbd_send_current_state(peer_device);
4628		}
4629	}
4630
4631	clear_bit(DISCARD_MY_DATA, &device->flags);
4632
4633	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4634
4635	return 0;
4636}
4637
4638static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4639{
4640	struct drbd_peer_device *peer_device;
4641	struct drbd_device *device;
4642	struct p_rs_uuid *p = pi->data;
4643
4644	peer_device = conn_peer_device(connection, pi->vnr);
4645	if (!peer_device)
4646		return -EIO;
4647	device = peer_device->device;
4648
4649	wait_event(device->misc_wait,
4650		   device->state.conn == C_WF_SYNC_UUID ||
4651		   device->state.conn == C_BEHIND ||
4652		   device->state.conn < C_CONNECTED ||
4653		   device->state.disk < D_NEGOTIATING);
4654
4655	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4656
4657	/* Here the _drbd_uuid_ functions are right, current should
4658	   _not_ be rotated into the history */
4659	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4660		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4661		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4662
4663		drbd_print_uuids(device, "updated sync uuid");
4664		drbd_start_resync(device, C_SYNC_TARGET);
4665
4666		put_ldev(device);
4667	} else
4668		drbd_err(device, "Ignoring SyncUUID packet!\n");
4669
4670	return 0;
4671}
4672
4673/**
4674 * receive_bitmap_plain
4675 *
4676 * Return 0 when done, 1 when another iteration is needed, and a negative error
4677 * code upon failure.
4678 */
4679static int
4680receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4681		     unsigned long *p, struct bm_xfer_ctx *c)
4682{
4683	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4684				 drbd_header_size(peer_device->connection);
4685	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4686				       c->bm_words - c->word_offset);
4687	unsigned int want = num_words * sizeof(*p);
4688	int err;
4689
4690	if (want != size) {
4691		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4692		return -EIO;
4693	}
4694	if (want == 0)
4695		return 0;
4696	err = drbd_recv_all(peer_device->connection, p, want);
4697	if (err)
4698		return err;
4699
4700	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4701
4702	c->word_offset += num_words;
4703	c->bit_offset = c->word_offset * BITS_PER_LONG;
4704	if (c->bit_offset > c->bm_bits)
4705		c->bit_offset = c->bm_bits;
4706
4707	return 1;
4708}
4709
4710static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4711{
4712	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4713}
4714
4715static int dcbp_get_start(struct p_compressed_bm *p)
4716{
4717	return (p->encoding & 0x80) != 0;
4718}
4719
4720static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4721{
4722	return (p->encoding >> 4) & 0x7;
4723}
4724
4725/**
4726 * recv_bm_rle_bits
4727 *
4728 * Return 0 when done, 1 when another iteration is needed, and a negative error
4729 * code upon failure.
4730 */
4731static int
4732recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4733		struct p_compressed_bm *p,
4734		 struct bm_xfer_ctx *c,
4735		 unsigned int len)
4736{
4737	struct bitstream bs;
4738	u64 look_ahead;
4739	u64 rl;
4740	u64 tmp;
4741	unsigned long s = c->bit_offset;
4742	unsigned long e;
4743	int toggle = dcbp_get_start(p);
4744	int have;
4745	int bits;
4746
4747	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4748
4749	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4750	if (bits < 0)
4751		return -EIO;
4752
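	/* The payload is a sequence of VLI encoded run lengths, alternating
	 * between runs of clear and set bits; "toggle" tracks which kind the
	 * current run is, and the 64 bit look_ahead window is refilled from the
	 * bitstream after every decoded run. */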
4753	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4754		bits = vli_decode_bits(&rl, look_ahead);
4755		if (bits <= 0)
4756			return -EIO;
4757
4758		if (toggle) {
4759			e = s + rl -1;
4760			if (e >= c->bm_bits) {
4761				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4762				return -EIO;
4763			}
4764			_drbd_bm_set_bits(peer_device->device, s, e);
4765		}
4766
4767		if (have < bits) {
4768			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4769				have, bits, look_ahead,
4770				(unsigned int)(bs.cur.b - p->code),
4771				(unsigned int)bs.buf_len);
4772			return -EIO;
4773		}
4774		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4775		if (likely(bits < 64))
4776			look_ahead >>= bits;
4777		else
4778			look_ahead = 0;
4779		have -= bits;
4780
4781		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4782		if (bits < 0)
4783			return -EIO;
4784		look_ahead |= tmp << have;
4785		have += bits;
4786	}
4787
4788	c->bit_offset = s;
4789	bm_xfer_ctx_bit_to_word_offset(c);
4790
4791	return (s != c->bm_bits);
4792}
4793
4794/**
4795 * decode_bitmap_c
4796 *
4797 * Return 0 when done, 1 when another iteration is needed, and a negative error
4798 * code upon failure.
4799 */
4800static int
4801decode_bitmap_c(struct drbd_peer_device *peer_device,
4802		struct p_compressed_bm *p,
4803		struct bm_xfer_ctx *c,
4804		unsigned int len)
4805{
4806	if (dcbp_get_code(p) == RLE_VLI_Bits)
4807		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4808
4809	/* other variants had been implemented for evaluation,
4810	 * but have been dropped as this one turned out to be "best"
4811	 * during all our tests. */
4812
4813	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4814	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4815	return -EIO;
4816}
4817
4818void INFO_bm_xfer_stats(struct drbd_device *device,
4819		const char *direction, struct bm_xfer_ctx *c)
4820{
4821	/* what would it take to transfer it "plaintext" */
4822	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4823	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4824	unsigned int plain =
4825		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4826		c->bm_words * sizeof(unsigned long);
4827	unsigned int total = c->bytes[0] + c->bytes[1];
4828	unsigned int r;
4829
4830	/* total cannot be zero, but just in case: */
4831	if (total == 0)
4832		return;
4833
4834	/* don't report if not compressed */
4835	if (total >= plain)
4836		return;
4837
4838	/* total < plain. check for overflow, still */
4839	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4840		                    : (1000 * total / plain);
4841
4842	if (r > 1000)
4843		r = 1000;
4844
4845	r = 1000 - r;
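	/* r now holds the saving in per mille; printed as a percentage with one
	 * decimal place below. */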
4846	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4847	     "total %u; compression: %u.%u%%\n",
4848			direction,
4849			c->bytes[1], c->packets[1],
4850			c->bytes[0], c->packets[0],
4851			total, r/10, r % 10);
4852}
4853
4854/* Since we process the bitfield from lower addresses to higher, it does
4855   not matter whether we process it in 32 bit or 64 bit chunks, as long as
4856   it is little endian. (Think of it as a byte stream, beginning with the
4857   lowest byte...) If we used big endian, we would have to process it from
4858   the highest address to the lowest in order to stay agnostic to the
4859   32 vs 64 bit issue.
4860
4861   returns 0 on success, a negative error code on failure. */
4862static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4863{
4864	struct drbd_peer_device *peer_device;
4865	struct drbd_device *device;
4866	struct bm_xfer_ctx c;
4867	int err;
4868
4869	peer_device = conn_peer_device(connection, pi->vnr);
4870	if (!peer_device)
4871		return -EIO;
4872	device = peer_device->device;
4873
4874	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4875	/* you are supposed to send additional out-of-sync information
4876	 * if you actually set bits during this phase */
4877
4878	c = (struct bm_xfer_ctx) {
4879		.bm_bits = drbd_bm_bits(device),
4880		.bm_words = drbd_bm_words(device),
4881	};
4882
4883	for(;;) {
4884		if (pi->cmd == P_BITMAP)
4885			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4886		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4887			/* MAYBE: sanity check that we speak proto >= 90,
4888			 * and the feature is enabled! */
4889			struct p_compressed_bm *p = pi->data;
4890
4891			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4892				drbd_err(device, "ReportCBitmap packet too large\n");
4893				err = -EIO;
4894				goto out;
4895			}
4896			if (pi->size <= sizeof(*p)) {
4897				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4898				err = -EIO;
4899				goto out;
4900			}
4901			err = drbd_recv_all(peer_device->connection, p, pi->size);
4902			if (err)
4903			       goto out;
4904			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4905		} else {
4906			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4907			err = -EIO;
4908			goto out;
4909		}
4910
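		/* Index 1 counts plain P_BITMAP traffic, index 0 the RLE compressed
		 * packets; INFO_bm_xfer_stats() reports them as "plain" and "RLE". */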
4911		c.packets[pi->cmd == P_BITMAP]++;
4912		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4913
4914		if (err <= 0) {
4915			if (err < 0)
4916				goto out;
4917			break;
4918		}
4919		err = drbd_recv_header(peer_device->connection, pi);
4920		if (err)
4921			goto out;
4922	}
4923
4924	INFO_bm_xfer_stats(device, "receive", &c);
4925
4926	if (device->state.conn == C_WF_BITMAP_T) {
4927		enum drbd_state_rv rv;
4928
4929		err = drbd_send_bitmap(device);
4930		if (err)
4931			goto out;
4932		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4933		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4934		D_ASSERT(device, rv == SS_SUCCESS);
4935	} else if (device->state.conn != C_WF_BITMAP_S) {
4936		/* admin may have requested C_DISCONNECTING,
4937		 * other threads may have noticed network errors */
4938		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4939		    drbd_conn_str(device->state.conn));
4940	}
4941	err = 0;
4942
4943 out:
4944	drbd_bm_unlock(device);
4945	if (!err && device->state.conn == C_WF_BITMAP_S)
4946		drbd_start_resync(device, C_SYNC_SOURCE);
4947	return err;
4948}
4949
4950static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4951{
4952	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4953		 pi->cmd, pi->size);
4954
4955	return ignore_remaining_packet(connection, pi);
4956}
4957
4958static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4959{
4960	/* Make sure we've acked all the TCP data associated
4961	 * with the data requests being unplugged */
4962	tcp_sock_set_quickack(connection->data.socket->sk, 2);
4963	return 0;
4964}
4965
4966static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4967{
4968	struct drbd_peer_device *peer_device;
4969	struct drbd_device *device;
4970	struct p_block_desc *p = pi->data;
4971
4972	peer_device = conn_peer_device(connection, pi->vnr);
4973	if (!peer_device)
4974		return -EIO;
4975	device = peer_device->device;
4976
4977	switch (device->state.conn) {
4978	case C_WF_SYNC_UUID:
4979	case C_WF_BITMAP_T:
4980	case C_BEHIND:
4981			break;
4982	default:
4983		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4984				drbd_conn_str(device->state.conn));
4985	}
4986
4987	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4988
4989	return 0;
4990}
4991
4992static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4993{
4994	struct drbd_peer_device *peer_device;
4995	struct p_block_desc *p = pi->data;
4996	struct drbd_device *device;
4997	sector_t sector;
4998	int size, err = 0;
4999
5000	peer_device = conn_peer_device(connection, pi->vnr);
5001	if (!peer_device)
5002		return -EIO;
5003	device = peer_device->device;
5004
5005	sector = be64_to_cpu(p->sector);
5006	size = be32_to_cpu(p->blksize);
5007
5008	dec_rs_pending(device);
5009
5010	if (get_ldev(device)) {
5011		struct drbd_peer_request *peer_req;
5012		const int op = REQ_OP_WRITE_ZEROES;
5013
5014		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5015					       size, 0, GFP_NOIO);
5016		if (!peer_req) {
5017			put_ldev(device);
5018			return -ENOMEM;
5019		}
5020
5021		peer_req->w.cb = e_end_resync_block;
5022		peer_req->submit_jif = jiffies;
5023		peer_req->flags |= EE_TRIM;
5024
5025		spin_lock_irq(&device->resource->req_lock);
5026		list_add_tail(&peer_req->w.list, &device->sync_ee);
5027		spin_unlock_irq(&device->resource->req_lock);
5028
5029		atomic_add(pi->size >> 9, &device->rs_sect_ev);
5030		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5031
5032		if (err) {
5033			spin_lock_irq(&device->resource->req_lock);
5034			list_del(&peer_req->w.list);
5035			spin_unlock_irq(&device->resource->req_lock);
5036
5037			drbd_free_peer_req(device, peer_req);
5038			put_ldev(device);
5039			err = 0;
5040			goto fail;
5041		}
5042
5043		inc_unacked(device);
5044
5045		/* No put_ldev() here; it is done later, in drbd_endio_write_sec_final()
5046		   as well as in drbd_rs_complete_io(). */
5047	} else {
5048	fail:
5049		drbd_rs_complete_io(device, sector);
5050		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5051	}
5052
5053	atomic_add(size >> 9, &device->rs_sect_in);
5054
5055	return err;
5056}
5057
5058struct data_cmd {
5059	int expect_payload;
5060	unsigned int pkt_size;
5061	int (*fn)(struct drbd_connection *, struct packet_info *);
5062};
5063
5064static struct data_cmd drbd_cmd_handler[] = {
5065	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
5066	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
5067	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5068	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5069	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
5070	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5071	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5072	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5073	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5074	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
5075	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5076	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5077	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
5078	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
5079	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
5080	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5081	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5082	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5083	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5084	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5085	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5086	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5087	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5088	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5089	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5090	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
5091	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
5092	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5093	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
5094};
5095
5096static void drbdd(struct drbd_connection *connection)
5097{
5098	struct packet_info pi;
5099	size_t shs; /* sub header size */
5100	int err;
5101
5102	while (get_t_state(&connection->receiver) == RUNNING) {
5103		struct data_cmd const *cmd;
5104
5105		drbd_thread_current_set_cpu(&connection->receiver);
5106		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5107		if (drbd_recv_header_maybe_unplug(connection, &pi))
5108			goto err_out;
5109
5110		cmd = &drbd_cmd_handler[pi.cmd];
5111		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5112			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
5113				 cmdname(pi.cmd), pi.cmd);
5114			goto err_out;
5115		}
5116
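		/* When the WSAME feature was agreed on, P_SIZES carries an
		 * additional struct o_qlim after the fixed header. */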
5117		shs = cmd->pkt_size;
5118		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5119			shs += sizeof(struct o_qlim);
5120		if (pi.size > shs && !cmd->expect_payload) {
5121			drbd_err(connection, "No payload expected %s l:%d\n",
5122				 cmdname(pi.cmd), pi.size);
5123			goto err_out;
5124		}
5125		if (pi.size < shs) {
5126			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5127				 cmdname(pi.cmd), (int)shs, pi.size);
5128			goto err_out;
5129		}
5130
5131		if (shs) {
5132			update_receiver_timing_details(connection, drbd_recv_all_warn);
5133			err = drbd_recv_all_warn(connection, pi.data, shs);
5134			if (err)
5135				goto err_out;
5136			pi.size -= shs;
5137		}
5138
5139		update_receiver_timing_details(connection, cmd->fn);
5140		err = cmd->fn(connection, &pi);
5141		if (err) {
5142			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5143				 cmdname(pi.cmd), err, pi.size);
5144			goto err_out;
5145		}
5146	}
5147	return;
5148
5149    err_out:
5150	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5151}
5152
5153static void conn_disconnect(struct drbd_connection *connection)
5154{
5155	struct drbd_peer_device *peer_device;
5156	enum drbd_conns oc;
5157	int vnr;
5158
5159	if (connection->cstate == C_STANDALONE)
5160		return;
5161
5162	/* We are about to start the cleanup after connection loss.
5163	 * Make sure drbd_make_request knows about that.
5164	 * Usually we should be in some network failure state already,
5165	 * but just in case we are not, we fix it up here.
5166	 */
5167	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5168
5169	/* ack_receiver does not clean up anything. it must not interfere, either */
5170	drbd_thread_stop(&connection->ack_receiver);
5171	if (connection->ack_sender) {
5172		destroy_workqueue(connection->ack_sender);
5173		connection->ack_sender = NULL;
5174	}
5175	drbd_free_sock(connection);
5176
5177	rcu_read_lock();
5178	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5179		struct drbd_device *device = peer_device->device;
5180		kref_get(&device->kref);
5181		rcu_read_unlock();
5182		drbd_disconnected(peer_device);
5183		kref_put(&device->kref, drbd_destroy_device);
5184		rcu_read_lock();
5185	}
5186	rcu_read_unlock();
5187
5188	if (!list_empty(&connection->current_epoch->list))
5189		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5190	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5191	atomic_set(&connection->current_epoch->epoch_size, 0);
5192	connection->send.seen_any_write_yet = false;
5193
5194	drbd_info(connection, "Connection closed\n");
5195
5196	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5197		conn_try_outdate_peer_async(connection);
5198
5199	spin_lock_irq(&connection->resource->req_lock);
5200	oc = connection->cstate;
5201	if (oc >= C_UNCONNECTED)
5202		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5203
5204	spin_unlock_irq(&connection->resource->req_lock);
5205
5206	if (oc == C_DISCONNECTING)
5207		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5208}
5209
5210static int drbd_disconnected(struct drbd_peer_device *peer_device)
5211{
5212	struct drbd_device *device = peer_device->device;
5213	unsigned int i;
5214
5215	/* wait for current activity to cease. */
5216	spin_lock_irq(&device->resource->req_lock);
5217	_drbd_wait_ee_list_empty(device, &device->active_ee);
5218	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5219	_drbd_wait_ee_list_empty(device, &device->read_ee);
5220	spin_unlock_irq(&device->resource->req_lock);
5221
5222	/* We do not have data structures that would allow us to
5223	 * get the rs_pending_cnt down to 0 again.
5224	 *  * On C_SYNC_TARGET we do not have any data structures describing
5225	 *    the pending RSDataRequest's we have sent.
5226	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5227	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5228	 *  And no, it is not the sum of the reference counts in the
5229	 *  resync_LRU. The resync_LRU tracks the whole operation including
5230	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5231	 *  on the fly. */
5232	drbd_rs_cancel_all(device);
5233	device->rs_total = 0;
5234	device->rs_failed = 0;
5235	atomic_set(&device->rs_pending_cnt, 0);
5236	wake_up(&device->misc_wait);
5237
5238	del_timer_sync(&device->resync_timer);
5239	resync_timer_fn(&device->resync_timer);
5240
5241	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5242	 * w_make_resync_request etc. which may still be on the worker queue
5243	 * to be "canceled" */
5244	drbd_flush_workqueue(&peer_device->connection->sender_work);
5245
5246	drbd_finish_peer_reqs(device);
5247
5248	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5249	   might have queued work again. The flush before drbd_finish_peer_reqs() is
5250	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5251	drbd_flush_workqueue(&peer_device->connection->sender_work);
5252
5253	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5254	 * again via drbd_try_clear_on_disk_bm(). */
5255	drbd_rs_cancel_all(device);
5256
5257	kfree(device->p_uuid);
5258	device->p_uuid = NULL;
5259
5260	if (!drbd_suspended(device))
5261		tl_clear(peer_device->connection);
5262
5263	drbd_md_sync(device);
5264
5265	if (get_ldev(device)) {
5266		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5267				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5268		put_ldev(device);
5269	}
5270
5271	/* tcp_close and release of sendpage pages can be deferred.  I don't
5272	 * want to use SO_LINGER, because apparently it can be deferred for
5273	 * more than 20 seconds (longest time I checked).
5274	 *
5275	 * Actually we don't care for exactly when the network stack does its
5276	 * put_page(), but release our reference on these pages right here.
5277	 */
5278	i = drbd_free_peer_reqs(device, &device->net_ee);
5279	if (i)
5280		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5281	i = atomic_read(&device->pp_in_use_by_net);
5282	if (i)
5283		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5284	i = atomic_read(&device->pp_in_use);
5285	if (i)
5286		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5287
5288	D_ASSERT(device, list_empty(&device->read_ee));
5289	D_ASSERT(device, list_empty(&device->active_ee));
5290	D_ASSERT(device, list_empty(&device->sync_ee));
5291	D_ASSERT(device, list_empty(&device->done_ee));
5292
5293	return 0;
5294}
5295
5296/*
5297 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5298 * we can agree on is stored in agreed_pro_version.
5299 *
5300 * The feature flags and the reserved array should provide enough room for
5301 * future enhancements of the handshake protocol, and for possible plugins...
5302 *
5303 * For now, they are expected to be zero, but are otherwise ignored.
5304 */
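/* Example with hypothetical numbers: if we support protocol versions 86..101
 * and the peer advertises 86..117, drbd_do_features() below agrees on
 * min(101, 117) = 101; if the two ranges do not overlap at all, we report
 * incompatible dialects and give up on this peer. */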
5305static int drbd_send_features(struct drbd_connection *connection)
5306{
5307	struct drbd_socket *sock;
5308	struct p_connection_features *p;
5309
5310	sock = &connection->data;
5311	p = conn_prepare_command(connection, sock);
5312	if (!p)
5313		return -EIO;
5314	memset(p, 0, sizeof(*p));
5315	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5316	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5317	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5318	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5319}
5320
5321/*
5322 * return values:
5323 *   1 yes, we have a valid connection
5324 *   0 oops, did not work out, please try again
5325 *  -1 peer talks different language,
5326 *     no point in trying again, please go standalone.
5327 */
5328static int drbd_do_features(struct drbd_connection *connection)
5329{
5330	/* ASSERT current == connection->receiver ... */
5331	struct p_connection_features *p;
5332	const int expect = sizeof(struct p_connection_features);
5333	struct packet_info pi;
5334	int err;
5335
5336	err = drbd_send_features(connection);
5337	if (err)
5338		return 0;
5339
5340	err = drbd_recv_header(connection, &pi);
5341	if (err)
5342		return 0;
5343
5344	if (pi.cmd != P_CONNECTION_FEATURES) {
5345		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5346			 cmdname(pi.cmd), pi.cmd);
5347		return -1;
5348	}
5349
5350	if (pi.size != expect) {
5351		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5352		     expect, pi.size);
5353		return -1;
5354	}
5355
5356	p = pi.data;
5357	err = drbd_recv_all_warn(connection, p, expect);
5358	if (err)
5359		return 0;
5360
5361	p->protocol_min = be32_to_cpu(p->protocol_min);
5362	p->protocol_max = be32_to_cpu(p->protocol_max);
5363	if (p->protocol_max == 0)
5364		p->protocol_max = p->protocol_min;
5365
5366	if (PRO_VERSION_MAX < p->protocol_min ||
5367	    PRO_VERSION_MIN > p->protocol_max)
5368		goto incompat;
5369
5370	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5371	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5372
5373	drbd_info(connection, "Handshake successful: "
5374	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5375
5376	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5377		  connection->agreed_features,
5378		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5379		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5380		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5381		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5382		  connection->agreed_features ? "" : " none");
5383
5384	return 1;
5385
5386 incompat:
5387	drbd_err(connection, "incompatible DRBD dialects: "
5388	    "I support %d-%d, peer supports %d-%d\n",
5389	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5390	    p->protocol_min, p->protocol_max);
5391	return -1;
5392}
5393
5394#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5395static int drbd_do_auth(struct drbd_connection *connection)
5396{
5397	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5398	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5399	return -1;
5400}
5401#else
5402#define CHALLENGE_LEN 64
5403
5404/* Return value:
5405	1 - auth succeeded,
5406	0 - failed, try again (network error),
5407	-1 - auth failed, don't try again.
5408*/
5409
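/* Rough shape of the CRAM-HMAC exchange implemented below:
 *   1. send our random challenge as P_AUTH_CHALLENGE
 *   2. receive the peer's challenge; reject it if it equals our own
 *   3. send HMAC(shared_secret, peer's challenge) as P_AUTH_RESPONSE
 *   4. receive the peer's response and compare it against
 *      HMAC(shared_secret, our own challenge)
 * The peer runs the same steps, so each side verifies the other. */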
5410static int drbd_do_auth(struct drbd_connection *connection)
5411{
5412	struct drbd_socket *sock;
5413	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5414	char *response = NULL;
5415	char *right_response = NULL;
5416	char *peers_ch = NULL;
5417	unsigned int key_len;
5418	char secret[SHARED_SECRET_MAX]; /* 64 bytes */
5419	unsigned int resp_size;
5420	struct shash_desc *desc;
5421	struct packet_info pi;
5422	struct net_conf *nc;
5423	int err, rv;
5424
5425	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5426
5427	rcu_read_lock();
5428	nc = rcu_dereference(connection->net_conf);
5429	key_len = strlen(nc->shared_secret);
5430	memcpy(secret, nc->shared_secret, key_len);
5431	rcu_read_unlock();
5432
5433	desc = kmalloc(sizeof(struct shash_desc) +
5434		       crypto_shash_descsize(connection->cram_hmac_tfm),
5435		       GFP_KERNEL);
5436	if (!desc) {
5437		rv = -1;
5438		goto fail;
5439	}
5440	desc->tfm = connection->cram_hmac_tfm;
5441
5442	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5443	if (rv) {
5444		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5445		rv = -1;
5446		goto fail;
5447	}
5448
5449	get_random_bytes(my_challenge, CHALLENGE_LEN);
5450
5451	sock = &connection->data;
5452	if (!conn_prepare_command(connection, sock)) {
5453		rv = 0;
5454		goto fail;
5455	}
5456	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5457				my_challenge, CHALLENGE_LEN);
5458	if (!rv)
5459		goto fail;
5460
5461	err = drbd_recv_header(connection, &pi);
5462	if (err) {
5463		rv = 0;
5464		goto fail;
5465	}
5466
5467	if (pi.cmd != P_AUTH_CHALLENGE) {
5468		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5469			 cmdname(pi.cmd), pi.cmd);
5470		rv = -1;
5471		goto fail;
5472	}
5473
5474	if (pi.size > CHALLENGE_LEN * 2) {
5475		drbd_err(connection, "AuthChallenge payload too big.\n");
5476		rv = -1;
5477		goto fail;
5478	}
5479
5480	if (pi.size < CHALLENGE_LEN) {
5481		drbd_err(connection, "AuthChallenge payload too small.\n");
5482		rv = -1;
5483		goto fail;
5484	}
5485
5486	peers_ch = kmalloc(pi.size, GFP_NOIO);
5487	if (peers_ch == NULL) {
5488		drbd_err(connection, "kmalloc of peers_ch failed\n");
5489		rv = -1;
5490		goto fail;
5491	}
5492
5493	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5494	if (err) {
5495		rv = 0;
5496		goto fail;
5497	}
5498
5499	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5500		drbd_err(connection, "Peer presented the same challenge!\n");
5501		rv = -1;
5502		goto fail;
5503	}
5504
5505	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5506	response = kmalloc(resp_size, GFP_NOIO);
5507	if (response == NULL) {
5508		drbd_err(connection, "kmalloc of response failed\n");
5509		rv = -1;
5510		goto fail;
5511	}
5512
5513	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5514	if (rv) {
5515		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5516		rv = -1;
5517		goto fail;
5518	}
5519
5520	if (!conn_prepare_command(connection, sock)) {
5521		rv = 0;
5522		goto fail;
5523	}
5524	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5525				response, resp_size);
5526	if (!rv)
5527		goto fail;
5528
5529	err = drbd_recv_header(connection, &pi);
5530	if (err) {
5531		rv = 0;
5532		goto fail;
5533	}
5534
5535	if (pi.cmd != P_AUTH_RESPONSE) {
5536		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5537			 cmdname(pi.cmd), pi.cmd);
5538		rv = 0;
5539		goto fail;
5540	}
5541
5542	if (pi.size != resp_size) {
5543		drbd_err(connection, "AuthResponse payload of wrong size\n");
5544		rv = 0;
5545		goto fail;
5546	}
5547
5548	err = drbd_recv_all_warn(connection, response, resp_size);
5549	if (err) {
5550		rv = 0;
5551		goto fail;
5552	}
5553
5554	right_response = kmalloc(resp_size, GFP_NOIO);
5555	if (right_response == NULL) {
5556		drbd_err(connection, "kmalloc of right_response failed\n");
5557		rv = -1;
5558		goto fail;
5559	}
5560
5561	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5562				 right_response);
5563	if (rv) {
5564		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5565		rv = -1;
5566		goto fail;
5567	}
5568
5569	rv = !memcmp(response, right_response, resp_size);
5570
5571	if (rv)
5572		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5573		     resp_size);
5574	else
5575		rv = -1;
5576
5577 fail:
5578	kfree(peers_ch);
5579	kfree(response);
5580	kfree(right_response);
5581	if (desc) {
5582		shash_desc_zero(desc);
5583		kfree(desc);
5584	}
5585
5586	return rv;
5587}
5588#endif
5589
5590int drbd_receiver(struct drbd_thread *thi)
5591{
5592	struct drbd_connection *connection = thi->connection;
5593	int h;
5594
5595	drbd_info(connection, "receiver (re)started\n");
5596
5597	do {
5598		h = conn_connect(connection);
5599		if (h == 0) {
5600			conn_disconnect(connection);
5601			schedule_timeout_interruptible(HZ);
5602		}
5603		if (h == -1) {
5604			drbd_warn(connection, "Discarding network configuration.\n");
5605			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5606		}
5607	} while (h == 0);
5608
5609	if (h > 0) {
5610		blk_start_plug(&connection->receiver_plug);
5611		drbdd(connection);
5612		blk_finish_plug(&connection->receiver_plug);
5613	}
5614
5615	conn_disconnect(connection);
5616
5617	drbd_info(connection, "receiver terminated\n");
5618	return 0;
5619}
5620
5621/* ********* acknowledge sender ******** */
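/* The handlers below run in the ack_receiver thread (drbd_ack_receiver())
 * and are fed from the meta socket; bulk data stays on the data socket,
 * which is handled by drbd_receiver()/drbdd() above. */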
5622
5623static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5624{
5625	struct p_req_state_reply *p = pi->data;
5626	int retcode = be32_to_cpu(p->retcode);
5627
5628	if (retcode >= SS_SUCCESS) {
5629		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5630	} else {
5631		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5632		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5633			 drbd_set_st_err_str(retcode), retcode);
5634	}
5635	wake_up(&connection->ping_wait);
5636
5637	return 0;
5638}
5639
5640static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5641{
5642	struct drbd_peer_device *peer_device;
5643	struct drbd_device *device;
5644	struct p_req_state_reply *p = pi->data;
5645	int retcode = be32_to_cpu(p->retcode);
5646
5647	peer_device = conn_peer_device(connection, pi->vnr);
5648	if (!peer_device)
5649		return -EIO;
5650	device = peer_device->device;
5651
5652	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5653		D_ASSERT(device, connection->agreed_pro_version < 100);
5654		return got_conn_RqSReply(connection, pi);
5655	}
5656
5657	if (retcode >= SS_SUCCESS) {
5658		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5659	} else {
5660		set_bit(CL_ST_CHG_FAIL, &device->flags);
5661		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5662			drbd_set_st_err_str(retcode), retcode);
5663	}
5664	wake_up(&device->state_wait);
5665
5666	return 0;
5667}
5668
5669static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5670{
5671	return drbd_send_ping_ack(connection);
5672
5673}
5674
5675static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5676{
5677	/* restore idle timeout */
5678	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5679	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5680		wake_up(&connection->ping_wait);
5681
5682	return 0;
5683}
5684
5685static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5686{
5687	struct drbd_peer_device *peer_device;
5688	struct drbd_device *device;
5689	struct p_block_ack *p = pi->data;
5690	sector_t sector = be64_to_cpu(p->sector);
5691	int blksize = be32_to_cpu(p->blksize);
5692
5693	peer_device = conn_peer_device(connection, pi->vnr);
5694	if (!peer_device)
5695		return -EIO;
5696	device = peer_device->device;
5697
5698	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5699
5700	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5701
5702	if (get_ldev(device)) {
5703		drbd_rs_complete_io(device, sector);
5704		drbd_set_in_sync(device, sector, blksize);
5705		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5706		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5707		put_ldev(device);
5708	}
5709	dec_rs_pending(device);
5710	atomic_add(blksize >> 9, &device->rs_sect_in);
5711
5712	return 0;
5713}
5714
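/* Look up the request identified by (id, sector) in the given tree and apply
 * the state transition 'what' to it; if that completes the master bio, finish
 * it outside the lock.  Returns -EIO if no matching request is found. */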
5715static int
5716validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5717			      struct rb_root *root, const char *func,
5718			      enum drbd_req_event what, bool missing_ok)
5719{
5720	struct drbd_request *req;
5721	struct bio_and_error m;
5722
5723	spin_lock_irq(&device->resource->req_lock);
5724	req = find_request(device, root, id, sector, missing_ok, func);
5725	if (unlikely(!req)) {
5726		spin_unlock_irq(&device->resource->req_lock);
5727		return -EIO;
5728	}
5729	__req_mod(req, what, &m);
5730	spin_unlock_irq(&device->resource->req_lock);
5731
5732	if (m.bio)
5733		complete_master_bio(device, &m);
5734	return 0;
5735}
5736
5737static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5738{
5739	struct drbd_peer_device *peer_device;
5740	struct drbd_device *device;
5741	struct p_block_ack *p = pi->data;
5742	sector_t sector = be64_to_cpu(p->sector);
5743	int blksize = be32_to_cpu(p->blksize);
5744	enum drbd_req_event what;
5745
5746	peer_device = conn_peer_device(connection, pi->vnr);
5747	if (!peer_device)
5748		return -EIO;
5749	device = peer_device->device;
5750
5751	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5752
5753	if (p->block_id == ID_SYNCER) {
5754		drbd_set_in_sync(device, sector, blksize);
5755		dec_rs_pending(device);
5756		return 0;
5757	}
5758	switch (pi->cmd) {
5759	case P_RS_WRITE_ACK:
5760		what = WRITE_ACKED_BY_PEER_AND_SIS;
5761		break;
5762	case P_WRITE_ACK:
5763		what = WRITE_ACKED_BY_PEER;
5764		break;
5765	case P_RECV_ACK:
5766		what = RECV_ACKED_BY_PEER;
5767		break;
5768	case P_SUPERSEDED:
5769		what = CONFLICT_RESOLVED;
5770		break;
5771	case P_RETRY_WRITE:
5772		what = POSTPONE_WRITE;
5773		break;
5774	default:
5775		BUG();
5776	}
5777
5778	return validate_req_change_req_state(device, p->block_id, sector,
5779					     &device->write_requests, __func__,
5780					     what, false);
5781}
5782
5783static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5784{
5785	struct drbd_peer_device *peer_device;
5786	struct drbd_device *device;
5787	struct p_block_ack *p = pi->data;
5788	sector_t sector = be64_to_cpu(p->sector);
5789	int size = be32_to_cpu(p->blksize);
5790	int err;
5791
5792	peer_device = conn_peer_device(connection, pi->vnr);
5793	if (!peer_device)
5794		return -EIO;
5795	device = peer_device->device;
5796
5797	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5798
5799	if (p->block_id == ID_SYNCER) {
5800		dec_rs_pending(device);
5801		drbd_rs_failed_io(device, sector, size);
5802		return 0;
5803	}
5804
5805	err = validate_req_change_req_state(device, p->block_id, sector,
5806					    &device->write_requests, __func__,
5807					    NEG_ACKED, true);
5808	if (err) {
5809		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5810		   The master bio might already be completed, therefore the
5811		   request is no longer in the collision hash. */
5812		/* In Protocol B we might already have got a P_RECV_ACK
5813		   but then get a P_NEG_ACK afterwards. */
5814		drbd_set_out_of_sync(device, sector, size);
5815	}
5816	return 0;
5817}
5818
5819static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5820{
5821	struct drbd_peer_device *peer_device;
5822	struct drbd_device *device;
5823	struct p_block_ack *p = pi->data;
5824	sector_t sector = be64_to_cpu(p->sector);
5825
5826	peer_device = conn_peer_device(connection, pi->vnr);
5827	if (!peer_device)
5828		return -EIO;
5829	device = peer_device->device;
5830
5831	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5832
5833	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5834	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5835
5836	return validate_req_change_req_state(device, p->block_id, sector,
5837					     &device->read_requests, __func__,
5838					     NEG_ACKED, false);
5839}
5840
5841static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5842{
5843	struct drbd_peer_device *peer_device;
5844	struct drbd_device *device;
5845	sector_t sector;
5846	int size;
5847	struct p_block_ack *p = pi->data;
5848
5849	peer_device = conn_peer_device(connection, pi->vnr);
5850	if (!peer_device)
5851		return -EIO;
5852	device = peer_device->device;
5853
5854	sector = be64_to_cpu(p->sector);
5855	size = be32_to_cpu(p->blksize);
5856
5857	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5858
5859	dec_rs_pending(device);
5860
5861	if (get_ldev_if_state(device, D_FAILED)) {
5862		drbd_rs_complete_io(device, sector);
5863		switch (pi->cmd) {
5864		case P_NEG_RS_DREPLY:
5865			drbd_rs_failed_io(device, sector, size);
			break;
5866		case P_RS_CANCEL:
5867			break;
5868		default:
5869			BUG();
5870		}
5871		put_ldev(device);
5872	}
5873
5874	return 0;
5875}
5876
5877static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5878{
5879	struct p_barrier_ack *p = pi->data;
5880	struct drbd_peer_device *peer_device;
5881	int vnr;
5882
5883	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5884
5885	rcu_read_lock();
5886	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5887		struct drbd_device *device = peer_device->device;
5888
5889		if (device->state.conn == C_AHEAD &&
5890		    atomic_read(&device->ap_in_flight) == 0 &&
5891		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5892			device->start_resync_timer.expires = jiffies + HZ;
5893			add_timer(&device->start_resync_timer);
5894		}
5895	}
5896	rcu_read_unlock();
5897
5898	return 0;
5899}
5900
5901static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5902{
5903	struct drbd_peer_device *peer_device;
5904	struct drbd_device *device;
5905	struct p_block_ack *p = pi->data;
5906	struct drbd_device_work *dw;
5907	sector_t sector;
5908	int size;
5909
5910	peer_device = conn_peer_device(connection, pi->vnr);
5911	if (!peer_device)
5912		return -EIO;
5913	device = peer_device->device;
5914
5915	sector = be64_to_cpu(p->sector);
5916	size = be32_to_cpu(p->blksize);
5917
5918	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5919
5920	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5921		drbd_ov_out_of_sync_found(device, sector, size);
5922	else
5923		ov_out_of_sync_print(device);
5924
5925	if (!get_ldev(device))
5926		return 0;
5927
5928	drbd_rs_complete_io(device, sector);
5929	dec_rs_pending(device);
5930
5931	--device->ov_left;
5932
5933	/* let's advance progress step marks only for every other megabyte;
	 * ov_left counts BM_BLOCK_SIZE (4 KiB) blocks, so 0x200 of them are 2 MiB */
5934	if ((device->ov_left & 0x200) == 0x200)
5935		drbd_advance_rs_marks(device, device->ov_left);
5936
5937	if (device->ov_left == 0) {
5938		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5939		if (dw) {
5940			dw->w.cb = w_ov_finished;
5941			dw->device = device;
5942			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5943		} else {
5944			drbd_err(device, "kmalloc(dw) failed.\n");
5945			ov_out_of_sync_print(device);
5946			drbd_resync_finished(device);
5947		}
5948	}
5949	put_ldev(device);
5950	return 0;
5951}
5952
5953static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5954{
5955	return 0;
5956}
5957
5958struct meta_sock_cmd {
5959	size_t pkt_size;
5960	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5961};
5962
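/* ping_int is configured in seconds, ping_timeo in tenths of a second;
 * either way the result is converted to jiffies for sk_rcvtimeo. */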
5963static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5964{
5965	long t;
5966	struct net_conf *nc;
5967
5968	rcu_read_lock();
5969	nc = rcu_dereference(connection->net_conf);
5970	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5971	rcu_read_unlock();
5972
5973	t *= HZ;
5974	if (ping_timeout)
5975		t /= 10;
5976
5977	connection->meta.socket->sk->sk_rcvtimeo = t;
5978}
5979
5980static void set_ping_timeout(struct drbd_connection *connection)
5981{
5982	set_rcvtimeo(connection, 1);
5983}
5984
5985static void set_idle_timeout(struct drbd_connection *connection)
5986{
5987	set_rcvtimeo(connection, 0);
5988}
5989
5990static struct meta_sock_cmd ack_receiver_tbl[] = {
5991	[P_PING]	    = { 0, got_Ping },
5992	[P_PING_ACK]	    = { 0, got_PingAck },
5993	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5994	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5995	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5996	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
5997	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5998	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5999	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
6000	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
6001	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
6002	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6003	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
6004	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
6005	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
6006	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
6007	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
6008};
6009
6010int drbd_ack_receiver(struct drbd_thread *thi)
6011{
6012	struct drbd_connection *connection = thi->connection;
6013	struct meta_sock_cmd *cmd = NULL;
6014	struct packet_info pi;
6015	unsigned long pre_recv_jif;
6016	int rv;
6017	void *buf    = connection->meta.rbuf;
6018	int received = 0;
6019	unsigned int header_size = drbd_header_size(connection);
6020	int expect   = header_size;
6021	bool ping_timeout_active = false;
6022
6023	sched_set_fifo_low(current);
6024
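	/* Receive loop: accumulate bytes from the meta socket until a full
	 * header has arrived, decode it, then wait for the command's payload
	 * (if any), dispatch the handler, and reset for the next packet. */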
6025	while (get_t_state(thi) == RUNNING) {
6026		drbd_thread_current_set_cpu(thi);
6027
6028		conn_reclaim_net_peer_reqs(connection);
6029
6030		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6031			if (drbd_send_ping(connection)) {
6032				drbd_err(connection, "drbd_send_ping has failed\n");
6033				goto reconnect;
6034			}
6035			set_ping_timeout(connection);
6036			ping_timeout_active = true;
6037		}
6038
6039		pre_recv_jif = jiffies;
6040		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6041
6042		/* Note:
6043		 * -EINTR	 (on meta) we got a signal
6044		 * -EAGAIN	 (on meta) rcvtimeo expired
6045		 * -ECONNRESET	 other side closed the connection
6046		 * -ERESTARTSYS  (on data) we got a signal
6047		 * rv <  0	 other than above: unexpected error!
6048		 * rv == expected: full header or command
6049		 * rv <  expected: "woken" by signal during receive
6050		 * rv == 0	 : "connection shut down by peer"
6051		 */
6052		if (likely(rv > 0)) {
6053			received += rv;
6054			buf	 += rv;
6055		} else if (rv == 0) {
6056			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6057				long t;
6058				rcu_read_lock();
6059				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6060				rcu_read_unlock();
6061
6062				t = wait_event_timeout(connection->ping_wait,
6063						       connection->cstate < C_WF_REPORT_PARAMS,
6064						       t);
6065				if (t)
6066					break;
6067			}
6068			drbd_err(connection, "meta connection shut down by peer.\n");
6069			goto reconnect;
6070		} else if (rv == -EAGAIN) {
6071			/* If the data socket received something meanwhile,
6072			 * that is good enough: peer is still alive. */
6073			if (time_after(connection->last_received, pre_recv_jif))
6074				continue;
6075			if (ping_timeout_active) {
6076				drbd_err(connection, "PingAck did not arrive in time.\n");
6077				goto reconnect;
6078			}
6079			set_bit(SEND_PING, &connection->flags);
6080			continue;
6081		} else if (rv == -EINTR) {
6082			/* maybe drbd_thread_stop(): the while condition will notice.
6083			 * maybe woken for send_ping: we'll send a ping above,
6084			 * and change the rcvtimeo */
6085			flush_signals(current);
6086			continue;
6087		} else {
6088			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6089			goto reconnect;
6090		}
6091
6092		if (received == expect && cmd == NULL) {
6093			if (decode_header(connection, connection->meta.rbuf, &pi))
6094				goto reconnect;
6095			cmd = &ack_receiver_tbl[pi.cmd];
6096			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6097				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6098					 cmdname(pi.cmd), pi.cmd);
6099				goto disconnect;
6100			}
6101			expect = header_size + cmd->pkt_size;
6102			if (pi.size != expect - header_size) {
6103				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6104					pi.cmd, pi.size);
6105				goto reconnect;
6106			}
6107		}
6108		if (received == expect) {
6109			bool err;
6110
6111			err = cmd->fn(connection, &pi);
6112			if (err) {
6113				drbd_err(connection, "%ps failed\n", cmd->fn);
6114				goto reconnect;
6115			}
6116
6117			connection->last_received = jiffies;
6118
6119			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6120				set_idle_timeout(connection);
6121				ping_timeout_active = false;
6122			}
6123
6124			buf	 = connection->meta.rbuf;
6125			received = 0;
6126			expect	 = header_size;
6127			cmd	 = NULL;
6128		}
6129	}
6130
6131	if (0) {
6132reconnect:
6133		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6134		conn_md_sync(connection);
6135	}
6136	if (0) {
6137disconnect:
6138		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6139	}
6140
6141	drbd_info(connection, "ack_receiver terminated\n");
6142
6143	return 0;
6144}
6145
6146void drbd_send_acks_wf(struct work_struct *ws)
6147{
6148	struct drbd_peer_device *peer_device =
6149		container_of(ws, struct drbd_peer_device, send_acks_work);
6150	struct drbd_connection *connection = peer_device->connection;
6151	struct drbd_device *device = peer_device->device;
6152	struct net_conf *nc;
6153	int tcp_cork, err;
6154
6155	rcu_read_lock();
6156	nc = rcu_dereference(connection->net_conf);
6157	tcp_cork = nc->tcp_cork;
6158	rcu_read_unlock();
6159
6160	if (tcp_cork)
6161		tcp_sock_set_cork(connection->meta.socket->sk, true);
6162
6163	err = drbd_finish_peer_reqs(device);
6164	kref_put(&device->kref, drbd_destroy_device);
6165	/* The matching kref_get() is in drbd_endio_write_sec_final(). It is needed to
6166	   keep the send_acks_work work_struct, embedded in the peer_device object, alive. */
6167
6168	if (err) {
6169		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6170		return;
6171	}
6172
6173	if (tcp_cork)
6174		tcp_sock_set_cork(connection->meta.socket->sk, false);
6175
6176	return;
6177}
6178