1d9f0492fSopenharmony_ci/* 2d9f0492fSopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd. 3d9f0492fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4d9f0492fSopenharmony_ci * you may not use this file except in compliance with the License. 5d9f0492fSopenharmony_ci * You may obtain a copy of the License at 6d9f0492fSopenharmony_ci * 7d9f0492fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8d9f0492fSopenharmony_ci * 9d9f0492fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10d9f0492fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11d9f0492fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12d9f0492fSopenharmony_ci * See the License for the specific language governing permissions and 13d9f0492fSopenharmony_ci * limitations under the License. 14d9f0492fSopenharmony_ci */ 15d9f0492fSopenharmony_ci 16d9f0492fSopenharmony_ci#include "le_loop.h" 17d9f0492fSopenharmony_ci 18d9f0492fSopenharmony_ci#include <errno.h> 19d9f0492fSopenharmony_ci#include <sys/socket.h> 20d9f0492fSopenharmony_ci#include "securec.h" 21d9f0492fSopenharmony_ci 22d9f0492fSopenharmony_ci#include "le_socket.h" 23d9f0492fSopenharmony_ci#include "le_task.h" 24d9f0492fSopenharmony_ci 25d9f0492fSopenharmony_cistatic LE_STATUS HandleSendMsg_(const LoopHandle loopHandle, 26d9f0492fSopenharmony_ci const TaskHandle taskHandle, const LE_SendMessageComplete complete) 27d9f0492fSopenharmony_ci{ 28d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 29d9f0492fSopenharmony_ci StreamTask *stream = (StreamTask *)taskHandle; 30d9f0492fSopenharmony_ci LE_Buffer *buffer = GetFirstBuffer(stream); 31d9f0492fSopenharmony_ci while (buffer) { 32d9f0492fSopenharmony_ci int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize); 33d9f0492fSopenharmony_ci if (ret < 0 || (size_t)ret < buffer->dataSize) { 34d9f0492fSopenharmony_ci LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle), 35d9f0492fSopenharmony_ci buffer->dataSize, ret, errno); 36d9f0492fSopenharmony_ci } 37d9f0492fSopenharmony_ci LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret); 38d9f0492fSopenharmony_ci buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno; 39d9f0492fSopenharmony_ci if (complete != NULL) { 40d9f0492fSopenharmony_ci complete(taskHandle, buffer); 41d9f0492fSopenharmony_ci } 42d9f0492fSopenharmony_ci FreeBuffer(loopHandle, stream, buffer); 43d9f0492fSopenharmony_ci buffer = GetFirstBuffer(stream); 44d9f0492fSopenharmony_ci } 45d9f0492fSopenharmony_ci if (IsBufferEmpty(stream)) { 46d9f0492fSopenharmony_ci LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle)); 47d9f0492fSopenharmony_ci loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ); 48d9f0492fSopenharmony_ci return LE_SUCCESS; 49d9f0492fSopenharmony_ci } 50d9f0492fSopenharmony_ci return LE_SUCCESS; 51d9f0492fSopenharmony_ci} 52d9f0492fSopenharmony_ci 53d9f0492fSopenharmony_cistatic LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle, 54d9f0492fSopenharmony_ci const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg) 55d9f0492fSopenharmony_ci{ 56d9f0492fSopenharmony_ci LE_STATUS status = LE_SUCCESS; 57d9f0492fSopenharmony_ci LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER); 58d9f0492fSopenharmony_ci LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer"); 59d9f0492fSopenharmony_ci int readLen = 0; 60d9f0492fSopenharmony_ci while (1) { 61d9f0492fSopenharmony_ci if (handleRecvMsg != NULL) { 62d9f0492fSopenharmony_ci readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0); 63d9f0492fSopenharmony_ci } else { 64d9f0492fSopenharmony_ci readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0); 65d9f0492fSopenharmony_ci } 66d9f0492fSopenharmony_ci LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen); 67d9f0492fSopenharmony_ci if (readLen < 0) { 68d9f0492fSopenharmony_ci if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { 69d9f0492fSopenharmony_ci continue; 70d9f0492fSopenharmony_ci } 71d9f0492fSopenharmony_ci status = LE_DIS_CONNECTED; 72d9f0492fSopenharmony_ci break; 73d9f0492fSopenharmony_ci } else if (readLen == 0) { 74d9f0492fSopenharmony_ci // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭 75d9f0492fSopenharmony_ci status = LE_DIS_CONNECTED; 76d9f0492fSopenharmony_ci break; 77d9f0492fSopenharmony_ci } else { 78d9f0492fSopenharmony_ci break; 79d9f0492fSopenharmony_ci } 80d9f0492fSopenharmony_ci } 81d9f0492fSopenharmony_ci if (status != LE_SUCCESS) { 82d9f0492fSopenharmony_ci FreeBuffer(loopHandle, NULL, buffer); 83d9f0492fSopenharmony_ci return status; 84d9f0492fSopenharmony_ci } 85d9f0492fSopenharmony_ci if (recvMessage) { 86d9f0492fSopenharmony_ci recvMessage(taskHandle, buffer->data, readLen); 87d9f0492fSopenharmony_ci } 88d9f0492fSopenharmony_ci FreeBuffer(loopHandle, NULL, buffer); 89d9f0492fSopenharmony_ci return status; 90d9f0492fSopenharmony_ci} 91d9f0492fSopenharmony_ci 92d9f0492fSopenharmony_cistatic LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper) 93d9f0492fSopenharmony_ci{ 94d9f0492fSopenharmony_ci StreamConnectTask *stream = (StreamConnectTask *)handle; 95d9f0492fSopenharmony_ci LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper); 96d9f0492fSopenharmony_ci 97d9f0492fSopenharmony_ci LE_STATUS status = LE_SUCCESS; 98d9f0492fSopenharmony_ci if (LE_TEST_FLAGS(oper, EVENT_WRITE)) { 99d9f0492fSopenharmony_ci status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete); 100d9f0492fSopenharmony_ci } 101d9f0492fSopenharmony_ci if (LE_TEST_FLAGS(oper, EVENT_READ)) { 102d9f0492fSopenharmony_ci status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg); 103d9f0492fSopenharmony_ci } 104d9f0492fSopenharmony_ci if (LE_TEST_FLAGS(oper, EVENT_ERROR)) { 105d9f0492fSopenharmony_ci if (stream->disConnectComplete) { 106d9f0492fSopenharmony_ci stream->disConnectComplete(handle); 107d9f0492fSopenharmony_ci } 108d9f0492fSopenharmony_ci LE_CloseStreamTask(loopHandle, handle); 109d9f0492fSopenharmony_ci } 110d9f0492fSopenharmony_ci return status; 111d9f0492fSopenharmony_ci} 112d9f0492fSopenharmony_ci 113d9f0492fSopenharmony_cistatic LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper) 114d9f0492fSopenharmony_ci{ 115d9f0492fSopenharmony_ci StreamClientTask *client = (StreamClientTask *)handle; 116d9f0492fSopenharmony_ci LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper); 117d9f0492fSopenharmony_ci 118d9f0492fSopenharmony_ci LE_STATUS status = LE_SUCCESS; 119d9f0492fSopenharmony_ci if (LE_TEST_FLAGS(oper, EVENT_WRITE)) { 120d9f0492fSopenharmony_ci LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle)); 121d9f0492fSopenharmony_ci client->connected = 1; 122d9f0492fSopenharmony_ci status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete); 123d9f0492fSopenharmony_ci } 124d9f0492fSopenharmony_ci if (LE_TEST_FLAGS(oper, EVENT_READ)) { 125d9f0492fSopenharmony_ci status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg); 126d9f0492fSopenharmony_ci } 127d9f0492fSopenharmony_ci if (status == LE_DIS_CONNECTED) { 128d9f0492fSopenharmony_ci if (client->disConnectComplete) { 129d9f0492fSopenharmony_ci client->disConnectComplete(handle); 130d9f0492fSopenharmony_ci } 131d9f0492fSopenharmony_ci client->connected = 0; 132d9f0492fSopenharmony_ci LE_CloseStreamTask(loopHandle, handle); 133d9f0492fSopenharmony_ci } 134d9f0492fSopenharmony_ci return status; 135d9f0492fSopenharmony_ci} 136d9f0492fSopenharmony_ci 137d9f0492fSopenharmony_cistatic void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle) 138d9f0492fSopenharmony_ci{ 139d9f0492fSopenharmony_ci BaseTask *task = (BaseTask *)taskHandle; 140d9f0492fSopenharmony_ci DelTask((EventLoop *)loopHandle, task); 141d9f0492fSopenharmony_ci CloseTask(loopHandle, task); 142d9f0492fSopenharmony_ci if (task->taskId.fd > 0) { 143d9f0492fSopenharmony_ci close(task->taskId.fd); 144d9f0492fSopenharmony_ci } 145d9f0492fSopenharmony_ci} 146d9f0492fSopenharmony_ci 147d9f0492fSopenharmony_cistatic void DumpStreamServerTaskInfo_(const TaskHandle task) 148d9f0492fSopenharmony_ci{ 149d9f0492fSopenharmony_ci INIT_CHECK(task != NULL, return); 150d9f0492fSopenharmony_ci BaseTask *baseTask = (BaseTask *)task; 151d9f0492fSopenharmony_ci StreamServerTask *serverTask = (StreamServerTask *)baseTask; 152d9f0492fSopenharmony_ci printf("\tfd: %d \n", serverTask->base.taskId.fd); 153d9f0492fSopenharmony_ci printf("\t TaskType: %s \n", "ServerTask"); 154d9f0492fSopenharmony_ci if (strlen(serverTask->server) > 0) { 155d9f0492fSopenharmony_ci printf("\t Server socket:%s \n", serverTask->server); 156d9f0492fSopenharmony_ci } else { 157d9f0492fSopenharmony_ci printf("\t Server socket:%s \n", "NULL"); 158d9f0492fSopenharmony_ci } 159d9f0492fSopenharmony_ci} 160d9f0492fSopenharmony_ci 161d9f0492fSopenharmony_cistatic void DumpStreamConnectTaskInfo_(const TaskHandle task) 162d9f0492fSopenharmony_ci{ 163d9f0492fSopenharmony_ci INIT_CHECK(task != NULL, return); 164d9f0492fSopenharmony_ci BaseTask *baseTask = (BaseTask *)task; 165d9f0492fSopenharmony_ci StreamConnectTask *connectTask = (StreamConnectTask *)baseTask; 166d9f0492fSopenharmony_ci TaskHandle taskHandle = (TaskHandle)connectTask; 167d9f0492fSopenharmony_ci printf("\tfd: %d \n", connectTask->stream.base.taskId.fd); 168d9f0492fSopenharmony_ci printf("\t TaskType: %s \n", "ConnectTask"); 169d9f0492fSopenharmony_ci printf("\t ServiceInfo: \n"); 170d9f0492fSopenharmony_ci struct ucred cred = {-1, -1, -1}; 171d9f0492fSopenharmony_ci socklen_t credSize = sizeof(struct ucred); 172d9f0492fSopenharmony_ci if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) { 173d9f0492fSopenharmony_ci printf("\t Service Pid: %d \n", cred.pid); 174d9f0492fSopenharmony_ci printf("\t Service Uid: %u \n", cred.uid); 175d9f0492fSopenharmony_ci printf("\t Service Gid: %u \n", cred.gid); 176d9f0492fSopenharmony_ci } else { 177d9f0492fSopenharmony_ci printf("\t Service Pid: %s \n", "NULL"); 178d9f0492fSopenharmony_ci printf("\t Service Uid: %s \n", "NULL"); 179d9f0492fSopenharmony_ci printf("\t Service Gid: %s \n", "NULL"); 180d9f0492fSopenharmony_ci } 181d9f0492fSopenharmony_ci} 182d9f0492fSopenharmony_ci 183d9f0492fSopenharmony_cistatic LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper) 184d9f0492fSopenharmony_ci{ 185d9f0492fSopenharmony_ci LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper); 186d9f0492fSopenharmony_ci if (!LE_TEST_FLAGS(oper, EVENT_READ)) { 187d9f0492fSopenharmony_ci return LE_FAILURE; 188d9f0492fSopenharmony_ci } 189d9f0492fSopenharmony_ci StreamServerTask *server = (StreamServerTask *)serverTask; 190d9f0492fSopenharmony_ci LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS); 191d9f0492fSopenharmony_ci 192d9f0492fSopenharmony_ci int ret = server->incommingConnect(loopHandle, serverTask); 193d9f0492fSopenharmony_ci if (ret != LE_SUCCESS) { 194d9f0492fSopenharmony_ci LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask)); 195d9f0492fSopenharmony_ci } 196d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 197d9f0492fSopenharmony_ci loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ); 198d9f0492fSopenharmony_ci return LE_SUCCESS; 199d9f0492fSopenharmony_ci} 200d9f0492fSopenharmony_ci 201d9f0492fSopenharmony_ciLE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle, 202d9f0492fSopenharmony_ci TaskHandle *taskHandle, const LE_StreamServerInfo *info) 203d9f0492fSopenharmony_ci{ 204d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 205d9f0492fSopenharmony_ci LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server"); 206d9f0492fSopenharmony_ci LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM, 207d9f0492fSopenharmony_ci "Invalid parameters incommingConnect %s", info->server); 208d9f0492fSopenharmony_ci 209d9f0492fSopenharmony_ci int fd = info->socketId; 210d9f0492fSopenharmony_ci int ret = 0; 211d9f0492fSopenharmony_ci if (info->socketId <= 0) { 212d9f0492fSopenharmony_ci fd = CreateSocket(info->baseInfo.flags, info->server); 213d9f0492fSopenharmony_ci LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server); 214d9f0492fSopenharmony_ci } else { 215d9f0492fSopenharmony_ci ret = listenSocket(fd, info->baseInfo.flags, info->server); 216d9f0492fSopenharmony_ci LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server); 217d9f0492fSopenharmony_ci } 218d9f0492fSopenharmony_ci 219d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 220d9f0492fSopenharmony_ci StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo, 221d9f0492fSopenharmony_ci sizeof(StreamServerTask) + strlen(info->server) + 1); 222d9f0492fSopenharmony_ci LE_CHECK(task != NULL, close(fd); 223d9f0492fSopenharmony_ci return LE_NO_MEMORY, "Failed to create task"); 224d9f0492fSopenharmony_ci task->base.handleEvent = HandleServerEvent_; 225d9f0492fSopenharmony_ci task->base.innerClose = HandleStreamTaskClose_; 226d9f0492fSopenharmony_ci task->base.dumpTaskInfo = DumpStreamServerTaskInfo_; 227d9f0492fSopenharmony_ci task->incommingConnect = info->incommingConnect; 228d9f0492fSopenharmony_ci loop->addEvent(loop, (const BaseTask *)task, EVENT_READ); 229d9f0492fSopenharmony_ci ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1); 230d9f0492fSopenharmony_ci LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server); 231d9f0492fSopenharmony_ci *taskHandle = (TaskHandle)task; 232d9f0492fSopenharmony_ci return LE_SUCCESS; 233d9f0492fSopenharmony_ci} 234d9f0492fSopenharmony_ci 235d9f0492fSopenharmony_ciLE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle, 236d9f0492fSopenharmony_ci TaskHandle *taskHandle, const LE_StreamInfo *info) 237d9f0492fSopenharmony_ci{ 238d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 239d9f0492fSopenharmony_ci LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server); 240d9f0492fSopenharmony_ci 241d9f0492fSopenharmony_ci int fd = CreateSocket(info->baseInfo.flags, info->server); 242d9f0492fSopenharmony_ci LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server); 243d9f0492fSopenharmony_ci 244d9f0492fSopenharmony_ci StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask)); 245d9f0492fSopenharmony_ci LE_CHECK(task != NULL, close(fd); 246d9f0492fSopenharmony_ci return LE_NO_MEMORY, "Failed to create task"); 247d9f0492fSopenharmony_ci task->stream.base.handleEvent = HandleClientEvent_; 248d9f0492fSopenharmony_ci task->stream.base.innerClose = HandleStreamTaskClose_; 249d9f0492fSopenharmony_ci OH_ListInit(&task->stream.buffHead); 250d9f0492fSopenharmony_ci LoopMutexInit(&task->stream.mutex); 251d9f0492fSopenharmony_ci 252d9f0492fSopenharmony_ci task->connectComplete = info->connectComplete; 253d9f0492fSopenharmony_ci task->sendMessageComplete = info->sendMessageComplete; 254d9f0492fSopenharmony_ci task->recvMessage = info->recvMessage; 255d9f0492fSopenharmony_ci task->disConnectComplete = info->disConnectComplete; 256d9f0492fSopenharmony_ci task->handleRecvMsg = info->handleRecvMsg; 257d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 258d9f0492fSopenharmony_ci loop->addEvent(loop, (const BaseTask *)task, EVENT_READ); 259d9f0492fSopenharmony_ci *taskHandle = (TaskHandle)task; 260d9f0492fSopenharmony_ci return LE_SUCCESS; 261d9f0492fSopenharmony_ci} 262d9f0492fSopenharmony_ci 263d9f0492fSopenharmony_ciLE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server, 264d9f0492fSopenharmony_ci TaskHandle *taskHandle, const LE_StreamInfo *info) 265d9f0492fSopenharmony_ci{ 266d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 267d9f0492fSopenharmony_ci LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 268d9f0492fSopenharmony_ci LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage"); 269d9f0492fSopenharmony_ci int fd = -1; 270d9f0492fSopenharmony_ci if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) { 271d9f0492fSopenharmony_ci fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags); 272d9f0492fSopenharmony_ci LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server)); 273d9f0492fSopenharmony_ci } 274d9f0492fSopenharmony_ci StreamConnectTask *task = (StreamConnectTask *)CreateTask( 275d9f0492fSopenharmony_ci loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask)); 276d9f0492fSopenharmony_ci LE_CHECK(task != NULL, close(fd); 277d9f0492fSopenharmony_ci return LE_NO_MEMORY, "Failed to create task"); 278d9f0492fSopenharmony_ci task->stream.base.handleEvent = HandleStreamEvent_; 279d9f0492fSopenharmony_ci task->stream.base.innerClose = HandleStreamTaskClose_; 280d9f0492fSopenharmony_ci task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_; 281d9f0492fSopenharmony_ci task->disConnectComplete = info->disConnectComplete; 282d9f0492fSopenharmony_ci task->sendMessageComplete = info->sendMessageComplete; 283d9f0492fSopenharmony_ci task->recvMessage = info->recvMessage; 284d9f0492fSopenharmony_ci task->serverTask = (StreamServerTask *)server; 285d9f0492fSopenharmony_ci task->handleRecvMsg = info->handleRecvMsg; 286d9f0492fSopenharmony_ci OH_ListInit(&task->stream.buffHead); 287d9f0492fSopenharmony_ci LoopMutexInit(&task->stream.mutex); 288d9f0492fSopenharmony_ci if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) { 289d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 290d9f0492fSopenharmony_ci loop->addEvent(loop, (const BaseTask *)task, EVENT_READ); 291d9f0492fSopenharmony_ci } 292d9f0492fSopenharmony_ci *taskHandle = (TaskHandle)task; 293d9f0492fSopenharmony_ci return 0; 294d9f0492fSopenharmony_ci} 295d9f0492fSopenharmony_ci 296d9f0492fSopenharmony_civoid LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle) 297d9f0492fSopenharmony_ci{ 298d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters"); 299d9f0492fSopenharmony_ci LE_CloseTask(loopHandle, taskHandle); 300d9f0492fSopenharmony_ci} 301d9f0492fSopenharmony_ci 302d9f0492fSopenharmony_ciint LE_GetSocketFd(const TaskHandle taskHandle) 303d9f0492fSopenharmony_ci{ 304d9f0492fSopenharmony_ci LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters"); 305d9f0492fSopenharmony_ci return GetSocketFd(taskHandle); 306d9f0492fSopenharmony_ci} 307