xref: /kernel/linux/linux-6.6/fs/gfs2/lock_dlm.c (revision 62306a36)
1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
4 * Copyright 2004-2011 Red Hat, Inc.
5 */
6
7#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
8
9#include <linux/fs.h>
10#include <linux/dlm.h>
11#include <linux/slab.h>
12#include <linux/types.h>
13#include <linux/delay.h>
14#include <linux/gfs2_ondisk.h>
15#include <linux/sched/signal.h>
16
17#include "incore.h"
18#include "glock.h"
19#include "glops.h"
20#include "recovery.h"
21#include "util.h"
22#include "sys.h"
23#include "trace_gfs2.h"
24
25/**
26 * gfs2_update_stats - Update time based stats
27 * @s: The stats to update (local or global)
28 * @index: The index inside @s
29 * @sample: New data to include
30 */
31static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
32				     s64 sample)
33{
34	/*
35	 * @delta is the difference between the current rtt sample and the
36	 * running average srtt. We add 1/8 of that to the srtt in order to
37	 * update the current srtt estimate. The variance estimate is a bit
38	 * more complicated. We subtract the current variance estimate from
39	 * the abs value of the @delta and add 1/4 of that to the running
40	 * total.  That's equivalent to 3/4 of the current variance
41	 * estimate plus 1/4 of the abs of @delta.
42	 *
43	 * Note that the index points at the array entry containing the
44	 * smoothed mean value, and the variance is always in the following
45 * entry.
46	 *
47 * Reference: TCP/IP Illustrated, vol 2, pp. 831-832
48	 * All times are in units of integer nanoseconds. Unlike the TCP/IP
49	 * case, they are not scaled fixed point.
50	 */
51
52	s64 delta = sample - s->stats[index];
53	s->stats[index] += (delta >> 3);
54	index++;
55	s->stats[index] += (s64)(abs(delta) - s->stats[index]) >> 2;
56}
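/*
 * Worked example of the update above (illustrative values only): with a
 * stored srtt of 8000ns and a stored variance of 2000ns, a new sample of
 * 12000ns gives delta = 4000, so the srtt becomes 8000 + 4000/8 = 8500 and
 * the variance estimate becomes 2000 + (|4000| - 2000)/4 = 2500; each
 * estimate moves a fraction of the way towards the new observation.
 */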
57
58/**
59 * gfs2_update_reply_times - Update locking statistics
60 * @gl: The glock to update
61 *
62 * This assumes that gl->gl_dstamp has been set earlier.
63 *
64 * The rtt (lock round trip time) is an estimate of the time
65 * taken to perform a dlm lock request. We update it on each
66 * reply from the dlm.
67 *
68 * The blocking flag is set on the glock for all dlm requests
69 * which may potentially block due to lock requests from other nodes.
70 * DLM requests are classified as non-blocking when the current lock
71 * state is exclusive, when the requested state is null (or unlocked),
72 * or when the TRY or TRY_1CB flags are set.  All
73 * other DLM requests are counted as (potentially) blocking.
74 */
75static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
76{
77	struct gfs2_pcpu_lkstats *lks;
78	const unsigned gltype = gl->gl_name.ln_type;
79	unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
80			 GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
81	s64 rtt;
82
83	preempt_disable();
84	rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
85	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
86	gfs2_update_stats(&gl->gl_stats, index, rtt);		/* Local */
87	gfs2_update_stats(&lks->lkstats[gltype], index, rtt);	/* Global */
88	preempt_enable();
89
90	trace_gfs2_glock_lock_time(gl, rtt);
91}
92
93/**
94 * gfs2_update_request_times - Update locking statistics
95 * @gl: The glock to update
96 *
97 * The irt (lock inter-request time) measures the average time
98 * between requests to the dlm. It is updated immediately before
99 * each dlm call.
100 */
101
102static inline void gfs2_update_request_times(struct gfs2_glock *gl)
103{
104	struct gfs2_pcpu_lkstats *lks;
105	const unsigned gltype = gl->gl_name.ln_type;
106	ktime_t dstamp;
107	s64 irt;
108
109	preempt_disable();
110	dstamp = gl->gl_dstamp;
111	gl->gl_dstamp = ktime_get_real();
112	irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
113	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
114	gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);		/* Local */
115	gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);	/* Global */
116	preempt_enable();
117}
118
119static void gdlm_ast(void *arg)
120{
121	struct gfs2_glock *gl = arg;
122	unsigned ret = gl->gl_state;
123
124	gfs2_update_reply_times(gl);
125	BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);
126
127	if ((gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID) && gl->gl_lksb.sb_lvbptr)
128		memset(gl->gl_lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
129
130	switch (gl->gl_lksb.sb_status) {
131	case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
132		if (gl->gl_ops->go_free)
133			gl->gl_ops->go_free(gl);
134		gfs2_glock_free(gl);
135		return;
136	case -DLM_ECANCEL: /* Cancel while getting lock */
137		ret |= LM_OUT_CANCELED;
138		goto out;
139	case -EAGAIN: /* Try lock fails */
140	case -EDEADLK: /* Deadlock detected */
141		goto out;
142	case -ETIMEDOUT: /* Canceled due to timeout */
143		ret |= LM_OUT_ERROR;
144		goto out;
145	case 0: /* Success */
146		break;
147	default: /* Something unexpected */
148		BUG();
149	}
150
151	ret = gl->gl_req;
152	if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
153		if (gl->gl_req == LM_ST_SHARED)
154			ret = LM_ST_DEFERRED;
155		else if (gl->gl_req == LM_ST_DEFERRED)
156			ret = LM_ST_SHARED;
157		else
158			BUG();
159	}
160
161	set_bit(GLF_INITIAL, &gl->gl_flags);
162	gfs2_glock_complete(gl, ret);
163	return;
164out:
165	if (!test_bit(GLF_INITIAL, &gl->gl_flags))
166		gl->gl_lksb.sb_lkid = 0;
167	gfs2_glock_complete(gl, ret);
168}
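/*
 * Illustration of the completion paths above (hypothetical requests): a
 * granted request finishes with sb_status == 0 and the state reported to
 * gfs2_glock_complete() comes from gl_req (possibly swapped when
 * DLM_SBF_ALTMODE is set), while a failed TRY request finishes with -EAGAIN
 * and ret keeps the current gl_state, leaving the glock state unchanged.
 */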
169
170static void gdlm_bast(void *arg, int mode)
171{
172	struct gfs2_glock *gl = arg;
173
174	switch (mode) {
175	case DLM_LOCK_EX:
176		gfs2_glock_cb(gl, LM_ST_UNLOCKED);
177		break;
178	case DLM_LOCK_CW:
179		gfs2_glock_cb(gl, LM_ST_DEFERRED);
180		break;
181	case DLM_LOCK_PR:
182		gfs2_glock_cb(gl, LM_ST_SHARED);
183		break;
184	default:
185		fs_err(gl->gl_name.ln_sbd, "unknown bast mode %d\n", mode);
186		BUG();
187	}
188}
189
190/* convert gfs lock-state to dlm lock-mode */
191
192static int make_mode(struct gfs2_sbd *sdp, const unsigned int lmstate)
193{
194	switch (lmstate) {
195	case LM_ST_UNLOCKED:
196		return DLM_LOCK_NL;
197	case LM_ST_EXCLUSIVE:
198		return DLM_LOCK_EX;
199	case LM_ST_DEFERRED:
200		return DLM_LOCK_CW;
201	case LM_ST_SHARED:
202		return DLM_LOCK_PR;
203	}
204	fs_err(sdp, "unknown LM state %d\n", lmstate);
205	BUG();
206	return -1;
207}
208
209static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
210		      const int req)
211{
212	u32 lkf = 0;
213
214	if (gl->gl_lksb.sb_lvbptr)
215		lkf |= DLM_LKF_VALBLK;
216
217	if (gfs_flags & LM_FLAG_TRY)
218		lkf |= DLM_LKF_NOQUEUE;
219
220	if (gfs_flags & LM_FLAG_TRY_1CB) {
221		lkf |= DLM_LKF_NOQUEUE;
222		lkf |= DLM_LKF_NOQUEUEBAST;
223	}
224
225	if (gfs_flags & LM_FLAG_ANY) {
226		if (req == DLM_LOCK_PR)
227			lkf |= DLM_LKF_ALTCW;
228		else if (req == DLM_LOCK_CW)
229			lkf |= DLM_LKF_ALTPR;
230		else
231			BUG();
232	}
233
234	if (gl->gl_lksb.sb_lkid != 0) {
235		lkf |= DLM_LKF_CONVERT;
236		if (test_bit(GLF_BLOCKING, &gl->gl_flags))
237			lkf |= DLM_LKF_QUECVT;
238	}
239
240	return lkf;
241}
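/*
 * Example of the flag translation above (hypothetical request): an
 * LM_FLAG_TRY request for DLM_LOCK_PR on a glock that already has a DLM
 * lock (sb_lkid != 0) and no lvb maps to DLM_LKF_NOQUEUE | DLM_LKF_CONVERT;
 * DLM_LKF_QUECVT is not added because TRY requests are classified as
 * non-blocking, so GLF_BLOCKING is clear.
 */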
242
243static void gfs2_reverse_hex(char *c, u64 value)
244{
245	*c = '0';
246	while (value) {
247		*c-- = hex_asc[value & 0x0f];
248		value >>= 4;
249	}
250}
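/*
 * Example resource name (hypothetical glock): gdlm_lock() below builds a
 * space padded 24 character name from ln_type and ln_number, so a glock
 * with ln_type 2 and ln_number 0x1234 produces "       2            1234"
 * (8 hex characters of type followed by 16 hex characters of number).
 */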
251
252static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
253		     unsigned int flags)
254{
255	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
256	int req;
257	u32 lkf;
258	char strname[GDLM_STRNAME_BYTES] = "";
259	int error;
260
261	req = make_mode(gl->gl_name.ln_sbd, req_state);
262	lkf = make_flags(gl, flags, req);
263	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
264	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
265	if (gl->gl_lksb.sb_lkid) {
266		gfs2_update_request_times(gl);
267	} else {
268		memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
269		strname[GDLM_STRNAME_BYTES - 1] = '\0';
270		gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
271		gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
272		gl->gl_dstamp = ktime_get_real();
273	}
274	/*
275	 * Submit the actual lock request.
276	 */
277
278again:
279	error = dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
280			GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
281	if (error == -EBUSY) {
282		msleep(20);
283		goto again;
284	}
285	return error;
286}
287
288static void gdlm_put_lock(struct gfs2_glock *gl)
289{
290	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
291	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
292	int error;
293
294	if (gl->gl_lksb.sb_lkid == 0)
295		goto out_free;
296
297	clear_bit(GLF_BLOCKING, &gl->gl_flags);
298	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
299	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
300	gfs2_update_request_times(gl);
301
302	/* don't want to call dlm if we've unmounted the lock protocol */
303	if (test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
304		goto out_free;
305	/* don't want to skip dlm_unlock writing the lvb when lock has one */
306
307	if (test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags) &&
308	    !gl->gl_lksb.sb_lvbptr)
309		goto out_free;
310
311again:
312	error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
313			   NULL, gl);
314	if (error == -EBUSY) {
315		msleep(20);
316		goto again;
317	}
318
319	if (error) {
320		fs_err(sdp, "gdlm_unlock %x,%llx err=%d\n",
321		       gl->gl_name.ln_type,
322		       (unsigned long long)gl->gl_name.ln_number, error);
323	}
324	return;
325
326out_free:
327	gfs2_glock_free(gl);
328}
329
330static void gdlm_cancel(struct gfs2_glock *gl)
331{
332	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
333	dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
334}
335
336/*
337 * dlm/gfs2 recovery coordination using dlm_recover callbacks
338 *
339 *  0. gfs2 checks for another cluster node withdraw, needing journal replay
340 *  1. dlm_controld sees lockspace members change
341 *  2. dlm_controld blocks dlm-kernel locking activity
342 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
343 *  4. dlm_controld starts and finishes its own user level recovery
344 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
345 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
346 *  7. dlm_recoverd does its own lock recovery
347 *  8. dlm_recoverd unblocks dlm-kernel locking activity
348 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
349 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
350 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
351 * 12. gfs2_recover dequeues and recovers journals of failed nodes
352 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
353 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
354 * 15. gfs2_control unblocks normal locking when all journals are recovered
355 *
356 * - failures during recovery
357 *
358 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
359 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
360 * recovering for a prior failure.  gfs2_control needs a way to detect
361 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
362 * the recover_block and recover_start values.
363 *
364 * recover_done() provides a new lockspace generation number each time it
365 * is called (step 9).  This generation number is saved as recover_start.
366 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
367 * recover_block = recover_start.  So, while recover_block is equal to
368 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
369 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
370 *
371 * - more specific gfs2 steps in sequence above
372 *
373 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
374 *  6. recover_slot records any failed jids (maybe none)
375 *  9. recover_done sets recover_start = new generation number
376 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
377 * 12. gfs2_recover does journal recoveries for failed jids identified above
378 * 14. gfs2_control clears control_lock lvb bits for recovered jids
379 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
380 *     again); if so it does nothing, otherwise if recover_start > recover_block
381 *     it clears BLOCK_LOCKS.
382 *
383 * - parallel recovery steps across all nodes
384 *
385 * All nodes attempt to update the control_lock lvb with the new generation
386 * number and jid bits, but only the first to get the control_lock EX will
387 * do so; others will see that it's already done (lvb already contains new
388 * generation number.)
389 *
390 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
391 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
392 * . One node gets control_lock first and writes the lvb, others see it's done
393 * . All nodes attempt to recover jids for which they see control_lock bits set
394 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
395 * . All nodes will eventually see all lvb bits clear and unblock locks
396 *
397 * - is there a problem with clearing an lvb bit that should be set
398 *   and missing a journal recovery?
399 *
400 * 1. jid fails
401 * 2. lvb bit set for step 1
402 * 3. jid recovered for step 1
403 * 4. jid taken again (new mount)
404 * 5. jid fails (for step 4)
405 * 6. lvb bit set for step 5 (will already be set)
406 * 7. lvb bit cleared for step 3
407 *
408 * This is not a problem because the failure in step 5 does not
409 * require recovery, because the mount in step 4 could not have
410 * progressed far enough to unblock locks and access the fs.  The
411 * control_mount() function waits for all recoveries to be complete
412 * for the latest lockspace generation before ever unblocking locks
413 * and returning.  The mount in step 4 waits until the recovery in
414 * step 1 is done.
415 *
416 * - special case of first mounter: first node to mount the fs
417 *
418 * The first node to mount a gfs2 fs needs to check all the journals
419 * and recover any that need recovery before other nodes are allowed
420 * to mount the fs.  (Others may begin mounting, but they must wait
421 * for the first mounter to be done before taking locks on the fs
422 * or accessing the fs.)  This has two parts:
423 *
424 * 1. The mounted_lock tells a node it's the first to mount the fs.
425 * Each node holds the mounted_lock in PR while it's mounted.
426 * Each node tries to acquire the mounted_lock in EX when it mounts.
427 * If a node is granted the mounted_lock EX it means there are no
428 * other mounted nodes (no PR locks exist), and it is the first mounter.
429 * The mounted_lock is demoted to PR when first recovery is done, so
430 * others will fail to get an EX lock, but will get a PR lock.
431 *
432 * 2. The control_lock blocks others in control_mount() while the first
433 * mounter is doing first mount recovery of all journals.
434 * A mounting node needs to acquire control_lock in EX mode before
435 * it can proceed.  The first mounter holds control_lock in EX while doing
436 * the first mount recovery, blocking mounts from other nodes, then demotes
437 * control_lock to NL when it's done (others_may_mount/first_done),
438 * allowing other nodes to continue mounting.
439 *
440 * first mounter:
441 * control_lock EX/NOQUEUE success
442 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
443 * set first=1
444 * do first mounter recovery
445 * mounted_lock EX->PR
446 * control_lock EX->NL, write lvb generation
447 *
448 * other mounter:
449 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
450 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters' PR)
451 * mounted_lock PR/NOQUEUE success
452 * read lvb generation
453 * control_lock EX->NL
454 * set first=0
455 *
456 * - mount during recovery
457 *
458 * If a node mounts while others are doing recovery (not first mounter),
459 * the mounting node will get its initial recover_done() callback without
460 * having seen any previous failures/callbacks.
461 *
462 * It must wait for all recoveries preceding its mount to be finished
463 * before it unblocks locks.  It does this by repeating the "other mounter"
464 * steps above until the lvb generation number is >= its mount generation
465 * number (from initial recover_done) and all lvb bits are clear.
466 *
467 * - control_lock lvb format
468 *
469 * 4 bytes generation number: the latest dlm lockspace generation number
470 * from recover_done callback.  Indicates the jid bitmap has been updated
471 * to reflect all slot failures through that generation.
472 * 4 bytes unused.
473 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
474 * that jid N needs recovery.
475 */
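/*
 * Example timeline for the generation logic above (hypothetical numbers):
 * recover_done() reports generation 5, so recover_start = 5; a node then
 * fails and recover_prep() sets recover_block = 5 and BLOCK_LOCKS; the next
 * recover_done() reports generation 6, so recover_start > recover_block and
 * gfs2_control may clear BLOCK_LOCKS once all jid bits in the lvb are clear.
 * If yet another recover_prep() ran first, recover_block would again equal
 * recover_start and BLOCK_LOCKS would stay set.
 */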
476
477#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */
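/*
 * Example of the bitmap layout (illustrative jid): the lvb bytes after
 * JID_BITMAP_OFFSET hold one bit per journal id, accessed with the
 * little-endian bit helpers used below (test_bit_le, __set_bit_le,
 * __clear_bit_le), so jid 10 corresponds to bit 2 of byte
 * JID_BITMAP_OFFSET + 1.
 */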
478
479static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
480			     char *lvb_bits)
481{
482	__le32 gen;
483	memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
484	memcpy(&gen, lvb_bits, sizeof(__le32));
485	*lvb_gen = le32_to_cpu(gen);
486}
487
488static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
489			      char *lvb_bits)
490{
491	__le32 gen;
492	memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
493	gen = cpu_to_le32(lvb_gen);
494	memcpy(ls->ls_control_lvb, &gen, sizeof(__le32));
495}
496
497static int all_jid_bits_clear(char *lvb)
498{
499	return !memchr_inv(lvb + JID_BITMAP_OFFSET, 0,
500			GDLM_LVB_SIZE - JID_BITMAP_OFFSET);
501}
502
503static void sync_wait_cb(void *arg)
504{
505	struct lm_lockstruct *ls = arg;
506	complete(&ls->ls_sync_wait);
507}
508
509static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
510{
511	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
512	int error;
513
514	error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
515	if (error) {
516		fs_err(sdp, "%s lkid %x error %d\n",
517		       name, lksb->sb_lkid, error);
518		return error;
519	}
520
521	wait_for_completion(&ls->ls_sync_wait);
522
523	if (lksb->sb_status != -DLM_EUNLOCK) {
524		fs_err(sdp, "%s lkid %x status %d\n",
525		       name, lksb->sb_lkid, lksb->sb_status);
526		return -1;
527	}
528	return 0;
529}
530
531static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
532		     unsigned int num, struct dlm_lksb *lksb, char *name)
533{
534	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
535	char strname[GDLM_STRNAME_BYTES];
536	int error, status;
537
538	memset(strname, 0, GDLM_STRNAME_BYTES);
539	snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);
540
541	error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
542			 strname, GDLM_STRNAME_BYTES - 1,
543			 0, sync_wait_cb, ls, NULL);
544	if (error) {
545		fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
546		       name, lksb->sb_lkid, flags, mode, error);
547		return error;
548	}
549
550	wait_for_completion(&ls->ls_sync_wait);
551
552	status = lksb->sb_status;
553
554	if (status && status != -EAGAIN) {
555		fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
556		       name, lksb->sb_lkid, flags, mode, status);
557	}
558
559	return status;
560}
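/*
 * The nondisk resource names built above use the same "%8x%16x" layout as
 * the glock names assembled in gdlm_lock(), but with LM_TYPE_NONDISK as
 * the type, which keeps them apart from the names of on-disk glock types.
 */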
561
562static int mounted_unlock(struct gfs2_sbd *sdp)
563{
564	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
565	return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
566}
567
568static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
569{
570	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
571	return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
572			 &ls->ls_mounted_lksb, "mounted_lock");
573}
574
575static int control_unlock(struct gfs2_sbd *sdp)
576{
577	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
578	return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
579}
580
581static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
582{
583	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
584	return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
585			 &ls->ls_control_lksb, "control_lock");
586}
587
588/**
589 * remote_withdraw - react to a node withdrawing from the file system
590 * @sdp: The superblock
591 */
592static void remote_withdraw(struct gfs2_sbd *sdp)
593{
594	struct gfs2_jdesc *jd;
595	int ret = 0, count = 0;
596
597	list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
598		if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
599			continue;
600		ret = gfs2_recover_journal(jd, true);
601		if (ret)
602			break;
603		count++;
604	}
605
606	/* Report how many journals were checked and the result */
607	fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
608}
609
610static void gfs2_control_func(struct work_struct *work)
611{
612	struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
613	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
614	uint32_t block_gen, start_gen, lvb_gen, flags;
615	int recover_set = 0;
616	int write_lvb = 0;
617	int recover_size;
618	int i, error;
619
620	/* First check for other nodes that may have done a withdraw. */
621	if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
622		remote_withdraw(sdp);
623		clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
624		return;
625	}
626
627	spin_lock(&ls->ls_recover_spin);
628	/*
629	 * No MOUNT_DONE means we're still mounting; control_mount()
630	 * will set this flag, after which this thread will take over
631	 * all further clearing of BLOCK_LOCKS.
632	 *
633	 * FIRST_MOUNT means this node is doing first mounter recovery,
634	 * for which recovery control is handled by
635	 * control_mount()/control_first_done(), not this thread.
636	 */
637	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
638	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
639		spin_unlock(&ls->ls_recover_spin);
640		return;
641	}
642	block_gen = ls->ls_recover_block;
643	start_gen = ls->ls_recover_start;
644	spin_unlock(&ls->ls_recover_spin);
645
646	/*
647	 * Equal block_gen and start_gen implies we are between
648	 * recover_prep and recover_done callbacks, which means
649	 * dlm recovery is in progress and dlm locking is blocked.
650	 * There's no point trying to do any work until recover_done.
651	 */
652
653	if (block_gen == start_gen)
654		return;
655
656	/*
657	 * Propagate recover_submit[] and recover_result[] to lvb:
658	 * dlm_recoverd adds to recover_submit[] jids needing recovery
659	 * gfs2_recover adds to recover_result[] journal recovery results
660	 *
661	 * set lvb bit for jids in recover_submit[] if the lvb has not
662	 * yet been updated for the generation of the failure
663	 *
664	 * clear lvb bit for jids in recover_result[] if the result of
665	 * the journal recovery is SUCCESS
666	 */
667
668	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
669	if (error) {
670		fs_err(sdp, "control lock EX error %d\n", error);
671		return;
672	}
673
674	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
675
676	spin_lock(&ls->ls_recover_spin);
677	if (block_gen != ls->ls_recover_block ||
678	    start_gen != ls->ls_recover_start) {
679		fs_info(sdp, "recover generation %u block1 %u %u\n",
680			start_gen, block_gen, ls->ls_recover_block);
681		spin_unlock(&ls->ls_recover_spin);
682		control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
683		return;
684	}
685
686	recover_size = ls->ls_recover_size;
687
688	if (lvb_gen <= start_gen) {
689		/*
690		 * Clear lvb bits for jids we've successfully recovered.
691		 * Because all nodes attempt to recover failed journals,
692		 * a journal can be recovered multiple times successfully
693		 * in succession.  Only the first will really do recovery,
694		 * the others find it clean, but still report a successful
695		 * recovery.  So, another node may have already recovered
696		 * the jid and cleared the lvb bit for it.
697		 */
698		for (i = 0; i < recover_size; i++) {
699			if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
700				continue;
701
702			ls->ls_recover_result[i] = 0;
703
704			if (!test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET))
705				continue;
706
707			__clear_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
708			write_lvb = 1;
709		}
710	}
711
712	if (lvb_gen == start_gen) {
713		/*
714		 * Failed slots before start_gen are already set in lvb.
715		 */
716		for (i = 0; i < recover_size; i++) {
717			if (!ls->ls_recover_submit[i])
718				continue;
719			if (ls->ls_recover_submit[i] < lvb_gen)
720				ls->ls_recover_submit[i] = 0;
721		}
722	} else if (lvb_gen < start_gen) {
723		/*
724		 * Failed slots before start_gen are not yet set in lvb.
725		 */
726		for (i = 0; i < recover_size; i++) {
727			if (!ls->ls_recover_submit[i])
728				continue;
729			if (ls->ls_recover_submit[i] < start_gen) {
730				ls->ls_recover_submit[i] = 0;
731				__set_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
732			}
733		}
734		/* even if there are no bits to set, we need to write the
735		   latest generation to the lvb */
736		write_lvb = 1;
737	} else {
738		/*
739		 * we should be getting a recover_done() for lvb_gen soon
740		 */
741	}
742	spin_unlock(&ls->ls_recover_spin);
743
744	if (write_lvb) {
745		control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
746		flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
747	} else {
748		flags = DLM_LKF_CONVERT;
749	}
750
751	error = control_lock(sdp, DLM_LOCK_NL, flags);
752	if (error) {
753		fs_err(sdp, "control lock NL error %d\n", error);
754		return;
755	}
756
757	/*
758	 * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
759	 * and clear a jid bit in the lvb if the recovery is a success.
760	 * Eventually all journals will be recovered, all jid bits will
761	 * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
762	 */
763
764	for (i = 0; i < recover_size; i++) {
765		if (test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET)) {
766			fs_info(sdp, "recover generation %u jid %d\n",
767				start_gen, i);
768			gfs2_recover_set(sdp, i);
769			recover_set++;
770		}
771	}
772	if (recover_set)
773		return;
774
775	/*
776	 * No more jid bits set in lvb, all recovery is done, unblock locks
777	 * (unless a new recover_prep callback has occurred, blocking locks
778	 * again while working above)
779	 */
780
781	spin_lock(&ls->ls_recover_spin);
782	if (ls->ls_recover_block == block_gen &&
783	    ls->ls_recover_start == start_gen) {
784		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
785		spin_unlock(&ls->ls_recover_spin);
786		fs_info(sdp, "recover generation %u done\n", start_gen);
787		gfs2_glock_thaw(sdp);
788	} else {
789		fs_info(sdp, "recover generation %u block2 %u %u\n",
790			start_gen, block_gen, ls->ls_recover_block);
791		spin_unlock(&ls->ls_recover_spin);
792	}
793}
794
795static int control_mount(struct gfs2_sbd *sdp)
796{
797	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
798	uint32_t start_gen, block_gen, mount_gen, lvb_gen;
799	int mounted_mode;
800	int retries = 0;
801	int error;
802
803	memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
804	memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
805	memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
806	ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
807	init_completion(&ls->ls_sync_wait);
808
809	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
810
811	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
812	if (error) {
813		fs_err(sdp, "control_mount control_lock NL error %d\n", error);
814		return error;
815	}
816
817	error = mounted_lock(sdp, DLM_LOCK_NL, 0);
818	if (error) {
819		fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
820		control_unlock(sdp);
821		return error;
822	}
823	mounted_mode = DLM_LOCK_NL;
824
825restart:
826	if (retries++ && signal_pending(current)) {
827		error = -EINTR;
828		goto fail;
829	}
830
831	/*
832	 * We always start with both locks in NL. control_lock is
833	 * demoted to NL below so we don't need to do it here.
834	 */
835
836	if (mounted_mode != DLM_LOCK_NL) {
837		error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
838		if (error)
839			goto fail;
840		mounted_mode = DLM_LOCK_NL;
841	}
842
843	/*
844	 * Other nodes need to do some work in dlm recovery and gfs2_control
845	 * before the recover_done and control_lock will be ready for us below.
846	 * A delay here is not required but often avoids having to retry.
847	 */
848
849	msleep_interruptible(500);
850
851	/*
852	 * Acquire control_lock in EX and mounted_lock in either EX or PR.
853	 * control_lock lvb keeps track of any pending journal recoveries.
854	 * mounted_lock indicates if any other nodes have the fs mounted.
855	 */
856
857	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
858	if (error == -EAGAIN) {
859		goto restart;
860	} else if (error) {
861		fs_err(sdp, "control_mount control_lock EX error %d\n", error);
862		goto fail;
863	}
864
865	/*
866	 * If we're a spectator, we don't want to take the lock in EX because
867	 * we cannot do the first-mount responsibility it implies: recovery.
868	 */
869	if (sdp->sd_args.ar_spectator)
870		goto locks_done;
871
872	error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
873	if (!error) {
874		mounted_mode = DLM_LOCK_EX;
875		goto locks_done;
876	} else if (error != -EAGAIN) {
877		fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
878		goto fail;
879	}
880
881	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
882	if (!error) {
883		mounted_mode = DLM_LOCK_PR;
884		goto locks_done;
885	} else {
886		/* not even -EAGAIN should happen here */
887		fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
888		goto fail;
889	}
890
891locks_done:
892	/*
893	 * If we got both locks above in EX, then we're the first mounter.
894	 * If not, then we need to wait for the control_lock lvb to be
895	 * updated by other mounted nodes to reflect our mount generation.
896	 *
897	 * In the simple first mounter case, the first mounter will see a zero
898	 * lvb_gen; but if all existing nodes leave/fail before the mounting
899	 * nodes finish control_mount, then all nodes will be mounting and
900	 * lvb_gen will be non-zero.
901	 */
902
903	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
904
905	if (lvb_gen == 0xFFFFFFFF) {
906		/* special value to force mount attempts to fail */
907		fs_err(sdp, "control_mount control_lock disabled\n");
908		error = -EINVAL;
909		goto fail;
910	}
911
912	if (mounted_mode == DLM_LOCK_EX) {
913		/* first mounter, keep both EX while doing first recovery */
914		spin_lock(&ls->ls_recover_spin);
915		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
916		set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
917		set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
918		spin_unlock(&ls->ls_recover_spin);
919		fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
920		return 0;
921	}
922
923	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
924	if (error)
925		goto fail;
926
927	/*
928	 * We are not first mounter, now we need to wait for the control_lock
929	 * lvb generation to be >= the generation from our first recover_done
930	 * and all lvb bits to be clear (no pending journal recoveries.)
931	 */
932
933	if (!all_jid_bits_clear(ls->ls_lvb_bits)) {
934		/* journals need recovery, wait until all are clear */
935		fs_info(sdp, "control_mount wait for journal recovery\n");
936		goto restart;
937	}
938
939	spin_lock(&ls->ls_recover_spin);
940	block_gen = ls->ls_recover_block;
941	start_gen = ls->ls_recover_start;
942	mount_gen = ls->ls_recover_mount;
943
944	if (lvb_gen < mount_gen) {
945		/* wait for mounted nodes to update control_lock lvb to our
946		   generation, which might include new recovery bits set */
947		if (sdp->sd_args.ar_spectator) {
948			fs_info(sdp, "Recovery is required. Waiting for a "
949				"non-spectator to mount.\n");
950			msleep_interruptible(1000);
951		} else {
952			fs_info(sdp, "control_mount wait1 block %u start %u "
953				"mount %u lvb %u flags %lx\n", block_gen,
954				start_gen, mount_gen, lvb_gen,
955				ls->ls_recover_flags);
956		}
957		spin_unlock(&ls->ls_recover_spin);
958		goto restart;
959	}
960
961	if (lvb_gen != start_gen) {
962		/* wait for mounted nodes to update control_lock lvb to the
963		   latest recovery generation */
964		fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
965			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
966			lvb_gen, ls->ls_recover_flags);
967		spin_unlock(&ls->ls_recover_spin);
968		goto restart;
969	}
970
971	if (block_gen == start_gen) {
972		/* dlm recovery in progress, wait for it to finish */
973		fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
974			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
975			lvb_gen, ls->ls_recover_flags);
976		spin_unlock(&ls->ls_recover_spin);
977		goto restart;
978	}
979
980	clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
981	set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
982	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
983	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
984	spin_unlock(&ls->ls_recover_spin);
985	return 0;
986
987fail:
988	mounted_unlock(sdp);
989	control_unlock(sdp);
990	return error;
991}
992
993static int control_first_done(struct gfs2_sbd *sdp)
994{
995	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
996	uint32_t start_gen, block_gen;
997	int error;
998
999restart:
1000	spin_lock(&ls->ls_recover_spin);
1001	start_gen = ls->ls_recover_start;
1002	block_gen = ls->ls_recover_block;
1003
1004	if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
1005	    !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1006	    !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1007		/* sanity check, should not happen */
1008		fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
1009		       start_gen, block_gen, ls->ls_recover_flags);
1010		spin_unlock(&ls->ls_recover_spin);
1011		control_unlock(sdp);
1012		return -1;
1013	}
1014
1015	if (start_gen == block_gen) {
1016		/*
1017		 * Wait for the end of a dlm recovery cycle to switch from
1018		 * first mounter recovery.  We can ignore any recover_slot
1019		 * callbacks between the recover_prep and next recover_done
1020		 * because we are still the first mounter and any failed nodes
1021		 * have not fully mounted, so they don't need recovery.
1022		 */
1023		spin_unlock(&ls->ls_recover_spin);
1024		fs_info(sdp, "control_first_done wait gen %u\n", start_gen);
1025
1026		wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
1027			    TASK_UNINTERRUPTIBLE);
1028		goto restart;
1029	}
1030
1031	clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1032	set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
1033	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
1034	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
1035	spin_unlock(&ls->ls_recover_spin);
1036
1037	memset(ls->ls_lvb_bits, 0, GDLM_LVB_SIZE);
1038	control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
1039
1040	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
1041	if (error)
1042		fs_err(sdp, "control_first_done mounted PR error %d\n", error);
1043
1044	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
1045	if (error)
1046		fs_err(sdp, "control_first_done control NL error %d\n", error);
1047
1048	return error;
1049}
1050
1051/*
1052 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
1053 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
1054 * gfs2 jids start at 0, so jid = slot - 1)
1055 */
1056
1057#define RECOVER_SIZE_INC 16
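/*
 * Example (hypothetical slot): the arrays start at 16 entries, since the
 * initial set_recover_size(sdp, NULL, 0) call in gdlm_mount() grows them
 * from 0, so a reported slot number of 20 (jid 19) grows ls_recover_submit
 * and ls_recover_result to 32 entries.
 */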
1058
1059static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
1060			    int num_slots)
1061{
1062	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1063	uint32_t *submit = NULL;
1064	uint32_t *result = NULL;
1065	uint32_t old_size, new_size;
1066	int i, max_jid;
1067
1068	if (!ls->ls_lvb_bits) {
1069		ls->ls_lvb_bits = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
1070		if (!ls->ls_lvb_bits)
1071			return -ENOMEM;
1072	}
1073
1074	max_jid = 0;
1075	for (i = 0; i < num_slots; i++) {
1076		if (max_jid < slots[i].slot - 1)
1077			max_jid = slots[i].slot - 1;
1078	}
1079
1080	old_size = ls->ls_recover_size;
1081	new_size = old_size;
1082	while (new_size < max_jid + 1)
1083		new_size += RECOVER_SIZE_INC;
1084	if (new_size == old_size)
1085		return 0;
1086
1087	submit = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1088	result = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1089	if (!submit || !result) {
1090		kfree(submit);
1091		kfree(result);
1092		return -ENOMEM;
1093	}
1094
1095	spin_lock(&ls->ls_recover_spin);
1096	memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
1097	memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
1098	kfree(ls->ls_recover_submit);
1099	kfree(ls->ls_recover_result);
1100	ls->ls_recover_submit = submit;
1101	ls->ls_recover_result = result;
1102	ls->ls_recover_size = new_size;
1103	spin_unlock(&ls->ls_recover_spin);
1104	return 0;
1105}
1106
1107static void free_recover_size(struct lm_lockstruct *ls)
1108{
1109	kfree(ls->ls_lvb_bits);
1110	kfree(ls->ls_recover_submit);
1111	kfree(ls->ls_recover_result);
1112	ls->ls_recover_submit = NULL;
1113	ls->ls_recover_result = NULL;
1114	ls->ls_recover_size = 0;
1115	ls->ls_lvb_bits = NULL;
1116}
1117
1118/* dlm calls before it does lock recovery */
1119
1120static void gdlm_recover_prep(void *arg)
1121{
1122	struct gfs2_sbd *sdp = arg;
1123	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1124
1125	if (gfs2_withdrawn(sdp)) {
1126		fs_err(sdp, "recover_prep ignored due to withdraw.\n");
1127		return;
1128	}
1129	spin_lock(&ls->ls_recover_spin);
1130	ls->ls_recover_block = ls->ls_recover_start;
1131	set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1132
1133	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1134	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1135		spin_unlock(&ls->ls_recover_spin);
1136		return;
1137	}
1138	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
1139	spin_unlock(&ls->ls_recover_spin);
1140}
1141
1142/* dlm calls after recover_prep has been completed on all lockspace members;
1143   identifies slot/jid of failed member */
1144
1145static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
1146{
1147	struct gfs2_sbd *sdp = arg;
1148	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1149	int jid = slot->slot - 1;
1150
1151	if (gfs2_withdrawn(sdp)) {
1152		fs_err(sdp, "recover_slot jid %d ignored due to withdraw.\n",
1153		       jid);
1154		return;
1155	}
1156	spin_lock(&ls->ls_recover_spin);
1157	if (ls->ls_recover_size < jid + 1) {
1158		fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
1159		       jid, ls->ls_recover_block, ls->ls_recover_size);
1160		spin_unlock(&ls->ls_recover_spin);
1161		return;
1162	}
1163
1164	if (ls->ls_recover_submit[jid]) {
1165		fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
1166			jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
1167	}
1168	ls->ls_recover_submit[jid] = ls->ls_recover_block;
1169	spin_unlock(&ls->ls_recover_spin);
1170}
1171
1172/* dlm calls after recover_slot and after it completes lock recovery */
1173
1174static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
1175			      int our_slot, uint32_t generation)
1176{
1177	struct gfs2_sbd *sdp = arg;
1178	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1179
1180	if (gfs2_withdrawn(sdp)) {
1181		fs_err(sdp, "recover_done ignored due to withdraw.\n");
1182		return;
1183	}
1184	/* ensure the ls jid arrays are large enough */
1185	set_recover_size(sdp, slots, num_slots);
1186
1187	spin_lock(&ls->ls_recover_spin);
1188	ls->ls_recover_start = generation;
1189
1190	if (!ls->ls_recover_mount) {
1191		ls->ls_recover_mount = generation;
1192		ls->ls_jid = our_slot - 1;
1193	}
1194
1195	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1196		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
1197
1198	clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1199	smp_mb__after_atomic();
1200	wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
1201	spin_unlock(&ls->ls_recover_spin);
1202}
1203
1204/* gfs2_recover thread has a journal recovery result */
1205
1206static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
1207				 unsigned int result)
1208{
1209	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1210
1211	if (gfs2_withdrawn(sdp)) {
1212		fs_err(sdp, "recovery_result jid %d ignored due to withdraw.\n",
1213		       jid);
1214		return;
1215	}
1216	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1217		return;
1218
1219	/* don't care about the recovery of our own journal during mount */
1220	if (jid == ls->ls_jid)
1221		return;
1222
1223	spin_lock(&ls->ls_recover_spin);
1224	if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1225		spin_unlock(&ls->ls_recover_spin);
1226		return;
1227	}
1228	if (ls->ls_recover_size < jid + 1) {
1229		fs_err(sdp, "recovery_result jid %d short size %d\n",
1230		       jid, ls->ls_recover_size);
1231		spin_unlock(&ls->ls_recover_spin);
1232		return;
1233	}
1234
1235	fs_info(sdp, "recover jid %d result %s\n", jid,
1236		result == LM_RD_GAVEUP ? "busy" : "success");
1237
1238	ls->ls_recover_result[jid] = result;
1239
1240	/* GAVEUP means another node is recovering the journal; delay our
1241	   next attempt to recover it, to give the other node a chance to
1242	   finish before trying again */
1243
1244	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1245		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
1246				   result == LM_RD_GAVEUP ? HZ : 0);
1247	spin_unlock(&ls->ls_recover_spin);
1248}
1249
1250static const struct dlm_lockspace_ops gdlm_lockspace_ops = {
1251	.recover_prep = gdlm_recover_prep,
1252	.recover_slot = gdlm_recover_slot,
1253	.recover_done = gdlm_recover_done,
1254};
1255
1256static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
1257{
1258	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1259	char cluster[GFS2_LOCKNAME_LEN];
1260	const char *fsname;
1261	uint32_t flags;
1262	int error, ops_result;
1263
1264	/*
1265	 * initialize everything
1266	 */
1267
1268	INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
1269	spin_lock_init(&ls->ls_recover_spin);
1270	ls->ls_recover_flags = 0;
1271	ls->ls_recover_mount = 0;
1272	ls->ls_recover_start = 0;
1273	ls->ls_recover_block = 0;
1274	ls->ls_recover_size = 0;
1275	ls->ls_recover_submit = NULL;
1276	ls->ls_recover_result = NULL;
1277	ls->ls_lvb_bits = NULL;
1278
1279	error = set_recover_size(sdp, NULL, 0);
1280	if (error)
1281		goto fail;
1282
1283	/*
1284	 * prepare dlm_new_lockspace args
1285	 */
1286
1287	fsname = strchr(table, ':');
1288	if (!fsname) {
1289		fs_info(sdp, "no fsname found\n");
1290		error = -EINVAL;
1291		goto fail_free;
1292	}
1293	memset(cluster, 0, sizeof(cluster));
1294	memcpy(cluster, table, strlen(table) - strlen(fsname));
1295	fsname++;
1296
1297	flags = DLM_LSFL_NEWEXCL;
1298
1299	/*
1300	 * create/join lockspace
1301	 */
1302
1303	error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
1304				  &gdlm_lockspace_ops, sdp, &ops_result,
1305				  &ls->ls_dlm);
1306	if (error) {
1307		fs_err(sdp, "dlm_new_lockspace error %d\n", error);
1308		goto fail_free;
1309	}
1310
1311	if (ops_result < 0) {
1312		/*
1313		 * dlm does not support ops callbacks;
1314		 * old dlm_controld/gfs_controld are in use, so try without ops.
1315		 */
1316		fs_info(sdp, "dlm lockspace ops not used\n");
1317		free_recover_size(ls);
1318		set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
1319		return 0;
1320	}
1321
1322	if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
1323		fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
1324		error = -EINVAL;
1325		goto fail_release;
1326	}
1327
1328	/*
1329	 * control_mount() uses control_lock to determine first mounter,
1330	 * and for later mounts, waits for any recoveries to be cleared.
1331	 */
1332
1333	error = control_mount(sdp);
1334	if (error) {
1335		fs_err(sdp, "mount control error %d\n", error);
1336		goto fail_release;
1337	}
1338
1339	ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1340	clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
1341	smp_mb__after_atomic();
1342	wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
1343	return 0;
1344
1345fail_release:
1346	dlm_release_lockspace(ls->ls_dlm, 2);
1347fail_free:
1348	free_recover_size(ls);
1349fail:
1350	return error;
1351}
1352
1353static void gdlm_first_done(struct gfs2_sbd *sdp)
1354{
1355	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1356	int error;
1357
1358	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1359		return;
1360
1361	error = control_first_done(sdp);
1362	if (error)
1363		fs_err(sdp, "mount first_done error %d\n", error);
1364}
1365
1366static void gdlm_unmount(struct gfs2_sbd *sdp)
1367{
1368	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1369
1370	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1371		goto release;
1372
1373	/* wait for gfs2_control_wq to be done with this mount */
1374
1375	spin_lock(&ls->ls_recover_spin);
1376	set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
1377	spin_unlock(&ls->ls_recover_spin);
1378	flush_delayed_work(&sdp->sd_control_work);
1379
1380	/* mounted_lock and control_lock will be purged in dlm recovery */
1381release:
1382	if (ls->ls_dlm) {
1383		dlm_release_lockspace(ls->ls_dlm, 2);
1384		ls->ls_dlm = NULL;
1385	}
1386
1387	free_recover_size(ls);
1388}
1389
1390static const match_table_t dlm_tokens = {
1391	{ Opt_jid, "jid=%d"},
1392	{ Opt_id, "id=%d"},
1393	{ Opt_first, "first=%d"},
1394	{ Opt_nodir, "nodir=%d"},
1395	{ Opt_err, NULL },
1396};
1397
1398const struct lm_lockops gfs2_dlm_ops = {
1399	.lm_proto_name = "lock_dlm",
1400	.lm_mount = gdlm_mount,
1401	.lm_first_done = gdlm_first_done,
1402	.lm_recovery_result = gdlm_recovery_result,
1403	.lm_unmount = gdlm_unmount,
1404	.lm_put_lock = gdlm_put_lock,
1405	.lm_lock = gdlm_lock,
1406	.lm_cancel = gdlm_cancel,
1407	.lm_tokens = &dlm_tokens,
1408};
1409
1410