Skip to content

Fix group child tasks not being released #39204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion include/swift/ABI/TaskStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {

/// Attach the passed in `child` task to this group.
void attachChild(AsyncTask *child) {
assert(child->groupChildFragment());
assert(child->hasGroupChildFragment());
assert(child->groupChildFragment()->getGroup() == getGroup());

Expand Down Expand Up @@ -211,6 +210,33 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
cur->childFragment()->setNextChild(child);
}

void detachChild(AsyncTask *child) {
assert(child && "cannot remove a null child from group");
if (FirstChild == child) {
FirstChild = getNextChildTask(child);
return;
}

AsyncTask *prev = FirstChild;
// Remove the child from the linked list, i.e.:
// prev -> afterPrev -> afterChild
// ==
// child -> afterChild
// Becomes:
// prev --------------> afterChild
while (prev) {
auto afterPrev = getNextChildTask(prev);

if (afterPrev == child) {
auto afterChild = getNextChildTask(child);
prev->childFragment()->setNextChild(afterChild);
return;
}

prev = afterPrev;
}
}

static AsyncTask *getNextChildTask(AsyncTask *task) {
return task->childFragment()->getNextChild();
}
Expand Down
3 changes: 1 addition & 2 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void NullaryContinuationJob::process(Job *_job) {
void AsyncTask::completeFuture(AsyncContext *context) {
using Status = FutureFragment::Status;
using WaitQueueItem = FutureFragment::WaitQueueItem;

SWIFT_TASK_DEBUG_LOG("complete future = %p", this);
assert(isFuture());
auto fragment = futureFragment();

Expand Down Expand Up @@ -227,7 +227,6 @@ AsyncTask::~AsyncTask() {
SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
auto task = static_cast<AsyncTask*>(obj);

task->~AsyncTask();

// The task execution itself should always hold a reference to it, so
Expand Down
72 changes: 43 additions & 29 deletions stdlib/public/Concurrency/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
// Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
Expand Down Expand Up @@ -278,7 +278,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {

private:

// // TODO: move to lockless via the status atomic
// TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
mutable std::mutex mutex;

/// Used for queue management, counting number of waiting and ready tasks
Expand All @@ -289,7 +289,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
/// The low bits contain the status, the rest of the pointer is the
/// AsyncTask.
NaiveQueue<ReadyQueueItem> readyQueue;
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?

/// Single waiting `AsyncTask` currently waiting on `group.next()`,
/// or `nullptr` if no task is currently waiting.
Expand All @@ -304,10 +303,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
: TaskGroupTaskStatusRecord(),
status(GroupStatus::initial().status),
readyQueue(),
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
waitQueue(nullptr), successType(T) {}


TaskGroupTaskStatusRecord *getTaskRecord() {
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
}
Expand Down Expand Up @@ -471,11 +468,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)

// =============================================================================
// ==== add / attachChild ------------------------------------------------------

SWIFT_CC(swift)
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
AsyncTask *child) {
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p\n",
child, group);

// The counterpart of this (detachChild) is performed by the group itself,
// when it offers the completed (child) task's value to a waiting task -
// during the implementation of `await group.next()`.
auto groupRecord = asImpl(group)->getTaskRecord();
return groupRecord->attachChild(child);
groupRecord->attachChild(child);
}

// =============================================================================
Expand All @@ -489,16 +493,8 @@ void TaskGroupImpl::destroy() {
// First, remove the group from the task and deallocate the record
swift_task_removeStatusRecord(getTaskRecord());

mutex.lock(); // TODO: remove lock, and use status for synchronization
// Release all ready tasks which are kept retained, the group destroyed,
// so no other task will ever await on them anymore;
ReadyQueueItem item;
bool taskDequeued = readyQueue.dequeue(item);
while (taskDequeued) {
swift_release(item.getTask());
taskDequeued = readyQueue.dequeue(item);
}
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
// By the time we call destroy, all tasks inside the group must have been
// awaited on already; We handle this on the swift side.
}

// =============================================================================
Expand All @@ -520,9 +516,10 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
assert(false && "filling a waiting status?");
return;

case PollStatus::Error:
context->fillWithError(reinterpret_cast<SwiftError*>(result.storage));
case PollStatus::Error: {
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
return;
}

case PollStatus::Success: {
// Initialize the result as an Optional<Success>.
Expand Down Expand Up @@ -552,16 +549,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
assert(completedTask->hasChildFragment());
assert(completedTask->hasGroupChildFragment());
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));

// We retain the completed task, because we will either:
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
// - (b) will need to store this task until the group task enters next() and
// picks up this task.
// either way, there is some time between us returning here, and the `completeTask`
// issuing a swift_release on this very task. We need to keep it alive until
// we have the chance to poll it from the queue (via the waiter task entering
// calling next()).
swift_retain(completedTask);
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);

mutex.lock(); // TODO: remove fragment lock, and use status for synchronization

Expand All @@ -585,6 +573,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
// ==== a) has waiting task, so let us complete it right away
if (assumed.hasWaitingTask()) {
auto waitingTask = waitQueue.load(std::memory_order_acquire);
SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p",
waitingTask, completedTask);
while (true) {
// ==== a) run waiting task directly -------------------------------------
assert(assumed.hasWaitingTask());
Expand All @@ -606,6 +596,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
waitingTask->ResumeContext);

fillGroupNextResult(waitingContext, result);
detachChild(result.retainedTask);

_swift_tsan_acquire(static_cast<Job *>(waitingTask));

Expand All @@ -614,6 +605,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
return;
} // else, try again
}

