Skip to content

[Concurrency] Task "nurseries" or "Task scopes" #34516

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions stdlib/public/Concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
_TimeTypes.swift
TaskAlloc.cpp
TaskStatus.cpp
TaskNurseries.swift
Mutex.cpp

SWIFT_MODULE_DEPENDS_OSX Darwin
Expand Down
1 change: 1 addition & 0 deletions stdlib/public/Concurrency/TaskCancellation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ extension Task {
/// if the current task has been cancelled.
public struct CancellationError: Error {
// no extra information, cancellation is intended to be light-weight
public init() {}
}

}
Expand Down
132 changes: 132 additions & 0 deletions stdlib/public/Concurrency/TaskNurseries.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
////===----------------------------------------------------------------------===//
////
//// This source file is part of the Swift.org open source project
////
//// Copyright (c) 2020 Apple Inc. and the Swift project authors
//// Licensed under Apache License v2.0 with Runtime Library Exception
////
//// See https://swift.org/LICENSE.txt for license information
//// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
////
////===----------------------------------------------------------------------===//

import Swift
@_implementationOnly import _SwiftConcurrencyShims

// ==== Task Nursery -----------------------------------------------------------

extension Task {

/// Starts a new nursery which provides a scope in which a dynamic number of
/// tasks may be spawned.
///
/// Tasks added to the nursery by `nursery.add()` will automatically be
/// awaited on when the scope exits.
///
/// ### Implicit awaiting
/// When results of tasks added to the nursery need to be collected, one will
/// gather task's results using the `while let result = await nursery.next() { ... }`
/// pattern.
///
/// ### Cancellation
/// If any of the tasks throws the nursery and all of its tasks will be cancelled,
/// and the error will be re-thrown by `withNursery`.
///
/// Postcondition:
/// Once `withNursery` returns it is guaranteed that the *nursery* is *empty*.
///
/// This is achieved in the following way:
/// - if the body returns normally:
/// - the nursery will await any not yet complete tasks,
/// - if any of those tasks throws, the remaining tasks will be cancelled,
/// - once the `withNursery` returns the nursery is guaranteed to be empty.
/// - if the body throws:
/// - all tasks remaining in the nursery will be automatically cancelled.
///
// TODO: Do we have to add a different nursery type to accommodate throwing
// tasks without forcing users to use Result? I can't think of how that
// could be propagated out of the callback body reasonably, unless we
// commit to doing multi-statement closure typechecking.
public static func withNursery<TaskResult, BodyResult>(
resultType: TaskResult.Type,
returning returnType: BodyResult.Type = BodyResult.self,
body: (inout Nursery<TaskResult>) async throws -> BodyResult
) async rethrows -> BodyResult {
fatalError("\(#function) not implemented yet.")
}

/// A nursery provides a scope within which a dynamic number of tasks may be
/// started and added to the nursery.
/* @unmoveable */
public struct Nursery<TaskResult> {
/// No public initializers
private init() {}

// Swift will statically prevent this type from being copied or moved.
// For now, that implies that it cannot be used with generics.

/// Add a child task to the nursery.
///
/// ### Error handling
/// Operations are allowed to throw.
///
/// in which case the `await try next()`
/// invocation corresponding to the failed task will re-throw the given task.
///
/// - Parameters:
/// - overridingPriority: override priority of the operation task
/// - operation: operation to execute and add to the nursery
public mutating func add(
overridingPriority: Priority? = nil,
operation: () async throws -> TaskResult
) async {
fatalError("\(#function) not implemented yet.")
}

/// Add a child task and return a `Task.Handle` that can be used to manage it.
///
/// The task's result is accessible either via the returned `handle` or the
/// `nursery.next()` function (as any other `add`-ed task).
///
/// - Parameters:
/// - overridingPriority: override priority of the operation task
/// - operation: operation to execute and add to the nursery
public mutating func addWithHandle(
overridingPriority: Priority? = nil,
operation: () async throws -> TaskResult
) async -> Handle<TaskResult> {
fatalError("\(#function) not implemented yet.")
}

/// Wait for a child task to complete and return the result it returned,
/// or else return.
///
///
public mutating func next() async throws -> TaskResult? {
fatalError("\(#function) not implemented yet.")
}

/// Query whether the nursery has any remaining tasks.
///
/// Nurseries are always empty upon entry to the `withNursery` body, and
/// become empty again when `withNursery` returns (either by awaiting on all
/// pending tasks or cancelling them).
///
/// - Returns: `true` if the nursery has no pending tasks, `false` otherwise.
public var isEmpty: Bool {
fatalError("\(#function) not implemented yet.")
}

/// Cancel all the remaining tasks in the nursery.
///
/// A cancelled nursery will not will NOT accept new tasks being added into it.
///
/// Any results, including errors thrown by tasks affected by this
/// cancellation, are silently discarded.
///
/// - SeeAlso: `Task.addCancellationHandler`
public mutating func cancelAll() {
fatalError("\(#function) not implemented yet.")
}
}
}
2 changes: 1 addition & 1 deletion stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class AsyncTask;
/// Initialize the task-local allocator in the given task.
void _swift_task_alloc_initialize(AsyncTask *task);

