xref: /kernel/linux/linux-5.10/fs/ceph/mds_client.c (revision 8c2ecf20)
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/fs.h>
5#include <linux/wait.h>
6#include <linux/slab.h>
7#include <linux/gfp.h>
8#include <linux/sched.h>
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
11#include <linux/ratelimit.h>
12#include <linux/bits.h>
13#include <linux/ktime.h>
14
15#include "super.h"
16#include "mds_client.h"
17
18#include <linux/ceph/ceph_features.h>
19#include <linux/ceph/messenger.h>
20#include <linux/ceph/decode.h>
21#include <linux/ceph/pagelist.h>
22#include <linux/ceph/auth.h>
23#include <linux/ceph/debugfs.h>
24
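/*
 * Cap on the size of a single reconnect message.  If the reconnect state
 * grows beyond this and the MDS supports it (see allow_multi below), the
 * reconnect is split across multiple messages.
 */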
25#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26
27/*
28 * A cluster of MDS (metadata server) daemons is responsible for
29 * managing the file system namespace (the directory hierarchy and
30 * inodes) and for coordinating shared access to storage.  Metadata is
31 * partitioned hierarchically across a number of servers, and that
32 * partition varies over time as the cluster adjusts the distribution
33 * in order to balance load.
34 *
35 * The MDS client is primarily responsible for managing synchronous
36 * metadata requests for operations like open, unlink, and so forth.
37 * If there is an MDS failure, we find out about it when we (possibly
38 * request and) receive a new MDS map, and can resubmit affected
39 * requests.
40 *
41 * For the most part, though, we take advantage of a lossless
42 * communications channel to the MDS, and do not need to worry about
43 * timing out or resubmitting requests.
44 *
45 * We maintain a stateful "session" with each MDS we interact with.
46 * Within each session, we send periodic heartbeat messages to ensure
47 * any capabilities or leases we have been issued remain valid.  If
48 * the session times out and goes stale, our leases and capabilities
49 * are no longer valid.
50 */
51
52struct ceph_reconnect_state {
53	struct ceph_mds_session *session;
54	int nr_caps, nr_realms;
55	struct ceph_pagelist *pagelist;
56	unsigned msg_version;
57	bool allow_multi;
58};
59
60static void __wake_requests(struct ceph_mds_client *mdsc,
61			    struct list_head *head);
62static void ceph_cap_release_work(struct work_struct *work);
63static void ceph_cap_reclaim_work(struct work_struct *work);
64
65static const struct ceph_connection_operations mds_con_ops;
66
67
68/*
69 * mds reply parsing
70 */
71
72static int parse_reply_info_quota(void **p, void *end,
73				  struct ceph_mds_reply_info_in *info)
74{
75	u8 struct_v, struct_compat;
76	u32 struct_len;
77
78	ceph_decode_8_safe(p, end, struct_v, bad);
79	ceph_decode_8_safe(p, end, struct_compat, bad);
80	/* struct_v is expected to be >= 1. we only
81	 * understand encoding with struct_compat == 1. */
82	if (!struct_v || struct_compat != 1)
83		goto bad;
84	ceph_decode_32_safe(p, end, struct_len, bad);
85	ceph_decode_need(p, end, struct_len, bad);
86	end = *p + struct_len;
87	ceph_decode_64_safe(p, end, info->max_bytes, bad);
88	ceph_decode_64_safe(p, end, info->max_files, bad);
89	*p = end;
90	return 0;
91bad:
92	return -EIO;
93}
94
95/*
96 * parse individual inode info
97 */
98static int parse_reply_info_in(void **p, void *end,
99			       struct ceph_mds_reply_info_in *info,
100			       u64 features)
101{
102	int err = 0;
103	u8 struct_v = 0;
104
105	if (features == (u64)-1) {
106		u32 struct_len;
107		u8 struct_compat;
108		ceph_decode_8_safe(p, end, struct_v, bad);
109		ceph_decode_8_safe(p, end, struct_compat, bad);
110		/* struct_v is expected to be >= 1. we only understand
111		 * encoding with struct_compat == 1. */
112		if (!struct_v || struct_compat != 1)
113			goto bad;
114		ceph_decode_32_safe(p, end, struct_len, bad);
115		ceph_decode_need(p, end, struct_len, bad);
116		end = *p + struct_len;
117	}
118
119	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120	info->in = *p;
121	*p += sizeof(struct ceph_mds_reply_inode) +
122		sizeof(*info->in->fragtree.splits) *
123		le32_to_cpu(info->in->fragtree.nsplits);
124
125	ceph_decode_32_safe(p, end, info->symlink_len, bad);
126	ceph_decode_need(p, end, info->symlink_len, bad);
127	info->symlink = *p;
128	*p += info->symlink_len;
129
130	ceph_decode_copy_safe(p, end, &info->dir_layout,
131			      sizeof(info->dir_layout), bad);
132	ceph_decode_32_safe(p, end, info->xattr_len, bad);
133	ceph_decode_need(p, end, info->xattr_len, bad);
134	info->xattr_data = *p;
135	*p += info->xattr_len;
136
137	if (features == (u64)-1) {
138		/* inline data */
139		ceph_decode_64_safe(p, end, info->inline_version, bad);
140		ceph_decode_32_safe(p, end, info->inline_len, bad);
141		ceph_decode_need(p, end, info->inline_len, bad);
142		info->inline_data = *p;
143		*p += info->inline_len;
144		/* quota */
145		err = parse_reply_info_quota(p, end, info);
146		if (err < 0)
147			goto out_bad;
148		/* pool namespace */
149		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150		if (info->pool_ns_len > 0) {
151			ceph_decode_need(p, end, info->pool_ns_len, bad);
152			info->pool_ns_data = *p;
153			*p += info->pool_ns_len;
154		}
155
156		/* btime */
157		ceph_decode_need(p, end, sizeof(info->btime), bad);
158		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
160		/* change attribute */
161		ceph_decode_64_safe(p, end, info->change_attr, bad);
162
163		/* dir pin */
164		if (struct_v >= 2) {
165			ceph_decode_32_safe(p, end, info->dir_pin, bad);
166		} else {
167			info->dir_pin = -ENODATA;
168		}
169
170		/* snapshot birth time, remains zero for v<=2 */
171		if (struct_v >= 3) {
172			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173			ceph_decode_copy(p, &info->snap_btime,
174					 sizeof(info->snap_btime));
175		} else {
176			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177		}
178
179		*p = end;
180	} else {
181		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
182			ceph_decode_64_safe(p, end, info->inline_version, bad);
183			ceph_decode_32_safe(p, end, info->inline_len, bad);
184			ceph_decode_need(p, end, info->inline_len, bad);
185			info->inline_data = *p;
186			*p += info->inline_len;
187		} else
188			info->inline_version = CEPH_INLINE_NONE;
189
190		if (features & CEPH_FEATURE_MDS_QUOTA) {
191			err = parse_reply_info_quota(p, end, info);
192			if (err < 0)
193				goto out_bad;
194		} else {
195			info->max_bytes = 0;
196			info->max_files = 0;
197		}
198
199		info->pool_ns_len = 0;
200		info->pool_ns_data = NULL;
201		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
202			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
203			if (info->pool_ns_len > 0) {
204				ceph_decode_need(p, end, info->pool_ns_len, bad);
205				info->pool_ns_data = *p;
206				*p += info->pool_ns_len;
207			}
208		}
209
210		if (features & CEPH_FEATURE_FS_BTIME) {
211			ceph_decode_need(p, end, sizeof(info->btime), bad);
212			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
213			ceph_decode_64_safe(p, end, info->change_attr, bad);
214		}
215
216		info->dir_pin = -ENODATA;
217		/* info->snap_btime remains zero */
218	}
219	return 0;
220bad:
221	err = -EIO;
222out_bad:
223	return err;
224}
225
226static int parse_reply_info_dir(void **p, void *end,
227				struct ceph_mds_reply_dirfrag **dirfrag,
228				u64 features)
229{
230	if (features == (u64)-1) {
231		u8 struct_v, struct_compat;
232		u32 struct_len;
233		ceph_decode_8_safe(p, end, struct_v, bad);
234		ceph_decode_8_safe(p, end, struct_compat, bad);
235		/* struct_v is expected to be >= 1. we only understand
236		 * encoding whose struct_compat == 1. */
237		if (!struct_v || struct_compat != 1)
238			goto bad;
239		ceph_decode_32_safe(p, end, struct_len, bad);
240		ceph_decode_need(p, end, struct_len, bad);
241		end = *p + struct_len;
242	}
243
244	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
245	*dirfrag = *p;
246	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
247	if (unlikely(*p > end))
248		goto bad;
249	if (features == (u64)-1)
250		*p = end;
251	return 0;
252bad:
253	return -EIO;
254}
255
256static int parse_reply_info_lease(void **p, void *end,
257				  struct ceph_mds_reply_lease **lease,
258				  u64 features)
259{
260	if (features == (u64)-1) {
261		u8 struct_v, struct_compat;
262		u32 struct_len;
263		ceph_decode_8_safe(p, end, struct_v, bad);
264		ceph_decode_8_safe(p, end, struct_compat, bad);
265		/* struct_v is expected to be >= 1. we only understand
266		 * encoding whose struct_compat == 1. */
267		if (!struct_v || struct_compat != 1)
268			goto bad;
269		ceph_decode_32_safe(p, end, struct_len, bad);
270		ceph_decode_need(p, end, struct_len, bad);
271		end = *p + struct_len;
272	}
273
274	ceph_decode_need(p, end, sizeof(**lease), bad);
275	*lease = *p;
276	*p += sizeof(**lease);
277	if (features == (u64)-1)
278		*p = end;
279	return 0;
280bad:
281	return -EIO;
282}
283
284/*
285 * parse a normal reply, which may contain a (dir+)dentry and/or a
286 * target inode.
287 */
288static int parse_reply_info_trace(void **p, void *end,
289				  struct ceph_mds_reply_info_parsed *info,
290				  u64 features)
291{
292	int err;
293
294	if (info->head->is_dentry) {
295		err = parse_reply_info_in(p, end, &info->diri, features);
296		if (err < 0)
297			goto out_bad;
298
299		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
300		if (err < 0)
301			goto out_bad;
302
303		ceph_decode_32_safe(p, end, info->dname_len, bad);
304		ceph_decode_need(p, end, info->dname_len, bad);
305		info->dname = *p;
306		*p += info->dname_len;
307
308		err = parse_reply_info_lease(p, end, &info->dlease, features);
309		if (err < 0)
310			goto out_bad;
311	}
312
313	if (info->head->is_target) {
314		err = parse_reply_info_in(p, end, &info->targeti, features);
315		if (err < 0)
316			goto out_bad;
317	}
318
319	if (unlikely(*p != end))
320		goto bad;
321	return 0;
322
323bad:
324	err = -EIO;
325out_bad:
326	pr_err("problem parsing mds trace %d\n", err);
327	return err;
328}
329
330/*
331 * parse readdir results
332 */
333static int parse_reply_info_readdir(void **p, void *end,
334				struct ceph_mds_reply_info_parsed *info,
335				u64 features)
336{
337	u32 num, i = 0;
338	int err;
339
340	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
341	if (err < 0)
342		goto out_bad;
343
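	/* u32 entry count followed by a u16 flags word */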
344	ceph_decode_need(p, end, sizeof(num) + 2, bad);
345	num = ceph_decode_32(p);
346	{
347		u16 flags = ceph_decode_16(p);
348		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
349		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
350		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
351		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
352	}
353	if (num == 0)
354		goto done;
355
356	BUG_ON(!info->dir_entries);
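	/* make sure 'num' entries fit in the preallocated dir_entries buffer */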
357	if ((unsigned long)(info->dir_entries + num) >
358	    (unsigned long)info->dir_entries + info->dir_buf_size) {
359		pr_err("dir contents are larger than expected\n");
360		WARN_ON(1);
361		goto bad;
362	}
363
364	info->dir_nr = num;
365	while (num) {
366		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
367		/* dentry */
368		ceph_decode_32_safe(p, end, rde->name_len, bad);
369		ceph_decode_need(p, end, rde->name_len, bad);
370		rde->name = *p;
371		*p += rde->name_len;
372		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
373
374		/* dentry lease */
375		err = parse_reply_info_lease(p, end, &rde->lease, features);
376		if (err)
377			goto out_bad;
378		/* inode */
379		err = parse_reply_info_in(p, end, &rde->inode, features);
380		if (err < 0)
381			goto out_bad;
382		/* ceph_readdir_prepopulate() will update it */
383		rde->offset = 0;
384		i++;
385		num--;
386	}
387
388done:
389	/* Skip over any unrecognized fields */
390	*p = end;
391	return 0;
392
393bad:
394	err = -EIO;
395out_bad:
396	pr_err("problem parsing dir contents %d\n", err);
397	return err;
398}
399
400/*
401 * parse fcntl F_GETLK results
402 */
403static int parse_reply_info_filelock(void **p, void *end,
404				     struct ceph_mds_reply_info_parsed *info,
405				     u64 features)
406{
407	if (*p + sizeof(*info->filelock_reply) > end)
408		goto bad;
409
410	info->filelock_reply = *p;
411
412	/* Skip over any unrecognized fields */
413	*p = end;
414	return 0;
415bad:
416	return -EIO;
417}
418
419
420#if BITS_PER_LONG == 64
421
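/*
 * Marker stored in s_delegated_inos for an inode number the MDS has
 * delegated to us and that is still available (e.g. for an async create).
 * xa_mk_value() encodes the marker directly in the entry, so no extra
 * allocation is needed.
 */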
422#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
423
424static int ceph_parse_deleg_inos(void **p, void *end,
425				 struct ceph_mds_session *s)
426{
427	u32 sets;
428
429	ceph_decode_32_safe(p, end, sets, bad);
430	dout("got %u sets of delegated inodes\n", sets);
431	while (sets--) {
432		u64 start, len, ino;
433
434		ceph_decode_64_safe(p, end, start, bad);
435		ceph_decode_64_safe(p, end, len, bad);
436
437		/* Don't accept a delegation of system inodes */
438		if (start < CEPH_INO_SYSTEM_BASE) {
439			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
440					start, len);
441			continue;
442		}
443		while (len--) {
444			int err = xa_insert(&s->s_delegated_inos, ino = start++,
445					    DELEGATED_INO_AVAILABLE,
446					    GFP_KERNEL);
447			if (!err) {
448				dout("added delegated inode 0x%llx\n",
449				     start - 1);
450			} else if (err == -EBUSY) {
451				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
452					start - 1);
453			} else {
454				return err;
455			}
456		}
457	}
458	return 0;
459bad:
460	return -EIO;
461}
462
463u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
464{
465	unsigned long ino;
466	void *val;
467
468	xa_for_each(&s->s_delegated_inos, ino, val) {
469		val = xa_erase(&s->s_delegated_inos, ino);
470		if (val == DELEGATED_INO_AVAILABLE)
471			return ino;
472	}
473	return 0;
474}
475
476int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
477{
478	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
479			 GFP_KERNEL);
480}
481#else /* BITS_PER_LONG == 64 */
482/*
483 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
484 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
485 * and bottom words?
486 */
487static int ceph_parse_deleg_inos(void **p, void *end,
488				 struct ceph_mds_session *s)
489{
490	u32 sets;
491
492	ceph_decode_32_safe(p, end, sets, bad);
493	if (sets)
494		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
495	return 0;
496bad:
497	return -EIO;
498}
499
500u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
501{
502	return 0;
503}
504
505int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
506{
507	return 0;
508}
509#endif /* BITS_PER_LONG == 64 */
510
511/*
512 * parse create results
513 */
514static int parse_reply_info_create(void **p, void *end,
515				  struct ceph_mds_reply_info_parsed *info,
516				  u64 features, struct ceph_mds_session *s)
517{
518	int ret;
519
520	if (features == (u64)-1 ||
521	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
522		if (*p == end) {
523			/* Malformed reply? */
524			info->has_create_ino = false;
525		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
526			u8 struct_v, struct_compat;
527			u32 len;
528
529			info->has_create_ino = true;
530			ceph_decode_8_safe(p, end, struct_v, bad);
531			ceph_decode_8_safe(p, end, struct_compat, bad);
532			ceph_decode_32_safe(p, end, len, bad);
533			ceph_decode_64_safe(p, end, info->ino, bad);
534			ret = ceph_parse_deleg_inos(p, end, s);
535			if (ret)
536				return ret;
537		} else {
538			/* legacy */
539			ceph_decode_64_safe(p, end, info->ino, bad);
540			info->has_create_ino = true;
541		}
542	} else {
543		if (*p != end)
544			goto bad;
545	}
546
547	/* Skip over any unrecognized fields */
548	*p = end;
549	return 0;
550bad:
551	return -EIO;
552}
553
554/*
555 * parse extra results
556 */
557static int parse_reply_info_extra(void **p, void *end,
558				  struct ceph_mds_reply_info_parsed *info,
559				  u64 features, struct ceph_mds_session *s)
560{
561	u32 op = le32_to_cpu(info->head->op);
562
563	if (op == CEPH_MDS_OP_GETFILELOCK)
564		return parse_reply_info_filelock(p, end, info, features);
565	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
566		return parse_reply_info_readdir(p, end, info, features);
567	else if (op == CEPH_MDS_OP_CREATE)
568		return parse_reply_info_create(p, end, info, features, s);
569	else
570		return -EIO;
571}
572
573/*
574 * parse entire mds reply
575 */
576static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
577			    struct ceph_mds_reply_info_parsed *info,
578			    u64 features)
579{
580	void *p, *end;
581	u32 len;
582	int err;
583
584	info->head = msg->front.iov_base;
585	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
586	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
587
588	/* trace */
589	ceph_decode_32_safe(&p, end, len, bad);
590	if (len > 0) {
591		ceph_decode_need(&p, end, len, bad);
592		err = parse_reply_info_trace(&p, p+len, info, features);
593		if (err < 0)
594			goto out_bad;
595	}
596
597	/* extra */
598	ceph_decode_32_safe(&p, end, len, bad);
599	if (len > 0) {
600		ceph_decode_need(&p, end, len, bad);
601		err = parse_reply_info_extra(&p, p+len, info, features, s);
602		if (err < 0)
603			goto out_bad;
604	}
605
606	/* snap blob */
607	ceph_decode_32_safe(&p, end, len, bad);
608	info->snapblob_len = len;
609	info->snapblob = p;
610	p += len;
611
612	if (p != end)
613		goto bad;
614	return 0;
615
616bad:
617	err = -EIO;
618out_bad:
619	pr_err("mds parse_reply err %d\n", err);
620	return err;
621}
622
623static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
624{
625	if (!info->dir_entries)
626		return;
627	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
628}
629
630
631/*
632 * sessions
633 */
634const char *ceph_session_state_name(int s)
635{
636	switch (s) {
637	case CEPH_MDS_SESSION_NEW: return "new";
638	case CEPH_MDS_SESSION_OPENING: return "opening";
639	case CEPH_MDS_SESSION_OPEN: return "open";
640	case CEPH_MDS_SESSION_HUNG: return "hung";
641	case CEPH_MDS_SESSION_CLOSING: return "closing";
642	case CEPH_MDS_SESSION_CLOSED: return "closed";
643	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
644	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
645	case CEPH_MDS_SESSION_REJECTED: return "rejected";
646	default: return "???";
647	}
648}
649
650struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
651{
652	if (refcount_inc_not_zero(&s->s_ref)) {
653		dout("mdsc get_session %p %d -> %d\n", s,
654		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
655		return s;
656	} else {
657		dout("mdsc get_session %p 0 -- FAIL\n", s);
658		return NULL;
659	}
660}
661
662void ceph_put_mds_session(struct ceph_mds_session *s)
663{
664	if (IS_ERR_OR_NULL(s))
665		return;
666
667	dout("mdsc put_session %p %d -> %d\n", s,
668	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
669	if (refcount_dec_and_test(&s->s_ref)) {
670		if (s->s_auth.authorizer)
671			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
672		WARN_ON(mutex_is_locked(&s->s_mutex));
673		xa_destroy(&s->s_delegated_inos);
674		kfree(s);
675	}
676}
677
678/*
679 * called under mdsc->mutex
680 */
681struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
682						   int mds)
683{
684	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
685		return NULL;
686	return ceph_get_mds_session(mdsc->sessions[mds]);
687}
688
689static bool __have_session(struct ceph_mds_client *mdsc, int mds)
690{
691	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
692		return false;
693	else
694		return true;
695}
696
697static int __verify_registered_session(struct ceph_mds_client *mdsc,
698				       struct ceph_mds_session *s)
699{
700	if (s->s_mds >= mdsc->max_sessions ||
701	    mdsc->sessions[s->s_mds] != s)
702		return -ENOENT;
703	return 0;
704}
705
706/*
707 * create+register a new session for given mds.
708 * called under mdsc->mutex.
709 */
710static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
711						 int mds)
712{
713	struct ceph_mds_session *s;
714
715	if (mds >= mdsc->mdsmap->possible_max_rank)
716		return ERR_PTR(-EINVAL);
717
718	s = kzalloc(sizeof(*s), GFP_NOFS);
719	if (!s)
720		return ERR_PTR(-ENOMEM);
721
722	if (mds >= mdsc->max_sessions) {
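		/* grow the session array to the next power of two >= mds + 1 */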
723		int newmax = 1 << get_count_order(mds + 1);
724		struct ceph_mds_session **sa;
725
726		dout("%s: realloc to %d\n", __func__, newmax);
727		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
728		if (!sa)
729			goto fail_realloc;
730		if (mdsc->sessions) {
731			memcpy(sa, mdsc->sessions,
732			       mdsc->max_sessions * sizeof(void *));
733			kfree(mdsc->sessions);
734		}
735		mdsc->sessions = sa;
736		mdsc->max_sessions = newmax;
737	}
738
739	dout("%s: mds%d\n", __func__, mds);
740	s->s_mdsc = mdsc;
741	s->s_mds = mds;
742	s->s_state = CEPH_MDS_SESSION_NEW;
743	s->s_ttl = 0;
744	s->s_seq = 0;
745	mutex_init(&s->s_mutex);
746
747	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
748
749	spin_lock_init(&s->s_gen_ttl_lock);
750	s->s_cap_gen = 1;
751	s->s_cap_ttl = jiffies - 1;
752
753	spin_lock_init(&s->s_cap_lock);
754	s->s_renew_requested = 0;
755	s->s_renew_seq = 0;
756	INIT_LIST_HEAD(&s->s_caps);
757	s->s_nr_caps = 0;
758	refcount_set(&s->s_ref, 1);
759	INIT_LIST_HEAD(&s->s_waiting);
760	INIT_LIST_HEAD(&s->s_unsafe);
761	xa_init(&s->s_delegated_inos);
762	s->s_num_cap_releases = 0;
763	s->s_cap_reconnect = 0;
764	s->s_cap_iterator = NULL;
765	INIT_LIST_HEAD(&s->s_cap_releases);
766	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
767
768	INIT_LIST_HEAD(&s->s_cap_dirty);
769	INIT_LIST_HEAD(&s->s_cap_flushing);
770
771	mdsc->sessions[mds] = s;
772	atomic_inc(&mdsc->num_sessions);
773	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
774
775	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
776		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
777
778	return s;
779
780fail_realloc:
781	kfree(s);
782	return ERR_PTR(-ENOMEM);
783}
784
785/*
786 * called under mdsc->mutex
787 */
788static void __unregister_session(struct ceph_mds_client *mdsc,
789			       struct ceph_mds_session *s)
790{
791	dout("__unregister_session mds%d %p\n", s->s_mds, s);
792	BUG_ON(mdsc->sessions[s->s_mds] != s);
793	mdsc->sessions[s->s_mds] = NULL;
794	ceph_con_close(&s->s_con);
795	ceph_put_mds_session(s);
796	atomic_dec(&mdsc->num_sessions);
797}
798
799/*
800 * drop session refs in request.
801 *
802 * should be last request ref, or hold mdsc->mutex
803 */
804static void put_request_session(struct ceph_mds_request *req)
805{
806	if (req->r_session) {
807		ceph_put_mds_session(req->r_session);
808		req->r_session = NULL;
809	}
810}
811
812void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
813				void (*cb)(struct ceph_mds_session *),
814				bool check_state)
815{
816	int mds;
817
818	mutex_lock(&mdsc->mutex);
819	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
820		struct ceph_mds_session *s;
821
822		s = __ceph_lookup_mds_session(mdsc, mds);
823		if (!s)
824			continue;
825
826		if (check_state && !check_session_state(s)) {
827			ceph_put_mds_session(s);
828			continue;
829		}
830
831		mutex_unlock(&mdsc->mutex);
832		cb(s);
833		ceph_put_mds_session(s);
834		mutex_lock(&mdsc->mutex);
835	}
836	mutex_unlock(&mdsc->mutex);
837}
838
839void ceph_mdsc_release_request(struct kref *kref)
840{
841	struct ceph_mds_request *req = container_of(kref,
842						    struct ceph_mds_request,
843						    r_kref);
844	ceph_mdsc_release_dir_caps_no_check(req);
845	destroy_reply_info(&req->r_reply_info);
846	if (req->r_request)
847		ceph_msg_put(req->r_request);
848	if (req->r_reply)
849		ceph_msg_put(req->r_reply);
850	if (req->r_inode) {
851		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
852		/* avoid calling iput_final() in mds dispatch threads */
853		ceph_async_iput(req->r_inode);
854	}
855	if (req->r_parent) {
856		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
857		ceph_async_iput(req->r_parent);
858	}
859	ceph_async_iput(req->r_target_inode);
860	if (req->r_dentry)
861		dput(req->r_dentry);
862	if (req->r_old_dentry)
863		dput(req->r_old_dentry);
864	if (req->r_old_dentry_dir) {
865		/*
866		 * track (and drop pins for) r_old_dentry_dir
867		 * separately, since r_old_dentry's d_parent may have
868		 * changed between the dir mutex being dropped and
869		 * this request being freed.
870		 */
871		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
872				  CEPH_CAP_PIN);
873		ceph_async_iput(req->r_old_dentry_dir);
874	}
875	kfree(req->r_path1);
876	kfree(req->r_path2);
877	if (req->r_pagelist)
878		ceph_pagelist_release(req->r_pagelist);
879	put_request_session(req);
880	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
881	WARN_ON_ONCE(!list_empty(&req->r_wait));
882	kmem_cache_free(ceph_mds_request_cachep, req);
883}
884
885DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
886
887/*
888 * lookup request, bump ref if found.
889 *
890 * called under mdsc->mutex.
891 */
892static struct ceph_mds_request *
893lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
894{
895	struct ceph_mds_request *req;
896
897	req = lookup_request(&mdsc->request_tree, tid);
898	if (req)
899		ceph_mdsc_get_request(req);
900
901	return req;
902}
903
904/*
905 * Register an in-flight request, and assign a tid.  Link to the directory
906 * we are modifying (if any).
907 *
908 * Called under mdsc->mutex.
909 */
910static void __register_request(struct ceph_mds_client *mdsc,
911			       struct ceph_mds_request *req,
912			       struct inode *dir)
913{
914	int ret = 0;
915
916	req->r_tid = ++mdsc->last_tid;
917	if (req->r_num_caps) {
918		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
919					req->r_num_caps);
920		if (ret < 0) {
921			pr_err("__register_request %p "
922			       "failed to reserve caps: %d\n", req, ret);
923			/* set req->r_err to fail early from __do_request */
924			req->r_err = ret;
925			return;
926		}
927	}
928	dout("__register_request %p tid %lld\n", req, req->r_tid);
929	ceph_mdsc_get_request(req);
930	insert_request(&mdsc->request_tree, req);
931
932	req->r_uid = current_fsuid();
933	req->r_gid = current_fsgid();
934
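	/* setfilelock requests can block for a long time waiting on the lock,
	 * so don't let them hold back oldest_tid */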
935	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
936		mdsc->oldest_tid = req->r_tid;
937
938	if (dir) {
939		struct ceph_inode_info *ci = ceph_inode(dir);
940
941		ihold(dir);
942		req->r_unsafe_dir = dir;
943		spin_lock(&ci->i_unsafe_lock);
944		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
945		spin_unlock(&ci->i_unsafe_lock);
946	}
947}
948
949static void __unregister_request(struct ceph_mds_client *mdsc,
950				 struct ceph_mds_request *req)
951{
952	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
953
954	/* Never leave an unregistered request on an unsafe list! */
955	list_del_init(&req->r_unsafe_item);
956
957	if (req->r_tid == mdsc->oldest_tid) {
958		struct rb_node *p = rb_next(&req->r_node);
959		mdsc->oldest_tid = 0;
960		while (p) {
961			struct ceph_mds_request *next_req =
962				rb_entry(p, struct ceph_mds_request, r_node);
963			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
964				mdsc->oldest_tid = next_req->r_tid;
965				break;
966			}
967			p = rb_next(p);
968		}
969	}
970
971	erase_request(&mdsc->request_tree, req);
972
973	if (req->r_unsafe_dir) {
974		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
975		spin_lock(&ci->i_unsafe_lock);
976		list_del_init(&req->r_unsafe_dir_item);
977		spin_unlock(&ci->i_unsafe_lock);
978	}
979	if (req->r_target_inode &&
980	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
981		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
982		spin_lock(&ci->i_unsafe_lock);
983		list_del_init(&req->r_unsafe_target_item);
984		spin_unlock(&ci->i_unsafe_lock);
985	}
986
987	if (req->r_unsafe_dir) {
988		/* avoid calling iput_final() in mds dispatch threads */
989		ceph_async_iput(req->r_unsafe_dir);
990		req->r_unsafe_dir = NULL;
991	}
992
993	complete_all(&req->r_safe_completion);
994
995	ceph_mdsc_put_request(req);
996}
997
998/*
999 * Walk back up the dentry tree until we hit a dentry representing a
1000 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1001 * when calling this) to ensure that the objects won't disappear while we're
1002 * working with them. Once we hit a candidate dentry, we attempt to take a
1003 * reference to it, and return that as the result.
1004 */
1005static struct inode *get_nonsnap_parent(struct dentry *dentry)
1006{
1007	struct inode *inode = NULL;
1008
1009	while (dentry && !IS_ROOT(dentry)) {
1010		inode = d_inode_rcu(dentry);
1011		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1012			break;
1013		dentry = dentry->d_parent;
1014	}
1015	if (inode)
1016		inode = igrab(inode);
1017	return inode;
1018}
1019
1020/*
1021 * Choose mds to send request to next.  If there is a hint set in the
1022 * request (e.g., due to a prior forward hint from the mds), use that.
1023 * Otherwise, consult frag tree and/or caps to identify the
1024 * appropriate mds.  If all else fails, choose randomly.
1025 *
1026 * Called under mdsc->mutex.
1027 */
1028static int __choose_mds(struct ceph_mds_client *mdsc,
1029			struct ceph_mds_request *req,
1030			bool *random)
1031{
1032	struct inode *inode;
1033	struct ceph_inode_info *ci;
1034	struct ceph_cap *cap;
1035	int mode = req->r_direct_mode;
1036	int mds = -1;
1037	u32 hash = req->r_direct_hash;
1038	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1039
1040	if (random)
1041		*random = false;
1042
1043	/*
1044	 * is there a specific mds we should try?  ignore hint if we have
1045	 * no session and the mds is not up (active or recovering).
1046	 */
1047	if (req->r_resend_mds >= 0 &&
1048	    (__have_session(mdsc, req->r_resend_mds) ||
1049	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1050		dout("%s using resend_mds mds%d\n", __func__,
1051		     req->r_resend_mds);
1052		return req->r_resend_mds;
1053	}
1054
1055	if (mode == USE_RANDOM_MDS)
1056		goto random;
1057
1058	inode = NULL;
1059	if (req->r_inode) {
1060		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1061			inode = req->r_inode;
1062			ihold(inode);
1063		} else {
1064			/* req->r_dentry is non-null for LSSNAP request */
1065			rcu_read_lock();
1066			inode = get_nonsnap_parent(req->r_dentry);
1067			rcu_read_unlock();
1068			dout("%s using snapdir's parent %p\n", __func__, inode);
1069		}
1070	} else if (req->r_dentry) {
1071		/* ignore race with rename; old or new d_parent is okay */
1072		struct dentry *parent;
1073		struct inode *dir;
1074
1075		rcu_read_lock();
1076		parent = READ_ONCE(req->r_dentry->d_parent);
1077		dir = req->r_parent ? : d_inode_rcu(parent);
1078
1079		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1080			/*  not this fs or parent went negative */
1081			inode = d_inode(req->r_dentry);
1082			if (inode)
1083				ihold(inode);
1084		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1085			/* direct snapped/virtual snapdir requests
1086			 * based on parent dir inode */
1087			inode = get_nonsnap_parent(parent);
1088			dout("%s using nonsnap parent %p\n", __func__, inode);
1089		} else {
1090			/* dentry target */
1091			inode = d_inode(req->r_dentry);
1092			if (!inode || mode == USE_AUTH_MDS) {
1093				/* dir + name */
1094				inode = igrab(dir);
1095				hash = ceph_dentry_hash(dir, req->r_dentry);
1096				is_hash = true;
1097			} else {
1098				ihold(inode);
1099			}
1100		}
1101		rcu_read_unlock();
1102	}
1103
1104	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1105	     hash, mode);
1106	if (!inode)
1107		goto random;
1108	ci = ceph_inode(inode);
1109
1110	if (is_hash && S_ISDIR(inode->i_mode)) {
1111		struct ceph_inode_frag frag;
1112		int found;
1113
1114		ceph_choose_frag(ci, hash, &frag, &found);
1115		if (found) {
1116			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1117				u8 r;
1118
1119				/* choose a random replica */
1120				get_random_bytes(&r, 1);
1121				r %= frag.ndist;
1122				mds = frag.dist[r];
1123				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1124				     __func__, inode, ceph_vinop(inode),
1125				     frag.frag, mds, (int)r, frag.ndist);
1126				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1127				    CEPH_MDS_STATE_ACTIVE &&
1128				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1129					goto out;
1130			}
1131
1132			/* since this file/dir wasn't known to be
1133			 * replicated, we want to look for the
1134			 * authoritative mds. */
1135			if (frag.mds >= 0) {
1136				/* choose auth mds */
1137				mds = frag.mds;
1138				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1139				     __func__, inode, ceph_vinop(inode),
1140				     frag.frag, mds);
1141				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1142				    CEPH_MDS_STATE_ACTIVE) {
1143					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1144								  mds))
1145						goto out;
1146				}
1147			}
1148			mode = USE_AUTH_MDS;
1149		}
1150	}
1151
1152	spin_lock(&ci->i_ceph_lock);
1153	cap = NULL;
1154	if (mode == USE_AUTH_MDS)
1155		cap = ci->i_auth_cap;
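	/* otherwise fall back to any cap we happen to hold */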
1156	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1157		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1158	if (!cap) {
1159		spin_unlock(&ci->i_ceph_lock);
1160		ceph_async_iput(inode);
1161		goto random;
1162	}
1163	mds = cap->session->s_mds;
1164	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1165	     inode, ceph_vinop(inode), mds,
1166	     cap == ci->i_auth_cap ? "auth " : "", cap);
1167	spin_unlock(&ci->i_ceph_lock);
1168out:
1169	/* avoid calling iput_final() while holding mdsc->mutex or
1170	 * in mds dispatch threads */
1171	ceph_async_iput(inode);
1172	return mds;
1173
1174random:
1175	if (random)
1176		*random = true;
1177
1178	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1179	dout("%s chose random mds%d\n", __func__, mds);
1180	return mds;
1181}
1182
1183
1184/*
1185 * session messages
1186 */
1187struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1188{
1189	struct ceph_msg *msg;
1190	struct ceph_mds_session_head *h;
1191
1192	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1193			   false);
1194	if (!msg) {
1195		pr_err("ENOMEM creating session %s msg\n",
1196		       ceph_session_op_name(op));
1197		return NULL;
1198	}
1199	h = msg->front.iov_base;
1200	h->op = cpu_to_le32(op);
1201	h->seq = cpu_to_le64(seq);
1202
1203	return msg;
1204}
1205
1206static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
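/* bytes needed for the feature bitmap, rounded up to whole 64-bit words */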
1207#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1208static int encode_supported_features(void **p, void *end)
1209{
1210	static const size_t count = ARRAY_SIZE(feature_bits);
1211
1212	if (count > 0) {
1213		size_t i;
1214		size_t size = FEATURE_BYTES(count);
1215		unsigned long bit;
1216
1217		if (WARN_ON_ONCE(*p + 4 + size > end))
1218			return -ERANGE;
1219
1220		ceph_encode_32(p, size);
1221		memset(*p, 0, size);
1222		for (i = 0; i < count; i++) {
1223			bit = feature_bits[i];
1224			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1225		}
1226		*p += size;
1227	} else {
1228		if (WARN_ON_ONCE(*p + 4 > end))
1229			return -ERANGE;
1230
1231		ceph_encode_32(p, 0);
1232	}
1233
1234	return 0;
1235}
1236
1237static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
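/* bytes needed for the metric bitmap, rounded up to whole 64-bit words */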
1238#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1239static int encode_metric_spec(void **p, void *end)
1240{
1241	static const size_t count = ARRAY_SIZE(metric_bits);
1242
1243	/* header */
1244	if (WARN_ON_ONCE(*p + 2 > end))
1245		return -ERANGE;
1246
1247	ceph_encode_8(p, 1); /* version */
1248	ceph_encode_8(p, 1); /* compat */
1249
1250	if (count > 0) {
1251		size_t i;
1252		size_t size = METRIC_BYTES(count);
1253
1254		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1255			return -ERANGE;
1256
1257		/* metric spec info length */
1258		ceph_encode_32(p, 4 + size);
1259
1260		/* metric spec */
1261		ceph_encode_32(p, size);
1262		memset(*p, 0, size);
1263		for (i = 0; i < count; i++)
1264			((unsigned char *)(*p))[metric_bits[i] / 8] |= BIT(metric_bits[i] % 8);
1265		*p += size;
1266	} else {
1267		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1268			return -ERANGE;
1269
1270		/* metric spec info length */
1271		ceph_encode_32(p, 4);
1272		/* metric spec */
1273		ceph_encode_32(p, 0);
1274	}
1275
1276	return 0;
1277}
1278
1279/*
1280 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1281 * to include additional client metadata fields.
1282 */
1283static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1284{
1285	struct ceph_msg *msg;
1286	struct ceph_mds_session_head *h;
1287	int i = -1;
1288	int extra_bytes = 0;
1289	int metadata_key_count = 0;
1290	struct ceph_options *opt = mdsc->fsc->client->options;
1291	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1292	size_t size, count;
1293	void *p, *end;
1294	int ret;
1295
1296	const char* metadata[][2] = {
1297		{"hostname", mdsc->nodename},
1298		{"kernel_version", init_utsname()->release},
1299		{"entity_id", opt->name ? : ""},
1300		{"root", fsopt->server_path ? : "/"},
1301		{NULL, NULL}
1302	};
1303
1304	/* Calculate serialized length of metadata */
1305	extra_bytes = 4;  /* map length */
1306	for (i = 0; metadata[i][0]; ++i) {
1307		extra_bytes += 8 + strlen(metadata[i][0]) +
1308			strlen(metadata[i][1]);
1309		metadata_key_count++;
1310	}
1311
1312	/* supported feature */
1313	size = 0;
1314	count = ARRAY_SIZE(feature_bits);
1315	if (count > 0)
1316		size = FEATURE_BYTES(count);
1317	extra_bytes += 4 + size;
1318
1319	/* metric spec */
1320	size = 0;
1321	count = ARRAY_SIZE(metric_bits);
1322	if (count > 0)
1323		size = METRIC_BYTES(count);
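	/* version(1) + compat(1) + spec info len(4) + bitmap len(4) + bitmap */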
1324	extra_bytes += 2 + 4 + 4 + size;
1325
1326	/* Allocate the message */
1327	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1328			   GFP_NOFS, false);
1329	if (!msg) {
1330		pr_err("ENOMEM creating session open msg\n");
1331		return ERR_PTR(-ENOMEM);
1332	}
1333	p = msg->front.iov_base;
1334	end = p + msg->front.iov_len;
1335
1336	h = p;
1337	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1338	h->seq = cpu_to_le64(seq);
1339
1340	/*
1341	 * Serialize client metadata into waiting buffer space, using
1342	 * the format that userspace expects for map<string, string>
1343	 *
1344	 * ClientSession messages with metadata are v4
1345	 */
1346	msg->hdr.version = cpu_to_le16(4);
1347	msg->hdr.compat_version = cpu_to_le16(1);
1348
1349	/* The write pointer, following the session_head structure */
1350	p += sizeof(*h);
1351
1352	/* Number of entries in the map */
1353	ceph_encode_32(&p, metadata_key_count);
1354
1355	/* Two length-prefixed strings for each entry in the map */
1356	for (i = 0; metadata[i][0]; ++i) {
1357		size_t const key_len = strlen(metadata[i][0]);
1358		size_t const val_len = strlen(metadata[i][1]);
1359
1360		ceph_encode_32(&p, key_len);
1361		memcpy(p, metadata[i][0], key_len);
1362		p += key_len;
1363		ceph_encode_32(&p, val_len);
1364		memcpy(p, metadata[i][1], val_len);
1365		p += val_len;
1366	}
1367
1368	ret = encode_supported_features(&p, end);
1369	if (ret) {
1370		pr_err("encode_supported_features failed!\n");
1371		ceph_msg_put(msg);
1372		return ERR_PTR(ret);
1373	}
1374
1375	ret = encode_metric_spec(&p, end);
1376	if (ret) {
1377		pr_err("encode_metric_spec failed!\n");
1378		ceph_msg_put(msg);
1379		return ERR_PTR(ret);
1380	}
1381
1382	msg->front.iov_len = p - msg->front.iov_base;
1383	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1384
1385	return msg;
1386}
1387
1388/*
1389 * send session open request.
1390 *
1391 * called under mdsc->mutex
1392 */
1393static int __open_session(struct ceph_mds_client *mdsc,
1394			  struct ceph_mds_session *session)
1395{
1396	struct ceph_msg *msg;
1397	int mstate;
1398	int mds = session->s_mds;
1399
1400	/* wait for mds to go active? */
1401	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1402	dout("open_session to mds%d (%s)\n", mds,
1403	     ceph_mds_state_name(mstate));
1404	session->s_state = CEPH_MDS_SESSION_OPENING;
1405	session->s_renew_requested = jiffies;
1406
1407	/* send connect message */
1408	msg = create_session_open_msg(mdsc, session->s_seq);
1409	if (IS_ERR(msg))
1410		return PTR_ERR(msg);
1411	ceph_con_send(&session->s_con, msg);
1412	return 0;
1413}
1414
1415/*
1416 * open sessions for any export targets for the given mds
1417 *
1418 * called under mdsc->mutex
1419 */
1420static struct ceph_mds_session *
1421__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1422{
1423	struct ceph_mds_session *session;
1424	int ret;
1425
1426	session = __ceph_lookup_mds_session(mdsc, target);
1427	if (!session) {
1428		session = register_session(mdsc, target);
1429		if (IS_ERR(session))
1430			return session;
1431	}
1432	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1433	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1434		ret = __open_session(mdsc, session);
1435		if (ret)
1436			return ERR_PTR(ret);
1437	}
1438
1439	return session;
1440}
1441
1442struct ceph_mds_session *
1443ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1444{
1445	struct ceph_mds_session *session;
1446
1447	dout("open_export_target_session to mds%d\n", target);
1448
1449	mutex_lock(&mdsc->mutex);
1450	session = __open_export_target_session(mdsc, target);
1451	mutex_unlock(&mdsc->mutex);
1452
1453	return session;
1454}
1455
1456static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1457					  struct ceph_mds_session *session)
1458{
1459	struct ceph_mds_info *mi;
1460	struct ceph_mds_session *ts;
1461	int i, mds = session->s_mds;
1462
1463	if (mds >= mdsc->mdsmap->possible_max_rank)
1464		return;
1465
1466	mi = &mdsc->mdsmap->m_info[mds];
1467	dout("open_export_target_sessions for mds%d (%d targets)\n",
1468	     session->s_mds, mi->num_export_targets);
1469
1470	for (i = 0; i < mi->num_export_targets; i++) {
1471		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1472		ceph_put_mds_session(ts);
1473	}
1474}
1475
1476void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1477					   struct ceph_mds_session *session)
1478{
1479	mutex_lock(&mdsc->mutex);
1480	__open_export_target_sessions(mdsc, session);
1481	mutex_unlock(&mdsc->mutex);
1482}
1483
1484/*
1485 * session caps
1486 */
1487
1488static void detach_cap_releases(struct ceph_mds_session *session,
1489				struct list_head *target)
1490{
1491	lockdep_assert_held(&session->s_cap_lock);
1492
1493	list_splice_init(&session->s_cap_releases, target);
1494	session->s_num_cap_releases = 0;
1495	dout("detach_cap_releases mds%d\n", session->s_mds);
1496}
1497
1498static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1499				 struct list_head *dispose)
1500{
1501	while (!list_empty(dispose)) {
1502		struct ceph_cap *cap;
1503		/* take each cap off the dispose list and drop it */
1504		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1505		list_del(&cap->session_caps);
1506		ceph_put_cap(mdsc, cap);
1507	}
1508}
1509
1510static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1511				     struct ceph_mds_session *session)
1512{
1513	struct ceph_mds_request *req;
1514	struct rb_node *p;
1515
1516	dout("cleanup_session_requests mds%d\n", session->s_mds);
1517	mutex_lock(&mdsc->mutex);
1518	while (!list_empty(&session->s_unsafe)) {
1519		req = list_first_entry(&session->s_unsafe,
1520				       struct ceph_mds_request, r_unsafe_item);
1521		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1522				    req->r_tid);
1523		if (req->r_target_inode)
1524			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1525		if (req->r_unsafe_dir)
1526			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1527		__unregister_request(mdsc, req);
1528	}
1529	/* zero r_attempts, so kick_requests() will re-send requests */
1530	p = rb_first(&mdsc->request_tree);
1531	while (p) {
1532		req = rb_entry(p, struct ceph_mds_request, r_node);
1533		p = rb_next(p);
1534		if (req->r_session &&
1535		    req->r_session->s_mds == session->s_mds)
1536			req->r_attempts = 0;
1537	}
1538	mutex_unlock(&mdsc->mutex);
1539}
1540
1541/*
1542 * Helper to safely iterate over all caps associated with a session, with
1543 * special care taken to handle a racing __ceph_remove_cap().
1544 *
1545 * Caller must hold session s_mutex.
1546 */
1547int ceph_iterate_session_caps(struct ceph_mds_session *session,
1548			      int (*cb)(struct inode *, struct ceph_cap *,
1549					void *), void *arg)
1550{
1551	struct list_head *p;
1552	struct ceph_cap *cap;
1553	struct inode *inode, *last_inode = NULL;
1554	struct ceph_cap *old_cap = NULL;
1555	int ret;
1556
1557	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1558	spin_lock(&session->s_cap_lock);
1559	p = session->s_caps.next;
1560	while (p != &session->s_caps) {
1561		cap = list_entry(p, struct ceph_cap, session_caps);
1562		inode = igrab(&cap->ci->vfs_inode);
1563		if (!inode) {
1564			p = p->next;
1565			continue;
1566		}
1567		session->s_cap_iterator = cap;
1568		spin_unlock(&session->s_cap_lock);
1569
1570		if (last_inode) {
1571			/* avoid calling iput_final() while holding
1572			 * s_mutex or in mds dispatch threads */
1573			ceph_async_iput(last_inode);
1574			last_inode = NULL;
1575		}
1576		if (old_cap) {
1577			ceph_put_cap(session->s_mdsc, old_cap);
1578			old_cap = NULL;
1579		}
1580
1581		ret = cb(inode, cap, arg);
1582		last_inode = inode;
1583
1584		spin_lock(&session->s_cap_lock);
1585		p = p->next;
1586		if (!cap->ci) {
1587			dout("iterate_session_caps  finishing cap %p removal\n",
1588			     cap);
1589			BUG_ON(cap->session != session);
1590			cap->session = NULL;
1591			list_del_init(&cap->session_caps);
1592			session->s_nr_caps--;
1593			atomic64_dec(&session->s_mdsc->metric.total_caps);
1594			if (cap->queue_release)
1595				__ceph_queue_cap_release(session, cap);
1596			else
1597				old_cap = cap;  /* put_cap it w/o locks held */
1598		}
1599		if (ret < 0)
1600			goto out;
1601	}
1602	ret = 0;
1603out:
1604	session->s_cap_iterator = NULL;
1605	spin_unlock(&session->s_cap_lock);
1606
1607	ceph_async_iput(last_inode);
1608	if (old_cap)
1609		ceph_put_cap(session->s_mdsc, old_cap);
1610
1611	return ret;
1612}
1613
1614static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
1615{
1616	struct ceph_inode_info *ci = ceph_inode(inode);
1617	struct ceph_cap_snap *capsnap;
1618	int capsnap_release = 0;
1619
1620	lockdep_assert_held(&ci->i_ceph_lock);
1621
1622	dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
1623
1624	while (!list_empty(&ci->i_cap_snaps)) {
1625		capsnap = list_first_entry(&ci->i_cap_snaps,
1626					   struct ceph_cap_snap, ci_item);
1627		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
1628		ceph_put_snap_context(capsnap->context);
1629		ceph_put_cap_snap(capsnap);
1630		capsnap_release++;
1631	}
1632	wake_up_all(&ci->i_cap_wq);
1633	wake_up_all(&mdsc->cap_flushing_wq);
1634	return capsnap_release;
1635}
1636
1637static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1638				  void *arg)
1639{
1640	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1641	struct ceph_mds_client *mdsc = fsc->mdsc;
1642	struct ceph_inode_info *ci = ceph_inode(inode);
1643	LIST_HEAD(to_remove);
1644	bool dirty_dropped = false;
1645	bool invalidate = false;
1646	int capsnap_release = 0;
1647
1648	dout("removing cap %p, ci is %p, inode is %p\n",
1649	     cap, ci, &ci->vfs_inode);
1650	spin_lock(&ci->i_ceph_lock);
1651	__ceph_remove_cap(cap, false);
1652	if (!ci->i_auth_cap) {
1653		struct ceph_cap_flush *cf;
1654
1655		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1656			if (inode->i_data.nrpages > 0)
1657				invalidate = true;
1658			if (ci->i_wrbuffer_ref > 0)
1659				mapping_set_error(&inode->i_data, -EIO);
1660		}
1661
1662		while (!list_empty(&ci->i_cap_flush_list)) {
1663			cf = list_first_entry(&ci->i_cap_flush_list,
1664					      struct ceph_cap_flush, i_list);
1665			list_move(&cf->i_list, &to_remove);
1666		}
1667
1668		spin_lock(&mdsc->cap_dirty_lock);
1669
1670		list_for_each_entry(cf, &to_remove, i_list)
1671			list_del_init(&cf->g_list);
1672
1673		if (!list_empty(&ci->i_dirty_item)) {
1674			pr_warn_ratelimited(
1675				" dropping dirty %s state for %p %lld\n",
1676				ceph_cap_string(ci->i_dirty_caps),
1677				inode, ceph_ino(inode));
1678			ci->i_dirty_caps = 0;
1679			list_del_init(&ci->i_dirty_item);
1680			dirty_dropped = true;
1681		}
1682		if (!list_empty(&ci->i_flushing_item)) {
1683			pr_warn_ratelimited(
1684				" dropping dirty+flushing %s state for %p %lld\n",
1685				ceph_cap_string(ci->i_flushing_caps),
1686				inode, ceph_ino(inode));
1687			ci->i_flushing_caps = 0;
1688			list_del_init(&ci->i_flushing_item);
1689			mdsc->num_cap_flushing--;
1690			dirty_dropped = true;
1691		}
1692		spin_unlock(&mdsc->cap_dirty_lock);
1693
1694		if (dirty_dropped) {
1695			mapping_set_error(inode->i_mapping, -EIO);
1696
1697			if (ci->i_wrbuffer_ref_head == 0 &&
1698			    ci->i_wr_ref == 0 &&
1699			    ci->i_dirty_caps == 0 &&
1700			    ci->i_flushing_caps == 0) {
1701				ceph_put_snap_context(ci->i_head_snapc);
1702				ci->i_head_snapc = NULL;
1703			}
1704		}
1705
1706		if (atomic_read(&ci->i_filelock_ref) > 0) {
1707			/* make further file lock syscall return -EIO */
1708			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1709			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1710					    inode, ceph_ino(inode));
1711		}
1712
1713		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1714			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1715			ci->i_prealloc_cap_flush = NULL;
1716		}
1717
1718		if (!list_empty(&ci->i_cap_snaps))
1719			capsnap_release = remove_capsnaps(mdsc, inode);
1720	}
1721	spin_unlock(&ci->i_ceph_lock);
1722	while (!list_empty(&to_remove)) {
1723		struct ceph_cap_flush *cf;
1724		cf = list_first_entry(&to_remove,
1725				      struct ceph_cap_flush, i_list);
1726		list_del_init(&cf->i_list);
1727		if (!cf->is_capsnap)
1728			ceph_free_cap_flush(cf);
1729	}
1730
1731	wake_up_all(&ci->i_cap_wq);
1732	if (invalidate)
1733		ceph_queue_invalidate(inode);
1734	if (dirty_dropped)
1735		iput(inode);
1736	while (capsnap_release--)
1737		iput(inode);
1738	return 0;
1739}
1740
1741/*
1742 * caller must hold session s_mutex
1743 */
1744static void remove_session_caps(struct ceph_mds_session *session)
1745{
1746	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1747	struct super_block *sb = fsc->sb;
1748	LIST_HEAD(dispose);
1749
1750	dout("remove_session_caps on %p\n", session);
1751	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1752
1753	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1754
1755	spin_lock(&session->s_cap_lock);
1756	if (session->s_nr_caps > 0) {
1757		struct inode *inode;
1758		struct ceph_cap *cap, *prev = NULL;
1759		struct ceph_vino vino;
1760		/*
1761		 * iterate_session_caps() skips inodes that are being
1762		 * deleted, so we need to wait until deletions are complete.
1763		 * __wait_on_freeing_inode() is designed for the job,
1764		 * but it is not exported, so use lookup inode function
1765		 * to access it.
1766		 */
1767		while (!list_empty(&session->s_caps)) {
1768			cap = list_entry(session->s_caps.next,
1769					 struct ceph_cap, session_caps);
1770			if (cap == prev)
1771				break;
1772			prev = cap;
1773			vino = cap->ci->i_vino;
1774			spin_unlock(&session->s_cap_lock);
1775
1776			inode = ceph_find_inode(sb, vino);
1777			 /* avoid calling iput_final() while holding s_mutex */
1778			ceph_async_iput(inode);
1779
1780			spin_lock(&session->s_cap_lock);
1781		}
1782	}
1783
1784	// detach any remaining cap releases; s_cap_lock is unlocked below
1785	detach_cap_releases(session, &dispose);
1786
1787	BUG_ON(session->s_nr_caps > 0);
1788	BUG_ON(!list_empty(&session->s_cap_flushing));
1789	spin_unlock(&session->s_cap_lock);
1790	dispose_cap_releases(session->s_mdsc, &dispose);
1791}
1792
1793enum {
1794	RECONNECT,
1795	RENEWCAPS,
1796	FORCE_RO,
1797};
1798
1799/*
1800 * wake up any threads waiting on this session's caps.  if the cap is
1801 * old (didn't get renewed on the client reconnect), remove it now.
1802 *
1803 * caller must hold s_mutex.
1804 */
1805static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1806			      void *arg)
1807{
1808	struct ceph_inode_info *ci = ceph_inode(inode);
1809	unsigned long ev = (unsigned long)arg;
1810
1811	if (ev == RECONNECT) {
1812		spin_lock(&ci->i_ceph_lock);
1813		ci->i_wanted_max_size = 0;
1814		ci->i_requested_max_size = 0;
1815		spin_unlock(&ci->i_ceph_lock);
1816	} else if (ev == RENEWCAPS) {
1817		if (cap->cap_gen < cap->session->s_cap_gen) {
1818			/* mds did not re-issue stale cap */
1819			spin_lock(&ci->i_ceph_lock);
1820			cap->issued = cap->implemented = CEPH_CAP_PIN;
1821			spin_unlock(&ci->i_ceph_lock);
1822		}
1823	} else if (ev == FORCE_RO) {
1824	}
1825	wake_up_all(&ci->i_cap_wq);
1826	return 0;
1827}
1828
1829static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1830{
1831	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1832	ceph_iterate_session_caps(session, wake_up_session_cb,
1833				  (void *)(unsigned long)ev);
1834}
1835
1836/*
1837 * Send periodic message to MDS renewing all currently held caps.  The
1838 * ack will reset the expiration for all caps from this session.
1839 *
1840 * caller holds s_mutex
1841 */
1842static int send_renew_caps(struct ceph_mds_client *mdsc,
1843			   struct ceph_mds_session *session)
1844{
1845	struct ceph_msg *msg;
1846	int state;
1847
1848	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1849	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1850		pr_info("mds%d caps stale\n", session->s_mds);
1851	session->s_renew_requested = jiffies;
1852
1853	/* do not try to renew caps until a recovering mds has reconnected
1854	 * with its clients. */
1855	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1856	if (state < CEPH_MDS_STATE_RECONNECT) {
1857		dout("send_renew_caps ignoring mds%d (%s)\n",
1858		     session->s_mds, ceph_mds_state_name(state));
1859		return 0;
1860	}
1861
1862	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1863		ceph_mds_state_name(state));
1864	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1865				      ++session->s_renew_seq);
1866	if (!msg)
1867		return -ENOMEM;
1868	ceph_con_send(&session->s_con, msg);
1869	return 0;
1870}
1871
1872static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1873			     struct ceph_mds_session *session, u64 seq)
1874{
1875	struct ceph_msg *msg;
1876
1877	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1878	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1879	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1880	if (!msg)
1881		return -ENOMEM;
1882	ceph_con_send(&session->s_con, msg);
1883	return 0;
1884}
1885
1886
1887/*
1888 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1889 *
1890 * Called under session->s_mutex
1891 */
1892static void renewed_caps(struct ceph_mds_client *mdsc,
1893			 struct ceph_mds_session *session, int is_renew)
1894{
1895	int was_stale;
1896	int wake = 0;
1897
1898	spin_lock(&session->s_cap_lock);
1899	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1900
1901	session->s_cap_ttl = session->s_renew_requested +
1902		mdsc->mdsmap->m_session_timeout*HZ;
1903
1904	if (was_stale) {
1905		if (time_before(jiffies, session->s_cap_ttl)) {
1906			pr_info("mds%d caps renewed\n", session->s_mds);
1907			wake = 1;
1908		} else {
1909			pr_info("mds%d caps still stale\n", session->s_mds);
1910		}
1911	}
1912	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1913	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1914	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1915	spin_unlock(&session->s_cap_lock);
1916
1917	if (wake)
1918		wake_up_session_caps(session, RENEWCAPS);
1919}
1920
1921/*
1922 * send a session close request
1923 */
1924static int request_close_session(struct ceph_mds_session *session)
1925{
1926	struct ceph_msg *msg;
1927
1928	dout("request_close_session mds%d state %s seq %lld\n",
1929	     session->s_mds, ceph_session_state_name(session->s_state),
1930	     session->s_seq);
1931	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1932				      session->s_seq);
1933	if (!msg)
1934		return -ENOMEM;
1935	ceph_con_send(&session->s_con, msg);
1936	return 1;
1937}
1938
1939/*
1940 * Called with s_mutex held.
1941 */
1942static int __close_session(struct ceph_mds_client *mdsc,
1943			 struct ceph_mds_session *session)
1944{
1945	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1946		return 0;
1947	session->s_state = CEPH_MDS_SESSION_CLOSING;
1948	return request_close_session(session);
1949}
1950
1951static bool drop_negative_children(struct dentry *dentry)
1952{
1953	struct dentry *child;
1954	bool all_negative = true;
1955
1956	if (!d_is_dir(dentry))
1957		goto out;
1958
1959	spin_lock(&dentry->d_lock);
1960	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1961		if (d_really_is_positive(child)) {
1962			all_negative = false;
1963			break;
1964		}
1965	}
1966	spin_unlock(&dentry->d_lock);
1967
1968	if (all_negative)
1969		shrink_dcache_parent(dentry);
1970out:
1971	return all_negative;
1972}
1973
1974/*
1975 * Trim old(er) caps.
1976 *
1977 * Because we can't cache an inode without one or more caps, we do
1978 * this indirectly: if a cap is unused, we prune its aliases, at which
1979 * point the inode will hopefully get dropped too.
1980 *
1981 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1982 * memory pressure from the MDS, though, so it needn't be perfect.
1983 */
1984static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1985{
1986	int *remaining = arg;
1987	struct ceph_inode_info *ci = ceph_inode(inode);
1988	int used, wanted, oissued, mine;
1989
1990	if (*remaining <= 0)
1991		return -1;
1992
1993	spin_lock(&ci->i_ceph_lock);
1994	mine = cap->issued | cap->implemented;
1995	used = __ceph_caps_used(ci);
1996	wanted = __ceph_caps_file_wanted(ci);
1997	oissued = __ceph_caps_issued_other(ci, cap);
1998
1999	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2000	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2001	     ceph_cap_string(used), ceph_cap_string(wanted));
2002	if (cap == ci->i_auth_cap) {
2003		if (ci->i_dirty_caps || ci->i_flushing_caps ||
2004		    !list_empty(&ci->i_cap_snaps))
2005			goto out;
2006		if ((used | wanted) & CEPH_CAP_ANY_WR)
2007			goto out;
2008		/* Note: it's possible that i_filelock_ref becomes non-zero
2009		 * after dropping auth caps. That is harmless: the reply to
2010		 * the file-lock MDS request will re-add the auth caps. */
2011		if (atomic_read(&ci->i_filelock_ref) > 0)
2012			goto out;
2013	}
2014	/* The inode has cached pages, but it's no longer used.
2015	 * We can safely drop it. */
2016	if (S_ISREG(inode->i_mode) &&
2017	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2018	    !(oissued & CEPH_CAP_FILE_CACHE)) {
2019		used = 0;
2020		oissued = 0;
2021	}
2022	if ((used | wanted) & ~oissued & mine)
2023		goto out;   /* we need these caps */
2024
2025	if (oissued) {
2026		/* we aren't the only cap.. just remove us */
2027		__ceph_remove_cap(cap, true);
2028		(*remaining)--;
2029	} else {
2030		struct dentry *dentry;
2031		/* try dropping referring dentries */
2032		spin_unlock(&ci->i_ceph_lock);
2033		dentry = d_find_any_alias(inode);
2034		if (dentry && drop_negative_children(dentry)) {
2035			int count;
2036			dput(dentry);
2037			d_prune_aliases(inode);
2038			count = atomic_read(&inode->i_count);
2039			if (count == 1)
2040				(*remaining)--;
2041			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2042			     inode, cap, count);
2043		} else {
2044			dput(dentry);
2045		}
2046		return 0;
2047	}
2048
2049out:
2050	spin_unlock(&ci->i_ceph_lock);
2051	return 0;
2052}
2053
2054/*
2055 * Trim session cap count down to some max number.
2056 */
2057int ceph_trim_caps(struct ceph_mds_client *mdsc,
2058		   struct ceph_mds_session *session,
2059		   int max_caps)
2060{
2061	int trim_caps = session->s_nr_caps - max_caps;
2062
2063	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2064	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2065	if (trim_caps > 0) {
2066		int remaining = trim_caps;
2067
2068		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2069		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2070		     session->s_mds, session->s_nr_caps, max_caps,
2071			trim_caps - remaining);
2072	}
2073
2074	ceph_flush_cap_releases(mdsc, session);
2075	return 0;
2076}
2077
2078static int check_caps_flush(struct ceph_mds_client *mdsc,
2079			    u64 want_flush_tid)
2080{
2081	int ret = 1;
2082
2083	spin_lock(&mdsc->cap_dirty_lock);
2084	if (!list_empty(&mdsc->cap_flush_list)) {
2085		struct ceph_cap_flush *cf =
2086			list_first_entry(&mdsc->cap_flush_list,
2087					 struct ceph_cap_flush, g_list);
2088		if (cf->tid <= want_flush_tid) {
2089			dout("check_caps_flush still flushing tid "
2090			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2091			ret = 0;
2092		}
2093	}
2094	spin_unlock(&mdsc->cap_dirty_lock);
2095	return ret;
2096}
2097
2098/*
2099 * flush all dirty inode data to disk.
2100 *
2101 * returns true if we've flushed through want_flush_tid
2102 */
2103static void wait_caps_flush(struct ceph_mds_client *mdsc,
2104			    u64 want_flush_tid)
2105{
2106	dout("check_caps_flush want %llu\n", want_flush_tid);
2107
2108	wait_event(mdsc->cap_flushing_wq,
2109		   check_caps_flush(mdsc, want_flush_tid));
2110
2111	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2112}
2113
2114/*
2115 * called under s_mutex
2116 */
2117static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2118				   struct ceph_mds_session *session)
2119{
2120	struct ceph_msg *msg = NULL;
2121	struct ceph_mds_cap_release *head;
2122	struct ceph_mds_cap_item *item;
2123	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2124	struct ceph_cap *cap;
2125	LIST_HEAD(tmp_list);
2126	int num_cap_releases;
2127	__le32	barrier, *cap_barrier;
2128
2129	down_read(&osdc->lock);
2130	barrier = cpu_to_le32(osdc->epoch_barrier);
2131	up_read(&osdc->lock);
2132
2133	spin_lock(&session->s_cap_lock);
2134again:
2135	list_splice_init(&session->s_cap_releases, &tmp_list);
2136	num_cap_releases = session->s_num_cap_releases;
2137	session->s_num_cap_releases = 0;
2138	spin_unlock(&session->s_cap_lock);
2139
2140	while (!list_empty(&tmp_list)) {
2141		if (!msg) {
2142			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2143					PAGE_SIZE, GFP_NOFS, false);
2144			if (!msg)
2145				goto out_err;
2146			head = msg->front.iov_base;
2147			head->num = cpu_to_le32(0);
2148			msg->front.iov_len = sizeof(*head);
2149
2150			msg->hdr.version = cpu_to_le16(2);
2151			msg->hdr.compat_version = cpu_to_le16(1);
2152		}
2153
2154		cap = list_first_entry(&tmp_list, struct ceph_cap,
2155					session_caps);
2156		list_del(&cap->session_caps);
2157		num_cap_releases--;
2158
2159		head = msg->front.iov_base;
2160		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2161				   &head->num);
2162		item = msg->front.iov_base + msg->front.iov_len;
2163		item->ino = cpu_to_le64(cap->cap_ino);
2164		item->cap_id = cpu_to_le64(cap->cap_id);
2165		item->migrate_seq = cpu_to_le32(cap->mseq);
2166		item->seq = cpu_to_le32(cap->issue_seq);
2167		msg->front.iov_len += sizeof(*item);
2168
2169		ceph_put_cap(mdsc, cap);
2170
2171		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2172			// Append cap_barrier field
2173			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2174			*cap_barrier = barrier;
2175			msg->front.iov_len += sizeof(*cap_barrier);
2176
2177			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2178			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2179			ceph_con_send(&session->s_con, msg);
2180			msg = NULL;
2181		}
2182	}
2183
2184	BUG_ON(num_cap_releases != 0);
2185
2186	spin_lock(&session->s_cap_lock);
2187	if (!list_empty(&session->s_cap_releases))
2188		goto again;
2189	spin_unlock(&session->s_cap_lock);
2190
2191	if (msg) {
2192		// Append cap_barrier field
2193		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2194		*cap_barrier = barrier;
2195		msg->front.iov_len += sizeof(*cap_barrier);
2196
2197		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2198		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2199		ceph_con_send(&session->s_con, msg);
2200	}
2201	return;
2202out_err:
2203	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2204		session->s_mds);
2205	spin_lock(&session->s_cap_lock);
2206	list_splice(&tmp_list, &session->s_cap_releases);
2207	session->s_num_cap_releases += num_cap_releases;
2208	spin_unlock(&session->s_cap_lock);
2209}
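
/*
 * Layout of each CAPRELEASE message built above (descriptive summary):
 *
 *	struct ceph_mds_cap_release  head         head->num = item count
 *	struct ceph_mds_cap_item    item[num]     ino, cap_id, migrate_seq, seq
 *	__le32                       cap_barrier   osdc epoch barrier
 *
 * A message is sent as soon as it holds CEPH_CAPS_PER_RELEASE items; the
 * final partial batch is sent once the release list has been drained.
 */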
2210
2211static void ceph_cap_release_work(struct work_struct *work)
2212{
2213	struct ceph_mds_session *session =
2214		container_of(work, struct ceph_mds_session, s_cap_release_work);
2215
2216	mutex_lock(&session->s_mutex);
2217	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2218	    session->s_state == CEPH_MDS_SESSION_HUNG)
2219		ceph_send_cap_releases(session->s_mdsc, session);
2220	mutex_unlock(&session->s_mutex);
2221	ceph_put_mds_session(session);
2222}
2223
2224void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2225		             struct ceph_mds_session *session)
2226{
2227	if (mdsc->stopping)
2228		return;
2229
2230	ceph_get_mds_session(session);
2231	if (queue_work(mdsc->fsc->cap_wq,
2232		       &session->s_cap_release_work)) {
2233		dout("cap release work queued\n");
2234	} else {
2235		ceph_put_mds_session(session);
2236		dout("failed to queue cap release work\n");
2237	}
2238}
2239
2240/*
2241 * caller holds session->s_cap_lock
2242 */
2243void __ceph_queue_cap_release(struct ceph_mds_session *session,
2244			      struct ceph_cap *cap)
2245{
2246	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2247	session->s_num_cap_releases++;
2248
2249	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2250		ceph_flush_cap_releases(session->s_mdsc, session);
2251}
2252
2253static void ceph_cap_reclaim_work(struct work_struct *work)
2254{
2255	struct ceph_mds_client *mdsc =
2256		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2257	int ret = ceph_trim_dentries(mdsc);
2258	if (ret == -EAGAIN)
2259		ceph_queue_cap_reclaim_work(mdsc);
2260}
2261
2262void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2263{
2264	if (mdsc->stopping)
2265		return;
2266
2267	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2268		dout("caps reclaim work queued\n");
2269	} else {
2270		dout("failed to queue caps reclaim work\n");
2271	}
2272}
2273
2274void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2275{
2276	int val;
2277	if (!nr)
2278		return;
2279	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2280	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2281		atomic_set(&mdsc->cap_reclaim_pending, 0);
2282		ceph_queue_cap_reclaim_work(mdsc);
2283	}
2284}
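
/*
 * The (val % CEPH_CAPS_PER_RELEASE) < nr test above fires whenever the
 * running counter passes a multiple of CEPH_CAPS_PER_RELEASE, so the
 * reclaim work is queued roughly once per batch instead of on every call.
 * Worked example (a batch size of 500 is assumed purely for illustration):
 *
 *	before  nr   val = before + nr   val % 500   queue work?
 *	   490   5         495              495       no  (495 >= 5)
 *	   498  10         508                8       yes (8 < 10)
 */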
2285
2286/*
2287 * requests
2288 */
2289
2290int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2291				    struct inode *dir)
2292{
2293	struct ceph_inode_info *ci = ceph_inode(dir);
2294	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2295	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2296	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2297	unsigned int num_entries;
2298	int order;
2299
2300	spin_lock(&ci->i_ceph_lock);
2301	num_entries = ci->i_files + ci->i_subdirs;
2302	spin_unlock(&ci->i_ceph_lock);
2303	num_entries = max(num_entries, 1U);
2304	num_entries = min(num_entries, opt->max_readdir);
2305
2306	order = get_order(size * num_entries);
2307	while (order >= 0) {
2308		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2309							     __GFP_NOWARN,
2310							     order);
2311		if (rinfo->dir_entries)
2312			break;
2313		order--;
2314	}
2315	if (!rinfo->dir_entries)
2316		return -ENOMEM;
2317
2318	num_entries = (PAGE_SIZE << order) / size;
2319	num_entries = min(num_entries, opt->max_readdir);
2320
2321	rinfo->dir_buf_size = PAGE_SIZE << order;
2322	req->r_num_caps = num_entries + 1;
2323	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2324	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2325	return 0;
2326}
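
/*
 * Minimal userspace sketch (hypothetical; malloc() stands in for
 * __get_free_pages() and 4096 for PAGE_SIZE) of the fallback strategy
 * above: start with the smallest power-of-two number of pages that holds
 * the estimate, halve on allocation failure, then recompute how many
 * entries actually fit in the buffer we got.
 *
 *	size_t page = 4096, want = num_entries * entry_size;
 *	int order = 0;
 *	void *buf = NULL;
 *
 *	while ((page << order) < want)
 *		order++;
 *	while (order >= 0 && !(buf = malloc(page << order)))
 *		order--;
 *	if (buf)
 *		num_entries = (page << order) / entry_size;
 */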
2327
2328/*
2329 * Create an mds request.
2330 */
2331struct ceph_mds_request *
2332ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2333{
2334	struct ceph_mds_request *req;
2335
2336	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2337	if (!req)
2338		return ERR_PTR(-ENOMEM);
2339
2340	mutex_init(&req->r_fill_mutex);
2341	req->r_mdsc = mdsc;
2342	req->r_started = jiffies;
2343	req->r_start_latency = ktime_get();
2344	req->r_resend_mds = -1;
2345	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2346	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2347	req->r_fmode = -1;
2348	kref_init(&req->r_kref);
2349	RB_CLEAR_NODE(&req->r_node);
2350	INIT_LIST_HEAD(&req->r_wait);
2351	init_completion(&req->r_completion);
2352	init_completion(&req->r_safe_completion);
2353	INIT_LIST_HEAD(&req->r_unsafe_item);
2354
2355	ktime_get_coarse_real_ts64(&req->r_stamp);
2356
2357	req->r_op = op;
2358	req->r_direct_mode = mode;
2359	return req;
2360}
2361
2362/*
2363 * Return the oldest (lowest-tid) request in the request tree, or NULL if none.
2364 *
2365 * called under mdsc->mutex.
2366 */
2367static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2368{
2369	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2370		return NULL;
2371	return rb_entry(rb_first(&mdsc->request_tree),
2372			struct ceph_mds_request, r_node);
2373}
2374
2375static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2376{
2377	return mdsc->oldest_tid;
2378}
2379
2380/*
2381 * Build a dentry's path.  Allocate on heap; caller must free it with
2382 * ceph_mdsc_free_path().  Based on build_path_from_dentry in fs/cifs/dir.c.
2383 *
2384 * If @stop_on_nosnap, generate path relative to the first non-snapped
2385 * inode.
2386 *
2387 * Encode hidden .snap dirs as a double /, i.e.
2388 *   foo/.snap/bar -> foo//bar
2389 */
2390char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2391			   int stop_on_nosnap)
2392{
2393	struct dentry *temp;
2394	char *path;
2395	int pos;
2396	unsigned seq;
2397	u64 base;
2398
2399	if (!dentry)
2400		return ERR_PTR(-EINVAL);
2401
2402	path = __getname();
2403	if (!path)
2404		return ERR_PTR(-ENOMEM);
2405retry:
2406	pos = PATH_MAX - 1;
2407	path[pos] = '\0';
2408
2409	seq = read_seqbegin(&rename_lock);
2410	rcu_read_lock();
2411	temp = dentry;
2412	for (;;) {
2413		struct inode *inode;
2414
2415		spin_lock(&temp->d_lock);
2416		inode = d_inode(temp);
2417		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2418			dout("build_path path+%d: %p SNAPDIR\n",
2419			     pos, temp);
2420		} else if (stop_on_nosnap && inode && dentry != temp &&
2421			   ceph_snap(inode) == CEPH_NOSNAP) {
2422			spin_unlock(&temp->d_lock);
2423			pos++; /* get rid of any prepended '/' */
2424			break;
2425		} else {
2426			pos -= temp->d_name.len;
2427			if (pos < 0) {
2428				spin_unlock(&temp->d_lock);
2429				break;
2430			}
2431			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2432		}
2433		spin_unlock(&temp->d_lock);
2434		temp = READ_ONCE(temp->d_parent);
2435
2436		/* Are we at the root? */
2437		if (IS_ROOT(temp))
2438			break;
2439
2440		/* Are we out of buffer? */
2441		if (--pos < 0)
2442			break;
2443
2444		path[pos] = '/';
2445	}
2446	base = ceph_ino(d_inode(temp));
2447	rcu_read_unlock();
2448
2449	if (read_seqretry(&rename_lock, seq))
2450		goto retry;
2451
2452	if (pos < 0) {
2453		/*
2454		 * A rename didn't occur, but somehow we didn't end up where
2455		 * we thought we would. Throw a warning and try again.
2456		 */
2457		pr_warn("build_path did not end path lookup where "
2458			"expected, pos is %d\n", pos);
2459		goto retry;
2460	}
2461
2462	*pbase = base;
2463	*plen = PATH_MAX - 1 - pos;
2464	dout("build_path on %p %d built %llx '%.*s'\n",
2465	     dentry, d_count(dentry), base, *plen, path + pos);
2466	return path + pos;
2467}
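
/*
 * The loop above assembles the path right-to-left into the tail of a
 * PATH_MAX buffer, so the finished string simply starts at &path[pos] and
 * needs no extra copy.  Hedged stand-alone sketch of the same idea, with
 * hypothetical components (leaf first):
 *
 *	char buf[64];
 *	const char *comp[] = { "bar", "dir", "foo" };
 *	int pos = sizeof(buf) - 1;
 *
 *	buf[pos] = '\0';
 *	for (int i = 0; i < 3; i++) {
 *		size_t len = strlen(comp[i]);
 *		pos -= len;
 *		memcpy(buf + pos, comp[i], len);
 *		if (i + 1 < 3)
 *			buf[--pos] = '/';
 *	}
 *	// &buf[pos] is now "foo/dir/bar"
 */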
2468
2469static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2470			     const char **ppath, int *ppathlen, u64 *pino,
2471			     bool *pfreepath, bool parent_locked)
2472{
2473	char *path;
2474
2475	rcu_read_lock();
2476	if (!dir)
2477		dir = d_inode_rcu(dentry->d_parent);
2478	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2479		*pino = ceph_ino(dir);
2480		rcu_read_unlock();
2481		*ppath = dentry->d_name.name;
2482		*ppathlen = dentry->d_name.len;
2483		return 0;
2484	}
2485	rcu_read_unlock();
2486	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2487	if (IS_ERR(path))
2488		return PTR_ERR(path);
2489	*ppath = path;
2490	*pfreepath = true;
2491	return 0;
2492}
2493
2494static int build_inode_path(struct inode *inode,
2495			    const char **ppath, int *ppathlen, u64 *pino,
2496			    bool *pfreepath)
2497{
2498	struct dentry *dentry;
2499	char *path;
2500
2501	if (ceph_snap(inode) == CEPH_NOSNAP) {
2502		*pino = ceph_ino(inode);
2503		*ppathlen = 0;
2504		return 0;
2505	}
2506	dentry = d_find_alias(inode);
2507	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2508	dput(dentry);
2509	if (IS_ERR(path))
2510		return PTR_ERR(path);
2511	*ppath = path;
2512	*pfreepath = true;
2513	return 0;
2514}
2515
2516/*
2517 * request arguments may be specified via an inode *, a dentry *, or
2518 * an explicit ino+path.
2519 */
2520static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2521				  struct inode *rdiri, const char *rpath,
2522				  u64 rino, const char **ppath, int *pathlen,
2523				  u64 *ino, bool *freepath, bool parent_locked)
2524{
2525	int r = 0;
2526
2527	if (rinode) {
2528		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2529		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2530		     ceph_snap(rinode));
2531	} else if (rdentry) {
2532		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2533					freepath, parent_locked);
2534		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2535		     *ppath);
2536	} else if (rpath || rino) {
2537		*ino = rino;
2538		*ppath = rpath;
2539		*pathlen = rpath ? strlen(rpath) : 0;
2540		dout(" path %.*s\n", *pathlen, rpath);
2541	}
2542
2543	return r;
2544}
2545
2546/*
2547 * called under mdsc->mutex
2548 */
2549static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2550					       struct ceph_mds_request *req,
2551					       int mds, bool drop_cap_releases)
2552{
2553	struct ceph_msg *msg;
2554	struct ceph_mds_request_head *head;
2555	const char *path1 = NULL;
2556	const char *path2 = NULL;
2557	u64 ino1 = 0, ino2 = 0;
2558	int pathlen1 = 0, pathlen2 = 0;
2559	bool freepath1 = false, freepath2 = false;
2560	int len;
2561	u16 releases;
2562	void *p, *end;
2563	int ret;
2564
2565	ret = set_request_path_attr(req->r_inode, req->r_dentry,
2566			      req->r_parent, req->r_path1, req->r_ino1.ino,
2567			      &path1, &pathlen1, &ino1, &freepath1,
2568			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2569					&req->r_req_flags));
2570	if (ret < 0) {
2571		msg = ERR_PTR(ret);
2572		goto out;
2573	}
2574
2575	/* If r_old_dentry is set, then assume that its parent is locked */
2576	ret = set_request_path_attr(NULL, req->r_old_dentry,
2577			      req->r_old_dentry_dir,
2578			      req->r_path2, req->r_ino2.ino,
2579			      &path2, &pathlen2, &ino2, &freepath2, true);
2580	if (ret < 0) {
2581		msg = ERR_PTR(ret);
2582		goto out_free1;
2583	}
2584
2585	len = sizeof(*head) +
2586		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2587		sizeof(struct ceph_timespec);
2588
2589	/* calculate (max) length for cap releases */
2590	len += sizeof(struct ceph_mds_request_release) *
2591		(!!req->r_inode_drop + !!req->r_dentry_drop +
2592		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2593	if (req->r_dentry_drop)
2594		len += pathlen1;
2595	if (req->r_old_dentry_drop)
2596		len += pathlen2;
2597
2598	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2599	if (!msg) {
2600		msg = ERR_PTR(-ENOMEM);
2601		goto out_free2;
2602	}
2603
2604	msg->hdr.version = cpu_to_le16(2);
2605	msg->hdr.tid = cpu_to_le64(req->r_tid);
2606
2607	head = msg->front.iov_base;
2608	p = msg->front.iov_base + sizeof(*head);
2609	end = msg->front.iov_base + msg->front.iov_len;
2610
2611	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2612	head->op = cpu_to_le32(req->r_op);
2613	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2614	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2615	head->ino = cpu_to_le64(req->r_deleg_ino);
2616	head->args = req->r_args;
2617
2618	ceph_encode_filepath(&p, end, ino1, path1);
2619	ceph_encode_filepath(&p, end, ino2, path2);
2620
2621	/* make note of release offset, in case we need to replay */
2622	req->r_request_release_offset = p - msg->front.iov_base;
2623
2624	/* cap releases */
2625	releases = 0;
2626	if (req->r_inode_drop)
2627		releases += ceph_encode_inode_release(&p,
2628		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2629		      mds, req->r_inode_drop, req->r_inode_unless,
2630		      req->r_op == CEPH_MDS_OP_READDIR);
2631	if (req->r_dentry_drop)
2632		releases += ceph_encode_dentry_release(&p, req->r_dentry,
2633				req->r_parent, mds, req->r_dentry_drop,
2634				req->r_dentry_unless);
2635	if (req->r_old_dentry_drop)
2636		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2637				req->r_old_dentry_dir, mds,
2638				req->r_old_dentry_drop,
2639				req->r_old_dentry_unless);
2640	if (req->r_old_inode_drop)
2641		releases += ceph_encode_inode_release(&p,
2642		      d_inode(req->r_old_dentry),
2643		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2644
2645	if (drop_cap_releases) {
2646		releases = 0;
2647		p = msg->front.iov_base + req->r_request_release_offset;
2648	}
2649
2650	head->num_releases = cpu_to_le16(releases);
2651
2652	/* time stamp */
2653	{
2654		struct ceph_timespec ts;
2655		ceph_encode_timespec64(&ts, &req->r_stamp);
2656		ceph_encode_copy(&p, &ts, sizeof(ts));
2657	}
2658
2659	if (WARN_ON_ONCE(p > end)) {
2660		ceph_msg_put(msg);
2661		msg = ERR_PTR(-ERANGE);
2662		goto out_free2;
2663	}
2664
2665	msg->front.iov_len = p - msg->front.iov_base;
2666	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2667
2668	if (req->r_pagelist) {
2669		struct ceph_pagelist *pagelist = req->r_pagelist;
2670		ceph_msg_data_add_pagelist(msg, pagelist);
2671		msg->hdr.data_len = cpu_to_le32(pagelist->length);
2672	} else {
2673		msg->hdr.data_len = 0;
2674	}
2675
2676	msg->hdr.data_off = cpu_to_le16(0);
2677
2678out_free2:
2679	if (freepath2)
2680		ceph_mdsc_free_path((char *)path2, pathlen2);
2681out_free1:
2682	if (freepath1)
2683		ceph_mdsc_free_path((char *)path1, pathlen1);
2684out:
2685	return msg;
2686}
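
/*
 * Front-section layout of the request message built above (summary):
 *
 *	struct ceph_mds_request_head
 *	ceph_filepath  (ino1 + path1)
 *	ceph_filepath  (ino2 + path2)
 *	cap/dentry releases              <- r_request_release_offset
 *	struct ceph_timespec             (r_stamp)
 *
 * r_request_release_offset is remembered so that a replay can drop the
 * releases and re-encode the timestamp in place; see
 * __prepare_send_request() below.
 */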
2687
2688/*
2689 * called under mdsc->mutex if error, under no mutex if
2690 * success.
2691 */
2692static void complete_request(struct ceph_mds_client *mdsc,
2693			     struct ceph_mds_request *req)
2694{
2695	req->r_end_latency = ktime_get();
2696
2697	if (req->r_callback)
2698		req->r_callback(mdsc, req);
2699	complete_all(&req->r_completion);
2700}
2701
2702/*
2703 * called under mdsc->mutex
2704 */
2705static int __prepare_send_request(struct ceph_mds_client *mdsc,
2706				  struct ceph_mds_request *req,
2707				  int mds, bool drop_cap_releases)
2708{
2709	struct ceph_mds_request_head *rhead;
2710	struct ceph_msg *msg;
2711	int flags = 0;
2712
2713	req->r_attempts++;
2714	if (req->r_inode) {
2715		struct ceph_cap *cap =
2716			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2717
2718		if (cap)
2719			req->r_sent_on_mseq = cap->mseq;
2720		else
2721			req->r_sent_on_mseq = -1;
2722	}
2723	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2724	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2725
2726	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2727		void *p;
2728		/*
2729		 * Replay.  Do not regenerate message (and rebuild
2730		 * paths, etc.); just use the original message.
2731		 * Rebuilding paths will break for renames because
2732		 * d_move mangles the src name.
2733		 */
2734		msg = req->r_request;
2735		rhead = msg->front.iov_base;
2736
2737		flags = le32_to_cpu(rhead->flags);
2738		flags |= CEPH_MDS_FLAG_REPLAY;
2739		rhead->flags = cpu_to_le32(flags);
2740
2741		if (req->r_target_inode)
2742			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2743
2744		rhead->num_retry = req->r_attempts - 1;
2745
2746		/* remove cap/dentry releases from message */
2747		rhead->num_releases = 0;
2748
2749		/* time stamp */
2750		p = msg->front.iov_base + req->r_request_release_offset;
2751		{
2752			struct ceph_timespec ts;
2753			ceph_encode_timespec64(&ts, &req->r_stamp);
2754			ceph_encode_copy(&p, &ts, sizeof(ts));
2755		}
2756
2757		msg->front.iov_len = p - msg->front.iov_base;
2758		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2759		return 0;
2760	}
2761
2762	if (req->r_request) {
2763		ceph_msg_put(req->r_request);
2764		req->r_request = NULL;
2765	}
2766	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2767	if (IS_ERR(msg)) {
2768		req->r_err = PTR_ERR(msg);
2769		return PTR_ERR(msg);
2770	}
2771	req->r_request = msg;
2772
2773	rhead = msg->front.iov_base;
2774	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2775	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2776		flags |= CEPH_MDS_FLAG_REPLAY;
2777	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2778		flags |= CEPH_MDS_FLAG_ASYNC;
2779	if (req->r_parent)
2780		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2781	rhead->flags = cpu_to_le32(flags);
2782	rhead->num_fwd = req->r_num_fwd;
2783	rhead->num_retry = req->r_attempts - 1;
2784
2785	dout(" r_parent = %p\n", req->r_parent);
2786	return 0;
2787}
2788
2789/*
2790 * called under mdsc->mutex
2791 */
2792static int __send_request(struct ceph_mds_client *mdsc,
2793			  struct ceph_mds_session *session,
2794			  struct ceph_mds_request *req,
2795			  bool drop_cap_releases)
2796{
2797	int err;
2798
2799	err = __prepare_send_request(mdsc, req, session->s_mds,
2800				     drop_cap_releases);
2801	if (!err) {
2802		ceph_msg_get(req->r_request);
2803		ceph_con_send(&session->s_con, req->r_request);
2804	}
2805
2806	return err;
2807}
2808
2809/*
2810 * send request, or put it on the appropriate wait list.
2811 */
2812static void __do_request(struct ceph_mds_client *mdsc,
2813			struct ceph_mds_request *req)
2814{
2815	struct ceph_mds_session *session = NULL;
2816	int mds = -1;
2817	int err = 0;
2818	bool random;
2819
2820	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2821		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2822			__unregister_request(mdsc, req);
2823		return;
2824	}
2825
2826	if (req->r_timeout &&
2827	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2828		dout("do_request timed out\n");
2829		err = -ETIMEDOUT;
2830		goto finish;
2831	}
2832	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2833		dout("do_request forced umount\n");
2834		err = -EIO;
2835		goto finish;
2836	}
2837	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2838		if (mdsc->mdsmap_err) {
2839			err = mdsc->mdsmap_err;
2840			dout("do_request mdsmap err %d\n", err);
2841			goto finish;
2842		}
2843		if (mdsc->mdsmap->m_epoch == 0) {
2844			dout("do_request no mdsmap, waiting for map\n");
2845			list_add(&req->r_wait, &mdsc->waiting_for_map);
2846			return;
2847		}
2848		if (!(mdsc->fsc->mount_options->flags &
2849		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2850		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2851			err = -EHOSTUNREACH;
2852			goto finish;
2853		}
2854	}
2855
2856	put_request_session(req);
2857
2858	mds = __choose_mds(mdsc, req, &random);
2859	if (mds < 0 ||
2860	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2861		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2862			err = -EJUKEBOX;
2863			goto finish;
2864		}
2865		dout("do_request no mds or not active, waiting for map\n");
2866		list_add(&req->r_wait, &mdsc->waiting_for_map);
2867		return;
2868	}
2869
2870	/* get, open session */
2871	session = __ceph_lookup_mds_session(mdsc, mds);
2872	if (!session) {
2873		session = register_session(mdsc, mds);
2874		if (IS_ERR(session)) {
2875			err = PTR_ERR(session);
2876			goto finish;
2877		}
2878	}
2879	req->r_session = ceph_get_mds_session(session);
2880
2881	dout("do_request mds%d session %p state %s\n", mds, session,
2882	     ceph_session_state_name(session->s_state));
2883	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2884	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2885		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2886			err = -EACCES;
2887			goto out_session;
2888		}
2889		/*
2890		 * We cannot queue async requests since the caps and delegated
2891		 * inodes are bound to the session. Just return -EJUKEBOX and
2892		 * let the caller retry a sync request in that case.
2893		 */
2894		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2895			err = -EJUKEBOX;
2896			goto out_session;
2897		}
2898		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2899		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2900			err = __open_session(mdsc, session);
2901			if (err)
2902				goto out_session;
2903			/* retry the same mds later */
2904			if (random)
2905				req->r_resend_mds = mds;
2906		}
2907		list_add(&req->r_wait, &session->s_waiting);
2908		goto out_session;
2909	}
2910
2911	/* send request */
2912	req->r_resend_mds = -1;   /* forget any previous mds hint */
2913
2914	if (req->r_request_started == 0)   /* note request start time */
2915		req->r_request_started = jiffies;
2916
2917	err = __send_request(mdsc, session, req, false);
2918
2919out_session:
2920	ceph_put_mds_session(session);
2921finish:
2922	if (err) {
2923		dout("__do_request early error %d\n", err);
2924		req->r_err = err;
2925		complete_request(mdsc, req);
2926		__unregister_request(mdsc, req);
2927	}
2928	return;
2929}
2930
2931/*
2932 * called under mdsc->mutex
2933 */
2934static void __wake_requests(struct ceph_mds_client *mdsc,
2935			    struct list_head *head)
2936{
2937	struct ceph_mds_request *req;
2938	LIST_HEAD(tmp_list);
2939
2940	list_splice_init(head, &tmp_list);
2941
2942	while (!list_empty(&tmp_list)) {
2943		req = list_entry(tmp_list.next,
2944				 struct ceph_mds_request, r_wait);
2945		list_del_init(&req->r_wait);
2946		dout(" wake request %p tid %llu\n", req, req->r_tid);
2947		__do_request(mdsc, req);
2948	}
2949}
2950
2951/*
2952 * Wake up threads with requests pending for @mds, so that they can
2953 * resubmit their requests to a possibly different mds.
2954 */
2955static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2956{
2957	struct ceph_mds_request *req;
2958	struct rb_node *p = rb_first(&mdsc->request_tree);
2959
2960	dout("kick_requests mds%d\n", mds);
2961	while (p) {
2962		req = rb_entry(p, struct ceph_mds_request, r_node);
2963		p = rb_next(p);
2964		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2965			continue;
2966		if (req->r_attempts > 0)
2967			continue; /* only new requests */
2968		if (req->r_session &&
2969		    req->r_session->s_mds == mds) {
2970			dout(" kicking tid %llu\n", req->r_tid);
2971			list_del_init(&req->r_wait);
2972			__do_request(mdsc, req);
2973		}
2974	}
2975}
2976
2977int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2978			      struct ceph_mds_request *req)
2979{
2980	int err = 0;
2981
2982	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2983	if (req->r_inode)
2984		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2985	if (req->r_parent) {
2986		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2987		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2988			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2989		spin_lock(&ci->i_ceph_lock);
2990		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2991		__ceph_touch_fmode(ci, mdsc, fmode);
2992		spin_unlock(&ci->i_ceph_lock);
2993		ihold(req->r_parent);
2994	}
2995	if (req->r_old_dentry_dir)
2996		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2997				  CEPH_CAP_PIN);
2998
2999	if (req->r_inode) {
3000		err = ceph_wait_on_async_create(req->r_inode);
3001		if (err) {
3002			dout("%s: wait for async create returned: %d\n",
3003			     __func__, err);
3004			return err;
3005		}
3006	}
3007
3008	if (!err && req->r_old_inode) {
3009		err = ceph_wait_on_async_create(req->r_old_inode);
3010		if (err) {
3011			dout("%s: wait for async create returned: %d\n",
3012			     __func__, err);
3013			return err;
3014		}
3015	}
3016
3017	dout("submit_request on %p for inode %p\n", req, dir);
3018	mutex_lock(&mdsc->mutex);
3019	__register_request(mdsc, req, dir);
3020	__do_request(mdsc, req);
3021	err = req->r_err;
3022	mutex_unlock(&mdsc->mutex);
3023	return err;
3024}
3025
3026static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3027				  struct ceph_mds_request *req)
3028{
3029	int err;
3030
3031	/* wait */
3032	dout("do_request waiting\n");
3033	if (!req->r_timeout && req->r_wait_for_completion) {
3034		err = req->r_wait_for_completion(mdsc, req);
3035	} else {
3036		long timeleft = wait_for_completion_killable_timeout(
3037					&req->r_completion,
3038					ceph_timeout_jiffies(req->r_timeout));
3039		if (timeleft > 0)
3040			err = 0;
3041		else if (!timeleft)
3042			err = -ETIMEDOUT;  /* timed out */
3043		else
3044			err = timeleft;  /* killed */
3045	}
3046	dout("do_request waited, got %d\n", err);
3047	mutex_lock(&mdsc->mutex);
3048
3049	/* only abort if we didn't race with a real reply */
3050	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3051		err = le32_to_cpu(req->r_reply_info.head->result);
3052	} else if (err < 0) {
3053		dout("aborted request %lld with %d\n", req->r_tid, err);
3054
3055		/*
3056		 * ensure we aren't running concurrently with
3057		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3058		 * rely on locks (dir mutex) held by our caller.
3059		 */
3060		mutex_lock(&req->r_fill_mutex);
3061		req->r_err = err;
3062		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3063		mutex_unlock(&req->r_fill_mutex);
3064
3065		if (req->r_parent &&
3066		    (req->r_op & CEPH_MDS_OP_WRITE))
3067			ceph_invalidate_dir_request(req);
3068	} else {
3069		err = req->r_err;
3070	}
3071
3072	mutex_unlock(&mdsc->mutex);
3073	return err;
3074}
3075
3076/*
3077 * Synchronously perform an mds request.  Take care of all of the
3078 * session setup, forwarding, retry details.
3079 */
3080int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3081			 struct inode *dir,
3082			 struct ceph_mds_request *req)
3083{
3084	int err;
3085
3086	dout("do_request on %p\n", req);
3087
3088	/* issue */
3089	err = ceph_mdsc_submit_request(mdsc, dir, req);
3090	if (!err)
3091		err = ceph_mdsc_wait_request(mdsc, req);
3092	dout("do_request %p done, result %d\n", req, err);
3093	return err;
3094}
3095
3096/*
3097 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3098 * namespace request.
3099 */
3100void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3101{
3102	struct inode *dir = req->r_parent;
3103	struct inode *old_dir = req->r_old_dentry_dir;
3104
3105	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3106
3107	ceph_dir_clear_complete(dir);
3108	if (old_dir)
3109		ceph_dir_clear_complete(old_dir);
3110	if (req->r_dentry)
3111		ceph_invalidate_dentry_lease(req->r_dentry);
3112	if (req->r_old_dentry)
3113		ceph_invalidate_dentry_lease(req->r_old_dentry);
3114}
3115
3116/*
3117 * Handle mds reply.
3118 *
3119 * We take the session mutex and parse and process the reply immediately.
3120 * This preserves the logical ordering of replies, capabilities, etc., sent
3121 * by the MDS as they are applied to our local cache.
3122 */
3123static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3124{
3125	struct ceph_mds_client *mdsc = session->s_mdsc;
3126	struct ceph_mds_request *req;
3127	struct ceph_mds_reply_head *head = msg->front.iov_base;
3128	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3129	struct ceph_snap_realm *realm;
3130	u64 tid;
3131	int err, result;
3132	int mds = session->s_mds;
3133
3134	if (msg->front.iov_len < sizeof(*head)) {
3135		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3136		ceph_msg_dump(msg);
3137		return;
3138	}
3139
3140	/* get request, session */
3141	tid = le64_to_cpu(msg->hdr.tid);
3142	mutex_lock(&mdsc->mutex);
3143	req = lookup_get_request(mdsc, tid);
3144	if (!req) {
3145		dout("handle_reply on unknown tid %llu\n", tid);
3146		mutex_unlock(&mdsc->mutex);
3147		return;
3148	}
3149	dout("handle_reply %p\n", req);
3150
3151	/* correct session? */
3152	if (req->r_session != session) {
3153		pr_err("mdsc_handle_reply got %llu on session mds%d"
3154		       " not mds%d\n", tid, session->s_mds,
3155		       req->r_session ? req->r_session->s_mds : -1);
3156		mutex_unlock(&mdsc->mutex);
3157		goto out;
3158	}
3159
3160	/* dup? */
3161	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3162	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3163		pr_warn("got a dup %s reply on %llu from mds%d\n",
3164			   head->safe ? "safe" : "unsafe", tid, mds);
3165		mutex_unlock(&mdsc->mutex);
3166		goto out;
3167	}
3168	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3169		pr_warn("got unsafe after safe on %llu from mds%d\n",
3170			   tid, mds);
3171		mutex_unlock(&mdsc->mutex);
3172		goto out;
3173	}
3174
3175	result = le32_to_cpu(head->result);
3176
3177	/*
3178	 * Handle an ESTALE:
3179	 * - if we're not talking to the authority, resend to it;
3180	 * - if the authority has changed while we weren't looking,
3181	 *   resend to the new authority;
3182	 * - otherwise we just have to return the ESTALE to the caller.
3183	 */
3184	if (result == -ESTALE) {
3185		dout("got ESTALE on request %llu\n", req->r_tid);
3186		req->r_resend_mds = -1;
3187		if (req->r_direct_mode != USE_AUTH_MDS) {
3188			dout("not using auth, setting for that now\n");
3189			req->r_direct_mode = USE_AUTH_MDS;
3190			__do_request(mdsc, req);
3191			mutex_unlock(&mdsc->mutex);
3192			goto out;
3193		} else  {
3194			int mds = __choose_mds(mdsc, req, NULL);
3195			if (mds >= 0 && mds != req->r_session->s_mds) {
3196				dout("but auth changed, so resending\n");
3197				__do_request(mdsc, req);
3198				mutex_unlock(&mdsc->mutex);
3199				goto out;
3200			}
3201		}
3202		dout("have to return ESTALE on request %llu\n", req->r_tid);
3203	}
3204
3205
3206	if (head->safe) {
3207		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3208		__unregister_request(mdsc, req);
3209
3210		/* last request during umount? */
3211		if (mdsc->stopping && !__get_oldest_req(mdsc))
3212			complete_all(&mdsc->safe_umount_waiters);
3213
3214		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3215			/*
3216			 * We already handled the unsafe response, now do the
3217			 * cleanup.  No need to examine the response; the MDS
3218			 * doesn't include any result info in the safe
3219			 * response.  And even if it did, there is nothing
3220			 * useful we could do with a revised return value.
3221			 */
3222			dout("got safe reply %llu, mds%d\n", tid, mds);
3223
3224			mutex_unlock(&mdsc->mutex);
3225			goto out;
3226		}
3227	} else {
3228		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3229		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3230	}
3231
3232	dout("handle_reply tid %lld result %d\n", tid, result);
3233	rinfo = &req->r_reply_info;
3234	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3235		err = parse_reply_info(session, msg, rinfo, (u64)-1);
3236	else
3237		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3238	mutex_unlock(&mdsc->mutex);
3239
3240	mutex_lock(&session->s_mutex);
3241	if (err < 0) {
3242		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3243		ceph_msg_dump(msg);
3244		goto out_err;
3245	}
3246
3247	/* snap trace */
3248	realm = NULL;
3249	if (rinfo->snapblob_len) {
3250		down_write(&mdsc->snap_rwsem);
3251		ceph_update_snap_trace(mdsc, rinfo->snapblob,
3252				rinfo->snapblob + rinfo->snapblob_len,
3253				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3254				&realm);
3255		downgrade_write(&mdsc->snap_rwsem);
3256	} else {
3257		down_read(&mdsc->snap_rwsem);
3258	}
3259
3260	/* insert trace into our cache */
3261	mutex_lock(&req->r_fill_mutex);
3262	current->journal_info = req;
3263	err = ceph_fill_trace(mdsc->fsc->sb, req);
3264	if (err == 0) {
3265		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3266				    req->r_op == CEPH_MDS_OP_LSSNAP))
3267			ceph_readdir_prepopulate(req, req->r_session);
3268	}
3269	current->journal_info = NULL;
3270	mutex_unlock(&req->r_fill_mutex);
3271
3272	up_read(&mdsc->snap_rwsem);
3273	if (realm)
3274		ceph_put_snap_realm(mdsc, realm);
3275
3276	if (err == 0) {
3277		if (req->r_target_inode &&
3278		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3279			struct ceph_inode_info *ci =
3280				ceph_inode(req->r_target_inode);
3281			spin_lock(&ci->i_unsafe_lock);
3282			list_add_tail(&req->r_unsafe_target_item,
3283				      &ci->i_unsafe_iops);
3284			spin_unlock(&ci->i_unsafe_lock);
3285		}
3286
3287		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3288	}
3289out_err:
3290	mutex_lock(&mdsc->mutex);
3291	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3292		if (err) {
3293			req->r_err = err;
3294		} else {
3295			req->r_reply = ceph_msg_get(msg);
3296			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3297		}
3298	} else {
3299		dout("reply arrived after request %lld was aborted\n", tid);
3300	}
3301	mutex_unlock(&mdsc->mutex);
3302
3303	mutex_unlock(&session->s_mutex);
3304
3305	/* kick calling process */
3306	complete_request(mdsc, req);
3307
3308	ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
3309				     req->r_end_latency, err);
3310out:
3311	ceph_mdsc_put_request(req);
3312	return;
3313}
3314
3315
3316
3317/*
3318 * handle mds notification that our request has been forwarded.
3319 */
3320static void handle_forward(struct ceph_mds_client *mdsc,
3321			   struct ceph_mds_session *session,
3322			   struct ceph_msg *msg)
3323{
3324	struct ceph_mds_request *req;
3325	u64 tid = le64_to_cpu(msg->hdr.tid);
3326	u32 next_mds;
3327	u32 fwd_seq;
3328	int err = -EINVAL;
3329	void *p = msg->front.iov_base;
3330	void *end = p + msg->front.iov_len;
3331
3332	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3333	next_mds = ceph_decode_32(&p);
3334	fwd_seq = ceph_decode_32(&p);
3335
3336	mutex_lock(&mdsc->mutex);
3337	req = lookup_get_request(mdsc, tid);
3338	if (!req) {
3339		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3340		goto out;  /* dup reply? */
3341	}
3342
3343	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3344		dout("forward tid %llu aborted, unregistering\n", tid);
3345		__unregister_request(mdsc, req);
3346	} else if (fwd_seq <= req->r_num_fwd) {
3347		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3348		     tid, next_mds, req->r_num_fwd, fwd_seq);
3349	} else {
3350		/* resend. forward race not possible; mds would drop */
3351		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3352		BUG_ON(req->r_err);
3353		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3354		req->r_attempts = 0;
3355		req->r_num_fwd = fwd_seq;
3356		req->r_resend_mds = next_mds;
3357		put_request_session(req);
3358		__do_request(mdsc, req);
3359	}
3360	ceph_mdsc_put_request(req);
3361out:
3362	mutex_unlock(&mdsc->mutex);
3363	return;
3364
3365bad:
3366	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3367}
3368
3369static int __decode_session_metadata(void **p, void *end,
3370				     bool *blocklisted)
3371{
3372	/* map<string,string> */
3373	u32 n;
3374	bool err_str;
3375	ceph_decode_32_safe(p, end, n, bad);
3376	while (n-- > 0) {
3377		u32 len;
3378		ceph_decode_32_safe(p, end, len, bad);
3379		ceph_decode_need(p, end, len, bad);
3380		err_str = !strncmp(*p, "error_string", len);
3381		*p += len;
3382		ceph_decode_32_safe(p, end, len, bad);
3383		ceph_decode_need(p, end, len, bad);
3384		/*
3385		 * Match "blocklisted (blacklisted)" from newer MDSes,
3386		 * or "blacklisted" from older MDSes.
3387		 */
3388		if (err_str && strnstr(*p, "blacklisted", len))
3389			*blocklisted = true;
3390		*p += len;
3391	}
3392	return 0;
3393bad:
3394	return -1;
3395}
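
/*
 * The session metadata decoded above is a length-prefixed map<string,string>:
 * a u32 entry count, then for each entry a u32-length key followed by a
 * u32-length value.  Hypothetical single-entry layout:
 *
 *	[ n = 1 ][ 12 ]["error_string"][ 25 ]["blocklisted (blacklisted)"]
 */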
3396
3397/*
3398 * handle a mds session control message
3399 */
3400static void handle_session(struct ceph_mds_session *session,
3401			   struct ceph_msg *msg)
3402{
3403	struct ceph_mds_client *mdsc = session->s_mdsc;
3404	int mds = session->s_mds;
3405	int msg_version = le16_to_cpu(msg->hdr.version);
3406	void *p = msg->front.iov_base;
3407	void *end = p + msg->front.iov_len;
3408	struct ceph_mds_session_head *h;
3409	u32 op;
3410	u64 seq, features = 0;
3411	int wake = 0;
3412	bool blocklisted = false;
3413
3414	/* decode */
3415	ceph_decode_need(&p, end, sizeof(*h), bad);
3416	h = p;
3417	p += sizeof(*h);
3418
3419	op = le32_to_cpu(h->op);
3420	seq = le64_to_cpu(h->seq);
3421
3422	if (msg_version >= 3) {
3423		u32 len;
3424		/* version >= 2, metadata */
3425		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3426			goto bad;
3427		/* version >= 3, feature bits */
3428		ceph_decode_32_safe(&p, end, len, bad);
3429		if (len) {
3430			ceph_decode_64_safe(&p, end, features, bad);
3431			p += len - sizeof(features);
3432		}
3433	}
3434
3435	mutex_lock(&mdsc->mutex);
3436	if (op == CEPH_SESSION_CLOSE) {
3437		ceph_get_mds_session(session);
3438		__unregister_session(mdsc, session);
3439	}
3440	/* FIXME: this ttl calculation is generous */
3441	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3442	mutex_unlock(&mdsc->mutex);
3443
3444	mutex_lock(&session->s_mutex);
3445
3446	dout("handle_session mds%d %s %p state %s seq %llu\n",
3447	     mds, ceph_session_op_name(op), session,
3448	     ceph_session_state_name(session->s_state), seq);
3449
3450	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3451		session->s_state = CEPH_MDS_SESSION_OPEN;
3452		pr_info("mds%d came back\n", session->s_mds);
3453	}
3454
3455	switch (op) {
3456	case CEPH_SESSION_OPEN:
3457		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3458			pr_info("mds%d reconnect success\n", session->s_mds);
3459		session->s_state = CEPH_MDS_SESSION_OPEN;
3460		session->s_features = features;
3461		renewed_caps(mdsc, session, 0);
3462		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3463			metric_schedule_delayed(&mdsc->metric);
3464		wake = 1;
3465		if (mdsc->stopping)
3466			__close_session(mdsc, session);
3467		break;
3468
3469	case CEPH_SESSION_RENEWCAPS:
3470		if (session->s_renew_seq == seq)
3471			renewed_caps(mdsc, session, 1);
3472		break;
3473
3474	case CEPH_SESSION_CLOSE:
3475		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3476			pr_info("mds%d reconnect denied\n", session->s_mds);
3477		session->s_state = CEPH_MDS_SESSION_CLOSED;
3478		cleanup_session_requests(mdsc, session);
3479		remove_session_caps(session);
3480		wake = 2; /* for good measure */
3481		wake_up_all(&mdsc->session_close_wq);
3482		break;
3483
3484	case CEPH_SESSION_STALE:
3485		pr_info("mds%d caps went stale, renewing\n",
3486			session->s_mds);
3487		spin_lock(&session->s_gen_ttl_lock);
3488		session->s_cap_gen++;
3489		session->s_cap_ttl = jiffies - 1;
3490		spin_unlock(&session->s_gen_ttl_lock);
3491		send_renew_caps(mdsc, session);
3492		break;
3493
3494	case CEPH_SESSION_RECALL_STATE:
3495		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3496		break;
3497
3498	case CEPH_SESSION_FLUSHMSG:
3499		/* flush cap releases */
3500		spin_lock(&session->s_cap_lock);
3501		if (session->s_num_cap_releases)
3502			ceph_flush_cap_releases(mdsc, session);
3503		spin_unlock(&session->s_cap_lock);
3504
3505		send_flushmsg_ack(mdsc, session, seq);
3506		break;
3507
3508	case CEPH_SESSION_FORCE_RO:
3509		dout("force_session_readonly %p\n", session);
3510		spin_lock(&session->s_cap_lock);
3511		session->s_readonly = true;
3512		spin_unlock(&session->s_cap_lock);
3513		wake_up_session_caps(session, FORCE_RO);
3514		break;
3515
3516	case CEPH_SESSION_REJECT:
3517		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3518		pr_info("mds%d rejected session\n", session->s_mds);
3519		session->s_state = CEPH_MDS_SESSION_REJECTED;
3520		cleanup_session_requests(mdsc, session);
3521		remove_session_caps(session);
3522		if (blocklisted)
3523			mdsc->fsc->blocklisted = true;
3524		wake = 2; /* for good measure */
3525		break;
3526
3527	default:
3528		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3529		WARN_ON(1);
3530	}
3531
3532	mutex_unlock(&session->s_mutex);
3533	if (wake) {
3534		mutex_lock(&mdsc->mutex);
3535		__wake_requests(mdsc, &session->s_waiting);
3536		if (wake == 2)
3537			kick_requests(mdsc, mds);
3538		mutex_unlock(&mdsc->mutex);
3539	}
3540	if (op == CEPH_SESSION_CLOSE)
3541		ceph_put_mds_session(session);
3542	return;
3543
3544bad:
3545	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3546	       (int)msg->front.iov_len);
3547	ceph_msg_dump(msg);
3548	return;
3549}
3550
3551void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3552{
3553	int dcaps;
3554
3555	dcaps = xchg(&req->r_dir_caps, 0);
3556	if (dcaps) {
3557		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3558		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3559	}
3560}
3561
3562void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3563{
3564	int dcaps;
3565
3566	dcaps = xchg(&req->r_dir_caps, 0);
3567	if (dcaps) {
3568		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3569		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3570						dcaps);
3571	}
3572}
3573
3574/*
3575 * called under session->mutex.
3576 */
3577static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3578				   struct ceph_mds_session *session)
3579{
3580	struct ceph_mds_request *req, *nreq;
3581	struct rb_node *p;
3582
3583	dout("replay_unsafe_requests mds%d\n", session->s_mds);
3584
3585	mutex_lock(&mdsc->mutex);
3586	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3587		__send_request(mdsc, session, req, true);
3588
3589	/*
3590	 * Also re-send old requests when the MDS enters the reconnect stage,
3591	 * so that the MDS can process completed requests in its clientreplay stage.
3592	 */
3593	p = rb_first(&mdsc->request_tree);
3594	while (p) {
3595		req = rb_entry(p, struct ceph_mds_request, r_node);
3596		p = rb_next(p);
3597		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3598			continue;
3599		if (req->r_attempts == 0)
3600			continue; /* only old requests */
3601		if (!req->r_session)
3602			continue;
3603		if (req->r_session->s_mds != session->s_mds)
3604			continue;
3605
3606		ceph_mdsc_release_dir_caps_no_check(req);
3607
3608		__send_request(mdsc, session, req, true);
3609	}
3610	mutex_unlock(&mdsc->mutex);
3611}
3612
3613static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3614{
3615	struct ceph_msg *reply;
3616	struct ceph_pagelist *_pagelist;
3617	struct page *page;
3618	__le32 *addr;
3619	int err = -ENOMEM;
3620
3621	if (!recon_state->allow_multi)
3622		return -ENOSPC;
3623
3624	/* can't handle message that contains both caps and realm */
3625	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3626
3627	/* pre-allocate new pagelist */
3628	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3629	if (!_pagelist)
3630		return -ENOMEM;
3631
3632	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3633	if (!reply)
3634		goto fail_msg;
3635
3636	/* placeholder for nr_caps */
3637	err = ceph_pagelist_encode_32(_pagelist, 0);
3638	if (err < 0)
3639		goto fail;
3640
3641	if (recon_state->nr_caps) {
3642		/* currently encoding caps */
3643		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3644		if (err)
3645			goto fail;
3646	} else {
3647		/* placeholder for nr_realms (currently encoding relams) */
3648		/* placeholder for nr_realms (currently encoding realms) */
3649		if (err < 0)
3650			goto fail;
3651	}
3652
3653	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3654	if (err)
3655		goto fail;
3656
3657	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3658	addr = kmap_atomic(page);
3659	if (recon_state->nr_caps) {
3660		/* currently encoding caps */
3661		*addr = cpu_to_le32(recon_state->nr_caps);
3662	} else {
3663		/* currently encoding relams */
3664		/* currently encoding realms */
3665	}
3666	kunmap_atomic(addr);
3667
3668	reply->hdr.version = cpu_to_le16(5);
3669	reply->hdr.compat_version = cpu_to_le16(4);
3670
3671	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3672	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3673
3674	ceph_con_send(&recon_state->session->s_con, reply);
3675	ceph_pagelist_release(recon_state->pagelist);
3676
3677	recon_state->pagelist = _pagelist;
3678	recon_state->nr_caps = 0;
3679	recon_state->nr_realms = 0;
3680	recon_state->msg_version = 5;
3681	return 0;
3682fail:
3683	ceph_msg_put(reply);
3684fail_msg:
3685	ceph_pagelist_release(_pagelist);
3686	return err;
3687}
3688
3689static struct dentry* d_find_primary(struct inode *inode)
3690{
3691	struct dentry *alias, *dn = NULL;
3692
3693	if (hlist_empty(&inode->i_dentry))
3694		return NULL;
3695
3696	spin_lock(&inode->i_lock);
3697	if (hlist_empty(&inode->i_dentry))
3698		goto out_unlock;
3699
3700	if (S_ISDIR(inode->i_mode)) {
3701		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3702		if (!IS_ROOT(alias))
3703			dn = dget(alias);
3704		goto out_unlock;
3705	}
3706
3707	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3708		spin_lock(&alias->d_lock);
3709		if (!d_unhashed(alias) &&
3710		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3711			dn = dget_dlock(alias);
3712		}
3713		spin_unlock(&alias->d_lock);
3714		if (dn)
3715			break;
3716	}
3717out_unlock:
3718	spin_unlock(&inode->i_lock);
3719	return dn;
3720}
3721
3722/*
3723 * Encode information about a cap for a reconnect with the MDS.
3724 */
3725static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3726			  void *arg)
3727{
3728	union {
3729		struct ceph_mds_cap_reconnect v2;
3730		struct ceph_mds_cap_reconnect_v1 v1;
3731	} rec;
3732	struct ceph_inode_info *ci = cap->ci;
3733	struct ceph_reconnect_state *recon_state = arg;
3734	struct ceph_pagelist *pagelist = recon_state->pagelist;
3735	struct dentry *dentry;
3736	char *path;
3737	int pathlen = 0, err;
3738	u64 pathbase;
3739	u64 snap_follows;
3740
3741	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3742	     inode, ceph_vinop(inode), cap, cap->cap_id,
3743	     ceph_cap_string(cap->issued));
3744
3745	dentry = d_find_primary(inode);
3746	if (dentry) {
3747		/* set pathbase to parent dir when msg_version >= 2 */
3748		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3749					    recon_state->msg_version >= 2);
3750		dput(dentry);
3751		if (IS_ERR(path)) {
3752			err = PTR_ERR(path);
3753			goto out_err;
3754		}
3755	} else {
3756		path = NULL;
3757		pathbase = 0;
3758	}
3759
3760	spin_lock(&ci->i_ceph_lock);
3761	cap->seq = 0;        /* reset cap seq */
3762	cap->issue_seq = 0;  /* and issue_seq */
3763	cap->mseq = 0;       /* and migrate_seq */
3764	cap->cap_gen = cap->session->s_cap_gen;
3765
3766	/* These are lost when the session goes away */
3767	if (S_ISDIR(inode->i_mode)) {
3768		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3769			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3770			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3771		}
3772		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3773	}
3774
3775	if (recon_state->msg_version >= 2) {
3776		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3777		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3778		rec.v2.issued = cpu_to_le32(cap->issued);
3779		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3780		rec.v2.pathbase = cpu_to_le64(pathbase);
3781		rec.v2.flock_len = (__force __le32)
3782			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3783	} else {
3784		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3785		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3786		rec.v1.issued = cpu_to_le32(cap->issued);
3787		rec.v1.size = cpu_to_le64(inode->i_size);
3788		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3789		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3790		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3791		rec.v1.pathbase = cpu_to_le64(pathbase);
3792	}
3793
3794	if (list_empty(&ci->i_cap_snaps)) {
3795		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3796	} else {
3797		struct ceph_cap_snap *capsnap =
3798			list_first_entry(&ci->i_cap_snaps,
3799					 struct ceph_cap_snap, ci_item);
3800		snap_follows = capsnap->follows;
3801	}
3802	spin_unlock(&ci->i_ceph_lock);
3803
3804	if (recon_state->msg_version >= 2) {
3805		int num_fcntl_locks, num_flock_locks;
3806		struct ceph_filelock *flocks = NULL;
3807		size_t struct_len, total_len = sizeof(u64);
3808		u8 struct_v = 0;
3809
3810encode_again:
3811		if (rec.v2.flock_len) {
3812			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3813		} else {
3814			num_fcntl_locks = 0;
3815			num_flock_locks = 0;
3816		}
3817		if (num_fcntl_locks + num_flock_locks > 0) {
3818			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3819					       sizeof(struct ceph_filelock),
3820					       GFP_NOFS);
3821			if (!flocks) {
3822				err = -ENOMEM;
3823				goto out_err;
3824			}
3825			err = ceph_encode_locks_to_buffer(inode, flocks,
3826							  num_fcntl_locks,
3827							  num_flock_locks);
3828			if (err) {
3829				kfree(flocks);
3830				flocks = NULL;
3831				if (err == -ENOSPC)
3832					goto encode_again;
3833				goto out_err;
3834			}
3835		} else {
3836			kfree(flocks);
3837			flocks = NULL;
3838		}
3839
3840		if (recon_state->msg_version >= 3) {
3841			/* version, compat_version and struct_len */
3842			total_len += 2 * sizeof(u8) + sizeof(u32);
3843			struct_v = 2;
3844		}
3845		/*
3846		 * number of encoded locks is stable, so copy to pagelist
3847		 */
3848		struct_len = 2 * sizeof(u32) +
3849			    (num_fcntl_locks + num_flock_locks) *
3850			    sizeof(struct ceph_filelock);
3851		rec.v2.flock_len = cpu_to_le32(struct_len);
3852
3853		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3854
3855		if (struct_v >= 2)
3856			struct_len += sizeof(u64); /* snap_follows */
3857
3858		total_len += struct_len;
3859
3860		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3861			err = send_reconnect_partial(recon_state);
3862			if (err)
3863				goto out_freeflocks;
3864			pagelist = recon_state->pagelist;
3865		}
3866
3867		err = ceph_pagelist_reserve(pagelist, total_len);
3868		if (err)
3869			goto out_freeflocks;
3870
3871		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3872		if (recon_state->msg_version >= 3) {
3873			ceph_pagelist_encode_8(pagelist, struct_v);
3874			ceph_pagelist_encode_8(pagelist, 1);
3875			ceph_pagelist_encode_32(pagelist, struct_len);
3876		}
3877		ceph_pagelist_encode_string(pagelist, path, pathlen);
3878		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3879		ceph_locks_to_pagelist(flocks, pagelist,
3880				       num_fcntl_locks, num_flock_locks);
3881		if (struct_v >= 2)
3882			ceph_pagelist_encode_64(pagelist, snap_follows);
3883out_freeflocks:
3884		kfree(flocks);
3885	} else {
3886		err = ceph_pagelist_reserve(pagelist,
3887					    sizeof(u64) + sizeof(u32) +
3888					    pathlen + sizeof(rec.v1));
3889		if (err)
3890			goto out_err;
3891
3892		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3893		ceph_pagelist_encode_string(pagelist, path, pathlen);
3894		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3895	}
3896
3897out_err:
3898	ceph_mdsc_free_path(path, pathlen);
3899	if (!err)
3900		recon_state->nr_caps++;
3901	return err;
3902}
3903
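/*
 * Encode one record per snap realm into the reconnect pagelist.  For
 * msg_version >= 4, each record is preceded by a version/compat/length
 * header, and the message is split via send_reconnect_partial() if it
 * would grow past RECONNECT_MAX_SIZE.
 */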
3904static int encode_snap_realms(struct ceph_mds_client *mdsc,
3905			      struct ceph_reconnect_state *recon_state)
3906{
3907	struct rb_node *p;
3908	struct ceph_pagelist *pagelist = recon_state->pagelist;
3909	int err = 0;
3910
3911	if (recon_state->msg_version >= 4) {
3912		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3913		if (err < 0)
3914			goto fail;
3915	}
3916
3917	/*
3918	 * snaprealms.  we provide mds with the ino, seq (version), and
3919	 * parent for all of our realms.  If the mds has any newer info,
3920	 * it will tell us.
3921	 */
3922	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3923		struct ceph_snap_realm *realm =
3924		       rb_entry(p, struct ceph_snap_realm, node);
3925		struct ceph_mds_snaprealm_reconnect sr_rec;
3926
3927		if (recon_state->msg_version >= 4) {
3928			size_t need = sizeof(u8) * 2 + sizeof(u32) +
3929				      sizeof(sr_rec);
3930
3931			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3932				err = send_reconnect_partial(recon_state);
3933				if (err)
3934					goto fail;
3935				pagelist = recon_state->pagelist;
3936			}
3937
3938			err = ceph_pagelist_reserve(pagelist, need);
3939			if (err)
3940				goto fail;
3941
3942			ceph_pagelist_encode_8(pagelist, 1);
3943			ceph_pagelist_encode_8(pagelist, 1);
3944			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3945		}
3946
3947		dout(" adding snap realm %llx seq %lld parent %llx\n",
3948		     realm->ino, realm->seq, realm->parent_ino);
3949		sr_rec.ino = cpu_to_le64(realm->ino);
3950		sr_rec.seq = cpu_to_le64(realm->seq);
3951		sr_rec.parent = cpu_to_le64(realm->parent_ino);
3952
3953		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3954		if (err)
3955			goto fail;
3956
3957		recon_state->nr_realms++;
3958	}
3959fail:
3960	return err;
3961}
3962
3963
3964/*
3965 * If an MDS fails and recovers, clients need to reconnect in order to
3966 * reestablish shared state.  This includes all caps issued through
3967 * this session _and_ the snap_realm hierarchy.  Because it's not
3968 * clear which snap realms the mds cares about, we send everything we
3969 * know about.  That ensures we'll then get any new info the
3970 * recovering MDS might have.
3971 *
3972 * This is a relatively heavyweight operation, but it's rare.
3973 */
3974static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3975			       struct ceph_mds_session *session)
3976{
3977	struct ceph_msg *reply;
3978	int mds = session->s_mds;
3979	int err = -ENOMEM;
3980	struct ceph_reconnect_state recon_state = {
3981		.session = session,
3982	};
3983	LIST_HEAD(dispose);
3984
3985	pr_info("mds%d reconnect start\n", mds);
3986
3987	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3988	if (!recon_state.pagelist)
3989		goto fail_nopagelist;
3990
3991	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3992	if (!reply)
3993		goto fail_nomsg;
3994
3995	xa_destroy(&session->s_delegated_inos);
3996
3997	mutex_lock(&session->s_mutex);
3998	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3999	session->s_seq = 0;
4000
4001	dout("session %p state %s\n", session,
4002	     ceph_session_state_name(session->s_state));
4003
4004	spin_lock(&session->s_gen_ttl_lock);
4005	session->s_cap_gen++;
4006	spin_unlock(&session->s_gen_ttl_lock);
4007
4008	spin_lock(&session->s_cap_lock);
4009	/* don't know if session is readonly */
4010	session->s_readonly = 0;
4011	/*
4012	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4013	 * notify __ceph_remove_cap() that we are composing a cap reconnect.
4014	 * If a cap gets released before being added to the cap reconnect,
4015	 */
4016	session->s_cap_reconnect = 1;
4017	/* drop old cap expires; we're about to reestablish that state */
4018	detach_cap_releases(session, &dispose);
4019	spin_unlock(&session->s_cap_lock);
4020	dispose_cap_releases(mdsc, &dispose);
4021
4022	/* trim unused caps to reduce MDS's cache rejoin time */
4023	if (mdsc->fsc->sb->s_root)
4024		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4025
4026	ceph_con_close(&session->s_con);
4027	ceph_con_open(&session->s_con,
4028		      CEPH_ENTITY_TYPE_MDS, mds,
4029		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4030
4031	/* replay unsafe requests */
4032	replay_unsafe_requests(mdsc, session);
4033
4034	ceph_early_kick_flushing_caps(mdsc, session);
4035
4036	down_read(&mdsc->snap_rwsem);
4037
4038	/* placeholder for nr_caps */
4039	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4040	if (err)
4041		goto fail;
4042
4043	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4044		recon_state.msg_version = 3;
4045		recon_state.allow_multi = true;
4046	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4047		recon_state.msg_version = 3;
4048	} else {
4049		recon_state.msg_version = 2;
4050	}
4051	/* traverse this session's caps */
4052	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4053
4054	spin_lock(&session->s_cap_lock);
4055	session->s_cap_reconnect = 0;
4056	spin_unlock(&session->s_cap_lock);
4057
4058	if (err < 0)
4059		goto fail;
4060
4061	/* check if all realms can be encoded into current message */
4062	if (mdsc->num_snap_realms) {
4063		size_t total_len =
4064			recon_state.pagelist->length +
4065			mdsc->num_snap_realms *
4066			sizeof(struct ceph_mds_snaprealm_reconnect);
4067		if (recon_state.msg_version >= 4) {
4068			/* number of realms */
4069			total_len += sizeof(u32);
4070			/* version, compat_version and struct_len */
4071			total_len += mdsc->num_snap_realms *
4072				     (2 * sizeof(u8) + sizeof(u32));
4073		}
4074		if (total_len > RECONNECT_MAX_SIZE) {
4075			if (!recon_state.allow_multi) {
4076				err = -ENOSPC;
4077				goto fail;
4078			}
4079			if (recon_state.nr_caps) {
4080				err = send_reconnect_partial(&recon_state);
4081				if (err)
4082					goto fail;
4083			}
4084			recon_state.msg_version = 5;
4085		}
4086	}
4087
4088	err = encode_snap_realms(mdsc, &recon_state);
4089	if (err < 0)
4090		goto fail;
4091
4092	if (recon_state.msg_version >= 5) {
4093		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4094		if (err < 0)
4095			goto fail;
4096	}
4097
4098	if (recon_state.nr_caps || recon_state.nr_realms) {
4099		struct page *page =
4100			list_first_entry(&recon_state.pagelist->head,
4101					struct page, lru);
4102		__le32 *addr = kmap_atomic(page);
4103		if (recon_state.nr_caps) {
4104			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4105			*addr = cpu_to_le32(recon_state.nr_caps);
4106		} else if (recon_state.msg_version >= 4) {
4107			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4108		}
4109		kunmap_atomic(addr);
4110	}
4111
4112	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4113	if (recon_state.msg_version >= 4)
4114		reply->hdr.compat_version = cpu_to_le16(4);
4115
4116	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4117	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4118
4119	ceph_con_send(&session->s_con, reply);
4120
4121	mutex_unlock(&session->s_mutex);
4122
4123	mutex_lock(&mdsc->mutex);
4124	__wake_requests(mdsc, &session->s_waiting);
4125	mutex_unlock(&mdsc->mutex);
4126
4127	up_read(&mdsc->snap_rwsem);
4128	ceph_pagelist_release(recon_state.pagelist);
4129	return;
4130
4131fail:
4132	ceph_msg_put(reply);
4133	up_read(&mdsc->snap_rwsem);
4134	mutex_unlock(&session->s_mutex);
4135fail_nomsg:
4136	ceph_pagelist_release(recon_state.pagelist);
4137fail_nopagelist:
4138	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4139	return;
4140}
4141
4142
4143/*
4144 * compare old and new mdsmaps, kicking requests
4145 * and closing out old connections as necessary
4146 *
4147 * called under mdsc->mutex.
4148 */
4149static void check_new_map(struct ceph_mds_client *mdsc,
4150			  struct ceph_mdsmap *newmap,
4151			  struct ceph_mdsmap *oldmap)
4152{
4153	int i;
4154	int oldstate, newstate;
4155	struct ceph_mds_session *s;
4156
4157	dout("check_new_map new %u old %u\n",
4158	     newmap->m_epoch, oldmap->m_epoch);
4159
4160	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4161		if (!mdsc->sessions[i])
4162			continue;
4163		s = mdsc->sessions[i];
4164		oldstate = ceph_mdsmap_get_state(oldmap, i);
4165		newstate = ceph_mdsmap_get_state(newmap, i);
4166
4167		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4168		     i, ceph_mds_state_name(oldstate),
4169		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4170		     ceph_mds_state_name(newstate),
4171		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4172		     ceph_session_state_name(s->s_state));
4173
4174		if (i >= newmap->possible_max_rank) {
4175			/* force close session for stopped mds */
4176			ceph_get_mds_session(s);
4177			__unregister_session(mdsc, s);
4178			__wake_requests(mdsc, &s->s_waiting);
4179			mutex_unlock(&mdsc->mutex);
4180
4181			mutex_lock(&s->s_mutex);
4182			cleanup_session_requests(mdsc, s);
4183			remove_session_caps(s);
4184			mutex_unlock(&s->s_mutex);
4185
4186			ceph_put_mds_session(s);
4187
4188			mutex_lock(&mdsc->mutex);
4189			kick_requests(mdsc, i);
4190			continue;
4191		}
4192
4193		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4194			   ceph_mdsmap_get_addr(newmap, i),
4195			   sizeof(struct ceph_entity_addr))) {
4196			/* just close it */
4197			mutex_unlock(&mdsc->mutex);
4198			mutex_lock(&s->s_mutex);
4199			mutex_lock(&mdsc->mutex);
4200			ceph_con_close(&s->s_con);
4201			mutex_unlock(&s->s_mutex);
4202			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4203		} else if (oldstate == newstate) {
4204			continue;  /* nothing new with this mds */
4205		}
4206
4207		/*
4208		 * send reconnect?
4209		 */
4210		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4211		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4212			mutex_unlock(&mdsc->mutex);
4213			send_mds_reconnect(mdsc, s);
4214			mutex_lock(&mdsc->mutex);
4215		}
4216
4217		/*
4218		 * kick request on any mds that has gone active.
4219		 */
4220		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4221		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4222			if (oldstate != CEPH_MDS_STATE_CREATING &&
4223			    oldstate != CEPH_MDS_STATE_STARTING)
4224				pr_info("mds%d recovery completed\n", s->s_mds);
4225			kick_requests(mdsc, i);
4226			mutex_unlock(&mdsc->mutex);
4227			mutex_lock(&s->s_mutex);
4228			mutex_lock(&mdsc->mutex);
4229			ceph_kick_flushing_caps(mdsc, s);
4230			mutex_unlock(&s->s_mutex);
4231			wake_up_session_caps(s, RECONNECT);
4232		}
4233	}
4234
4235	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4236		s = mdsc->sessions[i];
4237		if (!s)
4238			continue;
4239		if (!ceph_mdsmap_is_laggy(newmap, i))
4240			continue;
4241		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4242		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4243		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4244			dout(" connecting to export targets of laggy mds%d\n",
4245			     i);
4246			__open_export_target_sessions(mdsc, s);
4247		}
4248	}
4249}
4250
4251
4252
4253/*
4254 * leases
4255 */
4256
4257/*
4258 * caller must hold session s_mutex, dentry->d_lock
4259 */
4260void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4261{
4262	struct ceph_dentry_info *di = ceph_dentry(dentry);
4263
4264	ceph_put_mds_session(di->lease_session);
4265	di->lease_session = NULL;
4266}
4267
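/*
 * Handle a CEPH_MSG_CLIENT_LEASE message from the MDS: revoke or renew
 * the dentry lease named in the message, and ack revocations back to
 * the MDS when a release is called for.
 */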
4268static void handle_lease(struct ceph_mds_client *mdsc,
4269			 struct ceph_mds_session *session,
4270			 struct ceph_msg *msg)
4271{
4272	struct super_block *sb = mdsc->fsc->sb;
4273	struct inode *inode;
4274	struct dentry *parent, *dentry;
4275	struct ceph_dentry_info *di;
4276	int mds = session->s_mds;
4277	struct ceph_mds_lease *h = msg->front.iov_base;
4278	u32 seq;
4279	struct ceph_vino vino;
4280	struct qstr dname;
4281	int release = 0;
4282
4283	dout("handle_lease from mds%d\n", mds);
4284
4285	/* decode */
4286	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4287		goto bad;
4288	vino.ino = le64_to_cpu(h->ino);
4289	vino.snap = CEPH_NOSNAP;
4290	seq = le32_to_cpu(h->seq);
4291	dname.len = get_unaligned_le32(h + 1);
4292	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4293		goto bad;
4294	dname.name = (void *)(h + 1) + sizeof(u32);
4295
4296	/* lookup inode */
4297	inode = ceph_find_inode(sb, vino);
4298	dout("handle_lease %s, ino %llx %p %.*s\n",
4299	     ceph_lease_op_name(h->action), vino.ino, inode,
4300	     dname.len, dname.name);
4301
4302	mutex_lock(&session->s_mutex);
4303	inc_session_sequence(session);
4304
4305	if (!inode) {
4306		dout("handle_lease no inode %llx\n", vino.ino);
4307		goto release;
4308	}
4309
4310	/* dentry */
4311	parent = d_find_alias(inode);
4312	if (!parent) {
4313		dout("no parent dentry on inode %p\n", inode);
4314		WARN_ON(1);
4315		goto release;  /* hrm... */
4316	}
4317	dname.hash = full_name_hash(parent, dname.name, dname.len);
4318	dentry = d_lookup(parent, &dname);
4319	dput(parent);
4320	if (!dentry)
4321		goto release;
4322
4323	spin_lock(&dentry->d_lock);
4324	di = ceph_dentry(dentry);
4325	switch (h->action) {
4326	case CEPH_MDS_LEASE_REVOKE:
4327		if (di->lease_session == session) {
4328			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4329				h->seq = cpu_to_le32(di->lease_seq);
4330			__ceph_mdsc_drop_dentry_lease(dentry);
4331		}
4332		release = 1;
4333		break;
4334
4335	case CEPH_MDS_LEASE_RENEW:
4336		if (di->lease_session == session &&
4337		    di->lease_gen == session->s_cap_gen &&
4338		    di->lease_renew_from &&
4339		    di->lease_renew_after == 0) {
4340			unsigned long duration =
4341				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4342
4343			di->lease_seq = seq;
4344			di->time = di->lease_renew_from + duration;
4345			di->lease_renew_after = di->lease_renew_from +
4346				(duration >> 1);
4347			di->lease_renew_from = 0;
4348		}
4349		break;
4350	}
4351	spin_unlock(&dentry->d_lock);
4352	dput(dentry);
4353
4354	if (!release)
4355		goto out;
4356
4357release:
4358	/* let's just reuse the same message */
4359	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4360	ceph_msg_get(msg);
4361	ceph_con_send(&session->s_con, msg);
4362
4363out:
4364	mutex_unlock(&session->s_mutex);
4365	/* avoid calling iput_final() in mds dispatch threads */
4366	ceph_async_iput(inode);
4367	return;
4368
4369bad:
4370	pr_err("corrupt lease message\n");
4371	ceph_msg_dump(msg);
4372}
4373
4374void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4375			      struct dentry *dentry, char action,
4376			      u32 seq)
4377{
4378	struct ceph_msg *msg;
4379	struct ceph_mds_lease *lease;
4380	struct inode *dir;
4381	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4382
4383	dout("lease_send_msg dentry %p %s to mds%d\n",
4384	     dentry, ceph_lease_op_name(action), session->s_mds);
4385
4386	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4387	if (!msg)
4388		return;
4389	lease = msg->front.iov_base;
4390	lease->action = action;
4391	lease->seq = cpu_to_le32(seq);
4392
4393	spin_lock(&dentry->d_lock);
4394	dir = d_inode(dentry->d_parent);
4395	lease->ino = cpu_to_le64(ceph_ino(dir));
4396	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4397
4398	put_unaligned_le32(dentry->d_name.len, lease + 1);
4399	memcpy((void *)(lease + 1) + 4,
4400	       dentry->d_name.name, dentry->d_name.len);
4401	spin_unlock(&dentry->d_lock);
4402	/*
4403	 * if this is a preemptive lease RELEASE, no need to
4404	 * flush request stream, since the actual request will
4405	 * soon follow.
4406	 */
4407	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4408
4409	ceph_con_send(&session->s_con, msg);
4410}
4411
4412/*
4413 * lock then unlock the session, to wait for ongoing session activity to finish
4414 */
4415static void lock_unlock_session(struct ceph_mds_session *s)
4416{
4417	mutex_lock(&s->s_mutex);
4418	mutex_unlock(&s->s_mutex);
4419}
4420
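/*
 * If the client has been blocklisted and the CLEANRECOVER mount option
 * is set, force a reconnect, at most once every 30 minutes.
 */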
4421static void maybe_recover_session(struct ceph_mds_client *mdsc)
4422{
4423	struct ceph_fs_client *fsc = mdsc->fsc;
4424
4425	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4426		return;
4427
4428	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4429		return;
4430
4431	if (!READ_ONCE(fsc->blocklisted))
4432		return;
4433
4434	if (fsc->last_auto_reconnect &&
4435	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4436		return;
4437
4438	pr_info("auto reconnect after blocklisted\n");
4439	fsc->last_auto_reconnect = jiffies;
4440	ceph_force_reconnect(fsc->sb);
4441}
4442
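/*
 * Return true if the session is worth keeping (and periodically poked).
 * NEW, CLOSING, CLOSED, RESTARTING and REJECTED sessions return false;
 * an OPEN session whose TTL has expired is flagged HUNG first.
 */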
4443bool check_session_state(struct ceph_mds_session *s)
4444{
4445	switch (s->s_state) {
4446	case CEPH_MDS_SESSION_OPEN:
4447		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4448			s->s_state = CEPH_MDS_SESSION_HUNG;
4449			pr_info("mds%d hung\n", s->s_mds);
4450		}
4451		break;
4452	case CEPH_MDS_SESSION_CLOSING:
4453		/* Should never reach this when we're unmounting */
4454		WARN_ON_ONCE(s->s_ttl);
4455		fallthrough;
4456	case CEPH_MDS_SESSION_NEW:
4457	case CEPH_MDS_SESSION_RESTARTING:
4458	case CEPH_MDS_SESSION_CLOSED:
4459	case CEPH_MDS_SESSION_REJECTED:
4460		return false;
4461	}
4462
4463	return true;
4464}
4465
4466/*
4467 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4468 * then we need to retransmit that request.
4469 */
4470void inc_session_sequence(struct ceph_mds_session *s)
4471{
4472	lockdep_assert_held(&s->s_mutex);
4473
4474	s->s_seq++;
4475
4476	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4477		int ret;
4478
4479		dout("resending session close request for mds%d\n", s->s_mds);
4480		ret = request_close_session(s);
4481		if (ret < 0)
4482			pr_err("unable to close session to mds%d: %d\n",
4483			       s->s_mds, ret);
4484	}
4485}
4486
4487/*
4488 * delayed work -- periodically trim expired leases, renew caps with mds.  If
4489 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4490 * workqueue delay value of 5 secs will be used.
4491 */
4492static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4493{
4494	unsigned long max_delay = HZ * 5;
4495
4496	/* 5 secs default delay */
4497	if (!delay || (delay > max_delay))
4498		delay = max_delay;
4499	schedule_delayed_work(&mdsc->delayed_work,
4500			      round_jiffies_relative(delay));
4501}
4502
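/*
 * Periodic work: renew caps (or send keepalives) on each healthy
 * session, send queued cap releases, flush delayed caps, trim the
 * snapid map, possibly auto-reconnect, then re-arm itself.
 */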
4503static void delayed_work(struct work_struct *work)
4504{
4505	struct ceph_mds_client *mdsc =
4506		container_of(work, struct ceph_mds_client, delayed_work.work);
4507	unsigned long delay;
4508	int renew_interval;
4509	int renew_caps;
4510	int i;
4511
4512	dout("mdsc delayed_work\n");
4513
4514	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
4515		return;
4516
4517	mutex_lock(&mdsc->mutex);
4518	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4519	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4520				   mdsc->last_renew_caps);
4521	if (renew_caps)
4522		mdsc->last_renew_caps = jiffies;
4523
4524	for (i = 0; i < mdsc->max_sessions; i++) {
4525		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4526		if (!s)
4527			continue;
4528
4529		if (!check_session_state(s)) {
4530			ceph_put_mds_session(s);
4531			continue;
4532		}
4533		mutex_unlock(&mdsc->mutex);
4534
4535		mutex_lock(&s->s_mutex);
4536		if (renew_caps)
4537			send_renew_caps(mdsc, s);
4538		else
4539			ceph_con_keepalive(&s->s_con);
4540		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4541		    s->s_state == CEPH_MDS_SESSION_HUNG)
4542			ceph_send_cap_releases(mdsc, s);
4543		mutex_unlock(&s->s_mutex);
4544		ceph_put_mds_session(s);
4545
4546		mutex_lock(&mdsc->mutex);
4547	}
4548	mutex_unlock(&mdsc->mutex);
4549
4550	delay = ceph_check_delayed_caps(mdsc);
4551
4552	ceph_queue_cap_reclaim_work(mdsc);
4553
4554	ceph_trim_snapid_map(mdsc);
4555
4556	maybe_recover_session(mdsc);
4557
4558	schedule_delayed(mdsc, delay);
4559}
4560
4561int ceph_mdsc_init(struct ceph_fs_client *fsc)
4562
4563{
4564	struct ceph_mds_client *mdsc;
4565	int err;
4566
4567	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4568	if (!mdsc)
4569		return -ENOMEM;
4570	mdsc->fsc = fsc;
4571	mutex_init(&mdsc->mutex);
4572	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4573	if (!mdsc->mdsmap) {
4574		err = -ENOMEM;
4575		goto err_mdsc;
4576	}
4577
4578	init_completion(&mdsc->safe_umount_waiters);
4579	init_waitqueue_head(&mdsc->session_close_wq);
4580	INIT_LIST_HEAD(&mdsc->waiting_for_map);
4581	mdsc->sessions = NULL;
4582	atomic_set(&mdsc->num_sessions, 0);
4583	mdsc->max_sessions = 0;
4584	mdsc->stopping = 0;
4585	atomic64_set(&mdsc->quotarealms_count, 0);
4586	mdsc->quotarealms_inodes = RB_ROOT;
4587	mutex_init(&mdsc->quotarealms_inodes_mutex);
4588	mdsc->last_snap_seq = 0;
4589	init_rwsem(&mdsc->snap_rwsem);
4590	mdsc->snap_realms = RB_ROOT;
4591	INIT_LIST_HEAD(&mdsc->snap_empty);
4592	mdsc->num_snap_realms = 0;
4593	spin_lock_init(&mdsc->snap_empty_lock);
4594	mdsc->last_tid = 0;
4595	mdsc->oldest_tid = 0;
4596	mdsc->request_tree = RB_ROOT;
4597	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4598	mdsc->last_renew_caps = jiffies;
4599	INIT_LIST_HEAD(&mdsc->cap_delay_list);
4600	INIT_LIST_HEAD(&mdsc->cap_wait_list);
4601	spin_lock_init(&mdsc->cap_delay_lock);
4602	INIT_LIST_HEAD(&mdsc->snap_flush_list);
4603	spin_lock_init(&mdsc->snap_flush_lock);
4604	mdsc->last_cap_flush_tid = 1;
4605	INIT_LIST_HEAD(&mdsc->cap_flush_list);
4606	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4607	mdsc->num_cap_flushing = 0;
4608	spin_lock_init(&mdsc->cap_dirty_lock);
4609	init_waitqueue_head(&mdsc->cap_flushing_wq);
4610	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4611	atomic_set(&mdsc->cap_reclaim_pending, 0);
4612	err = ceph_metric_init(&mdsc->metric);
4613	if (err)
4614		goto err_mdsmap;
4615
4616	spin_lock_init(&mdsc->dentry_list_lock);
4617	INIT_LIST_HEAD(&mdsc->dentry_leases);
4618	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4619
4620	ceph_caps_init(mdsc);
4621	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4622
4623	spin_lock_init(&mdsc->snapid_map_lock);
4624	mdsc->snapid_map_tree = RB_ROOT;
4625	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4626
4627	init_rwsem(&mdsc->pool_perm_rwsem);
4628	mdsc->pool_perm_tree = RB_ROOT;
4629
4630	strscpy(mdsc->nodename, utsname()->nodename,
4631		sizeof(mdsc->nodename));
4632
4633	fsc->mdsc = mdsc;
4634	return 0;
4635
4636err_mdsmap:
4637	kfree(mdsc->mdsmap);
4638err_mdsc:
4639	kfree(mdsc);
4640	return err;
4641}
4642
4643/*
4644 * Wait for safe replies on open mds requests.  If we time out, drop
4645 * all requests from the tree to avoid dangling dentry refs.
4646 */
4647static void wait_requests(struct ceph_mds_client *mdsc)
4648{
4649	struct ceph_options *opts = mdsc->fsc->client->options;
4650	struct ceph_mds_request *req;
4651
4652	mutex_lock(&mdsc->mutex);
4653	if (__get_oldest_req(mdsc)) {
4654		mutex_unlock(&mdsc->mutex);
4655
4656		dout("wait_requests waiting for requests\n");
4657		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4658				    ceph_timeout_jiffies(opts->mount_timeout));
4659
4660		/* tear down remaining requests */
4661		mutex_lock(&mdsc->mutex);
4662		while ((req = __get_oldest_req(mdsc))) {
4663			dout("wait_requests timed out on tid %llu\n",
4664			     req->r_tid);
4665			list_del_init(&req->r_wait);
4666			__unregister_request(mdsc, req);
4667		}
4668	}
4669	mutex_unlock(&mdsc->mutex);
4670	dout("wait_requests done\n");
4671}
4672
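/*
 * Ask the MDS to flush its journal (mdlog).  Skipped for pre-luminous
 * MDSes, which would crash on the unknown session request.
 */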
4673void send_flush_mdlog(struct ceph_mds_session *s)
4674{
4675	struct ceph_msg *msg;
4676
4677	/*
4678	 * Pre-luminous MDS crashes when it sees an unknown session request
4679	 */
4680	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4681		return;
4682
4683	mutex_lock(&s->s_mutex);
4684	dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
4685	     ceph_session_state_name(s->s_state), s->s_seq);
4686	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4687				      s->s_seq);
4688	if (!msg) {
4689		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4690		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4691	} else {
4692		ceph_con_send(&s->s_con, msg);
4693	}
4694	mutex_unlock(&s->s_mutex);
4695}
4696
4697/*
4698 * called before mount is ro, and before dentries are torn down.
4699 * (hmm, does this still race with new lookups?)
4700 */
4701void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4702{
4703	dout("pre_umount\n");
4704	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
4705
4706	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4707	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4708	ceph_flush_dirty_caps(mdsc);
4709	wait_requests(mdsc);
4710
4711	/*
4712	 * wait for reply handlers to drop their request refs and
4713	 * their inode/dcache refs
4714	 */
4715	ceph_msgr_flush();
4716
4717	ceph_cleanup_quotarealms_inodes(mdsc);
4718}
4719
4720/*
4721 * wait for all write mds requests to flush.
4722 */
4723static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4724{
4725	struct ceph_mds_request *req = NULL, *nextreq;
4726	struct rb_node *n;
4727
4728	mutex_lock(&mdsc->mutex);
4729	dout("wait_unsafe_requests want %lld\n", want_tid);
4730restart:
4731	req = __get_oldest_req(mdsc);
4732	while (req && req->r_tid <= want_tid) {
4733		/* find next request */
4734		n = rb_next(&req->r_node);
4735		if (n)
4736			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4737		else
4738			nextreq = NULL;
4739		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4740		    (req->r_op & CEPH_MDS_OP_WRITE)) {
4741			/* write op */
4742			ceph_mdsc_get_request(req);
4743			if (nextreq)
4744				ceph_mdsc_get_request(nextreq);
4745			mutex_unlock(&mdsc->mutex);
4746			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4747			     req->r_tid, want_tid);
4748			wait_for_completion(&req->r_safe_completion);
4749			mutex_lock(&mdsc->mutex);
4750			ceph_mdsc_put_request(req);
4751			if (!nextreq)
4752				break;  /* no next request existed before, so we're done! */
4753			if (RB_EMPTY_NODE(&nextreq->r_node)) {
4754				/* next request was removed from tree */
4755				ceph_mdsc_put_request(nextreq);
4756				goto restart;
4757			}
4758			ceph_mdsc_put_request(nextreq);  /* won't go away */
4759		}
4760		req = nextreq;
4761	}
4762	mutex_unlock(&mdsc->mutex);
4763	dout("wait_unsafe_requests done\n");
4764}
4765
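/*
 * Flush dirty caps, then wait for all outstanding write MDS requests
 * and cap flushes up to this point to complete.
 */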
4766void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4767{
4768	u64 want_tid, want_flush;
4769
4770	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4771		return;
4772
4773	dout("sync\n");
4774	mutex_lock(&mdsc->mutex);
4775	want_tid = mdsc->last_tid;
4776	mutex_unlock(&mdsc->mutex);
4777
4778	ceph_flush_dirty_caps(mdsc);
4779	spin_lock(&mdsc->cap_dirty_lock);
4780	want_flush = mdsc->last_cap_flush_tid;
4781	if (!list_empty(&mdsc->cap_flush_list)) {
4782		struct ceph_cap_flush *cf =
4783			list_last_entry(&mdsc->cap_flush_list,
4784					struct ceph_cap_flush, g_list);
4785		cf->wake = true;
4786	}
4787	spin_unlock(&mdsc->cap_dirty_lock);
4788
4789	dout("sync want tid %lld flush_seq %lld\n",
4790	     want_tid, want_flush);
4791
4792	wait_unsafe_requests(mdsc, want_tid);
4793	wait_caps_flush(mdsc, want_flush);
4794}
4795
4796/*
4797 * true if all sessions are closed, or we force unmount
4798 */
4799static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4800{
4801	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4802		return true;
4803	return atomic_read(&mdsc->num_sessions) <= skipped;
4804}
4805
4806/*
4807 * called after sb is ro.
4808 */
4809void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4810{
4811	struct ceph_options *opts = mdsc->fsc->client->options;
4812	struct ceph_mds_session *session;
4813	int i;
4814	int skipped = 0;
4815
4816	dout("close_sessions\n");
4817
4818	/* close sessions */
4819	mutex_lock(&mdsc->mutex);
4820	for (i = 0; i < mdsc->max_sessions; i++) {
4821		session = __ceph_lookup_mds_session(mdsc, i);
4822		if (!session)
4823			continue;
4824		mutex_unlock(&mdsc->mutex);
4825		mutex_lock(&session->s_mutex);
4826		if (__close_session(mdsc, session) <= 0)
4827			skipped++;
4828		mutex_unlock(&session->s_mutex);
4829		ceph_put_mds_session(session);
4830		mutex_lock(&mdsc->mutex);
4831	}
4832	mutex_unlock(&mdsc->mutex);
4833
4834	dout("waiting for sessions to close\n");
4835	wait_event_timeout(mdsc->session_close_wq,
4836			   done_closing_sessions(mdsc, skipped),
4837			   ceph_timeout_jiffies(opts->mount_timeout));
4838
4839	/* tear down remaining sessions */
4840	mutex_lock(&mdsc->mutex);
4841	for (i = 0; i < mdsc->max_sessions; i++) {
4842		if (mdsc->sessions[i]) {
4843			session = ceph_get_mds_session(mdsc->sessions[i]);
4844			__unregister_session(mdsc, session);
4845			mutex_unlock(&mdsc->mutex);
4846			mutex_lock(&session->s_mutex);
4847			remove_session_caps(session);
4848			mutex_unlock(&session->s_mutex);
4849			ceph_put_mds_session(session);
4850			mutex_lock(&mdsc->mutex);
4851		}
4852	}
4853	WARN_ON(!list_empty(&mdsc->cap_delay_list));
4854	mutex_unlock(&mdsc->mutex);
4855
4856	ceph_cleanup_snapid_map(mdsc);
4857	ceph_cleanup_empty_realms(mdsc);
4858
4859	cancel_work_sync(&mdsc->cap_reclaim_work);
4860	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4861
4862	dout("stopped\n");
4863}
4864
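/*
 * Forcibly close all MDS sessions, clean up their requests and caps,
 * and wake any waiters; used when the mount is forcibly torn down.
 */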
4865void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4866{
4867	struct ceph_mds_session *session;
4868	int mds;
4869
4870	dout("force umount\n");
4871
4872	mutex_lock(&mdsc->mutex);
4873	for (mds = 0; mds < mdsc->max_sessions; mds++) {
4874		session = __ceph_lookup_mds_session(mdsc, mds);
4875		if (!session)
4876			continue;
4877
4878		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4879			__unregister_session(mdsc, session);
4880		__wake_requests(mdsc, &session->s_waiting);
4881		mutex_unlock(&mdsc->mutex);
4882
4883		mutex_lock(&session->s_mutex);
4884		__close_session(mdsc, session);
4885		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4886			cleanup_session_requests(mdsc, session);
4887			remove_session_caps(session);
4888		}
4889		mutex_unlock(&session->s_mutex);
4890		ceph_put_mds_session(session);
4891
4892		mutex_lock(&mdsc->mutex);
4893		kick_requests(mdsc, mds);
4894	}
4895	__wake_requests(mdsc, &mdsc->waiting_for_map);
4896	mutex_unlock(&mdsc->mutex);
4897}
4898
4899static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4900{
4901	dout("stop\n");
4902	/*
4903	 * Make sure the delayed work has stopped before releasing
4904	 * the resources.
4905	 *
4906	 * cancel_delayed_work_sync() only guarantees that the work
4907	 * finishes executing, but the delayed work can re-arm itself
4908	 * afterwards, so flush it here instead.
4909	 */
4910	flush_delayed_work(&mdsc->delayed_work);
4911
4912	if (mdsc->mdsmap)
4913		ceph_mdsmap_destroy(mdsc->mdsmap);
4914	kfree(mdsc->sessions);
4915	ceph_caps_finalize(mdsc);
4916	ceph_pool_perm_destroy(mdsc);
4917}
4918
4919void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4920{
4921	struct ceph_mds_client *mdsc = fsc->mdsc;
4922	dout("mdsc_destroy %p\n", mdsc);
4923
4924	if (!mdsc)
4925		return;
4926
4927	/* flush out any connection work with references to us */
4928	ceph_msgr_flush();
4929
4930	ceph_mdsc_stop(mdsc);
4931
4932	ceph_metric_destroy(&mdsc->metric);
4933
4934	fsc->mdsc = NULL;
4935	kfree(mdsc);
4936	dout("mdsc_destroy %p done\n", mdsc);
4937}
4938
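/*
 * Handle an fsmap message: find the file system whose name matches the
 * mds_namespace mount option and subscribe to its mdsmap; otherwise
 * record the error and wake anyone waiting for a map.
 */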
4939void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4940{
4941	struct ceph_fs_client *fsc = mdsc->fsc;
4942	const char *mds_namespace = fsc->mount_options->mds_namespace;
4943	void *p = msg->front.iov_base;
4944	void *end = p + msg->front.iov_len;
4945	u32 epoch;
4946	u32 map_len;
4947	u32 num_fs;
4948	u32 mount_fscid = (u32)-1;
4949	u8 struct_v, struct_cv;
4950	int err = -EINVAL;
4951
4952	ceph_decode_need(&p, end, sizeof(u32), bad);
4953	epoch = ceph_decode_32(&p);
4954
4955	dout("handle_fsmap epoch %u\n", epoch);
4956
4957	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4958	struct_v = ceph_decode_8(&p);
4959	struct_cv = ceph_decode_8(&p);
4960	map_len = ceph_decode_32(&p);
4961
4962	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4963	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4964
4965	num_fs = ceph_decode_32(&p);
4966	while (num_fs-- > 0) {
4967		void *info_p, *info_end;
4968		u32 info_len;
4969		u8 info_v, info_cv;
4970		u32 fscid, namelen;
4971
4972		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4973		info_v = ceph_decode_8(&p);
4974		info_cv = ceph_decode_8(&p);
4975		info_len = ceph_decode_32(&p);
4976		ceph_decode_need(&p, end, info_len, bad);
4977		info_p = p;
4978		info_end = p + info_len;
4979		p = info_end;
4980
4981		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4982		fscid = ceph_decode_32(&info_p);
4983		namelen = ceph_decode_32(&info_p);
4984		ceph_decode_need(&info_p, info_end, namelen, bad);
4985
4986		if (mds_namespace &&
4987		    strlen(mds_namespace) == namelen &&
4988		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
4989			mount_fscid = fscid;
4990			break;
4991		}
4992	}
4993
4994	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4995	if (mount_fscid != (u32)-1) {
4996		fsc->client->monc.fs_cluster_id = mount_fscid;
4997		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4998				   0, true);
4999		ceph_monc_renew_subs(&fsc->client->monc);
5000	} else {
5001		err = -ENOENT;
5002		goto err_out;
5003	}
5004	return;
5005
5006bad:
5007	pr_err("error decoding fsmap\n");
5008err_out:
5009	mutex_lock(&mdsc->mutex);
5010	mdsc->mdsmap_err = err;
5011	__wake_requests(mdsc, &mdsc->waiting_for_map);
5012	mutex_unlock(&mdsc->mutex);
5013}
5014
5015/*
5016 * handle mds map update.
5017 */
5018void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5019{
5020	u32 epoch;
5021	u32 maplen;
5022	void *p = msg->front.iov_base;
5023	void *end = p + msg->front.iov_len;
5024	struct ceph_mdsmap *newmap, *oldmap;
5025	struct ceph_fsid fsid;
5026	int err = -EINVAL;
5027
5028	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5029	ceph_decode_copy(&p, &fsid, sizeof(fsid));
5030	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5031		return;
5032	epoch = ceph_decode_32(&p);
5033	maplen = ceph_decode_32(&p);
5034	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5035
5036	/* do we need it? */
5037	mutex_lock(&mdsc->mutex);
5038	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5039		dout("handle_map epoch %u <= our %u\n",
5040		     epoch, mdsc->mdsmap->m_epoch);
5041		mutex_unlock(&mdsc->mutex);
5042		return;
5043	}
5044
5045	newmap = ceph_mdsmap_decode(&p, end);
5046	if (IS_ERR(newmap)) {
5047		err = PTR_ERR(newmap);
5048		goto bad_unlock;
5049	}
5050
5051	/* swap into place */
5052	if (mdsc->mdsmap) {
5053		oldmap = mdsc->mdsmap;
5054		mdsc->mdsmap = newmap;
5055		check_new_map(mdsc, newmap, oldmap);
5056		ceph_mdsmap_destroy(oldmap);
5057	} else {
5058		mdsc->mdsmap = newmap;  /* first mds map */
5059	}
5060	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5061					MAX_LFS_FILESIZE);
5062
5063	__wake_requests(mdsc, &mdsc->waiting_for_map);
5064	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5065			  mdsc->mdsmap->m_epoch);
5066
5067	mutex_unlock(&mdsc->mutex);
5068	schedule_delayed(mdsc, 0);
5069	return;
5070
5071bad_unlock:
5072	mutex_unlock(&mdsc->mutex);
5073bad:
5074	pr_err("error decoding mdsmap %d\n", err);
5075	return;
5076}
5077
5078static struct ceph_connection *con_get(struct ceph_connection *con)
5079{
5080	struct ceph_mds_session *s = con->private;
5081
5082	if (ceph_get_mds_session(s))
5083		return con;
5084	return NULL;
5085}
5086
5087static void con_put(struct ceph_connection *con)
5088{
5089	struct ceph_mds_session *s = con->private;
5090
5091	ceph_put_mds_session(s);
5092}
5093
5094/*
5095 * if the client is unresponsive for long enough, the mds will kill
5096 * the session entirely.
5097 */
5098static void peer_reset(struct ceph_connection *con)
5099{
5100	struct ceph_mds_session *s = con->private;
5101	struct ceph_mds_client *mdsc = s->s_mdsc;
5102
5103	pr_warn("mds%d closed our session\n", s->s_mds);
5104	send_mds_reconnect(mdsc, s);
5105}
5106
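/*
 * Dispatch an incoming message to the appropriate handler, after
 * verifying that the session is still registered.
 */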
5107static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5108{
5109	struct ceph_mds_session *s = con->private;
5110	struct ceph_mds_client *mdsc = s->s_mdsc;
5111	int type = le16_to_cpu(msg->hdr.type);
5112
5113	mutex_lock(&mdsc->mutex);
5114	if (__verify_registered_session(mdsc, s) < 0) {
5115		mutex_unlock(&mdsc->mutex);
5116		goto out;
5117	}
5118	mutex_unlock(&mdsc->mutex);
5119
5120	switch (type) {
5121	case CEPH_MSG_MDS_MAP:
5122		ceph_mdsc_handle_mdsmap(mdsc, msg);
5123		break;
5124	case CEPH_MSG_FS_MAP_USER:
5125		ceph_mdsc_handle_fsmap(mdsc, msg);
5126		break;
5127	case CEPH_MSG_CLIENT_SESSION:
5128		handle_session(s, msg);
5129		break;
5130	case CEPH_MSG_CLIENT_REPLY:
5131		handle_reply(s, msg);
5132		break;
5133	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5134		handle_forward(mdsc, s, msg);
5135		break;
5136	case CEPH_MSG_CLIENT_CAPS:
5137		ceph_handle_caps(s, msg);
5138		break;
5139	case CEPH_MSG_CLIENT_SNAP:
5140		ceph_handle_snap(mdsc, s, msg);
5141		break;
5142	case CEPH_MSG_CLIENT_LEASE:
5143		handle_lease(mdsc, s, msg);
5144		break;
5145	case CEPH_MSG_CLIENT_QUOTA:
5146		ceph_handle_quota(mdsc, s, msg);
5147		break;
5148
5149	default:
5150		pr_err("received unknown message type %d %s\n", type,
5151		       ceph_msg_type_name(type));
5152	}
5153out:
5154	ceph_msg_put(msg);
5155}
5156
5157/*
5158 * authentication
5159 */
5160
5161/*
5162 * Note: returned pointer is the address of a structure that's
5163 * managed separately.  Caller must *not* attempt to free it.
5164 */
5165static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5166					int *proto, int force_new)
5167{
5168	struct ceph_mds_session *s = con->private;
5169	struct ceph_mds_client *mdsc = s->s_mdsc;
5170	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5171	struct ceph_auth_handshake *auth = &s->s_auth;
5172
5173	if (force_new && auth->authorizer) {
5174		ceph_auth_destroy_authorizer(auth->authorizer);
5175		auth->authorizer = NULL;
5176	}
5177	if (!auth->authorizer) {
5178		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5179						      auth);
5180		if (ret)
5181			return ERR_PTR(ret);
5182	} else {
5183		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5184						      auth);
5185		if (ret)
5186			return ERR_PTR(ret);
5187	}
5188	*proto = ac->protocol;
5189
5190	return auth;
5191}
5192
5193static int add_authorizer_challenge(struct ceph_connection *con,
5194				    void *challenge_buf, int challenge_buf_len)
5195{
5196	struct ceph_mds_session *s = con->private;
5197	struct ceph_mds_client *mdsc = s->s_mdsc;
5198	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5199
5200	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5201					    challenge_buf, challenge_buf_len);
5202}
5203
5204static int verify_authorizer_reply(struct ceph_connection *con)
5205{
5206	struct ceph_mds_session *s = con->private;
5207	struct ceph_mds_client *mdsc = s->s_mdsc;
5208	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5209
5210	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
5211}
5212
5213static int invalidate_authorizer(struct ceph_connection *con)
5214{
5215	struct ceph_mds_session *s = con->private;
5216	struct ceph_mds_client *mdsc = s->s_mdsc;
5217	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5218
5219	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5220
5221	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5222}
5223
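/*
 * Allocate a message to receive an incoming frame into, unless the
 * messenger has already set one up (con->in_msg).
 */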
5224static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5225				struct ceph_msg_header *hdr, int *skip)
5226{
5227	struct ceph_msg *msg;
5228	int type = (int) le16_to_cpu(hdr->type);
5229	int front_len = (int) le32_to_cpu(hdr->front_len);
5230
5231	if (con->in_msg)
5232		return con->in_msg;
5233
5234	*skip = 0;
5235	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5236	if (!msg) {
5237		pr_err("unable to allocate msg type %d len %d\n",
5238		       type, front_len);
5239		return NULL;
5240	}
5241
5242	return msg;
5243}
5244
5245static int mds_sign_message(struct ceph_msg *msg)
5246{
5247	struct ceph_mds_session *s = msg->con->private;
5248	struct ceph_auth_handshake *auth = &s->s_auth;
5249
5250	return ceph_auth_sign_message(auth, msg);
5251}
5252
5253static int mds_check_message_signature(struct ceph_msg *msg)
5254{
5255	struct ceph_mds_session *s = msg->con->private;
5256	struct ceph_auth_handshake *auth = &s->s_auth;
5257
5258	return ceph_auth_check_message_signature(auth, msg);
5259}
5260
5261static const struct ceph_connection_operations mds_con_ops = {
5262	.get = con_get,
5263	.put = con_put,
5264	.dispatch = dispatch,
5265	.get_authorizer = get_authorizer,
5266	.add_authorizer_challenge = add_authorizer_challenge,
5267	.verify_authorizer_reply = verify_authorizer_reply,
5268	.invalidate_authorizer = invalidate_authorizer,
5269	.peer_reset = peer_reset,
5270	.alloc_msg = mds_alloc_msg,
5271	.sign_message = mds_sign_message,
5272	.check_message_signature = mds_check_message_signature,
5273};
5274
5275/* eof */
5276