// SPDX-License-Identifier: GPL-2.0
/*
 * fs/hmdfs/comm/socket_adapter.c
 *
 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
 */

#include "socket_adapter.h"

#include <linux/file.h>
#include <linux/module.h>
#include <linux/namei.h>
#include <linux/net.h>
#include <linux/pagemap.h>
#include <net/sock.h>

#include "authority/authentication.h"
#include "comm/device_node.h"
#include "hmdfs_client.h"
#include "hmdfs_server.h"
#include "hmdfs_trace.h"
#include "message_verify.h"

#define ACQUIRE_WFIRED_INTVAL_USEC_MIN 10
#define ACQUIRE_WFIRED_INTVAL_USEC_MAX 30

typedef void (*request_callback)(struct hmdfs_peer *, struct hmdfs_head_cmd *,
				 void *);
typedef void (*response_callback)(struct hmdfs_peer *,
				  struct sendmsg_wait_queue *, void *, size_t);

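/*
 * Server-side dispatch table: maps each request command received from a
 * remote peer to the handler that services it on the local device.
 */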
static const request_callback s_recv_callbacks[F_SIZE] = {
	[F_OPEN] = hmdfs_server_open,
	[F_READPAGE] = hmdfs_server_readpage,
	[F_RELEASE] = hmdfs_server_release,
	[F_WRITEPAGE] = hmdfs_server_writepage,
	[F_ITERATE] = hmdfs_server_readdir,
	[F_MKDIR] = hmdfs_server_mkdir,
	[F_CREATE] = hmdfs_server_create,
	[F_RMDIR] = hmdfs_server_rmdir,
	[F_UNLINK] = hmdfs_server_unlink,
	[F_RENAME] = hmdfs_server_rename,
	[F_SETATTR] = hmdfs_server_setattr,
	[F_STATFS] = hmdfs_server_statfs,
	[F_DROP_PUSH] = hmdfs_server_get_drop_push,
	[F_GETATTR] = hmdfs_server_getattr,
	[F_FSYNC] = hmdfs_server_fsync,
	[F_SYNCFS] = hmdfs_server_syncfs,
	[F_GETXATTR] = hmdfs_server_getxattr,
	[F_SETXATTR] = hmdfs_server_setxattr,
	[F_LISTXATTR] = hmdfs_server_listxattr,
	[F_ATOMIC_OPEN] = hmdfs_server_atomic_open,
};

typedef void (*file_request_callback)(struct hmdfs_peer *,
				      struct hmdfs_send_command *);

struct async_req_callbacks {
	void (*on_wakeup)(struct hmdfs_peer *peer, const struct hmdfs_req *req,
			  const struct hmdfs_resp *resp);
};

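/*
 * Per-command wakeup hooks for asynchronous requests. Responses to commands
 * without an entry here are simply discarded when the parasite is released.
 */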
static const struct async_req_callbacks g_async_req_callbacks[F_SIZE] = {
	[F_SYNCFS] = { .on_wakeup = hmdfs_recv_syncfs_cb },
	[F_WRITEPAGE] = { .on_wakeup = hmdfs_writepage_cb },
};

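/*
 * Final-put callback: invoked by kref_put_lock() with peer->idr_lock held
 * once the refcount drops to zero. It must remove the idr entry and release
 * the lock itself before freeing the message.
 */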
static void msg_release(struct kref *kref)
{
	struct sendmsg_wait_queue *msg_wq;
	struct hmdfs_peer *con;

	msg_wq = (struct sendmsg_wait_queue *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	con = msg_wq->head.peer;
	idr_remove(&con->msg_idr, msg_wq->head.msg_id);
	spin_unlock(&con->idr_lock);

	kfree(msg_wq->buf);
	if (msg_wq->recv_info.local_filp)
		fput(msg_wq->recv_info.local_filp);
	kfree(msg_wq);
}

// Always find before put, and make sure con stays available across the put
void msg_put(struct sendmsg_wait_queue *msg_wq)
{
	kref_put_lock(&msg_wq->head.ref, msg_release,
		      &msg_wq->head.peer->idr_lock);
}

static void recv_info_init(struct file_recv_info *recv_info)
{
	memset(recv_info, 0, sizeof(struct file_recv_info));
	atomic_set(&recv_info->local_fslices, 0);
	atomic_set(&recv_info->state, FILE_RECV_PROCESS);
}

static int msg_init(struct hmdfs_peer *con, struct sendmsg_wait_queue *msg_wq,
		    struct hmdfs_cmd operations)
{
	int ret = 0;
	struct file_recv_info *recv_info = &msg_wq->recv_info;

	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_MESSAGE_SYNC, msg_wq, operations);
	if (unlikely(ret))
		return ret;

	atomic_set(&msg_wq->valid, MSG_Q_SEND);
	init_waitqueue_head(&msg_wq->response_q);
	recv_info_init(recv_info);
	msg_wq->start = jiffies;
	return 0;
}

static inline void statistic_con_sb_dirty(struct hmdfs_peer *con,
					  const struct hmdfs_cmd *op)
{
	if (op->command == F_WRITEPAGE && op->cmd_flag == C_REQUEST)
		atomic64_inc(&con->sb_dirty_count);
}

int hmdfs_sendmessage(struct hmdfs_peer *node, struct hmdfs_send_data *msg)
{
	int ret = 0;
	struct connection *connect = NULL;
	struct tcp_handle *tcp = NULL;
	struct hmdfs_head_cmd *head = msg->head;
	const struct cred *old_cred;

	if (!node) {
		hmdfs_err("node NULL when send cmd %d",
			  head->operations.command);
		ret = -EAGAIN;
		goto out_err;
	} else if (node->status != NODE_STAT_ONLINE) {
		hmdfs_err("device %llu OFFLINE %d when send cmd %d",
			  node->device_id, node->status,
			  head->operations.command);
		ret = -EAGAIN;
		goto out;
	}

	old_cred = hmdfs_override_creds(node->sbi->system_cred);

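	/*
	 * Retry loop: on -ESHUTDOWN the stopped connection is handed to
	 * reget_conn_wq for re-establishment and we try again, until either
	 * a usable connection sends the message or the peer goes offline.
	 */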
	do {
		connect = get_conn_impl(node, CONNECT_TYPE_TCP);
		if (!connect) {
			hmdfs_info_ratelimited(
				"device %llu no connection available when send cmd %d, get new session",
				node->device_id, head->operations.command);
			if (node->status != NODE_STAT_OFFLINE) {
				struct notify_param param;

				memcpy(param.remote_cid, node->cid,
				       HMDFS_CID_SIZE);
				param.notify = NOTIFY_OFFLINE;
				param.fd = INVALID_SOCKET_FD;
				notify(node, &param);
			}
			ret = -EAGAIN;
			goto revert_cred;
		}

		ret = connect->send_message(connect, msg);
		if (ret == -ESHUTDOWN) {
			hmdfs_info("device %llu send cmd %d message fail, connection stop",
				   node->device_id, head->operations.command);
			connect->status = CONNECT_STAT_STOP;
			tcp = connect->connect_handle;
			if (node->status != NODE_STAT_OFFLINE) {
				connection_get(connect);
				if (!queue_work(node->reget_conn_wq,
						&connect->reget_work))
					connection_put(connect);
			}
			connection_put(connect);
			/*
			 * node->status being OFFLINE cannot ensure that
			 * node_seq will be increased before
			 * hmdfs_sendmessage() returns.
			 */
			hmdfs_node_inc_evt_seq(node);
		} else {
			connection_put(connect);
			goto revert_cred;
		}
	} while (node->status != NODE_STAT_OFFLINE);
revert_cred:
	hmdfs_revert_creds(old_cred);

	if (!ret)
		statistic_con_sb_dirty(node, &head->operations);
out:
	if (head->operations.cmd_flag == C_REQUEST)
		hmdfs_client_snd_statis(node->sbi,
					head->operations.command, ret);
	else if (head->operations.cmd_flag == C_RESPONSE)
		hmdfs_server_snd_statis(node->sbi,
					head->operations.command, ret);
out_err:
	return ret;
}

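/*
 * Build a C_RESPONSE header that echoes the request's msg_id and reserved
 * fields, then send @data_len bytes from @buf back to @con. @ret_code is
 * carried to the peer as the response's return code.
 */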
int hmdfs_sendmessage_response(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *cmd, __u32 data_len,
			       void *buf, __u32 ret_code)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;

	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(data_len + sizeof(struct hmdfs_head_cmd));
	head.ret_code = cpu_to_le32(ret_code);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;
	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = buf;
	msg.len = data_len;
	msg.sdesc = NULL;
	msg.sdesc_len = 0;

	ret = hmdfs_sendmessage(con, &msg);
	return ret;
}

static void mp_release(struct kref *kref)
{
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_peer *peer = NULL;

	mp = (struct hmdfs_msg_parasite *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = mp->head.peer;
	idr_remove(&peer->msg_idr, mp->head.msg_id);
	spin_unlock(&peer->idr_lock);

	peer_put(peer);
	kfree(mp->resp.out_buf);
	kfree(mp);
}

void mp_put(struct hmdfs_msg_parasite *mp)
{
	kref_put_lock(&mp->head.ref, mp_release, &mp->head.peer->idr_lock);
}

static void async_request_cb_on_wakeup_fn(struct work_struct *w)
{
	struct hmdfs_msg_parasite *mp =
		container_of(w, struct hmdfs_msg_parasite, d_work.work);
	struct async_req_callbacks cbs;
	const struct cred *old_cred =
		hmdfs_override_creds(mp->head.peer->sbi->cred);

	if (mp->resp.ret_code == -ETIME)
		hmdfs_client_resp_statis(mp->head.peer->sbi,
					 mp->req.operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);

	cbs = g_async_req_callbacks[mp->req.operations.command];
	if (cbs.on_wakeup)
		(*cbs.on_wakeup)(mp->head.peer, &mp->req, &mp->resp);
	mp_put(mp);
	hmdfs_revert_creds(old_cred);
}

static struct hmdfs_msg_parasite *mp_alloc(struct hmdfs_peer *peer,
					   const struct hmdfs_req *req)
{
	struct hmdfs_msg_parasite *mp = kzalloc(sizeof(*mp), GFP_KERNEL);
	int ret;

	if (unlikely(!mp))
		return ERR_PTR(-ENOMEM);

	ret = hmdfs_alloc_msg_idr(peer, MSG_IDR_MESSAGE_ASYNC, mp,
				  mp->head.send_cmd_operations);
	if (unlikely(ret)) {
		kfree(mp);
		return ERR_PTR(ret);
	}

	mp->start = jiffies;
	peer_get(mp->head.peer);
	mp->resp.ret_code = -ETIME;
	INIT_DELAYED_WORK(&mp->d_work, async_request_cb_on_wakeup_fn);
	mp->wfired = false;
	mp->req = *req;
	return mp;
}

/**
 * hmdfs_send_async_request - send out an async request
 * @peer: target device node
 * @req: request descriptor + necessary contexts
 *
 * Send out a request synchronously and wait for its response asynchronously.
 * Return -ESHUTDOWN when the device node is unreachable
 * Return -EAGAIN if the network is recovering
 * Return -ENOMEM if out of memory
 *
 * Register g_async_req_callbacks to receive the response
 */
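/*
 * Usage sketch (assumed caller code, not taken from this file): fill an
 * hmdfs_req and let the F_SYNCFS entry of g_async_req_callbacks handle the
 * reply.
 *
 *	struct hmdfs_req req = {
 *		.operations.command = F_SYNCFS,
 *		.operations.cmd_flag = C_REQUEST,
 *		.timeout = TIMEOUT_CONFIG,
 *		.data = syncfs_req,		// payload buffer (assumed name)
 *		.data_len = sizeof(*syncfs_req),
 *	};
 *	err = hmdfs_send_async_request(peer, &req);
 */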
int hmdfs_send_async_request(struct hmdfs_peer *peer,
			     const struct hmdfs_req *req)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	struct hmdfs_msg_parasite *mp = NULL;
	size_t msg_len = req->data_len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout;

	if (req->timeout == TIMEOUT_CONFIG)
		timeout = get_cmd_timeout(peer->sbi, req->operations.command);
	else
		timeout = req->timeout;
	if (timeout == TIMEOUT_UNINIT || timeout == TIMEOUT_NONE) {
		hmdfs_err("send msg %d with uninitialized/invalid timeout",
			  req->operations.command);
		return -EINVAL;
	}

	if (!hmdfs_is_node_online(peer))
		return -EAGAIN;

	mp = mp_alloc(peer, req);
	if (IS_ERR(mp))
		return PTR_ERR(mp);
	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.data_len = cpu_to_le32(msg_len);
	head.operations = mp->req.operations;
	head.msg_id = cpu_to_le32(mp->head.msg_id);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(head);
	msg.data = mp->req.data;
	msg.len = mp->req.data_len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	ret = hmdfs_sendmessage(peer, &msg);
	if (unlikely(ret)) {
		mp_put(mp);
		goto out;
	}

	queue_delayed_work(peer->async_wq, &mp->d_work, timeout * HZ);
	/*
	 * The work may not have been queued yet when its response arrives,
	 * which would result in meaningless waiting. So we use the memory
	 * barrier to tell the recv thread whether the work has been queued.
	 */
	smp_store_release(&mp->wfired, true);
out:
	hmdfs_dec_msg_idr_process(peer);
	return ret;
}

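/*
 * Readdir replies arrive as a stream of file slices, so the waiter is kept
 * on sbi->async_readdir_msg_list while it sleeps. When async_readdir_prohibit
 * is set (e.g. while the mount is being torn down), new readdir waits are
 * rejected with -EINTR instead of being tracked.
 */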
static int hmdfs_record_async_readdir(struct hmdfs_peer *con,
				      struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	if (sbi->async_readdir_prohibit) {
		spin_unlock(&sbi->async_readdir_msg_lock);
		return -EINTR;
	}

	list_add(&msg_wq->async_msg, &sbi->async_readdir_msg_list);
	spin_unlock(&sbi->async_readdir_msg_lock);

	return 0;
}

static void hmdfs_untrack_async_readdir(struct hmdfs_peer *con,
					struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	list_del(&msg_wq->async_msg);
	spin_unlock(&sbi->async_readdir_msg_lock);
}

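/*
 * Synchronous request path: allocate a wait-queue entry in the peer's msg
 * idr, send the request, then sleep on response_q until
 * hmdfs_response_wakeup() fires or the per-command timeout expires.
 */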
int hmdfs_sendmessage_request(struct hmdfs_peer *con,
			      struct hmdfs_send_command *sm)
{
	int time_left;
	int ret = 0;
	struct sendmsg_wait_queue *msg_wq = NULL;
	struct hmdfs_send_data msg;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout =
		get_cmd_timeout(con->sbi, sm->operations.command);
	struct hmdfs_head_cmd *head = NULL;
	bool dec = false;

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto free_filp;
	}

	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err_ratelimited("send msg %d with uninitialized timeout",
				      sm->operations.command);
		ret = -EINVAL;
		goto free_filp;
	}

	head = kzalloc(sizeof(struct hmdfs_head_cmd), GFP_KERNEL);
	if (!head) {
		ret = -ENOMEM;
		goto free_filp;
	}

	sm->out_buf = NULL;
	head->magic = HMDFS_MSG_MAGIC;
	head->version = HMDFS_VERSION;
	head->operations = sm->operations;
	head->data_len = cpu_to_le32(outlen);
	head->ret_code = cpu_to_le32(sm->ret_code);
	head->reserved = 0;
	head->reserved1 = 0;
	if (timeout != TIMEOUT_NONE) {
		msg_wq = kzalloc(sizeof(*msg_wq), GFP_KERNEL);
		if (!msg_wq) {
			ret = -ENOMEM;
			goto free_filp;
		}
		ret = msg_init(con, msg_wq, sm->operations);
		if (ret) {
			kfree(msg_wq);
			msg_wq = NULL;
			goto free_filp;
		}
		dec = true;
		head->msg_id = cpu_to_le32(msg_wq->head.msg_id);
		if (sm->operations.command == F_ITERATE)
			msg_wq->recv_info.local_filp = sm->local_filp;
	}
	msg.head = head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err_ratelimited("send err sm->device_id, %lld, msg_id %u",
				      con->device_id, head->msg_id);
		goto free;
	}

	if (timeout == TIMEOUT_NONE)
		goto free;

	hmdfs_dec_msg_idr_process(con);
	dec = false;

	if (sm->operations.command == F_ITERATE) {
		ret = hmdfs_record_async_readdir(con, msg_wq);
		if (ret) {
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_SPC);
			goto free;
		}
	}

	time_left = wait_event_interruptible_timeout(
		msg_wq->response_q,
		(atomic_read(&msg_wq->valid) == MSG_Q_END_RECV), timeout * HZ);

	if (sm->operations.command == F_ITERATE)
		hmdfs_untrack_async_readdir(con, msg_wq);

	if (time_left == -ERESTARTSYS || time_left == 0) {
		hmdfs_err("timeout err sm->device_id %lld,  msg_id %d cmd %d",
			  con->device_id, head->msg_id,
			  head->operations.command);
		if (sm->operations.command == F_ITERATE)
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_NET);
		ret = -ETIME;
		hmdfs_client_resp_statis(con->sbi, sm->operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);
		goto free;
	}
	sm->out_buf = msg_wq->buf;
	msg_wq->buf = NULL;
	sm->out_len = msg_wq->size - sizeof(struct hmdfs_head_cmd);
	ret = msg_wq->ret;

free:
	if (msg_wq)
		msg_put(msg_wq);
	if (dec)
		hmdfs_dec_msg_idr_process(con);
	kfree(head);
	return ret;

free_filp:
	if (sm->local_filp)
		fput(sm->local_filp);
	kfree(head);
	return ret;
}

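/*
 * Large responses (e.g. readdir content) are streamed back as PAGE_SIZE
 * slices: each slice is a separate C_RESPONSE carrying a slice_descriptor
 * plus payload, and the receiver reassembles them in hmdfs_readfile_slice().
 */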
static int hmdfs_send_slice(struct hmdfs_peer *con, struct hmdfs_head_cmd *cmd,
			    struct slice_descriptor *sdesc, void *slice_buf)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	int content_size = le32_to_cpu(sdesc->content_size);
	int msg_len = sizeof(struct hmdfs_head_cmd) + content_size +
		      sizeof(struct slice_descriptor);

	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(msg_len);
	head.ret_code = cpu_to_le32(0);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.sdesc = sdesc;
	msg.sdesc_len = le32_to_cpu(sizeof(struct slice_descriptor));
	msg.data = slice_buf;
	msg.len = content_size;

	ret = hmdfs_sendmessage(con, &msg);

	return ret;
}

int hmdfs_readfile_response(struct hmdfs_peer *con, struct hmdfs_head_cmd *head,
			    struct file *filp)
{
	int ret;
	const unsigned int slice_size = PAGE_SIZE;
	char *slice_buf = NULL;
	loff_t file_offset = 0, file_size;
	ssize_t size;
	struct slice_descriptor sdesc;
	unsigned int slice_sn = 0;

	if (!filp)
		return hmdfs_sendmessage_response(con, head, 0, NULL, 0);

	sdesc.slice_size = cpu_to_le32(slice_size);
	file_size = i_size_read(file_inode(filp));
	file_size = round_up(file_size, slice_size);
	sdesc.num_slices = cpu_to_le32(file_size / slice_size);

	slice_buf = kmalloc(slice_size, GFP_KERNEL);
	if (!slice_buf) {
		ret = -ENOMEM;
		goto out;
	}

	while (1) {
		sdesc.slice_sn = cpu_to_le32(slice_sn++);
		size = kernel_read(filp, slice_buf, (size_t)slice_size,
				   &file_offset);
		if (IS_ERR_VALUE(size)) {
			ret = (int)size;
			goto out;
		}
		sdesc.content_size = cpu_to_le32(size);
		ret = hmdfs_send_slice(con, head, &sdesc, slice_buf);
		if (ret) {
			hmdfs_info("Cannot send file slice %d ",
				   le32_to_cpu(sdesc.slice_sn));
			break;
		}
		if (file_offset >= i_size_read(file_inode(filp)))
			break;
	}

out:
	kfree(slice_buf);
	if (ret)
		hmdfs_sendmessage_response(con, head, 0, NULL, ret);
	return ret;
}

static void asw_release(struct kref *kref)
{
	struct hmdfs_async_work *asw = NULL;
	struct hmdfs_peer *peer = NULL;

	asw = (struct hmdfs_async_work *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = asw->head.peer;
	idr_remove(&peer->msg_idr, asw->head.msg_id);
	spin_unlock(&peer->idr_lock);
	kfree(asw);
}

void asw_put(struct hmdfs_async_work *asw)
{
	kref_put_lock(&asw->head.ref, asw_release, &asw->head.peer->idr_lock);
}

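/*
 * Delayed-work handler for F_READPAGE timeouts: if the page response never
 * arrives, record a timeout statistic and let asw_done() release the still
 * pending page.
 */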
void hmdfs_recv_page_work_fn(struct work_struct *ptr)
{
	struct hmdfs_async_work *async_work =
		container_of(ptr, struct hmdfs_async_work, d_work.work);

	hmdfs_client_resp_statis(async_work->head.peer->sbi,
					F_READPAGE, HMDFS_RESP_TIMEOUT, 0, 0);
	hmdfs_err_ratelimited("timeout and release page, msg_id:%u",
			      async_work->head.msg_id);
	asw_done(async_work);
}

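/*
 * Send an F_READPAGE request. @sm->out_buf holds the locked page that the
 * response will fill; it stays locked until asw_done() runs, either from the
 * response path or from the timeout work above, except on the early error
 * paths which unlock it here.
 */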
int hmdfs_sendpage_request(struct hmdfs_peer *con,
			   struct hmdfs_send_command *sm)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_async_work *async_work = NULL;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	struct hmdfs_head_cmd head;
	unsigned int timeout;
	unsigned long start = jiffies;

	WARN_ON(!sm->out_buf);

	timeout = get_cmd_timeout(con->sbi, sm->operations.command);
	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err("send msg %d with uninitialized timeout",
			  sm->operations.command);
		ret = -EINVAL;
		goto unlock;
	}

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto unlock;
	}

	memset(&head, 0, sizeof(head));
	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = sm->operations;
	head.data_len = cpu_to_le32(outlen);
	head.ret_code = cpu_to_le32(sm->ret_code);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	async_work = kzalloc(sizeof(*async_work), GFP_KERNEL);
	if (!async_work) {
		ret = -ENOMEM;
		goto unlock;
	}
	async_work->start = start;
	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_PAGE, async_work, sm->operations);
	if (ret) {
		hmdfs_err("alloc msg_id failed, err %d", ret);
		goto unlock;
	}
	head.msg_id = cpu_to_le32(async_work->head.msg_id);
	async_work->page = sm->out_buf;
	asw_get(async_work);
	INIT_DELAYED_WORK(&async_work->d_work, hmdfs_recv_page_work_fn);
	ret = queue_delayed_work(con->async_wq, &async_work->d_work,
				 timeout * HZ);
	if (!ret) {
		hmdfs_err("queue_delayed_work failed, msg_id %u", head.msg_id);
		goto fail_and_unlock_page;
	}
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err("send err sm->device_id, %lld, msg_id %u",
			  con->device_id, head.msg_id);
		if (!cancel_delayed_work(&async_work->d_work)) {
			hmdfs_err("cancel async work err");
			asw_put(async_work);
			hmdfs_dec_msg_idr_process(con);
			goto out;
		}
		goto fail_and_unlock_page;
	}

	asw_put(async_work);
	hmdfs_dec_msg_idr_process(con);
	return 0;

fail_and_unlock_page:
	asw_put(async_work);
	asw_done(async_work);
	hmdfs_dec_msg_idr_process(con);
	return ret;
unlock:
	kfree(async_work);
	unlock_page(sm->out_buf);
out:
	return ret;
}

static void hmdfs_request_handle_sync(struct hmdfs_peer *con,
				      struct hmdfs_head_cmd *head, void *buf)
{
	unsigned long start = jiffies;
	const struct cred *saved_cred = hmdfs_override_fsids(true);

	if (!saved_cred) {
		hmdfs_err("prepare cred failed!");
		kfree(buf);
		return;
	}

	s_recv_callbacks[head->operations.command](con, head, buf);
	hmdfs_statistic(con->sbi, head->operations.command, jiffies - start);

	kfree(buf);

	hmdfs_revert_fsids(saved_cred);
}

static void hmdfs_msg_handle_sync(struct hmdfs_peer *con,
				 struct hmdfs_head_cmd *head, void *buf)
{
	const struct cred *old_cred = hmdfs_override_creds(con->sbi->cred);

	/*
	 * Reuse PF_NPROC_EXCEEDED as an indication of hmdfs server context:
	 * 1. PF_NPROC_EXCEEDED will be set by setreuid()/setuid()/setresuid(),
	 *    and we assume a kworker will not call these syscalls.
	 * 2. PF_NPROC_EXCEEDED will be cleared by execve(), which a kworker
	 *    will not call either.
	 */
	current->flags |= PF_NPROC_EXCEEDED;
	hmdfs_request_handle_sync(con, head, buf);
	current->flags &= ~PF_NPROC_EXCEEDED;

	hmdfs_revert_creds(old_cred);
}


static void hmdfs_request_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);

	hmdfs_msg_handle_sync(desp->peer, desp->head, desp->buf);
	peer_put(desp->peer);
	kfree(desp->head);
	kfree(desp);
}

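/*
 * Queue @head/@buf for deferred handling. The header is duplicated because
 * the caller's copy is not guaranteed to outlive this call, while ownership
 * of @buf passes to @work_fn, which must free it.
 */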
static int hmdfs_msg_handle_async(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf,
				  struct workqueue_struct *wq,
				  void (*work_fn)(struct work_struct *ptr))
{
	struct work_handler_desp *desp = NULL;
	struct hmdfs_head_cmd *dup_head = NULL;
	int ret;

	desp = kzalloc(sizeof(*desp), GFP_KERNEL);
	if (!desp) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	dup_head = kzalloc(sizeof(*dup_head), GFP_KERNEL);
	if (!dup_head) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	*dup_head = *head;
	desp->peer = con;
	desp->head = dup_head;
	desp->buf = buf;
	INIT_WORK(&desp->work, work_fn);

	peer_get(con);
	queue_work(wq, &desp->work);

	ret = 0;
	return ret;

exit_desp:
	kfree(desp);
	return ret;
}

static int hmdfs_request_recv(struct hmdfs_peer *con,
			      struct hmdfs_head_cmd *head, void *buf)
{
	int ret;

	if (head->operations.command >= F_SIZE ||
	    !s_recv_callbacks[head->operations.command]) {
		ret = -EINVAL;
		hmdfs_err("NULL callback, command %d",
			  head->operations.command);
		goto out;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_ITERATE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
	case F_ATOMIC_OPEN:
		ret = hmdfs_msg_handle_async(con, head, buf, con->req_handle_wq,
					     hmdfs_request_work_fn);
		break;
	case F_WRITEPAGE:
	case F_READPAGE:
		hmdfs_msg_handle_sync(con, head, buf);
		ret = 0;
		break;
	default:
		hmdfs_err("Fatal! Unexpected request command %d",
			  head->operations.command);
		ret = -EINVAL;
	}

out:
	return ret;
}

void hmdfs_response_wakeup(struct sendmsg_wait_queue *msg_info,
			   __u32 ret_code, __u32 data_len, void *buf)
{
	msg_info->ret = ret_code;
	msg_info->size = data_len;
	msg_info->buf = buf;
	atomic_set(&msg_info->valid, MSG_Q_END_RECV);
	wake_up_interruptible(&msg_info->response_q);
}

static int hmdfs_readfile_slice(struct sendmsg_wait_queue *msg_info,
				struct work_handler_desp *desp)
{
	struct slice_descriptor *sdesc = desp->buf;
	void *slice_buf = sdesc + 1;
	struct file_recv_info *recv_info = &msg_info->recv_info;
	struct file *filp = recv_info->local_filp;
	loff_t offset;
	ssize_t written_size;

	if (filp == NULL) {
		hmdfs_warning("recv_info filp is NULL\n");
		return -EINVAL;
	}

	if (atomic_read(&recv_info->state) != FILE_RECV_PROCESS)
		return -EBUSY;

	offset = le32_to_cpu(sdesc->slice_size) * le32_to_cpu(sdesc->slice_sn);

	written_size = kernel_write(filp, slice_buf,
				    le32_to_cpu(sdesc->content_size), &offset);
	if (IS_ERR_VALUE(written_size)) {
		atomic_set(&recv_info->state, FILE_RECV_ERR_SPC);
		hmdfs_info("Fatal! Cannot store a file slice %d/%d, ret = %d",
			   le32_to_cpu(sdesc->slice_sn),
			   le32_to_cpu(sdesc->num_slices), (int)written_size);
		return (int)written_size;
	}

	if (atomic_inc_return(&recv_info->local_fslices) >=
	    le32_to_cpu(sdesc->num_slices))
		atomic_set(&recv_info->state, FILE_RECV_SUCC);
	return 0;
}

static void hmdfs_file_response_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);
	struct sendmsg_wait_queue *msg_info = NULL;
	int ret;
	atomic_t *pstate = NULL;
	u8 cmd = desp->head->operations.command;
	const struct cred *old_cred =
		hmdfs_override_creds(desp->peer->sbi->cred);

	msg_info = (struct sendmsg_wait_queue *)hmdfs_find_msg_head(desp->peer,
		le32_to_cpu(desp->head->msg_id), desp->head->operations);
	if (!msg_info || atomic_read(&msg_info->valid) != MSG_Q_SEND) {
		hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_DELAY,
					 0, 0);
		hmdfs_info("cannot find msg(id %d)",
			   le32_to_cpu(desp->head->msg_id));
		goto free;
	}

	ret = le32_to_cpu(desp->head->ret_code);
	if (ret || le32_to_cpu(desp->head->data_len) == sizeof(*desp->head))
		goto wakeup;
	ret = hmdfs_readfile_slice(msg_info, desp);
	pstate = &msg_info->recv_info.state;
	if (ret || atomic_read(pstate) != FILE_RECV_PROCESS)
		goto wakeup;
	goto free;

wakeup:
	hmdfs_response_wakeup(msg_info, ret, sizeof(struct hmdfs_head_cmd),
			      NULL);
	hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_NORMAL,
				 msg_info->start, jiffies);
free:
	if (msg_info)
		msg_put(msg_info);
	peer_put(desp->peer);
	hmdfs_revert_creds(old_cred);

	kfree(desp->buf);
	kfree(desp->head);
	kfree(desp);
}

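/*
 * Pairs with smp_store_release(&mp->wfired, true) in
 * hmdfs_send_async_request(): spin until the sender has actually queued the
 * delayed work, so the cancel_delayed_work() that follows acts on a queued
 * work rather than racing with the queueing itself.
 */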
static void hmdfs_wait_mp_wfired(struct hmdfs_msg_parasite *mp)
{
	/* We just cancel queued works */
	while (unlikely(!smp_load_acquire(&mp->wfired)))
		usleep_range(ACQUIRE_WFIRED_INTVAL_USEC_MIN,
			     ACQUIRE_WFIRED_INTVAL_USEC_MAX);
}

int hmdfs_response_handle_sync(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	struct sendmsg_wait_queue *msg_info = NULL;
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_msg_idr_head *msg_head = NULL;
	u32 msg_id = le32_to_cpu(head->msg_id);
	bool woke = false;
	u8 cmd = head->operations.command;

	msg_head = hmdfs_find_msg_head(con, msg_id, head->operations);
	if (!msg_head)
		goto out;

	switch (msg_head->type) {
	case MSG_IDR_MESSAGE_SYNC:
		msg_info = (struct sendmsg_wait_queue *)msg_head;
		if (atomic_read(&msg_info->valid) == MSG_Q_SEND) {
			hmdfs_response_wakeup(msg_info,
					      le32_to_cpu(head->ret_code),
					      le32_to_cpu(head->data_len), buf);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL,
						 msg_info->start, jiffies);
			woke = true;
		}

		msg_put(msg_info);
		break;
	case MSG_IDR_MESSAGE_ASYNC:
		mp = (struct hmdfs_msg_parasite *)msg_head;

		hmdfs_wait_mp_wfired(mp);
		if (cancel_delayed_work(&mp->d_work)) {
			mp->resp.out_buf = buf;
			mp->resp.out_len =
				le32_to_cpu(head->data_len) - sizeof(*head);
			mp->resp.ret_code = le32_to_cpu(head->ret_code);
			queue_delayed_work(con->async_wq, &mp->d_work, 0);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL, mp->start,
						 jiffies);
			woke = true;
		}
		mp_put(mp);
		break;
	default:
		hmdfs_err("receive incorrect msg type %d msg_id %d cmd %d",
			  msg_head->type, msg_id, cmd);
		break;
	}

	if (likely(woke))
		return 0;
out:
	hmdfs_client_resp_statis(con->sbi, cmd, HMDFS_RESP_DELAY, 0, 0);
	hmdfs_info("cannot find msg_id %d cmd %d", msg_id, cmd);
	return -EINVAL;
}

static int hmdfs_response_recv(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	__u16 command = head->operations.command;
	int ret;

	if (command >= F_SIZE) {
		ret = -EINVAL;
		return ret;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_READPAGE:
	case F_WRITEPAGE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
		ret = hmdfs_response_handle_sync(con, head, buf);
		return ret;

	case F_ITERATE:
		ret = hmdfs_msg_handle_async(con, head, buf, con->async_wq,
					     hmdfs_file_response_work_fn);
		return ret;

	default:
		hmdfs_err("Fatal! Unexpected response command %d",
			  head->operations.command);
		ret = -EINVAL;
		return ret;
	}
}

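/*
 * Entry point for every message delivered by the transport. On any error
 * @buf is freed here; otherwise its ownership passes to the request or
 * response handlers.
 */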
void hmdfs_recv_mesg_callback(struct hmdfs_peer *con, void *head,
				     void *buf)
{
	struct hmdfs_head_cmd *hmdfs_head = (struct hmdfs_head_cmd *)head;

	trace_hmdfs_recv_mesg_callback(hmdfs_head);

	if (hmdfs_message_verify(con, hmdfs_head, buf) < 0) {
		hmdfs_info("Message %d has been abandoned", hmdfs_head->msg_id);
		goto out_err;
	}

	switch (hmdfs_head->operations.cmd_flag) {
	case C_REQUEST:
		if (hmdfs_request_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	case C_RESPONSE:
		if (hmdfs_response_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	default:
		hmdfs_err("Fatal! Unexpected msg cmd %d",
			  hmdfs_head->operations.cmd_flag);
		goto out_err;
	}
	return;

out_err:
	kfree(buf);
}

void hmdfs_wakeup_parasite(struct hmdfs_msg_parasite *mp)
{
	hmdfs_wait_mp_wfired(mp);
	if (!cancel_delayed_work(&mp->d_work))
		hmdfs_err("cancel parasite work err msg_id=%d cmd=%d",
			  mp->head.msg_id, mp->req.operations.command);
	else
		async_request_cb_on_wakeup_fn(&mp->d_work.work);
}

void hmdfs_wakeup_async_work(struct hmdfs_async_work *async_work)
{
	if (!cancel_delayed_work(&async_work->d_work))
		hmdfs_err("cancel async work err msg_id=%d",
			  async_work->head.msg_id);
	else
		hmdfs_recv_page_work_fn(&async_work->d_work.work);
}