1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "le_task.h"
17
18
19#include "le_loop.h"
20#include "le_utils.h"
21
22int CheckTaskFlags(const BaseTask *task, uint32_t flags)
23{
24    if (task == NULL) {
25        return 0;
26    }
27    return ((task->flags & flags) == flags);
28}
29
30int GetSocketFd(const TaskHandle task)
31{
32    BaseTask *stream = (BaseTask *)task;
33    return stream->taskId.fd;
34}
35
36BaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size)
37{
38    if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) {
39        return NULL;
40    }
41    BaseTask *task = (BaseTask *)calloc(1, size + info->userDataSize);
42    LE_CHECK(task != NULL, return NULL, "Failed to alloc for task");
43    HASHMAPInitNode(&task->hashNode);
44    // key id
45    task->flags = info->flags;
46    task->taskId.fd = fd;
47    LE_STATUS ret = AddTask((EventLoop *)loopHandle, task);
48    LE_CHECK(ret == LE_SUCCESS, free(task);
49        return NULL, "Failed to alloc for task");
50    task->userDataSize = info->userDataSize;
51    task->userDataOffset = size;
52    task->close = info->close;
53    return task;
54}
55
56void CloseTask(const LoopHandle loopHandle, BaseTask *task)
57{
58    LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters");
59    LE_LOGV("CloseTask %d", task->taskId.fd);
60    if (CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) ||
61        CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT)) {
62        StreamTask *stream = (StreamTask *)task;
63        LE_Buffer *buffer = GetFirstBuffer(stream);
64        while (buffer) {
65            FreeBuffer(loopHandle, stream, (BufferHandle)buffer);
66            buffer = GetFirstBuffer(stream);
67        }
68    }
69    if (task->close != NULL) {
70        task->close((TaskHandle)task);
71    }
72}
73
74LE_Buffer *CreateBuffer(uint32_t bufferSize)
75{
76    LE_ONLY_CHECK(bufferSize < LOOP_MAX_BUFFER, return NULL);
77    LE_Buffer *buffer = NULL;
78    LE_CHECK((buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize)) != NULL,
79        return NULL, "Failed to alloc memory for buffer");
80    OH_ListInit(&buffer->node);
81    buffer->buffSize = bufferSize;
82    buffer->dataSize = 0;
83    return buffer;
84}
85
86int IsBufferEmpty(StreamTask *task)
87{
88    LoopMutexLock(&task->mutex);
89    int ret = ListEmpty(task->buffHead);
90    LoopMutexUnlock(&task->mutex);
91    return ret;
92}
93
94LE_Buffer *GetFirstBuffer(StreamTask *task)
95{
96    LoopMutexLock(&task->mutex);
97    ListNode *node = task->buffHead.next;
98    LE_Buffer *buffer = NULL;
99    if (node != &task->buffHead) {
100        buffer = ListEntry(node, LE_Buffer, node);
101    }
102    LoopMutexUnlock(&task->mutex);
103    return buffer;
104}
105
106void AddBuffer(StreamTask *task, LE_Buffer *buffer)
107{
108    LoopMutexLock(&task->mutex);
109    OH_ListAddTail(&task->buffHead, &buffer->node);
110    LoopMutexUnlock(&task->mutex);
111}
112
113LE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next)
114{
115    LoopMutexLock(&task->mutex);
116    LE_Buffer *buffer = NULL;
117    ListNode *node = NULL;
118    if (next == NULL) {
119        node = task->buffHead.next;
120    } else {
121        node = next->node.next;
122    }
123    if (node != &task->buffHead) {
124        buffer = ListEntry(node, LE_Buffer, node);
125    }
126    LoopMutexUnlock(&task->mutex);
127    return buffer;
128}
129
130void FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer)
131{
132    LE_CHECK(buffer != NULL, return, "Invalid buffer");
133    if (task == NULL) {
134        free(buffer);
135        return;
136    }
137    if (CheckTaskFlags((BaseTask *)task, TASK_STREAM | TASK_CONNECT) ||
138        CheckTaskFlags((BaseTask *)task, TASK_EVENT | TASK_ASYNC_EVENT)) {
139        LoopMutexLock(&task->mutex);
140        OH_ListRemove(&buffer->node);
141        LoopMutexUnlock(&task->mutex);
142    }
143    free(buffer);
144}
145
146BufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize)
147{
148    return (BufferHandle)CreateBuffer(bufferSize);
149}
150
151void LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle)
152{
153    FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle);
154}
155
156uint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize)
157{
158    LE_Buffer *buffer = (LE_Buffer *)handle;
159    LE_CHECK(buffer != NULL, return NULL, "Invalid buffer");
160    if (dataSize) {
161        *dataSize = (uint32_t)buffer->dataSize;
162    }
163    if (buffSize) {
164        *buffSize = (uint32_t)buffer->buffSize;
165    }
166    return buffer->data;
167}
168
169LE_STATUS LE_Send(const LoopHandle loopHandle,
170    const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen)
171{
172    LE_CHECK(loopHandle != NULL && buffHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
173    LE_CHECK(taskHandle != NULL, return LE_INVALID_TASK, "Invalid task");
174    EventLoop *loop = (EventLoop *)loopHandle;
175    if (((BaseTask *)taskHandle)->flags & TASK_FLAGS_INVALID) {
176        LE_FreeBuffer(loopHandle, taskHandle, buffHandle);
177        return LE_INVALID_TASK;
178    }
179    LE_Buffer *buffer = (LE_Buffer *)buffHandle;
180    buffer->dataSize = buffLen;
181    if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) {
182        AddBuffer((StreamTask *)taskHandle, buffer);
183    } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) {
184        AddBuffer((StreamTask *)taskHandle, buffer);
185    }
186    loop->modEvent(loop, (BaseTask *)taskHandle, EVENT_WRITE);
187    return LE_SUCCESS;
188}
189
190void LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
191{
192    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
193    if (((LoopBase*)taskHandle)->flags & TASK_TIME) {
194        LE_StopTimer(loopHandle, taskHandle);
195        return;
196    }
197    LE_LOGV("LE_CloseTask %d", GetSocketFd(taskHandle));
198    BaseTask *task = (BaseTask *)taskHandle;
199    if (task->innerClose != NULL) {
200        task->innerClose(loopHandle, taskHandle);
201    }
202    free(task);
203}
204
205void *LE_GetUserData(TaskHandle handle)
206{
207    LE_CHECK(handle != NULL, return NULL, "Invalid handle");
208    BaseTask *stream = (BaseTask *)handle;
209    return (void *)(((char *)stream) + stream->userDataOffset);
210}
211
212int32_t LE_GetSendResult(const BufferHandle handle)
213{
214    LE_CHECK(handle != NULL, return 0, "Invalid handle");
215    return ((LE_Buffer *)handle)->result;
216}