1d9f0492fSopenharmony_ci/*
2d9f0492fSopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd.
3d9f0492fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4d9f0492fSopenharmony_ci * you may not use this file except in compliance with the License.
5d9f0492fSopenharmony_ci * You may obtain a copy of the License at
6d9f0492fSopenharmony_ci *
7d9f0492fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0
8d9f0492fSopenharmony_ci *
9d9f0492fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10d9f0492fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11d9f0492fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12d9f0492fSopenharmony_ci * See the License for the specific language governing permissions and
13d9f0492fSopenharmony_ci * limitations under the License.
14d9f0492fSopenharmony_ci */
15d9f0492fSopenharmony_ci
16d9f0492fSopenharmony_ci#include "le_task.h"
17d9f0492fSopenharmony_ci
18d9f0492fSopenharmony_ci
19d9f0492fSopenharmony_ci#include "le_loop.h"
20d9f0492fSopenharmony_ci#include "le_utils.h"
21d9f0492fSopenharmony_ci
22d9f0492fSopenharmony_ciint CheckTaskFlags(const BaseTask *task, uint32_t flags)
23d9f0492fSopenharmony_ci{
24d9f0492fSopenharmony_ci    if (task == NULL) {
25d9f0492fSopenharmony_ci        return 0;
26d9f0492fSopenharmony_ci    }
27d9f0492fSopenharmony_ci    return ((task->flags & flags) == flags);
28d9f0492fSopenharmony_ci}
29d9f0492fSopenharmony_ci
30d9f0492fSopenharmony_ciint GetSocketFd(const TaskHandle task)
31d9f0492fSopenharmony_ci{
32d9f0492fSopenharmony_ci    BaseTask *stream = (BaseTask *)task;
33d9f0492fSopenharmony_ci    return stream->taskId.fd;
34d9f0492fSopenharmony_ci}
35d9f0492fSopenharmony_ci
36d9f0492fSopenharmony_ciBaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size)
37d9f0492fSopenharmony_ci{
38d9f0492fSopenharmony_ci    if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) {
39d9f0492fSopenharmony_ci        return NULL;
40d9f0492fSopenharmony_ci    }
41d9f0492fSopenharmony_ci    BaseTask *task = (BaseTask *)calloc(1, size + info->userDataSize);
42d9f0492fSopenharmony_ci    LE_CHECK(task != NULL, return NULL, "Failed to alloc for task");
43d9f0492fSopenharmony_ci    HASHMAPInitNode(&task->hashNode);
44d9f0492fSopenharmony_ci    // key id
45d9f0492fSopenharmony_ci    task->flags = info->flags;
46d9f0492fSopenharmony_ci    task->taskId.fd = fd;
47d9f0492fSopenharmony_ci    LE_STATUS ret = AddTask((EventLoop *)loopHandle, task);
48d9f0492fSopenharmony_ci    LE_CHECK(ret == LE_SUCCESS, free(task);
49d9f0492fSopenharmony_ci        return NULL, "Failed to alloc for task");
50d9f0492fSopenharmony_ci    task->userDataSize = info->userDataSize;
51d9f0492fSopenharmony_ci    task->userDataOffset = size;
52d9f0492fSopenharmony_ci    task->close = info->close;
53d9f0492fSopenharmony_ci    return task;
54d9f0492fSopenharmony_ci}
55d9f0492fSopenharmony_ci
56d9f0492fSopenharmony_civoid CloseTask(const LoopHandle loopHandle, BaseTask *task)
57d9f0492fSopenharmony_ci{
58d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters");
59d9f0492fSopenharmony_ci    LE_LOGV("CloseTask %d", task->taskId.fd);
60d9f0492fSopenharmony_ci    if (CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) ||
61d9f0492fSopenharmony_ci        CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT)) {
62d9f0492fSopenharmony_ci        StreamTask *stream = (StreamTask *)task;
63d9f0492fSopenharmony_ci        LE_Buffer *buffer = GetFirstBuffer(stream);
64d9f0492fSopenharmony_ci        while (buffer) {
65d9f0492fSopenharmony_ci            FreeBuffer(loopHandle, stream, (BufferHandle)buffer);
66d9f0492fSopenharmony_ci            buffer = GetFirstBuffer(stream);
67d9f0492fSopenharmony_ci        }
68d9f0492fSopenharmony_ci    }
69d9f0492fSopenharmony_ci    if (task->close != NULL) {
70d9f0492fSopenharmony_ci        task->close((TaskHandle)task);
71d9f0492fSopenharmony_ci    }
72d9f0492fSopenharmony_ci}
73d9f0492fSopenharmony_ci
74d9f0492fSopenharmony_ciLE_Buffer *CreateBuffer(uint32_t bufferSize)
75d9f0492fSopenharmony_ci{
76d9f0492fSopenharmony_ci    LE_ONLY_CHECK(bufferSize < LOOP_MAX_BUFFER, return NULL);
77d9f0492fSopenharmony_ci    LE_Buffer *buffer = NULL;
78d9f0492fSopenharmony_ci    LE_CHECK((buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize)) != NULL,
79d9f0492fSopenharmony_ci        return NULL, "Failed to alloc memory for buffer");
80d9f0492fSopenharmony_ci    OH_ListInit(&buffer->node);
81d9f0492fSopenharmony_ci    buffer->buffSize = bufferSize;
82d9f0492fSopenharmony_ci    buffer->dataSize = 0;
83d9f0492fSopenharmony_ci    return buffer;
84d9f0492fSopenharmony_ci}
85d9f0492fSopenharmony_ci
86d9f0492fSopenharmony_ciint IsBufferEmpty(StreamTask *task)
87d9f0492fSopenharmony_ci{
88d9f0492fSopenharmony_ci    LoopMutexLock(&task->mutex);
89d9f0492fSopenharmony_ci    int ret = ListEmpty(task->buffHead);
90d9f0492fSopenharmony_ci    LoopMutexUnlock(&task->mutex);
91d9f0492fSopenharmony_ci    return ret;
92d9f0492fSopenharmony_ci}
93d9f0492fSopenharmony_ci
94d9f0492fSopenharmony_ciLE_Buffer *GetFirstBuffer(StreamTask *task)
95d9f0492fSopenharmony_ci{
96d9f0492fSopenharmony_ci    LoopMutexLock(&task->mutex);
97d9f0492fSopenharmony_ci    ListNode *node = task->buffHead.next;
98d9f0492fSopenharmony_ci    LE_Buffer *buffer = NULL;
99d9f0492fSopenharmony_ci    if (node != &task->buffHead) {
100d9f0492fSopenharmony_ci        buffer = ListEntry(node, LE_Buffer, node);
101d9f0492fSopenharmony_ci    }
102d9f0492fSopenharmony_ci    LoopMutexUnlock(&task->mutex);
103d9f0492fSopenharmony_ci    return buffer;
104d9f0492fSopenharmony_ci}
105d9f0492fSopenharmony_ci
106d9f0492fSopenharmony_civoid AddBuffer(StreamTask *task, LE_Buffer *buffer)
107d9f0492fSopenharmony_ci{
108d9f0492fSopenharmony_ci    LoopMutexLock(&task->mutex);
109d9f0492fSopenharmony_ci    OH_ListAddTail(&task->buffHead, &buffer->node);
110d9f0492fSopenharmony_ci    LoopMutexUnlock(&task->mutex);
111d9f0492fSopenharmony_ci}
112d9f0492fSopenharmony_ci
113d9f0492fSopenharmony_ciLE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next)
114d9f0492fSopenharmony_ci{
115d9f0492fSopenharmony_ci    LoopMutexLock(&task->mutex);
116d9f0492fSopenharmony_ci    LE_Buffer *buffer = NULL;
117d9f0492fSopenharmony_ci    ListNode *node = NULL;
118d9f0492fSopenharmony_ci    if (next == NULL) {
119d9f0492fSopenharmony_ci        node = task->buffHead.next;
120d9f0492fSopenharmony_ci    } else {
121d9f0492fSopenharmony_ci        node = next->node.next;
122d9f0492fSopenharmony_ci    }
123d9f0492fSopenharmony_ci    if (node != &task->buffHead) {
124d9f0492fSopenharmony_ci        buffer = ListEntry(node, LE_Buffer, node);
125d9f0492fSopenharmony_ci    }
126d9f0492fSopenharmony_ci    LoopMutexUnlock(&task->mutex);
127d9f0492fSopenharmony_ci    return buffer;
128d9f0492fSopenharmony_ci}
129d9f0492fSopenharmony_ci
130d9f0492fSopenharmony_civoid FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer)
131d9f0492fSopenharmony_ci{
132d9f0492fSopenharmony_ci    LE_CHECK(buffer != NULL, return, "Invalid buffer");
133d9f0492fSopenharmony_ci    if (task == NULL) {
134d9f0492fSopenharmony_ci        free(buffer);
135d9f0492fSopenharmony_ci        return;
136d9f0492fSopenharmony_ci    }
137d9f0492fSopenharmony_ci    if (CheckTaskFlags((BaseTask *)task, TASK_STREAM | TASK_CONNECT) ||
138d9f0492fSopenharmony_ci        CheckTaskFlags((BaseTask *)task, TASK_EVENT | TASK_ASYNC_EVENT)) {
139d9f0492fSopenharmony_ci        LoopMutexLock(&task->mutex);
140d9f0492fSopenharmony_ci        OH_ListRemove(&buffer->node);
141d9f0492fSopenharmony_ci        LoopMutexUnlock(&task->mutex);
142d9f0492fSopenharmony_ci    }
143d9f0492fSopenharmony_ci    free(buffer);
144d9f0492fSopenharmony_ci}
145d9f0492fSopenharmony_ci
146d9f0492fSopenharmony_ciBufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize)
147d9f0492fSopenharmony_ci{
148d9f0492fSopenharmony_ci    return (BufferHandle)CreateBuffer(bufferSize);
149d9f0492fSopenharmony_ci}
150d9f0492fSopenharmony_ci
151d9f0492fSopenharmony_civoid LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle)
152d9f0492fSopenharmony_ci{
153d9f0492fSopenharmony_ci    FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle);
154d9f0492fSopenharmony_ci}
155d9f0492fSopenharmony_ci
156d9f0492fSopenharmony_ciuint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize)
157d9f0492fSopenharmony_ci{
158d9f0492fSopenharmony_ci    LE_Buffer *buffer = (LE_Buffer *)handle;
159d9f0492fSopenharmony_ci    LE_CHECK(buffer != NULL, return NULL, "Invalid buffer");
160d9f0492fSopenharmony_ci    if (dataSize) {
161d9f0492fSopenharmony_ci        *dataSize = (uint32_t)buffer->dataSize;
162d9f0492fSopenharmony_ci    }
163d9f0492fSopenharmony_ci    if (buffSize) {
164d9f0492fSopenharmony_ci        *buffSize = (uint32_t)buffer->buffSize;
165d9f0492fSopenharmony_ci    }
166d9f0492fSopenharmony_ci    return buffer->data;
167d9f0492fSopenharmony_ci}
168d9f0492fSopenharmony_ci
169d9f0492fSopenharmony_ciLE_STATUS LE_Send(const LoopHandle loopHandle,
170d9f0492fSopenharmony_ci    const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen)
171d9f0492fSopenharmony_ci{
172d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && buffHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
173d9f0492fSopenharmony_ci    LE_CHECK(taskHandle != NULL, return LE_INVALID_TASK, "Invalid task");
174d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
175d9f0492fSopenharmony_ci    if (((BaseTask *)taskHandle)->flags & TASK_FLAGS_INVALID) {
176d9f0492fSopenharmony_ci        LE_FreeBuffer(loopHandle, taskHandle, buffHandle);
177d9f0492fSopenharmony_ci        return LE_INVALID_TASK;
178d9f0492fSopenharmony_ci    }
179d9f0492fSopenharmony_ci    LE_Buffer *buffer = (LE_Buffer *)buffHandle;
180d9f0492fSopenharmony_ci    buffer->dataSize = buffLen;
181d9f0492fSopenharmony_ci    if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) {
182d9f0492fSopenharmony_ci        AddBuffer((StreamTask *)taskHandle, buffer);
183d9f0492fSopenharmony_ci    } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) {
184d9f0492fSopenharmony_ci        AddBuffer((StreamTask *)taskHandle, buffer);
185d9f0492fSopenharmony_ci    }
186d9f0492fSopenharmony_ci    loop->modEvent(loop, (BaseTask *)taskHandle, EVENT_WRITE);
187d9f0492fSopenharmony_ci    return LE_SUCCESS;
188d9f0492fSopenharmony_ci}
189d9f0492fSopenharmony_ci
190d9f0492fSopenharmony_civoid LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
191d9f0492fSopenharmony_ci{
192d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
193d9f0492fSopenharmony_ci    if (((LoopBase*)taskHandle)->flags & TASK_TIME) {
194d9f0492fSopenharmony_ci        LE_StopTimer(loopHandle, taskHandle);
195d9f0492fSopenharmony_ci        return;
196d9f0492fSopenharmony_ci    }
197d9f0492fSopenharmony_ci    LE_LOGV("LE_CloseTask %d", GetSocketFd(taskHandle));
198d9f0492fSopenharmony_ci    BaseTask *task = (BaseTask *)taskHandle;
199d9f0492fSopenharmony_ci    if (task->innerClose != NULL) {
200d9f0492fSopenharmony_ci        task->innerClose(loopHandle, taskHandle);
201d9f0492fSopenharmony_ci    }
202d9f0492fSopenharmony_ci    free(task);
203d9f0492fSopenharmony_ci}
204d9f0492fSopenharmony_ci
205d9f0492fSopenharmony_civoid *LE_GetUserData(TaskHandle handle)
206d9f0492fSopenharmony_ci{
207d9f0492fSopenharmony_ci    LE_CHECK(handle != NULL, return NULL, "Invalid handle");
208d9f0492fSopenharmony_ci    BaseTask *stream = (BaseTask *)handle;
209d9f0492fSopenharmony_ci    return (void *)(((char *)stream) + stream->userDataOffset);
210d9f0492fSopenharmony_ci}
211d9f0492fSopenharmony_ci
212d9f0492fSopenharmony_ciint32_t LE_GetSendResult(const BufferHandle handle)
213d9f0492fSopenharmony_ci{
214d9f0492fSopenharmony_ci    LE_CHECK(handle != NULL, return 0, "Invalid handle");
215d9f0492fSopenharmony_ci    return ((LE_Buffer *)handle)->result;
216d9f0492fSopenharmony_ci}