xref: /kernel/linux/linux-5.10/fs/hmdfs/stash.c (revision 8c2ecf20)
1// SPDX-License-Identifier: GPL-2.0
2/*
3 * fs/hmdfs/stash.c
4 *
5 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6 */
7
8#include <linux/kernel.h>
9#include <linux/fs.h>
10#include <linux/file.h>
11#include <linux/dcache.h>
12#include <linux/namei.h>
13#include <linux/mount.h>
14#include <linux/slab.h>
15#include <linux/list.h>
16#include <linux/pagemap.h>
17#include <linux/sched/mm.h>
18#include <linux/sched/task.h>
19#include <linux/errseq.h>
20#include <linux/crc32.h>
21
22#include "stash.h"
23#include "comm/node_cb.h"
24#include "comm/protocol.h"
25#include "comm/connection.h"
26#include "file_remote.h"
27#include "hmdfs_dentryfile.h"
28#include "authority/authentication.h"
29
30/* Head magic used to identify a stash file */
31#define HMDFS_STASH_FILE_HEAD_MAGIC 0xF7AB06C3
32/* Head and path in stash file are aligned with HMDFS_STASH_BLK_SIZE */
33#define HMDFS_STASH_BLK_SIZE 4096
34#define HMDFS_STASH_BLK_SHIFT 12
35#define HMDFS_STASH_PAGE_TO_SECTOR_SHIFT 3
36#define HMDFS_STASH_DIR_NAME "stash"
37#define HMDFS_STASH_FMT_DIR_NAME "v1"
38#define HMDFS_STASH_WORK_DIR_NAME \
39	(HMDFS_STASH_DIR_NAME "/" HMDFS_STASH_FMT_DIR_NAME)
40
41#define HMDFS_STASH_FILE_NAME_LEN 20
42
43#define HMDFS_STASH_FLUSH_CNT 2
44
45#define HMDFS_STASH_PATH_LEN (HMDFS_CID_SIZE + HMDFS_STASH_FILE_NAME_LEN + 1)
46
/*
 * On-disk head of a stash file, written at offset 0 and protected by a
 * trailing CRC32 over the first @crc_offset bytes.
 *
 * All fields are little-endian.
 */
struct hmdfs_cache_file_head {
	__le32 magic;		/* HMDFS_STASH_FILE_HEAD_MAGIC */
	__le32 crc_offset;	/* offset of the crc32 field in this head */
	__le64 ino;		/* remote inode number being stashed */
	__le64 size;		/* stash file size in bytes */
	__le64 blocks;		/* written pages expressed as 512B sectors */
	__le64 last_write_pos;
	__le64 ctime;
	__le32 ctime_nsec;
	__le32 change_detect_cap;
	__le64 ichange_count;
	__le32 path_offs;	/* path block index, HMDFS_STASH_BLK_SIZE units */
	__le32 path_len;	/* path length including the trailing NUL */
	__le32 path_cnt;
	__le32 data_offs;	/* data block index, HMDFS_STASH_BLK_SIZE units */
	/* Attention: expand new fields in here to compatible with old ver */
	__le32 crc32;		/* CRC32 of the preceding @crc_offset bytes */
} __packed;
65
/* On-stack work item: runs stash-cache initialization in a workqueue. */
struct hmdfs_stash_work {
	struct hmdfs_peer *conn;	/* peer whose inodes are stashed */
	struct list_head *list;		/* inodes to prepare for stashing */
	struct work_struct work;
	struct completion done;		/* signaled when the worker finishes */
};
72
73struct hmdfs_inode_tbl {
74	unsigned int cnt;
75	unsigned int max;
76	uint64_t inodes[0];
77};
78
/* dir_context wrapper used while iterating a peer's stash directory. */
struct hmdfs_stash_dir_context {
	struct dir_context dctx;
	char name[NAME_MAX + 1];	/* scratch NUL-terminated entry name */
	struct hmdfs_inode_tbl *tbl;	/* collected stash inode numbers */
};
84
/* Per-pass counters for restoring stashed files back to the peer. */
struct hmdfs_restore_stats {
	unsigned int succeed;
	unsigned int fail;
	unsigned int keep;		/* stash files kept for a later retry */
	unsigned long long ok_pages;
	unsigned long long fail_pages;
};
92
/* Per-pass counters for stashing; folded by hmdfs_update_stash_stats(). */
struct hmdfs_stash_stats {
	unsigned int succeed;
	unsigned int donothing;		/* inodes with nothing to stash */
	unsigned int fail;
	unsigned long long ok_pages;
	unsigned long long fail_pages;
};
100
/* State carried through the restore of a single stashed file. */
struct hmdfs_file_restore_ctx {
	struct hmdfs_peer *conn;
	struct path src_dir_path;	/* presumably the peer's stash dir — see callers */
	struct path dst_root_path;
	char *dst;
	char *page;
	struct file *src_filp;		/* opened stash file being restored */
	uint64_t inum;			/* remote inode number (matches head->ino) */
	uint64_t pages;
	unsigned int seq;		/* node event seq, for offline detection */
	unsigned int data_offs;
	/* output */
	bool keep;
};
115
/* Immutable parameters for copying between a stash file and its peer file. */
struct hmdfs_copy_args {
	struct file *src;
	struct file *dst;
	void *buf;			/* caller-provided copy buffer */
	size_t buf_len;
	unsigned int seq;		/* node event seq, for offline detection */
	unsigned int data_offs;		/* data block index in the stash file */
	uint64_t inum;			/* remote inode number, for logging */
};
125
/* Mutable per-chunk state of a copy loop built on hmdfs_copy_args. */
struct hmdfs_copy_ctx {
	struct hmdfs_copy_args args;
	loff_t src_pos;			/* current read offset */
	loff_t dst_pos;			/* current write offset */
	/* output */
	size_t copied;			/* bytes transferred in this step */
	bool eof;			/* source exhausted */
};
134
/* Per-pass counters for rebuilding the in-memory stashed-inode list. */
struct hmdfs_rebuild_stats {
	unsigned int succeed;
	unsigned int total;
	unsigned int fail;
	unsigned int invalid;		/* entries rejected during validation */
};
141
/* On-stack work item for running a per-peer check in a workqueue. */
struct hmdfs_check_work {
	struct hmdfs_peer *conn;
	struct work_struct work;
	struct completion done;		/* signaled when the worker finishes */
};
147
/*
 * Operation applied to a peer's stash directory: receives the peer, the
 * node event sequence, the directory path, the collected inode table
 * and an operation-specific argument; returns 0 or a negative errno.
 */
typedef int (*stash_operation_func)(struct hmdfs_peer *,
				    unsigned int,
				    struct path *,
				    const struct hmdfs_inode_tbl *,
				    void *);
153
/*
 * Create directory @name under @parent, tolerating an already-existing
 * directory of that name.
 *
 * Returns the child dentry with a reference held (also when the
 * directory already existed), or an ERR_PTR: -EINVAL when a
 * non-directory occupies the name, or the lookup/vfs_mkdir() error.
 */
static struct dentry *hmdfs_do_vfs_mkdir(struct dentry *parent,
					 const char *name, int namelen,
					 umode_t mode)
{
	struct inode *dir = d_inode(parent);
	struct dentry *child = NULL;
	int err;

	/* Parent-level lock class, as required for dir modification */
	inode_lock_nested(dir, I_MUTEX_PARENT);

	child = lookup_one_len(name, parent, namelen);
	if (IS_ERR(child))
		goto out;

	if (d_is_positive(child)) {
		/* Reuse an existing directory, reject anything else */
		if (d_can_lookup(child))
			goto out;

		dput(child);
		child = ERR_PTR(-EINVAL);
		goto out;
	}

	err = vfs_mkdir(dir, child, mode);
	if (err) {
		dput(child);
		child = ERR_PTR(err);
		goto out;
	}

out:
	inode_unlock(dir);
	return child;
}
188
189struct dentry *hmdfs_stash_new_work_dir(struct dentry *parent)
190{
191	struct dentry *base = NULL;
192	struct dentry *work = NULL;
193
194	base = hmdfs_do_vfs_mkdir(parent, HMDFS_STASH_DIR_NAME,
195				   strlen(HMDFS_STASH_DIR_NAME), 0700);
196	if (IS_ERR(base))
197		return base;
198
199	work = hmdfs_do_vfs_mkdir(base, HMDFS_STASH_FMT_DIR_NAME,
200				  strlen(HMDFS_STASH_FMT_DIR_NAME), 0700);
201	dput(base);
202
203	return work;
204}
205
/*
 * Create an anonymous (tmpfile) stash file under <d_path>/@cid, making
 * the per-peer directory first if needed.
 *
 * The file remains unlinked until hmdfs_enable_stash_file() links it to
 * a visible name, so aborted stashes leave no debris.  Returns an
 * opened O_WRONLY struct file, or an ERR_PTR on failure.
 */
static struct file *hmdfs_new_stash_file(struct path *d_path, const char *cid)
{
	struct dentry *parent = NULL;
	struct dentry *child = NULL;
	struct file *filp = NULL;
	struct path stash;
	int err;

	parent = hmdfs_do_vfs_mkdir(d_path->dentry, cid, strlen(cid), 0700);
	if (IS_ERR(parent)) {
		err = PTR_ERR(parent);
		hmdfs_err("mkdir error %d", err);
		goto mkdir_err;
	}

	child = vfs_tmpfile(parent, S_IFREG | 0600, 0);
	if (IS_ERR(child)) {
		err = PTR_ERR(child);
		hmdfs_err("new stash file error %d", err);
		goto tmpfile_err;
	}

	stash.mnt = d_path->mnt;
	stash.dentry = child;
	filp = dentry_open(&stash, O_LARGEFILE | O_WRONLY, current_cred());
	if (IS_ERR(filp)) {
		err = PTR_ERR(filp);
		hmdfs_err("open stash file error %d", err);
		goto open_err;
	}

	/* filp holds its own references; drop ours */
	dput(child);
	dput(parent);

	return filp;

open_err:
	dput(child);
tmpfile_err:
	dput(parent);
mkdir_err:
	return ERR_PTR(err);
}
249
250static inline bool hmdfs_is_dir(struct dentry *child)
251{
252	return d_is_positive(child) && d_can_lookup(child);
253}
254
255static inline bool hmdfs_is_reg(struct dentry *child)
256{
257	return d_is_positive(child) && d_is_reg(child);
258}
259
/*
 * Fill @head for writing at offset 0 of the stash file: identity,
 * sizes, layout offsets from @cache, and a CRC32 over everything that
 * precedes the crc32 field.
 */
static void hmdfs_set_stash_file_head(const struct hmdfs_cache_info *cache,
				      uint64_t ino,
				      struct hmdfs_cache_file_head *head)
{
	long long blocks;
	unsigned int crc_offset;

	memset(head, 0, sizeof(*head));
	head->magic = cpu_to_le32(HMDFS_STASH_FILE_HEAD_MAGIC);
	head->ino = cpu_to_le64(ino);
	head->size = cpu_to_le64(i_size_read(file_inode(cache->cache_file)));
	/* Pages -> 512B sectors (stat-style block count) */
	blocks = atomic64_read(&cache->written_pgs) <<
			       HMDFS_STASH_PAGE_TO_SECTOR_SHIFT;
	head->blocks = cpu_to_le64(blocks);
	head->path_offs = cpu_to_le32(cache->path_offs);
	head->path_len = cpu_to_le32(cache->path_len);
	head->path_cnt = cpu_to_le32(cache->path_cnt);
	head->data_offs = cpu_to_le32(cache->data_offs);
	/* CRC covers all bytes before the crc32 field itself */
	crc_offset = offsetof(struct hmdfs_cache_file_head, crc32);
	head->crc_offset = cpu_to_le32(crc_offset);
	head->crc32 = cpu_to_le32(crc32(0, head, crc_offset));
}
282
/*
 * Persist the stash file's metadata: the head block at offset 0 and the
 * path string at its head-declared block offset.
 *
 * Returns 0 on success or when there is nothing stashed at all,
 * -EINVAL when cache info or the path is missing, -EIO on short writes.
 */
