1// Copyright 2020 the V8 project authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "src/libplatform/default-job.h"
6
7#include "src/base/bits.h"
8#include "src/base/macros.h"
9
10namespace v8 {
11namespace platform {
12namespace {
13
// Upper bound on the number of workers a single job may use. The bound exists
// so that every task_id can be handed out as one bit of a bitfield.
constexpr size_t kMaxWorkersPerJob{32};
16
17}  // namespace
18
19DefaultJobState::JobDelegate::~JobDelegate() {
20  static_assert(kInvalidTaskId >= kMaxWorkersPerJob,
21                "kInvalidTaskId must be outside of the range of valid task_ids "
22                "[0, kMaxWorkersPerJob)");
23  if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_);
24}
25
26uint8_t DefaultJobState::JobDelegate::GetTaskId() {
27  if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId();
28  return task_id_;
29}
30
31DefaultJobState::DefaultJobState(Platform* platform,
32                                 std::unique_ptr<JobTask> job_task,
33                                 TaskPriority priority,
34                                 size_t num_worker_threads)
35    : platform_(platform),
36      job_task_(std::move(job_task)),
37      priority_(priority),
38      num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {}
39
40DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); }
41
42void DefaultJobState::NotifyConcurrencyIncrease() {
43  if (is_canceled_.load(std::memory_order_relaxed)) return;
44
45  size_t num_tasks_to_post = 0;
46  TaskPriority priority;
47  {
48    base::MutexGuard guard(&mutex_);
49    const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
50    // Consider |pending_tasks_| to avoid posting too many tasks.
51    if (max_concurrency > (active_workers_ + pending_tasks_)) {
52      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
53      pending_tasks_ += num_tasks_to_post;
54    }
55    priority = priority_;
56  }
57  // Post additional worker tasks to reach |max_concurrency|.
58  for (size_t i = 0; i < num_tasks_to_post; ++i) {
59    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
60                                     shared_from_this(), job_task_.get()));
61  }
62}
63
64uint8_t DefaultJobState::AcquireTaskId() {
65  static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
66                "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
67  uint32_t assigned_task_ids =
68      assigned_task_ids_.load(std::memory_order_relaxed);
69  DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1,
70            kMaxWorkersPerJob);
71  uint32_t new_assigned_task_ids = 0;
72  uint8_t task_id = 0;
73  // memory_order_acquire on success, matched with memory_order_release in
74  // ReleaseTaskId() so that operations done by previous threads that had
75  // the same task_id become visible to the current thread.
76  do {
77    // Count trailing one bits. This is the id of the right-most 0-bit in
78    // |assigned_task_ids|.
79    task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids);
80    new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
81  } while (!assigned_task_ids_.compare_exchange_weak(
82      assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
83      std::memory_order_relaxed));
84  return task_id;
85}
86
87void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
88  // memory_order_release to match AcquireTaskId().
89  uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
90      ~(uint32_t(1) << task_id), std::memory_order_release);
91  DCHECK(previous_task_ids & (uint32_t(1) << task_id));
92  USE(previous_task_ids);
93}
94
95void DefaultJobState::Join() {
96  bool can_run = false;
97  {
98    base::MutexGuard guard(&mutex_);
99    priority_ = TaskPriority::kUserBlocking;
100    // Reserve a worker for the joining thread. GetMaxConcurrency() is ignored
101    // here, but WaitForParticipationOpportunityLockRequired() waits for
102    // workers to return if necessary so we don't exceed GetMaxConcurrency().
103    num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1;
104    ++active_workers_;
105    can_run = WaitForParticipationOpportunityLockRequired();
106  }
107  DefaultJobState::JobDelegate delegate(this, true);
108  while (can_run) {
109    job_task_->Run(&delegate);
110    base::MutexGuard guard(&mutex_);
111    can_run = WaitForParticipationOpportunityLockRequired();
112  }
113}
114
115void DefaultJobState::CancelAndWait() {
116  {
117    base::MutexGuard guard(&mutex_);
118    is_canceled_.store(true, std::memory_order_relaxed);
119    while (active_workers_ > 0) {
120      worker_released_condition_.Wait(&mutex_);
121    }
122  }
123}
124
125void DefaultJobState::CancelAndDetach() {
126  is_canceled_.store(true, std::memory_order_relaxed);
127}
128
129bool DefaultJobState::IsActive() {
130  base::MutexGuard guard(&mutex_);
131  return job_task_->GetMaxConcurrency(active_workers_) != 0 ||
132         active_workers_ != 0;
133}
134
135bool DefaultJobState::CanRunFirstTask() {
136  base::MutexGuard guard(&mutex_);
137  --pending_tasks_;
138  if (is_canceled_.load(std::memory_order_relaxed)) return false;
139  if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_),
140                                  num_worker_threads_)) {
141    return false;
142  }
143  // Acquire current worker.
144  ++active_workers_;
145  return true;
146}
147
148bool DefaultJobState::DidRunTask() {
149  size_t num_tasks_to_post = 0;
150  TaskPriority priority;
151  {
152    base::MutexGuard guard(&mutex_);
153    const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
154    if (is_canceled_.load(std::memory_order_relaxed) ||
155        active_workers_ > max_concurrency) {
156      // Release current worker and notify.
157      --active_workers_;
158      worker_released_condition_.NotifyOne();
159      return false;
160    }
161    // Consider |pending_tasks_| to avoid posting too many tasks.
162    if (max_concurrency > active_workers_ + pending_tasks_) {
163      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
164      pending_tasks_ += num_tasks_to_post;
165    }
166    priority = priority_;
167  }
168  // Post additional worker tasks to reach |max_concurrency| in the case that
169  // max concurrency increased. This is not strictly necessary, since
170  // NotifyConcurrencyIncrease() should eventually be invoked. However, some
171  // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease()
172  // late. Posting here allows us to spawn new workers sooner.
173  for (size_t i = 0; i < num_tasks_to_post; ++i) {
174    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
175                                     shared_from_this(), job_task_.get()));
176  }
177  return true;
178}
179
180bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
181  size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
182  while (active_workers_ > max_concurrency && active_workers_ > 1) {
183    worker_released_condition_.Wait(&mutex_);
184    max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
185  }
186  if (active_workers_ <= max_concurrency) return true;
187  DCHECK_EQ(1U, active_workers_);
188  DCHECK_EQ(0U, max_concurrency);
189  active_workers_ = 0;
190  is_canceled_.store(true, std::memory_order_relaxed);
191  return false;
192}
193
194size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
195  return std::min(job_task_->GetMaxConcurrency(worker_count),
196                  num_worker_threads_);
197}
198
199void DefaultJobState::CallOnWorkerThread(TaskPriority priority,
200                                         std::unique_ptr<Task> task) {
201  switch (priority) {
202    case TaskPriority::kBestEffort:
203      return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task));
204    case TaskPriority::kUserVisible:
205      return platform_->CallOnWorkerThread(std::move(task));
206    case TaskPriority::kUserBlocking:
207      return platform_->CallBlockingTaskOnWorkerThread(std::move(task));
208  }
209}
210
211void DefaultJobState::UpdatePriority(TaskPriority priority) {
212  base::MutexGuard guard(&mutex_);
213  priority_ = priority;
214}
215
216DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state)
217    : state_(std::move(state)) {
218  state_->NotifyConcurrencyIncrease();
219}
220
221DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); }
222
223void DefaultJobHandle::Join() {
224  state_->Join();
225  state_ = nullptr;
226}
227void DefaultJobHandle::Cancel() {
228  state_->CancelAndWait();
229  state_ = nullptr;
230}
231
232void DefaultJobHandle::CancelAndDetach() {
233  state_->CancelAndDetach();
234  state_ = nullptr;
235}
236
237bool DefaultJobHandle::IsActive() { return state_->IsActive(); }
238
239void DefaultJobHandle::UpdatePriority(TaskPriority priority) {
240  state_->UpdatePriority(priority);
241}
242
243}  // namespace platform
244}  // namespace v8
245