/// Destsroy the task-local allocator in the given task.
/// Destroy the task-local allocator in the given task.
void _swift_task_alloc_destroy(AsyncTask *task);

} // end namespace swift
Expand Down
2 changes: 1 addition & 1 deletion stdlib/public/Concurrency/_TimeTypes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ extension Task {
}

public static func microseconds(_ us: UInt64) -> Self {
.init(nanoseconds: clampedInt64Product(us, 1000))
.init(nanoseconds: clampedInt64Product(us, 1_000))
}

public static func nanoseconds(_ ns: UInt64) -> Self {
Expand Down
174 changes: 174 additions & 0 deletions test/Concurrency/async_nurseries.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// RUN: %target-typecheck-verify-swift -enable-experimental-concurrency
// REQUIRES: concurrency

func asyncFunc() async -> Int { 42 }
func asyncThrowsFunc() async throws -> Int { 42 }
func asyncThrowsOnCancel() async throws -> Int {
// terrible suspend-spin-loop -- do not do this
// only for purposes of demonstration
while await !Task.isCancelled() {
await Task.sleep(until: Task.Deadline.in(.seconds(1)))
}

throw Task.CancellationError()
}

func test_nursery_add() async throws -> Int {
await try Task.withNursery(resultType: Int.self) { nursery in
await nursery.add {
await asyncFunc()
}

await nursery.add {
await asyncFunc()
}

var sum = 0
while let v = await try nursery.next() {
sum += v
}
return sum
} // implicitly awaits
}

func test_nursery_addHandles() async throws -> Int {
await try Task.withNursery(resultType: Int.self) { nursery in
let one = await nursery.addWithHandle {
await asyncFunc()
}

let two = await nursery.addWithHandle {
await asyncFunc()
}

_ = await try one.get()
_ = await try two.get()
} // implicitly awaits
}

func test_nursery_cancel_handles() async throws {
await try Task.withNursery(resultType: Int.self) { nursery in
let one = await nursery.addWithHandle {
await try asyncThrowsOnCancel()
}

let two = await nursery.addWithHandle {
await asyncFunc()
}

_ = await try one.get()
_ = await try two.get()
} // implicitly awaits
}

// ==== ------------------------------------------------------------------------
// MARK: Example Nursery Usages

struct Boom: Error {}
func work() async -> Int { 42 }
func boom() async throws -> Int { throw Boom() }

func first_allMustSucceed() async throws {

let first: Int = await try Task.withNursery(resultType: Int.self) { nursery in
await nursery.add { await work() }
await nursery.add { await work() }
await nursery.add { await try boom() }

if let first = await try nursery.next() {
return first
} else {
fatalError("Should never happen, we either throw, or get a result from any of the tasks")
}
// implicitly await: boom
}
_ = first
// Expected: re-thrown Boom
}

func first_ignoreFailures() async throws {
func work() async -> Int { 42 }
func boom() async throws -> Int { throw Boom() }

let first: Int = await try Task.withNursery(resultType: Int.self) { nursery in
await nursery.add { await work() }
await nursery.add { await work() }
await nursery.add {
do {
return await try boom()
} catch {
return 0 // TODO: until await try? works properly
}
}

var result: Int = 0
while let v = await try nursery.next() {
result = v

if result != 0 {
break
}
}

return result
}
_ = first
// Expected: re-thrown Boom
}

// ==== ------------------------------------------------------------------------
// MARK: Advanced Custom Nursery Usage

func test_nursery_quorum_thenCancel() async {
// imitates a typical "gather quorum" routine that is typical in distributed systems programming
enum Vote {
case yay
case nay
}
struct Follower {
init(_ name: String) {}
func vote() async throws -> Vote {
// "randomly" vote yes or no
return .yay
}
}

/// Performs a simple quorum vote among the followers.
///
/// - Returns: `true` iff `N/2 + 1` followers return `.yay`, `false` otherwise.
func gatherQuorum(followers: [Follower]) async -> Bool {
await try! Task.withNursery(resultType: Vote.self) { nursery in
for follower in followers {
await nursery.add { await try follower.vote() }
}

defer {
nursery.cancelAll()
}

var yays: Int = 0
var nays: Int = 0
let quorum = Int(followers.count / 2) + 1
while let vote = await try nursery.next() {
switch vote {
case .yay:
yays += 1
if yays >= quorum {
// cancel all remaining voters, we already reached quorum
return true
}
case .nay:
nays += 1
if nays >= quorum {
return false
}
}
}

return false
}
}

_ = await gatherQuorum(followers: [Follower("A"), Follower("B"), Follower("C")])
}