static int hmdfs_flush_stash_file_metadata(struct hmdfs_inode_info *info)
{
	struct hmdfs_cache_info *cache = NULL;
	struct hmdfs_peer *conn = info->conn;
	struct hmdfs_cache_file_head cache_head;
	size_t written;
	loff_t pos;
	unsigned int head_size;

	/* No metadata if no cache file info */
	cache = info->cache;
	if (!cache)
		return -EINVAL;

	if (strlen(cache->path) == 0) {
		long long to_write_pgs = atomic64_read(&cache->to_write_pgs);

		/* Nothing to stash. No need to flush meta data. */
		if (to_write_pgs == 0)
			return 0;

		hmdfs_err("peer 0x%x:0x%llx inode 0x%llx lost %lld pages due to no path",
			  conn->owner, conn->device_id,
			  info->remote_ino, to_write_pgs);
		return -EINVAL;
	}

	hmdfs_set_stash_file_head(cache, info->remote_ino, &cache_head);

	/* Write head */
	pos = 0;
	head_size = sizeof(cache_head);
	written = kernel_write(cache->cache_file, &cache_head, head_size, &pos);
	if (written != head_size) {
		hmdfs_err("stash peer 0x%x:0x%llx ino 0x%llx write head len %u err %zd",
			   conn->owner, conn->device_id, info->remote_ino,
			   head_size, written);
		return -EIO;
	}
	/* Write path */
	pos = (loff_t)cache->path_offs << HMDFS_STASH_BLK_SHIFT;
	written = kernel_write(cache->cache_file, cache->path, cache->path_len,
			       &pos);
	if (written != cache->path_len) {
		hmdfs_err("stash peer 0x%x:0x%llx ino 0x%llx write path len %u err %zd",
			   conn->owner, conn->device_id, info->remote_ino,
			   cache->path_len, written);
		return -EIO;
	}

	return 0;
}
335
/*
 * Wait for one in-flight writeback pass (I_SYNC) on @info's inode to
 * complete.  Mainly from inode_wait_for_writeback(); it only waits for
 * the current pass rather than looping until the bit stays clear.
 */
static void hmdfs_wait_remote_writeback_once(struct hmdfs_peer *conn,
					     struct hmdfs_inode_info *info)
{
	struct inode *inode = &info->vfs_inode;
	DEFINE_WAIT_BIT(wq, &inode->i_state, __I_SYNC);
	wait_queue_head_t *wq_head = NULL;
	bool in_sync = false;

	/* i_state is protected by i_lock */
	spin_lock(&inode->i_lock);
	in_sync = inode->i_state & I_SYNC;
	spin_unlock(&inode->i_lock);

	if (!in_sync)
		return;

	hmdfs_info("peer 0x%x:0x%llx ino 0x%llx wait for wb once",
		   conn->owner, conn->device_id, info->remote_ino);

	wq_head = bit_waitqueue(&inode->i_state, __I_SYNC);
	__wait_on_bit(wq_head, &wq, bit_wait, TASK_UNINTERRUPTIBLE);
}
358
/*
 * Sample-and-clear any recorded writeback error on the inode's mapping:
 * both the AS_EIO/AS_ENOSPC flags (filemap_check_errors) and the
 * errseq_t cursor.  Logs a warning when an error was pending so it is
 * not silently lost before stashing.
 */
static void hmdfs_reset_remote_write_err(struct hmdfs_peer *conn,
					 struct hmdfs_inode_info *info)
{
	struct address_space *mapping = info->vfs_inode.i_mapping;
	int flags_err;
	errseq_t old;
	int wb_err;

	flags_err = filemap_check_errors(mapping);

	/* Advance wb_err so later checks start from a clean point */
	old = errseq_sample(&mapping->wb_err);
	wb_err = errseq_check_and_advance(&mapping->wb_err, &old);
	if (flags_err || wb_err)
		hmdfs_warning("peer 0x%x:0x%llx inode 0x%llx wb error %d %d before stash",
			      conn->owner, conn->device_id, info->remote_ino,
			      flags_err, wb_err);
}
376
/*
 * True when @mapping has neither dirty- nor writeback-tagged pages.
 * Both tags are tested under the page-cache lock so the answer is a
 * consistent snapshot.
 */
static bool hmdfs_is_mapping_clean(struct address_space *mapping)
{
	bool clean = false;

	/* b93b016313b3b ("page cache: use xa_lock") introduces i_pages */
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 17, 0)
	xa_lock_irq(&mapping->i_pages);
#else
	spin_lock_irq(&mapping->tree_lock);
#endif
	clean = !mapping_tagged(mapping, PAGECACHE_TAG_DIRTY) &&
		!mapping_tagged(mapping, PAGECACHE_TAG_WRITEBACK);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 17, 0)
	xa_unlock_irq(&mapping->i_pages);
#else
	spin_unlock_irq(&mapping->tree_lock);
#endif
	return clean;
}
396
/*
 * Drive all dirty pages of the remote inode into the stash file.
 *
 * Waits out concurrent writers and one in-flight writeback pass, clears
 * stale writeback errors, then runs filemap_write_and_wait() twice to
 * catch writeback->redirty transitions.  Returns 0 on success (a
 * residual dirty state is only logged), or the flush error.
 */
static int hmdfs_flush_stash_file_data(struct hmdfs_peer *conn,
				       struct hmdfs_inode_info *info)
{
	struct inode *inode = &info->vfs_inode;
	struct address_space *mapping = inode->i_mapping;
	bool all_clean = true;
	int err = 0;
	int i;

	/* Wait for the completion of write syscall */
	inode_lock(inode);
	inode_unlock(inode);

	all_clean = hmdfs_is_mapping_clean(mapping);
	if (all_clean) {
		hmdfs_reset_remote_write_err(conn, info);
		return 0;
	}

	/*
	 * No-sync_all writeback during offline may have not seen
	 * the setting of stash_status as HMDFS_REMOTE_INODE_STASHING
	 * and will call mapping_set_error() after we just reset
	 * the previous error. So waiting for these writeback once,
	 * and the following writeback will do local write.
	 */
	hmdfs_wait_remote_writeback_once(conn, info);

	/* Need to clear previous error ? */
	hmdfs_reset_remote_write_err(conn, info);

	/*
	 * 1. dirty page: do write back
	 * 2. writeback page: wait for its completion
	 * 3. writeback -> redirty page: do filemap_write_and_wait()
	 *    twice, so 2th writeback should not allow
	 *    writeback -> redirty transition
	 */
	for (i = 0; i < HMDFS_STASH_FLUSH_CNT; i++) {
		err = filemap_write_and_wait(mapping);
		if (err) {
			hmdfs_err("peer 0x%x:0x%llx inode 0x%llx #%d stash flush error %d",
				  conn->owner, conn->device_id,
				  info->remote_ino, i, err);
			return err;
		}
	}

	if (!hmdfs_is_mapping_clean(mapping))
		hmdfs_err("peer 0x%x:0x%llx inode 0x%llx is still dirty dt %d wb %d",
			  conn->owner, conn->device_id, info->remote_ino,
			  !!mapping_tagged(mapping, PAGECACHE_TAG_DIRTY),
			  !!mapping_tagged(mapping, PAGECACHE_TAG_WRITEBACK));

	return 0;
}
453
454static int hmdfs_flush_stash_file(struct hmdfs_inode_info *info)
455{
456	int err;
457
458	err = hmdfs_flush_stash_file_data(info->conn, info);
459	if (!err)
460		err = hmdfs_flush_stash_file_metadata(info);
461
462	return err;
463}
464
/*
 * Make the anonymous stash tmpfile visible by linking @stash to the
 * name "0x<remote_ino>" in its stash directory.
 *
 * A pre-existing entry under that name is unlinked and the lookup is
 * retried once; a second collision fails with -EEXIST.  Returns 0 on
 * success or a negative errno.
 */
static int hmdfs_enable_stash_file(struct hmdfs_inode_info *info,
				   struct dentry *stash)
{
	char name[HMDFS_STASH_FILE_NAME_LEN];
	struct dentry *parent = NULL;
	struct inode *dir = NULL;
	struct dentry *child = NULL;
	int err = 0;
	bool retried = false;

	snprintf(name, sizeof(name), "0x%llx", info->remote_ino);

	/* Holds the parent dir lock until "out" */
	parent = lock_parent(stash);
	dir = d_inode(parent);

lookup_again:
	child = lookup_one_len(name, parent, strlen(name));
	if (IS_ERR(child)) {
		err = PTR_ERR(child);
		child = NULL;
		hmdfs_err("lookup %s err %d", name, err);
		goto out;
	}

	if (d_is_positive(child)) {
		hmdfs_warning("%s exists (mode 0%o)",
			      name, d_inode(child)->i_mode);

		/* Replace whatever holds the name, then retry once */
		err = vfs_unlink(dir, child, NULL);
		if (err) {
			hmdfs_err("unlink %s err %d", name, err);
			goto out;
		}
		if (retried) {
			err = -EEXIST;
			goto out;
		}

		retried = true;
		dput(child);
		goto lookup_again;
	}

	err = vfs_link(stash, dir, child, NULL);
	if (err) {
		hmdfs_err("link stash file to %s err %d", name, err);
		goto out;
	}

out:
	unlock_dir(parent);
	if (child)
		dput(child);

	return err;
}
521
/*
 * Finalize the stash file for an inode: fsync it and link it to its
 * visible name.
 *
 * Return 1 if stash is done, 0 if nothing is stashed (no pages were
 * queued for writing), or a negative errno on failure.
 */
static int hmdfs_close_stash_file(struct hmdfs_peer *conn,
				  struct hmdfs_inode_info *info)
{
	struct file *cache_file = info->cache->cache_file;
	struct dentry *c_dentry = file_dentry(cache_file);
	struct inode *c_inode = d_inode(c_dentry);
	long long to_write_pgs = atomic64_read(&info->cache->to_write_pgs);
	int err;

	hmdfs_info("peer 0x%x:0x%llx inode 0x%llx stashed bytes %lld pages %lld",
		   conn->owner, conn->device_id, info->remote_ino,
		   i_size_read(c_inode), to_write_pgs);

	if (to_write_pgs == 0)
		return 0;

	err = vfs_fsync(cache_file, 0);
	if (!err)
		err = hmdfs_enable_stash_file(info, c_dentry);
	else
		hmdfs_err("fsync stash file err %d", err);

	return err < 0 ? err : 1;
}
547
548static void hmdfs_del_file_cache(struct hmdfs_cache_info *cache)
549{
550	if (!cache)
551		return;
552
553	fput(cache->cache_file);
554	kfree(cache->path_buf);
555	kfree(cache);
556}
557
/*
 * Allocate and initialize the stash cache info for @info: resolve the
 * inode's hmdfs path, compute the head/path/data block layout, and
 * create the backing tmpfile under the peer's stash directory.
 *
 * Returns the new cache info or an ERR_PTR.  An inode whose dentry is
 * already gone gets an empty path and is handled later by
 * hmdfs_flush_stash_file_metadata().
 */
