1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "le_loop.h"
17
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21
22 #include "le_socket.h"
23 #include "le_task.h"
24
HandleSendMsg_(const LoopHandle loopHandle, const TaskHandle taskHandle, const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26 const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28 EventLoop *loop = (EventLoop *)loopHandle;
29 StreamTask *stream = (StreamTask *)taskHandle;
30 LE_Buffer *buffer = GetFirstBuffer(stream);
31 while (buffer) {
32 int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33 if (ret < 0 || (size_t)ret < buffer->dataSize) {
34 LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35 buffer->dataSize, ret, errno);
36 }
37 LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38 buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39 if (complete != NULL) {
40 complete(taskHandle, buffer);
41 }
42 FreeBuffer(loopHandle, stream, buffer);
43 buffer = GetFirstBuffer(stream);
44 }
45 if (IsBufferEmpty(stream)) {
46 LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47 loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48 return LE_SUCCESS;
49 }
50 return LE_SUCCESS;
51 }
52
HandleRecvMsg_(const LoopHandle loopHandle, const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)53 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54 const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
55 {
56 LE_STATUS status = LE_SUCCESS;
57 LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58 LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer");
59 int readLen = 0;
60 while (1) {
61 if (handleRecvMsg != NULL) {
62 readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
63 } else {
64 readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
65 }
66 LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
67 if (readLen < 0) {
68 if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
69 continue;
70 }
71 status = LE_DIS_CONNECTED;
72 break;
73 } else if (readLen == 0) {
74 // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
75 status = LE_DIS_CONNECTED;
76 break;
77 } else {
78 break;
79 }
80 }
81 if (status != LE_SUCCESS) {
82 FreeBuffer(loopHandle, NULL, buffer);
83 return status;
84 }
85 if (recvMessage) {
86 recvMessage(taskHandle, buffer->data, readLen);
87 }
88 FreeBuffer(loopHandle, NULL, buffer);
89 return status;
90 }
91
HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)92 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
93 {
94 StreamConnectTask *stream = (StreamConnectTask *)handle;
95 LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
96
97 LE_STATUS status = LE_SUCCESS;
98 if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
99 status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
100 }
101 if (LE_TEST_FLAGS(oper, EVENT_READ)) {
102 status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
103 }
104 if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
105 if (stream->disConnectComplete) {
106 stream->disConnectComplete(handle);
107 }
108 LE_CloseStreamTask(loopHandle, handle);
109 }
110 return status;
111 }
112
HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)113 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
114 {
115 StreamClientTask *client = (StreamClientTask *)handle;
116 LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
117
118 LE_STATUS status = LE_SUCCESS;
119 if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
120 LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
121 client->connected = 1;
122 status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
123 }
124 if (LE_TEST_FLAGS(oper, EVENT_READ)) {
125 status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
126 }
127 if (status == LE_DIS_CONNECTED) {
128 if (client->disConnectComplete) {
129 client->disConnectComplete(handle);
130 }
131 client->connected = 0;
132 LE_CloseStreamTask(loopHandle, handle);
133 }
134 return status;
135 }
136
HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)137 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
138 {
139 BaseTask *task = (BaseTask *)taskHandle;
140 DelTask((EventLoop *)loopHandle, task);
141 CloseTask(loopHandle, task);
142 if (task->taskId.fd > 0) {
143 close(task->taskId.fd);
144 }
145 }
146
DumpStreamServerTaskInfo_(const TaskHandle task)147 static void DumpStreamServerTaskInfo_(const TaskHandle task)
148 {
149 INIT_CHECK(task != NULL, return);
150 BaseTask *baseTask = (BaseTask *)task;
151 StreamServerTask *serverTask = (StreamServerTask *)baseTask;
152 printf("\tfd: %d \n", serverTask->base.taskId.fd);
153 printf("\t TaskType: %s \n", "ServerTask");
154 if (strlen(serverTask->server) > 0) {
155 printf("\t Server socket:%s \n", serverTask->server);
156 } else {
157 printf("\t Server socket:%s \n", "NULL");
158 }
159 }
160
DumpStreamConnectTaskInfo_(const TaskHandle task)161 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
162 {
163 INIT_CHECK(task != NULL, return);
164 BaseTask *baseTask = (BaseTask *)task;
165 StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
166 TaskHandle taskHandle = (TaskHandle)connectTask;
167 printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
168 printf("\t TaskType: %s \n", "ConnectTask");
169 printf("\t ServiceInfo: \n");
170 struct ucred cred = {-1, -1, -1};
171 socklen_t credSize = sizeof(struct ucred);
172 if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
173 printf("\t Service Pid: %d \n", cred.pid);
174 printf("\t Service Uid: %u \n", cred.uid);
175 printf("\t Service Gid: %u \n", cred.gid);
176 } else {
177 printf("\t Service Pid: %s \n", "NULL");
178 printf("\t Service Uid: %s \n", "NULL");
179 printf("\t Service Gid: %s \n", "NULL");
180 }
181 }
182
HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)183 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
184 {
185 LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
186 if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
187 return LE_FAILURE;
188 }
189 StreamServerTask *server = (StreamServerTask *)serverTask;
190 LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
191
192 int ret = server->incommingConnect(loopHandle, serverTask);
193 if (ret != LE_SUCCESS) {
194 LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
195 }
196 EventLoop *loop = (EventLoop *)loopHandle;
197 loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
198 return LE_SUCCESS;
199 }
200
LE_CreateStreamServer(const LoopHandle loopHandle, TaskHandle *taskHandle, const LE_StreamServerInfo *info)201 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
202 TaskHandle *taskHandle, const LE_StreamServerInfo *info)
203 {
204 LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
205 LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
206 LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
207 "Invalid parameters incommingConnect %s", info->server);
208
209 int fd = info->socketId;
210 int ret = 0;
211 if (info->socketId <= 0) {
212 fd = CreateSocket(info->baseInfo.flags, info->server);
213 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
214 } else {
215 ret = listenSocket(fd, info->baseInfo.flags, info->server);
216 LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
217 }
218
219 EventLoop *loop = (EventLoop *)loopHandle;
220 StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
221 sizeof(StreamServerTask) + strlen(info->server) + 1);
222 LE_CHECK(task != NULL, close(fd);
223 return LE_NO_MEMORY, "Failed to create task");
224 task->base.handleEvent = HandleServerEvent_;
225 task->base.innerClose = HandleStreamTaskClose_;
226 task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
227 task->incommingConnect = info->incommingConnect;
228 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
229 ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
230 LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
231 *taskHandle = (TaskHandle)task;
232 return LE_SUCCESS;
233 }
234
LE_CreateStreamClient(const LoopHandle loopHandle, TaskHandle *taskHandle, const LE_StreamInfo *info)235 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
236 TaskHandle *taskHandle, const LE_StreamInfo *info)
237 {
238 LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
239 LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
240
241 int fd = CreateSocket(info->baseInfo.flags, info->server);
242 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
243
244 StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
245 LE_CHECK(task != NULL, close(fd);
246 return LE_NO_MEMORY, "Failed to create task");
247 task->stream.base.handleEvent = HandleClientEvent_;
248 task->stream.base.innerClose = HandleStreamTaskClose_;
249 OH_ListInit(&task->stream.buffHead);
250 LoopMutexInit(&task->stream.mutex);
251
252 task->connectComplete = info->connectComplete;
253 task->sendMessageComplete = info->sendMessageComplete;
254 task->recvMessage = info->recvMessage;
255 task->disConnectComplete = info->disConnectComplete;
256 task->handleRecvMsg = info->handleRecvMsg;
257 EventLoop *loop = (EventLoop *)loopHandle;
258 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
259 *taskHandle = (TaskHandle)task;
260 return LE_SUCCESS;
261 }
262
LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server, TaskHandle *taskHandle, const LE_StreamInfo *info)263 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
264 TaskHandle *taskHandle, const LE_StreamInfo *info)
265 {
266 LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
267 LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
268 LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
269 int fd = -1;
270 if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
271 fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
272 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
273 }
274 StreamConnectTask *task = (StreamConnectTask *)CreateTask(
275 loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
276 LE_CHECK(task != NULL, close(fd);
277 return LE_NO_MEMORY, "Failed to create task");
278 task->stream.base.handleEvent = HandleStreamEvent_;
279 task->stream.base.innerClose = HandleStreamTaskClose_;
280 task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
281 task->disConnectComplete = info->disConnectComplete;
282 task->sendMessageComplete = info->sendMessageComplete;
283 task->recvMessage = info->recvMessage;
284 task->serverTask = (StreamServerTask *)server;
285 task->handleRecvMsg = info->handleRecvMsg;
286 OH_ListInit(&task->stream.buffHead);
287 LoopMutexInit(&task->stream.mutex);
288 if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
289 EventLoop *loop = (EventLoop *)loopHandle;
290 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
291 }
292 *taskHandle = (TaskHandle)task;
293 return 0;
294 }
295
LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)296 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
297 {
298 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
299 LE_CloseTask(loopHandle, taskHandle);
300 }
301
LE_GetSocketFd(const TaskHandle taskHandle)302 int LE_GetSocketFd(const TaskHandle taskHandle)
303 {
304 LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
305 return GetSocketFd(taskHandle);
306 }
307