xref: /kernel/linux/linux-5.10/fs/ceph/super.c (revision 8c2ecf20)
1// SPDX-License-Identifier: GPL-2.0-only
2
3#include <linux/ceph/ceph_debug.h>
4
5#include <linux/backing-dev.h>
6#include <linux/ctype.h>
7#include <linux/fs.h>
8#include <linux/inet.h>
9#include <linux/in6.h>
10#include <linux/module.h>
11#include <linux/mount.h>
12#include <linux/fs_context.h>
13#include <linux/fs_parser.h>
14#include <linux/sched.h>
15#include <linux/seq_file.h>
16#include <linux/slab.h>
17#include <linux/statfs.h>
18#include <linux/string.h>
19
20#include "super.h"
21#include "mds_client.h"
22#include "cache.h"
23
24#include <linux/ceph/ceph_features.h>
25#include <linux/ceph/decode.h>
26#include <linux/ceph/mon_client.h>
27#include <linux/ceph/auth.h>
28#include <linux/ceph/debugfs.h>
29
30static DEFINE_SPINLOCK(ceph_fsc_lock);
31static LIST_HEAD(ceph_fsc_list);
32
33/*
34 * Ceph superblock operations
35 *
36 * Handle the basics of mounting and unmounting.
37 */
38
39/*
40 * super ops
41 */
42static void ceph_put_super(struct super_block *s)
43{
44	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
45
46	dout("put_super\n");
47	ceph_mdsc_close_sessions(fsc->mdsc);
48}
49
50static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
51{
52	struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry));
53	struct ceph_mon_client *monc = &fsc->client->monc;
54	struct ceph_statfs st;
55	int i, err;
56	u64 data_pool;
57
58	if (fsc->mdsc->mdsmap->m_num_data_pg_pools == 1) {
59		data_pool = fsc->mdsc->mdsmap->m_data_pg_pools[0];
60	} else {
61		data_pool = CEPH_NOPOOL;
62	}
63
64	dout("statfs\n");
65	err = ceph_monc_do_statfs(monc, data_pool, &st);
66	if (err < 0)
67		return err;
68
69	/* fill in kstatfs */
70	buf->f_type = CEPH_SUPER_MAGIC;  /* ?? */
71
72	/*
73	 * express utilization in terms of large blocks to avoid
74	 * overflow on 32-bit machines.
75	 *
76	 * NOTE: for the time being, we make bsize == frsize to humor
77	 * not-yet-ancient versions of glibc that are broken.
78	 * Someday, we will probably want to report a real block
79	 * size...  whatever that may mean for a network file system!
80	 */
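	/*
	 * A "block" here is CEPH_BLOCK: with CEPH_BLOCK_SHIFT == 22 (as
	 * defined in this tree's super.h) that is 4 MiB.  The cluster reports
	 * usage in KiB, hence the ">> (CEPH_BLOCK_SHIFT - 10)" conversions
	 * below, e.g. 8388608 KiB (8 GiB) >> 12 == 2048 of these blocks.
	 */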
81	buf->f_bsize = 1 << CEPH_BLOCK_SHIFT;
82	buf->f_frsize = 1 << CEPH_BLOCK_SHIFT;
83
84	/*
85	 * By default use root quota for stats; fallback to overall filesystem
86	 * usage if using 'noquotadf' mount option or if the root dir doesn't
87	 * have max_bytes quota set.
88	 */
89	if (ceph_test_mount_opt(fsc, NOQUOTADF) ||
90	    !ceph_quota_update_statfs(fsc, buf)) {
91		buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10);
92		buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
93		buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
94	}
95
96	buf->f_files = le64_to_cpu(st.num_objects);
97	buf->f_ffree = -1;
98	buf->f_namelen = NAME_MAX;
99
100	/* Must convert the fsid, for consistent values across arches */
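	/*
	 * The 16-byte cluster fsid is folded into one 32-bit value by XOR-ing
	 * its four 32-bit words (each read as little-endian), so userland sees
	 * the same f_fsid regardless of host byte order.
	 */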
101	buf->f_fsid.val[0] = 0;
102	mutex_lock(&monc->mutex);
103	for (i = 0 ; i < sizeof(monc->monmap->fsid) / sizeof(__le32) ; ++i)
104		buf->f_fsid.val[0] ^= le32_to_cpu(((__le32 *)&monc->monmap->fsid)[i]);
105	mutex_unlock(&monc->mutex);
106
107	/* fold the fs_cluster_id into the upper bits */
108	buf->f_fsid.val[1] = monc->fs_cluster_id;
109
110	return 0;
111}
112
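/*
 * Sync the filesystem.  A non-blocking sync (wait == 0) only kicks off
 * writeback of dirty caps to the MDS; a blocking sync also waits for
 * in-flight OSD writes and MDS requests to complete.
 */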
113static int ceph_sync_fs(struct super_block *sb, int wait)
114{
115	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
116
117	if (!wait) {
118		dout("sync_fs (non-blocking)\n");
119		ceph_flush_dirty_caps(fsc->mdsc);
120		dout("sync_fs (non-blocking) done\n");
121		return 0;
122	}
123
124	dout("sync_fs (blocking)\n");
125	ceph_osdc_sync(&fsc->client->osdc);
126	ceph_mdsc_sync(fsc->mdsc);
127	dout("sync_fs (blocking) done\n");
128	return 0;
129}
130
131/*
132 * mount options
133 */
134enum {
135	Opt_wsize,
136	Opt_rsize,
137	Opt_rasize,
138	Opt_caps_wanted_delay_min,
139	Opt_caps_wanted_delay_max,
140	Opt_caps_max,
141	Opt_readdir_max_entries,
142	Opt_readdir_max_bytes,
143	Opt_congestion_kb,
144	/* int args above */
145	Opt_snapdirname,
146	Opt_mds_namespace,
147	Opt_recover_session,
148	Opt_source,
149	/* string args above */
150	Opt_dirstat,
151	Opt_rbytes,
152	Opt_asyncreaddir,
153	Opt_dcache,
154	Opt_ino32,
155	Opt_fscache,
156	Opt_poolperm,
157	Opt_require_active_mds,
158	Opt_acl,
159	Opt_quotadf,
160	Opt_copyfrom,
161	Opt_wsync,
162};
163
164enum ceph_recover_session_mode {
165	ceph_recover_session_no,
166	ceph_recover_session_clean
167};
168
169static const struct constant_table ceph_param_recover[] = {
170	{ "no",		ceph_recover_session_no },
171	{ "clean",	ceph_recover_session_clean },
172	{}
173};
174
175static const struct fs_parameter_spec ceph_mount_parameters[] = {
176	fsparam_flag_no ("acl",				Opt_acl),
177	fsparam_flag_no ("asyncreaddir",		Opt_asyncreaddir),
178	fsparam_s32	("caps_max",			Opt_caps_max),
179	fsparam_u32	("caps_wanted_delay_max",	Opt_caps_wanted_delay_max),
180	fsparam_u32	("caps_wanted_delay_min",	Opt_caps_wanted_delay_min),
181	fsparam_u32	("write_congestion_kb",		Opt_congestion_kb),
182	fsparam_flag_no ("copyfrom",			Opt_copyfrom),
183	fsparam_flag_no ("dcache",			Opt_dcache),
184	fsparam_flag_no ("dirstat",			Opt_dirstat),
185	fsparam_flag_no	("fsc",				Opt_fscache), // fsc|nofsc
186	fsparam_string	("fsc",				Opt_fscache), // fsc=...
187	fsparam_flag_no ("ino32",			Opt_ino32),
188	fsparam_string	("mds_namespace",		Opt_mds_namespace),
189	fsparam_flag_no ("poolperm",			Opt_poolperm),
190	fsparam_flag_no ("quotadf",			Opt_quotadf),
191	fsparam_u32	("rasize",			Opt_rasize),
192	fsparam_flag_no ("rbytes",			Opt_rbytes),
193	fsparam_u32	("readdir_max_bytes",		Opt_readdir_max_bytes),
194	fsparam_u32	("readdir_max_entries",		Opt_readdir_max_entries),
195	fsparam_enum	("recover_session",		Opt_recover_session, ceph_param_recover),
196	fsparam_flag_no ("require_active_mds",		Opt_require_active_mds),
197	fsparam_u32	("rsize",			Opt_rsize),
198	fsparam_string	("snapdirname",			Opt_snapdirname),
199	fsparam_string	("source",			Opt_source),
200	fsparam_u32	("wsize",			Opt_wsize),
201	fsparam_flag_no	("wsync",			Opt_wsync),
202	{}
203};
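/*
 * Purely illustrative example (addresses and fs name are made up): a mount
 * exercising a few of the options above could look like
 *
 *   mount -t ceph 192.168.0.1:6789,192.168.0.2:6789:/ /mnt/cephfs \
 *         -o name=admin,mds_namespace=myfs,rasize=8388608,noasyncreaddir
 *
 * where "name=admin" is consumed by libceph and the rest by the table above.
 */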
204
205struct ceph_parse_opts_ctx {
206	struct ceph_options		*copts;
207	struct ceph_mount_options	*opts;
208};
209
210/*
211 * Collapse runs of consecutive slashes into a single slash and drop any
212 * trailing slash, unless that slash is the only remaining character.
213 *
214 * E.g. "//dir1////dir2///" --> "/dir1/dir2", "///" --> "/".
215 */
216static void canonicalize_path(char *path)
217{
218	int i, j = 0;
219
220	for (i = 0; path[i] != '\0'; i++) {
221		if (path[i] != '/' || j < 1 || path[j - 1] != '/')
222			path[j++] = path[i];
223	}
224
225	if (j > 1 && path[j - 1] == '/')
226		j--;
227	path[j] = '\0';
228}
229
230/*
231 * Parse the source parameter.  Distinguish the server list from the path.
232 *
233 * The source will look like:
234 *     <server_spec>[,<server_spec>...]:[<path>]
235 * where
236 *     <server_spec> is <ip>[:<port>]
237 *     <path> is optional, but if present must begin with '/'
238 */
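/*
 * For example (hypothetical addresses), the source
 *
 *     "192.168.0.1:6789,192.168.0.2:6789:/volumes/foo"
 *
 * is split into a monitor list of "192.168.0.1:6789,192.168.0.2:6789" for
 * libceph and a server_path of "/volumes/foo" (run through
 * canonicalize_path() above).
 */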
239static int ceph_parse_source(struct fs_parameter *param, struct fs_context *fc)
240{
241	struct ceph_parse_opts_ctx *pctx = fc->fs_private;
242	struct ceph_mount_options *fsopt = pctx->opts;
243	char *dev_name = param->string, *dev_name_end;
244	int ret;
245
246	dout("%s '%s'\n", __func__, dev_name);
247	if (!dev_name || !*dev_name)
248		return invalfc(fc, "Empty source");
249
250	dev_name_end = strchr(dev_name, '/');
251	if (dev_name_end) {
252		/*
253		 * server_path keeps everything from the first '/' onwards,
254		 * exactly as supplied by userland, including that leading '/'.
255		 */
256		kfree(fsopt->server_path);
257		fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
258		if (!fsopt->server_path)
259			return -ENOMEM;
260
261		canonicalize_path(fsopt->server_path);
262	} else {
263		dev_name_end = dev_name + strlen(dev_name);
264	}
265
266	dev_name_end--;		/* back up to ':' separator */
267	if (dev_name_end < dev_name || *dev_name_end != ':')
268		return invalfc(fc, "No path or : separator in source");
269
270	dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
271	if (fsopt->server_path)
272		dout("server path '%s'\n", fsopt->server_path);
273
274	ret = ceph_parse_mon_ips(param->string, dev_name_end - dev_name,
275				 pctx->copts, fc->log.log);
276	if (ret)
277		return ret;
278
279	fc->source = param->string;
280	param->string = NULL;
281	return 0;
282}
283
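/*
 * Parse a single mount parameter.  Options understood by libceph (e.g.
 * "name=" or "secret=") are handed to ceph_parse_param() first; only when
 * that returns -ENOPARAM do we consult the CephFS-specific table above.
 */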
284static int ceph_parse_mount_param(struct fs_context *fc,
285				  struct fs_parameter *param)
286{
287	struct ceph_parse_opts_ctx *pctx = fc->fs_private;
288	struct ceph_mount_options *fsopt = pctx->opts;
289	struct fs_parse_result result;
290	unsigned int mode;
291	int token, ret;
292
293	ret = ceph_parse_param(param, pctx->copts, fc->log.log);
294	if (ret != -ENOPARAM)
295		return ret;
296
297	token = fs_parse(fc, ceph_mount_parameters, param, &result);
298	dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
299	if (token < 0)
300		return token;
301
302	switch (token) {
303	case Opt_snapdirname:
304		kfree(fsopt->snapdir_name);
305		fsopt->snapdir_name = param->string;
306		param->string = NULL;
307		break;
308	case Opt_mds_namespace:
309		kfree(fsopt->mds_namespace);
310		fsopt->mds_namespace = param->string;
311		param->string = NULL;
312		break;
313	case Opt_recover_session:
314		mode = result.uint_32;
315		if (mode == ceph_recover_session_no)
316			fsopt->flags &= ~CEPH_MOUNT_OPT_CLEANRECOVER;
317		else if (mode == ceph_recover_session_clean)
318			fsopt->flags |= CEPH_MOUNT_OPT_CLEANRECOVER;
319		else
320			BUG();
321		break;
322	case Opt_source:
323		if (fc->source)
324			return invalfc(fc, "Multiple sources specified");
325		return ceph_parse_source(param, fc);
326	case Opt_wsize:
327		if (result.uint_32 < PAGE_SIZE ||
328		    result.uint_32 > CEPH_MAX_WRITE_SIZE)
329			goto out_of_range;
330		fsopt->wsize = ALIGN(result.uint_32, PAGE_SIZE);
331		break;
332	case Opt_rsize:
333		if (result.uint_32 < PAGE_SIZE ||
334		    result.uint_32 > CEPH_MAX_READ_SIZE)
335			goto out_of_range;
336		fsopt->rsize = ALIGN(result.uint_32, PAGE_SIZE);
337		break;
338	case Opt_rasize:
339		fsopt->rasize = ALIGN(result.uint_32, PAGE_SIZE);
340		break;
341	case Opt_caps_wanted_delay_min:
342		if (result.uint_32 < 1)
343			goto out_of_range;
344		fsopt->caps_wanted_delay_min = result.uint_32;
345		break;
346	case Opt_caps_wanted_delay_max:
347		if (result.uint_32 < 1)
348			goto out_of_range;
349		fsopt->caps_wanted_delay_max = result.uint_32;
350		break;
351	case Opt_caps_max:
352		if (result.int_32 < 0)
353			goto out_of_range;
354		fsopt->caps_max = result.int_32;
355		break;
356	case Opt_readdir_max_entries:
357		if (result.uint_32 < 1)
358			goto out_of_range;
359		fsopt->max_readdir = result.uint_32;
360		break;
361	case Opt_readdir_max_bytes:
362		if (result.uint_32 < PAGE_SIZE && result.uint_32 != 0)
363			goto out_of_range;
364		fsopt->max_readdir_bytes = result.uint_32;
365		break;
366	case Opt_congestion_kb:
367		if (result.uint_32 < 1024) /* at least 1M */
368			goto out_of_range;
369		fsopt->congestion_kb = result.uint_32;
370		break;
371	case Opt_dirstat:
372		if (!result.negated)
373			fsopt->flags |= CEPH_MOUNT_OPT_DIRSTAT;
374		else
375			fsopt->flags &= ~CEPH_MOUNT_OPT_DIRSTAT;
376		break;
377	case Opt_rbytes:
378		if (!result.negated)
379			fsopt->flags |= CEPH_MOUNT_OPT_RBYTES;
380		else
381			fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES;
382		break;
383	case Opt_asyncreaddir:
384		if (!result.negated)
385			fsopt->flags &= ~CEPH_MOUNT_OPT_NOASYNCREADDIR;
386		else
387			fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
388		break;
389	case Opt_dcache:
390		if (!result.negated)
391			fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
392		else
393			fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
394		break;
395	case Opt_ino32:
396		if (!result.negated)
397			fsopt->flags |= CEPH_MOUNT_OPT_INO32;
398		else
399			fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
400		break;
401
402	case Opt_fscache:
403#ifdef CONFIG_CEPH_FSCACHE
404		kfree(fsopt->fscache_uniq);
405		fsopt->fscache_uniq = NULL;
406		if (result.negated) {
407			fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
408		} else {
409			fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
410			fsopt->fscache_uniq = param->string;
411			param->string = NULL;
412		}
413		break;
414#else
415		return invalfc(fc, "fscache support is disabled");
416#endif
417	case Opt_poolperm:
418		if (!result.negated)
419			fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM;
420		else
421			fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
422		break;
423	case Opt_require_active_mds:
424		if (!result.negated)
425			fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT;
426		else
427			fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
428		break;
429	case Opt_quotadf:
430		if (!result.negated)
431			fsopt->flags &= ~CEPH_MOUNT_OPT_NOQUOTADF;
432		else
433			fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
434		break;
435	case Opt_copyfrom:
436		if (!result.negated)
437			fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM;
438		else
439			fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM;
440		break;
441	case Opt_acl:
442		if (!result.negated) {
443#ifdef CONFIG_CEPH_FS_POSIX_ACL
444			fc->sb_flags |= SB_POSIXACL;
445#else
446			return invalfc(fc, "POSIX ACL support is disabled");
447#endif
448		} else {
449			fc->sb_flags &= ~SB_POSIXACL;
450		}
451		break;
452	case Opt_wsync:
453		if (!result.negated)
454			fsopt->flags &= ~CEPH_MOUNT_OPT_ASYNC_DIROPS;
455		else
456			fsopt->flags |= CEPH_MOUNT_OPT_ASYNC_DIROPS;
457		break;
458	default:
459		BUG();
460	}
461	return 0;
462
463out_of_range:
464	return invalfc(fc, "%s out of range", param->key);
465}
466
467static void destroy_mount_options(struct ceph_mount_options *args)
468{
469	dout("destroy_mount_options %p\n", args);
470	if (!args)
471		return;
472
473	kfree(args->snapdir_name);
474	kfree(args->mds_namespace);
475	kfree(args->server_path);
476	kfree(args->fscache_uniq);
477	kfree(args);
478}
479
480static int strcmp_null(const char *s1, const char *s2)
481{
482	if (!s1 && !s2)
483		return 0;
484	if (s1 && !s2)
485		return -1;
486	if (!s1 && s2)
487		return 1;
488	return strcmp(s1, s2);
489}
490
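/*
 * Compare a new set of mount options against those of an existing fs client,
 * used when deciding whether an existing superblock may be shared.  The flat
 * flag/integer fields are laid out before snapdir_name in struct
 * ceph_mount_options and can be compared with a single memcmp(); the string
 * options and the libceph-level options are compared separately.
 */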
491static int compare_mount_options(struct ceph_mount_options *new_fsopt,
492				 struct ceph_options *new_opt,
493				 struct ceph_fs_client *fsc)
494{
495	struct ceph_mount_options *fsopt1 = new_fsopt;
496	struct ceph_mount_options *fsopt2 = fsc->mount_options;
497	int ofs = offsetof(struct ceph_mount_options, snapdir_name);
498	int ret;
499
500	ret = memcmp(fsopt1, fsopt2, ofs);
501	if (ret)
502		return ret;
503
504	ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
505	if (ret)
506		return ret;
507
508	ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
509	if (ret)
510		return ret;
511
512	ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
513	if (ret)
514		return ret;
515
516	ret = strcmp_null(fsopt1->fscache_uniq, fsopt2->fscache_uniq);
517	if (ret)
518		return ret;
519
520	return ceph_compare_options(new_opt, fsc->client);
521}
522
523/**
524 * ceph_show_options - Show mount options in /proc/mounts
525 * @m: seq_file to write to
526 * @root: root of that (sub)tree
527 */
528static int ceph_show_options(struct seq_file *m, struct dentry *root)
529{
530	struct ceph_fs_client *fsc = ceph_sb_to_client(root->d_sb);
531	struct ceph_mount_options *fsopt = fsc->mount_options;
532	size_t pos;
533	int ret;
534
535	/* a comma between MNT/MS and client options */
536	seq_putc(m, ',');
537	pos = m->count;
538
539	ret = ceph_print_client_options(m, fsc->client, false);
540	if (ret)
541		return ret;
542
543	/* retract our comma if no client options */
544	if (m->count == pos)
545		m->count--;
546
547	if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
548		seq_puts(m, ",dirstat");
549	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
550		seq_puts(m, ",rbytes");
551	if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
552		seq_puts(m, ",noasyncreaddir");
553	if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
554		seq_puts(m, ",nodcache");
555	if (fsopt->flags & CEPH_MOUNT_OPT_INO32)
556		seq_puts(m, ",ino32");
557	if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
558		seq_show_option(m, "fsc", fsopt->fscache_uniq);
559	}
560	if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM)
561		seq_puts(m, ",nopoolperm");
562	if (fsopt->flags & CEPH_MOUNT_OPT_NOQUOTADF)
563		seq_puts(m, ",noquotadf");
564
565#ifdef CONFIG_CEPH_FS_POSIX_ACL
566	if (root->d_sb->s_flags & SB_POSIXACL)
567		seq_puts(m, ",acl");
568	else
569		seq_puts(m, ",noacl");
570#endif
571
572	if ((fsopt->flags & CEPH_MOUNT_OPT_NOCOPYFROM) == 0)
573		seq_puts(m, ",copyfrom");
574
575	if (fsopt->mds_namespace)
576		seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
577
578	if (fsopt->flags & CEPH_MOUNT_OPT_CLEANRECOVER)
579		seq_show_option(m, "recover_session", "clean");
580
581	if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
582		seq_puts(m, ",nowsync");
583
584	if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
585		seq_printf(m, ",wsize=%u", fsopt->wsize);
586	if (fsopt->rsize != CEPH_MAX_READ_SIZE)
587		seq_printf(m, ",rsize=%u", fsopt->rsize);
588	if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
589		seq_printf(m, ",rasize=%u", fsopt->rasize);
590	if (fsopt->congestion_kb != default_congestion_kb())
591		seq_printf(m, ",write_congestion_kb=%u", fsopt->congestion_kb);
592	if (fsopt->caps_max)
593		seq_printf(m, ",caps_max=%d", fsopt->caps_max);
594	if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
595		seq_printf(m, ",caps_wanted_delay_min=%u",
596			 fsopt->caps_wanted_delay_min);
597	if (fsopt->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
598		seq_printf(m, ",caps_wanted_delay_max=%u",
599			   fsopt->caps_wanted_delay_max);
600	if (fsopt->max_readdir != CEPH_MAX_READDIR_DEFAULT)
601		seq_printf(m, ",readdir_max_entries=%u", fsopt->max_readdir);
602	if (fsopt->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
603		seq_printf(m, ",readdir_max_bytes=%u", fsopt->max_readdir_bytes);
604	if (strcmp(fsopt->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
605		seq_show_option(m, "snapdirname", fsopt->snapdir_name);
606
607	return 0;
608}
609
610/*
611 * Handle any monitor messages that libceph doesn't understand; return an
612 * error if we don't understand them either.
613 */
614static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
615{
616	struct ceph_fs_client *fsc = client->private;
617	int type = le16_to_cpu(msg->hdr.type);
618
619	switch (type) {
620	case CEPH_MSG_MDS_MAP:
621		ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
622		return 0;
623	case CEPH_MSG_FS_MAP_USER:
624		ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
625		return 0;
626	default:
627		return -1;
628	}
629}
630
631/*
632 * create a new fs client
633 *
634 * Success or not, this function consumes @fsopt and @opt.
635 */
636static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
637					struct ceph_options *opt)
638{
639	struct ceph_fs_client *fsc;
640	int err;
641
642	fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
643	if (!fsc) {
644		err = -ENOMEM;
645		goto fail;
646	}
647
648	fsc->client = ceph_create_client(opt, fsc);
649	if (IS_ERR(fsc->client)) {
650		err = PTR_ERR(fsc->client);
651		goto fail;
652	}
653	opt = NULL; /* fsc->client now owns this */
654
655	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
656	ceph_set_opt(fsc->client, ABORT_ON_FULL);
657
658	if (!fsopt->mds_namespace) {
659		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
660				   0, true);
661	} else {
662		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
663				   0, false);
664	}
665
666	fsc->mount_options = fsopt;
667
668	fsc->sb = NULL;
669	fsc->mount_state = CEPH_MOUNT_MOUNTING;
670	fsc->filp_gen = 1;
671	fsc->have_copy_from2 = true;
672
673	atomic_long_set(&fsc->writeback_count, 0);
674
675	err = -ENOMEM;
676	/*
677	 * The number of queued work items can be high, but they don't need to
678	 * be processed in parallel, so limit the concurrency.
679	 */
680	fsc->inode_wq = alloc_workqueue("ceph-inode", WQ_UNBOUND, 0);
681	if (!fsc->inode_wq)
682		goto fail_client;
683	fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
684	if (!fsc->cap_wq)
685		goto fail_inode_wq;
686
687	spin_lock(&ceph_fsc_lock);
688	list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
689	spin_unlock(&ceph_fsc_lock);
690
691	return fsc;
692
693fail_inode_wq:
694	destroy_workqueue(fsc->inode_wq);
695fail_client:
696	ceph_destroy_client(fsc->client);
697fail:
698	kfree(fsc);
699	if (opt)
700		ceph_destroy_options(opt);
701	destroy_mount_options(fsopt);
702	return ERR_PTR(err);
703}
704
705static void flush_fs_workqueues(struct ceph_fs_client *fsc)
706{
707	flush_workqueue(fsc->inode_wq);
708	flush_workqueue(fsc->cap_wq);
709}
710
711static void destroy_fs_client(struct ceph_fs_client *fsc)
712{
713	dout("destroy_fs_client %p\n", fsc);
714
715	spin_lock(&ceph_fsc_lock);
716	list_del(&fsc->metric_wakeup);
717	spin_unlock(&ceph_fsc_lock);
718
719	ceph_mdsc_destroy(fsc);
720	destroy_workqueue(fsc->inode_wq);
721	destroy_workqueue(fsc->cap_wq);
722
723	destroy_mount_options(fsc->mount_options);
724
725	ceph_destroy_client(fsc->client);
726
727	kfree(fsc);
728	dout("destroy_fs_client %p done\n", fsc);
729}
730
731/*
732 * caches
733 */
734struct kmem_cache *ceph_inode_cachep;
735struct kmem_cache *ceph_cap_cachep;
736struct kmem_cache *ceph_cap_flush_cachep;
737struct kmem_cache *ceph_dentry_cachep;
738struct kmem_cache *ceph_file_cachep;
739struct kmem_cache *ceph_dir_file_cachep;
740struct kmem_cache *ceph_mds_request_cachep;
741mempool_t *ceph_wb_pagevec_pool;
742
743static void ceph_inode_init_once(void *foo)
744{
745	struct ceph_inode_info *ci = foo;
746	inode_init_once(&ci->vfs_inode);
747}
748
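/*
 * Create the slab caches and the writeback pagevec mempool used by cephfs.
 * On failure, everything allocated so far is torn down again.
 */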
749static int __init init_caches(void)
750{
751	int error = -ENOMEM;
752
753	ceph_inode_cachep = kmem_cache_create("ceph_inode_info",
754				      sizeof(struct ceph_inode_info),
755				      __alignof__(struct ceph_inode_info),
756				      SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD|
757				      SLAB_ACCOUNT, ceph_inode_init_once);
758	if (!ceph_inode_cachep)
759		return -ENOMEM;
760
761	ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD);
762	if (!ceph_cap_cachep)
763		goto bad_cap;
764	ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
765					   SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
766	if (!ceph_cap_flush_cachep)
767		goto bad_cap_flush;
768
769	ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info,
770					SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
771	if (!ceph_dentry_cachep)
772		goto bad_dentry;
773
774	ceph_file_cachep = KMEM_CACHE(ceph_file_info, SLAB_MEM_SPREAD);
775	if (!ceph_file_cachep)
776		goto bad_file;
777
778	ceph_dir_file_cachep = KMEM_CACHE(ceph_dir_file_info, SLAB_MEM_SPREAD);
779	if (!ceph_dir_file_cachep)
780		goto bad_dir_file;
781
782	ceph_mds_request_cachep = KMEM_CACHE(ceph_mds_request, SLAB_MEM_SPREAD);
783	if (!ceph_mds_request_cachep)
784		goto bad_mds_req;
785
786	ceph_wb_pagevec_pool = mempool_create_kmalloc_pool(10, CEPH_MAX_WRITE_SIZE >> PAGE_SHIFT);
787	if (!ceph_wb_pagevec_pool)
788		goto bad_pagevec_pool;
789
790	error = ceph_fscache_register();
791	if (error)
792		goto bad_fscache;
793
794	return 0;
795
796bad_fscache:
797	kmem_cache_destroy(ceph_mds_request_cachep);
798bad_pagevec_pool:
799	mempool_destroy(ceph_wb_pagevec_pool);
800bad_mds_req:
801	kmem_cache_destroy(ceph_dir_file_cachep);
802bad_dir_file:
803	kmem_cache_destroy(ceph_file_cachep);
804bad_file:
805	kmem_cache_destroy(ceph_dentry_cachep);
806bad_dentry:
807	kmem_cache_destroy(ceph_cap_flush_cachep);
808bad_cap_flush:
809	kmem_cache_destroy(ceph_cap_cachep);
810bad_cap:
811	kmem_cache_destroy(ceph_inode_cachep);
812	return error;
813}
814
815static void destroy_caches(void)
816{
817	/*
818	 * Make sure all delayed rcu free inodes are flushed before we
819	 * destroy cache.
820	 */
821	rcu_barrier();
822
823	kmem_cache_destroy(ceph_inode_cachep);
824	kmem_cache_destroy(ceph_cap_cachep);
825	kmem_cache_destroy(ceph_cap_flush_cachep);
826	kmem_cache_destroy(ceph_dentry_cachep);
827	kmem_cache_destroy(ceph_file_cachep);
828	kmem_cache_destroy(ceph_dir_file_cachep);
829	kmem_cache_destroy(ceph_mds_request_cachep);
830	mempool_destroy(ceph_wb_pagevec_pool);
831
832	ceph_fscache_unregister();
833}
834
835/*
836 * ceph_umount_begin - initiate forced umount.  Tear down the
837 * mount, skipping steps that may hang while waiting for server(s).
838 */
839static void ceph_umount_begin(struct super_block *sb)
840{
841	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
842
843	dout("ceph_umount_begin - starting forced umount\n");
844	if (!fsc)
845		return;
846	fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
847	ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
848	ceph_mdsc_force_umount(fsc->mdsc);
849	fsc->filp_gen++; // invalidate open files
850}
851
852static const struct super_operations ceph_super_ops = {
853	.alloc_inode	= ceph_alloc_inode,
854	.free_inode	= ceph_free_inode,
855	.write_inode    = ceph_write_inode,
856	.drop_inode	= generic_delete_inode,
857	.evict_inode	= ceph_evict_inode,
858	.sync_fs        = ceph_sync_fs,
859	.put_super	= ceph_put_super,
860	.show_options   = ceph_show_options,
861	.statfs		= ceph_statfs,
862	.umount_begin   = ceph_umount_begin,
863};
864
865/*
866 * Bootstrap mount by opening the root directory.  Note the mount
867 * @started time from caller, and time out if this takes too long.
868 */
869static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
870				       const char *path,
871				       unsigned long started)
872{
873	struct ceph_mds_client *mdsc = fsc->mdsc;
874	struct ceph_mds_request *req = NULL;
875	int err;
876	struct dentry *root;
877
878	/* open dir */
879	dout("open_root_inode opening '%s'\n", path);
880	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
881	if (IS_ERR(req))
882		return ERR_CAST(req);
883	req->r_path1 = kstrdup(path, GFP_NOFS);
884	if (!req->r_path1) {
885		root = ERR_PTR(-ENOMEM);
886		goto out;
887	}
888
889	req->r_ino1.ino = CEPH_INO_ROOT;
890	req->r_ino1.snap = CEPH_NOSNAP;
891	req->r_started = started;
892	req->r_timeout = fsc->client->options->mount_timeout;
893	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
894	req->r_num_caps = 2;
895	err = ceph_mdsc_do_request(mdsc, NULL, req);
896	if (err == 0) {
897		struct inode *inode = req->r_target_inode;
898		req->r_target_inode = NULL;
899		dout("open_root_inode success\n");
900		root = d_make_root(inode);
901		if (!root) {
902			root = ERR_PTR(-ENOMEM);
903			goto out;
904		}
905		dout("open_root_inode success, root dentry is %p\n", root);
906	} else {
907		root = ERR_PTR(err);
908	}
909out:
910	ceph_mdsc_put_request(req);
911	return root;
912}
913
914/*
915 * mount: join the ceph cluster, and open root directory.
916 */
917static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
918				      struct fs_context *fc)
919{
920	int err;
921	unsigned long started = jiffies;  /* note the start time */
922	struct dentry *root;
923
924	dout("mount start %p\n", fsc);
925	mutex_lock(&fsc->client->mount_mutex);
926
927	if (!fsc->sb->s_root) {
928		const char *path = fsc->mount_options->server_path ?
929				     fsc->mount_options->server_path + 1 : "";
930
931		err = __ceph_open_session(fsc->client, started);
932		if (err < 0)
933			goto out;
934
935		/* setup fscache */
936		if (fsc->mount_options->flags & CEPH_MOUNT_OPT_FSCACHE) {
937			err = ceph_fscache_register_fs(fsc, fc);
938			if (err < 0)
939				goto out;
940		}
941
942		dout("mount opening path '%s'\n", path);
943
944		ceph_fs_debugfs_init(fsc);
945
946		root = open_root_dentry(fsc, path, started);
947		if (IS_ERR(root)) {
948			err = PTR_ERR(root);
949			goto out;
950		}
951		fsc->sb->s_root = dget(root);
952	} else {
953		root = dget(fsc->sb->s_root);
954	}
955
956	fsc->mount_state = CEPH_MOUNT_MOUNTED;
957	dout("mount success\n");
958	mutex_unlock(&fsc->client->mount_mutex);
959	return root;
960
961out:
962	mutex_unlock(&fsc->client->mount_mutex);
963	return ERR_PTR(err);
964}
965
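/*
 * Initialize a freshly allocated superblock: wire up the super, dentry and
 * export operations, the timestamp granularity/limits, and point the fs
 * client at its superblock.
 */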
966static int ceph_set_super(struct super_block *s, struct fs_context *fc)
967{
968	struct ceph_fs_client *fsc = s->s_fs_info;
969	int ret;
970
971	dout("set_super %p\n", s);
972
973	s->s_maxbytes = MAX_LFS_FILESIZE;
974
975	s->s_xattr = ceph_xattr_handlers;
976	fsc->sb = s;
977	fsc->max_file_size = 1ULL << 40; /* temp value until we get mdsmap */
978
979	s->s_op = &ceph_super_ops;
980	s->s_d_op = &ceph_dentry_ops;
981	s->s_export_op = &ceph_export_ops;
982
983	s->s_time_gran = 1;
984	s->s_time_min = 0;
985	s->s_time_max = U32_MAX;
986
987	ret = set_anon_super_fc(s, fc);
988	if (ret != 0)
989		fsc->sb = NULL;
990	return ret;
991}
992
993/*
994 * share superblock if same fs AND options
995 */
996static int ceph_compare_super(struct super_block *sb, struct fs_context *fc)
997{
998	struct ceph_fs_client *new = fc->s_fs_info;
999	struct ceph_mount_options *fsopt = new->mount_options;
1000	struct ceph_options *opt = new->client->options;
1001	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
1002
1003	dout("ceph_compare_super %p\n", sb);
1004
1005	if (compare_mount_options(fsopt, opt, fsc)) {
1006		dout("monitor(s)/mount options don't match\n");
1007		return 0;
1008	}
1009	if ((opt->flags & CEPH_OPT_FSID) &&
1010	    ceph_fsid_compare(&opt->fsid, &fsc->client->fsid)) {
1011		dout("fsid doesn't match\n");
1012		return 0;
1013	}
1014	if (fc->sb_flags != (sb->s_flags & ~SB_BORN)) {
1015		dout("flags differ\n");
1016		return 0;
1017	}
1018
1019	if (fsc->blocklisted && !ceph_test_mount_opt(fsc, CLEANRECOVER)) {
1020		dout("client is blocklisted (and CLEANRECOVER is not set)\n");
1021		return 0;
1022	}
1023
1024	if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
1025		dout("client has been forcibly unmounted\n");
1026		return 0;
1027	}
1028
1029	return 1;
1030}
1031
1032/*
1033 * construct our own bdi so we can control readahead, etc.
1034 */
1035static atomic_long_t bdi_seq = ATOMIC_LONG_INIT(0);
1036
1037static int ceph_setup_bdi(struct super_block *sb, struct ceph_fs_client *fsc)
1038{
1039	int err;
1040
1041	err = super_setup_bdi_name(sb, "ceph-%ld",
1042				   atomic_long_inc_return(&bdi_seq));
1043	if (err)
1044		return err;
1045
1046	/* set ra_pages based on rasize mount option? */
1047	sb->s_bdi->ra_pages = fsc->mount_options->rasize >> PAGE_SHIFT;
1048
1049	/* set io_pages based on max osd read size */
1050	sb->s_bdi->io_pages = fsc->mount_options->rsize >> PAGE_SHIFT;
1051
1052	return 0;
1053}
1054
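/*
 * Main entry point for getting the superblock: create a ceph_fs_client from
 * the parsed options, initialize the MDS client, then either reuse a
 * compatible existing superblock via sget_fc() or set up a new one (with its
 * own bdi) and finally perform the mount proper in ceph_real_mount().
 */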
1055static int ceph_get_tree(struct fs_context *fc)
1056{
1057	struct ceph_parse_opts_ctx *pctx = fc->fs_private;
1058	struct super_block *sb;
1059	struct ceph_fs_client *fsc;
1060	struct dentry *res;
1061	int (*compare_super)(struct super_block *, struct fs_context *) =
1062		ceph_compare_super;
1063	int err;
1064
1065	dout("ceph_get_tree\n");
1066
1067	if (!fc->source)
1068		return invalfc(fc, "No source");
1069
1070	/* create client (which we may/may not use) */
1071	fsc = create_fs_client(pctx->opts, pctx->copts);
1072	pctx->opts = NULL;
1073	pctx->copts = NULL;
1074	if (IS_ERR(fsc)) {
1075		err = PTR_ERR(fsc);
1076		goto out_final;
1077	}
1078
1079	err = ceph_mdsc_init(fsc);
1080	if (err < 0)
1081		goto out;
1082
1083	if (ceph_test_opt(fsc->client, NOSHARE))
1084		compare_super = NULL;
1085
1086	fc->s_fs_info = fsc;
1087	sb = sget_fc(fc, compare_super, ceph_set_super);
1088	fc->s_fs_info = NULL;
1089	if (IS_ERR(sb)) {
1090		err = PTR_ERR(sb);
1091		goto out;
1092	}
1093
1094	if (ceph_sb_to_client(sb) != fsc) {
1095		destroy_fs_client(fsc);
1096		fsc = ceph_sb_to_client(sb);
1097		dout("get_sb got existing client %p\n", fsc);
1098	} else {
1099		dout("get_sb using new client %p\n", fsc);
1100		err = ceph_setup_bdi(sb, fsc);
1101		if (err < 0)
1102			goto out_splat;
1103	}
1104
1105	res = ceph_real_mount(fsc, fc);
1106	if (IS_ERR(res)) {
1107		err = PTR_ERR(res);
1108		goto out_splat;
1109	}
1110	dout("root %p inode %p ino %llx.%llx\n", res,
1111	     d_inode(res), ceph_vinop(d_inode(res)));
1112	fc->root = fsc->sb->s_root;
1113	return 0;
1114
1115out_splat:
1116	if (!ceph_mdsmap_is_cluster_available(fsc->mdsc->mdsmap)) {
1117		pr_info("No mds server is up or the cluster is laggy\n");
1118		err = -EHOSTUNREACH;
1119	}
1120
1121	ceph_mdsc_close_sessions(fsc->mdsc);
1122	deactivate_locked_super(sb);
1123	goto out_final;
1124
1125out:
1126	destroy_fs_client(fsc);
1127out_final:
1128	dout("ceph_get_tree fail %d\n", err);
1129	return err;
1130}
1131
1132static void ceph_free_fc(struct fs_context *fc)
1133{
1134	struct ceph_parse_opts_ctx *pctx = fc->fs_private;
1135
1136	if (pctx) {
1137		destroy_mount_options(pctx->opts);
1138		ceph_destroy_options(pctx->copts);
1139		kfree(pctx);
1140	}
1141}
1142
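/*
 * Handle remount ("mount -o remount").  Only the wsync/nowsync toggle is
 * applied to the live mount here; the remaining options keep their original
 * values.
 */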
1143static int ceph_reconfigure_fc(struct fs_context *fc)
1144{
1145	struct ceph_parse_opts_ctx *pctx = fc->fs_private;
1146	struct ceph_mount_options *fsopt = pctx->opts;
1147	struct ceph_fs_client *fsc = ceph_sb_to_client(fc->root->d_sb);
1148
1149	if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
1150		ceph_set_mount_opt(fsc, ASYNC_DIROPS);
1151	else
1152		ceph_clear_mount_opt(fsc, ASYNC_DIROPS);
1153
1154	sync_filesystem(fc->root->d_sb);
1155	return 0;
1156}
1157
1158static const struct fs_context_operations ceph_context_ops = {
1159	.free		= ceph_free_fc,
1160	.parse_param	= ceph_parse_mount_param,
1161	.get_tree	= ceph_get_tree,
1162	.reconfigure	= ceph_reconfigure_fc,
1163};
1164
1165/*
1166 * Set up the filesystem mount context.
1167 */
1168static int ceph_init_fs_context(struct fs_context *fc)
1169{
1170	struct ceph_parse_opts_ctx *pctx;
1171	struct ceph_mount_options *fsopt;
1172
1173	pctx = kzalloc(sizeof(*pctx), GFP_KERNEL);
1174	if (!pctx)
1175		return -ENOMEM;
1176
1177	pctx->copts = ceph_alloc_options();
1178	if (!pctx->copts)
1179		goto nomem;
1180
1181	pctx->opts = kzalloc(sizeof(*pctx->opts), GFP_KERNEL);
1182	if (!pctx->opts)
1183		goto nomem;
1184
1185	fsopt = pctx->opts;
1186	fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
1187
1188	fsopt->wsize = CEPH_MAX_WRITE_SIZE;
1189	fsopt->rsize = CEPH_MAX_READ_SIZE;
1190	fsopt->rasize = CEPH_RASIZE_DEFAULT;
1191	fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
1192	if (!fsopt->snapdir_name)
1193		goto nomem;
1194
1195	fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
1196	fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
1197	fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
1198	fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
1199	fsopt->congestion_kb = default_congestion_kb();
1200
1201#ifdef CONFIG_CEPH_FS_POSIX_ACL
1202	fc->sb_flags |= SB_POSIXACL;
1203#endif
1204
1205	fc->fs_private = pctx;
1206	fc->ops = &ceph_context_ops;
1207	return 0;
1208
1209nomem:
1210	destroy_mount_options(pctx->opts);
1211	ceph_destroy_options(pctx->copts);
1212	kfree(pctx);
1213	return -ENOMEM;
1214}
1215
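/*
 * Tear down the superblock: stop MDS background work and flush our
 * workqueues first, sync the filesystem, then let kill_anon_super() do the
 * generic VFS teardown before the fs client itself is destroyed.
 */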
1216static void ceph_kill_sb(struct super_block *s)
1217{
1218	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
1219
1220	dout("kill_sb %p\n", s);
1221
1222	ceph_mdsc_pre_umount(fsc->mdsc);
1223	flush_fs_workqueues(fsc);
1224
1225	/*
1226	 * Though kill_anon_super() will eventually trigger
1227	 * sync_filesystem() anyway, we still need to do it here
1228	 * and then bump the shutdown stage so that the work
1229	 * queue is stopped as early as possible.
1230	 */
1231	sync_filesystem(s);
1232
1233	fsc->mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
1234
1235	kill_anon_super(s);
1236
1237	fsc->client->extra_mon_dispatch = NULL;
1238	ceph_fs_debugfs_cleanup(fsc);
1239
1240	ceph_fscache_unregister_fs(fsc);
1241
1242	destroy_fs_client(fsc);
1243}
1244
1245static struct file_system_type ceph_fs_type = {
1246	.owner		= THIS_MODULE,
1247	.name		= "ceph",
1248	.init_fs_context = ceph_init_fs_context,
1249	.kill_sb	= ceph_kill_sb,
1250	.fs_flags	= FS_RENAME_DOES_D_MOVE,
1251};
1252MODULE_ALIAS_FS("ceph");
1253
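/*
 * Forcibly re-establish the cluster connection, e.g. for
 * "recover_session=clean" recovery after the client has been blocklisted:
 * abort what is in flight, pick a new client address, clear the abort error
 * and revalidate the root inode so the mount becomes usable again.
 */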
1254int ceph_force_reconnect(struct super_block *sb)
1255{
1256	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
1257	int err = 0;
1258
1259	ceph_umount_begin(sb);
1260
1261	/* Make sure all page caches get invalidated;
1262	 * see remove_session_caps_cb(). */
1263	flush_workqueue(fsc->inode_wq);
1264
1265	/* Reset the client address in case we were blocklisted; this also
1266	 * resets all mon/osd connections. */
1267	ceph_reset_client_addr(fsc->client);
1268
1269	ceph_osdc_clear_abort_err(&fsc->client->osdc);
1270
1271	fsc->blocklisted = false;
1272	fsc->mount_state = CEPH_MOUNT_MOUNTED;
1273
1274	if (sb->s_root) {
1275		err = __ceph_do_getattr(d_inode(sb->s_root), NULL,
1276					CEPH_STAT_CAP_INODE, true);
1277	}
1278	return err;
1279}
1280
1281static int __init init_ceph(void)
1282{
1283	int ret = init_caches();
1284	if (ret)
1285		goto out;
1286
1287	ceph_flock_init();
1288	ret = register_filesystem(&ceph_fs_type);
1289	if (ret)
1290		goto out_caches;
1291
1292	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
1293
1294	return 0;
1295
1296out_caches:
1297	destroy_caches();
1298out:
1299	return ret;
1300}
1301
1302static void __exit exit_ceph(void)
1303{
1304	dout("exit_ceph\n");
1305	unregister_filesystem(&ceph_fs_type);
1306	destroy_caches();
1307}
1308
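/*
 * Handler for the "disable_send_metrics" module parameter.  When the switch
 * is flipped back to sending metrics, wake the metric work of every mounted
 * fs client so reporting resumes promptly.
 */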
1309static int param_set_metrics(const char *val, const struct kernel_param *kp)
1310{
1311	struct ceph_fs_client *fsc;
1312	int ret;
1313
1314	ret = param_set_bool(val, kp);
1315	if (ret) {
1316		pr_err("Failed to parse sending metrics switch value '%s'\n",
1317		       val);
1318		return ret;
1319	} else if (!disable_send_metrics) {
1320		// wake up all the mds clients
1321		spin_lock(&ceph_fsc_lock);
1322		list_for_each_entry(fsc, &ceph_fsc_list, metric_wakeup) {
1323			metric_schedule_delayed(&fsc->mdsc->metric);
1324		}
1325		spin_unlock(&ceph_fsc_lock);
1326	}
1327
1328	return 0;
1329}
1330
1331static const struct kernel_param_ops param_ops_metrics = {
1332	.set = param_set_metrics,
1333	.get = param_get_bool,
1334};
1335
1336bool disable_send_metrics = false;
1337module_param_cb(disable_send_metrics, &param_ops_metrics, &disable_send_metrics, 0644);
1338MODULE_PARM_DESC(disable_send_metrics, "Enable sending perf metrics to ceph cluster (default: on)");
1339
1340module_init(init_ceph);
1341module_exit(exit_ceph);
1342
1343MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1344MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1345MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
1346MODULE_DESCRIPTION("Ceph filesystem for Linux");
1347MODULE_LICENSE("GPL");
1348