static struct hmdfs_cache_info *
hmdfs_new_file_cache(struct hmdfs_peer *conn, struct hmdfs_inode_info *info)
{
	struct hmdfs_cache_info *cache = NULL;
	struct dentry *stash_dentry = NULL;
	int err;

	cache = kzalloc(sizeof(*cache), GFP_KERNEL);
	if (!cache)
		return ERR_PTR(-ENOMEM);

	atomic64_set(&cache->to_write_pgs, 0);
	atomic64_set(&cache->written_pgs, 0);
	cache->path_buf = kmalloc(PATH_MAX, GFP_KERNEL);
	if (!cache->path_buf) {
		err = -ENOMEM;
		goto free_cache;
	}

	/* Need to handle "hardlink" ? */
	stash_dentry = d_find_any_alias(&info->vfs_inode);
	if (stash_dentry) {
		/* Needs full path in hmdfs, will be a device-view path */
		cache->path = dentry_path_raw(stash_dentry, cache->path_buf,
					      PATH_MAX);
		dput(stash_dentry);
		if (IS_ERR(cache->path)) {
			err = PTR_ERR(cache->path);
			hmdfs_err("peer 0x%x:0x%llx inode 0x%llx gen path err %d",
				  conn->owner, conn->device_id,
				  info->remote_ino, err);
			goto free_path;
		}
	} else {
		/* Write-opened file was closed before finding dentry */
		hmdfs_info("peer 0x%x:0x%llx inode 0x%llx no dentry found",
			   conn->owner, conn->device_id, info->remote_ino);
		cache->path_buf[0] = '\0';
		cache->path = cache->path_buf;
	}

	cache->path_cnt = 1;
	cache->path_len = strlen(cache->path) + 1;
	/* Layout: [head][path][data], each aligned to stash blocks */
	cache->path_offs = DIV_ROUND_UP(sizeof(struct hmdfs_cache_file_head),
					HMDFS_STASH_BLK_SIZE);
	cache->data_offs = cache->path_offs + DIV_ROUND_UP(cache->path_len,
					HMDFS_STASH_BLK_SIZE);
	cache->cache_file = hmdfs_new_stash_file(&conn->sbi->stash_work_dir,
						 conn->cid);
	if (IS_ERR(cache->cache_file)) {
		err = PTR_ERR(cache->cache_file);
		goto free_path;
	}

	return cache;

free_path:
	kfree(cache->path_buf);
free_cache:
	kfree(cache);
	return ERR_PTR(err);
}
620
/*
 * Attach a (possibly NULL) stash cache to @info and switch its status
 * to HMDFS_REMOTE_INODE_STASHING so in-flight write() calls return.
 */
static void hmdfs_init_stash_file_cache(struct hmdfs_peer *conn,
					struct hmdfs_inode_info *info)
{
	struct hmdfs_cache_info *cache = NULL;

	cache = hmdfs_new_file_cache(conn, info);
	if (IS_ERR(cache))
		/*
		 * Continue even creating stash info failed.
		 * We need to ensure there is no dirty pages
		 * after stash completes
		 */
		cache = NULL;

	/* Make write() returns */
	spin_lock(&info->stash_lock);
	info->cache = cache;
	info->stash_status = HMDFS_REMOTE_INODE_STASHING;
	spin_unlock(&info->stash_lock);
}
641
642static void hmdfs_update_stash_stats(struct hmdfs_stash_stats *stats,
643				     const struct hmdfs_cache_info *cache,
644				     int err)
645{
646	unsigned long long ok_pages, fail_pages;
647
648	if (cache) {
649		ok_pages = err > 0 ? atomic64_read(&cache->written_pgs) : 0;
650		fail_pages = atomic64_read(&cache->to_write_pgs) - ok_pages;
651		stats->ok_pages += ok_pages;
652		stats->fail_pages += fail_pages;
653	}
654
655	if (err > 0)
656		stats->succeed++;
657	else if (!err)
658		stats->donothing++;
659	else
660		stats->fail++;
661}
662
/*
 * Stash one remote inode: flush its pages into the cache file, finalize
 * the file, then publish the final status.
 *
 * Return 1 if stash is done, 0 if nothing is stashed, <0 on error.
 * On failure or no-op the fid is marked for reopen.
 */
static int hmdfs_stash_remote_inode(struct hmdfs_inode_info *info,
				    struct hmdfs_stash_stats *stats)
{
	struct hmdfs_cache_info *cache = info->cache;
	struct hmdfs_peer *conn = info->conn;
	unsigned int status;
	int err = 0;

	hmdfs_info("stash peer 0x%x:0x%llx ino 0x%llx",
		   conn->owner, conn->device_id, info->remote_ino);

	err = hmdfs_flush_stash_file(info);
	if (!err)
		err = hmdfs_close_stash_file(conn, info);

	if (err <= 0)
		set_bit(HMDFS_FID_NEED_OPEN, &info->fid_flags);
	status = err > 0 ? HMDFS_REMOTE_INODE_RESTORING :
			   HMDFS_REMOTE_INODE_NONE;
	spin_lock(&info->stash_lock);
	info->cache = NULL;
	/*
	 * Use smp_store_release() to ensure order between HMDFS_FID_NEED_OPEN
	 * and HMDFS_REMOTE_INODE_NONE.
	 */
	smp_store_release(&info->stash_status, status);
	spin_unlock(&info->stash_lock);

	hmdfs_update_stash_stats(stats, cache, err);
	hmdfs_del_file_cache(cache);

	return err;
}
697
/*
 * Create stash caches for every inode on @list, running with the
 * superblock credentials so files can be created in stash_work_dir.
 */
static void hmdfs_init_cache_for_stash_files(struct hmdfs_peer *conn,
					     struct list_head *list)
{
	const struct cred *old_cred = NULL;
	struct hmdfs_inode_info *info = NULL;

	/* For file creation under stash_work_dir */
	old_cred = hmdfs_override_creds(conn->sbi->cred);
	list_for_each_entry(info, list, stash_node)
		hmdfs_init_stash_file_cache(conn, info);
	hmdfs_revert_creds(old_cred);
}
710
711static void hmdfs_init_stash_cache_work_fn(struct work_struct *base)
712{
713	struct hmdfs_stash_work *work =
714		container_of(base, struct hmdfs_stash_work, work);
715
716	hmdfs_init_cache_for_stash_files(work->conn, work->list);
717	complete(&work->done);
718}
719
/*
 * Run hmdfs_init_cache_for_stash_files() in a system-workqueue worker
 * and block until it completes, using an on-stack work item.
 */
static void hmdfs_init_cache_for_stash_files_by_work(struct hmdfs_peer *conn,
						     struct list_head *list)
{
	struct hmdfs_stash_work work = {
		.conn = conn,
		.list = list,
		.done = COMPLETION_INITIALIZER_ONSTACK(work.done),
	};

	INIT_WORK_ONSTACK(&work.work, hmdfs_init_stash_cache_work_fn);
	schedule_work(&work.work);
	/* Safe to return: the worker has finished touching @work */
	wait_for_completion(&work.done);
}
733
/*
 * Collect writeable-opened inodes that are not yet involved in stashing
 * (status NONE) onto @list, pinning each against close() and eviction.
 * With @check set, inodes unexpectedly already in STASHING state are
 * logged.
 */
static void hmdfs_stash_fetch_ready_files(struct hmdfs_peer *conn,
					  bool check, struct list_head *list)
{
	struct hmdfs_inode_info *info = NULL;

	spin_lock(&conn->wr_opened_inode_lock);
	list_for_each_entry(info, &conn->wr_opened_inode_list, wr_opened_node) {
		int status;

		/* Paired with *_release() in hmdfs_reset_stashed_inode() */
		status = smp_load_acquire(&info->stash_status);
		if (status == HMDFS_REMOTE_INODE_NONE) {
			list_add_tail(&info->stash_node, list);
			/*
			 * Prevent close() removing the inode from
			 * writeable-opened inode list
			 */
			hmdfs_remote_add_wr_opened_inode_nolock(conn, info);
			/* Prevent the inode from eviction */
			ihold(&info->vfs_inode);
		} else if (check && status == HMDFS_REMOTE_INODE_STASHING) {
			hmdfs_warning("peer 0x%x:0x%llx inode 0x%llx unexpected stash status %d",
				      conn->owner, conn->device_id,
				      info->remote_ino, status);
		}
	}
	spin_unlock(&conn->wr_opened_inode_lock);
}
762
763static void hmdfs_stash_offline_prepare(struct hmdfs_peer *conn, int evt,
764					unsigned int seq)
765{
766	LIST_HEAD(preparing);
767
768	if (!hmdfs_is_stash_enabled(conn->sbi))
769		return;
770
771	mutex_lock(&conn->offline_cb_lock);
772
773	hmdfs_stash_fetch_ready_files(conn, true, &preparing);
774
775	if (list_empty(&preparing))
776		goto out;
777
778	hmdfs_init_cache_for_stash_files_by_work(conn, &preparing);
779out:
780	mutex_unlock(&conn->offline_cb_lock);
781}
782
783static void hmdfs_track_inode_locked(struct hmdfs_peer *conn,
784				     struct hmdfs_inode_info *info)
785{
786	spin_lock(&conn->stashed_inode_lock);
787	list_add_tail(&info->stash_node, &conn->stashed_inode_list);
788	conn->stashed_inode_nr++;
789	spin_unlock(&conn->stashed_inode_lock);
790}
791
792static void
793hmdfs_update_peer_stash_stats(struct hmdfs_stash_statistics *stash_stats,
794			      const struct hmdfs_stash_stats *stats)
795{
796	stash_stats->cur_ok = stats->succeed;
797	stash_stats->cur_nothing = stats->donothing;
798	stash_stats->cur_fail = stats->fail;
799	stash_stats->total_ok += stats->succeed;
800	stash_stats->total_nothing += stats->donothing;
801	stash_stats->total_fail += stats->fail;
802	stash_stats->ok_pages += stats->ok_pages;
803	stash_stats->fail_pages += stats->fail_pages;
804}
805
/*
 * Stash every inode on @list under the superblock credentials.
 *
 * Successfully stashed inodes keep their reference and move to the
 * peer's stashed-inode list; failed/no-op inodes are released.  All of
 * them are removed from the writeable-opened list.  Aggregated results
 * are merged into the peer statistics.
 */
static void hmdfs_stash_remote_inodes(struct hmdfs_peer *conn,
				      struct list_head *list)
{
	const struct cred *old_cred = NULL;
	struct hmdfs_inode_info *info = NULL;
	struct hmdfs_inode_info *next = NULL;
	struct hmdfs_stash_stats stats;

	/* For file creation, write and relink under stash_work_dir */
	old_cred = hmdfs_override_creds(conn->sbi->cred);

	memset(&stats, 0, sizeof(stats));
	list_for_each_entry_safe(info, next, list, stash_node) {
		int err;

		list_del_init(&info->stash_node);

		err = hmdfs_stash_remote_inode(info, &stats);
		if (err > 0)
			hmdfs_track_inode_locked(conn, info);

		hmdfs_remote_del_wr_opened_inode(conn, info);
		if (err <= 0)
			/* Drop the reference taken in fetch_ready_files */
			iput(&info->vfs_inode);
	}
	hmdfs_revert_creds(old_cred);

	hmdfs_update_peer_stash_stats(&conn->stats.stash, &stats);
	hmdfs_info("peer 0x%x:0x%llx total stashed %u cur ok %u none %u fail %u",
		   conn->owner, conn->device_id, conn->stashed_inode_nr,
		   stats.succeed, stats.donothing, stats.fail);
}
838
/*
 * Offline callback that performs the actual stashing.
 *
 * Temporarily drops the caller-held seq_lock (reacquired before return)
 * and serializes with the offline sync callback via offline_cb_lock.
 * Inodes missed by the prepare phase get their caches created here,
 * then every STASHING inode is flushed to its stash file.
 */
