Skip to content

Commit 59d1e61

Browse files
committed
[SE-0304] Implement cancellable Task.sleep(nanoseconds:).
1 parent 20c8bd1 commit 59d1e61

File tree

6 files changed

+258
-21
lines changed

6 files changed

+258
-21
lines changed

stdlib/public/Concurrency/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
7070
TaskGroup.swift
7171
TaskLocal.cpp
7272
TaskLocal.swift
73+
TaskSleep.swift
7374
ThreadSanitizer.cpp
7475
Mutex.cpp
7576
AsyncStreamBuffer.swift

stdlib/public/Concurrency/Task.swift

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -615,25 +615,6 @@ extension Task where Failure == Error {
615615
}
616616
}
617617

618-
// ==== Async Sleep ------------------------------------------------------------
619-
620-
@available(SwiftStdlib 5.5, *)
621-
extension Task where Success == Never, Failure == Never {
622-
/// Suspends the current task for _at least_ the given duration
623-
/// in nanoseconds.
624-
///
625-
/// This function does _not_ block the underlying thread.
626-
public static func sleep(_ duration: UInt64) async {
627-
let currentTask = Builtin.getCurrentAsyncTask()
628-
let priority = getJobFlags(currentTask).priority ?? Task.currentPriority._downgradeUserInteractive
629-
630-
return await Builtin.withUnsafeContinuation { (continuation: Builtin.RawUnsafeContinuation) -> Void in
631-
let job = _taskCreateNullaryContinuationJob(priority: Int(priority.rawValue), continuation: continuation)
632-
_enqueueJobGlobalWithDelay(duration, job)
633-
}
634-
}
635-
}
636-
637618
// ==== Voluntary Suspension -----------------------------------------------------
638619

639620
@available(SwiftStdlib 5.5, *)

stdlib/public/Concurrency/TaskCancellation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public struct CancellationError: Error {
112112

113113
@available(SwiftStdlib 5.5, *)
114114
@_silgen_name("swift_task_addCancellationHandler")
115-
func _taskAddCancellationHandler(handler: @Sendable () -> Void) -> UnsafeRawPointer /*CancellationNotificationStatusRecord*/
115+
func _taskAddCancellationHandler(handler: () -> Void) -> UnsafeRawPointer /*CancellationNotificationStatusRecord*/
116116

117117
@available(SwiftStdlib 5.5, *)
118118
@_silgen_name("swift_task_removeCancellationHandler")

stdlib/public/Concurrency/TaskGroup.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,8 @@ public struct ThrowingTaskGroup<ChildTaskResult, Failure: Error> {
431431
}
432432
}
433433

