1/* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include <condition_variable> 17#include <mutex> 18#include <thread> 19 20#include <base/containers/vector.h> 21 22#include <meta/base/interface_macros.h> 23#include <meta/interface/intf_task_queue.h> 24#include <meta/interface/intf_task_queue_registry.h> 25 26#include "future.h" 27#include "meta_object.h" 28#include "task_queue.h" 29 30META_BEGIN_NAMESPACE() 31 32class ThreadedTaskQueue 33 : public Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl> { 34public: 35 using Super = 36 Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl>; 37 using Token = ITaskQueue::Token; 38 39 META_NO_COPY_MOVE(ThreadedTaskQueue) 40 41 ThreadedTaskQueue() = default; 42 ~ThreadedTaskQueue() override 43 { 44 Shutdown(); 45 } 46 47 bool Build(const IMetadata::Ptr& data) override 48 { 49 bool ret = Super::Build(data); 50 if (ret) { 51 self_ = GetSelf<ITaskQueue>(); 52 thread_ = std::thread([this]() { ProcessTasks(); }); 53 } 54 return ret; 55 } 56 57 bool InvokeTask(const ITaskQueueTask::Ptr& task) override 58 { 59 auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_); 60 auto ret = task->Invoke(); 61 GetTaskQueueRegistry().SetCurrentTaskQueue(q); 62 return ret; 63 } 64 65 void Shutdown() override 66 { 67 Close(); 68 addCondition_.notify_one(); 69 if (thread_.joinable()) { 70 thread_.join(); 71 } 72 } 73 74 void CancelTask(Token token) override 75 { 76 TaskQueueImpl::CancelTask(token); 77 } 78 79 Token AddTask(ITaskQueueTask::Ptr p) override 80 { 81 return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0)); 82 } 83 84 Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override 85 { 86 auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay); 87 if (t) { 88 addCondition_.notify_one(); 89 } 90 return t; 91 } 92 93 IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override 94 { 95 IPromise::Ptr promise(new Promise); 96 BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise)); 97 auto f = task->GetFuture(); 98 AddTask(BASE_NS::move(task)); 99 return f; 100 } 101 102 void ProcessTasks() 103 { 104 std::unique_lock lock { mutex_ }; 105 execThread_ = std::this_thread::get_id(); 106 while (!terminate_) { 107 if (!tasks_.empty()) { 108 TimeSpan delta = tasks_.back().executeTime - Time(); 109 // wait for next execute time (or trigger which ever is first). and see if we can now process things.. 110 // technically we will always be a bit late here. "it's a best effort" 111 if (delta > TimeSpan::Microseconds(0)) { 112 addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds())); 113 } 114 } else { 115 // infinite wait, since the queue is empty.. 116 addCondition_.wait(lock); 117 } 118 auto curTime = Time(); 119 TaskQueueImpl::ProcessTasks(lock, curTime); 120 } 121 } 122 123private: 124 std::condition_variable addCondition_; 125 std::thread thread_; 126}; 127 128namespace Internal { 129 130IObjectFactory::Ptr GetThreadedTaskQueueFactory() 131{ 132 return ThreadedTaskQueue::GetFactory(); 133} 134 135} // namespace Internal 136 137META_END_NAMESPACE() 138