1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "le_loop.h"
17
18#include <errno.h>
19#include <sys/socket.h>
20#include "securec.h"
21
22#include "le_socket.h"
23#include "le_task.h"
24
25static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26    const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27{
28    EventLoop *loop = (EventLoop *)loopHandle;
29    StreamTask *stream = (StreamTask *)taskHandle;
30    LE_Buffer *buffer = GetFirstBuffer(stream);
31    while (buffer) {
32        int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33        if (ret < 0 || (size_t)ret < buffer->dataSize) {
34            LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35                    buffer->dataSize, ret, errno);
36        }
37        LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38        buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39        if (complete != NULL) {
40            complete(taskHandle, buffer);
41        }
42        FreeBuffer(loopHandle, stream, buffer);
43        buffer = GetFirstBuffer(stream);
44    }
45    if (IsBufferEmpty(stream)) {
46        LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47        loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48        return LE_SUCCESS;
49    }
50    return LE_SUCCESS;
51}
52
53static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54    const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
55{
56    LE_STATUS status = LE_SUCCESS;
57    LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58    LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer");
59    int readLen = 0;
60    while (1) {
61        if (handleRecvMsg != NULL) {
62            readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
63        } else {
64            readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
65        }
66        LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
67        if (readLen < 0) {
68            if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
69                continue;
70            }
71            status = LE_DIS_CONNECTED;
72            break;
73        } else if (readLen == 0) {
74            // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
75            status = LE_DIS_CONNECTED;
76            break;
77        } else {
78            break;
79        }
80    }
81    if (status != LE_SUCCESS) {
82        FreeBuffer(loopHandle, NULL, buffer);
83        return status;
84    }
85    if (recvMessage) {
86        recvMessage(taskHandle, buffer->data, readLen);
87    }
88    FreeBuffer(loopHandle, NULL, buffer);
89    return status;
90}
91
92static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
93{
94    StreamConnectTask *stream = (StreamConnectTask *)handle;
95    LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
96
97    LE_STATUS status = LE_SUCCESS;
98    if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
99        status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
100    }
101    if (LE_TEST_FLAGS(oper, EVENT_READ)) {
102        status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
103    }
104    if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
105        if (stream->disConnectComplete) {
106            stream->disConnectComplete(handle);
107        }
108        LE_CloseStreamTask(loopHandle, handle);
109    }
110    return status;
111}
112
113static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
114{
115    StreamClientTask *client = (StreamClientTask *)handle;
116    LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
117
118    LE_STATUS status = LE_SUCCESS;
119    if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
120        LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
121        client->connected = 1;
122        status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
123    }
124    if (LE_TEST_FLAGS(oper, EVENT_READ)) {
125        status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
126    }
127    if (status == LE_DIS_CONNECTED) {
128        if (client->disConnectComplete) {
129            client->disConnectComplete(handle);
130        }
131        client->connected = 0;
132        LE_CloseStreamTask(loopHandle, handle);
133    }
134    return status;
135}
136
137static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
138{
139    BaseTask *task = (BaseTask *)taskHandle;
140    DelTask((EventLoop *)loopHandle, task);
141    CloseTask(loopHandle, task);
142    if (task->taskId.fd > 0) {
143        close(task->taskId.fd);
144    }
145}
146
147static void DumpStreamServerTaskInfo_(const TaskHandle task)
148{
149    INIT_CHECK(task != NULL, return);
150    BaseTask *baseTask = (BaseTask *)task;
151    StreamServerTask *serverTask = (StreamServerTask *)baseTask;
152    printf("\tfd: %d \n", serverTask->base.taskId.fd);
153    printf("\t  TaskType: %s \n", "ServerTask");
154    if (strlen(serverTask->server) > 0) {
155        printf("\t  Server socket:%s \n", serverTask->server);
156    } else {
157        printf("\t  Server socket:%s \n", "NULL");
158    }
159}
160
161static void DumpStreamConnectTaskInfo_(const TaskHandle task)
162{
163    INIT_CHECK(task != NULL, return);
164    BaseTask *baseTask = (BaseTask *)task;
165    StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
166    TaskHandle taskHandle = (TaskHandle)connectTask;
167    printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
168    printf("\t  TaskType: %s \n", "ConnectTask");
169    printf("\t  ServiceInfo: \n");
170    struct ucred cred = {-1, -1, -1};
171    socklen_t credSize  = sizeof(struct ucred);
172    if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
173        printf("\t    Service Pid: %d \n", cred.pid);
174        printf("\t    Service Uid: %u \n", cred.uid);
175        printf("\t    Service Gid: %u \n", cred.gid);
176    } else {
177        printf("\t    Service Pid: %s \n", "NULL");
178        printf("\t    Service Uid: %s \n", "NULL");
179        printf("\t    Service Gid: %s \n", "NULL");
180    }
181}
182
183static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
184{
185    LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
186    if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
187        return LE_FAILURE;
188    }
189    StreamServerTask *server = (StreamServerTask *)serverTask;
190    LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
191
192    int ret = server->incommingConnect(loopHandle, serverTask);
193    if (ret != LE_SUCCESS) {
194        LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
195    }
196    EventLoop *loop = (EventLoop *)loopHandle;
197    loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
198    return LE_SUCCESS;
199}
200
201LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
202    TaskHandle *taskHandle, const LE_StreamServerInfo *info)
203{
204    LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
205    LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
206    LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
207        "Invalid parameters incommingConnect %s", info->server);
208
209    int fd = info->socketId;
210    int ret = 0;
211    if (info->socketId <= 0) {
212        fd = CreateSocket(info->baseInfo.flags, info->server);
213        LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
214    } else {
215        ret = listenSocket(fd, info->baseInfo.flags, info->server);
216        LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
217    }
218
219    EventLoop *loop = (EventLoop *)loopHandle;
220    StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
221        sizeof(StreamServerTask) + strlen(info->server) + 1);
222    LE_CHECK(task != NULL, close(fd);
223        return LE_NO_MEMORY, "Failed to create task");
224    task->base.handleEvent = HandleServerEvent_;
225    task->base.innerClose = HandleStreamTaskClose_;
226    task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
227    task->incommingConnect = info->incommingConnect;
228    loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
229    ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
230    LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
231    *taskHandle = (TaskHandle)task;
232    return LE_SUCCESS;
233}
234
235LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
236    TaskHandle *taskHandle, const LE_StreamInfo *info)
237{
238    LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
239    LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
240
241    int fd = CreateSocket(info->baseInfo.flags, info->server);
242    LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
243
244    StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
245    LE_CHECK(task != NULL, close(fd);
246        return LE_NO_MEMORY, "Failed to create task");
247    task->stream.base.handleEvent = HandleClientEvent_;
248    task->stream.base.innerClose = HandleStreamTaskClose_;
249    OH_ListInit(&task->stream.buffHead);
250    LoopMutexInit(&task->stream.mutex);
251
252    task->connectComplete = info->connectComplete;
253    task->sendMessageComplete = info->sendMessageComplete;
254    task->recvMessage = info->recvMessage;
255    task->disConnectComplete = info->disConnectComplete;
256    task->handleRecvMsg = info->handleRecvMsg;
257    EventLoop *loop = (EventLoop *)loopHandle;
258    loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
259    *taskHandle = (TaskHandle)task;
260    return LE_SUCCESS;
261}
262
263LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
264    TaskHandle *taskHandle, const LE_StreamInfo *info)
265{
266    LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
267    LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
268    LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
269    int fd = -1;
270    if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
271        fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
272        LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
273    }
274    StreamConnectTask *task = (StreamConnectTask *)CreateTask(
275        loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
276    LE_CHECK(task != NULL, close(fd);
277        return LE_NO_MEMORY, "Failed to create task");
278    task->stream.base.handleEvent = HandleStreamEvent_;
279    task->stream.base.innerClose = HandleStreamTaskClose_;
280    task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
281    task->disConnectComplete = info->disConnectComplete;
282    task->sendMessageComplete = info->sendMessageComplete;
283    task->recvMessage = info->recvMessage;
284    task->serverTask = (StreamServerTask *)server;
285    task->handleRecvMsg = info->handleRecvMsg;
286    OH_ListInit(&task->stream.buffHead);
287    LoopMutexInit(&task->stream.mutex);
288    if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
289        EventLoop *loop = (EventLoop *)loopHandle;
290        loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
291    }
292    *taskHandle = (TaskHandle)task;
293    return 0;
294}
295
296void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
297{
298    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
299    LE_CloseTask(loopHandle, taskHandle);
300}
301
302int LE_GetSocketFd(const TaskHandle taskHandle)
303{
304    LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
305    return GetSocketFd(taskHandle);
306}
307