162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0-only
262306a36Sopenharmony_ci/******************************************************************************
362306a36Sopenharmony_ci*******************************************************************************
462306a36Sopenharmony_ci**
562306a36Sopenharmony_ci**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
662306a36Sopenharmony_ci**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
762306a36Sopenharmony_ci**
862306a36Sopenharmony_ci**
962306a36Sopenharmony_ci*******************************************************************************
1062306a36Sopenharmony_ci******************************************************************************/
1162306a36Sopenharmony_ci
1262306a36Sopenharmony_ci#include "dlm_internal.h"
1362306a36Sopenharmony_ci#include "lockspace.h"
1462306a36Sopenharmony_ci#include "dir.h"
1562306a36Sopenharmony_ci#include "config.h"
1662306a36Sopenharmony_ci#include "ast.h"
1762306a36Sopenharmony_ci#include "memory.h"
1862306a36Sopenharmony_ci#include "rcom.h"
1962306a36Sopenharmony_ci#include "lock.h"
2062306a36Sopenharmony_ci#include "lowcomms.h"
2162306a36Sopenharmony_ci#include "member.h"
2262306a36Sopenharmony_ci#include "recover.h"
2362306a36Sopenharmony_ci
2462306a36Sopenharmony_ci
2562306a36Sopenharmony_ci/*
2662306a36Sopenharmony_ci * Recovery waiting routines: these functions wait for a particular reply from
2762306a36Sopenharmony_ci * a remote node, or for the remote node to report a certain status.  They need
2862306a36Sopenharmony_ci * to abort if the lockspace is stopped indicating a node has failed (perhaps
2962306a36Sopenharmony_ci * the one being waited for).
3062306a36Sopenharmony_ci */
3162306a36Sopenharmony_ci
3262306a36Sopenharmony_ci/*
3362306a36Sopenharmony_ci * Wait until given function returns non-zero or lockspace is stopped
3462306a36Sopenharmony_ci * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
3562306a36Sopenharmony_ci * function thinks it could have completed the waited-on task, they should wake
3662306a36Sopenharmony_ci * up ls_wait_general to get an immediate response rather than waiting for the
3762306a36Sopenharmony_ci * timeout.  This uses a timeout so it can check periodically if the wait
3862306a36Sopenharmony_ci * should abort due to node failure (which doesn't cause a wake_up).
3962306a36Sopenharmony_ci * This should only be called by the dlm_recoverd thread.
4062306a36Sopenharmony_ci */
4162306a36Sopenharmony_ci
4262306a36Sopenharmony_ciint dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
4362306a36Sopenharmony_ci{
4462306a36Sopenharmony_ci	int error = 0;
4562306a36Sopenharmony_ci	int rv;
4662306a36Sopenharmony_ci
4762306a36Sopenharmony_ci	while (1) {
4862306a36Sopenharmony_ci		rv = wait_event_timeout(ls->ls_wait_general,
4962306a36Sopenharmony_ci					testfn(ls) || dlm_recovery_stopped(ls),
5062306a36Sopenharmony_ci					dlm_config.ci_recover_timer * HZ);
5162306a36Sopenharmony_ci		if (rv)
5262306a36Sopenharmony_ci			break;
5362306a36Sopenharmony_ci		if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
5462306a36Sopenharmony_ci			log_debug(ls, "dlm_wait_function timed out");
5562306a36Sopenharmony_ci			return -ETIMEDOUT;
5662306a36Sopenharmony_ci		}
5762306a36Sopenharmony_ci	}
5862306a36Sopenharmony_ci
5962306a36Sopenharmony_ci	if (dlm_recovery_stopped(ls)) {
6062306a36Sopenharmony_ci		log_debug(ls, "dlm_wait_function aborted");
6162306a36Sopenharmony_ci		error = -EINTR;
6262306a36Sopenharmony_ci	}
6362306a36Sopenharmony_ci	return error;
6462306a36Sopenharmony_ci}
6562306a36Sopenharmony_ci
6662306a36Sopenharmony_ci/*
6762306a36Sopenharmony_ci * An efficient way for all nodes to wait for all others to have a certain
6862306a36Sopenharmony_ci * status.  The node with the lowest nodeid polls all the others for their
6962306a36Sopenharmony_ci * status (wait_status_all) and all the others poll the node with the low id
7062306a36Sopenharmony_ci * for its accumulated result (wait_status_low).  When all nodes have set
7162306a36Sopenharmony_ci * status flag X, then status flag X_ALL will be set on the low nodeid.
7262306a36Sopenharmony_ci */
7362306a36Sopenharmony_ci
7462306a36Sopenharmony_ciuint32_t dlm_recover_status(struct dlm_ls *ls)
7562306a36Sopenharmony_ci{
7662306a36Sopenharmony_ci	uint32_t status;
7762306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_lock);
7862306a36Sopenharmony_ci	status = ls->ls_recover_status;
7962306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_lock);
8062306a36Sopenharmony_ci	return status;
8162306a36Sopenharmony_ci}
8262306a36Sopenharmony_ci
8362306a36Sopenharmony_cistatic void _set_recover_status(struct dlm_ls *ls, uint32_t status)
8462306a36Sopenharmony_ci{
8562306a36Sopenharmony_ci	ls->ls_recover_status |= status;
8662306a36Sopenharmony_ci}
8762306a36Sopenharmony_ci
8862306a36Sopenharmony_civoid dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
8962306a36Sopenharmony_ci{
9062306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_lock);
9162306a36Sopenharmony_ci	_set_recover_status(ls, status);
9262306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_lock);
9362306a36Sopenharmony_ci}
9462306a36Sopenharmony_ci
9562306a36Sopenharmony_cistatic int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
9662306a36Sopenharmony_ci			   int save_slots, uint64_t seq)
9762306a36Sopenharmony_ci{
9862306a36Sopenharmony_ci	struct dlm_rcom *rc = ls->ls_recover_buf;
9962306a36Sopenharmony_ci	struct dlm_member *memb;
10062306a36Sopenharmony_ci	int error = 0, delay;
10162306a36Sopenharmony_ci
10262306a36Sopenharmony_ci	list_for_each_entry(memb, &ls->ls_nodes, list) {
10362306a36Sopenharmony_ci		delay = 0;
10462306a36Sopenharmony_ci		for (;;) {
10562306a36Sopenharmony_ci			if (dlm_recovery_stopped(ls)) {
10662306a36Sopenharmony_ci				error = -EINTR;
10762306a36Sopenharmony_ci				goto out;
10862306a36Sopenharmony_ci			}
10962306a36Sopenharmony_ci
11062306a36Sopenharmony_ci			error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
11162306a36Sopenharmony_ci			if (error)
11262306a36Sopenharmony_ci				goto out;
11362306a36Sopenharmony_ci
11462306a36Sopenharmony_ci			if (save_slots)
11562306a36Sopenharmony_ci				dlm_slot_save(ls, rc, memb);
11662306a36Sopenharmony_ci
11762306a36Sopenharmony_ci			if (le32_to_cpu(rc->rc_result) & wait_status)
11862306a36Sopenharmony_ci				break;
11962306a36Sopenharmony_ci			if (delay < 1000)
12062306a36Sopenharmony_ci				delay += 20;
12162306a36Sopenharmony_ci			msleep(delay);
12262306a36Sopenharmony_ci		}
12362306a36Sopenharmony_ci	}
12462306a36Sopenharmony_ci out:
12562306a36Sopenharmony_ci	return error;
12662306a36Sopenharmony_ci}
12762306a36Sopenharmony_ci
12862306a36Sopenharmony_cistatic int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
12962306a36Sopenharmony_ci			   uint32_t status_flags, uint64_t seq)
13062306a36Sopenharmony_ci{
13162306a36Sopenharmony_ci	struct dlm_rcom *rc = ls->ls_recover_buf;
13262306a36Sopenharmony_ci	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
13362306a36Sopenharmony_ci
13462306a36Sopenharmony_ci	for (;;) {
13562306a36Sopenharmony_ci		if (dlm_recovery_stopped(ls)) {
13662306a36Sopenharmony_ci			error = -EINTR;
13762306a36Sopenharmony_ci			goto out;
13862306a36Sopenharmony_ci		}
13962306a36Sopenharmony_ci
14062306a36Sopenharmony_ci		error = dlm_rcom_status(ls, nodeid, status_flags, seq);
14162306a36Sopenharmony_ci		if (error)
14262306a36Sopenharmony_ci			break;
14362306a36Sopenharmony_ci
14462306a36Sopenharmony_ci		if (le32_to_cpu(rc->rc_result) & wait_status)
14562306a36Sopenharmony_ci			break;
14662306a36Sopenharmony_ci		if (delay < 1000)
14762306a36Sopenharmony_ci			delay += 20;
14862306a36Sopenharmony_ci		msleep(delay);
14962306a36Sopenharmony_ci	}
15062306a36Sopenharmony_ci out:
15162306a36Sopenharmony_ci	return error;
15262306a36Sopenharmony_ci}
15362306a36Sopenharmony_ci
15462306a36Sopenharmony_cistatic int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq)
15562306a36Sopenharmony_ci{
15662306a36Sopenharmony_ci	uint32_t status_all = status << 1;
15762306a36Sopenharmony_ci	int error;
15862306a36Sopenharmony_ci
15962306a36Sopenharmony_ci	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
16062306a36Sopenharmony_ci		error = wait_status_all(ls, status, 0, seq);
16162306a36Sopenharmony_ci		if (!error)
16262306a36Sopenharmony_ci			dlm_set_recover_status(ls, status_all);
16362306a36Sopenharmony_ci	} else
16462306a36Sopenharmony_ci		error = wait_status_low(ls, status_all, 0, seq);
16562306a36Sopenharmony_ci
16662306a36Sopenharmony_ci	return error;
16762306a36Sopenharmony_ci}
16862306a36Sopenharmony_ci
16962306a36Sopenharmony_ciint dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
17062306a36Sopenharmony_ci{
17162306a36Sopenharmony_ci	struct dlm_member *memb;
17262306a36Sopenharmony_ci	struct dlm_slot *slots;
17362306a36Sopenharmony_ci	int num_slots, slots_size;
17462306a36Sopenharmony_ci	int error, rv;
17562306a36Sopenharmony_ci	uint32_t gen;
17662306a36Sopenharmony_ci
17762306a36Sopenharmony_ci	list_for_each_entry(memb, &ls->ls_nodes, list) {
17862306a36Sopenharmony_ci		memb->slot = -1;
17962306a36Sopenharmony_ci		memb->generation = 0;
18062306a36Sopenharmony_ci	}
18162306a36Sopenharmony_ci
18262306a36Sopenharmony_ci	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
18362306a36Sopenharmony_ci		error = wait_status_all(ls, DLM_RS_NODES, 1, seq);
18462306a36Sopenharmony_ci		if (error)
18562306a36Sopenharmony_ci			goto out;
18662306a36Sopenharmony_ci
18762306a36Sopenharmony_ci		/* slots array is sparse, slots_size may be > num_slots */
18862306a36Sopenharmony_ci
18962306a36Sopenharmony_ci		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
19062306a36Sopenharmony_ci		if (!rv) {
19162306a36Sopenharmony_ci			spin_lock(&ls->ls_recover_lock);
19262306a36Sopenharmony_ci			_set_recover_status(ls, DLM_RS_NODES_ALL);
19362306a36Sopenharmony_ci			ls->ls_num_slots = num_slots;
19462306a36Sopenharmony_ci			ls->ls_slots_size = slots_size;
19562306a36Sopenharmony_ci			ls->ls_slots = slots;
19662306a36Sopenharmony_ci			ls->ls_generation = gen;
19762306a36Sopenharmony_ci			spin_unlock(&ls->ls_recover_lock);
19862306a36Sopenharmony_ci		} else {
19962306a36Sopenharmony_ci			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
20062306a36Sopenharmony_ci		}
20162306a36Sopenharmony_ci	} else {
20262306a36Sopenharmony_ci		error = wait_status_low(ls, DLM_RS_NODES_ALL,
20362306a36Sopenharmony_ci					DLM_RSF_NEED_SLOTS, seq);
20462306a36Sopenharmony_ci		if (error)
20562306a36Sopenharmony_ci			goto out;
20662306a36Sopenharmony_ci
20762306a36Sopenharmony_ci		dlm_slots_copy_in(ls);
20862306a36Sopenharmony_ci	}
20962306a36Sopenharmony_ci out:
21062306a36Sopenharmony_ci	return error;
21162306a36Sopenharmony_ci}
21262306a36Sopenharmony_ci
21362306a36Sopenharmony_ciint dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq)
21462306a36Sopenharmony_ci{
21562306a36Sopenharmony_ci	return wait_status(ls, DLM_RS_DIR, seq);
21662306a36Sopenharmony_ci}
21762306a36Sopenharmony_ci
21862306a36Sopenharmony_ciint dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq)
21962306a36Sopenharmony_ci{
22062306a36Sopenharmony_ci	return wait_status(ls, DLM_RS_LOCKS, seq);
22162306a36Sopenharmony_ci}
22262306a36Sopenharmony_ci
22362306a36Sopenharmony_ciint dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq)
22462306a36Sopenharmony_ci{
22562306a36Sopenharmony_ci	return wait_status(ls, DLM_RS_DONE, seq);
22662306a36Sopenharmony_ci}
22762306a36Sopenharmony_ci
22862306a36Sopenharmony_ci/*
22962306a36Sopenharmony_ci * The recover_list contains all the rsb's for which we've requested the new
23062306a36Sopenharmony_ci * master nodeid.  As replies are returned from the resource directories the
23162306a36Sopenharmony_ci * rsb's are removed from the list.  When the list is empty we're done.
23262306a36Sopenharmony_ci *
23362306a36Sopenharmony_ci * The recover_list is later similarly used for all rsb's for which we've sent
23462306a36Sopenharmony_ci * new lkb's and need to receive new corresponding lkid's.
23562306a36Sopenharmony_ci *
23662306a36Sopenharmony_ci * We use the address of the rsb struct as a simple local identifier for the
23762306a36Sopenharmony_ci * rsb so we can match an rcom reply with the rsb it was sent for.
23862306a36Sopenharmony_ci */
23962306a36Sopenharmony_ci
24062306a36Sopenharmony_cistatic int recover_list_empty(struct dlm_ls *ls)
24162306a36Sopenharmony_ci{
24262306a36Sopenharmony_ci	int empty;
24362306a36Sopenharmony_ci
24462306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_list_lock);
24562306a36Sopenharmony_ci	empty = list_empty(&ls->ls_recover_list);
24662306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_list_lock);
24762306a36Sopenharmony_ci
24862306a36Sopenharmony_ci	return empty;
24962306a36Sopenharmony_ci}
25062306a36Sopenharmony_ci
25162306a36Sopenharmony_cistatic void recover_list_add(struct dlm_rsb *r)
25262306a36Sopenharmony_ci{
25362306a36Sopenharmony_ci	struct dlm_ls *ls = r->res_ls;
25462306a36Sopenharmony_ci
25562306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_list_lock);
25662306a36Sopenharmony_ci	if (list_empty(&r->res_recover_list)) {
25762306a36Sopenharmony_ci		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
25862306a36Sopenharmony_ci		ls->ls_recover_list_count++;
25962306a36Sopenharmony_ci		dlm_hold_rsb(r);
26062306a36Sopenharmony_ci	}
26162306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_list_lock);
26262306a36Sopenharmony_ci}
26362306a36Sopenharmony_ci
26462306a36Sopenharmony_cistatic void recover_list_del(struct dlm_rsb *r)
26562306a36Sopenharmony_ci{
26662306a36Sopenharmony_ci	struct dlm_ls *ls = r->res_ls;
26762306a36Sopenharmony_ci
26862306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_list_lock);
26962306a36Sopenharmony_ci	list_del_init(&r->res_recover_list);
27062306a36Sopenharmony_ci	ls->ls_recover_list_count--;
27162306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_list_lock);
27262306a36Sopenharmony_ci
27362306a36Sopenharmony_ci	dlm_put_rsb(r);
27462306a36Sopenharmony_ci}
27562306a36Sopenharmony_ci
27662306a36Sopenharmony_cistatic void recover_list_clear(struct dlm_ls *ls)
27762306a36Sopenharmony_ci{
27862306a36Sopenharmony_ci	struct dlm_rsb *r, *s;
27962306a36Sopenharmony_ci
28062306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_list_lock);
28162306a36Sopenharmony_ci	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
28262306a36Sopenharmony_ci		list_del_init(&r->res_recover_list);
28362306a36Sopenharmony_ci		r->res_recover_locks_count = 0;
28462306a36Sopenharmony_ci		dlm_put_rsb(r);
28562306a36Sopenharmony_ci		ls->ls_recover_list_count--;
28662306a36Sopenharmony_ci	}
28762306a36Sopenharmony_ci
28862306a36Sopenharmony_ci	if (ls->ls_recover_list_count != 0) {
28962306a36Sopenharmony_ci		log_error(ls, "warning: recover_list_count %d",
29062306a36Sopenharmony_ci			  ls->ls_recover_list_count);
29162306a36Sopenharmony_ci		ls->ls_recover_list_count = 0;
29262306a36Sopenharmony_ci	}
29362306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_list_lock);
29462306a36Sopenharmony_ci}
29562306a36Sopenharmony_ci
29662306a36Sopenharmony_cistatic int recover_idr_empty(struct dlm_ls *ls)
29762306a36Sopenharmony_ci{
29862306a36Sopenharmony_ci	int empty = 1;
29962306a36Sopenharmony_ci
30062306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_idr_lock);
30162306a36Sopenharmony_ci	if (ls->ls_recover_list_count)
30262306a36Sopenharmony_ci		empty = 0;
30362306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_idr_lock);
30462306a36Sopenharmony_ci
30562306a36Sopenharmony_ci	return empty;
30662306a36Sopenharmony_ci}
30762306a36Sopenharmony_ci
30862306a36Sopenharmony_cistatic int recover_idr_add(struct dlm_rsb *r)
30962306a36Sopenharmony_ci{
31062306a36Sopenharmony_ci	struct dlm_ls *ls = r->res_ls;
31162306a36Sopenharmony_ci	int rv;
31262306a36Sopenharmony_ci
31362306a36Sopenharmony_ci	idr_preload(GFP_NOFS);
31462306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_idr_lock);
31562306a36Sopenharmony_ci	if (r->res_id) {
31662306a36Sopenharmony_ci		rv = -1;
31762306a36Sopenharmony_ci		goto out_unlock;
31862306a36Sopenharmony_ci	}
31962306a36Sopenharmony_ci	rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
32062306a36Sopenharmony_ci	if (rv < 0)
32162306a36Sopenharmony_ci		goto out_unlock;
32262306a36Sopenharmony_ci
32362306a36Sopenharmony_ci	r->res_id = rv;
32462306a36Sopenharmony_ci	ls->ls_recover_list_count++;
32562306a36Sopenharmony_ci	dlm_hold_rsb(r);
32662306a36Sopenharmony_ci	rv = 0;
32762306a36Sopenharmony_ciout_unlock:
32862306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_idr_lock);
32962306a36Sopenharmony_ci	idr_preload_end();
33062306a36Sopenharmony_ci	return rv;
33162306a36Sopenharmony_ci}
33262306a36Sopenharmony_ci
33362306a36Sopenharmony_cistatic void recover_idr_del(struct dlm_rsb *r)
33462306a36Sopenharmony_ci{
33562306a36Sopenharmony_ci	struct dlm_ls *ls = r->res_ls;
33662306a36Sopenharmony_ci
33762306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_idr_lock);
33862306a36Sopenharmony_ci	idr_remove(&ls->ls_recover_idr, r->res_id);
33962306a36Sopenharmony_ci	r->res_id = 0;
34062306a36Sopenharmony_ci	ls->ls_recover_list_count--;
34162306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_idr_lock);
34262306a36Sopenharmony_ci
34362306a36Sopenharmony_ci	dlm_put_rsb(r);
34462306a36Sopenharmony_ci}
34562306a36Sopenharmony_ci
34662306a36Sopenharmony_cistatic struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
34762306a36Sopenharmony_ci{
34862306a36Sopenharmony_ci	struct dlm_rsb *r;
34962306a36Sopenharmony_ci
35062306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_idr_lock);
35162306a36Sopenharmony_ci	r = idr_find(&ls->ls_recover_idr, (int)id);
35262306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_idr_lock);
35362306a36Sopenharmony_ci	return r;
35462306a36Sopenharmony_ci}
35562306a36Sopenharmony_ci
35662306a36Sopenharmony_cistatic void recover_idr_clear(struct dlm_ls *ls)
35762306a36Sopenharmony_ci{
35862306a36Sopenharmony_ci	struct dlm_rsb *r;
35962306a36Sopenharmony_ci	int id;
36062306a36Sopenharmony_ci
36162306a36Sopenharmony_ci	spin_lock(&ls->ls_recover_idr_lock);
36262306a36Sopenharmony_ci
36362306a36Sopenharmony_ci	idr_for_each_entry(&ls->ls_recover_idr, r, id) {
36462306a36Sopenharmony_ci		idr_remove(&ls->ls_recover_idr, id);
36562306a36Sopenharmony_ci		r->res_id = 0;
36662306a36Sopenharmony_ci		r->res_recover_locks_count = 0;
36762306a36Sopenharmony_ci		ls->ls_recover_list_count--;
36862306a36Sopenharmony_ci
36962306a36Sopenharmony_ci		dlm_put_rsb(r);
37062306a36Sopenharmony_ci	}
37162306a36Sopenharmony_ci
37262306a36Sopenharmony_ci	if (ls->ls_recover_list_count != 0) {
37362306a36Sopenharmony_ci		log_error(ls, "warning: recover_list_count %d",
37462306a36Sopenharmony_ci			  ls->ls_recover_list_count);
37562306a36Sopenharmony_ci		ls->ls_recover_list_count = 0;
37662306a36Sopenharmony_ci	}
37762306a36Sopenharmony_ci	spin_unlock(&ls->ls_recover_idr_lock);
37862306a36Sopenharmony_ci}
37962306a36Sopenharmony_ci
38062306a36Sopenharmony_ci
38162306a36Sopenharmony_ci/* Master recovery: find new master node for rsb's that were
38262306a36Sopenharmony_ci   mastered on nodes that have been removed.
38362306a36Sopenharmony_ci
38462306a36Sopenharmony_ci   dlm_recover_masters
38562306a36Sopenharmony_ci   recover_master
38662306a36Sopenharmony_ci   dlm_send_rcom_lookup            ->  receive_rcom_lookup
38762306a36Sopenharmony_ci                                       dlm_dir_lookup
38862306a36Sopenharmony_ci   receive_rcom_lookup_reply       <-
38962306a36Sopenharmony_ci   dlm_recover_master_reply
39062306a36Sopenharmony_ci   set_new_master
39162306a36Sopenharmony_ci   set_master_lkbs
39262306a36Sopenharmony_ci   set_lock_master
39362306a36Sopenharmony_ci*/
39462306a36Sopenharmony_ci
39562306a36Sopenharmony_ci/*
39662306a36Sopenharmony_ci * Set the lock master for all LKBs in a lock queue
39762306a36Sopenharmony_ci * If we are the new master of the rsb, we may have received new
39862306a36Sopenharmony_ci * MSTCPY locks from other nodes already which we need to ignore
39962306a36Sopenharmony_ci * when setting the new nodeid.
40062306a36Sopenharmony_ci */
40162306a36Sopenharmony_ci
40262306a36Sopenharmony_cistatic void set_lock_master(struct list_head *queue, int nodeid)
40362306a36Sopenharmony_ci{
40462306a36Sopenharmony_ci	struct dlm_lkb *lkb;
40562306a36Sopenharmony_ci
40662306a36Sopenharmony_ci	list_for_each_entry(lkb, queue, lkb_statequeue) {
40762306a36Sopenharmony_ci		if (!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
40862306a36Sopenharmony_ci			lkb->lkb_nodeid = nodeid;
40962306a36Sopenharmony_ci			lkb->lkb_remid = 0;
41062306a36Sopenharmony_ci		}
41162306a36Sopenharmony_ci	}
41262306a36Sopenharmony_ci}
41362306a36Sopenharmony_ci
41462306a36Sopenharmony_cistatic void set_master_lkbs(struct dlm_rsb *r)
41562306a36Sopenharmony_ci{
41662306a36Sopenharmony_ci	set_lock_master(&r->res_grantqueue, r->res_nodeid);
41762306a36Sopenharmony_ci	set_lock_master(&r->res_convertqueue, r->res_nodeid);
41862306a36Sopenharmony_ci	set_lock_master(&r->res_waitqueue, r->res_nodeid);
41962306a36Sopenharmony_ci}
42062306a36Sopenharmony_ci
42162306a36Sopenharmony_ci/*
42262306a36Sopenharmony_ci * Propagate the new master nodeid to locks
42362306a36Sopenharmony_ci * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
42462306a36Sopenharmony_ci * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
42562306a36Sopenharmony_ci * rsb's to consider.
42662306a36Sopenharmony_ci */
42762306a36Sopenharmony_ci
42862306a36Sopenharmony_cistatic void set_new_master(struct dlm_rsb *r)
42962306a36Sopenharmony_ci{
43062306a36Sopenharmony_ci	set_master_lkbs(r);
43162306a36Sopenharmony_ci	rsb_set_flag(r, RSB_NEW_MASTER);
43262306a36Sopenharmony_ci	rsb_set_flag(r, RSB_NEW_MASTER2);
43362306a36Sopenharmony_ci}
43462306a36Sopenharmony_ci
43562306a36Sopenharmony_ci/*
43662306a36Sopenharmony_ci * We do async lookups on rsb's that need new masters.  The rsb's
43762306a36Sopenharmony_ci * waiting for a lookup reply are kept on the recover_list.
43862306a36Sopenharmony_ci *
43962306a36Sopenharmony_ci * Another node recovering the master may have sent us a rcom lookup,
44062306a36Sopenharmony_ci * and our dlm_master_lookup() set it as the new master, along with
44162306a36Sopenharmony_ci * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
44262306a36Sopenharmony_ci * equals our_nodeid below).
44362306a36Sopenharmony_ci */
44462306a36Sopenharmony_ci
44562306a36Sopenharmony_cistatic int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
44662306a36Sopenharmony_ci{
44762306a36Sopenharmony_ci	struct dlm_ls *ls = r->res_ls;
44862306a36Sopenharmony_ci	int our_nodeid, dir_nodeid;
44962306a36Sopenharmony_ci	int is_removed = 0;
45062306a36Sopenharmony_ci	int error;
45162306a36Sopenharmony_ci
45262306a36Sopenharmony_ci	if (is_master(r))
45362306a36Sopenharmony_ci		return 0;
45462306a36Sopenharmony_ci
45562306a36Sopenharmony_ci	is_removed = dlm_is_removed(ls, r->res_nodeid);
45662306a36Sopenharmony_ci
45762306a36Sopenharmony_ci	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
45862306a36Sopenharmony_ci		return 0;
45962306a36Sopenharmony_ci
46062306a36Sopenharmony_ci	our_nodeid = dlm_our_nodeid();
46162306a36Sopenharmony_ci	dir_nodeid = dlm_dir_nodeid(r);
46262306a36Sopenharmony_ci
46362306a36Sopenharmony_ci	if (dir_nodeid == our_nodeid) {
46462306a36Sopenharmony_ci		if (is_removed) {
46562306a36Sopenharmony_ci			r->res_master_nodeid = our_nodeid;
46662306a36Sopenharmony_ci			r->res_nodeid = 0;
46762306a36Sopenharmony_ci		}
46862306a36Sopenharmony_ci
46962306a36Sopenharmony_ci		/* set master of lkbs to ourself when is_removed, or to
47062306a36Sopenharmony_ci		   another new master which we set along with NEW_MASTER
47162306a36Sopenharmony_ci		   in dlm_master_lookup */
47262306a36Sopenharmony_ci		set_new_master(r);
47362306a36Sopenharmony_ci		error = 0;
47462306a36Sopenharmony_ci	} else {
47562306a36Sopenharmony_ci		recover_idr_add(r);
47662306a36Sopenharmony_ci		error = dlm_send_rcom_lookup(r, dir_nodeid, seq);
47762306a36Sopenharmony_ci	}
47862306a36Sopenharmony_ci
47962306a36Sopenharmony_ci	(*count)++;
48062306a36Sopenharmony_ci	return error;
48162306a36Sopenharmony_ci}
48262306a36Sopenharmony_ci
48362306a36Sopenharmony_ci/*
48462306a36Sopenharmony_ci * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
48562306a36Sopenharmony_ci * This is necessary because recovery can be started, aborted and restarted,
48662306a36Sopenharmony_ci * causing the master nodeid to briefly change during the aborted recovery, and
48762306a36Sopenharmony_ci * change back to the original value in the second recovery.  The MSTCPY locks
48862306a36Sopenharmony_ci * may or may not have been purged during the aborted recovery.  Another node
48962306a36Sopenharmony_ci * with an outstanding request in waiters list and a request reply saved in the
49062306a36Sopenharmony_ci * requestqueue, cannot know whether it should ignore the reply and resend the
49162306a36Sopenharmony_ci * request, or accept the reply and complete the request.  It must do the
 * former if the remote node purged MSTCPY locks, and it must do the latter if
49362306a36Sopenharmony_ci * the remote node did not.  This is solved by always purging MSTCPY locks, in
49462306a36Sopenharmony_ci * which case, the request reply would always be ignored and the request
49562306a36Sopenharmony_ci * resent.
49662306a36Sopenharmony_ci */
49762306a36Sopenharmony_ci
49862306a36Sopenharmony_cistatic int recover_master_static(struct dlm_rsb *r, unsigned int *count)
49962306a36Sopenharmony_ci{
50062306a36Sopenharmony_ci	int dir_nodeid = dlm_dir_nodeid(r);
50162306a36Sopenharmony_ci	int new_master = dir_nodeid;
50262306a36Sopenharmony_ci
50362306a36Sopenharmony_ci	if (dir_nodeid == dlm_our_nodeid())
50462306a36Sopenharmony_ci		new_master = 0;
50562306a36Sopenharmony_ci
50662306a36Sopenharmony_ci	dlm_purge_mstcpy_locks(r);
50762306a36Sopenharmony_ci	r->res_master_nodeid = dir_nodeid;
50862306a36Sopenharmony_ci	r->res_nodeid = new_master;
50962306a36Sopenharmony_ci	set_new_master(r);
51062306a36Sopenharmony_ci	(*count)++;
51162306a36Sopenharmony_ci	return 0;
51262306a36Sopenharmony_ci}
51362306a36Sopenharmony_ci
51462306a36Sopenharmony_ci/*
51562306a36Sopenharmony_ci * Go through local root resources and for each rsb which has a master which
51662306a36Sopenharmony_ci * has departed, get the new master nodeid from the directory.  The dir will
51762306a36Sopenharmony_ci * assign mastery to the first node to look up the new master.  That means
51862306a36Sopenharmony_ci * we'll discover in this lookup if we're the new master of any rsb's.
51962306a36Sopenharmony_ci *
52062306a36Sopenharmony_ci * We fire off all the dir lookup requests individually and asynchronously to
52162306a36Sopenharmony_ci * the correct dir node.
52262306a36Sopenharmony_ci */
52362306a36Sopenharmony_ci
52462306a36Sopenharmony_ciint dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
52562306a36Sopenharmony_ci{
52662306a36Sopenharmony_ci	struct dlm_rsb *r;
52762306a36Sopenharmony_ci	unsigned int total = 0;
52862306a36Sopenharmony_ci	unsigned int count = 0;
52962306a36Sopenharmony_ci	int nodir = dlm_no_directory(ls);
53062306a36Sopenharmony_ci	int error;
53162306a36Sopenharmony_ci
53262306a36Sopenharmony_ci	log_rinfo(ls, "dlm_recover_masters");
53362306a36Sopenharmony_ci
53462306a36Sopenharmony_ci	down_read(&ls->ls_root_sem);
53562306a36Sopenharmony_ci	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
53662306a36Sopenharmony_ci		if (dlm_recovery_stopped(ls)) {
53762306a36Sopenharmony_ci			up_read(&ls->ls_root_sem);
53862306a36Sopenharmony_ci			error = -EINTR;
53962306a36Sopenharmony_ci			goto out;
54062306a36Sopenharmony_ci		}
54162306a36Sopenharmony_ci
54262306a36Sopenharmony_ci		lock_rsb(r);
54362306a36Sopenharmony_ci		if (nodir)
54462306a36Sopenharmony_ci			error = recover_master_static(r, &count);
54562306a36Sopenharmony_ci		else
54662306a36Sopenharmony_ci			error = recover_master(r, &count, seq);
54762306a36Sopenharmony_ci		unlock_rsb(r);
54862306a36Sopenharmony_ci		cond_resched();
54962306a36Sopenharmony_ci		total++;
55062306a36Sopenharmony_ci
55162306a36Sopenharmony_ci		if (error) {
55262306a36Sopenharmony_ci			up_read(&ls->ls_root_sem);
55362306a36Sopenharmony_ci			goto out;
55462306a36Sopenharmony_ci		}
55562306a36Sopenharmony_ci	}
55662306a36Sopenharmony_ci	up_read(&ls->ls_root_sem);
55762306a36Sopenharmony_ci
55862306a36Sopenharmony_ci	log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
55962306a36Sopenharmony_ci
56062306a36Sopenharmony_ci	error = dlm_wait_function(ls, &recover_idr_empty);
56162306a36Sopenharmony_ci out:
56262306a36Sopenharmony_ci	if (error)
56362306a36Sopenharmony_ci		recover_idr_clear(ls);
56462306a36Sopenharmony_ci	return error;
56562306a36Sopenharmony_ci}
56662306a36Sopenharmony_ci
56762306a36Sopenharmony_ciint dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
56862306a36Sopenharmony_ci{
56962306a36Sopenharmony_ci	struct dlm_rsb *r;
57062306a36Sopenharmony_ci	int ret_nodeid, new_master;
57162306a36Sopenharmony_ci
57262306a36Sopenharmony_ci	r = recover_idr_find(ls, le64_to_cpu(rc->rc_id));
57362306a36Sopenharmony_ci	if (!r) {
57462306a36Sopenharmony_ci		log_error(ls, "dlm_recover_master_reply no id %llx",
57562306a36Sopenharmony_ci			  (unsigned long long)le64_to_cpu(rc->rc_id));
57662306a36Sopenharmony_ci		goto out;
57762306a36Sopenharmony_ci	}
57862306a36Sopenharmony_ci
57962306a36Sopenharmony_ci	ret_nodeid = le32_to_cpu(rc->rc_result);
58062306a36Sopenharmony_ci
58162306a36Sopenharmony_ci	if (ret_nodeid == dlm_our_nodeid())
58262306a36Sopenharmony_ci		new_master = 0;
58362306a36Sopenharmony_ci	else
58462306a36Sopenharmony_ci		new_master = ret_nodeid;
58562306a36Sopenharmony_ci
58662306a36Sopenharmony_ci	lock_rsb(r);
58762306a36Sopenharmony_ci	r->res_master_nodeid = ret_nodeid;
58862306a36Sopenharmony_ci	r->res_nodeid = new_master;
58962306a36Sopenharmony_ci	set_new_master(r);
59062306a36Sopenharmony_ci	unlock_rsb(r);
59162306a36Sopenharmony_ci	recover_idr_del(r);
59262306a36Sopenharmony_ci
59362306a36Sopenharmony_ci	if (recover_idr_empty(ls))
59462306a36Sopenharmony_ci		wake_up(&ls->ls_wait_general);
59562306a36Sopenharmony_ci out:
59662306a36Sopenharmony_ci	return 0;
59762306a36Sopenharmony_ci}
59862306a36Sopenharmony_ci
59962306a36Sopenharmony_ci
60062306a36Sopenharmony_ci/* Lock recovery: rebuild the process-copy locks we hold on a
60162306a36Sopenharmony_ci   remastered rsb on the new rsb master.
60262306a36Sopenharmony_ci
60362306a36Sopenharmony_ci   dlm_recover_locks
60462306a36Sopenharmony_ci   recover_locks
60562306a36Sopenharmony_ci   recover_locks_queue
60662306a36Sopenharmony_ci   dlm_send_rcom_lock              ->  receive_rcom_lock
60762306a36Sopenharmony_ci                                       dlm_recover_master_copy
60862306a36Sopenharmony_ci   receive_rcom_lock_reply         <-
60962306a36Sopenharmony_ci   dlm_recover_process_copy
61062306a36Sopenharmony_ci*/
61162306a36Sopenharmony_ci
61262306a36Sopenharmony_ci
61362306a36Sopenharmony_ci/*
61462306a36Sopenharmony_ci * keep a count of the number of lkb's we send to the new master; when we get
61562306a36Sopenharmony_ci * an equal number of replies then recovery for the rsb is done
61662306a36Sopenharmony_ci */
61762306a36Sopenharmony_ci
61862306a36Sopenharmony_cistatic int recover_locks_queue(struct dlm_rsb *r, struct list_head *head,
61962306a36Sopenharmony_ci			       uint64_t seq)
62062306a36Sopenharmony_ci{
62162306a36Sopenharmony_ci	struct dlm_lkb *lkb;
62262306a36Sopenharmony_ci	int error = 0;
62362306a36Sopenharmony_ci
62462306a36Sopenharmony_ci	list_for_each_entry(lkb, head, lkb_statequeue) {
62562306a36Sopenharmony_ci		error = dlm_send_rcom_lock(r, lkb, seq);
62662306a36Sopenharmony_ci		if (error)
62762306a36Sopenharmony_ci			break;
62862306a36Sopenharmony_ci		r->res_recover_locks_count++;
62962306a36Sopenharmony_ci	}
63062306a36Sopenharmony_ci
63162306a36Sopenharmony_ci	return error;
63262306a36Sopenharmony_ci}
63362306a36Sopenharmony_ci
63462306a36Sopenharmony_cistatic int recover_locks(struct dlm_rsb *r, uint64_t seq)
63562306a36Sopenharmony_ci{
63662306a36Sopenharmony_ci	int error = 0;
63762306a36Sopenharmony_ci
63862306a36Sopenharmony_ci	lock_rsb(r);
63962306a36Sopenharmony_ci
64062306a36Sopenharmony_ci	DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
64162306a36Sopenharmony_ci
64262306a36Sopenharmony_ci	error = recover_locks_queue(r, &r->res_grantqueue, seq);
64362306a36Sopenharmony_ci	if (error)
64462306a36Sopenharmony_ci		goto out;
64562306a36Sopenharmony_ci	error = recover_locks_queue(r, &r->res_convertqueue, seq);
64662306a36Sopenharmony_ci	if (error)
64762306a36Sopenharmony_ci		goto out;
64862306a36Sopenharmony_ci	error = recover_locks_queue(r, &r->res_waitqueue, seq);
64962306a36Sopenharmony_ci	if (error)
65062306a36Sopenharmony_ci		goto out;
65162306a36Sopenharmony_ci
65262306a36Sopenharmony_ci	if (r->res_recover_locks_count)
65362306a36Sopenharmony_ci		recover_list_add(r);
65462306a36Sopenharmony_ci	else
65562306a36Sopenharmony_ci		rsb_clear_flag(r, RSB_NEW_MASTER);
65662306a36Sopenharmony_ci out:
65762306a36Sopenharmony_ci	unlock_rsb(r);
65862306a36Sopenharmony_ci	return error;
65962306a36Sopenharmony_ci}
66062306a36Sopenharmony_ci
66162306a36Sopenharmony_ciint dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
66262306a36Sopenharmony_ci{
66362306a36Sopenharmony_ci	struct dlm_rsb *r;
66462306a36Sopenharmony_ci	int error, count = 0;
66562306a36Sopenharmony_ci
66662306a36Sopenharmony_ci	down_read(&ls->ls_root_sem);
66762306a36Sopenharmony_ci	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
66862306a36Sopenharmony_ci		if (is_master(r)) {
66962306a36Sopenharmony_ci			rsb_clear_flag(r, RSB_NEW_MASTER);
67062306a36Sopenharmony_ci			continue;
67162306a36Sopenharmony_ci		}
67262306a36Sopenharmony_ci
67362306a36Sopenharmony_ci		if (!rsb_flag(r, RSB_NEW_MASTER))
67462306a36Sopenharmony_ci			continue;
67562306a36Sopenharmony_ci
67662306a36Sopenharmony_ci		if (dlm_recovery_stopped(ls)) {
67762306a36Sopenharmony_ci			error = -EINTR;
67862306a36Sopenharmony_ci			up_read(&ls->ls_root_sem);
67962306a36Sopenharmony_ci			goto out;
68062306a36Sopenharmony_ci		}
68162306a36Sopenharmony_ci
68262306a36Sopenharmony_ci		error = recover_locks(r, seq);
68362306a36Sopenharmony_ci		if (error) {
68462306a36Sopenharmony_ci			up_read(&ls->ls_root_sem);
68562306a36Sopenharmony_ci			goto out;
68662306a36Sopenharmony_ci		}
68762306a36Sopenharmony_ci
68862306a36Sopenharmony_ci		count += r->res_recover_locks_count;
68962306a36Sopenharmony_ci	}
69062306a36Sopenharmony_ci	up_read(&ls->ls_root_sem);
69162306a36Sopenharmony_ci
69262306a36Sopenharmony_ci	log_rinfo(ls, "dlm_recover_locks %d out", count);
69362306a36Sopenharmony_ci
69462306a36Sopenharmony_ci	error = dlm_wait_function(ls, &recover_list_empty);
69562306a36Sopenharmony_ci out:
69662306a36Sopenharmony_ci	if (error)
69762306a36Sopenharmony_ci		recover_list_clear(ls);
69862306a36Sopenharmony_ci	return error;
69962306a36Sopenharmony_ci}
70062306a36Sopenharmony_ci
70162306a36Sopenharmony_civoid dlm_recovered_lock(struct dlm_rsb *r)
70262306a36Sopenharmony_ci{
70362306a36Sopenharmony_ci	DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););
70462306a36Sopenharmony_ci
70562306a36Sopenharmony_ci	r->res_recover_locks_count--;
70662306a36Sopenharmony_ci	if (!r->res_recover_locks_count) {
70762306a36Sopenharmony_ci		rsb_clear_flag(r, RSB_NEW_MASTER);
70862306a36Sopenharmony_ci		recover_list_del(r);
70962306a36Sopenharmony_ci	}
71062306a36Sopenharmony_ci
71162306a36Sopenharmony_ci	if (recover_list_empty(r->res_ls))
71262306a36Sopenharmony_ci		wake_up(&r->res_ls->ls_wait_general);
71362306a36Sopenharmony_ci}
71462306a36Sopenharmony_ci
71562306a36Sopenharmony_ci/*
71662306a36Sopenharmony_ci * The lvb needs to be recovered on all master rsb's.  This includes setting
71762306a36Sopenharmony_ci * the VALNOTVALID flag if necessary, and determining the correct lvb contents
71862306a36Sopenharmony_ci * based on the lvb's of the locks held on the rsb.
71962306a36Sopenharmony_ci *
72062306a36Sopenharmony_ci * RSB_VALNOTVALID is set in two cases:
72162306a36Sopenharmony_ci *
72262306a36Sopenharmony_ci * 1. we are master, but not new, and we purged an EX/PW lock held by a
72362306a36Sopenharmony_ci * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
72462306a36Sopenharmony_ci *
72562306a36Sopenharmony_ci * 2. we are a new master, and there are only NL/CR locks left.
72662306a36Sopenharmony_ci * (We could probably improve this by only invaliding in this way when
72762306a36Sopenharmony_ci * the previous master left uncleanly.  VMS docs mention that.)
72862306a36Sopenharmony_ci *
72962306a36Sopenharmony_ci * The LVB contents are only considered for changing when this is a new master
73062306a36Sopenharmony_ci * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
73162306a36Sopenharmony_ci * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
73262306a36Sopenharmony_ci * from the lkb with the largest lvb sequence number.
73362306a36Sopenharmony_ci */
73462306a36Sopenharmony_ci
73562306a36Sopenharmony_cistatic void recover_lvb(struct dlm_rsb *r)
73662306a36Sopenharmony_ci{
73762306a36Sopenharmony_ci	struct dlm_lkb *big_lkb = NULL, *iter, *high_lkb = NULL;
73862306a36Sopenharmony_ci	uint32_t high_seq = 0;
73962306a36Sopenharmony_ci	int lock_lvb_exists = 0;
74062306a36Sopenharmony_ci	int lvblen = r->res_ls->ls_lvblen;
74162306a36Sopenharmony_ci
74262306a36Sopenharmony_ci	if (!rsb_flag(r, RSB_NEW_MASTER2) &&
74362306a36Sopenharmony_ci	    rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
74462306a36Sopenharmony_ci		/* case 1 above */
74562306a36Sopenharmony_ci		rsb_set_flag(r, RSB_VALNOTVALID);
74662306a36Sopenharmony_ci		return;
74762306a36Sopenharmony_ci	}
74862306a36Sopenharmony_ci
74962306a36Sopenharmony_ci	if (!rsb_flag(r, RSB_NEW_MASTER2))
75062306a36Sopenharmony_ci		return;
75162306a36Sopenharmony_ci
75262306a36Sopenharmony_ci	/* we are the new master, so figure out if VALNOTVALID should
75362306a36Sopenharmony_ci	   be set, and set the rsb lvb from the best lkb available. */
75462306a36Sopenharmony_ci
75562306a36Sopenharmony_ci	list_for_each_entry(iter, &r->res_grantqueue, lkb_statequeue) {
75662306a36Sopenharmony_ci		if (!(iter->lkb_exflags & DLM_LKF_VALBLK))
75762306a36Sopenharmony_ci			continue;
75862306a36Sopenharmony_ci
75962306a36Sopenharmony_ci		lock_lvb_exists = 1;
76062306a36Sopenharmony_ci
76162306a36Sopenharmony_ci		if (iter->lkb_grmode > DLM_LOCK_CR) {
76262306a36Sopenharmony_ci			big_lkb = iter;
76362306a36Sopenharmony_ci			goto setflag;
76462306a36Sopenharmony_ci		}
76562306a36Sopenharmony_ci
76662306a36Sopenharmony_ci		if (((int)iter->lkb_lvbseq - (int)high_seq) >= 0) {
76762306a36Sopenharmony_ci			high_lkb = iter;
76862306a36Sopenharmony_ci			high_seq = iter->lkb_lvbseq;
76962306a36Sopenharmony_ci		}
77062306a36Sopenharmony_ci	}
77162306a36Sopenharmony_ci
77262306a36Sopenharmony_ci	list_for_each_entry(iter, &r->res_convertqueue, lkb_statequeue) {
77362306a36Sopenharmony_ci		if (!(iter->lkb_exflags & DLM_LKF_VALBLK))
77462306a36Sopenharmony_ci			continue;
77562306a36Sopenharmony_ci
77662306a36Sopenharmony_ci		lock_lvb_exists = 1;
77762306a36Sopenharmony_ci
77862306a36Sopenharmony_ci		if (iter->lkb_grmode > DLM_LOCK_CR) {
77962306a36Sopenharmony_ci			big_lkb = iter;
78062306a36Sopenharmony_ci			goto setflag;
78162306a36Sopenharmony_ci		}
78262306a36Sopenharmony_ci
78362306a36Sopenharmony_ci		if (((int)iter->lkb_lvbseq - (int)high_seq) >= 0) {
78462306a36Sopenharmony_ci			high_lkb = iter;
78562306a36Sopenharmony_ci			high_seq = iter->lkb_lvbseq;
78662306a36Sopenharmony_ci		}
78762306a36Sopenharmony_ci	}
78862306a36Sopenharmony_ci
78962306a36Sopenharmony_ci setflag:
79062306a36Sopenharmony_ci	if (!lock_lvb_exists)
79162306a36Sopenharmony_ci		goto out;
79262306a36Sopenharmony_ci
79362306a36Sopenharmony_ci	/* lvb is invalidated if only NL/CR locks remain */
79462306a36Sopenharmony_ci	if (!big_lkb)
79562306a36Sopenharmony_ci		rsb_set_flag(r, RSB_VALNOTVALID);
79662306a36Sopenharmony_ci
79762306a36Sopenharmony_ci	if (!r->res_lvbptr) {
79862306a36Sopenharmony_ci		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
79962306a36Sopenharmony_ci		if (!r->res_lvbptr)
80062306a36Sopenharmony_ci			goto out;
80162306a36Sopenharmony_ci	}
80262306a36Sopenharmony_ci
80362306a36Sopenharmony_ci	if (big_lkb) {
80462306a36Sopenharmony_ci		r->res_lvbseq = big_lkb->lkb_lvbseq;
80562306a36Sopenharmony_ci		memcpy(r->res_lvbptr, big_lkb->lkb_lvbptr, lvblen);
80662306a36Sopenharmony_ci	} else if (high_lkb) {
80762306a36Sopenharmony_ci		r->res_lvbseq = high_lkb->lkb_lvbseq;
80862306a36Sopenharmony_ci		memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
80962306a36Sopenharmony_ci	} else {
81062306a36Sopenharmony_ci		r->res_lvbseq = 0;
81162306a36Sopenharmony_ci		memset(r->res_lvbptr, 0, lvblen);
81262306a36Sopenharmony_ci	}
81362306a36Sopenharmony_ci out:
81462306a36Sopenharmony_ci	return;
81562306a36Sopenharmony_ci}
81662306a36Sopenharmony_ci
81762306a36Sopenharmony_ci/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
81862306a36Sopenharmony_ci   converting PR->CW or CW->PR need to have their lkb_grmode set. */
81962306a36Sopenharmony_ci
82062306a36Sopenharmony_cistatic void recover_conversion(struct dlm_rsb *r)
82162306a36Sopenharmony_ci{
82262306a36Sopenharmony_ci	struct dlm_ls *ls = r->res_ls;
82362306a36Sopenharmony_ci	struct dlm_lkb *lkb;
82462306a36Sopenharmony_ci	int grmode = -1;
82562306a36Sopenharmony_ci
82662306a36Sopenharmony_ci	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
82762306a36Sopenharmony_ci		if (lkb->lkb_grmode == DLM_LOCK_PR ||
82862306a36Sopenharmony_ci		    lkb->lkb_grmode == DLM_LOCK_CW) {
82962306a36Sopenharmony_ci			grmode = lkb->lkb_grmode;
83062306a36Sopenharmony_ci			break;
83162306a36Sopenharmony_ci		}
83262306a36Sopenharmony_ci	}
83362306a36Sopenharmony_ci
83462306a36Sopenharmony_ci	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
83562306a36Sopenharmony_ci		if (lkb->lkb_grmode != DLM_LOCK_IV)
83662306a36Sopenharmony_ci			continue;
83762306a36Sopenharmony_ci		if (grmode == -1) {
83862306a36Sopenharmony_ci			log_debug(ls, "recover_conversion %x set gr to rq %d",
83962306a36Sopenharmony_ci				  lkb->lkb_id, lkb->lkb_rqmode);
84062306a36Sopenharmony_ci			lkb->lkb_grmode = lkb->lkb_rqmode;
84162306a36Sopenharmony_ci		} else {
84262306a36Sopenharmony_ci			log_debug(ls, "recover_conversion %x set gr %d",
84362306a36Sopenharmony_ci				  lkb->lkb_id, grmode);
84462306a36Sopenharmony_ci			lkb->lkb_grmode = grmode;
84562306a36Sopenharmony_ci		}
84662306a36Sopenharmony_ci	}
84762306a36Sopenharmony_ci}
84862306a36Sopenharmony_ci
84962306a36Sopenharmony_ci/* We've become the new master for this rsb and waiting/converting locks may
85062306a36Sopenharmony_ci   need to be granted in dlm_recover_grant() due to locks that may have
85162306a36Sopenharmony_ci   existed from a removed node. */
85262306a36Sopenharmony_ci
85362306a36Sopenharmony_cistatic void recover_grant(struct dlm_rsb *r)
85462306a36Sopenharmony_ci{
85562306a36Sopenharmony_ci	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
85662306a36Sopenharmony_ci		rsb_set_flag(r, RSB_RECOVER_GRANT);
85762306a36Sopenharmony_ci}
85862306a36Sopenharmony_ci
85962306a36Sopenharmony_civoid dlm_recover_rsbs(struct dlm_ls *ls)
86062306a36Sopenharmony_ci{
86162306a36Sopenharmony_ci	struct dlm_rsb *r;
86262306a36Sopenharmony_ci	unsigned int count = 0;
86362306a36Sopenharmony_ci
86462306a36Sopenharmony_ci	down_read(&ls->ls_root_sem);
86562306a36Sopenharmony_ci	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
86662306a36Sopenharmony_ci		lock_rsb(r);
86762306a36Sopenharmony_ci		if (is_master(r)) {
86862306a36Sopenharmony_ci			if (rsb_flag(r, RSB_RECOVER_CONVERT))
86962306a36Sopenharmony_ci				recover_conversion(r);
87062306a36Sopenharmony_ci
87162306a36Sopenharmony_ci			/* recover lvb before granting locks so the updated
87262306a36Sopenharmony_ci			   lvb/VALNOTVALID is presented in the completion */
87362306a36Sopenharmony_ci			recover_lvb(r);
87462306a36Sopenharmony_ci
87562306a36Sopenharmony_ci			if (rsb_flag(r, RSB_NEW_MASTER2))
87662306a36Sopenharmony_ci				recover_grant(r);
87762306a36Sopenharmony_ci			count++;
87862306a36Sopenharmony_ci		} else {
87962306a36Sopenharmony_ci			rsb_clear_flag(r, RSB_VALNOTVALID);
88062306a36Sopenharmony_ci		}
88162306a36Sopenharmony_ci		rsb_clear_flag(r, RSB_RECOVER_CONVERT);
88262306a36Sopenharmony_ci		rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
88362306a36Sopenharmony_ci		rsb_clear_flag(r, RSB_NEW_MASTER2);
88462306a36Sopenharmony_ci		unlock_rsb(r);
88562306a36Sopenharmony_ci	}
88662306a36Sopenharmony_ci	up_read(&ls->ls_root_sem);
88762306a36Sopenharmony_ci
88862306a36Sopenharmony_ci	if (count)
88962306a36Sopenharmony_ci		log_rinfo(ls, "dlm_recover_rsbs %d done", count);
89062306a36Sopenharmony_ci}
89162306a36Sopenharmony_ci
89262306a36Sopenharmony_ci/* Create a single list of all root rsb's to be used during recovery */
89362306a36Sopenharmony_ci
89462306a36Sopenharmony_ciint dlm_create_root_list(struct dlm_ls *ls)
89562306a36Sopenharmony_ci{
89662306a36Sopenharmony_ci	struct rb_node *n;
89762306a36Sopenharmony_ci	struct dlm_rsb *r;
89862306a36Sopenharmony_ci	int i, error = 0;
89962306a36Sopenharmony_ci
90062306a36Sopenharmony_ci	down_write(&ls->ls_root_sem);
90162306a36Sopenharmony_ci	if (!list_empty(&ls->ls_root_list)) {
90262306a36Sopenharmony_ci		log_error(ls, "root list not empty");
90362306a36Sopenharmony_ci		error = -EINVAL;
90462306a36Sopenharmony_ci		goto out;
90562306a36Sopenharmony_ci	}
90662306a36Sopenharmony_ci
90762306a36Sopenharmony_ci	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
90862306a36Sopenharmony_ci		spin_lock(&ls->ls_rsbtbl[i].lock);
90962306a36Sopenharmony_ci		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
91062306a36Sopenharmony_ci			r = rb_entry(n, struct dlm_rsb, res_hashnode);
91162306a36Sopenharmony_ci			list_add(&r->res_root_list, &ls->ls_root_list);
91262306a36Sopenharmony_ci			dlm_hold_rsb(r);
91362306a36Sopenharmony_ci		}
91462306a36Sopenharmony_ci
91562306a36Sopenharmony_ci		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
91662306a36Sopenharmony_ci			log_error(ls, "dlm_create_root_list toss not empty");
91762306a36Sopenharmony_ci		spin_unlock(&ls->ls_rsbtbl[i].lock);
91862306a36Sopenharmony_ci	}
91962306a36Sopenharmony_ci out:
92062306a36Sopenharmony_ci	up_write(&ls->ls_root_sem);
92162306a36Sopenharmony_ci	return error;
92262306a36Sopenharmony_ci}
92362306a36Sopenharmony_ci
92462306a36Sopenharmony_civoid dlm_release_root_list(struct dlm_ls *ls)
92562306a36Sopenharmony_ci{
92662306a36Sopenharmony_ci	struct dlm_rsb *r, *safe;
92762306a36Sopenharmony_ci
92862306a36Sopenharmony_ci	down_write(&ls->ls_root_sem);
92962306a36Sopenharmony_ci	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
93062306a36Sopenharmony_ci		list_del_init(&r->res_root_list);
93162306a36Sopenharmony_ci		dlm_put_rsb(r);
93262306a36Sopenharmony_ci	}
93362306a36Sopenharmony_ci	up_write(&ls->ls_root_sem);
93462306a36Sopenharmony_ci}
93562306a36Sopenharmony_ci
93662306a36Sopenharmony_civoid dlm_clear_toss(struct dlm_ls *ls)
93762306a36Sopenharmony_ci{
93862306a36Sopenharmony_ci	struct rb_node *n, *next;
93962306a36Sopenharmony_ci	struct dlm_rsb *r;
94062306a36Sopenharmony_ci	unsigned int count = 0;
94162306a36Sopenharmony_ci	int i;
94262306a36Sopenharmony_ci
94362306a36Sopenharmony_ci	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
94462306a36Sopenharmony_ci		spin_lock(&ls->ls_rsbtbl[i].lock);
94562306a36Sopenharmony_ci		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
94662306a36Sopenharmony_ci			next = rb_next(n);
94762306a36Sopenharmony_ci			r = rb_entry(n, struct dlm_rsb, res_hashnode);
94862306a36Sopenharmony_ci			rb_erase(n, &ls->ls_rsbtbl[i].toss);
94962306a36Sopenharmony_ci			dlm_free_rsb(r);
95062306a36Sopenharmony_ci			count++;
95162306a36Sopenharmony_ci		}
95262306a36Sopenharmony_ci		spin_unlock(&ls->ls_rsbtbl[i].lock);
95362306a36Sopenharmony_ci	}
95462306a36Sopenharmony_ci
95562306a36Sopenharmony_ci	if (count)
95662306a36Sopenharmony_ci		log_rinfo(ls, "dlm_clear_toss %u done", count);
95762306a36Sopenharmony_ci}
95862306a36Sopenharmony_ci
959