1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <fcntl.h>
20
21#include "loop_systest.h"
22#include "loop_event.h"
23#include "le_socket.h"
24#include "le_task.h"
25#include "list.h"
26
27typedef struct {
28    uint16_t tlvLen;
29    uint16_t tlvType;
30} Tlv;
31
32typedef struct {
33    uint16_t tlvLen;  // 对齐后的长度
34    uint16_t tlvType;
35    uint16_t dataLen;  // 数据的长度
36    uint16_t dataType;
37    char tlvName[TLV_NAME_LEN];
38} TlvExt;
39
40typedef struct {
41    Message msgHdr;
42    Result result;
43}ResponseMsg;
44
45typedef struct ReceiverCtx_ {
46    uint32_t nextMsgId;              // 校验消息id
47    uint32_t msgRecvLen;             // 已经接收的长度
48    TimerHandle timer;               // 测试消息完整
49    MsgNode *incompleteMsg;          // 保存不完整的消息
50} ReceiverCtx;
51
52typedef struct MyTask_ {
53    TaskHandle stream;
54    int id;
55    ReceiverCtx ctx;
56} MyTask;
57
58typedef struct MyService_ {
59    TaskHandle serverTask;
60    struct ListNode head;
61} MyService;
62
63typedef struct MsgNode_ {
64    MyTask *task;
65    Message msgHeader;
66    uint32_t tlvCount;
67    uint32_t *tlvOffset;
68    uint8_t *buffer;
69} MsgNode;
70
71static MyService g_service = NULL;
72
73int MakeDirRec(const char *path, mode_t mode, int lastPath)
74{
75    if (path == NULL || *path == '\0') {
76        printf("Invalid path to create \n");
77        return -1;
78    }
79
80    char buffer[PATH_MAX] = {0};
81    const char slash = '/';
82    const char *p = path;
83    char *curPos = strchr(path, slash);
84    while (curPos != NULL) {
85        int len = curPos - p;
86        p = curPos + 1;
87        if (len == 0) {
88            curPos = strchr(p, slash);
89            continue;
90        }
91
92        int ret = memcpy_s(buffer, PATH_MAX, path, p - path - 1);
93        if (ret != 0) {
94            printf("Failed to copy path \n");
95            return -1;
96        }
97
98        ret = mkdir(buffer, mode);
99        if (ret == -1 && errno != EEXIST) {
100            return errno;
101        }
102        curPos = strchr(p, slash);
103    }
104
105    if (lastPath) {
106        if (mkdir(path, mode) == -1 && errno != EEXIST) {
107            return errno;
108        }
109    }
110    return 0;
111}
112
113static inline void SetFdCtrl(int fd, int opt)
114{
115    int option = fcntl(fd, F_GETFD);
116    int ret = fcntl(fd, F_SETFD, option | opt);
117    if (ret < 0) {
118        printf("Set fd %d option %d %d result: %d \n", fd, option, opt, errno);
119    }
120}
121
122static int CreatePipeServer(TaskHandle *server, const char *name)
123{
124    char path[128] = {0};
125    int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name);
126    if (ret < 0) {
127        printf("Failed to snprintf_s %d \n", ret);
128    }
129    int socketId = GetControlSocket(name);
130    printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId);
131
132    LE_StreamInfo info = {};
133    info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER;
134    info.socketId = socketId;
135    info.server = server;
136    info.baseInfo.close = NULL;
137    info.incommingConnect = OnConnection;
138
139    MakeDirRec(path, DIR_MODE, 0);
140    ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info);
141    if (ret < 0) {
142        printf("Create server failed \n");
143    }
144    SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC);
145
146    printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server));
147    return 0;
148}
149
150static int CreateTcpServer(TaskHandle *server, const char *name)
151{
152    char path[128] = {0};
153    int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name);
154    if (ret < 0) {
155        printf("Failed to snprintf_s %d \n", ret);
156    }
157    int socketId = GetControlSocket(name);
158    printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId);
159
160    LE_StreamInfo info = {};
161    info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER;
162    info.socketId = socketId;
163    info.server = server;
164    info.baseInfo.close = NULL;
165    info.incommingConnect = OnConnection;
166
167    MakeDirRec(path, DIR_MODE, 0);
168    ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info);
169    SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC);
170
171    printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server));
172    return ret;
173}
174
175static inline int StartTimerForCheckMsg(MyTask *task)
176{
177    if (connection->receiverCtx.timer != NULL) {
178        return 0;
179    }
180    int ret = LE_CreateTimer(LE_GetDefaultLoop(), &connection->receiverCtx.timer, WaitMsgCompleteTimeOut, connection);
181    if (ret == 0) {
182        ret = LE_StartTimer(LE_GetDefaultLoop(), connection->receiverCtx.timer, MAX_WAIT_MSG_COMPLETE, 1);
183    }
184    return ret;
185}
186
187static int SendMessage(LoopHandle loop, TaskHandle task, const char *message)
188{
189    if (message == NULL) {
190        printf("message is null \n");
191        return -1;
192    }
193    BufferHandle handle = NULL;
194    uint32_t bufferSize = strlen(message) + 1;
195    handle = LE_CreateBuffer(loop, bufferSize);
196    char *buff = (char *)LE_GetBufferInfo(handle, NULL, &bufferSize);
197    if (buff == NULL) {
198        printf("Failed get buffer info \n");
199        return -1;
200    }
201
202    int ret = memcpy_s(buff, bufferSize, message, strlen(message) + 1);
203    if (ret != 0) {
204        LE_FreeBuffer(loop, task, handle);
205        printf("Failed memcpy_s err=%d \n", errno);
206        return -1;
207    }
208
209    LE_STATUS status = LE_Send(loop, task, handle, strlen(message) + 1);
210    if (status != LE_SUCCESS) {
211        printf("Failed le_send msg \n");
212        return -1;
213    }
214    return 0;
215}
216
217static int OnConnection(const LoopHandle loop, const TaskHandle server)
218{
219    TaskHandle stream = NULL;
220    LE_StreamInfo info = {};
221    info.baseInfo.flags = flags;
222    info.baseInfo.close = OnClose;
223    info.baseInfo.userDataSize = sizeof(MyTask);
224    info.disConnectComplete = OnDisConnect;
225    info.sendMessageComplete = sendMessageComplete;
226    info.recvMessage = OnRecvMessage;
227
228    int ret = LE_AcceptStreamClient(loop, server, &stream, &info);
229    if (ret != 0) {
230        printf("Failed to accept stream \n");
231    }
232
233    MyTask *agent = (MyTask *)LE_GetUserData(stream);
234    // 收到数据后的处理
235    static uint32_t connectionId = 0;
236    agent->id = ++connectionId;
237    agent->stream = stream;
238    agent->ctx.timer = NULL;
239    agent->incompleteMsg = NULL;
240
241    printf("accept id: %d\n", agent->id);
242    ret = SendMessage(loop, agent->stream, "Connect Success.");
243    if (ret != 0) {
244        printf("Failed to send msg \n");
245        return -1;
246    }
247
248    return 0;
249}
250
251static void OnClose(const TaskHandle taskHandle)
252{
253    MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
254    if (task == NULL) {
255        printf("Invalid task \n");
256        return;
257    }
258}
259
260static void OnDisConnect(const TaskHandle taskHandle)
261{
262    MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
263    if (task == NULL) {
264        printf("Invalid task \n");
265        return;
266    }
267    printf("task id: %d\n", task->id);
268    OnClose(taskHandle);
269}
270
271static void SendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)
272{
273    MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
274    if (task == NULL) {
275        printf("Invalid task \n");
276        return;
277    }
278    uint32_t bufferSize = sizeof(Message);
279    Message *msg = (Message *)LE_GetBufferInfo(handle, NULL, &bufferSize);
280    if (msg == NULL) {
281        return;
282    }
283    printf("SendMessageComplete taskId: %u msgId %u msgType %u buf %s",
284        task->id, msg->msgId, msg->msgType, msg->buffer);
285}
286
287void ServerInit(const char *server, LoopHandle loop, int flags)
288{
289    if (server == NULL || loop == NULL) {
290        printf("Invalid parameter\n");
291        return;
292    }
293
294    OH_ListInit(&g_service.head);
295    LE_StreamServerInfo info = {};
296    info.baseInfo.flags = flags;
297    info.baseInfo.close = NULL;
298    info.socketId = -1;
299    info.server = server;
300    info.disConnectComplete = NULL;
301    info.incomingConnect = OnConnection;
302    info.sendMessageComplete = NULL;
303    info.recvMessage = NULL;
304
305    int ret = LE_CreateStreamServer(loop, &g_service.serverTask, &info);
306    if (ret != 0) {
307        printf("Init server failed.\n");
308    }
309}
310
311static int MsgRebuild(MsgNode *message, const Message *msg)
312{
313    if (CheckMsg(&message->msgHeader) != 0) {
314        return MSG_INVALID;
315    }
316    if (msg->msgLen == sizeof(message->msgHeader)) {  // only has msg header
317        return 0;
318    }
319    if (message->buffer == NULL) {
320        message->buffer = calloc(1, msg->msgLen - sizeof(message->msgHeader));
321        if (message->buffer == NULL) {
322            printf("Failed to alloc memory for recv message \n");
323            return -1;
324        }
325    }
326    if (message->tlvOffset == NULL) {
327        uint32_t totalCount = msg->tlvCount + TLV_MAX;
328        message->tlvOffset = malloc(totalCount * sizeof(uint32_t));
329        if (message->tlvOffset == NULL) {
330            printf("Failed to alloc memory for recv message \n");
331            return -1;
332        }
333        for (uint32_t i = 0; i < totalCount; i++) {
334            message->tlvOffset[i] = INVALID_OFFSET;
335        }
336    }
337    return 0;
338}
339
340int GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen,
341    Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder)
342{
343    if (buffer == NULL || outMsg == NULL || msgRecvLen == NULL || reminder == NULL) {
344        return MSG_INVALID;
345    }
346
347    *reminder = 0;
348    Message *message = *outMsg;
349    if (message == NULL) {
350        message = CreateMessage();
351        if (message == NULL) {
352            printf("Failed to create message \n");
353            return SYSTEM_ERROR;
354        }
355        *outMsg = message;
356    }
357
358    uint32_t reminderLen = bufferLen;
359    const uint8_t *reminderBuffer = buffer;
360    if (*msgRecvLen < sizeof(Message)) {  // recv partial message
361        if ((bufferLen + *msgRecvLen) >= sizeof(Message)) {
362            int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen,
363                sizeof(message->msgHeader) - *msgRecvLen,
364                buffer, sizeof(message->msgHeader) - *msgRecvLen);
365            if (ret != 0) {
366                printf("Failed to copy recv buffer \n");
367                return -1;
368            }
369
370            ret = MsgRebuild(message, &message->msgHeader);
371            if (ret != 0) {
372                printf("Failed to alloc buffer for receive msg \n");
373                return -1;
374            }
375            reminderLen = bufferLen - (sizeof(message->msgHeader) - *msgRecvLen);
376            reminderBuffer = buffer + sizeof(message->msgHeader) - *msgRecvLen;
377            *msgRecvLen = sizeof(message->msgHeader);
378        } else {
379            int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen,
380                sizeof(message->msgHeader) - *msgRecvLen, buffer, bufferLen);
381            if (ret != 0) {
382                printf("Failed to copy recv buffer \n");
383                return -1;
384            }
385            *msgRecvLen += bufferLen;
386            return 0;
387        }
388    }
389    return 0;
390}
391
392int DecodeMsg(Message * message)
393{
394    if (message == NULL) {
395        printf("decode empty message, failed! \n");
396        return -1;
397    }
398    /*
399        写解析消息的逻辑
400    */
401    return 0;
402}
403
404int ProcessTerminationStatusMsg(const MsgNode *message, Result *result)
405{
406    if (message == NULL || result == NULL) {
407        return -1;
408    }
409
410    result->result = -1;
411    result->pid = 0;
412    return 0;
413}
414
415static int SendResponse(const MyTask *task, const Message *msg, int result, pid_t pid)
416{
417    printf("SendResponse connectionId: %u result: 0x%x pid: %d \n",
418        task->id, result, pid);
419    uint32_t bufferSize = sizeof(ResponseMsg);
420    BufferHandle handle = LE_CreateBuffer(LE_GetDefaultLoop(), bufferSize);
421    ResponseMsg *buffer = (ResponseMsg *)LE_GetBufferInfo(handle, NULL, &bufferSize);
422    int ret = memcpy_s(buffer, bufferSize, msg, sizeof(Message));
423    if (ret != 0) {
424        LE_FreeBuffer(LE_GetDefaultLoop(), NULL, handle);
425        printf("Failed to memcpy_s bufferSize \n");
426        return -1;
427    }
428
429    buffer->result.result = result;
430    buffer->result.pid = pid;
431    return LE_Send(LE_GetDefaultLoop(), task->stream, handle, bufferSize);
432}
433
434void DeleteMsg(MsgNode *msgNode)
435{
436    if (msgNode == NULL) {
437        return;
438    }
439    if (msgNode->buffer) {
440        free(msgNode->buffer);
441        msgNode->buffer = NULL;
442    }
443    if (msgNode->tlvOffset) {
444        free(msgNode->tlvOffset);
445        msgNode->tlvOffset = NULL;
446    }
447    free(msgNode);
448}
449
450int CheckMsg(const MsgNode *message)
451{
452    if (message == NULL) {
453        return MSG_INVALID;
454    }
455    if (strlen(message->msgHeader.processName) <= 0) {
456        printf("Invalid property processName %s \n", message->msgHeader.buffer);
457        return MSG_INVALID;
458    }
459    if (message->tlvOffset == NULL) {
460        printf("Invalid property tlv offset %s \n", message->msgHeader.buffer);
461        return MSG_INVALID;
462    }
463    if (message->buffer == NULL) {
464        printf("Invalid property buffer %s \n", message->msgHeader.buffer);
465        return MSG_INVALID;
466    }
467
468    if (message->tlvOffset[TLV_BUNDLE_INFO] == INVALID_OFFSET ||
469        message->tlvOffset[TLV_MSG_FLAGS] == INVALID_OFFSET ||
470        message->tlvOffset[TLV_ACCESS_TOKEN_INFO] == INVALID_OFFSET ||
471        message->tlvOffset[TLV_DOMAIN_INFO] == INVALID_OFFSET ||
472        message->tlvOffset[TLV_DAC_INFO] == INVALID_OFFSET) {
473        printf("No must tlv bundle: %u flags: %u token: %u domain %u %u \n",
474            message->tlvOffset[TLV_BUNDLE_INFO], message->tlvOffset[TLV_MSG_FLAGS],
475            message->tlvOffset[TLV_ACCESS_TOKEN_INFO],
476            message->tlvOffset[TLV_DOMAIN_INFO], message->tlvOffset[TLV_DAC_INFO]);
477        return MSG_INVALID;
478    }
479
480    return 0;
481}
482
483static void ProcessReqMsg(MyTask *task, MsgNode *message)
484{
485    int ret = CheckMsg(message);
486    if (ret != 0) {
487        SendResponse(task, &message->msgHeader, ret, 0);
488        DeleteMsg(message);
489        return;
490    }
491
492    message->task = task;
493}
494
495void *GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len)
496{
497    if (name == NULL) {
498        printf("Invalid name \n");
499        return NULL;
500    }
501    if (message == NULL || message->buffer == NULL || message->tlvOffset == NULL) {
502        return NULL;
503    }
504    printf("GetMsgExtInfo tlvCount %d name %s \n", message->tlvCount, name);
505
506    for (uint32_t index = TLV_MAX; index < (TLV_MAX + message->tlvCount); index++) {
507        if (message->tlvOffset[index] == INVALID_OFFSET) {
508            return NULL;
509        }
510        uint8_t *data = message->buffer + message->tlvOffset[index];
511        if (((Tlv *)data)->tlvType != TLV_MAX) {
512            continue;
513        }
514        TlvExt *tlv = (TlvExt *)data;
515        if (strcmp(tlv->tlvName, name) != 0) {
516            continue;
517        }
518        if (len != NULL) {
519            *len = tlv->dataLen;
520        }
521        return data + sizeof(TlvExt);
522    }
523    return NULL;
524}
525
526MsgNode *RebuildMsgNode(MsgNode *message, Process *info)
527{
528#ifdef DEBUG_BEGETCTL_BOOT
529    if (message == NULL || info == NULL) {
530        printf("params is null \n");
531        return NULL;
532    }
533
534    uint32_t bufferLen = 0;
535    MsgNode *node = CreateMsg();
536    if (node == NULL) {
537        printf("Failed to create MsgNode \n");
538        return NULL;
539    }
540
541    int ret = memcpy_s(&node->msgHeader, sizeof(Message), &message->msgHeader, sizeof(Message));
542    if (ret != 0) {
543        printf("Failed to memcpy_s node->msgHeader \n");
544        return NULL;
545    }
546
547    bufferLen = message->msgHeader.msgLen + info->message->msgHeader.msgLen - sizeof(Message);
548    node->msgHeader.msgLen = bufferLen;
549    node->msgHeader.msgType = MSG_NATIVE_PROCESS;
550    node->msgHeader.tlvCount += message->msgHeader.tlvCount;
551    ret = MsgRebuild(node, &node->msgHeader);
552    if (ret != 0) {
553        DeleteMsg(node);
554        printf("Failed to alloc memory for recv message \n");
555        return NULL;
556    }
557
558    uint32_t infoBufLen = info->message->msgHeader.msgLen - sizeof(Message);
559    uint32_t msgBufLen = message->msgHeader.msgLen - sizeof(Message);
560    ret = memcpy_s(node->buffer, bufferLen, info->message->buffer, infoBufLen);
561    if (ret != 0) {
562        DeleteMsg(node);
563        printf("Failed to memcpy_s info buffer \n");
564        return NULL;
565    }
566    ret = memcpy_s(node->buffer + infoBufLen, bufferLen - infoBufLen, message->buffer, msgBufLen);
567    if (ret != 0) {
568        DeleteMsg(node);
569        printf("Failed to memcpy_s message->buffer \n");
570        return NULL;
571    }
572    return node;
573#endif
574    return NULL;
575}
576
577static MsgNode *ProcessBegetctlMsg(MyTask *task, MsgNode *message)
578{
579    uint32_t len = 0;
580    const char *msg = (const char *)GetMsgExtInfo(message, MSG_EXT_NAME_BEGET_PID, &len);
581    if (msg == NULL) {
582        printf("Failed to get extInfo \n");
583        return NULL;
584    }
585
586    MsgNode *msgNode = RebuildMsgNode(message, info);
587    if (msgNode == NULL) {
588        printf("Failed to rebuild message node \n");
589        return NULL;
590    }
591
592    int ret = DecodeMsg(msgNode);
593    if (ret != 0) {
594        DeletenMsg(msgNode);
595        return NULL;
596    }
597    return msgNode;
598}
599
600static void ProcessBegetMsg(MyTask *task, MsgNode *message)
601{
602    Message *msg = &message->msgHeader;
603
604    MsgNode *msgNode = ProcessBegetctlMsg(connection, message);
605    if (msgNode == NULL) {
606        SendResponse(task, msg, DEBUG_MODE_NOT_SUPPORT, 0);
607        DeleteMsg(message);
608        return;
609    }
610    ProcessReqMsg(task, msgNode);
611    DeleteMsg(message);
612    DeleteMsg(msgNode);
613}
614
615static void ProcessRecvMsg(MyTask *task, MsgNode *message)
616{
617    Message *msg = &message->msgHeader;
618    printf("Recv message header magic 0x%x type %u id %u len %u %s \n",
619        msg->magic, msg->msgType, msg->msgId, msg->msgLen, msg->buffer);
620    if (task->receiverCtx.nextMsgId != msg->msgId) {
621        printf("Invalid msg id %u %u \n", task->receiverCtx.nextMsgId, msg->msgId);
622    }
623    task->receiverCtx.nextMsgId++;
624
625    int ret;
626    switch (msg->msgType) {
627        case MSG_GET_RENDER_TERMINATION_STATUS: {  // get status
628            Result result = {0};
629            ret = ProcessTerminationStatusMsg(message, &result);
630            SendResponse(task, msg, ret == 0 ? result.result : ret, result.pid);
631            DeleteMessage(message);
632            break;
633        }
634        case MSG_NORMAL: {
635            ProcessReqMsg(task, message);
636            break;
637        }
638        case MSG_DUMP:
639            SendResponse(task, msg, 0, 0);
640            DeleteMessage(message);
641            break;
642        case MSG_BEGET: {
643            ProcessBegetMsg(task, message);
644            break;
645        }
646        case MSG_BEGET_TIME:
647            SendResponse(task, msg, MIN_TIME, MAX_TIME);
648            DeleteMessage(message);
649            break;
650        case MSG_UPDATE_MOUNT_POINTS:
651            ret = ProcessRemountMsg(task, message);
652            SendResponse(task, msg, ret, 0);
653            break;
654        case MSG_RESTART:
655            ProcessRestartMsg(task, message);
656            break;
657        default:
658            SendResponse(task, msg, MSG_INVALID, 0);
659            DeleteMessage(message);
660            break;
661    }
662}
663
664static void OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer, uint32_t buffLen)
665{
666    MyTask *task = (MyTask *)LE_GetUserData(taskHandle);
667    if (task == NULL) {
668        printf("Failed to get client form socket\n");
669        LE_CloseTask(LE_GetDefaultLoop(), taskHandle);
670        return;
671    }
672
673    if (buffLen >= MAX_MSG_TOTAL_LENGTH) {
674        printf("Message too long %u \n", buffLen);
675        LE_CloseTask(LE_GetDefaultLoop(), taskHandle);
676        return;
677    }
678
679    uint32_t reminder = 0;
680    uint32_t currLen = 0;
681    Message *message = task->ctx.incompleteMsg; // incomplete msg
682    task->ctx.incompleteMsg = NULL;
683    int ret = 0;
684    do {
685        printf("OnReceiveRequest connectionId: %u start: 0x%x buffLen %d",
686            task->id, *(uint32_t *)(buffer + currLen), buffLen - currLen);
687
688        ret = GetMsgFromBuffer(buffer + currLen, buffLen - currLen,
689            &message, &task->ctx.msgRecvLen, &reminder);
690        if (ret != 0) {
691            break;
692        }
693
694        if (task->ctx.msgRecvLen != message->msgLen) {  // recv complete msg
695            task->ctx.incompleteMsg = message;
696            message = NULL;
697            break;
698        }
699        task->ctx.msgRecvLen = 0;
700        if (task->ctx.timer) {
701            LE_StopTimer(LE_GetDefaultLoop(), task->ctx.timer);
702            task->ctx.timer = NULL;
703        }
704        // decode msg
705        ret = DecodeMsg(message);
706        if (ret != 0) {
707            break;
708        }
709        (void)ProcessRecvMsg(task, message);
710        message = NULL;
711        currLen += buffLen - reminder;
712    } while (reminder > 0);
713
714    if (task->ctx.incompleteMsg != NULL) { // Start the detection timer
715        ret = StartTimerForCheckMsg(task);
716        if (ret != 0) {
717            LE_CloseStreamTask(LE_GetDefaultLoop(), taskHandle);
718        }
719    }
720}
721
722int main(int argc, char *const argv[])
723{
724    printf("main argc: %d \n", argc);
725    if (argc <= 0) {
726        return 0;
727    }
728
729    printf("请输入创建socket的类型:(pipe, tcp)\n");
730    char type[128];
731    int ret = scanf_s("%s", type, sizeof(type));
732    if (ret <= 0) {
733        printf("input error \n");
734        return 0;
735    }
736
737    int flags;
738    char *server;
739    if (strcmp(type, "pipe") == 0) {
740        flags = TASK_STREAM | TASK_PIPE |TASK_SERVER | TASK_TEST;
741        server = (char *)"/data/testpipe";
742    } else if (strcmp(type, "tcp") == 0) {
743        flags = TASK_STREAM | TASK_TCP |TASK_SERVER | TASK_TEST;
744        server = (char *)"127.0.0.1:7777";
745    } else {
746        printf("输入有误,请输入pipe或者tcp!");
747        system("pause");
748        return 0;
749    }
750
751    return 0;
752}