xref: /kernel/linux/linux-5.10/fs/ceph/inode.c (revision 8c2ecf20)
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/module.h>
5#include <linux/fs.h>
6#include <linux/slab.h>
7#include <linux/string.h>
8#include <linux/uaccess.h>
9#include <linux/kernel.h>
10#include <linux/writeback.h>
11#include <linux/vmalloc.h>
12#include <linux/xattr.h>
13#include <linux/posix_acl.h>
14#include <linux/random.h>
15#include <linux/sort.h>
16#include <linux/iversion.h>
17
18#include "super.h"
19#include "mds_client.h"
20#include "cache.h"
21#include <linux/ceph/decode.h>
22
23/*
24 * Ceph inode operations
25 *
26 * Implement basic inode helpers (get, alloc) and inode ops (getattr,
27 * setattr, etc.), xattr helpers, and helpers for assimilating
28 * metadata returned by the MDS into our cache.
29 *
30 * Also define helpers for doing asynchronous writeback, invalidation,
31 * and truncation for the benefit of those who can't afford to block
32 * (typically because they are in the message handler path).
33 */
34
35static const struct inode_operations ceph_symlink_iops;
36
37static void ceph_inode_work(struct work_struct *work);
38
39/*
40 * find or create an inode, given the ceph ino number
41 */
42static int ceph_set_ino_cb(struct inode *inode, void *data)
43{
44	struct ceph_inode_info *ci = ceph_inode(inode);
45	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
46
47	ci->i_vino = *(struct ceph_vino *)data;
48	inode->i_ino = ceph_vino_to_ino_t(ci->i_vino);
49	inode_set_iversion_raw(inode, 0);
50	percpu_counter_inc(&mdsc->metric.total_inodes);
51
52	return 0;
53}
54
55struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
56{
57	struct inode *inode;
58
59	if (ceph_vino_is_reserved(vino))
60		return ERR_PTR(-EREMOTEIO);
61
62	inode = iget5_locked(sb, (unsigned long)vino.ino, ceph_ino_compare,
63			     ceph_set_ino_cb, &vino);
64	if (!inode)
65		return ERR_PTR(-ENOMEM);
66
67	dout("get_inode on %llu=%llx.%llx got %p new %d\n", ceph_present_inode(inode),
68	     ceph_vinop(inode), inode, !!(inode->i_state & I_NEW));
69	return inode;
70}
71
72/*
73 * get/constuct snapdir inode for a given directory
74 */
75struct inode *ceph_get_snapdir(struct inode *parent)
76{
77	struct ceph_vino vino = {
78		.ino = ceph_ino(parent),
79		.snap = CEPH_SNAPDIR,
80	};
81	struct inode *inode = ceph_get_inode(parent->i_sb, vino);
82	struct ceph_inode_info *ci = ceph_inode(inode);
83
84	BUG_ON(!S_ISDIR(parent->i_mode));
85	if (IS_ERR(inode))
86		return inode;
87	inode->i_mode = parent->i_mode;
88	inode->i_uid = parent->i_uid;
89	inode->i_gid = parent->i_gid;
90	inode->i_mtime = parent->i_mtime;
91	inode->i_ctime = parent->i_ctime;
92	inode->i_atime = parent->i_atime;
93	ci->i_rbytes = 0;
94	ci->i_btime = ceph_inode(parent)->i_btime;
95
96	if (inode->i_state & I_NEW) {
97		inode->i_op = &ceph_snapdir_iops;
98		inode->i_fop = &ceph_snapdir_fops;
99		ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
100		unlock_new_inode(inode);
101	}
102
103	return inode;
104}
105
106const struct inode_operations ceph_file_iops = {
107	.permission = ceph_permission,
108	.setattr = ceph_setattr,
109	.getattr = ceph_getattr,
110	.listxattr = ceph_listxattr,
111	.get_acl = ceph_get_acl,
112	.set_acl = ceph_set_acl,
113};
114
115
116/*
117 * We use a 'frag tree' to keep track of the MDS's directory fragments
118 * for a given inode (usually there is just a single fragment).  We
119 * need to know when a child frag is delegated to a new MDS, or when
120 * it is flagged as replicated, so we can direct our requests
121 * accordingly.
122 */
123
124/*
125 * find/create a frag in the tree
126 */
127static struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci,
128						    u32 f)
129{
130	struct rb_node **p;
131	struct rb_node *parent = NULL;
132	struct ceph_inode_frag *frag;
133	int c;
134
135	p = &ci->i_fragtree.rb_node;
136	while (*p) {
137		parent = *p;
138		frag = rb_entry(parent, struct ceph_inode_frag, node);
139		c = ceph_frag_compare(f, frag->frag);
140		if (c < 0)
141			p = &(*p)->rb_left;
142		else if (c > 0)
143			p = &(*p)->rb_right;
144		else
145			return frag;
146	}
147
148	frag = kmalloc(sizeof(*frag), GFP_NOFS);
149	if (!frag)
150		return ERR_PTR(-ENOMEM);
151
152	frag->frag = f;
153	frag->split_by = 0;
154	frag->mds = -1;
155	frag->ndist = 0;
156
157	rb_link_node(&frag->node, parent, p);
158	rb_insert_color(&frag->node, &ci->i_fragtree);
159
160	dout("get_or_create_frag added %llx.%llx frag %x\n",
161	     ceph_vinop(&ci->vfs_inode), f);
162	return frag;
163}
164
165/*
166 * find a specific frag @f
167 */
168struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
169{
170	struct rb_node *n = ci->i_fragtree.rb_node;
171
172	while (n) {
173		struct ceph_inode_frag *frag =
174			rb_entry(n, struct ceph_inode_frag, node);
175		int c = ceph_frag_compare(f, frag->frag);
176		if (c < 0)
177			n = n->rb_left;
178		else if (c > 0)
179			n = n->rb_right;
180		else
181			return frag;
182	}
183	return NULL;
184}
185
186/*
187 * Choose frag containing the given value @v.  If @pfrag is
188 * specified, copy the frag delegation info to the caller if
189 * it is present.
190 */
191static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
192			      struct ceph_inode_frag *pfrag, int *found)
193{
194	u32 t = ceph_frag_make(0, 0);
195	struct ceph_inode_frag *frag;
196	unsigned nway, i;
197	u32 n;
198
199	if (found)
200		*found = 0;
201
202	while (1) {
203		WARN_ON(!ceph_frag_contains_value(t, v));
204		frag = __ceph_find_frag(ci, t);
205		if (!frag)
206			break; /* t is a leaf */
207		if (frag->split_by == 0) {
208			if (pfrag)
209				memcpy(pfrag, frag, sizeof(*pfrag));
210			if (found)
211				*found = 1;
212			break;
213		}
214
215		/* choose child */
216		nway = 1 << frag->split_by;
217		dout("choose_frag(%x) %x splits by %d (%d ways)\n", v, t,
218		     frag->split_by, nway);
219		for (i = 0; i < nway; i++) {
220			n = ceph_frag_make_child(t, frag->split_by, i);
221			if (ceph_frag_contains_value(n, v)) {
222				t = n;
223				break;
224			}
225		}
226		BUG_ON(i == nway);
227	}
228	dout("choose_frag(%x) = %x\n", v, t);
229
230	return t;
231}
232
233u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
234		     struct ceph_inode_frag *pfrag, int *found)
235{
236	u32 ret;
237	mutex_lock(&ci->i_fragtree_mutex);
238	ret = __ceph_choose_frag(ci, v, pfrag, found);
239	mutex_unlock(&ci->i_fragtree_mutex);
240	return ret;
241}
242
243/*
244 * Process dirfrag (delegation) info from the mds.  Include leaf
245 * fragment in tree ONLY if ndist > 0.  Otherwise, only
246 * branches/splits are included in i_fragtree)
247 */
248static int ceph_fill_dirfrag(struct inode *inode,
249			     struct ceph_mds_reply_dirfrag *dirinfo)
250{
251	struct ceph_inode_info *ci = ceph_inode(inode);
252	struct ceph_inode_frag *frag;
253	u32 id = le32_to_cpu(dirinfo->frag);
254	int mds = le32_to_cpu(dirinfo->auth);
255	int ndist = le32_to_cpu(dirinfo->ndist);
256	int diri_auth = -1;
257	int i;
258	int err = 0;
259
260	spin_lock(&ci->i_ceph_lock);
261	if (ci->i_auth_cap)
262		diri_auth = ci->i_auth_cap->mds;
263	spin_unlock(&ci->i_ceph_lock);
264
265	if (mds == -1) /* CDIR_AUTH_PARENT */
266		mds = diri_auth;
267
268	mutex_lock(&ci->i_fragtree_mutex);
269	if (ndist == 0 && mds == diri_auth) {
270		/* no delegation info needed. */
271		frag = __ceph_find_frag(ci, id);
272		if (!frag)
273			goto out;
274		if (frag->split_by == 0) {
275			/* tree leaf, remove */
276			dout("fill_dirfrag removed %llx.%llx frag %x"
277			     " (no ref)\n", ceph_vinop(inode), id);
278			rb_erase(&frag->node, &ci->i_fragtree);
279			kfree(frag);
280		} else {
281			/* tree branch, keep and clear */
282			dout("fill_dirfrag cleared %llx.%llx frag %x"
283			     " referral\n", ceph_vinop(inode), id);
284			frag->mds = -1;
285			frag->ndist = 0;
286		}
287		goto out;
288	}
289
290
291	/* find/add this frag to store mds delegation info */
292	frag = __get_or_create_frag(ci, id);
293	if (IS_ERR(frag)) {
294		/* this is not the end of the world; we can continue
295		   with bad/inaccurate delegation info */
296		pr_err("fill_dirfrag ENOMEM on mds ref %llx.%llx fg %x\n",
297		       ceph_vinop(inode), le32_to_cpu(dirinfo->frag));
298		err = -ENOMEM;
299		goto out;
300	}
301
302	frag->mds = mds;
303	frag->ndist = min_t(u32, ndist, CEPH_MAX_DIRFRAG_REP);
304	for (i = 0; i < frag->ndist; i++)
305		frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
306	dout("fill_dirfrag %llx.%llx frag %x ndist=%d\n",
307	     ceph_vinop(inode), frag->frag, frag->ndist);
308
309out:
310	mutex_unlock(&ci->i_fragtree_mutex);
311	return err;
312}
313
314static int frag_tree_split_cmp(const void *l, const void *r)
315{
316	struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
317	struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
318	return ceph_frag_compare(le32_to_cpu(ls->frag),
319				 le32_to_cpu(rs->frag));
320}
321
322static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
323{
324	if (!frag)
325		return f == ceph_frag_make(0, 0);
326	if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
327		return false;
328	return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
329}
330
331static int ceph_fill_fragtree(struct inode *inode,
332			      struct ceph_frag_tree_head *fragtree,
333			      struct ceph_mds_reply_dirfrag *dirinfo)
334{
335	struct ceph_inode_info *ci = ceph_inode(inode);
336	struct ceph_inode_frag *frag, *prev_frag = NULL;
337	struct rb_node *rb_node;
338	unsigned i, split_by, nsplits;
339	u32 id;
340	bool update = false;
341
342	mutex_lock(&ci->i_fragtree_mutex);
343	nsplits = le32_to_cpu(fragtree->nsplits);
344	if (nsplits != ci->i_fragtree_nsplits) {
345		update = true;
346	} else if (nsplits) {
347		i = prandom_u32() % nsplits;
348		id = le32_to_cpu(fragtree->splits[i].frag);
349		if (!__ceph_find_frag(ci, id))
350			update = true;
351	} else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
352		rb_node = rb_first(&ci->i_fragtree);
353		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
354		if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
355			update = true;
356	}
357	if (!update && dirinfo) {
358		id = le32_to_cpu(dirinfo->frag);
359		if (id != __ceph_choose_frag(ci, id, NULL, NULL))
360			update = true;
361	}
362	if (!update)
363		goto out_unlock;
364
365	if (nsplits > 1) {
366		sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
367		     frag_tree_split_cmp, NULL);
368	}
369
370	dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
371	rb_node = rb_first(&ci->i_fragtree);
372	for (i = 0; i < nsplits; i++) {
373		id = le32_to_cpu(fragtree->splits[i].frag);
374		split_by = le32_to_cpu(fragtree->splits[i].by);
375		if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
376			pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
377			       "frag %x split by %d\n", ceph_vinop(inode),
378			       i, nsplits, id, split_by);
379			continue;
380		}
381		frag = NULL;
382		while (rb_node) {
383			frag = rb_entry(rb_node, struct ceph_inode_frag, node);
384			if (ceph_frag_compare(frag->frag, id) >= 0) {
385				if (frag->frag != id)
386					frag = NULL;
387				else
388					rb_node = rb_next(rb_node);
389				break;
390			}
391			rb_node = rb_next(rb_node);
392			/* delete stale split/leaf node */
393			if (frag->split_by > 0 ||
394			    !is_frag_child(frag->frag, prev_frag)) {
395				rb_erase(&frag->node, &ci->i_fragtree);
396				if (frag->split_by > 0)
397					ci->i_fragtree_nsplits--;
398				kfree(frag);
399			}
400			frag = NULL;
401		}
402		if (!frag) {
403			frag = __get_or_create_frag(ci, id);
404			if (IS_ERR(frag))
405				continue;
406		}
407		if (frag->split_by == 0)
408			ci->i_fragtree_nsplits++;
409		frag->split_by = split_by;
410		dout(" frag %x split by %d\n", frag->frag, frag->split_by);
411		prev_frag = frag;
412	}
413	while (rb_node) {
414		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
415		rb_node = rb_next(rb_node);
416		/* delete stale split/leaf node */
417		if (frag->split_by > 0 ||
418		    !is_frag_child(frag->frag, prev_frag)) {
419			rb_erase(&frag->node, &ci->i_fragtree);
420			if (frag->split_by > 0)
421				ci->i_fragtree_nsplits--;
422			kfree(frag);
423		}
424	}
425out_unlock:
426	mutex_unlock(&ci->i_fragtree_mutex);
427	return 0;
428}
429
430/*
431 * initialize a newly allocated inode.
432 */
433struct inode *ceph_alloc_inode(struct super_block *sb)
434{
435	struct ceph_inode_info *ci;
436	int i;
437
438	ci = kmem_cache_alloc(ceph_inode_cachep, GFP_NOFS);
439	if (!ci)
440		return NULL;
441
442	dout("alloc_inode %p\n", &ci->vfs_inode);
443
444	spin_lock_init(&ci->i_ceph_lock);
445
446	ci->i_version = 0;
447	ci->i_inline_version = 0;
448	ci->i_time_warp_seq = 0;
449	ci->i_ceph_flags = 0;
450	atomic64_set(&ci->i_ordered_count, 1);
451	atomic64_set(&ci->i_release_count, 1);
452	atomic64_set(&ci->i_complete_seq[0], 0);
453	atomic64_set(&ci->i_complete_seq[1], 0);
454	ci->i_symlink = NULL;
455
456	ci->i_max_bytes = 0;
457	ci->i_max_files = 0;
458
459	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
460	memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
461	RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
462
463	ci->i_fragtree = RB_ROOT;
464	mutex_init(&ci->i_fragtree_mutex);
465
466	ci->i_xattrs.blob = NULL;
467	ci->i_xattrs.prealloc_blob = NULL;
468	ci->i_xattrs.dirty = false;
469	ci->i_xattrs.index = RB_ROOT;
470	ci->i_xattrs.count = 0;
471	ci->i_xattrs.names_size = 0;
472	ci->i_xattrs.vals_size = 0;
473	ci->i_xattrs.version = 0;
474	ci->i_xattrs.index_version = 0;
475
476	ci->i_caps = RB_ROOT;
477	ci->i_auth_cap = NULL;
478	ci->i_dirty_caps = 0;
479	ci->i_flushing_caps = 0;
480	INIT_LIST_HEAD(&ci->i_dirty_item);
481	INIT_LIST_HEAD(&ci->i_flushing_item);
482	ci->i_prealloc_cap_flush = NULL;
483	INIT_LIST_HEAD(&ci->i_cap_flush_list);
484	init_waitqueue_head(&ci->i_cap_wq);
485	ci->i_hold_caps_max = 0;
486	INIT_LIST_HEAD(&ci->i_cap_delay_list);
487	INIT_LIST_HEAD(&ci->i_cap_snaps);
488	ci->i_head_snapc = NULL;
489	ci->i_snap_caps = 0;
490
491	ci->i_last_rd = ci->i_last_wr = jiffies - 3600 * HZ;
492	for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
493		ci->i_nr_by_mode[i] = 0;
494
495	mutex_init(&ci->i_truncate_mutex);
496	ci->i_truncate_seq = 0;
497	ci->i_truncate_size = 0;
498	ci->i_truncate_pending = 0;
499
500	ci->i_max_size = 0;
501	ci->i_reported_size = 0;
502	ci->i_wanted_max_size = 0;
503	ci->i_requested_max_size = 0;
504
505	ci->i_pin_ref = 0;
506	ci->i_rd_ref = 0;
507	ci->i_rdcache_ref = 0;
508	ci->i_wr_ref = 0;
509	ci->i_wb_ref = 0;
510	ci->i_fx_ref = 0;
511	ci->i_wrbuffer_ref = 0;
512	ci->i_wrbuffer_ref_head = 0;
513	atomic_set(&ci->i_filelock_ref, 0);
514	atomic_set(&ci->i_shared_gen, 1);
515	ci->i_rdcache_gen = 0;
516	ci->i_rdcache_revoking = 0;
517
518	INIT_LIST_HEAD(&ci->i_unsafe_dirops);
519	INIT_LIST_HEAD(&ci->i_unsafe_iops);
520	spin_lock_init(&ci->i_unsafe_lock);
521
522	ci->i_snap_realm = NULL;
523	INIT_LIST_HEAD(&ci->i_snap_realm_item);
524	INIT_LIST_HEAD(&ci->i_snap_flush_item);
525
526	INIT_WORK(&ci->i_work, ceph_inode_work);
527	ci->i_work_mask = 0;
528	memset(&ci->i_btime, '\0', sizeof(ci->i_btime));
529
530	ceph_fscache_inode_init(ci);
531
532	return &ci->vfs_inode;
533}
534
535void ceph_free_inode(struct inode *inode)
536{
537	struct ceph_inode_info *ci = ceph_inode(inode);
538
539	kfree(ci->i_symlink);
540	kmem_cache_free(ceph_inode_cachep, ci);
541}
542
543void ceph_evict_inode(struct inode *inode)
544{
545	struct ceph_inode_info *ci = ceph_inode(inode);
546	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
547	struct ceph_inode_frag *frag;
548	struct rb_node *n;
549
550	dout("evict_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
551
552	percpu_counter_dec(&mdsc->metric.total_inodes);
553
554	truncate_inode_pages_final(&inode->i_data);
555	clear_inode(inode);
556
557	ceph_fscache_unregister_inode_cookie(ci);
558
559	__ceph_remove_caps(ci);
560
561	if (__ceph_has_any_quota(ci))
562		ceph_adjust_quota_realms_count(inode, false);
563
564	/*
565	 * we may still have a snap_realm reference if there are stray
566	 * caps in i_snap_caps.
567	 */
568	if (ci->i_snap_realm) {
569		if (ceph_snap(inode) == CEPH_NOSNAP) {
570			struct ceph_snap_realm *realm = ci->i_snap_realm;
571			dout(" dropping residual ref to snap realm %p\n",
572			     realm);
573			spin_lock(&realm->inodes_with_caps_lock);
574			list_del_init(&ci->i_snap_realm_item);
575			ci->i_snap_realm = NULL;
576			if (realm->ino == ci->i_vino.ino)
577				realm->inode = NULL;
578			spin_unlock(&realm->inodes_with_caps_lock);
579			ceph_put_snap_realm(mdsc, realm);
580		} else {
581			ceph_put_snapid_map(mdsc, ci->i_snapid_map);
582			ci->i_snap_realm = NULL;
583		}
584	}
585
586	while ((n = rb_first(&ci->i_fragtree)) != NULL) {
587		frag = rb_entry(n, struct ceph_inode_frag, node);
588		rb_erase(n, &ci->i_fragtree);
589		kfree(frag);
590	}
591	ci->i_fragtree_nsplits = 0;
592
593	__ceph_destroy_xattrs(ci);
594	if (ci->i_xattrs.blob)
595		ceph_buffer_put(ci->i_xattrs.blob);
596	if (ci->i_xattrs.prealloc_blob)
597		ceph_buffer_put(ci->i_xattrs.prealloc_blob);
598
599	ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
600	ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
601}
602
603static inline blkcnt_t calc_inode_blocks(u64 size)
604{
605	return (size + (1<<9) - 1) >> 9;
606}
607
608/*
609 * Helpers to fill in size, ctime, mtime, and atime.  We have to be
610 * careful because either the client or MDS may have more up to date
611 * info, depending on which capabilities are held, and whether
612 * time_warp_seq or truncate_seq have increased.  (Ordinarily, mtime
613 * and size are monotonically increasing, except when utimes() or
614 * truncate() increments the corresponding _seq values.)
615 */
616int ceph_fill_file_size(struct inode *inode, int issued,
617			u32 truncate_seq, u64 truncate_size, u64 size)
618{
619	struct ceph_inode_info *ci = ceph_inode(inode);
620	int queue_trunc = 0;
621
622	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
623	    (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
624		dout("size %lld -> %llu\n", inode->i_size, size);
625		if (size > 0 && S_ISDIR(inode->i_mode)) {
626			pr_err("fill_file_size non-zero size for directory\n");
627			size = 0;
628		}
629		i_size_write(inode, size);
630		inode->i_blocks = calc_inode_blocks(size);
631		ci->i_reported_size = size;
632		if (truncate_seq != ci->i_truncate_seq) {
633			dout("truncate_seq %u -> %u\n",
634			     ci->i_truncate_seq, truncate_seq);
635			ci->i_truncate_seq = truncate_seq;
636
637			/* the MDS should have revoked these caps */
638			WARN_ON_ONCE(issued & (CEPH_CAP_FILE_RD |
639					       CEPH_CAP_FILE_LAZYIO));
640			/*
641			 * If we hold relevant caps, or in the case where we're
642			 * not the only client referencing this file and we
643			 * don't hold those caps, then we need to check whether
644			 * the file is either opened or mmaped
645			 */
646			if ((issued & (CEPH_CAP_FILE_CACHE|
647				       CEPH_CAP_FILE_BUFFER)) ||
648			    mapping_mapped(inode->i_mapping) ||
649			    __ceph_is_file_opened(ci)) {
650				ci->i_truncate_pending++;
651				queue_trunc = 1;
652			}
653		}
654	}
655	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0 &&
656	    ci->i_truncate_size != truncate_size) {
657		dout("truncate_size %lld -> %llu\n", ci->i_truncate_size,
658		     truncate_size);
659		ci->i_truncate_size = truncate_size;
660	}
661
662	if (queue_trunc)
663		ceph_fscache_invalidate(inode);
664
665	return queue_trunc;
666}
667
668void ceph_fill_file_time(struct inode *inode, int issued,
669			 u64 time_warp_seq, struct timespec64 *ctime,
670			 struct timespec64 *mtime, struct timespec64 *atime)
671{
672	struct ceph_inode_info *ci = ceph_inode(inode);
673	int warn = 0;
674
675	if (issued & (CEPH_CAP_FILE_EXCL|
676		      CEPH_CAP_FILE_WR|
677		      CEPH_CAP_FILE_BUFFER|
678		      CEPH_CAP_AUTH_EXCL|
679		      CEPH_CAP_XATTR_EXCL)) {
680		if (ci->i_version == 0 ||
681		    timespec64_compare(ctime, &inode->i_ctime) > 0) {
682			dout("ctime %lld.%09ld -> %lld.%09ld inc w/ cap\n",
683			     inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
684			     ctime->tv_sec, ctime->tv_nsec);
685			inode->i_ctime = *ctime;
686		}
687		if (ci->i_version == 0 ||
688		    ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) > 0) {
689			/* the MDS did a utimes() */
690			dout("mtime %lld.%09ld -> %lld.%09ld "
691			     "tw %d -> %d\n",
692			     inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
693			     mtime->tv_sec, mtime->tv_nsec,
694			     ci->i_time_warp_seq, (int)time_warp_seq);
695
696			inode->i_mtime = *mtime;
697			inode->i_atime = *atime;
698			ci->i_time_warp_seq = time_warp_seq;
699		} else if (time_warp_seq == ci->i_time_warp_seq) {
700			/* nobody did utimes(); take the max */
701			if (timespec64_compare(mtime, &inode->i_mtime) > 0) {
702				dout("mtime %lld.%09ld -> %lld.%09ld inc\n",
703				     inode->i_mtime.tv_sec,
704				     inode->i_mtime.tv_nsec,
705				     mtime->tv_sec, mtime->tv_nsec);
706				inode->i_mtime = *mtime;
707			}
708			if (timespec64_compare(atime, &inode->i_atime) > 0) {
709				dout("atime %lld.%09ld -> %lld.%09ld inc\n",
710				     inode->i_atime.tv_sec,
711				     inode->i_atime.tv_nsec,
712				     atime->tv_sec, atime->tv_nsec);
713				inode->i_atime = *atime;
714			}
715		} else if (issued & CEPH_CAP_FILE_EXCL) {
716			/* we did a utimes(); ignore mds values */
717		} else {
718			warn = 1;
719		}
720	} else {
721		/* we have no write|excl caps; whatever the MDS says is true */
722		if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
723			inode->i_ctime = *ctime;
724			inode->i_mtime = *mtime;
725			inode->i_atime = *atime;
726			ci->i_time_warp_seq = time_warp_seq;
727		} else {
728			warn = 1;
729		}
730	}
731	if (warn) /* time_warp_seq shouldn't go backwards */
732		dout("%p mds time_warp_seq %llu < %u\n",
733		     inode, time_warp_seq, ci->i_time_warp_seq);
734}
735
736/*
737 * Populate an inode based on info from mds.  May be called on new or
738 * existing inodes.
739 */
740int ceph_fill_inode(struct inode *inode, struct page *locked_page,
741		    struct ceph_mds_reply_info_in *iinfo,
742		    struct ceph_mds_reply_dirfrag *dirinfo,
743		    struct ceph_mds_session *session, int cap_fmode,
744		    struct ceph_cap_reservation *caps_reservation)
745{
746	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
747	struct ceph_mds_reply_inode *info = iinfo->in;
748	struct ceph_inode_info *ci = ceph_inode(inode);
749	int issued, new_issued, info_caps;
750	struct timespec64 mtime, atime, ctime;
751	struct ceph_buffer *xattr_blob = NULL;
752	struct ceph_buffer *old_blob = NULL;
753	struct ceph_string *pool_ns = NULL;
754	struct ceph_cap *new_cap = NULL;
755	int err = 0;
756	bool wake = false;
757	bool queue_trunc = false;
758	bool new_version = false;
759	bool fill_inline = false;
760
761	lockdep_assert_held(&mdsc->snap_rwsem);
762
763	dout("%s %p ino %llx.%llx v %llu had %llu\n", __func__,
764	     inode, ceph_vinop(inode), le64_to_cpu(info->version),
765	     ci->i_version);
766
767	info_caps = le32_to_cpu(info->cap.caps);
768
769	/* prealloc new cap struct */
770	if (info_caps && ceph_snap(inode) == CEPH_NOSNAP) {
771		new_cap = ceph_get_cap(mdsc, caps_reservation);
772		if (!new_cap)
773			return -ENOMEM;
774	}
775
776	/*
777	 * prealloc xattr data, if it looks like we'll need it.  only
778	 * if len > 4 (meaning there are actually xattrs; the first 4
779	 * bytes are the xattr count).
780	 */
781	if (iinfo->xattr_len > 4) {
782		xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
783		if (!xattr_blob)
784			pr_err("%s ENOMEM xattr blob %d bytes\n", __func__,
785			       iinfo->xattr_len);
786	}
787
788	if (iinfo->pool_ns_len > 0)
789		pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
790						     iinfo->pool_ns_len);
791
792	if (ceph_snap(inode) != CEPH_NOSNAP && !ci->i_snapid_map)
793		ci->i_snapid_map = ceph_get_snapid_map(mdsc, ceph_snap(inode));
794
795	spin_lock(&ci->i_ceph_lock);
796
797	/*
798	 * provided version will be odd if inode value is projected,
799	 * even if stable.  skip the update if we have newer stable
800	 * info (ours>=theirs, e.g. due to racing mds replies), unless
801	 * we are getting projected (unstable) info (in which case the
802	 * version is odd, and we want ours>theirs).
803	 *   us   them
804	 *   2    2     skip
805	 *   3    2     skip
806	 *   3    3     update
807	 */
808	if (ci->i_version == 0 ||
809	    ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
810	     le64_to_cpu(info->version) > (ci->i_version & ~1)))
811		new_version = true;
812
813	/* Update change_attribute */
814	inode_set_max_iversion_raw(inode, iinfo->change_attr);
815
816	__ceph_caps_issued(ci, &issued);
817	issued |= __ceph_caps_dirty(ci);
818	new_issued = ~issued & info_caps;
819
820	/* update inode */
821	inode->i_rdev = le32_to_cpu(info->rdev);
822	/* directories have fl_stripe_unit set to zero */
823	if (le32_to_cpu(info->layout.fl_stripe_unit))
824		inode->i_blkbits =
825			fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
826	else
827		inode->i_blkbits = CEPH_BLOCK_SHIFT;
828
829	__ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files);
830
831	if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
832	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {
833		inode->i_mode = le32_to_cpu(info->mode);
834		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
835		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
836		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
837		     from_kuid(&init_user_ns, inode->i_uid),
838		     from_kgid(&init_user_ns, inode->i_gid));
839		ceph_decode_timespec64(&ci->i_btime, &iinfo->btime);
840		ceph_decode_timespec64(&ci->i_snap_btime, &iinfo->snap_btime);
841	}
842
843	if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
844	    (issued & CEPH_CAP_LINK_EXCL) == 0)
845		set_nlink(inode, le32_to_cpu(info->nlink));
846
847	if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
848		/* be careful with mtime, atime, size */
849		ceph_decode_timespec64(&atime, &info->atime);
850		ceph_decode_timespec64(&mtime, &info->mtime);
851		ceph_decode_timespec64(&ctime, &info->ctime);
852		ceph_fill_file_time(inode, issued,
853				le32_to_cpu(info->time_warp_seq),
854				&ctime, &mtime, &atime);
855	}
856
857	if (new_version || (info_caps & CEPH_CAP_FILE_SHARED)) {
858		ci->i_files = le64_to_cpu(info->files);
859		ci->i_subdirs = le64_to_cpu(info->subdirs);
860	}
861
862	if (new_version ||
863	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
864		s64 old_pool = ci->i_layout.pool_id;
865		struct ceph_string *old_ns;
866
867		ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
868		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
869					lockdep_is_held(&ci->i_ceph_lock));
870		rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
871
872		if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
873			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
874
875		pool_ns = old_ns;
876
877		queue_trunc = ceph_fill_file_size(inode, issued,
878					le32_to_cpu(info->truncate_seq),
879					le64_to_cpu(info->truncate_size),
880					le64_to_cpu(info->size));
881		/* only update max_size on auth cap */
882		if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
883		    ci->i_max_size != le64_to_cpu(info->max_size)) {
884			dout("max_size %lld -> %llu\n", ci->i_max_size,
885					le64_to_cpu(info->max_size));
886			ci->i_max_size = le64_to_cpu(info->max_size);
887		}
888	}
889
890	/* layout and rstat are not tracked by capability, update them if
891	 * the inode info is from auth mds */
892	if (new_version || (info->cap.flags & CEPH_CAP_FLAG_AUTH)) {
893		if (S_ISDIR(inode->i_mode)) {
894			ci->i_dir_layout = iinfo->dir_layout;
895			ci->i_rbytes = le64_to_cpu(info->rbytes);
896			ci->i_rfiles = le64_to_cpu(info->rfiles);
897			ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
898			ci->i_dir_pin = iinfo->dir_pin;
899			ceph_decode_timespec64(&ci->i_rctime, &info->rctime);
900		}
901	}
902
903	/* xattrs */
904	/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
905	if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
906	    le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
907		if (ci->i_xattrs.blob)
908			old_blob = ci->i_xattrs.blob;
909		ci->i_xattrs.blob = xattr_blob;
910		if (xattr_blob)
911			memcpy(ci->i_xattrs.blob->vec.iov_base,
912			       iinfo->xattr_data, iinfo->xattr_len);
913		ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
914		ceph_forget_all_cached_acls(inode);
915		ceph_security_invalidate_secctx(inode);
916		xattr_blob = NULL;
917	}
918
919	/* finally update i_version */
920	if (le64_to_cpu(info->version) > ci->i_version)
921		ci->i_version = le64_to_cpu(info->version);
922
923	inode->i_mapping->a_ops = &ceph_aops;
924
925	switch (inode->i_mode & S_IFMT) {
926	case S_IFIFO:
927	case S_IFBLK:
928	case S_IFCHR:
929	case S_IFSOCK:
930		inode->i_blkbits = PAGE_SHIFT;
931		init_special_inode(inode, inode->i_mode, inode->i_rdev);
932		inode->i_op = &ceph_file_iops;
933		break;
934	case S_IFREG:
935		inode->i_op = &ceph_file_iops;
936		inode->i_fop = &ceph_file_fops;
937		break;
938	case S_IFLNK:
939		inode->i_op = &ceph_symlink_iops;
940		if (!ci->i_symlink) {
941			u32 symlen = iinfo->symlink_len;
942			char *sym;
943
944			spin_unlock(&ci->i_ceph_lock);
945
946			if (symlen != i_size_read(inode)) {
947				pr_err("%s %llx.%llx BAD symlink "
948					"size %lld\n", __func__,
949					ceph_vinop(inode),
950					i_size_read(inode));
951				i_size_write(inode, symlen);
952				inode->i_blocks = calc_inode_blocks(symlen);
953			}
954
955			err = -ENOMEM;
956			sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
957			if (!sym)
958				goto out;
959
960			spin_lock(&ci->i_ceph_lock);
961			if (!ci->i_symlink)
962				ci->i_symlink = sym;
963			else
964				kfree(sym); /* lost a race */
965		}
966		inode->i_link = ci->i_symlink;
967		break;
968	case S_IFDIR:
969		inode->i_op = &ceph_dir_iops;
970		inode->i_fop = &ceph_dir_fops;
971		break;
972	default:
973		pr_err("%s %llx.%llx BAD mode 0%o\n", __func__,
974		       ceph_vinop(inode), inode->i_mode);
975	}
976
977	/* were we issued a capability? */
978	if (info_caps) {
979		if (ceph_snap(inode) == CEPH_NOSNAP) {
980			ceph_add_cap(inode, session,
981				     le64_to_cpu(info->cap.cap_id),
982				     info_caps,
983				     le32_to_cpu(info->cap.wanted),
984				     le32_to_cpu(info->cap.seq),
985				     le32_to_cpu(info->cap.mseq),
986				     le64_to_cpu(info->cap.realm),
987				     info->cap.flags, &new_cap);
988
989			/* set dir completion flag? */
990			if (S_ISDIR(inode->i_mode) &&
991			    ci->i_files == 0 && ci->i_subdirs == 0 &&
992			    (info_caps & CEPH_CAP_FILE_SHARED) &&
993			    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
994			    !__ceph_dir_is_complete(ci)) {
995				dout(" marking %p complete (empty)\n", inode);
996				i_size_write(inode, 0);
997				__ceph_dir_set_complete(ci,
998					atomic64_read(&ci->i_release_count),
999					atomic64_read(&ci->i_ordered_count));
1000			}
1001
1002			wake = true;
1003		} else {
1004			dout(" %p got snap_caps %s\n", inode,
1005			     ceph_cap_string(info_caps));
1006			ci->i_snap_caps |= info_caps;
1007		}
1008	}
1009
1010	if (iinfo->inline_version > 0 &&
1011	    iinfo->inline_version >= ci->i_inline_version) {
1012		int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1013		ci->i_inline_version = iinfo->inline_version;
1014		if (ci->i_inline_version != CEPH_INLINE_NONE &&
1015		    (locked_page || (info_caps & cache_caps)))
1016			fill_inline = true;
1017	}
1018
1019	if (cap_fmode >= 0) {
1020		if (!info_caps)
1021			pr_warn("mds issued no caps on %llx.%llx\n",
1022				ceph_vinop(inode));
1023		__ceph_touch_fmode(ci, mdsc, cap_fmode);
1024	}
1025
1026	spin_unlock(&ci->i_ceph_lock);
1027
1028	if (fill_inline)
1029		ceph_fill_inline_data(inode, locked_page,
1030				      iinfo->inline_data, iinfo->inline_len);
1031
1032	if (wake)
1033		wake_up_all(&ci->i_cap_wq);
1034
1035	/* queue truncate if we saw i_size decrease */
1036	if (queue_trunc)
1037		ceph_queue_vmtruncate(inode);
1038
1039	/* populate frag tree */
1040	if (S_ISDIR(inode->i_mode))
1041		ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
1042
1043	/* update delegation info? */
1044	if (dirinfo)
1045		ceph_fill_dirfrag(inode, dirinfo);
1046
1047	err = 0;
1048out:
1049	if (new_cap)
1050		ceph_put_cap(mdsc, new_cap);
1051	ceph_buffer_put(old_blob);
1052	ceph_buffer_put(xattr_blob);
1053	ceph_put_string(pool_ns);
1054	return err;
1055}
1056
1057/*
1058 * caller should hold session s_mutex and dentry->d_lock.
1059 */
1060static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
1061				  struct ceph_mds_reply_lease *lease,
1062				  struct ceph_mds_session *session,
1063				  unsigned long from_time,
1064				  struct ceph_mds_session **old_lease_session)
1065{
1066	struct ceph_dentry_info *di = ceph_dentry(dentry);
1067	unsigned mask = le16_to_cpu(lease->mask);
1068	long unsigned duration = le32_to_cpu(lease->duration_ms);
1069	long unsigned ttl = from_time + (duration * HZ) / 1000;
1070	long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
1071
1072	dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
1073	     dentry, duration, ttl);
1074
1075	/* only track leases on regular dentries */
1076	if (ceph_snap(dir) != CEPH_NOSNAP)
1077		return;
1078
1079	if (mask & CEPH_LEASE_PRIMARY_LINK)
1080		di->flags |= CEPH_DENTRY_PRIMARY_LINK;
1081	else
1082		di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
1083
1084	di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
1085	if (!(mask & CEPH_LEASE_VALID)) {
1086		__ceph_dentry_dir_lease_touch(di);
1087		return;
1088	}
1089
1090	if (di->lease_gen == session->s_cap_gen &&
1091	    time_before(ttl, di->time))
1092		return;  /* we already have a newer lease. */
1093
1094	if (di->lease_session && di->lease_session != session) {
1095		*old_lease_session = di->lease_session;
1096		di->lease_session = NULL;
1097	}
1098
1099	if (!di->lease_session)
1100		di->lease_session = ceph_get_mds_session(session);
1101	di->lease_gen = session->s_cap_gen;
1102	di->lease_seq = le32_to_cpu(lease->seq);
1103	di->lease_renew_after = half_ttl;
1104	di->lease_renew_from = 0;
1105	di->time = ttl;
1106
1107	__ceph_dentry_lease_touch(di);
1108}
1109
1110static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
1111					struct ceph_mds_reply_lease *lease,
1112					struct ceph_mds_session *session,
1113					unsigned long from_time)
1114{
1115	struct ceph_mds_session *old_lease_session = NULL;
1116	spin_lock(&dentry->d_lock);
1117	__update_dentry_lease(dir, dentry, lease, session, from_time,
1118			      &old_lease_session);
1119	spin_unlock(&dentry->d_lock);
1120	ceph_put_mds_session(old_lease_session);
1121}
1122
1123/*
1124 * update dentry lease without having parent inode locked
1125 */
1126static void update_dentry_lease_careful(struct dentry *dentry,
1127					struct ceph_mds_reply_lease *lease,
1128					struct ceph_mds_session *session,
1129					unsigned long from_time,
1130					char *dname, u32 dname_len,
1131					struct ceph_vino *pdvino,
1132					struct ceph_vino *ptvino)
1133
1134{
1135	struct inode *dir;
1136	struct ceph_mds_session *old_lease_session = NULL;
1137
1138	spin_lock(&dentry->d_lock);
1139	/* make sure dentry's name matches target */
1140	if (dentry->d_name.len != dname_len ||
1141	    memcmp(dentry->d_name.name, dname, dname_len))
1142		goto out_unlock;
1143
1144	dir = d_inode(dentry->d_parent);
1145	/* make sure parent matches dvino */
1146	if (!ceph_ino_compare(dir, pdvino))
1147		goto out_unlock;
1148
1149	/* make sure dentry's inode matches target. NULL ptvino means that
1150	 * we expect a negative dentry */
1151	if (ptvino) {
1152		if (d_really_is_negative(dentry))
1153			goto out_unlock;
1154		if (!ceph_ino_compare(d_inode(dentry), ptvino))
1155			goto out_unlock;
1156	} else {
1157		if (d_really_is_positive(dentry))
1158			goto out_unlock;
1159	}
1160
1161	__update_dentry_lease(dir, dentry, lease, session,
1162			      from_time, &old_lease_session);
1163out_unlock:
1164	spin_unlock(&dentry->d_lock);
1165	ceph_put_mds_session(old_lease_session);
1166}
1167
1168/*
1169 * splice a dentry to an inode.
1170 * caller must hold directory i_mutex for this to be safe.
1171 */
1172static int splice_dentry(struct dentry **pdn, struct inode *in)
1173{
1174	struct dentry *dn = *pdn;
1175	struct dentry *realdn;
1176
1177	BUG_ON(d_inode(dn));
1178
1179	if (S_ISDIR(in->i_mode)) {
1180		/* If inode is directory, d_splice_alias() below will remove
1181		 * 'realdn' from its origin parent. We need to ensure that
1182		 * origin parent's readdir cache will not reference 'realdn'
1183		 */
1184		realdn = d_find_any_alias(in);
1185		if (realdn) {
1186			struct ceph_dentry_info *di = ceph_dentry(realdn);
1187			spin_lock(&realdn->d_lock);
1188
1189			realdn->d_op->d_prune(realdn);
1190
1191			di->time = jiffies;
1192			di->lease_shared_gen = 0;
1193			di->offset = 0;
1194
1195			spin_unlock(&realdn->d_lock);
1196			dput(realdn);
1197		}
1198	}
1199
1200	/* dn must be unhashed */
1201	if (!d_unhashed(dn))
1202		d_drop(dn);
1203	realdn = d_splice_alias(in, dn);
1204	if (IS_ERR(realdn)) {
1205		pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
1206		       PTR_ERR(realdn), dn, in, ceph_vinop(in));
1207		return PTR_ERR(realdn);
1208	}
1209
1210	if (realdn) {
1211		dout("dn %p (%d) spliced with %p (%d) "
1212		     "inode %p ino %llx.%llx\n",
1213		     dn, d_count(dn),
1214		     realdn, d_count(realdn),
1215		     d_inode(realdn), ceph_vinop(d_inode(realdn)));
1216		dput(dn);
1217		*pdn = realdn;
1218	} else {
1219		BUG_ON(!ceph_dentry(dn));
1220		dout("dn %p attached to %p ino %llx.%llx\n",
1221		     dn, d_inode(dn), ceph_vinop(d_inode(dn)));
1222	}
1223	return 0;
1224}
1225
1226/*
1227 * Incorporate results into the local cache.  This is either just
1228 * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
1229 * after a lookup).
1230 *
1231 * A reply may contain
1232 *         a directory inode along with a dentry.
1233 *  and/or a target inode
1234 *
1235 * Called with snap_rwsem (read).
1236 */
1237int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
1238{
1239	struct ceph_mds_session *session = req->r_session;
1240	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1241	struct inode *in = NULL;
1242	struct ceph_vino tvino, dvino;
1243	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
1244	int err = 0;
1245
1246	dout("fill_trace %p is_dentry %d is_target %d\n", req,
1247	     rinfo->head->is_dentry, rinfo->head->is_target);
1248
1249	if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
1250		dout("fill_trace reply is empty!\n");
1251		if (rinfo->head->result == 0 && req->r_parent)
1252			ceph_invalidate_dir_request(req);
1253		return 0;
1254	}
1255
1256	if (rinfo->head->is_dentry) {
1257		struct inode *dir = req->r_parent;
1258
1259		if (dir) {
1260			err = ceph_fill_inode(dir, NULL, &rinfo->diri,
1261					      rinfo->dirfrag, session, -1,
1262					      &req->r_caps_reservation);
1263			if (err < 0)
1264				goto done;
1265		} else {
1266			WARN_ON_ONCE(1);
1267		}
1268
1269		if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME &&
1270		    test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
1271		    !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
1272			struct qstr dname;
1273			struct dentry *dn, *parent;
1274
1275			BUG_ON(!rinfo->head->is_target);
1276			BUG_ON(req->r_dentry);
1277
1278			parent = d_find_any_alias(dir);
1279			BUG_ON(!parent);
1280
1281			dname.name = rinfo->dname;
1282			dname.len = rinfo->dname_len;
1283			dname.hash = full_name_hash(parent, dname.name, dname.len);
1284			tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1285			tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1286retry_lookup:
1287			dn = d_lookup(parent, &dname);
1288			dout("d_lookup on parent=%p name=%.*s got %p\n",
1289			     parent, dname.len, dname.name, dn);
1290
1291			if (!dn) {
1292				dn = d_alloc(parent, &dname);
1293				dout("d_alloc %p '%.*s' = %p\n", parent,
1294				     dname.len, dname.name, dn);
1295				if (!dn) {
1296					dput(parent);
1297					err = -ENOMEM;
1298					goto done;
1299				}
1300				err = 0;
1301			} else if (d_really_is_positive(dn) &&
1302				   (ceph_ino(d_inode(dn)) != tvino.ino ||
1303				    ceph_snap(d_inode(dn)) != tvino.snap)) {
1304				dout(" dn %p points to wrong inode %p\n",
1305				     dn, d_inode(dn));
1306				ceph_dir_clear_ordered(dir);
1307				d_delete(dn);
1308				dput(dn);
1309				goto retry_lookup;
1310			}
1311
1312			req->r_dentry = dn;
1313			dput(parent);
1314		}
1315	}
1316
1317	if (rinfo->head->is_target) {
1318		tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1319		tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1320
1321		in = ceph_get_inode(sb, tvino);
1322		if (IS_ERR(in)) {
1323			err = PTR_ERR(in);
1324			goto done;
1325		}
1326
1327		err = ceph_fill_inode(in, req->r_locked_page, &rinfo->targeti,
1328				NULL, session,
1329				(!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
1330				 !test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) &&
1331				 rinfo->head->result == 0) ?  req->r_fmode : -1,
1332				&req->r_caps_reservation);
1333		if (err < 0) {
1334			pr_err("ceph_fill_inode badness %p %llx.%llx\n",
1335				in, ceph_vinop(in));
1336			if (in->i_state & I_NEW)
1337				discard_new_inode(in);
1338			else
1339				iput(in);
1340			goto done;
1341		}
1342		req->r_target_inode = in;
1343		if (in->i_state & I_NEW)
1344			unlock_new_inode(in);
1345	}
1346
1347	/*
1348	 * ignore null lease/binding on snapdir ENOENT, or else we
1349	 * will have trouble splicing in the virtual snapdir later
1350	 */
1351	if (rinfo->head->is_dentry &&
1352            !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
1353	    test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
1354	    (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
1355					       fsc->mount_options->snapdir_name,
1356					       req->r_dentry->d_name.len))) {
1357		/*
1358		 * lookup link rename   : null -> possibly existing inode
1359		 * mknod symlink mkdir  : null -> new inode
1360		 * unlink               : linked -> null
1361		 */
1362		struct inode *dir = req->r_parent;
1363		struct dentry *dn = req->r_dentry;
1364		bool have_dir_cap, have_lease;
1365
1366		BUG_ON(!dn);
1367		BUG_ON(!dir);
1368		BUG_ON(d_inode(dn->d_parent) != dir);
1369
1370		dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
1371		dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
1372
1373		BUG_ON(ceph_ino(dir) != dvino.ino);
1374		BUG_ON(ceph_snap(dir) != dvino.snap);
1375
1376		/* do we have a lease on the whole dir? */
1377		have_dir_cap =
1378			(le32_to_cpu(rinfo->diri.in->cap.caps) &
1379			 CEPH_CAP_FILE_SHARED);
1380
1381		/* do we have a dn lease? */
1382		have_lease = have_dir_cap ||
1383			le32_to_cpu(rinfo->dlease->duration_ms);
1384		if (!have_lease)
1385			dout("fill_trace  no dentry lease or dir cap\n");
1386
1387		/* rename? */
1388		if (req->r_old_dentry && req->r_op == CEPH_MDS_OP_RENAME) {
1389			struct inode *olddir = req->r_old_dentry_dir;
1390			BUG_ON(!olddir);
1391
1392			dout(" src %p '%pd' dst %p '%pd'\n",
1393			     req->r_old_dentry,
1394			     req->r_old_dentry,
1395			     dn, dn);
1396			dout("fill_trace doing d_move %p -> %p\n",
1397			     req->r_old_dentry, dn);
1398
1399			/* d_move screws up sibling dentries' offsets */
1400			ceph_dir_clear_ordered(dir);
1401			ceph_dir_clear_ordered(olddir);
1402
1403			d_move(req->r_old_dentry, dn);
1404			dout(" src %p '%pd' dst %p '%pd'\n",
1405			     req->r_old_dentry,
1406			     req->r_old_dentry,
1407			     dn, dn);
1408
1409			/* ensure target dentry is invalidated, despite
1410			   rehashing bug in vfs_rename_dir */
1411			ceph_invalidate_dentry_lease(dn);
1412
1413			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1414			     ceph_dentry(req->r_old_dentry)->offset);
1415
1416			/* swap r_dentry and r_old_dentry in case that
1417			 * splice_dentry() gets called later. This is safe
1418			 * because no other place will use them */
1419			req->r_dentry = req->r_old_dentry;
1420			req->r_old_dentry = dn;
1421			dn = req->r_dentry;
1422		}
1423
1424		/* null dentry? */
1425		if (!rinfo->head->is_target) {
1426			dout("fill_trace null dentry\n");
1427			if (d_really_is_positive(dn)) {
1428				dout("d_delete %p\n", dn);
1429				ceph_dir_clear_ordered(dir);
1430				d_delete(dn);
1431			} else if (have_lease) {
1432				if (d_unhashed(dn))
1433					d_add(dn, NULL);
1434				update_dentry_lease(dir, dn,
1435						    rinfo->dlease, session,
1436						    req->r_request_started);
1437			}
1438			goto done;
1439		}
1440
1441		/* attach proper inode */
1442		if (d_really_is_negative(dn)) {
1443			ceph_dir_clear_ordered(dir);
1444			ihold(in);
1445			err = splice_dentry(&req->r_dentry, in);
1446			if (err < 0)
1447				goto done;
1448			dn = req->r_dentry;  /* may have spliced */
1449		} else if (d_really_is_positive(dn) && d_inode(dn) != in) {
1450			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
1451			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
1452			     ceph_vinop(in));
1453			d_invalidate(dn);
1454			have_lease = false;
1455		}
1456
1457		if (have_lease) {
1458			update_dentry_lease(dir, dn,
1459					    rinfo->dlease, session,
1460					    req->r_request_started);
1461		}
1462		dout(" final dn %p\n", dn);
1463	} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
1464		    req->r_op == CEPH_MDS_OP_MKSNAP) &&
1465	           test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
1466		   !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
1467		struct inode *dir = req->r_parent;
1468
1469		/* fill out a snapdir LOOKUPSNAP dentry */
1470		BUG_ON(!dir);
1471		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
1472		BUG_ON(!req->r_dentry);
1473		dout(" linking snapped dir %p to dn %p\n", in, req->r_dentry);
1474		ceph_dir_clear_ordered(dir);
1475		ihold(in);
1476		err = splice_dentry(&req->r_dentry, in);
1477		if (err < 0)
1478			goto done;
1479	} else if (rinfo->head->is_dentry && req->r_dentry) {
1480		/* parent inode is not locked, be carefull */
1481		struct ceph_vino *ptvino = NULL;
1482		dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
1483		dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
1484		if (rinfo->head->is_target) {
1485			tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1486			tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1487			ptvino = &tvino;
1488		}
1489		update_dentry_lease_careful(req->r_dentry, rinfo->dlease,
1490					    session, req->r_request_started,
1491					    rinfo->dname, rinfo->dname_len,
1492					    &dvino, ptvino);
1493	}
1494done:
1495	dout("fill_trace done err=%d\n", err);
1496	return err;
1497}
1498
1499/*
1500 * Prepopulate our cache with readdir results, leases, etc.
1501 */
1502static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1503					   struct ceph_mds_session *session)
1504{
1505	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1506	int i, err = 0;
1507
1508	for (i = 0; i < rinfo->dir_nr; i++) {
1509		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1510		struct ceph_vino vino;
1511		struct inode *in;
1512		int rc;
1513
1514		vino.ino = le64_to_cpu(rde->inode.in->ino);
1515		vino.snap = le64_to_cpu(rde->inode.in->snapid);
1516
1517		in = ceph_get_inode(req->r_dentry->d_sb, vino);
1518		if (IS_ERR(in)) {
1519			err = PTR_ERR(in);
1520			dout("new_inode badness got %d\n", err);
1521			continue;
1522		}
1523		rc = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
1524				     -1, &req->r_caps_reservation);
1525		if (rc < 0) {
1526			pr_err("ceph_fill_inode badness on %p got %d\n",
1527			       in, rc);
1528			err = rc;
1529			if (in->i_state & I_NEW) {
1530				ihold(in);
1531				discard_new_inode(in);
1532			}
1533		} else if (in->i_state & I_NEW) {
1534			unlock_new_inode(in);
1535		}
1536
1537		/* avoid calling iput_final() in mds dispatch threads */
1538		ceph_async_iput(in);
1539	}
1540
1541	return err;
1542}
1543
1544void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
1545{
1546	if (ctl->page) {
1547		kunmap(ctl->page);
1548		put_page(ctl->page);
1549		ctl->page = NULL;
1550	}
1551}
1552
1553static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
1554			      struct ceph_readdir_cache_control *ctl,
1555			      struct ceph_mds_request *req)
1556{
1557	struct ceph_inode_info *ci = ceph_inode(dir);
1558	unsigned nsize = PAGE_SIZE / sizeof(struct dentry*);
1559	unsigned idx = ctl->index % nsize;
1560	pgoff_t pgoff = ctl->index / nsize;
1561
1562	if (!ctl->page || pgoff != page_index(ctl->page)) {
1563		ceph_readdir_cache_release(ctl);
1564		if (idx == 0)
1565			ctl->page = grab_cache_page(&dir->i_data, pgoff);
1566		else
1567			ctl->page = find_lock_page(&dir->i_data, pgoff);
1568		if (!ctl->page) {
1569			ctl->index = -1;
1570			return idx == 0 ? -ENOMEM : 0;
1571		}
1572		/* reading/filling the cache are serialized by
1573		 * i_mutex, no need to use page lock */
1574		unlock_page(ctl->page);
1575		ctl->dentries = kmap(ctl->page);
1576		if (idx == 0)
1577			memset(ctl->dentries, 0, PAGE_SIZE);
1578	}
1579
1580	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
1581	    req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
1582		dout("readdir cache dn %p idx %d\n", dn, ctl->index);
1583		ctl->dentries[idx] = dn;
1584		ctl->index++;
1585	} else {
1586		dout("disable readdir cache\n");
1587		ctl->index = -1;
1588	}
1589	return 0;
1590}
1591
1592int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1593			     struct ceph_mds_session *session)
1594{
1595	struct dentry *parent = req->r_dentry;
1596	struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
1597	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1598	struct qstr dname;
1599	struct dentry *dn;
1600	struct inode *in;
1601	int err = 0, skipped = 0, ret, i;
1602	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
1603	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
1604	u32 last_hash = 0;
1605	u32 fpos_offset;
1606	struct ceph_readdir_cache_control cache_ctl = {};
1607
1608	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
1609		return readdir_prepopulate_inodes_only(req, session);
1610
1611	if (rinfo->hash_order) {
1612		if (req->r_path2) {
1613			last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1614						  req->r_path2,
1615						  strlen(req->r_path2));
1616			last_hash = ceph_frag_value(last_hash);
1617		} else if (rinfo->offset_hash) {
1618			/* mds understands offset_hash */
1619			WARN_ON_ONCE(req->r_readdir_offset != 2);
1620			last_hash = le32_to_cpu(rhead->args.readdir.offset_hash);
1621		}
1622	}
1623
1624	if (rinfo->dir_dir &&
1625	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {
1626		dout("readdir_prepopulate got new frag %x -> %x\n",
1627		     frag, le32_to_cpu(rinfo->dir_dir->frag));
1628		frag = le32_to_cpu(rinfo->dir_dir->frag);
1629		if (!rinfo->hash_order)
1630			req->r_readdir_offset = 2;
1631	}
1632
1633	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
1634		dout("readdir_prepopulate %d items under SNAPDIR dn %p\n",
1635		     rinfo->dir_nr, parent);
1636	} else {
1637		dout("readdir_prepopulate %d items under dn %p\n",
1638		     rinfo->dir_nr, parent);
1639		if (rinfo->dir_dir)
1640			ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
1641
1642		if (ceph_frag_is_leftmost(frag) &&
1643		    req->r_readdir_offset == 2 &&
1644		    !(rinfo->hash_order && last_hash)) {
1645			/* note dir version at start of readdir so we can
1646			 * tell if any dentries get dropped */
1647			req->r_dir_release_cnt =
1648				atomic64_read(&ci->i_release_count);
1649			req->r_dir_ordered_cnt =
1650				atomic64_read(&ci->i_ordered_count);
1651			req->r_readdir_cache_idx = 0;
1652		}
1653	}
1654
1655	cache_ctl.index = req->r_readdir_cache_idx;
1656	fpos_offset = req->r_readdir_offset;
1657
1658	/* FIXME: release caps/leases if error occurs */
1659	for (i = 0; i < rinfo->dir_nr; i++) {
1660		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1661		struct ceph_vino tvino;
1662
1663		dname.name = rde->name;
1664		dname.len = rde->name_len;
1665		dname.hash = full_name_hash(parent, dname.name, dname.len);
1666
1667		tvino.ino = le64_to_cpu(rde->inode.in->ino);
1668		tvino.snap = le64_to_cpu(rde->inode.in->snapid);
1669
1670		if (rinfo->hash_order) {
1671			u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1672						 rde->name, rde->name_len);
1673			hash = ceph_frag_value(hash);
1674			if (hash != last_hash)
1675				fpos_offset = 2;
1676			last_hash = hash;
1677			rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
1678		} else {
1679			rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
1680		}
1681
1682retry_lookup:
1683		dn = d_lookup(parent, &dname);
1684		dout("d_lookup on parent=%p name=%.*s got %p\n",
1685		     parent, dname.len, dname.name, dn);
1686
1687		if (!dn) {
1688			dn = d_alloc(parent, &dname);
1689			dout("d_alloc %p '%.*s' = %p\n", parent,
1690			     dname.len, dname.name, dn);
1691			if (!dn) {
1692				dout("d_alloc badness\n");
1693				err = -ENOMEM;
1694				goto out;
1695			}
1696		} else if (d_really_is_positive(dn) &&
1697			   (ceph_ino(d_inode(dn)) != tvino.ino ||
1698			    ceph_snap(d_inode(dn)) != tvino.snap)) {
1699			struct ceph_dentry_info *di = ceph_dentry(dn);
1700			dout(" dn %p points to wrong inode %p\n",
1701			     dn, d_inode(dn));
1702
1703			spin_lock(&dn->d_lock);
1704			if (di->offset > 0 &&
1705			    di->lease_shared_gen ==
1706			    atomic_read(&ci->i_shared_gen)) {
1707				__ceph_dir_clear_ordered(ci);
1708				di->offset = 0;
1709			}
1710			spin_unlock(&dn->d_lock);
1711
1712			d_delete(dn);
1713			dput(dn);
1714			goto retry_lookup;
1715		}
1716
1717		/* inode */
1718		if (d_really_is_positive(dn)) {
1719			in = d_inode(dn);
1720		} else {
1721			in = ceph_get_inode(parent->d_sb, tvino);
1722			if (IS_ERR(in)) {
1723				dout("new_inode badness\n");
1724				d_drop(dn);
1725				dput(dn);
1726				err = PTR_ERR(in);
1727				goto out;
1728			}
1729		}
1730
1731		ret = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
1732				      -1, &req->r_caps_reservation);
1733		if (ret < 0) {
1734			pr_err("ceph_fill_inode badness on %p\n", in);
1735			if (d_really_is_negative(dn)) {
1736				/* avoid calling iput_final() in mds
1737				 * dispatch threads */
1738				if (in->i_state & I_NEW) {
1739					ihold(in);
1740					discard_new_inode(in);
1741				}
1742				ceph_async_iput(in);
1743			}
1744			d_drop(dn);
1745			err = ret;
1746			goto next_item;
1747		}
1748		if (in->i_state & I_NEW)
1749			unlock_new_inode(in);
1750
1751		if (d_really_is_negative(dn)) {
1752			if (ceph_security_xattr_deadlock(in)) {
1753				dout(" skip splicing dn %p to inode %p"
1754				     " (security xattr deadlock)\n", dn, in);
1755				ceph_async_iput(in);
1756				skipped++;
1757				goto next_item;
1758			}
1759
1760			err = splice_dentry(&dn, in);
1761			if (err < 0)
1762				goto next_item;
1763		}
1764
1765		ceph_dentry(dn)->offset = rde->offset;
1766
1767		update_dentry_lease(d_inode(parent), dn,
1768				    rde->lease, req->r_session,
1769				    req->r_request_started);
1770
1771		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
1772			ret = fill_readdir_cache(d_inode(parent), dn,
1773						 &cache_ctl, req);
1774			if (ret < 0)
1775				err = ret;
1776		}
1777next_item:
1778		dput(dn);
1779	}
1780out:
1781	if (err == 0 && skipped == 0) {
1782		set_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags);
1783		req->r_readdir_cache_idx = cache_ctl.index;
1784	}
1785	ceph_readdir_cache_release(&cache_ctl);
1786	dout("readdir_prepopulate done\n");
1787	return err;
1788}
1789
1790bool ceph_inode_set_size(struct inode *inode, loff_t size)
1791{
1792	struct ceph_inode_info *ci = ceph_inode(inode);
1793	bool ret;
1794
1795	spin_lock(&ci->i_ceph_lock);
1796	dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1797	i_size_write(inode, size);
1798	inode->i_blocks = calc_inode_blocks(size);
1799
1800	ret = __ceph_should_report_size(ci);
1801
1802	spin_unlock(&ci->i_ceph_lock);
1803	return ret;
1804}
1805
1806/*
1807 * Put reference to inode, but avoid calling iput_final() in current thread.
1808 * iput_final() may wait for reahahead pages. The wait can cause deadlock in
1809 * some contexts.
1810 */
1811void ceph_async_iput(struct inode *inode)
1812{
1813	if (!inode)
1814		return;
1815	for (;;) {
1816		if (atomic_add_unless(&inode->i_count, -1, 1))
1817			break;
1818		if (queue_work(ceph_inode_to_client(inode)->inode_wq,
1819			       &ceph_inode(inode)->i_work))
1820			break;
1821		/* queue work failed, i_count must be at least 2 */
1822	}
1823}
1824
1825/*
1826 * Write back inode data in a worker thread.  (This can't be done
1827 * in the message handler context.)
1828 */
1829void ceph_queue_writeback(struct inode *inode)
1830{
1831	struct ceph_inode_info *ci = ceph_inode(inode);
1832	set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
1833
1834	ihold(inode);
1835	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
1836		       &ci->i_work)) {
1837		dout("ceph_queue_writeback %p\n", inode);
1838	} else {
1839		dout("ceph_queue_writeback %p already queued, mask=%lx\n",
1840		     inode, ci->i_work_mask);
1841		iput(inode);
1842	}
1843}
1844
1845/*
1846 * queue an async invalidation
1847 */
1848void ceph_queue_invalidate(struct inode *inode)
1849{
1850	struct ceph_inode_info *ci = ceph_inode(inode);
1851	set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
1852
1853	ihold(inode);
1854	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
1855		       &ceph_inode(inode)->i_work)) {
1856		dout("ceph_queue_invalidate %p\n", inode);
1857	} else {
1858		dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
1859		     inode, ci->i_work_mask);
1860		iput(inode);
1861	}
1862}
1863
1864/*
1865 * Queue an async vmtruncate.  If we fail to queue work, we will handle
1866 * the truncation the next time we call __ceph_do_pending_vmtruncate.
1867 */
1868void ceph_queue_vmtruncate(struct inode *inode)
1869{
1870	struct ceph_inode_info *ci = ceph_inode(inode);
1871	set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
1872
1873	ihold(inode);
1874	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
1875		       &ci->i_work)) {
1876		dout("ceph_queue_vmtruncate %p\n", inode);
1877	} else {
1878		dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
1879		     inode, ci->i_work_mask);
1880		iput(inode);
1881	}
1882}
1883
1884static void ceph_do_invalidate_pages(struct inode *inode)
1885{
1886	struct ceph_inode_info *ci = ceph_inode(inode);
1887	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1888	u32 orig_gen;
1889	int check = 0;
1890
1891	mutex_lock(&ci->i_truncate_mutex);
1892
1893	if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1894		pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
1895				    inode, ceph_ino(inode));
1896		mapping_set_error(inode->i_mapping, -EIO);
1897		truncate_pagecache(inode, 0);
1898		mutex_unlock(&ci->i_truncate_mutex);
1899		goto out;
1900	}
1901
1902	spin_lock(&ci->i_ceph_lock);
1903	dout("invalidate_pages %p gen %d revoking %d\n", inode,
1904	     ci->i_rdcache_gen, ci->i_rdcache_revoking);
1905	if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1906		if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1907			check = 1;
1908		spin_unlock(&ci->i_ceph_lock);
1909		mutex_unlock(&ci->i_truncate_mutex);
1910		goto out;
1911	}
1912	orig_gen = ci->i_rdcache_gen;
1913	spin_unlock(&ci->i_ceph_lock);
1914
1915	ceph_fscache_invalidate(inode);
1916	if (invalidate_inode_pages2(inode->i_mapping) < 0) {
1917		pr_err("invalidate_pages %p fails\n", inode);
1918	}
1919
1920	spin_lock(&ci->i_ceph_lock);
1921	if (orig_gen == ci->i_rdcache_gen &&
1922	    orig_gen == ci->i_rdcache_revoking) {
1923		dout("invalidate_pages %p gen %d successful\n", inode,
1924		     ci->i_rdcache_gen);
1925		ci->i_rdcache_revoking--;
1926		check = 1;
1927	} else {
1928		dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
1929		     inode, orig_gen, ci->i_rdcache_gen,
1930		     ci->i_rdcache_revoking);
1931		if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1932			check = 1;
1933	}
1934	spin_unlock(&ci->i_ceph_lock);
1935	mutex_unlock(&ci->i_truncate_mutex);
1936out:
1937	if (check)
1938		ceph_check_caps(ci, 0, NULL);
1939}
1940
1941/*
1942 * Make sure any pending truncation is applied before doing anything
1943 * that may depend on it.
1944 */
1945void __ceph_do_pending_vmtruncate(struct inode *inode)
1946{
1947	struct ceph_inode_info *ci = ceph_inode(inode);
1948	u64 to;
1949	int wrbuffer_refs, finish = 0;
1950
1951	mutex_lock(&ci->i_truncate_mutex);
1952retry:
1953	spin_lock(&ci->i_ceph_lock);
1954	if (ci->i_truncate_pending == 0) {
1955		dout("__do_pending_vmtruncate %p none pending\n", inode);
1956		spin_unlock(&ci->i_ceph_lock);
1957		mutex_unlock(&ci->i_truncate_mutex);
1958		return;
1959	}
1960
1961	/*
1962	 * make sure any dirty snapped pages are flushed before we
1963	 * possibly truncate them.. so write AND block!
1964	 */
1965	if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
1966		spin_unlock(&ci->i_ceph_lock);
1967		dout("__do_pending_vmtruncate %p flushing snaps first\n",
1968		     inode);
1969		filemap_write_and_wait_range(&inode->i_data, 0,
1970					     inode->i_sb->s_maxbytes);
1971		goto retry;
1972	}
1973
1974	/* there should be no reader or writer */
1975	WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);
1976
1977	to = ci->i_truncate_size;
1978	wrbuffer_refs = ci->i_wrbuffer_ref;
1979	dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
1980	     ci->i_truncate_pending, to);
1981	spin_unlock(&ci->i_ceph_lock);
1982
1983	truncate_pagecache(inode, to);
1984
1985	spin_lock(&ci->i_ceph_lock);
1986	if (to == ci->i_truncate_size) {
1987		ci->i_truncate_pending = 0;
1988		finish = 1;
1989	}
1990	spin_unlock(&ci->i_ceph_lock);
1991	if (!finish)
1992		goto retry;
1993
1994	mutex_unlock(&ci->i_truncate_mutex);
1995
1996	if (wrbuffer_refs == 0)
1997		ceph_check_caps(ci, 0, NULL);
1998
1999	wake_up_all(&ci->i_cap_wq);
2000}
2001
2002static void ceph_inode_work(struct work_struct *work)
2003{
2004	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
2005						 i_work);
2006	struct inode *inode = &ci->vfs_inode;
2007
2008	if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
2009		dout("writeback %p\n", inode);
2010		filemap_fdatawrite(&inode->i_data);
2011	}
2012	if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask))
2013		ceph_do_invalidate_pages(inode);
2014
2015	if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
2016		__ceph_do_pending_vmtruncate(inode);
2017
2018	iput(inode);
2019}
2020
2021/*
2022 * symlinks
2023 */
2024static const struct inode_operations ceph_symlink_iops = {
2025	.get_link = simple_get_link,
2026	.setattr = ceph_setattr,
2027	.getattr = ceph_getattr,
2028	.listxattr = ceph_listxattr,
2029};
2030
2031int __ceph_setattr(struct inode *inode, struct iattr *attr)
2032{
2033	struct ceph_inode_info *ci = ceph_inode(inode);
2034	unsigned int ia_valid = attr->ia_valid;
2035	struct ceph_mds_request *req;
2036	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
2037	struct ceph_cap_flush *prealloc_cf;
2038	int issued;
2039	int release = 0, dirtied = 0;
2040	int mask = 0;
2041	int err = 0;
2042	int inode_dirty_flags = 0;
2043	bool lock_snap_rwsem = false;
2044
2045	prealloc_cf = ceph_alloc_cap_flush();
2046	if (!prealloc_cf)
2047		return -ENOMEM;
2048
2049	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR,
2050				       USE_AUTH_MDS);
2051	if (IS_ERR(req)) {
2052		ceph_free_cap_flush(prealloc_cf);
2053		return PTR_ERR(req);
2054	}
2055
2056	spin_lock(&ci->i_ceph_lock);
2057	issued = __ceph_caps_issued(ci, NULL);
2058
2059	if (!ci->i_head_snapc &&
2060	    (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) {
2061		lock_snap_rwsem = true;
2062		if (!down_read_trylock(&mdsc->snap_rwsem)) {
2063			spin_unlock(&ci->i_ceph_lock);
2064			down_read(&mdsc->snap_rwsem);
2065			spin_lock(&ci->i_ceph_lock);
2066			issued = __ceph_caps_issued(ci, NULL);
2067		}
2068	}
2069
2070	dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
2071
2072	if (ia_valid & ATTR_UID) {
2073		dout("setattr %p uid %d -> %d\n", inode,
2074		     from_kuid(&init_user_ns, inode->i_uid),
2075		     from_kuid(&init_user_ns, attr->ia_uid));
2076		if (issued & CEPH_CAP_AUTH_EXCL) {
2077			inode->i_uid = attr->ia_uid;
2078			dirtied |= CEPH_CAP_AUTH_EXCL;
2079		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
2080			   !uid_eq(attr->ia_uid, inode->i_uid)) {
2081			req->r_args.setattr.uid = cpu_to_le32(
2082				from_kuid(&init_user_ns, attr->ia_uid));
2083			mask |= CEPH_SETATTR_UID;
2084			release |= CEPH_CAP_AUTH_SHARED;
2085		}
2086	}
2087	if (ia_valid & ATTR_GID) {
2088		dout("setattr %p gid %d -> %d\n", inode,
2089		     from_kgid(&init_user_ns, inode->i_gid),
2090		     from_kgid(&init_user_ns, attr->ia_gid));
2091		if (issued & CEPH_CAP_AUTH_EXCL) {
2092			inode->i_gid = attr->ia_gid;
2093			dirtied |= CEPH_CAP_AUTH_EXCL;
2094		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
2095			   !gid_eq(attr->ia_gid, inode->i_gid)) {
2096			req->r_args.setattr.gid = cpu_to_le32(
2097				from_kgid(&init_user_ns, attr->ia_gid));
2098			mask |= CEPH_SETATTR_GID;
2099			release |= CEPH_CAP_AUTH_SHARED;
2100		}
2101	}
2102	if (ia_valid & ATTR_MODE) {
2103		dout("setattr %p mode 0%o -> 0%o\n", inode, inode->i_mode,
2104		     attr->ia_mode);
2105		if (issued & CEPH_CAP_AUTH_EXCL) {
2106			inode->i_mode = attr->ia_mode;
2107			dirtied |= CEPH_CAP_AUTH_EXCL;
2108		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
2109			   attr->ia_mode != inode->i_mode) {
2110			inode->i_mode = attr->ia_mode;
2111			req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
2112			mask |= CEPH_SETATTR_MODE;
2113			release |= CEPH_CAP_AUTH_SHARED;
2114		}
2115	}
2116
2117	if (ia_valid & ATTR_ATIME) {
2118		dout("setattr %p atime %lld.%ld -> %lld.%ld\n", inode,
2119		     inode->i_atime.tv_sec, inode->i_atime.tv_nsec,
2120		     attr->ia_atime.tv_sec, attr->ia_atime.tv_nsec);
2121		if (issued & CEPH_CAP_FILE_EXCL) {
2122			ci->i_time_warp_seq++;
2123			inode->i_atime = attr->ia_atime;
2124			dirtied |= CEPH_CAP_FILE_EXCL;
2125		} else if ((issued & CEPH_CAP_FILE_WR) &&
2126			   timespec64_compare(&inode->i_atime,
2127					    &attr->ia_atime) < 0) {
2128			inode->i_atime = attr->ia_atime;
2129			dirtied |= CEPH_CAP_FILE_WR;
2130		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
2131			   !timespec64_equal(&inode->i_atime, &attr->ia_atime)) {
2132			ceph_encode_timespec64(&req->r_args.setattr.atime,
2133					       &attr->ia_atime);
2134			mask |= CEPH_SETATTR_ATIME;
2135			release |= CEPH_CAP_FILE_SHARED |
2136				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
2137		}
2138	}
2139	if (ia_valid & ATTR_SIZE) {
2140		dout("setattr %p size %lld -> %lld\n", inode,
2141		     inode->i_size, attr->ia_size);
2142		if ((issued & CEPH_CAP_FILE_EXCL) &&
2143		    attr->ia_size > inode->i_size) {
2144			i_size_write(inode, attr->ia_size);
2145			inode->i_blocks = calc_inode_blocks(attr->ia_size);
2146			ci->i_reported_size = attr->ia_size;
2147			dirtied |= CEPH_CAP_FILE_EXCL;
2148			ia_valid |= ATTR_MTIME;
2149		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
2150			   attr->ia_size != inode->i_size) {
2151			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
2152			req->r_args.setattr.old_size =
2153				cpu_to_le64(inode->i_size);
2154			mask |= CEPH_SETATTR_SIZE;
2155			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
2156				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
2157		}
2158	}
2159	if (ia_valid & ATTR_MTIME) {
2160		dout("setattr %p mtime %lld.%ld -> %lld.%ld\n", inode,
2161		     inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
2162		     attr->ia_mtime.tv_sec, attr->ia_mtime.tv_nsec);
2163		if (issued & CEPH_CAP_FILE_EXCL) {
2164			ci->i_time_warp_seq++;
2165			inode->i_mtime = attr->ia_mtime;
2166			dirtied |= CEPH_CAP_FILE_EXCL;
2167		} else if ((issued & CEPH_CAP_FILE_WR) &&
2168			   timespec64_compare(&inode->i_mtime,
2169					    &attr->ia_mtime) < 0) {
2170			inode->i_mtime = attr->ia_mtime;
2171			dirtied |= CEPH_CAP_FILE_WR;
2172		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
2173			   !timespec64_equal(&inode->i_mtime, &attr->ia_mtime)) {
2174			ceph_encode_timespec64(&req->r_args.setattr.mtime,
2175					       &attr->ia_mtime);
2176			mask |= CEPH_SETATTR_MTIME;
2177			release |= CEPH_CAP_FILE_SHARED |
2178				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
2179		}
2180	}
2181
2182	/* these do nothing */
2183	if (ia_valid & ATTR_CTIME) {
2184		bool only = (ia_valid & (ATTR_SIZE|ATTR_MTIME|ATTR_ATIME|
2185					 ATTR_MODE|ATTR_UID|ATTR_GID)) == 0;
2186		dout("setattr %p ctime %lld.%ld -> %lld.%ld (%s)\n", inode,
2187		     inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
2188		     attr->ia_ctime.tv_sec, attr->ia_ctime.tv_nsec,
2189		     only ? "ctime only" : "ignored");
2190		if (only) {
2191			/*
2192			 * if kernel wants to dirty ctime but nothing else,
2193			 * we need to choose a cap to dirty under, or do
2194			 * a almost-no-op setattr
2195			 */
2196			if (issued & CEPH_CAP_AUTH_EXCL)
2197				dirtied |= CEPH_CAP_AUTH_EXCL;
2198			else if (issued & CEPH_CAP_FILE_EXCL)
2199				dirtied |= CEPH_CAP_FILE_EXCL;
2200			else if (issued & CEPH_CAP_XATTR_EXCL)
2201				dirtied |= CEPH_CAP_XATTR_EXCL;
2202			else
2203				mask |= CEPH_SETATTR_CTIME;
2204		}
2205	}
2206	if (ia_valid & ATTR_FILE)
2207		dout("setattr %p ATTR_FILE ... hrm!\n", inode);
2208
2209	if (dirtied) {
2210		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
2211							   &prealloc_cf);
2212		inode->i_ctime = attr->ia_ctime;
2213	}
2214
2215	release &= issued;
2216	spin_unlock(&ci->i_ceph_lock);
2217	if (lock_snap_rwsem)
2218		up_read(&mdsc->snap_rwsem);
2219
2220	if (inode_dirty_flags)
2221		__mark_inode_dirty(inode, inode_dirty_flags);
2222
2223
2224	if (mask) {
2225		req->r_inode = inode;
2226		ihold(inode);
2227		req->r_inode_drop = release;
2228		req->r_args.setattr.mask = cpu_to_le32(mask);
2229		req->r_num_caps = 1;
2230		req->r_stamp = attr->ia_ctime;
2231		err = ceph_mdsc_do_request(mdsc, NULL, req);
2232	}
2233	dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
2234	     ceph_cap_string(dirtied), mask);
2235
2236	ceph_mdsc_put_request(req);
2237	ceph_free_cap_flush(prealloc_cf);
2238
2239	if (err >= 0 && (mask & CEPH_SETATTR_SIZE))
2240		__ceph_do_pending_vmtruncate(inode);
2241
2242	return err;
2243}
2244
2245/*
2246 * setattr
2247 */
2248int ceph_setattr(struct dentry *dentry, struct iattr *attr)
2249{
2250	struct inode *inode = d_inode(dentry);
2251	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
2252	int err;
2253
2254	if (ceph_snap(inode) != CEPH_NOSNAP)
2255		return -EROFS;
2256
2257	err = setattr_prepare(dentry, attr);
2258	if (err != 0)
2259		return err;
2260
2261	if ((attr->ia_valid & ATTR_SIZE) &&
2262	    attr->ia_size > max(inode->i_size, fsc->max_file_size))
2263		return -EFBIG;
2264
2265	if ((attr->ia_valid & ATTR_SIZE) &&
2266	    ceph_quota_is_max_bytes_exceeded(inode, attr->ia_size))
2267		return -EDQUOT;
2268
2269	err = __ceph_setattr(inode, attr);
2270
2271	if (err >= 0 && (attr->ia_valid & ATTR_MODE))
2272		err = posix_acl_chmod(inode, attr->ia_mode);
2273
2274	return err;
2275}
2276
2277/*
2278 * Verify that we have a lease on the given mask.  If not,
2279 * do a getattr against an mds.
2280 */
2281int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
2282		      int mask, bool force)
2283{
2284	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
2285	struct ceph_mds_client *mdsc = fsc->mdsc;
2286	struct ceph_mds_request *req;
2287	int mode;
2288	int err;
2289
2290	if (ceph_snap(inode) == CEPH_SNAPDIR) {
2291		dout("do_getattr inode %p SNAPDIR\n", inode);
2292		return 0;
2293	}
2294
2295	dout("do_getattr inode %p mask %s mode 0%o\n",
2296	     inode, ceph_cap_string(mask), inode->i_mode);
2297	if (!force && ceph_caps_issued_mask_metric(ceph_inode(inode), mask, 1))
2298			return 0;
2299
2300	mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
2301	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
2302	if (IS_ERR(req))
2303		return PTR_ERR(req);
2304	req->r_inode = inode;
2305	ihold(inode);
2306	req->r_num_caps = 1;
2307	req->r_args.getattr.mask = cpu_to_le32(mask);
2308	req->r_locked_page = locked_page;
2309	err = ceph_mdsc_do_request(mdsc, NULL, req);
2310	if (locked_page && err == 0) {
2311		u64 inline_version = req->r_reply_info.targeti.inline_version;
2312		if (inline_version == 0) {
2313			/* the reply is supposed to contain inline data */
2314			err = -EINVAL;
2315		} else if (inline_version == CEPH_INLINE_NONE) {
2316			err = -ENODATA;
2317		} else {
2318			err = req->r_reply_info.targeti.inline_len;
2319		}
2320	}
2321	ceph_mdsc_put_request(req);
2322	dout("do_getattr result=%d\n", err);
2323	return err;
2324}
2325
2326
2327/*
2328 * Check inode permissions.  We verify we have a valid value for
2329 * the AUTH cap, then call the generic handler.
2330 */
2331int ceph_permission(struct inode *inode, int mask)
2332{
2333	int err;
2334
2335	if (mask & MAY_NOT_BLOCK)
2336		return -ECHILD;
2337
2338	err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false);
2339
2340	if (!err)
2341		err = generic_permission(inode, mask);
2342	return err;
2343}
2344
2345/* Craft a mask of needed caps given a set of requested statx attrs. */
2346static int statx_to_caps(u32 want)
2347{
2348	int mask = 0;
2349
2350	if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME|STATX_BTIME))
2351		mask |= CEPH_CAP_AUTH_SHARED;
2352
2353	if (want & (STATX_NLINK|STATX_CTIME))
2354		mask |= CEPH_CAP_LINK_SHARED;
2355
2356	if (want & (STATX_ATIME|STATX_MTIME|STATX_CTIME|STATX_SIZE|
2357		    STATX_BLOCKS))
2358		mask |= CEPH_CAP_FILE_SHARED;
2359
2360	if (want & (STATX_CTIME))
2361		mask |= CEPH_CAP_XATTR_SHARED;
2362
2363	return mask;
2364}
2365
2366/*
2367 * Get all the attributes. If we have sufficient caps for the requested attrs,
2368 * then we can avoid talking to the MDS at all.
2369 */
2370int ceph_getattr(const struct path *path, struct kstat *stat,
2371		 u32 request_mask, unsigned int flags)
2372{
2373	struct inode *inode = d_inode(path->dentry);
2374	struct ceph_inode_info *ci = ceph_inode(inode);
2375	u32 valid_mask = STATX_BASIC_STATS;
2376	int err = 0;
2377
2378	/* Skip the getattr altogether if we're asked not to sync */
2379	if (!(flags & AT_STATX_DONT_SYNC)) {
2380		err = ceph_do_getattr(inode, statx_to_caps(request_mask),
2381				      flags & AT_STATX_FORCE_SYNC);
2382		if (err)
2383			return err;
2384	}
2385
2386	generic_fillattr(inode, stat);
2387	stat->ino = ceph_present_inode(inode);
2388
2389	/*
2390	 * btime on newly-allocated inodes is 0, so if this is still set to
2391	 * that, then assume that it's not valid.
2392	 */
2393	if (ci->i_btime.tv_sec || ci->i_btime.tv_nsec) {
2394		stat->btime = ci->i_btime;
2395		valid_mask |= STATX_BTIME;
2396	}
2397
2398	if (ceph_snap(inode) == CEPH_NOSNAP)
2399		stat->dev = inode->i_sb->s_dev;
2400	else
2401		stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
2402
2403	if (S_ISDIR(inode->i_mode)) {
2404		if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
2405					RBYTES))
2406			stat->size = ci->i_rbytes;
2407		else
2408			stat->size = ci->i_files + ci->i_subdirs;
2409		stat->blocks = 0;
2410		stat->blksize = 65536;
2411		/*
2412		 * Some applications rely on the number of st_nlink
2413		 * value on directories to be either 0 (if unlinked)
2414		 * or 2 + number of subdirectories.
2415		 */
2416		if (stat->nlink == 1)
2417			/* '.' + '..' + subdirs */
2418			stat->nlink = 1 + 1 + ci->i_subdirs;
2419	}
2420
2421	stat->result_mask = request_mask & valid_mask;
2422	return err;
2423}
2424