static void hmdfs_stash_offline_do_stash(struct hmdfs_peer *conn, int evt,
					 unsigned int seq)
{
	struct hmdfs_inode_info *info = NULL;
	LIST_HEAD(preparing);
	LIST_HEAD(stashing);

	if (!hmdfs_is_stash_enabled(conn->sbi))
		return;

	/* release seq_lock to prevent blocking no-offline sync cb */
	mutex_unlock(&conn->seq_lock);
	/* acquire offline_cb_lock to serialized with offline sync cb */
	mutex_lock(&conn->offline_cb_lock);

	hmdfs_stash_fetch_ready_files(conn, false, &preparing);
	if (!list_empty(&preparing))
		hmdfs_init_cache_for_stash_files(conn, &preparing);

	spin_lock(&conn->wr_opened_inode_lock);
	list_for_each_entry(info, &conn->wr_opened_inode_list, wr_opened_node) {
		int status = READ_ONCE(info->stash_status);

		if (status == HMDFS_REMOTE_INODE_STASHING)
			list_add_tail(&info->stash_node, &stashing);
	}
	spin_unlock(&conn->wr_opened_inode_lock);

	if (list_empty(&stashing))
		goto unlock;

	hmdfs_stash_remote_inodes(conn, &stashing);

unlock:
	mutex_unlock(&conn->offline_cb_lock);
	mutex_lock(&conn->seq_lock);
}
876
877static struct hmdfs_inode_info *
878hmdfs_lookup_stash_inode(struct hmdfs_peer *conn, uint64_t inum)
879{
880	struct hmdfs_inode_info *info = NULL;
881
882	list_for_each_entry(info, &conn->stashed_inode_list, stash_node) {
883		if (info->remote_ino == inum)
884			return info;
885	}
886
887	return NULL;
888}
889
890static void hmdfs_untrack_stashed_inode(struct hmdfs_peer *conn,
891					struct hmdfs_inode_info *info)
892{
893	list_del_init(&info->stash_node);
894	iput(&info->vfs_inode);
895
896	conn->stashed_inode_nr--;
897}
898
/*
 * Untrack a stashed inode and publish stash_status = NONE, holding a
 * temporary inode reference so the status store happens on a live
 * inode even after the tracking reference is dropped.
 */
static void hmdfs_reset_stashed_inode(struct hmdfs_peer *conn,
				      struct hmdfs_inode_info *info)
{
	struct inode *ino = &info->vfs_inode;

	/*
	 * For updating stash_status after iput()
	 * in hmdfs_untrack_stashed_inode()
	 */
	ihold(ino);
	hmdfs_untrack_stashed_inode(conn, info);
	/*
	 * Ensure the order of stash_node and stash_status:
	 * only update stash_status to NONE after removal of
	 * stash_node is completed.
	 */
	smp_store_release(&info->stash_status,
			  HMDFS_REMOTE_INODE_NONE);
	iput(ino);
}
919
/*
 * Give up on every still-tracked stashed inode: log each one, then
 * reset it to the NONE state and release its references.
 */
static void hmdfs_drop_stashed_inodes(struct hmdfs_peer *conn)
{
	struct hmdfs_inode_info *info = NULL;
	struct hmdfs_inode_info *next = NULL;

	if (list_empty(&conn->stashed_inode_list))
		return;

	hmdfs_warning("peer 0x%x:0x%llx drop unrestorable file %u",
		      conn->owner, conn->device_id, conn->stashed_inode_nr);

	/* _safe: hmdfs_reset_stashed_inode() removes entries as we go */
	list_for_each_entry_safe(info, next,
				 &conn->stashed_inode_list, stash_node) {
		hmdfs_warning("peer 0x%x:0x%llx inode 0x%llx unrestorable status %u",
			      conn->owner, conn->device_id, info->remote_ino,
			      READ_ONCE(info->stash_status));

		hmdfs_reset_stashed_inode(conn, info);
	}
}
940
/*
 * Open the per-peer stash directory <d_path>/@cid read-only.
 *
 * Returns an opened directory file or an ERR_PTR: -ENOENT when the
 * entry is absent, -EINVAL when it exists but is not a directory, or
 * the lookup/open error.
 */
static struct file *hmdfs_open_stash_dir(struct path *d_path, const char *cid)
{
	int err = 0;
	struct dentry *parent = d_path->dentry;
	struct inode *dir = d_inode(parent);
	struct dentry *child = NULL;
	struct path peer_path;
	struct file *filp = NULL;

	inode_lock_nested(dir, I_MUTEX_PARENT);
	child = lookup_one_len(cid, parent, strlen(cid));
	if (!IS_ERR(child)) {
		if (!hmdfs_is_dir(child)) {
			if (d_is_positive(child)) {
				hmdfs_err("invalid stash dir mode 0%o", d_inode(child)->i_mode);
				err = -EINVAL;
			} else {
				err = -ENOENT;
			}
			dput(child);
		}
	} else {
		err = PTR_ERR(child);
		hmdfs_err("lookup stash dir err %d", err);
	}
	inode_unlock(dir);

	if (err)
		return ERR_PTR(err);

	peer_path.mnt = d_path->mnt;
	peer_path.dentry = child;
	filp = dentry_open(&peer_path, O_RDONLY | O_DIRECTORY, current_cred());
	if (IS_ERR(filp))
		hmdfs_err("open err %d", (int)PTR_ERR(filp));

	/* filp (if valid) holds its own reference; drop ours */
	dput(child);

	return filp;
}
981
982static int hmdfs_new_inode_tbl(struct hmdfs_inode_tbl **tbl)
983{
984	struct hmdfs_inode_tbl *new = NULL;
985
986	new = kmalloc(PAGE_SIZE, GFP_KERNEL);
987	if (!new)
988		return -ENOMEM;
989
990	new->cnt = 0;
991	new->max = (PAGE_SIZE - offsetof(struct hmdfs_inode_tbl, inodes)) /
992		   sizeof(new->inodes[0]);
993	*tbl = new;
994
995	return 0;
996}
997
998static int hmdfs_parse_stash_file_name(struct dir_context *dctx,
999					const char *name,
1000					int namelen,
1001					unsigned int d_type,
1002					uint64_t *stash_inum)
1003{
1004	struct hmdfs_stash_dir_context *ctx = NULL;
1005	int err;
1006
1007	if (d_type != DT_UNKNOWN && d_type != DT_REG)
1008		return 0;
1009	if (namelen > NAME_MAX)
1010		return 0;
1011
1012	ctx = container_of(dctx, struct hmdfs_stash_dir_context, dctx);
1013	memcpy(ctx->name, name, namelen);
1014	ctx->name[namelen] = '\0';
1015	err = kstrtoull(ctx->name, 16, stash_inum);
1016	if (err) {
1017		hmdfs_err("unexpected stash file err %d", err);
1018		return 0;
1019	}
1020	return 1;
1021}
1022
1023static int hmdfs_has_stash_file(struct dir_context *dctx, const char *name,
1024				int namelen, loff_t offset,
1025				u64 inum, unsigned int d_type)
1026{
1027	struct hmdfs_stash_dir_context *ctx = NULL;
1028	uint64_t stash_inum;
1029	int err;
1030
1031	ctx = container_of(dctx, struct hmdfs_stash_dir_context, dctx);
1032	err = hmdfs_parse_stash_file_name(dctx, name, namelen,
1033					   d_type, &stash_inum);
1034	if (!err)
1035		return 0;
1036
1037	ctx->tbl->cnt++;
1038	return 1;
1039}
1040
1041static int hmdfs_fill_stash_file(struct dir_context *dctx, const char *name,
1042				 int namelen, loff_t offset,
1043				 u64 inum, unsigned int d_type)
1044{
1045	struct hmdfs_stash_dir_context *ctx = NULL;
1046	uint64_t stash_inum;
1047	int err;
1048
1049	ctx = container_of(dctx, struct hmdfs_stash_dir_context, dctx);
1050	err = hmdfs_parse_stash_file_name(dctx, name, namelen,
1051					   d_type, &stash_inum);
1052	if (!err)
1053		return 0;
1054	if (ctx->tbl->cnt >= ctx->tbl->max)
1055		return 1;
1056
1057	ctx->tbl->inodes[ctx->tbl->cnt++] = stash_inum;
1058
1059	return 0;
1060}
1061
/*
 * Unlink @child from directory @parent under the parent inode lock.
 * Returns 0 or the vfs_unlink() error.
 */
static int hmdfs_del_stash_file(struct dentry *parent, struct dentry *child)
{
	struct inode *dir = d_inode(parent);
	int err = 0;

	/* Prevent d_delete() from calling dentry_unlink_inode() */
	dget(child);

	inode_lock_nested(dir, I_MUTEX_PARENT);
	err = vfs_unlink(dir, child, NULL);
	if (err)
		hmdfs_err("remove stash file err %d", err);
	inode_unlock(dir);

	dput(child);

	return err;
}
1080
/*
 * True when the peer's event sequence has moved past @seq, i.e. the
 * node went offline (or otherwise changed state) since the caller
 * sampled the sequence.
 */
static inline bool hmdfs_is_node_offlined(const struct hmdfs_peer *conn,
					  unsigned int seq)
{
	/*
	 * open()/fsync() may fail due to "status = NODE_STAT_OFFLINE"
	 * in hmdfs_disconnect_node().
	 * Pair with smp_mb() in hmdfs_disconnect_node() to ensure
	 * getting the newest event sequence.
	 */
	smp_mb__before_atomic();
	return hmdfs_node_evt_seq(conn) != seq;
}
1093
/*
 * Sanity-check a stash file head that was just read from disk.
 *
 * Verifies, in order: magic, CRC over the head, the recorded inode
 * number, that the path block lies inside the stash file, that the
 * data region starts after the path block and inside the file, that
 * the recorded size matches the stash file's size, and that at least
 * one path was recorded.
 *
 * Returns 0 if the head is consistent, -EUCLEAN on any corruption.
 */
