1/* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include <stdio.h> 17#include <stdlib.h> 18#include <string.h> 19#include <fcntl.h> 20 21#include "loop_systest.h" 22#include "loop_event.h" 23#include "le_socket.h" 24#include "le_task.h" 25#include "list.h" 26 27typedef struct { 28 uint16_t tlvLen; 29 uint16_t tlvType; 30} Tlv; 31 32typedef struct { 33 uint16_t tlvLen; // 对齐后的长度 34 uint16_t tlvType; 35 uint16_t dataLen; // 数据的长度 36 uint16_t dataType; 37 char tlvName[TLV_NAME_LEN]; 38} TlvExt; 39 40typedef struct { 41 Message msgHdr; 42 Result result; 43}ResponseMsg; 44 45typedef struct ReceiverCtx_ { 46 uint32_t nextMsgId; // 校验消息id 47 uint32_t msgRecvLen; // 已经接收的长度 48 TimerHandle timer; // 测试消息完整 49 MsgNode *incompleteMsg; // 保存不完整的消息 50} ReceiverCtx; 51 52typedef struct MyTask_ { 53 TaskHandle stream; 54 int id; 55 ReceiverCtx ctx; 56} MyTask; 57 58typedef struct MyService_ { 59 TaskHandle serverTask; 60 struct ListNode head; 61} MyService; 62 63typedef struct MsgNode_ { 64 MyTask *task; 65 Message msgHeader; 66 uint32_t tlvCount; 67 uint32_t *tlvOffset; 68 uint8_t *buffer; 69} MsgNode; 70 71static MyService g_service = NULL; 72 73int MakeDirRec(const char *path, mode_t mode, int lastPath) 74{ 75 if (path == NULL || *path == '\0') { 76 printf("Invalid path to create \n"); 77 return -1; 78 } 79 80 char buffer[PATH_MAX] = {0}; 81 const char slash = '/'; 82 const char *p = path; 83 char *curPos = strchr(path, slash); 84 while (curPos != NULL) { 85 int len = curPos - p; 86 p = curPos + 1; 87 if (len == 0) { 88 curPos = strchr(p, slash); 89 continue; 90 } 91 92 int ret = memcpy_s(buffer, PATH_MAX, path, p - path - 1); 93 if (ret != 0) { 94 printf("Failed to copy path \n"); 95 return -1; 96 } 97 98 ret = mkdir(buffer, mode); 99 if (ret == -1 && errno != EEXIST) { 100 return errno; 101 } 102 curPos = strchr(p, slash); 103 } 104 105 if (lastPath) { 106 if (mkdir(path, mode) == -1 && errno != EEXIST) { 107 return errno; 108 } 109 } 110 return 0; 111} 112 113static inline void SetFdCtrl(int fd, int opt) 114{ 115 int option = fcntl(fd, F_GETFD); 116 int ret = fcntl(fd, F_SETFD, option | opt); 117 if (ret < 0) { 118 printf("Set fd %d option %d %d result: %d \n", fd, option, opt, errno); 119 } 120} 121 122static int CreatePipeServer(TaskHandle *server, const char *name) 123{ 124 char path[128] = {0}; 125 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name); 126 if (ret < 0) { 127 printf("Failed to snprintf_s %d \n", ret); 128 } 129 int socketId = GetControlSocket(name); 130 printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId); 131 132 LE_StreamInfo info = {}; 133 info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER; 134 info.socketId = socketId; 135 info.server = server; 136 info.baseInfo.close = NULL; 137 info.incommingConnect = OnConnection; 138 139 MakeDirRec(path, DIR_MODE, 0); 140 ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info); 141 if (ret < 0) { 142 printf("Create server failed \n"); 143 } 144 SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC); 145 146 printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server)); 147 return 0; 148} 149 150static int CreateTcpServer(TaskHandle *server, const char *name) 151{ 152 char path[128] = {0}; 153 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s%s", SOCKET_DIR, name); 154 if (ret < 0) { 155 printf("Failed to snprintf_s %d \n", ret); 156 } 157 int socketId = GetControlSocket(name); 158 printf("get socket from env %s socketId %d \n", SOCKET_NAME, socketId); 159 160 LE_StreamInfo info = {}; 161 info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER; 162 info.socketId = socketId; 163 info.server = server; 164 info.baseInfo.close = NULL; 165 info.incommingConnect = OnConnection; 166 167 MakeDirRec(path, DIR_MODE, 0); 168 ret = LE_CreateStreamServer(LE_GetDefaultLoop(), server, &info); 169 SetFdCtrl(LE_GetSocketFd(*server), FD_CLOEXEC); 170 171 printf("CreateServer path %s fd %d \n", path, LE_GetSocketFd(*server)); 172 return ret; 173} 174 175static inline int StartTimerForCheckMsg(MyTask *task) 176{ 177 if (connection->receiverCtx.timer != NULL) { 178 return 0; 179 } 180 int ret = LE_CreateTimer(LE_GetDefaultLoop(), &connection->receiverCtx.timer, WaitMsgCompleteTimeOut, connection); 181 if (ret == 0) { 182 ret = LE_StartTimer(LE_GetDefaultLoop(), connection->receiverCtx.timer, MAX_WAIT_MSG_COMPLETE, 1); 183 } 184 return ret; 185} 186 187static int SendMessage(LoopHandle loop, TaskHandle task, const char *message) 188{ 189 if (message == NULL) { 190 printf("message is null \n"); 191 return -1; 192 } 193 BufferHandle handle = NULL; 194 uint32_t bufferSize = strlen(message) + 1; 195 handle = LE_CreateBuffer(loop, bufferSize); 196 char *buff = (char *)LE_GetBufferInfo(handle, NULL, &bufferSize); 197 if (buff == NULL) { 198 printf("Failed get buffer info \n"); 199 return -1; 200 } 201 202 int ret = memcpy_s(buff, bufferSize, message, strlen(message) + 1); 203 if (ret != 0) { 204 LE_FreeBuffer(loop, task, handle); 205 printf("Failed memcpy_s err=%d \n", errno); 206 return -1; 207 } 208 209 LE_STATUS status = LE_Send(loop, task, handle, strlen(message) + 1); 210 if (status != LE_SUCCESS) { 211 printf("Failed le_send msg \n"); 212 return -1; 213 } 214 return 0; 215} 216 217static int OnConnection(const LoopHandle loop, const TaskHandle server) 218{ 219 TaskHandle stream = NULL; 220 LE_StreamInfo info = {}; 221 info.baseInfo.flags = flags; 222 info.baseInfo.close = OnClose; 223 info.baseInfo.userDataSize = sizeof(MyTask); 224 info.disConnectComplete = OnDisConnect; 225 info.sendMessageComplete = sendMessageComplete; 226 info.recvMessage = OnRecvMessage; 227 228 int ret = LE_AcceptStreamClient(loop, server, &stream, &info); 229 if (ret != 0) { 230 printf("Failed to accept stream \n"); 231 } 232 233 MyTask *agent = (MyTask *)LE_GetUserData(stream); 234 // 收到数据后的处理 235 static uint32_t connectionId = 0; 236 agent->id = ++connectionId; 237 agent->stream = stream; 238 agent->ctx.timer = NULL; 239 agent->incompleteMsg = NULL; 240 241 printf("accept id: %d\n", agent->id); 242 ret = SendMessage(loop, agent->stream, "Connect Success."); 243 if (ret != 0) { 244 printf("Failed to send msg \n"); 245 return -1; 246 } 247 248 return 0; 249} 250 251static void OnClose(const TaskHandle taskHandle) 252{ 253 MyTask *task = (MyTask *)LE_GetUserData(taskHandle); 254 if (task == NULL) { 255 printf("Invalid task \n"); 256 return; 257 } 258} 259 260static void OnDisConnect(const TaskHandle taskHandle) 261{ 262 MyTask *task = (MyTask *)LE_GetUserData(taskHandle); 263 if (task == NULL) { 264 printf("Invalid task \n"); 265 return; 266 } 267 printf("task id: %d\n", task->id); 268 OnClose(taskHandle); 269} 270 271static void SendMessageComplete(const TaskHandle taskHandle, BufferHandle handle) 272{ 273 MyTask *task = (MyTask *)LE_GetUserData(taskHandle); 274 if (task == NULL) { 275 printf("Invalid task \n"); 276 return; 277 } 278 uint32_t bufferSize = sizeof(Message); 279 Message *msg = (Message *)LE_GetBufferInfo(handle, NULL, &bufferSize); 280 if (msg == NULL) { 281 return; 282 } 283 printf("SendMessageComplete taskId: %u msgId %u msgType %u buf %s", 284 task->id, msg->msgId, msg->msgType, msg->buffer); 285} 286 287void ServerInit(const char *server, LoopHandle loop, int flags) 288{ 289 if (server == NULL || loop == NULL) { 290 printf("Invalid parameter\n"); 291 return; 292 } 293 294 OH_ListInit(&g_service.head); 295 LE_StreamServerInfo info = {}; 296 info.baseInfo.flags = flags; 297 info.baseInfo.close = NULL; 298 info.socketId = -1; 299 info.server = server; 300 info.disConnectComplete = NULL; 301 info.incomingConnect = OnConnection; 302 info.sendMessageComplete = NULL; 303 info.recvMessage = NULL; 304 305 int ret = LE_CreateStreamServer(loop, &g_service.serverTask, &info); 306 if (ret != 0) { 307 printf("Init server failed.\n"); 308 } 309} 310 311static int MsgRebuild(MsgNode *message, const Message *msg) 312{ 313 if (CheckMsg(&message->msgHeader) != 0) { 314 return MSG_INVALID; 315 } 316 if (msg->msgLen == sizeof(message->msgHeader)) { // only has msg header 317 return 0; 318 } 319 if (message->buffer == NULL) { 320 message->buffer = calloc(1, msg->msgLen - sizeof(message->msgHeader)); 321 if (message->buffer == NULL) { 322 printf("Failed to alloc memory for recv message \n"); 323 return -1; 324 } 325 } 326 if (message->tlvOffset == NULL) { 327 uint32_t totalCount = msg->tlvCount + TLV_MAX; 328 message->tlvOffset = malloc(totalCount * sizeof(uint32_t)); 329 if (message->tlvOffset == NULL) { 330 printf("Failed to alloc memory for recv message \n"); 331 return -1; 332 } 333 for (uint32_t i = 0; i < totalCount; i++) { 334 message->tlvOffset[i] = INVALID_OFFSET; 335 } 336 } 337 return 0; 338} 339 340int GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen, 341 Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder) 342{ 343 if (buffer == NULL || outMsg == NULL || msgRecvLen == NULL || reminder == NULL) { 344 return MSG_INVALID; 345 } 346 347 *reminder = 0; 348 Message *message = *outMsg; 349 if (message == NULL) { 350 message = CreateMessage(); 351 if (message == NULL) { 352 printf("Failed to create message \n"); 353 return SYSTEM_ERROR; 354 } 355 *outMsg = message; 356 } 357 358 uint32_t reminderLen = bufferLen; 359 const uint8_t *reminderBuffer = buffer; 360 if (*msgRecvLen < sizeof(Message)) { // recv partial message 361 if ((bufferLen + *msgRecvLen) >= sizeof(Message)) { 362 int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen, 363 sizeof(message->msgHeader) - *msgRecvLen, 364 buffer, sizeof(message->msgHeader) - *msgRecvLen); 365 if (ret != 0) { 366 printf("Failed to copy recv buffer \n"); 367 return -1; 368 } 369 370 ret = MsgRebuild(message, &message->msgHeader); 371 if (ret != 0) { 372 printf("Failed to alloc buffer for receive msg \n"); 373 return -1; 374 } 375 reminderLen = bufferLen - (sizeof(message->msgHeader) - *msgRecvLen); 376 reminderBuffer = buffer + sizeof(message->msgHeader) - *msgRecvLen; 377 *msgRecvLen = sizeof(message->msgHeader); 378 } else { 379 int ret = memcpy_s(((uint8_t *)&message->msgHeader) + *msgRecvLen, 380 sizeof(message->msgHeader) - *msgRecvLen, buffer, bufferLen); 381 if (ret != 0) { 382 printf("Failed to copy recv buffer \n"); 383 return -1; 384 } 385 *msgRecvLen += bufferLen; 386 return 0; 387 } 388 } 389 return 0; 390} 391 392int DecodeMsg(Message * message) 393{ 394 if (message == NULL) { 395 printf("decode empty message, failed! \n"); 396 return -1; 397 } 398 /* 399 写解析消息的逻辑 400 */ 401 return 0; 402} 403 404int ProcessTerminationStatusMsg(const MsgNode *message, Result *result) 405{ 406 if (message == NULL || result == NULL) { 407 return -1; 408 } 409 410 result->result = -1; 411 result->pid = 0; 412 return 0; 413} 414 415static int SendResponse(const MyTask *task, const Message *msg, int result, pid_t pid) 416{ 417 printf("SendResponse connectionId: %u result: 0x%x pid: %d \n", 418 task->id, result, pid); 419 uint32_t bufferSize = sizeof(ResponseMsg); 420 BufferHandle handle = LE_CreateBuffer(LE_GetDefaultLoop(), bufferSize); 421 ResponseMsg *buffer = (ResponseMsg *)LE_GetBufferInfo(handle, NULL, &bufferSize); 422 int ret = memcpy_s(buffer, bufferSize, msg, sizeof(Message)); 423 if (ret != 0) { 424 LE_FreeBuffer(LE_GetDefaultLoop(), NULL, handle); 425 printf("Failed to memcpy_s bufferSize \n"); 426 return -1; 427 } 428 429 buffer->result.result = result; 430 buffer->result.pid = pid; 431 return LE_Send(LE_GetDefaultLoop(), task->stream, handle, bufferSize); 432} 433 434void DeleteMsg(MsgNode *msgNode) 435{ 436 if (msgNode == NULL) { 437 return; 438 } 439 if (msgNode->buffer) { 440 free(msgNode->buffer); 441 msgNode->buffer = NULL; 442 } 443 if (msgNode->tlvOffset) { 444 free(msgNode->tlvOffset); 445 msgNode->tlvOffset = NULL; 446 } 447 free(msgNode); 448} 449 450int CheckMsg(const MsgNode *message) 451{ 452 if (message == NULL) { 453 return MSG_INVALID; 454 } 455 if (strlen(message->msgHeader.processName) <= 0) { 456 printf("Invalid property processName %s \n", message->msgHeader.buffer); 457 return MSG_INVALID; 458 } 459 if (message->tlvOffset == NULL) { 460 printf("Invalid property tlv offset %s \n", message->msgHeader.buffer); 461 return MSG_INVALID; 462 } 463 if (message->buffer == NULL) { 464 printf("Invalid property buffer %s \n", message->msgHeader.buffer); 465 return MSG_INVALID; 466 } 467 468 if (message->tlvOffset[TLV_BUNDLE_INFO] == INVALID_OFFSET || 469 message->tlvOffset[TLV_MSG_FLAGS] == INVALID_OFFSET || 470 message->tlvOffset[TLV_ACCESS_TOKEN_INFO] == INVALID_OFFSET || 471 message->tlvOffset[TLV_DOMAIN_INFO] == INVALID_OFFSET || 472 message->tlvOffset[TLV_DAC_INFO] == INVALID_OFFSET) { 473 printf("No must tlv bundle: %u flags: %u token: %u domain %u %u \n", 474 message->tlvOffset[TLV_BUNDLE_INFO], message->tlvOffset[TLV_MSG_FLAGS], 475 message->tlvOffset[TLV_ACCESS_TOKEN_INFO], 476 message->tlvOffset[TLV_DOMAIN_INFO], message->tlvOffset[TLV_DAC_INFO]); 477 return MSG_INVALID; 478 } 479 480 return 0; 481} 482 483static void ProcessReqMsg(MyTask *task, MsgNode *message) 484{ 485 int ret = CheckMsg(message); 486 if (ret != 0) { 487 SendResponse(task, &message->msgHeader, ret, 0); 488 DeleteMsg(message); 489 return; 490 } 491 492 message->task = task; 493} 494 495void *GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len) 496{ 497 if (name == NULL) { 498 printf("Invalid name \n"); 499 return NULL; 500 } 501 if (message == NULL || message->buffer == NULL || message->tlvOffset == NULL) { 502 return NULL; 503 } 504 printf("GetMsgExtInfo tlvCount %d name %s \n", message->tlvCount, name); 505 506 for (uint32_t index = TLV_MAX; index < (TLV_MAX + message->tlvCount); index++) { 507 if (message->tlvOffset[index] == INVALID_OFFSET) { 508 return NULL; 509 } 510 uint8_t *data = message->buffer + message->tlvOffset[index]; 511 if (((Tlv *)data)->tlvType != TLV_MAX) { 512 continue; 513 } 514 TlvExt *tlv = (TlvExt *)data; 515 if (strcmp(tlv->tlvName, name) != 0) { 516 continue; 517 } 518 if (len != NULL) { 519 *len = tlv->dataLen; 520 } 521 return data + sizeof(TlvExt); 522 } 523 return NULL; 524} 525 526MsgNode *RebuildMsgNode(MsgNode *message, Process *info) 527{ 528#ifdef DEBUG_BEGETCTL_BOOT 529 if (message == NULL || info == NULL) { 530 printf("params is null \n"); 531 return NULL; 532 } 533 534 uint32_t bufferLen = 0; 535 MsgNode *node = CreateMsg(); 536 if (node == NULL) { 537 printf("Failed to create MsgNode \n"); 538 return NULL; 539 } 540 541 int ret = memcpy_s(&node->msgHeader, sizeof(Message), &message->msgHeader, sizeof(Message)); 542 if (ret != 0) { 543 printf("Failed to memcpy_s node->msgHeader \n"); 544 return NULL; 545 } 546 547 bufferLen = message->msgHeader.msgLen + info->message->msgHeader.msgLen - sizeof(Message); 548 node->msgHeader.msgLen = bufferLen; 549 node->msgHeader.msgType = MSG_NATIVE_PROCESS; 550 node->msgHeader.tlvCount += message->msgHeader.tlvCount; 551 ret = MsgRebuild(node, &node->msgHeader); 552 if (ret != 0) { 553 DeleteMsg(node); 554 printf("Failed to alloc memory for recv message \n"); 555 return NULL; 556 } 557 558 uint32_t infoBufLen = info->message->msgHeader.msgLen - sizeof(Message); 559 uint32_t msgBufLen = message->msgHeader.msgLen - sizeof(Message); 560 ret = memcpy_s(node->buffer, bufferLen, info->message->buffer, infoBufLen); 561 if (ret != 0) { 562 DeleteMsg(node); 563 printf("Failed to memcpy_s info buffer \n"); 564 return NULL; 565 } 566 ret = memcpy_s(node->buffer + infoBufLen, bufferLen - infoBufLen, message->buffer, msgBufLen); 567 if (ret != 0) { 568 DeleteMsg(node); 569 printf("Failed to memcpy_s message->buffer \n"); 570 return NULL; 571 } 572 return node; 573#endif 574 return NULL; 575} 576 577static MsgNode *ProcessBegetctlMsg(MyTask *task, MsgNode *message) 578{ 579 uint32_t len = 0; 580 const char *msg = (const char *)GetMsgExtInfo(message, MSG_EXT_NAME_BEGET_PID, &len); 581 if (msg == NULL) { 582 printf("Failed to get extInfo \n"); 583 return NULL; 584 } 585 586 MsgNode *msgNode = RebuildMsgNode(message, info); 587 if (msgNode == NULL) { 588 printf("Failed to rebuild message node \n"); 589 return NULL; 590 } 591 592 int ret = DecodeMsg(msgNode); 593 if (ret != 0) { 594 DeletenMsg(msgNode); 595 return NULL; 596 } 597 return msgNode; 598} 599 600static void ProcessBegetMsg(MyTask *task, MsgNode *message) 601{ 602 Message *msg = &message->msgHeader; 603 604 MsgNode *msgNode = ProcessBegetctlMsg(connection, message); 605 if (msgNode == NULL) { 606 SendResponse(task, msg, DEBUG_MODE_NOT_SUPPORT, 0); 607 DeleteMsg(message); 608 return; 609 } 610 ProcessReqMsg(task, msgNode); 611 DeleteMsg(message); 612 DeleteMsg(msgNode); 613} 614 615static void ProcessRecvMsg(MyTask *task, MsgNode *message) 616{ 617 Message *msg = &message->msgHeader; 618 printf("Recv message header magic 0x%x type %u id %u len %u %s \n", 619 msg->magic, msg->msgType, msg->msgId, msg->msgLen, msg->buffer); 620 if (task->receiverCtx.nextMsgId != msg->msgId) { 621 printf("Invalid msg id %u %u \n", task->receiverCtx.nextMsgId, msg->msgId); 622 } 623 task->receiverCtx.nextMsgId++; 624 625 int ret; 626 switch (msg->msgType) { 627 case MSG_GET_RENDER_TERMINATION_STATUS: { // get status 628 Result result = {0}; 629 ret = ProcessTerminationStatusMsg(message, &result); 630 SendResponse(task, msg, ret == 0 ? result.result : ret, result.pid); 631 DeleteMessage(message); 632 break; 633 } 634 case MSG_NORMAL: { 635 ProcessReqMsg(task, message); 636 break; 637 } 638 case MSG_DUMP: 639 SendResponse(task, msg, 0, 0); 640 DeleteMessage(message); 641 break; 642 case MSG_BEGET: { 643 ProcessBegetMsg(task, message); 644 break; 645 } 646 case MSG_BEGET_TIME: 647 SendResponse(task, msg, MIN_TIME, MAX_TIME); 648 DeleteMessage(message); 649 break; 650 case MSG_UPDATE_MOUNT_POINTS: 651 ret = ProcessRemountMsg(task, message); 652 SendResponse(task, msg, ret, 0); 653 break; 654 case MSG_RESTART: 655 ProcessRestartMsg(task, message); 656 break; 657 default: 658 SendResponse(task, msg, MSG_INVALID, 0); 659 DeleteMessage(message); 660 break; 661 } 662} 663 664static void OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer, uint32_t buffLen) 665{ 666 MyTask *task = (MyTask *)LE_GetUserData(taskHandle); 667 if (task == NULL) { 668 printf("Failed to get client form socket\n"); 669 LE_CloseTask(LE_GetDefaultLoop(), taskHandle); 670 return; 671 } 672 673 if (buffLen >= MAX_MSG_TOTAL_LENGTH) { 674 printf("Message too long %u \n", buffLen); 675 LE_CloseTask(LE_GetDefaultLoop(), taskHandle); 676 return; 677 } 678 679 uint32_t reminder = 0; 680 uint32_t currLen = 0; 681 Message *message = task->ctx.incompleteMsg; // incomplete msg 682 task->ctx.incompleteMsg = NULL; 683 int ret = 0; 684 do { 685 printf("OnReceiveRequest connectionId: %u start: 0x%x buffLen %d", 686 task->id, *(uint32_t *)(buffer + currLen), buffLen - currLen); 687 688 ret = GetMsgFromBuffer(buffer + currLen, buffLen - currLen, 689 &message, &task->ctx.msgRecvLen, &reminder); 690 if (ret != 0) { 691 break; 692 } 693 694 if (task->ctx.msgRecvLen != message->msgLen) { // recv complete msg 695 task->ctx.incompleteMsg = message; 696 message = NULL; 697 break; 698 } 699 task->ctx.msgRecvLen = 0; 700 if (task->ctx.timer) { 701 LE_StopTimer(LE_GetDefaultLoop(), task->ctx.timer); 702 task->ctx.timer = NULL; 703 } 704 // decode msg 705 ret = DecodeMsg(message); 706 if (ret != 0) { 707 break; 708 } 709 (void)ProcessRecvMsg(task, message); 710 message = NULL; 711 currLen += buffLen - reminder; 712 } while (reminder > 0); 713 714 if (task->ctx.incompleteMsg != NULL) { // Start the detection timer 715 ret = StartTimerForCheckMsg(task); 716 if (ret != 0) { 717 LE_CloseStreamTask(LE_GetDefaultLoop(), taskHandle); 718 } 719 } 720} 721 722int main(int argc, char *const argv[]) 723{ 724 printf("main argc: %d \n", argc); 725 if (argc <= 0) { 726 return 0; 727 } 728 729 printf("请输入创建socket的类型:(pipe, tcp)\n"); 730 char type[128]; 731 int ret = scanf_s("%s", type, sizeof(type)); 732 if (ret <= 0) { 733 printf("input error \n"); 734 return 0; 735 } 736 737 int flags; 738 char *server; 739 if (strcmp(type, "pipe") == 0) { 740 flags = TASK_STREAM | TASK_PIPE |TASK_SERVER | TASK_TEST; 741 server = (char *)"/data/testpipe"; 742 } else if (strcmp(type, "tcp") == 0) { 743 flags = TASK_STREAM | TASK_TCP |TASK_SERVER | TASK_TEST; 744 server = (char *)"127.0.0.1:7777"; 745 } else { 746 printf("输入有误,请输入pipe或者tcp!"); 747 system("pause"); 748 return 0; 749 } 750 751 return 0; 752}