1d9f0492fSopenharmony_ci/* 2d9f0492fSopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd. 3d9f0492fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4d9f0492fSopenharmony_ci * you may not use this file except in compliance with the License. 5d9f0492fSopenharmony_ci * You may obtain a copy of the License at 6d9f0492fSopenharmony_ci * 7d9f0492fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8d9f0492fSopenharmony_ci * 9d9f0492fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10d9f0492fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11d9f0492fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12d9f0492fSopenharmony_ci * See the License for the specific language governing permissions and 13d9f0492fSopenharmony_ci * limitations under the License. 14d9f0492fSopenharmony_ci */ 15d9f0492fSopenharmony_ci#include "le_task.h" 16d9f0492fSopenharmony_ci#include <time.h> 17d9f0492fSopenharmony_ci#include <sys/eventfd.h> 18d9f0492fSopenharmony_ci 19d9f0492fSopenharmony_ci#include "le_loop.h" 20d9f0492fSopenharmony_ci 21d9f0492fSopenharmony_ci#define MILLION_MICROSECOND 1000000 22d9f0492fSopenharmony_ci#define THOUSAND_MILLISECOND 1000 23d9f0492fSopenharmony_ci 24d9f0492fSopenharmony_cistatic void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask) 25d9f0492fSopenharmony_ci{ 26d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters"); 27d9f0492fSopenharmony_ci#ifdef LOOP_DEBUG 28d9f0492fSopenharmony_ci struct timespec startTime = {0}; 29d9f0492fSopenharmony_ci struct timespec endTime = {0}; 30d9f0492fSopenharmony_ci long long diff; 31d9f0492fSopenharmony_ci clock_gettime(CLOCK_MONOTONIC, &(startTime)); 32d9f0492fSopenharmony_ci#endif 33d9f0492fSopenharmony_ci StreamTask *task = &asyncTask->stream; 34d9f0492fSopenharmony_ci ListNode *node = task->buffHead.next; 35d9f0492fSopenharmony_ci if (node != &task->buffHead) { 36d9f0492fSopenharmony_ci LE_Buffer *buffer = ListEntry(node, LE_Buffer, node); 37d9f0492fSopenharmony_ci uint64_t eventId = *(uint64_t*)(buffer->data); 38d9f0492fSopenharmony_ci if (asyncTask->processAsyncEvent) { 39d9f0492fSopenharmony_ci asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId, 40d9f0492fSopenharmony_ci (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize); 41d9f0492fSopenharmony_ci } 42d9f0492fSopenharmony_ci OH_ListRemove(&buffer->node); 43d9f0492fSopenharmony_ci free(buffer); 44d9f0492fSopenharmony_ci#ifdef LOOP_DEBUG 45d9f0492fSopenharmony_ci clock_gettime(CLOCK_MONOTONIC, &(endTime)); 46d9f0492fSopenharmony_ci diff = (long long)(endTime.tv_sec - startTime.tv_sec) * MILLION_MICROSECOND; 47d9f0492fSopenharmony_ci if (endTime.tv_nsec > startTime.tv_nsec) { 48d9f0492fSopenharmony_ci diff += (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms 49d9f0492fSopenharmony_ci } else { 50d9f0492fSopenharmony_ci diff -= (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms 51d9f0492fSopenharmony_ci } 52d9f0492fSopenharmony_ci LE_LOGI("DoAsyncEvent_ diff %ld", diff); 53d9f0492fSopenharmony_ci#endif 54d9f0492fSopenharmony_ci } 55d9f0492fSopenharmony_ci} 56d9f0492fSopenharmony_ci 57d9f0492fSopenharmony_ci#ifdef STARTUP_INIT_TEST 58d9f0492fSopenharmony_civoid LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle) 59d9f0492fSopenharmony_ci{ 60d9f0492fSopenharmony_ci AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle; 61d9f0492fSopenharmony_ci while (!IsBufferEmpty(&asyncTask->stream)) { 62d9f0492fSopenharmony_ci DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle); 63d9f0492fSopenharmony_ci } 64d9f0492fSopenharmony_ci} 65d9f0492fSopenharmony_ci#endif 66d9f0492fSopenharmony_ci 67d9f0492fSopenharmony_cistatic LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper) 68d9f0492fSopenharmony_ci{ 69d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 70d9f0492fSopenharmony_ci LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper); 71d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 72d9f0492fSopenharmony_ci AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle; 73d9f0492fSopenharmony_ci if (LE_TEST_FLAGS(oper, EVENT_READ)) { 74d9f0492fSopenharmony_ci uint64_t eventId = 0; 75d9f0492fSopenharmony_ci int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId)); 76d9f0492fSopenharmony_ci LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId); 77d9f0492fSopenharmony_ci DoAsyncEvent_(loopHandle, asyncTask); 78d9f0492fSopenharmony_ci if (!IsBufferEmpty(&asyncTask->stream)) { 79d9f0492fSopenharmony_ci loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_WRITE); 80d9f0492fSopenharmony_ci return LE_SUCCESS; 81d9f0492fSopenharmony_ci } 82d9f0492fSopenharmony_ci } else { 83d9f0492fSopenharmony_ci static uint64_t eventId = 0; 84d9f0492fSopenharmony_ci (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId)); 85d9f0492fSopenharmony_ci loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ); 86d9f0492fSopenharmony_ci eventId++; 87d9f0492fSopenharmony_ci } 88d9f0492fSopenharmony_ci return LE_SUCCESS; 89d9f0492fSopenharmony_ci} 90d9f0492fSopenharmony_ci 91d9f0492fSopenharmony_cistatic void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle) 92d9f0492fSopenharmony_ci{ 93d9f0492fSopenharmony_ci BaseTask *task = (BaseTask *)taskHandle; 94d9f0492fSopenharmony_ci DelTask((EventLoop *)loopHandle, task); 95d9f0492fSopenharmony_ci CloseTask(loopHandle, task); 96d9f0492fSopenharmony_ci close(task->taskId.fd); 97d9f0492fSopenharmony_ci} 98d9f0492fSopenharmony_ci 99d9f0492fSopenharmony_cistatic void DumpEventTaskInfo_(const TaskHandle task) 100d9f0492fSopenharmony_ci{ 101d9f0492fSopenharmony_ci INIT_CHECK(task != NULL, return); 102d9f0492fSopenharmony_ci BaseTask *baseTask = (BaseTask *)task; 103d9f0492fSopenharmony_ci AsyncEventTask *eventTask = (AsyncEventTask *)baseTask; 104d9f0492fSopenharmony_ci printf("\tfd: %d \n", eventTask->stream.base.taskId.fd); 105d9f0492fSopenharmony_ci printf("\t TaskType: %s\n", "EventTask"); 106d9f0492fSopenharmony_ci} 107d9f0492fSopenharmony_ci 108d9f0492fSopenharmony_ciLE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle, 109d9f0492fSopenharmony_ci TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent) 110d9f0492fSopenharmony_ci{ 111d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 112d9f0492fSopenharmony_ci LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent "); 113d9f0492fSopenharmony_ci 114d9f0492fSopenharmony_ci int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC); 115d9f0492fSopenharmony_ci LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd "); 116d9f0492fSopenharmony_ci LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL}; 117d9f0492fSopenharmony_ci AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask)); 118d9f0492fSopenharmony_ci LE_CHECK(task != NULL, close(fd); 119d9f0492fSopenharmony_ci return LE_NO_MEMORY, "Failed to create task"); 120d9f0492fSopenharmony_ci task->stream.base.handleEvent = HandleAsyncEvent_; 121d9f0492fSopenharmony_ci task->stream.base.innerClose = HandleAsyncTaskClose_; 122d9f0492fSopenharmony_ci task->stream.base.dumpTaskInfo = DumpEventTaskInfo_; 123d9f0492fSopenharmony_ci OH_ListInit(&task->stream.buffHead); 124d9f0492fSopenharmony_ci LoopMutexInit(&task->stream.mutex); 125d9f0492fSopenharmony_ci task->processAsyncEvent = processAsyncEvent; 126d9f0492fSopenharmony_ci EventLoop *loop = (EventLoop *)loopHandle; 127d9f0492fSopenharmony_ci loop->addEvent(loop, (const BaseTask *)task, EVENT_READ); 128d9f0492fSopenharmony_ci *taskHandle = (TaskHandle)task; 129d9f0492fSopenharmony_ci return LE_SUCCESS; 130d9f0492fSopenharmony_ci} 131d9f0492fSopenharmony_ci 132d9f0492fSopenharmony_ciLE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle, 133d9f0492fSopenharmony_ci const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen) 134d9f0492fSopenharmony_ci{ 135d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); 136d9f0492fSopenharmony_ci BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId)); 137d9f0492fSopenharmony_ci char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL); 138d9f0492fSopenharmony_ci LE_CHECK(buff != NULL, return LE_FAILURE, "Failed to get buff"); 139d9f0492fSopenharmony_ci int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId)); 140d9f0492fSopenharmony_ci LE_CHECK(ret == 0, return -1, "Failed to copy data"); 141d9f0492fSopenharmony_ci if (data != NULL && buffLen > 0) { 142d9f0492fSopenharmony_ci ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen); 143d9f0492fSopenharmony_ci LE_CHECK(ret == 0, return -1, "Failed to copy data"); 144d9f0492fSopenharmony_ci buff[sizeof(eventId) + buffLen] = '\0'; 145d9f0492fSopenharmony_ci } 146d9f0492fSopenharmony_ci return LE_Send(loopHandle, taskHandle, handle, buffLen); 147d9f0492fSopenharmony_ci} 148d9f0492fSopenharmony_ci 149d9f0492fSopenharmony_civoid LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle) 150d9f0492fSopenharmony_ci{ 151d9f0492fSopenharmony_ci LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters"); 152d9f0492fSopenharmony_ci LE_CloseTask(loopHandle, taskHandle); 153d9f0492fSopenharmony_ci}