static int hmdfs_verify_restore_file_head(struct hmdfs_file_restore_ctx *ctx,
				    const struct hmdfs_cache_file_head *head)
{
	struct inode *inode = file_inode(ctx->src_filp);
	struct hmdfs_peer *conn = ctx->conn;
	unsigned int crc, read_crc, crc_offset;
	loff_t path_offs, data_offs, isize;
	int err = 0;

	if (le32_to_cpu(head->magic) != HMDFS_STASH_FILE_HEAD_MAGIC) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid magic: got 0x%x, exp 0x%x",
			  conn->owner, conn->device_id, ctx->inum,
			  le32_to_cpu(head->magic),
			  HMDFS_STASH_FILE_HEAD_MAGIC);
		goto out;
	}

	/* crc_offset was already bounds-checked against the head size
	 * by the caller before the full head was read in.
	 */
	crc_offset = le32_to_cpu(head->crc_offset);
	read_crc = le32_to_cpu(*((__le32 *)((char *)head + crc_offset)));
	crc = crc32(0, head, crc_offset);
	if (read_crc != crc) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid crc: got 0x%x, exp 0x%x",
			  conn->owner, conn->device_id, ctx->inum,
			  read_crc, crc);
		goto out;
	}

	if (le64_to_cpu(head->ino) != ctx->inum) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid ino: got %llu, exp %llu",
			  conn->owner, conn->device_id, ctx->inum,
			  le64_to_cpu(head->ino), ctx->inum);
		goto out;
	}

	/* Path block (offset stored in 4K units) must lie inside the file */
	path_offs = (loff_t)le32_to_cpu(head->path_offs) <<
		    HMDFS_STASH_BLK_SHIFT;
	if (path_offs <= 0 || path_offs >= i_size_read(inode)) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid path_offs %d, stash file size %llu",
			  conn->owner, conn->device_id, ctx->inum,
			  le32_to_cpu(head->path_offs), i_size_read(inode));
		goto out;
	}

	/* Data region must start strictly after the path block */
	data_offs = (loff_t)le32_to_cpu(head->data_offs) <<
		    HMDFS_STASH_BLK_SHIFT;
	if (path_offs >= data_offs) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid data_offs %d, path_offs %d",
			  conn->owner, conn->device_id, ctx->inum,
			  le32_to_cpu(head->data_offs),
			  le32_to_cpu(head->path_offs));
		goto out;
	}
	if (data_offs <= 0 || data_offs >= i_size_read(inode)) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid data_offs %d, stash file size %llu",
			  conn->owner, conn->device_id, ctx->inum,
			  le32_to_cpu(head->data_offs), i_size_read(inode));
		goto out;
	}

	/* Size recorded at stash time must match the on-disk file size */
	isize = le64_to_cpu(head->size);
	if (isize != i_size_read(inode)) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid isize: got %llu, exp %llu",
			  conn->owner, conn->device_id, ctx->inum,
			  le64_to_cpu(head->size), i_size_read(inode));
		goto out;
	}

	if (le32_to_cpu(head->path_cnt) < 1) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid path_cnt %d",
			  conn->owner, conn->device_id, ctx->inum,
			  le32_to_cpu(head->path_cnt));
		goto out;
	}

out:
	return err;
}
1179
/*
 * Read and validate the stash file head, then load the stashed file's
 * remote path into ctx->dst.
 *
 * The head is read in two steps: first a fixed prefix up to and
 * including crc_offset, which reveals how large the (possibly older,
 * shorter) head actually is; then the full head of exactly that size.
 * This keeps new kernels compatible with stash files written by older
 * versions (see the "expand new fields" note on the head struct).
 *
 * On success, ctx->pages, ctx->data_offs and ctx->dst (NUL-terminated
 * path) are populated. Returns 0 or a negative errno (-EUCLEAN for a
 * corrupt head, -ENODATA for a short read).
 */
static int hmdfs_get_restore_file_metadata(struct hmdfs_file_restore_ctx *ctx)
{
	struct hmdfs_cache_file_head head;
	struct hmdfs_peer *conn = ctx->conn;
	unsigned int head_size, read_size, head_crc_offset;
	loff_t pos;
	ssize_t rd;
	int err = 0;

	head_size = sizeof(struct hmdfs_cache_file_head);
	memset(&head, 0, head_size);
	/* Read part head */
	pos = 0;
	read_size = offsetof(struct hmdfs_cache_file_head, crc_offset) +
		    sizeof(head.crc_offset);
	rd = kernel_read(ctx->src_filp, &head, read_size, &pos);
	if (rd != read_size) {
		err = rd < 0 ? rd : -ENODATA;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx read part head err %d",
			  conn->owner, conn->device_id, ctx->inum, err);
		goto out;
	}
	/* Guard against overflow and a crc_offset past our head struct */
	head_crc_offset = le32_to_cpu(head.crc_offset);
	if (head_crc_offset + sizeof(head.crc32) < head_crc_offset ||
	    head_crc_offset + sizeof(head.crc32) > head_size) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx got bad head: Too long crc_offset %u which exceeds head size %u",
			  conn->owner, conn->device_id, ctx->inum,
			  head_crc_offset, head_size);
		goto out;
	}

	/* Read full head */
	pos = 0;
	read_size = le32_to_cpu(head.crc_offset) + sizeof(head.crc32);
	rd = kernel_read(ctx->src_filp, &head, read_size, &pos);
	if (rd != read_size) {
		err = rd < 0 ? rd : -ENODATA;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx read full head err %d",
			  conn->owner, conn->device_id, ctx->inum, err);
		goto out;
	}

	err = hmdfs_verify_restore_file_head(ctx, &head);
	if (err)
		goto out;

	/* head.blocks is in 512-byte sectors: 8 sectors per 4K page */
	ctx->pages = le64_to_cpu(head.blocks) >>
		     HMDFS_STASH_PAGE_TO_SECTOR_SHIFT;
	ctx->data_offs = le32_to_cpu(head.data_offs);
	/* Read path */
	read_size = min_t(unsigned int, le32_to_cpu(head.path_len), PATH_MAX);
	pos = (loff_t)le32_to_cpu(head.path_offs) << HMDFS_STASH_BLK_SHIFT;
	rd = kernel_read(ctx->src_filp, ctx->dst, read_size, &pos);
	if (rd != read_size) {
		err = rd < 0 ? rd : -ENODATA;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx read path err %d",
			  conn->owner, conn->device_id, ctx->inum, err);
		goto out;
	}
	/* The stored path must be NUL-terminated within path_len bytes */
	if (strnlen(ctx->dst, read_size) >= read_size) {
		err = -EUCLEAN;
		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx read path not end with \\0",
			  conn->owner, conn->device_id, ctx->inum);
		goto out;
	}
	/* TODO: Pick a valid path from all paths */

out:
	return err;
}
1251
1252static int hmdfs_open_restore_dst_file(struct hmdfs_file_restore_ctx *ctx,
1253				       unsigned int rw_flag, struct file **filp)
1254{
1255	struct hmdfs_peer *conn = ctx->conn;
1256	struct file *dst = NULL;
1257	int err = 0;
1258
1259	err = hmdfs_get_restore_file_metadata(ctx);
1260	if (err)
1261		goto out;
1262
1263	/* Error comes from connection or server ? */
1264	dst = file_open_root(&ctx->dst_root_path,
1265			     ctx->dst, O_LARGEFILE | rw_flag, 0);
1266	if (IS_ERR(dst)) {
1267		err = PTR_ERR(dst);
1268		hmdfs_err("open remote file ino 0x%llx err %d", ctx->inum, err);
1269		if (hmdfs_is_node_offlined(conn, ctx->seq))
1270			err = -ESHUTDOWN;
1271		goto out;
1272	}
1273
1274	*filp = dst;
1275out:
1276	return err;
1277}
1278
1279static bool hmdfs_need_abort_restore(struct hmdfs_file_restore_ctx *ctx,
1280				     struct hmdfs_inode_info *pinned,
1281				     struct file *opened_file)
1282{
1283	struct hmdfs_inode_info *opened = hmdfs_i(file_inode(opened_file));
1284
1285	if (opened->inode_type != HMDFS_LAYER_OTHER_REMOTE)
1286		goto abort;
1287
1288	if (opened == pinned)
1289		return false;
1290
1291abort:
1292	hmdfs_warning("peer 0x%x:0x%llx inode 0x%llx invalid remote file",
1293		      ctx->conn->owner, ctx->conn->device_id, ctx->inum);
1294	hmdfs_warning("got: peer 0x%x:0x%llx inode 0x%llx type %d status %d",
1295		      opened->conn ? opened->conn->owner : 0,
1296		      opened->conn ? opened->conn->device_id : 0,
1297		      opened->remote_ino, opened->inode_type,
1298		      opened->stash_status);
1299	hmdfs_warning("pinned: peer 0x%x:0x%llx inode 0x%llx type %d status %d",
1300		      pinned->conn->owner, pinned->conn->device_id,
1301		      pinned->remote_ino, pinned->inode_type,
1302		      pinned->stash_status);
1303	return true;
1304}
1305
1306static void hmdfs_init_copy_args(const struct hmdfs_file_restore_ctx *ctx,
1307				 struct file *dst, struct hmdfs_copy_args *args)
1308{
1309	args->src = ctx->src_filp;
1310	args->dst = dst;
1311	args->buf = ctx->page;
1312	args->buf_len = PAGE_SIZE;
1313	args->seq = ctx->seq;
1314	args->data_offs = ctx->data_offs;
1315	args->inum = ctx->inum;
1316}
1317
/*
 * Write @len bytes from the kernel buffer @buf to remote file @filp at
 * offset @pos.
 *
 * Returns 0 on a full write, a negative errno on failure (-EFAULT for
 * an otherwise unexplained short write).
 */
static ssize_t hmdfs_write_dst(struct hmdfs_peer *conn, struct file *filp,
			       void *buf, size_t len, loff_t pos)
{
	mm_segment_t old_fs;
	struct kiocb kiocb;
	struct iovec iov;
	struct iov_iter iter;
	ssize_t wr;
	int err = 0;

	file_start_write(filp);

	/* @buf is a kernel address; permit it in the uaccess-based path */
	old_fs = force_uaccess_begin();

	init_sync_kiocb(&kiocb, filp);
	kiocb.ki_pos = pos;

	iov.iov_base = buf;
	iov.iov_len = len;
	iov_iter_init(&iter, WRITE, &iov, 1, len);

	/*
	 * NOTE(review): the "_nocheck" variant presumably skips the remote
	 * fid validity check (caller sets HMDFS_FID_NEED_OPEN beforehand) —
	 * confirm against file_remote.c.
	 */
	wr = hmdfs_file_write_iter_remote_nocheck(&kiocb, &iter);

	force_uaccess_end(old_fs);

	file_end_write(filp);

	if (wr != len) {
		struct hmdfs_inode_info *info = hmdfs_i(file_inode(filp));

		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx short write ret %zd exp %zu",
			  conn->owner, conn->device_id, info->remote_ino,
			  wr, len);
		err = wr < 0 ? (int)wr : -EFAULT;
	}

	return err;
}
1356
1357static int hmdfs_rd_src_wr_dst(struct hmdfs_peer *conn,
1358			       struct hmdfs_copy_ctx *ctx)
1359{
1360	const struct hmdfs_copy_args *args = NULL;
1361	int err = 0;
1362	loff_t rd_pos;
1363	ssize_t rd;
1364
1365	ctx->eof = false;
1366	ctx->copied = 0;
1367
1368	args = &ctx->args;
1369	rd_pos = ctx->src_pos;
1370	rd = kernel_read(args->src, args->buf, args->buf_len, &rd_pos);
1371	if (rd < 0) {
1372		err = (int)rd;
1373		hmdfs_err("peer 0x%x:0x%llx ino 0x%llx short read err %d",
1374			  conn->owner, conn->device_id, args->inum, err);
1375		goto out;
1376	} else if (rd == 0) {
1377		ctx->eof = true;
1378		goto out;
1379	}
1380
1381	err = hmdfs_write_dst(conn, args->dst, args->buf, rd, ctx->dst_pos);
1382	if (!err)
1383		ctx->copied = rd;
1384	else if (hmdfs_is_node_offlined(conn, args->seq))
1385		err = -ESHUTDOWN;
1386out:
1387	return err;
1388}
1389
/*
 * Copy all stashed page data from the stash file to the remote file.
 *
 * The stash file is sparse: only dirtied pages were written, each at
 * (page offset + data_offs << BLK_SHIFT). llseek(SEEK_DATA) is used to
 * skip the holes and copy only regions that actually contain data; a
 * chunk found at stash offset X lands in the remote file at
 * (X - data_init_pos).
 *
 * Returns 0 when all data has been copied (or SEEK_DATA reports no
 * more data), a negative errno on the first seek/read/write failure.
 */
