Skip to content

Commit ebd2e99

Browse files
committed
prepare for throwing
1 parent bac9182 commit ebd2e99

File tree

5 files changed

+156
-77
lines changed

5 files changed

+156
-77
lines changed

stdlib/public/BackDeployConcurrency/TaskGroup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
566566
}
567567

568568
case PollStatus::Empty: {
569-
// Initialize the result as a nil Optional<Success>.
569+
// Initialize the result as a .none Optional<Success>.
570570
const Metadata *successType = result.successType;
571571
OpaqueValue *destPtr = context->successResultPointer;
572572
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 36 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,13 @@ using namespace swift;
5454
/*************************** TASK GROUP ***************************************/
5555
/******************************************************************************/
5656

57-
#if SWIFT_TASK_DEBUG_LOG_ENABLED
58-
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
59-
SWIFT_TASK_DEBUG_LOG("group(%p%s) " fmt, \
60-
group, group->isDiscardingResults() ? ",discardResults" : "", \
61-
__VA_ARGS__)
57+
#if 1
58+
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
59+
fprintf(stderr, "[%#lx] [%s:%d](%s) group(%p%s) " fmt "\n", \
60+
(unsigned long)Thread::current().platformThreadId(), \
61+
__FILE__, __LINE__, __FUNCTION__, \
62+
group, group->isDiscardingResults() ? ",discardResults" : "", \
63+
__VA_ARGS__)
6264
#else
6365
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) (void)0
6466
#endif
@@ -212,13 +214,13 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
212214
}
213215

