Skip to content

Commit 36ddf74

Browse files
committed
wip
1 parent ebd2e99 commit 36ddf74

File tree

2 files changed

+75
-83
lines changed

2 files changed

+75
-83
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 73 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,10 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
312312
return true;
313313
}
314314

315+
bool isEmpty() const {
316+
return queue.empty();
317+
}
318+
315319
void enqueue(const T item) {
316320
queue.push(item);
317321
}
@@ -410,9 +414,9 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
410414

411415
/// Returns *assumed* new status, including the just performed +1.
412416
GroupStatus statusMarkWaitingAssumeAcquire() {
413-
fprintf(stderr, "[%s:%d](%s) statusMarkWaitingAssumeAcquire, load....\n", __FILE_NAME__, __LINE__, __FUNCTION__);
414-
fprintf(stderr, "[%s:%d](%s) statusMarkWaitingAssumeAcquire = %s\n", __FILE_NAME__, __LINE__, __FUNCTION__,
415-
statusLoadRelaxed().to_string(this).c_str());
417+
// fprintf(stderr, "[%s:%d](%s) statusMarkWaitingAssumeAcquire, load....\n", __FILE_NAME__, __LINE__, __FUNCTION__);
418+
// fprintf(stderr, "[%s:%d](%s) statusMarkWaitingAssumeAcquire = %s\n", __FILE_NAME__, __LINE__, __FUNCTION__,
419+
// statusLoadRelaxed().to_string(this).c_str());
416420
auto old = status.fetch_or(GroupStatus::waiting, std::memory_order_acquire);
417421
return GroupStatus{old | GroupStatus::waiting};
418422
}
@@ -723,13 +727,24 @@ static void fillGroupNextNilResult(TaskFutureWaitAsyncContext *context,
723727
// TaskGroup is locked upon entry and exit
724728
void TaskGroupImpl::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) {
725729
if (discardResults) {
726-
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, eager release mode; release result task = %p",
727-
completedTask);
728-
// DO NOT RETAIN THE TASK.
729-
// We know it is Void, so we don't need to store the result;
730-
// By releasing tasks eagerly we're able to keep "infinite" task groups,
731-
// running, that never consume their values. Even more-so,
732-
return;
730+
if (hadErrorResult) {
731+
// we only store the FIRST error in discardResults mode
732+
if (readyQueue.isEmpty()) {
733+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "store first error, completedTask:%p", completedTask);
734+
// continue handling as usual, which will perform the enqueue
735+
} else {
736+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard error result, we already have an error stored, completedTask:%p", completedTask);
737+
// DO NOT RETAIN THE TASK.
738+
return;
739+
}
740+
} else {
741+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard successful result, %p", completedTask);
742+
// DO NOT RETAIN THE TASK.
743+
// We know it is Void, so we don't need to store the result;
744+
// By releasing tasks eagerly we're able to keep "infinite" task groups,
745+
// running, that never consume their values. Even more-so,
746+
return;
747+
}
733748
}
734749

735750
// Retain the task while it is in the queue; it must remain alive until
@@ -776,43 +791,56 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
776791
// W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks
777792
GroupStatus assumed = statusAddReadyAssumeAcquire(this);
778793

794+
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
795+
reinterpret_cast<char *>(context) - sizeof(FutureAsyncContextPrefix));
796+
bool hadErrorResult = false;
797+
auto errorObject = asyncContextPrefix->errorResult;
798+
if (errorObject) {
799+
// instead, we need to enqueue this result:
800+
hadErrorResult = true;
801+
}
802+
779803
if (isDiscardingResults()) {
804+
/// If we're the last task we've been waiting for, and there is a waiting task on the group
805+
bool lastPendingTaskAndWaitingTask =
806+
assumed.pendingTasks(this) == 1 && assumed.hasWaitingTask();
807+
780808
// Immediately decrement the pending count.
781809
// We can do this, since in this mode there is no ready count to keep track of,
782810
// and we immediately discard the result.
783-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard result, was pending:%llu",
784-
assumed.pendingTasks(this));
811+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard result, hadError:%d, was pending:%llu",
812+
hadErrorResult, assumed.pendingTasks(this));
813+
if (!lastPendingTaskAndWaitingTask) {
814+
// we're not able to immediately complete a waitingTask with this task, so we may have to store it...
815+
if (hadErrorResult) {
816+
// a discardResults throwing task group must retain the FIRST error it encounters.
817+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer error, completedTask:%p", completedTask);
818+
enqueueCompletedTask(completedTask, /*hadErrorResult=*/hadErrorResult);
819+
}
820+
} // else, no need to store the task, as we'll immediately complete the waitingTask using it.
785821

786822
// If this was the last pending task, and there is a waiting task (from waitAll),
787823
// we must resume the task; but not otherwise. There cannot be any waiters on next()
788824
// while we're discarding results.
789-
if (assumed.pendingTasks(this) == 1 && assumed.hasWaitingTask()) {
790-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, discardResults, offered last pending task, resume waiting task:%p",
791-
waitQueue.load(std::memory_order_relaxed));
792-
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false);
825+
if (lastPendingTaskAndWaitingTask) {
826+
/// No need to maintain status????
827+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, offered last pending task, resume waiting task:%p",
828+
waitQueue.load(std::memory_order_relaxed));
829+
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/hadErrorResult);
793830
} else {
794831
auto afterComplete = statusCompletePendingAssumeRelease(this);
795832
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s",
796-
afterComplete.to_string(this).c_str());
833+
afterComplete.to_string(this).c_str());
797834
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
798835
}
799836

