1// Copyright 2020 the V8 project authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "src/libplatform/default-job.h" 6 7#include "src/base/bits.h" 8#include "src/base/macros.h" 9 10namespace v8 { 11namespace platform { 12namespace { 13 14// Capped to allow assigning task_ids from a bitfield. 15constexpr size_t kMaxWorkersPerJob = 32; 16 17} // namespace 18 19DefaultJobState::JobDelegate::~JobDelegate() { 20 static_assert(kInvalidTaskId >= kMaxWorkersPerJob, 21 "kInvalidTaskId must be outside of the range of valid task_ids " 22 "[0, kMaxWorkersPerJob)"); 23 if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_); 24} 25 26uint8_t DefaultJobState::JobDelegate::GetTaskId() { 27 if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId(); 28 return task_id_; 29} 30 31DefaultJobState::DefaultJobState(Platform* platform, 32 std::unique_ptr<JobTask> job_task, 33 TaskPriority priority, 34 size_t num_worker_threads) 35 : platform_(platform), 36 job_task_(std::move(job_task)), 37 priority_(priority), 38 num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {} 39 40DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); } 41 42void DefaultJobState::NotifyConcurrencyIncrease() { 43 if (is_canceled_.load(std::memory_order_relaxed)) return; 44 45 size_t num_tasks_to_post = 0; 46 TaskPriority priority; 47 { 48 base::MutexGuard guard(&mutex_); 49 const size_t max_concurrency = CappedMaxConcurrency(active_workers_); 50 // Consider |pending_tasks_| to avoid posting too many tasks. 51 if (max_concurrency > (active_workers_ + pending_tasks_)) { 52 num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_; 53 pending_tasks_ += num_tasks_to_post; 54 } 55 priority = priority_; 56 } 57 // Post additional worker tasks to reach |max_concurrency|. 58 for (size_t i = 0; i < num_tasks_to_post; ++i) { 59 CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>( 60 shared_from_this(), job_task_.get())); 61 } 62} 63 64uint8_t DefaultJobState::AcquireTaskId() { 65 static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8, 66 "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob."); 67 uint32_t assigned_task_ids = 68 assigned_task_ids_.load(std::memory_order_relaxed); 69 DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1, 70 kMaxWorkersPerJob); 71 uint32_t new_assigned_task_ids = 0; 72 uint8_t task_id = 0; 73 // memory_order_acquire on success, matched with memory_order_release in 74 // ReleaseTaskId() so that operations done by previous threads that had 75 // the same task_id become visible to the current thread. 76 do { 77 // Count trailing one bits. This is the id of the right-most 0-bit in 78 // |assigned_task_ids|. 79 task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids); 80 new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id); 81 } while (!assigned_task_ids_.compare_exchange_weak( 82 assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire, 83 std::memory_order_relaxed)); 84 return task_id; 85} 86 87void DefaultJobState::ReleaseTaskId(uint8_t task_id) { 88 // memory_order_release to match AcquireTaskId(). 89 uint32_t previous_task_ids = assigned_task_ids_.fetch_and( 90 ~(uint32_t(1) << task_id), std::memory_order_release); 91 DCHECK(previous_task_ids & (uint32_t(1) << task_id)); 92 USE(previous_task_ids); 93} 94 95void DefaultJobState::Join() { 96 bool can_run = false; 97 { 98 base::MutexGuard guard(&mutex_); 99 priority_ = TaskPriority::kUserBlocking; 100 // Reserve a worker for the joining thread. GetMaxConcurrency() is ignored 101 // here, but WaitForParticipationOpportunityLockRequired() waits for 102 // workers to return if necessary so we don't exceed GetMaxConcurrency(). 103 num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1; 104 ++active_workers_; 105 can_run = WaitForParticipationOpportunityLockRequired(); 106 } 107 DefaultJobState::JobDelegate delegate(this, true); 108 while (can_run) { 109 job_task_->Run(&delegate); 110 base::MutexGuard guard(&mutex_); 111 can_run = WaitForParticipationOpportunityLockRequired(); 112 } 113} 114 115void DefaultJobState::CancelAndWait() { 116 { 117 base::MutexGuard guard(&mutex_); 118 is_canceled_.store(true, std::memory_order_relaxed); 119 while (active_workers_ > 0) { 120 worker_released_condition_.Wait(&mutex_); 121 } 122 } 123} 124 125void DefaultJobState::CancelAndDetach() { 126 is_canceled_.store(true, std::memory_order_relaxed); 127} 128 129bool DefaultJobState::IsActive() { 130 base::MutexGuard guard(&mutex_); 131 return job_task_->GetMaxConcurrency(active_workers_) != 0 || 132 active_workers_ != 0; 133} 134 135bool DefaultJobState::CanRunFirstTask() { 136 base::MutexGuard guard(&mutex_); 137 --pending_tasks_; 138 if (is_canceled_.load(std::memory_order_relaxed)) return false; 139 if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_), 140 num_worker_threads_)) { 141 return false; 142 } 143 // Acquire current worker. 144 ++active_workers_; 145 return true; 146} 147 148bool DefaultJobState::DidRunTask() { 149 size_t num_tasks_to_post = 0; 150 TaskPriority priority; 151 { 152 base::MutexGuard guard(&mutex_); 153 const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1); 154 if (is_canceled_.load(std::memory_order_relaxed) || 155 active_workers_ > max_concurrency) { 156 // Release current worker and notify. 157 --active_workers_; 158 worker_released_condition_.NotifyOne(); 159 return false; 160 } 161 // Consider |pending_tasks_| to avoid posting too many tasks. 162 if (max_concurrency > active_workers_ + pending_tasks_) { 163 num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_; 164 pending_tasks_ += num_tasks_to_post; 165 } 166 priority = priority_; 167 } 168 // Post additional worker tasks to reach |max_concurrency| in the case that 169 // max concurrency increased. This is not strictly necessary, since 170 // NotifyConcurrencyIncrease() should eventually be invoked. However, some 171 // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease() 172 // late. Posting here allows us to spawn new workers sooner. 173 for (size_t i = 0; i < num_tasks_to_post; ++i) { 174 CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>( 175 shared_from_this(), job_task_.get())); 176 } 177 return true; 178} 179 180bool DefaultJobState::WaitForParticipationOpportunityLockRequired() { 181 size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1); 182 while (active_workers_ > max_concurrency && active_workers_ > 1) { 183 worker_released_condition_.Wait(&mutex_); 184 max_concurrency = CappedMaxConcurrency(active_workers_ - 1); 185 } 186 if (active_workers_ <= max_concurrency) return true; 187 DCHECK_EQ(1U, active_workers_); 188 DCHECK_EQ(0U, max_concurrency); 189 active_workers_ = 0; 190 is_canceled_.store(true, std::memory_order_relaxed); 191 return false; 192} 193 194size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const { 195 return std::min(job_task_->GetMaxConcurrency(worker_count), 196 num_worker_threads_); 197} 198 199void DefaultJobState::CallOnWorkerThread(TaskPriority priority, 200 std::unique_ptr<Task> task) { 201 switch (priority) { 202 case TaskPriority::kBestEffort: 203 return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task)); 204 case TaskPriority::kUserVisible: 205 return platform_->CallOnWorkerThread(std::move(task)); 206 case TaskPriority::kUserBlocking: 207 return platform_->CallBlockingTaskOnWorkerThread(std::move(task)); 208 } 209} 210 211void DefaultJobState::UpdatePriority(TaskPriority priority) { 212 base::MutexGuard guard(&mutex_); 213 priority_ = priority; 214} 215 216DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state) 217 : state_(std::move(state)) { 218 state_->NotifyConcurrencyIncrease(); 219} 220 221DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); } 222 223void DefaultJobHandle::Join() { 224 state_->Join(); 225 state_ = nullptr; 226} 227void DefaultJobHandle::Cancel() { 228 state_->CancelAndWait(); 229 state_ = nullptr; 230} 231 232void DefaultJobHandle::CancelAndDetach() { 233 state_->CancelAndDetach(); 234 state_ = nullptr; 235} 236 237bool DefaultJobHandle::IsActive() { return state_->IsActive(); } 238 239void DefaultJobHandle::UpdatePriority(TaskPriority priority) { 240 state_->UpdatePriority(priority); 241} 242 243} // namespace platform 244} // namespace v8 245