1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <fcntl.h>
20
21 #include "loop_systest.h"
22 #include "loop_event.h"
23 #include "le_socket.h"
24 #include "le_task.h"
25 #include "list.h"
26
27 typedef struct {
28 uint16_t tlvLen;
29 uint16_t tlvType;
30 } Tlv;
31
32 typedef struct {
33 uint16_t tlvLen; // 对齐后的长度
34 uint16_t tlvType;
35 uint16_t dataLen; // 数据的长度
36 uint16_t dataType;
37 char tlvName[TLV_NAME_LEN];
38 } TlvExt;
39
40 typedef struct {
41 Message msgHdr;
42 Result result;
43 }ResponseMsg;
44
45 typedef struct ReceiverCtx_ {
46 uint32_t nextMsgId; // 校验消息id
47 uint32_t msgRecvLen; // 已经接收的长度
48 TimerHandle timer; // 测试消息完整
49 MsgNode *incompleteMsg; // 保存不完整的消息
50 } ReceiverCtx;
51
52 typedef struct MyTask_ {
53 TaskHandle stream;
54 int id;
55 ReceiverCtx ctx;
56 } MyTask;
57
58 typedef struct MyService_ {
59 TaskHandle serverTask;
60 struct ListNode head;
61 } MyService;
62
63 typedef struct MsgNode_ {
64 MyTask *task;
65 Message msgHeader;
66 uint32_t tlvCount;
67 uint32_t *tlvOffset;
68 uint8_t *buffer;
69 } MsgNode;
70
71 static MyService g_service = NULL;
72
MakeDirRec(const char *path, mode_t mode, int lastPath)73 int MakeDirRec(const char *path, mode_t mode, int lastPath)
74 {
75 if (path == NULL || *path == '\0') {
76 printf("Invalid path to create \n");
77 return -1;
78 }
79
80 char buffer[PATH_MAX] = {0};
81 const char slash = '/';
82 const char *p = path;
83 char *curPos = strchr(path, slash);
84 while (curPos != NULL) {
85 int len = curPos - p;
86 p = curPos + 1;
87 if (len == 0) {
88 curPos = strchr(p, slash);
89 continue;
90 }
91
92 int ret = memcpy_s(buffer, PATH_MAX, path, p - path - 1);
93 if (ret != 0) {
94 printf("Failed to copy path \n");
95 return -1;
96 }
97
98 ret = mkdir(buffer, mode);
99 if (ret == -1 && errno != EEXIST) {
100 return errno;
101 }
102 curPos = strchr(p, slash);
103 }
104
105 if (lastPath) {
106 if (mkdir(path, mode) == -1 && errno != EEXIST) {
107 return errno;
108 }
109 }
110 return 0;
111 }
112
SetFdCtrl(int fd, int opt)113 static inline void SetFdCtrl(int fd, int opt)
114 {
115 int option = fcntl(fd, F_GETFD);
116 int ret = fcntl(fd, F_SETFD, option | opt);
117 if (ret < 0) {
118 printf("Set fd %d option %d %d result: %d \n", fd, option, opt, errno);
119 }
120 }
121
CreatePipeServer(TaskHandle *server, const char *name)122 static int CreatePipeServer(TaskHandle *server, const char *name)
123 {
124 char path[128] = {0};
125 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name);
126 if (ret < 0) {
127 printf("Failed to snprintf_s %d \n", ret);
128 }
129 int socketId = GetControlSocket(name);
130 printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId);
131
132 LE_StreamInfo info = {};
133 info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER;
134 info.socketId = socketId;
135 info.server = server;
136 info.baseInfo.close = NULL;
137 info.incommingConnect = OnConnection;
138
139 MakeDirRec(path, DIR_MODE, 0);
140 ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info);
141 if (ret < 0) {
142 printf("Create server failed \n");
143 }
144 SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC);
145
146 printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server));
147 return 0;
148 }
149
CreateTcpServer(TaskHandle *server, const char *name)150 static int CreateTcpServer(TaskHandle *server, const char *name)
151 {
152 char path[128] = {0};
153 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name);
154 if (ret < 0) {
155 printf("Failed to snprintf_s %d \n", ret);
156 }
157 int socketId = GetControlSocket(name);
158 printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId);
159
160 LE_StreamInfo info = {};
161 info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER;
162 info.socketId = socketId;
163 info.server = server;
164 info.baseInfo.close = NULL;
165 info.incommingConnect = OnConnection;
166
167 MakeDirRec(path, DIR_MODE, 0);
168 ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info);
169 SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC);
170
171 printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server));
172 return ret;
173 }
174
StartTimerForCheckMsg(MyTask *task)175 static inline int StartTimerForCheckMsg(MyTask *task)
176 {
177 if (connection->receiverCtx.timer != NULL) {
178 return 0;
179 }
180 int ret = LE_CreateTimer(LE_GetDefaultLoop(), &connection->receiverCtx.timer, WaitMsgCompleteTimeOut, connection);
181 if (ret == 0) {
182 ret = LE_StartTimer(LE_GetDefaultLoop(), connection->receiverCtx.timer, MAX_WAIT_MSG_COMPLETE, 1);
183 }
184 return ret;
185 }
186
SendMessage(LoopHandle loop, TaskHandle task, const char *message)187 static int SendMessage(LoopHandle loop, TaskHandle task, const char *message)
188 {
189 if (message == NULL) {
190 printf("message is null \n");
191 return -1;
192 }
193 BufferHandle handle = NULL;
194 uint32_t bufferSize = strlen(message) + 1;
195 handle = LE_CreateBuffer(loop, bufferSize);
196 char *buff = (char *)LE_GetBufferInfo(handle, NULL, &bufferSize);
197 if (buff == NULL) {
198 printf("Failed get buffer info \n");
199 return -1;
200 }
201
202 int ret = memcpy_s(buff, bufferSize, message, strlen(message) + 1);
203 if (ret != 0) {
204 LE_FreeBuffer(loop, task, handle);
205 printf("Failed memcpy_s err=%d \n", errno);
206 return -1;
207 }
208
209 LE_STATUS status = LE_Send(loop, task, handle, strlen(message) + 1);
210 if (status != LE_SUCCESS) {
211 printf("Failed le_send msg \n");
212 return -1;
213 }
214 return 0;
215 }
216
OnConnection(const LoopHandle loop, const TaskHandle server)217 static int OnConnection(const LoopHandle loop, const TaskHandle server)
218 {
219 TaskHandle stream = NULL;
220 LE_StreamInfo info = {};
221 info.baseInfo.flags = flags;
222 info.baseInfo.close = OnClose;
223 info.baseInfo.userDataSize = sizeof(MyTask);
224 info.disConnectComplete = OnDisConnect;
225 info.sendMessageComplete = sendMessageComplete;
226 info.recvMessage = OnRecvMessage;
227
228 int ret = LE_AcceptStreamClient(loop, server, &stream, &info);
229 if (ret != 0) {
230 printf("Failed to accept stream \n");
231 }
232
233 MyTask *agent = (MyTask *)LE_GetUserData(stream);
234 // 收到数据后的处理
235 static uint32_t connectionId = 0;
236 agent->id = ++connectionId;
237 agent->stream = stream;
238 agent->ctx.timer = NULL;
239 agent->incompleteMsg = NULL;
240
241 printf("accept id: %d\n", agent->id);
242 ret = SendMessage(loop, agent->stream, "Connect Success.");
243 if (ret != 0) {
244 printf("Failed to send msg \n");
245 return -1;
246 }
247
248 return 0;
249 }
250
OnClose(const TaskHandle taskHandle)251 static void OnClose(const TaskHandle taskHandle)
252 {
253 MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
254 if (task == NULL) {
255 printf("Invalid task \n");
256 return;
257 }
258 }
259
OnDisConnect(const TaskHandle taskHandle)260 static void OnDisConnect(const TaskHandle taskHandle)
261 {
262 MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
263 if (task == NULL) {
264 printf("Invalid task \n");
265 return;
266 }
267 printf("task id: %d\n", task->id);
268 OnClose(taskHandle);
269 }
270
SendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)271 static void SendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)
272 {
273 MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
274 if (task == NULL) {
275 printf("Invalid task \n");
276 return;
277 }
278 uint32_t bufferSize = sizeof(Message);
279 Message *msg = (Message *)LE_GetBufferInfo(handle, NULL, &bufferSize);
280 if (msg == NULL) {
281 return;
282 }
283 printf("SendMessageComplete taskId: %u msgId %u msgType %u buf %s",
284 task->id, msg->msgId, msg->msgType, msg->buffer);
285 }
286
ServerInit(const char *server, LoopHandle loop, int flags)287 void ServerInit(const char *server, LoopHandle loop, int flags)
288 {
289 if (server == NULL || loop == NULL) {
290 printf("Invalid parameter\n");
291 return;
292 }
293
294 OH_ListInit(&g_service.head);
295 LE_StreamServerInfo info = {};
296 info.baseInfo.flags = flags;
297 info.baseInfo.close = NULL;
298 info.socketId = -1;
299 info.server = server;
300 info.disConnectComplete = NULL;
301 info.incomingConnect = OnConnection;
302 info.sendMessageComplete = NULL;
303 info.recvMessage = NULL;
304
305 int ret = LE_CreateStreamServer(loop, &g_service.serverTask, &info);
306 if (ret != 0) {
307 printf("Init server failed.\n");
308 }
309 }
310
MsgRebuild(MsgNode *message, const Message *msg)311 static int MsgRebuild(MsgNode *message, const Message *msg)
312 {
313 if (CheckMsg(&message->msgHeader) != 0) {
314 return MSG_INVALID;
315 }
316 if (msg->msgLen == sizeof(message->msgHeader)) { // only has msg header
317 return 0;
318 }
319 if (message->buffer == NULL) {
320 message->buffer = calloc(1, msg->msgLen - sizeof(message->msgHeader));
321 if (message->buffer == NULL) {
322 printf("Failed to alloc memory for recv message \n");
323 return -1;
324 }
325 }
326 if (message->tlvOffset == NULL) {
327 uint32_t totalCount = msg->tlvCount + TLV_MAX;
328 message->tlvOffset = malloc(totalCount * sizeof(uint32_t));
329 if (message->tlvOffset == NULL) {
330 printf("Failed to alloc memory for recv message \n");
331 return -1;
332 }
333 for (uint32_t i = 0; i < totalCount; i++) {
334 message->tlvOffset[i] = INVALID_OFFSET;
335 }
336 }
337 return 0;
338 }
339
GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen, Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder)340 int GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen,
341 Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder)
342 {
343 if (buffer == NULL || outMsg == NULL || msgRecvLen == NULL || reminder == NULL) {
344 return MSG_INVALID;
345 }
346
347 *reminder = 0;
348 Message *message = *outMsg;
349 if (message == NULL) {
350 message = CreateMessage();
351 if (message == NULL) {
352 printf("Failed to create message \n");
353 return SYSTEM_ERROR;
354 }
355 *outMsg = message;
356 }
357
358 uint32_t reminderLen = bufferLen;
359 const uint8_t *reminderBuffer = buffer;
360 if (*msgRecvLen < sizeof(Message)) { // recv partial message
361 if ((bufferLen + *msgRecvLen) >= sizeof(Message)) {
362 int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen,
363 sizeof(message->msgHeader) - *msgRecvLen,
364 buffer, sizeof(message->msgHeader) - *msgRecvLen);
365 if (ret != 0) {
366 printf("Failed to copy recv buffer \n");
367 return -1;
368 }
369
370 ret = MsgRebuild(message, &message->msgHeader);
371 if (ret != 0) {
372 printf("Failed to alloc buffer for receive msg \n");
373 return -1;
374 }
375 reminderLen = bufferLen - (sizeof(message->msgHeader) - *msgRecvLen);
376 reminderBuffer = buffer + sizeof(message->msgHeader) - *msgRecvLen;
377 *msgRecvLen = sizeof(message->msgHeader);
378 } else {
379 int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen,
380 sizeof(message->msgHeader) - *msgRecvLen, buffer, bufferLen);
381 if (ret != 0) {
382 printf("Failed to copy recv buffer \n");
383 return -1;
384 }
385 *msgRecvLen += bufferLen;
386 return 0;
387 }
388 }
389 return 0;
390 }
391
DecodeMsg(Message * message)392 int DecodeMsg(Message * message)
393 {
394 if (message == NULL) {
395 printf("decode empty message, failed! \n");
396 return -1;
397 }
398 /*
399 写解析消息的逻辑
400 */
401 return 0;
402 }
403
ProcessTerminationStatusMsg(const MsgNode *message, Result *result)404 int ProcessTerminationStatusMsg(const MsgNode *message, Result *result)
405 {
406 if (message == NULL || result == NULL) {
407 return -1;
408 }
409
410 result->result = -1;
411 result->pid = 0;
412 return 0;
413 }
414
SendResponse(const MyTask *task, const Message *msg, int result, pid_t pid)415 static int SendResponse(const MyTask *task, const Message *msg, int result, pid_t pid)
416 {
417 printf("SendResponse connectionId: %u result: 0x%x pid: %d \n",
418 task->id, result, pid);
419 uint32_t bufferSize = sizeof(ResponseMsg);
420 BufferHandle handle = LE_CreateBuffer(LE_GetDefaultLoop(), bufferSize);
421 ResponseMsg *buffer = (ResponseMsg *)LE_GetBufferInfo(handle, NULL, &bufferSize);
422 int ret = memcpy_s(buffer, bufferSize, msg, sizeof(Message));
423 if (ret != 0) {
424 LE_FreeBuffer(LE_GetDefaultLoop(), NULL, handle);
425 printf("Failed to memcpy_s bufferSize \n");
426 return -1;
427 }
428
429 buffer->result.result = result;
430 buffer->result.pid = pid;
431 return LE_Send(LE_GetDefaultLoop(), task->stream, handle, bufferSize);
432 }
433
DeleteMsg(MsgNode *msgNode)434 void DeleteMsg(MsgNode *msgNode)
435 {
436 if (msgNode == NULL) {
437 return;
438 }
439 if (msgNode->buffer) {
440 free(msgNode->buffer);
441 msgNode->buffer = NULL;
442 }
443 if (msgNode->tlvOffset) {
444 free(msgNode->tlvOffset);
445 msgNode->tlvOffset = NULL;
446 }
447 free(msgNode);
448 }
449
CheckMsg(const MsgNode *message)450 int CheckMsg(const MsgNode *message)
451 {
452 if (message == NULL) {
453 return MSG_INVALID;
454 }
455 if (strlen(message->msgHeader.processName) <= 0) {
456 printf("Invalid property processName %s \n", message->msgHeader.buffer);
457 return MSG_INVALID;
458 }
459 if (message->tlvOffset == NULL) {
460 printf("Invalid property tlv offset %s \n", message->msgHeader.buffer);
461 return MSG_INVALID;
462 }
463 if (message->buffer == NULL) {
464 printf("Invalid property buffer %s \n", message->msgHeader.buffer);
465 return MSG_INVALID;
466 }
467
468 if (message->tlvOffset[TLV_BUNDLE_INFO] == INVALID_OFFSET ||
469 message->tlvOffset[TLV_MSG_FLAGS] == INVALID_OFFSET ||
470 message->tlvOffset[TLV_ACCESS_TOKEN_INFO] == INVALID_OFFSET ||
471 message->tlvOffset[TLV_DOMAIN_INFO] == INVALID_OFFSET ||
472 message->tlvOffset[TLV_DAC_INFO] == INVALID_OFFSET) {
473 printf("No must tlv bundle: %u flags: %u token: %u domain %u %u \n",
474 message->tlvOffset[TLV_BUNDLE_INFO], message->tlvOffset[TLV_MSG_FLAGS],
475 message->tlvOffset[TLV_ACCESS_TOKEN_INFO],
476 message->tlvOffset[TLV_DOMAIN_INFO], message->tlvOffset[TLV_DAC_INFO]);
477 return MSG_INVALID;
478 }
479
480 return 0;
481 }
482
ProcessReqMsg(MyTask *task, MsgNode *message)483 static void ProcessReqMsg(MyTask *task, MsgNode *message)
484 {
485 int ret = CheckMsg(message);
486 if (ret != 0) {
487 SendResponse(task, &message->msgHeader, ret, 0);
488 DeleteMsg(message);
489 return;
490 }
491
492 message->task = task;
493 }
494
GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len)495 void *GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len)
496 {
497 if (name == NULL) {
498 printf("Invalid name \n");
499 return NULL;
500 }
501 if (message == NULL || message->buffer == NULL || message->tlvOffset == NULL) {
502 return NULL;
503 }
504 printf("GetMsgExtInfo tlvCount %d name %s \n", message->tlvCount, name);
505
506 for (uint32_t index = TLV_MAX; index < (TLV_MAX + message->tlvCount); index++) {
507 if (message->tlvOffset[index] == INVALID_OFFSET) {
508 return NULL;
509 }
510 uint8_t *data = message->buffer + message->tlvOffset[index];
511 if (((Tlv *)data)->tlvType != TLV_MAX) {
512 continue;
513 }
514 TlvExt *tlv = (TlvExt *)data;
515 if (strcmp(tlv->tlvName, name) != 0) {
516 continue;
517 }
518 if (len != NULL) {
519 *len = tlv->dataLen;
520 }
521 return data + sizeof(TlvExt);
522 }
523 return NULL;
524 }
525
RebuildMsgNode(MsgNode *message, Process *info)526 MsgNode *RebuildMsgNode(MsgNode *message, Process *info)
527 {
528 #ifdef DEBUG_BEGETCTL_BOOT
529 if (message == NULL || info == NULL) {
530 printf("params is null \n");
531 return NULL;
532 }
533
534 uint32_t bufferLen = 0;
535 MsgNode *node = CreateMsg();
536 if (node == NULL) {
537 printf("Failed to create MsgNode \n");
538 return NULL;
539 }
540
541 int ret = memcpy_s(&node->msgHeader, sizeof(Message), &message->msgHeader, sizeof(Message));
542 if (ret != 0) {
543 printf("Failed to memcpy_s node->msgHeader \n");
544 return NULL;
545 }
546
547 bufferLen = message->msgHeader.msgLen + info->message->msgHeader.msgLen - sizeof(Message);
548 node->msgHeader.msgLen = bufferLen;
549 node->msgHeader.msgType = MSG_NATIVE_PROCESS;
550 node->msgHeader.tlvCount += message->msgHeader.tlvCount;
551 ret = MsgRebuild(node, &node->msgHeader);
552 if (ret != 0) {
553 DeleteMsg(node);
554 printf("Failed to alloc memory for recv message \n");
555 return NULL;
556 }
557
558 uint32_t infoBufLen = info->message->msgHeader.msgLen - sizeof(Message);
559 uint32_t msgBufLen = message->msgHeader.msgLen - sizeof(Message);
560 ret = memcpy_s(node->buffer, bufferLen, info->message->buffer, infoBufLen);
561 if (ret != 0) {
562 DeleteMsg(node);
563 printf("Failed to memcpy_s info buffer \n");
564 return NULL;
565 }
566 ret = memcpy_s(node->buffer + infoBufLen, bufferLen - infoBufLen, message->buffer, msgBufLen);
567 if (ret != 0) {
568 DeleteMsg(node);
569 printf("Failed to memcpy_s message->buffer \n");
570 return NULL;
571 }
572 return node;
573 #endif
574 return NULL;
575 }
576
ProcessBegetctlMsg(MyTask *task, MsgNode *message)577 static MsgNode *ProcessBegetctlMsg(MyTask *task, MsgNode *message)
578 {
579 uint32_t len = 0;
580 const char *msg = (const char *)GetMsgExtInfo(message, MSG_EXT_NAME_BEGET_PID, &len);
581 if (msg == NULL) {
582 printf("Failed to get extInfo \n");
583 return NULL;
584 }
585
586 MsgNode *msgNode = RebuildMsgNode(message, info);
587 if (msgNode == NULL) {
588 printf("Failed to rebuild message node \n");
589 return NULL;
590 }
591
592 int ret = DecodeMsg(msgNode);
593 if (ret != 0) {
594 DeletenMsg(msgNode);
595 return NULL;
596 }
597 return msgNode;
598 }
599
ProcessBegetMsg(MyTask *task, MsgNode *message)600 static void ProcessBegetMsg(MyTask *task, MsgNode *message)
601 {
602 Message *msg = &message->msgHeader;
603
604 MsgNode *msgNode = ProcessBegetctlMsg(connection, message);
605 if (msgNode == NULL) {
606 SendResponse(task, msg, DEBUG_MODE_NOT_SUPPORT, 0);
607 DeleteMsg(message);
608 return;
609 }
610 ProcessReqMsg(task, msgNode);
611 DeleteMsg(message);
612 DeleteMsg(msgNode);
613 }
614
ProcessRecvMsg(MyTask *task, MsgNode *message)615 static void ProcessRecvMsg(MyTask *task, MsgNode *message)
616 {
617 Message *msg = &message->msgHeader;
618 printf("Recv message header magic 0x%x type %u id %u len %u %s \n",
619 msg->magic, msg->msgType, msg->msgId, msg->msgLen, msg->buffer);
620 if (task->receiverCtx.nextMsgId != msg->msgId) {
621 printf("Invalid msg id %u %u \n", task->receiverCtx.nextMsgId, msg->msgId);
622 }
623 task->receiverCtx.nextMsgId++;
624
625 int ret;
626 switch (msg->msgType) {
627 case MSG_GET_RENDER_TERMINATION_STATUS: { // get status
628 Result result = {0};
629 ret = ProcessTerminationStatusMsg(message, &result);
630 SendResponse(task, msg, ret == 0 ? result.result : ret, result.pid);
631 DeleteMessage(message);
632 break;
633 }
634 case MSG_NORMAL: {
635 ProcessReqMsg(task, message);
636 break;
637 }
638 case MSG_DUMP:
639 SendResponse(task, msg, 0, 0);
640 DeleteMessage(message);
641 break;
642 case MSG_BEGET: {
643 ProcessBegetMsg(task, message);
644 break;
645 }
646 case MSG_BEGET_TIME:
647 SendResponse(task, msg, MIN_TIME, MAX_TIME);
648 DeleteMessage(message);
649 break;
650 case MSG_UPDATE_MOUNT_POINTS:
651 ret = ProcessRemountMsg(task, message);
652 SendResponse(task, msg, ret, 0);
653 break;
654 case MSG_RESTART:
655 ProcessRestartMsg(task, message);
656 break;
657 default:
658 SendResponse(task, msg, MSG_INVALID, 0);
659 DeleteMessage(message);
660 break;
661 }
662 }
663
OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer, uint32_t buffLen)664 static void OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer, uint32_t buffLen)
665 {
666 MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
667 if (task == NULL) {
668 printf("Failed to get client form socket\n");
669 LE_CloseTask(LE_GetDefaultLoop(), taskHandle);
670 return;
671 }
672
673 if (buffLen >= MAX_MSG_TOTAL_LENGTH) {
674 printf("Message too long %u \n", buffLen);
675 LE_CloseTask(LE_GetDefaultLoop(), taskHandle);
676 return;
677 }
678
679 uint32_t reminder = 0;
680 uint32_t currLen = 0;
681 Message *message = task->ctx.incompleteMsg; // incomplete msg
682 task->ctx.incompleteMsg = NULL;
683 int ret = 0;
684 do {
685 printf("OnReceiveRequest connectionId: %u start: 0x%x buffLen %d",
686 task->id, *(uint32_t *)(buffer + currLen), buffLen - currLen);
687
688 ret = GetMsgFromBuffer(buffer + currLen, buffLen - currLen,
689 &message, &task->ctx.msgRecvLen, &reminder);
690 if (ret != 0) {
691 break;
692 }
693
694 if (task->ctx.msgRecvLen != message->msgLen) { // recv complete msg
695 task->ctx.incompleteMsg = message;
696 message = NULL;
697 break;
698 }
699 task->ctx.msgRecvLen = 0;
700 if (task->ctx.timer) {
701 LE_StopTimer(LE_GetDefaultLoop(), task->ctx.timer);
702 task->ctx.timer = NULL;
703 }
704 // decode msg
705 ret = DecodeMsg(message);
706 if (ret != 0) {
707 break;
708 }
709 (void)ProcessRecvMsg(task, message);
710 message = NULL;
711 currLen += buffLen - reminder;
712 } while (reminder > 0);
713
714 if (task->ctx.incompleteMsg != NULL) { // Start the detection timer
715 ret = StartTimerForCheckMsg(task);
716 if (ret != 0) {
717 LE_CloseStreamTask(LE_GetDefaultLoop(), taskHandle);
718 }
719 }
720 }
721
main(int argc, char *const argv[])722 int main(int argc, char *const argv[])
723 {
724 printf("main argc: %d \n", argc);
725 if (argc <= 0) {
726 return 0;
727 }
728
729 printf("请输入创建socket的类型:(pipe, tcp)\n");
730 char type[128];
731 int ret = scanf_s("%s", type, sizeof(type));
732 if (ret <= 0) {
733 printf("input error \n");
734 return 0;
735 }
736
737 int flags;
738 char *server;
739 if (strcmp(type, "pipe") == 0) {
740 flags = TASK_STREAM | TASK_PIPE |TASK_SERVER | TASK_TEST;
741 server = (char *)"/data/testpipe";
742 } else if (strcmp(type, "tcp") == 0) {
743 flags = TASK_STREAM | TASK_TCP |TASK_SERVER | TASK_TEST;
744 server = (char *)"127.0.0.1:7777";
745 } else {
746 printf("输入有误,请输入pipe或者tcp!");
747 system("pause");
748 return 0;
749 }
750
751 return 0;
752 }