llvm_unreachable("should have enqueued and returned.");
}

// ==== b) enqueue completion ------------------------------------------------
Expand All @@ -622,10 +615,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
// queue when a task polls during next() it will notice that we have a value
// ready for it, and will process it immediately without suspending.
assert(!waitQueue.load(std::memory_order_relaxed));

SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
completedTask);
// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);

auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
Expand Down Expand Up @@ -667,6 +662,7 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t

// =============================================================================
// ==== group.next() implementation (wait_next and groupPoll) ------------------

SWIFT_CC(swiftasync)
static void swift_taskGroup_wait_next_throwingImpl(
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
Expand All @@ -690,6 +686,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
PollResult polled = group->poll(waitingTask);
switch (polled.status) {
case PollStatus::MustWait:
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
group, waitingTask);
// The waiting task has been queued on the channel,
// there were pending tasks so it will be woken up eventually.
#ifdef __ARM_ARCH_7K__
Expand All @@ -702,13 +700,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
case PollStatus::Empty:
case PollStatus::Error:
case PollStatus::Success:
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
group, waitingTask, polled.retainedTask);
fillGroupNextResult(context, polled);
if (auto completedTask = polled.retainedTask) {
// it would be null for PollStatus::Empty, then we don't need to release
group->detachChild(polled.retainedTask);
swift_release(polled.retainedTask);
}

return waitingTask->runInFullyEstablishedContext();
}
}

PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
mutex.lock(); // TODO: remove group lock, and use status for synchronization
SWIFT_TASK_DEBUG_LOG("poll group = %p", this);
auto assumed = statusMarkWaitingAssumeAcquire();

PollResult result;
Expand All @@ -718,6 +725,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {

// ==== 1) bail out early if no tasks are pending ----------------------------
if (assumed.isEmpty()) {
SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this);
// No tasks in flight, we know no tasks were submitted before this poll
// was issued, and if we parked here we'd potentially never be woken up.
// Bail out and return `nil` from `group.next()`.
Expand All @@ -735,6 +743,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {

// ==== 2) Ready task was polled, return with it immediately -----------------
if (assumed.readyTasks()) {
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
this, assumed.readyTasks());

auto assumedStatus = assumed.status;
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
if (status.compare_exchange_weak(
Expand Down Expand Up @@ -838,13 +849,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
}

bool TaskGroupImpl::cancelAll() {
SWIFT_TASK_DEBUG_LOG("cancel all tasks in group = %p", this);

// store the cancelled bit
auto old = statusCancel();
if (old.isCancelled()) {
// already was cancelled previously, nothing to do?
return false;
}

// FIXME: must also remove the records!!!!
// cancel all existing tasks within the group
swift_task_cancel_group_child_tasks(asAbstract(this));
return true;
Expand Down
5 changes: 4 additions & 1 deletion stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
namespace swift {

// Set to 1 to enable helpful debug spew to stderr
// If this is enabled, tests with `swift_task_debug_log` requirement can run.
#if 0
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
fprintf(stderr, "[%lu] " fmt "\n", (unsigned long)_swift_get_thread_id(), \
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \
(unsigned long)_swift_get_thread_id(), \
__FILE__, __LINE__, __FUNCTION__, \
__VA_ARGS__)
#else
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) (void)0
Expand Down
5 changes: 5 additions & 0 deletions stdlib/public/Concurrency/TaskStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
SWIFT_CC(swift)
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
auto task = swift_task_getCurrent();
SWIFT_TASK_DEBUG_LOG("remove status record = %p, from current task = %p",
record, task);

// Load the current state.
auto &status = task->_private().Status;
Expand Down Expand Up @@ -454,6 +456,8 @@ static ChildTaskStatusRecord*
swift_task_attachChildImpl(AsyncTask *child) {
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
SWIFT_TASK_DEBUG_LOG("attach child task = %p, record = %p, to current task = %p",
child, record, swift_task_getCurrent());
swift_task_addStatusRecord(record);
return record;
}
Expand Down Expand Up @@ -548,6 +552,7 @@ static void performGroupCancellationAction(TaskStatusRecord *record) {

SWIFT_CC(swift)
static void swift_task_cancelImpl(AsyncTask *task) {
SWIFT_TASK_DEBUG_LOG("cancel task = %p", task);
Optional<StatusRecordLockRecord> recordLockRecord;

// Acquire the status record lock.
Expand Down
39 changes: 39 additions & 0 deletions test/Concurrency/Runtime/async_task_withUnsafeCurrentTask.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: swift_task_debug_log

// UNSUPPORTED: use_os_stdlib
// UNSUPPORTED: back_deployment_runtime

#if os(Linux)
import Glibc
#elseif os(Windows)
import MSVCRT
#else
import Darwin
#endif

func test_withUnsafeCurrentTask() async {
// The task we're running in ("main")
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0

// CHECK: creating task [[TASK:0x.*]] with parent 0x0
let t = Task.detached {
withUnsafeCurrentTask { task in
fputs("OK: \(task!)", stderr)
}
fputs("DONE", stderr)
}

// CHECK: OK: UnsafeCurrentTask(_task: (Opaque Value))
// CHECK: DONE
// CHECK: destroy task [[TASK]]
await t.value
}

@main struct Main {
static func main() async {
await test_withUnsafeCurrentTask()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ func test_taskGroup_next() async {
do {
while let r = try await group.next() {
sum += r
print("add \(r) -> sum: \(sum)")
}
} catch {
catches += 1
print("catch: \(catches)")
}
}

Expand Down
Loading