800837
unlock();
801838
return;
802-
}
839+
} // isDiscardingResults
803840

804841
SWIFT_TASK_GROUP_DEBUG_LOG(this, "ready: %d, pending: %llu",
805842
assumed.readyTasks(this), assumed.pendingTasks(this));
806843

807-
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
808-
reinterpret_cast<char *>(context) - sizeof(FutureAsyncContextPrefix));
809-
bool hadErrorResult = false;
810-
auto errorObject = asyncContextPrefix->errorResult;
811-
if (errorObject) {
812-
// instead, we need to enqueue this result:
813-
hadErrorResult = true;
814-
}
815-
816844
// ==== a) has waiting task, so let us complete it right away
817845
if (assumed.hasWaitingTask()) {
818846
resumeWaitingTask(completedTask, assumed, hadErrorResult);
@@ -1182,8 +1210,8 @@ static void swift_taskGroup_waitAllImpl(
11821210
PollResult polled = group->waitAll(waitingTask);
11831211
switch (polled.status) {
11841212
case PollStatus::MustWait:
1185-
SWIFT_TASK_DEBUG_LOG("group(%p) waitAll MustWait, pending tasks exist, waiting task = %p",
1186-
group, waitingTask);
1213+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll MustWait, pending tasks exist, waiting task = %p",
1214+
waitingTask);
11871215
// The waiting task has been queued on the channel,
11881216
// there were pending tasks so it will be woken up eventually.
11891217
#ifdef __ARM_ARCH_7K__
@@ -1193,12 +1221,26 @@ static void swift_taskGroup_waitAllImpl(
11931221
return;
11941222
#endif /* __ARM_ARCH_7K__ */
11951223

1196-
case PollStatus::Empty:
11971224
case PollStatus::Error:
1225+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll found error, waiting task = %p, status:%s",
1226+
waitingTask, group->statusLoadRelaxed().to_string(group).c_str());
1227+
fillGroupNextResult(context, polled);
1228+
if (auto completedTask = polled.retainedTask) {
1229+
// Remove the child from the task group's running tasks list.
1230+
_swift_taskGroup_detachChild(asAbstract(group), completedTask);
1231+
1232+
// Balance the retain done by enqueueCompletedTask.
1233+
swift_release(completedTask);
1234+
}
1235+
1236+
return waitingTask->runInFullyEstablishedContext();
1237+
1238+
case PollStatus::Empty:
11981239
case PollStatus::Success:
11991240
/// Anything else than a "MustWait" can be treated as a successful poll.
12001241
/// Only if there are in flight pending tasks do we need to wait after all.
1201-
SWIFT_TASK_DEBUG_LOG("group(%p) waitAll successful, waiting task = %p", group, waitingTask);
1242+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll successful, waiting task = %p, status:%s",
1243+
waitingTask, group->statusLoadRelaxed().to_string(group).c_str());
12021244
fillGroupNextNilResult(context, polled);
12031245

12041246
return waitingTask->runInFullyEstablishedContext();

test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -14,56 +14,6 @@ struct IgnoredBoom: Error {}
1414
func echo(_ i: Int) async -> Int { i }
1515
func boom() async throws -> Int { throw Boom() }
1616

17-
func test_taskGroup_throws_rethrows() async {
18-
print("==== \(#function) ------")
19-
do {
20-
let got = try await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in
21-
group.addTask { await echo(1) }
22-
group.addTask { await echo(2) }
23-
group.addTask { try await boom() }
24-
25-
do {
26-
while let r = try await group.next() {
27-
print("next: \(r)")
28-
}
29-
} catch {
30-
// CHECK: error caught and rethrown in group: Boom()
31-
print("error caught and rethrown in group: \(error)")
32-
throw error
33-
}
34-
35-
print("should have thrown")
36-
return 0
37-
}
38-
39-
print("Expected error to be thrown, but got: \(got)")
40-
} catch {
41-
// CHECK: rethrown: Boom()
42-
print("rethrown: \(error)")
43-
}
44-
}
45-
46-
func test_taskGroup_noThrow_ifNotAwaitedThrowingTask() async {
47-
print("==== \(#function) ------")
48-
do {
49-
let got = try await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in
50-
group.addTask { await echo(1) }
51-
guard let r = try await group.next() else {
52-
return 0
53-
}
54-
55-
group.addTask { try await boom() }
56-
// don't consume this task, so we're not throwing here
57-
58-
return r
59-
}
60-
61-
print("Expected no error to be thrown, got: \(got)") // CHECK: expected no error to be thrown, got: 1
62-
} catch {
63-
print("Unexpected error: \(error)")
64-
}
65-
}
66-
6717
func test_taskGroup_discardResults_automaticallyRethrows() async {
6818
print("==== \(#function) ------")
6919
do {
@@ -88,8 +38,8 @@ func test_taskGroup_discardResults_automaticallyRethrows() async {
8838
@available(SwiftStdlib 5.1, *)
8939
@main struct Main {
9040
static func main() async {
91-
await test_taskGroup_throws_rethrows()
92-
await test_taskGroup_noThrow_ifNotAwaitedThrowingTask()
41+
// await test_taskGroup_throws_rethrows()
42+
// await test_taskGroup_noThrow_ifNotAwaitedThrowingTask()
9343
await test_taskGroup_discardResults_automaticallyRethrows()
9444
}
9545
}

0 commit comments

Comments
 (0)