static int hmdfs_copy_src_to_dst(struct hmdfs_peer *conn,
				 const struct hmdfs_copy_args *args)
{
	int err = 0;
	struct file *src = NULL;
	struct hmdfs_copy_ctx ctx;
	loff_t seek_pos, data_init_pos;
	loff_t src_size;

	ctx.args = *args;

	src = ctx.args.src;
	data_init_pos = (loff_t)ctx.args.data_offs << HMDFS_STASH_BLK_SHIFT;
	seek_pos = data_init_pos;
	src_size = i_size_read(file_inode(src));
	while (true) {
		loff_t data_pos;

		data_pos = vfs_llseek(src, seek_pos, SEEK_DATA);
		if (data_pos > seek_pos) {
			/* Skipped over a hole; retry from the data start */
			seek_pos = data_pos;
			continue;
		} else if (data_pos < 0) {
			if (data_pos == -ENXIO) {
				/* No data beyond seek_pos: normal end */
				loff_t src_blks = file_inode(src)->i_blocks;

				hmdfs_info("peer 0x%x:0x%llx ino 0x%llx end at 0x%llx (sz 0x%llx blk 0x%llx)",
					   conn->owner, conn->device_id,
					   args->inum, seek_pos,
					   src_size, src_blks);
			} else {
				err = (int)data_pos;
				hmdfs_err("peer 0x%x:0x%llx ino 0x%llx seek pos 0x%llx err %d",
					  conn->owner, conn->device_id,
					  args->inum, seek_pos, err);
			}
			break;
		}

		hmdfs_debug("peer 0x%x:0x%llx ino 0x%llx seek to 0x%llx",
			    conn->owner, conn->device_id, args->inum, data_pos);

		ctx.src_pos = data_pos;
		ctx.dst_pos = data_pos - data_init_pos;
		err = hmdfs_rd_src_wr_dst(conn, &ctx);
		if (err || ctx.eof)
			break;

		seek_pos += ctx.copied;
		if (seek_pos >= src_size)
			break;
	}

	return err;
}
1445
/*
 * Copy all stashed data for one file to the remote and fsync it there.
 *
 * On any failure the remote inode's page cache is truncated so that no
 * half-restored data lingers. The stash file's own page cache is
 * always dropped afterwards, since it will not be read again.
 *
 * Returns 0 on success; -ESHUTDOWN if the peer went offline again.
 */
static int hmdfs_restore_src_to_dst(struct hmdfs_file_restore_ctx *ctx,
				    struct file *dst)
{
	struct file *src = ctx->src_filp;
	struct hmdfs_copy_args args;
	int err;

	hmdfs_init_copy_args(ctx, dst, &args);
	err = hmdfs_copy_src_to_dst(ctx->conn, &args);
	if (err)
		goto out;

	err = vfs_fsync(dst, 0);
	if (err) {
		hmdfs_err("fsync remote file ino 0x%llx err %d", ctx->inum, err);
		if (hmdfs_is_node_offlined(ctx->conn, ctx->seq))
			err = -ESHUTDOWN;
	}

out:
	if (err)
		truncate_inode_pages(file_inode(dst)->i_mapping, 0);

	/* Remove the unnecessary cache */
	invalidate_mapping_pages(file_inode(src)->i_mapping, 0, -1);

	return err;
}
1474
1475
/*
 * Restore one stashed file described by @ctx to the remote peer.
 *
 * The inode must already be pinned with stash_status RESTORING (set
 * during the rebuild phase). If the restore fails with -ESHUTDOWN
 * (peer offline again), ctx->keep is set so that the caller preserves
 * the stash file and the pinned inode for the next online event; in
 * every other outcome the pinned inode is reset.
 */
static int hmdfs_restore_file(struct hmdfs_file_restore_ctx *ctx)
{
	struct hmdfs_peer *conn = ctx->conn;
	uint64_t inum = ctx->inum;
	struct hmdfs_inode_info *pinned_info = NULL;
	struct file *dst_filp = NULL;
	int err = 0;
	bool keep = false;

	hmdfs_info("peer 0x%x:0x%llx ino 0x%llx do restore",
		   conn->owner, conn->device_id, inum);

	pinned_info = hmdfs_lookup_stash_inode(conn, inum);
	if (pinned_info) {
		unsigned int status = READ_ONCE(pinned_info->stash_status);

		if (status != HMDFS_REMOTE_INODE_RESTORING) {
			hmdfs_err("peer 0x%x:0x%llx ino 0x%llx invalid status %u",
				  conn->owner, conn->device_id, inum, status);
			err = -EINVAL;
			goto clean;
		}
	} else {
		hmdfs_warning("peer 0x%x:0x%llx ino 0x%llx doesn't being pinned",
			      conn->owner, conn->device_id, inum);
		err = -EINVAL;
		goto clean;
	}

	/* Force a fresh remote open; the old fid is stale after offline */
	set_bit(HMDFS_FID_NEED_OPEN, &pinned_info->fid_flags);
	err = hmdfs_open_restore_dst_file(ctx, O_RDWR, &dst_filp);
	if (err) {
		if (err == -ESHUTDOWN)
			keep = true;
		goto clean;
	}

	if (hmdfs_need_abort_restore(ctx, pinned_info, dst_filp))
		goto abort;

	err = hmdfs_restore_src_to_dst(ctx, dst_filp);
	if (err == -ESHUTDOWN)
		keep = true;
abort:
	fput(dst_filp);
clean:
	if (pinned_info && !keep)
		hmdfs_reset_stashed_inode(conn, pinned_info);
	ctx->keep = keep;

	hmdfs_info("peer 0x%x:0x%llx ino 0x%llx restore err %d keep %d",
		   conn->owner, conn->device_id, inum, err, ctx->keep);

	return err;
}
1531
1532static int hmdfs_init_file_restore_ctx(struct hmdfs_peer *conn,
1533				       unsigned int seq, struct path *src_dir,
1534				       struct hmdfs_file_restore_ctx *ctx)
1535{
1536	struct hmdfs_sb_info *sbi = conn->sbi;
1537	struct path dst_root;
1538	char *dst = NULL;
1539	char *page = NULL;
1540	int err = 0;
1541
1542	err = hmdfs_get_path_in_sb(sbi->sb, sbi->real_dst, LOOKUP_DIRECTORY,
1543				   &dst_root);
1544	if (err)
1545		return err;
1546
1547	dst = kmalloc(PATH_MAX, GFP_KERNEL);
1548	if (!dst) {
1549		err = -ENOMEM;
1550		goto put_path;
1551	}
1552
1553	page = kmalloc(PAGE_SIZE, GFP_KERNEL);
1554	if (!page) {
1555		err = -ENOMEM;
1556		goto free_dst;
1557	}
1558
1559	ctx->conn = conn;
1560	ctx->src_dir_path = *src_dir;
1561	ctx->dst_root_path = dst_root;
1562	ctx->dst = dst;
1563	ctx->page = page;
1564	ctx->seq = seq;
1565
1566	return 0;
1567free_dst:
1568	kfree(dst);
1569put_path:
1570	path_put(&dst_root);
1571	return err;
1572}
1573
1574static void hmdfs_exit_file_restore_ctx(struct hmdfs_file_restore_ctx *ctx)
1575{
1576	path_put(&ctx->dst_root_path);
1577	kfree(ctx->dst);
1578	kfree(ctx->page);
1579}
1580
/*
 * Look up stash file @name under directory @p_path and open it
 * read-only.
 *
 * Only a regular file is accepted: a positive non-regular entry yields
 * -EINVAL, a negative entry -ENOENT. The child dentry reference is
 * dropped before returning; the opened struct file holds its own
 * reference to the path. Returns the file or an ERR_PTR.
 */
static struct file *hmdfs_open_stash_file(struct path *p_path, char *name)
{
	struct dentry *parent = NULL;
	struct inode *dir = NULL;
	struct dentry *child = NULL;
	struct file *filp = NULL;
	struct path c_path;
	int err = 0;

	parent = p_path->dentry;
	dir = d_inode(parent);
	/* lookup_one_len() requires the parent inode locked */
	inode_lock_nested(dir, I_MUTEX_PARENT);
	child = lookup_one_len(name, parent, strlen(name));
	if (!IS_ERR(child) && !hmdfs_is_reg(child)) {
		if (d_is_positive(child)) {
			hmdfs_err("invalid stash file (mode 0%o)",
				  d_inode(child)->i_mode);
			err = -EINVAL;
		} else {
			hmdfs_err("missing stash file");
			err = -ENOENT;
		}
		dput(child);
	} else if (IS_ERR(child)) {
		err = PTR_ERR(child);
		hmdfs_err("lookup stash file err %d", err);
	}
	inode_unlock(dir);

	if (err)
		return ERR_PTR(err);

	c_path.mnt = p_path->mnt;
	c_path.dentry = child;
	filp = dentry_open(&c_path, O_RDONLY | O_LARGEFILE, current_cred());
	if (IS_ERR(filp))
		hmdfs_err("open stash file err %d", (int)PTR_ERR(filp));

	/* dentry_open() took its own reference; drop ours from lookup */
	dput(child);

	return filp;
}
1623
1624static void hmdfs_update_restore_stats(struct hmdfs_restore_stats *stats,
1625				       bool keep, uint64_t pages, int err)
1626{
1627	if (!err) {
1628		stats->succeed++;
1629		stats->ok_pages += pages;
1630	} else if (keep) {
1631		stats->keep++;
1632	} else {
1633		stats->fail++;
1634		stats->fail_pages += pages;
1635	}
1636}
1637
/*
 * Restore every inode listed in @tbl from its stash file in @dir.
 *
 * Per-file failures are counted in the stats and do not stop the loop;
 * only -ESHUTDOWN (peer offline again) aborts early. A stash file that
 * is not kept for a later retry is deleted after its attempt.
 */
static int hmdfs_restore_files(struct hmdfs_peer *conn,
			       unsigned int seq, struct path *dir,
			       const struct hmdfs_inode_tbl *tbl,
			       void *priv)
{
	unsigned int i;
	struct hmdfs_file_restore_ctx ctx;
	int err = 0;
	struct hmdfs_restore_stats *stats = priv;

	err = hmdfs_init_file_restore_ctx(conn, seq, dir, &ctx);
	if (err)
		return err;

	for (i = 0; i < tbl->cnt; i++) {
		char name[HMDFS_STASH_FILE_NAME_LEN];
		struct file *filp = NULL;

		/* Stash files are named by remote inode number in hex */
		snprintf(name, sizeof(name), "0x%llx", tbl->inodes[i]);
		filp = hmdfs_open_stash_file(dir, name);
		/* Continue to restore if any error */
		if (IS_ERR(filp)) {
			stats->fail++;
			continue;
		}

		ctx.inum = tbl->inodes[i];
		ctx.src_filp = filp;
		ctx.keep = false;
		ctx.pages = 0;
		err = hmdfs_restore_file(&ctx);
		hmdfs_update_restore_stats(stats, ctx.keep, ctx.pages, err);

		if (!ctx.keep)
			hmdfs_del_stash_file(dir->dentry,
					     file_dentry(ctx.src_filp));
		fput(ctx.src_filp);

		/* Continue to restore */
		if (err == -ESHUTDOWN)
			break;
		err = 0;
	}

	hmdfs_exit_file_restore_ctx(&ctx);

	return err;
}
1686
1687static bool hmdfs_is_valid_stash_status(struct hmdfs_inode_info *inode_info,
1688					uint64_t ino)
1689{
1690	return (inode_info->inode_type == HMDFS_LAYER_OTHER_REMOTE &&
1691		inode_info->stash_status == HMDFS_REMOTE_INODE_RESTORING &&
1692		inode_info->remote_ino == ino);
1693}
1694
/*
 * Rebuild the stashed-inode list for @conn from the stash files in @dir.
 *
 * For each stash file, the metadata is read and the recorded remote
 * path is opened read-only. Opening instantiates the remote inode;
 * hmdfs_remote_init_stash_status() then marks it RESTORING and tracks
 * it on the peer's stash list. The open here is only used to verify
 * that pinning happened; both files are released each iteration.
 *
 * Returns 0, or -ESHUTDOWN if the peer went offline mid-rebuild.
 */
