1// SPDX-License-Identifier: GPL-2.0-or-later
2/*
3   drbd_worker.c
4
5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11
12*/
13
14#include <linux/module.h>
15#include <linux/drbd.h>
16#include <linux/sched/signal.h>
17#include <linux/wait.h>
18#include <linux/mm.h>
19#include <linux/memcontrol.h>
20#include <linux/mm_inline.h>
21#include <linux/slab.h>
22#include <linux/random.h>
23#include <linux/string.h>
24#include <linux/scatterlist.h>
25#include <linux/part_stat.h>
26
27#include "drbd_int.h"
28#include "drbd_protocol.h"
29#include "drbd_req.h"
30
31static int make_ov_request(struct drbd_device *, int);
32static int make_resync_request(struct drbd_device *, int);
33
34/* endio handlers:
35 *   drbd_md_endio (defined here)
36 *   drbd_request_endio (defined here)
37 *   drbd_peer_request_endio (defined here)
38 *   drbd_bm_endio (defined in drbd_bitmap.c)
39 *
40 * For all these callbacks, note the following:
41 * The callbacks will be called in irq context by the IDE drivers,
42 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43 * Try to get the locking right :)
44 *
45 */
46
47/* used for synchronous meta data and bitmap IO
48 * submitted by drbd_md_sync_page_io()
49 */
50void drbd_md_endio(struct bio *bio)
51{
52	struct drbd_device *device;
53
54	device = bio->bi_private;
55	device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57	/* special case: drbd_md_read() during drbd_adm_attach() */
58	if (device->ldev)
59		put_ldev(device);
60	bio_put(bio);
61
62	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63	 * to timeout on the lower level device, and eventually detach from it.
64	 * If this io completion runs after that timeout expired, this
65	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
66	 * During normal operation, this only puts that extra reference
67	 * down to 1 again.
68	 * Make sure we first drop the reference, and only then signal
69	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70	 * next drbd_md_sync_page_io(), that we trigger the
71	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72	 */
73	drbd_md_put_buffer(device);
74	device->md_io.done = 1;
75	wake_up(&device->misc_wait);
76}
77
78/* reads on behalf of the partner,
79 * "submitted" by the receiver
80 */
81static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82{
83	unsigned long flags = 0;
84	struct drbd_peer_device *peer_device = peer_req->peer_device;
85	struct drbd_device *device = peer_device->device;
86
87	spin_lock_irqsave(&device->resource->req_lock, flags);
88	device->read_cnt += peer_req->i.size >> 9;
89	list_del(&peer_req->w.list);
90	if (list_empty(&device->read_ee))
91		wake_up(&device->ee_wait);
92	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93		__drbd_chk_io_error(device, DRBD_READ_ERROR);
94	spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97	put_ldev(device);
98}
99
100/* writes on behalf of the partner, or resync writes,
101 * "submitted" by the receiver, final stage.  */
102void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103{
104	unsigned long flags = 0;
105	struct drbd_peer_device *peer_device = peer_req->peer_device;
106	struct drbd_device *device = peer_device->device;
107	struct drbd_connection *connection = peer_device->connection;
108	struct drbd_interval i;
109	int do_wake;
110	u64 block_id;
111	int do_al_complete_io;
112
113	/* after we moved peer_req to done_ee,
114	 * we may no longer access it,
115	 * it may be freed/reused already!
116	 * (as soon as we release the req_lock) */
117	i = peer_req->i;
118	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119	block_id = peer_req->block_id;
120	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121
122	if (peer_req->flags & EE_WAS_ERROR) {
		/* In protocol != C, we usually do not send write acks.
		 * In case of a write error, send the neg ack anyway. */
125		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126			inc_unacked(device);
127		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
128	}
129
130	spin_lock_irqsave(&device->resource->req_lock, flags);
131	device->writ_cnt += peer_req->i.size >> 9;
132	list_move_tail(&peer_req->w.list, &device->done_ee);
133
134	/*
135	 * Do not remove from the write_requests tree here: we did not send the
136	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removal from the tree happens in "drbd_process_done_ee", within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
	 * _drbd_clear_done_ee.
140	 */
141
142	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143
144	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
145	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146	if (peer_req->flags & EE_WAS_ERROR)
147		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148
149	if (connection->cstate >= C_WF_REPORT_PARAMS) {
150		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152			kref_put(&device->kref, drbd_destroy_device);
153	}
154	spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156	if (block_id == ID_SYNCER)
157		drbd_rs_complete_io(device, i.sector);
158
159	if (do_wake)
160		wake_up(&device->ee_wait);
161
162	if (do_al_complete_io)
163		drbd_al_complete_io(device, &i);
164
165	put_ldev(device);
166}
167
168/* writes on behalf of the partner, or resync writes,
169 * "submitted" by the receiver.
170 */
171void drbd_peer_request_endio(struct bio *bio)
172{
173	struct drbd_peer_request *peer_req = bio->bi_private;
174	struct drbd_device *device = peer_req->peer_device->device;
175	bool is_write = bio_data_dir(bio) == WRITE;
176	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177			  bio_op(bio) == REQ_OP_DISCARD;
178
179	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
180		drbd_warn(device, "%s: error=%d s=%llus\n",
181				is_write ? (is_discard ? "discard" : "write")
182					: "read", bio->bi_status,
183				(unsigned long long)peer_req->i.sector);
184
185	if (bio->bi_status)
186		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188	bio_put(bio); /* no need for the bio anymore */
189	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190		if (is_write)
191			drbd_endio_write_sec_final(peer_req);
192		else
193			drbd_endio_read_sec_final(peer_req);
194	}
195}
196
197static void
198drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199{
200	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201		device->minor, device->resource->name, device->vnr);
202}
203
/* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
 */
206void drbd_request_endio(struct bio *bio)
207{
208	unsigned long flags;
209	struct drbd_request *req = bio->bi_private;
210	struct drbd_device *device = req->device;
211	struct bio_and_error m;
212	enum drbd_req_event what;
213
214	/* If this request was aborted locally before,
215	 * but now was completed "successfully",
216	 * chances are that this caused arbitrary data corruption.
217	 *
218	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which no longer
	 * complete requests at all, not even error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
225	 * Still the affected node has to be rebooted "soon".
226	 *
227	 * By completing these requests, we allow the upper layers to re-use
228	 * the associated data pages.
229	 *
230	 * If later the local backing device "recovers", and now DMAs some data
231	 * from disk into the original request pages, in the best case it will
232	 * just put random data into unused pages; but typically it will corrupt
233	 * meanwhile completely unrelated data, causing all sorts of damage.
234	 *
235	 * Which means delayed successful completion,
236	 * especially for READ requests,
237	 * is a reason to panic().
238	 *
239	 * We assume that a delayed *error* completion is OK,
240	 * though we still will complain noisily about it.
241	 */
242	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243		if (__ratelimit(&drbd_ratelimit_state))
244			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246		if (!bio->bi_status)
247			drbd_panic_after_delayed_completion_of_aborted_request(device);
248	}
249
250	/* to avoid recursion in __req_mod */
251	if (unlikely(bio->bi_status)) {
252		switch (bio_op(bio)) {
253		case REQ_OP_WRITE_ZEROES:
254		case REQ_OP_DISCARD:
255			if (bio->bi_status == BLK_STS_NOTSUPP)
256				what = DISCARD_COMPLETED_NOTSUPP;
257			else
258				what = DISCARD_COMPLETED_WITH_ERROR;
259			break;
260		case REQ_OP_READ:
261			if (bio->bi_opf & REQ_RAHEAD)
262				what = READ_AHEAD_COMPLETED_WITH_ERROR;
263			else
264				what = READ_COMPLETED_WITH_ERROR;
265			break;
266		default:
267			what = WRITE_COMPLETED_WITH_ERROR;
268			break;
269		}
270	} else {
271		what = COMPLETED_OK;
272	}
273
274	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275	bio_put(bio);
276
277	/* not req_mod(), we need irqsave here! */
278	spin_lock_irqsave(&device->resource->req_lock, flags);
279	__req_mod(req, what, &m);
280	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281	put_ldev(device);
282
283	if (m.bio)
284		complete_master_bio(device, &m);
285}
286
287void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288{
289	SHASH_DESC_ON_STACK(desc, tfm);
290	struct page *page = peer_req->pages;
291	struct page *tmp;
292	unsigned len;
293	void *src;
294
295	desc->tfm = tfm;
296
297	crypto_shash_init(desc);
298
299	src = kmap_atomic(page);
300	while ((tmp = page_chain_next(page))) {
301		/* all but the last page will be fully used */
302		crypto_shash_update(desc, src, PAGE_SIZE);
303		kunmap_atomic(src);
304		page = tmp;
305		src = kmap_atomic(page);
306	}
307	/* and now the last, possibly only partially used page */
308	len = peer_req->i.size & (PAGE_SIZE - 1);
309	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310	kunmap_atomic(src);
311
312	crypto_shash_final(desc, digest);
313	shash_desc_zero(desc);
314}
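
/* For illustration (assuming 4 KiB pages): with i.size == 12288 the chain is
 * three full pages and the final update hashes PAGE_SIZE, because
 * "len ?: PAGE_SIZE" maps len == 0 back to a full page; with i.size == 10240
 * only the first 2048 bytes of the last page are hashed. */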
315
316void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317{
318	SHASH_DESC_ON_STACK(desc, tfm);
319	struct bio_vec bvec;
320	struct bvec_iter iter;
321
322	desc->tfm = tfm;
323
324	crypto_shash_init(desc);
325
326	bio_for_each_segment(bvec, bio, iter) {
327		u8 *src;
328
329		src = kmap_atomic(bvec.bv_page);
330		crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
331		kunmap_atomic(src);
332
333		/* REQ_OP_WRITE_SAME has only one segment,
334		 * checksum the payload only once. */
335		if (bio_op(bio) == REQ_OP_WRITE_SAME)
336			break;
337	}
338	crypto_shash_final(desc, digest);
339	shash_desc_zero(desc);
340}
341
342/* MAYBE merge common code with w_e_end_ov_req */
343static int w_e_send_csum(struct drbd_work *w, int cancel)
344{
345	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
346	struct drbd_peer_device *peer_device = peer_req->peer_device;
347	struct drbd_device *device = peer_device->device;
348	int digest_size;
349	void *digest;
350	int err = 0;
351
352	if (unlikely(cancel))
353		goto out;
354
355	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
356		goto out;
357
358	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
359	digest = kmalloc(digest_size, GFP_NOIO);
360	if (digest) {
361		sector_t sector = peer_req->i.sector;
362		unsigned int size = peer_req->i.size;
363		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
364		/* Free peer_req and pages before send.
365		 * In case we block on congestion, we could otherwise run into
366		 * some distributed deadlock, if the other side blocks on
367		 * congestion as well, because our receiver blocks in
368		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
369		drbd_free_peer_req(device, peer_req);
370		peer_req = NULL;
371		inc_rs_pending(device);
372		err = drbd_send_drequest_csum(peer_device, sector, size,
373					      digest, digest_size,
374					      P_CSUM_RS_REQUEST);
375		kfree(digest);
376	} else {
377		drbd_err(device, "kmalloc() of digest failed.\n");
378		err = -ENOMEM;
379	}
380
381out:
382	if (peer_req)
383		drbd_free_peer_req(device, peer_req);
384
385	if (unlikely(err))
386		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
387	return err;
388}
389
390#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
391
392static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
393{
394	struct drbd_device *device = peer_device->device;
395	struct drbd_peer_request *peer_req;
396
397	if (!get_ldev(device))
398		return -EIO;
399
400	/* GFP_TRY, because if there is no memory available right now, this may
401	 * be rescheduled for later. It is "only" background resync, after all. */
402	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
403				       size, size, GFP_TRY);
404	if (!peer_req)
405		goto defer;
406
407	peer_req->w.cb = w_e_send_csum;
408	spin_lock_irq(&device->resource->req_lock);
409	list_add_tail(&peer_req->w.list, &device->read_ee);
410	spin_unlock_irq(&device->resource->req_lock);
411
412	atomic_add(size >> 9, &device->rs_sect_ev);
413	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
414				     DRBD_FAULT_RS_RD) == 0)
415		return 0;
416
417	/* If it failed because of ENOMEM, retry should help.  If it failed
418	 * because bio_add_page failed (probably broken lower level driver),
419	 * retry may or may not help.
420	 * If it does not, you may need to force disconnect. */
421	spin_lock_irq(&device->resource->req_lock);
422	list_del(&peer_req->w.list);
423	spin_unlock_irq(&device->resource->req_lock);
424
425	drbd_free_peer_req(device, peer_req);
426defer:
427	put_ldev(device);
428	return -EAGAIN;
429}
430
431int w_resync_timer(struct drbd_work *w, int cancel)
432{
433	struct drbd_device *device =
434		container_of(w, struct drbd_device, resync_work);
435
436	switch (device->state.conn) {
437	case C_VERIFY_S:
438		make_ov_request(device, cancel);
439		break;
440	case C_SYNC_TARGET:
441		make_resync_request(device, cancel);
442		break;
443	}
444
445	return 0;
446}
447
448void resync_timer_fn(struct timer_list *t)
449{
450	struct drbd_device *device = from_timer(device, t, resync_timer);
451
452	drbd_queue_work_if_unqueued(
453		&first_peer_device(device)->connection->sender_work,
454		&device->resync_work);
455}
456
457static void fifo_set(struct fifo_buffer *fb, int value)
458{
459	int i;
460
461	for (i = 0; i < fb->size; i++)
462		fb->values[i] = value;
463}
464
465static int fifo_push(struct fifo_buffer *fb, int value)
466{
467	int ov;
468
469	ov = fb->values[fb->head_index];
470	fb->values[fb->head_index++] = value;
471
472	if (fb->head_index >= fb->size)
473		fb->head_index = 0;
474
475	return ov;
476}
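
/* Worked example (illustrative): with size == 4, values {a, b, c, d} and
 * head_index == 0, fifo_push(fb, x) returns a and leaves {x, b, c, d} with
 * head_index == 1.  Each push thus returns the value planned "size" steps
 * ago and stores the new value in its slot: a fixed-length delay line, as
 * used by the resync controller below. */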
477
478static void fifo_add_val(struct fifo_buffer *fb, int value)
479{
480	int i;
481
482	for (i = 0; i < fb->size; i++)
483		fb->values[i] += value;
484}
485
486struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
487{
488	struct fifo_buffer *fb;
489
490	fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
491	if (!fb)
492		return NULL;
493
494	fb->head_index = 0;
495	fb->size = fifo_size;
496	fb->total = 0;
497
498	return fb;
499}
500
501static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
502{
503	struct disk_conf *dc;
504	unsigned int want;     /* The number of sectors we want in-flight */
505	int req_sect; /* Number of sectors to request in this turn */
506	int correction; /* Number of sectors more we need in-flight */
507	int cps; /* correction per invocation of drbd_rs_controller() */
508	int steps; /* Number of time steps to plan ahead */
509	int curr_corr;
510	int max_sect;
511	struct fifo_buffer *plan;
512
513	dc = rcu_dereference(device->ldev->disk_conf);
514	plan = rcu_dereference(device->rs_plan_s);
515
516	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
517
518	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
519		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
520	} else { /* normal path */
521		want = dc->c_fill_target ? dc->c_fill_target :
522			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
523	}
524
525	correction = want - device->rs_in_flight - plan->total;
526
527	/* Plan ahead */
528	cps = correction / steps;
529	fifo_add_val(plan, cps);
530	plan->total += cps * steps;
531
532	/* What we do in this step */
533	curr_corr = fifo_push(plan, 0);
534	plan->total -= curr_corr;
535
536	req_sect = sect_in + curr_corr;
537	if (req_sect < 0)
538		req_sect = 0;
539
540	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
541	if (req_sect > max_sect)
542		req_sect = max_sect;
543
544	/*
545	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
546		 sect_in, device->rs_in_flight, want, correction,
547		 steps, cps, device->rs_planed, curr_corr, req_sect);
548	*/
549
550	return req_sect;
551}
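
/* Illustrative walk-through (made-up numbers): with steps == 10,
 * c_fill_target == 1000 sectors, rs_in_flight == 600 and plan->total == 200,
 * the correction is 1000 - 600 - 200 = 200 sectors, spread over the plan as
 * cps == 20 per step.  Whatever the oldest plan slot then holds is popped as
 * curr_corr and added to sect_in, so each SLEEP_TIME tick only issues its
 * share of the correction instead of the whole thing at once; c_max_rate
 * still caps the result via max_sect. */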
552
553static int drbd_rs_number_requests(struct drbd_device *device)
554{
555	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
556	int number, mxb;
557
558	sect_in = atomic_xchg(&device->rs_sect_in, 0);
559	device->rs_in_flight -= sect_in;
560
561	rcu_read_lock();
562	mxb = drbd_get_max_buffers(device) / 2;
563	if (rcu_dereference(device->rs_plan_s)->size) {
564		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
565		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
566	} else {
567		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
568		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
569	}
570	rcu_read_unlock();
571
572	/* Don't have more than "max-buffers"/2 in-flight.
573	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
574	 * potentially causing a distributed deadlock on congestion during
575	 * online-verify or (checksum-based) resync, if max-buffers,
576	 * socket buffer sizes and resync rate settings are mis-configured. */
577
578	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
579	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
580	 * "number of pages" (typically also 4k),
581	 * but "rs_in_flight" is in "sectors" (512 Byte). */
582	if (mxb - device->rs_in_flight/8 < number)
583		number = mxb - device->rs_in_flight/8;
584
585	return number;
586}
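
/* Unit sanity check (illustrative): "number" is in BM_BLOCK_SIZE (4 KiB)
 * units per SLEEP_TIME tick (HZ/10, i.e. 100ms in this driver), while
 * c_sync_rate is in KiB/s, so the two branches above convert with a factor
 * of HZ * 4 / SLEEP_TIME == 40: a fixed resync_rate of 4000 KiB/s yields
 * number == 100 per tick, and number == 100 from the controller is reported
 * back as c_sync_rate == 4000.  rs_in_flight is in 512-byte sectors, hence
 * the "/8" when comparing against mxb. */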
587
588static int make_resync_request(struct drbd_device *const device, int cancel)
589{
590	struct drbd_peer_device *const peer_device = first_peer_device(device);
591	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
592	unsigned long bit;
593	sector_t sector;
594	const sector_t capacity = get_capacity(device->vdisk);
595	int max_bio_size;
596	int number, rollback_i, size;
597	int align, requeue = 0;
598	int i = 0;
599	int discard_granularity = 0;
600
601	if (unlikely(cancel))
602		return 0;
603
604	if (device->rs_total == 0) {
605		/* empty resync? */
606		drbd_resync_finished(device);
607		return 0;
608	}
609
610	if (!get_ldev(device)) {
		/* Since we only need to access device->resync, a
		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
		   continuing a resync with a broken disk makes no sense at
		   all */
615		drbd_err(device, "Disk broke down during resync!\n");
616		return 0;
617	}
618
619	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
620		rcu_read_lock();
621		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
622		rcu_read_unlock();
623	}
624
625	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
626	number = drbd_rs_number_requests(device);
627	if (number <= 0)
628		goto requeue;
629
630	for (i = 0; i < number; i++) {
631		/* Stop generating RS requests when half of the send buffer is filled,
632		 * but notify TCP that we'd like to have more space. */
633		mutex_lock(&connection->data.mutex);
634		if (connection->data.socket) {
635			struct sock *sk = connection->data.socket->sk;
636			int queued = sk->sk_wmem_queued;
637			int sndbuf = sk->sk_sndbuf;
638			if (queued > sndbuf / 2) {
639				requeue = 1;
640				if (sk->sk_socket)
641					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
642			}
643		} else
644			requeue = 1;
645		mutex_unlock(&connection->data.mutex);
646		if (requeue)
647			goto requeue;
648
649next_sector:
650		size = BM_BLOCK_SIZE;
651		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
652
653		if (bit == DRBD_END_OF_BITMAP) {
654			device->bm_resync_fo = drbd_bm_bits(device);
655			put_ldev(device);
656			return 0;
657		}
658
659		sector = BM_BIT_TO_SECT(bit);
660
661		if (drbd_try_rs_begin_io(device, sector)) {
662			device->bm_resync_fo = bit;
663			goto requeue;
664		}
665		device->bm_resync_fo = bit + 1;
666
667		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
668			drbd_rs_complete_io(device, sector);
669			goto next_sector;
670		}
671
672#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop once we have reached the maximum request size.
		 *
		 * Additionally, always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
678		 */
679		align = 1;
680		rollback_i = i;
681		while (i < number) {
682			if (size + BM_BLOCK_SIZE > max_bio_size)
683				break;
684
685			/* Be always aligned */
686			if (sector & ((1<<(align+3))-1))
687				break;
688
689			if (discard_granularity && size == discard_granularity)
690				break;
691
692			/* do not cross extent boundaries */
693			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
694				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; testing for ( b == 0 ) would only
			 * accidentally handle the out-of-band case correctly,
			 * because of the "oddly sized" adjustment below */
700			if (drbd_bm_test_bit(device, bit+1) != 1)
701				break;
702			bit++;
703			size += BM_BLOCK_SIZE;
704			if ((BM_BLOCK_SIZE << align) <= size)
705				align++;
706			i++;
707		}
708		/* if we merged some,
709		 * reset the offset to start the next drbd_bm_find_next from */
710		if (size > BM_BLOCK_SIZE)
711			device->bm_resync_fo = bit + 1;
712#endif
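
		/* Illustrative consequence of the alignment rule above: a merged
		 * request never grows past the power-of-two alignment of its
		 * start sector.  A start sector of 16 (8 KiB into the device)
		 * is 16- but not 32-sector aligned, so merging stops at 8 KiB,
		 * while a 1 MiB aligned start may grow up to max_bio_size (or
		 * discard_granularity, if set). */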
713
714		/* adjust very last sectors, in case we are oddly sized */
715		if (sector + (size>>9) > capacity)
716			size = (capacity-sector)<<9;
717
718		if (device->use_csums) {
719			switch (read_for_csum(peer_device, sector, size)) {
720			case -EIO: /* Disk failure */
721				put_ldev(device);
722				return -EIO;
723			case -EAGAIN: /* allocation failed, or ldev busy */
724				drbd_rs_complete_io(device, sector);
725				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
726				i = rollback_i;
727				goto requeue;
728			case 0:
729				/* everything ok */
730				break;
731			default:
732				BUG();
733			}
734		} else {
735			int err;
736
737			inc_rs_pending(device);
738			err = drbd_send_drequest(peer_device,
739						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
740						 sector, size, ID_SYNCER);
741			if (err) {
742				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
743				dec_rs_pending(device);
744				put_ldev(device);
745				return err;
746			}
747		}
748	}
749
750	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
751		/* last syncer _request_ was sent,
752		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
753		 * next sync group will resume), as soon as we receive the last
754		 * resync data block, and the last bit is cleared.
755		 * until then resync "work" is "inactive" ...
756		 */
757		put_ldev(device);
758		return 0;
759	}
760
761 requeue:
762	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
763	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
764	put_ldev(device);
765	return 0;
766}
767
768static int make_ov_request(struct drbd_device *device, int cancel)
769{
770	int number, i, size;
771	sector_t sector;
772	const sector_t capacity = get_capacity(device->vdisk);
773	bool stop_sector_reached = false;
774
775	if (unlikely(cancel))
776		return 1;
777
778	number = drbd_rs_number_requests(device);
779
780	sector = device->ov_position;
781	for (i = 0; i < number; i++) {
782		if (sector >= capacity)
783			return 1;
784
785		/* We check for "finished" only in the reply path:
786		 * w_e_end_ov_reply().
787		 * We need to send at least one request out. */
788		stop_sector_reached = i > 0
789			&& verify_can_do_stop_sector(device)
790			&& sector >= device->ov_stop_sector;
791		if (stop_sector_reached)
792			break;
793
794		size = BM_BLOCK_SIZE;
795
796		if (drbd_try_rs_begin_io(device, sector)) {
797			device->ov_position = sector;
798			goto requeue;
799		}
800
801		if (sector + (size>>9) > capacity)
802			size = (capacity-sector)<<9;
803
804		inc_rs_pending(device);
805		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
806			dec_rs_pending(device);
807			return 0;
808		}
809		sector += BM_SECT_PER_BIT;
810	}
811	device->ov_position = sector;
812
813 requeue:
814	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
815	if (i == 0 || !stop_sector_reached)
816		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
817	return 1;
818}
819
820int w_ov_finished(struct drbd_work *w, int cancel)
821{
822	struct drbd_device_work *dw =
823		container_of(w, struct drbd_device_work, w);
824	struct drbd_device *device = dw->device;
825	kfree(dw);
826	ov_out_of_sync_print(device);
827	drbd_resync_finished(device);
828
829	return 0;
830}
831
832static int w_resync_finished(struct drbd_work *w, int cancel)
833{
834	struct drbd_device_work *dw =
835		container_of(w, struct drbd_device_work, w);
836	struct drbd_device *device = dw->device;
837	kfree(dw);
838
839	drbd_resync_finished(device);
840
841	return 0;
842}
843
844static void ping_peer(struct drbd_device *device)
845{
846	struct drbd_connection *connection = first_peer_device(device)->connection;
847
848	clear_bit(GOT_PING_ACK, &connection->flags);
849	request_ping(connection);
850	wait_event(connection->ping_wait,
851		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
852}
853
854int drbd_resync_finished(struct drbd_device *device)
855{
856	struct drbd_connection *connection = first_peer_device(device)->connection;
857	unsigned long db, dt, dbdt;
858	unsigned long n_oos;
859	union drbd_state os, ns;
860	struct drbd_device_work *dw;
861	char *khelper_cmd = NULL;
862	int verify_done = 0;
863
	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, the entries in the
	 * resync LRU would otherwise be wrong. */
867	if (drbd_rs_del_all(device)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now).  Retry in 100ms. */
872
873		schedule_timeout_interruptible(HZ / 10);
874		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875		if (dw) {
876			dw->w.cb = w_resync_finished;
877			dw->device = device;
878			drbd_queue_work(&connection->sender_work, &dw->w);
879			return 1;
880		}
		drbd_err(device, "Warning: failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882	}
883
884	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885	if (dt <= 0)
886		dt = 1;
887
888	db = device->rs_total;
	/* adjust for verify start and stop sectors, respectively the reached position */
890	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891		db -= device->ov_left;
892
893	dbdt = Bit2KB(db/dt);
894	device->rs_paused /= HZ;
895
896	if (!get_ldev(device))
897		goto out;
898
899	ping_peer(device);
900
901	spin_lock_irq(&device->resource->req_lock);
902	os = drbd_read_state(device);
903
904	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905
906	/* This protects us against multiple calls (that can happen in the presence
907	   of application IO), and against connectivity loss just before we arrive here. */
908	if (os.conn <= C_CONNECTED)
909		goto out_unlock;
910
911	ns = os;
912	ns.conn = C_CONNECTED;
913
914	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915	     verify_done ? "Online verify" : "Resync",
916	     dt + device->rs_paused, device->rs_paused, dbdt);
917
918	n_oos = drbd_bm_total_weight(device);
919
920	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
923			      n_oos, Bit2KB(1));
924			khelper_cmd = "out-of-sync";
925		}
926	} else {
927		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928
929		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930			khelper_cmd = "after-resync-target";
931
932		if (device->use_csums && device->rs_total) {
933			const unsigned long s = device->rs_same_csum;
934			const unsigned long t = device->rs_total;
935			const int ratio =
936				(t == 0)     ? 0 :
937			(t < 100000) ? ((s*100)/t) : (s/(t/100));
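			/* presumably the threshold at t == 100000 avoids
			 * overflowing "s * 100" for very large bitmaps while
			 * keeping integer precision for small ones */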
938			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939			     "transferred %luK total %luK\n",
940			     ratio,
941			     Bit2KB(device->rs_same_csum),
942			     Bit2KB(device->rs_total - device->rs_same_csum),
943			     Bit2KB(device->rs_total));
944		}
945	}
946
947	if (device->rs_failed) {
948		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
949
950		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951			ns.disk = D_INCONSISTENT;
952			ns.pdsk = D_UP_TO_DATE;
953		} else {
954			ns.disk = D_UP_TO_DATE;
955			ns.pdsk = D_INCONSISTENT;
956		}
957	} else {
958		ns.disk = D_UP_TO_DATE;
959		ns.pdsk = D_UP_TO_DATE;
960
961		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962			if (device->p_uuid) {
963				int i;
964				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965					_drbd_uuid_set(device, i, device->p_uuid[i]);
966				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968			} else {
969				drbd_err(device, "device->p_uuid is NULL! BUG\n");
970			}
971		}
972
973		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974			/* for verify runs, we don't update uuids here,
975			 * so there would be nothing to report. */
976			drbd_uuid_set_bm(device, 0UL);
977			drbd_print_uuids(device, "updated UUIDs");
978			if (device->p_uuid) {
979				/* Now the two UUID sets are equal, update what we
980				 * know of the peer. */
981				int i;
982				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983					device->p_uuid[i] = device->ldev->md.uuid[i];
984			}
985		}
986	}
987
988	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
989out_unlock:
990	spin_unlock_irq(&device->resource->req_lock);
991
992	/* If we have been sync source, and have an effective fencing-policy,
993	 * once *all* volumes are back in sync, call "unfence". */
994	if (os.conn == C_SYNC_SOURCE) {
995		enum drbd_disk_state disk_state = D_MASK;
996		enum drbd_disk_state pdsk_state = D_MASK;
997		enum drbd_fencing_p fp = FP_DONT_CARE;
998
999		rcu_read_lock();
1000		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001		if (fp != FP_DONT_CARE) {
1002			struct drbd_peer_device *peer_device;
1003			int vnr;
1004			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005				struct drbd_device *device = peer_device->device;
1006				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008			}
1009		}
1010		rcu_read_unlock();
1011		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012			conn_khelper(connection, "unfence-peer");
1013	}
1014
1015	put_ldev(device);
1016out:
1017	device->rs_total  = 0;
1018	device->rs_failed = 0;
1019	device->rs_paused = 0;
1020
1021	/* reset start sector, if we reached end of device */
1022	if (verify_done && device->ov_left == 0)
1023		device->ov_start_sector = 0;
1024
1025	drbd_md_sync(device);
1026
1027	if (khelper_cmd)
1028		drbd_khelper(device, khelper_cmd);
1029
1030	return 1;
1031}
1032
1033/* helper */
1034static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1035{
1036	if (drbd_peer_req_has_active_page(peer_req)) {
1037		/* This might happen if sendpage() has not finished */
1038		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1039		atomic_add(i, &device->pp_in_use_by_net);
1040		atomic_sub(i, &device->pp_in_use);
1041		spin_lock_irq(&device->resource->req_lock);
1042		list_add_tail(&peer_req->w.list, &device->net_ee);
1043		spin_unlock_irq(&device->resource->req_lock);
1044		wake_up(&drbd_pp_wait);
1045	} else
1046		drbd_free_peer_req(device, peer_req);
1047}
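
/* Accounting sketch (illustrative, assuming 4 KiB pages): a 16 KiB peer
 * request whose pages are still referenced by the socket layer moves 4 pages
 * from pp_in_use to pp_in_use_by_net; the pages only return to the pool when
 * the net_ee list is reaped later, so allocators sleeping in
 * drbd_alloc_pages() are woken here to re-check their limits. */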
1048
1049/**
1050 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1051 * @w:		work object.
1052 * @cancel:	The connection will be closed anyways
1053 */
1054int w_e_end_data_req(struct drbd_work *w, int cancel)
1055{
1056	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1057	struct drbd_peer_device *peer_device = peer_req->peer_device;
1058	struct drbd_device *device = peer_device->device;
1059	int err;
1060
1061	if (unlikely(cancel)) {
1062		drbd_free_peer_req(device, peer_req);
1063		dec_unacked(device);
1064		return 0;
1065	}
1066
1067	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1068		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1069	} else {
1070		if (__ratelimit(&drbd_ratelimit_state))
1071			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1072			    (unsigned long long)peer_req->i.sector);
1073
1074		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1075	}
1076
1077	dec_unacked(device);
1078
1079	move_to_net_ee_or_free(device, peer_req);
1080
1081	if (unlikely(err))
1082		drbd_err(device, "drbd_send_block() failed\n");
1083	return err;
1084}
1085
1086static bool all_zero(struct drbd_peer_request *peer_req)
1087{
1088	struct page *page = peer_req->pages;
1089	unsigned int len = peer_req->i.size;
1090
1091	page_chain_for_each(page) {
1092		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1093		unsigned int i, words = l / sizeof(long);
1094		unsigned long *d;
1095
1096		d = kmap_atomic(page);
1097		for (i = 0; i < words; i++) {
1098			if (d[i]) {
1099				kunmap_atomic(d);
1100				return false;
1101			}
1102		}
1103		kunmap_atomic(d);
1104		len -= l;
1105	}
1106
1107	return true;
1108}
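
/* Note: the scan above works in sizeof(long) granularity; since peer request
 * sizes are multiples of 512 bytes, no trailing bytes are silently skipped. */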
1109
1110/**
1111 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1112 * @w:		work object.
1113 * @cancel:	The connection will be closed anyways
1114 */
1115int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1116{
1117	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118	struct drbd_peer_device *peer_device = peer_req->peer_device;
1119	struct drbd_device *device = peer_device->device;
1120	int err;
1121
1122	if (unlikely(cancel)) {
1123		drbd_free_peer_req(device, peer_req);
1124		dec_unacked(device);
1125		return 0;
1126	}
1127
1128	if (get_ldev_if_state(device, D_FAILED)) {
1129		drbd_rs_complete_io(device, peer_req->i.sector);
1130		put_ldev(device);
1131	}
1132
1133	if (device->state.conn == C_AHEAD) {
1134		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1135	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1136		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1137			inc_rs_pending(device);
1138			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1139				err = drbd_send_rs_deallocated(peer_device, peer_req);
1140			else
1141				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142		} else {
1143			if (__ratelimit(&drbd_ratelimit_state))
1144				drbd_err(device, "Not sending RSDataReply, "
1145				    "partner DISKLESS!\n");
1146			err = 0;
1147		}
1148	} else {
1149		if (__ratelimit(&drbd_ratelimit_state))
1150			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1151			    (unsigned long long)peer_req->i.sector);
1152
1153		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154
1155		/* update resync data with failure */
1156		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1157	}
1158
1159	dec_unacked(device);
1160
1161	move_to_net_ee_or_free(device, peer_req);
1162
1163	if (unlikely(err))
1164		drbd_err(device, "drbd_send_block() failed\n");
1165	return err;
1166}
1167
1168int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1169{
1170	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1171	struct drbd_peer_device *peer_device = peer_req->peer_device;
1172	struct drbd_device *device = peer_device->device;
1173	struct digest_info *di;
1174	int digest_size;
1175	void *digest = NULL;
1176	int err, eq = 0;
1177
1178	if (unlikely(cancel)) {
1179		drbd_free_peer_req(device, peer_req);
1180		dec_unacked(device);
1181		return 0;
1182	}
1183
1184	if (get_ldev(device)) {
1185		drbd_rs_complete_io(device, peer_req->i.sector);
1186		put_ldev(device);
1187	}
1188
1189	di = peer_req->digest;
1190
1191	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1192		/* quick hack to try to avoid a race against reconfiguration.
1193		 * a real fix would be much more involved,
1194		 * introducing more locking mechanisms */
1195		if (peer_device->connection->csums_tfm) {
1196			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1197			D_ASSERT(device, digest_size == di->digest_size);
1198			digest = kmalloc(digest_size, GFP_NOIO);
1199		}
1200		if (digest) {
1201			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1202			eq = !memcmp(digest, di->digest, digest_size);
1203			kfree(digest);
1204		}
1205
1206		if (eq) {
1207			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1208			/* rs_same_csums unit is BM_BLOCK_SIZE */
1209			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1210			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1211		} else {
1212			inc_rs_pending(device);
1213			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1214			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1215			kfree(di);
1216			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1217		}
1218	} else {
1219		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1220		if (__ratelimit(&drbd_ratelimit_state))
1221			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1222	}
1223
1224	dec_unacked(device);
1225	move_to_net_ee_or_free(device, peer_req);
1226
1227	if (unlikely(err))
1228		drbd_err(device, "drbd_send_block/ack() failed\n");
1229	return err;
1230}
1231
1232int w_e_end_ov_req(struct drbd_work *w, int cancel)
1233{
1234	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1235	struct drbd_peer_device *peer_device = peer_req->peer_device;
1236	struct drbd_device *device = peer_device->device;
1237	sector_t sector = peer_req->i.sector;
1238	unsigned int size = peer_req->i.size;
1239	int digest_size;
1240	void *digest;
1241	int err = 0;
1242
1243	if (unlikely(cancel))
1244		goto out;
1245
1246	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1247	digest = kmalloc(digest_size, GFP_NOIO);
1248	if (!digest) {
1249		err = 1;	/* terminate the connection in case the allocation failed */
1250		goto out;
1251	}
1252
1253	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1254		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1255	else
1256		memset(digest, 0, digest_size);
1257
1258	/* Free e and pages before send.
1259	 * In case we block on congestion, we could otherwise run into
1260	 * some distributed deadlock, if the other side blocks on
1261	 * congestion as well, because our receiver blocks in
1262	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1263	drbd_free_peer_req(device, peer_req);
1264	peer_req = NULL;
1265	inc_rs_pending(device);
1266	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1267	if (err)
1268		dec_rs_pending(device);
1269	kfree(digest);
1270
1271out:
1272	if (peer_req)
1273		drbd_free_peer_req(device, peer_req);
1274	dec_unacked(device);
1275	return err;
1276}
1277
1278void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1279{
1280	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281		device->ov_last_oos_size += size>>9;
1282	} else {
1283		device->ov_last_oos_start = sector;
1284		device->ov_last_oos_size = size>>9;
1285	}
1286	drbd_set_out_of_sync(device, sector, size);
1287}
1288
1289int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290{
1291	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292	struct drbd_peer_device *peer_device = peer_req->peer_device;
1293	struct drbd_device *device = peer_device->device;
1294	struct digest_info *di;
1295	void *digest;
1296	sector_t sector = peer_req->i.sector;
1297	unsigned int size = peer_req->i.size;
1298	int digest_size;
1299	int err, eq = 0;
1300	bool stop_sector_reached = false;
1301
1302	if (unlikely(cancel)) {
1303		drbd_free_peer_req(device, peer_req);
1304		dec_unacked(device);
1305		return 0;
1306	}
1307
1308	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309	 * the resync lru has been cleaned up already */
1310	if (get_ldev(device)) {
1311		drbd_rs_complete_io(device, peer_req->i.sector);
1312		put_ldev(device);
1313	}
1314
1315	di = peer_req->digest;
1316
1317	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319		digest = kmalloc(digest_size, GFP_NOIO);
1320		if (digest) {
1321			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323			D_ASSERT(device, digest_size == di->digest_size);
1324			eq = !memcmp(digest, di->digest, digest_size);
1325			kfree(digest);
1326		}
1327	}
1328
1329	/* Free peer_req and pages before send.
1330	 * In case we block on congestion, we could otherwise run into
1331	 * some distributed deadlock, if the other side blocks on
1332	 * congestion as well, because our receiver blocks in
1333	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334	drbd_free_peer_req(device, peer_req);
1335	if (!eq)
1336		drbd_ov_out_of_sync_found(device, sector, size);
1337	else
1338		ov_out_of_sync_print(device);
1339
1340	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343	dec_unacked(device);
1344
1345	--device->ov_left;
1346
1347	/* let's advance progress step marks only for every other megabyte */
1348	if ((device->ov_left & 0x200) == 0x200)
1349		drbd_advance_rs_marks(device, device->ov_left);
1350
1351	stop_sector_reached = verify_can_do_stop_sector(device) &&
1352		(sector + (size>>9)) >= device->ov_stop_sector;
1353
1354	if (device->ov_left == 0 || stop_sector_reached) {
1355		ov_out_of_sync_print(device);
1356		drbd_resync_finished(device);
1357	}
1358
1359	return err;
1360}
1361
1362/* FIXME
1363 * We need to track the number of pending barrier acks,
1364 * and to be able to wait for them.
1365 * See also comment in drbd_adm_attach before drbd_suspend_io.
1366 */
1367static int drbd_send_barrier(struct drbd_connection *connection)
1368{
1369	struct p_barrier *p;
1370	struct drbd_socket *sock;
1371
1372	sock = &connection->data;
1373	p = conn_prepare_command(connection, sock);
1374	if (!p)
1375		return -EIO;
1376	p->barrier = connection->send.current_epoch_nr;
1377	p->pad = 0;
1378	connection->send.current_epoch_writes = 0;
1379	connection->send.last_sent_barrier_jif = jiffies;
1380
1381	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382}
1383
1384static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385{
1386	struct drbd_socket *sock = &pd->connection->data;
1387	if (!drbd_prepare_command(pd, sock))
1388		return -EIO;
1389	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390}
1391
1392int w_send_write_hint(struct drbd_work *w, int cancel)
1393{
1394	struct drbd_device *device =
1395		container_of(w, struct drbd_device, unplug_work);
1396
1397	if (cancel)
1398		return 0;
1399	return pd_send_unplug_remote(first_peer_device(device));
1400}
1401
1402static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403{
1404	if (!connection->send.seen_any_write_yet) {
1405		connection->send.seen_any_write_yet = true;
1406		connection->send.current_epoch_nr = epoch;
1407		connection->send.current_epoch_writes = 0;
1408		connection->send.last_sent_barrier_jif = jiffies;
1409	}
1410}
1411
1412static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413{
1414	/* re-init if first write on this connection */
1415	if (!connection->send.seen_any_write_yet)
1416		return;
1417	if (connection->send.current_epoch_nr != epoch) {
1418		if (connection->send.current_epoch_writes)
1419			drbd_send_barrier(connection);
1420		connection->send.current_epoch_nr = epoch;
1421	}
1422}
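
/* Example (illustrative): if the writes sent so far belonged to epoch 7 and
 * the next queued request is in epoch 8, this emits a P_BARRIER for epoch 7
 * before any epoch-8 data goes out, so the peer can complete and acknowledge
 * epoch 7 as a unit.  An epoch without writes produces no barrier. */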
1423
1424int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425{
1426	struct drbd_request *req = container_of(w, struct drbd_request, w);
1427	struct drbd_device *device = req->device;
1428	struct drbd_peer_device *const peer_device = first_peer_device(device);
1429	struct drbd_connection *const connection = peer_device->connection;
1430	int err;
1431
1432	if (unlikely(cancel)) {
1433		req_mod(req, SEND_CANCELED);
1434		return 0;
1435	}
1436	req->pre_send_jif = jiffies;
1437
1438	/* this time, no connection->send.current_epoch_writes++;
1439	 * If it was sent, it was the closing barrier for the last
1440	 * replicated epoch, before we went into AHEAD mode.
1441	 * No more barriers will be sent, until we leave AHEAD mode again. */
1442	maybe_send_barrier(connection, req->epoch);
1443
1444	err = drbd_send_out_of_sync(peer_device, req);
1445	req_mod(req, OOS_HANDED_TO_NETWORK);
1446
1447	return err;
1448}
1449
1450/**
1451 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452 * @w:		work object.
1453 * @cancel:	The connection will be closed anyways
1454 */
1455int w_send_dblock(struct drbd_work *w, int cancel)
1456{
1457	struct drbd_request *req = container_of(w, struct drbd_request, w);
1458	struct drbd_device *device = req->device;
1459	struct drbd_peer_device *const peer_device = first_peer_device(device);
1460	struct drbd_connection *connection = peer_device->connection;
1461	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462	int err;
1463
1464	if (unlikely(cancel)) {
1465		req_mod(req, SEND_CANCELED);
1466		return 0;
1467	}
1468	req->pre_send_jif = jiffies;
1469
1470	re_init_if_first_write(connection, req->epoch);
1471	maybe_send_barrier(connection, req->epoch);
1472	connection->send.current_epoch_writes++;
1473
1474	err = drbd_send_dblock(peer_device, req);
1475	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1476
1477	if (do_send_unplug && !err)
1478		pd_send_unplug_remote(peer_device);
1479
1480	return err;
1481}
1482
1483/**
1484 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485 * @w:		work object.
1486 * @cancel:	The connection will be closed anyways
1487 */
1488int w_send_read_req(struct drbd_work *w, int cancel)
1489{
1490	struct drbd_request *req = container_of(w, struct drbd_request, w);
1491	struct drbd_device *device = req->device;
1492	struct drbd_peer_device *const peer_device = first_peer_device(device);
1493	struct drbd_connection *connection = peer_device->connection;
1494	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495	int err;
1496
1497	if (unlikely(cancel)) {
1498		req_mod(req, SEND_CANCELED);
1499		return 0;
1500	}
1501	req->pre_send_jif = jiffies;
1502
	/* Even read requests may close a write epoch,
	 * if there was an open one. */
1505	maybe_send_barrier(connection, req->epoch);
1506
1507	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508				 (unsigned long)req);
1509
1510	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1511
1512	if (do_send_unplug && !err)
1513		pd_send_unplug_remote(peer_device);
1514
1515	return err;
1516}
1517
1518int w_restart_disk_io(struct drbd_work *w, int cancel)
1519{
1520	struct drbd_request *req = container_of(w, struct drbd_request, w);
1521	struct drbd_device *device = req->device;
1522
1523	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524		drbd_al_begin_io(device, &req->i);
1525
1526	drbd_req_make_private_bio(req, req->master_bio);
1527	bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1528	submit_bio_noacct(req->private_bio);
1529
1530	return 0;
1531}
1532
1533static int _drbd_may_sync_now(struct drbd_device *device)
1534{
1535	struct drbd_device *odev = device;
1536	int resync_after;
1537
1538	while (1) {
1539		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1540			return 1;
1541		rcu_read_lock();
1542		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1543		rcu_read_unlock();
1544		if (resync_after == -1)
1545			return 1;
1546		odev = minor_to_device(resync_after);
1547		if (!odev)
1548			return 1;
1549		if ((odev->state.conn >= C_SYNC_SOURCE &&
1550		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1551		    odev->state.aftr_isp || odev->state.peer_isp ||
1552		    odev->state.user_isp)
1553			return 0;
1554	}
1555}
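
/* Illustrative example: with "resync-after" chaining drbd2 -> drbd1 -> drbd0,
 * drbd2 may only resync while neither drbd1 nor drbd0 is itself between
 * C_SYNC_SOURCE and C_PAUSED_SYNC_T or paused via one of the *_isp flags;
 * the loop above walks that chain until it ends (resync_after == -1) or a
 * busy dependency is found. */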
1556
1557/**
1558 * drbd_pause_after() - Pause resync on all devices that may not resync now
1559 * @device:	DRBD device.
1560 *
1561 * Called from process context only (admin command and after_state_ch).
1562 */
1563static bool drbd_pause_after(struct drbd_device *device)
1564{
1565	bool changed = false;
1566	struct drbd_device *odev;
1567	int i;
1568
1569	rcu_read_lock();
1570	idr_for_each_entry(&drbd_devices, odev, i) {
1571		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1572			continue;
1573		if (!_drbd_may_sync_now(odev) &&
1574		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1575				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1576			changed = true;
1577	}
1578	rcu_read_unlock();
1579
1580	return changed;
1581}
1582
1583/**
1584 * drbd_resume_next() - Resume resync on all devices that may resync now
1585 * @device:	DRBD device.
1586 *
1587 * Called from process context only (admin command and worker).
1588 */
1589static bool drbd_resume_next(struct drbd_device *device)
1590{
1591	bool changed = false;
1592	struct drbd_device *odev;
1593	int i;
1594
1595	rcu_read_lock();
1596	idr_for_each_entry(&drbd_devices, odev, i) {
1597		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1598			continue;
1599		if (odev->state.aftr_isp) {
1600			if (_drbd_may_sync_now(odev) &&
1601			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1602					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1603				changed = true;
1604		}
1605	}
1606	rcu_read_unlock();
1607	return changed;
1608}
1609
1610void resume_next_sg(struct drbd_device *device)
1611{
1612	lock_all_resources();
1613	drbd_resume_next(device);
1614	unlock_all_resources();
1615}
1616
1617void suspend_other_sg(struct drbd_device *device)
1618{
1619	lock_all_resources();
1620	drbd_pause_after(device);
1621	unlock_all_resources();
1622}
1623
1624/* caller must lock_all_resources() */
1625enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1626{
1627	struct drbd_device *odev;
1628	int resync_after;
1629
1630	if (o_minor == -1)
1631		return NO_ERROR;
1632	if (o_minor < -1 || o_minor > MINORMASK)
1633		return ERR_RESYNC_AFTER;
1634
1635	/* check for loops */
1636	odev = minor_to_device(o_minor);
1637	while (1) {
1638		if (odev == device)
1639			return ERR_RESYNC_AFTER_CYCLE;
1640
1641		/* You are free to depend on diskless, non-existing,
1642		 * or not yet/no longer existing minors.
1643		 * We only reject dependency loops.
1644		 * We cannot follow the dependency chain beyond a detached or
1645		 * missing minor.
1646		 */
1647		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1648			return NO_ERROR;
1649
1650		rcu_read_lock();
1651		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1652		rcu_read_unlock();
1653		/* dependency chain ends here, no cycles. */
1654		if (resync_after == -1)
1655			return NO_ERROR;
1656
1657		/* follow the dependency chain */
1658		odev = minor_to_device(resync_after);
1659	}
1660}
1661
1662/* caller must lock_all_resources() */
1663void drbd_resync_after_changed(struct drbd_device *device)
1664{
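	/*
	 * Iterate the pause/resume pair below to a fixed point: resuming one
	 * device can change which other devices have to pause (the dependency
	 * check in _drbd_may_sync_now() looks at the other device's isp
	 * flags), and vice versa, so a single pass is not always enough.
	 */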
1665	int changed;
1666
1667	do {
1668		changed  = drbd_pause_after(device);
1669		changed |= drbd_resume_next(device);
1670	} while (changed);
1671}
1672
1673void drbd_rs_controller_reset(struct drbd_device *device)
1674{
1675	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1676	struct fifo_buffer *plan;
1677
1678	atomic_set(&device->rs_sect_in, 0);
1679	atomic_set(&device->rs_sect_ev, 0);
1680	device->rs_in_flight = 0;
1681	device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1682
	/* Updating the RCU protected object in place is necessary since
	   this function gets called from atomic context.
	   It is valid since all other updates also lead to a completely
	   empty fifo */
1687	rcu_read_lock();
1688	plan = rcu_dereference(device->rs_plan_s);
1689	plan->total = 0;
1690	fifo_set(plan, 0);
1691	rcu_read_unlock();
1692}
1693
1694void start_resync_timer_fn(struct timer_list *t)
1695{
1696	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1697	drbd_device_post_work(device, RS_START);
1698}
1699
1700static void do_start_resync(struct drbd_device *device)
1701{
1702	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1703		drbd_warn(device, "postponing start_resync ...\n");
1704		device->start_resync_timer.expires = jiffies + HZ/10;
1705		add_timer(&device->start_resync_timer);
1706		return;
1707	}
1708
1709	drbd_start_resync(device, C_SYNC_SOURCE);
1710	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1711}
1712
1713static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1714{
1715	bool csums_after_crash_only;
1716	rcu_read_lock();
1717	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1718	rcu_read_unlock();
1719	return connection->agreed_pro_version >= 89 &&		/* supported? */
1720		connection->csums_tfm &&			/* configured? */
1721		(csums_after_crash_only == false		/* use for each resync? */
1722		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1723}
1724
1725/**
1726 * drbd_start_resync() - Start the resync process
1727 * @device:	DRBD device.
1728 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1729 *
1730 * This function might bring you directly into one of the
1731 * C_PAUSED_SYNC_* states.
1732 */
1733void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1734{
1735	struct drbd_peer_device *peer_device = first_peer_device(device);
1736	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1737	union drbd_state ns;
1738	int r;
1739
1740	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1741		drbd_err(device, "Resync already running!\n");
1742		return;
1743	}
1744
1745	if (!connection) {
1746		drbd_err(device, "No connection to peer, aborting!\n");
1747		return;
1748	}
1749
1750	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1751		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
			   which will make the data inconsistent, give the handler a chance to veto. */
1755			r = drbd_khelper(device, "before-resync-target");
1756			r = (r >> 8) & 0xff;
1757			if (r > 0) {
1758				drbd_info(device, "before-resync-target handler returned %d, "
1759					 "dropping connection.\n", r);
1760				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1761				return;
1762			}
1763		} else /* C_SYNC_SOURCE */ {
1764			r = drbd_khelper(device, "before-resync-source");
1765			r = (r >> 8) & 0xff;
1766			if (r > 0) {
1767				if (r == 3) {
1768					drbd_info(device, "before-resync-source handler returned %d, "
1769						 "ignoring. Old userland tools?", r);
1770				} else {
1771					drbd_info(device, "before-resync-source handler returned %d, "
1772						 "dropping connection.\n", r);
1773					conn_request_state(connection,
1774							   NS(conn, C_DISCONNECTING), CS_HARD);
1775					return;
1776				}
1777			}
1778		}
1779	}
1780
1781	if (current == connection->worker.task) {
1782		/* The worker should not sleep waiting for state_mutex,
1783		   that can take long */
1784		if (!mutex_trylock(device->state_mutex)) {
1785			set_bit(B_RS_H_DONE, &device->flags);
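			/* retry via the timer in about 200 ms (HZ/5 jiffies)
			 * instead of blocking the worker on the mutex */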
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}

	lock_all_resources();
	clear_bit(B_RS_H_DONE, &device->flags);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED ||
	    !get_ldev_if_state(device, D_NEGOTIATING)) {
		unlock_all_resources();
		goto out;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed    = 0;
		device->rs_paused    = 0;
		device->rs_same_csum = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total     = tw;
		device->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		drbd_pause_after(device);
		/* Forget potentially stale cached per-resync-extent bit counts.
		 * Open-coded drbd_rs_cancel_all(device); we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	unlock_all_resources();

	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = false;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot always do that, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(peer_device);

		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

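				/* ping_int is given in seconds, ping_timeo in
				 * tenths of a second; e.g. with ping_int=10 and
				 * ping_timeo=5 this sleeps about
				 * 10*HZ + 5*HZ/9 jiffies (roughly 10.6 s),
				 * slightly longer than ping interval plus ping
				 * timeout, as intended above. */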
				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
out:
	mutex_unlock(device->state_mutex);
}

static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (resync_done && is_sync_state(device->state.conn))
		drbd_resync_finished(device);

	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}

static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	__acquire(local);
	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;
	__release(local);

	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}

static void go_diskless(struct drbd_device *device)
{
	D_ASSERT(device, device->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */

	/* Try to write changed bitmap pages, read errors may have just
	 * set some bits outside the area covered by the activity log.
	 *
	 * If we have an IO error during the bitmap writeout,
	 * we will want a full sync next time, just in case.
	 * (Do we want a specific meta data flag for this?)
	 *
	 * If that does not make it to stable storage either,
	 * we cannot do anything about that anymore.
	 *
	 * We still need to check if both bitmap and ldev are present, we may
	 * end up here after a failed attach, before ldev was even assigned.
	 */
	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
		 * while we detach.
		 * Any modifications would not be expected anymore, though.
		 */
		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
					"detach", BM_LOCKED_TEST_ALLOWED)) {
			if (test_bit(WAS_READ_ERROR, &device->flags)) {
				drbd_md_set_flag(device, MDF_FULL_SYNC);
				drbd_md_sync(device);
			}
		}
	}

	drbd_force_state(device, NS(disk, D_DISKLESS));
}

static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}

/* only called from drbd_worker thread, no locking */
void __update_timing_details(
		struct drbd_thread_timing_details *tdp,
		unsigned int *cb_nr,
		void *cb,
		const char *fn, const unsigned int line)
{
	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
	struct drbd_thread_timing_details *td = tdp + i;

	td->start_jif = jiffies;
	td->cb_addr = cb;
	td->caller_fn = fn;
	td->line = line;
	td->cb_nr = *cb_nr;

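	/* Clear the slot that will be written next; when the history ring
	 * buffer is dumped, the zeroed entry marks where it currently ends. */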
	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
	td = tdp + i;
	memset(td, 0, sizeof(*td));

	++(*cb_nr);
}

static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
	if (test_bit(MD_SYNC, &todo))
		do_md_sync(device);
	if (test_bit(RS_DONE, &todo) ||
	    test_bit(RS_PROGRESS, &todo))
		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
	if (test_bit(GO_DISKLESS, &todo))
		go_diskless(device);
	if (test_bit(DESTROY_DISK, &todo))
		drbd_ldev_destroy(device);
	if (test_bit(RS_START, &todo))
		do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)

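/* Atomically claim and clear the device work bits in *flags.
 * The cmpxchg loop retries if other bits in *flags changed concurrently,
 * so each queued work bit is observed and cleared exactly once. */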
static unsigned long get_work_bits(unsigned long *flags)
{
	unsigned long old, new;
	do {
		old = *flags;
		new = old & ~DRBD_DEVICE_WORK_MASK;
	} while (cmpxchg(flags, old, new) != old);
	return old & DRBD_DEVICE_WORK_MASK;
}

static void do_unqueued_work(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		unsigned long todo = get_work_bits(&device->flags);
		if (!todo)
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		do_device_work(device, todo);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_tail_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_batch(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			tcp_sock_set_cork(connection->data.socket->sk, false);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do: no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  The next incoming request's epoch will be the
		 * connection's current transfer log epoch number.  If that is
		 * different from the epoch of the last request we communicated,
		 * it is safe to send the epoch-separating barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* We may also be woken up for things other than new work,
		 * e.g. if the current epoch got closed.
		 * In that case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* Someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			tcp_sock_set_cork(connection->data.socket->sk, true);
		else if (!uncork)
			tcp_sock_set_cork(connection->data.socket->sk, false);
	}
	mutex_unlock(&connection->data.mutex);
}

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
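			/* The second argument to the work callback is the
			 * "cancel" flag: if the connection already fell below
			 * C_WF_REPORT_PARAMS, the work is cancelled rather
			 * than carried out. */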
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

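	/* The worker is stopping: drain remaining device work and any still
	 * queued sender work, invoking each callback with cancel set. */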
	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}
