[5.6] Fix priority inversions when waiting on a task status lock #40594

Merged
93 changes: 75 additions & 18 deletions include/swift/Runtime/AtomicWaitQueue.h
@@ -106,9 +106,28 @@ class AtomicWaitQueue {
return referenceCount == 1;
}

/// This queue is being re-used with new construction arguments.
/// Update it appropriately.
void updateForNewArguments() {
// We intentionally take no arguments so that only calls to
// createQueue with no arguments will succeed at calling this no-op
// implementation. Queue types with construction arguments
// will need to implement this method to take the appropriate
// arguments. Hopefully this discourages people from forgetting
// that queues can be re-used if created in a loop.
}
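The reason this default takes no arguments is visible in `Worker::createQueue` further down in this diff: the first call constructs the queue, and later calls in the same loop forward their arguments to `updateForNewArguments` on the cached queue. A standalone sketch of that caching behavior, using a hypothetical `TaskStatusWaitQueue` type (the names and the `int` argument are illustrative, not from the Swift runtime):

```cpp
#include <cassert>
#include <utility>

// Hypothetical queue type whose construction takes an argument.  Because
// a Worker caches its queue across loop iterations, a re-used queue must
// be refreshed via updateForNewArguments() rather than reconstructed.
struct TaskStatusWaitQueue {
  int expectedStatus;  // assumed construction argument, for illustration
  explicit TaskStatusWaitQueue(int status) : expectedStatus(status) {}

  // Called when the cached queue is re-used with new arguments.
  void updateForNewArguments(int status) { expectedStatus = status; }
};

// Minimal stand-in for the caching done by Worker::createQueue().
struct Worker {
  TaskStatusWaitQueue *CurrentQueue = nullptr;

  template <class... Args>
  TaskStatusWaitQueue *createQueue(Args &&...args) {
    if (!CurrentQueue)
      CurrentQueue = new TaskStatusWaitQueue(std::forward<Args>(args)...);
    else
      CurrentQueue->updateForNewArguments(std::forward<Args>(args)...);
    return CurrentQueue;
  }

  ~Worker() { delete CurrentQueue; }
};
```

Because both calls return the same object, a queue type that ignored this hook would silently keep its first-iteration arguments.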

/// An RAII helper class for signalling that the current thread is a
/// worker thread which has acquired the lock.
///
/// `AtomicWaitQueue` does not require the global lock to be held
/// while creating or publishing the queue. Clients taking advantage
/// of this should inform the Worker class that a created queue has
/// been published by calling `flagQueueIsPublished`. Clients who
/// wish to publish the queue while holding the global lock, perhaps
/// to get a rule that all stores are done under the lock, may instead
/// use `tryPublishQueue`.
///
/// The expected use pattern is something like:
///
/// ```
@@ -117,26 +136,47 @@ class AtomicWaitQueue {
/// while (true) {
///   if (oldStatus.isDone()) return;
///
///   if (oldStatus.hasWaitQueue()) {
///     bool waited = worker.tryReloadAndWait([&] {
///       oldStatus = myAtomic.load(std::memory_order_acquire);
///       return (oldStatus.hasWaitQueue() ? oldStatus.getWaitQueue()
///                                        : nullptr);
///     });
///
///     // If we waited, `oldStatus` will be out of date; reload it.
///     //
///     // (For the pattern in this example, where the worker thread
///     // always transitions the status to done, this is actually
///     // unnecessary: by virtue of having successfully waited, we've
///     // synchronized with the worker thread and know that the status
///     // is done, so we could just return.  But in general, this
///     // reload is necessary.)
///     if (waited)
///       oldStatus = myAtomic.load(std::memory_order_acquire);
///
///     // Go back and reconsider the updated status.
///     continue;
///   }
///
///   // Create a queue and try to publish it.  If this succeeds,
///   // we've become the worker thread.  We don't have to worry
///   // about the queue leaking if we don't use it; that's managed
///   // by the Worker class.
///   {
///     auto queue = worker.createQueue();
///     auto newStatus = oldStatus.withWaitQueue(queue);
///     if (!myAtomic.compare_exchange_weak(oldStatus, newStatus,
///           /*success*/ std::memory_order_release,
///           /*failure*/ std::memory_order_acquire))
///       continue;
///     worker.flagQueueIsPublished(queue);
///   }
///
///   // Do the actual work here.
///
///   // Report that the work is done and "unpublish" the queue from
///   // the atomic.
///   myAtomic.store(oldStatus.withDone(true), std::memory_order_release);
///   worker.finishAndUnpublishQueue([]{});
///   return;
/// }
@@ -182,6 +222,19 @@ class AtomicWaitQueue {
return Published;
}
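For contrast with the `flagQueueIsPublished` path shown in the example above, `tryPublishQueue` runs the publish attempt while the global lock is held, so every store of a queue into the atomic happens under the lock. A minimal single-threaded sketch of that contract, using mock `Queue`/`Worker` types rather than the real runtime classes:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

struct Queue {};

// Mock worker: it owns its queue until a publish attempt succeeds, so an
// unused queue never leaks even if publishing fails.
struct Worker {
  std::mutex &GlobalLock;
  Queue *CurrentQueue = nullptr;
  bool Published = false;

  explicit Worker(std::mutex &lock) : GlobalLock(lock) {}
  ~Worker() {
    if (!Published) delete CurrentQueue;  // unpublished queues are reclaimed
  }

  Queue *createQueue() {
    if (!CurrentQueue) CurrentQueue = new Queue();
    return CurrentQueue;
  }

  // Run the publish attempt with the global lock held and remember
  // whether it succeeded.
  template <class Fn>
  bool tryPublishQueue(Fn &&tryPublish) {
    std::lock_guard<std::mutex> guard(GlobalLock);
    Published = tryPublish();
    return Published;
  }
};
```

In this sketch only one worker's compare-exchange can install its queue; a second worker's attempt fails and its queue is reclaimed by the destructor.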

/// Given that this thread is not the worker thread and there seems
/// to be a wait queue in place, try to wait on it.
///
/// Acquire the global lock and call the given function. If it
/// returns a wait queue, wait on that queue and return true;
/// otherwise, return false.
template <class Fn>
bool tryReloadAndWait(Fn &&fn) {
assert(!isWorkerThread());
typename Impl::Waiter waiter(GlobalLock);
return waiter.tryReloadAndWait(std::forward<Fn>(fn));
}
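`tryReloadAndWait` takes a reload function rather than a queue pointer because the queue observed before the global lock was acquired may already have been unpublished; the function re-reads the status under the lock and may find nothing left to wait on. A hypothetical sketch of that decision step (a real `Waiter` would block on the returned queue before reporting `true`; this mock models only the reload-and-decide logic):

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

struct Queue {};

// Mock of the waiter-side contract: re-check for a queue with the global
// lock held.  Returning false tells the caller to reconsider the status.
template <class Fn>
bool tryReloadAndWait(std::mutex &globalLock, Fn &&reload) {
  std::lock_guard<std::mutex> guard(globalLock);
  Queue *queue = reload();
  if (!queue)
    return false;  // queue vanished between the earlier load and the lock
  // (a real Waiter would wait on `queue` here before returning)
  return true;
}
```

This is why, in the doc-comment example, a `false` return simply sends the loop back to re-examine `oldStatus`.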

/// Given that this thread is the worker thread, return the queue
/// that's been created and published for it.
Impl *getPublishedQueue() const {
@@ -195,14 +248,18 @@ class AtomicWaitQueue {
///
/// The Worker object takes ownership of the queue until it's
/// published, so you can safely call this even if publishing
/// might fail.
///
/// Note that the same queue will be returned on successive
/// invocations. Queues that accept arguments for construction
/// should implement `updateForNewArguments`.
template <class... Args>
Impl *createQueue(Args &&...args) {
assert(!Published);
if (!CurrentQueue)
CurrentQueue = asImpl().createNewQueue(std::forward<Args>(args)...);
else
CurrentQueue->updateForNewArguments(std::forward<Args>(args)...);
return CurrentQueue;
}
