1d9f0492fSopenharmony_ci/* 2d9f0492fSopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd. 3d9f0492fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4d9f0492fSopenharmony_ci * you may not use this file except in compliance with the License. 5d9f0492fSopenharmony_ci * You may obtain a copy of the License at 6d9f0492fSopenharmony_ci * 7d9f0492fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8d9f0492fSopenharmony_ci * 9d9f0492fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10d9f0492fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11d9f0492fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12d9f0492fSopenharmony_ci * See the License for the specific language governing permissions and 13d9f0492fSopenharmony_ci * limitations under the License. 14d9f0492fSopenharmony_ci */ 15d9f0492fSopenharmony_ci 16d9f0492fSopenharmony_ci#include "le_task.h" 17d9f0492fSopenharmony_ci 18d9f0492fSopenharmony_ci 19d9f0492fSopenharmony_ci#include "le_loop.h" 20d9f0492fSopenharmony_ci#include "le_utils.h" 21d9f0492fSopenharmony_ci 22d9f0492fSopenharmony_ciint CheckTaskFlags(const BaseTask *task, uint32_t flags) 23d9f0492fSopenharmony_ci{ 24d9f0492fSopenharmony_ci if (task == NULL) { 25d9f0492fSopenharmony_ci return 0; 26d9f0492fSopenharmony_ci } 27d9f0492fSopenharmony_ci return ((task->flags & flags) == flags); 28d9f0492fSopenharmony_ci} 29d9f0492fSopenharmony_ci 30d9f0492fSopenharmony_ciint GetSocketFd(const TaskHandle task) 31d9f0492fSopenharmony_ci{ 32d9f0492fSopenharmony_ci BaseTask *stream = (BaseTask *)task; 33d9f0492fSopenharmony_ci return stream->taskId.fd; 34d9f0492fSopenharmony_ci} 35d9f0492fSopenharmony_ci 36d9f0492fSopenharmony_ciBaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size) 37d9f0492fSopenharmony_ci{ 38d9f0492fSopenharmony_ci if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) { 39d9f0492fSopenharmony_ci return NULL; 40d9f0492fSopenharmony_ci } 41d9f0492fSopenharmony_ci BaseTask *task = (BaseTask *)calloc(1, size + info->userDataSize); 42d9f0492fSopenharmony_ci LE_CHECK(task != NULL, return NULL, "Failed to alloc for task"); 43d9f0492fSopenharmony_ci HASHMAPInitNode(&task->hashNode); 44d9f0492fSopenharmony_ci // key id 45d9f0492fSopenharmony_ci task->flags = info->flags; 46d9f0492fSopenharmony_ci task->taskId.fd = fd; 47d9f0492fSopenharmony_ci LE_STATUS ret = AddTask((EventLoop *)loopHandle, task); 48d9f0492fSopenharmony_ci LE_CHECK(ret == LE_SUCCESS, free(task); 49d9f0492fSopenharmony_ci return NULL, "Failed to alloc for task"); 50d9f0492fSopenharmony_ci task->userDataSize = info->userDataSize; 51d9f0492fSopenharmony_ci task->userDataOffset = size; 52d9f0492fSopenharmony_ci task->close = info->close; 53d9f0492fSopenharmony_ci return task; 54d9f0492fSopenharmony_ci} 55d9f0492fSopenharmony_ci 56d9f0492fSopenharmony_civoid CloseTask(const LoopHandle loopHandle, BaseTask *task) 57d9f0492fSopenharmony_ci{ 58d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters"); 59d9f0492fSopenharmony_ci LE_LOGV("CloseTask %d", task->taskId.fd); 60d9f0492fSopenharmony_ci if (CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) || 61d9f0492fSopenharmony_ci CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT)) { 62d9f0492fSopenharmony_ci StreamTask *stream = (StreamTask *)task; 63d9f0492fSopenharmony_ci LE_Buffer *buffer = GetFirstBuffer(stream); 64d9f0492fSopenharmony_ci while (buffer) { 65d9f0492fSopenharmony_ci FreeBuffer(loopHandle, stream, (BufferHandle)buffer); 66d9f0492fSopenharmony_ci buffer = GetFirstBuffer(stream); 67d9f0492fSopenharmony_ci } 68d9f0492fSopenharmony_ci } 69d9f0492fSopenharmony_ci if (task->close != NULL) { 70d9f0492fSopenharmony_ci task->close((TaskHandle)task); 71d9f0492fSopenharmony_ci } 72d9f0492fSopenharmony_ci} 73d9f0492fSopenharmony_ci 74d9f0492fSopenharmony_ciLE_Buffer *CreateBuffer(uint32_t bufferSize) 75d9f0492fSopenharmony_ci{ 76d9f0492fSopenharmony_ci LE_ONLY_CHECK(bufferSize < LOOP_MAX_BUFFER, return NULL); 77d9f0492fSopenharmony_ci LE_Buffer *buffer = NULL; 78d9f0492fSopenharmony_ci LE_CHECK((buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize)) != NULL, 79d9f0492fSopenharmony_ci return NULL, "Failed to alloc memory for buffer"); 80d9f0492fSopenharmony_ci OH_ListInit(&buffer->node); 81d9f0492fSopenharmony_ci buffer->buffSize = bufferSize; 82d9f0492fSopenharmony_ci buffer->dataSize = 0; 83d9f0492fSopenharmony_ci return buffer; 84d9f0492fSopenharmony_ci} 85d9f0492fSopenharmony_ci 86d9f0492fSopenharmony_ciint IsBufferEmpty(StreamTask *task) 87d9f0492fSopenharmony_ci{ 88d9f0492fSopenharmony_ci LoopMutexLock(&task->mutex); 89d9f0492fSopenharmony_ci int ret = ListEmpty(task->buffHead); 90d9f0492fSopenharmony_ci LoopMutexUnlock(&task->mutex); 91d9f0492fSopenharmony_ci return ret; 92d9f0492fSopenharmony_ci} 93d9f0492fSopenharmony_ci 94d9f0492fSopenharmony_ciLE_Buffer *GetFirstBuffer(StreamTask *task) 95d9f0492fSopenharmony_ci{ 96d9f0492fSopenharmony_ci LoopMutexLock(&task->mutex); 97d9f0492fSopenharmony_ci ListNode *node = task->buffHead.next; 98d9f0492fSopenharmony_ci LE_Buffer *buffer = NULL; 99d9f0492fSopenharmony_ci if (node != &task->buffHead) { 100d9f0492fSopenharmony_ci buffer = ListEntry(node, LE_Buffer, node); 101d9f0492fSopenharmony_ci } 102d9f0492fSopenharmony_ci LoopMutexUnlock(&task->mutex); 103d9f0492fSopenharmony_ci return buffer; 104d9f0492fSopenharmony_ci} 105d9f0492fSopenharmony_ci 106d9f0492fSopenharmony_civoid AddBuffer(StreamTask *task, LE_Buffer *buffer) 107d9f0492fSopenharmony_ci{ 108d9f0492fSopenharmony_ci LoopMutexLock(&task->mutex); 109d9f0492fSopenharmony_ci OH_ListAddTail(&task->buffHead, &buffer->node); 110d9f0492fSopenharmony_ci LoopMutexUnlock(&task->mutex); 111d9f0492fSopenharmony_ci} 112d9f0492fSopenharmony_ci 113d9f0492fSopenharmony_ciLE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next) 114d9f0492fSopenharmony_ci{ 115d9f0492fSopenharmony_ci LoopMutexLock(&task->mutex); 116d9f0492fSopenharmony_ci LE_Buffer *buffer = NULL; 117d9f0492fSopenharmony_ci ListNode *node = NULL; 118d9f0492fSopenharmony_ci if (next == NULL) { 119d9f0492fSopenharmony_ci node = task->buffHead.next; 120d9f0492fSopenharmony_ci } else { 121d9f0492fSopenharmony_ci node = next->node.next; 122d9f0492fSopenharmony_ci } 123d9f0492fSopenharmony_ci if (node != &task->buffHead) { 124d9f0492fSopenharmony_ci buffer = ListEntry(node, LE_Buffer, node); 125d9f0492fSopenharmony_ci } 126d9f0492fSopenharmony_ci LoopMutexUnlock(&task->mutex); 127d9f0492fSopenharmony_ci return buffer; 128d9f0492fSopenharmony_ci} 129d9f0492fSopenharmony_ci 130d9f0492fSopenharmony_civoid FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer) 131d9f0492fSopenharmony_ci{ 132d9f0492fSopenharmony_ci LE_CHECK(buffer != NULL, return, "Invalid buffer"); 133d9f0492fSopenharmony_ci if (task == NULL) { 134d9f0492fSopenharmony_ci free(buffer); 135d9f0492fSopenharmony_ci return; 136d9f0492fSopenharmony_ci } 137d9f0492fSopenharmony_ci if (CheckTaskFlags((BaseTask *)task, TASK_STREAM | TASK_CONNECT) || 138d9f0492fSopenharmony_ci CheckTaskFlags((BaseTask *)task, TASK_EVENT | TASK_ASYNC_EVENT)) { 139d9f0492fSopenharmony_ci LoopMutexLock(&task->mutex); 140d9f0492fSopenharmony_ci OH_ListRemove(&buffer->node); 141d9f0492fSopenharmony_ci LoopMutexUnlock(&task->mutex); 142d9f0492fSopenharmony_ci } 143d9f0492fSopenharmony_ci free(buffer); 144d9f0492fSopenharmony_ci} 145d9f0492fSopenharmony_ci 146d9f0492fSopenharmony_ciBufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize) 147d9f0492fSopenharmony_ci{ 148d9f0492fSopenharmony_ci return (BufferHandle)CreateBuffer(bufferSize); 149d9f0492fSopenharmony_ci} 150d9f0492fSopenharmony_ci 151d9f0492fSopenharmony_civoid LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle) 152d9f0492fSopenharmony_ci{ 153d9f0492fSopenharmony_ci FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle); 154d9f0492fSopenharmony_ci} 155d9f0492fSopenharmony_ci 156d9f0492fSopenharmony_ciuint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize) 157d9f0492fSopenharmony_ci{ 158d9f0492fSopenharmony_ci LE_Buffer *buffer = (LE_Buffer *)handle; 159d9f0492fSopenharmony_ci LE_CHECK(buffer != NULL, return NULL, "Invalid buffer"); 160d9f0492fSopenharmony_ci if (dataSize) { 161d9f0492fSopenharmony_ci *dataSize = (uint32_t)buffer->dataSize; 162d9f0492fSopenharmony_ci } 163d9f0492fSopenharmony_ci if (buffSize) { 164d9f0492fSopenharmony_ci *buffSize = (uint32_t)buffer->buffSize; 165d9f0492fSopenharmony_ci } 166d9f0492fSopenharmony_ci return buffer->data; 167d9f0492fSopenharmony_ci} 168d9f0492fSopenharmony_ci 169d9f0492fSopenharmony_ciLE_STATUS LE_Send(const LoopHandle loopHandle, 170d9f0492fSopenharmony_ci const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen) 171d9f0492fSopenharmony_ci{ 172d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && buffHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 173d9f0492fSopenharmony_ci LE_CHECK(taskHandle != NULL, return LE_INVALID_TASK, "Invalid task"); 174d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 175d9f0492fSopenharmony_ci if (((BaseTask *)taskHandle)->flags & TASK_FLAGS_INVALID) { 176d9f0492fSopenharmony_ci LE_FreeBuffer(loopHandle, taskHandle, buffHandle); 177d9f0492fSopenharmony_ci return LE_INVALID_TASK; 178d9f0492fSopenharmony_ci } 179d9f0492fSopenharmony_ci LE_Buffer *buffer = (LE_Buffer *)buffHandle; 180d9f0492fSopenharmony_ci buffer->dataSize = buffLen; 181d9f0492fSopenharmony_ci if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) { 182d9f0492fSopenharmony_ci AddBuffer((StreamTask *)taskHandle, buffer); 183d9f0492fSopenharmony_ci } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) { 184d9f0492fSopenharmony_ci AddBuffer((StreamTask *)taskHandle, buffer); 185d9f0492fSopenharmony_ci } 186d9f0492fSopenharmony_ci loop->modEvent(loop, (BaseTask *)taskHandle, EVENT_WRITE); 187d9f0492fSopenharmony_ci return LE_SUCCESS; 188d9f0492fSopenharmony_ci} 189d9f0492fSopenharmony_ci 190d9f0492fSopenharmony_civoid LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle) 191d9f0492fSopenharmony_ci{ 192d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters"); 193d9f0492fSopenharmony_ci if (((LoopBase*)taskHandle)->flags & TASK_TIME) { 194d9f0492fSopenharmony_ci LE_StopTimer(loopHandle, taskHandle); 195d9f0492fSopenharmony_ci return; 196d9f0492fSopenharmony_ci } 197d9f0492fSopenharmony_ci LE_LOGV("LE_CloseTask %d", GetSocketFd(taskHandle)); 198d9f0492fSopenharmony_ci BaseTask *task = (BaseTask *)taskHandle; 199d9f0492fSopenharmony_ci if (task->innerClose != NULL) { 200d9f0492fSopenharmony_ci task->innerClose(loopHandle, taskHandle); 201d9f0492fSopenharmony_ci } 202d9f0492fSopenharmony_ci free(task); 203d9f0492fSopenharmony_ci} 204d9f0492fSopenharmony_ci 205d9f0492fSopenharmony_civoid *LE_GetUserData(TaskHandle handle) 206d9f0492fSopenharmony_ci{ 207d9f0492fSopenharmony_ci LE_CHECK(handle != NULL, return NULL, "Invalid handle"); 208d9f0492fSopenharmony_ci BaseTask *stream = (BaseTask *)handle; 209d9f0492fSopenharmony_ci return (void *)(((char *)stream) + stream->userDataOffset); 210d9f0492fSopenharmony_ci} 211d9f0492fSopenharmony_ci 212d9f0492fSopenharmony_ciint32_t LE_GetSendResult(const BufferHandle handle) 213d9f0492fSopenharmony_ci{ 214d9f0492fSopenharmony_ci LE_CHECK(handle != NULL, return 0, "Invalid handle"); 215d9f0492fSopenharmony_ci return ((LE_Buffer *)handle)->result; 216d9f0492fSopenharmony_ci}