// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb, which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(); when local, it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
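
/* An illustrative walk through the four stages for a new request (a
   simplified sketch of the call path only; the real functions below also
   handle argument validation, master lookup, retries, and error paths):

	dlm_lock(ls, mode, lksb, flags, name, namelen, ...)
	  -> request_lock(ls, lkb, name, len, &args)
	       -> find_rsb(ls, name, len, 0, R_REQUEST, &r)
	       -> _request_lock(r, lkb)
	            -> do_request(r, lkb)    (if we are the master)
	            -> send_request(r, lkb)  (if the master is remote)
 */
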
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
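
/* Reading the table above with the standard DLM mode values
   (DLM_LOCK_NL=0 ... DLM_LOCK_EX=5, so grmode+1/rqmode+1 index the rows
   and columns): converting up from NL to EX gives
   dlm_lvb_operations[0+1][5+1] == 1, so the resource's LVB is copied back
   to the caller; converting down from EX to NL gives
   dlm_lvb_operations[5+1][0+1] == 0, so the caller's LVB is written into
   the resource.  set_lvb_lock() below applies this table. */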

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
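
/* For example, reading __dlm_compat_matrix: two PR locks are compatible,
   so dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) returns 1, while PW
   conflicts with PR and dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PW)
   returns 0. */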

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
	       "rlc %d name %s\n",
	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
	       r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
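
/* A hedged sketch of how callers are expected to use the three helpers
   above (compare the entry points further down in this file): take the
   recovery read-lock around a lock operation so that recovery, which
   holds ls_in_recovery for write, excludes normal locking activity:

	if (!dlm_lock_recovery_try(ls))
		return -EAGAIN;	(or block in dlm_lock_recovery(ls))
	...perform the lock/unlock operation...
	dlm_unlock_recovery(ls);
 */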

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
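
/* Note on the two helpers above: with the numeric mode ordering
   NL < CR < CW < PR < PW < EX, CW and PR are the one incomparable pair
   (each conflicts with the other in __dlm_compat_matrix), so a PR<->CW
   conversion is treated as a "middle" conversion; it may conflict with
   other granted locks, unlike a true down-conversion, which is always
   grantable in place. */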

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL; if a
	   timeout caused the cancel, then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}

/* If ls->ls_new_rsb is empty, return -EAGAIN so the caller can
   unlock any spinlocks, go back, and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}
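
/* The two functions above implement a pre-allocation scheme: rsb's are
   allocated (which may sleep) before taking a bucket spinlock, then
   consumed under the lock.  A hedged sketch of the retry idiom the
   find_rsb*() functions below use:

 retry:
	error = pre_rsb_struct(ls);		(no locks held, may sleep)
	...
	spin_lock(&ls->ls_rsbtbl[b].lock);
	...
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {			(preallocated rsb's ran out)
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
 */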

static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}

int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
			struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;
	struct dlm_rsb *r;
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	*r_ret = r;
	return 0;
}

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing name to master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
 * name/master mappings.  So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
 */
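
/* A compact view of the rsb lifecycle described above (sketch only):

	create
	  |
	  v
	keep list (refcounted, in local use)
	  |  last local ref dropped: toss_rsb()
	  v
	toss list (unreferenced; kept as a cached/dir record)
	  |  found again by find_rsb(): moved back to keep list
	  |  idle longer than toss_secs: freed by shrink_bucket()
 */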

static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
			uint32_t hash, uint32_t b,
			int dir_nodeid, int from_nodeid,
			unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int from_local = 0;
	int from_other = 0;
	int from_dir = 0;
	int create = 0;
	int error;

	if (flags & R_RECEIVE_REQUEST) {
		if (from_nodeid == dir_nodeid)
			from_dir = 1;
		else
			from_other = 1;
	} else if (flags & R_REQUEST) {
		from_local = 1;
	}

	/*
	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
	 * we're the new master.  Our local recovery may not have set
	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
	 * create the rsb; dlm_recover_process_copy() will handle EBADR
	 * by resending.
	 *
	 * If someone sends us a request, we are the dir node, and we do
	 * not find the rsb anywhere, then recreate it.  This happens if
	 * someone sends us a request after we have removed/freed an rsb
	 * from our toss list.  (They sent a request instead of lookup
	 * because they are using an rsb from their toss list.)
	 */

	if (from_local || from_dir ||
	    (from_other && (dir_nodeid == our_nodeid))) {
		create = 1;
	}

 retry:
	if (create) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	error = 0;
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master).  No other thread
	 * is using this rsb because it's on the toss list, so we can
	 * look at or update res_master_nodeid without lock_rsb.
	 */

	if ((r->res_master_nodeid != our_nodeid) && from_other) {
		/* our rsb was not master, and another node (not the dir node)
		   has sent us a request */
		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
			  from_nodeid, r->res_master_nodeid, dir_nodeid,
			  r->res_name);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
		/* don't think this should ever happen */
		log_error(ls, "find_rsb toss from_dir %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		/* fix it and go on */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	if (from_local && (r->res_master_nodeid != our_nodeid)) {
		/* Because we have held no locks on this rsb,
		   res_master_nodeid could have become stale. */
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	if (error == -EBADR && !create)
		goto out_unlock;

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	kref_init(&r->res_ref);

	if (from_dir) {
		/* want to see how often this happens */
		log_debug(ls, "find_rsb new from_dir %d recreate %s",
			  from_nodeid, r->res_name);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		goto out_add;
	}

	if (from_other && (dir_nodeid != our_nodeid)) {
		/* should never happen */
		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
		dlm_free_rsb(r);
		r = NULL;
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (from_other) {
		log_debug(ls, "find_rsb new from_other %d dir %d %s",
			  from_nodeid, dir_nodeid, r->res_name);
	}

	if (dir_nodeid == our_nodeid) {
		/* When we are the dir nodeid, we can set the master
		   node immediately */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	} else {
		/* set_master will send_lookup to dir_nodeid */
		r->res_master_nodeid = 0;
		r->res_nodeid = -1;
	}

 out_add:
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}

/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourselves master (in
   dlm_recover_masters). */

static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
			  uint32_t hash, uint32_t b,
			  int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive.  No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
	 * res_master_nodeid without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb toss our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}

static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	uint32_t hash, b;
	int dir_nodeid;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);

	if (dlm_no_directory(ls))
		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
				      from_nodeid, flags, r_ret);
	else
		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
				    from_nodeid, flags, r_ret);
}

/* we have received a request and found that res_master_nodeid != our_nodeid,
   so we need to return an error or make ourselves the master */

static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
				  int from_nodeid)
{
	if (dlm_no_directory(ls)) {
		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid,
			  r->res_dir_nodeid);
		dlm_print_rsb(r);
		return -ENOTBLK;
	}

	if (from_nodeid != r->res_dir_nodeid) {
		/* our rsb is not master, and another node (not the dir node)
		   has sent us a request.  This is much more common when our
		   master_nodeid is zero, so limit debug to non-zero. */

		if (r->res_master_nodeid) {
			log_debug(ls, "validate master from_other %d master %d "
				  "dir %d first %x %s", from_nodeid,
				  r->res_master_nodeid, r->res_dir_nodeid,
				  r->res_first_lkid, r->res_name);
		}
		return -ENOTBLK;
	} else {
		/* our rsb is not master, but the dir nodeid has sent us a
		   request; this could happen with master 0 / res_nodeid -1 */

		if (r->res_master_nodeid) {
			log_error(ls, "validate master from_dir %d master %d "
				  "first %x %s",
				  from_nodeid, r->res_master_nodeid,
				  r->res_first_lkid, r->res_name);
		}

		r->res_master_nodeid = dlm_our_nodeid();
		r->res_nodeid = 0;
		return 0;
	}
}

/*
 * We're the dir node for this res and another node wants to know the
 * master nodeid.  During normal operation (non recovery) this is only
 * called from receive_lookup(); master lookups when the local node is
 * the dir node are done by find_rsb().
 *
 * normal operation, we are the dir node for a resource
 * . _request_lock
 * . set_master
 * . send_lookup
 * . receive_lookup
 * . dlm_master_lookup flags 0
 *
 * recover directory, we are rebuilding dir for all resources
 * . dlm_recover_directory
 * . dlm_rcom_names
 *   remote node sends back the rsb names it is master of and we are dir of
 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 *   we either create new rsb setting remote node as master, or find existing
 *   rsb and set master to be the remote node.
 *
 * recover masters, we are finding the new master for resources
 * . dlm_recover_masters
 * . recover_master
 * . dlm_send_rcom_lookup
 * . receive_rcom_lookup
 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
 */

int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
		      unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int from_master = (flags & DLM_LU_RECOVER_DIR);
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error, toss_list = 0;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		return error;

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error) {
		/* because the rsb is active, we need to lock_rsb before
		   checking/changing res_master_nodeid */

		hold_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		lock_rsb(r);
		goto found;
	}

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto not_found;

	/* because the rsb is inactive (on toss list), it's not refcounted
	   and lock_rsb is not used, but is protected by the rsbtbl lock */

	toss_list = 1;
 found:
	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		   the previous master failed.  Setting NEW_MASTER will
		   force dlm_recover_masters to call recover_master on this
		   rsb even though the res_nodeid is no longer removed. */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (toss_list) {
			/* I don't think we should ever find it on toss list. */
			log_error(ls, "dlm_master_lookup fix_master on toss");
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		   a previous recovery cycle, and we aborted the previous
		   cycle before recovering this master value */

		log_limit(ls, "dlm_master_lookup from_master %d "
			  "master_nodeid %d res_nodeid %d first %x %s",
			  from_nodeid, r->res_master_nodeid, r->res_nodeid,
			  r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto out_found;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		   up the master for this rsb */

		log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		   finds the rsb on the keep list and ignores the remove,
		   and the former master sends a lookup */

		log_limit(ls, "dlm_master_lookup from master %d flags %x "
			  "first %x %s", from_nodeid, flags,
			  r->res_first_lkid, r->res_name);
	}

 out_found:
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;

	if (toss_list) {
		r->res_toss_time = jiffies;
		/* the rsb was inactive (on toss list) */
		spin_unlock(&ls->ls_rsbtbl[b].lock);
	} else {
		/* the rsb was active */
		unlock_rsb(r);
		put_rsb(r);
	}
	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	kref_init(&r->res_ref);
	r->res_toss_time = jiffies;

	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
	if (error) {
		/* should never happen */
		dlm_free_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
	error = 0;
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_hash == hash)
				dlm_dump_rsb(r);
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
}

void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int error;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error)
		goto out_dump;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto out;
 out_dump:
	dlm_dump_rsb(r);
 out:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

	idr_preload(GFP_NOFS);
	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
	if (rv >= 0)
		lkb->lkb_id = rv;
	spin_unlock(&ls->ls_lkbidr_spin);
	idr_preload_end();

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		dlm_free_lkb(lkb);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;

	spin_lock(&ls->ls_lkbidr_spin);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		spin_unlock(&ls->ls_lkbidr_spin);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
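
/* For example: inserting a CW (mode 2) lkb into a queue currently ordered
   EX(5), PR(3), NL(0) breaks at the first entry with a lower mode (NL)
   and inserts before it, giving EX, PR, CW, NL; queues stay sorted from
   highest to lowest mode, which the grant queue convention in add_lkb()
   below relies on. */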

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (!lkb->lkb_wait_time)
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = 0;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
		  mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* If there's an rsb for the same resource being removed, ensure
   that the remove message is sent before the new lookup message.
   It should be rare to need a delay here, but if not, then it may
   be worthwhile to add a proper wait mechanism rather than a delay. */

static void wait_pending_remove(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
 restart:
	spin_lock(&ls->ls_remove_spin);
	if (ls->ls_remove_len &&
	    !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
		log_debug(ls, "delay lookup for remove dir %d %s",
			  r->res_dir_nodeid, r->res_name);
		spin_unlock(&ls->ls_remove_spin);
		msleep(1);
		goto restart;
	}
	spin_unlock(&ls->ls_remove_spin);
}

/*
 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
 * read by other threads in wait_pending_remove.  ls_remove_names
 * and ls_remove_lens are only used by the scan thread, so they do
 * not need protection.
 */

static void shrink_bucket(struct dlm_ls *ls, int b)
{
	struct rb_node *n, *next;
	struct dlm_rsb *r;
	char *name;
	int our_nodeid = dlm_our_nodeid();
	int remote_count = 0;
	int need_shrink = 0;
	int i, len, rv;

	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);

	spin_lock(&ls->ls_rsbtbl[b].lock);

	if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		return;
	}

	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
		next = rb_next(n);
		r = rb_entry(n, struct dlm_rsb, res_hashnode);

		/* If we're the directory record for this rsb, and
		   we're not the master of it, then we need to wait
		   for the master node to send us a dir remove
		   before removing the dir record. */

		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid != our_nodeid) &&
		    (dlm_dir_nodeid(r) == our_nodeid)) {
			continue;
		}

		need_shrink = 1;

		if (!time_after_eq(jiffies, r->res_toss_time +
				   dlm_config.ci_toss_secs * HZ)) {
			continue;
		}

		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid == our_nodeid) &&
		    (dlm_dir_nodeid(r) != our_nodeid)) {

			/* We're the master of this rsb but we're not
			   the directory record, so we need to tell the
			   dir node to remove the dir record. */

			ls->ls_remove_lens[remote_count] = r->res_length;
			memcpy(ls->ls_remove_names[remote_count], r->res_name,
			       DLM_RESNAME_MAXLEN);
			remote_count++;

			if (remote_count >= DLM_REMOVE_NAMES_MAX)
				break;
			continue;
		}

		if (!kref_put(&r->res_ref, kill_rsb)) {
			log_error(ls, "tossed rsb in use %s", r->res_name);
			continue;
		}

		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
		dlm_free_rsb(r);
	}

	if (need_shrink)
		ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
	else
		ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
	spin_unlock(&ls->ls_rsbtbl[b].lock);

	/*
	 * While searching for rsb's to free, we found some that require
	 * remote removal.  We leave them in place and find them again here
	 * so there is a very small gap between removing them from the toss
	 * list and sending the removal.  Keeping this gap small is
	 * important to keep us (the master node) from being out of sync
	 * with the remote dir node for very long.
	 *
	 * From the time the rsb is removed from toss until just after
	 * send_remove, the rsb name is saved in ls_remove_name.  A new
	 * lookup checks this to ensure that a new lookup message for the
	 * same resource name is not sent just before the remove message.
	 */

	for (i = 0; i < remote_count; i++) {
		name = ls->ls_remove_names[i];
		len = ls->ls_remove_lens[i];

		spin_lock(&ls->ls_rsbtbl[b].lock);
		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
		if (rv) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name not toss %s", name);
			continue;
		}

		if (r->res_master_nodeid != our_nodeid) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name master %d dir %d our %d %s",
				  r->res_master_nodeid, r->res_dir_nodeid,
				  our_nodeid, name);
			continue;
		}

		if (r->res_dir_nodeid == our_nodeid) {
			/* should never happen */
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "remove_name dir %d master %d our %d %s",
				  r->res_dir_nodeid, r->res_master_nodeid,
				  our_nodeid, name);
			continue;
		}

		if (!time_after_eq(jiffies, r->res_toss_time +
				   dlm_config.ci_toss_secs * HZ)) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name toss_time %lu now %lu %s",
				  r->res_toss_time, jiffies, name);
			continue;
		}

		if (!kref_put(&r->res_ref, kill_rsb)) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "remove_name in use %s", name);
			continue;
		}

		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);

		/* block lookup of same name until we've sent remove */
		spin_lock(&ls->ls_remove_spin);
		ls->ls_remove_len = len;
		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
		spin_unlock(&ls->ls_remove_spin);
		spin_unlock(&ls->ls_rsbtbl[b].lock);

		send_remove(r);

		/* allow lookup of name again */
		spin_lock(&ls->ls_remove_spin);
		ls->ls_remove_len = 0;
		memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
		spin_unlock(&ls->ls_remove_spin);

		dlm_free_rsb(r);
	}
}
1801
1802void dlm_scan_rsbs(struct dlm_ls *ls)
1803{
1804	int i;
1805
1806	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1807		shrink_bucket(ls, i);
1808		if (dlm_locking_stopped(ls))
1809			break;
1810		cond_resched();
1811	}
1812}
1813
1814static void add_timeout(struct dlm_lkb *lkb)
1815{
1816	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1817
1818	if (is_master_copy(lkb))
1819		return;
1820
1821	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1822	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1823		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1824		goto add_it;
1825	}
1826	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1827		goto add_it;
1828	return;
1829
1830 add_it:
1831	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1832	mutex_lock(&ls->ls_timeout_mutex);
1833	hold_lkb(lkb);
1834	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1835	mutex_unlock(&ls->ls_timeout_mutex);
1836}
1837
1838static void del_timeout(struct dlm_lkb *lkb)
1839{
1840	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1841
1842	mutex_lock(&ls->ls_timeout_mutex);
1843	if (!list_empty(&lkb->lkb_time_list)) {
1844		list_del_init(&lkb->lkb_time_list);
1845		unhold_lkb(lkb);
1846	}
1847	mutex_unlock(&ls->ls_timeout_mutex);
1848}
1849
1850/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1851   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1852   and then lock rsb because of lock ordering in add_timeout.  We may need
1853   to specify some special timeout-related bits in the lkb that are just to
1854   be accessed under the timeout_mutex. */
1855
1856void dlm_scan_timeout(struct dlm_ls *ls)
1857{
1858	struct dlm_rsb *r;
1859	struct dlm_lkb *lkb = NULL, *iter;
1860	int do_cancel, do_warn;
1861	s64 wait_us;
1862
1863	for (;;) {
1864		if (dlm_locking_stopped(ls))
1865			break;
1866
		/* reset lkb so a pointer found (and put) on a previous pass
		   cannot be mistaken for a new result below */
		lkb = NULL;
1867		do_cancel = 0;
1868		do_warn = 0;
1869		mutex_lock(&ls->ls_timeout_mutex);
1870		list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
1871
1872			wait_us = ktime_to_us(ktime_sub(ktime_get(),
1873							iter->lkb_timestamp));
1874
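			/* lkb_timeout_cs and ci_timewarn_cs are in
			   centiseconds; multiplying by 10000 converts them
			   to microseconds for comparison with wait_us */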
1875			if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
1876			    wait_us >= (iter->lkb_timeout_cs * 10000))
1877				do_cancel = 1;
1878
1879			if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1880			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
1881				do_warn = 1;
1882
1883			if (!do_cancel && !do_warn)
1884				continue;
1885			hold_lkb(iter);
1886			lkb = iter;
1887			break;
1888		}
1889		mutex_unlock(&ls->ls_timeout_mutex);
1890
1891		if (!lkb)
1892			break;
1893
1894		r = lkb->lkb_resource;
1895		hold_rsb(r);
1896		lock_rsb(r);
1897
1898		if (do_warn) {
1899			/* clear flag so we only warn once */
1900			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1901			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1902				del_timeout(lkb);
1903			dlm_timeout_warn(lkb);
1904		}
1905
1906		if (do_cancel) {
1907			log_debug(ls, "timeout cancel %x node %d %s",
1908				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1909			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1910			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1911			del_timeout(lkb);
1912			_cancel_lock(r, lkb);
1913		}
1914
1915		unlock_rsb(r);
1916		unhold_rsb(r);
1917		dlm_put_lkb(lkb);
1918	}
1919}
1920
1921/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1922   dlm_recoverd before checking/setting ls_recover_begin. */
1923
1924void dlm_adjust_timeouts(struct dlm_ls *ls)
1925{
1926	struct dlm_lkb *lkb;
1927	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1928
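	/* push each timestamp forward by the time spent in recovery so
	   that time spent waiting on recovery is not counted against a
	   lock's timeout */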
1929	ls->ls_recover_begin = 0;
1930	mutex_lock(&ls->ls_timeout_mutex);
1931	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1932		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1933	mutex_unlock(&ls->ls_timeout_mutex);
1934
1935	if (!dlm_config.ci_waitwarn_us)
1936		return;
1937
1938	mutex_lock(&ls->ls_waiters_mutex);
1939	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1940		if (ktime_to_us(lkb->lkb_wait_time))
1941			lkb->lkb_wait_time = ktime_get();
1942	}
1943	mutex_unlock(&ls->ls_waiters_mutex);
1944}
1945
1946/* lkb is master or local copy */
1947
1948static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1949{
1950	int b, len = r->res_ls->ls_lvblen;
1951
1952	/* b=1 lvb returned to caller
1953	   b=0 lvb written to rsb or invalidated
1954	   b=-1 do nothing */
1955
1956	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1957
1958	if (b == 1) {
1959		if (!lkb->lkb_lvbptr)
1960			return;
1961
1962		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1963			return;
1964
1965		if (!r->res_lvbptr)
1966			return;
1967
1968		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1969		lkb->lkb_lvbseq = r->res_lvbseq;
1970
1971	} else if (b == 0) {
1972		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1973			rsb_set_flag(r, RSB_VALNOTVALID);
1974			return;
1975		}
1976
1977		if (!lkb->lkb_lvbptr)
1978			return;
1979
1980		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1981			return;
1982
1983		if (!r->res_lvbptr)
1984			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1985
1986		if (!r->res_lvbptr)
1987			return;
1988
1989		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1990		r->res_lvbseq++;
1991		lkb->lkb_lvbseq = r->res_lvbseq;
1992		rsb_clear_flag(r, RSB_VALNOTVALID);
1993	}
1994
1995	if (rsb_flag(r, RSB_VALNOTVALID))
1996		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1997}
1998
1999static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2000{
2001	if (lkb->lkb_grmode < DLM_LOCK_PW)
2002		return;
2003
2004	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2005		rsb_set_flag(r, RSB_VALNOTVALID);
2006		return;
2007	}
2008
2009	if (!lkb->lkb_lvbptr)
2010		return;
2011
2012	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2013		return;
2014
2015	if (!r->res_lvbptr)
2016		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2017
2018	if (!r->res_lvbptr)
2019		return;
2020
2021	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2022	r->res_lvbseq++;
2023	rsb_clear_flag(r, RSB_VALNOTVALID);
2024}
2025
2026/* lkb is process copy (pc) */
2027
2028static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2029			    struct dlm_message *ms)
2030{
2031	int b;
2032
2033	if (!lkb->lkb_lvbptr)
2034		return;
2035
2036	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2037		return;
2038
2039	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2040	if (b == 1) {
2041		int len = receive_extralen(ms);
2042		if (len > r->res_ls->ls_lvblen)
2043			len = r->res_ls->ls_lvblen;
2044		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2045		lkb->lkb_lvbseq = ms->m_lvbseq;
2046	}
2047}
2048
2049/* Manipulate lkb's on rsb's convert/granted/waiting queues
2050   remove_lock -- used for unlock, removes lkb from granted
2051   revert_lock -- used for cancel, moves lkb from convert to granted
2052   grant_lock  -- used for request and convert, adds lkb to granted or
2053                  moves lkb from convert or waiting to granted
2054
2055   Each of these is used for master or local copy lkb's.  There is
2056   also a _pc() variation used to make the corresponding change on
2057   a process copy (pc) lkb. */
2058
2059static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2060{
2061	del_lkb(r, lkb);
2062	lkb->lkb_grmode = DLM_LOCK_IV;
2063	/* this unhold undoes the original ref from create_lkb()
2064	   so this leads to the lkb being freed */
2065	unhold_lkb(lkb);
2066}
2067
2068static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2069{
2070	set_lvb_unlock(r, lkb);
2071	_remove_lock(r, lkb);
2072}
2073
2074static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075{
2076	_remove_lock(r, lkb);
2077}
2078
2079/* returns: 0 did nothing
2080	    1 moved lock to granted
2081	   -1 removed lock */
2082
2083static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2084{
2085	int rv = 0;
2086
2087	lkb->lkb_rqmode = DLM_LOCK_IV;
2088
2089	switch (lkb->lkb_status) {
2090	case DLM_LKSTS_GRANTED:
2091		break;
2092	case DLM_LKSTS_CONVERT:
2093		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2094		rv = 1;
2095		break;
2096	case DLM_LKSTS_WAITING:
2097		del_lkb(r, lkb);
2098		lkb->lkb_grmode = DLM_LOCK_IV;
2099		/* this unhold undoes the original ref from create_lkb()
2100		   so this leads to the lkb being freed */
2101		unhold_lkb(lkb);
2102		rv = -1;
2103		break;
2104	default:
2105		log_print("invalid status for revert %d", lkb->lkb_status);
2106	}
2107	return rv;
2108}
2109
2110static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2111{
2112	return revert_lock(r, lkb);
2113}
2114
2115static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2116{
2117	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2118		lkb->lkb_grmode = lkb->lkb_rqmode;
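		/* a zero lkb_status means the lkb is not yet on any rsb
		   queue (a new request); otherwise move it between queues */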
2119		if (lkb->lkb_status)
2120			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2121		else
2122			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2123	}
2124
2125	lkb->lkb_rqmode = DLM_LOCK_IV;
2126	lkb->lkb_highbast = 0;
2127}
2128
2129static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2130{
2131	set_lvb_lock(r, lkb);
2132	_grant_lock(r, lkb);
2133}
2134
2135static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2136			  struct dlm_message *ms)
2137{
2138	set_lvb_lock_pc(r, lkb, ms);
2139	_grant_lock(r, lkb);
2140}
2141
2142/* called by grant_pending_locks() which means an async grant message must
2143   be sent to the requesting node in addition to granting the lock if the
2144   lkb belongs to a remote node. */
2145
2146static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2147{
2148	grant_lock(r, lkb);
2149	if (is_master_copy(lkb))
2150		send_grant(r, lkb);
2151	else
2152		queue_cast(r, lkb, 0);
2153}
2154
2155/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2156   change the granted/requested modes.  We're munging things accordingly in
2157   the process copy.
2158   CONVDEADLK: our grmode may have been forced down to NL to resolve a
2159   conversion deadlock
2160   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2161   compatible with other granted locks */
2162
2163static void munge_demoted(struct dlm_lkb *lkb)
2164{
2165	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2166		log_print("munge_demoted %x invalid modes gr %d rq %d",
2167			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2168		return;
2169	}
2170
2171	lkb->lkb_grmode = DLM_LOCK_NL;
2172}
2173
2174static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2175{
2176	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2177	    ms->m_type != DLM_MSG_GRANT) {
2178		log_print("munge_altmode %x invalid reply type %d",
2179			  lkb->lkb_id, ms->m_type);
2180		return;
2181	}
2182
2183	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2184		lkb->lkb_rqmode = DLM_LOCK_PR;
2185	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2186		lkb->lkb_rqmode = DLM_LOCK_CW;
2187	else {
2188		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2189		dlm_print_lkb(lkb);
2190	}
2191}
2192
2193static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2194{
2195	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2196					   lkb_statequeue);
2197	if (lkb->lkb_id == first->lkb_id)
2198		return 1;
2199
2200	return 0;
2201}
2202
2203/* Check if the given lkb conflicts with another lkb on the queue. */
2204
2205static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2206{
2207	struct dlm_lkb *this;
2208
2209	list_for_each_entry(this, head, lkb_statequeue) {
2210		if (this == lkb)
2211			continue;
2212		if (!modes_compat(this, lkb))
2213			return 1;
2214	}
2215	return 0;
2216}
2217
2218/*
2219 * "A conversion deadlock arises with a pair of lock requests in the converting
2220 * queue for one resource.  The granted mode of each lock blocks the requested
2221 * mode of the other lock."
2222 *
2223 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2224 * convert queue from being granted, then deadlk/demote lkb.
2225 *
2226 * Example:
2227 * Granted Queue: empty
2228 * Convert Queue: NL->EX (first lock)
2229 *                PR->EX (second lock)
2230 *
2231 * The first lock can't be granted because of the granted mode of the second
2232 * lock and the second lock can't be granted because it's not first in the
2233 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2234 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2235 * flag set and return DEMOTED in the lksb flags.
2236 *
2237 * Originally, this function detected conv-deadlk in a more limited scope:
2238 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2239 * - if lkb1 was the first entry in the queue (not just earlier), and was
2240 *   blocked by the granted mode of lkb2, and there was nothing on the
2241 *   granted queue preventing lkb1 from being granted immediately, i.e.
2242 *   lkb2 was the only thing preventing lkb1 from being granted.
2243 *
2244 * That second condition meant we'd only say there was conv-deadlk if
2245 * resolving it (by demotion) would lead to the first lock on the convert
2246 * queue being granted right away.  It allowed conversion deadlocks to exist
2247 * between locks on the convert queue while they couldn't be granted anyway.
2248 *
2249 * Now, we detect and take action on conversion deadlocks immediately when
2250 * they're created, even if they may not be immediately consequential.  If
2251 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2252 * mode that would prevent lkb1's conversion from being granted, we do a
2253 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2254 * I think this means that the lkb_is_ahead condition below should always
2255 * be zero, i.e. there will never be conv-deadlk between two locks that are
2256 * both already on the convert queue.
2257 */
2258
2259static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2260{
2261	struct dlm_lkb *lkb1;
2262	int lkb_is_ahead = 0;
2263
2264	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2265		if (lkb1 == lkb2) {
2266			lkb_is_ahead = 1;
2267			continue;
2268		}
2269
2270		if (!lkb_is_ahead) {
2271			if (!modes_compat(lkb2, lkb1))
2272				return 1;
2273		} else {
2274			if (!modes_compat(lkb2, lkb1) &&
2275			    !modes_compat(lkb1, lkb2))
2276				return 1;
2277		}
2278	}
2279	return 0;
2280}
2281
2282/*
2283 * Return 1 if the lock can be granted, 0 otherwise.
2284 * Also detect and resolve conversion deadlocks.
2285 *
2286 * lkb is the lock to be granted
2287 *
2288 * now is 1 if the function is being called in the context of the
2289 * immediate request, it is 0 if called later, after the lock has been
2290 * queued.
2291 *
2292 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2293 * after recovery.
2294 *
2295 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2296 */
2297
2298static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2299			   int recover)
2300{
2301	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2302
2303	/*
2304	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2305	 * a new request for a NL mode lock being blocked.
2306	 *
2307	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2308	 * request, then it would be granted.  In essence, the use of this flag
2309	 * tells the Lock Manager to expedite this request by not considering
2310	 * what may be in the CONVERTING or WAITING queues...  As of this
2311	 * writing, the EXPEDITE flag can be used only with new requests for NL
2312	 * mode locks.  This flag is not valid for conversion requests.
2313	 *
2314	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2315	 * conversion or used with a non-NL requested mode.  We also know an
2316	 * EXPEDITE request is always granted immediately, so now must always
2317	 * be 1.  The full condition to grant an expedite request: (now &&
2318	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2319	 * therefore be shortened to just checking the flag.
2320	 */
2321
2322	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2323		return 1;
2324
2325	/*
2326	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2327	 * added to the remaining conditions.
2328	 */
2329
2330	if (queue_conflict(&r->res_grantqueue, lkb))
2331		return 0;
2332
2333	/*
2334	 * 6-3: By default, a conversion request is immediately granted if the
2335	 * requested mode is compatible with the modes of all other granted
2336	 * locks
2337	 */
2338
2339	if (queue_conflict(&r->res_convertqueue, lkb))
2340		return 0;
2341
2342	/*
2343	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2344	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2345	 * The lkb's may have been rebuilt on the queues in a different
2346	 * order than they were in on the previous master.  So, granting
2347	 * queued conversions in order after recovery doesn't make sense
2348	 * since the order hasn't been preserved anyway.  The new order
2349	 * could also have created a new "in place" conversion deadlock.
2350	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2351	 * After recovery, there would be no granted locks, and possibly
2352	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2353	 * recovery, grant conversions without considering order.
2354	 */
2355
2356	if (conv && recover)
2357		return 1;
2358
2359	/*
2360	 * 6-5: But the default algorithm for deciding whether to grant or
2361	 * queue conversion requests does not by itself guarantee that such
2362	 * requests are serviced on a "first come first serve" basis.  This, in
2363	 * turn, can lead to a phenomenon known as "indefinite postponement".
2364	 *
2365	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2366	 * the system service employed to request a lock conversion.  This flag
2367	 * forces certain conversion requests to be queued, even if they are
2368	 * compatible with the granted modes of other locks on the same
2369	 * resource.  Thus, the use of this flag results in conversion requests
2370	 * being ordered on a "first come first serve" basis.
2371	 *
2372	 * DCT: This condition is all about new conversions being able to occur
2373	 * "in place" while the lock remains on the granted queue (assuming
2374	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2375	 * doesn't _have_ to go onto the convert queue where it's processed in
2376	 * order.  The "now" variable is necessary to distinguish converts
2377	 * being received and processed for the first time now, because once a
2378	 * convert is moved to the conversion queue the condition below applies
2379	 * requiring fifo granting.
2380	 */
2381
2382	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2383		return 1;
2384
2385	/*
2386	 * Even if the convert is compat with all granted locks,
2387	 * QUECVT forces it behind other locks on the convert queue.
2388	 */
2389
2390	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2391		if (list_empty(&r->res_convertqueue))
2392			return 1;
2393		else
2394			return 0;
2395	}
2396
2397	/*
2398	 * The NOORDER flag is set to avoid the standard vms rules on grant
2399	 * order.
2400	 */
2401
2402	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2403		return 1;
2404
2405	/*
2406	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2407	 * granted until all other conversion requests ahead of it are granted
2408	 * and/or canceled.
2409	 */
2410
2411	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2412		return 1;
2413
2414	/*
2415	 * 6-4: By default, a new request is immediately granted only if all
2416	 * three of the following conditions are satisfied when the request is
2417	 * issued:
2418	 * - The queue of ungranted conversion requests for the resource is
2419	 *   empty.
2420	 * - The queue of ungranted new requests for the resource is empty.
2421	 * - The mode of the new request is compatible with the most
2422	 *   restrictive mode of all granted locks on the resource.
2423	 */
2424
2425	if (now && !conv && list_empty(&r->res_convertqueue) &&
2426	    list_empty(&r->res_waitqueue))
2427		return 1;
2428
2429	/*
2430	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2431	 * it cannot be granted until the queue of ungranted conversion
2432	 * requests is empty, all ungranted new requests ahead of it are
2433	 * granted and/or canceled, and it is compatible with the granted mode
2434	 * of the most restrictive lock granted on the resource.
2435	 */
2436
2437	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2438	    first_in_list(lkb, &r->res_waitqueue))
2439		return 1;
2440
2441	return 0;
2442}
2443
2444static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2445			  int recover, int *err)
2446{
2447	int rv;
2448	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2449	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2450
2451	if (err)
2452		*err = 0;
2453
2454	rv = _can_be_granted(r, lkb, now, recover);
2455	if (rv)
2456		goto out;
2457
2458	/*
2459	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2460	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2461	 * cancels one of the locks.
2462	 */
2463
2464	if (is_convert && can_be_queued(lkb) &&
2465	    conversion_deadlock_detect(r, lkb)) {
2466		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2467			lkb->lkb_grmode = DLM_LOCK_NL;
2468			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2469		} else if (err) {
2470			*err = -EDEADLK;
2471		} else {
2472			log_print("can_be_granted deadlock %x now %d",
2473				  lkb->lkb_id, now);
2474			dlm_dump_rsb(r);
2475		}
2476		goto out;
2477	}
2478
2479	/*
2480	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2481	 * to grant a request in a mode other than the normal rqmode.  It's a
2482	 * simple way to provide a big optimization to applications that can
2483	 * use them.
2484	 */
2485
2486	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2487		alt = DLM_LOCK_PR;
2488	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2489		alt = DLM_LOCK_CW;
2490
2491	if (alt) {
2492		lkb->lkb_rqmode = alt;
2493		rv = _can_be_granted(r, lkb, now, 0);
2494		if (rv)
2495			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2496		else
2497			lkb->lkb_rqmode = rqmode;
2498	}
2499 out:
2500	return rv;
2501}
2502
2503/* Returns the highest requested mode of all blocked conversions; sets
2504   cw if there's a blocked conversion to DLM_LOCK_CW. */
2505
2506static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2507				 unsigned int *count)
2508{
2509	struct dlm_lkb *lkb, *s;
2510	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2511	int hi, demoted, quit, grant_restart, demote_restart;
2512	int deadlk;
2513
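	/* rescan the convert queue whenever a grant or demotion may have
	   made other conversions grantable; "quit" limits the rescan
	   triggered by a demotion alone to a single extra pass */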
2514	quit = 0;
2515 restart:
2516	grant_restart = 0;
2517	demote_restart = 0;
2518	hi = DLM_LOCK_IV;
2519
2520	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2521		demoted = is_demoted(lkb);
2522		deadlk = 0;
2523
2524		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2525			grant_lock_pending(r, lkb);
2526			grant_restart = 1;
2527			if (count)
2528				(*count)++;
2529			continue;
2530		}
2531
2532		if (!demoted && is_demoted(lkb)) {
2533			log_print("WARN: pending demoted %x node %d %s",
2534				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2535			demote_restart = 1;
2536			continue;
2537		}
2538
2539		if (deadlk) {
2540			/*
2541			 * If the DLM_LKF_NODLCKWT flag is set and conversion
2542			 * deadlock is detected, we queue a blocking AST so the
2543			 * lock's owner can demote (or cancel) the conversion.
2544			 */
2545			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2546				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2547					queue_bast(r, lkb, lkb->lkb_rqmode);
2548					lkb->lkb_highbast = lkb->lkb_rqmode;
2549				}
2550			} else {
2551				log_print("WARN: pending deadlock %x node %d %s",
2552					  lkb->lkb_id, lkb->lkb_nodeid,
2553					  r->res_name);
2554				dlm_dump_rsb(r);
2555			}
2556			continue;
2557		}
2558
2559		hi = max_t(int, lkb->lkb_rqmode, hi);
2560
2561		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2562			*cw = 1;
2563	}
2564
2565	if (grant_restart)
2566		goto restart;
2567	if (demote_restart && !quit) {
2568		quit = 1;
2569		goto restart;
2570	}
2571
2572	return max_t(int, high, hi);
2573}
2574
2575static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2576			      unsigned int *count)
2577{
2578	struct dlm_lkb *lkb, *s;
2579
2580	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2581		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2582			grant_lock_pending(r, lkb);
2583			if (count)
2584				(*count)++;
2585		} else {
2586			high = max_t(int, lkb->lkb_rqmode, high);
2587			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2588				*cw = 1;
2589		}
2590	}
2591
2592	return high;
2593}
2594
2595/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2596   on either the convert or waiting queue.
2597   high is the largest rqmode of all locks blocked on the convert or
2598   waiting queue. */
2599
2600static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2601{
2602	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2603		if (gr->lkb_highbast < DLM_LOCK_EX)
2604			return 1;
2605		return 0;
2606	}
2607
2608	if (gr->lkb_highbast < high &&
2609	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2610		return 1;
2611	return 0;
2612}
2613
2614static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2615{
2616	struct dlm_lkb *lkb, *s;
2617	int high = DLM_LOCK_IV;
2618	int cw = 0;
2619
2620	if (!is_master(r)) {
2621		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2622		dlm_dump_rsb(r);
2623		return;
2624	}
2625
2626	high = grant_pending_convert(r, high, &cw, count);
2627	high = grant_pending_wait(r, high, &cw, count);
2628
2629	if (high == DLM_LOCK_IV)
2630		return;
2631
2632	/*
2633	 * If there are locks left on the wait/convert queue then send blocking
2634	 * ASTs to granted locks based on the largest requested mode (high)
2635	 * found above.
2636	 */
2637
2638	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2639		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
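			/* a granted PR lock is compatible with a blocked PR
			   request (high), but not with a blocked CW request,
			   so the bast must name CW in that case */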
2640			if (cw && high == DLM_LOCK_PR &&
2641			    lkb->lkb_grmode == DLM_LOCK_PR)
2642				queue_bast(r, lkb, DLM_LOCK_CW);
2643			else
2644				queue_bast(r, lkb, high);
2645			lkb->lkb_highbast = high;
2646		}
2647	}
2648}
2649
2650static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2651{
2652	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2653	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2654		if (gr->lkb_highbast < DLM_LOCK_EX)
2655			return 1;
2656		return 0;
2657	}
2658
2659	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2660		return 1;
2661	return 0;
2662}
2663
2664static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2665			    struct dlm_lkb *lkb)
2666{
2667	struct dlm_lkb *gr;
2668
2669	list_for_each_entry(gr, head, lkb_statequeue) {
2670		/* skip self when sending basts to convertqueue */
2671		if (gr == lkb)
2672			continue;
2673		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2674			queue_bast(r, gr, lkb->lkb_rqmode);
2675			gr->lkb_highbast = lkb->lkb_rqmode;
2676		}
2677	}
2678}
2679
2680static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2681{
2682	send_bast_queue(r, &r->res_grantqueue, lkb);
2683}
2684
2685static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2686{
2687	send_bast_queue(r, &r->res_grantqueue, lkb);
2688	send_bast_queue(r, &r->res_convertqueue, lkb);
2689}
2690
2691/* set_master(r, lkb) -- set the master nodeid of a resource
2692
2693   The purpose of this function is to set the nodeid field in the given
2694   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2695   known, it can just be copied to the lkb and the function will return
2696   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2697   before it can be copied to the lkb.
2698
2699   When the rsb nodeid is being looked up remotely, the initial lkb
2700   causing the lookup is kept on the ls_waiters list waiting for the
2701   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2702   on the rsb's res_lookup list until the master is verified.
2703
2704   Return values:
2705   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2706   1: the rsb master is not available and the lkb has been placed on
2707      a wait queue
2708*/
2709
2710static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2711{
2712	int our_nodeid = dlm_our_nodeid();
2713
2714	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2715		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2716		r->res_first_lkid = lkb->lkb_id;
2717		lkb->lkb_nodeid = r->res_nodeid;
2718		return 0;
2719	}
2720
2721	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2722		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2723		return 1;
2724	}
2725
2726	if (r->res_master_nodeid == our_nodeid) {
2727		lkb->lkb_nodeid = 0;
2728		return 0;
2729	}
2730
2731	if (r->res_master_nodeid) {
2732		lkb->lkb_nodeid = r->res_master_nodeid;
2733		return 0;
2734	}
2735
2736	if (dlm_dir_nodeid(r) == our_nodeid) {
2737		/* This is a somewhat unusual case; find_rsb will usually
2738		   have set res_master_nodeid when dir nodeid is local, but
2739		   there are cases where we become the dir node after we've
2740		   passed find_rsb and gone through _request_lock again.
2741		   confirm_master() or process_lookup_list() needs to be
2742		   called after this. */
2743		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2744			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2745			  r->res_name);
2746		r->res_master_nodeid = our_nodeid;
2747		r->res_nodeid = 0;
2748		lkb->lkb_nodeid = 0;
2749		return 0;
2750	}
2751
2752	wait_pending_remove(r);
2753
2754	r->res_first_lkid = lkb->lkb_id;
2755	send_lookup(r, lkb);
2756	return 1;
2757}
2758
2759static void process_lookup_list(struct dlm_rsb *r)
2760{
2761	struct dlm_lkb *lkb, *safe;
2762
2763	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2764		list_del_init(&lkb->lkb_rsb_lookup);
2765		_request_lock(r, lkb);
2766		schedule();
2767	}
2768}
2769
2770/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2771
2772static void confirm_master(struct dlm_rsb *r, int error)
2773{
2774	struct dlm_lkb *lkb;
2775
2776	if (!r->res_first_lkid)
2777		return;
2778
2779	switch (error) {
2780	case 0:
2781	case -EINPROGRESS:
2782		r->res_first_lkid = 0;
2783		process_lookup_list(r);
2784		break;
2785
2786	case -EAGAIN:
2787	case -EBADR:
2788	case -ENOTBLK:
2789		/* the remote request failed and won't be retried (it was
2790		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2791		   lkb the first_lkid */
2792
2793		r->res_first_lkid = 0;
2794
2795		if (!list_empty(&r->res_lookup)) {
2796			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2797					 lkb_rsb_lookup);
2798			list_del_init(&lkb->lkb_rsb_lookup);
2799			r->res_first_lkid = lkb->lkb_id;
2800			_request_lock(r, lkb);
2801		}
2802		break;
2803
2804	default:
2805		log_error(r->res_ls, "confirm_master unknown error %d", error);
2806	}
2807}
2808
2809static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2810			 int namelen, unsigned long timeout_cs,
2811			 void (*ast) (void *astparam),
2812			 void *astparam,
2813			 void (*bast) (void *astparam, int mode),
2814			 struct dlm_args *args)
2815{
2816	int rv = -EINVAL;
2817
2818	/* check for invalid arg usage */
2819
2820	if (mode < 0 || mode > DLM_LOCK_EX)
2821		goto out;
2822
2823	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2824		goto out;
2825
2826	if (flags & DLM_LKF_CANCEL)
2827		goto out;
2828
2829	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2830		goto out;
2831
2832	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2833		goto out;
2834
2835	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2836		goto out;
2837
2838	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2839		goto out;
2840
2841	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2842		goto out;
2843
2844	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2845		goto out;
2846
2847	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2848		goto out;
2849
2850	if (!ast || !lksb)
2851		goto out;
2852
2853	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2854		goto out;
2855
2856	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2857		goto out;
2858
2859	/* these args will be copied to the lkb in validate_lock_args,
2860	   it cannot be done now because when converting locks, fields in
2861	   an active lkb cannot be modified before locking the rsb */
2862
2863	args->flags = flags;
2864	args->astfn = ast;
2865	args->astparam = astparam;
2866	args->bastfn = bast;
2867	args->timeout = timeout_cs;
2868	args->mode = mode;
2869	args->lksb = lksb;
2870	rv = 0;
2871 out:
2872	return rv;
2873}
2874
2875static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2876{
2877	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2878		      DLM_LKF_FORCEUNLOCK))
2879		return -EINVAL;
2880
2881	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2882		return -EINVAL;
2883
2884	args->flags = flags;
2885	args->astparam = astarg;
2886	return 0;
2887}
2888
2889static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2890			      struct dlm_args *args)
2891{
2892	int rv = -EBUSY;
2893
2894	if (args->flags & DLM_LKF_CONVERT) {
2895		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2896			goto out;
2897
2898		if (lkb->lkb_wait_type)
2899			goto out;
2900
2901		if (is_overlap(lkb))
2902			goto out;
2903
2904		rv = -EINVAL;
2905		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2906			goto out;
2907
2908		if (args->flags & DLM_LKF_QUECVT &&
2909		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2910			goto out;
2911	}
2912
2913	lkb->lkb_exflags = args->flags;
2914	lkb->lkb_sbflags = 0;
2915	lkb->lkb_astfn = args->astfn;
2916	lkb->lkb_astparam = args->astparam;
2917	lkb->lkb_bastfn = args->bastfn;
2918	lkb->lkb_rqmode = args->mode;
2919	lkb->lkb_lksb = args->lksb;
2920	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2921	lkb->lkb_ownpid = (int) current->pid;
2922	lkb->lkb_timeout_cs = args->timeout;
2923	rv = 0;
2924 out:
2925	if (rv)
2926		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2927			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2928			  lkb->lkb_status, lkb->lkb_wait_type,
2929			  lkb->lkb_resource->res_name);
2930	return rv;
2931}
2932
2933/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2934   for success */
2935
2936/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2937   because there may be a lookup in progress and it's valid to do
2938   cancel/unlockf on it */
2939
2940static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2941{
2942	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2943	int rv = -EINVAL;
2944
2945	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2946		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2947		dlm_print_lkb(lkb);
2948		goto out;
2949	}
2950
2951	/* an lkb may still exist even though the lock is EOL'ed due to a
2952	   cancel, unlock or failed noqueue request; an app can't use these
2953	   locks; return same error as if the lkid had not been found at all */
2954
2955	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2956		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2957		rv = -ENOENT;
2958		goto out;
2959	}
2960
2961	/* an lkb may be waiting for an rsb lookup to complete where the
2962	   lookup was initiated by another lock */
2963
2964	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2965		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2966			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2967			list_del_init(&lkb->lkb_rsb_lookup);
2968			queue_cast(lkb->lkb_resource, lkb,
2969				   args->flags & DLM_LKF_CANCEL ?
2970				   -DLM_ECANCEL : -DLM_EUNLOCK);
2971			unhold_lkb(lkb); /* undoes create_lkb() */
2972		}
2973		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2974		rv = -EBUSY;
2975		goto out;
2976	}
2977
2978	/* cancel not allowed with another cancel/unlock in progress */
2979
2980	if (args->flags & DLM_LKF_CANCEL) {
2981		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2982			goto out;
2983
2984		if (is_overlap(lkb))
2985			goto out;
2986
2987		/* don't let scand try to do a cancel */
2988		del_timeout(lkb);
2989
2990		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2991			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2992			rv = -EBUSY;
2993			goto out;
2994		}
2995
2996		/* there's nothing to cancel */
2997		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2998		    !lkb->lkb_wait_type) {
2999			rv = -EBUSY;
3000			goto out;
3001		}
3002
3003		switch (lkb->lkb_wait_type) {
3004		case DLM_MSG_LOOKUP:
3005		case DLM_MSG_REQUEST:
3006			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3007			rv = -EBUSY;
3008			goto out;
3009		case DLM_MSG_UNLOCK:
3010		case DLM_MSG_CANCEL:
3011			goto out;
3012		}
3013		/* add_to_waiters() will set OVERLAP_CANCEL */
3014		goto out_ok;
3015	}
3016
3017	/* do we need to allow a force-unlock if there's a normal unlock
3018	   already in progress?  in what conditions could the normal unlock
3019	   fail such that we'd want to send a force-unlock to be sure? */
3020
3021	if (args->flags & DLM_LKF_FORCEUNLOCK) {
3022		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3023			goto out;
3024
3025		if (is_overlap_unlock(lkb))
3026			goto out;
3027
3028		/* don't let scand try to do a cancel */
3029		del_timeout(lkb);
3030
3031		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3032			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3033			rv = -EBUSY;
3034			goto out;
3035		}
3036
3037		switch (lkb->lkb_wait_type) {
3038		case DLM_MSG_LOOKUP:
3039		case DLM_MSG_REQUEST:
3040			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3041			rv = -EBUSY;
3042			goto out;
3043		case DLM_MSG_UNLOCK:
3044			goto out;
3045		}
3046		/* add_to_waiters() will set OVERLAP_UNLOCK */
3047		goto out_ok;
3048	}
3049
3050	/* normal unlock not allowed if there's any op in progress */
3051	rv = -EBUSY;
3052	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3053		goto out;
3054
3055 out_ok:
3056	/* an overlapping op shouldn't blow away exflags from other op */
3057	lkb->lkb_exflags |= args->flags;
3058	lkb->lkb_sbflags = 0;
3059	lkb->lkb_astparam = args->astparam;
3060	rv = 0;
3061 out:
3062	if (rv)
3063		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3064			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3065			  args->flags, lkb->lkb_wait_type,
3066			  lkb->lkb_resource->res_name);
3067	return rv;
3068}
3069
3070/*
3071 * Four stage 4 varieties:
3072 * do_request(), do_convert(), do_unlock(), do_cancel()
3073 * These are called on the master node for the given lock and
3074 * from the central locking logic.
3075 */
3076
3077static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3078{
3079	int error = 0;
3080
3081	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3082		grant_lock(r, lkb);
3083		queue_cast(r, lkb, 0);
3084		goto out;
3085	}
3086
3087	if (can_be_queued(lkb)) {
3088		error = -EINPROGRESS;
3089		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3090		add_timeout(lkb);
3091		goto out;
3092	}
3093
3094	error = -EAGAIN;
3095	queue_cast(r, lkb, -EAGAIN);
3096 out:
3097	return error;
3098}
3099
3100static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3101			       int error)
3102{
3103	switch (error) {
3104	case -EAGAIN:
3105		if (force_blocking_asts(lkb))
3106			send_blocking_asts_all(r, lkb);
3107		break;
3108	case -EINPROGRESS:
3109		send_blocking_asts(r, lkb);
3110		break;
3111	}
3112}
3113
3114static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3115{
3116	int error = 0;
3117	int deadlk = 0;
3118
3119	/* changing an existing lock may allow others to be granted */
3120
3121	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3122		grant_lock(r, lkb);
3123		queue_cast(r, lkb, 0);
3124		goto out;
3125	}
3126
3127	/* can_be_granted() detected that this lock would block in a conversion
3128	   deadlock, so we leave it on the granted queue and return EDEADLK in
3129	   the ast for the convert. */
3130
3131	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3132		/* it's left on the granted queue */
3133		revert_lock(r, lkb);
3134		queue_cast(r, lkb, -EDEADLK);
3135		error = -EDEADLK;
3136		goto out;
3137	}
3138
3139	/* is_demoted() means the can_be_granted() above set the grmode
3140	   to NL, and left us on the granted queue.  This auto-demotion
3141	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3142	   now grantable.  We have to try to grant other converting locks
3143	   before we try again to grant this one. */
3144
3145	if (is_demoted(lkb)) {
3146		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3147		if (_can_be_granted(r, lkb, 1, 0)) {
3148			grant_lock(r, lkb);
3149			queue_cast(r, lkb, 0);
3150			goto out;
3151		}
3152		/* else fall through and move to convert queue */
3153	}
3154
3155	if (can_be_queued(lkb)) {
3156		error = -EINPROGRESS;
3157		del_lkb(r, lkb);
3158		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3159		add_timeout(lkb);
3160		goto out;
3161	}
3162
3163	error = -EAGAIN;
3164	queue_cast(r, lkb, -EAGAIN);
3165 out:
3166	return error;
3167}
3168
3169static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3170			       int error)
3171{
3172	switch (error) {
3173	case 0:
3174		grant_pending_locks(r, NULL);
3175		/* grant_pending_locks also sends basts */
3176		break;
3177	case -EAGAIN:
3178		if (force_blocking_asts(lkb))
3179			send_blocking_asts_all(r, lkb);
3180		break;
3181	case -EINPROGRESS:
3182		send_blocking_asts(r, lkb);
3183		break;
3184	}
3185}
3186
3187static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3188{
3189	remove_lock(r, lkb);
3190	queue_cast(r, lkb, -DLM_EUNLOCK);
3191	return -DLM_EUNLOCK;
3192}
3193
3194static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3195			      int error)
3196{
3197	grant_pending_locks(r, NULL);
3198}
3199
3200/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3201
3202static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3203{
3204	int error;
3205
3206	error = revert_lock(r, lkb);
3207	if (error) {
3208		queue_cast(r, lkb, -DLM_ECANCEL);
3209		return -DLM_ECANCEL;
3210	}
3211	return 0;
3212}
3213
3214static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3215			      int error)
3216{
3217	if (error)
3218		grant_pending_locks(r, NULL);
3219}
3220
3221/*
3222 * Four stage 3 varieties:
3223 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3224 */
3225
3226/* add a new lkb to a possibly new rsb, called by requesting process */
3227
3228static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3229{
3230	int error;
3231
3232	/* set_master: sets lkb nodeid from r */
3233
3234	error = set_master(r, lkb);
3235	if (error < 0)
3236		goto out;
3237	if (error) {
3238		error = 0;
3239		goto out;
3240	}
3241
3242	if (is_remote(r)) {
3243		/* receive_request() calls do_request() on remote node */
3244		error = send_request(r, lkb);
3245	} else {
3246		error = do_request(r, lkb);
3247		/* for remote locks the request_reply is sent
3248		   between do_request and do_request_effects */
3249		do_request_effects(r, lkb, error);
3250	}
3251 out:
3252	return error;
3253}
3254
3255/* change some property of an existing lkb, e.g. mode */
3256
3257static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3258{
3259	int error;
3260
3261	if (is_remote(r)) {
3262		/* receive_convert() calls do_convert() on remote node */
3263		error = send_convert(r, lkb);
3264	} else {
3265		error = do_convert(r, lkb);
3266		/* for remote locks the convert_reply is sent
3267		   between do_convert and do_convert_effects */
3268		do_convert_effects(r, lkb, error);
3269	}
3270
3271	return error;
3272}
3273
3274/* remove an existing lkb from the granted queue */
3275
3276static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3277{
3278	int error;
3279
3280	if (is_remote(r)) {
3281		/* receive_unlock() calls do_unlock() on remote node */
3282		error = send_unlock(r, lkb);
3283	} else {
3284		error = do_unlock(r, lkb);
3285		/* for remote locks the unlock_reply is sent
3286		   between do_unlock and do_unlock_effects */
3287		do_unlock_effects(r, lkb, error);
3288	}
3289
3290	return error;
3291}
3292
3293/* remove an existing lkb from the convert or wait queue */
3294
3295static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3296{
3297	int error;
3298
3299	if (is_remote(r)) {
3300		/* receive_cancel() calls do_cancel() on remote node */
3301		error = send_cancel(r, lkb);
3302	} else {
3303		error = do_cancel(r, lkb);
3304		/* for remote locks the cancel_reply is sent
3305		   between do_cancel and do_cancel_effects */
3306		do_cancel_effects(r, lkb, error);
3307	}
3308
3309	return error;
3310}
3311
3312/*
3313 * Four stage 2 varieties:
3314 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3315 */
3316
3317static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3318			int len, struct dlm_args *args)
3319{
3320	struct dlm_rsb *r;
3321	int error;
3322
3323	error = validate_lock_args(ls, lkb, args);
3324	if (error)
3325		return error;
3326
3327	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3328	if (error)
3329		return error;
3330
3331	lock_rsb(r);
3332
3333	attach_lkb(r, lkb);
3334	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3335
3336	error = _request_lock(r, lkb);
3337
3338	unlock_rsb(r);
3339	put_rsb(r);
3340	return error;
3341}
3342
3343static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3344			struct dlm_args *args)
3345{
3346	struct dlm_rsb *r;
3347	int error;
3348
3349	r = lkb->lkb_resource;
3350
3351	hold_rsb(r);
3352	lock_rsb(r);
3353
3354	error = validate_lock_args(ls, lkb, args);
3355	if (error)
3356		goto out;
3357
3358	error = _convert_lock(r, lkb);
3359 out:
3360	unlock_rsb(r);
3361	put_rsb(r);
3362	return error;
3363}
3364
3365static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3366		       struct dlm_args *args)
3367{
3368	struct dlm_rsb *r;
3369	int error;
3370
3371	r = lkb->lkb_resource;
3372
3373	hold_rsb(r);
3374	lock_rsb(r);
3375
3376	error = validate_unlock_args(lkb, args);
3377	if (error)
3378		goto out;
3379
3380	error = _unlock_lock(r, lkb);
3381 out:
3382	unlock_rsb(r);
3383	put_rsb(r);
3384	return error;
3385}
3386
3387static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3388		       struct dlm_args *args)
3389{
3390	struct dlm_rsb *r;
3391	int error;
3392
3393	r = lkb->lkb_resource;
3394
3395	hold_rsb(r);
3396	lock_rsb(r);
3397
3398	error = validate_unlock_args(lkb, args);
3399	if (error)
3400		goto out;
3401
3402	error = _cancel_lock(r, lkb);
3403 out:
3404	unlock_rsb(r);
3405	put_rsb(r);
3406	return error;
3407}
3408
3409/*
3410 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3411 */
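
/* A minimal sketch of a kernel-mode caller (all names below are
 * hypothetical; "ls" would come from dlm_new_lockspace()).  The ast
 * callback runs asynchronously when the operation completes, with the
 * result in lksb.sb_status (-DLM_EUNLOCK for a completed unlock):
 *
 *	static DECLARE_COMPLETION(done);
 *	static struct dlm_lksb lksb;
 *
 *	static void my_ast(void *arg)
 *	{
 *		complete(arg);
 *	}
 *
 *	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres",
 *			 strlen("myres"), 0, my_ast, &done, NULL);
 *	if (!error) {
 *		wait_for_completion(&done);
 *		if (!lksb.sb_status) {
 *			reinit_completion(&done);
 *			error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
 *		}
 *	}
 */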
3412
3413int dlm_lock(dlm_lockspace_t *lockspace,
3414	     int mode,
3415	     struct dlm_lksb *lksb,
3416	     uint32_t flags,
3417	     void *name,
3418	     unsigned int namelen,
3419	     uint32_t parent_lkid,
3420	     void (*ast) (void *astarg),
3421	     void *astarg,
3422	     void (*bast) (void *astarg, int mode))
3423{
3424	struct dlm_ls *ls;
3425	struct dlm_lkb *lkb;
3426	struct dlm_args args;
3427	int error, convert = flags & DLM_LKF_CONVERT;
3428
3429	ls = dlm_find_lockspace_local(lockspace);
3430	if (!ls)
3431		return -EINVAL;
3432
3433	dlm_lock_recovery(ls);
3434
3435	if (convert)
3436		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3437	else
3438		error = create_lkb(ls, &lkb);
3439
3440	if (error)
3441		goto out;
3442
3443	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3444			      astarg, bast, &args);
3445	if (error)
3446		goto out_put;
3447
3448	if (convert)
3449		error = convert_lock(ls, lkb, &args);
3450	else
3451		error = request_lock(ls, lkb, name, namelen, &args);
3452
3453	if (error == -EINPROGRESS)
3454		error = 0;
3455 out_put:
3456	if (convert || error)
3457		__put_lkb(ls, lkb);
3458	if (error == -EAGAIN || error == -EDEADLK)
3459		error = 0;
3460 out:
3461	dlm_unlock_recovery(ls);
3462	dlm_put_lockspace(ls);
3463	return error;
3464}
3465
3466int dlm_unlock(dlm_lockspace_t *lockspace,
3467	       uint32_t lkid,
3468	       uint32_t flags,
3469	       struct dlm_lksb *lksb,
3470	       void *astarg)
3471{
3472	struct dlm_ls *ls;
3473	struct dlm_lkb *lkb;
3474	struct dlm_args args;
3475	int error;
3476
3477	ls = dlm_find_lockspace_local(lockspace);
3478	if (!ls)
3479		return -EINVAL;
3480
3481	dlm_lock_recovery(ls);
3482
3483	error = find_lkb(ls, lkid, &lkb);
3484	if (error)
3485		goto out;
3486
3487	error = set_unlock_args(flags, astarg, &args);
3488	if (error)
3489		goto out_put;
3490
3491	if (flags & DLM_LKF_CANCEL)
3492		error = cancel_lock(ls, lkb, &args);
3493	else
3494		error = unlock_lock(ls, lkb, &args);
3495
3496	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3497		error = 0;
3498	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3499		error = 0;
3500 out_put:
3501	dlm_put_lkb(lkb);
3502 out:
3503	dlm_unlock_recovery(ls);
3504	dlm_put_lockspace(ls);
3505	return error;
3506}
3507
3508/*
3509 * send/receive routines for remote operations and replies
3510 *
3511 * send_args
3512 * send_common
3513 * send_request			receive_request
3514 * send_convert			receive_convert
3515 * send_unlock			receive_unlock
3516 * send_cancel			receive_cancel
3517 * send_grant			receive_grant
3518 * send_bast			receive_bast
3519 * send_lookup			receive_lookup
3520 * send_remove			receive_remove
3521 *
3522 * 				send_common_reply
3523 * receive_request_reply	send_request_reply
3524 * receive_convert_reply	send_convert_reply
3525 * receive_unlock_reply		send_unlock_reply
3526 * receive_cancel_reply		send_cancel_reply
3527 * receive_lookup_reply		send_lookup_reply
3528 */
3529
3530static int _create_message(struct dlm_ls *ls, int mb_len,
3531			   int to_nodeid, int mstype,
3532			   struct dlm_message **ms_ret,
3533			   struct dlm_mhandle **mh_ret)
3534{
3535	struct dlm_message *ms;
3536	struct dlm_mhandle *mh;
3537	char *mb;
3538
3539	/* get_buffer gives us a message handle (mh) that we need to
3540	   pass into lowcomms_commit and a message buffer (mb) that we
3541	   write our data into */
3542
3543	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3544	if (!mh)
3545		return -ENOBUFS;
3546
3547	memset(mb, 0, mb_len);
3548
3549	ms = (struct dlm_message *) mb;
3550
3551	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3552	ms->m_header.h_lockspace = ls->ls_global_id;
3553	ms->m_header.h_nodeid = dlm_our_nodeid();
3554	ms->m_header.h_length = mb_len;
3555	ms->m_header.h_cmd = DLM_MSG;
3556
3557	ms->m_type = mstype;
3558
3559	*mh_ret = mh;
3560	*ms_ret = ms;
3561	return 0;
3562}
3563
3564static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3565			  int to_nodeid, int mstype,
3566			  struct dlm_message **ms_ret,
3567			  struct dlm_mhandle **mh_ret)
3568{
3569	int mb_len = sizeof(struct dlm_message);
3570
3571	switch (mstype) {
3572	case DLM_MSG_REQUEST:
3573	case DLM_MSG_LOOKUP:
3574	case DLM_MSG_REMOVE:
3575		mb_len += r->res_length;
3576		break;
3577	case DLM_MSG_CONVERT:
3578	case DLM_MSG_UNLOCK:
3579	case DLM_MSG_REQUEST_REPLY:
3580	case DLM_MSG_CONVERT_REPLY:
3581	case DLM_MSG_GRANT:
3582		if (lkb && lkb->lkb_lvbptr)
3583			mb_len += r->res_ls->ls_lvblen;
3584		break;
3585	}
3586
3587	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3588			       ms_ret, mh_ret);
3589}
3590
3591/* further lowcomms enhancements or alternate implementations may make
3592   the return value from this function useful at some point */
3593
3594static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3595{
3596	dlm_message_out(ms);
3597	dlm_lowcomms_commit_buffer(mh);
3598	return 0;
3599}
3600
3601static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3602		      struct dlm_message *ms)
3603{
3604	ms->m_nodeid   = lkb->lkb_nodeid;
3605	ms->m_pid      = lkb->lkb_ownpid;
3606	ms->m_lkid     = lkb->lkb_id;
3607	ms->m_remid    = lkb->lkb_remid;
3608	ms->m_exflags  = lkb->lkb_exflags;
3609	ms->m_sbflags  = lkb->lkb_sbflags;
3610	ms->m_flags    = lkb->lkb_flags;
3611	ms->m_lvbseq   = lkb->lkb_lvbseq;
3612	ms->m_status   = lkb->lkb_status;
3613	ms->m_grmode   = lkb->lkb_grmode;
3614	ms->m_rqmode   = lkb->lkb_rqmode;
3615	ms->m_hash     = r->res_hash;
3616
3617	/* m_result and m_bastmode are set from function args,
3618	   not from lkb fields */
3619
3620	if (lkb->lkb_bastfn)
3621		ms->m_asts |= DLM_CB_BAST;
3622	if (lkb->lkb_astfn)
3623		ms->m_asts |= DLM_CB_CAST;
3624
3625	/* compare with switch in create_message; send_remove() doesn't
3626	   use send_args() */
3627
3628	switch (ms->m_type) {
3629	case DLM_MSG_REQUEST:
3630	case DLM_MSG_LOOKUP:
3631		memcpy(ms->m_extra, r->res_name, r->res_length);
3632		break;
3633	case DLM_MSG_CONVERT:
3634	case DLM_MSG_UNLOCK:
3635	case DLM_MSG_REQUEST_REPLY:
3636	case DLM_MSG_CONVERT_REPLY:
3637	case DLM_MSG_GRANT:
3638		if (!lkb->lkb_lvbptr)
3639			break;
3640		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3641		break;
3642	}
3643}
3644
3645static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3646{
3647	struct dlm_message *ms;
3648	struct dlm_mhandle *mh;
3649	int to_nodeid, error;
3650
3651	to_nodeid = r->res_nodeid;
3652
3653	error = add_to_waiters(lkb, mstype, to_nodeid);
3654	if (error)
3655		return error;
3656
3657	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3658	if (error)
3659		goto fail;
3660
3661	send_args(r, lkb, ms);
3662
3663	error = send_message(mh, ms);
3664	if (error)
3665		goto fail;
3666	return 0;
3667
3668 fail:
3669	remove_from_waiters(lkb, msg_reply_type(mstype));
3670	return error;
3671}
3672
3673static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3674{
3675	return send_common(r, lkb, DLM_MSG_REQUEST);
3676}
3677
3678static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3679{
3680	int error;
3681
3682	error = send_common(r, lkb, DLM_MSG_CONVERT);
3683
3684	/* down conversions go without a reply from the master */
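	/* (a conversion to a lower mode is always compatible with the
	   granted mode, so the master's reply would carry no news; the
	   reply is faked locally with ls_stub_ms instead) */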
3685	if (!error && down_conversion(lkb)) {
3686		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3687		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3688		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3689		r->res_ls->ls_stub_ms.m_result = 0;
3690		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3691	}
3692
3693	return error;
3694}
3695
3696/* FIXME: if this lkb is the only lock we hold on the rsb, then set
3697   MASTER_UNCERTAIN to force the next request on the rsb to confirm
3698   that the master is still correct. */
3699
3700static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3701{
3702	return send_common(r, lkb, DLM_MSG_UNLOCK);
3703}
3704
3705static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3706{
3707	return send_common(r, lkb, DLM_MSG_CANCEL);
3708}
3709
3710static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3711{
3712	struct dlm_message *ms;
3713	struct dlm_mhandle *mh;
3714	int to_nodeid, error;
3715
3716	to_nodeid = lkb->lkb_nodeid;
3717
3718	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3719	if (error)
3720		goto out;
3721
3722	send_args(r, lkb, ms);
3723
3724	ms->m_result = 0;
3725
3726	error = send_message(mh, ms);
3727 out:
3728	return error;
3729}
3730
3731static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3732{
3733	struct dlm_message *ms;
3734	struct dlm_mhandle *mh;
3735	int to_nodeid, error;
3736
3737	to_nodeid = lkb->lkb_nodeid;
3738
3739	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3740	if (error)
3741		goto out;
3742
3743	send_args(r, lkb, ms);
3744
3745	ms->m_bastmode = mode;
3746
3747	error = send_message(mh, ms);
3748 out:
3749	return error;
3750}
3751
3752static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3753{
3754	struct dlm_message *ms;
3755	struct dlm_mhandle *mh;
3756	int to_nodeid, error;
3757
3758	to_nodeid = dlm_dir_nodeid(r);
3759
3760	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3761	if (error)
3762		return error;
3763
3764	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3765	if (error)
3766		goto fail;
3767
3768	send_args(r, lkb, ms);
3769
3770	error = send_message(mh, ms);
3771	if (error)
3772		goto fail;
3773	return 0;
3774
3775 fail:
3776	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3777	return error;
3778}
3779
3780static int send_remove(struct dlm_rsb *r)
3781{
3782	struct dlm_message *ms;
3783	struct dlm_mhandle *mh;
3784	int to_nodeid, error;
3785
3786	to_nodeid = dlm_dir_nodeid(r);
3787
3788	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3789	if (error)
3790		goto out;
3791
3792	memcpy(ms->m_extra, r->res_name, r->res_length);
3793	ms->m_hash = r->res_hash;
3794
3795	error = send_message(mh, ms);
3796 out:
3797	return error;
3798}
3799
3800static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3801			     int mstype, int rv)
3802{
3803	struct dlm_message *ms;
3804	struct dlm_mhandle *mh;
3805	int to_nodeid, error;
3806
3807	to_nodeid = lkb->lkb_nodeid;
3808
3809	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3810	if (error)
3811		goto out;
3812
3813	send_args(r, lkb, ms);
3814
3815	ms->m_result = rv;
3816
3817	error = send_message(mh, ms);
3818 out:
3819	return error;
3820}
3821
3822static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3823{
3824	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3825}
3826
3827static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3828{
3829	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3830}
3831
3832static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3833{
3834	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3835}
3836
3837static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3838{
3839	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3840}
3841
3842static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3843			     int ret_nodeid, int rv)
3844{
3845	struct dlm_rsb *r = &ls->ls_stub_rsb;
3846	struct dlm_message *ms;
3847	struct dlm_mhandle *mh;
3848	int error, nodeid = ms_in->m_header.h_nodeid;
3849
3850	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3851	if (error)
3852		goto out;
3853
3854	ms->m_lkid = ms_in->m_lkid;
3855	ms->m_result = rv;
3856	ms->m_nodeid = ret_nodeid;
3857
3858	error = send_message(mh, ms);
3859 out:
3860	return error;
3861}
3862
3863/* which args we save from a received message depends heavily on the type
3864   of message, unlike the send side where we can safely send everything about
3865   the lkb for any type of message */
3866
3867static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3868{
3869	lkb->lkb_exflags = ms->m_exflags;
3870	lkb->lkb_sbflags = ms->m_sbflags;
3871	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3872		         (ms->m_flags & 0x0000FFFF);
3873}
3874
3875static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3876{
3877	if (ms->m_flags == DLM_IFL_STUB_MS)
3878		return;
3879
3880	lkb->lkb_sbflags = ms->m_sbflags;
3881	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3882		         (ms->m_flags & 0x0000FFFF);
3883}
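
/* Sketch (not compiled) of the masking used in both functions above:
 * only the low 16 bits of lkb_flags travel on the wire in m_flags; the
 * high bits (e.g. DLM_IFL_MSTCPY) are node-local state that must survive
 * a received update. */
#if 0
	uint32_t local_bits = lkb->lkb_flags & 0xFFFF0000; /* kept, never sent */
	uint32_t wire_bits  = ms->m_flags    & 0x0000FFFF; /* taken from sender */
	lkb->lkb_flags = local_bits | wire_bits;
#endif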
3884
3885static int receive_extralen(struct dlm_message *ms)
3886{
3887	return (ms->m_header.h_length - sizeof(struct dlm_message));
3888}
3889
3890static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3891		       struct dlm_message *ms)
3892{
3893	int len;
3894
3895	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3896		if (!lkb->lkb_lvbptr)
3897			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3898		if (!lkb->lkb_lvbptr)
3899			return -ENOMEM;
3900		len = receive_extralen(ms);
3901		if (len > ls->ls_lvblen)
3902			len = ls->ls_lvblen;
3903		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3904	}
3905	return 0;
3906}
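
/* Sketch (not compiled): m_extra is the variable-length tail of a
 * message, carrying a resource name or an LVB.  Its size is whatever
 * h_length reports beyond the fixed struct, and LVB copies are bounded
 * by the lockspace LVB length, as above. */
#if 0
	int extra = ms->m_header.h_length - sizeof(struct dlm_message);
	int len = min_t(int, extra, ls->ls_lvblen);
	memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
#endif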
3907
3908static void fake_bastfn(void *astparam, int mode)
3909{
3910	log_print("fake_bastfn should not be called");
3911}
3912
3913static void fake_astfn(void *astparam)
3914{
3915	log_print("fake_astfn should not be called");
3916}
3917
3918static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3919				struct dlm_message *ms)
3920{
3921	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3922	lkb->lkb_ownpid = ms->m_pid;
3923	lkb->lkb_remid = ms->m_lkid;
3924	lkb->lkb_grmode = DLM_LOCK_IV;
3925	lkb->lkb_rqmode = ms->m_rqmode;
3926
3927	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3928	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3929
3930	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3931		/* lkb was just created so there won't be an lvb yet */
3932		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3933		if (!lkb->lkb_lvbptr)
3934			return -ENOMEM;
3935	}
3936
3937	return 0;
3938}
3939
3940static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3941				struct dlm_message *ms)
3942{
3943	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3944		return -EBUSY;
3945
3946	if (receive_lvb(ls, lkb, ms))
3947		return -ENOMEM;
3948
3949	lkb->lkb_rqmode = ms->m_rqmode;
3950	lkb->lkb_lvbseq = ms->m_lvbseq;
3951
3952	return 0;
3953}
3954
3955static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3956			       struct dlm_message *ms)
3957{
3958	if (receive_lvb(ls, lkb, ms))
3959		return -ENOMEM;
3960	return 0;
3961}
3962
3963/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3964   uses to send a reply and that the remote end uses to process the reply. */
3965
3966static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3967{
3968	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3969	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3970	lkb->lkb_remid = ms->m_lkid;
3971}
3972
3973/* This is called after the rsb is locked so that we can safely inspect
3974   fields in the lkb. */
3975
3976static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3977{
3978	int from = ms->m_header.h_nodeid;
3979	int error = 0;
3980
3981	/* currently mixing of user/kernel locks are not supported */
3982	/* currently, mixing user/kernel locks is not supported */
3983		log_error(lkb->lkb_resource->res_ls,
3984			  "got user dlm message for a kernel lock");
3985		error = -EINVAL;
3986		goto out;
3987	}
3988
3989	switch (ms->m_type) {
3990	case DLM_MSG_CONVERT:
3991	case DLM_MSG_UNLOCK:
3992	case DLM_MSG_CANCEL:
3993		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3994			error = -EINVAL;
3995		break;
3996
3997	case DLM_MSG_CONVERT_REPLY:
3998	case DLM_MSG_UNLOCK_REPLY:
3999	case DLM_MSG_CANCEL_REPLY:
4000	case DLM_MSG_GRANT:
4001	case DLM_MSG_BAST:
4002		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4003			error = -EINVAL;
4004		break;
4005
4006	case DLM_MSG_REQUEST_REPLY:
4007		if (!is_process_copy(lkb))
4008			error = -EINVAL;
4009		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4010			error = -EINVAL;
4011		break;
4012
4013	default:
4014		error = -EINVAL;
4015	}
4016
4017out:
4018	if (error)
4019		log_error(lkb->lkb_resource->res_ls,
4020			  "ignore invalid message %d from %d %x %x %x %d",
4021			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4022			  lkb->lkb_flags, lkb->lkb_nodeid);
4023	return error;
4024}
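
/* Summary of the checks above (master copy = lkb held on behalf of a
   remote node; process copy = local lkb whose master is remote):

   CONVERT / UNLOCK / CANCEL        lkb must be a master copy and the
                                    sender must be lkb_nodeid
   *_REPLY / GRANT / BAST           lkb must be a process copy and the
                                    sender must be lkb_nodeid
   REQUEST_REPLY                    lkb must be a process copy, but
                                    lkb_nodeid may still be -1 while a
                                    lookup is in flight */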
4025
4026static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4027{
4028	char name[DLM_RESNAME_MAXLEN + 1];
4029	struct dlm_message *ms;
4030	struct dlm_mhandle *mh;
4031	struct dlm_rsb *r;
4032	uint32_t hash, b;
4033	int rv, dir_nodeid;
4034
4035	memset(name, 0, sizeof(name));
4036	memcpy(name, ms_name, len);
4037
4038	hash = jhash(name, len, 0);
4039	b = hash & (ls->ls_rsbtbl_size - 1);
4040
4041	dir_nodeid = dlm_hash2nodeid(ls, hash);
4042
4043	log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4044
4045	spin_lock(&ls->ls_rsbtbl[b].lock);
4046	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4047	if (!rv) {
4048		spin_unlock(&ls->ls_rsbtbl[b].lock);
4049		log_error(ls, "repeat_remove on keep %s", name);
4050		return;
4051	}
4052
4053	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4054	if (!rv) {
4055		spin_unlock(&ls->ls_rsbtbl[b].lock);
4056		log_error(ls, "repeat_remove on toss %s", name);
4057		return;
4058	}
4059
4060	/* use ls->remove_name2 to avoid conflict with shrink? */
4061
4062	spin_lock(&ls->ls_remove_spin);
4063	ls->ls_remove_len = len;
4064	memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4065	spin_unlock(&ls->ls_remove_spin);
4066	spin_unlock(&ls->ls_rsbtbl[b].lock);
4067
4068	rv = _create_message(ls, sizeof(struct dlm_message) + len,
4069			     dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4070	if (rv)
4071		goto out;
4072
4073	memcpy(ms->m_extra, name, len);
4074	ms->m_hash = hash;
4075
4076	send_message(mh, ms);
4077
4078out:
4079	spin_lock(&ls->ls_remove_spin);
4080	ls->ls_remove_len = 0;
4081	memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4082	spin_unlock(&ls->ls_remove_spin);
4083}
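
/* Sketch (not compiled): send_repeat_remove() above and receive_remove()
 * below locate the rsb hash bucket the same way; the mask assumes
 * ls_rsbtbl_size is a power of two. */
#if 0
	uint32_t hash = jhash(name, len, 0);
	uint32_t b = hash & (ls->ls_rsbtbl_size - 1);
	spin_lock(&ls->ls_rsbtbl[b].lock);
#endif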
4084
4085static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4086{
4087	struct dlm_lkb *lkb;
4088	struct dlm_rsb *r;
4089	int from_nodeid;
4090	int error, namelen = 0;
4091
4092	from_nodeid = ms->m_header.h_nodeid;
4093
4094	error = create_lkb(ls, &lkb);
4095	if (error)
4096		goto fail;
4097
4098	receive_flags(lkb, ms);
4099	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4100	error = receive_request_args(ls, lkb, ms);
4101	if (error) {
4102		__put_lkb(ls, lkb);
4103		goto fail;
4104	}
4105
4106	/* The dir node is the authority on whether we are the master
4107	   for this rsb or not, so if the master sends us a request, we should
4108	   recreate the rsb if we've destroyed it.  This race happens when we
4109	   send a remove message to the dir node at the same time that the dir
4110	   node sends us a request for the rsb. */
4111
4112	namelen = receive_extralen(ms);
4113
4114	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4115			 R_RECEIVE_REQUEST, &r);
4116	if (error) {
4117		__put_lkb(ls, lkb);
4118		goto fail;
4119	}
4120
4121	lock_rsb(r);
4122
4123	if (r->res_master_nodeid != dlm_our_nodeid()) {
4124		error = validate_master_nodeid(ls, r, from_nodeid);
4125		if (error) {
4126			unlock_rsb(r);
4127			put_rsb(r);
4128			__put_lkb(ls, lkb);
4129			goto fail;
4130		}
4131	}
4132
4133	attach_lkb(r, lkb);
4134	error = do_request(r, lkb);
4135	send_request_reply(r, lkb, error);
4136	do_request_effects(r, lkb, error);
4137
4138	unlock_rsb(r);
4139	put_rsb(r);
4140
4141	if (error == -EINPROGRESS)
4142		error = 0;
4143	if (error)
4144		dlm_put_lkb(lkb);
4145	return 0;
4146
4147 fail:
4148	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4149	   and do this receive_request again from process_lookup_list once
4150	   we get the lookup reply.  This would avoid many repeated
4151	   ENOTBLK request failures when the lookup reply designating us
4152	   as master is delayed. */
4153
4154	/* We could repeatedly return -EBADR here if our send_remove() is
4155	   delayed in being sent/arriving/being processed on the dir node.
4156	   Another node would repeatedly look up the master, and the dir
4157	   node would continue returning our nodeid until our send_remove
4158	   took effect.
4159
4160	   We send another remove message in case our previous send_remove
4161	   was lost/ignored/missed somehow. */
4162
4163	if (error != -ENOTBLK) {
4164		log_limit(ls, "receive_request %x from %d %d",
4165			  ms->m_lkid, from_nodeid, error);
4166	}
4167
4168	if (namelen && error == -EBADR) {
4169		send_repeat_remove(ls, ms->m_extra, namelen);
4170		msleep(1000);
4171	}
4172
4173	setup_stub_lkb(ls, ms);
4174	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4175	return error;
4176}
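
/* Note on the failure path above: -EBADR means our view and the dir
   node's view of the rsb disagree, most likely because an earlier
   send_remove() from us is still in flight.  Resending the remove and
   sleeping briefly gives the dir node time to drop the stale entry
   before the requester retries. */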
4177
4178static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4179{
4180	struct dlm_lkb *lkb;
4181	struct dlm_rsb *r;
4182	int error, reply = 1;
4183
4184	error = find_lkb(ls, ms->m_remid, &lkb);
4185	if (error)
4186		goto fail;
4187
4188	if (lkb->lkb_remid != ms->m_lkid) {
4189		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4190			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4191			  (unsigned long long)lkb->lkb_recover_seq,
4192			  ms->m_header.h_nodeid, ms->m_lkid);
4193		error = -ENOENT;
4194		dlm_put_lkb(lkb);
4195		goto fail;
4196	}
4197
4198	r = lkb->lkb_resource;
4199
4200	hold_rsb(r);
4201	lock_rsb(r);
4202
4203	error = validate_message(lkb, ms);
4204	if (error)
4205		goto out;
4206
4207	receive_flags(lkb, ms);
4208
4209	error = receive_convert_args(ls, lkb, ms);
4210	if (error) {
4211		send_convert_reply(r, lkb, error);
4212		goto out;
4213	}
4214
4215	reply = !down_conversion(lkb);
4216
4217	error = do_convert(r, lkb);
4218	if (reply)
4219		send_convert_reply(r, lkb, error);
4220	do_convert_effects(r, lkb, error);
4221 out:
4222	unlock_rsb(r);
4223	put_rsb(r);
4224	dlm_put_lkb(lkb);
4225	return 0;
4226
4227 fail:
4228	setup_stub_lkb(ls, ms);
4229	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4230	return error;
4231}
4232
4233static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4234{
4235	struct dlm_lkb *lkb;
4236	struct dlm_rsb *r;
4237	int error;
4238
4239	error = find_lkb(ls, ms->m_remid, &lkb);
4240	if (error)
4241		goto fail;
4242
4243	if (lkb->lkb_remid != ms->m_lkid) {
4244		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4245			  lkb->lkb_id, lkb->lkb_remid,
4246			  ms->m_header.h_nodeid, ms->m_lkid);
4247		error = -ENOENT;
4248		dlm_put_lkb(lkb);
4249		goto fail;
4250	}
4251
4252	r = lkb->lkb_resource;
4253
4254	hold_rsb(r);
4255	lock_rsb(r);
4256
4257	error = validate_message(lkb, ms);
4258	if (error)
4259		goto out;
4260
4261	receive_flags(lkb, ms);
4262
4263	error = receive_unlock_args(ls, lkb, ms);
4264	if (error) {
4265		send_unlock_reply(r, lkb, error);
4266		goto out;
4267	}
4268
4269	error = do_unlock(r, lkb);
4270	send_unlock_reply(r, lkb, error);
4271	do_unlock_effects(r, lkb, error);
4272 out:
4273	unlock_rsb(r);
4274	put_rsb(r);
4275	dlm_put_lkb(lkb);
4276	return 0;
4277
4278 fail:
4279	setup_stub_lkb(ls, ms);
4280	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4281	return error;
4282}
4283
4284static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4285{
4286	struct dlm_lkb *lkb;
4287	struct dlm_rsb *r;
4288	int error;
4289
4290	error = find_lkb(ls, ms->m_remid, &lkb);
4291	if (error)
4292		goto fail;
4293
4294	receive_flags(lkb, ms);
4295
4296	r = lkb->lkb_resource;
4297
4298	hold_rsb(r);
4299	lock_rsb(r);
4300
4301	error = validate_message(lkb, ms);
4302	if (error)
4303		goto out;
4304
4305	error = do_cancel(r, lkb);
4306	send_cancel_reply(r, lkb, error);
4307	do_cancel_effects(r, lkb, error);
4308 out:
4309	unlock_rsb(r);
4310	put_rsb(r);
4311	dlm_put_lkb(lkb);
4312	return 0;
4313
4314 fail:
4315	setup_stub_lkb(ls, ms);
4316	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4317	return error;
4318}
4319
4320static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4321{
4322	struct dlm_lkb *lkb;
4323	struct dlm_rsb *r;
4324	int error;
4325
4326	error = find_lkb(ls, ms->m_remid, &lkb);
4327	if (error)
4328		return error;
4329
4330	r = lkb->lkb_resource;
4331
4332	hold_rsb(r);
4333	lock_rsb(r);
4334
4335	error = validate_message(lkb, ms);
4336	if (error)
4337		goto out;
4338
4339	receive_flags_reply(lkb, ms);
4340	if (is_altmode(lkb))
4341		munge_altmode(lkb, ms);
4342	grant_lock_pc(r, lkb, ms);
4343	queue_cast(r, lkb, 0);
4344 out:
4345	unlock_rsb(r);
4346	put_rsb(r);
4347	dlm_put_lkb(lkb);
4348	return 0;
4349}
4350
4351static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4352{
4353	struct dlm_lkb *lkb;
4354	struct dlm_rsb *r;
4355	int error;
4356
4357	error = find_lkb(ls, ms->m_remid, &lkb);
4358	if (error)
4359		return error;
4360
4361	r = lkb->lkb_resource;
4362
4363	hold_rsb(r);
4364	lock_rsb(r);
4365
4366	error = validate_message(lkb, ms);
4367	if (error)
4368		goto out;
4369
4370	queue_bast(r, lkb, ms->m_bastmode);
4371	lkb->lkb_highbast = ms->m_bastmode;
4372 out:
4373	unlock_rsb(r);
4374	put_rsb(r);
4375	dlm_put_lkb(lkb);
4376	return 0;
4377}
4378
4379static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4380{
4381	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4382
4383	from_nodeid = ms->m_header.h_nodeid;
4384	our_nodeid = dlm_our_nodeid();
4385
4386	len = receive_extralen(ms);
4387
4388	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4389				  &ret_nodeid, NULL);
4390
4391	/* Optimization: we're master so treat lookup as a request */
4392	if (!error && ret_nodeid == our_nodeid) {
4393		receive_request(ls, ms);
4394		return;
4395	}
4396	send_lookup_reply(ls, ms, ret_nodeid, error);
4397}
4398
4399static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4400{
4401	char name[DLM_RESNAME_MAXLEN+1];
4402	struct dlm_rsb *r;
4403	uint32_t hash, b;
4404	int rv, len, dir_nodeid, from_nodeid;
4405
4406	from_nodeid = ms->m_header.h_nodeid;
4407
4408	len = receive_extralen(ms);
4409
4410	if (len > DLM_RESNAME_MAXLEN) {
4411		log_error(ls, "receive_remove from %d bad len %d",
4412			  from_nodeid, len);
4413		return;
4414	}
4415
4416	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4417	if (dir_nodeid != dlm_our_nodeid()) {
4418		log_error(ls, "receive_remove from %d bad nodeid %d",
4419			  from_nodeid, dir_nodeid);
4420		return;
4421	}
4422
4423	/* Look for the name on rsbtbl.toss; if it's there, kill it.
4424	   If it's on rsbtbl.keep, it's being used, and we should ignore this
4425	   message.  This is an expected race between the dir node sending a
4426	   request to the master node at the same time as the master node sends
4427	   a remove to the dir node.  The resolution to that race is for the
4428	   dir node to ignore the remove message, and the master node to
4429	   recreate the master rsb when it gets a request from the dir node for
4430	   an rsb it doesn't have. */
4431
4432	memset(name, 0, sizeof(name));
4433	memcpy(name, ms->m_extra, len);
4434
4435	hash = jhash(name, len, 0);
4436	b = hash & (ls->ls_rsbtbl_size - 1);
4437
4438	spin_lock(&ls->ls_rsbtbl[b].lock);
4439
4440	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4441	if (rv) {
4442		/* verify the rsb is on keep list per comment above */
4443		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4444		if (rv) {
4445			/* should not happen */
4446			log_error(ls, "receive_remove from %d not found %s",
4447				  from_nodeid, name);
4448			spin_unlock(&ls->ls_rsbtbl[b].lock);
4449			return;
4450		}
4451		if (r->res_master_nodeid != from_nodeid) {
4452			/* should not happen */
4453			log_error(ls, "receive_remove keep from %d master %d",
4454				  from_nodeid, r->res_master_nodeid);
4455			dlm_print_rsb(r);
4456			spin_unlock(&ls->ls_rsbtbl[b].lock);
4457			return;
4458		}
4459
4460		log_debug(ls, "receive_remove from %d master %d first %x %s",
4461			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4462			  name);
4463		spin_unlock(&ls->ls_rsbtbl[b].lock);
4464		return;
4465	}
4466
4467	if (r->res_master_nodeid != from_nodeid) {
4468		log_error(ls, "receive_remove toss from %d master %d",
4469			  from_nodeid, r->res_master_nodeid);
4470		dlm_print_rsb(r);
4471		spin_unlock(&ls->ls_rsbtbl[b].lock);
4472		return;
4473	}
4474
4475	if (kref_put(&r->res_ref, kill_rsb)) {
4476		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4477		spin_unlock(&ls->ls_rsbtbl[b].lock);
4478		dlm_free_rsb(r);
4479	} else {
4480		log_error(ls, "receive_remove from %d rsb ref error",
4481			  from_nodeid);
4482		dlm_print_rsb(r);
4483		spin_unlock(&ls->ls_rsbtbl[b].lock);
4484	}
4485}
4486
4487static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4488{
4489	do_purge(ls, ms->m_nodeid, ms->m_pid);
4490}
4491
4492static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4493{
4494	struct dlm_lkb *lkb;
4495	struct dlm_rsb *r;
4496	int error, mstype, result;
4497	int from_nodeid = ms->m_header.h_nodeid;
4498
4499	error = find_lkb(ls, ms->m_remid, &lkb);
4500	if (error)
4501		return error;
4502
4503	r = lkb->lkb_resource;
4504	hold_rsb(r);
4505	lock_rsb(r);
4506
4507	error = validate_message(lkb, ms);
4508	if (error)
4509		goto out;
4510
4511	mstype = lkb->lkb_wait_type;
4512	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4513	if (error) {
4514		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4515			  lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4516		dlm_dump_rsb(r);
4517		goto out;
4518	}
4519
4520	/* Optimization: the dir node was also the master, so it took our
4521	   lookup as a request and sent a request reply instead of a lookup reply */
4522	if (mstype == DLM_MSG_LOOKUP) {
4523		r->res_master_nodeid = from_nodeid;
4524		r->res_nodeid = from_nodeid;
4525		lkb->lkb_nodeid = from_nodeid;
4526	}
4527
4528	/* this is the value returned from do_request() on the master */
4529	result = ms->m_result;
4530
4531	switch (result) {
4532	case -EAGAIN:
4533		/* request would block (be queued) on remote master */
4534		queue_cast(r, lkb, -EAGAIN);
4535		confirm_master(r, -EAGAIN);
4536		unhold_lkb(lkb); /* undoes create_lkb() */
4537		break;
4538
4539	case -EINPROGRESS:
4540	case 0:
4541		/* request was queued or granted on remote master */
4542		receive_flags_reply(lkb, ms);
4543		lkb->lkb_remid = ms->m_lkid;
4544		if (is_altmode(lkb))
4545			munge_altmode(lkb, ms);
4546		if (result) {
4547			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4548			add_timeout(lkb);
4549		} else {
4550			grant_lock_pc(r, lkb, ms);
4551			queue_cast(r, lkb, 0);
4552		}
4553		confirm_master(r, result);
4554		break;
4555
4556	case -EBADR:
4557	case -ENOTBLK:
4558		/* find_rsb failed to find rsb or rsb wasn't master */
4559		log_limit(ls, "receive_request_reply %x from %d %d "
4560			  "master %d dir %d first %x %s", lkb->lkb_id,
4561			  from_nodeid, result, r->res_master_nodeid,
4562			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4563
4564		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4565		    r->res_master_nodeid != dlm_our_nodeid()) {
4566			/* cause _request_lock->set_master->send_lookup */
4567			r->res_master_nodeid = 0;
4568			r->res_nodeid = -1;
4569			lkb->lkb_nodeid = -1;
4570		}
4571
4572		if (is_overlap(lkb)) {
4573			/* we'll ignore error in cancel/unlock reply */
4574			queue_cast_overlap(r, lkb);
4575			confirm_master(r, result);
4576			unhold_lkb(lkb); /* undoes create_lkb() */
4577		} else {
4578			_request_lock(r, lkb);
4579
4580			if (r->res_master_nodeid == dlm_our_nodeid())
4581				confirm_master(r, 0);
4582		}
4583		break;
4584
4585	default:
4586		log_error(ls, "receive_request_reply %x error %d",
4587			  lkb->lkb_id, result);
4588	}
4589
4590	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4591		log_debug(ls, "receive_request_reply %x result %d unlock",
4592			  lkb->lkb_id, result);
4593		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4594		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4595		send_unlock(r, lkb);
4596	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4597		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4598		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4599		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4600		send_cancel(r, lkb);
4601	} else {
4602		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4603		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4604	}
4605 out:
4606	unlock_rsb(r);
4607	put_rsb(r);
4608	dlm_put_lkb(lkb);
4609	return 0;
4610}
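
/* Note on the overlap handling above: an unlock or cancel issued while
   the original request was still awaiting this reply could not be sent
   earlier.  Once the request's outcome is known, the deferred op is
   finally sent (unlock if the lock was queued or granted, cancel if it
   is still in progress); otherwise the overlap flags are simply
   cleared. */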
4611
4612static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4613				    struct dlm_message *ms)
4614{
4615	/* this is the value returned from do_convert() on the master */
4616	switch (ms->m_result) {
4617	case -EAGAIN:
4618		/* convert would block (be queued) on remote master */
4619		queue_cast(r, lkb, -EAGAIN);
4620		break;
4621
4622	case -EDEADLK:
4623		receive_flags_reply(lkb, ms);
4624		revert_lock_pc(r, lkb);
4625		queue_cast(r, lkb, -EDEADLK);
4626		break;
4627
4628	case -EINPROGRESS:
4629		/* convert was queued on remote master */
4630		receive_flags_reply(lkb, ms);
4631		if (is_demoted(lkb))
4632			munge_demoted(lkb);
4633		del_lkb(r, lkb);
4634		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4635		add_timeout(lkb);
4636		break;
4637
4638	case 0:
4639		/* convert was granted on remote master */
4640		receive_flags_reply(lkb, ms);
4641		if (is_demoted(lkb))
4642			munge_demoted(lkb);
4643		grant_lock_pc(r, lkb, ms);
4644		queue_cast(r, lkb, 0);
4645		break;
4646
4647	default:
4648		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4649			  lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4650			  ms->m_result);
4651		dlm_print_rsb(r);
4652		dlm_print_lkb(lkb);
4653	}
4654}
4655
4656static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4657{
4658	struct dlm_rsb *r = lkb->lkb_resource;
4659	int error;
4660
4661	hold_rsb(r);
4662	lock_rsb(r);
4663
4664	error = validate_message(lkb, ms);
4665	if (error)
4666		goto out;
4667
4668	/* stub reply can happen with waiters_mutex held */
4669	error = remove_from_waiters_ms(lkb, ms);
4670	if (error)
4671		goto out;
4672
4673	__receive_convert_reply(r, lkb, ms);
4674 out:
4675	unlock_rsb(r);
4676	put_rsb(r);
4677}
4678
4679static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4680{
4681	struct dlm_lkb *lkb;
4682	int error;
4683
4684	error = find_lkb(ls, ms->m_remid, &lkb);
4685	if (error)
4686		return error;
4687
4688	_receive_convert_reply(lkb, ms);
4689	dlm_put_lkb(lkb);
4690	return 0;
4691}
4692
4693static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4694{
4695	struct dlm_rsb *r = lkb->lkb_resource;
4696	int error;
4697
4698	hold_rsb(r);
4699	lock_rsb(r);
4700
4701	error = validate_message(lkb, ms);
4702	if (error)
4703		goto out;
4704
4705	/* stub reply can happen with waiters_mutex held */
4706	error = remove_from_waiters_ms(lkb, ms);
4707	if (error)
4708		goto out;
4709
4710	/* this is the value returned from do_unlock() on the master */
4711
4712	switch (ms->m_result) {
4713	case -DLM_EUNLOCK:
4714		receive_flags_reply(lkb, ms);
4715		remove_lock_pc(r, lkb);
4716		queue_cast(r, lkb, -DLM_EUNLOCK);
4717		break;
4718	case -ENOENT:
4719		break;
4720	default:
4721		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4722			  lkb->lkb_id, ms->m_result);
4723	}
4724 out:
4725	unlock_rsb(r);
4726	put_rsb(r);
4727}
4728
4729static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4730{
4731	struct dlm_lkb *lkb;
4732	int error;
4733
4734	error = find_lkb(ls, ms->m_remid, &lkb);
4735	if (error)
4736		return error;
4737
4738	_receive_unlock_reply(lkb, ms);
4739	dlm_put_lkb(lkb);
4740	return 0;
4741}
4742
4743static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4744{
4745	struct dlm_rsb *r = lkb->lkb_resource;
4746	int error;
4747
4748	hold_rsb(r);
4749	lock_rsb(r);
4750
4751	error = validate_message(lkb, ms);
4752	if (error)
4753		goto out;
4754
4755	/* stub reply can happen with waiters_mutex held */
4756	error = remove_from_waiters_ms(lkb, ms);
4757	if (error)
4758		goto out;
4759
4760	/* this is the value returned from do_cancel() on the master */
4761
4762	switch (ms->m_result) {
4763	case -DLM_ECANCEL:
4764		receive_flags_reply(lkb, ms);
4765		revert_lock_pc(r, lkb);
4766		queue_cast(r, lkb, -DLM_ECANCEL);
4767		break;
4768	case 0:
4769		break;
4770	default:
4771		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4772			  lkb->lkb_id, ms->m_result);
4773	}
4774 out:
4775	unlock_rsb(r);
4776	put_rsb(r);
4777}
4778
4779static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4780{
4781	struct dlm_lkb *lkb;
4782	int error;
4783
4784	error = find_lkb(ls, ms->m_remid, &lkb);
4785	if (error)
4786		return error;
4787
4788	_receive_cancel_reply(lkb, ms);
4789	dlm_put_lkb(lkb);
4790	return 0;
4791}
4792
4793static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4794{
4795	struct dlm_lkb *lkb;
4796	struct dlm_rsb *r;
4797	int error, ret_nodeid;
4798	int do_lookup_list = 0;
4799
4800	error = find_lkb(ls, ms->m_lkid, &lkb);
4801	if (error) {
4802		log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4803		return;
4804	}
4805
4806	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4807	   FIXME: will a non-zero error ever be returned? */
4808
4809	r = lkb->lkb_resource;
4810	hold_rsb(r);
4811	lock_rsb(r);
4812
4813	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4814	if (error)
4815		goto out;
4816
4817	ret_nodeid = ms->m_nodeid;
4818
4819	/* We sometimes receive a request from the dir node for this
4820	   rsb before we've received the dir node's lookup_reply for it.
4821	   The request from the dir node implies we're the master, so we set
4822	   ourself as master in receive_request_reply, and verify here that
4823	   we are indeed the master. */
4824
4825	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4826		/* This should never happen */
4827		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4828			  "master %d dir %d our %d first %x %s",
4829			  lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4830			  r->res_master_nodeid, r->res_dir_nodeid,
4831			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4832	}
4833
4834	if (ret_nodeid == dlm_our_nodeid()) {
4835		r->res_master_nodeid = ret_nodeid;
4836		r->res_nodeid = 0;
4837		do_lookup_list = 1;
4838		r->res_first_lkid = 0;
4839	} else if (ret_nodeid == -1) {
4840		/* the remote node doesn't believe it's the dir node */
4841		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4842			  lkb->lkb_id, ms->m_header.h_nodeid);
4843		r->res_master_nodeid = 0;
4844		r->res_nodeid = -1;
4845		lkb->lkb_nodeid = -1;
4846	} else {
4847		/* set_master() will set lkb_nodeid from r */
4848		r->res_master_nodeid = ret_nodeid;
4849		r->res_nodeid = ret_nodeid;
4850	}
4851
4852	if (is_overlap(lkb)) {
4853		log_debug(ls, "receive_lookup_reply %x unlock %x",
4854			  lkb->lkb_id, lkb->lkb_flags);
4855		queue_cast_overlap(r, lkb);
4856		unhold_lkb(lkb); /* undoes create_lkb() */
4857		goto out_list;
4858	}
4859
4860	_request_lock(r, lkb);
4861
4862 out_list:
4863	if (do_lookup_list)
4864		process_lookup_list(r);
4865 out:
4866	unlock_rsb(r);
4867	put_rsb(r);
4868	dlm_put_lkb(lkb);
4869}
4870
4871static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4872			     uint32_t saved_seq)
4873{
4874	int error = 0, noent = 0;
4875
4876	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4877		log_limit(ls, "receive %d from non-member %d %x %x %d",
4878			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4879			  ms->m_remid, ms->m_result);
4880		return;
4881	}
4882
4883	switch (ms->m_type) {
4884
4885	/* messages sent to a master node */
4886
4887	case DLM_MSG_REQUEST:
4888		error = receive_request(ls, ms);
4889		break;
4890
4891	case DLM_MSG_CONVERT:
4892		error = receive_convert(ls, ms);
4893		break;
4894
4895	case DLM_MSG_UNLOCK:
4896		error = receive_unlock(ls, ms);
4897		break;
4898
4899	case DLM_MSG_CANCEL:
4900		noent = 1;
4901		error = receive_cancel(ls, ms);
4902		break;
4903
4904	/* messages sent from a master node (replies to above) */
4905
4906	case DLM_MSG_REQUEST_REPLY:
4907		error = receive_request_reply(ls, ms);
4908		break;
4909
4910	case DLM_MSG_CONVERT_REPLY:
4911		error = receive_convert_reply(ls, ms);
4912		break;
4913
4914	case DLM_MSG_UNLOCK_REPLY:
4915		error = receive_unlock_reply(ls, ms);
4916		break;
4917
4918	case DLM_MSG_CANCEL_REPLY:
4919		error = receive_cancel_reply(ls, ms);
4920		break;
4921
4922	/* messages sent from a master node (only two types of async msg) */
4923
4924	case DLM_MSG_GRANT:
4925		noent = 1;
4926		error = receive_grant(ls, ms);
4927		break;
4928
4929	case DLM_MSG_BAST:
4930		noent = 1;
4931		error = receive_bast(ls, ms);
4932		break;
4933
4934	/* messages sent to a dir node */
4935
4936	case DLM_MSG_LOOKUP:
4937		receive_lookup(ls, ms);
4938		break;
4939
4940	case DLM_MSG_REMOVE:
4941		receive_remove(ls, ms);
4942		break;
4943
4944	/* messages sent from a dir node (remove has no reply) */
4945
4946	case DLM_MSG_LOOKUP_REPLY:
4947		receive_lookup_reply(ls, ms);
4948		break;
4949
4950	/* other messages */
4951
4952	case DLM_MSG_PURGE:
4953		receive_purge(ls, ms);
4954		break;
4955
4956	default:
4957		log_error(ls, "unknown message type %d", ms->m_type);
4958	}
4959
4960	/*
4961	 * When checking for ENOENT, we're checking the result of
4962	 * find_lkb(m_remid):
4963	 *
4964	 * The lock id referenced in the message wasn't found.  This may
4965	 * happen in normal usage for the async messages and cancel, so
4966	 * only use log_debug for them.
4967	 *
4968	 * Some errors are expected and normal.
4969	 */
4970
4971	if (error == -ENOENT && noent) {
4972		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4973			  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4974			  ms->m_lkid, saved_seq);
4975	} else if (error == -ENOENT) {
4976		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4977			  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4978			  ms->m_lkid, saved_seq);
4979
4980		if (ms->m_type == DLM_MSG_CONVERT)
4981			dlm_dump_rsb_hash(ls, ms->m_hash);
4982	}
4983
4984	if (error == -EINVAL) {
4985		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4986			  "saved_seq %u",
4987			  ms->m_type, ms->m_header.h_nodeid,
4988			  ms->m_lkid, ms->m_remid, saved_seq);
4989	}
4990}
4991
4992/* If the lockspace is in recovery mode (locking stopped), then normal
4993   messages are saved on the requestqueue for processing after recovery is
4994   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4995   messages off the requestqueue before we process new ones. This occurs right
4996   after recovery completes when we transition from saving all messages on
4997   requestqueue, to processing all the saved messages, to processing new
4998   messages as they arrive. */
4999
5000static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5001				int nodeid)
5002{
5003	if (dlm_locking_stopped(ls)) {
5004		/* If we were a member of this lockspace, left, and rejoined,
5005		   other nodes may still be sending us messages from the
5006		   lockspace generation before we left. */
5007		if (!ls->ls_generation) {
5008			log_limit(ls, "receive %d from %d ignore old gen",
5009				  ms->m_type, nodeid);
5010			return;
5011		}
5012
5013		dlm_add_requestqueue(ls, nodeid, ms);
5014	} else {
5015		dlm_wait_requestqueue(ls);
5016		_receive_message(ls, ms, 0);
5017	}
5018}
5019
5020/* This is called by dlm_recoverd to process messages that were saved on
5021   the requestqueue. */
5022
5023void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5024			       uint32_t saved_seq)
5025{
5026	_receive_message(ls, ms, saved_seq);
5027}
5028
5029/* This is called by the midcomms layer when something is received for
5030   the lockspace.  It could be either a MSG (normal message sent as part of
5031   standard locking activity) or an RCOM (recovery message sent as part of
5032   lockspace recovery). */
5033
5034void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5035{
5036	struct dlm_header *hd = &p->header;
5037	struct dlm_ls *ls;
5038	int type = 0;
5039
5040	switch (hd->h_cmd) {
5041	case DLM_MSG:
5042		dlm_message_in(&p->message);
5043		type = p->message.m_type;
5044		break;
5045	case DLM_RCOM:
5046		dlm_rcom_in(&p->rcom);
5047		type = p->rcom.rc_type;
5048		break;
5049	default:
5050		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5051		return;
5052	}
5053
5054	if (hd->h_nodeid != nodeid) {
5055		log_print("invalid h_nodeid %d from %d lockspace %x",
5056			  hd->h_nodeid, nodeid, hd->h_lockspace);
5057		return;
5058	}
5059
5060	ls = dlm_find_lockspace_global(hd->h_lockspace);
5061	if (!ls) {
5062		if (dlm_config.ci_log_debug) {
5063			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5064				"%u from %d cmd %d type %d\n",
5065				hd->h_lockspace, nodeid, hd->h_cmd, type);
5066		}
5067
5068		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5069			dlm_send_ls_not_ready(nodeid, &p->rcom);
5070		return;
5071	}
5072
5073	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5074	   be inactive (in this ls) before transitioning to recovery mode */
5075
5076	down_read(&ls->ls_recv_active);
5077	if (hd->h_cmd == DLM_MSG)
5078		dlm_receive_message(ls, &p->message, nodeid);
5079	else
5080		dlm_receive_rcom(ls, &p->rcom, nodeid);
5081	up_read(&ls->ls_recv_active);
5082
5083	dlm_put_lockspace(ls);
5084}
5085
5086static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5087				   struct dlm_message *ms_stub)
5088{
5089	if (middle_conversion(lkb)) {
5090		hold_lkb(lkb);
5091		memset(ms_stub, 0, sizeof(struct dlm_message));
5092		ms_stub->m_flags = DLM_IFL_STUB_MS;
5093		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5094		ms_stub->m_result = -EINPROGRESS;
5095		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5096		_receive_convert_reply(lkb, ms_stub);
5097
5098		/* Same special case as in receive_rcom_lock_args() */
5099		lkb->lkb_grmode = DLM_LOCK_IV;
5100		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5101		unhold_lkb(lkb);
5102
5103	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5104		lkb->lkb_flags |= DLM_IFL_RESEND;
5105	}
5106
5107	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5108	   conversions are async; there's no reply from the remote master */
5109}
5110
5111/* A waiting lkb needs recovery if the master node has failed, or
5112   the master node is changing (only when no directory is used) */
5113
5114static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5115				 int dir_nodeid)
5116{
5117	if (dlm_no_directory(ls))
5118		return 1;
5119
5120	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5121		return 1;
5122
5123	return 0;
5124}
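
/* Example: with a directory, only waiters whose wait_nodeid is on the
   removed-nodes list need recovery; without a directory, every waiter
   must be redone because masters may be reassigned during recovery. */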
5125
5126/* Recovery for locks that are waiting for replies from nodes that are now
5127   gone.  We can just complete unlocks and cancels by faking a reply from the
5128   dead node.  Requests and up-conversions we flag to be resent after
5129   recovery.  Down-conversions can just be completed with a fake reply like
5130   unlocks.  Conversions between PR and CW need special attention. */
5131
5132void dlm_recover_waiters_pre(struct dlm_ls *ls)
5133{
5134	struct dlm_lkb *lkb, *safe;
5135	struct dlm_message *ms_stub;
5136	int wait_type, stub_unlock_result, stub_cancel_result;
5137	int dir_nodeid;
5138
5139	ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5140	if (!ms_stub)
5141		return;
5142
5143	mutex_lock(&ls->ls_waiters_mutex);
5144
5145	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5146
5147		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5148
5149		/* exclude debug messages about unlocks because there can be so
5150		   many and they aren't very interesting */
5151
5152		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5153			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5154				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5155				  lkb->lkb_id,
5156				  lkb->lkb_remid,
5157				  lkb->lkb_wait_type,
5158				  lkb->lkb_resource->res_nodeid,
5159				  lkb->lkb_nodeid,
5160				  lkb->lkb_wait_nodeid,
5161				  dir_nodeid);
5162		}
5163
5164		/* all outstanding lookups, regardless of destination, will be
5165		   resent after recovery is done */
5166
5167		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5168			lkb->lkb_flags |= DLM_IFL_RESEND;
5169			continue;
5170		}
5171
5172		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5173			continue;
5174
5175		wait_type = lkb->lkb_wait_type;
5176		stub_unlock_result = -DLM_EUNLOCK;
5177		stub_cancel_result = -DLM_ECANCEL;
5178
5179		/* The main reply may have been received, leaving a zero wait_type,
5180		   but a reply for the overlapping op may not have been
5181		   received.  In that case we need to fake the appropriate
5182		   reply for the overlap op. */
5183
5184		if (!wait_type) {
5185			if (is_overlap_cancel(lkb)) {
5186				wait_type = DLM_MSG_CANCEL;
5187				if (lkb->lkb_grmode == DLM_LOCK_IV)
5188					stub_cancel_result = 0;
5189			}
5190			if (is_overlap_unlock(lkb)) {
5191				wait_type = DLM_MSG_UNLOCK;
5192				if (lkb->lkb_grmode == DLM_LOCK_IV)
5193					stub_unlock_result = -ENOENT;
5194			}
5195
5196			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5197				  lkb->lkb_id, lkb->lkb_flags, wait_type,
5198				  stub_cancel_result, stub_unlock_result);
5199		}
5200
5201		switch (wait_type) {
5202
5203		case DLM_MSG_REQUEST:
5204			lkb->lkb_flags |= DLM_IFL_RESEND;
5205			break;
5206
5207		case DLM_MSG_CONVERT:
5208			recover_convert_waiter(ls, lkb, ms_stub);
5209			break;
5210
5211		case DLM_MSG_UNLOCK:
5212			hold_lkb(lkb);
5213			memset(ms_stub, 0, sizeof(struct dlm_message));
5214			ms_stub->m_flags = DLM_IFL_STUB_MS;
5215			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5216			ms_stub->m_result = stub_unlock_result;
5217			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5218			_receive_unlock_reply(lkb, ms_stub);
5219			dlm_put_lkb(lkb);
5220			break;
5221
5222		case DLM_MSG_CANCEL:
5223			hold_lkb(lkb);
5224			memset(ms_stub, 0, sizeof(struct dlm_message));
5225			ms_stub->m_flags = DLM_IFL_STUB_MS;
5226			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5227			ms_stub->m_result = stub_cancel_result;
5228			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5229			_receive_cancel_reply(lkb, ms_stub);
5230			dlm_put_lkb(lkb);
5231			break;
5232
5233		default:
5234			log_error(ls, "invalid lkb wait_type %d %d",
5235				  lkb->lkb_wait_type, wait_type);
5236		}
5237		schedule();
5238	}
5239	mutex_unlock(&ls->ls_waiters_mutex);
5240	kfree(ms_stub);
5241}
5242
5243static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5244{
5245	struct dlm_lkb *lkb = NULL, *iter;
5246
5247	mutex_lock(&ls->ls_waiters_mutex);
5248	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5249		if (iter->lkb_flags & DLM_IFL_RESEND) {
5250			hold_lkb(iter);
5251			lkb = iter;
5252			break;
5253		}
5254	}
5255	mutex_unlock(&ls->ls_waiters_mutex);
5256
5257	return lkb;
5258}
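
/* Typical use (see dlm_recover_waiters_post() below): pull one RESEND
   waiter at a time; the hold_lkb() reference lets the caller process it
   after dropping ls_waiters_mutex, and is released with dlm_put_lkb()
   when that lkb's iteration is done. */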
5259
5260/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5261   master or dir-node for r.  Processing the lkb may result in it being placed
5262   back on waiters. */
5263
5264/* We do this after normal locking has been enabled and any saved messages
5265   (in requestqueue) have been processed.  We should be confident that at
5266   this point we won't get or process a reply to any of these waiting
5267   operations.  But, new ops may be coming in on the rsbs/locks here from
5268   userspace or remotely. */
5269
5270/* there may have been an overlap unlock/cancel prior to recovery or after
5271   recovery.  If before, the lkb may still have a positive wait_count; if
5272   after, the overlap flag would just have been set and nothing new sent.  We
5273   can be confident here that any replies to either the initial op or overlap ops
5274   prior to recovery have been received. */
5275
5276int dlm_recover_waiters_post(struct dlm_ls *ls)
5277{
5278	struct dlm_lkb *lkb;
5279	struct dlm_rsb *r;
5280	int error = 0, mstype, err, oc, ou;
5281
5282	while (1) {
5283		if (dlm_locking_stopped(ls)) {
5284			log_debug(ls, "recover_waiters_post aborted");
5285			error = -EINTR;
5286			break;
5287		}
5288
5289		lkb = find_resend_waiter(ls);
5290		if (!lkb)
5291			break;
5292
5293		r = lkb->lkb_resource;
5294		hold_rsb(r);
5295		lock_rsb(r);
5296
5297		mstype = lkb->lkb_wait_type;
5298		oc = is_overlap_cancel(lkb);
5299		ou = is_overlap_unlock(lkb);
5300		err = 0;
5301
5302		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5303			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5304			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5305			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5306			  dlm_dir_nodeid(r), oc, ou);
5307
5308		/* At this point we assume that we won't get a reply to any
5309		   previous op or overlap op on this lock.  First, do a big
5310		   remove_from_waiters() for all previous ops. */
5311
5312		lkb->lkb_flags &= ~DLM_IFL_RESEND;
5313		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5314		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5315		lkb->lkb_wait_type = 0;
5316		/* drop all wait_count references; we still
5317		 * hold a reference for this iteration.
5318		 */
5319		while (lkb->lkb_wait_count) {
5320			lkb->lkb_wait_count--;
5321			unhold_lkb(lkb);
5322		}
5323		mutex_lock(&ls->ls_waiters_mutex);
5324		list_del_init(&lkb->lkb_wait_reply);
5325		mutex_unlock(&ls->ls_waiters_mutex);
5326
5327		if (oc || ou) {
5328			/* do an unlock or cancel instead of resending */
5329			switch (mstype) {
5330			case DLM_MSG_LOOKUP:
5331			case DLM_MSG_REQUEST:
5332				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5333							-DLM_ECANCEL);
5334				unhold_lkb(lkb); /* undoes create_lkb() */
5335				break;
5336			case DLM_MSG_CONVERT:
5337				if (oc) {
5338					queue_cast(r, lkb, -DLM_ECANCEL);
5339				} else {
5340					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5341					_unlock_lock(r, lkb);
5342				}
5343				break;
5344			default:
5345				err = 1;
5346			}
5347		} else {
5348			switch (mstype) {
5349			case DLM_MSG_LOOKUP:
5350			case DLM_MSG_REQUEST:
5351				_request_lock(r, lkb);
5352				if (is_master(r))
5353					confirm_master(r, 0);
5354				break;
5355			case DLM_MSG_CONVERT:
5356				_convert_lock(r, lkb);
5357				break;
5358			default:
5359				err = 1;
5360			}
5361		}
5362
5363		if (err) {
5364			log_error(ls, "waiter %x msg %d r_nodeid %d "
5365				  "dir_nodeid %d overlap %d %d",
5366				  lkb->lkb_id, mstype, r->res_nodeid,
5367				  dlm_dir_nodeid(r), oc, ou);
5368		}
5369		unlock_rsb(r);
5370		put_rsb(r);
5371		dlm_put_lkb(lkb);
5372	}
5373
5374	return error;
5375}
5376
5377static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5378			      struct list_head *list)
5379{
5380	struct dlm_lkb *lkb, *safe;
5381
5382	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5383		if (!is_master_copy(lkb))
5384			continue;
5385
5386		/* don't purge lkbs we've added in recover_master_copy for
5387		   the current recovery seq */
5388
5389		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5390			continue;
5391
5392		del_lkb(r, lkb);
5393
5394		/* this put should free the lkb */
5395		if (!dlm_put_lkb(lkb))
5396			log_error(ls, "purged mstcpy lkb not released");
5397	}
5398}
5399
5400void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5401{
5402	struct dlm_ls *ls = r->res_ls;
5403
5404	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5405	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5406	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5407}
5408
5409static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5410			    struct list_head *list,
5411			    int nodeid_gone, unsigned int *count)
5412{
5413	struct dlm_lkb *lkb, *safe;
5414
5415	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5416		if (!is_master_copy(lkb))
5417			continue;
5418
5419		if ((lkb->lkb_nodeid == nodeid_gone) ||
5420		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5421
5422			/* tell recover_lvb to invalidate the lvb
5423			   because a node holding EX/PW failed */
5424			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5425			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5426				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5427			}
5428
5429			del_lkb(r, lkb);
5430
5431			/* this put should free the lkb */
5432			if (!dlm_put_lkb(lkb))
5433				log_error(ls, "purged dead lkb not released");
5434
5435			rsb_set_flag(r, RSB_RECOVER_GRANT);
5436
5437			(*count)++;
5438		}
5439	}
5440}
5441
5442/* Get rid of locks held by nodes that are gone. */
5443
5444void dlm_recover_purge(struct dlm_ls *ls)
5445{
5446	struct dlm_rsb *r;
5447	struct dlm_member *memb;
5448	int nodes_count = 0;
5449	int nodeid_gone = 0;
5450	unsigned int lkb_count = 0;
5451
5452	/* cache one removed nodeid to optimize the common
5453	   case of a single node removed */
5454
5455	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5456		nodes_count++;
5457		nodeid_gone = memb->nodeid;
5458	}
5459
5460	if (!nodes_count)
5461		return;
5462
5463	down_write(&ls->ls_root_sem);
5464	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5465		hold_rsb(r);
5466		lock_rsb(r);
5467		if (is_master(r)) {
5468			purge_dead_list(ls, r, &r->res_grantqueue,
5469					nodeid_gone, &lkb_count);
5470			purge_dead_list(ls, r, &r->res_convertqueue,
5471					nodeid_gone, &lkb_count);
5472			purge_dead_list(ls, r, &r->res_waitqueue,
5473					nodeid_gone, &lkb_count);
5474		}
5475		unlock_rsb(r);
5476		unhold_rsb(r);
5477		cond_resched();
5478	}
5479	up_write(&ls->ls_root_sem);
5480
5481	if (lkb_count)
5482		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5483			  lkb_count, nodes_count);
5484}
5485
5486static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5487{
5488	struct rb_node *n;
5489	struct dlm_rsb *r;
5490
5491	spin_lock(&ls->ls_rsbtbl[bucket].lock);
5492	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5493		r = rb_entry(n, struct dlm_rsb, res_hashnode);
5494
5495		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5496			continue;
5497		if (!is_master(r)) {
5498			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5499			continue;
5500		}
5501		hold_rsb(r);
5502		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5503		return r;
5504	}
5505	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5506	return NULL;
5507}
5508
5509/*
5510 * Attempt to grant locks on resources that we are the master of.
5511 * Locks may have become grantable during recovery because locks
5512 * from departed nodes have been purged (or not rebuilt), allowing
5513 * previously blocked locks to now be granted.  The subset of rsb's
5514 * we are interested in are those with lkb's on either the convert or
5515 * waiting queues.
5516 *
5517 * Simplest would be to go through each master rsb and check for non-empty
5518 * convert or waiting queues, and attempt to grant on those rsbs.
5519 * Checking the queues requires lock_rsb, though, for which we'd need
5520 * to release the rsbtbl lock.  This would make iterating through all
5521 * rsb's very inefficient.  So, we rely on earlier recovery routines
5522 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5523 * locks for.
5524 */
5525
5526void dlm_recover_grant(struct dlm_ls *ls)
5527{
5528	struct dlm_rsb *r;
5529	int bucket = 0;
5530	unsigned int count = 0;
5531	unsigned int rsb_count = 0;
5532	unsigned int lkb_count = 0;
5533
5534	while (1) {
5535		r = find_grant_rsb(ls, bucket);
5536		if (!r) {
5537			if (bucket == ls->ls_rsbtbl_size - 1)
5538				break;
5539			bucket++;
5540			continue;
5541		}
5542		rsb_count++;
5543		count = 0;
5544		lock_rsb(r);
5545		/* the RECOVER_GRANT flag is checked in the grant path */
5546		grant_pending_locks(r, &count);
5547		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5548		lkb_count += count;
5549		confirm_master(r, 0);
5550		unlock_rsb(r);
5551		put_rsb(r);
5552		cond_resched();
5553	}
5554
5555	if (lkb_count)
5556		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5557			  lkb_count, rsb_count);
5558}
5559
5560static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5561					 uint32_t remid)
5562{
5563	struct dlm_lkb *lkb;
5564
5565	list_for_each_entry(lkb, head, lkb_statequeue) {
5566		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5567			return lkb;
5568	}
5569	return NULL;
5570}
5571
5572static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5573				    uint32_t remid)
5574{
5575	struct dlm_lkb *lkb;
5576
5577	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5578	if (lkb)
5579		return lkb;
5580	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5581	if (lkb)
5582		return lkb;
5583	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5584	if (lkb)
5585		return lkb;
5586	return NULL;
5587}
5588
5589/* needs at least dlm_rcom + rcom_lock */
5590static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5591				  struct dlm_rsb *r, struct dlm_rcom *rc)
5592{
5593	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5594
5595	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5596	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5597	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5598	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5599	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5600	lkb->lkb_flags |= DLM_IFL_MSTCPY;
5601	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5602	lkb->lkb_rqmode = rl->rl_rqmode;
5603	lkb->lkb_grmode = rl->rl_grmode;
5604	/* don't set lkb_status because add_lkb wants to itself */
5605	/* don't set lkb_status because add_lkb wants to set it itself */
5606	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5607	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5608
5609	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5610		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5611			 sizeof(struct rcom_lock);
5612		if (lvblen > ls->ls_lvblen)
5613			return -EINVAL;
5614		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5615		if (!lkb->lkb_lvbptr)
5616			return -ENOMEM;
5617		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5618	}
5619
5620	/* Conversions between PR and CW (middle modes) need special handling.
5621	   The real granted mode of these converting locks cannot be determined
5622	   until all locks have been rebuilt on the rsb (recover_conversion) */
5623
5624	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5625	    middle_conversion(lkb)) {
5626		rl->rl_status = DLM_LKSTS_CONVERT;
5627		lkb->lkb_grmode = DLM_LOCK_IV;
5628		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5629	}
5630
5631	return 0;
5632}
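
/* Sketch (not compiled): the LVB carried in an rcom packet is whatever
 * follows the two fixed structs, as computed above; anything larger
 * than the lockspace LVB size is rejected rather than truncated. */
#if 0
	int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom)
		     - sizeof(struct rcom_lock);
	if (lvblen > ls->ls_lvblen)
		return -EINVAL;  /* sender disagrees about the LVB size */
#endif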
5633
5634/* This lkb may have been recovered in a previous aborted recovery so we need
5635   to check if the rsb already has an lkb with the given remote nodeid/lkid.
5636   If so we just send back a standard reply.  If not, we create a new lkb with
5637   the given values and send back our lkid.  We send back our lkid by sending
5638   back the rcom_lock struct we got but with the remid field filled in. */
5639
5640/* needs at least dlm_rcom + rcom_lock */
5641int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5642{
5643	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5644	struct dlm_rsb *r;
5645	struct dlm_lkb *lkb;
5646	uint32_t remid = 0;
5647	int from_nodeid = rc->rc_header.h_nodeid;
5648	int error;
5649
5650	if (rl->rl_parent_lkid) {
5651		error = -EOPNOTSUPP;
5652		goto out;
5653	}
5654
5655	remid = le32_to_cpu(rl->rl_lkid);
5656
5657	/* In general we expect the rsb returned to be R_MASTER, but we don't
5658	   have to require it.  Recovery of masters on one node can overlap
5659	   recovery of locks on another node, so one node can send us MSTCPY
5660	   locks before we've made ourselves master of this rsb.  We can still
5661	   add new MSTCPY locks that we receive here without any harm; when
5662	   we make ourselves master, dlm_recover_masters() won't touch the
5663	   MSTCPY locks we've received early. */
5664
5665	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5666			 from_nodeid, R_RECEIVE_RECOVER, &r);
5667	if (error)
5668		goto out;
5669
5670	lock_rsb(r);
5671
5672	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5673		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5674			  from_nodeid, remid);
5675		error = -EBADR;
5676		goto out_unlock;
5677	}
5678
5679	lkb = search_remid(r, from_nodeid, remid);
5680	if (lkb) {
5681		error = -EEXIST;
5682		goto out_remid;
5683	}
5684
5685	error = create_lkb(ls, &lkb);
5686	if (error)
5687		goto out_unlock;
5688
5689	error = receive_rcom_lock_args(ls, lkb, r, rc);
5690	if (error) {
5691		__put_lkb(ls, lkb);
5692		goto out_unlock;
5693	}
5694
5695	attach_lkb(r, lkb);
5696	add_lkb(r, lkb, rl->rl_status);
5697	error = 0;
5698	ls->ls_recover_locks_in++;
5699
5700	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5701		rsb_set_flag(r, RSB_RECOVER_GRANT);
5702
5703 out_remid:
5704	/* this is the new value returned to the lock holder for
5705	   saving in its process-copy lkb */
5706	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5707
5708	lkb->lkb_recover_seq = ls->ls_recover_seq;
5709
5710 out_unlock:
5711	unlock_rsb(r);
5712	put_rsb(r);
5713 out:
5714	if (error && error != -EEXIST)
5715		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5716			  from_nodeid, remid, error);
5717	rl->rl_result = cpu_to_le32(error);
5718	return error;
5719}
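
/* Note on the remid handshake above: the new master writes its own
   lkb_id into rl_remid, and the lock holder stores that value as
   lkb_remid when the reply is processed by dlm_recover_process_copy()
   below, re-linking the process copy to the rebuilt master copy. */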
5720
5721/* needs at least dlm_rcom + rcom_lock */
5722int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5723{
5724	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5725	struct dlm_rsb *r;
5726	struct dlm_lkb *lkb;
5727	uint32_t lkid, remid;
5728	int error, result;
5729
5730	lkid = le32_to_cpu(rl->rl_lkid);
5731	remid = le32_to_cpu(rl->rl_remid);
5732	result = le32_to_cpu(rl->rl_result);
5733
5734	error = find_lkb(ls, lkid, &lkb);
5735	if (error) {
5736		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5737			  lkid, rc->rc_header.h_nodeid, remid, result);
5738		return error;
5739	}
5740
5741	r = lkb->lkb_resource;
5742	hold_rsb(r);
5743	lock_rsb(r);
5744
5745	if (!is_process_copy(lkb)) {
5746		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5747			  lkid, rc->rc_header.h_nodeid, remid, result);
5748		dlm_dump_rsb(r);
5749		unlock_rsb(r);
5750		put_rsb(r);
5751		dlm_put_lkb(lkb);
5752		return -EINVAL;
5753	}
5754
5755	switch (result) {
5756	case -EBADR:
5757		/* There's a chance the new master received our lock before
5758		   dlm_recover_master_reply(); this wouldn't happen if we did
5759		   a barrier between recover_masters and recover_locks. */
5760
5761		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5762			  lkid, rc->rc_header.h_nodeid, remid, result);
5763
5764		dlm_send_rcom_lock(r, lkb);
5765		goto out;
5766	case -EEXIST:
5767	case 0:
5768		lkb->lkb_remid = remid;
5769		break;
5770	default:
5771		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5772			  lkid, rc->rc_header.h_nodeid, remid, result);
5773	}
5774
5775	/* an ack for dlm_recover_locks() which waits for replies from
5776	   all the locks it sends to new masters */
5777	dlm_recovered_lock(r);
5778 out:
5779	unlock_rsb(r);
5780	put_rsb(r);
5781	dlm_put_lkb(lkb);
5782
5783	return 0;
5784}
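
/* The dlm_user_* functions below are the userspace entry points for
   lock operations, normally reached from the dlm character device code
   in user.c; the dlm_user_args (ua) carries the caller's lksb and
   callback addresses, and fake_astfn/fake_bastfn stand in for kernel
   ast callbacks. */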

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}
	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error) {
		kfree(ua->lksb.sb_lvbptr);
		ua->lksb.sb_lvbptr = NULL;
		kfree(ua);
		__put_lkb(ls, lkb);
		goto out;
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */
	lkb->lkb_flags |= DLM_IFL_USER;
	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		fallthrough;
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
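
/* An illustrative call (resource name and values made up): request a
   PR lock with a value block on resource "abc":

	error = dlm_user_request(ls, ua, DLM_LOCK_PR, DLM_LKF_VALBLK,
				 "abc", 3, 0);

   A 0 return covers both an immediate grant and -EINPROGRESS; the final
   status is delivered through the ast callback either way. */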

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* the user can change the params on its lock when it converts it,
	   or add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
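
/* An illustrative conversion (lkid/lvb made up): promote an existing
   lock to EX and install a new value block:

	error = dlm_user_convert(ls, ua_tmp, DLM_LOCK_EX,
				 DLM_LKF_CONVERT | DLM_LKF_VALBLK,
				 lkid, lvb, 0);
*/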

/*
 * The caller asks for an orphan lock on a given resource with a given mode.
 * If a matching lock exists, it's moved to the owner's list of locks and
 * the lkid is returned.
 */

int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs, uint32_t *lkid)
{
	struct dlm_lkb *lkb = NULL, *iter;
	struct dlm_user_args *ua;
	int found_other_mode = 0;
	int rv = 0;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
		if (iter->lkb_resource->res_length != namelen)
			continue;
		if (memcmp(iter->lkb_resource->res_name, name, namelen))
			continue;
		if (iter->lkb_grmode != mode) {
			found_other_mode = 1;
			continue;
		}

		lkb = iter;
		list_del_init(&iter->lkb_ownqueue);
		iter->lkb_flags &= ~DLM_IFL_ORPHAN;
		*lkid = iter->lkb_id;
		break;
	}
	mutex_unlock(&ls->ls_orphans_mutex);

	if (!lkb && found_other_mode) {
		rv = -EAGAIN;
		goto out;
	}

	if (!lkb) {
		rv = -ENOENT;
		goto out;
	}

	lkb->lkb_exflags = flags;
	lkb->lkb_ownpid = (int) current->pid;

	ua = lkb->lkb_ua;

	ua->proc = ua_tmp->proc;
	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	/*
	 * The lkb reference from the ls_orphans list was not
	 * removed above, and is now considered the reference
	 * for the proc locks list.
	 */

	spin_lock(&ua->proc->locks_spin);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	kfree(ua_tmp);
	return rv;
}
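
/* Adoption pairs with DLM_LKF_PERSISTENT: a persistent lock left behind
   by a dead process (see orphan_proc_lock() below) can be picked up by a
   new process naming the same resource and grant mode, so the lock never
   has to be dropped across an owner restart.  The -EAGAIN above signals
   "an orphan exists, but at a different mode". */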

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_cb() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
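
/* An illustrative forced unlock (lkid made up): drop the lock even if
   it isn't granted yet:

	error = dlm_user_unlock(ls, ua_tmp, DLM_LKF_FORCEUNLOCK, lkid, NULL);

   The -EBUSY-to-0 folding above exists for exactly this case. */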

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
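
/* DLM_IFL_DEADLOCK_CANCEL marks this cancel as a deadlock abort rather
   than a user-requested one, so that when the cancel completes, the
   holder can be given -EDEADLK instead of a plain cancel status. */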

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb); /* reference for the ls_orphans list */
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}
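
/* The cancel above only aborts a pending request or convert; whatever
   mode the lock already holds stays granted, parked on ls_orphans until
   it's adopted (dlm_user_adopt_orphan) or purged (do_purge). */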

/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
   granted.  Regardless of what rsb queue the lock is on, it's removed and
   freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
   if our lock is PW/EX (it's ignored if our granted mode is smaller). */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
			lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}
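
/* Invalidating the lvb when a PW/EX holder goes away is a safety
   measure: the dead process may have died mid-update, so later readers
   see an invalid lvb rather than silently trusting stale data. */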

/* We have to release the clear_proc_locks mutex before calling
   unlock_proc_lock() (which does lock_rsb) due to a deadlock with receiving
   a message that does lock_rsb followed by dlm_user_add_cb() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  This assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to
   serialize them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request; it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
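
/* This is the teardown run when a process's dlm device file goes away:
   persistent locks are parked as orphans, everything else is force
   unlocked, and callbacks still queued for delivery are discarded. */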

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
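
/* purge_proc_locks() differs from dlm_clear_proc_locks() in one key
   way: nothing is orphaned.  Even DLM_LKF_PERSISTENT locks are force
   unlocked, since here the owner itself asked for the purge. */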

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}
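
/* do_purge() also runs on behalf of other nodes: the DLM_MSG_PURGE
   message built by send_purge() below is handled on the receiving node
   by calling do_purge() with the sender's nodeid and pid. */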

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid && (nodeid != dlm_our_nodeid())) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}
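
/* Dispatch summary: a purge for our own pid clears this process's locks
   directly; a purge for another pid clears matching orphans; a nonzero
   remote nodeid turns the request into a DLM_MSG_PURGE sent to that
   node instead. */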