// SPDX-License-Identifier: GPL-2.0
/*
 * fs/hmdfs/comm/socket_adapter.c
 *
 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
 */

#include "socket_adapter.h"

#include <linux/file.h>
#include <linux/module.h>
#include <linux/namei.h>
#include <linux/net.h>
#include <linux/pagemap.h>
#include <net/sock.h>

#include "authority/authentication.h"
#include "comm/device_node.h"
#include "hmdfs_client.h"
#include "hmdfs_server.h"
#include "hmdfs_trace.h"
#include "message_verify.h"

#define ACQUIRE_WFIRED_INTVAL_USEC_MIN 10
#define ACQUIRE_WFIRED_INTVAL_USEC_MAX 30

typedef void (*request_callback)(struct hmdfs_peer *, struct hmdfs_head_cmd *,
				 void *);
typedef void (*response_callback)(struct hmdfs_peer *,
				  struct sendmsg_wait_queue *, void *, size_t);

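/*
 * Server-side dispatch table: one handler per file-operation command
 * arriving with cmd_flag == C_REQUEST. Commands without an entry are
 * rejected in hmdfs_request_recv().
 */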
static const request_callback s_recv_callbacks[F_SIZE] = {
	[F_OPEN] = hmdfs_server_open,
	[F_READPAGE] = hmdfs_server_readpage,
	[F_RELEASE] = hmdfs_server_release,
	[F_WRITEPAGE] = hmdfs_server_writepage,
	[F_ITERATE] = hmdfs_server_readdir,
	[F_MKDIR] = hmdfs_server_mkdir,
	[F_CREATE] = hmdfs_server_create,
	[F_RMDIR] = hmdfs_server_rmdir,
	[F_UNLINK] = hmdfs_server_unlink,
	[F_RENAME] = hmdfs_server_rename,
	[F_SETATTR] = hmdfs_server_setattr,
	[F_STATFS] = hmdfs_server_statfs,
	[F_DROP_PUSH] = hmdfs_server_get_drop_push,
	[F_GETATTR] = hmdfs_server_getattr,
	[F_FSYNC] = hmdfs_server_fsync,
	[F_SYNCFS] = hmdfs_server_syncfs,
	[F_GETXATTR] = hmdfs_server_getxattr,
	[F_SETXATTR] = hmdfs_server_setxattr,
	[F_LISTXATTR] = hmdfs_server_listxattr,
	[F_ATOMIC_OPEN] = hmdfs_server_atomic_open,
};

typedef void (*file_request_callback)(struct hmdfs_peer *,
				      struct hmdfs_send_command *);

struct async_req_callbacks {
	void (*on_wakeup)(struct hmdfs_peer *peer, const struct hmdfs_req *req,
			  const struct hmdfs_resp *resp);
};

static const struct async_req_callbacks g_async_req_callbacks[F_SIZE] = {
	[F_SYNCFS] = { .on_wakeup = hmdfs_recv_syncfs_cb },
	[F_WRITEPAGE] = { .on_wakeup = hmdfs_writepage_cb },
};

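/*
 * Final kref release for a synchronous message. kref_put_lock() only calls
 * this with the peer's idr_lock held, so the id is removed from the IDR
 * before the lock is dropped and the message is freed.
 */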
static void msg_release(struct kref *kref)
{
	struct sendmsg_wait_queue *msg_wq;
	struct hmdfs_peer *con;

	msg_wq = (struct sendmsg_wait_queue *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	con = msg_wq->head.peer;
	idr_remove(&con->msg_idr, msg_wq->head.msg_id);
	spin_unlock(&con->idr_lock);

	kfree(msg_wq->buf);
	if (msg_wq->recv_info.local_filp)
		fput(msg_wq->recv_info.local_filp);
	kfree(msg_wq);
}

// Always remember to find before put, and make sure con is available
void msg_put(struct sendmsg_wait_queue *msg_wq)
{
	kref_put_lock(&msg_wq->head.ref, msg_release,
		      &msg_wq->head.peer->idr_lock);
}

static void recv_info_init(struct file_recv_info *recv_info)
{
	memset(recv_info, 0, sizeof(struct file_recv_info));
	atomic_set(&recv_info->local_fslices, 0);
	atomic_set(&recv_info->state, FILE_RECV_PROCESS);
}

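/*
 * Prepare a synchronous wait-queue message: allocate its IDR id, mark it
 * as sent (MSG_Q_SEND), reset the per-file receive bookkeeping and record
 * the start time for response statistics.
 */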
static int msg_init(struct hmdfs_peer *con, struct sendmsg_wait_queue *msg_wq,
		    struct hmdfs_cmd operations)
{
	int ret = 0;
	struct file_recv_info *recv_info = &msg_wq->recv_info;

	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_MESSAGE_SYNC, msg_wq, operations);
	if (unlikely(ret))
		return ret;

	atomic_set(&msg_wq->valid, MSG_Q_SEND);
	init_waitqueue_head(&msg_wq->response_q);
	recv_info_init(recv_info);
	msg_wq->start = jiffies;
	return 0;
}

static inline void statistic_con_sb_dirty(struct hmdfs_peer *con,
					  const struct hmdfs_cmd *op)
{
	if (op->command == F_WRITEPAGE && op->cmd_flag == C_REQUEST)
		atomic64_inc(&con->sb_dirty_count);
}

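/*
 * Low-level send path shared by every request and response: runs under the
 * superblock's system credentials and fetches the TCP connection. If no
 * connection is available, NOTIFY_OFFLINE is raised and -EAGAIN returned;
 * on -ESHUTDOWN the connection is marked stopped, a reget work is
 * scheduled, and the send is retried until the node goes offline.
 * Per-command send statistics are updated for both requests and responses.
 */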
int hmdfs_sendmessage(struct hmdfs_peer *node, struct hmdfs_send_data *msg)
{
	int ret = 0;
	struct connection *connect = NULL;
	struct tcp_handle *tcp = NULL;
	struct hmdfs_head_cmd *head = msg->head;
	const struct cred *old_cred;

	if (!node) {
		hmdfs_err("node NULL when send cmd %d",
			  head->operations.command);
		ret = -EAGAIN;
		goto out_err;
	} else if (node->status != NODE_STAT_ONLINE) {
		hmdfs_err("device %llu OFFLINE %d when send cmd %d",
			  node->device_id, node->status,
			  head->operations.command);
		ret = -EAGAIN;
		goto out;
	}

	old_cred = hmdfs_override_creds(node->sbi->system_cred);

	do {
		connect = get_conn_impl(node, CONNECT_TYPE_TCP);
		if (!connect) {
			hmdfs_info_ratelimited(
				"device %llu no connection available when send cmd %d, get new session",
				node->device_id, head->operations.command);
			if (node->status != NODE_STAT_OFFLINE) {
				struct notify_param param;

				memcpy(param.remote_cid, node->cid,
				       HMDFS_CID_SIZE);
				param.notify = NOTIFY_OFFLINE;
				param.fd = INVALID_SOCKET_FD;
				notify(node, &param);
			}
			ret = -EAGAIN;
			goto revert_cred;
		}

		ret = connect->send_message(connect, msg);
		if (ret == -ESHUTDOWN) {
			hmdfs_info("device %llu send cmd %d message fail, connection stop",
				   node->device_id, head->operations.command);
			connect->status = CONNECT_STAT_STOP;
			tcp = connect->connect_handle;
			if (node->status != NODE_STAT_OFFLINE) {
				connection_get(connect);
				if (!queue_work(node->reget_conn_wq,
						&connect->reget_work))
					connection_put(connect);
			}
			connection_put(connect);
			/*
			 * node->status being OFFLINE cannot guarantee that
			 * node_seq has already been increased before
			 * hmdfs_sendmessage() returns, so bump the event
			 * sequence here explicitly.
			 */
			hmdfs_node_inc_evt_seq(node);
		} else {
			connection_put(connect);
			goto revert_cred;
		}
	} while (node->status != NODE_STAT_OFFLINE);
revert_cred:
	hmdfs_revert_creds(old_cred);

	if (!ret)
		statistic_con_sb_dirty(node, &head->operations);
out:
	if (head->operations.cmd_flag == C_REQUEST)
		hmdfs_client_snd_statis(node->sbi,
					head->operations.command, ret);
	else if (head->operations.cmd_flag == C_RESPONSE)
		hmdfs_server_snd_statis(node->sbi,
					head->operations.command, ret);
out_err:
	return ret;
}

int hmdfs_sendmessage_response(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *cmd, __u32 data_len,
			       void *buf, __u32 ret_code)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;

	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(data_len + sizeof(struct hmdfs_head_cmd));
	head.ret_code = cpu_to_le32(ret_code);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;
	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = buf;
	msg.len = data_len;
	msg.sdesc = NULL;
	msg.sdesc_len = 0;

	ret = hmdfs_sendmessage(con, &msg);
	return ret;
}

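/*
 * Final kref release for an async "parasite" message: entered with the
 * peer's idr_lock held by kref_put_lock(), removes the id, drops the lock,
 * releases the peer reference taken in mp_alloc() and frees the response
 * buffer.
 */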
static void mp_release(struct kref *kref)
{
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_peer *peer = NULL;

	mp = (struct hmdfs_msg_parasite *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = mp->head.peer;
	idr_remove(&peer->msg_idr, mp->head.msg_id);
	spin_unlock(&peer->idr_lock);

	peer_put(peer);
	kfree(mp->resp.out_buf);
	kfree(mp);
}

void mp_put(struct hmdfs_msg_parasite *mp)
{
	kref_put_lock(&mp->head.ref, mp_release, &mp->head.peer->idr_lock);
}

static void async_request_cb_on_wakeup_fn(struct work_struct *w)
{
	struct hmdfs_msg_parasite *mp =
		container_of(w, struct hmdfs_msg_parasite, d_work.work);
	struct async_req_callbacks cbs;
	const struct cred *old_cred =
		hmdfs_override_creds(mp->head.peer->sbi->cred);

	if (mp->resp.ret_code == -ETIME)
		hmdfs_client_resp_statis(mp->head.peer->sbi,
					 mp->req.operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);

	cbs = g_async_req_callbacks[mp->req.operations.command];
	if (cbs.on_wakeup)
		(*cbs.on_wakeup)(mp->head.peer, &mp->req, &mp->resp);
	mp_put(mp);
	hmdfs_revert_creds(old_cred);
}

static struct hmdfs_msg_parasite *mp_alloc(struct hmdfs_peer *peer,
					   const struct hmdfs_req *req)
{
	struct hmdfs_msg_parasite *mp = kzalloc(sizeof(*mp), GFP_KERNEL);
	int ret;

	if (unlikely(!mp))
		return ERR_PTR(-ENOMEM);

	ret = hmdfs_alloc_msg_idr(peer, MSG_IDR_MESSAGE_ASYNC, mp,
				  req->operations);
	if (unlikely(ret)) {
		kfree(mp);
		return ERR_PTR(ret);
	}

	mp->start = jiffies;
	peer_get(mp->head.peer);
	mp->resp.ret_code = -ETIME;
	INIT_DELAYED_WORK(&mp->d_work, async_request_cb_on_wakeup_fn);
	mp->wfired = false;
	mp->req = *req;
	return mp;
}

/**
 * hmdfs_send_async_request - send out an async request
 * @peer: target device node
 * @req: request descriptor + necessary contexts
 *
 * Send out the request synchronously and wait for its response asynchronously.
 * Return -ESHUTDOWN when the device node is unreachable
 * Return -EAGAIN if the network is recovering
 * Return -ENOMEM if out of memory
 *
 * Register callbacks in g_async_req_callbacks to receive the response
 */
int hmdfs_send_async_request(struct hmdfs_peer *peer,
			     const struct hmdfs_req *req)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	struct hmdfs_msg_parasite *mp = NULL;
	size_t msg_len = req->data_len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout;

	if (req->timeout == TIMEOUT_CONFIG)
		timeout = get_cmd_timeout(peer->sbi, req->operations.command);
	else
		timeout = req->timeout;
	if (timeout == TIMEOUT_UNINIT || timeout == TIMEOUT_NONE) {
		hmdfs_err("send msg %d with uninitialized/invalid timeout",
			  req->operations.command);
		return -EINVAL;
	}

	if (!hmdfs_is_node_online(peer))
		return -EAGAIN;

	mp = mp_alloc(peer, req);
	if (IS_ERR(mp))
		return PTR_ERR(mp);
	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.data_len = cpu_to_le32(msg_len);
	head.operations = mp->req.operations;
	head.msg_id = cpu_to_le32(mp->head.msg_id);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(head);
	msg.data = mp->req.data;
	msg.len = mp->req.data_len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	ret = hmdfs_sendmessage(peer, &msg);
	if (unlikely(ret)) {
		mp_put(mp);
		goto out;
	}

	queue_delayed_work(peer->async_wq, &mp->d_work, timeout * HZ);
	/*
	 * The work may not have been queued yet when its response arrives,
	 * which would leave the receiver waiting for nothing. Publish wfired
	 * with a release barrier so the recv thread can tell whether the
	 * work has been queued.
	 */
	smp_store_release(&mp->wfired, true);
out:
	hmdfs_dec_msg_idr_process(peer);
	return ret;
}
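
/*
 * Illustrative caller sketch (not taken from this file): only the hmdfs_req
 * fields referenced above (operations, data, data_len, timeout) are shown,
 * and the surrounding variables are assumed to exist at the call site.
 *
 *	struct hmdfs_req req = {
 *		.operations.command = F_SYNCFS,
 *		.operations.cmd_flag = C_REQUEST,
 *		.data = req_buf,
 *		.data_len = req_len,
 *		.timeout = TIMEOUT_CONFIG,
 *	};
 *
 *	err = hmdfs_send_async_request(peer, &req);
 *
 * On success the reply (or an -ETIME timeout) is later delivered to the
 * F_SYNCFS on_wakeup handler registered in g_async_req_callbacks.
 */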

static int hmdfs_record_async_readdir(struct hmdfs_peer *con,
				      struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	if (sbi->async_readdir_prohibit) {
		spin_unlock(&sbi->async_readdir_msg_lock);
		return -EINTR;
	}

	list_add(&msg_wq->async_msg, &sbi->async_readdir_msg_list);
	spin_unlock(&sbi->async_readdir_msg_lock);

	return 0;
}

static void hmdfs_untrack_async_readdir(struct hmdfs_peer *con,
					struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	list_del(&msg_wq->async_msg);
	spin_unlock(&sbi->async_readdir_msg_lock);
}

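/*
 * Synchronous request path: build the header, optionally allocate a
 * wait-queue message (unless the command's timeout is TIMEOUT_NONE), send,
 * and sleep interruptibly until the response wakes us or the timeout
 * expires. F_ITERATE messages are additionally tracked on the sbi's async
 * readdir list so the wait can be refused or cancelled when async readdir
 * is prohibited.
 */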
int hmdfs_sendmessage_request(struct hmdfs_peer *con,
			      struct hmdfs_send_command *sm)
{
	int time_left;
	int ret = 0;
	struct sendmsg_wait_queue *msg_wq = NULL;
	struct hmdfs_send_data msg;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout =
		get_cmd_timeout(con->sbi, sm->operations.command);
	struct hmdfs_head_cmd *head = NULL;
	bool dec = false;

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto free_filp;
	}

	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err_ratelimited("send msg %d with uninitialized timeout",
				      sm->operations.command);
		ret = -EINVAL;
		goto free_filp;
	}

	head = kzalloc(sizeof(struct hmdfs_head_cmd), GFP_KERNEL);
	if (!head) {
		ret = -ENOMEM;
		goto free_filp;
	}

	sm->out_buf = NULL;
	head->magic = HMDFS_MSG_MAGIC;
	head->version = HMDFS_VERSION;
	head->operations = sm->operations;
	head->data_len = cpu_to_le32(outlen);
	head->ret_code = cpu_to_le32(sm->ret_code);
	head->reserved = 0;
	head->reserved1 = 0;
	if (timeout != TIMEOUT_NONE) {
		msg_wq = kzalloc(sizeof(*msg_wq), GFP_KERNEL);
		if (!msg_wq) {
			ret = -ENOMEM;
			goto free_filp;
		}
		ret = msg_init(con, msg_wq, sm->operations);
		if (ret) {
			kfree(msg_wq);
			msg_wq = NULL;
			goto free_filp;
		}
		dec = true;
		head->msg_id = cpu_to_le32(msg_wq->head.msg_id);
		if (sm->operations.command == F_ITERATE)
			msg_wq->recv_info.local_filp = sm->local_filp;
	}
	msg.head = head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err_ratelimited("send err sm->device_id, %lld, msg_id %u",
				      con->device_id, head->msg_id);
		goto free;
	}

	if (timeout == TIMEOUT_NONE)
		goto free;

	hmdfs_dec_msg_idr_process(con);
	dec = false;

	if (sm->operations.command == F_ITERATE) {
		ret = hmdfs_record_async_readdir(con, msg_wq);
		if (ret) {
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_SPC);
			goto free;
		}
	}

	time_left = wait_event_interruptible_timeout(
		msg_wq->response_q,
		(atomic_read(&msg_wq->valid) == MSG_Q_END_RECV), timeout * HZ);

	if (sm->operations.command == F_ITERATE)
		hmdfs_untrack_async_readdir(con, msg_wq);

	if (time_left == -ERESTARTSYS || time_left == 0) {
		hmdfs_err("timeout err sm->device_id %lld, msg_id %d cmd %d",
			  con->device_id, head->msg_id,
			  head->operations.command);
		if (sm->operations.command == F_ITERATE)
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_NET);
		ret = -ETIME;
		hmdfs_client_resp_statis(con->sbi, sm->operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);
		goto free;
	}
	sm->out_buf = msg_wq->buf;
	msg_wq->buf = NULL;
	sm->out_len = msg_wq->size - sizeof(struct hmdfs_head_cmd);
	ret = msg_wq->ret;

free:
	if (msg_wq)
		msg_put(msg_wq);
	if (dec)
		hmdfs_dec_msg_idr_process(con);
	kfree(head);
	return ret;

free_filp:
	if (sm->local_filp)
		fput(sm->local_filp);
	kfree(head);
	return ret;
}

static int hmdfs_send_slice(struct hmdfs_peer *con, struct hmdfs_head_cmd *cmd,
			    struct slice_descriptor *sdesc, void *slice_buf)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	int content_size = le32_to_cpu(sdesc->content_size);
	int msg_len = sizeof(struct hmdfs_head_cmd) + content_size +
		      sizeof(struct slice_descriptor);

	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(msg_len);
	head.ret_code = cpu_to_le32(0);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.sdesc = sdesc;
	msg.sdesc_len = sizeof(struct slice_descriptor);
	msg.data = slice_buf;
	msg.len = content_size;

	ret = hmdfs_sendmessage(con, &msg);

	return ret;
}

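/*
 * Stream a whole file back as C_RESPONSE slices of PAGE_SIZE bytes each.
 * num_slices = round_up(i_size, PAGE_SIZE) / PAGE_SIZE; for example, with
 * 4 KiB pages a 10000-byte file is sent as 3 slices of 4096, 4096 and 1808
 * bytes. A NULL @filp degenerates to an empty response.
 */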
int hmdfs_readfile_response(struct hmdfs_peer *con, struct hmdfs_head_cmd *head,
			    struct file *filp)
{
	int ret;
	const unsigned int slice_size = PAGE_SIZE;
	char *slice_buf = NULL;
	loff_t file_offset = 0, file_size;
	ssize_t size;
	struct slice_descriptor sdesc;
	unsigned int slice_sn = 0;

	if (!filp)
		return hmdfs_sendmessage_response(con, head, 0, NULL, 0);

	sdesc.slice_size = cpu_to_le32(slice_size);
	file_size = i_size_read(file_inode(filp));
	file_size = round_up(file_size, slice_size);
	sdesc.num_slices = cpu_to_le32(file_size / slice_size);

	slice_buf = kmalloc(slice_size, GFP_KERNEL);
	if (!slice_buf) {
		ret = -ENOMEM;
		goto out;
	}

	while (1) {
		sdesc.slice_sn = cpu_to_le32(slice_sn++);
		size = kernel_read(filp, slice_buf, (size_t)slice_size,
				   &file_offset);
		if (IS_ERR_VALUE(size)) {
			ret = (int)size;
			goto out;
		}
		sdesc.content_size = cpu_to_le32(size);
		ret = hmdfs_send_slice(con, head, &sdesc, slice_buf);
		if (ret) {
			hmdfs_info("Cannot send file slice %d",
				   le32_to_cpu(sdesc.slice_sn));
			break;
		}
		if (file_offset >= i_size_read(file_inode(filp)))
			break;
	}

out:
	kfree(slice_buf);
	if (ret)
		hmdfs_sendmessage_response(con, head, 0, NULL, ret);
	return ret;
}

static void asw_release(struct kref *kref)
{
	struct hmdfs_async_work *asw = NULL;
	struct hmdfs_peer *peer = NULL;

	asw = (struct hmdfs_async_work *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = asw->head.peer;
	idr_remove(&peer->msg_idr, asw->head.msg_id);
	spin_unlock(&peer->idr_lock);
	kfree(asw);
}

void asw_put(struct hmdfs_async_work *asw)
{
	kref_put_lock(&asw->head.ref, asw_release, &asw->head.peer->idr_lock);
}

void hmdfs_recv_page_work_fn(struct work_struct *ptr)
{
	struct hmdfs_async_work *async_work =
		container_of(ptr, struct hmdfs_async_work, d_work.work);

	hmdfs_client_resp_statis(async_work->head.peer->sbi,
				 F_READPAGE, HMDFS_RESP_TIMEOUT, 0, 0);
	hmdfs_err_ratelimited("timeout and release page, msg_id:%u",
			      async_work->head.msg_id);
	asw_done(async_work);
}

int hmdfs_sendpage_request(struct hmdfs_peer *con,
			   struct hmdfs_send_command *sm)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_async_work *async_work = NULL;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	struct hmdfs_head_cmd head;
	unsigned int timeout;
	unsigned long start = jiffies;

	WARN_ON(!sm->out_buf);

	timeout = get_cmd_timeout(con->sbi, sm->operations.command);
	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err("send msg %d with uninitialized timeout",
			  sm->operations.command);
		ret = -EINVAL;
		goto unlock;
	}

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto unlock;
	}

	memset(&head, 0, sizeof(head));
	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = sm->operations;
	head.data_len = cpu_to_le32(outlen);
	head.ret_code = cpu_to_le32(sm->ret_code);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	async_work = kzalloc(sizeof(*async_work), GFP_KERNEL);
	if (!async_work) {
		ret = -ENOMEM;
		goto unlock;
	}
	async_work->start = start;
	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_PAGE, async_work, sm->operations);
	if (ret) {
		hmdfs_err("alloc msg_id failed, err %d", ret);
		goto unlock;
	}
	head.msg_id = cpu_to_le32(async_work->head.msg_id);
	async_work->page = sm->out_buf;
	asw_get(async_work);
	INIT_DELAYED_WORK(&async_work->d_work, hmdfs_recv_page_work_fn);
	ret = queue_delayed_work(con->async_wq, &async_work->d_work,
				 timeout * HZ);
	if (!ret) {
		hmdfs_err("queue_delayed_work failed, msg_id %u", head.msg_id);
		goto fail_and_unlock_page;
	}
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err("send err sm->device_id, %lld, msg_id %u",
			  con->device_id, head.msg_id);
		if (!cancel_delayed_work(&async_work->d_work)) {
			hmdfs_err("cancel async work err");
			asw_put(async_work);
			hmdfs_dec_msg_idr_process(con);
			goto out;
		}
		goto fail_and_unlock_page;
	}

	asw_put(async_work);
	hmdfs_dec_msg_idr_process(con);
	return 0;

fail_and_unlock_page:
	asw_put(async_work);
	asw_done(async_work);
	hmdfs_dec_msg_idr_process(con);
	return ret;
unlock:
	kfree(async_work);
	unlock_page(sm->out_buf);
out:
	return ret;
}

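/*
 * Run a request handler directly in the receiving context: override the
 * fsids, call the per-command server callback, record the handling latency
 * and free the request payload.
 */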
static void hmdfs_request_handle_sync(struct hmdfs_peer *con,
				      struct hmdfs_head_cmd *head, void *buf)
{
	unsigned long start = jiffies;
	const struct cred *saved_cred = hmdfs_override_fsids(true);

	if (!saved_cred) {
		hmdfs_err("prepare cred failed!");
		kfree(buf);
		return;
	}

	s_recv_callbacks[head->operations.command](con, head, buf);
	hmdfs_statistic(con->sbi, head->operations.command, jiffies - start);

	kfree(buf);

	hmdfs_revert_fsids(saved_cred);
}

static void hmdfs_msg_handle_sync(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf)
{
	const struct cred *old_cred = hmdfs_override_creds(con->sbi->cred);

	/*
	 * Reuse PF_NPROC_EXCEEDED as an indication of hmdfs server context:
	 * 1. PF_NPROC_EXCEEDED will be set by setreuid()/setuid()/setresuid();
	 *    we assume kworkers will not call these syscalls.
	 * 2. PF_NPROC_EXCEEDED will be cleared by execve(), and kworkers
	 *    will not call it either.
	 */
	current->flags |= PF_NPROC_EXCEEDED;
	hmdfs_request_handle_sync(con, head, buf);
	current->flags &= ~PF_NPROC_EXCEEDED;

	hmdfs_revert_creds(old_cred);
}

static void hmdfs_request_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);

	hmdfs_msg_handle_sync(desp->peer, desp->head, desp->buf);
	peer_put(desp->peer);
	kfree(desp->head);
	kfree(desp);
}

static int hmdfs_msg_handle_async(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf,
				  struct workqueue_struct *wq,
				  void (*work_fn)(struct work_struct *ptr))
{
	struct work_handler_desp *desp = NULL;
	struct hmdfs_head_cmd *dup_head = NULL;
	int ret;

	desp = kzalloc(sizeof(*desp), GFP_KERNEL);
	if (!desp) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	dup_head = kzalloc(sizeof(*dup_head), GFP_KERNEL);
	if (!dup_head) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	*dup_head = *head;
	desp->peer = con;
	desp->head = dup_head;
	desp->buf = buf;
	INIT_WORK(&desp->work, work_fn);

	peer_get(con);
	queue_work(wq, &desp->work);

	ret = 0;
	return ret;

exit_desp:
	kfree(desp);
	return ret;
}

static int hmdfs_request_recv(struct hmdfs_peer *con,
			      struct hmdfs_head_cmd *head, void *buf)
{
	int ret;

	if (head->operations.command >= F_SIZE ||
	    !s_recv_callbacks[head->operations.command]) {
		ret = -EINVAL;
		hmdfs_err("NULL callback, command %d",
			  head->operations.command);
		goto out;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_ITERATE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
	case F_ATOMIC_OPEN:
		ret = hmdfs_msg_handle_async(con, head, buf, con->req_handle_wq,
					     hmdfs_request_work_fn);
		break;
	case F_WRITEPAGE:
	case F_READPAGE:
		hmdfs_msg_handle_sync(con, head, buf);
		ret = 0;
		break;
	default:
		hmdfs_err("Fatal! Unexpected request command %d",
			  head->operations.command);
		ret = -EINVAL;
	}

out:
	return ret;
}

void hmdfs_response_wakeup(struct sendmsg_wait_queue *msg_info,
			   __u32 ret_code, __u32 data_len, void *buf)
{
	msg_info->ret = ret_code;
	msg_info->size = data_len;
	msg_info->buf = buf;
	atomic_set(&msg_info->valid, MSG_Q_END_RECV);
	wake_up_interruptible(&msg_info->response_q);
}

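/*
 * Store one received file slice: each slice lands at
 * slice_sn * slice_size (e.g. slice 3 of a 4096-byte-slice transfer starts
 * at offset 12288). Once all num_slices slices are written the receive
 * state flips from FILE_RECV_PROCESS to FILE_RECV_SUCC; a failed write
 * marks it FILE_RECV_ERR_SPC.
 */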
static int hmdfs_readfile_slice(struct sendmsg_wait_queue *msg_info,
				struct work_handler_desp *desp)
{
	struct slice_descriptor *sdesc = desp->buf;
	void *slice_buf = sdesc + 1;
	struct file_recv_info *recv_info = &msg_info->recv_info;
	struct file *filp = recv_info->local_filp;
	loff_t offset;
	ssize_t written_size;

	if (filp == NULL) {
		hmdfs_warning("recv_info filp is NULL\n");
		return -EINVAL;
	}

	if (atomic_read(&recv_info->state) != FILE_RECV_PROCESS)
		return -EBUSY;

	offset = le32_to_cpu(sdesc->slice_size) * le32_to_cpu(sdesc->slice_sn);

	written_size = kernel_write(filp, slice_buf,
				    le32_to_cpu(sdesc->content_size), &offset);
	if (IS_ERR_VALUE(written_size)) {
		atomic_set(&recv_info->state, FILE_RECV_ERR_SPC);
		hmdfs_info("Fatal! Cannot store a file slice %d/%d, ret = %d",
			   le32_to_cpu(sdesc->slice_sn),
			   le32_to_cpu(sdesc->num_slices), (int)written_size);
		return (int)written_size;
	}

	if (atomic_inc_return(&recv_info->local_fslices) >=
	    le32_to_cpu(sdesc->num_slices))
		atomic_set(&recv_info->state, FILE_RECV_SUCC);
	return 0;
}

static void hmdfs_file_response_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);
	struct sendmsg_wait_queue *msg_info = NULL;
	int ret;
	atomic_t *pstate = NULL;
	u8 cmd = desp->head->operations.command;
	const struct cred *old_cred =
		hmdfs_override_creds(desp->peer->sbi->cred);

	msg_info = (struct sendmsg_wait_queue *)hmdfs_find_msg_head(desp->peer,
		le32_to_cpu(desp->head->msg_id), desp->head->operations);
	if (!msg_info || atomic_read(&msg_info->valid) != MSG_Q_SEND) {
		hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_DELAY,
					 0, 0);
		hmdfs_info("cannot find msg(id %d)",
			   le32_to_cpu(desp->head->msg_id));
		goto free;
	}

	ret = le32_to_cpu(desp->head->ret_code);
	if (ret || le32_to_cpu(desp->head->data_len) == sizeof(*desp->head))
		goto wakeup;
	ret = hmdfs_readfile_slice(msg_info, desp);
	pstate = &msg_info->recv_info.state;
	if (ret || atomic_read(pstate) != FILE_RECV_PROCESS)
		goto wakeup;
	goto free;

wakeup:
	hmdfs_response_wakeup(msg_info, ret, sizeof(struct hmdfs_head_cmd),
			      NULL);
	hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_NORMAL,
				 msg_info->start, jiffies);
free:
	if (msg_info)
		msg_put(msg_info);
	peer_put(desp->peer);
	hmdfs_revert_creds(old_cred);

	kfree(desp->buf);
	kfree(desp->head);
	kfree(desp);
}

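/*
 * Pairs with the smp_store_release() of mp->wfired in
 * hmdfs_send_async_request(): spin with short sleeps until the delayed
 * work has really been queued, so that a subsequent cancel_delayed_work()
 * cannot race with the initial queueing.
 */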
static void hmdfs_wait_mp_wfired(struct hmdfs_msg_parasite *mp)
{
	/* We just cancel queued works */
	while (unlikely(!smp_load_acquire(&mp->wfired)))
		usleep_range(ACQUIRE_WFIRED_INTVAL_USEC_MIN,
			     ACQUIRE_WFIRED_INTVAL_USEC_MAX);
}

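/*
 * Deliver a response to its originating message, looked up by msg_id.
 * Synchronous waiters are woken directly via hmdfs_response_wakeup();
 * asynchronous parasites have their timeout work cancelled and re-queued
 * with zero delay so the registered callback runs with the real response.
 * Returns -EINVAL if no waiter could be woken (e.g. the message was not
 * found or had already timed out).
 */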
int hmdfs_response_handle_sync(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	struct sendmsg_wait_queue *msg_info = NULL;
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_msg_idr_head *msg_head = NULL;
	u32 msg_id = le32_to_cpu(head->msg_id);
	bool woke = false;
	u8 cmd = head->operations.command;

	msg_head = hmdfs_find_msg_head(con, msg_id, head->operations);
	if (!msg_head)
		goto out;

	switch (msg_head->type) {
	case MSG_IDR_MESSAGE_SYNC:
		msg_info = (struct sendmsg_wait_queue *)msg_head;
		if (atomic_read(&msg_info->valid) == MSG_Q_SEND) {
			hmdfs_response_wakeup(msg_info,
					      le32_to_cpu(head->ret_code),
					      le32_to_cpu(head->data_len), buf);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL,
						 msg_info->start, jiffies);
			woke = true;
		}

		msg_put(msg_info);
		break;
	case MSG_IDR_MESSAGE_ASYNC:
		mp = (struct hmdfs_msg_parasite *)msg_head;

		hmdfs_wait_mp_wfired(mp);
		if (cancel_delayed_work(&mp->d_work)) {
			mp->resp.out_buf = buf;
			mp->resp.out_len =
				le32_to_cpu(head->data_len) - sizeof(*head);
			mp->resp.ret_code = le32_to_cpu(head->ret_code);
			queue_delayed_work(con->async_wq, &mp->d_work, 0);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL, mp->start,
						 jiffies);
			woke = true;
		}
		mp_put(mp);
		break;
	default:
		hmdfs_err("receive incorrect msg type %d msg_id %d cmd %d",
			  msg_head->type, msg_id, cmd);
		break;
	}

	if (likely(woke))
		return 0;
out:
	hmdfs_client_resp_statis(con->sbi, cmd, HMDFS_RESP_DELAY, 0, 0);
	hmdfs_info("cannot find msg_id %d cmd %d", msg_id, cmd);
	return -EINVAL;
}

static int hmdfs_response_recv(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	__u16 command = head->operations.command;
	int ret;

	if (command >= F_SIZE) {
		ret = -EINVAL;
		return ret;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_READPAGE:
	case F_WRITEPAGE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
		ret = hmdfs_response_handle_sync(con, head, buf);
		return ret;

	case F_ITERATE:
		ret = hmdfs_msg_handle_async(con, head, buf, con->async_wq,
					     hmdfs_file_response_work_fn);
		return ret;

	default:
		hmdfs_err("Fatal! Unexpected response command %d",
			  head->operations.command);
		ret = -EINVAL;
		return ret;
	}
}

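/*
 * Entry point for every message received from a peer: verify it, then
 * dispatch by cmd_flag to the request or response path. Ownership of @buf
 * passes to the handlers; it is freed here only on the error paths.
 * @head is only borrowed, so handlers that need it beyond this call
 * duplicate it (see hmdfs_msg_handle_async()).
 */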
void hmdfs_recv_mesg_callback(struct hmdfs_peer *con, void *head,
			      void *buf)
{
	struct hmdfs_head_cmd *hmdfs_head = (struct hmdfs_head_cmd *)head;

	trace_hmdfs_recv_mesg_callback(hmdfs_head);

	if (hmdfs_message_verify(con, hmdfs_head, buf) < 0) {
		hmdfs_info("Message %d has been abandoned", hmdfs_head->msg_id);
		goto out_err;
	}

	switch (hmdfs_head->operations.cmd_flag) {
	case C_REQUEST:
		if (hmdfs_request_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	case C_RESPONSE:
		if (hmdfs_response_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	default:
		hmdfs_err("Fatal! Unexpected msg cmd %d",
			  hmdfs_head->operations.cmd_flag);
		goto out_err;
	}
	return;

out_err:
	kfree(buf);
}

void hmdfs_wakeup_parasite(struct hmdfs_msg_parasite *mp)
{
	hmdfs_wait_mp_wfired(mp);
	if (!cancel_delayed_work(&mp->d_work))
		hmdfs_err("cancel parasite work err msg_id=%d cmd=%d",
			  mp->head.msg_id, mp->req.operations.command);
	else
		async_request_cb_on_wakeup_fn(&mp->d_work.work);
}

void hmdfs_wakeup_async_work(struct hmdfs_async_work *async_work)
{
	if (!cancel_delayed_work(&async_work->d_work))
		hmdfs_err("cancel async work err msg_id=%d",
			  async_work->head.msg_id);
	else
		hmdfs_recv_page_work_fn(&async_work->d_work.work);
}
