Skip to content

5.6 [Concurrency] Resolve race between task creation and concurrent escalation and cancellation #40622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 1 addition & 47 deletions include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#ifndef SWIFT_RUNTIME_CONCURRENCY_H
#define SWIFT_RUNTIME_CONCURRENCY_H

#include "swift/ABI/AsyncLet.h"
#include "swift/ABI/Task.h"
#include "swift/ABI/TaskGroup.h"
#include "swift/ABI/AsyncLet.h"
#include "swift/ABI/TaskStatus.h"

#pragma clang diagnostic push
Expand Down Expand Up @@ -466,40 +466,6 @@ void swift_asyncLet_consume_throwing(SWIFT_ASYNC_CONTEXT AsyncContext *,
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_taskGroup_hasTaskGroupRecord();

/// Add a status record to a task. The record should not be
/// modified while it is registered with a task.
///
/// This must be called synchronously with the task.
///
/// If the task is already cancelled, returns `false` but still adds
/// the status record.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_addStatusRecord(TaskStatusRecord *record);

/// Add a status record to a task if the task has not already
/// been cancelled. The record should not be modified while it is
/// registered with a task.
///
/// This must be called synchronously with the task.
///
/// If the task is already cancelled, returns `false` and does not
/// add the status record.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_tryAddStatusRecord(TaskStatusRecord *record);

/// Remove a status record from a task. After this call returns,
/// the record's memory can be freely modified or deallocated.
///
/// This must be called synchronously with the task. The record must
/// be registered with the task or else this may crash.
///
/// The given record need not be the last record added to
/// the task, but the operation may be less efficient if not.
///
/// Returns false if the task has been cancelled.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_removeStatusRecord(TaskStatusRecord *record);

/// Signifies whether the current task is in the middle of executing the
/// operation block of a `with(Throwing)TaskGroup(...) { <operation> }`.
///
Expand All @@ -509,18 +475,6 @@ bool swift_task_removeStatusRecord(TaskStatusRecord *record);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_hasTaskGroupStatusRecord();

/// Attach a child task to its parent task and return the newly created
/// `ChildTaskStatusRecord`.
///
/// The record must be removed with by the parent invoking
/// `swift_task_detachChild` when the child has completed.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
ChildTaskStatusRecord* swift_task_attachChild(AsyncTask *child);

/// Remove a child task from the parent tracking it.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_detachChild(ChildTaskStatusRecord *record);

SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
size_t swift_task_getJobFlags(AsyncTask* task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,30 +308,10 @@ OVERRIDE_TASK_LOCAL(task_localsCopyTo, void,
(AsyncTask *target),
(target))

OVERRIDE_TASK_STATUS(task_addStatusRecord, bool,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (TaskStatusRecord *newRecord), (newRecord))

OVERRIDE_TASK_STATUS(task_tryAddStatusRecord, bool,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (TaskStatusRecord *newRecord), (newRecord))

OVERRIDE_TASK_STATUS(task_removeStatusRecord, bool,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (TaskStatusRecord *record), (record))

OVERRIDE_TASK_STATUS(task_hasTaskGroupStatusRecord, bool,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, , )

OVERRIDE_TASK_STATUS(task_attachChild, ChildTaskStatusRecord *,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (AsyncTask *child), (child))

OVERRIDE_TASK_STATUS(task_detachChild, void,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (ChildTaskStatusRecord *record), (record))

OVERRIDE_TASK_STATUS(task_cancel, void, SWIFT_EXPORT_FROM(swift_Concurrency),
SWIFT_CC(swift), swift::, (AsyncTask *task), (task))

Expand Down
11 changes: 9 additions & 2 deletions stdlib/public/Concurrency/AsyncLet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,15 @@ void swift::asyncLet_addImpl(AsyncTask *task, AsyncLet *asyncLet,
auto record = impl->getTaskRecord();
assert(impl == record && "the async-let IS the task record");

// ok, now that the group actually is initialized: attach it to the task
swift_task_addStatusRecord(record);
// ok, now that the async let task actually is initialized: attach it to the
// current task
bool addedRecord = swift_task_addStatusRecordWithChecks(
record, [&](ActiveTaskStatus parentStatus) {
swift_task_updateNewChildWithParentAndGroupState(task, parentStatus,
NULL);
return true;
});
assert(addedRecord);
}

// =============================================================================
Expand Down
22 changes: 16 additions & 6 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,12 +1055,22 @@ swift_task_addCancellationHandlerImpl(
auto *record = new (allocation)
CancellationNotificationStatusRecord(unsigned_handler, context);

if (swift_task_addStatusRecord(record))
return record;

// else, the task was already cancelled, so while the record was added,
// we must run it immediately here since no other task will trigger it.
record->run();
bool fireHandlerNow = false;

swift_task_addStatusRecordWithChecks(record,
[&](ActiveTaskStatus parentStatus) {
if (parentStatus.isCancelled()) {
fireHandlerNow = true;
/* We don't fire the cancellation
* handler here since this function
* needs to be idempotent */
}
return true;
});

if (fireHandlerNow) {
record->run();
}
return record;
}

Expand Down
12 changes: 7 additions & 5 deletions stdlib/public/Concurrency/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,13 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
assert(impl == record && "the group IS the task record");

// ok, now that the group actually is initialized: attach it to the task
bool notCancelled = swift_task_addStatusRecord(record);

// If the task has already been cancelled, reflect that immediately in
// the group status.
if (!notCancelled) impl->statusCancel();
swift_task_addStatusRecordWithChecks(record,
[&](ActiveTaskStatus parentStatus) {
if (parentStatus.isCancelled()) {
impl->statusCancel();
}
return true;
});
}

// =============================================================================
Expand Down
34 changes: 34 additions & 0 deletions stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,40 @@ inline bool AsyncTask::localValuePop() {
return _private().Local.popValue(this);
}

/*************** Methods for Status records manipulation ******************/

/// Remove a status record from a task. After this call returns,
/// the record's memory can be freely modified or deallocated.
///
/// This must be called synchronously with the task. The record must
/// be registered with the task or else this may crash.
///
/// The given record need not be the last record added to
/// the task, but the operation may be less efficient if not.
///
/// Returns false if the task has been cancelled.
SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift) bool swift_task_removeStatusRecord(TaskStatusRecord *record);

