1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "thread_pool.h"
17#include "errors.h"
18#include "utils_log.h"
19
20#include <memory>
21#include <pthread.h>
22
23namespace OHOS {
24
25ThreadPool::ThreadPool(const std::string& name)
26    : myName_(name), maxTaskNum_(0), running_(false)
27{
28}
29
30ThreadPool::~ThreadPool()
31{
32    if (running_) {
33        Stop();
34    }
35}
36
37uint32_t ThreadPool::Start(int numThreads)
38{
39    if (!threads_.empty()) {
40        return ERR_INVALID_OPERATION;
41    }
42
43    if (numThreads <= 0) {
44        return ERR_INVALID_VALUE;
45    }
46    running_ = true;
47    threads_.reserve(numThreads);
48
49    for (int i = 0; i < numThreads; ++i) {
50        std::thread t([this] { this->WorkInThread(); });
51        // Give the name of ThreadPool to threads created by the ThreadPool.
52        int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str());
53        if (err != 0) {
54            UTILS_LOGD("Failed to set name to thread. %{public}s", strerror(err));
55        }
56        threads_.push_back(std::move(t));
57    }
58    return ERR_OK;
59}
60
61void ThreadPool::Stop()
62{
63    {
64        std::unique_lock<std::mutex>  lock(mutex_);
65        running_ = false;
66        hasTaskToDo_.notify_all();
67    }
68
69    for (auto& e : threads_) {
70        e.join();
71    }
72}
73
74void ThreadPool::AddTask(const Task &f)
75{
76    if (threads_.empty()) {
77        f();
78    } else {
79        std::unique_lock<std::mutex> lock(mutex_);
80        while (Overloaded()) {
81            acceptNewTask_.wait(lock);
82        }
83
84        tasks_.push_back(f);
85        hasTaskToDo_.notify_one();
86    }
87}
88
89size_t ThreadPool::GetCurTaskNum()
90{
91    std::unique_lock<std::mutex> lock(mutex_);
92    return tasks_.size();
93}
94
95
96ThreadPool::Task ThreadPool::ScheduleTask()
97{
98    std::unique_lock<std::mutex> lock(mutex_);
99    while (tasks_.empty() && running_) {
100        hasTaskToDo_.wait(lock);
101    }
102
103    Task task;
104    if (!tasks_.empty()) {
105        task = tasks_.front();
106        tasks_.pop_front();
107
108        if (maxTaskNum_ > 0) {
109            acceptNewTask_.notify_one();
110        }
111    }
112    return task;
113}
114
115bool ThreadPool::Overloaded() const
116{
117    return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_);
118}
119
120void ThreadPool::WorkInThread()
121{
122    while (running_) {
123        Task task = ScheduleTask();
124        if (task) {
125            task();
126        }
127    }
128}
129
130} // namespace OHOS
131