1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_CPUWORKER_MANAGER_HPP
17 #define FFRT_CPUWORKER_MANAGER_HPP
18 
19 #include "eu/worker_manager.h"
20 #include "eu/cpu_worker.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_interface.h"
23 #include "sync/poller.h"
24 #include "util/spmc_queue.h"
25 #include "tm/cpu_task.h"
26 
27 namespace ffrt {
28 struct WorkerSleepCtl {
29     std::mutex mutex;
30     std::condition_variable cv;
31 };
32 
33 class CPUWorkerManager : public WorkerManager {
34 public:
35     CPUWorkerManager();
36 
37     ~CPUWorkerManager() override
38     {
39     }
40 
41     void NotifyTaskAdded(const QoS& qos) override;
42     void NotifyLocalTaskAdded(const QoS& qos) override;
43     void NotifyWorkers(const QoS& qos, int number) override;
44 
45     std::mutex* GetSleepCtl(int qos) override
46     {
47         return &sleepCtl[qos].mutex;
48     }
49 
AddStealingWorker(const QoS& qos)50     void AddStealingWorker(const QoS& qos)
51     {
52         stealWorkers[qos].fetch_add(1);
53     }
54 
SubStealingWorker(const QoS& qos)55     void SubStealingWorker(const QoS& qos)
56     {
57         while (1) {
58             uint64_t stealWorkersNum = stealWorkers[qos].load();
59             if (stealWorkersNum == 0) {
60                 return;
61             }
62             if (atomic_compare_exchange_weak(&stealWorkers[qos], &stealWorkersNum, stealWorkersNum - 1)) return;
63         }
64     }
65 
GetStealingWorkers(const QoS& qos)66     uint64_t GetStealingWorkers(const QoS& qos)
67     {
68         return stealWorkers[qos].load(std::memory_order_relaxed);
69     }
70     CPUMonitor* GetCPUMonitor() override
71     {
72         return monitor;
73     }
74 
75 protected:
76     virtual void WorkerPrepare(WorkerThread* thread) = 0;
77     virtual void WakeupWorkers(const QoS& qos) = 0;
78     bool IncWorker(const QoS& qos) override;
79     int GetTaskCount(const QoS& qos);
80     int GetWorkerCount(const QoS& qos);
81     void WorkerJoinTg(const QoS& qos, pid_t pid);
82 
83     CPUMonitor* monitor = nullptr;
84     bool tearDown = false;
85     WorkerSleepCtl sleepCtl[QoS::MaxNum()];
86     void WorkerLeaveTg(const QoS& qos, pid_t pid);
87     uint8_t polling_[QoS::MaxNum()] = {0};
88     fast_mutex pollersMtx[QoS::MaxNum()];
89     void WorkerRetired(WorkerThread* thread);
90 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
91     bool IsExceedRunningThreshold(const WorkerThread* thread);
92     bool IsBlockAwareInit(void);
93 #endif
94 
95 private:
96     bool WorkerTearDown();
97     bool DecWorker() override
98     {return false;}
99     void NotifyTaskPicked(const WorkerThread* thread);
100     /* strategy options for task pick up */
101     virtual CPUEUTask* PickUpTaskFromGlobalQueue(WorkerThread* thread) = 0;
102     CPUEUTask* PickUpTaskFromLocalQueue(WorkerThread* thread);
103 
104     /* strategy options for worker wait action */
105     virtual WorkerAction WorkerIdleAction(const WorkerThread* thread) = 0;
106 
107     void WorkerSetup(WorkerThread* thread);
108     PollerRet TryPoll(const WorkerThread* thread, int timeout = -1);
109     unsigned int StealTaskBatch(WorkerThread* thread);
110     virtual CPUEUTask* PickUpTaskBatch(WorkerThread* thread) = 0;
111     std::atomic_uint64_t stealWorkers[QoS::MaxNum()] = {0};
112     friend class CPUManagerStrategy;
113 };
114 } // namespace ffrt
115 #endif
116