214216
unsigned int readyTasks(const TaskGroupImpl* _Nonnull group) {
215-
assert(group && group->isAccumulatingResults()
217+
assert(group->isAccumulatingResults()
216218
&& "attempted to check ready tasks on group that does not accumulate results!");
217219
return (status & maskReady) >> 31;
218220
}
219221

220-
uint64_t pendingTasks(const TaskGroupImpl* _Nullable group) {
221-
if (group && group->isAccumulatingResults()) {
222+
uint64_t pendingTasks(const TaskGroupImpl* _Nonnull group) {
223+
if (group->isAccumulatingResults()) {
222224
return (status & maskAccumulatingPending);
223225
} else {
224226
return (status & maskDiscardingPending);
@@ -260,17 +262,16 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
260262
/// GroupStatus{ C:{cancelled} W:{waiting task} R:{ready tasks} P:{pending tasks} {binary repr} }
261263
/// If discarding results:
262264
/// GroupStatus{ C:{cancelled} W:{waiting task} P:{pending tasks} {binary repr} }
263-
std::string to_string(const TaskGroupImpl* _Nullable group) {
265+
std::string to_string(const TaskGroupImpl* _Nonnull group) {
264266
std::string str;
265267
str.append("GroupStatus{ ");
266268
str.append("C:"); // cancelled
267-
str.append(isCancelled() ? "y " : "n ");
268-
str.append("W:"); // has waiting task
269+
str.append(isCancelled() ? "y" : "n");
270+
str.append(" W:"); // has waiting task
269271
str.append(hasWaitingTask() ? "y" : "n");
270272
if (group && group->isAccumulatingResults()) {
271273
str.append(" R:"); // ready
272274
str.append(std::to_string(readyTasks(group)));
273-
str.append(" ");
274275
}
275276
str.append(" P:"); // pending
276277
str.append(std::to_string(pendingTasks(group)));
@@ -411,7 +412,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
411412
GroupStatus statusMarkWaitingAssumeAcquire() {
412413
fprintf(stderr, "[%s:%d](%s) statusMarkWaitingAssumeAcquire, load....\n", __FILE_NAME__, __LINE__, __FUNCTION__);
413414
fprintf(stderr, "[%s:%d](%s) statusMarkWaitingAssumeAcquire = %s\n", __FILE_NAME__, __LINE__, __FUNCTION__,
414-
statusLoadRelaxed().to_string(nullptr).c_str());
415+
statusLoadRelaxed().to_string(this).c_str());
415416
auto old = status.fetch_or(GroupStatus::waiting, std::memory_order_acquire);
416417
return GroupStatus{old | GroupStatus::waiting};
417418
}
@@ -693,12 +694,16 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
693694
// Initialize the result as an Optional<Success>.
694695
const Metadata *successType = result.successType;
695696
OpaqueValue *destPtr = context->successResultPointer;
696-
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
697+
// TODO: figure out a way to try to optimistically take the
698+
// value out of the finished task's future, if there are no
699+
// remaining references to it.
700+
successType->vw_initializeWithCopy(destPtr, result.storage);
701+
successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
697702
return;
698703
}
699704

700705
case PollStatus::Empty: {
701-
// Initialize the result as an Optional<Success>.
706+
// Initialize the result as a .none Optional<Success>.
702707
const Metadata *successType = result.successType;
703708
OpaqueValue *destPtr = context->successResultPointer;
704709
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
@@ -709,37 +714,10 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
709714

710715
static void fillGroupNextNilResult(TaskFutureWaitAsyncContext *context,
711716
PollResult result) {
712-
// /// Fill in the result value
713-
// switch (result.status) {
714-
// case PollStatus::MustWait:
715-
// assert(false && "filling a waiting status?");
716-
// return;
717-
//
718-
// case PollStatus::Error: {
719-
// assert(false && "cannot have errors");
720-
// return;
721-
// }
722-
//
723-
// case PollStatus::Success: {
724-
// // Initialize the result as an Optional<Void>.
725-
// const Metadata *successType = result.successType;
726-
// OpaqueValue *destPtr = context->successResultPointer;
727-
// // TODO: figure out a way to try to optimistically take the
728-
// // value out of the finished task's future, if there are no
729-
// // remaining references to it.
730-
// successType->vw_initializeWithCopy(destPtr, result.storage);
731-
// successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
732-
// return;
733-
// }
734-
//
735-
// case PollStatus::Empty: {
736-
// Initialize the result as a nil Optional<Success>.
737-
const Metadata *successType = result.successType;
738-
OpaqueValue *destPtr = context->successResultPointer;
739-
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
740-
return;
741-
// }
742-
// }
717+
// Initialize the result as a .none Optional<Success>.
718+
const Metadata *successType = result.successType;
719+
OpaqueValue *destPtr = context->successResultPointer;
720+
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
743721
}
744722

745723
// TaskGroup is locked upon entry and exit
@@ -774,7 +752,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
774752
assert(completedTask->hasChildFragment());
775753
assert(completedTask->hasGroupChildFragment());
776754
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
777-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p , status:%s", completedTask, statusLoadRelaxed().to_string(this).c_str());
755+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "ENTER offer, completedTask:%p , status:%s", completedTask, statusLoadRelaxed().to_string(this).c_str());
778756

779757
// The current ownership convention is that we are *not* given ownership
780758
// of a retain on completedTask; we're called from the task completion
@@ -802,19 +780,19 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
802780
// Immediately decrement the pending count.
803781
// We can do this, since in this mode there is no ready count to keep track of,
804782
// and we immediately discard the result.
805-
SWIFT_TASK_DEBUG_LOG("group(%p) discard result, was pending:%llu",
806-
this, assumed.pendingTasks(this));
783+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard result, was pending:%llu",
784+
assumed.pendingTasks(this));
807785

808786
// If this was the last pending task, and there is a waiting task (from waitAll),
809787
// we must resume the task; but not otherwise. There cannot be any waiters on next()
810788
// while we're discarding results.
811789
if (assumed.pendingTasks(this) == 1 && assumed.hasWaitingTask()) {
812-
SWIFT_TASK_DEBUG_LOG("group(%p) offer, discardResults, offered last pending task, resume waiting task:%p",
813-
this, waitQueue.load(std::memory_order_relaxed));
790+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, discardResults, offered last pending task, resume waiting task:%p",
791+
waitQueue.load(std::memory_order_relaxed));
814792
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false);
815793
} else {
816794
auto afterComplete = statusCompletePendingAssumeRelease(this);
817-
SWIFT_TASK_DEBUG_LOG("group(%p) offer, discardResults, either more pending tasks, or no waiting task, status:%s", this,
795+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s",
818796
afterComplete.to_string(this).c_str());
819797
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
820798
}
@@ -823,8 +801,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
823801
return;
824802
}
825803

826-
SWIFT_TASK_DEBUG_LOG("group(%p), ready: %d, pending: %llu",
827-
this, assumed.readyTasks(this), assumed.pendingTasks(this));
804+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "ready: %d, pending: %llu",
805+
assumed.readyTasks(this), assumed.pendingTasks(this));
828806

