v8
V8 is Google’s open source high-performance JavaScript and WebAssembly engine, written in C++.
js-atomics-synchronization.cc
1// Copyright 2022 the V8 project authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
6
7#include "src/base/macros.h"
14
15namespace v8 {
16namespace internal {
17
18namespace detail {
19class WaiterQueueNode;
20}
21
22namespace {
23
24// TODO(v8:12547): Move this logic into a static method JSPromise::PerformThen
25// so that other callsites like this one can use it.
26// Set fulfill/reject handlers for a JSPromise object.
27MaybeDirectHandle<JSReceiver> PerformPromiseThen(
28 Isolate* isolate, DirectHandle<JSReceiver> promise,
29 DirectHandle<Object> fulfill_handler,
30 MaybeDirectHandle<JSFunction> maybe_reject_handler = {}) {
31 DCHECK(IsCallable(*fulfill_handler));
32 DirectHandle<Object> reject_handler = isolate->factory()->undefined_value();
33 if (!maybe_reject_handler.is_null()) {
34 reject_handler = maybe_reject_handler.ToHandleChecked();
35 }
36 DirectHandle<Object> args[] = {fulfill_handler, reject_handler};
37
38 DirectHandle<Object> then_result;
39 ASSIGN_RETURN_ON_EXCEPTION(
40 isolate, then_result,
41 Execution::CallBuiltin(isolate, isolate->promise_then(), promise,
42 base::VectorOf(args)));
43
44 return Cast<JSReceiver>(then_result);
45}
46
47MaybeDirectHandle<Context> SetAsyncUnlockHandlers(
48 Isolate* isolate, DirectHandle<JSAtomicsMutex> mutex,
49 DirectHandle<JSReceiver> waiting_for_callback_promise,
50 DirectHandle<JSPromise> unlocked_promise) {
51 DirectHandle<Context> handlers_context =
52 isolate->factory()->NewBuiltinContext(
53 isolate->native_context(), JSAtomicsMutex::kAsyncContextLength);
54 handlers_context->set(JSAtomicsMutex::kMutexAsyncContextSlot, *mutex);
55 handlers_context->set(JSAtomicsMutex::kUnlockedPromiseAsyncContextSlot,
56 *unlocked_promise);
57
58 DirectHandle<SharedFunctionInfo> resolve_info(
59 isolate->heap()->atomics_mutex_async_unlock_resolve_handler_sfi(),
60 isolate);
61 DirectHandle<JSFunction> resolver_callback =
62 Factory::JSFunctionBuilder{isolate, resolve_info, handlers_context}
63 .set_map(isolate->strict_function_without_prototype_map())
64 .set_allocation_type(AllocationType::kYoung)
65 .Build();
66
67 DirectHandle<SharedFunctionInfo> reject_info(
68 isolate->heap()->atomics_mutex_async_unlock_reject_handler_sfi(),
69 isolate);
70 DirectHandle<JSFunction> reject_callback =
71 Factory::JSFunctionBuilder{isolate, reject_info, handlers_context}
72 .set_map(isolate->strict_function_without_prototype_map())
73 .set_allocation_type(AllocationType::kYoung)
74 .Build();
75
76 RETURN_ON_EXCEPTION(isolate,
77 PerformPromiseThen(isolate, waiting_for_callback_promise,
78 resolver_callback, reject_callback));
79 return handlers_context;
80}
81
82void AddPromiseToNativeContext(Isolate* isolate,
83 DirectHandle<JSPromise> promise) {
84 DirectHandle<NativeContext> native_context(isolate->native_context());
85 Handle<OrderedHashSet> promises(native_context->atomics_waitasync_promises(),
86 isolate);
87 promises = OrderedHashSet::Add(isolate, promises, promise).ToHandleChecked();
88 native_context->set_atomics_waitasync_promises(*promises);
89}
90
91void RemovePromiseFromNativeContext(Isolate* isolate,
92 DirectHandle<JSPromise> promise) {
93 Handle<OrderedHashSet> promises(
94 isolate->native_context()->atomics_waitasync_promises(), isolate);
95 bool was_deleted = OrderedHashSet::Delete(isolate, *promises, *promise);
96 DCHECK(was_deleted);
97 USE(was_deleted);
98 promises = OrderedHashSet::Shrink(isolate, promises);
99 isolate->native_context()->set_atomics_waitasync_promises(*promises);
100}
101
102template <typename T>
103Global<T> GetWeakGlobal(Isolate* isolate, Local<T> object) {
104 auto* v8_isolate = reinterpret_cast<v8::Isolate*>(isolate);
105 v8::Global<T> global(v8_isolate, object);
106 global.SetWeak();
107 return global;
108}
109
110} // namespace
111
112namespace detail {
113
114// The waiter queue lock guard provides a RAII-style mechanism for locking the
115// waiter queue. It is a non-copyable and non-movable object, and a new state
116// must be set before destroying the guard.
117class V8_NODISCARD WaiterQueueLockGuard final {
118 using StateT = JSSynchronizationPrimitive::StateT;
119
120 public:
121 // Spinlock to acquire the IsWaiterQueueLockedField bit. current_state is
122 // updated to the last value of the state before the waiter queue lock was
123 // acquired.
124 explicit WaiterQueueLockGuard(std::atomic<StateT>* state,
125 StateT& current_state)
126 : state_(state), new_state_(kInvalidState) {
127 while (!JSSynchronizationPrimitive::TryLockWaiterQueueExplicit(
128 state, current_state)) {
129 YIELD_PROCESSOR;
130 }
131 }
132
133 // Constructor for creating a wrapper around a state whose waiter queue
134 // is already locked and owned by this thread.
135 explicit WaiterQueueLockGuard(std::atomic<StateT>* state, bool is_locked)
136 : state_(state), new_state_(kInvalidState) {
137 CHECK(is_locked);
138 DCHECK(JSSynchronizationPrimitive::IsWaiterQueueLockedField::decode(
139 state->load()));
140 }
141
142 WaiterQueueLockGuard(const WaiterQueueLockGuard&) = delete;
143
144 ~WaiterQueueLockGuard() {
147 DCHECK_NE(new_state_, kInvalidState);
148 DCHECK(JSSynchronizationPrimitive::IsWaiterQueueLockedField::decode(
149 state_->load()));
150 new_state_ = JSSynchronizationPrimitive::IsWaiterQueueLockedField::update(
151 new_state_, false);
152 state_->store(new_state_, std::memory_order_release);
153 }
154
155 void set_new_state(StateT new_state) { new_state_ = new_state; }
156
157 static std::optional<WaiterQueueLockGuard>
158 NewAlreadyLockedWaiterQueueLockGuard(std::atomic<StateT>* state) {
159 return std::optional<WaiterQueueLockGuard>(std::in_place, state, true);
160 }
161
162 private:
163 static constexpr StateT kInvalidState =
164 ~JSSynchronizationPrimitive::kEmptyState;
165 std::atomic<StateT>* state_;
166 StateT new_state_;
167};
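// Illustrative usage sketch (not from the original file), assuming a caller
// that shares a state word with other threads: the guard spins until the
// queue-lock bit is acquired, and a new state must be supplied before it is
// destroyed.
//
//   StateT current_state = state->load(std::memory_order_relaxed);
//   WaiterQueueLockGuard guard(state, current_state);
//   // ... mutate the waiter queue while the queue-lock bit is held ...
//   StateT new_state = /* recomputed from current_state and the new head */;
//   guard.set_new_state(new_state);
//   // The destructor clears IsWaiterQueueLockedField and stores new_state.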
168
169class SyncWaiterQueueNode final : public WaiterQueueNode {
170 public:
171 explicit SyncWaiterQueueNode(Isolate* requester)
172 : WaiterQueueNode(requester), should_wait_(true) {}
173
174 void Wait() {
175 AllowGarbageCollection allow_before_parking;
176 requester_->main_thread_local_heap()->ExecuteWhileParked([this]() {
177 base::MutexGuard guard(&wait_lock_);
178 while (should_wait_) {
179 wait_cond_var_.Wait(&wait_lock_);
180 }
181 });
182 }
183
184 // Returns false if timed out, true otherwise.
185 bool WaitFor(const base::TimeDelta& rel_time) {
186 bool result;
187 AllowGarbageCollection allow_before_parking;
188 requester_->main_thread_local_heap()->ExecuteWhileParked([this, rel_time,
189 &result]() {
190 base::MutexGuard guard(&wait_lock_);
191 base::TimeTicks current_time = base::TimeTicks::Now();
192 base::TimeTicks timeout_time = current_time + rel_time;
193 for (;;) {
194 if (!should_wait_) {
195 result = true;
196 return;
197 }
198 current_time = base::TimeTicks::Now();
199 if (current_time >= timeout_time) {
200 result = false;
201 return;
202 }
203 base::TimeDelta time_until_timeout = timeout_time - current_time;
204 bool wait_res = wait_cond_var_.WaitFor(&wait_lock_, time_until_timeout);
205 USE(wait_res);
206 // The wake up may have been spurious, so loop again.
207 }
208 });
209 return result;
210 }
211
212 void Notify() override {
213 base::MutexGuard guard(&wait_lock_);
214 should_wait_ = false;
215 wait_cond_var_.NotifyOne();
216 SetNotInListForVerification();
217 }
218
219 bool IsSameIsolateForAsyncCleanup(Isolate* isolate) override {
220 // Sync waiters are only queued while the thread is sleeping, so there
221 // should not be sync nodes while cleaning up the isolate.
222 DCHECK_NE(requester_, isolate);
223 return false;
224 }
225
226 void CleanupMatchingAsyncWaiters(const DequeueMatcher& matcher) override {
227 UNREACHABLE();
228 }
229
230 private:
231 base::Mutex wait_lock_;
232 base::ConditionVariable wait_cond_var_;
233 bool should_wait_;
236};
237
238template <typename T>
239class AsyncWaiterNotifyTask final : public CancelableTask {
240 public:
241 AsyncWaiterNotifyTask(CancelableTaskManager* cancelable_task_manager,
242 typename T::AsyncWaiterNodeType* node)
243 : CancelableTask(cancelable_task_manager), node_(node) {}
244
245 void RunInternal() override {
246 if (node_->requester_->cancelable_task_manager()->canceled()) return;
247 T::HandleAsyncNotify(node_);
248 }
249
250 private:
251 typename T::AsyncWaiterNodeType* node_;
252};
253template <typename T>
254class AsyncWaiterTimeoutTask final : public CancelableTask {
255 public:
256 AsyncWaiterTimeoutTask(CancelableTaskManager* cancelable_task_manager,
257 typename T::AsyncWaiterNodeType* node)
258 : CancelableTask(cancelable_task_manager), node_(node) {}
259
260 void RunInternal() override {
261 if (node_->requester_->cancelable_task_manager()->canceled()) return;
262 T::HandleAsyncTimeout(node_);
263 }
264
265 private:
266 typename T::AsyncWaiterNodeType* node_;
267};
268
269template <typename T>
270class AsyncWaiterQueueNode final : public WaiterQueueNode {
271 public:
272 static AsyncWaiterQueueNode<T>* NewAsyncWaiterStoredInIsolate(
273 Isolate* requester, DirectHandle<T> synchronization_primitive,
274 Handle<JSPromise> internal_waiting_promise,
275 MaybeHandle<JSPromise> unlocked_promise = {}) {
276 auto waiter =
277 std::unique_ptr<AsyncWaiterQueueNode<T>>(new AsyncWaiterQueueNode<T>(
278 requester, synchronization_primitive, internal_waiting_promise,
279 unlocked_promise));
280 AsyncWaiterQueueNode<T>* raw_waiter = waiter.get();
281 requester->async_waiter_queue_nodes().push_back(std::move(waiter));
282 return raw_waiter;
283 }
284
285 // Creates a minimal LockAsyncWaiterQueueNode so that the isolate can keep
286 // track of the locked mutexes and release them in case of isolate deinit.
287 static AsyncWaiterQueueNode<T>* NewLockedAsyncWaiterStoredInIsolate(
288 Isolate* requester, DirectHandle<T> synchronization_primitive) {
289 DCHECK(IsJSAtomicsMutex(*synchronization_primitive));
290 auto waiter = std::unique_ptr<AsyncWaiterQueueNode<T>>(
291 new AsyncWaiterQueueNode<T>(requester, synchronization_primitive));
292 AsyncWaiterQueueNode<T>* raw_waiter = waiter.get();
293 requester->async_waiter_queue_nodes().push_back(std::move(waiter));
294 return raw_waiter;
295 }
296
297 TaskRunner* task_runner() { return task_runner_.get(); }
298
299 v8::Local<v8::Context> GetNativeContext() {
300 v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
301 return native_context_.Get(v8_isolate);
302 }
303
304 Handle<JSPromise> GetInternalWaitingPromise() {
305 v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
306 Handle<JSPromise> internal_waiting_promise =
307 Utils::OpenHandle(*internal_waiting_promise_.Get(v8_isolate));
308 return internal_waiting_promise;
309 }
310
311 bool IsEmpty() const { return synchronization_primitive_.IsEmpty(); }
312
313 Handle<T> GetSynchronizationPrimitive() {
314 v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
315 Handle<T> synchronization_primitive =
316 Cast<T>(Utils::OpenHandle(*synchronization_primitive_.Get(v8_isolate)));
317 return synchronization_primitive;
318 }
319
320 Handle<JSPromise> GetUnlockedPromise() {
321 v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester_);
322 Handle<JSPromise> unlocked_promise =
323 Cast<JSPromise>(Utils::OpenHandle(*unlocked_promise_.Get(v8_isolate)));
324 return unlocked_promise;
325 }
326
327 void Notify() override {
328 SetNotInListForVerification();
329 CancelableTaskManager* task_manager = requester_->cancelable_task_manager();
330 if (task_manager->canceled()) return;
331 auto notify_task =
332 std::make_unique<AsyncWaiterNotifyTask<T>>(task_manager, this);
333 notify_task_id_ = notify_task->id();
334 task_runner_->PostNonNestableTask(std::move(notify_task));
335 }
336
337 bool IsSameIsolateForAsyncCleanup(Isolate* isolate) override {
338 return requester_ == isolate;
339 }
340
341 void CleanupMatchingAsyncWaiters(const DequeueMatcher& matcher) override {
342 T::CleanupMatchingAsyncWaiters(requester_, this, matcher);
343 }
344
345 // Removes the node from the isolate's `async_waiter_queue_nodes` list; the
346 // passed node will be invalid after this call since the corresponding
347 // unique_ptr is deleted upon removal.
348 static void RemoveFromAsyncWaiterQueueList(AsyncWaiterQueueNode<T>* node) {
349 node->requester_->async_waiter_queue_nodes().remove_if(
350 [=](std::unique_ptr<WaiterQueueNode>& n) { return n.get() == node; });
351 }
352
353 private:
358
359 explicit AsyncWaiterQueueNode(Isolate* requester,
360 DirectHandle<T> synchronization_primitive)
361 : WaiterQueueNode(requester),
362 notify_task_id_(CancelableTaskManager::kInvalidTaskId) {
363 native_context_ =
364 GetWeakGlobal(requester, Utils::ToLocal(requester->native_context()));
365 synchronization_primitive_ = GetWeakGlobal(
366 requester, Utils::ToLocal(Cast<JSObject>(synchronization_primitive)));
367 }
368
369 explicit AsyncWaiterQueueNode(
370 Isolate* requester, DirectHandle<T> synchronization_primitive,
371 DirectHandle<JSPromise> internal_waiting_promise,
372 MaybeDirectHandle<JSPromise> unlocked_promise)
373 : WaiterQueueNode(requester),
374 notify_task_id_(CancelableTaskManager::kInvalidTaskId) {
375 v8::Isolate* v8_isolate = reinterpret_cast<v8::Isolate*>(requester);
376 task_runner_ =
377 V8::GetCurrentPlatform()->GetForegroundTaskRunner(v8_isolate);
378 timeout_task_id_ = CancelableTaskManager::kInvalidTaskId;
379 native_context_ =
380 GetWeakGlobal(requester, Utils::ToLocal(requester->native_context()));
381 synchronization_primitive_ = GetWeakGlobal(
382 requester, Utils::ToLocal(Cast<JSObject>(synchronization_primitive)));
383 internal_waiting_promise_ = GetWeakGlobal(
384 requester, Utils::PromiseToLocal(internal_waiting_promise));
385 if (!unlocked_promise.is_null()) {
386 DCHECK(IsJSAtomicsMutex(*synchronization_primitive));
387 unlocked_promise_ = GetWeakGlobal(
388 requester, Utils::PromiseToLocal(unlocked_promise.ToHandleChecked()));
389 }
390 }
391
392 void SetReadyForAsyncCleanup() override { ready_for_async_cleanup_ = true; }
393
394 std::shared_ptr<TaskRunner> task_runner_;
395 CancelableTaskManager::Id notify_task_id_;
396 CancelableTaskManager::Id timeout_task_id_;
397 bool ready_for_async_cleanup_ = false;
398
399 // The node holds weak global handles to the V8 objects required to handle
400 // the notification.
401 v8::Global<v8::Context> native_context_;
402 v8::Global<v8::Object> synchronization_primitive_;
403 // `internal_waiting_promise_` is the internal first promise of the chain.
404 // See comments in `JSAtomicsMutex` and `JSAtomicsCondition`.
405 v8::Global<v8::Promise> internal_waiting_promise_;
406 // `unlocked_promise` is the user exposed promise used to handle timeouts;
407 // it should be empty for `JSAtomicsCondition`.
408 v8::Global<v8::Promise> unlocked_promise_;
409};
410} // namespace detail
411
418
419// static
420void JSSynchronizationPrimitive::IsolateDeinit(Isolate* isolate) {
421 CleanupAsyncWaiterLists(isolate, [=](WaiterQueueNode* waiter) {
422 return waiter->IsSameIsolateForAsyncCleanup(isolate);
423 });
424}
425
426void JSSynchronizationPrimitive::CleanupAsyncWaiterLists(
427 Isolate* isolate, DequeueMatcher matcher) {
429 std::list<std::unique_ptr<WaiterQueueNode>>& async_waiter_queue_nodes_list =
430 isolate->async_waiter_queue_nodes();
431 if (!async_waiter_queue_nodes_list.empty()) {
432 // There is no allocation in the following code, so there shouldn't be any
433 // GC, but we use a HandleScope to dereference the global handles.
434 HandleScope handle_scope(isolate);
435 auto it = async_waiter_queue_nodes_list.begin();
436 while (it != async_waiter_queue_nodes_list.end()) {
437 WaiterQueueNode* async_node = it->get();
438 if (!matcher(async_node)) {
439 it++;
440 continue;
441 }
442 async_node->CleanupMatchingAsyncWaiters(matcher);
443 it = async_waiter_queue_nodes_list.erase(it);
444 }
445 }
446}
447
448// static
449bool JSSynchronizationPrimitive::TryLockWaiterQueueExplicit(
450 std::atomic<StateT>* state, StateT& expected) {
451 // Try to acquire the queue lock.
452 expected = IsWaiterQueueLockedField::update(expected, false);
453 return state->compare_exchange_weak(
454 expected, IsWaiterQueueLockedField::update(expected, true),
455 std::memory_order_acquire, std::memory_order_relaxed);
456}
457
458// static
459void JSSynchronizationPrimitive::SetWaiterQueueStateOnly(
460 std::atomic<StateT>* state, StateT new_state) {
461 // Set the new state changing only the waiter queue bits.
462 DCHECK_EQ(new_state & ~kWaiterQueueMask, 0);
463 StateT expected = state->load(std::memory_order_relaxed);
464 StateT desired;
465 do {
466 desired = new_state | (expected & ~kWaiterQueueMask);
467 } while (!state->compare_exchange_weak(
468 expected, desired, std::memory_order_release, std::memory_order_relaxed));
469}
470
471Tagged<Object> JSSynchronizationPrimitive::NumWaitersForTesting(
472 Isolate* requester) {
474 std::atomic<StateT>* state = AtomicStatePtr();
475 StateT current_state = state->load(std::memory_order_acquire);
476
477 // There are no waiters.
478 if (!HasWaitersField::decode(current_state)) return Smi::FromInt(0);
479
480 int num_waiters;
481 {
482 // If this is counting the number of waiters on a mutex, the js mutex
483 // can be taken by another thread without acquiring the queue lock. We
484 // handle the state manually to release the queue lock without changing the
485 // "is locked" bit.
486 while (!TryLockWaiterQueueExplicit(state, current_state)) {
487 YIELD_PROCESSOR;
488 }
489
490 if (!HasWaitersField::decode(current_state)) {
491 // The queue was emptied while waiting for the queue lock.
493 return Smi::FromInt(0);
494 }
495
496 // Get the waiter queue head.
497 WaiterQueueNode* waiter_head = DestructivelyGetWaiterQueueHead(requester);
498 DCHECK_NOT_NULL(waiter_head);
499 num_waiters = WaiterQueueNode::LengthFromHead(waiter_head);
500
501 // Release the queue lock and reinstall the same queue head by creating a
502 // new state.
503 DCHECK_EQ(state->load(),
504 IsWaiterQueueLockedField::update(current_state, true));
505 StateT new_state = SetWaiterQueueHead(requester, waiter_head, kEmptyState);
506 new_state = IsWaiterQueueLockedField::update(new_state, false);
507 SetWaiterQueueStateOnly(state, new_state);
508 }
509
510 return Smi::FromInt(num_waiters);
511}
512
513// TODO(lpardosixtos): Consider making and caching a canonical map for this
514// result object, like we do for the iterator result object.
515// static
516DirectHandle<JSObject> JSSynchronizationPrimitive::CreateResultObject(
517 Isolate* isolate, DirectHandle<Object> value, bool success) {
518 DirectHandle<JSObject> result =
519 isolate->factory()->NewJSObject(isolate->object_function());
520 DirectHandle<Object> success_value = isolate->factory()->ToBoolean(success);
521 JSObject::AddProperty(isolate, result, "value", value,
522 NONE);
523 JSObject::AddProperty(isolate, result, "success", success_value,
524 NONE);
525 return result;
526}
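// For illustration (not from the original file): from JS, the returned object
// has the shape `{ value: <value>, success: <boolean> }`, e.g.
// `{ value: undefined, success: false }` for a timed-out wait.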
527
528// static
529void JSAtomicsMutex::CleanupMatchingAsyncWaiters(Isolate* isolate,
530 WaiterQueueNode* node,
531 DequeueMatcher matcher) {
532 auto* async_node = static_cast<LockAsyncWaiterQueueNode*>(node);
533 if (async_node->ready_for_async_cleanup_) {
534 // Whenever a node needs to be looked up in the waiter queue we also remove
535 // any other matching nodes and mark them as ready for async cleanup. This
536 // way we avoid taking the queue lock multiple times, which could slow down
537 // other threads.
538 return;
539 }
540 if (async_node->IsEmpty()) {
541 // The node's underlying synchronization primitive has been collected, so
542 // delete it.
543 async_node->SetNotInListForVerification();
544 return;
545 }
546 DirectHandle<JSAtomicsMutex> mutex =
547 async_node->GetSynchronizationPrimitive();
548 std::atomic<StateT>* state = mutex->AtomicStatePtr();
549 StateT current_state = state->load(std::memory_order_relaxed);
550
551 // The details of updating the state in this function are too complicated
552 // for the waiter queue lock guard to manage, so handle the state manually.
553 while (!TryLockWaiterQueueExplicit(state, current_state)) {
554 YIELD_PROCESSOR;
555 }
556
557 bool was_locked_by_this_thread = mutex->IsCurrentThreadOwner();
558 WaiterQueueNode* waiter_head =
559 mutex->DestructivelyGetWaiterQueueHead(isolate);
560 if (waiter_head) {
561 // Dequeue all the matching waiters.
562 WaiterQueueNode::DequeueAllMatchingForAsyncCleanup(&waiter_head, matcher);
563 if (!async_node->ready_for_async_cleanup_) {
564 // The node was not in the queue, so it has already been notified.
565 // Notify the next head unless the lock is already taken by a different
566 // thread, otherwise the queue may stall.
567 if (waiter_head && (!IsLockedField::decode(current_state) ||
568 was_locked_by_this_thread)) {
569 // Notify the next head unless the lock is already taken, in which case
570 // the lock owner will notify the next waiter.
571 WaiterQueueNode* old_head = WaiterQueueNode::Dequeue(&waiter_head);
572 old_head->Notify();
573 }
574 }
575 }
576 StateT new_state = kUnlockedUncontended;
577 new_state = mutex->SetWaiterQueueHead(isolate, waiter_head, new_state);
578 new_state = IsWaiterQueueLockedField::update(new_state, false);
579 if (was_locked_by_this_thread) {
580 mutex->ClearOwnerThread();
581 new_state = IsLockedField::update(new_state, false);
582 state->store(new_state, std::memory_order_release);
583 } else {
584 SetWaiterQueueStateOnly(state, new_state);
585 }
586}
587
588// static
589bool JSAtomicsMutex::TryLockExplicit(std::atomic<StateT>* state,
590 StateT& expected) {
591 // Try to lock a possibly contended mutex.
592 expected = IsLockedField::update(expected, false);
593 return state->compare_exchange_weak(
594 expected, IsLockedField::update(expected, true),
595 std::memory_order_acquire, std::memory_order_relaxed);
596}
597
598bool JSAtomicsMutex::BackoffTryLock(Isolate* requester,
599 DirectHandle<JSAtomicsMutex> mutex,
600 std::atomic<StateT>* state) {
601 // The backoff algorithm is copied from PartitionAlloc's SpinningMutex.
602 constexpr int kSpinCount = 64;
603 constexpr int kMaxBackoff = 16;
604
605 int tries = 0;
606 int backoff = 1;
607 StateT current_state = state->load(std::memory_order_relaxed);
608 do {
609 if (JSAtomicsMutex::TryLockExplicit(state, current_state)) return true;
610
611 for (int yields = 0; yields < backoff; yields++) {
612 YIELD_PROCESSOR;
613 tries++;
614 }
615
616 backoff = std::min(kMaxBackoff, backoff << 1);
617 } while (tries < kSpinCount);
618 return false;
619}
620
621bool JSAtomicsMutex::MaybeEnqueueNode(Isolate* requester,
622 DirectHandle<JSAtomicsMutex> mutex,
623 std::atomic<StateT>* state,
624 WaiterQueueNode* this_waiter) {
625 DCHECK_NOT_NULL(this_waiter);
626 // Try to acquire the queue lock, which is itself a spinlock.
627 StateT current_state = state->load(std::memory_order_relaxed);
628 std::optional<WaiterQueueLockGuard> waiter_queue_lock_guard =
629 LockWaiterQueueOrJSMutex(state, current_state);
630 if (!waiter_queue_lock_guard.has_value()) {
631 // There is no waiter queue lock guard, so the lock was acquired.
632 DCHECK(IsLockedField::decode(state->load()));
633 return false;
634 }
635
636 // With the queue lock held, enqueue the requester onto the waiter queue.
637 WaiterQueueNode* waiter_head =
638 mutex->DestructivelyGetWaiterQueueHead(requester);
639 WaiterQueueNode::Enqueue(&waiter_head, this_waiter);
640
641 // Enqueue a new waiter queue head and release the queue lock.
642 DCHECK_EQ(state->load(),
643 IsWaiterQueueLockedField::update(current_state, true));
644 StateT new_state =
645 mutex->SetWaiterQueueHead(requester, waiter_head, current_state);
646 // The lock is held, just not by us, so don't set the current thread id as
647 // the owner.
648 DCHECK(IsLockedField::decode(current_state));
649 new_state = IsLockedField::update(new_state, true);
650 waiter_queue_lock_guard->set_new_state(new_state);
651 return true;
652}
653
654// static
655std::optional<WaiterQueueLockGuard> JSAtomicsMutex::LockWaiterQueueOrJSMutex(
656 std::atomic<StateT>* state, StateT& current_state) {
657 for (;;) {
658 if (IsLockedField::decode(current_state) &&
659 TryLockWaiterQueueExplicit(state, current_state)) {
660 return WaiterQueueLockGuard::NewAlreadyLockedWaiterQueueLockGuard(state);
661 }
662 // Also check for the lock having been released by another thread during
663 // attempts to acquire the queue lock.
664 if (TryLockExplicit(state, current_state)) return std::nullopt;
665 YIELD_PROCESSOR;
666 }
667}
668
669bool JSAtomicsMutex::LockJSMutexOrDequeueTimedOutWaiter(
670 Isolate* requester, std::atomic<StateT>* state,
671 WaiterQueueNode* timed_out_waiter) {
672 // First acquire the queue lock, which is itself a spinlock.
673 StateT current_state = state->load(std::memory_order_relaxed);
674 // There are no waiters, but the js mutex lock may be held by another thread.
675 if (!HasWaitersField::decode(current_state)) return false;
676
677 // The details of updating the state in this function are too complicated
678 // for the waiter queue lock guard to manage, so handle the state manually.
679 while (!TryLockWaiterQueueExplicit(state, current_state)) {
680 YIELD_PROCESSOR;
681 }
682
683 WaiterQueueNode* waiter_head = DestructivelyGetWaiterQueueHead(requester);
684
685 if (waiter_head == nullptr) {
686 // The queue is empty but the js mutex lock may be held by another thread,
687 // release the waiter queue bit without changing the "is locked" bit.
688 DCHECK(!HasWaitersField::decode(current_state));
689 SetWaiterQueueStateOnly(state, kUnlockedUncontended);
690 return false;
691 }
692
693 WaiterQueueNode* dequeued_node = WaiterQueueNode::DequeueMatching(
694 &waiter_head,
695 [&](WaiterQueueNode* node) { return node == timed_out_waiter; });
696
697 // Release the queue lock and install the new waiter queue head.
698 DCHECK_EQ(state->load(),
699 IsWaiterQueueLockedField::update(current_state, true));
700 StateT new_state = kUnlockedUncontended;
701 new_state = SetWaiterQueueHead(requester, waiter_head, new_state);
702
703 if (!dequeued_node) {
704 // The timed out waiter was not in the queue, so it must have been dequeued
705 // and notified between the time this thread woke up and the time it
706 // acquired the queue lock, so there is a risk that the next queue head is
707 // never notified. Try to take the js mutex lock here; if we succeed, the
708 // next node will be notified by this thread; otherwise, it will be notified
709 // by the thread holding the lock now.
710
711 // Since we use strong CAS below, we know that the js mutex lock will be
712 // held by either this thread or another thread that can't go through the
713 // unlock fast path because this thread is holding the waiter queue lock.
714 // Hence, it is safe to always set the "is locked" bit in new_state.
715 new_state = IsLockedField::update(new_state, true);
716 DCHECK(!IsWaiterQueueLockedField::decode(new_state));
717 current_state = IsLockedField::update(current_state, false);
718 if (state->compare_exchange_strong(current_state, new_state,
719 std::memory_order_acq_rel,
720 std::memory_order_relaxed)) {
721 // The CAS atomically released the waiter queue lock and acquired the js
722 // mutex lock.
723 return true;
724 }
725
726 DCHECK(IsLockedField::decode(state->load()));
727 state->store(new_state, std::memory_order_release);
728 return false;
729 }
730
731 SetWaiterQueueStateOnly(state, new_state);
732 return false;
733}
734
735// static
736bool JSAtomicsMutex::LockSlowPath(Isolate* requester,
737 DirectHandle<JSAtomicsMutex> mutex,
738 std::atomic<StateT>* state,
739 std::optional<base::TimeDelta> timeout) {
740 for (;;) {
741 // Spin for a little bit to try to acquire the lock, so as to be fast under
742 // microcontention.
743 if (BackoffTryLock(requester, mutex, state)) return true;
744
745 // At this point the lock is considered contended, so try to go to sleep and
746 // put the requester thread on the waiter queue.
747
748 // Allocate a waiter queue node on-stack, since this thread is going to
749 // sleep and will be blocked anyway.
750 SyncWaiterQueueNode this_waiter(requester);
751 if (!MaybeEnqueueNode(requester, mutex, state, &this_waiter)) return true;
752
753 bool rv;
754 // Wait for another thread to release the lock and wake us up.
755 if (timeout) {
756 rv = this_waiter.WaitFor(*timeout);
757 // Reload the state pointer after wake up in case of shared GC while
758 // blocked.
759 state = mutex->AtomicStatePtr();
760 if (!rv) {
761 // If timed out, remove ourself from the waiter list, which is usually
762 // done by the thread performing the notifying.
763 rv = mutex->LockJSMutexOrDequeueTimedOutWaiter(requester, state,
764 &this_waiter);
765 return rv;
766 }
767 } else {
768 this_waiter.Wait();
769 // Reload the state pointer after wake up in case of shared GC while
770 // blocked.
771 state = mutex->AtomicStatePtr();
772 }
773
774 // After wake up we try to acquire the lock again by spinning, as the
775 // contention at the point of going to sleep should not be correlated with
776 // contention at the point of waking up.
777 }
778}
779
780void JSAtomicsMutex::UnlockSlowPath(Isolate* requester,
781 std::atomic<StateT>* state) {
782 // The fast path unconditionally cleared the owner thread.
783 DCHECK_EQ(ThreadId::Invalid().ToInteger(),
784 AtomicOwnerThreadIdPtr()->load(std::memory_order_relaxed));
785
786 // To wake a sleeping thread, first acquire the queue lock, which is itself
787 // a spinlock.
788 StateT current_state = state->load(std::memory_order_relaxed);
789 WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
790
791 if (!HasWaitersField::decode(current_state)) {
792 // All waiters were removed while waiting for the queue lock, possibly by
793 // timing out. Release both the lock and the queue lock.
794 StateT new_state = IsLockedField::update(current_state, false);
795 waiter_queue_lock_guard.set_new_state(new_state);
796 return;
797 }
798
799 WaiterQueueNode* waiter_head = DestructivelyGetWaiterQueueHead(requester);
800 DCHECK_NOT_NULL(waiter_head);
801 WaiterQueueNode* old_head = WaiterQueueNode::Dequeue(&waiter_head);
802
803 // Release both the lock and the queue lock, and install the new waiter queue
804 // head.
805 StateT new_state = IsLockedField::update(current_state, false);
806 new_state = SetWaiterQueueHead(requester, waiter_head, new_state);
807 waiter_queue_lock_guard.set_new_state(new_state);
808
809 old_head->Notify();
810}
811
812// The lockAsync flow is controlled by a series of promises:
813// 1. `internal_locked_promise`, a promise that settles when the mutex is
814// locked. When this promise is resolved, the callback is run. Not exposed to
815// user code.
816// 2. `waiting_for_callback_promise`, a promise that settles when the callback
817// completes. When this promise settles, the mutex is unlocked
818// 3. `unlocked_promise`, a promise that settles when the mutex is unlocked,
819// either explicitly or by timeout. Returned by lockAsync.
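// Illustrative JS sketch of the chain described above (not from the original
// file), assuming the proposed Atomics.Mutex API surface:
//
//   const mutex = new Atomics.Mutex();
//   const unlockedPromise = Atomics.Mutex.lockAsync(mutex, async () => {
//     // Runs in a microtask once `internal_locked_promise` resolves, i.e.
//     // once the mutex has been acquired for this callback.
//   });
//   // `unlockedPromise` settles after the callback's promise settles and the
//   // mutex has been released again (or after the optional timeout).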
820// static
821MaybeDirectHandle<JSPromise> JSAtomicsMutex::LockOrEnqueuePromise(
822 Isolate* requester, DirectHandle<JSAtomicsMutex> mutex,
823 DirectHandle<Object> callback, std::optional<base::TimeDelta> timeout) {
824 Handle<JSPromise> internal_locked_promise =
825 requester->factory()->NewJSPromise();
826 DirectHandle<JSReceiver> waiting_for_callback_promise;
827 ASSIGN_RETURN_ON_EXCEPTION(
828 requester, waiting_for_callback_promise,
829 PerformPromiseThen(requester, internal_locked_promise, callback));
830 Handle<JSPromise> unlocked_promise = requester->factory()->NewJSPromise();
831 // Set the async unlock handlers here so we can throw without any additional
832 // cleanup if the inner `promise_then` call fails. Keep a reference to
833 // the handlers' synthetic context so we can store the waiter node in it once
834 // the node is created.
835 DirectHandle<Context> handlers_context;
836 ASSIGN_RETURN_ON_EXCEPTION(
837 requester, handlers_context,
838 SetAsyncUnlockHandlers(requester, mutex, waiting_for_callback_promise,
839 unlocked_promise));
840 LockAsyncWaiterQueueNode* waiter_node = nullptr;
841 bool locked = LockAsync(requester, mutex, internal_locked_promise,
842 unlocked_promise, &waiter_node, timeout);
843 if (locked) {
844 // Create an LockAsyncWaiterQueueNode to be queued in the async locked
845 // waiter queue.
846 DCHECK(!waiter_node);
847 waiter_node = LockAsyncWaiterQueueNode::NewLockedAsyncWaiterStoredInIsolate(
848 requester, mutex);
849 }
850 // Don't use kWaiterQueueNodeTag here as that will cause the pointer to be
851 // stored in the shared external pointer table, which is not necessary since
852 // this object is only visible in this thread.
853 DirectHandle<Foreign> wrapper =
854 requester->factory()->NewForeign<kWaiterQueueForeignTag>(
855 reinterpret_cast<Address>(waiter_node));
856 handlers_context->set(JSAtomicsMutex::kAsyncLockedWaiterAsyncContextSlot,
857 *wrapper);
858 return unlocked_promise;
859}
860
861// static
862bool JSAtomicsMutex::LockAsync(Isolate* requester,
863 DirectHandle<JSAtomicsMutex> mutex,
864 Handle<JSPromise> internal_locked_promise,
865 MaybeHandle<JSPromise> unlocked_promise,
866 LockAsyncWaiterQueueNode** waiter_node,
867 std::optional<base::TimeDelta> timeout) {
868 bool locked =
869 LockImpl(requester, mutex, timeout, [=](std::atomic<StateT>* state) {
870 return LockAsyncSlowPath(requester, mutex, state,
871 internal_locked_promise, unlocked_promise,
872 waiter_node, timeout);
873 });
874 if (locked) {
875 // Resolve `internal_locked_promise` instead of synchronously running the
876 // callback. This guarantees that the callback is run in a microtask
877 // regardless of the current state of the mutex.
878 auto result = JSPromise::Resolve(
879 internal_locked_promise, requester->factory()->undefined_value());
880 USE(result);
881 } else {
882 // If the promise is not resolved, keep it alive in a set in the native
883 // context. The promise will be resolved and removed from the set in
884 // `JSAtomicsMutex::HandleAsyncNotify` or
885 // `JSAtomicsMutex::HandleAsyncTimeout`.
886 AddPromiseToNativeContext(requester, internal_locked_promise);
887 }
888 return locked;
889}
890
891// static
892DirectHandle<JSPromise> JSAtomicsMutex::LockAsyncWrapperForWait(
893 Isolate* requester, DirectHandle<JSAtomicsMutex> mutex) {
894 Handle<JSPromise> internal_locked_promise =
895 requester->factory()->NewJSPromise();
896 AsyncWaiterNodeType* waiter_node = nullptr;
897 LockAsync(requester, mutex, internal_locked_promise, {}, &waiter_node);
898 return internal_locked_promise;
899}
900
901// static
902bool JSAtomicsMutex::LockAsyncSlowPath(
903 Isolate* isolate, DirectHandle<JSAtomicsMutex> mutex,
904 std::atomic<StateT>* state, Handle<JSPromise> internal_locked_promise,
905 MaybeHandle<JSPromise> unlocked_promise,
906 LockAsyncWaiterQueueNode** waiter_node,
907 std::optional<base::TimeDelta> timeout) {
908 // Spin for a little bit to try to acquire the lock, so as to be fast under
909 // microcontention.
910 if (BackoffTryLock(isolate, mutex, state)) {
911 return true;
912 }
913
914 // At this point the lock is considered contended, so create a new async
915 // waiter node in the C++ heap. Its lifetime is managed by the requester's
916 // `async_waiter_queue_nodes` list.
917 LockAsyncWaiterQueueNode* this_waiter =
918 LockAsyncWaiterQueueNode::NewAsyncWaiterStoredInIsolate(
919 isolate, mutex, internal_locked_promise, unlocked_promise);
920 if (!MaybeEnqueueNode(isolate, mutex, state, this_waiter)) {
921 return true;
922 }
923
924 if (timeout) {
925 // Start a timer to run the `AsyncLockTimeoutTask` after the timeout.
926 TaskRunner* task_runner = this_waiter->task_runner();
927 auto task = std::make_unique<AsyncLockTimeoutTask>(
928 isolate->cancelable_task_manager(), this_waiter);
929 this_waiter->timeout_task_id_ = task->id();
930 task_runner->PostNonNestableDelayedTask(std::move(task),
931 timeout->InSecondsF());
932 }
933 *waiter_node = this_waiter;
934 return false;
935}
936
937// static
938bool JSAtomicsMutex::LockOrEnqueueAsyncNode(Isolate* isolate,
939 DirectHandle<JSAtomicsMutex> mutex,
940 LockAsyncWaiterQueueNode* waiter) {
941 std::atomic<StateT>* state = mutex->AtomicStatePtr();
942 // Spin for a little bit to try to acquire the lock, so as to be fast under
943 // microcontention.
944 if (BackoffTryLock(isolate, mutex, state)) {
945 return true;
946 }
947
948 return !MaybeEnqueueNode(isolate, mutex, state, waiter);
949}
950
951void JSAtomicsMutex::UnlockAsyncLockedMutex(
952 Isolate* requester, DirectHandle<Foreign> async_locked_waiter_wrapper) {
953 LockAsyncWaiterQueueNode* waiter_node =
954 reinterpret_cast<LockAsyncWaiterQueueNode*>(
955 async_locked_waiter_wrapper->foreign_address<kWaiterQueueForeignTag>(
956 IsolateForSandbox(requester)));
957 LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter_node);
958 if (IsCurrentThreadOwner()) {
959 Unlock(requester);
960 return;
961 }
962 // If this is reached, the lock was already released by this thread.
963 // This can happen if waitAsync is called without awaiting or due to
964 // promise prototype tampering. Setting Promise.prototype.then to a
965 // non callable will cause the `waiting_for_callback_promise` (defined in
966 // LockOrEnqueuePromise) reactions to be called even if the async callback
967 // is not resolved; as a consequence, the following code will try to unlock
968 // the mutex twice:
969 //
970 // let mutex = new Atomics.Mutex();
971 // let cv = new Atomics.Condition();
972 // Promise.prototype.then = undefined;
973 // Atomics.Mutex.lockAsync(mutex, async function() {
974 // await Atomics.Condition.waitAsync(cv, mutex);
975 // }
976}
977
978bool JSAtomicsMutex::DequeueTimedOutAsyncWaiter(
979 Isolate* requester, DirectHandle<JSAtomicsMutex> mutex,
980 std::atomic<StateT>* state, WaiterQueueNode* timed_out_waiter) {
981 // First acquire the queue lock, which is itself a spinlock.
982 StateT current_state = state->load(std::memory_order_relaxed);
983 // There are no waiters, but the js mutex lock may be held by another thread.
984 if (!HasWaitersField::decode(current_state)) return false;
985
986 // The details of updating the state in this function are too complicated
987 // for the waiter queue lock guard to manage, so handle the state manually.
988 while (!TryLockWaiterQueueExplicit(state, current_state)) {
989 YIELD_PROCESSOR;
990 }
991
992 // Get the waiter queue head.
993 WaiterQueueNode* waiter_head =
994 mutex->DestructivelyGetWaiterQueueHead(requester);
995
996 if (waiter_head == nullptr) {
997 // The queue is empty but the js mutex lock may be held by another thread,
998 // release the waiter queue bit without changing the "is locked" bit.
999 DCHECK(!HasWaitersField::decode(current_state));
1000 SetWaiterQueueStateOnly(state, kUnlockedUncontended);
1001 return false;
1002 }
1003
1004 WaiterQueueNode* dequeued_node = WaiterQueueNode::DequeueMatching(
1005 &waiter_head,
1006 [&](WaiterQueueNode* node) { return node == timed_out_waiter; });
1007
1008 // Release the queue lock and install the new waiter queue head.
1009 DCHECK_EQ(state->load(),
1010 IsWaiterQueueLockedField::update(current_state, true));
1011 StateT new_state = kUnlockedUncontended;
1012 new_state = mutex->SetWaiterQueueHead(requester, waiter_head, new_state);
1013
1014 SetWaiterQueueStateOnly(state, new_state);
1015 return dequeued_node != nullptr;
1016}
1017
1018// static
1019void JSAtomicsMutex::HandleAsyncTimeout(LockAsyncWaiterQueueNode* waiter) {
1020 Isolate* requester = waiter->requester_;
1021 HandleScope scope(requester);
1022
1023 if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
1024 // The native context was destroyed so the lock_promise was already removed
1025 // from the native context. Remove the node from the async unlocked waiter
1026 // list.
1027 LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
1028 return;
1029 }
1030
1031 v8::Context::Scope contextScope(waiter->GetNativeContext());
1032 DirectHandle<JSAtomicsMutex> js_mutex =
1033 waiter->GetSynchronizationPrimitive();
1034 bool dequeued = JSAtomicsMutex::DequeueTimedOutAsyncWaiter(
1035 requester, js_mutex, js_mutex->AtomicStatePtr(), waiter);
1036 // If the waiter is no longer in the queue, then its corresponding notify
1037 // task is already in the event loop. This doesn't guarantee that the lock
1038 // will be taken by the time the notify task runs, so cancel the notify task.
1039 if (!dequeued) {
1040 TryAbortResult abort_result =
1041 requester->cancelable_task_manager()->TryAbort(waiter->notify_task_id_);
1043 USE(abort_result);
1044 }
1045
1046 DirectHandle<JSPromise> lock_promise = waiter->GetInternalWaitingPromise();
1047 DirectHandle<JSPromise> lock_async_promise = waiter->GetUnlockedPromise();
1048 DirectHandle<JSObject> result = CreateResultObject(
1049 requester, requester->factory()->undefined_value(), false);
1050 auto resolve_result = JSPromise::Resolve(lock_async_promise, result);
1051 USE(resolve_result);
1053 RemovePromiseFromNativeContext(requester, lock_promise);
1054}
1055
1056// static
1057void JSAtomicsMutex::HandleAsyncNotify(LockAsyncWaiterQueueNode* waiter) {
1058 Isolate* requester = waiter->requester_;
1059 HandleScope scope(requester);
1060
1061 if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
1062 // The native context was destroyed, so the promise was already removed. But
1063 // it is possible that other threads are holding references to the
1064 // synchronization primitive. Try to notify the next waiter.
1065 if (!waiter->synchronization_primitive_.IsEmpty()) {
1066 DirectHandle<JSAtomicsMutex> js_mutex =
1067 waiter->GetSynchronizationPrimitive();
1068 std::atomic<StateT>* state = js_mutex->AtomicStatePtr();
1069 StateT current_state = state->load(std::memory_order_acquire);
1070 if (HasWaitersField::decode(current_state)) {
1071 // Another thread might take the lock while we are notifying the next
1072 // waiter, so manually release the queue lock without changing the
1073 // IsLockedField bit.
1074 while (!TryLockWaiterQueueExplicit(state, current_state)) {
1075 YIELD_PROCESSOR;
1076 }
1077 WaiterQueueNode* waiter_head =
1078 js_mutex->DestructivelyGetWaiterQueueHead(requester);
1079 if (waiter_head) {
1080 WaiterQueueNode* old_head = WaiterQueueNode::Dequeue(&waiter_head);
1081 old_head->Notify();
1082 }
1083 StateT new_state =
1084 js_mutex->SetWaiterQueueHead(requester, waiter_head, kEmptyState);
1085 new_state = IsWaiterQueueLockedField::update(new_state, false);
1086 SetWaiterQueueStateOnly(state, new_state);
1087 }
1088 }
1089 LockAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
1090 return;
1091 }
1092
1093 v8::Context::Scope contextScope(waiter->GetNativeContext());
1094 DirectHandle<JSAtomicsMutex> js_mutex =
1095 waiter->GetSynchronizationPrimitive();
1096 bool locked = LockOrEnqueueAsyncNode(requester, js_mutex, waiter);
1097 if (locked) {
1098 if (waiter->timeout_task_id_ != CancelableTaskManager::kInvalidTaskId) {
1099 TryAbortResult abort_result =
1100 requester->cancelable_task_manager()->TryAbort(
1101 waiter->timeout_task_id_);
1103 USE(abort_result);
1104 }
1105 if (waiter->unlocked_promise_.IsEmpty()) {
1106 // This node came from an async wait notify giving control back to an
1107 // async lock call, so we don't need to put the node in the locked waiter
1108 // list because the original LockAsyncWaiterQueueNode is already in
1109 // the locked waiter list.
1111 }
1112 js_mutex->SetCurrentThreadAsOwner();
1113 auto resolve_result =
1114 JSPromise::Resolve(promise, requester->factory()->undefined_value());
1115 USE(resolve_result);
1116 RemovePromiseFromNativeContext(requester, promise);
1117 }
1118}
1119
1120// static
1121void JSAtomicsCondition::CleanupMatchingAsyncWaiters(Isolate* isolate,
1122 WaiterQueueNode* node,
1123 DequeueMatcher matcher) {
1124 auto* async_node = static_cast<WaitAsyncWaiterQueueNode*>(node);
1125 if (async_node->ready_for_async_cleanup_) {
1126 // The node is not in the waiter queue and there is no HandleNotify task
1127 // for it in the event loop. So it is safe to delete it.
1128 return;
1129 }
1130 if (async_node->IsEmpty()) {
1131 // The node's underlying synchronization primitive has been collected, so
1132 // delete it.
1133 async_node->SetNotInListForVerification();
1134 return;
1135 }
1136 DirectHandle<JSAtomicsCondition> cv =
1137 async_node->GetSynchronizationPrimitive();
1138 std::atomic<StateT>* state = cv->AtomicStatePtr();
1139 StateT current_state = state->load(std::memory_order_relaxed);
1140
1141 WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
1142
1143 WaiterQueueNode* waiter_head = cv->DestructivelyGetWaiterQueueHead(isolate);
1144 if (waiter_head) {
1145 WaiterQueueNode::DequeueAllMatchingForAsyncCleanup(&waiter_head, matcher);
1146 }
1147 StateT new_state =
1148 cv->SetWaiterQueueHead(isolate, waiter_head, current_state);
1149 waiter_queue_lock_guard.set_new_state(new_state);
1150}
1151
1152// static
1153void JSAtomicsCondition::QueueWaiter(Isolate* requester,
1154 DirectHandle<JSAtomicsCondition> cv,
1155 WaiterQueueNode* waiter) {
1156 // The state pointer should not be used outside of this block as a shared GC
1157 // may reallocate it after waiting.
1158 std::atomic<StateT>* state = cv->AtomicStatePtr();
1159
1160 // Try to acquire the queue lock, which is itself a spinlock.
1161 StateT current_state = state->load(std::memory_order_relaxed);
1162 WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
1163
1164 // With the queue lock held, enqueue the requester onto the waiter queue.
1165 WaiterQueueNode* waiter_head = cv->DestructivelyGetWaiterQueueHead(requester);
1166 WaiterQueueNode::Enqueue(&waiter_head, waiter);
1167
1168 // Release the queue lock and install the new waiter queue head.
1169 DCHECK_EQ(state->load(),
1170 IsWaiterQueueLockedField::update(current_state, true));
1171 StateT new_state =
1172 cv->SetWaiterQueueHead(requester, waiter_head, current_state);
1173 waiter_queue_lock_guard.set_new_state(new_state);
1174}
1175
1176// static
1177bool JSAtomicsCondition::WaitFor(Isolate* requester,
1178 DirectHandle<JSAtomicsCondition> cv,
1179 DirectHandle<JSAtomicsMutex> mutex,
1180 std::optional<base::TimeDelta> timeout) {
1182
1183 bool rv;
1184 {
1185 // Allocate a waiter queue node on-stack, since this thread is going to
1186 // sleep and will be blocked anyway.
1187 SyncWaiterQueueNode this_waiter(requester);
1188
1189 JSAtomicsCondition::QueueWaiter(requester, cv, &this_waiter);
1190
1191 // Release the mutex and wait for another thread to wake us up, reacquiring
1192 // the mutex upon wakeup.
1193 mutex->Unlock(requester);
1194 if (timeout) {
1195 rv = this_waiter.WaitFor(*timeout);
1196 if (!rv) {
1197 // If timed out, remove ourself from the waiter list, which is usually
1198 // done by the thread performing the notifying.
1199 std::atomic<StateT>* state = cv->AtomicStatePtr();
1200 DequeueExplicit(
1201 requester, cv, state, [&](WaiterQueueNode** waiter_head) {
1202 WaiterQueueNode* dequeued = WaiterQueueNode::DequeueMatching(
1203 waiter_head,
1204 [&](WaiterQueueNode* node) { return node == &this_waiter; });
1205 return dequeued ? 1 : 0;
1206 });
1207 }
1208 } else {
1209 this_waiter.Wait();
1210 rv = true;
1211 }
1212 }
1213 JSAtomicsMutex::Lock(requester, mutex);
1214 return rv;
1215}
1216
1217// static
1218uint32_t JSAtomicsCondition::DequeueExplicit(
1219 Isolate* requester, DirectHandle<JSAtomicsCondition> cv,
1220 std::atomic<StateT>* state, const DequeueAction& action_under_lock) {
1221 // First acquire the queue lock, which is itself a spinlock.
1222 StateT current_state = state->load(std::memory_order_relaxed);
1223
1224 if (!HasWaitersField::decode(current_state)) return 0;
1225 WaiterQueueLockGuard waiter_queue_lock_guard(state, current_state);
1226
1227 // Get the waiter queue head.
1228 WaiterQueueNode* waiter_head = cv->DestructivelyGetWaiterQueueHead(requester);
1229
1230 // There's no waiter to wake up, release the queue lock by setting it to the
1231 // empty state.
1232 if (waiter_head == nullptr) {
1233 StateT new_state = kEmptyState;
1234 waiter_queue_lock_guard.set_new_state(new_state);
1235 return 0;
1236 }
1237
1238 uint32_t num_dequeued_waiters = action_under_lock(&waiter_head);
1239
1240 // Release the queue lock and install the new waiter queue head.
1241 DCHECK_EQ(state->load(),
1242 IsWaiterQueueLockedField::update(current_state, true));
1243 StateT new_state =
1244 cv->SetWaiterQueueHead(requester, waiter_head, current_state);
1245 waiter_queue_lock_guard.set_new_state(new_state);
1246
1247 return num_dequeued_waiters;
1248}
1249
1250// static
1251uint32_t JSAtomicsCondition::Notify(Isolate* requester,
1252 DirectHandle<JSAtomicsCondition> cv,
1253 uint32_t count) {
1254 std::atomic<StateT>* state = cv->AtomicStatePtr();
1255
1256 // Dequeue count waiters.
1257 return DequeueExplicit(
1258 requester, cv, state, [=](WaiterQueueNode** waiter_head) -> uint32_t {
1259 WaiterQueueNode* old_head;
1260 if (count == 1) {
1261 old_head = WaiterQueueNode::Dequeue(waiter_head);
1262 if (!old_head) return 0;
1263 old_head->Notify();
1264 return 1;
1265 }
1266 if (count == kAllWaiters) {
1267 old_head = *waiter_head;
1268 *waiter_head = nullptr;
1269 } else {
1270 old_head = WaiterQueueNode::Split(waiter_head, count);
1271 }
1272 if (!old_head) return 0;
1273 // Notify while holding the queue lock to avoid notifying
1274 // waiters that have been deleted in other threads.
1275 return old_head->NotifyAllInList();
1276 });
1277}
1278
1279// The waitAsync flow is controlled by 2 chained promises, with lock_promise
1280// being the return value of the API.
1281// 1. `internal_waiting_promise`, which will be resolved either in the notify
1282// task or in the timeout task.
1284// 2. `lock_promise`, which will be resolved when the lock is acquired after
1285// waiting.
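// Illustrative JS sketch of the chain described above (not from the original
// file), assuming the proposed Atomics.Condition API surface and the example
// shown in UnlockAsyncLockedMutex:
//
//   Atomics.Mutex.lockAsync(mutex, async function() {
//     // `waitAsync` returns `lock_promise`: it unlocks `mutex`, waits for a
//     // notify (or the timeout), and resolves once the lock is re-acquired.
//     const result = await Atomics.Condition.waitAsync(cv, mutex);
//   });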
1286// static
1287MaybeDirectHandle<JSReceiver> JSAtomicsCondition::WaitAsync(
1288 Isolate* requester, DirectHandle<JSAtomicsCondition> cv,
1289 DirectHandle<JSAtomicsMutex> mutex,
1290 std::optional<base::TimeDelta> timeout) {
1291 Handle<JSPromise> internal_waiting_promise =
1292 requester->factory()->NewJSPromise();
1293 DirectHandle<Context> handler_context =
1294 requester->factory()->NewBuiltinContext(requester->native_context(),
1295 kAsyncContextLength);
1296 handler_context->set(kMutexAsyncContextSlot, *mutex);
1297 handler_context->set(kConditionVariableAsyncContextSlot, *cv);
1298
1299 DirectHandle<SharedFunctionInfo> info(
1300 requester->heap()->atomics_condition_acquire_lock_sfi(), requester);
1301 DirectHandle<JSFunction> lock_function =
1302 Factory::JSFunctionBuilder{requester, info, handler_context}
1303 .set_map(requester->strict_function_without_prototype_map())
1304 .Build();
1305
1306 DirectHandle<JSReceiver> lock_promise;
1307
1308 ASSIGN_RETURN_ON_EXCEPTION(
1309 requester, lock_promise,
1310 PerformPromiseThen(requester, internal_waiting_promise, lock_function));
1311
1312 // Create a new async waiter node in the C++ heap. Its lifetime is managed by
1313 // the requester's `async_waiter_queue_nodes` list.
1314 WaitAsyncWaiterQueueNode* this_waiter =
1315 WaitAsyncWaiterQueueNode::NewAsyncWaiterStoredInIsolate(
1316 requester, cv, internal_waiting_promise);
1317 QueueWaiter(requester, cv, this_waiter);
1318
1319 if (timeout) {
1320 TaskRunner* task_runner = this_waiter->task_runner();
1321 auto task = std::make_unique<AsyncWaitTimeoutTask>(
1322 requester->cancelable_task_manager(), this_waiter);
1323 this_waiter->timeout_task_id_ = task->id();
1324 task_runner->PostNonNestableDelayedTask(std::move(task),
1325 timeout->InSecondsF());
1326 }
1327 mutex->Unlock(requester);
1328 // Keep the wait promise alive in the native context.
1329 AddPromiseToNativeContext(requester, internal_waiting_promise);
1330 return lock_promise;
1331}
1332
1333// static
1334void JSAtomicsCondition::HandleAsyncTimeout(WaitAsyncWaiterQueueNode* waiter) {
1335 Isolate* requester = waiter->requester_;
1336 if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
1337 // The native context was destroyed so the promise was already removed
1338 // from the native context. Remove the node from the async unlocked waiter
1339 // list.
1340 WaitAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
1341 return;
1342 }
1343 HandleScope scope(requester);
1344 DirectHandle<JSAtomicsCondition> cv = waiter->GetSynchronizationPrimitive();
1345 std::atomic<StateT>* state = cv->AtomicStatePtr();
1346 uint32_t num_dequeued =
1347 DequeueExplicit(requester, cv, state, [&](WaiterQueueNode** waiter_head) {
1348 WaiterQueueNode* dequeued = WaiterQueueNode::DequeueMatching(
1349 waiter_head, [&](WaiterQueueNode* node) { return node == waiter; });
1350 return dequeued ? 1 : 0;
1351 });
1352 // If the waiter is not in the queue, the notify task is already in the event
1353 // loop, so cancel the notify task.
1354 if (num_dequeued == 0) {
1355 TryAbortResult abort_result =
1356 requester->cancelable_task_manager()->TryAbort(waiter->notify_task_id_);
1358 USE(abort_result);
1359 }
1360 // Reset the timeout task id to kInvalidTaskId, otherwise the notify task will
1361 // try to cancel it.
1362 waiter->timeout_task_id_ = CancelableTaskManager::kInvalidTaskId;
1363 HandleAsyncNotify(waiter);
1364}
1365
1366// static
1367void JSAtomicsCondition::HandleAsyncNotify(WaitAsyncWaiterQueueNode* waiter) {
1368 Isolate* requester = waiter->requester_;
1369 if (V8_UNLIKELY(waiter->native_context_.IsEmpty())) {
1370 // The native context was destroyed so the promise was already removed
1371 // from the native context. Remove the node from the async unlocked waiter
1372 // list.
1373 WaitAsyncWaiterQueueNode::RemoveFromAsyncWaiterQueueList(waiter);
1374 return;
1375 }
1376 HandleScope scope(requester);
1377 if (waiter->timeout_task_id_ != CancelableTaskManager::kInvalidTaskId) {
1378 TryAbortResult abort_result =
1379 requester->cancelable_task_manager()->TryAbort(
1380 waiter->timeout_task_id_);
1382 USE(abort_result);
1383 }
1384 v8::Context::Scope contextScope(waiter->GetNativeContext());
1385 DirectHandle<JSPromise> promise = waiter->GetInternalWaitingPromise();
1386 MaybeHandle<Object> result =
1387 JSPromise::Resolve(promise, requester->factory()->undefined_value());
1388 USE(result);
1390 RemovePromiseFromNativeContext(requester, promise);
1391}
1392
1393} // namespace internal
1394} // namespace v8