static int hmdfs_rebuild_stash_list(struct hmdfs_peer *conn,
				    unsigned int seq,
				    struct path *dir,
				    const struct hmdfs_inode_tbl *tbl,
				    void *priv)
{
	struct hmdfs_file_restore_ctx ctx;
	unsigned int i;
	int err;
	struct hmdfs_rebuild_stats *stats = priv;

	err = hmdfs_init_file_restore_ctx(conn, seq, dir, &ctx);
	if (err)
		return err;

	stats->total += tbl->cnt;

	for (i = 0; i < tbl->cnt; i++) {
		char name[HMDFS_STASH_FILE_NAME_LEN];
		struct file *src_filp = NULL;
		struct file *dst_filp = NULL;
		struct hmdfs_inode_info *inode_info = NULL;
		bool is_valid = true;

		snprintf(name, sizeof(name), "0x%llx", tbl->inodes[i]);
		src_filp = hmdfs_open_stash_file(dir, name);
		if (IS_ERR(src_filp)) {
			stats->fail++;
			continue;
		}
		ctx.inum = tbl->inodes[i];
		ctx.src_filp = src_filp;

		/* No need to track the open which only needs meta info */
		err = hmdfs_open_restore_dst_file(&ctx, O_RDONLY, &dst_filp);
		if (err) {
			fput(src_filp);
			if (err == -ESHUTDOWN)
				break;
			stats->fail++;
			err = 0;
			continue;
		}

		inode_info = hmdfs_i(file_inode(dst_filp));
		is_valid = hmdfs_is_valid_stash_status(inode_info,
						       ctx.inum);
		if (is_valid) {
			stats->succeed++;
		} else {
			hmdfs_err("peer 0x%x:0x%llx inode 0x%llx invalid state: type: %d, status: %u, inode: %llu",
				  conn->owner, conn->device_id, ctx.inum,
				  inode_info->inode_type,
				  READ_ONCE(inode_info->stash_status),
				  inode_info->remote_ino);
			stats->invalid++;
		}

		fput(ctx.src_filp);
		fput(dst_filp);
	}

	hmdfs_exit_file_restore_ctx(&ctx);
	return err;
}
1760
/*
 * Iterate the per-peer stash directory @filp and invoke @op on each
 * batch of stash-file inode numbers collected into a fixed-size table.
 *
 * The directory position persists across iterate_dir() calls, so the
 * loop walks the directory in chunks until a scan yields no entries
 * or @op fails.
 */
static int hmdfs_iter_stash_file(struct hmdfs_peer *conn,
				 unsigned int seq,
				 struct file *filp,
				 stash_operation_func op,
				 void *priv)
{
	int err = 0;
	struct hmdfs_stash_dir_context ctx = {
		.dctx.actor = hmdfs_fill_stash_file,
	};
	struct hmdfs_inode_tbl *tbl = NULL;
	struct path dir;

	err = hmdfs_new_inode_tbl(&tbl);
	if (err)
		goto out;

	dir.mnt = filp->f_path.mnt;
	dir.dentry = file_dentry(filp);

	ctx.tbl = tbl;
	ctx.dctx.pos = 0;
	do {
		tbl->cnt = 0;
		err = iterate_dir(filp, &ctx.dctx);
		if (err || !tbl->cnt) {
			if (err)
				hmdfs_err("iterate stash dir err %d", err);
			break;
		}
		err = op(conn, seq, &dir, tbl, priv);
	} while (!err);

out:
	kfree(tbl);
	return err;
}
1798
/*
 * Work item run at NODE_EVT_ADD: scan the peer's stash directory and
 * set need_rebuild_stash_list when at least one stash file exists.
 *
 * The issuer blocks on @work->done, so this acts as a synchronous
 * check executed in workqueue context.
 */
static void hmdfs_rebuild_check_work_fn(struct work_struct *base)
{
	struct hmdfs_check_work *work =
		container_of(base, struct hmdfs_check_work, work);
	struct hmdfs_peer *conn = work->conn;
	struct hmdfs_sb_info *sbi = conn->sbi;
	struct file *filp = NULL;
	const struct cred *old_cred = NULL;
	struct hmdfs_stash_dir_context ctx = {
		.dctx.actor = hmdfs_has_stash_file,
	};
	struct hmdfs_inode_tbl tbl;
	int err;

	/* Use the sb's credentials for the lookup under the cache dir */
	old_cred = hmdfs_override_creds(sbi->cred);
	filp = hmdfs_open_stash_dir(&sbi->stash_work_dir, conn->cid);
	if (IS_ERR(filp))
		goto out;

	memset(&tbl, 0, sizeof(tbl));
	ctx.tbl = &tbl;
	err = iterate_dir(filp, &ctx.dctx);
	if (!err && ctx.tbl->cnt > 0)
		conn->need_rebuild_stash_list = true;

	fput(filp);
out:
	hmdfs_revert_creds(old_cred);
	hmdfs_info("peer 0x%x:0x%llx %sneed to rebuild stash list",
		   conn->owner, conn->device_id,
		   conn->need_rebuild_stash_list ? "" : "don't ");
	complete(&work->done);
}
1832
1833static void hmdfs_stash_add_do_check(struct hmdfs_peer *conn, int evt,
1834				     unsigned int seq)
1835{
1836	struct hmdfs_sb_info *sbi = conn->sbi;
1837	struct hmdfs_check_work work = {
1838		.conn = conn,
1839		.done = COMPLETION_INITIALIZER_ONSTACK(work.done),
1840	};
1841
1842	if (!hmdfs_is_stash_enabled(sbi))
1843		return;
1844
1845	INIT_WORK_ONSTACK(&work.work, hmdfs_rebuild_check_work_fn);
1846	schedule_work(&work.work);
1847	wait_for_completion(&work.done);
1848}
1849
1850static void
1851hmdfs_update_peer_rebuild_stats(struct hmdfs_rebuild_statistics *rebuild_stats,
1852				const struct hmdfs_rebuild_stats *stats)
1853{
1854	rebuild_stats->cur_ok = stats->succeed;
1855	rebuild_stats->cur_fail = stats->fail;
1856	rebuild_stats->cur_invalid = stats->invalid;
1857	rebuild_stats->total_ok += stats->succeed;
1858	rebuild_stats->total_fail += stats->fail;
1859	rebuild_stats->total_invalid += stats->invalid;
1860}
1861
/*
 * NODE_EVT_ONLINE (async) step 1: rebuild the stashed-inode list.
 *
 * Scans the peer's stash directory and reopens each stashed file's
 * remote path so the matching inodes are re-pinned for restoring.
 * need_rebuild_stash_list is cleared on (even partially failed)
 * completion; it stays set when the peer goes offline mid-rebuild so
 * a later online event retries.
 */
static void hmdfs_stash_online_prepare(struct hmdfs_peer *conn, int evt,
				       unsigned int seq)
{
	struct hmdfs_sb_info *sbi = conn->sbi;
	struct file *filp = NULL;
	const struct cred *old_cred = NULL;
	int err;
	struct hmdfs_rebuild_stats stats;

	if (!hmdfs_is_stash_enabled(sbi) ||
	    !conn->need_rebuild_stash_list)
		return;

	/* release seq_lock to prevent blocking no-online sync cb */
	mutex_unlock(&conn->seq_lock);
	old_cred = hmdfs_override_creds(sbi->cred);
	filp = hmdfs_open_stash_dir(&sbi->stash_work_dir, conn->cid);
	if (IS_ERR(filp))
		goto out;

	memset(&stats, 0, sizeof(stats));
	err = hmdfs_iter_stash_file(conn, seq, filp,
				    hmdfs_rebuild_stash_list, &stats);
	if (err == -ESHUTDOWN) {
		hmdfs_info("peer 0x%x:0x%llx offline again during rebuild",
			   conn->owner, conn->device_id);
	} else {
		WRITE_ONCE(conn->need_rebuild_stash_list, false);
		if (err)
			hmdfs_warning("partial rebuild fail err %d", err);
	}

	hmdfs_update_peer_rebuild_stats(&conn->stats.rebuild, &stats);
	hmdfs_info("peer 0x%x:0x%llx rebuild stashed-file total %u succeed %u fail %u invalid %u",
		   conn->owner, conn->device_id, stats.total, stats.succeed,
		   stats.fail, stats.invalid);
	fput(filp);
out:
	conn->stats.rebuild.time++;
	hmdfs_revert_creds(old_cred);
	if (!READ_ONCE(conn->need_rebuild_stash_list)) {
		/*
		 * Use smp_mb__before_atomic() to ensure order between
		 * writing @conn->need_rebuild_stash_list and
		 * reading conn->rebuild_inode_status_nr.
		 */
		smp_mb__before_atomic();
		/*
		 * Wait until all inodes finish rebuilding stash status before
		 * accessing @conn->stashed_inode_list in restoring.
		 */
		wait_event(conn->rebuild_inode_status_wq,
			   !atomic_read(&conn->rebuild_inode_status_nr));
	}
	mutex_lock(&conn->seq_lock);
}
1919
1920static void
1921hmdfs_update_peer_restore_stats(struct hmdfs_restore_statistics *restore_stats,
1922				const struct hmdfs_restore_stats *stats)
1923{
1924	restore_stats->cur_ok = stats->succeed;
1925	restore_stats->cur_fail = stats->fail;
1926	restore_stats->cur_keep = stats->keep;
1927	restore_stats->total_ok += stats->succeed;
1928	restore_stats->total_fail += stats->fail;
1929	restore_stats->total_keep += stats->keep;
1930	restore_stats->ok_pages += stats->ok_pages;
1931	restore_stats->fail_pages += stats->fail_pages;
1932}
1933
/*
 * NODE_EVT_ONLINE (async) step 2: restore the stashed files.
 *
 * Runs after hmdfs_stash_online_prepare() has re-pinned the inodes.
 * Skipped if the rebuild is still pending (the prepare step failed
 * with -ESHUTDOWN). Any inode still pinned after the run — unless the
 * peer went offline again — is force-dropped.
 */
static void hmdfs_stash_online_do_restore(struct hmdfs_peer *conn, int evt,
					  unsigned int seq)
{
	struct hmdfs_sb_info *sbi = conn->sbi;
	struct file *filp = NULL;
	const struct cred *old_cred = NULL;
	struct hmdfs_restore_stats stats;
	int err = 0;

	if (!hmdfs_is_stash_enabled(sbi) || conn->need_rebuild_stash_list) {
		if (conn->need_rebuild_stash_list)
			hmdfs_info("peer 0x%x:0x%llx skip restoring due to rebuild-need",
				   conn->owner, conn->device_id);
		return;
	}

	/* release seq_lock to prevent blocking no-online sync cb */
	mutex_unlock(&conn->seq_lock);
	/* For dir iteration, file read and unlink */
	old_cred = hmdfs_override_creds(conn->sbi->cred);

	memset(&stats, 0, sizeof(stats));
	filp = hmdfs_open_stash_dir(&sbi->stash_work_dir, conn->cid);
	if (IS_ERR(filp)) {
		err = PTR_ERR(filp);
		goto out;
	}

	err = hmdfs_iter_stash_file(conn, seq, filp,
				    hmdfs_restore_files, &stats);

	fput(filp);
out:
	hmdfs_revert_creds(old_cred);

	/* offline again ? */
	if (err != -ESHUTDOWN)
		hmdfs_drop_stashed_inodes(conn);

	hmdfs_update_peer_restore_stats(&conn->stats.restore, &stats);
	hmdfs_info("peer 0x%x:0x%llx restore stashed-file ok %u fail %u keep %u",
		   conn->owner, conn->device_id,
		   stats.succeed, stats.fail, stats.keep);

	mutex_lock(&conn->seq_lock);
}
1980
/*
 * NODE_EVT_DEL (sync) callback: drop all stash state for the departing
 * peer.
 *
 * First releases inodes still in the "preparing" stage (stash file
 * created at offline time but stashing never completed), then unpins
 * every fully stashed inode remaining on conn->stashed_inode_list.
 */
