1/* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "le_task.h" 16#include <time.h> 17#include <sys/eventfd.h> 18 19#include "le_loop.h" 20 21#define MILLION_MICROSECOND 1000000 22#define THOUSAND_MILLISECOND 1000 23 24static void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask) 25{ 26 LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters"); 27#ifdef LOOP_DEBUG 28 struct timespec startTime = {0}; 29 struct timespec endTime = {0}; 30 long long diff; 31 clock_gettime(CLOCK_MONOTONIC, &(startTime)); 32#endif 33 StreamTask *task = &asyncTask->stream; 34 ListNode *node = task->buffHead.next; 35 if (node != &task->buffHead) { 36 LE_Buffer *buffer = ListEntry(node, LE_Buffer, node); 37 uint64_t eventId = *(uint64_t*)(buffer->data); 38 if (asyncTask->processAsyncEvent) { 39 asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId, 40 (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize); 41 } 42 OH_ListRemove(&buffer->node); 43 free(buffer); 44#ifdef LOOP_DEBUG 45 clock_gettime(CLOCK_MONOTONIC, &(endTime)); 46 diff = (long long)(endTime.tv_sec - startTime.tv_sec) * MILLION_MICROSECOND; 47 if (endTime.tv_nsec > startTime.tv_nsec) { 48 diff += (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms 49 } else { 50 diff -= (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms 51 } 52 LE_LOGI("DoAsyncEvent_ diff %ld", diff); 53#endif 54 } 55} 56 57#ifdef STARTUP_INIT_TEST 58void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle) 59{ 60 AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle; 61 while (!IsBufferEmpty(&asyncTask->stream)) { 62 DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle); 63 } 64} 65#endif 66 67static LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper) 68{ 69 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 70 LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper); 71 EventLoop *loop = (EventLoop *)loopHandle; 72 AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle; 73 if (LE_TEST_FLAGS(oper, EVENT_READ)) { 74 uint64_t eventId = 0; 75 int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId)); 76 LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId); 77 DoAsyncEvent_(loopHandle, asyncTask); 78 if (!IsBufferEmpty(&asyncTask->stream)) { 79 loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_WRITE); 80 return LE_SUCCESS; 81 } 82 } else { 83 static uint64_t eventId = 0; 84 (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId)); 85 loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ); 86 eventId++; 87 } 88 return LE_SUCCESS; 89} 90 91static void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle) 92{ 93 BaseTask *task = (BaseTask *)taskHandle; 94 DelTask((EventLoop *)loopHandle, task); 95 CloseTask(loopHandle, task); 96 close(task->taskId.fd); 97} 98 99static void DumpEventTaskInfo_(const TaskHandle task) 100{ 101 INIT_CHECK(task != NULL, return); 102 BaseTask *baseTask = (BaseTask *)task; 103 AsyncEventTask *eventTask = (AsyncEventTask *)baseTask; 104 printf("\tfd: %d \n", eventTask->stream.base.taskId.fd); 105 printf("\t TaskType: %s\n", "EventTask"); 106} 107 108LE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle, 109 TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent) 110{ 111 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 112 LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent "); 113 114 int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC); 115 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd "); 116 LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL}; 117 AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask)); 118 LE_CHECK(task != NULL, close(fd); 119 return LE_NO_MEMORY, "Failed to create task"); 120 task->stream.base.handleEvent = HandleAsyncEvent_; 121 task->stream.base.innerClose = HandleAsyncTaskClose_; 122 task->stream.base.dumpTaskInfo = DumpEventTaskInfo_; 123 OH_ListInit(&task->stream.buffHead); 124 LoopMutexInit(&task->stream.mutex); 125 task->processAsyncEvent = processAsyncEvent; 126 EventLoop *loop = (EventLoop *)loopHandle; 127 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ); 128 *taskHandle = (TaskHandle)task; 129 return LE_SUCCESS; 130} 131 132LE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle, 133 const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen) 134{ 135 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 136 BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId)); 137 char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL); 138 LE_CHECK(buff != NULL, return LE_FAILURE, "Failed to get buff"); 139 int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId)); 140 LE_CHECK(ret == 0, return -1, "Failed to copy data"); 141 if (data != NULL && buffLen > 0) { 142 ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen); 143 LE_CHECK(ret == 0, return -1, "Failed to copy data"); 144 buff[sizeof(eventId) + buffLen] = '\0'; 145 } 146 return LE_Send(loopHandle, taskHandle, handle, buffLen); 147} 148 149void LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle) 150{ 151 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters"); 152 LE_CloseTask(loopHandle, taskHandle); 153}