1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <condition_variable>
17#include <mutex>
18#include <thread>
19
20#include <base/containers/vector.h>
21
22#include <meta/base/interface_macros.h>
23#include <meta/interface/intf_task_queue.h>
24#include <meta/interface/intf_task_queue_registry.h>
25
26#include "future.h"
27#include "meta_object.h"
28#include "task_queue.h"
29
30META_BEGIN_NAMESPACE()
31
32class ThreadedTaskQueue
33    : public Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl> {
34public:
35    using Super =
36        Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl>;
37    using Token = ITaskQueue::Token;
38
39    META_NO_COPY_MOVE(ThreadedTaskQueue)
40
41    ThreadedTaskQueue() = default;
42    ~ThreadedTaskQueue() override
43    {
44        Shutdown();
45    }
46
47    bool Build(const IMetadata::Ptr& data) override
48    {
49        bool ret = Super::Build(data);
50        if (ret) {
51            self_ = GetSelf<ITaskQueue>();
52            thread_ = std::thread([this]() { ProcessTasks(); });
53        }
54        return ret;
55    }
56
57    bool InvokeTask(const ITaskQueueTask::Ptr& task) override
58    {
59        auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
60        auto ret = task->Invoke();
61        GetTaskQueueRegistry().SetCurrentTaskQueue(q);
62        return ret;
63    }
64
65    void Shutdown() override
66    {
67        Close();
68        addCondition_.notify_one();
69        if (thread_.joinable()) {
70            thread_.join();
71        }
72    }
73
74    void CancelTask(Token token) override
75    {
76        TaskQueueImpl::CancelTask(token);
77    }
78
79    Token AddTask(ITaskQueueTask::Ptr p) override
80    {
81        return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
82    }
83
84    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
85    {
86        auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
87        if (t) {
88            addCondition_.notify_one();
89        }
90        return t;
91    }
92
93    IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
94    {
95        IPromise::Ptr promise(new Promise);
96        BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
97        auto f = task->GetFuture();
98        AddTask(BASE_NS::move(task));
99        return f;
100    }
101
102    void ProcessTasks()
103    {
104        std::unique_lock lock { mutex_ };
105        execThread_ = std::this_thread::get_id();
106        while (!terminate_) {
107            if (!tasks_.empty()) {
108                TimeSpan delta = tasks_.back().executeTime - Time();
109                // wait for next execute time (or trigger which ever is first). and see if we can now process things..
110                // technically we will always be a bit late here. "it's a best effort"
111                if (delta > TimeSpan::Microseconds(0)) {
112                    addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
113                }
114            } else {
115                // infinite wait, since the queue is empty..
116                addCondition_.wait(lock);
117            }
118            auto curTime = Time();
119            TaskQueueImpl::ProcessTasks(lock, curTime);
120        }
121    }
122
123private:
124    std::condition_variable addCondition_;
125    std::thread thread_;
126};
127
128namespace Internal {
129
130IObjectFactory::Ptr GetThreadedTaskQueueFactory()
131{
132    return ThreadedTaskQueue::GetFactory();
133}
134
135} // namespace Internal
136
137META_END_NAMESPACE()
138