162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0-or-later
262306a36Sopenharmony_ci/*
362306a36Sopenharmony_ci * dlmmod.c
462306a36Sopenharmony_ci *
562306a36Sopenharmony_ci * standalone DLM module
662306a36Sopenharmony_ci *
762306a36Sopenharmony_ci * Copyright (C) 2004 Oracle.  All rights reserved.
862306a36Sopenharmony_ci */
962306a36Sopenharmony_ci
1062306a36Sopenharmony_ci
1162306a36Sopenharmony_ci#include <linux/module.h>
1262306a36Sopenharmony_ci#include <linux/fs.h>
1362306a36Sopenharmony_ci#include <linux/types.h>
1462306a36Sopenharmony_ci#include <linux/slab.h>
1562306a36Sopenharmony_ci#include <linux/highmem.h>
1662306a36Sopenharmony_ci#include <linux/init.h>
1762306a36Sopenharmony_ci#include <linux/sysctl.h>
1862306a36Sopenharmony_ci#include <linux/random.h>
1962306a36Sopenharmony_ci#include <linux/blkdev.h>
2062306a36Sopenharmony_ci#include <linux/socket.h>
2162306a36Sopenharmony_ci#include <linux/inet.h>
2262306a36Sopenharmony_ci#include <linux/spinlock.h>
2362306a36Sopenharmony_ci#include <linux/delay.h>
2462306a36Sopenharmony_ci
2562306a36Sopenharmony_ci
2662306a36Sopenharmony_ci#include "../cluster/heartbeat.h"
2762306a36Sopenharmony_ci#include "../cluster/nodemanager.h"
2862306a36Sopenharmony_ci#include "../cluster/tcp.h"
2962306a36Sopenharmony_ci
3062306a36Sopenharmony_ci#include "dlmapi.h"
3162306a36Sopenharmony_ci#include "dlmcommon.h"
3262306a36Sopenharmony_ci#include "dlmdomain.h"
3362306a36Sopenharmony_ci#include "dlmdebug.h"
3462306a36Sopenharmony_ci
3562306a36Sopenharmony_ci#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
3662306a36Sopenharmony_ci#include "../cluster/masklog.h"
3762306a36Sopenharmony_ci
3862306a36Sopenharmony_cistatic void dlm_mle_node_down(struct dlm_ctxt *dlm,
3962306a36Sopenharmony_ci			      struct dlm_master_list_entry *mle,
4062306a36Sopenharmony_ci			      struct o2nm_node *node,
4162306a36Sopenharmony_ci			      int idx);
4262306a36Sopenharmony_cistatic void dlm_mle_node_up(struct dlm_ctxt *dlm,
4362306a36Sopenharmony_ci			    struct dlm_master_list_entry *mle,
4462306a36Sopenharmony_ci			    struct o2nm_node *node,
4562306a36Sopenharmony_ci			    int idx);
4662306a36Sopenharmony_ci
4762306a36Sopenharmony_cistatic void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
4862306a36Sopenharmony_cistatic int dlm_do_assert_master(struct dlm_ctxt *dlm,
4962306a36Sopenharmony_ci				struct dlm_lock_resource *res,
5062306a36Sopenharmony_ci				void *nodemap, u32 flags);
5162306a36Sopenharmony_cistatic void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
5262306a36Sopenharmony_ci
5362306a36Sopenharmony_cistatic inline int dlm_mle_equal(struct dlm_ctxt *dlm,
5462306a36Sopenharmony_ci				struct dlm_master_list_entry *mle,
5562306a36Sopenharmony_ci				const char *name,
5662306a36Sopenharmony_ci				unsigned int namelen)
5762306a36Sopenharmony_ci{
5862306a36Sopenharmony_ci	if (dlm != mle->dlm)
5962306a36Sopenharmony_ci		return 0;
6062306a36Sopenharmony_ci
6162306a36Sopenharmony_ci	if (namelen != mle->mnamelen ||
6262306a36Sopenharmony_ci	    memcmp(name, mle->mname, namelen) != 0)
6362306a36Sopenharmony_ci		return 0;
6462306a36Sopenharmony_ci
6562306a36Sopenharmony_ci	return 1;
6662306a36Sopenharmony_ci}
6762306a36Sopenharmony_ci
6862306a36Sopenharmony_cistatic struct kmem_cache *dlm_lockres_cache;
6962306a36Sopenharmony_cistatic struct kmem_cache *dlm_lockname_cache;
7062306a36Sopenharmony_cistatic struct kmem_cache *dlm_mle_cache;
7162306a36Sopenharmony_ci
7262306a36Sopenharmony_cistatic void dlm_mle_release(struct kref *kref);
7362306a36Sopenharmony_cistatic void dlm_init_mle(struct dlm_master_list_entry *mle,
7462306a36Sopenharmony_ci			enum dlm_mle_type type,
7562306a36Sopenharmony_ci			struct dlm_ctxt *dlm,
7662306a36Sopenharmony_ci			struct dlm_lock_resource *res,
7762306a36Sopenharmony_ci			const char *name,
7862306a36Sopenharmony_ci			unsigned int namelen);
7962306a36Sopenharmony_cistatic void dlm_put_mle(struct dlm_master_list_entry *mle);
8062306a36Sopenharmony_cistatic void __dlm_put_mle(struct dlm_master_list_entry *mle);
8162306a36Sopenharmony_cistatic int dlm_find_mle(struct dlm_ctxt *dlm,
8262306a36Sopenharmony_ci			struct dlm_master_list_entry **mle,
8362306a36Sopenharmony_ci			char *name, unsigned int namelen);
8462306a36Sopenharmony_ci
8562306a36Sopenharmony_cistatic int dlm_do_master_request(struct dlm_lock_resource *res,
8662306a36Sopenharmony_ci				 struct dlm_master_list_entry *mle, int to);
8762306a36Sopenharmony_ci
8862306a36Sopenharmony_ci
8962306a36Sopenharmony_cistatic int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
9062306a36Sopenharmony_ci				     struct dlm_lock_resource *res,
9162306a36Sopenharmony_ci				     struct dlm_master_list_entry *mle,
9262306a36Sopenharmony_ci				     int *blocked);
9362306a36Sopenharmony_cistatic int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
9462306a36Sopenharmony_ci				    struct dlm_lock_resource *res,
9562306a36Sopenharmony_ci				    struct dlm_master_list_entry *mle,
9662306a36Sopenharmony_ci				    int blocked);
9762306a36Sopenharmony_cistatic int dlm_add_migration_mle(struct dlm_ctxt *dlm,
9862306a36Sopenharmony_ci				 struct dlm_lock_resource *res,
9962306a36Sopenharmony_ci				 struct dlm_master_list_entry *mle,
10062306a36Sopenharmony_ci				 struct dlm_master_list_entry **oldmle,
10162306a36Sopenharmony_ci				 const char *name, unsigned int namelen,
10262306a36Sopenharmony_ci				 u8 new_master, u8 master);
10362306a36Sopenharmony_ci
10462306a36Sopenharmony_cistatic u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
10562306a36Sopenharmony_ci				    struct dlm_lock_resource *res);
10662306a36Sopenharmony_cistatic void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
10762306a36Sopenharmony_ci				      struct dlm_lock_resource *res);
10862306a36Sopenharmony_cistatic int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
10962306a36Sopenharmony_ci				       struct dlm_lock_resource *res,
11062306a36Sopenharmony_ci				       u8 target);
11162306a36Sopenharmony_cistatic int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
11262306a36Sopenharmony_ci				       struct dlm_lock_resource *res);
11362306a36Sopenharmony_ci
11462306a36Sopenharmony_ci
/*
 * Decide whether a (negative) error code means the peer is gone.
 *
 * Returns 1 when @errno is one of the codes listed below and 0 for
 * every other value, including the positive forms of the same codes.
 */
