1/* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "le_task.h" 17 18 19#include "le_loop.h" 20#include "le_utils.h" 21 22int CheckTaskFlags(const BaseTask *task, uint32_t flags) 23{ 24 if (task == NULL) { 25 return 0; 26 } 27 return ((task->flags & flags) == flags); 28} 29 30int GetSocketFd(const TaskHandle task) 31{ 32 BaseTask *stream = (BaseTask *)task; 33 return stream->taskId.fd; 34} 35 36BaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size) 37{ 38 if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) { 39 return NULL; 40 } 41 BaseTask *task = (BaseTask *)calloc(1, size + info->userDataSize); 42 LE_CHECK(task != NULL, return NULL, "Failed to alloc for task"); 43 HASHMAPInitNode(&task->hashNode); 44 // key id 45 task->flags = info->flags; 46 task->taskId.fd = fd; 47 LE_STATUS ret = AddTask((EventLoop *)loopHandle, task); 48 LE_CHECK(ret == LE_SUCCESS, free(task); 49 return NULL, "Failed to alloc for task"); 50 task->userDataSize = info->userDataSize; 51 task->userDataOffset = size; 52 task->close = info->close; 53 return task; 54} 55 56void CloseTask(const LoopHandle loopHandle, BaseTask *task) 57{ 58 LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters"); 59 LE_LOGV("CloseTask %d", task->taskId.fd); 60 if (CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) || 61 CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT)) { 62 StreamTask *stream = (StreamTask *)task; 63 LE_Buffer *buffer = GetFirstBuffer(stream); 64 while (buffer) { 65 FreeBuffer(loopHandle, stream, (BufferHandle)buffer); 66 buffer = GetFirstBuffer(stream); 67 } 68 } 69 if (task->close != NULL) { 70 task->close((TaskHandle)task); 71 } 72} 73 74LE_Buffer *CreateBuffer(uint32_t bufferSize) 75{ 76 LE_ONLY_CHECK(bufferSize < LOOP_MAX_BUFFER, return NULL); 77 LE_Buffer *buffer = NULL; 78 LE_CHECK((buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize)) != NULL, 79 return NULL, "Failed to alloc memory for buffer"); 80 OH_ListInit(&buffer->node); 81 buffer->buffSize = bufferSize; 82 buffer->dataSize = 0; 83 return buffer; 84} 85 86int IsBufferEmpty(StreamTask *task) 87{ 88 LoopMutexLock(&task->mutex); 89 int ret = ListEmpty(task->buffHead); 90 LoopMutexUnlock(&task->mutex); 91 return ret; 92} 93 94LE_Buffer *GetFirstBuffer(StreamTask *task) 95{ 96 LoopMutexLock(&task->mutex); 97 ListNode *node = task->buffHead.next; 98 LE_Buffer *buffer = NULL; 99 if (node != &task->buffHead) { 100 buffer = ListEntry(node, LE_Buffer, node); 101 } 102 LoopMutexUnlock(&task->mutex); 103 return buffer; 104} 105 106void AddBuffer(StreamTask *task, LE_Buffer *buffer) 107{ 108 LoopMutexLock(&task->mutex); 109 OH_ListAddTail(&task->buffHead, &buffer->node); 110 LoopMutexUnlock(&task->mutex); 111} 112 113LE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next) 114{ 115 LoopMutexLock(&task->mutex); 116 LE_Buffer *buffer = NULL; 117 ListNode *node = NULL; 118 if (next == NULL) { 119 node = task->buffHead.next; 120 } else { 121 node = next->node.next; 122 } 123 if (node != &task->buffHead) { 124 buffer = ListEntry(node, LE_Buffer, node); 125 } 126 LoopMutexUnlock(&task->mutex); 127 return buffer; 128} 129 130void FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer) 131{ 132 LE_CHECK(buffer != NULL, return, "Invalid buffer"); 133 if (task == NULL) { 134 free(buffer); 135 return; 136 } 137 if (CheckTaskFlags((BaseTask *)task, TASK_STREAM | TASK_CONNECT) || 138 CheckTaskFlags((BaseTask *)task, TASK_EVENT | TASK_ASYNC_EVENT)) { 139 LoopMutexLock(&task->mutex); 140 OH_ListRemove(&buffer->node); 141 LoopMutexUnlock(&task->mutex); 142 } 143 free(buffer); 144} 145 146BufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize) 147{ 148 return (BufferHandle)CreateBuffer(bufferSize); 149} 150 151void LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle) 152{ 153 FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle); 154} 155 156uint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize) 157{ 158 LE_Buffer *buffer = (LE_Buffer *)handle; 159 LE_CHECK(buffer != NULL, return NULL, "Invalid buffer"); 160 if (dataSize) { 161 *dataSize = (uint32_t)buffer->dataSize; 162 } 163 if (buffSize) { 164 *buffSize = (uint32_t)buffer->buffSize; 165 } 166 return buffer->data; 167} 168 169LE_STATUS LE_Send(const LoopHandle loopHandle, 170 const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen) 171{ 172 LE_CHECK(loopHandle != NULL && buffHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 173 LE_CHECK(taskHandle != NULL, return LE_INVALID_TASK, "Invalid task"); 174 EventLoop *loop = (EventLoop *)loopHandle; 175 if (((BaseTask *)taskHandle)->flags & TASK_FLAGS_INVALID) { 176 LE_FreeBuffer(loopHandle, taskHandle, buffHandle); 177 return LE_INVALID_TASK; 178 } 179 LE_Buffer *buffer = (LE_Buffer *)buffHandle; 180 buffer->dataSize = buffLen; 181 if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) { 182 AddBuffer((StreamTask *)taskHandle, buffer); 183 } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) { 184 AddBuffer((StreamTask *)taskHandle, buffer); 185 } 186 loop->modEvent(loop, (BaseTask *)taskHandle, EVENT_WRITE); 187 return LE_SUCCESS; 188} 189 190void LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle) 191{ 192 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters"); 193 if (((LoopBase*)taskHandle)->flags & TASK_TIME) { 194 LE_StopTimer(loopHandle, taskHandle); 195 return; 196 } 197 LE_LOGV("LE_CloseTask %d", GetSocketFd(taskHandle)); 198 BaseTask *task = (BaseTask *)taskHandle; 199 if (task->innerClose != NULL) { 200 task->innerClose(loopHandle, taskHandle); 201 } 202 free(task); 203} 204 205void *LE_GetUserData(TaskHandle handle) 206{ 207 LE_CHECK(handle != NULL, return NULL, "Invalid handle"); 208 BaseTask *stream = (BaseTask *)handle; 209 return (void *)(((char *)stream) + stream->userDataOffset); 210} 211 212int32_t LE_GetSendResult(const BufferHandle handle) 213{ 214 LE_CHECK(handle != NULL, return 0, "Invalid handle"); 215 return ((LE_Buffer *)handle)->result; 216}