1/* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#ifndef DFSU_THREAD_H 16#define DFSU_THREAD_H 17 18#include <atomic> 19#include <condition_variable> 20#include <functional> 21#include <mutex> 22#include <thread> 23#include "utils_log.h" 24 25namespace OHOS { 26namespace Storage { 27namespace DistributedFile { 28namespace Utils { 29class DfsuThread { 30public: 31 DfsuThread() = default; 32 DfsuThread(const DfsuThread &) = delete; 33 DfsuThread &operator=(const DfsuThread &) = delete; 34 35 ~DfsuThread() 36 { 37 Stop(); 38 } 39 40 template<class Fn, class... Args> 41 bool Run(Fn &&Fx, Args &&...Ax) 42 { 43 std::unique_lock<std::mutex> lock(threadMutex_); 44 if (thread_ != nullptr) { 45 return false; 46 } 47 running_ = true; 48 thread_ = std::make_unique<std::thread>(std::forward<Fn>(Fx), std::forward<Args>(Ax)...); 49 return true; 50 } 51 52 bool RunLoop(std::function<bool()> task, uint64_t interval, uint32_t retryTimes = UINT32_MAX) 53 { 54 std::unique_lock<std::mutex> lock(threadMutex_); 55 if (thread_ != nullptr) { 56 return false; 57 } 58 running_ = true; 59 thread_ = std::make_unique<std::thread>([this, task, interval, retryTimes] { 60 uint32_t times = retryTimes; 61 LOGD("DfsThread: entering loop"); 62 while ((!task()) && (times > 0)) { 63 times--; 64 std::unique_lock<std::mutex> lock(sleepMutex_); 65 bool stop = 66 sleepCv_.wait_for(lock, std::chrono::milliseconds(interval), [this]() { return !this->running_; }); 67 if (stop) { // is stopped 68 break; 69 } 70 } 71 LOGD("DfsThread: leaving loop"); 72 }); 73 return true; 74 } 75 76 bool RunLoopFlexible(std::function<bool(uint64_t &)> task, uint64_t interval, uint32_t retryTimes = UINT32_MAX) 77 { 78 std::unique_lock<std::mutex> lock(threadMutex_); 79 if (thread_ != nullptr) { 80 return false; 81 } 82 running_ = true; 83 thread_ = std::make_unique<std::thread>([this, task, interval, retryTimes] { 84 uint32_t times = retryTimes; 85 uint64_t duration = interval; 86 LOGD("DfsThread: entering flexible loop"); 87 while ((!task(duration)) && (times > 0)) { 88 times--; 89 std::unique_lock<std::mutex> lock(sleepMutex_); 90 bool stop = 91 sleepCv_.wait_for(lock, std::chrono::milliseconds(duration), [this]() { return !this->running_; }); 92 if (stop) { // is stopped 93 break; 94 } 95 } 96 LOGD("DfsThread: leaving flexible loop"); 97 }); 98 return true; 99 } 100 101 bool Stop() 102 { 103 std::unique_lock<std::mutex> lockThread(threadMutex_); 104 if (thread_ == nullptr) { 105 return true; 106 } 107 { 108 std::unique_lock<std::mutex> lockSleep(sleepMutex_); 109 running_ = false; 110 sleepCv_.notify_one(); 111 } 112 LOGD("wait thread to stop"); 113 if (thread_->joinable()) { 114 thread_->join(); 115 } 116 thread_ = nullptr; 117 return true; 118 } 119 120 bool operator==(std::thread::id id) 121 { 122 if (thread_ == nullptr) { 123 return false; 124 } 125 return thread_->get_id() == id; 126 } 127 128private: 129 std::atomic_bool running_ {false}; 130 std::mutex threadMutex_ {}; 131 std::unique_ptr<std::thread> thread_ {nullptr}; 132 std::mutex sleepMutex_ {}; 133 std::condition_variable sleepCv_ {}; 134}; 135} // namespace Utils 136} // namespace DistributedFile 137} // namespace Storage 138} // namespace OHOS 139#endif // DFSU_THREAD_H