Skip to content

[Concurrency] waitForAll and next of TaskGroups must inherit isolation #72794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions stdlib/public/Concurrency/TaskGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public func withTaskGroup<ChildTaskResult, GroupResult>(
// Run the withTaskGroup body.
let result = await body(&group)

// TODO(concurrency): should get isolation from param from withThrowingTaskGroup
await group.awaitAllRemainingTasks()

Builtin.destroyTaskGroup(_group)
Expand Down Expand Up @@ -183,13 +184,15 @@ public func withThrowingTaskGroup<ChildTaskResult, GroupResult>(
// Run the withTaskGroup body.
let result = try await body(&group)

// TODO(concurrency): should get isolation from param from withThrowingTaskGroup
await group.awaitAllRemainingTasks()
Builtin.destroyTaskGroup(_group)

return result
} catch {
group.cancelAll()

// TODO(concurrency): should get isolation from param from withThrowingTaskGroup
await group.awaitAllRemainingTasks()
Builtin.destroyTaskGroup(_group)

Expand Down Expand Up @@ -563,22 +566,41 @@ public struct TaskGroup<ChildTaskResult: Sendable> {
/// that method can't be called from a concurrent execution context like a child task.
///
/// - Returns: The value returned by the next child task that completes.
public mutating func next() async -> ChildTaskResult? {
@available(SwiftStdlib 5.1, *)
@backDeployed(before: SwiftStdlib 6.0)
public mutating func next(isolation: isolated (any Actor)? = #isolation) async -> ChildTaskResult? {
// try!-safe because this function only exists for Failure == Never,
// and as such, it is impossible to spawn a throwing child task.
return try! await _taskGroupWaitNext(group: _group) // !-safe cannot throw, we're a non-throwing TaskGroup
}

@usableFromInline
@available(SwiftStdlib 5.1, *)
@_silgen_name("$sScG4nextxSgyYaF")
internal mutating func __abi_next() async -> ChildTaskResult? {
// try!-safe because this function only exists for Failure == Never,
// and as such, it is impossible to spawn a throwing child task.
return try! await _taskGroupWaitNext(group: _group) // !-safe cannot throw, we're a non-throwing TaskGroup
}

/// Await all of the pending tasks added this group.
@usableFromInline
@available(SwiftStdlib 5.1, *)
@backDeployed(before: SwiftStdlib 6.0)
internal mutating func awaitAllRemainingTasks(isolation: isolated (any Actor)? = #isolation) async {
while let _ = await next(isolation: isolation) {}
}

@usableFromInline
@available(SwiftStdlib 5.1, *)
internal mutating func awaitAllRemainingTasks() async {
while let _ = await next() {}
while let _ = await next(isolation: nil) {}
}

/// Wait for all of the group's remaining tasks to complete.
@_alwaysEmitIntoClient
public mutating func waitForAll() async {
await awaitAllRemainingTasks()
public mutating func waitForAll(isolation: isolated (any Actor)? = #isolation) async {
await awaitAllRemainingTasks(isolation: isolation)
}

/// A Boolean value that indicates whether the group has any remaining tasks.
Expand Down Expand Up @@ -703,16 +725,24 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {

/// Await all the remaining tasks on this group.
@usableFromInline
internal mutating func awaitAllRemainingTasks() async {
@available(SwiftStdlib 5.1, *)
@backDeployed(before: SwiftStdlib 6.0)
internal mutating func awaitAllRemainingTasks(isolation: isolated (any Actor)? = #isolation) async {
while true {
do {
guard let _ = try await next() else {
guard let _ = try await next(isolation: isolation) else {
return
}
} catch {}
}
}

@usableFromInline
@available(SwiftStdlib 5.1, *)
internal mutating func awaitAllRemainingTasks() async {
await awaitAllRemainingTasks(isolation: nil)
}

@usableFromInline
internal mutating func _waitForAll() async throws {
await self.awaitAllRemainingTasks()
Expand Down Expand Up @@ -750,7 +780,7 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
/// - Throws: The *first* error that was thrown by a child task during draining all the tasks.
/// This first error is stored until all other tasks have completed, and is re-thrown afterwards.
@_alwaysEmitIntoClient
public mutating func waitForAll() async throws {
public mutating func waitForAll(isolation: isolated (any Actor)? = #isolation) async throws {
var firstError: Error? = nil

// Make sure we loop until all child tasks have completed
Expand Down Expand Up @@ -999,7 +1029,16 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
/// - Throws: The error thrown by the next child task that completes.
///
/// - SeeAlso: `nextResult()`
public mutating func next() async throws -> ChildTaskResult? {
@available(SwiftStdlib 5.1, *)
@backDeployed(before: SwiftStdlib 6.0)
public mutating func next(isolation: isolated (any Actor)? = #isolation) async throws -> ChildTaskResult? {
return try await _taskGroupWaitNext(group: _group)
}

@usableFromInline
@available(SwiftStdlib 5.1, *)
@_silgen_name("$sScg4nextxSgyYaKF")
internal mutating func __abi_next() async throws -> ChildTaskResult? {
return try await _taskGroupWaitNext(group: _group)
}

Expand Down Expand Up @@ -1052,7 +1091,7 @@ public struct ThrowingTaskGroup<ChildTaskResult: Sendable, Failure: Error> {
///
/// - SeeAlso: `next()`
@_alwaysEmitIntoClient
public mutating func nextResult() async -> Result<ChildTaskResult, Failure>? {
public mutating func nextResult(isolation: isolated (any Actor)? = #isolation) async -> Result<ChildTaskResult, Failure>? {
return try! await nextResultForABI()
}

Expand Down Expand Up @@ -1332,7 +1371,7 @@ func _taskGroupIsCancelled(group: Builtin.RawPointer) -> Bool

@available(SwiftStdlib 5.1, *)
@_silgen_name("swift_taskGroup_wait_next_throwing")
func _taskGroupWaitNext<T>(group: Builtin.RawPointer) async throws -> T?
public func _taskGroupWaitNext<T>(group: Builtin.RawPointer) async throws -> T?

@available(SwiftStdlib 5.1, *)
@_silgen_name("swift_task_hasTaskGroupStatusRecord")
Expand Down
36 changes: 36 additions & 0 deletions test/Concurrency/async_task_groups_and_actors.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// RUN: %target-swift-frontend -disable-availability-checking %s -emit-sil -o /dev/null -verify -strict-concurrency=complete -enable-upcoming-feature RegionBasedIsolation

// REQUIRES: concurrency
// REQUIRES: asserts
// REQUIRES: libdispatch

@MainActor
class MyActor {
func check() async throws {
await withTaskGroup(of: Int.self) { group in
group.addTask {
2
}
await group.waitForAll()
}

try await withThrowingTaskGroup(of: Int.self) { throwingGroup in
throwingGroup.addTask {
2
}
try await throwingGroup.waitForAll()
}

await withDiscardingTaskGroup { discardingGroup in
discardingGroup.addTask {
()
}
}

try await withThrowingDiscardingTaskGroup { throwingDiscardingGroup in
throwingDiscardingGroup.addTask {
()
}
}
}
}
13 changes: 13 additions & 0 deletions test/abi/macOS/arm64/concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,19 @@ Added: _$ss26withTaskExecutorPreference_9isolation9operationxSch_pSg_ScA_pSgYixy
// async function pointer to Swift.withTaskExecutorPreference<A, B where B: Swift.Error>(_: Swift.TaskExecutor?, isolation: isolated Swift.Actor?, operation: () async throws(B) -> A) async throws(B) -> A
Added: _$ss26withTaskExecutorPreference_9isolation9operationxSch_pSg_ScA_pSgYixyYaq_YKXEtYaq_YKs5ErrorR_r0_lFTu

// === Add #isolation to next() and waitForAll() in task groups
// Swift.TaskGroup.awaitAllRemainingTasks(isolation: isolated Swift.Actor?) async -> ()
Added: _$sScG22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaF
Added: _$sScG22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaFTu
// Swift.TaskGroup.next(isolation: isolated Swift.Actor?) async -> A?
Added: _$sScG4next9isolationxSgScA_pSgYi_tYaF
Added: _$sScG4next9isolationxSgScA_pSgYi_tYaFTu
// Swift.ThrowingTaskGroup.next(isolation: isolated Swift.Actor?) async throws -> A?
Added: _$sScg4next9isolationxSgScA_pSgYi_tYaKF
Added: _$sScg4next9isolationxSgScA_pSgYi_tYaKFTu
// Swift.ThrowingTaskGroup.awaitAllRemainingTasks(isolation: isolated Swift.Actor?) async -> ()
Added: _$sScg22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaF
Added: _$sScg22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaFTu

// next() default implementation in terms of next(isolation:)
Added: _$sScIsE4next7ElementQzSgyYa7FailureQzYKF
Expand Down
13 changes: 13 additions & 0 deletions test/abi/macOS/x86_64/concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,19 @@ Added: _$ss26withTaskExecutorPreference_9isolation9operationxSch_pSg_ScA_pSgYixy
// async function pointer to Swift.withTaskExecutorPreference<A, B where B: Swift.Error>(_: Swift.TaskExecutor?, isolation: isolated Swift.Actor?, operation: () async throws(B) -> A) async throws(B) -> A
Added: _$ss26withTaskExecutorPreference_9isolation9operationxSch_pSg_ScA_pSgYixyYaq_YKXEtYaq_YKs5ErrorR_r0_lFTu

// === Add #isolation to next() and waitForAll() in task groups
// Swift.TaskGroup.awaitAllRemainingTasks(isolation: isolated Swift.Actor?) async -> ()
Added: _$sScG22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaF
Added: _$sScG22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaFTu
// Swift.TaskGroup.next(isolation: isolated Swift.Actor?) async -> A?
Added: _$sScG4next9isolationxSgScA_pSgYi_tYaF
Added: _$sScG4next9isolationxSgScA_pSgYi_tYaFTu
// Swift.ThrowingTaskGroup.next(isolation: isolated Swift.Actor?) async throws -> A?
Added: _$sScg4next9isolationxSgScA_pSgYi_tYaKF
Added: _$sScg4next9isolationxSgScA_pSgYi_tYaKFTu
// Swift.ThrowingTaskGroup.awaitAllRemainingTasks(isolation: isolated Swift.Actor?) async -> ()
Added: _$sScg22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaF
Added: _$sScg22awaitAllRemainingTasks9isolationyScA_pSgYi_tYaFTu

// next() default implementation in terms of next(isolation:)
Added: _$sScIsE4next7ElementQzSgyYa7FailureQzYKF
Expand Down
8 changes: 8 additions & 0 deletions test/api-digester/stability-concurrency-abi.test
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ Func Executor.enqueue(_:) is a new API without @available attribute
// This function correctly inherits its availability from the TaskLocal struct.
Func TaskLocal.withValueImpl(_:operation:file:line:) is a new API without @available attribute

// The method is actually still there: '__abi_next' silgen_name("$sScG4nextxSgyYaF")
Func TaskGroup.next() has been renamed to Func next(isolation:)
Func TaskGroup.next() has mangled name changing from 'Swift.TaskGroup.next() async -> Swift.Optional<A>' to 'Swift.TaskGroup.next(isolation: isolated Swift.Optional<Swift.Actor>) async -> Swift.Optional<A>'

// The method is actually still there: '__abi_next' silgen_name("$sScg4nextxSgyYaKF")
Func ThrowingTaskGroup.next() has been renamed to Func next(isolation:)
Func ThrowingTaskGroup.next() has mangled name changing from 'Swift.ThrowingTaskGroup.next() async throws -> Swift.Optional<A>' to 'Swift.ThrowingTaskGroup.next(isolation: isolated Swift.Optional<Swift.Actor>) async throws -> Swift.Optional<A>'

// *** DO NOT DISABLE OR XFAIL THIS TEST. *** (See comment above.)


Expand Down