Skip to content

Commit 531b080

Browse files
authored
Merge pull request #35983 from drexin/wip-async-sleep
[Concurrency] Add asynchronous Task.sleep function
2 parents 931c47d + 54c7c76 commit 531b080

24 files changed

+194
-146
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,9 @@ void swift_task_enqueue(Job *job, ExecutorRef executor);
486486
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
487487
void swift_task_enqueueGlobal(Job *job);
488488

489+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
490+
void swift_task_enqueueGlobalWithDelay(unsigned long long delay, Job *job);
491+
489492
/// FIXME: only exists for the quick-and-dirty MainActor implementation.
490493
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
491494
void swift_task_enqueueMainExecutor(Job *job);
@@ -499,6 +502,11 @@ void swift_MainActor_register(HeapObject *actor);
499502
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
500503
void (*swift_task_enqueueGlobal_hook)(Job *job);
501504

505+
/// A hook to take over global enqueuing with delay.
506+
/// TODO: figure out a better abstraction plan than this.
507+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
508+
void (*swift_task_enqueueGlobalWithDelay_hook)(unsigned long long delay, Job *job);
509+
502510
/// Initialize the runtime storage for a default actor.
503511
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
504512
void swift_defaultActor_initialize(DefaultActor *actor);

stdlib/public/Concurrency/GlobalExecutor.cpp

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,27 @@ using namespace swift;
6363
SWIFT_CC(swift)
6464
void (*swift::swift_task_enqueueGlobal_hook)(Job *job) = nullptr;
6565

66+
SWIFT_CC(swift)
67+
void (*swift::swift_task_enqueueGlobalWithDelay_hook)(unsigned long long delay, Job *job) = nullptr;
68+
6669
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
70+
71+
#include <chrono>
72+
#include <thread>
73+
6774
static Job *JobQueue = nullptr;
6875

76+
class DelayedJob {
77+
public:
78+
Job *job;
79+
unsigned long long when;
80+
DelayedJob *next;
81+
82+
DelayedJob(Job *job, unsigned long long when) : job(job), when(when), next(nullptr) {}
83+
};
84+
85+
static DelayedJob *DelayedJobQueue = nullptr;
86+
6987
/// Get the next-in-queue storage slot.
7088
static Job *&nextInQueue(Job *cur) {
7189
return reinterpret_cast<Job*&>(cur->SchedulerPrivate);
@@ -89,13 +107,58 @@ static void insertIntoJobQueue(Job *newJob) {
89107
*position = newJob;
90108
}
91109

110+
static unsigned long long currentNanos() {
111+
auto now = std::chrono::steady_clock::now();
112+
auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
113+
auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch());
114+
return value.count();
115+
}
116+
117+
/// Insert a job into the cooperative global queue.
118+
static void insertIntoDelayedJobQueue(unsigned long long delay, Job *job) {
119+
DelayedJob **position = &DelayedJobQueue;
120+
DelayedJob *newJob = new DelayedJob(job, currentNanos() + delay);
121+
122+
while (auto cur = *position) {
123+
// If we find a job with lower priority, insert here.
124+
if (cur->when > newJob->when) {
125+
newJob->next = cur;
126+
*position = newJob;
127+
return;
128+
}
129+
130+
// Otherwise, keep advancing through the queue.
131+
position = &cur->next;
132+
}
133+
*position = newJob;
134+
}
135+
92136
/// Claim the next job from the cooperative global queue.
93137
static Job *claimNextFromJobQueue() {
94-
if (auto job = JobQueue) {
95-
JobQueue = nextInQueue(job);
96-
return job;
138+
// Check delayed jobs first
139+
while (true) {
140+
if (auto delayedJob = DelayedJobQueue) {
141+
if (delayedJob->when < currentNanos()) {
142+
DelayedJobQueue = delayedJob->next;
143+
auto job = delayedJob->job;
144+
145+
delete delayedJob;
146+
147+
return job;
148+
}
149+
}
150+
if (auto job = JobQueue) {
151+
JobQueue = nextInQueue(job);
152+
return job;
153+
}
154+
// there are only delayed jobs left, but they are not ready,
155+
// so we sleep until the first one is
156+
if (auto delayedJob = DelayedJobQueue) {
157+
std::this_thread::sleep_for(std::chrono::nanoseconds(delayedJob->when - currentNanos()));
158+
continue;
159+
}
160+
return nullptr;
97161
}
98-
return nullptr;
99162
}
100163

101164
void swift::donateThreadToGlobalExecutorUntil(bool (*condition)(void *),
@@ -177,6 +240,30 @@ void swift::swift_task_enqueueGlobal(Job *job) {
177240
#endif
178241
}
179242

243+
void swift::swift_task_enqueueGlobalWithDelay(unsigned long long delay, Job *job) {
244+
assert(job && "no job provided");
245+
246+
// If the hook is defined, use it.
247+
if (swift_task_enqueueGlobalWithDelay_hook)
248+
return swift_task_enqueueGlobalWithDelay_hook(delay, job);
249+
250+
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
251+
insertIntoDelayedJobQueue(delay, job);
252+
#else
253+
254+
dispatch_function_t dispatchFunction = &__swift_run_job;
255+
void *dispatchContext = job;
256+
257+
JobPriority priority = job->getPriority();
258+
259+
// TODO: cache this to avoid the extra call
260+
auto queue = dispatch_get_global_queue((dispatch_qos_class_t) priority,
261+
/*flags*/ 0);
262+
dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delay);
263+
dispatch_after_f(when, queue, dispatchContext, dispatchFunction);
264+
#endif
265+
}
266+
180267

181268
/// Enqueues a task on the main executor.
182269
/// FIXME: only exists for the quick-and-dirty MainActor implementation.

stdlib/public/Concurrency/Task.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,32 @@ public func _runAsyncHandler(operation: @escaping () async -> ()) {
472472
)
473473
}
474474