int dlm_is_host_down(int errno)
{
	int host_down;

	switch (errno) {
	case -EBADF:
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -ECONNRESET:
	case -EPIPE:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ETIMEDOUT:
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -ENETRESET:
	case -ESHUTDOWN:
	case -ENOPROTOOPT:
	case -EINVAL:
		/*
		 * -EINVAL: if returned from our tcp code, this means
		 * there is no socket
		 */
		host_down = 1;
		break;
	default:
		host_down = 0;
		break;
	}

	return host_down;
}
13862306a36Sopenharmony_ci
13962306a36Sopenharmony_ci
14062306a36Sopenharmony_ci/*
14162306a36Sopenharmony_ci * MASTER LIST FUNCTIONS
14262306a36Sopenharmony_ci */
14362306a36Sopenharmony_ci
14462306a36Sopenharmony_ci
14562306a36Sopenharmony_ci/*
14662306a36Sopenharmony_ci * regarding master list entries and heartbeat callbacks:
14762306a36Sopenharmony_ci *
14862306a36Sopenharmony_ci * in order to avoid sleeping and allocation that occurs in
14962306a36Sopenharmony_ci * heartbeat, master list entries are simply attached to the
15062306a36Sopenharmony_ci * dlm's established heartbeat callbacks.  the mle is attached
15162306a36Sopenharmony_ci * when it is created, and since the dlm->spinlock is held at
15262306a36Sopenharmony_ci * that time, any heartbeat event will be properly discovered
15362306a36Sopenharmony_ci * by the mle.  the mle needs to be detached from the
15462306a36Sopenharmony_ci * dlm->mle_hb_events list as soon as heartbeat events are no
15562306a36Sopenharmony_ci * longer useful to the mle, and before the mle is freed.
15662306a36Sopenharmony_ci *
15762306a36Sopenharmony_ci * as a general rule, heartbeat events are no longer needed by
15862306a36Sopenharmony_ci * the mle once an "answer" regarding the lock master has been
15962306a36Sopenharmony_ci * received.
16062306a36Sopenharmony_ci */
16162306a36Sopenharmony_cistatic inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
16262306a36Sopenharmony_ci					      struct dlm_master_list_entry *mle)
16362306a36Sopenharmony_ci{
16462306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
16562306a36Sopenharmony_ci
16662306a36Sopenharmony_ci	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
16762306a36Sopenharmony_ci}
16862306a36Sopenharmony_ci
16962306a36Sopenharmony_ci
17062306a36Sopenharmony_cistatic inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
17162306a36Sopenharmony_ci					      struct dlm_master_list_entry *mle)
17262306a36Sopenharmony_ci{
17362306a36Sopenharmony_ci	if (!list_empty(&mle->hb_events))
17462306a36Sopenharmony_ci		list_del_init(&mle->hb_events);
17562306a36Sopenharmony_ci}
17662306a36Sopenharmony_ci
17762306a36Sopenharmony_ci
17862306a36Sopenharmony_cistatic inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
17962306a36Sopenharmony_ci					    struct dlm_master_list_entry *mle)
18062306a36Sopenharmony_ci{
18162306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
18262306a36Sopenharmony_ci	__dlm_mle_detach_hb_events(dlm, mle);
18362306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
18462306a36Sopenharmony_ci}
18562306a36Sopenharmony_ci
18662306a36Sopenharmony_cistatic void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
18762306a36Sopenharmony_ci{
18862306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
18962306a36Sopenharmony_ci	dlm = mle->dlm;
19062306a36Sopenharmony_ci
19162306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
19262306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
19362306a36Sopenharmony_ci	mle->inuse++;
19462306a36Sopenharmony_ci	kref_get(&mle->mle_refs);
19562306a36Sopenharmony_ci}
19662306a36Sopenharmony_ci
19762306a36Sopenharmony_cistatic void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
19862306a36Sopenharmony_ci{
19962306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
20062306a36Sopenharmony_ci	dlm = mle->dlm;
20162306a36Sopenharmony_ci
20262306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
20362306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
20462306a36Sopenharmony_ci	mle->inuse--;
20562306a36Sopenharmony_ci	__dlm_put_mle(mle);
20662306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
20762306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
20862306a36Sopenharmony_ci
20962306a36Sopenharmony_ci}
21062306a36Sopenharmony_ci
21162306a36Sopenharmony_ci/* remove from list and free */
21262306a36Sopenharmony_cistatic void __dlm_put_mle(struct dlm_master_list_entry *mle)
21362306a36Sopenharmony_ci{
21462306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
21562306a36Sopenharmony_ci	dlm = mle->dlm;
21662306a36Sopenharmony_ci
21762306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
21862306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
21962306a36Sopenharmony_ci	if (!kref_read(&mle->mle_refs)) {
22062306a36Sopenharmony_ci		/* this may or may not crash, but who cares.
22162306a36Sopenharmony_ci		 * it's a BUG. */
22262306a36Sopenharmony_ci		mlog(ML_ERROR, "bad mle: %p\n", mle);
22362306a36Sopenharmony_ci		dlm_print_one_mle(mle);
22462306a36Sopenharmony_ci		BUG();
22562306a36Sopenharmony_ci	} else
22662306a36Sopenharmony_ci		kref_put(&mle->mle_refs, dlm_mle_release);
22762306a36Sopenharmony_ci}
22862306a36Sopenharmony_ci
22962306a36Sopenharmony_ci
23062306a36Sopenharmony_ci/* must not have any spinlocks coming in */
23162306a36Sopenharmony_cistatic void dlm_put_mle(struct dlm_master_list_entry *mle)
23262306a36Sopenharmony_ci{
23362306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
23462306a36Sopenharmony_ci	dlm = mle->dlm;
23562306a36Sopenharmony_ci
23662306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
23762306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
23862306a36Sopenharmony_ci	__dlm_put_mle(mle);
23962306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
24062306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
24162306a36Sopenharmony_ci}
24262306a36Sopenharmony_ci
24362306a36Sopenharmony_cistatic inline void dlm_get_mle(struct dlm_master_list_entry *mle)
24462306a36Sopenharmony_ci{
24562306a36Sopenharmony_ci	kref_get(&mle->mle_refs);
24662306a36Sopenharmony_ci}
24762306a36Sopenharmony_ci
24862306a36Sopenharmony_cistatic void dlm_init_mle(struct dlm_master_list_entry *mle,
24962306a36Sopenharmony_ci			enum dlm_mle_type type,
25062306a36Sopenharmony_ci			struct dlm_ctxt *dlm,
25162306a36Sopenharmony_ci			struct dlm_lock_resource *res,
25262306a36Sopenharmony_ci			const char *name,
25362306a36Sopenharmony_ci			unsigned int namelen)
25462306a36Sopenharmony_ci{
25562306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
25662306a36Sopenharmony_ci
25762306a36Sopenharmony_ci	mle->dlm = dlm;
25862306a36Sopenharmony_ci	mle->type = type;
25962306a36Sopenharmony_ci	INIT_HLIST_NODE(&mle->master_hash_node);
26062306a36Sopenharmony_ci	INIT_LIST_HEAD(&mle->hb_events);
26162306a36Sopenharmony_ci	bitmap_zero(mle->maybe_map, O2NM_MAX_NODES);
26262306a36Sopenharmony_ci	spin_lock_init(&mle->spinlock);
26362306a36Sopenharmony_ci	init_waitqueue_head(&mle->wq);
26462306a36Sopenharmony_ci	atomic_set(&mle->woken, 0);
26562306a36Sopenharmony_ci	kref_init(&mle->mle_refs);
26662306a36Sopenharmony_ci	bitmap_zero(mle->response_map, O2NM_MAX_NODES);
26762306a36Sopenharmony_ci	mle->master = O2NM_MAX_NODES;
26862306a36Sopenharmony_ci	mle->new_master = O2NM_MAX_NODES;
26962306a36Sopenharmony_ci	mle->inuse = 0;
27062306a36Sopenharmony_ci
27162306a36Sopenharmony_ci	BUG_ON(mle->type != DLM_MLE_BLOCK &&
27262306a36Sopenharmony_ci	       mle->type != DLM_MLE_MASTER &&
27362306a36Sopenharmony_ci	       mle->type != DLM_MLE_MIGRATION);
27462306a36Sopenharmony_ci
27562306a36Sopenharmony_ci	if (mle->type == DLM_MLE_MASTER) {
27662306a36Sopenharmony_ci		BUG_ON(!res);
27762306a36Sopenharmony_ci		mle->mleres = res;
27862306a36Sopenharmony_ci		memcpy(mle->mname, res->lockname.name, res->lockname.len);
27962306a36Sopenharmony_ci		mle->mnamelen = res->lockname.len;
28062306a36Sopenharmony_ci		mle->mnamehash = res->lockname.hash;
28162306a36Sopenharmony_ci	} else {
28262306a36Sopenharmony_ci		BUG_ON(!name);
28362306a36Sopenharmony_ci		mle->mleres = NULL;
28462306a36Sopenharmony_ci		memcpy(mle->mname, name, namelen);
28562306a36Sopenharmony_ci		mle->mnamelen = namelen;
28662306a36Sopenharmony_ci		mle->mnamehash = dlm_lockid_hash(name, namelen);
28762306a36Sopenharmony_ci	}
28862306a36Sopenharmony_ci
28962306a36Sopenharmony_ci	atomic_inc(&dlm->mle_tot_count[mle->type]);
29062306a36Sopenharmony_ci	atomic_inc(&dlm->mle_cur_count[mle->type]);
29162306a36Sopenharmony_ci
29262306a36Sopenharmony_ci	/* copy off the node_map and register hb callbacks on our copy */
29362306a36Sopenharmony_ci	bitmap_copy(mle->node_map, dlm->domain_map, O2NM_MAX_NODES);
29462306a36Sopenharmony_ci	bitmap_copy(mle->vote_map, dlm->domain_map, O2NM_MAX_NODES);
29562306a36Sopenharmony_ci	clear_bit(dlm->node_num, mle->vote_map);
29662306a36Sopenharmony_ci	clear_bit(dlm->node_num, mle->node_map);
29762306a36Sopenharmony_ci
29862306a36Sopenharmony_ci	/* attach the mle to the domain node up/down events */
29962306a36Sopenharmony_ci	__dlm_mle_attach_hb_events(dlm, mle);
30062306a36Sopenharmony_ci}
30162306a36Sopenharmony_ci
30262306a36Sopenharmony_civoid __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
30362306a36Sopenharmony_ci{
30462306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
30562306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
30662306a36Sopenharmony_ci
30762306a36Sopenharmony_ci	if (!hlist_unhashed(&mle->master_hash_node))
30862306a36Sopenharmony_ci		hlist_del_init(&mle->master_hash_node);
30962306a36Sopenharmony_ci}
31062306a36Sopenharmony_ci
31162306a36Sopenharmony_civoid __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
31262306a36Sopenharmony_ci{
31362306a36Sopenharmony_ci	struct hlist_head *bucket;
31462306a36Sopenharmony_ci
31562306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
31662306a36Sopenharmony_ci
31762306a36Sopenharmony_ci	bucket = dlm_master_hash(dlm, mle->mnamehash);
31862306a36Sopenharmony_ci	hlist_add_head(&mle->master_hash_node, bucket);
31962306a36Sopenharmony_ci}
32062306a36Sopenharmony_ci
32162306a36Sopenharmony_ci/* returns 1 if found, 0 if not */
32262306a36Sopenharmony_cistatic int dlm_find_mle(struct dlm_ctxt *dlm,
32362306a36Sopenharmony_ci			struct dlm_master_list_entry **mle,
32462306a36Sopenharmony_ci			char *name, unsigned int namelen)
32562306a36Sopenharmony_ci{
32662306a36Sopenharmony_ci	struct dlm_master_list_entry *tmpmle;
32762306a36Sopenharmony_ci	struct hlist_head *bucket;
32862306a36Sopenharmony_ci	unsigned int hash;
32962306a36Sopenharmony_ci
33062306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
33162306a36Sopenharmony_ci
33262306a36Sopenharmony_ci	hash = dlm_lockid_hash(name, namelen);
33362306a36Sopenharmony_ci	bucket = dlm_master_hash(dlm, hash);
33462306a36Sopenharmony_ci	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
33562306a36Sopenharmony_ci		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
33662306a36Sopenharmony_ci			continue;
33762306a36Sopenharmony_ci		dlm_get_mle(tmpmle);
33862306a36Sopenharmony_ci		*mle = tmpmle;
33962306a36Sopenharmony_ci		return 1;
34062306a36Sopenharmony_ci	}
34162306a36Sopenharmony_ci	return 0;
34262306a36Sopenharmony_ci}
34362306a36Sopenharmony_ci
34462306a36Sopenharmony_civoid dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
34562306a36Sopenharmony_ci{
34662306a36Sopenharmony_ci	struct dlm_master_list_entry *mle;
34762306a36Sopenharmony_ci
34862306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
34962306a36Sopenharmony_ci
35062306a36Sopenharmony_ci	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
35162306a36Sopenharmony_ci		if (node_up)
35262306a36Sopenharmony_ci			dlm_mle_node_up(dlm, mle, NULL, idx);
35362306a36Sopenharmony_ci		else
35462306a36Sopenharmony_ci			dlm_mle_node_down(dlm, mle, NULL, idx);
35562306a36Sopenharmony_ci	}
35662306a36Sopenharmony_ci}
35762306a36Sopenharmony_ci
35862306a36Sopenharmony_cistatic void dlm_mle_node_down(struct dlm_ctxt *dlm,
35962306a36Sopenharmony_ci			      struct dlm_master_list_entry *mle,
36062306a36Sopenharmony_ci			      struct o2nm_node *node, int idx)
36162306a36Sopenharmony_ci{
36262306a36Sopenharmony_ci	spin_lock(&mle->spinlock);
36362306a36Sopenharmony_ci
36462306a36Sopenharmony_ci	if (!test_bit(idx, mle->node_map))
36562306a36Sopenharmony_ci		mlog(0, "node %u already removed from nodemap!\n", idx);
36662306a36Sopenharmony_ci	else
36762306a36Sopenharmony_ci		clear_bit(idx, mle->node_map);
36862306a36Sopenharmony_ci
36962306a36Sopenharmony_ci	spin_unlock(&mle->spinlock);
37062306a36Sopenharmony_ci}
37162306a36Sopenharmony_ci
37262306a36Sopenharmony_cistatic void dlm_mle_node_up(struct dlm_ctxt *dlm,
37362306a36Sopenharmony_ci			    struct dlm_master_list_entry *mle,
37462306a36Sopenharmony_ci			    struct o2nm_node *node, int idx)
37562306a36Sopenharmony_ci{
37662306a36Sopenharmony_ci	spin_lock(&mle->spinlock);
37762306a36Sopenharmony_ci
37862306a36Sopenharmony_ci	if (test_bit(idx, mle->node_map))
37962306a36Sopenharmony_ci		mlog(0, "node %u already in node map!\n", idx);
38062306a36Sopenharmony_ci	else
38162306a36Sopenharmony_ci		set_bit(idx, mle->node_map);
38262306a36Sopenharmony_ci
38362306a36Sopenharmony_ci	spin_unlock(&mle->spinlock);
38462306a36Sopenharmony_ci}
38562306a36Sopenharmony_ci
38662306a36Sopenharmony_ci
38762306a36Sopenharmony_ciint dlm_init_mle_cache(void)
38862306a36Sopenharmony_ci{
38962306a36Sopenharmony_ci	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
39062306a36Sopenharmony_ci					  sizeof(struct dlm_master_list_entry),
39162306a36Sopenharmony_ci					  0, SLAB_HWCACHE_ALIGN,
39262306a36Sopenharmony_ci					  NULL);
39362306a36Sopenharmony_ci	if (dlm_mle_cache == NULL)
39462306a36Sopenharmony_ci		return -ENOMEM;
39562306a36Sopenharmony_ci	return 0;
39662306a36Sopenharmony_ci}
39762306a36Sopenharmony_ci
39862306a36Sopenharmony_civoid dlm_destroy_mle_cache(void)
39962306a36Sopenharmony_ci{
40062306a36Sopenharmony_ci	kmem_cache_destroy(dlm_mle_cache);
40162306a36Sopenharmony_ci}
40262306a36Sopenharmony_ci
40362306a36Sopenharmony_cistatic void dlm_mle_release(struct kref *kref)
40462306a36Sopenharmony_ci{
40562306a36Sopenharmony_ci	struct dlm_master_list_entry *mle;
40662306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
40762306a36Sopenharmony_ci
40862306a36Sopenharmony_ci	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
40962306a36Sopenharmony_ci	dlm = mle->dlm;
41062306a36Sopenharmony_ci
41162306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
41262306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
41362306a36Sopenharmony_ci
41462306a36Sopenharmony_ci	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
41562306a36Sopenharmony_ci	     mle->type);
41662306a36Sopenharmony_ci
41762306a36Sopenharmony_ci	/* remove from list if not already */
41862306a36Sopenharmony_ci	__dlm_unlink_mle(dlm, mle);
41962306a36Sopenharmony_ci
42062306a36Sopenharmony_ci	/* detach the mle from the domain node up/down events */
42162306a36Sopenharmony_ci	__dlm_mle_detach_hb_events(dlm, mle);
42262306a36Sopenharmony_ci
42362306a36Sopenharmony_ci	atomic_dec(&dlm->mle_cur_count[mle->type]);
42462306a36Sopenharmony_ci
42562306a36Sopenharmony_ci	/* NOTE: kfree under spinlock here.
42662306a36Sopenharmony_ci	 * if this is bad, we can move this to a freelist. */
42762306a36Sopenharmony_ci	kmem_cache_free(dlm_mle_cache, mle);
42862306a36Sopenharmony_ci}
42962306a36Sopenharmony_ci
43062306a36Sopenharmony_ci
43162306a36Sopenharmony_ci/*
43262306a36Sopenharmony_ci * LOCK RESOURCE FUNCTIONS
43362306a36Sopenharmony_ci */
43462306a36Sopenharmony_ci
43562306a36Sopenharmony_ciint dlm_init_master_caches(void)
43662306a36Sopenharmony_ci{
43762306a36Sopenharmony_ci	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
43862306a36Sopenharmony_ci					      sizeof(struct dlm_lock_resource),
43962306a36Sopenharmony_ci					      0, SLAB_HWCACHE_ALIGN, NULL);
44062306a36Sopenharmony_ci	if (!dlm_lockres_cache)
44162306a36Sopenharmony_ci		goto bail;
44262306a36Sopenharmony_ci
44362306a36Sopenharmony_ci	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
44462306a36Sopenharmony_ci					       DLM_LOCKID_NAME_MAX, 0,
44562306a36Sopenharmony_ci					       SLAB_HWCACHE_ALIGN, NULL);
44662306a36Sopenharmony_ci	if (!dlm_lockname_cache)
44762306a36Sopenharmony_ci		goto bail;
44862306a36Sopenharmony_ci
44962306a36Sopenharmony_ci	return 0;
45062306a36Sopenharmony_cibail:
45162306a36Sopenharmony_ci	dlm_destroy_master_caches();
45262306a36Sopenharmony_ci	return -ENOMEM;
45362306a36Sopenharmony_ci}
45462306a36Sopenharmony_ci
45562306a36Sopenharmony_civoid dlm_destroy_master_caches(void)
45662306a36Sopenharmony_ci{
45762306a36Sopenharmony_ci	kmem_cache_destroy(dlm_lockname_cache);
45862306a36Sopenharmony_ci	dlm_lockname_cache = NULL;
45962306a36Sopenharmony_ci
46062306a36Sopenharmony_ci	kmem_cache_destroy(dlm_lockres_cache);
46162306a36Sopenharmony_ci	dlm_lockres_cache = NULL;
46262306a36Sopenharmony_ci}
46362306a36Sopenharmony_ci
46462306a36Sopenharmony_cistatic void dlm_lockres_release(struct kref *kref)
46562306a36Sopenharmony_ci{
46662306a36Sopenharmony_ci	struct dlm_lock_resource *res;
46762306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
46862306a36Sopenharmony_ci
46962306a36Sopenharmony_ci	res = container_of(kref, struct dlm_lock_resource, refs);
47062306a36Sopenharmony_ci	dlm = res->dlm;
47162306a36Sopenharmony_ci
47262306a36Sopenharmony_ci	/* This should not happen -- all lockres' have a name
47362306a36Sopenharmony_ci	 * associated with them at init time. */
47462306a36Sopenharmony_ci	BUG_ON(!res->lockname.name);
47562306a36Sopenharmony_ci
47662306a36Sopenharmony_ci	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
47762306a36Sopenharmony_ci	     res->lockname.name);
47862306a36Sopenharmony_ci
47962306a36Sopenharmony_ci	atomic_dec(&dlm->res_cur_count);
48062306a36Sopenharmony_ci
48162306a36Sopenharmony_ci	if (!hlist_unhashed(&res->hash_node) ||
48262306a36Sopenharmony_ci	    !list_empty(&res->granted) ||
48362306a36Sopenharmony_ci	    !list_empty(&res->converting) ||
48462306a36Sopenharmony_ci	    !list_empty(&res->blocked) ||
48562306a36Sopenharmony_ci	    !list_empty(&res->dirty) ||
48662306a36Sopenharmony_ci	    !list_empty(&res->recovering) ||
48762306a36Sopenharmony_ci	    !list_empty(&res->purge)) {
48862306a36Sopenharmony_ci		mlog(ML_ERROR,
48962306a36Sopenharmony_ci		     "Going to BUG for resource %.*s."
49062306a36Sopenharmony_ci		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
49162306a36Sopenharmony_ci		     res->lockname.len, res->lockname.name,
49262306a36Sopenharmony_ci		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
49362306a36Sopenharmony_ci		     !list_empty(&res->granted) ? 'G' : ' ',
49462306a36Sopenharmony_ci		     !list_empty(&res->converting) ? 'C' : ' ',
49562306a36Sopenharmony_ci		     !list_empty(&res->blocked) ? 'B' : ' ',
49662306a36Sopenharmony_ci		     !list_empty(&res->dirty) ? 'D' : ' ',
49762306a36Sopenharmony_ci		     !list_empty(&res->recovering) ? 'R' : ' ',
49862306a36Sopenharmony_ci		     !list_empty(&res->purge) ? 'P' : ' ');
49962306a36Sopenharmony_ci
50062306a36Sopenharmony_ci		dlm_print_one_lock_resource(res);
50162306a36Sopenharmony_ci	}
50262306a36Sopenharmony_ci
50362306a36Sopenharmony_ci	/* By the time we're ready to blow this guy away, we shouldn't
50462306a36Sopenharmony_ci	 * be on any lists. */
50562306a36Sopenharmony_ci	BUG_ON(!hlist_unhashed(&res->hash_node));
50662306a36Sopenharmony_ci	BUG_ON(!list_empty(&res->granted));
50762306a36Sopenharmony_ci	BUG_ON(!list_empty(&res->converting));
50862306a36Sopenharmony_ci	BUG_ON(!list_empty(&res->blocked));
50962306a36Sopenharmony_ci	BUG_ON(!list_empty(&res->dirty));
51062306a36Sopenharmony_ci	BUG_ON(!list_empty(&res->recovering));
51162306a36Sopenharmony_ci	BUG_ON(!list_empty(&res->purge));
51262306a36Sopenharmony_ci
51362306a36Sopenharmony_ci	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
51462306a36Sopenharmony_ci
51562306a36Sopenharmony_ci	kmem_cache_free(dlm_lockres_cache, res);
51662306a36Sopenharmony_ci}
51762306a36Sopenharmony_ci
51862306a36Sopenharmony_civoid dlm_lockres_put(struct dlm_lock_resource *res)
51962306a36Sopenharmony_ci{
52062306a36Sopenharmony_ci	kref_put(&res->refs, dlm_lockres_release);
52162306a36Sopenharmony_ci}
52262306a36Sopenharmony_ci
52362306a36Sopenharmony_cistatic void dlm_init_lockres(struct dlm_ctxt *dlm,
52462306a36Sopenharmony_ci			     struct dlm_lock_resource *res,
52562306a36Sopenharmony_ci			     const char *name, unsigned int namelen)
52662306a36Sopenharmony_ci{
52762306a36Sopenharmony_ci	char *qname;
52862306a36Sopenharmony_ci
52962306a36Sopenharmony_ci	/* If we memset here, we lose our reference to the kmalloc'd
53062306a36Sopenharmony_ci	 * res->lockname.name, so be sure to init every field
53162306a36Sopenharmony_ci	 * correctly! */
53262306a36Sopenharmony_ci
53362306a36Sopenharmony_ci	qname = (char *) res->lockname.name;
53462306a36Sopenharmony_ci	memcpy(qname, name, namelen);
53562306a36Sopenharmony_ci
53662306a36Sopenharmony_ci	res->lockname.len = namelen;
53762306a36Sopenharmony_ci	res->lockname.hash = dlm_lockid_hash(name, namelen);
53862306a36Sopenharmony_ci
53962306a36Sopenharmony_ci	init_waitqueue_head(&res->wq);
54062306a36Sopenharmony_ci	spin_lock_init(&res->spinlock);
54162306a36Sopenharmony_ci	INIT_HLIST_NODE(&res->hash_node);
54262306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->granted);
54362306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->converting);
54462306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->blocked);
54562306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->dirty);
54662306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->recovering);
54762306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->purge);
54862306a36Sopenharmony_ci	INIT_LIST_HEAD(&res->tracking);
54962306a36Sopenharmony_ci	atomic_set(&res->asts_reserved, 0);
55062306a36Sopenharmony_ci	res->migration_pending = 0;
55162306a36Sopenharmony_ci	res->inflight_locks = 0;
55262306a36Sopenharmony_ci	res->inflight_assert_workers = 0;
55362306a36Sopenharmony_ci
55462306a36Sopenharmony_ci	res->dlm = dlm;
55562306a36Sopenharmony_ci
55662306a36Sopenharmony_ci	kref_init(&res->refs);
55762306a36Sopenharmony_ci
55862306a36Sopenharmony_ci	atomic_inc(&dlm->res_tot_count);
55962306a36Sopenharmony_ci	atomic_inc(&dlm->res_cur_count);
56062306a36Sopenharmony_ci
56162306a36Sopenharmony_ci	/* just for consistency */
56262306a36Sopenharmony_ci	spin_lock(&res->spinlock);
56362306a36Sopenharmony_ci	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
56462306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
56562306a36Sopenharmony_ci
56662306a36Sopenharmony_ci	res->state = DLM_LOCK_RES_IN_PROGRESS;
56762306a36Sopenharmony_ci
56862306a36Sopenharmony_ci	res->last_used = 0;
56962306a36Sopenharmony_ci
57062306a36Sopenharmony_ci	spin_lock(&dlm->track_lock);
57162306a36Sopenharmony_ci	list_add_tail(&res->tracking, &dlm->tracking_list);
57262306a36Sopenharmony_ci	spin_unlock(&dlm->track_lock);
57362306a36Sopenharmony_ci
57462306a36Sopenharmony_ci	memset(res->lvb, 0, DLM_LVB_LEN);
57562306a36Sopenharmony_ci	bitmap_zero(res->refmap, O2NM_MAX_NODES);
57662306a36Sopenharmony_ci}
57762306a36Sopenharmony_ci
57862306a36Sopenharmony_cistruct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
57962306a36Sopenharmony_ci				   const char *name,
58062306a36Sopenharmony_ci				   unsigned int namelen)
58162306a36Sopenharmony_ci{
58262306a36Sopenharmony_ci	struct dlm_lock_resource *res = NULL;
58362306a36Sopenharmony_ci
58462306a36Sopenharmony_ci	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
58562306a36Sopenharmony_ci	if (!res)
58662306a36Sopenharmony_ci		goto error;
58762306a36Sopenharmony_ci
58862306a36Sopenharmony_ci	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
58962306a36Sopenharmony_ci	if (!res->lockname.name)
59062306a36Sopenharmony_ci		goto error;
59162306a36Sopenharmony_ci
59262306a36Sopenharmony_ci	dlm_init_lockres(dlm, res, name, namelen);
59362306a36Sopenharmony_ci	return res;
59462306a36Sopenharmony_ci
59562306a36Sopenharmony_cierror:
59662306a36Sopenharmony_ci	if (res)
59762306a36Sopenharmony_ci		kmem_cache_free(dlm_lockres_cache, res);
59862306a36Sopenharmony_ci	return NULL;
59962306a36Sopenharmony_ci}
60062306a36Sopenharmony_ci
60162306a36Sopenharmony_civoid dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
60262306a36Sopenharmony_ci				struct dlm_lock_resource *res, int bit)
60362306a36Sopenharmony_ci{
60462306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
60562306a36Sopenharmony_ci
60662306a36Sopenharmony_ci	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
60762306a36Sopenharmony_ci	     res->lockname.name, bit, __builtin_return_address(0));
60862306a36Sopenharmony_ci
60962306a36Sopenharmony_ci	set_bit(bit, res->refmap);
61062306a36Sopenharmony_ci}
61162306a36Sopenharmony_ci
61262306a36Sopenharmony_civoid dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
61362306a36Sopenharmony_ci				  struct dlm_lock_resource *res, int bit)
61462306a36Sopenharmony_ci{
61562306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
61662306a36Sopenharmony_ci
61762306a36Sopenharmony_ci	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
61862306a36Sopenharmony_ci	     res->lockname.name, bit, __builtin_return_address(0));
61962306a36Sopenharmony_ci
62062306a36Sopenharmony_ci	clear_bit(bit, res->refmap);
62162306a36Sopenharmony_ci}
62262306a36Sopenharmony_ci
62362306a36Sopenharmony_cistatic void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
62462306a36Sopenharmony_ci				   struct dlm_lock_resource *res)
62562306a36Sopenharmony_ci{
62662306a36Sopenharmony_ci	res->inflight_locks++;
62762306a36Sopenharmony_ci
62862306a36Sopenharmony_ci	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
62962306a36Sopenharmony_ci	     res->lockname.len, res->lockname.name, res->inflight_locks,
63062306a36Sopenharmony_ci	     __builtin_return_address(0));
63162306a36Sopenharmony_ci}
63262306a36Sopenharmony_ci
63362306a36Sopenharmony_civoid dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
63462306a36Sopenharmony_ci				   struct dlm_lock_resource *res)
63562306a36Sopenharmony_ci{
63662306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
63762306a36Sopenharmony_ci	__dlm_lockres_grab_inflight_ref(dlm, res);
63862306a36Sopenharmony_ci}
63962306a36Sopenharmony_ci
64062306a36Sopenharmony_civoid dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
64162306a36Sopenharmony_ci				   struct dlm_lock_resource *res)
64262306a36Sopenharmony_ci{
64362306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
64462306a36Sopenharmony_ci
64562306a36Sopenharmony_ci	BUG_ON(res->inflight_locks == 0);
64662306a36Sopenharmony_ci
64762306a36Sopenharmony_ci	res->inflight_locks--;
64862306a36Sopenharmony_ci
64962306a36Sopenharmony_ci	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
65062306a36Sopenharmony_ci	     res->lockname.len, res->lockname.name, res->inflight_locks,
65162306a36Sopenharmony_ci	     __builtin_return_address(0));
65262306a36Sopenharmony_ci
65362306a36Sopenharmony_ci	wake_up(&res->wq);
65462306a36Sopenharmony_ci}
65562306a36Sopenharmony_ci
65662306a36Sopenharmony_civoid __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
65762306a36Sopenharmony_ci		struct dlm_lock_resource *res)
65862306a36Sopenharmony_ci{
65962306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
66062306a36Sopenharmony_ci	res->inflight_assert_workers++;
66162306a36Sopenharmony_ci	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
66262306a36Sopenharmony_ci			dlm->name, res->lockname.len, res->lockname.name,
66362306a36Sopenharmony_ci			res->inflight_assert_workers);
66462306a36Sopenharmony_ci}
66562306a36Sopenharmony_ci
66662306a36Sopenharmony_cistatic void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
66762306a36Sopenharmony_ci		struct dlm_lock_resource *res)
66862306a36Sopenharmony_ci{
66962306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
67062306a36Sopenharmony_ci	BUG_ON(res->inflight_assert_workers == 0);
67162306a36Sopenharmony_ci	res->inflight_assert_workers--;
67262306a36Sopenharmony_ci	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
67362306a36Sopenharmony_ci			dlm->name, res->lockname.len, res->lockname.name,
67462306a36Sopenharmony_ci			res->inflight_assert_workers);
67562306a36Sopenharmony_ci}
67662306a36Sopenharmony_ci
67762306a36Sopenharmony_cistatic void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
67862306a36Sopenharmony_ci		struct dlm_lock_resource *res)
67962306a36Sopenharmony_ci{
68062306a36Sopenharmony_ci	spin_lock(&res->spinlock);
68162306a36Sopenharmony_ci	__dlm_lockres_drop_inflight_worker(dlm, res);
68262306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
68362306a36Sopenharmony_ci}
68462306a36Sopenharmony_ci
68562306a36Sopenharmony_ci/*
68662306a36Sopenharmony_ci * lookup a lock resource by name.
68762306a36Sopenharmony_ci * may already exist in the hashtable.
68862306a36Sopenharmony_ci * lockid is null terminated
68962306a36Sopenharmony_ci *
69062306a36Sopenharmony_ci * if not, allocate enough for the lockres and for
69162306a36Sopenharmony_ci * the temporary structure used in doing the mastering.
69262306a36Sopenharmony_ci *
69362306a36Sopenharmony_ci * also, do a lookup in the dlm->master_list to see
69462306a36Sopenharmony_ci * if another node has begun mastering the same lock.
69562306a36Sopenharmony_ci * if so, there should be a block entry in there
69662306a36Sopenharmony_ci * for this name, and we should *not* attempt to master
69762306a36Sopenharmony_ci * the lock here.   need to wait around for that node
69862306a36Sopenharmony_ci * to assert_master (or die).
69962306a36Sopenharmony_ci *
70062306a36Sopenharmony_ci */
70162306a36Sopenharmony_cistruct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
70262306a36Sopenharmony_ci					  const char *lockid,
70362306a36Sopenharmony_ci					  int namelen,
70462306a36Sopenharmony_ci					  int flags)
70562306a36Sopenharmony_ci{
70662306a36Sopenharmony_ci	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
70762306a36Sopenharmony_ci	struct dlm_master_list_entry *mle = NULL;
70862306a36Sopenharmony_ci	struct dlm_master_list_entry *alloc_mle = NULL;
70962306a36Sopenharmony_ci	int blocked = 0;
71062306a36Sopenharmony_ci	int ret, nodenum;
71162306a36Sopenharmony_ci	struct dlm_node_iter iter;
71262306a36Sopenharmony_ci	unsigned int hash;
71362306a36Sopenharmony_ci	int tries = 0;
71462306a36Sopenharmony_ci	int bit, wait_on_recovery = 0;
71562306a36Sopenharmony_ci
71662306a36Sopenharmony_ci	BUG_ON(!lockid);
71762306a36Sopenharmony_ci
71862306a36Sopenharmony_ci	hash = dlm_lockid_hash(lockid, namelen);
71962306a36Sopenharmony_ci
72062306a36Sopenharmony_ci	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
72162306a36Sopenharmony_ci
72262306a36Sopenharmony_cilookup:
72362306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
72462306a36Sopenharmony_ci	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
72562306a36Sopenharmony_ci	if (tmpres) {
72662306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
72762306a36Sopenharmony_ci		spin_lock(&tmpres->spinlock);
72862306a36Sopenharmony_ci
72962306a36Sopenharmony_ci		/*
73062306a36Sopenharmony_ci		 * Right after dlm spinlock was released, dlm_thread could have
73162306a36Sopenharmony_ci		 * purged the lockres. Check if lockres got unhashed. If so
73262306a36Sopenharmony_ci		 * start over.
73362306a36Sopenharmony_ci		 */
73462306a36Sopenharmony_ci		if (hlist_unhashed(&tmpres->hash_node)) {
73562306a36Sopenharmony_ci			spin_unlock(&tmpres->spinlock);
73662306a36Sopenharmony_ci			dlm_lockres_put(tmpres);
73762306a36Sopenharmony_ci			tmpres = NULL;
73862306a36Sopenharmony_ci			goto lookup;
73962306a36Sopenharmony_ci		}
74062306a36Sopenharmony_ci
74162306a36Sopenharmony_ci		/* Wait on the thread that is mastering the resource */
74262306a36Sopenharmony_ci		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
74362306a36Sopenharmony_ci			__dlm_wait_on_lockres(tmpres);
74462306a36Sopenharmony_ci			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
74562306a36Sopenharmony_ci			spin_unlock(&tmpres->spinlock);
74662306a36Sopenharmony_ci			dlm_lockres_put(tmpres);
74762306a36Sopenharmony_ci			tmpres = NULL;
74862306a36Sopenharmony_ci			goto lookup;
74962306a36Sopenharmony_ci		}
75062306a36Sopenharmony_ci
75162306a36Sopenharmony_ci		/* Wait on the resource purge to complete before continuing */
75262306a36Sopenharmony_ci		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
75362306a36Sopenharmony_ci			BUG_ON(tmpres->owner == dlm->node_num);
75462306a36Sopenharmony_ci			__dlm_wait_on_lockres_flags(tmpres,
75562306a36Sopenharmony_ci						    DLM_LOCK_RES_DROPPING_REF);
75662306a36Sopenharmony_ci			spin_unlock(&tmpres->spinlock);
75762306a36Sopenharmony_ci			dlm_lockres_put(tmpres);
75862306a36Sopenharmony_ci			tmpres = NULL;
75962306a36Sopenharmony_ci			goto lookup;
76062306a36Sopenharmony_ci		}
76162306a36Sopenharmony_ci
76262306a36Sopenharmony_ci		/* Grab inflight ref to pin the resource */
76362306a36Sopenharmony_ci		dlm_lockres_grab_inflight_ref(dlm, tmpres);
76462306a36Sopenharmony_ci
76562306a36Sopenharmony_ci		spin_unlock(&tmpres->spinlock);
76662306a36Sopenharmony_ci		if (res) {
76762306a36Sopenharmony_ci			spin_lock(&dlm->track_lock);
76862306a36Sopenharmony_ci			if (!list_empty(&res->tracking))
76962306a36Sopenharmony_ci				list_del_init(&res->tracking);
77062306a36Sopenharmony_ci			else
77162306a36Sopenharmony_ci				mlog(ML_ERROR, "Resource %.*s not "
77262306a36Sopenharmony_ci						"on the Tracking list\n",
77362306a36Sopenharmony_ci						res->lockname.len,
77462306a36Sopenharmony_ci						res->lockname.name);
77562306a36Sopenharmony_ci			spin_unlock(&dlm->track_lock);
77662306a36Sopenharmony_ci			dlm_lockres_put(res);
77762306a36Sopenharmony_ci		}
77862306a36Sopenharmony_ci		res = tmpres;
77962306a36Sopenharmony_ci		goto leave;
78062306a36Sopenharmony_ci	}
78162306a36Sopenharmony_ci
78262306a36Sopenharmony_ci	if (!res) {
78362306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
78462306a36Sopenharmony_ci		mlog(0, "allocating a new resource\n");
78562306a36Sopenharmony_ci		/* nothing found and we need to allocate one. */
78662306a36Sopenharmony_ci		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
78762306a36Sopenharmony_ci		if (!alloc_mle)
78862306a36Sopenharmony_ci			goto leave;
78962306a36Sopenharmony_ci		res = dlm_new_lockres(dlm, lockid, namelen);
79062306a36Sopenharmony_ci		if (!res)
79162306a36Sopenharmony_ci			goto leave;
79262306a36Sopenharmony_ci		goto lookup;
79362306a36Sopenharmony_ci	}
79462306a36Sopenharmony_ci
79562306a36Sopenharmony_ci	mlog(0, "no lockres found, allocated our own: %p\n", res);
79662306a36Sopenharmony_ci
79762306a36Sopenharmony_ci	if (flags & LKM_LOCAL) {
79862306a36Sopenharmony_ci		/* caller knows it's safe to assume it's not mastered elsewhere
79962306a36Sopenharmony_ci		 * DONE!  return right away */
80062306a36Sopenharmony_ci		spin_lock(&res->spinlock);
80162306a36Sopenharmony_ci		dlm_change_lockres_owner(dlm, res, dlm->node_num);
80262306a36Sopenharmony_ci		__dlm_insert_lockres(dlm, res);
80362306a36Sopenharmony_ci		dlm_lockres_grab_inflight_ref(dlm, res);
80462306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
80562306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
80662306a36Sopenharmony_ci		/* lockres still marked IN_PROGRESS */
80762306a36Sopenharmony_ci		goto wake_waiters;
80862306a36Sopenharmony_ci	}
80962306a36Sopenharmony_ci
81062306a36Sopenharmony_ci	/* check master list to see if another node has started mastering it */
81162306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
81262306a36Sopenharmony_ci
81362306a36Sopenharmony_ci	/* if we found a block, wait for lock to be mastered by another node */
81462306a36Sopenharmony_ci	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
81562306a36Sopenharmony_ci	if (blocked) {
81662306a36Sopenharmony_ci		int mig;
81762306a36Sopenharmony_ci		if (mle->type == DLM_MLE_MASTER) {
81862306a36Sopenharmony_ci			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
81962306a36Sopenharmony_ci			BUG();
82062306a36Sopenharmony_ci		}
82162306a36Sopenharmony_ci		mig = (mle->type == DLM_MLE_MIGRATION);
82262306a36Sopenharmony_ci		/* if there is a migration in progress, let the migration
82362306a36Sopenharmony_ci		 * finish before continuing.  we can wait for the absence
82462306a36Sopenharmony_ci		 * of the MIGRATION mle: either the migrate finished or
82562306a36Sopenharmony_ci		 * one of the nodes died and the mle was cleaned up.
82662306a36Sopenharmony_ci		 * if there is a BLOCK here, but it already has a master
82762306a36Sopenharmony_ci		 * set, we are too late.  the master does not have a ref
82862306a36Sopenharmony_ci		 * for us in the refmap.  detach the mle and drop it.
82962306a36Sopenharmony_ci		 * either way, go back to the top and start over. */
83062306a36Sopenharmony_ci		if (mig || mle->master != O2NM_MAX_NODES) {
83162306a36Sopenharmony_ci			BUG_ON(mig && mle->master == dlm->node_num);
83262306a36Sopenharmony_ci			/* we arrived too late.  the master does not
83362306a36Sopenharmony_ci			 * have a ref for us. retry. */
83462306a36Sopenharmony_ci			mlog(0, "%s:%.*s: late on %s\n",
83562306a36Sopenharmony_ci			     dlm->name, namelen, lockid,
83662306a36Sopenharmony_ci			     mig ?  "MIGRATION" : "BLOCK");
83762306a36Sopenharmony_ci			spin_unlock(&dlm->master_lock);
83862306a36Sopenharmony_ci			spin_unlock(&dlm->spinlock);
83962306a36Sopenharmony_ci
84062306a36Sopenharmony_ci			/* master is known, detach */
84162306a36Sopenharmony_ci			if (!mig)
84262306a36Sopenharmony_ci				dlm_mle_detach_hb_events(dlm, mle);
84362306a36Sopenharmony_ci			dlm_put_mle(mle);
84462306a36Sopenharmony_ci			mle = NULL;
84562306a36Sopenharmony_ci			/* this is lame, but we can't wait on either
84662306a36Sopenharmony_ci			 * the mle or lockres waitqueue here */
84762306a36Sopenharmony_ci			if (mig)
84862306a36Sopenharmony_ci				msleep(100);
84962306a36Sopenharmony_ci			goto lookup;
85062306a36Sopenharmony_ci		}
85162306a36Sopenharmony_ci	} else {
85262306a36Sopenharmony_ci		/* go ahead and try to master lock on this node */
85362306a36Sopenharmony_ci		mle = alloc_mle;
85462306a36Sopenharmony_ci		/* make sure this does not get freed below */
85562306a36Sopenharmony_ci		alloc_mle = NULL;
85662306a36Sopenharmony_ci		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
85762306a36Sopenharmony_ci		set_bit(dlm->node_num, mle->maybe_map);
85862306a36Sopenharmony_ci		__dlm_insert_mle(dlm, mle);
85962306a36Sopenharmony_ci
86062306a36Sopenharmony_ci		/* still holding the dlm spinlock, check the recovery map
86162306a36Sopenharmony_ci		 * to see if there are any nodes that still need to be
86262306a36Sopenharmony_ci		 * considered.  these will not appear in the mle nodemap
86362306a36Sopenharmony_ci		 * but they might own this lockres.  wait on them. */
86462306a36Sopenharmony_ci		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
86562306a36Sopenharmony_ci		if (bit < O2NM_MAX_NODES) {
86662306a36Sopenharmony_ci			mlog(0, "%s: res %.*s, At least one node (%d) "
86762306a36Sopenharmony_ci			     "to recover before lock mastery can begin\n",
86862306a36Sopenharmony_ci			     dlm->name, namelen, (char *)lockid, bit);
86962306a36Sopenharmony_ci			wait_on_recovery = 1;
87062306a36Sopenharmony_ci		}
87162306a36Sopenharmony_ci	}
87262306a36Sopenharmony_ci
87362306a36Sopenharmony_ci	/* at this point there is either a DLM_MLE_BLOCK or a
87462306a36Sopenharmony_ci	 * DLM_MLE_MASTER on the master list, so it's safe to add the
87562306a36Sopenharmony_ci	 * lockres to the hashtable.  anyone who finds the lock will
87662306a36Sopenharmony_ci	 * still have to wait on the IN_PROGRESS. */
87762306a36Sopenharmony_ci
87862306a36Sopenharmony_ci	/* finally add the lockres to its hash bucket */
87962306a36Sopenharmony_ci	__dlm_insert_lockres(dlm, res);
88062306a36Sopenharmony_ci
88162306a36Sopenharmony_ci	/* since this lockres is new it doesn't not require the spinlock */
88262306a36Sopenharmony_ci	__dlm_lockres_grab_inflight_ref(dlm, res);
88362306a36Sopenharmony_ci
88462306a36Sopenharmony_ci	/* get an extra ref on the mle in case this is a BLOCK
88562306a36Sopenharmony_ci	 * if so, the creator of the BLOCK may try to put the last
88662306a36Sopenharmony_ci	 * ref at this time in the assert master handler, so we
88762306a36Sopenharmony_ci	 * need an extra one to keep from a bad ptr deref. */
88862306a36Sopenharmony_ci	dlm_get_mle_inuse(mle);
88962306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
89062306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
89162306a36Sopenharmony_ci
89262306a36Sopenharmony_ciredo_request:
89362306a36Sopenharmony_ci	while (wait_on_recovery) {
89462306a36Sopenharmony_ci		/* any cluster changes that occurred after dropping the
89562306a36Sopenharmony_ci		 * dlm spinlock would be detectable be a change on the mle,
89662306a36Sopenharmony_ci		 * so we only need to clear out the recovery map once. */
89762306a36Sopenharmony_ci		if (dlm_is_recovery_lock(lockid, namelen)) {
89862306a36Sopenharmony_ci			mlog(0, "%s: Recovery map is not empty, but must "
89962306a36Sopenharmony_ci			     "master $RECOVERY lock now\n", dlm->name);
90062306a36Sopenharmony_ci			if (!dlm_pre_master_reco_lockres(dlm, res))
90162306a36Sopenharmony_ci				wait_on_recovery = 0;
90262306a36Sopenharmony_ci			else {
90362306a36Sopenharmony_ci				mlog(0, "%s: waiting 500ms for heartbeat state "
90462306a36Sopenharmony_ci				    "change\n", dlm->name);
90562306a36Sopenharmony_ci				msleep(500);
90662306a36Sopenharmony_ci			}
90762306a36Sopenharmony_ci			continue;
90862306a36Sopenharmony_ci		}
90962306a36Sopenharmony_ci
91062306a36Sopenharmony_ci		dlm_kick_recovery_thread(dlm);
91162306a36Sopenharmony_ci		msleep(1000);
91262306a36Sopenharmony_ci		dlm_wait_for_recovery(dlm);
91362306a36Sopenharmony_ci
91462306a36Sopenharmony_ci		spin_lock(&dlm->spinlock);
91562306a36Sopenharmony_ci		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
91662306a36Sopenharmony_ci		if (bit < O2NM_MAX_NODES) {
91762306a36Sopenharmony_ci			mlog(0, "%s: res %.*s, At least one node (%d) "
91862306a36Sopenharmony_ci			     "to recover before lock mastery can begin\n",
91962306a36Sopenharmony_ci			     dlm->name, namelen, (char *)lockid, bit);
92062306a36Sopenharmony_ci			wait_on_recovery = 1;
92162306a36Sopenharmony_ci		} else
92262306a36Sopenharmony_ci			wait_on_recovery = 0;
92362306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
92462306a36Sopenharmony_ci
92562306a36Sopenharmony_ci		if (wait_on_recovery)
92662306a36Sopenharmony_ci			dlm_wait_for_node_recovery(dlm, bit, 10000);
92762306a36Sopenharmony_ci	}
92862306a36Sopenharmony_ci
92962306a36Sopenharmony_ci	/* must wait for lock to be mastered elsewhere */
93062306a36Sopenharmony_ci	if (blocked)
93162306a36Sopenharmony_ci		goto wait;
93262306a36Sopenharmony_ci
93362306a36Sopenharmony_ci	ret = -EINVAL;
93462306a36Sopenharmony_ci	dlm_node_iter_init(mle->vote_map, &iter);
93562306a36Sopenharmony_ci	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
93662306a36Sopenharmony_ci		ret = dlm_do_master_request(res, mle, nodenum);
93762306a36Sopenharmony_ci		if (ret < 0)
93862306a36Sopenharmony_ci			mlog_errno(ret);
93962306a36Sopenharmony_ci		if (mle->master != O2NM_MAX_NODES) {
94062306a36Sopenharmony_ci			/* found a master ! */
94162306a36Sopenharmony_ci			if (mle->master <= nodenum)
94262306a36Sopenharmony_ci				break;
94362306a36Sopenharmony_ci			/* if our master request has not reached the master
94462306a36Sopenharmony_ci			 * yet, keep going until it does.  this is how the
94562306a36Sopenharmony_ci			 * master will know that asserts are needed back to
94662306a36Sopenharmony_ci			 * the lower nodes. */
94762306a36Sopenharmony_ci			mlog(0, "%s: res %.*s, Requests only up to %u but "
94862306a36Sopenharmony_ci			     "master is %u, keep going\n", dlm->name, namelen,
94962306a36Sopenharmony_ci			     lockid, nodenum, mle->master);
95062306a36Sopenharmony_ci		}
95162306a36Sopenharmony_ci	}
95262306a36Sopenharmony_ci
95362306a36Sopenharmony_ciwait:
95462306a36Sopenharmony_ci	/* keep going until the response map includes all nodes */
95562306a36Sopenharmony_ci	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
95662306a36Sopenharmony_ci	if (ret < 0) {
95762306a36Sopenharmony_ci		wait_on_recovery = 1;
95862306a36Sopenharmony_ci		mlog(0, "%s: res %.*s, Node map changed, redo the master "
95962306a36Sopenharmony_ci		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
96062306a36Sopenharmony_ci		     res->lockname.name, blocked);
96162306a36Sopenharmony_ci		if (++tries > 20) {
96262306a36Sopenharmony_ci			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
96362306a36Sopenharmony_ci			     "dlm_wait_for_lock_mastery, blocked = %d\n",
96462306a36Sopenharmony_ci			     dlm->name, res->lockname.len,
96562306a36Sopenharmony_ci			     res->lockname.name, blocked);
96662306a36Sopenharmony_ci			dlm_print_one_lock_resource(res);
96762306a36Sopenharmony_ci			dlm_print_one_mle(mle);
96862306a36Sopenharmony_ci			tries = 0;
96962306a36Sopenharmony_ci		}
97062306a36Sopenharmony_ci		goto redo_request;
97162306a36Sopenharmony_ci	}
97262306a36Sopenharmony_ci
97362306a36Sopenharmony_ci	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
97462306a36Sopenharmony_ci	     res->lockname.name, res->owner);
97562306a36Sopenharmony_ci	/* make sure we never continue without this */
97662306a36Sopenharmony_ci	BUG_ON(res->owner == O2NM_MAX_NODES);
97762306a36Sopenharmony_ci
97862306a36Sopenharmony_ci	/* master is known, detach if not already detached */
97962306a36Sopenharmony_ci	dlm_mle_detach_hb_events(dlm, mle);
98062306a36Sopenharmony_ci	dlm_put_mle(mle);
98162306a36Sopenharmony_ci	/* put the extra ref */
98262306a36Sopenharmony_ci	dlm_put_mle_inuse(mle);
98362306a36Sopenharmony_ci
98462306a36Sopenharmony_ciwake_waiters:
98562306a36Sopenharmony_ci	spin_lock(&res->spinlock);
98662306a36Sopenharmony_ci	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
98762306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
98862306a36Sopenharmony_ci	wake_up(&res->wq);
98962306a36Sopenharmony_ci
99062306a36Sopenharmony_cileave:
99162306a36Sopenharmony_ci	/* need to free the unused mle */
99262306a36Sopenharmony_ci	if (alloc_mle)
99362306a36Sopenharmony_ci		kmem_cache_free(dlm_mle_cache, alloc_mle);
99462306a36Sopenharmony_ci
99562306a36Sopenharmony_ci	return res;
99662306a36Sopenharmony_ci}
99762306a36Sopenharmony_ci
99862306a36Sopenharmony_ci
99962306a36Sopenharmony_ci#define DLM_MASTERY_TIMEOUT_MS   5000
100062306a36Sopenharmony_ci
100162306a36Sopenharmony_cistatic int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
100262306a36Sopenharmony_ci				     struct dlm_lock_resource *res,
100362306a36Sopenharmony_ci				     struct dlm_master_list_entry *mle,
100462306a36Sopenharmony_ci				     int *blocked)
100562306a36Sopenharmony_ci{
100662306a36Sopenharmony_ci	u8 m;
100762306a36Sopenharmony_ci	int ret, bit;
100862306a36Sopenharmony_ci	int map_changed, voting_done;
100962306a36Sopenharmony_ci	int assert, sleep;
101062306a36Sopenharmony_ci
101162306a36Sopenharmony_cirecheck:
101262306a36Sopenharmony_ci	ret = 0;
101362306a36Sopenharmony_ci	assert = 0;
101462306a36Sopenharmony_ci
101562306a36Sopenharmony_ci	/* check if another node has already become the owner */
101662306a36Sopenharmony_ci	spin_lock(&res->spinlock);
101762306a36Sopenharmony_ci	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
101862306a36Sopenharmony_ci		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
101962306a36Sopenharmony_ci		     res->lockname.len, res->lockname.name, res->owner);
102062306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
102162306a36Sopenharmony_ci		/* this will cause the master to re-assert across
102262306a36Sopenharmony_ci		 * the whole cluster, freeing up mles */
102362306a36Sopenharmony_ci		if (res->owner != dlm->node_num) {
102462306a36Sopenharmony_ci			ret = dlm_do_master_request(res, mle, res->owner);
102562306a36Sopenharmony_ci			if (ret < 0) {
102662306a36Sopenharmony_ci				/* give recovery a chance to run */
102762306a36Sopenharmony_ci				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
102862306a36Sopenharmony_ci				msleep(500);
102962306a36Sopenharmony_ci				goto recheck;
103062306a36Sopenharmony_ci			}
103162306a36Sopenharmony_ci		}
103262306a36Sopenharmony_ci		ret = 0;
103362306a36Sopenharmony_ci		goto leave;
103462306a36Sopenharmony_ci	}
103562306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
103662306a36Sopenharmony_ci
103762306a36Sopenharmony_ci	spin_lock(&mle->spinlock);
103862306a36Sopenharmony_ci	m = mle->master;
103962306a36Sopenharmony_ci	map_changed = !bitmap_equal(mle->vote_map, mle->node_map,
104062306a36Sopenharmony_ci				    O2NM_MAX_NODES);
104162306a36Sopenharmony_ci	voting_done = bitmap_equal(mle->vote_map, mle->response_map,
104262306a36Sopenharmony_ci				   O2NM_MAX_NODES);
104362306a36Sopenharmony_ci
104462306a36Sopenharmony_ci	/* restart if we hit any errors */
104562306a36Sopenharmony_ci	if (map_changed) {
104662306a36Sopenharmony_ci		int b;
104762306a36Sopenharmony_ci		mlog(0, "%s: %.*s: node map changed, restarting\n",
104862306a36Sopenharmony_ci		     dlm->name, res->lockname.len, res->lockname.name);
104962306a36Sopenharmony_ci		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
105062306a36Sopenharmony_ci		b = (mle->type == DLM_MLE_BLOCK);
105162306a36Sopenharmony_ci		if ((*blocked && !b) || (!*blocked && b)) {
105262306a36Sopenharmony_ci			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
105362306a36Sopenharmony_ci			     dlm->name, res->lockname.len, res->lockname.name,
105462306a36Sopenharmony_ci			     *blocked, b);
105562306a36Sopenharmony_ci			*blocked = b;
105662306a36Sopenharmony_ci		}
105762306a36Sopenharmony_ci		spin_unlock(&mle->spinlock);
105862306a36Sopenharmony_ci		if (ret < 0) {
105962306a36Sopenharmony_ci			mlog_errno(ret);
106062306a36Sopenharmony_ci			goto leave;
106162306a36Sopenharmony_ci		}
106262306a36Sopenharmony_ci		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
106362306a36Sopenharmony_ci		     "rechecking now\n", dlm->name, res->lockname.len,
106462306a36Sopenharmony_ci		     res->lockname.name);
106562306a36Sopenharmony_ci		goto recheck;
106662306a36Sopenharmony_ci	} else {
106762306a36Sopenharmony_ci		if (!voting_done) {
106862306a36Sopenharmony_ci			mlog(0, "map not changed and voting not done "
106962306a36Sopenharmony_ci			     "for %s:%.*s\n", dlm->name, res->lockname.len,
107062306a36Sopenharmony_ci			     res->lockname.name);
107162306a36Sopenharmony_ci		}
107262306a36Sopenharmony_ci	}
107362306a36Sopenharmony_ci
107462306a36Sopenharmony_ci	if (m != O2NM_MAX_NODES) {
107562306a36Sopenharmony_ci		/* another node has done an assert!
107662306a36Sopenharmony_ci		 * all done! */
107762306a36Sopenharmony_ci		sleep = 0;
107862306a36Sopenharmony_ci	} else {
107962306a36Sopenharmony_ci		sleep = 1;
108062306a36Sopenharmony_ci		/* have all nodes responded? */
108162306a36Sopenharmony_ci		if (voting_done && !*blocked) {
108262306a36Sopenharmony_ci			bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
108362306a36Sopenharmony_ci			if (dlm->node_num <= bit) {
108462306a36Sopenharmony_ci				/* my node number is lowest.
108562306a36Sopenharmony_ci			 	 * now tell other nodes that I am
108662306a36Sopenharmony_ci				 * mastering this. */
108762306a36Sopenharmony_ci				mle->master = dlm->node_num;
108862306a36Sopenharmony_ci				/* ref was grabbed in get_lock_resource
108962306a36Sopenharmony_ci				 * will be dropped in dlmlock_master */
109062306a36Sopenharmony_ci				assert = 1;
109162306a36Sopenharmony_ci				sleep = 0;
109262306a36Sopenharmony_ci			}
109362306a36Sopenharmony_ci			/* if voting is done, but we have not received
109462306a36Sopenharmony_ci			 * an assert master yet, we must sleep */
109562306a36Sopenharmony_ci		}
109662306a36Sopenharmony_ci	}
109762306a36Sopenharmony_ci
109862306a36Sopenharmony_ci	spin_unlock(&mle->spinlock);
109962306a36Sopenharmony_ci
110062306a36Sopenharmony_ci	/* sleep if we haven't finished voting yet */
110162306a36Sopenharmony_ci	if (sleep) {
110262306a36Sopenharmony_ci		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
110362306a36Sopenharmony_ci		atomic_set(&mle->woken, 0);
110462306a36Sopenharmony_ci		(void)wait_event_timeout(mle->wq,
110562306a36Sopenharmony_ci					 (atomic_read(&mle->woken) == 1),
110662306a36Sopenharmony_ci					 timeo);
110762306a36Sopenharmony_ci		if (res->owner == O2NM_MAX_NODES) {
110862306a36Sopenharmony_ci			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
110962306a36Sopenharmony_ci			     res->lockname.len, res->lockname.name);
111062306a36Sopenharmony_ci			goto recheck;
111162306a36Sopenharmony_ci		}
111262306a36Sopenharmony_ci		mlog(0, "done waiting, master is %u\n", res->owner);
111362306a36Sopenharmony_ci		ret = 0;
111462306a36Sopenharmony_ci		goto leave;
111562306a36Sopenharmony_ci	}
111662306a36Sopenharmony_ci
111762306a36Sopenharmony_ci	ret = 0;   /* done */
111862306a36Sopenharmony_ci	if (assert) {
111962306a36Sopenharmony_ci		m = dlm->node_num;
112062306a36Sopenharmony_ci		mlog(0, "about to master %.*s here, this=%u\n",
112162306a36Sopenharmony_ci		     res->lockname.len, res->lockname.name, m);
112262306a36Sopenharmony_ci		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
112362306a36Sopenharmony_ci		if (ret) {
112462306a36Sopenharmony_ci			/* This is a failure in the network path,
112562306a36Sopenharmony_ci			 * not in the response to the assert_master
112662306a36Sopenharmony_ci			 * (any nonzero response is a BUG on this node).
112762306a36Sopenharmony_ci			 * Most likely a socket just got disconnected
112862306a36Sopenharmony_ci			 * due to node death. */
112962306a36Sopenharmony_ci			mlog_errno(ret);
113062306a36Sopenharmony_ci		}
113162306a36Sopenharmony_ci		/* no longer need to restart lock mastery.
113262306a36Sopenharmony_ci		 * all living nodes have been contacted. */
113362306a36Sopenharmony_ci		ret = 0;
113462306a36Sopenharmony_ci	}
113562306a36Sopenharmony_ci
113662306a36Sopenharmony_ci	/* set the lockres owner */
113762306a36Sopenharmony_ci	spin_lock(&res->spinlock);
113862306a36Sopenharmony_ci	/* mastery reference obtained either during
113962306a36Sopenharmony_ci	 * assert_master_handler or in get_lock_resource */
114062306a36Sopenharmony_ci	dlm_change_lockres_owner(dlm, res, m);
114162306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
114262306a36Sopenharmony_ci
114362306a36Sopenharmony_cileave:
114462306a36Sopenharmony_ci	return ret;
114562306a36Sopenharmony_ci}
114662306a36Sopenharmony_ci
114762306a36Sopenharmony_cistruct dlm_bitmap_diff_iter
114862306a36Sopenharmony_ci{
114962306a36Sopenharmony_ci	int curnode;
115062306a36Sopenharmony_ci	unsigned long *orig_bm;
115162306a36Sopenharmony_ci	unsigned long *cur_bm;
115262306a36Sopenharmony_ci	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
115362306a36Sopenharmony_ci};
115462306a36Sopenharmony_ci
/* Direction of a per-node change reported by dlm_bitmap_diff_iter_next() */
enum dlm_node_state_change {
	NODE_DOWN = -1,		/* bit was set in the original map */
	NODE_NO_CHANGE = 0,
	NODE_UP = 1		/* bit was clear in the original map */
};
116162306a36Sopenharmony_ci
116262306a36Sopenharmony_cistatic void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
116362306a36Sopenharmony_ci				      unsigned long *orig_bm,
116462306a36Sopenharmony_ci				      unsigned long *cur_bm)
116562306a36Sopenharmony_ci{
116662306a36Sopenharmony_ci	unsigned long p1, p2;
116762306a36Sopenharmony_ci	int i;
116862306a36Sopenharmony_ci
116962306a36Sopenharmony_ci	iter->curnode = -1;
117062306a36Sopenharmony_ci	iter->orig_bm = orig_bm;
117162306a36Sopenharmony_ci	iter->cur_bm = cur_bm;
117262306a36Sopenharmony_ci
117362306a36Sopenharmony_ci	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
117462306a36Sopenharmony_ci       		p1 = *(iter->orig_bm + i);
117562306a36Sopenharmony_ci	       	p2 = *(iter->cur_bm + i);
117662306a36Sopenharmony_ci		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
117762306a36Sopenharmony_ci	}
117862306a36Sopenharmony_ci}
117962306a36Sopenharmony_ci
118062306a36Sopenharmony_cistatic int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
118162306a36Sopenharmony_ci				     enum dlm_node_state_change *state)
118262306a36Sopenharmony_ci{
118362306a36Sopenharmony_ci	int bit;
118462306a36Sopenharmony_ci
118562306a36Sopenharmony_ci	if (iter->curnode >= O2NM_MAX_NODES)
118662306a36Sopenharmony_ci		return -ENOENT;
118762306a36Sopenharmony_ci
118862306a36Sopenharmony_ci	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
118962306a36Sopenharmony_ci			    iter->curnode+1);
119062306a36Sopenharmony_ci	if (bit >= O2NM_MAX_NODES) {
119162306a36Sopenharmony_ci		iter->curnode = O2NM_MAX_NODES;
119262306a36Sopenharmony_ci		return -ENOENT;
119362306a36Sopenharmony_ci	}
119462306a36Sopenharmony_ci
119562306a36Sopenharmony_ci	/* if it was there in the original then this node died */
119662306a36Sopenharmony_ci	if (test_bit(bit, iter->orig_bm))
119762306a36Sopenharmony_ci		*state = NODE_DOWN;
119862306a36Sopenharmony_ci	else
119962306a36Sopenharmony_ci		*state = NODE_UP;
120062306a36Sopenharmony_ci
120162306a36Sopenharmony_ci	iter->curnode = bit;
120262306a36Sopenharmony_ci	return bit;
120362306a36Sopenharmony_ci}
120462306a36Sopenharmony_ci
120562306a36Sopenharmony_ci
120662306a36Sopenharmony_cistatic int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
120762306a36Sopenharmony_ci				    struct dlm_lock_resource *res,
120862306a36Sopenharmony_ci				    struct dlm_master_list_entry *mle,
120962306a36Sopenharmony_ci				    int blocked)
121062306a36Sopenharmony_ci{
121162306a36Sopenharmony_ci	struct dlm_bitmap_diff_iter bdi;
121262306a36Sopenharmony_ci	enum dlm_node_state_change sc;
121362306a36Sopenharmony_ci	int node;
121462306a36Sopenharmony_ci	int ret = 0;
121562306a36Sopenharmony_ci
121662306a36Sopenharmony_ci	mlog(0, "something happened such that the "
121762306a36Sopenharmony_ci	     "master process may need to be restarted!\n");
121862306a36Sopenharmony_ci
121962306a36Sopenharmony_ci	assert_spin_locked(&mle->spinlock);
122062306a36Sopenharmony_ci
122162306a36Sopenharmony_ci	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
122262306a36Sopenharmony_ci	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
122362306a36Sopenharmony_ci	while (node >= 0) {
122462306a36Sopenharmony_ci		if (sc == NODE_UP) {
122562306a36Sopenharmony_ci			/* a node came up.  clear any old vote from
122662306a36Sopenharmony_ci			 * the response map and set it in the vote map
122762306a36Sopenharmony_ci			 * then restart the mastery. */
122862306a36Sopenharmony_ci			mlog(ML_NOTICE, "node %d up while restarting\n", node);
122962306a36Sopenharmony_ci
123062306a36Sopenharmony_ci			/* redo the master request, but only for the new node */
123162306a36Sopenharmony_ci			mlog(0, "sending request to new node\n");
123262306a36Sopenharmony_ci			clear_bit(node, mle->response_map);
123362306a36Sopenharmony_ci			set_bit(node, mle->vote_map);
123462306a36Sopenharmony_ci		} else {
123562306a36Sopenharmony_ci			mlog(ML_ERROR, "node down! %d\n", node);
123662306a36Sopenharmony_ci			if (blocked) {
123762306a36Sopenharmony_ci				int lowest = find_first_bit(mle->maybe_map,
123862306a36Sopenharmony_ci						       O2NM_MAX_NODES);
123962306a36Sopenharmony_ci
124062306a36Sopenharmony_ci				/* act like it was never there */
124162306a36Sopenharmony_ci				clear_bit(node, mle->maybe_map);
124262306a36Sopenharmony_ci
124362306a36Sopenharmony_ci			       	if (node == lowest) {
124462306a36Sopenharmony_ci					mlog(0, "expected master %u died"
124562306a36Sopenharmony_ci					    " while this node was blocked "
124662306a36Sopenharmony_ci					    "waiting on it!\n", node);
124762306a36Sopenharmony_ci					lowest = find_next_bit(mle->maybe_map,
124862306a36Sopenharmony_ci						       	O2NM_MAX_NODES,
124962306a36Sopenharmony_ci						       	lowest+1);
125062306a36Sopenharmony_ci					if (lowest < O2NM_MAX_NODES) {
125162306a36Sopenharmony_ci						mlog(0, "%s:%.*s:still "
125262306a36Sopenharmony_ci						     "blocked. waiting on %u "
125362306a36Sopenharmony_ci						     "now\n", dlm->name,
125462306a36Sopenharmony_ci						     res->lockname.len,
125562306a36Sopenharmony_ci						     res->lockname.name,
125662306a36Sopenharmony_ci						     lowest);
125762306a36Sopenharmony_ci					} else {
125862306a36Sopenharmony_ci						/* mle is an MLE_BLOCK, but
125962306a36Sopenharmony_ci						 * there is now nothing left to
126062306a36Sopenharmony_ci						 * block on.  we need to return
126162306a36Sopenharmony_ci						 * all the way back out and try
126262306a36Sopenharmony_ci						 * again with an MLE_MASTER.
126362306a36Sopenharmony_ci						 * dlm_do_local_recovery_cleanup
126462306a36Sopenharmony_ci						 * has already run, so the mle
126562306a36Sopenharmony_ci						 * refcount is ok */
126662306a36Sopenharmony_ci						mlog(0, "%s:%.*s: no "
126762306a36Sopenharmony_ci						     "longer blocking. try to "
126862306a36Sopenharmony_ci						     "master this here\n",
126962306a36Sopenharmony_ci						     dlm->name,
127062306a36Sopenharmony_ci						     res->lockname.len,
127162306a36Sopenharmony_ci						     res->lockname.name);
127262306a36Sopenharmony_ci						mle->type = DLM_MLE_MASTER;
127362306a36Sopenharmony_ci						mle->mleres = res;
127462306a36Sopenharmony_ci					}
127562306a36Sopenharmony_ci				}
127662306a36Sopenharmony_ci			}
127762306a36Sopenharmony_ci
127862306a36Sopenharmony_ci			/* now blank out everything, as if we had never
127962306a36Sopenharmony_ci			 * contacted anyone */
128062306a36Sopenharmony_ci			bitmap_zero(mle->maybe_map, O2NM_MAX_NODES);
128162306a36Sopenharmony_ci			bitmap_zero(mle->response_map, O2NM_MAX_NODES);
128262306a36Sopenharmony_ci			/* reset the vote_map to the current node_map */
128362306a36Sopenharmony_ci			bitmap_copy(mle->vote_map, mle->node_map,
128462306a36Sopenharmony_ci				    O2NM_MAX_NODES);
128562306a36Sopenharmony_ci			/* put myself into the maybe map */
128662306a36Sopenharmony_ci			if (mle->type != DLM_MLE_BLOCK)
128762306a36Sopenharmony_ci				set_bit(dlm->node_num, mle->maybe_map);
128862306a36Sopenharmony_ci		}
128962306a36Sopenharmony_ci		ret = -EAGAIN;
129062306a36Sopenharmony_ci		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
129162306a36Sopenharmony_ci	}
129262306a36Sopenharmony_ci	return ret;
129362306a36Sopenharmony_ci}
129462306a36Sopenharmony_ci
129562306a36Sopenharmony_ci
129662306a36Sopenharmony_ci/*
129762306a36Sopenharmony_ci * DLM_MASTER_REQUEST_MSG
129862306a36Sopenharmony_ci *
129962306a36Sopenharmony_ci * returns: 0 on success,
130062306a36Sopenharmony_ci *          -errno on a network error
130162306a36Sopenharmony_ci *
130262306a36Sopenharmony_ci * on error, the caller should assume the target node is "dead"
130362306a36Sopenharmony_ci *
130462306a36Sopenharmony_ci */
130562306a36Sopenharmony_ci
130662306a36Sopenharmony_cistatic int dlm_do_master_request(struct dlm_lock_resource *res,
130762306a36Sopenharmony_ci				 struct dlm_master_list_entry *mle, int to)
130862306a36Sopenharmony_ci{
130962306a36Sopenharmony_ci	struct dlm_ctxt *dlm = mle->dlm;
131062306a36Sopenharmony_ci	struct dlm_master_request request;
131162306a36Sopenharmony_ci	int ret, response=0, resend;
131262306a36Sopenharmony_ci
131362306a36Sopenharmony_ci	memset(&request, 0, sizeof(request));
131462306a36Sopenharmony_ci	request.node_idx = dlm->node_num;
131562306a36Sopenharmony_ci
131662306a36Sopenharmony_ci	BUG_ON(mle->type == DLM_MLE_MIGRATION);
131762306a36Sopenharmony_ci
131862306a36Sopenharmony_ci	request.namelen = (u8)mle->mnamelen;
131962306a36Sopenharmony_ci	memcpy(request.name, mle->mname, request.namelen);
132062306a36Sopenharmony_ci
132162306a36Sopenharmony_ciagain:
132262306a36Sopenharmony_ci	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
132362306a36Sopenharmony_ci				 sizeof(request), to, &response);
132462306a36Sopenharmony_ci	if (ret < 0)  {
132562306a36Sopenharmony_ci		if (ret == -ESRCH) {
132662306a36Sopenharmony_ci			/* should never happen */
132762306a36Sopenharmony_ci			mlog(ML_ERROR, "TCP stack not ready!\n");
132862306a36Sopenharmony_ci			BUG();
132962306a36Sopenharmony_ci		} else if (ret == -EINVAL) {
133062306a36Sopenharmony_ci			mlog(ML_ERROR, "bad args passed to o2net!\n");
133162306a36Sopenharmony_ci			BUG();
133262306a36Sopenharmony_ci		} else if (ret == -ENOMEM) {
133362306a36Sopenharmony_ci			mlog(ML_ERROR, "out of memory while trying to send "
133462306a36Sopenharmony_ci			     "network message!  retrying\n");
133562306a36Sopenharmony_ci			/* this is totally crude */
133662306a36Sopenharmony_ci			msleep(50);
133762306a36Sopenharmony_ci			goto again;
133862306a36Sopenharmony_ci		} else if (!dlm_is_host_down(ret)) {
133962306a36Sopenharmony_ci			/* not a network error. bad. */
134062306a36Sopenharmony_ci			mlog_errno(ret);
134162306a36Sopenharmony_ci			mlog(ML_ERROR, "unhandled error!");
134262306a36Sopenharmony_ci			BUG();
134362306a36Sopenharmony_ci		}
134462306a36Sopenharmony_ci		/* all other errors should be network errors,
134562306a36Sopenharmony_ci		 * and likely indicate node death */
134662306a36Sopenharmony_ci		mlog(ML_ERROR, "link to %d went down!\n", to);
134762306a36Sopenharmony_ci		goto out;
134862306a36Sopenharmony_ci	}
134962306a36Sopenharmony_ci
135062306a36Sopenharmony_ci	ret = 0;
135162306a36Sopenharmony_ci	resend = 0;
135262306a36Sopenharmony_ci	spin_lock(&mle->spinlock);
135362306a36Sopenharmony_ci	switch (response) {
135462306a36Sopenharmony_ci		case DLM_MASTER_RESP_YES:
135562306a36Sopenharmony_ci			set_bit(to, mle->response_map);
135662306a36Sopenharmony_ci			mlog(0, "node %u is the master, response=YES\n", to);
135762306a36Sopenharmony_ci			mlog(0, "%s:%.*s: master node %u now knows I have a "
135862306a36Sopenharmony_ci			     "reference\n", dlm->name, res->lockname.len,
135962306a36Sopenharmony_ci			     res->lockname.name, to);
136062306a36Sopenharmony_ci			mle->master = to;
136162306a36Sopenharmony_ci			break;
136262306a36Sopenharmony_ci		case DLM_MASTER_RESP_NO:
136362306a36Sopenharmony_ci			mlog(0, "node %u not master, response=NO\n", to);
136462306a36Sopenharmony_ci			set_bit(to, mle->response_map);
136562306a36Sopenharmony_ci			break;
136662306a36Sopenharmony_ci		case DLM_MASTER_RESP_MAYBE:
136762306a36Sopenharmony_ci			mlog(0, "node %u not master, response=MAYBE\n", to);
136862306a36Sopenharmony_ci			set_bit(to, mle->response_map);
136962306a36Sopenharmony_ci			set_bit(to, mle->maybe_map);
137062306a36Sopenharmony_ci			break;
137162306a36Sopenharmony_ci		case DLM_MASTER_RESP_ERROR:
137262306a36Sopenharmony_ci			mlog(0, "node %u hit an error, resending\n", to);
137362306a36Sopenharmony_ci			resend = 1;
137462306a36Sopenharmony_ci			response = 0;
137562306a36Sopenharmony_ci			break;
137662306a36Sopenharmony_ci		default:
137762306a36Sopenharmony_ci			mlog(ML_ERROR, "bad response! %u\n", response);
137862306a36Sopenharmony_ci			BUG();
137962306a36Sopenharmony_ci	}
138062306a36Sopenharmony_ci	spin_unlock(&mle->spinlock);
138162306a36Sopenharmony_ci	if (resend) {
138262306a36Sopenharmony_ci		/* this is also totally crude */
138362306a36Sopenharmony_ci		msleep(50);
138462306a36Sopenharmony_ci		goto again;
138562306a36Sopenharmony_ci	}
138662306a36Sopenharmony_ci
138762306a36Sopenharmony_ciout:
138862306a36Sopenharmony_ci	return ret;
138962306a36Sopenharmony_ci}
139062306a36Sopenharmony_ci
139162306a36Sopenharmony_ci/*
139262306a36Sopenharmony_ci * locks that can be taken here:
139362306a36Sopenharmony_ci * dlm->spinlock
139462306a36Sopenharmony_ci * res->spinlock
139562306a36Sopenharmony_ci * mle->spinlock
139662306a36Sopenharmony_ci * dlm->master_list
139762306a36Sopenharmony_ci *
139862306a36Sopenharmony_ci * if possible, TRIM THIS DOWN!!!
139962306a36Sopenharmony_ci */
140062306a36Sopenharmony_ciint dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
140162306a36Sopenharmony_ci			       void **ret_data)
140262306a36Sopenharmony_ci{
140362306a36Sopenharmony_ci	u8 response = DLM_MASTER_RESP_MAYBE;
140462306a36Sopenharmony_ci	struct dlm_ctxt *dlm = data;
140562306a36Sopenharmony_ci	struct dlm_lock_resource *res = NULL;
140662306a36Sopenharmony_ci	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
140762306a36Sopenharmony_ci	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
140862306a36Sopenharmony_ci	char *name;
140962306a36Sopenharmony_ci	unsigned int namelen, hash;
141062306a36Sopenharmony_ci	int found, ret;
141162306a36Sopenharmony_ci	int set_maybe;
141262306a36Sopenharmony_ci	int dispatch_assert = 0;
141362306a36Sopenharmony_ci	int dispatched = 0;
141462306a36Sopenharmony_ci
141562306a36Sopenharmony_ci	if (!dlm_grab(dlm))
141662306a36Sopenharmony_ci		return DLM_MASTER_RESP_NO;
141762306a36Sopenharmony_ci
141862306a36Sopenharmony_ci	if (!dlm_domain_fully_joined(dlm)) {
141962306a36Sopenharmony_ci		response = DLM_MASTER_RESP_NO;
142062306a36Sopenharmony_ci		goto send_response;
142162306a36Sopenharmony_ci	}
142262306a36Sopenharmony_ci
142362306a36Sopenharmony_ci	name = request->name;
142462306a36Sopenharmony_ci	namelen = request->namelen;
142562306a36Sopenharmony_ci	hash = dlm_lockid_hash(name, namelen);
142662306a36Sopenharmony_ci
142762306a36Sopenharmony_ci	if (namelen > DLM_LOCKID_NAME_MAX) {
142862306a36Sopenharmony_ci		response = DLM_IVBUFLEN;
142962306a36Sopenharmony_ci		goto send_response;
143062306a36Sopenharmony_ci	}
143162306a36Sopenharmony_ci
143262306a36Sopenharmony_ciway_up_top:
143362306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
143462306a36Sopenharmony_ci	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
143562306a36Sopenharmony_ci	if (res) {
143662306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
143762306a36Sopenharmony_ci
143862306a36Sopenharmony_ci		/* take care of the easy cases up front */
143962306a36Sopenharmony_ci		spin_lock(&res->spinlock);
144062306a36Sopenharmony_ci
144162306a36Sopenharmony_ci		/*
144262306a36Sopenharmony_ci		 * Right after dlm spinlock was released, dlm_thread could have
144362306a36Sopenharmony_ci		 * purged the lockres. Check if lockres got unhashed. If so
144462306a36Sopenharmony_ci		 * start over.
144562306a36Sopenharmony_ci		 */
144662306a36Sopenharmony_ci		if (hlist_unhashed(&res->hash_node)) {
144762306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
144862306a36Sopenharmony_ci			dlm_lockres_put(res);
144962306a36Sopenharmony_ci			goto way_up_top;
145062306a36Sopenharmony_ci		}
145162306a36Sopenharmony_ci
145262306a36Sopenharmony_ci		if (res->state & (DLM_LOCK_RES_RECOVERING|
145362306a36Sopenharmony_ci				  DLM_LOCK_RES_MIGRATING)) {
145462306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
145562306a36Sopenharmony_ci			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
145662306a36Sopenharmony_ci			     "being recovered/migrated\n");
145762306a36Sopenharmony_ci			response = DLM_MASTER_RESP_ERROR;
145862306a36Sopenharmony_ci			if (mle)
145962306a36Sopenharmony_ci				kmem_cache_free(dlm_mle_cache, mle);
146062306a36Sopenharmony_ci			goto send_response;
146162306a36Sopenharmony_ci		}
146262306a36Sopenharmony_ci
146362306a36Sopenharmony_ci		if (res->owner == dlm->node_num) {
146462306a36Sopenharmony_ci			dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
146562306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
146662306a36Sopenharmony_ci			response = DLM_MASTER_RESP_YES;
146762306a36Sopenharmony_ci			if (mle)
146862306a36Sopenharmony_ci				kmem_cache_free(dlm_mle_cache, mle);
146962306a36Sopenharmony_ci
147062306a36Sopenharmony_ci			/* this node is the owner.
147162306a36Sopenharmony_ci			 * there is some extra work that needs to
147262306a36Sopenharmony_ci			 * happen now.  the requesting node has
147362306a36Sopenharmony_ci			 * caused all nodes up to this one to
147462306a36Sopenharmony_ci			 * create mles.  this node now needs to
147562306a36Sopenharmony_ci			 * go back and clean those up. */
147662306a36Sopenharmony_ci			dispatch_assert = 1;
147762306a36Sopenharmony_ci			goto send_response;
147862306a36Sopenharmony_ci		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
147962306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
148062306a36Sopenharmony_ci			// mlog(0, "node %u is the master\n", res->owner);
148162306a36Sopenharmony_ci			response = DLM_MASTER_RESP_NO;
148262306a36Sopenharmony_ci			if (mle)
148362306a36Sopenharmony_ci				kmem_cache_free(dlm_mle_cache, mle);
148462306a36Sopenharmony_ci			goto send_response;
148562306a36Sopenharmony_ci		}
148662306a36Sopenharmony_ci
148762306a36Sopenharmony_ci		/* ok, there is no owner.  either this node is
148862306a36Sopenharmony_ci		 * being blocked, or it is actively trying to
148962306a36Sopenharmony_ci		 * master this lock. */
149062306a36Sopenharmony_ci		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
149162306a36Sopenharmony_ci			mlog(ML_ERROR, "lock with no owner should be "
149262306a36Sopenharmony_ci			     "in-progress!\n");
149362306a36Sopenharmony_ci			BUG();
149462306a36Sopenharmony_ci		}
149562306a36Sopenharmony_ci
149662306a36Sopenharmony_ci		// mlog(0, "lockres is in progress...\n");
149762306a36Sopenharmony_ci		spin_lock(&dlm->master_lock);
149862306a36Sopenharmony_ci		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
149962306a36Sopenharmony_ci		if (!found) {
150062306a36Sopenharmony_ci			mlog(ML_ERROR, "no mle found for this lock!\n");
150162306a36Sopenharmony_ci			BUG();
150262306a36Sopenharmony_ci		}
150362306a36Sopenharmony_ci		set_maybe = 1;
150462306a36Sopenharmony_ci		spin_lock(&tmpmle->spinlock);
150562306a36Sopenharmony_ci		if (tmpmle->type == DLM_MLE_BLOCK) {
150662306a36Sopenharmony_ci			// mlog(0, "this node is waiting for "
150762306a36Sopenharmony_ci			// "lockres to be mastered\n");
150862306a36Sopenharmony_ci			response = DLM_MASTER_RESP_NO;
150962306a36Sopenharmony_ci		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
151062306a36Sopenharmony_ci			mlog(0, "node %u is master, but trying to migrate to "
151162306a36Sopenharmony_ci			     "node %u.\n", tmpmle->master, tmpmle->new_master);
151262306a36Sopenharmony_ci			if (tmpmle->master == dlm->node_num) {
151362306a36Sopenharmony_ci				mlog(ML_ERROR, "no owner on lockres, but this "
151462306a36Sopenharmony_ci				     "node is trying to migrate it to %u?!\n",
151562306a36Sopenharmony_ci				     tmpmle->new_master);
151662306a36Sopenharmony_ci				BUG();
151762306a36Sopenharmony_ci			} else {
151862306a36Sopenharmony_ci				/* the real master can respond on its own */
151962306a36Sopenharmony_ci				response = DLM_MASTER_RESP_NO;
152062306a36Sopenharmony_ci			}
152162306a36Sopenharmony_ci		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
152262306a36Sopenharmony_ci			set_maybe = 0;
152362306a36Sopenharmony_ci			if (tmpmle->master == dlm->node_num) {
152462306a36Sopenharmony_ci				response = DLM_MASTER_RESP_YES;
152562306a36Sopenharmony_ci				/* this node will be the owner.
152662306a36Sopenharmony_ci				 * go back and clean the mles on any
152762306a36Sopenharmony_ci				 * other nodes */
152862306a36Sopenharmony_ci				dispatch_assert = 1;
152962306a36Sopenharmony_ci				dlm_lockres_set_refmap_bit(dlm, res,
153062306a36Sopenharmony_ci							   request->node_idx);
153162306a36Sopenharmony_ci			} else
153262306a36Sopenharmony_ci				response = DLM_MASTER_RESP_NO;
153362306a36Sopenharmony_ci		} else {
153462306a36Sopenharmony_ci			// mlog(0, "this node is attempting to "
153562306a36Sopenharmony_ci			// "master lockres\n");
153662306a36Sopenharmony_ci			response = DLM_MASTER_RESP_MAYBE;
153762306a36Sopenharmony_ci		}
153862306a36Sopenharmony_ci		if (set_maybe)
153962306a36Sopenharmony_ci			set_bit(request->node_idx, tmpmle->maybe_map);
154062306a36Sopenharmony_ci		spin_unlock(&tmpmle->spinlock);
154162306a36Sopenharmony_ci
154262306a36Sopenharmony_ci		spin_unlock(&dlm->master_lock);
154362306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
154462306a36Sopenharmony_ci
154562306a36Sopenharmony_ci		/* keep the mle attached to heartbeat events */
154662306a36Sopenharmony_ci		dlm_put_mle(tmpmle);
154762306a36Sopenharmony_ci		if (mle)
154862306a36Sopenharmony_ci			kmem_cache_free(dlm_mle_cache, mle);
154962306a36Sopenharmony_ci		goto send_response;
155062306a36Sopenharmony_ci	}
155162306a36Sopenharmony_ci
155262306a36Sopenharmony_ci	/*
155362306a36Sopenharmony_ci	 * lockres doesn't exist on this node
155462306a36Sopenharmony_ci	 * if there is an MLE_BLOCK, return NO
155562306a36Sopenharmony_ci	 * if there is an MLE_MASTER, return MAYBE
155662306a36Sopenharmony_ci	 * otherwise, add an MLE_BLOCK, return NO
155762306a36Sopenharmony_ci	 */
155862306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
155962306a36Sopenharmony_ci	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
156062306a36Sopenharmony_ci	if (!found) {
156162306a36Sopenharmony_ci		/* this lockid has never been seen on this node yet */
156262306a36Sopenharmony_ci		// mlog(0, "no mle found\n");
156362306a36Sopenharmony_ci		if (!mle) {
156462306a36Sopenharmony_ci			spin_unlock(&dlm->master_lock);
156562306a36Sopenharmony_ci			spin_unlock(&dlm->spinlock);
156662306a36Sopenharmony_ci
156762306a36Sopenharmony_ci			mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
156862306a36Sopenharmony_ci			if (!mle) {
156962306a36Sopenharmony_ci				response = DLM_MASTER_RESP_ERROR;
157062306a36Sopenharmony_ci				mlog_errno(-ENOMEM);
157162306a36Sopenharmony_ci				goto send_response;
157262306a36Sopenharmony_ci			}
157362306a36Sopenharmony_ci			goto way_up_top;
157462306a36Sopenharmony_ci		}
157562306a36Sopenharmony_ci
157662306a36Sopenharmony_ci		// mlog(0, "this is second time thru, already allocated, "
157762306a36Sopenharmony_ci		// "add the block.\n");
157862306a36Sopenharmony_ci		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
157962306a36Sopenharmony_ci		set_bit(request->node_idx, mle->maybe_map);
158062306a36Sopenharmony_ci		__dlm_insert_mle(dlm, mle);
158162306a36Sopenharmony_ci		response = DLM_MASTER_RESP_NO;
158262306a36Sopenharmony_ci	} else {
158362306a36Sopenharmony_ci		spin_lock(&tmpmle->spinlock);
158462306a36Sopenharmony_ci		if (tmpmle->master == dlm->node_num) {
158562306a36Sopenharmony_ci			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
158662306a36Sopenharmony_ci			BUG();
158762306a36Sopenharmony_ci		}
158862306a36Sopenharmony_ci		if (tmpmle->type == DLM_MLE_BLOCK)
158962306a36Sopenharmony_ci			response = DLM_MASTER_RESP_NO;
159062306a36Sopenharmony_ci		else if (tmpmle->type == DLM_MLE_MIGRATION) {
159162306a36Sopenharmony_ci			mlog(0, "migration mle was found (%u->%u)\n",
159262306a36Sopenharmony_ci			     tmpmle->master, tmpmle->new_master);
159362306a36Sopenharmony_ci			/* real master can respond on its own */
159462306a36Sopenharmony_ci			response = DLM_MASTER_RESP_NO;
159562306a36Sopenharmony_ci		} else
159662306a36Sopenharmony_ci			response = DLM_MASTER_RESP_MAYBE;
159762306a36Sopenharmony_ci		set_bit(request->node_idx, tmpmle->maybe_map);
159862306a36Sopenharmony_ci		spin_unlock(&tmpmle->spinlock);
159962306a36Sopenharmony_ci	}
160062306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
160162306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
160262306a36Sopenharmony_ci
160362306a36Sopenharmony_ci	if (found) {
160462306a36Sopenharmony_ci		/* keep the mle attached to heartbeat events */
160562306a36Sopenharmony_ci		dlm_put_mle(tmpmle);
160662306a36Sopenharmony_ci	}
160762306a36Sopenharmony_cisend_response:
160862306a36Sopenharmony_ci	/*
160962306a36Sopenharmony_ci	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
161062306a36Sopenharmony_ci	 * The reference is released by dlm_assert_master_worker() under
161162306a36Sopenharmony_ci	 * the call to dlm_dispatch_assert_master().  If
161262306a36Sopenharmony_ci	 * dlm_assert_master_worker() isn't called, we drop it here.
161362306a36Sopenharmony_ci	 */
161462306a36Sopenharmony_ci	if (dispatch_assert) {
161562306a36Sopenharmony_ci		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
161662306a36Sopenharmony_ci			     dlm->node_num, res->lockname.len, res->lockname.name);
161762306a36Sopenharmony_ci		spin_lock(&res->spinlock);
161862306a36Sopenharmony_ci		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
161962306a36Sopenharmony_ci						 DLM_ASSERT_MASTER_MLE_CLEANUP);
162062306a36Sopenharmony_ci		if (ret < 0) {
162162306a36Sopenharmony_ci			mlog(ML_ERROR, "failed to dispatch assert master work\n");
162262306a36Sopenharmony_ci			response = DLM_MASTER_RESP_ERROR;
162362306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
162462306a36Sopenharmony_ci			dlm_lockres_put(res);
162562306a36Sopenharmony_ci		} else {
162662306a36Sopenharmony_ci			dispatched = 1;
162762306a36Sopenharmony_ci			__dlm_lockres_grab_inflight_worker(dlm, res);
162862306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
162962306a36Sopenharmony_ci		}
163062306a36Sopenharmony_ci	} else {
163162306a36Sopenharmony_ci		if (res)
163262306a36Sopenharmony_ci			dlm_lockres_put(res);
163362306a36Sopenharmony_ci	}
163462306a36Sopenharmony_ci
163562306a36Sopenharmony_ci	if (!dispatched)
163662306a36Sopenharmony_ci		dlm_put(dlm);
163762306a36Sopenharmony_ci	return response;
163862306a36Sopenharmony_ci}
163962306a36Sopenharmony_ci
164062306a36Sopenharmony_ci/*
164162306a36Sopenharmony_ci * DLM_ASSERT_MASTER_MSG
164262306a36Sopenharmony_ci */
164362306a36Sopenharmony_ci
164462306a36Sopenharmony_ci
164562306a36Sopenharmony_ci/*
164662306a36Sopenharmony_ci * NOTE: this can be used for debugging
164762306a36Sopenharmony_ci * can periodically run all locks owned by this node
164862306a36Sopenharmony_ci * and re-assert across the cluster...
164962306a36Sopenharmony_ci */
165062306a36Sopenharmony_cistatic int dlm_do_assert_master(struct dlm_ctxt *dlm,
165162306a36Sopenharmony_ci				struct dlm_lock_resource *res,
165262306a36Sopenharmony_ci				void *nodemap, u32 flags)
165362306a36Sopenharmony_ci{
165462306a36Sopenharmony_ci	struct dlm_assert_master assert;
165562306a36Sopenharmony_ci	int to, tmpret;
165662306a36Sopenharmony_ci	struct dlm_node_iter iter;
165762306a36Sopenharmony_ci	int ret = 0;
165862306a36Sopenharmony_ci	int reassert;
165962306a36Sopenharmony_ci	const char *lockname = res->lockname.name;
166062306a36Sopenharmony_ci	unsigned int namelen = res->lockname.len;
166162306a36Sopenharmony_ci
166262306a36Sopenharmony_ci	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
166362306a36Sopenharmony_ci
166462306a36Sopenharmony_ci	spin_lock(&res->spinlock);
166562306a36Sopenharmony_ci	res->state |= DLM_LOCK_RES_SETREF_INPROG;
166662306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
166762306a36Sopenharmony_ci
166862306a36Sopenharmony_ciagain:
166962306a36Sopenharmony_ci	reassert = 0;
167062306a36Sopenharmony_ci
167162306a36Sopenharmony_ci	/* note that if this nodemap is empty, it returns 0 */
167262306a36Sopenharmony_ci	dlm_node_iter_init(nodemap, &iter);
167362306a36Sopenharmony_ci	while ((to = dlm_node_iter_next(&iter)) >= 0) {
167462306a36Sopenharmony_ci		int r = 0;
167562306a36Sopenharmony_ci		struct dlm_master_list_entry *mle = NULL;
167662306a36Sopenharmony_ci
167762306a36Sopenharmony_ci		mlog(0, "sending assert master to %d (%.*s)\n", to,
167862306a36Sopenharmony_ci		     namelen, lockname);
167962306a36Sopenharmony_ci		memset(&assert, 0, sizeof(assert));
168062306a36Sopenharmony_ci		assert.node_idx = dlm->node_num;
168162306a36Sopenharmony_ci		assert.namelen = namelen;
168262306a36Sopenharmony_ci		memcpy(assert.name, lockname, namelen);
168362306a36Sopenharmony_ci		assert.flags = cpu_to_be32(flags);
168462306a36Sopenharmony_ci
168562306a36Sopenharmony_ci		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
168662306a36Sopenharmony_ci					    &assert, sizeof(assert), to, &r);
168762306a36Sopenharmony_ci		if (tmpret < 0) {
168862306a36Sopenharmony_ci			mlog(ML_ERROR, "Error %d when sending message %u (key "
168962306a36Sopenharmony_ci			     "0x%x) to node %u\n", tmpret,
169062306a36Sopenharmony_ci			     DLM_ASSERT_MASTER_MSG, dlm->key, to);
169162306a36Sopenharmony_ci			if (!dlm_is_host_down(tmpret)) {
169262306a36Sopenharmony_ci				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
169362306a36Sopenharmony_ci				BUG();
169462306a36Sopenharmony_ci			}
169562306a36Sopenharmony_ci			/* a node died.  finish out the rest of the nodes. */
169662306a36Sopenharmony_ci			mlog(0, "link to %d went down!\n", to);
169762306a36Sopenharmony_ci			/* any nonzero status return will do */
169862306a36Sopenharmony_ci			ret = tmpret;
169962306a36Sopenharmony_ci			r = 0;
170062306a36Sopenharmony_ci		} else if (r < 0) {
170162306a36Sopenharmony_ci			/* ok, something horribly messed.  kill thyself. */
170262306a36Sopenharmony_ci			mlog(ML_ERROR,"during assert master of %.*s to %u, "
170362306a36Sopenharmony_ci			     "got %d.\n", namelen, lockname, to, r);
170462306a36Sopenharmony_ci			spin_lock(&dlm->spinlock);
170562306a36Sopenharmony_ci			spin_lock(&dlm->master_lock);
170662306a36Sopenharmony_ci			if (dlm_find_mle(dlm, &mle, (char *)lockname,
170762306a36Sopenharmony_ci					 namelen)) {
170862306a36Sopenharmony_ci				dlm_print_one_mle(mle);
170962306a36Sopenharmony_ci				__dlm_put_mle(mle);
171062306a36Sopenharmony_ci			}
171162306a36Sopenharmony_ci			spin_unlock(&dlm->master_lock);
171262306a36Sopenharmony_ci			spin_unlock(&dlm->spinlock);
171362306a36Sopenharmony_ci			BUG();
171462306a36Sopenharmony_ci		}
171562306a36Sopenharmony_ci
171662306a36Sopenharmony_ci		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
171762306a36Sopenharmony_ci		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
171862306a36Sopenharmony_ci				mlog(ML_ERROR, "%.*s: very strange, "
171962306a36Sopenharmony_ci				     "master MLE but no lockres on %u\n",
172062306a36Sopenharmony_ci				     namelen, lockname, to);
172162306a36Sopenharmony_ci		}
172262306a36Sopenharmony_ci
172362306a36Sopenharmony_ci		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
172462306a36Sopenharmony_ci			mlog(0, "%.*s: node %u create mles on other "
172562306a36Sopenharmony_ci			     "nodes and requests a re-assert\n",
172662306a36Sopenharmony_ci			     namelen, lockname, to);
172762306a36Sopenharmony_ci			reassert = 1;
172862306a36Sopenharmony_ci		}
172962306a36Sopenharmony_ci		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
173062306a36Sopenharmony_ci			mlog(0, "%.*s: node %u has a reference to this "
173162306a36Sopenharmony_ci			     "lockres, set the bit in the refmap\n",
173262306a36Sopenharmony_ci			     namelen, lockname, to);
173362306a36Sopenharmony_ci			spin_lock(&res->spinlock);
173462306a36Sopenharmony_ci			dlm_lockres_set_refmap_bit(dlm, res, to);
173562306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
173662306a36Sopenharmony_ci		}
173762306a36Sopenharmony_ci	}
173862306a36Sopenharmony_ci
173962306a36Sopenharmony_ci	if (reassert)
174062306a36Sopenharmony_ci		goto again;
174162306a36Sopenharmony_ci
174262306a36Sopenharmony_ci	spin_lock(&res->spinlock);
174362306a36Sopenharmony_ci	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
174462306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
174562306a36Sopenharmony_ci	wake_up(&res->wq);
174662306a36Sopenharmony_ci
174762306a36Sopenharmony_ci	return ret;
174862306a36Sopenharmony_ci}
174962306a36Sopenharmony_ci
175062306a36Sopenharmony_ci/*
175162306a36Sopenharmony_ci * locks that can be taken here:
175262306a36Sopenharmony_ci * dlm->spinlock
175362306a36Sopenharmony_ci * res->spinlock
175462306a36Sopenharmony_ci * mle->spinlock
175562306a36Sopenharmony_ci * dlm->master_list
175662306a36Sopenharmony_ci *
175762306a36Sopenharmony_ci * if possible, TRIM THIS DOWN!!!
175862306a36Sopenharmony_ci */
175962306a36Sopenharmony_ciint dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
176062306a36Sopenharmony_ci			      void **ret_data)
176162306a36Sopenharmony_ci{
176262306a36Sopenharmony_ci	struct dlm_ctxt *dlm = data;
176362306a36Sopenharmony_ci	struct dlm_master_list_entry *mle = NULL;
176462306a36Sopenharmony_ci	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
176562306a36Sopenharmony_ci	struct dlm_lock_resource *res = NULL;
176662306a36Sopenharmony_ci	char *name;
176762306a36Sopenharmony_ci	unsigned int namelen, hash;
176862306a36Sopenharmony_ci	u32 flags;
176962306a36Sopenharmony_ci	int master_request = 0, have_lockres_ref = 0;
177062306a36Sopenharmony_ci	int ret = 0;
177162306a36Sopenharmony_ci
177262306a36Sopenharmony_ci	if (!dlm_grab(dlm))
177362306a36Sopenharmony_ci		return 0;
177462306a36Sopenharmony_ci
177562306a36Sopenharmony_ci	name = assert->name;
177662306a36Sopenharmony_ci	namelen = assert->namelen;
177762306a36Sopenharmony_ci	hash = dlm_lockid_hash(name, namelen);
177862306a36Sopenharmony_ci	flags = be32_to_cpu(assert->flags);
177962306a36Sopenharmony_ci
178062306a36Sopenharmony_ci	if (namelen > DLM_LOCKID_NAME_MAX) {
178162306a36Sopenharmony_ci		mlog(ML_ERROR, "Invalid name length!");
178262306a36Sopenharmony_ci		goto done;
178362306a36Sopenharmony_ci	}
178462306a36Sopenharmony_ci
178562306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
178662306a36Sopenharmony_ci
178762306a36Sopenharmony_ci	if (flags)
178862306a36Sopenharmony_ci		mlog(0, "assert_master with flags: %u\n", flags);
178962306a36Sopenharmony_ci
179062306a36Sopenharmony_ci	/* find the MLE */
179162306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
179262306a36Sopenharmony_ci	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
179362306a36Sopenharmony_ci		/* not an error, could be master just re-asserting */
179462306a36Sopenharmony_ci		mlog(0, "just got an assert_master from %u, but no "
179562306a36Sopenharmony_ci		     "MLE for it! (%.*s)\n", assert->node_idx,
179662306a36Sopenharmony_ci		     namelen, name);
179762306a36Sopenharmony_ci	} else {
179862306a36Sopenharmony_ci		int bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
179962306a36Sopenharmony_ci		if (bit >= O2NM_MAX_NODES) {
180062306a36Sopenharmony_ci			/* not necessarily an error, though less likely.
180162306a36Sopenharmony_ci			 * could be master just re-asserting. */
180262306a36Sopenharmony_ci			mlog(0, "no bits set in the maybe_map, but %u "
180362306a36Sopenharmony_ci			     "is asserting! (%.*s)\n", assert->node_idx,
180462306a36Sopenharmony_ci			     namelen, name);
180562306a36Sopenharmony_ci		} else if (bit != assert->node_idx) {
180662306a36Sopenharmony_ci			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
180762306a36Sopenharmony_ci				mlog(0, "master %u was found, %u should "
180862306a36Sopenharmony_ci				     "back off\n", assert->node_idx, bit);
180962306a36Sopenharmony_ci			} else {
181062306a36Sopenharmony_ci				/* with the fix for bug 569, a higher node
181162306a36Sopenharmony_ci				 * number winning the mastery will respond
181262306a36Sopenharmony_ci				 * YES to mastery requests, but this node
181362306a36Sopenharmony_ci				 * had no way of knowing.  let it pass. */
181462306a36Sopenharmony_ci				mlog(0, "%u is the lowest node, "
181562306a36Sopenharmony_ci				     "%u is asserting. (%.*s)  %u must "
181662306a36Sopenharmony_ci				     "have begun after %u won.\n", bit,
181762306a36Sopenharmony_ci				     assert->node_idx, namelen, name, bit,
181862306a36Sopenharmony_ci				     assert->node_idx);
181962306a36Sopenharmony_ci			}
182062306a36Sopenharmony_ci		}
182162306a36Sopenharmony_ci		if (mle->type == DLM_MLE_MIGRATION) {
182262306a36Sopenharmony_ci			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
182362306a36Sopenharmony_ci				mlog(0, "%s:%.*s: got cleanup assert"
182462306a36Sopenharmony_ci				     " from %u for migration\n",
182562306a36Sopenharmony_ci				     dlm->name, namelen, name,
182662306a36Sopenharmony_ci				     assert->node_idx);
182762306a36Sopenharmony_ci			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
182862306a36Sopenharmony_ci				mlog(0, "%s:%.*s: got unrelated assert"
182962306a36Sopenharmony_ci				     " from %u for migration, ignoring\n",
183062306a36Sopenharmony_ci				     dlm->name, namelen, name,
183162306a36Sopenharmony_ci				     assert->node_idx);
183262306a36Sopenharmony_ci				__dlm_put_mle(mle);
183362306a36Sopenharmony_ci				spin_unlock(&dlm->master_lock);
183462306a36Sopenharmony_ci				spin_unlock(&dlm->spinlock);
183562306a36Sopenharmony_ci				goto done;
183662306a36Sopenharmony_ci			}
183762306a36Sopenharmony_ci		}
183862306a36Sopenharmony_ci	}
183962306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
184062306a36Sopenharmony_ci
184162306a36Sopenharmony_ci	/* ok everything checks out with the MLE
184262306a36Sopenharmony_ci	 * now check to see if there is a lockres */
184362306a36Sopenharmony_ci	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
184462306a36Sopenharmony_ci	if (res) {
184562306a36Sopenharmony_ci		spin_lock(&res->spinlock);
184662306a36Sopenharmony_ci		if (res->state & DLM_LOCK_RES_RECOVERING)  {
184762306a36Sopenharmony_ci			mlog(ML_ERROR, "%u asserting but %.*s is "
184862306a36Sopenharmony_ci			     "RECOVERING!\n", assert->node_idx, namelen, name);
184962306a36Sopenharmony_ci			goto kill;
185062306a36Sopenharmony_ci		}
185162306a36Sopenharmony_ci		if (!mle) {
185262306a36Sopenharmony_ci			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
185362306a36Sopenharmony_ci			    res->owner != assert->node_idx) {
185462306a36Sopenharmony_ci				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
185562306a36Sopenharmony_ci				     "but current owner is %u! (%.*s)\n",
185662306a36Sopenharmony_ci				     assert->node_idx, res->owner, namelen,
185762306a36Sopenharmony_ci				     name);
185862306a36Sopenharmony_ci				__dlm_print_one_lock_resource(res);
185962306a36Sopenharmony_ci				BUG();
186062306a36Sopenharmony_ci			}
186162306a36Sopenharmony_ci		} else if (mle->type != DLM_MLE_MIGRATION) {
186262306a36Sopenharmony_ci			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
186362306a36Sopenharmony_ci				/* owner is just re-asserting */
186462306a36Sopenharmony_ci				if (res->owner == assert->node_idx) {
186562306a36Sopenharmony_ci					mlog(0, "owner %u re-asserting on "
186662306a36Sopenharmony_ci					     "lock %.*s\n", assert->node_idx,
186762306a36Sopenharmony_ci					     namelen, name);
186862306a36Sopenharmony_ci					goto ok;
186962306a36Sopenharmony_ci				}
187062306a36Sopenharmony_ci				mlog(ML_ERROR, "got assert_master from "
187162306a36Sopenharmony_ci				     "node %u, but %u is the owner! "
187262306a36Sopenharmony_ci				     "(%.*s)\n", assert->node_idx,
187362306a36Sopenharmony_ci				     res->owner, namelen, name);
187462306a36Sopenharmony_ci				goto kill;
187562306a36Sopenharmony_ci			}
187662306a36Sopenharmony_ci			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
187762306a36Sopenharmony_ci				mlog(ML_ERROR, "got assert from %u, but lock "
187862306a36Sopenharmony_ci				     "with no owner should be "
187962306a36Sopenharmony_ci				     "in-progress! (%.*s)\n",
188062306a36Sopenharmony_ci				     assert->node_idx,
188162306a36Sopenharmony_ci				     namelen, name);
188262306a36Sopenharmony_ci				goto kill;
188362306a36Sopenharmony_ci			}
188462306a36Sopenharmony_ci		} else /* mle->type == DLM_MLE_MIGRATION */ {
188562306a36Sopenharmony_ci			/* should only be getting an assert from new master */
188662306a36Sopenharmony_ci			if (assert->node_idx != mle->new_master) {
188762306a36Sopenharmony_ci				mlog(ML_ERROR, "got assert from %u, but "
188862306a36Sopenharmony_ci				     "new master is %u, and old master "
188962306a36Sopenharmony_ci				     "was %u (%.*s)\n",
189062306a36Sopenharmony_ci				     assert->node_idx, mle->new_master,
189162306a36Sopenharmony_ci				     mle->master, namelen, name);
189262306a36Sopenharmony_ci				goto kill;
189362306a36Sopenharmony_ci			}
189462306a36Sopenharmony_ci
189562306a36Sopenharmony_ci		}
189662306a36Sopenharmony_ciok:
189762306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
189862306a36Sopenharmony_ci	}
189962306a36Sopenharmony_ci
190062306a36Sopenharmony_ci	// mlog(0, "woo!  got an assert_master from node %u!\n",
190162306a36Sopenharmony_ci	// 	     assert->node_idx);
190262306a36Sopenharmony_ci	if (mle) {
190362306a36Sopenharmony_ci		int extra_ref = 0;
190462306a36Sopenharmony_ci		int nn = -1;
190562306a36Sopenharmony_ci		int rr, err = 0;
190662306a36Sopenharmony_ci
190762306a36Sopenharmony_ci		spin_lock(&mle->spinlock);
190862306a36Sopenharmony_ci		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
190962306a36Sopenharmony_ci			extra_ref = 1;
191062306a36Sopenharmony_ci		else {
191162306a36Sopenharmony_ci			/* MASTER mle: if any bits set in the response map
191262306a36Sopenharmony_ci			 * then the calling node needs to re-assert to clear
191362306a36Sopenharmony_ci			 * up nodes that this node contacted */
191462306a36Sopenharmony_ci			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
191562306a36Sopenharmony_ci						    nn+1)) < O2NM_MAX_NODES) {
191662306a36Sopenharmony_ci				if (nn != dlm->node_num && nn != assert->node_idx) {
191762306a36Sopenharmony_ci					master_request = 1;
191862306a36Sopenharmony_ci					break;
191962306a36Sopenharmony_ci				}
192062306a36Sopenharmony_ci			}
192162306a36Sopenharmony_ci		}
192262306a36Sopenharmony_ci		mle->master = assert->node_idx;
192362306a36Sopenharmony_ci		atomic_set(&mle->woken, 1);
192462306a36Sopenharmony_ci		wake_up(&mle->wq);
192562306a36Sopenharmony_ci		spin_unlock(&mle->spinlock);
192662306a36Sopenharmony_ci
192762306a36Sopenharmony_ci		if (res) {
192862306a36Sopenharmony_ci			int wake = 0;
192962306a36Sopenharmony_ci			spin_lock(&res->spinlock);
193062306a36Sopenharmony_ci			if (mle->type == DLM_MLE_MIGRATION) {
193162306a36Sopenharmony_ci				mlog(0, "finishing off migration of lockres %.*s, "
193262306a36Sopenharmony_ci			     		"from %u to %u\n",
193362306a36Sopenharmony_ci			       		res->lockname.len, res->lockname.name,
193462306a36Sopenharmony_ci			       		dlm->node_num, mle->new_master);
193562306a36Sopenharmony_ci				res->state &= ~DLM_LOCK_RES_MIGRATING;
193662306a36Sopenharmony_ci				wake = 1;
193762306a36Sopenharmony_ci				dlm_change_lockres_owner(dlm, res, mle->new_master);
193862306a36Sopenharmony_ci				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
193962306a36Sopenharmony_ci			} else {
194062306a36Sopenharmony_ci				dlm_change_lockres_owner(dlm, res, mle->master);
194162306a36Sopenharmony_ci			}
194262306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
194362306a36Sopenharmony_ci			have_lockres_ref = 1;
194462306a36Sopenharmony_ci			if (wake)
194562306a36Sopenharmony_ci				wake_up(&res->wq);
194662306a36Sopenharmony_ci		}
194762306a36Sopenharmony_ci
194862306a36Sopenharmony_ci		/* master is known, detach if not already detached.
194962306a36Sopenharmony_ci		 * ensures that only one assert_master call will happen
195062306a36Sopenharmony_ci		 * on this mle. */
195162306a36Sopenharmony_ci		spin_lock(&dlm->master_lock);
195262306a36Sopenharmony_ci
195362306a36Sopenharmony_ci		rr = kref_read(&mle->mle_refs);
195462306a36Sopenharmony_ci		if (mle->inuse > 0) {
195562306a36Sopenharmony_ci			if (extra_ref && rr < 3)
195662306a36Sopenharmony_ci				err = 1;
195762306a36Sopenharmony_ci			else if (!extra_ref && rr < 2)
195862306a36Sopenharmony_ci				err = 1;
195962306a36Sopenharmony_ci		} else {
196062306a36Sopenharmony_ci			if (extra_ref && rr < 2)
196162306a36Sopenharmony_ci				err = 1;
196262306a36Sopenharmony_ci			else if (!extra_ref && rr < 1)
196362306a36Sopenharmony_ci				err = 1;
196462306a36Sopenharmony_ci		}
196562306a36Sopenharmony_ci		if (err) {
196662306a36Sopenharmony_ci			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
196762306a36Sopenharmony_ci			     "that will mess up this node, refs=%d, extra=%d, "
196862306a36Sopenharmony_ci			     "inuse=%d\n", dlm->name, namelen, name,
196962306a36Sopenharmony_ci			     assert->node_idx, rr, extra_ref, mle->inuse);
197062306a36Sopenharmony_ci			dlm_print_one_mle(mle);
197162306a36Sopenharmony_ci		}
197262306a36Sopenharmony_ci		__dlm_unlink_mle(dlm, mle);
197362306a36Sopenharmony_ci		__dlm_mle_detach_hb_events(dlm, mle);
197462306a36Sopenharmony_ci		__dlm_put_mle(mle);
197562306a36Sopenharmony_ci		if (extra_ref) {
197662306a36Sopenharmony_ci			/* the assert master message now balances the extra
197762306a36Sopenharmony_ci		 	 * ref given by the master / migration request message.
197862306a36Sopenharmony_ci		 	 * if this is the last put, it will be removed
197962306a36Sopenharmony_ci		 	 * from the list. */
198062306a36Sopenharmony_ci			__dlm_put_mle(mle);
198162306a36Sopenharmony_ci		}
198262306a36Sopenharmony_ci		spin_unlock(&dlm->master_lock);
198362306a36Sopenharmony_ci	} else if (res) {
198462306a36Sopenharmony_ci		if (res->owner != assert->node_idx) {
198562306a36Sopenharmony_ci			mlog(0, "assert_master from %u, but current "
198662306a36Sopenharmony_ci			     "owner is %u (%.*s), no mle\n", assert->node_idx,
198762306a36Sopenharmony_ci			     res->owner, namelen, name);
198862306a36Sopenharmony_ci		}
198962306a36Sopenharmony_ci	}
199062306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
199162306a36Sopenharmony_ci
199262306a36Sopenharmony_cidone:
199362306a36Sopenharmony_ci	ret = 0;
199462306a36Sopenharmony_ci	if (res) {
199562306a36Sopenharmony_ci		spin_lock(&res->spinlock);
199662306a36Sopenharmony_ci		res->state |= DLM_LOCK_RES_SETREF_INPROG;
199762306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
199862306a36Sopenharmony_ci		*ret_data = (void *)res;
199962306a36Sopenharmony_ci	}
200062306a36Sopenharmony_ci	dlm_put(dlm);
200162306a36Sopenharmony_ci	if (master_request) {
200262306a36Sopenharmony_ci		mlog(0, "need to tell master to reassert\n");
200362306a36Sopenharmony_ci		/* positive. negative would shoot down the node. */
200462306a36Sopenharmony_ci		ret |= DLM_ASSERT_RESPONSE_REASSERT;
200562306a36Sopenharmony_ci		if (!have_lockres_ref) {
200662306a36Sopenharmony_ci			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
200762306a36Sopenharmony_ci			     "mle present here for %s:%.*s, but no lockres!\n",
200862306a36Sopenharmony_ci			     assert->node_idx, dlm->name, namelen, name);
200962306a36Sopenharmony_ci		}
201062306a36Sopenharmony_ci	}
201162306a36Sopenharmony_ci	if (have_lockres_ref) {
201262306a36Sopenharmony_ci		/* let the master know we have a reference to the lockres */
201362306a36Sopenharmony_ci		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
201462306a36Sopenharmony_ci		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
201562306a36Sopenharmony_ci		     dlm->name, namelen, name, assert->node_idx);
201662306a36Sopenharmony_ci	}
201762306a36Sopenharmony_ci	return ret;
201862306a36Sopenharmony_ci
201962306a36Sopenharmony_cikill:
202062306a36Sopenharmony_ci	/* kill the caller! */
202162306a36Sopenharmony_ci	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
202262306a36Sopenharmony_ci	     "and killing the other node now!  This node is OK and can continue.\n");
202362306a36Sopenharmony_ci	__dlm_print_one_lock_resource(res);
202462306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
202562306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
202662306a36Sopenharmony_ci	if (mle)
202762306a36Sopenharmony_ci		__dlm_put_mle(mle);
202862306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
202962306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
203062306a36Sopenharmony_ci	*ret_data = (void *)res;
203162306a36Sopenharmony_ci	dlm_put(dlm);
203262306a36Sopenharmony_ci	return -EINVAL;
203362306a36Sopenharmony_ci}
203462306a36Sopenharmony_ci
203562306a36Sopenharmony_civoid dlm_assert_master_post_handler(int status, void *data, void *ret_data)
203662306a36Sopenharmony_ci{
203762306a36Sopenharmony_ci	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
203862306a36Sopenharmony_ci
203962306a36Sopenharmony_ci	if (ret_data) {
204062306a36Sopenharmony_ci		spin_lock(&res->spinlock);
204162306a36Sopenharmony_ci		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
204262306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
204362306a36Sopenharmony_ci		wake_up(&res->wq);
204462306a36Sopenharmony_ci		dlm_lockres_put(res);
204562306a36Sopenharmony_ci	}
204662306a36Sopenharmony_ci	return;
204762306a36Sopenharmony_ci}
204862306a36Sopenharmony_ci
204962306a36Sopenharmony_ciint dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
205062306a36Sopenharmony_ci			       struct dlm_lock_resource *res,
205162306a36Sopenharmony_ci			       int ignore_higher, u8 request_from, u32 flags)
205262306a36Sopenharmony_ci{
205362306a36Sopenharmony_ci	struct dlm_work_item *item;
205462306a36Sopenharmony_ci	item = kzalloc(sizeof(*item), GFP_ATOMIC);
205562306a36Sopenharmony_ci	if (!item)
205662306a36Sopenharmony_ci		return -ENOMEM;
205762306a36Sopenharmony_ci
205862306a36Sopenharmony_ci
205962306a36Sopenharmony_ci	/* queue up work for dlm_assert_master_worker */
206062306a36Sopenharmony_ci	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
206162306a36Sopenharmony_ci	item->u.am.lockres = res; /* already have a ref */
206262306a36Sopenharmony_ci	/* can optionally ignore node numbers higher than this node */
206362306a36Sopenharmony_ci	item->u.am.ignore_higher = ignore_higher;
206462306a36Sopenharmony_ci	item->u.am.request_from = request_from;
206562306a36Sopenharmony_ci	item->u.am.flags = flags;
206662306a36Sopenharmony_ci
206762306a36Sopenharmony_ci	if (ignore_higher)
206862306a36Sopenharmony_ci		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
206962306a36Sopenharmony_ci		     res->lockname.name);
207062306a36Sopenharmony_ci
207162306a36Sopenharmony_ci	spin_lock(&dlm->work_lock);
207262306a36Sopenharmony_ci	list_add_tail(&item->list, &dlm->work_list);
207362306a36Sopenharmony_ci	spin_unlock(&dlm->work_lock);
207462306a36Sopenharmony_ci
207562306a36Sopenharmony_ci	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
207662306a36Sopenharmony_ci	return 0;
207762306a36Sopenharmony_ci}
207862306a36Sopenharmony_ci
207962306a36Sopenharmony_cistatic void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
208062306a36Sopenharmony_ci{
208162306a36Sopenharmony_ci	struct dlm_ctxt *dlm = data;
208262306a36Sopenharmony_ci	int ret = 0;
208362306a36Sopenharmony_ci	struct dlm_lock_resource *res;
208462306a36Sopenharmony_ci	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
208562306a36Sopenharmony_ci	int ignore_higher;
208662306a36Sopenharmony_ci	int bit;
208762306a36Sopenharmony_ci	u8 request_from;
208862306a36Sopenharmony_ci	u32 flags;
208962306a36Sopenharmony_ci
209062306a36Sopenharmony_ci	dlm = item->dlm;
209162306a36Sopenharmony_ci	res = item->u.am.lockres;
209262306a36Sopenharmony_ci	ignore_higher = item->u.am.ignore_higher;
209362306a36Sopenharmony_ci	request_from = item->u.am.request_from;
209462306a36Sopenharmony_ci	flags = item->u.am.flags;
209562306a36Sopenharmony_ci
209662306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
209762306a36Sopenharmony_ci	bitmap_copy(nodemap, dlm->domain_map, O2NM_MAX_NODES);
209862306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
209962306a36Sopenharmony_ci
210062306a36Sopenharmony_ci	clear_bit(dlm->node_num, nodemap);
210162306a36Sopenharmony_ci	if (ignore_higher) {
210262306a36Sopenharmony_ci		/* if is this just to clear up mles for nodes below
210362306a36Sopenharmony_ci		 * this node, do not send the message to the original
210462306a36Sopenharmony_ci		 * caller or any node number higher than this */
210562306a36Sopenharmony_ci		clear_bit(request_from, nodemap);
210662306a36Sopenharmony_ci		bit = dlm->node_num;
210762306a36Sopenharmony_ci		while (1) {
210862306a36Sopenharmony_ci			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
210962306a36Sopenharmony_ci					    bit+1);
211062306a36Sopenharmony_ci		       	if (bit >= O2NM_MAX_NODES)
211162306a36Sopenharmony_ci				break;
211262306a36Sopenharmony_ci			clear_bit(bit, nodemap);
211362306a36Sopenharmony_ci		}
211462306a36Sopenharmony_ci	}
211562306a36Sopenharmony_ci
211662306a36Sopenharmony_ci	/*
211762306a36Sopenharmony_ci	 * If we're migrating this lock to someone else, we are no
211862306a36Sopenharmony_ci	 * longer allowed to assert out own mastery.  OTOH, we need to
211962306a36Sopenharmony_ci	 * prevent migration from starting while we're still asserting
212062306a36Sopenharmony_ci	 * our dominance.  The reserved ast delays migration.
212162306a36Sopenharmony_ci	 */
212262306a36Sopenharmony_ci	spin_lock(&res->spinlock);
212362306a36Sopenharmony_ci	if (res->state & DLM_LOCK_RES_MIGRATING) {
212462306a36Sopenharmony_ci		mlog(0, "Someone asked us to assert mastery, but we're "
212562306a36Sopenharmony_ci		     "in the middle of migration.  Skipping assert, "
212662306a36Sopenharmony_ci		     "the new master will handle that.\n");
212762306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
212862306a36Sopenharmony_ci		goto put;
212962306a36Sopenharmony_ci	} else
213062306a36Sopenharmony_ci		__dlm_lockres_reserve_ast(res);
213162306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
213262306a36Sopenharmony_ci
213362306a36Sopenharmony_ci	/* this call now finishes out the nodemap
213462306a36Sopenharmony_ci	 * even if one or more nodes die */
213562306a36Sopenharmony_ci	mlog(0, "worker about to master %.*s here, this=%u\n",
213662306a36Sopenharmony_ci		     res->lockname.len, res->lockname.name, dlm->node_num);
213762306a36Sopenharmony_ci	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
213862306a36Sopenharmony_ci	if (ret < 0) {
213962306a36Sopenharmony_ci		/* no need to restart, we are done */
214062306a36Sopenharmony_ci		if (!dlm_is_host_down(ret))
214162306a36Sopenharmony_ci			mlog_errno(ret);
214262306a36Sopenharmony_ci	}
214362306a36Sopenharmony_ci
214462306a36Sopenharmony_ci	/* Ok, we've asserted ourselves.  Let's let migration start. */
214562306a36Sopenharmony_ci	dlm_lockres_release_ast(dlm, res);
214662306a36Sopenharmony_ci
214762306a36Sopenharmony_ciput:
214862306a36Sopenharmony_ci	dlm_lockres_drop_inflight_worker(dlm, res);
214962306a36Sopenharmony_ci
215062306a36Sopenharmony_ci	dlm_lockres_put(res);
215162306a36Sopenharmony_ci
215262306a36Sopenharmony_ci	mlog(0, "finished with dlm_assert_master_worker\n");
215362306a36Sopenharmony_ci}
215462306a36Sopenharmony_ci
215562306a36Sopenharmony_ci/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
215662306a36Sopenharmony_ci * We cannot wait for node recovery to complete to begin mastering this
215762306a36Sopenharmony_ci * lockres because this lockres is used to kick off recovery! ;-)
215862306a36Sopenharmony_ci * So, do a pre-check on all living nodes to see if any of those nodes
215962306a36Sopenharmony_ci * think that $RECOVERY is currently mastered by a dead node.  If so,
216062306a36Sopenharmony_ci * we wait a short time to allow that node to get notified by its own
216162306a36Sopenharmony_ci * heartbeat stack, then check again.  All $RECOVERY lock resources
216262306a36Sopenharmony_ci * mastered by dead nodes are purged when the heartbeat callback is
216362306a36Sopenharmony_ci * fired, so we can know for sure that it is safe to continue once
216462306a36Sopenharmony_ci * the node returns a live node or no node.  */
216562306a36Sopenharmony_cistatic int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
216662306a36Sopenharmony_ci				       struct dlm_lock_resource *res)
216762306a36Sopenharmony_ci{
216862306a36Sopenharmony_ci	struct dlm_node_iter iter;
216962306a36Sopenharmony_ci	int nodenum;
217062306a36Sopenharmony_ci	int ret = 0;
217162306a36Sopenharmony_ci	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
217262306a36Sopenharmony_ci
217362306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
217462306a36Sopenharmony_ci	dlm_node_iter_init(dlm->domain_map, &iter);
217562306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
217662306a36Sopenharmony_ci
217762306a36Sopenharmony_ci	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
217862306a36Sopenharmony_ci		/* do not send to self */
217962306a36Sopenharmony_ci		if (nodenum == dlm->node_num)
218062306a36Sopenharmony_ci			continue;
218162306a36Sopenharmony_ci		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
218262306a36Sopenharmony_ci		if (ret < 0) {
218362306a36Sopenharmony_ci			mlog_errno(ret);
218462306a36Sopenharmony_ci			if (!dlm_is_host_down(ret))
218562306a36Sopenharmony_ci				BUG();
218662306a36Sopenharmony_ci			/* host is down, so answer for that node would be
218762306a36Sopenharmony_ci			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
218862306a36Sopenharmony_ci			ret = 0;
218962306a36Sopenharmony_ci		}
219062306a36Sopenharmony_ci
219162306a36Sopenharmony_ci		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
219262306a36Sopenharmony_ci			/* check to see if this master is in the recovery map */
219362306a36Sopenharmony_ci			spin_lock(&dlm->spinlock);
219462306a36Sopenharmony_ci			if (test_bit(master, dlm->recovery_map)) {
219562306a36Sopenharmony_ci				mlog(ML_NOTICE, "%s: node %u has not seen "
219662306a36Sopenharmony_ci				     "node %u go down yet, and thinks the "
219762306a36Sopenharmony_ci				     "dead node is mastering the recovery "
219862306a36Sopenharmony_ci				     "lock.  must wait.\n", dlm->name,
219962306a36Sopenharmony_ci				     nodenum, master);
220062306a36Sopenharmony_ci				ret = -EAGAIN;
220162306a36Sopenharmony_ci			}
220262306a36Sopenharmony_ci			spin_unlock(&dlm->spinlock);
220362306a36Sopenharmony_ci			mlog(0, "%s: reco lock master is %u\n", dlm->name,
220462306a36Sopenharmony_ci			     master);
220562306a36Sopenharmony_ci			break;
220662306a36Sopenharmony_ci		}
220762306a36Sopenharmony_ci	}
220862306a36Sopenharmony_ci	return ret;
220962306a36Sopenharmony_ci}
221062306a36Sopenharmony_ci
221162306a36Sopenharmony_ci/*
221262306a36Sopenharmony_ci * DLM_DEREF_LOCKRES_MSG
221362306a36Sopenharmony_ci */
221462306a36Sopenharmony_ci
221562306a36Sopenharmony_ciint dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
221662306a36Sopenharmony_ci{
221762306a36Sopenharmony_ci	struct dlm_deref_lockres deref;
221862306a36Sopenharmony_ci	int ret = 0, r;
221962306a36Sopenharmony_ci	const char *lockname;
222062306a36Sopenharmony_ci	unsigned int namelen;
222162306a36Sopenharmony_ci
222262306a36Sopenharmony_ci	lockname = res->lockname.name;
222362306a36Sopenharmony_ci	namelen = res->lockname.len;
222462306a36Sopenharmony_ci	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
222562306a36Sopenharmony_ci
222662306a36Sopenharmony_ci	memset(&deref, 0, sizeof(deref));
222762306a36Sopenharmony_ci	deref.node_idx = dlm->node_num;
222862306a36Sopenharmony_ci	deref.namelen = namelen;
222962306a36Sopenharmony_ci	memcpy(deref.name, lockname, namelen);
223062306a36Sopenharmony_ci
223162306a36Sopenharmony_ci	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
223262306a36Sopenharmony_ci				 &deref, sizeof(deref), res->owner, &r);
223362306a36Sopenharmony_ci	if (ret < 0)
223462306a36Sopenharmony_ci		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
223562306a36Sopenharmony_ci		     dlm->name, namelen, lockname, ret, res->owner);
223662306a36Sopenharmony_ci	else if (r < 0) {
223762306a36Sopenharmony_ci		/* BAD.  other node says I did not have a ref. */
223862306a36Sopenharmony_ci		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
223962306a36Sopenharmony_ci		     dlm->name, namelen, lockname, res->owner, r);
224062306a36Sopenharmony_ci		dlm_print_one_lock_resource(res);
224162306a36Sopenharmony_ci		if (r == -ENOMEM)
224262306a36Sopenharmony_ci			BUG();
224362306a36Sopenharmony_ci	} else
224462306a36Sopenharmony_ci		ret = r;
224562306a36Sopenharmony_ci
224662306a36Sopenharmony_ci	return ret;
224762306a36Sopenharmony_ci}
224862306a36Sopenharmony_ci
224962306a36Sopenharmony_ciint dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
225062306a36Sopenharmony_ci			      void **ret_data)
225162306a36Sopenharmony_ci{
225262306a36Sopenharmony_ci	struct dlm_ctxt *dlm = data;
225362306a36Sopenharmony_ci	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
225462306a36Sopenharmony_ci	struct dlm_lock_resource *res = NULL;
225562306a36Sopenharmony_ci	char *name;
225662306a36Sopenharmony_ci	unsigned int namelen;
225762306a36Sopenharmony_ci	int ret = -EINVAL;
225862306a36Sopenharmony_ci	u8 node;
225962306a36Sopenharmony_ci	unsigned int hash;
226062306a36Sopenharmony_ci	struct dlm_work_item *item;
226162306a36Sopenharmony_ci	int cleared = 0;
226262306a36Sopenharmony_ci	int dispatch = 0;
226362306a36Sopenharmony_ci
226462306a36Sopenharmony_ci	if (!dlm_grab(dlm))
226562306a36Sopenharmony_ci		return 0;
226662306a36Sopenharmony_ci
226762306a36Sopenharmony_ci	name = deref->name;
226862306a36Sopenharmony_ci	namelen = deref->namelen;
226962306a36Sopenharmony_ci	node = deref->node_idx;
227062306a36Sopenharmony_ci
227162306a36Sopenharmony_ci	if (namelen > DLM_LOCKID_NAME_MAX) {
227262306a36Sopenharmony_ci		mlog(ML_ERROR, "Invalid name length!");
227362306a36Sopenharmony_ci		goto done;
227462306a36Sopenharmony_ci	}
227562306a36Sopenharmony_ci	if (deref->node_idx >= O2NM_MAX_NODES) {
227662306a36Sopenharmony_ci		mlog(ML_ERROR, "Invalid node number: %u\n", node);
227762306a36Sopenharmony_ci		goto done;
227862306a36Sopenharmony_ci	}
227962306a36Sopenharmony_ci
228062306a36Sopenharmony_ci	hash = dlm_lockid_hash(name, namelen);
228162306a36Sopenharmony_ci
228262306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
228362306a36Sopenharmony_ci	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
228462306a36Sopenharmony_ci	if (!res) {
228562306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
228662306a36Sopenharmony_ci		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
228762306a36Sopenharmony_ci		     dlm->name, namelen, name);
228862306a36Sopenharmony_ci		goto done;
228962306a36Sopenharmony_ci	}
229062306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
229162306a36Sopenharmony_ci
229262306a36Sopenharmony_ci	spin_lock(&res->spinlock);
229362306a36Sopenharmony_ci	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
229462306a36Sopenharmony_ci		dispatch = 1;
229562306a36Sopenharmony_ci	else {
229662306a36Sopenharmony_ci		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
229762306a36Sopenharmony_ci		if (test_bit(node, res->refmap)) {
229862306a36Sopenharmony_ci			dlm_lockres_clear_refmap_bit(dlm, res, node);
229962306a36Sopenharmony_ci			cleared = 1;
230062306a36Sopenharmony_ci		}
230162306a36Sopenharmony_ci	}
230262306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
230362306a36Sopenharmony_ci
230462306a36Sopenharmony_ci	if (!dispatch) {
230562306a36Sopenharmony_ci		if (cleared)
230662306a36Sopenharmony_ci			dlm_lockres_calc_usage(dlm, res);
230762306a36Sopenharmony_ci		else {
230862306a36Sopenharmony_ci			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
230962306a36Sopenharmony_ci		     	"but it is already dropped!\n", dlm->name,
231062306a36Sopenharmony_ci		     	res->lockname.len, res->lockname.name, node);
231162306a36Sopenharmony_ci			dlm_print_one_lock_resource(res);
231262306a36Sopenharmony_ci		}
231362306a36Sopenharmony_ci		ret = DLM_DEREF_RESPONSE_DONE;
231462306a36Sopenharmony_ci		goto done;
231562306a36Sopenharmony_ci	}
231662306a36Sopenharmony_ci
231762306a36Sopenharmony_ci	item = kzalloc(sizeof(*item), GFP_NOFS);
231862306a36Sopenharmony_ci	if (!item) {
231962306a36Sopenharmony_ci		ret = -ENOMEM;
232062306a36Sopenharmony_ci		mlog_errno(ret);
232162306a36Sopenharmony_ci		goto done;
232262306a36Sopenharmony_ci	}
232362306a36Sopenharmony_ci
232462306a36Sopenharmony_ci	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
232562306a36Sopenharmony_ci	item->u.dl.deref_res = res;
232662306a36Sopenharmony_ci	item->u.dl.deref_node = node;
232762306a36Sopenharmony_ci
232862306a36Sopenharmony_ci	spin_lock(&dlm->work_lock);
232962306a36Sopenharmony_ci	list_add_tail(&item->list, &dlm->work_list);
233062306a36Sopenharmony_ci	spin_unlock(&dlm->work_lock);
233162306a36Sopenharmony_ci
233262306a36Sopenharmony_ci	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
233362306a36Sopenharmony_ci	return DLM_DEREF_RESPONSE_INPROG;
233462306a36Sopenharmony_ci
233562306a36Sopenharmony_cidone:
233662306a36Sopenharmony_ci	if (res)
233762306a36Sopenharmony_ci		dlm_lockres_put(res);
233862306a36Sopenharmony_ci	dlm_put(dlm);
233962306a36Sopenharmony_ci
234062306a36Sopenharmony_ci	return ret;
234162306a36Sopenharmony_ci}
234262306a36Sopenharmony_ci
234362306a36Sopenharmony_ciint dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
234462306a36Sopenharmony_ci			      void **ret_data)
234562306a36Sopenharmony_ci{
234662306a36Sopenharmony_ci	struct dlm_ctxt *dlm = data;
234762306a36Sopenharmony_ci	struct dlm_deref_lockres_done *deref
234862306a36Sopenharmony_ci			= (struct dlm_deref_lockres_done *)msg->buf;
234962306a36Sopenharmony_ci	struct dlm_lock_resource *res = NULL;
235062306a36Sopenharmony_ci	char *name;
235162306a36Sopenharmony_ci	unsigned int namelen;
235262306a36Sopenharmony_ci	int ret = -EINVAL;
235362306a36Sopenharmony_ci	u8 node;
235462306a36Sopenharmony_ci	unsigned int hash;
235562306a36Sopenharmony_ci
235662306a36Sopenharmony_ci	if (!dlm_grab(dlm))
235762306a36Sopenharmony_ci		return 0;
235862306a36Sopenharmony_ci
235962306a36Sopenharmony_ci	name = deref->name;
236062306a36Sopenharmony_ci	namelen = deref->namelen;
236162306a36Sopenharmony_ci	node = deref->node_idx;
236262306a36Sopenharmony_ci
236362306a36Sopenharmony_ci	if (namelen > DLM_LOCKID_NAME_MAX) {
236462306a36Sopenharmony_ci		mlog(ML_ERROR, "Invalid name length!");
236562306a36Sopenharmony_ci		goto done;
236662306a36Sopenharmony_ci	}
236762306a36Sopenharmony_ci	if (deref->node_idx >= O2NM_MAX_NODES) {
236862306a36Sopenharmony_ci		mlog(ML_ERROR, "Invalid node number: %u\n", node);
236962306a36Sopenharmony_ci		goto done;
237062306a36Sopenharmony_ci	}
237162306a36Sopenharmony_ci
237262306a36Sopenharmony_ci	hash = dlm_lockid_hash(name, namelen);
237362306a36Sopenharmony_ci
237462306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
237562306a36Sopenharmony_ci	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
237662306a36Sopenharmony_ci	if (!res) {
237762306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
237862306a36Sopenharmony_ci		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
237962306a36Sopenharmony_ci		     dlm->name, namelen, name);
238062306a36Sopenharmony_ci		goto done;
238162306a36Sopenharmony_ci	}
238262306a36Sopenharmony_ci
238362306a36Sopenharmony_ci	spin_lock(&res->spinlock);
238462306a36Sopenharmony_ci	if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
238562306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
238662306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
238762306a36Sopenharmony_ci		mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
238862306a36Sopenharmony_ci			"but it is already derefed!\n", dlm->name,
238962306a36Sopenharmony_ci			res->lockname.len, res->lockname.name, node);
239062306a36Sopenharmony_ci		ret = 0;
239162306a36Sopenharmony_ci		goto done;
239262306a36Sopenharmony_ci	}
239362306a36Sopenharmony_ci
239462306a36Sopenharmony_ci	__dlm_do_purge_lockres(dlm, res);
239562306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
239662306a36Sopenharmony_ci	wake_up(&res->wq);
239762306a36Sopenharmony_ci
239862306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
239962306a36Sopenharmony_ci
240062306a36Sopenharmony_ci	ret = 0;
240162306a36Sopenharmony_cidone:
240262306a36Sopenharmony_ci	if (res)
240362306a36Sopenharmony_ci		dlm_lockres_put(res);
240462306a36Sopenharmony_ci	dlm_put(dlm);
240562306a36Sopenharmony_ci	return ret;
240662306a36Sopenharmony_ci}
240762306a36Sopenharmony_ci
240862306a36Sopenharmony_cistatic void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
240962306a36Sopenharmony_ci		struct dlm_lock_resource *res, u8 node)
241062306a36Sopenharmony_ci{
241162306a36Sopenharmony_ci	struct dlm_deref_lockres_done deref;
241262306a36Sopenharmony_ci	int ret = 0, r;
241362306a36Sopenharmony_ci	const char *lockname;
241462306a36Sopenharmony_ci	unsigned int namelen;
241562306a36Sopenharmony_ci
241662306a36Sopenharmony_ci	lockname = res->lockname.name;
241762306a36Sopenharmony_ci	namelen = res->lockname.len;
241862306a36Sopenharmony_ci	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
241962306a36Sopenharmony_ci
242062306a36Sopenharmony_ci	memset(&deref, 0, sizeof(deref));
242162306a36Sopenharmony_ci	deref.node_idx = dlm->node_num;
242262306a36Sopenharmony_ci	deref.namelen = namelen;
242362306a36Sopenharmony_ci	memcpy(deref.name, lockname, namelen);
242462306a36Sopenharmony_ci
242562306a36Sopenharmony_ci	ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
242662306a36Sopenharmony_ci				 &deref, sizeof(deref), node, &r);
242762306a36Sopenharmony_ci	if (ret < 0) {
242862306a36Sopenharmony_ci		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
242962306a36Sopenharmony_ci				" to node %u\n", dlm->name, namelen,
243062306a36Sopenharmony_ci				lockname, ret, node);
243162306a36Sopenharmony_ci	} else if (r < 0) {
243262306a36Sopenharmony_ci		/* ignore the error */
243362306a36Sopenharmony_ci		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
243462306a36Sopenharmony_ci		     dlm->name, namelen, lockname, node, r);
243562306a36Sopenharmony_ci		dlm_print_one_lock_resource(res);
243662306a36Sopenharmony_ci	}
243762306a36Sopenharmony_ci}
243862306a36Sopenharmony_ci
243962306a36Sopenharmony_cistatic void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
244062306a36Sopenharmony_ci{
244162306a36Sopenharmony_ci	struct dlm_ctxt *dlm;
244262306a36Sopenharmony_ci	struct dlm_lock_resource *res;
244362306a36Sopenharmony_ci	u8 node;
244462306a36Sopenharmony_ci	u8 cleared = 0;
244562306a36Sopenharmony_ci
244662306a36Sopenharmony_ci	dlm = item->dlm;
244762306a36Sopenharmony_ci	res = item->u.dl.deref_res;
244862306a36Sopenharmony_ci	node = item->u.dl.deref_node;
244962306a36Sopenharmony_ci
245062306a36Sopenharmony_ci	spin_lock(&res->spinlock);
245162306a36Sopenharmony_ci	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
245262306a36Sopenharmony_ci	__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
245362306a36Sopenharmony_ci	if (test_bit(node, res->refmap)) {
245462306a36Sopenharmony_ci		dlm_lockres_clear_refmap_bit(dlm, res, node);
245562306a36Sopenharmony_ci		cleared = 1;
245662306a36Sopenharmony_ci	}
245762306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
245862306a36Sopenharmony_ci
245962306a36Sopenharmony_ci	dlm_drop_lockres_ref_done(dlm, res, node);
246062306a36Sopenharmony_ci
246162306a36Sopenharmony_ci	if (cleared) {
246262306a36Sopenharmony_ci		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
246362306a36Sopenharmony_ci		     dlm->name, res->lockname.len, res->lockname.name, node);
246462306a36Sopenharmony_ci		dlm_lockres_calc_usage(dlm, res);
246562306a36Sopenharmony_ci	} else {
246662306a36Sopenharmony_ci		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
246762306a36Sopenharmony_ci		     "but it is already dropped!\n", dlm->name,
246862306a36Sopenharmony_ci		     res->lockname.len, res->lockname.name, node);
246962306a36Sopenharmony_ci		dlm_print_one_lock_resource(res);
247062306a36Sopenharmony_ci	}
247162306a36Sopenharmony_ci
247262306a36Sopenharmony_ci	dlm_lockres_put(res);
247362306a36Sopenharmony_ci}
247462306a36Sopenharmony_ci
247562306a36Sopenharmony_ci/*
247662306a36Sopenharmony_ci * A migratable resource is one that is :
247762306a36Sopenharmony_ci * 1. locally mastered, and,
247862306a36Sopenharmony_ci * 2. zero local locks, and,
247962306a36Sopenharmony_ci * 3. one or more non-local locks, or, one or more references
248062306a36Sopenharmony_ci * Returns 1 if yes, 0 if not.
248162306a36Sopenharmony_ci */
248262306a36Sopenharmony_cistatic int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
248362306a36Sopenharmony_ci				      struct dlm_lock_resource *res)
248462306a36Sopenharmony_ci{
248562306a36Sopenharmony_ci	enum dlm_lockres_list idx;
248662306a36Sopenharmony_ci	int nonlocal = 0, node_ref;
248762306a36Sopenharmony_ci	struct list_head *queue;
248862306a36Sopenharmony_ci	struct dlm_lock *lock;
248962306a36Sopenharmony_ci	u64 cookie;
249062306a36Sopenharmony_ci
249162306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
249262306a36Sopenharmony_ci
249362306a36Sopenharmony_ci	/* delay migration when the lockres is in MIGRATING state */
249462306a36Sopenharmony_ci	if (res->state & DLM_LOCK_RES_MIGRATING)
249562306a36Sopenharmony_ci		return 0;
249662306a36Sopenharmony_ci
249762306a36Sopenharmony_ci	/* delay migration when the lockres is in RECOCERING state */
249862306a36Sopenharmony_ci	if (res->state & (DLM_LOCK_RES_RECOVERING|
249962306a36Sopenharmony_ci			DLM_LOCK_RES_RECOVERY_WAITING))
250062306a36Sopenharmony_ci		return 0;
250162306a36Sopenharmony_ci
250262306a36Sopenharmony_ci	if (res->owner != dlm->node_num)
250362306a36Sopenharmony_ci		return 0;
250462306a36Sopenharmony_ci
250562306a36Sopenharmony_ci        for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
250662306a36Sopenharmony_ci		queue = dlm_list_idx_to_ptr(res, idx);
250762306a36Sopenharmony_ci		list_for_each_entry(lock, queue, list) {
250862306a36Sopenharmony_ci			if (lock->ml.node != dlm->node_num) {
250962306a36Sopenharmony_ci				nonlocal++;
251062306a36Sopenharmony_ci				continue;
251162306a36Sopenharmony_ci			}
251262306a36Sopenharmony_ci			cookie = be64_to_cpu(lock->ml.cookie);
251362306a36Sopenharmony_ci			mlog(0, "%s: Not migratable res %.*s, lock %u:%llu on "
251462306a36Sopenharmony_ci			     "%s list\n", dlm->name, res->lockname.len,
251562306a36Sopenharmony_ci			     res->lockname.name,
251662306a36Sopenharmony_ci			     dlm_get_lock_cookie_node(cookie),
251762306a36Sopenharmony_ci			     dlm_get_lock_cookie_seq(cookie),
251862306a36Sopenharmony_ci			     dlm_list_in_text(idx));
251962306a36Sopenharmony_ci			return 0;
252062306a36Sopenharmony_ci		}
252162306a36Sopenharmony_ci	}
252262306a36Sopenharmony_ci
252362306a36Sopenharmony_ci	if (!nonlocal) {
252462306a36Sopenharmony_ci		node_ref = find_first_bit(res->refmap, O2NM_MAX_NODES);
252562306a36Sopenharmony_ci		if (node_ref >= O2NM_MAX_NODES)
252662306a36Sopenharmony_ci			return 0;
252762306a36Sopenharmony_ci	}
252862306a36Sopenharmony_ci
252962306a36Sopenharmony_ci	mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
253062306a36Sopenharmony_ci	     res->lockname.name);
253162306a36Sopenharmony_ci
253262306a36Sopenharmony_ci	return 1;
253362306a36Sopenharmony_ci}
253462306a36Sopenharmony_ci
253562306a36Sopenharmony_ci/*
253662306a36Sopenharmony_ci * DLM_MIGRATE_LOCKRES
253762306a36Sopenharmony_ci */
253862306a36Sopenharmony_ci
253962306a36Sopenharmony_ci
254062306a36Sopenharmony_cistatic int dlm_migrate_lockres(struct dlm_ctxt *dlm,
254162306a36Sopenharmony_ci			       struct dlm_lock_resource *res, u8 target)
254262306a36Sopenharmony_ci{
254362306a36Sopenharmony_ci	struct dlm_master_list_entry *mle = NULL;
254462306a36Sopenharmony_ci	struct dlm_master_list_entry *oldmle = NULL;
254562306a36Sopenharmony_ci 	struct dlm_migratable_lockres *mres = NULL;
254662306a36Sopenharmony_ci	int ret = 0;
254762306a36Sopenharmony_ci	const char *name;
254862306a36Sopenharmony_ci	unsigned int namelen;
254962306a36Sopenharmony_ci	int mle_added = 0;
255062306a36Sopenharmony_ci	int wake = 0;
255162306a36Sopenharmony_ci
255262306a36Sopenharmony_ci	if (!dlm_grab(dlm))
255362306a36Sopenharmony_ci		return -EINVAL;
255462306a36Sopenharmony_ci
255562306a36Sopenharmony_ci	name = res->lockname.name;
255662306a36Sopenharmony_ci	namelen = res->lockname.len;
255762306a36Sopenharmony_ci
255862306a36Sopenharmony_ci	mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
255962306a36Sopenharmony_ci	     target);
256062306a36Sopenharmony_ci
256162306a36Sopenharmony_ci	/* preallocate up front. if this fails, abort */
256262306a36Sopenharmony_ci	ret = -ENOMEM;
256362306a36Sopenharmony_ci	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
256462306a36Sopenharmony_ci	if (!mres) {
256562306a36Sopenharmony_ci		mlog_errno(ret);
256662306a36Sopenharmony_ci		goto leave;
256762306a36Sopenharmony_ci	}
256862306a36Sopenharmony_ci
256962306a36Sopenharmony_ci	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
257062306a36Sopenharmony_ci	if (!mle) {
257162306a36Sopenharmony_ci		mlog_errno(ret);
257262306a36Sopenharmony_ci		goto leave;
257362306a36Sopenharmony_ci	}
257462306a36Sopenharmony_ci	ret = 0;
257562306a36Sopenharmony_ci
257662306a36Sopenharmony_ci	/*
257762306a36Sopenharmony_ci	 * clear any existing master requests and
257862306a36Sopenharmony_ci	 * add the migration mle to the list
257962306a36Sopenharmony_ci	 */
258062306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
258162306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
258262306a36Sopenharmony_ci	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
258362306a36Sopenharmony_ci				    namelen, target, dlm->node_num);
258462306a36Sopenharmony_ci	/* get an extra reference on the mle.
258562306a36Sopenharmony_ci	 * otherwise the assert_master from the new
258662306a36Sopenharmony_ci	 * master will destroy this.
258762306a36Sopenharmony_ci	 */
258862306a36Sopenharmony_ci	if (ret != -EEXIST)
258962306a36Sopenharmony_ci		dlm_get_mle_inuse(mle);
259062306a36Sopenharmony_ci
259162306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
259262306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
259362306a36Sopenharmony_ci
259462306a36Sopenharmony_ci	if (ret == -EEXIST) {
259562306a36Sopenharmony_ci		mlog(0, "another process is already migrating it\n");
259662306a36Sopenharmony_ci		goto fail;
259762306a36Sopenharmony_ci	}
259862306a36Sopenharmony_ci	mle_added = 1;
259962306a36Sopenharmony_ci
260062306a36Sopenharmony_ci	/*
260162306a36Sopenharmony_ci	 * set the MIGRATING flag and flush asts
260262306a36Sopenharmony_ci	 * if we fail after this we need to re-dirty the lockres
260362306a36Sopenharmony_ci	 */
260462306a36Sopenharmony_ci	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
260562306a36Sopenharmony_ci		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
260662306a36Sopenharmony_ci		     "the target went down.\n", res->lockname.len,
260762306a36Sopenharmony_ci		     res->lockname.name, target);
260862306a36Sopenharmony_ci		spin_lock(&res->spinlock);
260962306a36Sopenharmony_ci		res->state &= ~DLM_LOCK_RES_MIGRATING;
261062306a36Sopenharmony_ci		wake = 1;
261162306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
261262306a36Sopenharmony_ci		ret = -EINVAL;
261362306a36Sopenharmony_ci	}
261462306a36Sopenharmony_ci
261562306a36Sopenharmony_cifail:
261662306a36Sopenharmony_ci	if (ret != -EEXIST && oldmle) {
261762306a36Sopenharmony_ci		/* master is known, detach if not already detached */
261862306a36Sopenharmony_ci		dlm_mle_detach_hb_events(dlm, oldmle);
261962306a36Sopenharmony_ci		dlm_put_mle(oldmle);
262062306a36Sopenharmony_ci	}
262162306a36Sopenharmony_ci
262262306a36Sopenharmony_ci	if (ret < 0) {
262362306a36Sopenharmony_ci		if (mle_added) {
262462306a36Sopenharmony_ci			dlm_mle_detach_hb_events(dlm, mle);
262562306a36Sopenharmony_ci			dlm_put_mle(mle);
262662306a36Sopenharmony_ci			dlm_put_mle_inuse(mle);
262762306a36Sopenharmony_ci		} else if (mle) {
262862306a36Sopenharmony_ci			kmem_cache_free(dlm_mle_cache, mle);
262962306a36Sopenharmony_ci			mle = NULL;
263062306a36Sopenharmony_ci		}
263162306a36Sopenharmony_ci		goto leave;
263262306a36Sopenharmony_ci	}
263362306a36Sopenharmony_ci
263462306a36Sopenharmony_ci	/*
263562306a36Sopenharmony_ci	 * at this point, we have a migration target, an mle
263662306a36Sopenharmony_ci	 * in the master list, and the MIGRATING flag set on
263762306a36Sopenharmony_ci	 * the lockres
263862306a36Sopenharmony_ci	 */
263962306a36Sopenharmony_ci
264062306a36Sopenharmony_ci	/* now that remote nodes are spinning on the MIGRATING flag,
264162306a36Sopenharmony_ci	 * ensure that all assert_master work is flushed. */
264262306a36Sopenharmony_ci	flush_workqueue(dlm->dlm_worker);
264362306a36Sopenharmony_ci
264462306a36Sopenharmony_ci	/* notify new node and send all lock state */
264562306a36Sopenharmony_ci	/* call send_one_lockres with migration flag.
264662306a36Sopenharmony_ci	 * this serves as notice to the target node that a
264762306a36Sopenharmony_ci	 * migration is starting. */
264862306a36Sopenharmony_ci	ret = dlm_send_one_lockres(dlm, res, mres, target,
264962306a36Sopenharmony_ci				   DLM_MRES_MIGRATION);
265062306a36Sopenharmony_ci
265162306a36Sopenharmony_ci	if (ret < 0) {
265262306a36Sopenharmony_ci		mlog(0, "migration to node %u failed with %d\n",
265362306a36Sopenharmony_ci		     target, ret);
265462306a36Sopenharmony_ci		/* migration failed, detach and clean up mle */
265562306a36Sopenharmony_ci		dlm_mle_detach_hb_events(dlm, mle);
265662306a36Sopenharmony_ci		dlm_put_mle(mle);
265762306a36Sopenharmony_ci		dlm_put_mle_inuse(mle);
265862306a36Sopenharmony_ci		spin_lock(&res->spinlock);
265962306a36Sopenharmony_ci		res->state &= ~DLM_LOCK_RES_MIGRATING;
266062306a36Sopenharmony_ci		wake = 1;
266162306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
266262306a36Sopenharmony_ci		if (dlm_is_host_down(ret))
266362306a36Sopenharmony_ci			dlm_wait_for_node_death(dlm, target,
266462306a36Sopenharmony_ci						DLM_NODE_DEATH_WAIT_MAX);
266562306a36Sopenharmony_ci		goto leave;
266662306a36Sopenharmony_ci	}
266762306a36Sopenharmony_ci
266862306a36Sopenharmony_ci	/* at this point, the target sends a message to all nodes,
266962306a36Sopenharmony_ci	 * (using dlm_do_migrate_request).  this node is skipped since
267062306a36Sopenharmony_ci	 * we had to put an mle in the list to begin the process.  this
267162306a36Sopenharmony_ci	 * node now waits for target to do an assert master.  this node
267262306a36Sopenharmony_ci	 * will be the last one notified, ensuring that the migration
267362306a36Sopenharmony_ci	 * is complete everywhere.  if the target dies while this is
267462306a36Sopenharmony_ci	 * going on, some nodes could potentially see the target as the
267562306a36Sopenharmony_ci	 * master, so it is important that my recovery finds the migration
267662306a36Sopenharmony_ci	 * mle and sets the master to UNKNOWN. */
267762306a36Sopenharmony_ci
267862306a36Sopenharmony_ci
267962306a36Sopenharmony_ci	/* wait for new node to assert master */
268062306a36Sopenharmony_ci	while (1) {
268162306a36Sopenharmony_ci		ret = wait_event_interruptible_timeout(mle->wq,
268262306a36Sopenharmony_ci					(atomic_read(&mle->woken) == 1),
268362306a36Sopenharmony_ci					msecs_to_jiffies(5000));
268462306a36Sopenharmony_ci
268562306a36Sopenharmony_ci		if (ret >= 0) {
268662306a36Sopenharmony_ci		       	if (atomic_read(&mle->woken) == 1 ||
268762306a36Sopenharmony_ci			    res->owner == target)
268862306a36Sopenharmony_ci				break;
268962306a36Sopenharmony_ci
269062306a36Sopenharmony_ci			mlog(0, "%s:%.*s: timed out during migration\n",
269162306a36Sopenharmony_ci			     dlm->name, res->lockname.len, res->lockname.name);
269262306a36Sopenharmony_ci			/* avoid hang during shutdown when migrating lockres
269362306a36Sopenharmony_ci			 * to a node which also goes down */
269462306a36Sopenharmony_ci			if (dlm_is_node_dead(dlm, target)) {
269562306a36Sopenharmony_ci				mlog(0, "%s:%.*s: expected migration "
269662306a36Sopenharmony_ci				     "target %u is no longer up, restarting\n",
269762306a36Sopenharmony_ci				     dlm->name, res->lockname.len,
269862306a36Sopenharmony_ci				     res->lockname.name, target);
269962306a36Sopenharmony_ci				ret = -EINVAL;
270062306a36Sopenharmony_ci				/* migration failed, detach and clean up mle */
270162306a36Sopenharmony_ci				dlm_mle_detach_hb_events(dlm, mle);
270262306a36Sopenharmony_ci				dlm_put_mle(mle);
270362306a36Sopenharmony_ci				dlm_put_mle_inuse(mle);
270462306a36Sopenharmony_ci				spin_lock(&res->spinlock);
270562306a36Sopenharmony_ci				res->state &= ~DLM_LOCK_RES_MIGRATING;
270662306a36Sopenharmony_ci				wake = 1;
270762306a36Sopenharmony_ci				spin_unlock(&res->spinlock);
270862306a36Sopenharmony_ci				goto leave;
270962306a36Sopenharmony_ci			}
271062306a36Sopenharmony_ci		} else
271162306a36Sopenharmony_ci			mlog(0, "%s:%.*s: caught signal during migration\n",
271262306a36Sopenharmony_ci			     dlm->name, res->lockname.len, res->lockname.name);
271362306a36Sopenharmony_ci	}
271462306a36Sopenharmony_ci
271562306a36Sopenharmony_ci	/* all done, set the owner, clear the flag */
271662306a36Sopenharmony_ci	spin_lock(&res->spinlock);
271762306a36Sopenharmony_ci	dlm_set_lockres_owner(dlm, res, target);
271862306a36Sopenharmony_ci	res->state &= ~DLM_LOCK_RES_MIGRATING;
271962306a36Sopenharmony_ci	dlm_remove_nonlocal_locks(dlm, res);
272062306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
272162306a36Sopenharmony_ci	wake_up(&res->wq);
272262306a36Sopenharmony_ci
272362306a36Sopenharmony_ci	/* master is known, detach if not already detached */
272462306a36Sopenharmony_ci	dlm_mle_detach_hb_events(dlm, mle);
272562306a36Sopenharmony_ci	dlm_put_mle_inuse(mle);
272662306a36Sopenharmony_ci	ret = 0;
272762306a36Sopenharmony_ci
272862306a36Sopenharmony_ci	dlm_lockres_calc_usage(dlm, res);
272962306a36Sopenharmony_ci
273062306a36Sopenharmony_cileave:
273162306a36Sopenharmony_ci	/* re-dirty the lockres if we failed */
273262306a36Sopenharmony_ci	if (ret < 0)
273362306a36Sopenharmony_ci		dlm_kick_thread(dlm, res);
273462306a36Sopenharmony_ci
273562306a36Sopenharmony_ci	/* wake up waiters if the MIGRATING flag got set
273662306a36Sopenharmony_ci	 * but migration failed */
273762306a36Sopenharmony_ci	if (wake)
273862306a36Sopenharmony_ci		wake_up(&res->wq);
273962306a36Sopenharmony_ci
274062306a36Sopenharmony_ci	if (mres)
274162306a36Sopenharmony_ci		free_page((unsigned long)mres);
274262306a36Sopenharmony_ci
274362306a36Sopenharmony_ci	dlm_put(dlm);
274462306a36Sopenharmony_ci
274562306a36Sopenharmony_ci	mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
274662306a36Sopenharmony_ci	     name, target, ret);
274762306a36Sopenharmony_ci	return ret;
274862306a36Sopenharmony_ci}
274962306a36Sopenharmony_ci
275062306a36Sopenharmony_ci/*
275162306a36Sopenharmony_ci * Should be called only after beginning the domain leave process.
275262306a36Sopenharmony_ci * There should not be any remaining locks on nonlocal lock resources,
275362306a36Sopenharmony_ci * and there should be no local locks left on locally mastered resources.
275462306a36Sopenharmony_ci *
275562306a36Sopenharmony_ci * Called with the dlm spinlock held, may drop it to do migration, but
275662306a36Sopenharmony_ci * will re-acquire before exit.
275762306a36Sopenharmony_ci *
275862306a36Sopenharmony_ci * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
275962306a36Sopenharmony_ci */
276062306a36Sopenharmony_ciint dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
276162306a36Sopenharmony_ci	__must_hold(&dlm->spinlock)
276262306a36Sopenharmony_ci{
276362306a36Sopenharmony_ci	int ret;
276462306a36Sopenharmony_ci	int lock_dropped = 0;
276562306a36Sopenharmony_ci	u8 target = O2NM_MAX_NODES;
276662306a36Sopenharmony_ci
276762306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
276862306a36Sopenharmony_ci
276962306a36Sopenharmony_ci	spin_lock(&res->spinlock);
277062306a36Sopenharmony_ci	if (dlm_is_lockres_migratable(dlm, res))
277162306a36Sopenharmony_ci		target = dlm_pick_migration_target(dlm, res);
277262306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
277362306a36Sopenharmony_ci
277462306a36Sopenharmony_ci	if (target == O2NM_MAX_NODES)
277562306a36Sopenharmony_ci		goto leave;
277662306a36Sopenharmony_ci
277762306a36Sopenharmony_ci	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
277862306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
277962306a36Sopenharmony_ci	lock_dropped = 1;
278062306a36Sopenharmony_ci	ret = dlm_migrate_lockres(dlm, res, target);
278162306a36Sopenharmony_ci	if (ret)
278262306a36Sopenharmony_ci		mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
278362306a36Sopenharmony_ci		     dlm->name, res->lockname.len, res->lockname.name,
278462306a36Sopenharmony_ci		     target, ret);
278562306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
278662306a36Sopenharmony_cileave:
278762306a36Sopenharmony_ci	return lock_dropped;
278862306a36Sopenharmony_ci}
278962306a36Sopenharmony_ci
279062306a36Sopenharmony_ciint dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
279162306a36Sopenharmony_ci{
279262306a36Sopenharmony_ci	int ret;
279362306a36Sopenharmony_ci	spin_lock(&dlm->ast_lock);
279462306a36Sopenharmony_ci	spin_lock(&lock->spinlock);
279562306a36Sopenharmony_ci	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
279662306a36Sopenharmony_ci	spin_unlock(&lock->spinlock);
279762306a36Sopenharmony_ci	spin_unlock(&dlm->ast_lock);
279862306a36Sopenharmony_ci	return ret;
279962306a36Sopenharmony_ci}
280062306a36Sopenharmony_ci
280162306a36Sopenharmony_cistatic int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
280262306a36Sopenharmony_ci				     struct dlm_lock_resource *res,
280362306a36Sopenharmony_ci				     u8 mig_target)
280462306a36Sopenharmony_ci{
280562306a36Sopenharmony_ci	int can_proceed;
280662306a36Sopenharmony_ci	spin_lock(&res->spinlock);
280762306a36Sopenharmony_ci	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
280862306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
280962306a36Sopenharmony_ci
281062306a36Sopenharmony_ci	/* target has died, so make the caller break out of the
281162306a36Sopenharmony_ci	 * wait_event, but caller must recheck the domain_map */
281262306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
281362306a36Sopenharmony_ci	if (!test_bit(mig_target, dlm->domain_map))
281462306a36Sopenharmony_ci		can_proceed = 1;
281562306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
281662306a36Sopenharmony_ci	return can_proceed;
281762306a36Sopenharmony_ci}
281862306a36Sopenharmony_ci
281962306a36Sopenharmony_cistatic int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
282062306a36Sopenharmony_ci				struct dlm_lock_resource *res)
282162306a36Sopenharmony_ci{
282262306a36Sopenharmony_ci	int ret;
282362306a36Sopenharmony_ci	spin_lock(&res->spinlock);
282462306a36Sopenharmony_ci	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
282562306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
282662306a36Sopenharmony_ci	return ret;
282762306a36Sopenharmony_ci}
282862306a36Sopenharmony_ci
282962306a36Sopenharmony_ci
283062306a36Sopenharmony_cistatic int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
283162306a36Sopenharmony_ci				       struct dlm_lock_resource *res,
283262306a36Sopenharmony_ci				       u8 target)
283362306a36Sopenharmony_ci{
283462306a36Sopenharmony_ci	int ret = 0;
283562306a36Sopenharmony_ci
283662306a36Sopenharmony_ci	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
283762306a36Sopenharmony_ci	       res->lockname.len, res->lockname.name, dlm->node_num,
283862306a36Sopenharmony_ci	       target);
283962306a36Sopenharmony_ci	/* need to set MIGRATING flag on lockres.  this is done by
284062306a36Sopenharmony_ci	 * ensuring that all asts have been flushed for this lockres. */
284162306a36Sopenharmony_ci	spin_lock(&res->spinlock);
284262306a36Sopenharmony_ci	BUG_ON(res->migration_pending);
284362306a36Sopenharmony_ci	res->migration_pending = 1;
284462306a36Sopenharmony_ci	/* strategy is to reserve an extra ast then release
284562306a36Sopenharmony_ci	 * it below, letting the release do all of the work */
284662306a36Sopenharmony_ci	__dlm_lockres_reserve_ast(res);
284762306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
284862306a36Sopenharmony_ci
284962306a36Sopenharmony_ci	/* now flush all the pending asts */
285062306a36Sopenharmony_ci	dlm_kick_thread(dlm, res);
285162306a36Sopenharmony_ci	/* before waiting on DIRTY, block processes which may
285262306a36Sopenharmony_ci	 * try to dirty the lockres before MIGRATING is set */
285362306a36Sopenharmony_ci	spin_lock(&res->spinlock);
285462306a36Sopenharmony_ci	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
285562306a36Sopenharmony_ci	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
285662306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
285762306a36Sopenharmony_ci	/* now wait on any pending asts and the DIRTY state */
285862306a36Sopenharmony_ci	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
285962306a36Sopenharmony_ci	dlm_lockres_release_ast(dlm, res);
286062306a36Sopenharmony_ci
286162306a36Sopenharmony_ci	mlog(0, "about to wait on migration_wq, dirty=%s\n",
286262306a36Sopenharmony_ci	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
286362306a36Sopenharmony_ci	/* if the extra ref we just put was the final one, this
286462306a36Sopenharmony_ci	 * will pass thru immediately.  otherwise, we need to wait
286562306a36Sopenharmony_ci	 * for the last ast to finish. */
286662306a36Sopenharmony_ciagain:
286762306a36Sopenharmony_ci	ret = wait_event_interruptible_timeout(dlm->migration_wq,
286862306a36Sopenharmony_ci		   dlm_migration_can_proceed(dlm, res, target),
286962306a36Sopenharmony_ci		   msecs_to_jiffies(1000));
287062306a36Sopenharmony_ci	if (ret < 0) {
287162306a36Sopenharmony_ci		mlog(0, "woken again: migrating? %s, dead? %s\n",
287262306a36Sopenharmony_ci		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
287362306a36Sopenharmony_ci		       test_bit(target, dlm->domain_map) ? "no":"yes");
287462306a36Sopenharmony_ci	} else {
287562306a36Sopenharmony_ci		mlog(0, "all is well: migrating? %s, dead? %s\n",
287662306a36Sopenharmony_ci		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
287762306a36Sopenharmony_ci		       test_bit(target, dlm->domain_map) ? "no":"yes");
287862306a36Sopenharmony_ci	}
287962306a36Sopenharmony_ci	if (!dlm_migration_can_proceed(dlm, res, target)) {
288062306a36Sopenharmony_ci		mlog(0, "trying again...\n");
288162306a36Sopenharmony_ci		goto again;
288262306a36Sopenharmony_ci	}
288362306a36Sopenharmony_ci
288462306a36Sopenharmony_ci	ret = 0;
288562306a36Sopenharmony_ci	/* did the target go down or die? */
288662306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
288762306a36Sopenharmony_ci	if (!test_bit(target, dlm->domain_map)) {
288862306a36Sopenharmony_ci		mlog(ML_ERROR, "aha. migration target %u just went down\n",
288962306a36Sopenharmony_ci		     target);
289062306a36Sopenharmony_ci		ret = -EHOSTDOWN;
289162306a36Sopenharmony_ci	}
289262306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
289362306a36Sopenharmony_ci
289462306a36Sopenharmony_ci	/*
289562306a36Sopenharmony_ci	 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
289662306a36Sopenharmony_ci	 * another try; otherwise, we are sure the MIGRATING state is there,
289762306a36Sopenharmony_ci	 * drop the unneeded state which blocked threads trying to DIRTY
289862306a36Sopenharmony_ci	 */
289962306a36Sopenharmony_ci	spin_lock(&res->spinlock);
290062306a36Sopenharmony_ci	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
290162306a36Sopenharmony_ci	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
290262306a36Sopenharmony_ci	if (!ret)
290362306a36Sopenharmony_ci		BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
290462306a36Sopenharmony_ci	else
290562306a36Sopenharmony_ci		res->migration_pending = 0;
290662306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
290762306a36Sopenharmony_ci
290862306a36Sopenharmony_ci	/*
290962306a36Sopenharmony_ci	 * at this point:
291062306a36Sopenharmony_ci	 *
291162306a36Sopenharmony_ci	 *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
291262306a36Sopenharmony_ci	 *   o there are no pending asts on this lockres
291362306a36Sopenharmony_ci	 *   o all processes trying to reserve an ast on this
291462306a36Sopenharmony_ci	 *     lockres must wait for the MIGRATING flag to clear
291562306a36Sopenharmony_ci	 */
291662306a36Sopenharmony_ci	return ret;
291762306a36Sopenharmony_ci}
291862306a36Sopenharmony_ci
291962306a36Sopenharmony_ci/* last step in the migration process.
292062306a36Sopenharmony_ci * original master calls this to free all of the dlm_lock
292162306a36Sopenharmony_ci * structures that used to be for other nodes. */
292262306a36Sopenharmony_cistatic void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
292362306a36Sopenharmony_ci				      struct dlm_lock_resource *res)
292462306a36Sopenharmony_ci{
292562306a36Sopenharmony_ci	struct list_head *queue = &res->granted;
292662306a36Sopenharmony_ci	int i, bit;
292762306a36Sopenharmony_ci	struct dlm_lock *lock, *next;
292862306a36Sopenharmony_ci
292962306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
293062306a36Sopenharmony_ci
293162306a36Sopenharmony_ci	BUG_ON(res->owner == dlm->node_num);
293262306a36Sopenharmony_ci
293362306a36Sopenharmony_ci	for (i=0; i<3; i++) {
293462306a36Sopenharmony_ci		list_for_each_entry_safe(lock, next, queue, list) {
293562306a36Sopenharmony_ci			if (lock->ml.node != dlm->node_num) {
293662306a36Sopenharmony_ci				mlog(0, "putting lock for node %u\n",
293762306a36Sopenharmony_ci				     lock->ml.node);
293862306a36Sopenharmony_ci				/* be extra careful */
293962306a36Sopenharmony_ci				BUG_ON(!list_empty(&lock->ast_list));
294062306a36Sopenharmony_ci				BUG_ON(!list_empty(&lock->bast_list));
294162306a36Sopenharmony_ci				BUG_ON(lock->ast_pending);
294262306a36Sopenharmony_ci				BUG_ON(lock->bast_pending);
294362306a36Sopenharmony_ci				dlm_lockres_clear_refmap_bit(dlm, res,
294462306a36Sopenharmony_ci							     lock->ml.node);
294562306a36Sopenharmony_ci				list_del_init(&lock->list);
294662306a36Sopenharmony_ci				dlm_lock_put(lock);
294762306a36Sopenharmony_ci				/* In a normal unlock, we would have added a
294862306a36Sopenharmony_ci				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
294962306a36Sopenharmony_ci				dlm_lock_put(lock);
295062306a36Sopenharmony_ci			}
295162306a36Sopenharmony_ci		}
295262306a36Sopenharmony_ci		queue++;
295362306a36Sopenharmony_ci	}
295462306a36Sopenharmony_ci	bit = 0;
295562306a36Sopenharmony_ci	while (1) {
295662306a36Sopenharmony_ci		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
295762306a36Sopenharmony_ci		if (bit >= O2NM_MAX_NODES)
295862306a36Sopenharmony_ci			break;
295962306a36Sopenharmony_ci		/* do not clear the local node reference, if there is a
296062306a36Sopenharmony_ci		 * process holding this, let it drop the ref itself */
296162306a36Sopenharmony_ci		if (bit != dlm->node_num) {
296262306a36Sopenharmony_ci			mlog(0, "%s:%.*s: node %u had a ref to this "
296362306a36Sopenharmony_ci			     "migrating lockres, clearing\n", dlm->name,
296462306a36Sopenharmony_ci			     res->lockname.len, res->lockname.name, bit);
296562306a36Sopenharmony_ci			dlm_lockres_clear_refmap_bit(dlm, res, bit);
296662306a36Sopenharmony_ci		}
296762306a36Sopenharmony_ci		bit++;
296862306a36Sopenharmony_ci	}
296962306a36Sopenharmony_ci}
297062306a36Sopenharmony_ci
297162306a36Sopenharmony_ci/*
297262306a36Sopenharmony_ci * Pick a node to migrate the lock resource to. This function selects a
297362306a36Sopenharmony_ci * potential target based first on the locks and then on refmap. It skips
297462306a36Sopenharmony_ci * nodes that are in the process of exiting the domain.
297562306a36Sopenharmony_ci */
297662306a36Sopenharmony_cistatic u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
297762306a36Sopenharmony_ci				    struct dlm_lock_resource *res)
297862306a36Sopenharmony_ci{
297962306a36Sopenharmony_ci	enum dlm_lockres_list idx;
298062306a36Sopenharmony_ci	struct list_head *queue;
298162306a36Sopenharmony_ci	struct dlm_lock *lock;
298262306a36Sopenharmony_ci	int noderef;
298362306a36Sopenharmony_ci	u8 nodenum = O2NM_MAX_NODES;
298462306a36Sopenharmony_ci
298562306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
298662306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
298762306a36Sopenharmony_ci
298862306a36Sopenharmony_ci	/* Go through all the locks */
298962306a36Sopenharmony_ci	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
299062306a36Sopenharmony_ci		queue = dlm_list_idx_to_ptr(res, idx);
299162306a36Sopenharmony_ci		list_for_each_entry(lock, queue, list) {
299262306a36Sopenharmony_ci			if (lock->ml.node == dlm->node_num)
299362306a36Sopenharmony_ci				continue;
299462306a36Sopenharmony_ci			if (test_bit(lock->ml.node, dlm->exit_domain_map))
299562306a36Sopenharmony_ci				continue;
299662306a36Sopenharmony_ci			nodenum = lock->ml.node;
299762306a36Sopenharmony_ci			goto bail;
299862306a36Sopenharmony_ci		}
299962306a36Sopenharmony_ci	}
300062306a36Sopenharmony_ci
300162306a36Sopenharmony_ci	/* Go thru the refmap */
300262306a36Sopenharmony_ci	noderef = -1;
300362306a36Sopenharmony_ci	while (1) {
300462306a36Sopenharmony_ci		noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
300562306a36Sopenharmony_ci					noderef + 1);
300662306a36Sopenharmony_ci		if (noderef >= O2NM_MAX_NODES)
300762306a36Sopenharmony_ci			break;
300862306a36Sopenharmony_ci		if (noderef == dlm->node_num)
300962306a36Sopenharmony_ci			continue;
301062306a36Sopenharmony_ci		if (test_bit(noderef, dlm->exit_domain_map))
301162306a36Sopenharmony_ci			continue;
301262306a36Sopenharmony_ci		nodenum = noderef;
301362306a36Sopenharmony_ci		goto bail;
301462306a36Sopenharmony_ci	}
301562306a36Sopenharmony_ci
301662306a36Sopenharmony_cibail:
301762306a36Sopenharmony_ci	return nodenum;
301862306a36Sopenharmony_ci}
301962306a36Sopenharmony_ci
302062306a36Sopenharmony_ci/* this is called by the new master once all lockres
302162306a36Sopenharmony_ci * data has been received */
302262306a36Sopenharmony_cistatic int dlm_do_migrate_request(struct dlm_ctxt *dlm,
302362306a36Sopenharmony_ci				  struct dlm_lock_resource *res,
302462306a36Sopenharmony_ci				  u8 master, u8 new_master,
302562306a36Sopenharmony_ci				  struct dlm_node_iter *iter)
302662306a36Sopenharmony_ci{
302762306a36Sopenharmony_ci	struct dlm_migrate_request migrate;
302862306a36Sopenharmony_ci	int ret, skip, status = 0;
302962306a36Sopenharmony_ci	int nodenum;
303062306a36Sopenharmony_ci
303162306a36Sopenharmony_ci	memset(&migrate, 0, sizeof(migrate));
303262306a36Sopenharmony_ci	migrate.namelen = res->lockname.len;
303362306a36Sopenharmony_ci	memcpy(migrate.name, res->lockname.name, migrate.namelen);
303462306a36Sopenharmony_ci	migrate.new_master = new_master;
303562306a36Sopenharmony_ci	migrate.master = master;
303662306a36Sopenharmony_ci
303762306a36Sopenharmony_ci	ret = 0;
303862306a36Sopenharmony_ci
303962306a36Sopenharmony_ci	/* send message to all nodes, except the master and myself */
304062306a36Sopenharmony_ci	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
304162306a36Sopenharmony_ci		if (nodenum == master ||
304262306a36Sopenharmony_ci		    nodenum == new_master)
304362306a36Sopenharmony_ci			continue;
304462306a36Sopenharmony_ci
304562306a36Sopenharmony_ci		/* We could race exit domain. If exited, skip. */
304662306a36Sopenharmony_ci		spin_lock(&dlm->spinlock);
304762306a36Sopenharmony_ci		skip = (!test_bit(nodenum, dlm->domain_map));
304862306a36Sopenharmony_ci		spin_unlock(&dlm->spinlock);
304962306a36Sopenharmony_ci		if (skip) {
305062306a36Sopenharmony_ci			clear_bit(nodenum, iter->node_map);
305162306a36Sopenharmony_ci			continue;
305262306a36Sopenharmony_ci		}
305362306a36Sopenharmony_ci
305462306a36Sopenharmony_ci		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
305562306a36Sopenharmony_ci					 &migrate, sizeof(migrate), nodenum,
305662306a36Sopenharmony_ci					 &status);
305762306a36Sopenharmony_ci		if (ret < 0) {
305862306a36Sopenharmony_ci			mlog(ML_ERROR, "%s: res %.*s, Error %d send "
305962306a36Sopenharmony_ci			     "MIGRATE_REQUEST to node %u\n", dlm->name,
306062306a36Sopenharmony_ci			     migrate.namelen, migrate.name, ret, nodenum);
306162306a36Sopenharmony_ci			if (!dlm_is_host_down(ret)) {
306262306a36Sopenharmony_ci				mlog(ML_ERROR, "unhandled error=%d!\n", ret);
306362306a36Sopenharmony_ci				BUG();
306462306a36Sopenharmony_ci			}
306562306a36Sopenharmony_ci			clear_bit(nodenum, iter->node_map);
306662306a36Sopenharmony_ci			ret = 0;
306762306a36Sopenharmony_ci		} else if (status < 0) {
306862306a36Sopenharmony_ci			mlog(0, "migrate request (node %u) returned %d!\n",
306962306a36Sopenharmony_ci			     nodenum, status);
307062306a36Sopenharmony_ci			ret = status;
307162306a36Sopenharmony_ci		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
307262306a36Sopenharmony_ci			/* during the migration request we short-circuited
307362306a36Sopenharmony_ci			 * the mastery of the lockres.  make sure we have
307462306a36Sopenharmony_ci			 * a mastery ref for nodenum */
307562306a36Sopenharmony_ci			mlog(0, "%s:%.*s: need ref for node %u\n",
307662306a36Sopenharmony_ci			     dlm->name, res->lockname.len, res->lockname.name,
307762306a36Sopenharmony_ci			     nodenum);
307862306a36Sopenharmony_ci			spin_lock(&res->spinlock);
307962306a36Sopenharmony_ci			dlm_lockres_set_refmap_bit(dlm, res, nodenum);
308062306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
308162306a36Sopenharmony_ci		}
308262306a36Sopenharmony_ci	}
308362306a36Sopenharmony_ci
308462306a36Sopenharmony_ci	if (ret < 0)
308562306a36Sopenharmony_ci		mlog_errno(ret);
308662306a36Sopenharmony_ci
308762306a36Sopenharmony_ci	mlog(0, "returning ret=%d\n", ret);
308862306a36Sopenharmony_ci	return ret;
308962306a36Sopenharmony_ci}
309062306a36Sopenharmony_ci
309162306a36Sopenharmony_ci
309262306a36Sopenharmony_ci/* if there is an existing mle for this lockres, we now know who the master is.
309362306a36Sopenharmony_ci * (the one who sent us *this* message) we can clear it up right away.
309462306a36Sopenharmony_ci * since the process that put the mle on the list still has a reference to it,
309562306a36Sopenharmony_ci * we can unhash it now, set the master and wake the process.  as a result,
309662306a36Sopenharmony_ci * we will have no mle in the list to start with.  now we can add an mle for
309762306a36Sopenharmony_ci * the migration and this should be the only one found for those scanning the
309862306a36Sopenharmony_ci * list.  */
309962306a36Sopenharmony_ciint dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
310062306a36Sopenharmony_ci				void **ret_data)
310162306a36Sopenharmony_ci{
310262306a36Sopenharmony_ci	struct dlm_ctxt *dlm = data;
310362306a36Sopenharmony_ci	struct dlm_lock_resource *res = NULL;
310462306a36Sopenharmony_ci	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
310562306a36Sopenharmony_ci	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
310662306a36Sopenharmony_ci	const char *name;
310762306a36Sopenharmony_ci	unsigned int namelen, hash;
310862306a36Sopenharmony_ci	int ret = 0;
310962306a36Sopenharmony_ci
311062306a36Sopenharmony_ci	if (!dlm_grab(dlm))
311162306a36Sopenharmony_ci		return 0;
311262306a36Sopenharmony_ci
311362306a36Sopenharmony_ci	name = migrate->name;
311462306a36Sopenharmony_ci	namelen = migrate->namelen;
311562306a36Sopenharmony_ci	hash = dlm_lockid_hash(name, namelen);
311662306a36Sopenharmony_ci
311762306a36Sopenharmony_ci	/* preallocate.. if this fails, abort */
311862306a36Sopenharmony_ci	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
311962306a36Sopenharmony_ci
312062306a36Sopenharmony_ci	if (!mle) {
312162306a36Sopenharmony_ci		ret = -ENOMEM;
312262306a36Sopenharmony_ci		goto leave;
312362306a36Sopenharmony_ci	}
312462306a36Sopenharmony_ci
312562306a36Sopenharmony_ci	/* check for pre-existing lock */
312662306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
312762306a36Sopenharmony_ci	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
312862306a36Sopenharmony_ci	if (res) {
312962306a36Sopenharmony_ci		spin_lock(&res->spinlock);
313062306a36Sopenharmony_ci		if (res->state & DLM_LOCK_RES_RECOVERING) {
313162306a36Sopenharmony_ci			/* if all is working ok, this can only mean that we got
313262306a36Sopenharmony_ci		 	* a migrate request from a node that we now see as
313362306a36Sopenharmony_ci		 	* dead.  what can we do here?  drop it to the floor? */
313462306a36Sopenharmony_ci			spin_unlock(&res->spinlock);
313562306a36Sopenharmony_ci			mlog(ML_ERROR, "Got a migrate request, but the "
313662306a36Sopenharmony_ci			     "lockres is marked as recovering!");
313762306a36Sopenharmony_ci			kmem_cache_free(dlm_mle_cache, mle);
313862306a36Sopenharmony_ci			ret = -EINVAL; /* need a better solution */
313962306a36Sopenharmony_ci			goto unlock;
314062306a36Sopenharmony_ci		}
314162306a36Sopenharmony_ci		res->state |= DLM_LOCK_RES_MIGRATING;
314262306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
314362306a36Sopenharmony_ci	}
314462306a36Sopenharmony_ci
314562306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
314662306a36Sopenharmony_ci	/* ignore status.  only nonzero status would BUG. */
314762306a36Sopenharmony_ci	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
314862306a36Sopenharmony_ci				    name, namelen,
314962306a36Sopenharmony_ci				    migrate->new_master,
315062306a36Sopenharmony_ci				    migrate->master);
315162306a36Sopenharmony_ci
315262306a36Sopenharmony_ci	if (ret < 0)
315362306a36Sopenharmony_ci		kmem_cache_free(dlm_mle_cache, mle);
315462306a36Sopenharmony_ci
315562306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
315662306a36Sopenharmony_ciunlock:
315762306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
315862306a36Sopenharmony_ci
315962306a36Sopenharmony_ci	if (oldmle) {
316062306a36Sopenharmony_ci		/* master is known, detach if not already detached */
316162306a36Sopenharmony_ci		dlm_mle_detach_hb_events(dlm, oldmle);
316262306a36Sopenharmony_ci		dlm_put_mle(oldmle);
316362306a36Sopenharmony_ci	}
316462306a36Sopenharmony_ci
316562306a36Sopenharmony_ci	if (res)
316662306a36Sopenharmony_ci		dlm_lockres_put(res);
316762306a36Sopenharmony_cileave:
316862306a36Sopenharmony_ci	dlm_put(dlm);
316962306a36Sopenharmony_ci	return ret;
317062306a36Sopenharmony_ci}
317162306a36Sopenharmony_ci
317262306a36Sopenharmony_ci/* must be holding dlm->spinlock and dlm->master_lock
317362306a36Sopenharmony_ci * when adding a migration mle, we can clear any other mles
317462306a36Sopenharmony_ci * in the master list because we know with certainty that
317562306a36Sopenharmony_ci * the master is "master".  so we remove any old mle from
317662306a36Sopenharmony_ci * the list after setting it's master field, and then add
317762306a36Sopenharmony_ci * the new migration mle.  this way we can hold with the rule
317862306a36Sopenharmony_ci * of having only one mle for a given lock name at all times. */
317962306a36Sopenharmony_cistatic int dlm_add_migration_mle(struct dlm_ctxt *dlm,
318062306a36Sopenharmony_ci				 struct dlm_lock_resource *res,
318162306a36Sopenharmony_ci				 struct dlm_master_list_entry *mle,
318262306a36Sopenharmony_ci				 struct dlm_master_list_entry **oldmle,
318362306a36Sopenharmony_ci				 const char *name, unsigned int namelen,
318462306a36Sopenharmony_ci				 u8 new_master, u8 master)
318562306a36Sopenharmony_ci{
318662306a36Sopenharmony_ci	int found;
318762306a36Sopenharmony_ci	int ret = 0;
318862306a36Sopenharmony_ci
318962306a36Sopenharmony_ci	*oldmle = NULL;
319062306a36Sopenharmony_ci
319162306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
319262306a36Sopenharmony_ci	assert_spin_locked(&dlm->master_lock);
319362306a36Sopenharmony_ci
319462306a36Sopenharmony_ci	/* caller is responsible for any ref taken here on oldmle */
319562306a36Sopenharmony_ci	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
319662306a36Sopenharmony_ci	if (found) {
319762306a36Sopenharmony_ci		struct dlm_master_list_entry *tmp = *oldmle;
319862306a36Sopenharmony_ci		spin_lock(&tmp->spinlock);
319962306a36Sopenharmony_ci		if (tmp->type == DLM_MLE_MIGRATION) {
320062306a36Sopenharmony_ci			if (master == dlm->node_num) {
320162306a36Sopenharmony_ci				/* ah another process raced me to it */
320262306a36Sopenharmony_ci				mlog(0, "tried to migrate %.*s, but some "
320362306a36Sopenharmony_ci				     "process beat me to it\n",
320462306a36Sopenharmony_ci				     namelen, name);
320562306a36Sopenharmony_ci				spin_unlock(&tmp->spinlock);
320662306a36Sopenharmony_ci				return -EEXIST;
320762306a36Sopenharmony_ci			} else {
320862306a36Sopenharmony_ci				/* bad.  2 NODES are trying to migrate! */
320962306a36Sopenharmony_ci				mlog(ML_ERROR, "migration error  mle: "
321062306a36Sopenharmony_ci				     "master=%u new_master=%u // request: "
321162306a36Sopenharmony_ci				     "master=%u new_master=%u // "
321262306a36Sopenharmony_ci				     "lockres=%.*s\n",
321362306a36Sopenharmony_ci				     tmp->master, tmp->new_master,
321462306a36Sopenharmony_ci				     master, new_master,
321562306a36Sopenharmony_ci				     namelen, name);
321662306a36Sopenharmony_ci				BUG();
321762306a36Sopenharmony_ci			}
321862306a36Sopenharmony_ci		} else {
321962306a36Sopenharmony_ci			/* this is essentially what assert_master does */
322062306a36Sopenharmony_ci			tmp->master = master;
322162306a36Sopenharmony_ci			atomic_set(&tmp->woken, 1);
322262306a36Sopenharmony_ci			wake_up(&tmp->wq);
322362306a36Sopenharmony_ci			/* remove it so that only one mle will be found */
322462306a36Sopenharmony_ci			__dlm_unlink_mle(dlm, tmp);
322562306a36Sopenharmony_ci			__dlm_mle_detach_hb_events(dlm, tmp);
322662306a36Sopenharmony_ci			if (tmp->type == DLM_MLE_MASTER) {
322762306a36Sopenharmony_ci				ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
322862306a36Sopenharmony_ci				mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
322962306a36Sopenharmony_ci						"telling master to get ref "
323062306a36Sopenharmony_ci						"for cleared out mle during "
323162306a36Sopenharmony_ci						"migration\n", dlm->name,
323262306a36Sopenharmony_ci						namelen, name, master,
323362306a36Sopenharmony_ci						new_master);
323462306a36Sopenharmony_ci			}
323562306a36Sopenharmony_ci		}
323662306a36Sopenharmony_ci		spin_unlock(&tmp->spinlock);
323762306a36Sopenharmony_ci	}
323862306a36Sopenharmony_ci
323962306a36Sopenharmony_ci	/* now add a migration mle to the tail of the list */
324062306a36Sopenharmony_ci	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
324162306a36Sopenharmony_ci	mle->new_master = new_master;
324262306a36Sopenharmony_ci	/* the new master will be sending an assert master for this.
324362306a36Sopenharmony_ci	 * at that point we will get the refmap reference */
324462306a36Sopenharmony_ci	mle->master = master;
324562306a36Sopenharmony_ci	/* do this for consistency with other mle types */
324662306a36Sopenharmony_ci	set_bit(new_master, mle->maybe_map);
324762306a36Sopenharmony_ci	__dlm_insert_mle(dlm, mle);
324862306a36Sopenharmony_ci
324962306a36Sopenharmony_ci	return ret;
325062306a36Sopenharmony_ci}
325162306a36Sopenharmony_ci
325262306a36Sopenharmony_ci/*
325362306a36Sopenharmony_ci * Sets the owner of the lockres, associated to the mle, to UNKNOWN
325462306a36Sopenharmony_ci */
325562306a36Sopenharmony_cistatic struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
325662306a36Sopenharmony_ci					struct dlm_master_list_entry *mle)
325762306a36Sopenharmony_ci{
325862306a36Sopenharmony_ci	struct dlm_lock_resource *res;
325962306a36Sopenharmony_ci
326062306a36Sopenharmony_ci	/* Find the lockres associated to the mle and set its owner to UNK */
326162306a36Sopenharmony_ci	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
326262306a36Sopenharmony_ci				   mle->mnamehash);
326362306a36Sopenharmony_ci	if (res) {
326462306a36Sopenharmony_ci		spin_unlock(&dlm->master_lock);
326562306a36Sopenharmony_ci
326662306a36Sopenharmony_ci		/* move lockres onto recovery list */
326762306a36Sopenharmony_ci		spin_lock(&res->spinlock);
326862306a36Sopenharmony_ci		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
326962306a36Sopenharmony_ci		dlm_move_lockres_to_recovery_list(dlm, res);
327062306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
327162306a36Sopenharmony_ci		dlm_lockres_put(res);
327262306a36Sopenharmony_ci
327362306a36Sopenharmony_ci		/* about to get rid of mle, detach from heartbeat */
327462306a36Sopenharmony_ci		__dlm_mle_detach_hb_events(dlm, mle);
327562306a36Sopenharmony_ci
327662306a36Sopenharmony_ci		/* dump the mle */
327762306a36Sopenharmony_ci		spin_lock(&dlm->master_lock);
327862306a36Sopenharmony_ci		__dlm_put_mle(mle);
327962306a36Sopenharmony_ci		spin_unlock(&dlm->master_lock);
328062306a36Sopenharmony_ci	}
328162306a36Sopenharmony_ci
328262306a36Sopenharmony_ci	return res;
328362306a36Sopenharmony_ci}
328462306a36Sopenharmony_ci
328562306a36Sopenharmony_cistatic void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
328662306a36Sopenharmony_ci				    struct dlm_master_list_entry *mle)
328762306a36Sopenharmony_ci{
328862306a36Sopenharmony_ci	__dlm_mle_detach_hb_events(dlm, mle);
328962306a36Sopenharmony_ci
329062306a36Sopenharmony_ci	spin_lock(&mle->spinlock);
329162306a36Sopenharmony_ci	__dlm_unlink_mle(dlm, mle);
329262306a36Sopenharmony_ci	atomic_set(&mle->woken, 1);
329362306a36Sopenharmony_ci	spin_unlock(&mle->spinlock);
329462306a36Sopenharmony_ci
329562306a36Sopenharmony_ci	wake_up(&mle->wq);
329662306a36Sopenharmony_ci}
329762306a36Sopenharmony_ci
329862306a36Sopenharmony_cistatic void dlm_clean_block_mle(struct dlm_ctxt *dlm,
329962306a36Sopenharmony_ci				struct dlm_master_list_entry *mle, u8 dead_node)
330062306a36Sopenharmony_ci{
330162306a36Sopenharmony_ci	int bit;
330262306a36Sopenharmony_ci
330362306a36Sopenharmony_ci	BUG_ON(mle->type != DLM_MLE_BLOCK);
330462306a36Sopenharmony_ci
330562306a36Sopenharmony_ci	spin_lock(&mle->spinlock);
330662306a36Sopenharmony_ci	bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
330762306a36Sopenharmony_ci	if (bit != dead_node) {
330862306a36Sopenharmony_ci		mlog(0, "mle found, but dead node %u would not have been "
330962306a36Sopenharmony_ci		     "master\n", dead_node);
331062306a36Sopenharmony_ci		spin_unlock(&mle->spinlock);
331162306a36Sopenharmony_ci	} else {
331262306a36Sopenharmony_ci		/* Must drop the refcount by one since the assert_master will
331362306a36Sopenharmony_ci		 * never arrive. This may result in the mle being unlinked and
331462306a36Sopenharmony_ci		 * freed, but there may still be a process waiting in the
331562306a36Sopenharmony_ci		 * dlmlock path which is fine. */
331662306a36Sopenharmony_ci		mlog(0, "node %u was expected master\n", dead_node);
331762306a36Sopenharmony_ci		atomic_set(&mle->woken, 1);
331862306a36Sopenharmony_ci		spin_unlock(&mle->spinlock);
331962306a36Sopenharmony_ci		wake_up(&mle->wq);
332062306a36Sopenharmony_ci
332162306a36Sopenharmony_ci		/* Do not need events any longer, so detach from heartbeat */
332262306a36Sopenharmony_ci		__dlm_mle_detach_hb_events(dlm, mle);
332362306a36Sopenharmony_ci		__dlm_put_mle(mle);
332462306a36Sopenharmony_ci	}
332562306a36Sopenharmony_ci}
332662306a36Sopenharmony_ci
332762306a36Sopenharmony_civoid dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
332862306a36Sopenharmony_ci{
332962306a36Sopenharmony_ci	struct dlm_master_list_entry *mle;
333062306a36Sopenharmony_ci	struct dlm_lock_resource *res;
333162306a36Sopenharmony_ci	struct hlist_head *bucket;
333262306a36Sopenharmony_ci	struct hlist_node *tmp;
333362306a36Sopenharmony_ci	unsigned int i;
333462306a36Sopenharmony_ci
333562306a36Sopenharmony_ci	mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
333662306a36Sopenharmony_citop:
333762306a36Sopenharmony_ci	assert_spin_locked(&dlm->spinlock);
333862306a36Sopenharmony_ci
333962306a36Sopenharmony_ci	/* clean the master list */
334062306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
334162306a36Sopenharmony_ci	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
334262306a36Sopenharmony_ci		bucket = dlm_master_hash(dlm, i);
334362306a36Sopenharmony_ci		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
334462306a36Sopenharmony_ci			BUG_ON(mle->type != DLM_MLE_BLOCK &&
334562306a36Sopenharmony_ci			       mle->type != DLM_MLE_MASTER &&
334662306a36Sopenharmony_ci			       mle->type != DLM_MLE_MIGRATION);
334762306a36Sopenharmony_ci
334862306a36Sopenharmony_ci			/* MASTER mles are initiated locally. The waiting
334962306a36Sopenharmony_ci			 * process will notice the node map change shortly.
335062306a36Sopenharmony_ci			 * Let that happen as normal. */
335162306a36Sopenharmony_ci			if (mle->type == DLM_MLE_MASTER)
335262306a36Sopenharmony_ci				continue;
335362306a36Sopenharmony_ci
335462306a36Sopenharmony_ci			/* BLOCK mles are initiated by other nodes. Need to
335562306a36Sopenharmony_ci			 * clean up if the dead node would have been the
335662306a36Sopenharmony_ci			 * master. */
335762306a36Sopenharmony_ci			if (mle->type == DLM_MLE_BLOCK) {
335862306a36Sopenharmony_ci				dlm_clean_block_mle(dlm, mle, dead_node);
335962306a36Sopenharmony_ci				continue;
336062306a36Sopenharmony_ci			}
336162306a36Sopenharmony_ci
336262306a36Sopenharmony_ci			/* Everything else is a MIGRATION mle */
336362306a36Sopenharmony_ci
336462306a36Sopenharmony_ci			/* The rule for MIGRATION mles is that the master
336562306a36Sopenharmony_ci			 * becomes UNKNOWN if *either* the original or the new
336662306a36Sopenharmony_ci			 * master dies. All UNKNOWN lockres' are sent to
336762306a36Sopenharmony_ci			 * whichever node becomes the recovery master. The new
336862306a36Sopenharmony_ci			 * master is responsible for determining if there is
336962306a36Sopenharmony_ci			 * still a master for this lockres, or if he needs to
337062306a36Sopenharmony_ci			 * take over mastery. Either way, this node should
337162306a36Sopenharmony_ci			 * expect another message to resolve this. */
337262306a36Sopenharmony_ci
337362306a36Sopenharmony_ci			if (mle->master != dead_node &&
337462306a36Sopenharmony_ci			    mle->new_master != dead_node)
337562306a36Sopenharmony_ci				continue;
337662306a36Sopenharmony_ci
337762306a36Sopenharmony_ci			if (mle->new_master == dead_node && mle->inuse) {
337862306a36Sopenharmony_ci				mlog(ML_NOTICE, "%s: target %u died during "
337962306a36Sopenharmony_ci						"migration from %u, the MLE is "
338062306a36Sopenharmony_ci						"still keep used, ignore it!\n",
338162306a36Sopenharmony_ci						dlm->name, dead_node,
338262306a36Sopenharmony_ci						mle->master);
338362306a36Sopenharmony_ci				continue;
338462306a36Sopenharmony_ci			}
338562306a36Sopenharmony_ci
338662306a36Sopenharmony_ci			/* If we have reached this point, this mle needs to be
338762306a36Sopenharmony_ci			 * removed from the list and freed. */
338862306a36Sopenharmony_ci			dlm_clean_migration_mle(dlm, mle);
338962306a36Sopenharmony_ci
339062306a36Sopenharmony_ci			mlog(0, "%s: node %u died during migration from "
339162306a36Sopenharmony_ci			     "%u to %u!\n", dlm->name, dead_node, mle->master,
339262306a36Sopenharmony_ci			     mle->new_master);
339362306a36Sopenharmony_ci
339462306a36Sopenharmony_ci			/* If we find a lockres associated with the mle, we've
339562306a36Sopenharmony_ci			 * hit this rare case that messes up our lock ordering.
339662306a36Sopenharmony_ci			 * If so, we need to drop the master lock so that we can
339762306a36Sopenharmony_ci			 * take the lockres lock, meaning that we will have to
339862306a36Sopenharmony_ci			 * restart from the head of list. */
339962306a36Sopenharmony_ci			res = dlm_reset_mleres_owner(dlm, mle);
340062306a36Sopenharmony_ci			if (res)
340162306a36Sopenharmony_ci				/* restart */
340262306a36Sopenharmony_ci				goto top;
340362306a36Sopenharmony_ci
340462306a36Sopenharmony_ci			/* This may be the last reference */
340562306a36Sopenharmony_ci			__dlm_put_mle(mle);
340662306a36Sopenharmony_ci		}
340762306a36Sopenharmony_ci	}
340862306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
340962306a36Sopenharmony_ci}
341062306a36Sopenharmony_ci
341162306a36Sopenharmony_ciint dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
341262306a36Sopenharmony_ci			 u8 old_master)
341362306a36Sopenharmony_ci{
341462306a36Sopenharmony_ci	struct dlm_node_iter iter;
341562306a36Sopenharmony_ci	int ret = 0;
341662306a36Sopenharmony_ci
341762306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
341862306a36Sopenharmony_ci	dlm_node_iter_init(dlm->domain_map, &iter);
341962306a36Sopenharmony_ci	clear_bit(old_master, iter.node_map);
342062306a36Sopenharmony_ci	clear_bit(dlm->node_num, iter.node_map);
342162306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
342262306a36Sopenharmony_ci
342362306a36Sopenharmony_ci	/* ownership of the lockres is changing.  account for the
342462306a36Sopenharmony_ci	 * mastery reference here since old_master will briefly have
342562306a36Sopenharmony_ci	 * a reference after the migration completes */
342662306a36Sopenharmony_ci	spin_lock(&res->spinlock);
342762306a36Sopenharmony_ci	dlm_lockres_set_refmap_bit(dlm, res, old_master);
342862306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
342962306a36Sopenharmony_ci
343062306a36Sopenharmony_ci	mlog(0, "now time to do a migrate request to other nodes\n");
343162306a36Sopenharmony_ci	ret = dlm_do_migrate_request(dlm, res, old_master,
343262306a36Sopenharmony_ci				     dlm->node_num, &iter);
343362306a36Sopenharmony_ci	if (ret < 0) {
343462306a36Sopenharmony_ci		mlog_errno(ret);
343562306a36Sopenharmony_ci		goto leave;
343662306a36Sopenharmony_ci	}
343762306a36Sopenharmony_ci
343862306a36Sopenharmony_ci	mlog(0, "doing assert master of %.*s to all except the original node\n",
343962306a36Sopenharmony_ci	     res->lockname.len, res->lockname.name);
344062306a36Sopenharmony_ci	/* this call now finishes out the nodemap
344162306a36Sopenharmony_ci	 * even if one or more nodes die */
344262306a36Sopenharmony_ci	ret = dlm_do_assert_master(dlm, res, iter.node_map,
344362306a36Sopenharmony_ci				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
344462306a36Sopenharmony_ci	if (ret < 0) {
344562306a36Sopenharmony_ci		/* no longer need to retry.  all living nodes contacted. */
344662306a36Sopenharmony_ci		mlog_errno(ret);
344762306a36Sopenharmony_ci		ret = 0;
344862306a36Sopenharmony_ci	}
344962306a36Sopenharmony_ci
345062306a36Sopenharmony_ci	bitmap_zero(iter.node_map, O2NM_MAX_NODES);
345162306a36Sopenharmony_ci	set_bit(old_master, iter.node_map);
345262306a36Sopenharmony_ci	mlog(0, "doing assert master of %.*s back to %u\n",
345362306a36Sopenharmony_ci	     res->lockname.len, res->lockname.name, old_master);
345462306a36Sopenharmony_ci	ret = dlm_do_assert_master(dlm, res, iter.node_map,
345562306a36Sopenharmony_ci				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
345662306a36Sopenharmony_ci	if (ret < 0) {
345762306a36Sopenharmony_ci		mlog(0, "assert master to original master failed "
345862306a36Sopenharmony_ci		     "with %d.\n", ret);
345962306a36Sopenharmony_ci		/* the only nonzero status here would be because of
346062306a36Sopenharmony_ci		 * a dead original node.  we're done. */
346162306a36Sopenharmony_ci		ret = 0;
346262306a36Sopenharmony_ci	}
346362306a36Sopenharmony_ci
346462306a36Sopenharmony_ci	/* all done, set the owner, clear the flag */
346562306a36Sopenharmony_ci	spin_lock(&res->spinlock);
346662306a36Sopenharmony_ci	dlm_set_lockres_owner(dlm, res, dlm->node_num);
346762306a36Sopenharmony_ci	res->state &= ~DLM_LOCK_RES_MIGRATING;
346862306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
346962306a36Sopenharmony_ci	/* re-dirty it on the new master */
347062306a36Sopenharmony_ci	dlm_kick_thread(dlm, res);
347162306a36Sopenharmony_ci	wake_up(&res->wq);
347262306a36Sopenharmony_cileave:
347362306a36Sopenharmony_ci	return ret;
347462306a36Sopenharmony_ci}
347562306a36Sopenharmony_ci
347662306a36Sopenharmony_ci/*
347762306a36Sopenharmony_ci * LOCKRES AST REFCOUNT
347862306a36Sopenharmony_ci * this is integral to migration
347962306a36Sopenharmony_ci */
348062306a36Sopenharmony_ci
348162306a36Sopenharmony_ci/* for future intent to call an ast, reserve one ahead of time.
348262306a36Sopenharmony_ci * this should be called only after waiting on the lockres
348362306a36Sopenharmony_ci * with dlm_wait_on_lockres, and while still holding the
348462306a36Sopenharmony_ci * spinlock after the call. */
348562306a36Sopenharmony_civoid __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
348662306a36Sopenharmony_ci{
348762306a36Sopenharmony_ci	assert_spin_locked(&res->spinlock);
348862306a36Sopenharmony_ci	if (res->state & DLM_LOCK_RES_MIGRATING) {
348962306a36Sopenharmony_ci		__dlm_print_one_lock_resource(res);
349062306a36Sopenharmony_ci	}
349162306a36Sopenharmony_ci	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
349262306a36Sopenharmony_ci
349362306a36Sopenharmony_ci	atomic_inc(&res->asts_reserved);
349462306a36Sopenharmony_ci}
349562306a36Sopenharmony_ci
349662306a36Sopenharmony_ci/*
349762306a36Sopenharmony_ci * used to drop the reserved ast, either because it went unused,
349862306a36Sopenharmony_ci * or because the ast/bast was actually called.
349962306a36Sopenharmony_ci *
350062306a36Sopenharmony_ci * also, if there is a pending migration on this lockres,
350162306a36Sopenharmony_ci * and this was the last pending ast on the lockres,
350262306a36Sopenharmony_ci * atomically set the MIGRATING flag before we drop the lock.
350362306a36Sopenharmony_ci * this is how we ensure that migration can proceed with no
350462306a36Sopenharmony_ci * asts in progress.  note that it is ok if the state of the
350562306a36Sopenharmony_ci * queues is such that a lock should be granted in the future
350662306a36Sopenharmony_ci * or that a bast should be fired, because the new master will
350762306a36Sopenharmony_ci * shuffle the lists on this lockres as soon as it is migrated.
350862306a36Sopenharmony_ci */
350962306a36Sopenharmony_civoid dlm_lockres_release_ast(struct dlm_ctxt *dlm,
351062306a36Sopenharmony_ci			     struct dlm_lock_resource *res)
351162306a36Sopenharmony_ci{
351262306a36Sopenharmony_ci	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
351362306a36Sopenharmony_ci		return;
351462306a36Sopenharmony_ci
351562306a36Sopenharmony_ci	if (!res->migration_pending) {
351662306a36Sopenharmony_ci		spin_unlock(&res->spinlock);
351762306a36Sopenharmony_ci		return;
351862306a36Sopenharmony_ci	}
351962306a36Sopenharmony_ci
352062306a36Sopenharmony_ci	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
352162306a36Sopenharmony_ci	res->migration_pending = 0;
352262306a36Sopenharmony_ci	res->state |= DLM_LOCK_RES_MIGRATING;
352362306a36Sopenharmony_ci	spin_unlock(&res->spinlock);
352462306a36Sopenharmony_ci	wake_up(&res->wq);
352562306a36Sopenharmony_ci	wake_up(&dlm->migration_wq);
352662306a36Sopenharmony_ci}
352762306a36Sopenharmony_ci
352862306a36Sopenharmony_civoid dlm_force_free_mles(struct dlm_ctxt *dlm)
352962306a36Sopenharmony_ci{
353062306a36Sopenharmony_ci	int i;
353162306a36Sopenharmony_ci	struct hlist_head *bucket;
353262306a36Sopenharmony_ci	struct dlm_master_list_entry *mle;
353362306a36Sopenharmony_ci	struct hlist_node *tmp;
353462306a36Sopenharmony_ci
353562306a36Sopenharmony_ci	/*
353662306a36Sopenharmony_ci	 * We notified all other nodes that we are exiting the domain and
353762306a36Sopenharmony_ci	 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
353862306a36Sopenharmony_ci	 * around we force free them and wake any processes that are waiting
353962306a36Sopenharmony_ci	 * on the mles
354062306a36Sopenharmony_ci	 */
354162306a36Sopenharmony_ci	spin_lock(&dlm->spinlock);
354262306a36Sopenharmony_ci	spin_lock(&dlm->master_lock);
354362306a36Sopenharmony_ci
354462306a36Sopenharmony_ci	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
354562306a36Sopenharmony_ci	BUG_ON((find_first_bit(dlm->domain_map, O2NM_MAX_NODES) < O2NM_MAX_NODES));
354662306a36Sopenharmony_ci
354762306a36Sopenharmony_ci	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
354862306a36Sopenharmony_ci		bucket = dlm_master_hash(dlm, i);
354962306a36Sopenharmony_ci		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
355062306a36Sopenharmony_ci			if (mle->type != DLM_MLE_BLOCK) {
355162306a36Sopenharmony_ci				mlog(ML_ERROR, "bad mle: %p\n", mle);
355262306a36Sopenharmony_ci				dlm_print_one_mle(mle);
355362306a36Sopenharmony_ci			}
355462306a36Sopenharmony_ci			atomic_set(&mle->woken, 1);
355562306a36Sopenharmony_ci			wake_up(&mle->wq);
355662306a36Sopenharmony_ci
355762306a36Sopenharmony_ci			__dlm_unlink_mle(dlm, mle);
355862306a36Sopenharmony_ci			__dlm_mle_detach_hb_events(dlm, mle);
355962306a36Sopenharmony_ci			__dlm_put_mle(mle);
356062306a36Sopenharmony_ci		}
356162306a36Sopenharmony_ci	}
356262306a36Sopenharmony_ci	spin_unlock(&dlm->master_lock);
356362306a36Sopenharmony_ci	spin_unlock(&dlm->spinlock);
356462306a36Sopenharmony_ci}
3565