829807
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
830808
reinterpret_cast<char *>(context) - sizeof(FutureAsyncContextPrefix));
@@ -859,8 +837,8 @@ void TaskGroupImpl::resumeWaitingTask(
859837
TaskGroupImpl::GroupStatus &assumed,
860838
bool hadErrorResult) {
861839
auto waitingTask = waitQueue.load(std::memory_order_acquire);
862-
SWIFT_TASK_DEBUG_LOG("group(%p) resume waiting task = %p, complete with = %p",
863-
this, waitingTask, completedTask);
840+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, complete with = %p",
841+
waitingTask, completedTask);
864842
while (true) {
865843
// ==== a) run waiting task directly -------------------------------------
866844
assert(assumed.hasWaitingTask());
@@ -1186,7 +1164,6 @@ static void swift_taskGroup_waitAllImpl(
11861164
TaskGroup *_group,
11871165
ThrowingTaskFutureWaitContinuationFunction *resumeFunction,
11881166
AsyncContext *rawContext) {
1189-
fprintf(stderr, "[%s:%d](%s) group(%p) WAIT ALL IMPL\n", __FILE_NAME__, __LINE__, __FUNCTION__, _group);
11901167
auto waitingTask = swift_task_getCurrent();
11911168
waitingTask->ResumeTask = task_group_wait_resume_adapter;
11921169
waitingTask->ResumeContext = rawContext;
@@ -1199,8 +1176,8 @@ static void swift_taskGroup_waitAllImpl(
11991176
context->successResultPointer = resultPointer;
12001177

12011178
auto group = asImpl(_group);
1202-
SWIFT_TASK_DEBUG_LOG("group(%p) waitAll, waiting task = %p, status:%s",
1203-
group, waitingTask, group->statusLoadRelaxed().to_string(group).c_str());
1179+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll, waiting task = %p, status:%s",
1180+
waitingTask, group->statusLoadRelaxed().to_string(group).c_str());
12041181

12051182
PollResult polled = group->waitAll(waitingTask);
12061183
switch (polled.status) {

stdlib/public/Concurrency/TaskGroup.swift

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,12 @@ public func withTaskGroup<ChildTaskResult, GroupResult>(
105105

106106
let _group = Builtin.createTaskGroupWithFlags(flags, ChildTaskResult.self)
107107
var group = TaskGroup<ChildTaskResult>(group: _group)
108+
defer { Builtin.destroyTaskGroup(_group) }
108109

109110
// Run the withTaskGroup body.
110111
let result = await body(&group)
111112

112-
let x: ChildTaskResult? = try? await _taskGroupWaitAll(group: _group) // FIXME: should not need to throw
113-
assert(x == nil)
114-
// await group.awaitAllRemainingTasks() // FIXME: use this instead!
115-
116-
Builtin.destroyTaskGroup(_group)
113+
let _: ChildTaskResult? = try? await _taskGroupWaitAll(group: _group) // try!-safe, cannot throw since this is a non throwing group
117114
return result
118115
}
119116

@@ -201,14 +198,14 @@ public func withThrowingTaskGroup<ChildTaskResult, GroupResult>(
201198
// Run the withTaskGroup body.
202199
let result = try await body(&group)
203200

204-
await group.awaitAllRemainingTasks()
201+
_ = try? await group.awaitAllRemainingTasks()
205202
Builtin.destroyTaskGroup(_group)
206203

207204
return result
208205
} catch {
209206
group.cancelAll()
210207

211-
await group.awaitAllRemainingTasks()
208+
_ = try? await group.awaitAllRemainingTasks() // discard errors
212209
Builtin.destroyTaskGroup(_group)
213210

214211
throw error
@@ -219,6 +216,45 @@ public func withThrowingTaskGroup<ChildTaskResult, GroupResult>(
219216
#endif
220217
}
221218

219+
@available(SwiftStdlib 5.8, *)
220+
@_unsafeInheritExecutor
221+
@inlinable
222+
public func withThrowingTaskGroup<ChildTaskResult, GroupResult>(
223+
of childTaskResultType: ChildTaskResult.Type,
224+
returning returnType: GroupResult.Type = GroupResult.self,
225+
discardResults: Bool,
226+
body: (inout ThrowingTaskGroup<ChildTaskResult, Error>) async throws -> GroupResult
227+
) async throws -> GroupResult {
228+
let flags = taskGroupCreateFlags(
229+
discardResults: discardResults
230+
)
231+
232+
let _group = Builtin.createTaskGroupWithFlags(flags, ChildTaskResult.self)
233+
var group = ThrowingTaskGroup<ChildTaskResult, Error>(group: _group)
234+
235+
do {
236+
// Run the withTaskGroup body.
237+
let result = try await body(&group)
238+
239+
try await group.awaitAllRemainingTasks()
240+
Builtin.destroyTaskGroup(_group)
241+
242+
return result
243+
} catch {
244+
group.cancelAll()
245+
246+
do {
247+
try await group.awaitAllRemainingTasks()
248+
Builtin.destroyTaskGroup(_group)
249+
} catch {
250+
Builtin.destroyTaskGroup(_group)
251+
throw error
252+
}
253+
254+
throw error
255+
}
256+
}
257+
222258
/// A group that contains dynamically created child tasks.
223259
///
224260
/// To create a task group,
@@ -478,8 +514,7 @@ public struct TaskGroup<ChildTaskResult: Sendable> {
478514
/// implementation.
479515
if #available(SwiftStdlib 5.8, *) {
480516
if isDiscardingResults {
481-
let x: ChildTaskResult? = try? await _taskGroupWaitAll(group: _group) // FIXME: should not need to throw
482-
assert(x == nil)
517+
let _: ChildTaskResult? = try! await _taskGroupWaitAll(group: _group) // try!-safe, cannot throw, not throwing group
483518
return
484519
}
485520
}
@@ -514,7 +549,7 @@ public struct TaskGroup<ChildTaskResult: Sendable> {
514549
///
515550
/// - SeeAlso: ``waitForAll`` which waits for all pending tasks to complete.
516551
@available(SwiftStdlib 5.8, *)
517-
internal var isDiscardingResults: Bool {
552+
public var isDiscardingResults: Bool {
518553
_taskGroupIsDiscardingResults(group: _group) // TODO: test this
519554
}
520555

@@ -606,7 +641,20 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
606641

607642
/// Await all the remaining tasks on this group.
608643
@usableFromInline
609-
internal mutating func awaitAllRemainingTasks() async {
644+
internal mutating func awaitAllRemainingTasks() async throws {
645+
/// Since 5.8, we implement "wait for all pending tasks to complete"
646+
/// in the runtime, in order to be able to handle the discard-results
647+
/// implementation.
648+
if #available(SwiftStdlib 5.8, *) {
649+
if isDiscardingResults {
650+
let _: ChildTaskResult? = try await _taskGroupWaitAll(group: _group) // if any of the tasks throws, this will "rethrow" here
651+
return
652+
}
653+
}
654+
655+
// Old implementation just consumes the group as normal end-user code would.
656+
// As it does not have to account for the `discardResults` mode.
657+
// The first error encountered while draining tasks is rethrown.
610658
while true {
611659
do {
612660
guard let _ = try await next() else {
@@ -618,13 +666,13 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
618666

619667
@usableFromInline
620668
internal mutating func _waitForAll() async throws {
621-
while let _ = try? await next() { }
669+
try await self.awaitAllRemainingTasks()
622670
}
623671

624672
/// Wait for all of the group's remaining tasks to complete.
625673
@_alwaysEmitIntoClient
626674
public mutating func waitForAll() async throws {
627-
while let _ = try await next() { }
675+
try await self.awaitAllRemainingTasks()
628676
}
629677

630678
#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
@@ -904,6 +952,18 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
904952
_taskGroupIsEmpty(_group)
905953
}
906954

955+
/// A Boolean value that indicates whether the group has been created in `discardResults` mode.
956+
///
957+
/// If true, the group will not be accumulating results and the `next()` method will always
958+
/// return `nil` immediately.
959+
///
960+
/// - SeeAlso: ``waitForAll`` which waits for all pending tasks to complete.
961+
@available(SwiftStdlib 5.8, *)
962+
public var isDiscardingResults: Bool {
963+
_taskGroupIsDiscardingResults(group: _group) // TODO: test this
964+
}
965+
966+
907967
/// Cancel all of the remaining tasks in the group.
908968
///
909969
/// After cancellation,
@@ -1143,6 +1203,7 @@ func _taskHasTaskGroupStatusRecord() -> Bool
11431203
/// Always returns `nil`.
11441204
@available(SwiftStdlib 5.8, *)
11451205
@usableFromInline
1206+
@discardableResult
11461207
@_silgen_name("swift_taskGroup_waitAll")
11471208
func _taskGroupWaitAll<T>(group: Builtin.RawPointer) async throws -> T?
11481209

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace swift {
3838

3939
// Set to 1 to enable helpful debug spew to stderr
4040
// If this is enabled, tests with `swift_task_debug_log` requirement can run.
41-
#if 1
41+
#if 0
4242
#define SWIFT_TASK_DEBUG_LOG_ENABLED 1
4343
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
4444
fprintf(stderr, "[%#lx] [%s:%d](%s) " fmt "\n", \

0 commit comments

Comments
 (0)