Skip to content

[WIP][Concurrency] Optimize Void task group, to not store completed tasks #62246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 77 additions & 22 deletions stdlib/public/BackDeployConcurrency/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
/*task*/ asyncTask
};
}

static PollResult getVoid() {
return PollResult{
/*status*/ PollStatus::Success,
/*storage*/ nullptr,
/*successType*/nullptr, // TODO: Void.self
/*task*/ nullptr
};
}
};

/// An item within the message queue of a group.
Expand Down Expand Up @@ -555,13 +564,37 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
}
}

static void fillGroupNextVoidResult(TaskFutureWaitAsyncContext *context,
PollResult result) {
/// Fill in the result value
switch (result.status) {
case PollStatus::MustWait:
assert(false && "filling a waiting status?");
return;

case PollStatus::Error: {
assert(false && "this type of task group cannot throw");
return;
}

case PollStatus::Success:
case PollStatus::Empty: {
// "Success" type is guaranteed to be Void
// Initialize the result as a nil Optional<Success>.
const Metadata *successType = result.successType;
OpaqueValue *destPtr = context->successResultPointer;
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
return;
}
}
}

void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
assert(completedTask);
assert(completedTask->isFuture());
assert(completedTask->hasChildFragment());
assert(completedTask->hasGroupChildFragment());
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);

mutex.lock(); // TODO: remove fragment lock, and use status for synchronization

Expand All @@ -572,6 +605,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
// W:n R:0 P:1 -> W:y R:1 P:1 // complete immediately
// W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks
auto assumed = statusAddReadyAssumeAcquire();
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p, tasks pending = %d", completedTask, assumed.pendingTasks());

auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
reinterpret_cast<char *>(context) - sizeof(FutureAsyncContextPrefix));
Expand Down Expand Up @@ -607,7 +641,13 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
static_cast<TaskFutureWaitAsyncContext *>(
waitingTask->ResumeContext);

fillGroupNextResult(waitingContext, result);
if (this->eagerlyReleaseCompleteTasks) {
fprintf(stderr, "[%s:%d](%s) offer: eagerlyReleaseCompleteTasks\n", __FILE_NAME__, __LINE__, __FUNCTION__);
fillGroupNextResult(waitingContext, result);
} else {
fprintf(stderr, "[%s:%d](%s) offer: NOT\n", __FILE_NAME__, __LINE__, __FUNCTION__);
fillGroupNextResult(waitingContext, result);
}
detachChild(result.retainedTask);

_swift_tsan_acquire(static_cast<Job *>(waitingTask));
Expand All @@ -627,20 +667,31 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
// queue when a task polls during next() it will notice that we have a value
// ready for it, and will process it immediately without suspending.
assert(!waitQueue.load(std::memory_order_relaxed));
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
completedTask);
// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);

auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
);
if (!this->eagerlyReleaseCompleteTasks) {
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
completedTask);
// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);

auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
);

assert(completedTask == readyItem.getTask());
assert(readyItem.getTask()->isFuture());
readyQueue.enqueue(readyItem);
} else {
assert(this->eagerlyReleaseCompleteTasks);
// DO NOT retain the task; and do not store the value in the readyQueue at all (!)
//
// In the "eagerlyRelease" completed tasks mode, we are guaranteed that tasks are of Void type,
// and thus there is no necessity to store values, because we can always "make them up" when polled.
// From the user's perspective, it is indistinguishable if they received the "real value" or one we "made up",
// because Void is always the same, and cannot be examined in any way to determine if it was the "actual" Void or not.
}

