// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);
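
/*
 * For reference: these attributes surface under each lockspace's kobject,
 * which is registered below in the "dlm" kset under kernel_kobj.  Assuming
 * the usual sysfs mount point, that gives paths like:
 *
 *	/sys/kernel/dlm/<lockspace>/control		(write 0=stop, 1=start)
 *	/sys/kernel/dlm/<lockspace>/event_done		(write uevent result)
 *	/sys/kernel/dlm/<lockspace>/id			(read/write global id)
 *	/sys/kernel/dlm/<lockspace>/nodir		(read; write 1 to set)
 *	/sys/kernel/dlm/<lockspace>/recover_status	(read-only, hex)
 *	/sys/kernel/dlm/<lockspace>/recover_nodeid	(read-only)
 */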

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}
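
/*
 * The userspace half of that handshake, as a hedged sketch: dlm_controld
 * picks up the ONLINE/OFFLINE uevent, finishes joining or leaving the
 * group, then reports its result through the event_done attribute, which
 * lands in dlm_event_store() above and wakes the waiter, e.g.:
 *
 *	echo 0 > /sys/kernel/dlm/<lockspace>/event_done
 *
 * The written value is returned from do_uevent() as ls_uevent_result.
 */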

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
		      struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_scan_waiters(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
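
/*
 * A minimal usage sketch for the lookup helpers above (illustrative;
 * "id" and the work done on the lockspace are hypothetical).  Each
 * successful find takes a reference under lslist_lock, so it must be
 * paired with dlm_put_lockspace():
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *	if (!ls)
 *		return -EINVAL;
 *	...use ls...
 *	dlm_put_lockspace(ls);
 */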

static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			WARN_ON(ls->ls_create_count != 0);
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}
	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}

static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}

static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}
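
/*
 * A hedged caller sketch for dlm_new_lockspace() (names like "example",
 * "my_ops" and "my_arg" are hypothetical).  Per the checks in
 * new_lockspace() above, the name must be 1..DLM_LOCKSPACE_LEN bytes
 * and lvblen a non-zero multiple of 8; ops_result reports whether
 * recovery callbacks are enabled:
 *
 *	dlm_lockspace_t *lockspace;
 *	int ops_result, error;
 *
 *	error = dlm_new_lockspace("example", "mycluster", DLM_LSFL_FS,
 *				  32, &my_ops, my_arg, &ops_result,
 *				  &lockspace);
 *	if (error)
 *		return error;
 *	if (ops_result == -EOPNOTSUPP)
 *		pr_warn("recover callbacks not enabled\n");
 */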

static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	return error;
}
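
/*
 * Illustrative teardown sketch (the escalation policy here is an
 * example, not a rule): attempt a clean release first, then raise
 * force if the lockspace still holds LKBs:
 *
 *	error = dlm_release_lockspace(lockspace, 0);
 *	if (error == -EBUSY)
 *		error = dlm_release_lockspace(lockspace, 2);
 */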

void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}
