1d9f0492fSopenharmony_ci/*
2d9f0492fSopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd.
3d9f0492fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4d9f0492fSopenharmony_ci * you may not use this file except in compliance with the License.
5d9f0492fSopenharmony_ci * You may obtain a copy of the License at
6d9f0492fSopenharmony_ci *
7d9f0492fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0
8d9f0492fSopenharmony_ci *
9d9f0492fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10d9f0492fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11d9f0492fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12d9f0492fSopenharmony_ci * See the License for the specific language governing permissions and
13d9f0492fSopenharmony_ci * limitations under the License.
14d9f0492fSopenharmony_ci */
15d9f0492fSopenharmony_ci
16d9f0492fSopenharmony_ci#include "le_loop.h"
17d9f0492fSopenharmony_ci
18d9f0492fSopenharmony_ci#include <errno.h>
19d9f0492fSopenharmony_ci#include <sys/socket.h>
20d9f0492fSopenharmony_ci#include "securec.h"
21d9f0492fSopenharmony_ci
22d9f0492fSopenharmony_ci#include "le_socket.h"
23d9f0492fSopenharmony_ci#include "le_task.h"
24d9f0492fSopenharmony_ci
25d9f0492fSopenharmony_cistatic LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26d9f0492fSopenharmony_ci    const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27d9f0492fSopenharmony_ci{
28d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
29d9f0492fSopenharmony_ci    StreamTask *stream = (StreamTask *)taskHandle;
30d9f0492fSopenharmony_ci    LE_Buffer *buffer = GetFirstBuffer(stream);
31d9f0492fSopenharmony_ci    while (buffer) {
32d9f0492fSopenharmony_ci        int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33d9f0492fSopenharmony_ci        if (ret < 0 || (size_t)ret < buffer->dataSize) {
34d9f0492fSopenharmony_ci            LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35d9f0492fSopenharmony_ci                    buffer->dataSize, ret, errno);
36d9f0492fSopenharmony_ci        }
37d9f0492fSopenharmony_ci        LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38d9f0492fSopenharmony_ci        buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39d9f0492fSopenharmony_ci        if (complete != NULL) {
40d9f0492fSopenharmony_ci            complete(taskHandle, buffer);
41d9f0492fSopenharmony_ci        }
42d9f0492fSopenharmony_ci        FreeBuffer(loopHandle, stream, buffer);
43d9f0492fSopenharmony_ci        buffer = GetFirstBuffer(stream);
44d9f0492fSopenharmony_ci    }
45d9f0492fSopenharmony_ci    if (IsBufferEmpty(stream)) {
46d9f0492fSopenharmony_ci        LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47d9f0492fSopenharmony_ci        loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48d9f0492fSopenharmony_ci        return LE_SUCCESS;
49d9f0492fSopenharmony_ci    }
50d9f0492fSopenharmony_ci    return LE_SUCCESS;
51d9f0492fSopenharmony_ci}
52d9f0492fSopenharmony_ci
53d9f0492fSopenharmony_cistatic LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54d9f0492fSopenharmony_ci    const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
55d9f0492fSopenharmony_ci{
56d9f0492fSopenharmony_ci    LE_STATUS status = LE_SUCCESS;
57d9f0492fSopenharmony_ci    LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58d9f0492fSopenharmony_ci    LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer");
59d9f0492fSopenharmony_ci    int readLen = 0;
60d9f0492fSopenharmony_ci    while (1) {
61d9f0492fSopenharmony_ci        if (handleRecvMsg != NULL) {
62d9f0492fSopenharmony_ci            readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
63d9f0492fSopenharmony_ci        } else {
64d9f0492fSopenharmony_ci            readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
65d9f0492fSopenharmony_ci        }
66d9f0492fSopenharmony_ci        LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
67d9f0492fSopenharmony_ci        if (readLen < 0) {
68d9f0492fSopenharmony_ci            if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
69d9f0492fSopenharmony_ci                continue;
70d9f0492fSopenharmony_ci            }
71d9f0492fSopenharmony_ci            status = LE_DIS_CONNECTED;
72d9f0492fSopenharmony_ci            break;
73d9f0492fSopenharmony_ci        } else if (readLen == 0) {
74d9f0492fSopenharmony_ci            // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
75d9f0492fSopenharmony_ci            status = LE_DIS_CONNECTED;
76d9f0492fSopenharmony_ci            break;
77d9f0492fSopenharmony_ci        } else {
78d9f0492fSopenharmony_ci            break;
79d9f0492fSopenharmony_ci        }
80d9f0492fSopenharmony_ci    }
81d9f0492fSopenharmony_ci    if (status != LE_SUCCESS) {
82d9f0492fSopenharmony_ci        FreeBuffer(loopHandle, NULL, buffer);
83d9f0492fSopenharmony_ci        return status;
84d9f0492fSopenharmony_ci    }
85d9f0492fSopenharmony_ci    if (recvMessage) {
86d9f0492fSopenharmony_ci        recvMessage(taskHandle, buffer->data, readLen);
87d9f0492fSopenharmony_ci    }
88d9f0492fSopenharmony_ci    FreeBuffer(loopHandle, NULL, buffer);
89d9f0492fSopenharmony_ci    return status;
90d9f0492fSopenharmony_ci}
91d9f0492fSopenharmony_ci
92d9f0492fSopenharmony_cistatic LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
93d9f0492fSopenharmony_ci{
94d9f0492fSopenharmony_ci    StreamConnectTask *stream = (StreamConnectTask *)handle;
95d9f0492fSopenharmony_ci    LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
96d9f0492fSopenharmony_ci
97d9f0492fSopenharmony_ci    LE_STATUS status = LE_SUCCESS;
98d9f0492fSopenharmony_ci    if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
99d9f0492fSopenharmony_ci        status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
100d9f0492fSopenharmony_ci    }
101d9f0492fSopenharmony_ci    if (LE_TEST_FLAGS(oper, EVENT_READ)) {
102d9f0492fSopenharmony_ci        status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
103d9f0492fSopenharmony_ci    }
104d9f0492fSopenharmony_ci    if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
105d9f0492fSopenharmony_ci        if (stream->disConnectComplete) {
106d9f0492fSopenharmony_ci            stream->disConnectComplete(handle);
107d9f0492fSopenharmony_ci        }
108d9f0492fSopenharmony_ci        LE_CloseStreamTask(loopHandle, handle);
109d9f0492fSopenharmony_ci    }
110d9f0492fSopenharmony_ci    return status;
111d9f0492fSopenharmony_ci}
112d9f0492fSopenharmony_ci
113d9f0492fSopenharmony_cistatic LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
114d9f0492fSopenharmony_ci{
115d9f0492fSopenharmony_ci    StreamClientTask *client = (StreamClientTask *)handle;
116d9f0492fSopenharmony_ci    LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
117d9f0492fSopenharmony_ci
118d9f0492fSopenharmony_ci    LE_STATUS status = LE_SUCCESS;
119d9f0492fSopenharmony_ci    if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
120d9f0492fSopenharmony_ci        LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
121d9f0492fSopenharmony_ci        client->connected = 1;
122d9f0492fSopenharmony_ci        status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
123d9f0492fSopenharmony_ci    }
124d9f0492fSopenharmony_ci    if (LE_TEST_FLAGS(oper, EVENT_READ)) {
125d9f0492fSopenharmony_ci        status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
126d9f0492fSopenharmony_ci    }
127d9f0492fSopenharmony_ci    if (status == LE_DIS_CONNECTED) {
128d9f0492fSopenharmony_ci        if (client->disConnectComplete) {
129d9f0492fSopenharmony_ci            client->disConnectComplete(handle);
130d9f0492fSopenharmony_ci        }
131d9f0492fSopenharmony_ci        client->connected = 0;
132d9f0492fSopenharmony_ci        LE_CloseStreamTask(loopHandle, handle);
133d9f0492fSopenharmony_ci    }
134d9f0492fSopenharmony_ci    return status;
135d9f0492fSopenharmony_ci}
136d9f0492fSopenharmony_ci
137d9f0492fSopenharmony_cistatic void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
138d9f0492fSopenharmony_ci{
139d9f0492fSopenharmony_ci    BaseTask *task = (BaseTask *)taskHandle;
140d9f0492fSopenharmony_ci    DelTask((EventLoop *)loopHandle, task);
141d9f0492fSopenharmony_ci    CloseTask(loopHandle, task);
142d9f0492fSopenharmony_ci    if (task->taskId.fd > 0) {
143d9f0492fSopenharmony_ci        close(task->taskId.fd);
144d9f0492fSopenharmony_ci    }
145d9f0492fSopenharmony_ci}
146d9f0492fSopenharmony_ci
147d9f0492fSopenharmony_cistatic void DumpStreamServerTaskInfo_(const TaskHandle task)
148d9f0492fSopenharmony_ci{
149d9f0492fSopenharmony_ci    INIT_CHECK(task != NULL, return);
150d9f0492fSopenharmony_ci    BaseTask *baseTask = (BaseTask *)task;
151d9f0492fSopenharmony_ci    StreamServerTask *serverTask = (StreamServerTask *)baseTask;
152d9f0492fSopenharmony_ci    printf("\tfd: %d \n", serverTask->base.taskId.fd);
153d9f0492fSopenharmony_ci    printf("\t  TaskType: %s \n", "ServerTask");
154d9f0492fSopenharmony_ci    if (strlen(serverTask->server) > 0) {
155d9f0492fSopenharmony_ci        printf("\t  Server socket:%s \n", serverTask->server);
156d9f0492fSopenharmony_ci    } else {
157d9f0492fSopenharmony_ci        printf("\t  Server socket:%s \n", "NULL");
158d9f0492fSopenharmony_ci    }
159d9f0492fSopenharmony_ci}
160d9f0492fSopenharmony_ci
161d9f0492fSopenharmony_cistatic void DumpStreamConnectTaskInfo_(const TaskHandle task)
162d9f0492fSopenharmony_ci{
163d9f0492fSopenharmony_ci    INIT_CHECK(task != NULL, return);
164d9f0492fSopenharmony_ci    BaseTask *baseTask = (BaseTask *)task;
165d9f0492fSopenharmony_ci    StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
166d9f0492fSopenharmony_ci    TaskHandle taskHandle = (TaskHandle)connectTask;
167d9f0492fSopenharmony_ci    printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
168d9f0492fSopenharmony_ci    printf("\t  TaskType: %s \n", "ConnectTask");
169d9f0492fSopenharmony_ci    printf("\t  ServiceInfo: \n");
170d9f0492fSopenharmony_ci    struct ucred cred = {-1, -1, -1};
171d9f0492fSopenharmony_ci    socklen_t credSize  = sizeof(struct ucred);
172d9f0492fSopenharmony_ci    if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
173d9f0492fSopenharmony_ci        printf("\t    Service Pid: %d \n", cred.pid);
174d9f0492fSopenharmony_ci        printf("\t    Service Uid: %u \n", cred.uid);
175d9f0492fSopenharmony_ci        printf("\t    Service Gid: %u \n", cred.gid);
176d9f0492fSopenharmony_ci    } else {
177d9f0492fSopenharmony_ci        printf("\t    Service Pid: %s \n", "NULL");
178d9f0492fSopenharmony_ci        printf("\t    Service Uid: %s \n", "NULL");
179d9f0492fSopenharmony_ci        printf("\t    Service Gid: %s \n", "NULL");
180d9f0492fSopenharmony_ci    }
181d9f0492fSopenharmony_ci}
182d9f0492fSopenharmony_ci
183d9f0492fSopenharmony_cistatic LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
184d9f0492fSopenharmony_ci{
185d9f0492fSopenharmony_ci    LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
186d9f0492fSopenharmony_ci    if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
187d9f0492fSopenharmony_ci        return LE_FAILURE;
188d9f0492fSopenharmony_ci    }
189d9f0492fSopenharmony_ci    StreamServerTask *server = (StreamServerTask *)serverTask;
190d9f0492fSopenharmony_ci    LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
191d9f0492fSopenharmony_ci
192d9f0492fSopenharmony_ci    int ret = server->incommingConnect(loopHandle, serverTask);
193d9f0492fSopenharmony_ci    if (ret != LE_SUCCESS) {
194d9f0492fSopenharmony_ci        LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
195d9f0492fSopenharmony_ci    }
196d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
197d9f0492fSopenharmony_ci    loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
198d9f0492fSopenharmony_ci    return LE_SUCCESS;
199d9f0492fSopenharmony_ci}
200d9f0492fSopenharmony_ci
201d9f0492fSopenharmony_ciLE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
202d9f0492fSopenharmony_ci    TaskHandle *taskHandle, const LE_StreamServerInfo *info)
203d9f0492fSopenharmony_ci{
204d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
205d9f0492fSopenharmony_ci    LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
206d9f0492fSopenharmony_ci    LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
207d9f0492fSopenharmony_ci        "Invalid parameters incommingConnect %s", info->server);
208d9f0492fSopenharmony_ci
209d9f0492fSopenharmony_ci    int fd = info->socketId;
210d9f0492fSopenharmony_ci    int ret = 0;
211d9f0492fSopenharmony_ci    if (info->socketId <= 0) {
212d9f0492fSopenharmony_ci        fd = CreateSocket(info->baseInfo.flags, info->server);
213d9f0492fSopenharmony_ci        LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
214d9f0492fSopenharmony_ci    } else {
215d9f0492fSopenharmony_ci        ret = listenSocket(fd, info->baseInfo.flags, info->server);
216d9f0492fSopenharmony_ci        LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
217d9f0492fSopenharmony_ci    }
218d9f0492fSopenharmony_ci
219d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
220d9f0492fSopenharmony_ci    StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
221d9f0492fSopenharmony_ci        sizeof(StreamServerTask) + strlen(info->server) + 1);
222d9f0492fSopenharmony_ci    LE_CHECK(task != NULL, close(fd);
223d9f0492fSopenharmony_ci        return LE_NO_MEMORY, "Failed to create task");
224d9f0492fSopenharmony_ci    task->base.handleEvent = HandleServerEvent_;
225d9f0492fSopenharmony_ci    task->base.innerClose = HandleStreamTaskClose_;
226d9f0492fSopenharmony_ci    task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
227d9f0492fSopenharmony_ci    task->incommingConnect = info->incommingConnect;
228d9f0492fSopenharmony_ci    loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
229d9f0492fSopenharmony_ci    ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
230d9f0492fSopenharmony_ci    LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
231d9f0492fSopenharmony_ci    *taskHandle = (TaskHandle)task;
232d9f0492fSopenharmony_ci    return LE_SUCCESS;
233d9f0492fSopenharmony_ci}
234d9f0492fSopenharmony_ci
235d9f0492fSopenharmony_ciLE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
236d9f0492fSopenharmony_ci    TaskHandle *taskHandle, const LE_StreamInfo *info)
237d9f0492fSopenharmony_ci{
238d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
239d9f0492fSopenharmony_ci    LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
240d9f0492fSopenharmony_ci
241d9f0492fSopenharmony_ci    int fd = CreateSocket(info->baseInfo.flags, info->server);
242d9f0492fSopenharmony_ci    LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
243d9f0492fSopenharmony_ci
244d9f0492fSopenharmony_ci    StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
245d9f0492fSopenharmony_ci    LE_CHECK(task != NULL, close(fd);
246d9f0492fSopenharmony_ci        return LE_NO_MEMORY, "Failed to create task");
247d9f0492fSopenharmony_ci    task->stream.base.handleEvent = HandleClientEvent_;
248d9f0492fSopenharmony_ci    task->stream.base.innerClose = HandleStreamTaskClose_;
249d9f0492fSopenharmony_ci    OH_ListInit(&task->stream.buffHead);
250d9f0492fSopenharmony_ci    LoopMutexInit(&task->stream.mutex);
251d9f0492fSopenharmony_ci
252d9f0492fSopenharmony_ci    task->connectComplete = info->connectComplete;
253d9f0492fSopenharmony_ci    task->sendMessageComplete = info->sendMessageComplete;
254d9f0492fSopenharmony_ci    task->recvMessage = info->recvMessage;
255d9f0492fSopenharmony_ci    task->disConnectComplete = info->disConnectComplete;
256d9f0492fSopenharmony_ci    task->handleRecvMsg = info->handleRecvMsg;
257d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
258d9f0492fSopenharmony_ci    loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
259d9f0492fSopenharmony_ci    *taskHandle = (TaskHandle)task;
260d9f0492fSopenharmony_ci    return LE_SUCCESS;
261d9f0492fSopenharmony_ci}
262d9f0492fSopenharmony_ci
263d9f0492fSopenharmony_ciLE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
264d9f0492fSopenharmony_ci    TaskHandle *taskHandle, const LE_StreamInfo *info)
265d9f0492fSopenharmony_ci{
266d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
267d9f0492fSopenharmony_ci    LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
268d9f0492fSopenharmony_ci    LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
269d9f0492fSopenharmony_ci    int fd = -1;
270d9f0492fSopenharmony_ci    if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
271d9f0492fSopenharmony_ci        fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
272d9f0492fSopenharmony_ci        LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
273d9f0492fSopenharmony_ci    }
274d9f0492fSopenharmony_ci    StreamConnectTask *task = (StreamConnectTask *)CreateTask(
275d9f0492fSopenharmony_ci        loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
276d9f0492fSopenharmony_ci    LE_CHECK(task != NULL, close(fd);
277d9f0492fSopenharmony_ci        return LE_NO_MEMORY, "Failed to create task");
278d9f0492fSopenharmony_ci    task->stream.base.handleEvent = HandleStreamEvent_;
279d9f0492fSopenharmony_ci    task->stream.base.innerClose = HandleStreamTaskClose_;
280d9f0492fSopenharmony_ci    task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
281d9f0492fSopenharmony_ci    task->disConnectComplete = info->disConnectComplete;
282d9f0492fSopenharmony_ci    task->sendMessageComplete = info->sendMessageComplete;
283d9f0492fSopenharmony_ci    task->recvMessage = info->recvMessage;
284d9f0492fSopenharmony_ci    task->serverTask = (StreamServerTask *)server;
285d9f0492fSopenharmony_ci    task->handleRecvMsg = info->handleRecvMsg;
286d9f0492fSopenharmony_ci    OH_ListInit(&task->stream.buffHead);
287d9f0492fSopenharmony_ci    LoopMutexInit(&task->stream.mutex);
288d9f0492fSopenharmony_ci    if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
289d9f0492fSopenharmony_ci        EventLoop *loop = (EventLoop *)loopHandle;
290d9f0492fSopenharmony_ci        loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
291d9f0492fSopenharmony_ci    }
292d9f0492fSopenharmony_ci    *taskHandle = (TaskHandle)task;
293d9f0492fSopenharmony_ci    return 0;
294d9f0492fSopenharmony_ci}
295d9f0492fSopenharmony_ci
296d9f0492fSopenharmony_civoid LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
297d9f0492fSopenharmony_ci{
298d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
299d9f0492fSopenharmony_ci    LE_CloseTask(loopHandle, taskHandle);
300d9f0492fSopenharmony_ci}
301d9f0492fSopenharmony_ci
302d9f0492fSopenharmony_ciint LE_GetSocketFd(const TaskHandle taskHandle)
303d9f0492fSopenharmony_ci{
304d9f0492fSopenharmony_ci    LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
305d9f0492fSopenharmony_ci    return GetSocketFd(taskHandle);
306d9f0492fSopenharmony_ci}
307