assert(completedTask == readyItem.getTask());
assert(readyItem.getTask()->isFuture());
readyQueue.enqueue(readyItem);
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return;
}
Expand Down Expand Up @@ -698,7 +749,7 @@ static void swift_taskGroup_wait_next_throwingImpl(
PollResult polled = group->poll(waitingTask);
switch (polled.status) {
case PollStatus::MustWait:
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks ready = 0, waiting task = %p",
group, waitingTask);
// The waiting task has been queued on the channel,
// there were pending tasks so it will be woken up eventually.
Expand All @@ -714,13 +765,17 @@ static void swift_taskGroup_wait_next_throwingImpl(
case PollStatus::Success:
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
group, waitingTask, polled.retainedTask);
fillGroupNextResult(context, polled);
if (this->eagerlyReleaseCompleteTasks) {
fillGroupNextVoidResult(context, polled);
} else {
fillGroupNextResult(context, polled);
}

if (auto completedTask = polled.retainedTask) {
// it would be null for PollStatus::Empty, then we don't need to release
group->detachChild(polled.retainedTask);
swift_release(polled.retainedTask);
group->detachChild(completedTask);
swift_release(completedTask);
}

return waitingTask->runInFullyEstablishedContext();
}
}
Expand Down Expand Up @@ -755,8 +810,8 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {

// ==== 2) Ready task was polled, return with it immediately -----------------
if (assumed.readyTasks()) {
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
this, assumed.readyTasks());
SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks ready=%d, pending=%d",
this, assumed.readyTasks(), assumed.pendingTasks());

auto assumedStatus = assumed.status;
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
Expand Down
1 change: 1 addition & 0 deletions stdlib/public/BackDeployConcurrency/TaskGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
}
}

// TODO(ktoso): doesn't seem to be used?
@usableFromInline
internal mutating func _waitForAll() async throws {
while let _ = try await next() { }
Expand Down
130 changes: 108 additions & 22 deletions stdlib/public/Concurrency/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
/*task*/ asyncTask
};
}

static PollResult getVoid() {
return PollResult{
/*status*/ PollStatus::Empty,
/*storage*/ nullptr,
/*successType*/nullptr, // TODO: Void.self
/*task*/ nullptr
};
}
};

/// An item within the message queue of a group.
Expand Down Expand Up @@ -322,11 +331,14 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
friend class ::swift::AsyncTask;

public:
explicit TaskGroupImpl(const Metadata *T)
const bool eagerlyReleaseCompleteTasks;
explicit TaskGroupImpl(const Metadata *T, bool eagerlyReleaseCompleteTasks)
: TaskGroupTaskStatusRecord(),
status(GroupStatus::initial().status),
readyQueue(),
waitQueue(nullptr), successType(T) {}
waitQueue(nullptr),
successType(T),
eagerlyReleaseCompleteTasks(eagerlyReleaseCompleteTasks) {}

TaskGroupTaskStatusRecord *getTaskRecord() {
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
Expand Down Expand Up @@ -488,7 +500,7 @@ SWIFT_CC(swift)
static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) {
SWIFT_TASK_DEBUG_LOG("creating task group = %p", group);

TaskGroupImpl *impl = ::new (group) TaskGroupImpl(T);
TaskGroupImpl *impl = ::new (group) TaskGroupImpl(T, /*eagerlyReleaseCompleteTasks=*/true);
auto record = impl->getTaskRecord();
assert(impl == record && "the group IS the task record");

Expand Down Expand Up @@ -525,6 +537,12 @@ static void swift_taskGroup_destroyImpl(TaskGroup *group) {

void TaskGroupImpl::destroy() {
SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this);
if (!this->isEmpty()) {
auto status = this->statusLoadRelaxed();
SWIFT_TASK_DEBUG_LOG("destroying task group = %p, tasks .ready = %d, .pending = %d",
this, status.readyTasks(), status.pendingTasks());
}
assert(this->isEmpty() && "Attempted to destroy non-empty task group!");

// First, remove the group from the task and deallocate the record
removeStatusRecord(getTaskRecord());
Expand Down Expand Up @@ -583,20 +601,68 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
}
}

static void fillGroupNextVoidResult(TaskFutureWaitAsyncContext *context,
PollResult result) {
/// Fill in the result value
switch (result.status) {
case PollStatus::MustWait:
assert(false && "filling a waiting status?");
return;

case PollStatus::Error: {
assert(false && "cannot have errors");
return;
}

case PollStatus::Success: {
// Initialize the result as an Optional<Void>.
const Metadata *successType = result.successType;
OpaqueValue *destPtr = context->successResultPointer;
// TODO: figure out a way to try to optimistically take the
// value out of the finished task's future, if there are no
// remaining references to it.
successType->vw_initializeWithCopy(destPtr, result.storage);
successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
return;
}

case PollStatus::Empty: {
// Initialize the result as a nil Optional<Success>.
const Metadata *successType = result.successType;
OpaqueValue *destPtr = context->successResultPointer;
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
return;
}
}
}