475+
// ==== Async Sleep ------------------------------------------------------------
476+
477+
extension Task {
478+
/// Suspends the current task for _at least_ the given duration
479+
/// in nanoseconds.
480+
///
481+
/// This function does _not_ block the underlying thread.
482+
public static func sleep(_ duration: UInt64) async {
483+
// Set up the job flags for a new task.
484+
var flags = JobFlags()
485+
flags.kind = .task
486+
flags.priority = .default
487+
flags.isFuture = true
488+
489+
// Create the asynchronous task future.
490+
// FIXME: This should be an empty closure instead. Returning `0` here is
491+
// a workaround for rdar://74957357
492+
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, { return 0 })
493+
494+
// Enqueue the resulting job.
495+
_enqueueJobGlobalWithDelay(duration, Builtin.convertTaskToJob(task))
496+
497+
let _ = await Handle<Int, Never>(task).get()
498+
}
499+
}
500+
475501
// ==== UnsafeCurrentTask ------------------------------------------------------
476502

477503
extension Task {
@@ -571,6 +597,10 @@ func getJobFlags(_ task: Builtin.NativeObject) -> Task.JobFlags
571597
@usableFromInline
572598
func _enqueueJobGlobal(_ task: Builtin.Job)
573599

600+
@_silgen_name("swift_task_enqueueGlobalWithDelay")
601+
@usableFromInline
602+
func _enqueueJobGlobalWithDelay(_ delay: UInt64, _ task: Builtin.Job)
603+
574604
@available(*, deprecated)
575605
@_silgen_name("swift_task_runAndBlockThread")
576606
public func runAsyncAndBlock(_ asyncFun: @escaping () async -> ())

test/Concurrency/Runtime/actor_counters.swift

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@
44
// REQUIRES: concurrency
55
// REQUIRES: libdispatch
66

7-
#if canImport(Darwin)
8-
import Darwin
9-
#elseif canImport(Glibc)
10-
import Glibc
11-
#endif
12-
137
actor Counter {
148
private var value = 0
159
private let scratchBuffer: UnsafeMutableBufferPointer<Int>
@@ -53,7 +47,7 @@ func runTest(numCounters: Int, numWorkers: Int, numIterations: Int) async {
5347
for i in 0..<numWorkers {
5448
workers.append(
5549
Task.runDetached { [counters] in
56-
usleep(UInt32.random(in: 0..<100) * 1000)
50+
await Task.sleep(UInt64.random(in: 0..<100) * 1_000_000)
5751
await worker(identity: i, counters: counters, numIterations: numIterations)
5852
}
5953
)

test/Concurrency/Runtime/async_let_fibonacci.swift

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@
44
// REQUIRES: concurrency
55
// REQUIRES: libdispatch
66

7-
#if canImport(Darwin)
8-
import Darwin
9-
#elseif canImport(Glibc)
10-
import Glibc
11-
#endif
12-
137
func fib(_ n: Int) -> Int {
148
var first = 0
159
var second = 1
@@ -30,12 +24,12 @@ func asyncFib(_ n: Int) async -> Int {
3024
async let second = await asyncFib(n-1)
3125

3226
// Sleep a random amount of time waiting on the result producing a result.
33-
usleep(UInt32.random(in: 0..<100) * 1000)
27+
await Task.sleep(UInt64.random(in: 0..<100) * 1_000_000)
3428

3529
let result = await first + second
3630

3731
// Sleep a random amount of time before producing a result.
38-
usleep(UInt32.random(in: 0..<100) * 1000)
32+
await Task.sleep(UInt64.random(in: 0..<100) * 1_000_000)
3933

4034
return result
4135
}

test/Concurrency/Runtime/async_task_cancellation_early.swift

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,11 @@
66

77
import Dispatch
88

9-
#if canImport(Darwin)
10-
import Darwin
11-
#elseif canImport(Glibc)
12-
import Glibc
13-
#endif
14-
159
func test_runDetached_cancel_child_early() async {
1610
print(#function) // CHECK: test_runDetached_cancel_child_early
1711
let h: Task.Handle<Bool, Error> = Task.runDetached {
1812
async let childCancelled: Bool = { () -> Bool in
19-
sleep(2)
13+
await Task.sleep(2_000_000_000)
2014
return Task.isCancelled
2115
}()
2216

test/Concurrency/Runtime/async_task_cancellation_while_running.swift

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,10 @@
66

77
import Dispatch
88

9-
#if canImport(Darwin)
10-
import Darwin
11-
#elseif canImport(Glibc)
12-
import Glibc
13-
#endif
14-
159
func test_runDetached_cancel_while_child_running() async {
1610
let h: Task.Handle<Bool, Error> = Task.runDetached {
1711
async let childCancelled: Bool = { () -> Bool in
18-
sleep(3)
12+
await Task.sleep(3_000_000_000)
1913
return Task.isCancelled
2014
}()
2115

@@ -27,7 +21,7 @@ func test_runDetached_cancel_while_child_running() async {
2721
}
2822

2923
// sleep here, i.e. give the task a moment to start running
30-
sleep(2)
24+
await Task.sleep(2_000_000_000)
3125

3226
h.cancel()
3327
print("handle cancel")

test/Concurrency/Runtime/async_task_equals_hashCode.swift

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@
44
// REQUIRES: concurrency
55
// UNSUPPORTED: OS=windows-msvc
66

7-
#if canImport(Darwin)
8-
import Darwin
9-
#elseif canImport(Glibc)
10-
import Glibc
11-
#endif
12-
137
func simple() async {
148
print("\(#function) -----------------------")
159
let one = await Task.current()

test/Concurrency/Runtime/async_task_priority_current.swift

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,6 @@
66

77
import Dispatch
88

9-
#if canImport(Darwin)
10-
import Darwin
11-
#elseif canImport(Glibc)
12-
import Glibc
13-
#endif
14-
159
func test_detach() async {
1610
let a1 = Task.currentPriority
1711
print("a1: \(a1)") // CHECK: a1: default
@@ -32,7 +26,7 @@ func test_multiple_lo_indirectly_escalated() async {
3226
@concurrent
3327
func loopUntil(priority: Task.Priority) async {
3428
while (Task.currentPriority != priority) {
35-
sleep(1)
29+
await Task.sleep(1_000_000_000)
3630
}
3731
}
3832

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s --dump-input always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: libdispatch
5+
6+
import _Concurrency
7+
// FIXME: should not depend on Dispatch
8+
import Dispatch
9+
10+
@main struct Main {
11+
static let pause = 500_000_000 // 500ms
12+
13+
static func main() async {
14+
await testSleepDuration()
15+
await testSleepDoesNotBlock()
16+
}
17+
18+
static func testSleepDuration() async {
19+
let start = DispatchTime.now()
20+
21+
await Task.sleep(UInt64(pause))
22+
23+
let stop = DispatchTime.now()
24+
25+
// assert that at least the specified time passed since calling `sleep`
26+
assert(stop >= (start + .nanoseconds(pause)))
27+
}
28+
29+
static func testSleepDoesNotBlock() async {
30+
// FIXME: Should run on main executor
31+
let task = Task.runDetached {
32+
print("Run first")
33+
}
34+
35+
await Task.sleep(UInt64(pause))
36+
37+
print("Run second")
38+
39+
// CHECK: Run first
40+
// CHECK: Run second
41+
await task.get()
42+
}
43+
}

test/Concurrency/Runtime/async_taskgroup_cancelAll_only_specific_group.swift

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,6 @@
66

77
import Dispatch
88

9-
#if canImport(Darwin)
10-
import Darwin
11-
#elseif canImport(Glibc)
12-
import Glibc
13-
#endif
14-
159
func asyncEcho(_ value: Int) async -> Int {
1610
value
1711
}
@@ -23,7 +17,7 @@ func test_taskGroup_cancelAll_onlySpecificGroup() async {
2317

2418
for i in 1...5 {
2519
await group.add {
26-
sleep(1)
20+
await Task.sleep(1_000_000_000)
2721
let c = Task.isCancelled
2822
print("add: \(i) (cancelled: \(c))")
2923
return i
@@ -48,7 +42,7 @@ func test_taskGroup_cancelAll_onlySpecificGroup() async {
4842
let g2: Int = try! await Task.withGroup(resultType: Int.self) { group in
4943
for i in 1...3 {
5044
await group.add {
51-
sleep(1)
45+
await Task.sleep(1_000_000_000)
5246
let c = Task.isCancelled
5347
print("g1 task \(i) (cancelled: \(c))")
5448
return i

0 commit comments

Comments
 (0)