1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "le_task.h"
16#include <time.h>
17#include <sys/eventfd.h>
18
19#include "le_loop.h"
20
21#define MILLION_MICROSECOND 1000000
22#define THOUSAND_MILLISECOND 1000
23
24static void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask)
25{
26    LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters");
27#ifdef LOOP_DEBUG
28    struct timespec startTime = {0};
29    struct timespec endTime = {0};
30    long long diff;
31    clock_gettime(CLOCK_MONOTONIC, &(startTime));
32#endif
33    StreamTask *task = &asyncTask->stream;
34    ListNode *node = task->buffHead.next;
35    if (node != &task->buffHead) {
36        LE_Buffer *buffer = ListEntry(node, LE_Buffer, node);
37        uint64_t eventId = *(uint64_t*)(buffer->data);
38        if (asyncTask->processAsyncEvent) {
39            asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId,
40                (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize);
41        }
42        OH_ListRemove(&buffer->node);
43        free(buffer);
44#ifdef LOOP_DEBUG
45        clock_gettime(CLOCK_MONOTONIC, &(endTime));
46        diff = (long long)(endTime.tv_sec - startTime.tv_sec) * MILLION_MICROSECOND;
47        if (endTime.tv_nsec > startTime.tv_nsec) {
48            diff += (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms
49        } else {
50            diff -= (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms
51        }
52        LE_LOGI("DoAsyncEvent_ diff %ld",  diff);
53#endif
54    }
55}
56
57#ifdef STARTUP_INIT_TEST
58void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle)
59{
60    AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
61    while (!IsBufferEmpty(&asyncTask->stream)) {
62        DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle);
63    }
64}
65#endif
66
67static LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper)
68{
69    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
70    LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper);
71    EventLoop *loop = (EventLoop *)loopHandle;
72    AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
73    if (LE_TEST_FLAGS(oper, EVENT_READ)) {
74        uint64_t eventId = 0;
75        int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
76        LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId);
77        DoAsyncEvent_(loopHandle, asyncTask);
78        if (!IsBufferEmpty(&asyncTask->stream)) {
79            loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_WRITE);
80            return LE_SUCCESS;
81        }
82    } else {
83        static uint64_t eventId = 0;
84        (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
85        loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
86        eventId++;
87    }
88    return LE_SUCCESS;
89}
90
91static void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
92{
93    BaseTask *task = (BaseTask *)taskHandle;
94    DelTask((EventLoop *)loopHandle, task);
95    CloseTask(loopHandle, task);
96    close(task->taskId.fd);
97}
98
99static void DumpEventTaskInfo_(const TaskHandle task)
100{
101    INIT_CHECK(task != NULL, return);
102    BaseTask *baseTask = (BaseTask *)task;
103    AsyncEventTask *eventTask = (AsyncEventTask *)baseTask;
104    printf("\tfd: %d \n", eventTask->stream.base.taskId.fd);
105    printf("\t  TaskType: %s\n", "EventTask");
106}
107
108LE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle,
109    TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent)
110{
111    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
112    LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent ");
113
114    int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC);
115    LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd ");
116    LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL};
117    AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask));
118    LE_CHECK(task != NULL, close(fd);
119        return LE_NO_MEMORY, "Failed to create task");
120    task->stream.base.handleEvent = HandleAsyncEvent_;
121    task->stream.base.innerClose = HandleAsyncTaskClose_;
122    task->stream.base.dumpTaskInfo = DumpEventTaskInfo_;
123    OH_ListInit(&task->stream.buffHead);
124    LoopMutexInit(&task->stream.mutex);
125    task->processAsyncEvent = processAsyncEvent;
126    EventLoop *loop = (EventLoop *)loopHandle;
127    loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
128    *taskHandle = (TaskHandle)task;
129    return LE_SUCCESS;
130}
131
132LE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle,
133    const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen)
134{
135    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
136    BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId));
137    char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL);
138    LE_CHECK(buff != NULL, return LE_FAILURE, "Failed to get buff");
139    int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId));
140    LE_CHECK(ret == 0, return -1, "Failed to copy data");
141    if (data != NULL && buffLen > 0) {
142        ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen);
143        LE_CHECK(ret == 0, return -1, "Failed to copy data");
144        buff[sizeof(eventId) + buffLen] = '\0';
145    }
146    return LE_Send(loopHandle, taskHandle, handle, buffLen);
147}
148
149void LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle)
150{
151    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
152    LE_CloseTask(loopHandle, taskHandle);
153}