// TaskGroup is locked upon entry and exit
void TaskGroupImpl::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) {
// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);

auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
);

assert(completedTask == readyItem.getTask());
assert(readyItem.getTask()->isFuture());
readyQueue.enqueue(readyItem);
if (this->eagerlyReleaseCompleteTasks) {
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, eager release mode; release result task = %p",
completedTask);
// DO NOT RETAIN THE TASK.
// We know it is Void, so we don't need to store the result;
// By releasing tasks eagerly we're able to keep "infinite" task groups,
// running, that never consume their values. Even more-so,
return;
}

SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
completedTask);

// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);

auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
);

assert(completedTask == readyItem.getTask());
assert(readyItem.getTask()->isFuture());
readyQueue.enqueue(readyItem);
}

void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
Expand All @@ -617,6 +683,9 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
// W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks
auto assumed = statusAddReadyAssumeAcquire();

SWIFT_TASK_DEBUG_LOG("group %p, ready: %d, pending: %d",
this, assumed.readyTasks(), assumed.pendingTasks());

auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
reinterpret_cast<char *>(context) - sizeof(FutureAsyncContextPrefix));
bool hadErrorResult = false;
Expand Down Expand Up @@ -686,8 +755,6 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
// ready for it, and will process it immediately without suspending.
assert(!waitQueue.load(std::memory_order_relaxed));

SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
completedTask);
enqueueCompletedTask(completedTask, hadErrorResult);
unlock(); // TODO: remove fragment lock, and use status for synchronization
}
Expand Down Expand Up @@ -764,7 +831,11 @@ static void swift_taskGroup_wait_next_throwingImpl(
case PollStatus::Success:
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
group, waitingTask, polled.retainedTask);
fillGroupNextResult(context, polled);
if (group->eagerlyReleaseCompleteTasks) {
fillGroupNextVoidResult(context, polled);
} else {
fillGroupNextResult(context, polled);
}
if (auto completedTask = polled.retainedTask) {
// it would be null for PollStatus::Empty, then we don't need to release
group->detachChild(polled.retainedTask);
Expand Down Expand Up @@ -811,8 +882,8 @@ reevaluate_if_taskgroup_has_results:;

// ==== 2) Ready task was polled, return with it immediately -----------------
if (assumed.readyTasks()) {
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
this, assumed.readyTasks());
SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks .ready = %d, .pending = %d",
this, assumed.readyTasks(), assumed.pendingTasks());

auto assumedStatus = assumed.status;
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
Expand All @@ -829,6 +900,17 @@ reevaluate_if_taskgroup_has_results:;

// Success! We are allowed to poll.
ReadyQueueItem item;
if (this->eagerlyReleaseCompleteTasks) {
SWIFT_TASK_DEBUG_LOG("poll group = %p; polled in eager-release mode; make up Void value to yield",
this, assumed.readyTasks(), assumed.pendingTasks());
result.status = PollStatus::Success;
result.storage = nullptr;
result.retainedTask = nullptr;
result.successType = this->successType;
unlock(); // TODO: remove fragment lock, and use status for synchronization
return result;
}

bool taskDequeued = readyQueue.dequeue(item);
assert(taskDequeued); (void) taskDequeued;

Expand Down Expand Up @@ -956,10 +1038,14 @@ bool TaskGroupImpl::cancelAll() {

// =============================================================================
// ==== addPending -------------------------------------------------------------

SWIFT_CC(swift)
static bool swift_taskGroup_addPendingImpl(TaskGroup *group, bool unconditionally) {
auto assumedStatus = asImpl(group)->statusAddPendingTaskRelaxed(unconditionally);
return !assumedStatus.isCancelled();
auto assumed = asImpl(group)->statusAddPendingTaskRelaxed(unconditionally);
SWIFT_TASK_DEBUG_LOG("add pending %s to group %p, tasks pending = %d",
unconditionally ? "unconditionally" : "",
group, assumed.pendingTasks());
return !assumed.isCancelled();
}

#define OVERRIDE_TASK_GROUP COMPATIBILITY_OVERRIDE
Expand Down
Loading