v8
V8 is Google’s open source high-performance JavaScript and WebAssembly engine, written in C++.
Loading...
Searching...
No Matches
default-job.cc
Go to the documentation of this file.
1// Copyright 2020 the V8 project authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
6
#include "src/libplatform/default-job.h"

#include "src/base/bits.h"
#include "src/base/macros.h"
9
10namespace v8 {
11namespace platform {
12namespace {
13
14// Capped to allow assigning task_ids from a bitfield.
15constexpr size_t kMaxWorkersPerJob = 32;
16
17} // namespace
18
20 static_assert(kInvalidTaskId >= kMaxWorkersPerJob,
21 "kInvalidTaskId must be outside of the range of valid task_ids "
22 "[0, kMaxWorkersPerJob)");
24}
25
27 if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId();
28 return task_id_;
29}
30
32 std::unique_ptr<JobTask> job_task,
34 size_t num_worker_threads)
35 : platform_(platform),
36 job_task_(std::move(job_task)),
38 num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {}
39
41
43 if (is_canceled_.load(std::memory_order_relaxed)) return;
44
45 size_t num_tasks_to_post = 0;
47 {
49 const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
50 // Consider |pending_tasks_| to avoid posting too many tasks.
51 if (max_concurrency > active_workers_ + pending_tasks_) {
52 num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
53 pending_tasks_ += num_tasks_to_post;
54 }
56 }
57 // Post additional worker tasks to reach |max_concurrency|.
58 for (size_t i = 0; i < num_tasks_to_post; ++i) {
59 CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
60 shared_from_this(), job_task_.get()));
61 }
62}
63
65 static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
66 "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
67 uint32_t assigned_task_ids =
68 assigned_task_ids_.load(std::memory_order_relaxed);
69 DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1,
70 kMaxWorkersPerJob);
71 uint32_t new_assigned_task_ids = 0;
72 uint8_t task_id = 0;
73 // memory_order_acquire on success, matched with memory_order_release in
74 // ReleaseTaskId() so that operations done by previous threads that had
75 // the same task_id become visible to the current thread.
76 do {
77 // Count trailing one bits. This is the id of the right-most 0-bit in
78 // |assigned_task_ids|.
79 task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids);
80 new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
81 } while (!assigned_task_ids_.compare_exchange_weak(
82 assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
83 std::memory_order_relaxed));
84 return task_id;
85}
86
87void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
88 // memory_order_release to match AcquireTaskId().
89 uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
90 ~(uint32_t(1) << task_id), std::memory_order_release);
91 DCHECK(previous_task_ids & (uint32_t(1) << task_id));
92 USE(previous_task_ids);
93}
94
96 auto WaitForParticipationOpportunity = [this]() -> size_t {
97 // Subtract one from active_workers_ since the current thread is not
98 // participating yet.
99 size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
100 // Wait until we can participate in the job.
101 while (active_workers_ > max_concurrency && active_workers_ > 1) {
103 max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
104 }
105 DCHECK_LE(0, max_concurrency);
106 if (max_concurrency != 0) return max_concurrency;
107 // The job is done (max_concurrency dropped to zero).
109 active_workers_ = 0;
110 is_canceled_.store(true, std::memory_order_relaxed);
111 return 0;
112 };
113
114 size_t num_tasks_to_post = 0;
115 {
116 base::MutexGuard guard(&mutex_);
118 // Reserve a worker for the joining (current) thread.
119 // GetMaxConcurrency() is ignored here, but if necessary we wait below
120 // for workers to return so we don't exceed GetMaxConcurrency().
123 size_t max_concurrency = WaitForParticipationOpportunity();
124 if (max_concurrency == 0) return;
125 // Compute the number of additional worker tasks to spawn.
126 if (max_concurrency > active_workers_ + pending_tasks_) {
127 num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
128 pending_tasks_ += num_tasks_to_post;
129 }
130 }
131 // Spawn more worker tasks if needed.
132 for (size_t i = 0; i < num_tasks_to_post; ++i) {
134 std::make_unique<DefaultJobWorker>(shared_from_this(),
135 job_task_.get()));
136 }
137
138 DefaultJobState::JobDelegate delegate(this, true);
139 while (true) {
140 // Participate in job execution, as one active worker.
141 job_task_->Run(&delegate);
142
143 base::MutexGuard guard(&mutex_);
144 if (WaitForParticipationOpportunity() == 0) return;
145 }
146}
147
149 {
150 base::MutexGuard guard(&mutex_);
151 is_canceled_.store(true, std::memory_order_relaxed);
152 while (active_workers_ > 0) {
154 }
155 }
156}
157
159 is_canceled_.store(true, std::memory_order_relaxed);
160}
161
163 base::MutexGuard guard(&mutex_);
164 return job_task_->GetMaxConcurrency(active_workers_) != 0 ||
165 active_workers_ != 0;
166}
167
169 base::MutexGuard guard(&mutex_);
171 if (is_canceled_.load(std::memory_order_relaxed)) return false;
173 // Acquire current worker.
175 return true;
176}
177
179 size_t num_tasks_to_post = 0;
181 {
182 base::MutexGuard guard(&mutex_);
183 const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
184 if (is_canceled_.load(std::memory_order_relaxed) ||
185 active_workers_ > max_concurrency) {
186 // Release current worker and notify.
189 return false;
190 }
191 // Consider |pending_tasks_| to avoid posting too many tasks.
192 if (max_concurrency > active_workers_ + pending_tasks_) {
193 num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
194 pending_tasks_ += num_tasks_to_post;
195 }
197 }
198 // Post additional worker tasks to reach |max_concurrency| in the case that
199 // max concurrency increased. This is not strictly necessary, since
200 // NotifyConcurrencyIncrease() should eventually be invoked. However, some
201 // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease()
202 // late. Posting here allows us to spawn new workers sooner.
203 for (size_t i = 0; i < num_tasks_to_post; ++i) {
204 CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
205 shared_from_this(), job_task_.get()));
206 }
207 return true;
208}
209
210size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
211 return std::min(job_task_->GetMaxConcurrency(worker_count),
213}
214
216 std::unique_ptr<Task> task) {
217 platform_->PostTaskOnWorkerThread(priority, std::move(task));
218}
219
224
225DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state)
226 : state_(std::move(state)) {}
227
229
231 state_->Join();
232 state_ = nullptr;
233}
235 state_->CancelAndWait();
236 state_ = nullptr;
237}
238
240 state_->CancelAndDetach();
241 state_ = nullptr;
242}
243
244bool DefaultJobHandle::IsActive() { return state_->IsActive(); }
245
249
250} // namespace platform
251} // namespace v8
void PostTaskOnWorkerThread(TaskPriority priority, std::unique_ptr< Task > task, const SourceLocation &location=SourceLocation::Current())
void UpdatePriority(TaskPriority) override
DefaultJobHandle(std::shared_ptr< DefaultJobState > state)
std::shared_ptr< DefaultJobState > state_
static constexpr uint8_t kInvalidTaskId
Definition default-job.h:45
std::atomic_bool is_canceled_
Definition default-job.h:95
std::unique_ptr< JobTask > job_task_
Definition default-job.h:85
DefaultJobState(Platform *platform, std::unique_ptr< JobTask > job_task, TaskPriority priority, size_t num_worker_threads)
void ReleaseTaskId(uint8_t task_id)
base::ConditionVariable worker_released_condition_
Definition default-job.h:99
void UpdatePriority(TaskPriority)
std::atomic< uint32_t > assigned_task_ids_
size_t CappedMaxConcurrency(size_t worker_count) const
void CallOnWorkerThread(TaskPriority priority, std::unique_ptr< Task > task)
enum v8::internal::@1270::DeoptimizableCodeIterator::@67 state_
size_t priority
STL namespace.
constexpr unsigned CountTrailingZeros32(uint32_t value)
Definition bits.h:161
constexpr unsigned CountPopulation(T value)
Definition bits.h:26
TaskPriority
Definition v8-platform.h:24
#define DCHECK_LE(v1, v2)
Definition logging.h:490
#define DCHECK(condition)
Definition logging.h:482
#define DCHECK_EQ(v1, v2)
Definition logging.h:485
#define USE(...)
Definition macros.h:293