1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_CPUWORKER_MANAGER_HPP 17 #define FFRT_CPUWORKER_MANAGER_HPP 18 19 #include "eu/worker_manager.h" 20 #include "eu/cpu_worker.h" 21 #include "eu/cpu_monitor.h" 22 #include "eu/cpu_manager_interface.h" 23 #include "sync/poller.h" 24 #include "util/spmc_queue.h" 25 #include "tm/cpu_task.h" 26 27 namespace ffrt { 28 struct WorkerSleepCtl { 29 std::mutex mutex; 30 std::condition_variable cv; 31 }; 32 33 class CPUWorkerManager : public WorkerManager { 34 public: 35 CPUWorkerManager(); 36 37 ~CPUWorkerManager() override 38 { 39 } 40 41 void NotifyTaskAdded(const QoS& qos) override; 42 void NotifyLocalTaskAdded(const QoS& qos) override; 43 void NotifyWorkers(const QoS& qos, int number) override; 44 45 std::mutex* GetSleepCtl(int qos) override 46 { 47 return &sleepCtl[qos].mutex; 48 } 49 AddStealingWorker(const QoS& qos)50 void AddStealingWorker(const QoS& qos) 51 { 52 stealWorkers[qos].fetch_add(1); 53 } 54 SubStealingWorker(const QoS& qos)55 void SubStealingWorker(const QoS& qos) 56 { 57 while (1) { 58 uint64_t stealWorkersNum = stealWorkers[qos].load(); 59 if (stealWorkersNum == 0) { 60 return; 61 } 62 if (atomic_compare_exchange_weak(&stealWorkers[qos], &stealWorkersNum, stealWorkersNum - 1)) return; 63 } 64 } 65 GetStealingWorkers(const QoS& qos)66 uint64_t GetStealingWorkers(const QoS& qos) 67 { 68 return stealWorkers[qos].load(std::memory_order_relaxed); 69 } 70 CPUMonitor* GetCPUMonitor() override 71 { 72 return monitor; 73 } 74 75 protected: 76 virtual void WorkerPrepare(WorkerThread* thread) = 0; 77 virtual void WakeupWorkers(const QoS& qos) = 0; 78 bool IncWorker(const QoS& qos) override; 79 int GetTaskCount(const QoS& qos); 80 int GetWorkerCount(const QoS& qos); 81 void WorkerJoinTg(const QoS& qos, pid_t pid); 82 83 CPUMonitor* monitor = nullptr; 84 bool tearDown = false; 85 WorkerSleepCtl sleepCtl[QoS::MaxNum()]; 86 void WorkerLeaveTg(const QoS& qos, pid_t pid); 87 uint8_t polling_[QoS::MaxNum()] = {0}; 88 fast_mutex pollersMtx[QoS::MaxNum()]; 89 void WorkerRetired(WorkerThread* thread); 90 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 91 bool IsExceedRunningThreshold(const WorkerThread* thread); 92 bool IsBlockAwareInit(void); 93 #endif 94 95 private: 96 bool WorkerTearDown(); 97 bool DecWorker() override 98 {return false;} 99 void NotifyTaskPicked(const WorkerThread* thread); 100 /* strategy options for task pick up */ 101 virtual CPUEUTask* PickUpTaskFromGlobalQueue(WorkerThread* thread) = 0; 102 CPUEUTask* PickUpTaskFromLocalQueue(WorkerThread* thread); 103 104 /* strategy options for worker wait action */ 105 virtual WorkerAction WorkerIdleAction(const WorkerThread* thread) = 0; 106 107 void WorkerSetup(WorkerThread* thread); 108 PollerRet TryPoll(const WorkerThread* thread, int timeout = -1); 109 unsigned int StealTaskBatch(WorkerThread* thread); 110 virtual CPUEUTask* PickUpTaskBatch(WorkerThread* thread) = 0; 111 std::atomic_uint64_t stealWorkers[QoS::MaxNum()] = {0}; 112 friend class CPUManagerStrategy; 113 }; 114 } // namespace ffrt 115 #endif 116