/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "le_loop.h"
17 
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21 
22 #include "le_socket.h"
23 #include "le_task.h"
24 
HandleSendMsg_(const LoopHandle loopHandle, const TaskHandle taskHandle, const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26     const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28     EventLoop *loop = (EventLoop *)loopHandle;
29     StreamTask *stream = (StreamTask *)taskHandle;
30     LE_Buffer *buffer = GetFirstBuffer(stream);
31     while (buffer) {
32         int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33         if (ret < 0 || (size_t)ret < buffer->dataSize) {
34             LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35                     buffer->dataSize, ret, errno);
36         }
37         LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38         buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39         if (complete != NULL) {
40             complete(taskHandle, buffer);
41         }
42         FreeBuffer(loopHandle, stream, buffer);
43         buffer = GetFirstBuffer(stream);
44     }
45     if (IsBufferEmpty(stream)) {
46         LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47         loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48         return LE_SUCCESS;
49     }
50     return LE_SUCCESS;
51 }
52 
HandleRecvMsg_(const LoopHandle loopHandle, const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)53 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54     const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
55 {
56     LE_STATUS status = LE_SUCCESS;
57     LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58     LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer");
59     int readLen = 0;
60     while (1) {
61         if (handleRecvMsg != NULL) {
62             readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
63         } else {
64             readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
65         }
66         LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
67         if (readLen < 0) {
68             if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
69                 continue;
70             }
71             status = LE_DIS_CONNECTED;
72             break;
73         } else if (readLen == 0) {
74             // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
75             status = LE_DIS_CONNECTED;
76             break;
77         } else {
78             break;
79         }
80     }
81     if (status != LE_SUCCESS) {
82         FreeBuffer(loopHandle, NULL, buffer);
83         return status;
84     }
85     if (recvMessage) {
86         recvMessage(taskHandle, buffer->data, readLen);
87     }
88     FreeBuffer(loopHandle, NULL, buffer);
89     return status;
90 }
91 
HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)92 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
93 {
94     StreamConnectTask *stream = (StreamConnectTask *)handle;
95     LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
96 
97     LE_STATUS status = LE_SUCCESS;
98     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
99         status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
100     }
101     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
102         status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
103     }
104     if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
105         if (stream->disConnectComplete) {
106             stream->disConnectComplete(handle);
107         }
108         LE_CloseStreamTask(loopHandle, handle);
109     }
110     return status;
111 }
112 
HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)113 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
114 {
115     StreamClientTask *client = (StreamClientTask *)handle;
116     LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
117 
118     LE_STATUS status = LE_SUCCESS;
119     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
120         LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
121         client->connected = 1;
122         status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
123     }
124     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
125         status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
126     }
127     if (status == LE_DIS_CONNECTED) {
128         if (client->disConnectComplete) {
129             client->disConnectComplete(handle);
130         }
131         client->connected = 0;
132         LE_CloseStreamTask(loopHandle, handle);
133     }
134     return status;
135 }
136 
HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)137 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
138 {
139     BaseTask *task = (BaseTask *)taskHandle;
140     DelTask((EventLoop *)loopHandle, task);
141     CloseTask(loopHandle, task);
142     if (task->taskId.fd > 0) {
143         close(task->taskId.fd);
144     }
145 }
146 
DumpStreamServerTaskInfo_(const TaskHandle task)147 static void DumpStreamServerTaskInfo_(const TaskHandle task)
148 {
149     INIT_CHECK(task != NULL, return);
150     BaseTask *baseTask = (BaseTask *)task;
151     StreamServerTask *serverTask = (StreamServerTask *)baseTask;
152     printf("\tfd: %d \n", serverTask->base.taskId.fd);
153     printf("\t  TaskType: %s \n", "ServerTask");
154     if (strlen(serverTask->server) > 0) {
155         printf("\t  Server socket:%s \n", serverTask->server);
156     } else {
157         printf("\t  Server socket:%s \n", "NULL");
158     }
159 }
160 
DumpStreamConnectTaskInfo_(const TaskHandle task)161 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
162 {
163     INIT_CHECK(task != NULL, return);
164     BaseTask *baseTask = (BaseTask *)task;
165     StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
166     TaskHandle taskHandle = (TaskHandle)connectTask;
167     printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
168     printf("\t  TaskType: %s \n", "ConnectTask");
169     printf("\t  ServiceInfo: \n");
170     struct ucred cred = {-1, -1, -1};
171     socklen_t credSize  = sizeof(struct ucred);
172     if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
173         printf("\t    Service Pid: %d \n", cred.pid);
174         printf("\t    Service Uid: %u \n", cred.uid);
175         printf("\t    Service Gid: %u \n", cred.gid);
176     } else {
177         printf("\t    Service Pid: %s \n", "NULL");
178         printf("\t    Service Uid: %s \n", "NULL");
179         printf("\t    Service Gid: %s \n", "NULL");
180     }
181 }
182 
HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)183 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
184 {
185     LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
186     if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
187         return LE_FAILURE;
188     }
189     StreamServerTask *server = (StreamServerTask *)serverTask;
190     LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
191 
192     int ret = server->incommingConnect(loopHandle, serverTask);
193     if (ret != LE_SUCCESS) {
194         LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
195     }
196     EventLoop *loop = (EventLoop *)loopHandle;
197     loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
198     return LE_SUCCESS;
199 }
200 
LE_CreateStreamServer(const LoopHandle loopHandle, TaskHandle *taskHandle, const LE_StreamServerInfo *info)201 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
202     TaskHandle *taskHandle, const LE_StreamServerInfo *info)
203 {
204     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
205     LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
206     LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
207         "Invalid parameters incommingConnect %s", info->server);
208 
209     int fd = info->socketId;
210     int ret = 0;
211     if (info->socketId <= 0) {
212         fd = CreateSocket(info->baseInfo.flags, info->server);
213         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
214     } else {
215         ret = listenSocket(fd, info->baseInfo.flags, info->server);
216         LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
217     }
218 
219     EventLoop *loop = (EventLoop *)loopHandle;
220     StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
221         sizeof(StreamServerTask) + strlen(info->server) + 1);
222     LE_CHECK(task != NULL, close(fd);
223         return LE_NO_MEMORY, "Failed to create task");
224     task->base.handleEvent = HandleServerEvent_;
225     task->base.innerClose = HandleStreamTaskClose_;
226     task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
227     task->incommingConnect = info->incommingConnect;
228     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
229     ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
230     LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
231     *taskHandle = (TaskHandle)task;
232     return LE_SUCCESS;
233 }
234 
LE_CreateStreamClient(const LoopHandle loopHandle, TaskHandle *taskHandle, const LE_StreamInfo *info)235 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
236     TaskHandle *taskHandle, const LE_StreamInfo *info)
237 {
238     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
239     LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
240 
241     int fd = CreateSocket(info->baseInfo.flags, info->server);
242     LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
243 
244     StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
245     LE_CHECK(task != NULL, close(fd);
246         return LE_NO_MEMORY, "Failed to create task");
247     task->stream.base.handleEvent = HandleClientEvent_;
248     task->stream.base.innerClose = HandleStreamTaskClose_;
249     OH_ListInit(&task->stream.buffHead);
250     LoopMutexInit(&task->stream.mutex);
251 
252     task->connectComplete = info->connectComplete;
253     task->sendMessageComplete = info->sendMessageComplete;
254     task->recvMessage = info->recvMessage;
255     task->disConnectComplete = info->disConnectComplete;
256     task->handleRecvMsg = info->handleRecvMsg;
257     EventLoop *loop = (EventLoop *)loopHandle;
258     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
259     *taskHandle = (TaskHandle)task;
260     return LE_SUCCESS;
261 }
262 
LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server, TaskHandle *taskHandle, const LE_StreamInfo *info)263 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
264     TaskHandle *taskHandle, const LE_StreamInfo *info)
265 {
266     LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
267     LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
268     LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
269     int fd = -1;
270     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
271         fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
272         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
273     }
274     StreamConnectTask *task = (StreamConnectTask *)CreateTask(
275         loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
276     LE_CHECK(task != NULL, close(fd);
277         return LE_NO_MEMORY, "Failed to create task");
278     task->stream.base.handleEvent = HandleStreamEvent_;
279     task->stream.base.innerClose = HandleStreamTaskClose_;
280     task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
281     task->disConnectComplete = info->disConnectComplete;
282     task->sendMessageComplete = info->sendMessageComplete;
283     task->recvMessage = info->recvMessage;
284     task->serverTask = (StreamServerTask *)server;
285     task->handleRecvMsg = info->handleRecvMsg;
286     OH_ListInit(&task->stream.buffHead);
287     LoopMutexInit(&task->stream.mutex);
288     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
289         EventLoop *loop = (EventLoop *)loopHandle;
290         loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
291     }
292     *taskHandle = (TaskHandle)task;
293     return 0;
294 }
295 
LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)296 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
297 {
298     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
299     LE_CloseTask(loopHandle, taskHandle);
300 }
301 
LE_GetSocketFd(const TaskHandle taskHandle)302 int LE_GetSocketFd(const TaskHandle taskHandle)
303 {
304     LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
305     return GetSocketFd(taskHandle);
306 }
307