static void hmdfs_stash_del_do_cleanup(struct hmdfs_peer *conn, int evt,
				       unsigned int seq)
{
	struct hmdfs_inode_info *info = NULL;
	struct hmdfs_inode_info *next = NULL;
	unsigned int preparing;

	if (!hmdfs_is_stash_enabled(conn->sbi))
		return;

	/* Async cb is cancelled */
	preparing = 0;
	list_for_each_entry_safe(info, next, &conn->wr_opened_inode_list,
				 wr_opened_node) {
		int status = READ_ONCE(info->stash_status);

		if (status == HMDFS_REMOTE_INODE_STASHING) {
			struct hmdfs_cache_info *cache = NULL;

			/* Detach the cache under the lock, free it after */
			spin_lock(&info->stash_lock);
			cache = info->cache;
			info->cache = NULL;
			info->stash_status = HMDFS_REMOTE_INODE_NONE;
			spin_unlock(&info->stash_lock);

			hmdfs_remote_del_wr_opened_inode(conn, info);
			hmdfs_del_file_cache(cache);
			/* put inode after all access are completed */
			iput(&info->vfs_inode);
			preparing++;
		}
	}
	hmdfs_info("release %u preparing inodes", preparing);

	hmdfs_info("release %u pinned inodes", conn->stashed_inode_nr);
	if (list_empty(&conn->stashed_inode_list))
		return;

	list_for_each_entry_safe(info, next,
				 &conn->stashed_inode_list, stash_node)
		hmdfs_untrack_stashed_inode(conn, info);
}
2023
2024void hmdfs_exit_stash(struct hmdfs_sb_info *sbi)
2025{
2026	if (!sbi->s_offline_stash)
2027		return;
2028
2029	if (sbi->stash_work_dir.dentry) {
2030		path_put(&sbi->stash_work_dir);
2031		sbi->stash_work_dir.dentry = NULL;
2032	}
2033}
2034
2035int hmdfs_init_stash(struct hmdfs_sb_info *sbi)
2036{
2037	int err = 0;
2038	struct path parent;
2039	struct dentry *child = NULL;
2040
2041	if (!sbi->s_offline_stash)
2042		return 0;
2043
2044	err = kern_path(sbi->cache_dir, LOOKUP_FOLLOW | LOOKUP_DIRECTORY,
2045			&parent);
2046	if (err) {
2047		hmdfs_err("invalid cache dir err %d", err);
2048		goto out;
2049	}
2050
2051	child = hmdfs_stash_new_work_dir(parent.dentry);
2052	if (!IS_ERR(child)) {
2053		sbi->stash_work_dir.mnt = mntget(parent.mnt);
2054		sbi->stash_work_dir.dentry = child;
2055	} else {
2056		err = PTR_ERR(child);
2057		hmdfs_err("create stash work dir err %d", err);
2058	}
2059
2060	path_put(&parent);
2061out:
2062	return err;
2063}
2064
/*
 * Write one dirty page into the local stash file.
 *
 * The page data lands at (page offset + data_offs blocks) so the head
 * and path blocks at the start of the stash file are never clobbered.
 * Runs under NOFS allocation scope to avoid filesystem reentrancy
 * while in the writeback path, and under the sb's credentials.
 *
 * Returns 0 on a full write, -EIO otherwise.
 */
static int hmdfs_stash_write_local_file(struct hmdfs_peer *conn,
					struct hmdfs_inode_info *info,
					struct hmdfs_writepage_context *ctx,
					struct hmdfs_cache_info *cache)
{
	struct page *page = ctx->page;
	const struct cred *old_cred = NULL;
	void *buf = NULL;
	loff_t pos;
	unsigned int flags;
	ssize_t written;
	int err = 0;

	buf = kmap(page);
	pos = (loff_t)page->index << PAGE_SHIFT;
	/* enable NOFS for memory allocation */
	flags = memalloc_nofs_save();
	old_cred = hmdfs_override_creds(conn->sbi->cred);
	pos += cache->data_offs << HMDFS_STASH_BLK_SHIFT;
	written = kernel_write(cache->cache_file, buf, ctx->count, &pos);
	hmdfs_revert_creds(old_cred);
	memalloc_nofs_restore(flags);
	kunmap(page);

	if (written != ctx->count) {
		hmdfs_err("stash peer 0x%x:0x%llx ino 0x%llx page 0x%lx data_offs 0x%x len %u err %zd",
			  conn->owner, conn->device_id, info->remote_ino,
			  page->index, cache->data_offs, ctx->count, written);
		err = -EIO;
	}

	return err;
}
2098
/*
 * Stash one page that could not be written back to the (offline) peer.
 *
 * On success the writepage context is completed and freed here; on
 * failure the caller retains ownership of @ctx. to_write_pgs counts
 * every attempt, written_pgs only the successful ones.
 *
 * Returns 0 on success, -EIO if there is no cache file or the local
 * write failed.
 */
int hmdfs_stash_writepage(struct hmdfs_peer *conn,
			  struct hmdfs_writepage_context *ctx)
{
	struct inode *inode = ctx->page->mapping->host;
	struct hmdfs_inode_info *info = hmdfs_i(inode);
	struct hmdfs_cache_info *cache = NULL;
	int err;

	/* e.g. fail to create stash file */
	cache = info->cache;
	if (!cache)
		return -EIO;

	err = hmdfs_stash_write_local_file(conn, info, ctx, cache);
	if (!err) {
		hmdfs_client_writepage_done(info, ctx);
		atomic64_inc(&cache->written_pgs);
		put_task_struct(ctx->caller);
		kfree(ctx);
	}
	atomic64_inc(&cache->to_write_pgs);

	return err;
}
2123
2124static void hmdfs_stash_rebuild_status(struct hmdfs_peer *conn,
2125				       struct inode *inode)
2126{
2127	char *path_str = NULL;
2128	struct hmdfs_inode_info *info = NULL;
2129	const struct cred *old_cred = NULL;
2130	struct path path;
2131	struct path *stash_path = NULL;
2132	int err = 0;
2133
2134	path_str = kmalloc(HMDFS_STASH_PATH_LEN, GFP_KERNEL);
2135	if (!path_str) {
2136		err = -ENOMEM;
2137		return;
2138	}
2139
2140	info = hmdfs_i(inode);
2141	err = snprintf(path_str, HMDFS_STASH_PATH_LEN, "%s/0x%llx",
2142		       conn->cid, info->remote_ino);
2143	if (err >= HMDFS_STASH_PATH_LEN) {
2144		kfree(path_str);
2145		hmdfs_err("peer 0x%x:0x%llx inode 0x%llx too long name len",
2146			  conn->owner, conn->device_id, info->remote_ino);
2147		return;
2148	}
2149	old_cred = hmdfs_override_creds(conn->sbi->cred);
2150	stash_path = &conn->sbi->stash_work_dir;
2151	err = vfs_path_lookup(stash_path->dentry, stash_path->mnt,
2152			      path_str, 0, &path);
2153	hmdfs_revert_creds(old_cred);
2154	if (!err) {
2155		if (hmdfs_is_reg(path.dentry)) {
2156			WRITE_ONCE(info->stash_status,
2157				   HMDFS_REMOTE_INODE_RESTORING);
2158			ihold(&info->vfs_inode);
2159			hmdfs_track_inode_locked(conn, info);
2160		} else {
2161			hmdfs_info("peer 0x%x:0x%llx inode 0x%llx unexpected stashed file mode 0%o",
2162				    conn->owner, conn->device_id,
2163				    info->remote_ino,
2164				    d_inode(path.dentry)->i_mode);
2165		}
2166
2167		path_put(&path);
2168	} else if (err && err != -ENOENT) {
2169		hmdfs_err("peer 0x%x:0x%llx inode 0x%llx find %s err %d",
2170			   conn->owner, conn->device_id, info->remote_ino,
2171			   path_str, err);
2172	}
2173
2174	kfree(path_str);
2175}
2176
2177static inline bool
2178hmdfs_need_rebuild_inode_stash_status(struct hmdfs_peer *conn, umode_t mode)
2179{
2180	return hmdfs_is_stash_enabled(conn->sbi) &&
2181	       READ_ONCE(conn->need_rebuild_stash_list) &&
2182	       (S_ISREG(mode) || S_ISLNK(mode));
2183}
2184
/*
 * Called when a remote inode is instantiated: if a stash-list rebuild
 * is pending for @conn, check whether this inode has a stash file and
 * pin it for restoring.
 *
 * rebuild_inode_status_nr counts in-flight rebuild checks so that
 * hmdfs_stash_online_prepare() can wait for all of them before the
 * restore step walks conn->stashed_inode_list.
 */
void hmdfs_remote_init_stash_status(struct hmdfs_peer *conn,
				    struct inode *inode, umode_t mode)
{
	if (!hmdfs_need_rebuild_inode_stash_status(conn, mode))
		return;

	atomic_inc(&conn->rebuild_inode_status_nr);
	/*
	 * Use smp_mb__after_atomic() to ensure order between writing
	 * @conn->rebuild_inode_status_nr and reading
	 * @conn->need_rebuild_stash_list.
	 */
	smp_mb__after_atomic();
	if (READ_ONCE(conn->need_rebuild_stash_list))
		hmdfs_stash_rebuild_status(conn, inode);
	if (atomic_dec_and_test(&conn->rebuild_inode_status_nr))
		wake_up(&conn->rebuild_inode_status_wq);
}
2203
/*
 * Peer node-event callbacks wiring the stash state machine into the
 * connection life cycle:
 *  - OFFLINE: prepare (sync), then stash dirty data (async)
 *  - ADD:     check whether a stash-list rebuild is needed (sync)
 *  - ONLINE:  rebuild the stash list, then restore files (async)
 *  - DEL:     drop all remaining stash state (sync)
 */
static struct hmdfs_node_cb_desc stash_cb[] = {
	{
		.evt = NODE_EVT_OFFLINE,
		.sync = true,
		.fn = hmdfs_stash_offline_prepare,
	},
	{
		.evt = NODE_EVT_OFFLINE,
		.sync = false,
		.fn = hmdfs_stash_offline_do_stash,
	},
	{
		.evt = NODE_EVT_ADD,
		.sync = true,
		.fn = hmdfs_stash_add_do_check,
	},
	{
		.evt = NODE_EVT_ONLINE,
		.sync = false,
		.fn = hmdfs_stash_online_prepare,
	},
	{
		.evt = NODE_EVT_ONLINE,
		.sync = false,
		.fn = hmdfs_stash_online_do_restore,
	},
	{
		.evt = NODE_EVT_DEL,
		.sync = true,
		.fn = hmdfs_stash_del_do_cleanup,
	},
};
2236
/* Register the stash callbacks for peer node events at module init. */
void __init hmdfs_stash_add_node_evt_cb(void)
{
	hmdfs_node_add_evt_cb(stash_cb, ARRAY_SIZE(stash_cb));
}
2241
2242