/// Add a status record to a task. This must be called synchronously with the
/// task.
///
/// This function also takes in a function_ref which is given the task status of
/// the task we're adding the record to, to determine if the current status of
/// the task permits adding the status record. This function_ref may be called
/// multiple times and must be idempotent.
SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift)
bool swift_task_addStatusRecordWithChecks(
TaskStatusRecord *record,
llvm::function_ref<bool(ActiveTaskStatus)> testAddRecord);

/// A helper method for updating a new child task that is created with
/// information from the parent or the group that it was going to be added to.
SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift)
void swift_task_updateNewChildWithParentAndGroupState(
AsyncTask *child, ActiveTaskStatus parentStatus, TaskGroup *group);
} // end namespace swift

#endif
121 changes: 65 additions & 56 deletions stdlib/public/Concurrency/TaskStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,14 @@ static void releaseStatusRecordLock(AsyncTask *task,
/*************************** RECORD MANAGEMENT ****************************/
/**************************************************************************/

SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift)
static bool swift_task_addStatusRecordImpl(TaskStatusRecord *newRecord) {
auto task = swift_task_getCurrent();
bool swift_task_addStatusRecordWithChecks(
TaskStatusRecord *newRecord,
llvm::function_ref<bool(ActiveTaskStatus status)> shouldAddRecord) {

// Load the current state. We can use a relaxed load because we're
auto task = swift_task_getCurrent();
// Load the current state. We can use a relaxed load because we're
// synchronous with the task.
auto oldStatus = task->_private().Status.load(std::memory_order_relaxed);

Expand All @@ -306,53 +309,28 @@ static bool swift_task_addStatusRecordImpl(TaskStatusRecord *newRecord) {
newRecord->resetParent(oldStatus.getInnermostRecord());

// Set the record as the new innermost record.
// We have to use a release on success to make the initialization of
// the new record visible to the cancelling thread.
ActiveTaskStatus newStatus = oldStatus.withInnermostRecord(newRecord);
if (task->_private().Status.compare_exchange_weak(oldStatus, newStatus,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_relaxed))
return !oldStatus.isCancelled();
}
}

SWIFT_CC(swift)
static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
auto task = swift_task_getCurrent();

// Load the current state. We can use a relaxed load because we're
// synchronous with the task.
auto oldStatus = task->_private().Status.load(std::memory_order_relaxed);

while (true) {
// If the old info is already cancelled, do nothing.
if (oldStatus.isCancelled())
if (shouldAddRecord(newStatus)) {
// We have to use a release on success to make the initialization of
// the new record visible to the cancelling thread.
if (task->_private().Status.compare_exchange_weak(
oldStatus, newStatus,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_relaxed)) {
return true;
} else {
/* Retry */
}
} else {
return false;

// Wait for any active lock to be released.
if (oldStatus.isLocked()) {
waitForStatusRecordUnlock(task, oldStatus);

if (oldStatus.isCancelled())
return false;
}

// Reset the parent of the new record.
newRecord->resetParent(oldStatus.getInnermostRecord());

// Set the record as the new innermost record.
// We have to use a release on success to make the initialization of
// the new record visible to the cancelling thread.
ActiveTaskStatus newStatus = oldStatus.withInnermostRecord(newRecord);
if (task->_private().Status.compare_exchange_weak(oldStatus, newStatus,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_relaxed))
return true;
}
}

SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift)
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
bool swift_task_removeStatusRecord(TaskStatusRecord *record) {
auto task = swift_task_getCurrent();
SWIFT_TASK_DEBUG_LOG("remove status record = %p, from current task = %p",
record, task);
Expand Down Expand Up @@ -451,21 +429,42 @@ static bool swift_task_hasTaskGroupStatusRecordImpl() {
/**************************************************************************/

// ==== Child tasks ------------------------------------------------------------
SWIFT_CC(swift)
static ChildTaskStatusRecord*
swift_task_attachChildImpl(AsyncTask *child) {
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
SWIFT_TASK_DEBUG_LOG("attach child task = %p, record = %p, to current task = %p",
child, record, swift_task_getCurrent());
swift_task_addStatusRecord(record);
return record;
}

/* Called in the path of linking a child into a parent/group synchronously with
* the parent task.
*
* When called to link a child into a parent directly, this does not hold the
* parent's task status record lock. When called to link a child into a task
* group, this holds the parent's task status record lock.
*/
SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift)
static void
swift_task_detachChildImpl(ChildTaskStatusRecord *record) {
swift_task_removeStatusRecord(record);
void swift_task_updateNewChildWithParentAndGroupState(
AsyncTask *child, ActiveTaskStatus parentStatus, TaskGroup *group) {
/*
* We can take the fast path of just modifying the ActiveTaskStatus in the
* child task since we know that it won't have any task status records and
* cannot be accessed by anyone else since it hasn't been linked in yet.
* Avoids the extra logic in `swift_task_cancel` and `swift_task_escalate`
*/
auto oldChildTaskStatus =
child->_private().Status.load(std::memory_order_relaxed);
assert(oldChildTaskStatus.getInnermostRecord() == NULL);

auto newChildTaskStatus = oldChildTaskStatus;

/* Parent task is cancelled or group the child task is part of (if any) is
* cancelled */
if (parentStatus.isCancelled() || (group && group->isCancelled())) {
newChildTaskStatus = newChildTaskStatus.withCancelled();
}

/* Parent task got escalated, make sure to propagate it to child. */
if (parentStatus.isStoredPriorityEscalated()) {
newChildTaskStatus = newChildTaskStatus.withEscalatedPriority(
parentStatus.getStoredPriority());
}
child->_private().Status.store(newChildTaskStatus, std::memory_order_relaxed);
}

SWIFT_CC(swift)
Expand All @@ -477,12 +476,22 @@ static void swift_taskGroup_attachChildImpl(TaskGroup *group,
// Acquire the status record lock of parent - we want to synchronize with
// concurrent cancellation or escalation as we're adding new tasks to the
// group.

Optional<StatusRecordLockRecord> recordLockRecord;
auto parent = swift_task_getCurrent();
auto oldStatus =
acquireStatusRecordLock(parent, recordLockRecord, LockContext::OnTask);
group->addChildTask(child);
/*
* After getting parent's status record lock, do some sanity checks to see if
* parent task or group has state changes that need to be propagated to the
* child.
*
* This is the same logic that we would do if we were adding a child task
* status record - see also asyncLet_addImpl. Since we attach a child task to
* a TaskGroupRecord instead, we synchronize on the parent's task status and
* then update the child.
*/
swift_task_updateNewChildWithParentAndGroupState(child, oldStatus, group);

// Release the status record lock, restoring exactly the old status.
releaseStatusRecordLock(parent, oldStatus, recordLockRecord);
Expand Down
2 changes: 2 additions & 0 deletions unittests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ if(("${SWIFT_HOST_VARIANT_SDK}" STREQUAL "${SWIFT_PRIMARY_VARIANT_SDK}") AND

target_include_directories(SwiftRuntimeTests BEFORE PRIVATE
${SWIFT_SOURCE_DIR}/stdlib/include)
target_include_directories(SwiftRuntimeTests BEFORE PRIVATE
${SWIFT_SOURCE_DIR}/stdlib/public)

# FIXME: cross-compile for all variants.
target_link_libraries(SwiftRuntimeTests
Expand Down
Loading