1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <fcntl.h>
20 
21 #include "loop_systest.h"
22 #include "loop_event.h"
23 #include "le_socket.h"
24 #include "le_task.h"
25 #include "list.h"
26 
27 typedef struct {
28     uint16_t tlvLen;
29     uint16_t tlvType;
30 } Tlv;
31 
32 typedef struct {
33     uint16_t tlvLen;  // 对齐后的长度
34     uint16_t tlvType;
35     uint16_t dataLen;  // 数据的长度
36     uint16_t dataType;
37     char tlvName[TLV_NAME_LEN];
38 } TlvExt;
39 
40 typedef struct {
41     Message msgHdr;
42     Result result;
43 }ResponseMsg;
44 
45 typedef struct ReceiverCtx_ {
46     uint32_t nextMsgId;              // 校验消息id
47     uint32_t msgRecvLen;             // 已经接收的长度
48     TimerHandle timer;               // 测试消息完整
49     MsgNode *incompleteMsg;          // 保存不完整的消息
50 } ReceiverCtx;
51 
52 typedef struct MyTask_ {
53     TaskHandle stream;
54     int id;
55     ReceiverCtx ctx;
56 } MyTask;
57 
58 typedef struct MyService_ {
59     TaskHandle serverTask;
60     struct ListNode head;
61 } MyService;
62 
63 typedef struct MsgNode_ {
64     MyTask *task;
65     Message msgHeader;
66     uint32_t tlvCount;
67     uint32_t *tlvOffset;
68     uint8_t *buffer;
69 } MsgNode;
70 
71 static MyService g_service = NULL;
72 
MakeDirRec(const char *path, mode_t mode, int lastPath)73 int MakeDirRec(const char *path, mode_t mode, int lastPath)
74 {
75     if (path == NULL || *path == '\0') {
76         printf("Invalid path to create \n");
77         return -1;
78     }
79 
80     char buffer[PATH_MAX] = {0};
81     const char slash = '/';
82     const char *p = path;
83     char *curPos = strchr(path, slash);
84     while (curPos != NULL) {
85         int len = curPos - p;
86         p = curPos + 1;
87         if (len == 0) {
88             curPos = strchr(p, slash);
89             continue;
90         }
91 
92         int ret = memcpy_s(buffer, PATH_MAX, path, p - path - 1);
93         if (ret != 0) {
94             printf("Failed to copy path \n");
95             return -1;
96         }
97 
98         ret = mkdir(buffer, mode);
99         if (ret == -1 && errno != EEXIST) {
100             return errno;
101         }
102         curPos = strchr(p, slash);
103     }
104 
105     if (lastPath) {
106         if (mkdir(path, mode) == -1 && errno != EEXIST) {
107             return errno;
108         }
109     }
110     return 0;
111 }
112 
SetFdCtrl(int fd, int opt)113 static inline void SetFdCtrl(int fd, int opt)
114 {
115     int option = fcntl(fd, F_GETFD);
116     int ret = fcntl(fd, F_SETFD, option | opt);
117     if (ret < 0) {
118         printf("Set fd %d option %d %d result: %d \n", fd, option, opt, errno);
119     }
120 }
121 
CreatePipeServer(TaskHandle *server, const char *name)122 static int CreatePipeServer(TaskHandle *server, const char *name)
123 {
124     char path[128] = {0};
125     int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name);
126     if (ret < 0) {
127         printf("Failed to snprintf_s %d \n", ret);
128     }
129     int socketId = GetControlSocket(name);
130     printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId);
131 
132     LE_StreamInfo info = {};
133     info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER;
134     info.socketId = socketId;
135     info.server = server;
136     info.baseInfo.close = NULL;
137     info.incommingConnect = OnConnection;
138 
139     MakeDirRec(path, DIR_MODE, 0);
140     ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info);
141     if (ret < 0) {
142         printf("Create server failed \n");
143     }
144     SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC);
145 
146     printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server));
147     return 0;
148 }
149 
CreateTcpServer(TaskHandle *server, const char *name)150 static int CreateTcpServer(TaskHandle *server, const char *name)
151 {
152     char path[128] = {0};
153     int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name);
154     if (ret < 0) {
155         printf("Failed to snprintf_s %d \n", ret);
156     }
157     int socketId = GetControlSocket(name);
158     printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId);
159 
160     LE_StreamInfo info = {};
161     info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER;
162     info.socketId = socketId;
163     info.server = server;
164     info.baseInfo.close = NULL;
165     info.incommingConnect = OnConnection;
166 
167     MakeDirRec(path, DIR_MODE, 0);
168     ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info);
169     SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC);
170 
171     printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server));
172     return ret;
173 }
174 
StartTimerForCheckMsg(MyTask *task)175 static inline int StartTimerForCheckMsg(MyTask *task)
176 {
177     if (connection->receiverCtx.timer != NULL) {
178         return 0;
179     }
180     int ret = LE_CreateTimer(LE_GetDefaultLoop(), &connection->receiverCtx.timer, WaitMsgCompleteTimeOut, connection);
181     if (ret == 0) {
182         ret = LE_StartTimer(LE_GetDefaultLoop(), connection->receiverCtx.timer, MAX_WAIT_MSG_COMPLETE, 1);
183     }
184     return ret;
185 }
186 
SendMessage(LoopHandle loop, TaskHandle task, const char *message)187 static int SendMessage(LoopHandle loop, TaskHandle task, const char *message)
188 {
189     if (message == NULL) {
190         printf("message is null \n");
191         return -1;
192     }
193     BufferHandle handle = NULL;
194     uint32_t bufferSize = strlen(message) + 1;
195     handle = LE_CreateBuffer(loop, bufferSize);
196     char *buff = (char *)LE_GetBufferInfo(handle, NULL, &bufferSize);
197     if (buff == NULL) {
198         printf("Failed get buffer info \n");
199         return -1;
200     }
201 
202     int ret = memcpy_s(buff, bufferSize, message, strlen(message) + 1);
203     if (ret != 0) {
204         LE_FreeBuffer(loop, task, handle);
205         printf("Failed memcpy_s err=%d \n", errno);
206         return -1;
207     }
208 
209     LE_STATUS status = LE_Send(loop, task, handle, strlen(message) + 1);
210     if (status != LE_SUCCESS) {
211         printf("Failed le_send msg \n");
212         return -1;
213     }
214     return 0;
215 }
216 
OnConnection(const LoopHandle loop, const TaskHandle server)217 static int OnConnection(const LoopHandle loop, const TaskHandle server)
218 {
219     TaskHandle stream = NULL;
220     LE_StreamInfo info = {};
221     info.baseInfo.flags = flags;
222     info.baseInfo.close = OnClose;
223     info.baseInfo.userDataSize = sizeof(MyTask);
224     info.disConnectComplete = OnDisConnect;
225     info.sendMessageComplete = sendMessageComplete;
226     info.recvMessage = OnRecvMessage;
227 
228     int ret = LE_AcceptStreamClient(loop, server, &stream, &info);
229     if (ret != 0) {
230         printf("Failed to accept stream \n");
231     }
232 
233     MyTask *agent = (MyTask *)LE_GetUserData(stream);
234     // 收到数据后的处理
235     static uint32_t connectionId = 0;
236     agent->id = ++connectionId;
237     agent->stream = stream;
238     agent->ctx.timer = NULL;
239     agent->incompleteMsg = NULL;
240 
241     printf("accept id: %d\n", agent->id);
242     ret = SendMessage(loop, agent->stream, "Connect Success.");
243     if (ret != 0) {
244         printf("Failed to send msg \n");
245         return -1;
246     }
247 
248     return 0;
249 }
250 
OnClose(const TaskHandle taskHandle)251 static void OnClose(const TaskHandle taskHandle)
252 {
253     MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
254     if (task == NULL) {
255         printf("Invalid task \n");
256         return;
257     }
258 }
259 
OnDisConnect(const TaskHandle taskHandle)260 static void OnDisConnect(const TaskHandle taskHandle)
261 {
262     MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
263     if (task == NULL) {
264         printf("Invalid task \n");
265         return;
266     }
267     printf("task id: %d\n", task->id);
268     OnClose(taskHandle);
269 }
270 
SendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)271 static void SendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)
272 {
273     MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
274     if (task == NULL) {
275         printf("Invalid task \n");
276         return;
277     }
278     uint32_t bufferSize = sizeof(Message);
279     Message *msg = (Message *)LE_GetBufferInfo(handle, NULL, &bufferSize);
280     if (msg == NULL) {
281         return;
282     }
283     printf("SendMessageComplete taskId: %u msgId %u msgType %u buf %s",
284         task->id, msg->msgId, msg->msgType, msg->buffer);
285 }
286 
ServerInit(const char *server, LoopHandle loop, int flags)287 void ServerInit(const char *server, LoopHandle loop, int flags)
288 {
289     if (server == NULL || loop == NULL) {
290         printf("Invalid parameter\n");
291         return;
292     }
293 
294     OH_ListInit(&g_service.head);
295     LE_StreamServerInfo info = {};
296     info.baseInfo.flags = flags;
297     info.baseInfo.close = NULL;
298     info.socketId = -1;
299     info.server = server;
300     info.disConnectComplete = NULL;
301     info.incomingConnect = OnConnection;
302     info.sendMessageComplete = NULL;
303     info.recvMessage = NULL;
304 
305     int ret = LE_CreateStreamServer(loop, &g_service.serverTask, &info);
306     if (ret != 0) {
307         printf("Init server failed.\n");
308     }
309 }
310 
MsgRebuild(MsgNode *message, const Message *msg)311 static int MsgRebuild(MsgNode *message, const Message *msg)
312 {
313     if (CheckMsg(&message->msgHeader) != 0) {
314         return MSG_INVALID;
315     }
316     if (msg->msgLen == sizeof(message->msgHeader)) {  // only has msg header
317         return 0;
318     }
319     if (message->buffer == NULL) {
320         message->buffer = calloc(1, msg->msgLen - sizeof(message->msgHeader));
321         if (message->buffer == NULL) {
322             printf("Failed to alloc memory for recv message \n");
323             return -1;
324         }
325     }
326     if (message->tlvOffset == NULL) {
327         uint32_t totalCount = msg->tlvCount + TLV_MAX;
328         message->tlvOffset = malloc(totalCount * sizeof(uint32_t));
329         if (message->tlvOffset == NULL) {
330             printf("Failed to alloc memory for recv message \n");
331             return -1;
332         }
333         for (uint32_t i = 0; i < totalCount; i++) {
334             message->tlvOffset[i] = INVALID_OFFSET;
335         }
336     }
337     return 0;
338 }
339 
GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen, Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder)340 int GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen,
341     Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder)
342 {
343     if (buffer == NULL || outMsg == NULL || msgRecvLen == NULL || reminder == NULL) {
344         return MSG_INVALID;
345     }
346 
347     *reminder = 0;
348     Message *message = *outMsg;
349     if (message == NULL) {
350         message = CreateMessage();
351         if (message == NULL) {
352             printf("Failed to create message \n");
353             return SYSTEM_ERROR;
354         }
355         *outMsg = message;
356     }
357 
358     uint32_t reminderLen = bufferLen;
359     const uint8_t *reminderBuffer = buffer;
360     if (*msgRecvLen < sizeof(Message)) {  // recv partial message
361         if ((bufferLen + *msgRecvLen) >= sizeof(Message)) {
362             int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen,
363                 sizeof(message->msgHeader) - *msgRecvLen,
364                 buffer, sizeof(message->msgHeader) - *msgRecvLen);
365             if (ret != 0) {
366                 printf("Failed to copy recv buffer \n");
367                 return -1;
368             }
369 
370             ret = MsgRebuild(message, &message->msgHeader);
371             if (ret != 0) {
372                 printf("Failed to alloc buffer for receive msg \n");
373                 return -1;
374             }
375             reminderLen = bufferLen - (sizeof(message->msgHeader) - *msgRecvLen);
376             reminderBuffer = buffer + sizeof(message->msgHeader) - *msgRecvLen;
377             *msgRecvLen = sizeof(message->msgHeader);
378         } else {
379             int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen,
380                 sizeof(message->msgHeader) - *msgRecvLen, buffer, bufferLen);
381             if (ret != 0) {
382                 printf("Failed to copy recv buffer \n");
383                 return -1;
384             }
385             *msgRecvLen += bufferLen;
386             return 0;
387         }
388     }
389     return 0;
390 }
391 
DecodeMsg(Message * message)392 int DecodeMsg(Message * message)
393 {
394     if (message == NULL) {
395         printf("decode empty message, failed! \n");
396         return -1;
397     }
398     /*
399         写解析消息的逻辑
400     */
401     return 0;
402 }
403 
ProcessTerminationStatusMsg(const MsgNode *message, Result *result)404 int ProcessTerminationStatusMsg(const MsgNode *message, Result *result)
405 {
406     if (message == NULL || result == NULL) {
407         return -1;
408     }
409 
410     result->result = -1;
411     result->pid = 0;
412     return 0;
413 }
414 
SendResponse(const MyTask *task, const Message *msg, int result, pid_t pid)415 static int SendResponse(const MyTask *task, const Message *msg, int result, pid_t pid)
416 {
417     printf("SendResponse connectionId: %u result: 0x%x pid: %d \n",
418         task->id, result, pid);
419     uint32_t bufferSize = sizeof(ResponseMsg);
420     BufferHandle handle = LE_CreateBuffer(LE_GetDefaultLoop(), bufferSize);
421     ResponseMsg *buffer = (ResponseMsg *)LE_GetBufferInfo(handle, NULL, &bufferSize);
422     int ret = memcpy_s(buffer, bufferSize, msg, sizeof(Message));
423     if (ret != 0) {
424         LE_FreeBuffer(LE_GetDefaultLoop(), NULL, handle);
425         printf("Failed to memcpy_s bufferSize \n");
426         return -1;
427     }
428 
429     buffer->result.result = result;
430     buffer->result.pid = pid;
431     return LE_Send(LE_GetDefaultLoop(), task->stream, handle, bufferSize);
432 }
433 
DeleteMsg(MsgNode *msgNode)434 void DeleteMsg(MsgNode *msgNode)
435 {
436     if (msgNode == NULL) {
437         return;
438     }
439     if (msgNode->buffer) {
440         free(msgNode->buffer);
441         msgNode->buffer = NULL;
442     }
443     if (msgNode->tlvOffset) {
444         free(msgNode->tlvOffset);
445         msgNode->tlvOffset = NULL;
446     }
447     free(msgNode);
448 }
449 
CheckMsg(const MsgNode *message)450 int CheckMsg(const MsgNode *message)
451 {
452     if (message == NULL) {
453         return MSG_INVALID;
454     }
455     if (strlen(message->msgHeader.processName) <= 0) {
456         printf("Invalid property processName %s \n", message->msgHeader.buffer);
457         return MSG_INVALID;
458     }
459     if (message->tlvOffset == NULL) {
460         printf("Invalid property tlv offset %s \n", message->msgHeader.buffer);
461         return MSG_INVALID;
462     }
463     if (message->buffer == NULL) {
464         printf("Invalid property buffer %s \n", message->msgHeader.buffer);
465         return MSG_INVALID;
466     }
467 
468     if (message->tlvOffset[TLV_BUNDLE_INFO] == INVALID_OFFSET ||
469         message->tlvOffset[TLV_MSG_FLAGS] == INVALID_OFFSET ||
470         message->tlvOffset[TLV_ACCESS_TOKEN_INFO] == INVALID_OFFSET ||
471         message->tlvOffset[TLV_DOMAIN_INFO] == INVALID_OFFSET ||
472         message->tlvOffset[TLV_DAC_INFO] == INVALID_OFFSET) {
473         printf("No must tlv bundle: %u flags: %u token: %u domain %u %u \n",
474             message->tlvOffset[TLV_BUNDLE_INFO], message->tlvOffset[TLV_MSG_FLAGS],
475             message->tlvOffset[TLV_ACCESS_TOKEN_INFO],
476             message->tlvOffset[TLV_DOMAIN_INFO], message->tlvOffset[TLV_DAC_INFO]);
477         return MSG_INVALID;
478     }
479 
480     return 0;
481 }
482 
ProcessReqMsg(MyTask *task, MsgNode *message)483 static void ProcessReqMsg(MyTask *task, MsgNode *message)
484 {
485     int ret = CheckMsg(message);
486     if (ret != 0) {
487         SendResponse(task, &message->msgHeader, ret, 0);
488         DeleteMsg(message);
489         return;
490     }
491 
492     message->task = task;
493 }
494 
GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len)495 void *GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len)
496 {
497     if (name == NULL) {
498         printf("Invalid name \n");
499         return NULL;
500     }
501     if (message == NULL || message->buffer == NULL || message->tlvOffset == NULL) {
502         return NULL;
503     }
504     printf("GetMsgExtInfo tlvCount %d name %s \n", message->tlvCount, name);
505 
506     for (uint32_t index = TLV_MAX; index < (TLV_MAX + message->tlvCount); index++) {
507         if (message->tlvOffset[index] == INVALID_OFFSET) {
508             return NULL;
509         }
510         uint8_t *data = message->buffer + message->tlvOffset[index];
511         if (((Tlv *)data)->tlvType != TLV_MAX) {
512             continue;
513         }
514         TlvExt *tlv = (TlvExt *)data;
515         if (strcmp(tlv->tlvName, name) != 0) {
516             continue;
517         }
518         if (len != NULL) {
519             *len = tlv->dataLen;
520         }
521         return data + sizeof(TlvExt);
522     }
523     return NULL;
524 }
525 
RebuildMsgNode(MsgNode *message, Process *info)526 MsgNode *RebuildMsgNode(MsgNode *message, Process *info)
527 {
528 #ifdef DEBUG_BEGETCTL_BOOT
529     if (message == NULL || info == NULL) {
530         printf("params is null \n");
531         return NULL;
532     }
533 
534     uint32_t bufferLen = 0;
535     MsgNode *node = CreateMsg();
536     if (node == NULL) {
537         printf("Failed to create MsgNode \n");
538         return NULL;
539     }
540 
541     int ret = memcpy_s(&node->msgHeader, sizeof(Message), &message->msgHeader, sizeof(Message));
542     if (ret != 0) {
543         printf("Failed to memcpy_s node->msgHeader \n");
544         return NULL;
545     }
546 
547     bufferLen = message->msgHeader.msgLen + info->message->msgHeader.msgLen - sizeof(Message);
548     node->msgHeader.msgLen = bufferLen;
549     node->msgHeader.msgType = MSG_NATIVE_PROCESS;
550     node->msgHeader.tlvCount += message->msgHeader.tlvCount;
551     ret = MsgRebuild(node, &node->msgHeader);
552     if (ret != 0) {
553         DeleteMsg(node);
554         printf("Failed to alloc memory for recv message \n");
555         return NULL;
556     }
557 
558     uint32_t infoBufLen = info->message->msgHeader.msgLen - sizeof(Message);
559     uint32_t msgBufLen = message->msgHeader.msgLen - sizeof(Message);
560     ret = memcpy_s(node->buffer, bufferLen, info->message->buffer, infoBufLen);
561     if (ret != 0) {
562         DeleteMsg(node);
563         printf("Failed to memcpy_s info buffer \n");
564         return NULL;
565     }
566     ret = memcpy_s(node->buffer + infoBufLen, bufferLen - infoBufLen, message->buffer, msgBufLen);
567     if (ret != 0) {
568         DeleteMsg(node);
569         printf("Failed to memcpy_s message->buffer \n");
570         return NULL;
571     }
572     return node;
573 #endif
574     return NULL;
575 }
576 
ProcessBegetctlMsg(MyTask *task, MsgNode *message)577 static MsgNode *ProcessBegetctlMsg(MyTask *task, MsgNode *message)
578 {
579     uint32_t len = 0;
580     const char *msg = (const char *)GetMsgExtInfo(message, MSG_EXT_NAME_BEGET_PID, &len);
581     if (msg == NULL) {
582         printf("Failed to get extInfo \n");
583         return NULL;
584     }
585 
586     MsgNode *msgNode = RebuildMsgNode(message, info);
587     if (msgNode == NULL) {
588         printf("Failed to rebuild message node \n");
589         return NULL;
590     }
591 
592     int ret = DecodeMsg(msgNode);
593     if (ret != 0) {
594         DeletenMsg(msgNode);
595         return NULL;
596     }
597     return msgNode;
598 }
599 
ProcessBegetMsg(MyTask *task, MsgNode *message)600 static void ProcessBegetMsg(MyTask *task, MsgNode *message)
601 {
602     Message *msg = &message->msgHeader;
603 
604     MsgNode *msgNode = ProcessBegetctlMsg(connection, message);
605     if (msgNode == NULL) {
606         SendResponse(task, msg, DEBUG_MODE_NOT_SUPPORT, 0);
607         DeleteMsg(message);
608         return;
609     }
610     ProcessReqMsg(task, msgNode);
611     DeleteMsg(message);
612     DeleteMsg(msgNode);
613 }
614 
ProcessRecvMsg(MyTask *task, MsgNode *message)615 static void ProcessRecvMsg(MyTask *task, MsgNode *message)
616 {
617     Message *msg = &message->msgHeader;
618     printf("Recv message header magic 0x%x type %u id %u len %u %s \n",
619         msg->magic, msg->msgType, msg->msgId, msg->msgLen, msg->buffer);
620     if (task->receiverCtx.nextMsgId != msg->msgId) {
621         printf("Invalid msg id %u %u \n", task->receiverCtx.nextMsgId, msg->msgId);
622     }
623     task->receiverCtx.nextMsgId++;
624 
625     int ret;
626     switch (msg->msgType) {
627         case MSG_GET_RENDER_TERMINATION_STATUS: {  // get status
628             Result result = {0};
629             ret = ProcessTerminationStatusMsg(message, &result);
630             SendResponse(task, msg, ret == 0 ? result.result : ret, result.pid);
631             DeleteMessage(message);
632             break;
633         }
634         case MSG_NORMAL: {
635             ProcessReqMsg(task, message);
636             break;
637         }
638         case MSG_DUMP:
639             SendResponse(task, msg, 0, 0);
640             DeleteMessage(message);
641             break;
642         case MSG_BEGET: {
643             ProcessBegetMsg(task, message);
644             break;
645         }
646         case MSG_BEGET_TIME:
647             SendResponse(task, msg, MIN_TIME, MAX_TIME);
648             DeleteMessage(message);
649             break;
650         case MSG_UPDATE_MOUNT_POINTS:
651             ret = ProcessRemountMsg(task, message);
652             SendResponse(task, msg, ret, 0);
653             break;
654         case MSG_RESTART:
655             ProcessRestartMsg(task, message);
656             break;
657         default:
658             SendResponse(task, msg, MSG_INVALID, 0);
659             DeleteMessage(message);
660             break;
661     }
662 }
663 
OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer, uint32_t buffLen)664 static void OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer, uint32_t buffLen)
665 {
666     MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
667     if (task == NULL) {
668         printf("Failed to get client form socket\n");
669         LE_CloseTask(LE_GetDefaultLoop(), taskHandle);
670         return;
671     }
672 
673     if (buffLen >= MAX_MSG_TOTAL_LENGTH) {
674         printf("Message too long %u \n", buffLen);
675         LE_CloseTask(LE_GetDefaultLoop(), taskHandle);
676         return;
677     }
678 
679     uint32_t reminder = 0;
680     uint32_t currLen = 0;
681     Message *message = task->ctx.incompleteMsg; // incomplete msg
682     task->ctx.incompleteMsg = NULL;
683     int ret = 0;
684     do {
685         printf("OnReceiveRequest connectionId: %u start: 0x%x buffLen %d",
686             task->id, *(uint32_t *)(buffer + currLen), buffLen - currLen);
687 
688         ret = GetMsgFromBuffer(buffer + currLen, buffLen - currLen,
689             &message, &task->ctx.msgRecvLen, &reminder);
690         if (ret != 0) {
691             break;
692         }
693 
694         if (task->ctx.msgRecvLen != message->msgLen) {  // recv complete msg
695             task->ctx.incompleteMsg = message;
696             message = NULL;
697             break;
698         }
699         task->ctx.msgRecvLen = 0;
700         if (task->ctx.timer) {
701             LE_StopTimer(LE_GetDefaultLoop(), task->ctx.timer);
702             task->ctx.timer = NULL;
703         }
704         // decode msg
705         ret = DecodeMsg(message);
706         if (ret != 0) {
707             break;
708         }
709         (void)ProcessRecvMsg(task, message);
710         message = NULL;
711         currLen += buffLen - reminder;
712     } while (reminder > 0);
713 
714     if (task->ctx.incompleteMsg != NULL) { // Start the detection timer
715         ret = StartTimerForCheckMsg(task);
716         if (ret != 0) {
717             LE_CloseStreamTask(LE_GetDefaultLoop(), taskHandle);
718         }
719     }
720 }
721 
main(int argc, char *const argv[])722 int main(int argc, char *const argv[])
723 {
724     printf("main argc: %d \n", argc);
725     if (argc <= 0) {
726         return 0;
727     }
728 
729     printf("请输入创建socket的类型:(pipe, tcp)\n");
730     char type[128];
731     int ret = scanf_s("%s", type, sizeof(type));
732     if (ret <= 0) {
733         printf("input error \n");
734         return 0;
735     }
736 
737     int flags;
738     char *server;
739     if (strcmp(type, "pipe") == 0) {
740         flags = TASK_STREAM | TASK_PIPE |TASK_SERVER | TASK_TEST;
741         server = (char *)"/data/testpipe";
742     } else if (strcmp(type, "tcp") == 0) {
743         flags = TASK_STREAM | TASK_TCP |TASK_SERVER | TASK_TEST;
744         server = (char *)"127.0.0.1:7777";
745     } else {
746         printf("输入有误,请输入pipe或者tcp!");
747         system("pause");
748         return 0;
749     }
750 
751     return 0;
752 }