434-
public mutating func _waitForAll() async throws {
434+
@usableFromInline
435+
internal mutating func _waitForAll() async throws {
435436
while let _ = try await next() { }
436437
}
437438

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2020 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
import Swift
13+
@_implementationOnly import _SwiftConcurrencyShims
14+
15+
@available(SwiftStdlib 5.5, *)
16+
extension Task where Success == Never, Failure == Never {
17+
/// Suspends the current task for _at least_ the given duration
18+
/// in nanoseconds.
19+
///
20+
/// This function does _not_ block the underlying thread.
21+
public static func sleep(_ duration: UInt64) async {
22+
let currentTask = Builtin.getCurrentAsyncTask()
23+
let priority = getJobFlags(currentTask).priority ?? Task.currentPriority._downgradeUserInteractive
24+
25+
return await Builtin.withUnsafeContinuation { (continuation: Builtin.RawUnsafeContinuation) -> Void in
26+
let job = _taskCreateNullaryContinuationJob(priority: Int(priority.rawValue), continuation: continuation)
27+
_enqueueJobGlobalWithDelay(duration, job)
28+
}
29+
}
30+
31+
/// The type of continuation used in the implementation of
32+
/// sleep(nanoseconds:).
33+
private typealias SleepContinuation = UnsafeContinuation<(), Error>
34+
35+
/// Called when the sleep(nanoseconds:) operation woke up without being
36+
/// cancelled.
37+
private static func onSleepWake(
38+
_ wordPtr: UnsafeMutablePointer<Builtin.Word>,
39+
_ continuation: UnsafeContinuation<(), Error>
40+
) {
41+
// Indicate that we've finished by putting a "1" into the flag word.
42+
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
43+
wordPtr._rawValue,
44+
UInt(0)._builtinWordValue,
45+
UInt(1)._builtinWordValue)
46+
47+
if Bool(_builtinBooleanLiteral: won) {
48+
// The sleep finished, invoke the continuation.
49+
continuation.resume()
50+
} else {
51+
// The task was cancelled first, which means the continuation was
52+
// called by the cancellation handler. We need to deallocate up the flag
53+
// word, because it was left over for this task to complete.
54+
wordPtr.deallocate()
55+
}
56+
}
57+
58+
/// Called when the sleep(nanoseconds:) operation has been cancelled before
59+
/// the sleep completed.
60+
private static func onSleepCancel(
61+
_ wordPtr: UnsafeMutablePointer<Builtin.Word>,
62+
_ continuation: UnsafeContinuation<(), Error>
63+
) {
64+
// Indicate that we've finished by putting a "2" into the flag word.
65+
let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word(
66+
wordPtr._rawValue,
67+
UInt(0)._builtinWordValue,
68+
UInt(2)._builtinWordValue)
69+
70+
if Bool(_builtinBooleanLiteral: won) {
71+
// We recorded the task cancellation before the sleep finished, so
72+
// invoke the continuation with a the cancellation error.
73+
continuation.resume(throwing: _Concurrency.CancellationError())
74+
}
75+
}
76+
77+
/// Suspends the current task for _at least_ the given duration
78+
/// in nanoseconds, unless the task is cancelled. If the task is cancelled,
79+
/// throws \c CancellationError.
80+
///
81+
/// This function does _not_ block the underlying thread.
82+
public static func sleep(nanoseconds duration: UInt64) async throws {
83+
// If the task was already cancelled, go ahead and throw now.
84+
try checkCancellation()
85+
86+
// Allocate storage for the flag word and continuation.
87+
let wordPtr = UnsafeMutablePointer<Builtin.Word>.allocate(capacity: 2)
88+
89+
// Initialize the flag word to 0, which means the continuation has not
90+
// executed.
91+
Builtin.atomicstore_seqcst_Word(
92+
wordPtr._rawValue, UInt(0)._builtinWordValue)
93+
94+
// A pointer to the storage continuation. Also initialize it to zero, to
95+
// indicate that there is no continuation.
96+
let continuationPtr = wordPtr + 1
97+
Builtin.atomicstore_seqcst_Word(
98+
continuationPtr._rawValue, UInt(0)._builtinWordValue)
99+
100+
do {
101+
// Install a cancellation handler to resume the continuation by
102+
// throwing CancellationError.
103+
try await withTaskCancellationHandler {
104+
let _: () = try await withUnsafeThrowingContinuation { continuation in
105+
// Stash the continuation so the cancellation handler can see it.
106+
Builtin.atomicstore_seqcst_Word(
107+
continuationPtr._rawValue,
108+
unsafeBitCast(continuation, to: Builtin.Word.self))
109+
110+
// Create a task that resumes the continuation normally if it
111+
// finishes first. Enqueue it directly with the delay, so it fires
112+
// when we're done sleeping.
113+
let sleepTaskFlags = taskCreateFlags(
114+
priority: nil, isChildTask: false, copyTaskLocals: false,
115+
inheritContext: false, enqueueJob: false,
116+
addPendingGroupTaskUnconditionally: false)
117+
let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) {
118+
onSleepWake(wordPtr, continuation)
119+
}
120+
_enqueueJobGlobalWithDelay(
121+
duration, Builtin.convertTaskToJob(sleepTask))
122+
}
123+
} onCancel: {
124+
let continuationWord = continuationPtr.pointee
125+
if UInt(continuationWord) != 0 {
126+
// Try to cancel, which will resume the continuation by throwing a
127+
// CancellationError if the continuation hasn't already been resumed.
128+
continuationPtr.withMemoryRebound(
129+
to: SleepContinuation.self, capacity: 1) {
130+
onSleepCancel(wordPtr, $0.pointee)
131+
}
132+
}
133+
}
134+
135+
// We got here without being cancelled, so deallocate the storage for
136+
// the flag word and continuation.
137+
wordPtr.deallocate()
138+
} catch {
139+
// The task was cancelled; propagate the error. The "on wake" task is
140+
// responsible for deallocating the flag word.
141+
throw error
142+
}
143+
}
144+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s --dump-input always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: libdispatch
5+
6+
// rdar://76038845
7+
// UNSUPPORTED: use_os_stdlib
8+
// UNSUPPORTED: back_deployment_runtime
9+
10+
import _Concurrency
11+
// FIXME: should not depend on Dispatch
12+
import Dispatch
13+
14+
@available(SwiftStdlib 5.5, *)
15+
@main struct Main {
16+
static let pause = 500_000_000 // 500ms
17+
18+
static func main() async {
19+
// CHECK: Starting!
20+
print("Starting!")
21+
await testSleepFinished()
22+
await testSleepCancelledBeforeStarted()
23+
await testSleepCancelled()
24+
}
25+
26+
static func testSleepFinished() async {
27+
// CHECK-NEXT: Testing sleep that completes
28+
print("Testing sleep that completes")
29+
let start = DispatchTime.now()
30+
31+
// try! will fail if the task got cancelled (which shouldn't happen).
32+
try! await Task.sleep(nanoseconds: UInt64(pause))
33+
34+
let stop = DispatchTime.now()
35+
36+
// assert that at least the specified time passed since calling `sleep`
37+
assert(stop >= (start + .nanoseconds(pause)))
38+
39+
// CHECK-NEXT: Wakey wakey!
40+
print("Wakey wakey!")
41+
}
42+
43+
static func testSleepCancelledBeforeStarted() async {
44+
// CHECK-NEXT: Testing sleep that gets cancelled before it starts
45+
print("Testing sleep that gets cancelled before it starts")
46+
let start = DispatchTime.now()
47+
48+
let sleepyTask = Task {
49+
try await Task.sleep(nanoseconds: UInt64(pause))
50+
}
51+
52+
do {
53+
sleepyTask.cancel()
54+
try await sleepyTask.value
55+
56+
fatalError("sleep(nanoseconds:) should have thrown CancellationError")
57+
} catch is CancellationError {
58+
// CHECK-NEXT: Caught the cancellation error
59+
print("Caught the cancellation error")
60+
61+
let stop = DispatchTime.now()
62+
63+
// assert that we stopped early.
64+
assert(stop < (start + .nanoseconds(pause)))
65+
} catch {
66+
fatalError("sleep(nanoseconds:) threw some other error: \(error)")
67+
}
68+
69+
// CHECK-NEXT: Cancelled!
70+
print("Cancelled!")
71+
}
72+
73+
static func testSleepCancelled() async {
74+
// CHECK-NEXT: Testing sleep that gets cancelled before it completes
75+
print("Testing sleep that gets cancelled before it completes")
76+
let start = DispatchTime.now()
77+
78+
let sleepyTask = Task {
79+
try await Task.sleep(nanoseconds: UInt64(pause))
80+
}
81+
82+
do {
83+
let waiterTask = Task {
84+
try await sleepyTask.value
85+
}
86+
87+
let cancellerTask = Task {
88+
await Task.sleep(UInt64(pause / 2))
89+
sleepyTask.cancel()
90+
}
91+
92+
try await waiterTask.value
93+
94+
fatalError("sleep(nanoseconds:) should have thrown CancellationError")
95+
} catch is CancellationError {
96+
// CHECK-NEXT: Caught the cancellation error
97+
print("Caught the cancellation error")
98+
99+
let stop = DispatchTime.now()
100+
101+
// assert that we stopped early.
102+
assert(stop < (start + .nanoseconds(pause)))
103+
} catch {
104+
fatalError("sleep(nanoseconds:) threw some other error: \(error)")
105+
}
106+
107+
// CHECK-NEXT: Cancelled!
108+
print("Cancelled!")
109+
}
110+
}

0 commit comments

Comments
 (0)