18bf80f4bSopenharmony_ci/*
28bf80f4bSopenharmony_ci * Copyright (c) 2024 Huawei Device Co., Ltd.
38bf80f4bSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
48bf80f4bSopenharmony_ci * you may not use this file except in compliance with the License.
58bf80f4bSopenharmony_ci * You may obtain a copy of the License at
68bf80f4bSopenharmony_ci *
78bf80f4bSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
88bf80f4bSopenharmony_ci *
98bf80f4bSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
108bf80f4bSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
118bf80f4bSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
128bf80f4bSopenharmony_ci * See the License for the specific language governing permissions and
138bf80f4bSopenharmony_ci * limitations under the License.
148bf80f4bSopenharmony_ci */
158bf80f4bSopenharmony_ci
168bf80f4bSopenharmony_ci#include <mutex>
178bf80f4bSopenharmony_ci#include <thread>
188bf80f4bSopenharmony_ci
198bf80f4bSopenharmony_ci#include <base/containers/vector.h>
208bf80f4bSopenharmony_ci
218bf80f4bSopenharmony_ci#include <meta/interface/intf_task_queue.h>
228bf80f4bSopenharmony_ci#include <meta/interface/intf_task_queue_registry.h>
238bf80f4bSopenharmony_ci
248bf80f4bSopenharmony_ci#include "future.h"
258bf80f4bSopenharmony_ci#include "meta_object.h"
268bf80f4bSopenharmony_ci#include "task_queue.h"
278bf80f4bSopenharmony_ci
288bf80f4bSopenharmony_ciMETA_BEGIN_NAMESPACE()
298bf80f4bSopenharmony_ci
308bf80f4bSopenharmony_ci// notice, this is object only so we can construct it via object registery
318bf80f4bSopenharmony_ciclass PollingTaskQueue
328bf80f4bSopenharmony_ci    : public Internal::MetaObjectFwd<PollingTaskQueue, ClassId::PollingTaskQueue, IPollingTaskQueue, TaskQueueImpl> {
338bf80f4bSopenharmony_cipublic:
348bf80f4bSopenharmony_ci    using Super =
358bf80f4bSopenharmony_ci        Internal::MetaObjectFwd<PollingTaskQueue, ClassId::PollingTaskQueue, IPollingTaskQueue, TaskQueueImpl>;
368bf80f4bSopenharmony_ci    using Token = ITaskQueue::Token;
378bf80f4bSopenharmony_ci
388bf80f4bSopenharmony_ci    bool Build(const IMetadata::Ptr& data) override
398bf80f4bSopenharmony_ci    {
408bf80f4bSopenharmony_ci        bool ret = Super::Build(data);
418bf80f4bSopenharmony_ci        if (ret) {
428bf80f4bSopenharmony_ci            self_ = GetSelf<ITaskQueue>();
438bf80f4bSopenharmony_ci        }
448bf80f4bSopenharmony_ci        return ret;
458bf80f4bSopenharmony_ci    }
468bf80f4bSopenharmony_ci
478bf80f4bSopenharmony_ci    bool InvokeTask(const ITaskQueueTask::Ptr& task) override
488bf80f4bSopenharmony_ci    {
498bf80f4bSopenharmony_ci        auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
508bf80f4bSopenharmony_ci        auto ret = task->Invoke();
518bf80f4bSopenharmony_ci        GetTaskQueueRegistry().SetCurrentTaskQueue(q);
528bf80f4bSopenharmony_ci        return ret;
538bf80f4bSopenharmony_ci    }
548bf80f4bSopenharmony_ci    void CancelTask(Token token) override
558bf80f4bSopenharmony_ci    {
568bf80f4bSopenharmony_ci        TaskQueueImpl::CancelTask(token);
578bf80f4bSopenharmony_ci    }
588bf80f4bSopenharmony_ci
598bf80f4bSopenharmony_ci    Token AddTask(ITaskQueueTask::Ptr p) override
608bf80f4bSopenharmony_ci    {
618bf80f4bSopenharmony_ci        return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
628bf80f4bSopenharmony_ci    }
638bf80f4bSopenharmony_ci
648bf80f4bSopenharmony_ci    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
658bf80f4bSopenharmony_ci    {
668bf80f4bSopenharmony_ci        return TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
678bf80f4bSopenharmony_ci    }
688bf80f4bSopenharmony_ci
698bf80f4bSopenharmony_ci    IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
708bf80f4bSopenharmony_ci    {
718bf80f4bSopenharmony_ci        IPromise::Ptr promise(new Promise);
728bf80f4bSopenharmony_ci        BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
738bf80f4bSopenharmony_ci        auto f = task->GetFuture();
748bf80f4bSopenharmony_ci        AddTask(BASE_NS::move(task));
758bf80f4bSopenharmony_ci        return f;
768bf80f4bSopenharmony_ci    }
778bf80f4bSopenharmony_ci
788bf80f4bSopenharmony_ci    void ProcessTasks() override
798bf80f4bSopenharmony_ci    {
808bf80f4bSopenharmony_ci        TimeSpan ctime = Time();
818bf80f4bSopenharmony_ci        std::unique_lock lock { mutex_ };
828bf80f4bSopenharmony_ci        if (ctime != lastTime_) {
838bf80f4bSopenharmony_ci            lastTime_ = ctime;
848bf80f4bSopenharmony_ci            execThread_ = std::this_thread::get_id();
858bf80f4bSopenharmony_ci            TaskQueueImpl::ProcessTasks(lock, ctime);
868bf80f4bSopenharmony_ci            execThread_ = std::thread::id();
878bf80f4bSopenharmony_ci        } else {
888bf80f4bSopenharmony_ci            // non issue. but.
898bf80f4bSopenharmony_ci            CORE_LOG_V("Double call to ProcessTasks.");
908bf80f4bSopenharmony_ci        }
918bf80f4bSopenharmony_ci    }
928bf80f4bSopenharmony_ci
938bf80f4bSopenharmony_ciprivate:
948bf80f4bSopenharmony_ci    TimeSpan lastTime_;
958bf80f4bSopenharmony_ci};
968bf80f4bSopenharmony_ci// Internal api for engine task queue
978bf80f4bSopenharmony_cinamespace Internal {
988bf80f4bSopenharmony_ci
998bf80f4bSopenharmony_ciIObjectFactory::Ptr GetPollingTaskQueueFactory()
1008bf80f4bSopenharmony_ci{
1018bf80f4bSopenharmony_ci    return PollingTaskQueue::GetFactory();
1028bf80f4bSopenharmony_ci}
1038bf80f4bSopenharmony_ci
1048bf80f4bSopenharmony_ci} // namespace Internal
1058bf80f4bSopenharmony_ci
1068bf80f4bSopenharmony_ciMETA_END_NAMESPACE()
107