1d9f0492fSopenharmony_ci/*
2d9f0492fSopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd.
3d9f0492fSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4d9f0492fSopenharmony_ci * you may not use this file except in compliance with the License.
5d9f0492fSopenharmony_ci * You may obtain a copy of the License at
6d9f0492fSopenharmony_ci *
7d9f0492fSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0
8d9f0492fSopenharmony_ci *
9d9f0492fSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10d9f0492fSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11d9f0492fSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12d9f0492fSopenharmony_ci * See the License for the specific language governing permissions and
13d9f0492fSopenharmony_ci * limitations under the License.
14d9f0492fSopenharmony_ci */
15d9f0492fSopenharmony_ci#include "le_task.h"
16d9f0492fSopenharmony_ci#include <time.h>
17d9f0492fSopenharmony_ci#include <sys/eventfd.h>
18d9f0492fSopenharmony_ci
19d9f0492fSopenharmony_ci#include "le_loop.h"
20d9f0492fSopenharmony_ci
// Microseconds per second: scales a tv_sec difference to microseconds.
#define MILLION_MICROSECOND 1000000
// Divisor applied to a tv_nsec (nanosecond) difference to express it in microseconds.
#define THOUSAND_MILLISECOND 1000
23d9f0492fSopenharmony_ci
24d9f0492fSopenharmony_cistatic void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask)
25d9f0492fSopenharmony_ci{
26d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters");
27d9f0492fSopenharmony_ci#ifdef LOOP_DEBUG
28d9f0492fSopenharmony_ci    struct timespec startTime = {0};
29d9f0492fSopenharmony_ci    struct timespec endTime = {0};
30d9f0492fSopenharmony_ci    long long diff;
31d9f0492fSopenharmony_ci    clock_gettime(CLOCK_MONOTONIC, &(startTime));
32d9f0492fSopenharmony_ci#endif
33d9f0492fSopenharmony_ci    StreamTask *task = &asyncTask->stream;
34d9f0492fSopenharmony_ci    ListNode *node = task->buffHead.next;
35d9f0492fSopenharmony_ci    if (node != &task->buffHead) {
36d9f0492fSopenharmony_ci        LE_Buffer *buffer = ListEntry(node, LE_Buffer, node);
37d9f0492fSopenharmony_ci        uint64_t eventId = *(uint64_t*)(buffer->data);
38d9f0492fSopenharmony_ci        if (asyncTask->processAsyncEvent) {
39d9f0492fSopenharmony_ci            asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId,
40d9f0492fSopenharmony_ci                (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize);
41d9f0492fSopenharmony_ci        }
42d9f0492fSopenharmony_ci        OH_ListRemove(&buffer->node);
43d9f0492fSopenharmony_ci        free(buffer);
44d9f0492fSopenharmony_ci#ifdef LOOP_DEBUG
45d9f0492fSopenharmony_ci        clock_gettime(CLOCK_MONOTONIC, &(endTime));
46d9f0492fSopenharmony_ci        diff = (long long)(endTime.tv_sec - startTime.tv_sec) * MILLION_MICROSECOND;
47d9f0492fSopenharmony_ci        if (endTime.tv_nsec > startTime.tv_nsec) {
48d9f0492fSopenharmony_ci            diff += (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms
49d9f0492fSopenharmony_ci        } else {
50d9f0492fSopenharmony_ci            diff -= (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms
51d9f0492fSopenharmony_ci        }
52d9f0492fSopenharmony_ci        LE_LOGI("DoAsyncEvent_ diff %ld",  diff);
53d9f0492fSopenharmony_ci#endif
54d9f0492fSopenharmony_ci    }
55d9f0492fSopenharmony_ci}
56d9f0492fSopenharmony_ci
#ifdef STARTUP_INIT_TEST
/**
 * Test-only helper (built with STARTUP_INIT_TEST): synchronously dispatch every
 * event currently queued on an async task.
 *
 * @param loopHandle Loop the task belongs to.
 * @param taskHandle Async event task whose queue is drained.
 */
void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle)
{
    // DoAsyncEvent_ returns without consuming a buffer when either handle is NULL, so
    // invalid handles are rejected here; otherwise the drain loop below could spin forever
    // (NULL loop) or dereference a NULL task.
    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
    AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
    while (!IsBufferEmpty(&asyncTask->stream)) {
        DoAsyncEvent_(loopHandle, asyncTask);
    }
}
#endif
66d9f0492fSopenharmony_ci
67d9f0492fSopenharmony_cistatic LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper)
68d9f0492fSopenharmony_ci{
69d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
70d9f0492fSopenharmony_ci    LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper);
71d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
72d9f0492fSopenharmony_ci    AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
73d9f0492fSopenharmony_ci    if (LE_TEST_FLAGS(oper, EVENT_READ)) {
74d9f0492fSopenharmony_ci        uint64_t eventId = 0;
75d9f0492fSopenharmony_ci        int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
76d9f0492fSopenharmony_ci        LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId);
77d9f0492fSopenharmony_ci        DoAsyncEvent_(loopHandle, asyncTask);
78d9f0492fSopenharmony_ci        if (!IsBufferEmpty(&asyncTask->stream)) {
79d9f0492fSopenharmony_ci            loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_WRITE);
80d9f0492fSopenharmony_ci            return LE_SUCCESS;
81d9f0492fSopenharmony_ci        }
82d9f0492fSopenharmony_ci    } else {
83d9f0492fSopenharmony_ci        static uint64_t eventId = 0;
84d9f0492fSopenharmony_ci        (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
85d9f0492fSopenharmony_ci        loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
86d9f0492fSopenharmony_ci        eventId++;
87d9f0492fSopenharmony_ci    }
88d9f0492fSopenharmony_ci    return LE_SUCCESS;
89d9f0492fSopenharmony_ci}
90d9f0492fSopenharmony_ci
91d9f0492fSopenharmony_cistatic void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
92d9f0492fSopenharmony_ci{
93d9f0492fSopenharmony_ci    BaseTask *task = (BaseTask *)taskHandle;
94d9f0492fSopenharmony_ci    DelTask((EventLoop *)loopHandle, task);
95d9f0492fSopenharmony_ci    CloseTask(loopHandle, task);
96d9f0492fSopenharmony_ci    close(task->taskId.fd);
97d9f0492fSopenharmony_ci}
98d9f0492fSopenharmony_ci
99d9f0492fSopenharmony_cistatic void DumpEventTaskInfo_(const TaskHandle task)
100d9f0492fSopenharmony_ci{
101d9f0492fSopenharmony_ci    INIT_CHECK(task != NULL, return);
102d9f0492fSopenharmony_ci    BaseTask *baseTask = (BaseTask *)task;
103d9f0492fSopenharmony_ci    AsyncEventTask *eventTask = (AsyncEventTask *)baseTask;
104d9f0492fSopenharmony_ci    printf("\tfd: %d \n", eventTask->stream.base.taskId.fd);
105d9f0492fSopenharmony_ci    printf("\t  TaskType: %s\n", "EventTask");
106d9f0492fSopenharmony_ci}
107d9f0492fSopenharmony_ci
108d9f0492fSopenharmony_ciLE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle,
109d9f0492fSopenharmony_ci    TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent)
110d9f0492fSopenharmony_ci{
111d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
112d9f0492fSopenharmony_ci    LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent ");
113d9f0492fSopenharmony_ci
114d9f0492fSopenharmony_ci    int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC);
115d9f0492fSopenharmony_ci    LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd ");
116d9f0492fSopenharmony_ci    LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL};
117d9f0492fSopenharmony_ci    AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask));
118d9f0492fSopenharmony_ci    LE_CHECK(task != NULL, close(fd);
119d9f0492fSopenharmony_ci        return LE_NO_MEMORY, "Failed to create task");
120d9f0492fSopenharmony_ci    task->stream.base.handleEvent = HandleAsyncEvent_;
121d9f0492fSopenharmony_ci    task->stream.base.innerClose = HandleAsyncTaskClose_;
122d9f0492fSopenharmony_ci    task->stream.base.dumpTaskInfo = DumpEventTaskInfo_;
123d9f0492fSopenharmony_ci    OH_ListInit(&task->stream.buffHead);
124d9f0492fSopenharmony_ci    LoopMutexInit(&task->stream.mutex);
125d9f0492fSopenharmony_ci    task->processAsyncEvent = processAsyncEvent;
126d9f0492fSopenharmony_ci    EventLoop *loop = (EventLoop *)loopHandle;
127d9f0492fSopenharmony_ci    loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
128d9f0492fSopenharmony_ci    *taskHandle = (TaskHandle)task;
129d9f0492fSopenharmony_ci    return LE_SUCCESS;
130d9f0492fSopenharmony_ci}
131d9f0492fSopenharmony_ci
132d9f0492fSopenharmony_ciLE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle,
133d9f0492fSopenharmony_ci    const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen)
134d9f0492fSopenharmony_ci{
135d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
136d9f0492fSopenharmony_ci    BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId));
137d9f0492fSopenharmony_ci    char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL);
138d9f0492fSopenharmony_ci    LE_CHECK(buff != NULL, return LE_FAILURE, "Failed to get buff");
139d9f0492fSopenharmony_ci    int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId));
140d9f0492fSopenharmony_ci    LE_CHECK(ret == 0, return -1, "Failed to copy data");
141d9f0492fSopenharmony_ci    if (data != NULL && buffLen > 0) {
142d9f0492fSopenharmony_ci        ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen);
143d9f0492fSopenharmony_ci        LE_CHECK(ret == 0, return -1, "Failed to copy data");
144d9f0492fSopenharmony_ci        buff[sizeof(eventId) + buffLen] = '\0';
145d9f0492fSopenharmony_ci    }
146d9f0492fSopenharmony_ci    return LE_Send(loopHandle, taskHandle, handle, buffLen);
147d9f0492fSopenharmony_ci}
148d9f0492fSopenharmony_ci
149d9f0492fSopenharmony_civoid LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle)
150d9f0492fSopenharmony_ci{
151d9f0492fSopenharmony_ci    LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
152d9f0492fSopenharmony_ci    LE_CloseTask(loopHandle, taskHandle);
153d9f0492fSopenharmony_ci}