Skip to content

[Concurrency] Add asynchronous Task.sleep function #35983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ void swift_task_enqueue(Job *job, ExecutorRef executor);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_enqueueGlobal(Job *job);

SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_enqueueGlobalWithDelay(unsigned long long delay, Job *job);

/// FIXME: only exists for the quick-and-dirty MainActor implementation.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_enqueueMainExecutor(Job *job);
Expand All @@ -499,6 +502,11 @@ void swift_MainActor_register(HeapObject *actor);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void (*swift_task_enqueueGlobal_hook)(Job *job);

/// A hook to take over global enqueuing with delay.
/// TODO: figure out a better abstraction plan than this.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void (*swift_task_enqueueGlobalWithDelay_hook)(unsigned long long delay, Job *job);

/// Initialize the runtime storage for a default actor.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_defaultActor_initialize(DefaultActor *actor);
Expand Down
95 changes: 91 additions & 4 deletions stdlib/public/Concurrency/GlobalExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,27 @@ using namespace swift;
SWIFT_CC(swift)
void (*swift::swift_task_enqueueGlobal_hook)(Job *job) = nullptr;

SWIFT_CC(swift)
void (*swift::swift_task_enqueueGlobalWithDelay_hook)(unsigned long long delay, Job *job) = nullptr;

#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR

#include <chrono>
#include <thread>

static Job *JobQueue = nullptr;

class DelayedJob {
public:
Job *job;
unsigned long long when;
DelayedJob *next;

DelayedJob(Job *job, unsigned long long when) : job(job), when(when), next(nullptr) {}
};

static DelayedJob *DelayedJobQueue = nullptr;

/// Get the next-in-queue storage slot.
static Job *&nextInQueue(Job *cur) {
return reinterpret_cast<Job*&>(cur->SchedulerPrivate);
Expand All @@ -89,13 +107,58 @@ static void insertIntoJobQueue(Job *newJob) {
*position = newJob;
}

static unsigned long long currentNanos() {
auto now = std::chrono::steady_clock::now();
auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch());
return value.count();
}

/// Insert a job into the cooperative global queue.
static void insertIntoDelayedJobQueue(unsigned long long delay, Job *job) {
DelayedJob **position = &DelayedJobQueue;
DelayedJob *newJob = new DelayedJob(job, currentNanos() + delay);

while (auto cur = *position) {
// If we find a job with lower priority, insert here.
if (cur->when > newJob->when) {
newJob->next = cur;
*position = newJob;
return;
}

// Otherwise, keep advancing through the queue.
position = &cur->next;
}
*position = newJob;
}

/// Claim the next job from the cooperative global queue.
static Job *claimNextFromJobQueue() {
if (auto job = JobQueue) {
JobQueue = nextInQueue(job);
return job;
// Check delayed jobs first
while (true) {
if (auto delayedJob = DelayedJobQueue) {
if (delayedJob->when < currentNanos()) {
DelayedJobQueue = delayedJob->next;
auto job = delayedJob->job;

delete delayedJob;

return job;
}
}
if (auto job = JobQueue) {
JobQueue = nextInQueue(job);
return job;
}
// there are only delayed jobs left, but they are not ready,
// so we sleep until the first one is
if (auto delayedJob = DelayedJobQueue) {
std::this_thread::sleep_for(std::chrono::nanoseconds(delayedJob->when - currentNanos()));
continue;
}
return nullptr;
}
return nullptr;
}

void swift::donateThreadToGlobalExecutorUntil(bool (*condition)(void *),
Expand Down Expand Up @@ -177,6 +240,30 @@ void swift::swift_task_enqueueGlobal(Job *job) {
#endif
}

void swift::swift_task_enqueueGlobalWithDelay(unsigned long long delay, Job *job) {
assert(job && "no job provided");

// If the hook is defined, use it.
if (swift_task_enqueueGlobalWithDelay_hook)
return swift_task_enqueueGlobalWithDelay_hook(delay, job);

#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
insertIntoDelayedJobQueue(delay, job);
#else

dispatch_function_t dispatchFunction = &__swift_run_job;
void *dispatchContext = job;

JobPriority priority = job->getPriority();

// TODO: cache this to avoid the extra call
auto queue = dispatch_get_global_queue((dispatch_qos_class_t) priority,
/*flags*/ 0);
dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delay);
dispatch_after_f(when, queue, dispatchContext, dispatchFunction);
#endif
}


/// Enqueues a task on the main executor.
/// FIXME: only exists for the quick-and-dirty MainActor implementation.
Expand Down
30 changes: 30 additions & 0 deletions stdlib/public/Concurrency/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,32 @@ public func _runAsyncHandler(operation: @escaping () async -> ()) {
)
}

// ==== Async Sleep ------------------------------------------------------------

extension Task {
/// Suspends the current task for _at least_ the given duration
/// in nanoseconds.
///
/// This function does _not_ block the underlying thread.
public static func sleep(_ duration: UInt64) async {
// Set up the job flags for a new task.
var flags = JobFlags()
flags.kind = .task
flags.priority = .default
flags.isFuture = true

// Create the asynchronous task future.
// FIXME: This should be an empty closure instead. Returning `0` here is
// a workaround for rdar://74957357
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, { return 0 })

// Enqueue the resulting job.
_enqueueJobGlobalWithDelay(duration, Builtin.convertTaskToJob(task))

let _ = await Handle<Int, Never>(task).get()
}
Copy link
Contributor

@ktoso ktoso Feb 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I wonder if implementing this without a detached implicitly created task might be better?

It'd need much more boilerplate to introduce a new "awaitable" function like I did for swift_task_group_wait_next for groups... It'd save some allocations (the task allocates, and if we wanted to make this task a child that's another allocation and hitting a lock in the parent too).

What do @DougGregor @jckarter think?

}

// ==== UnsafeCurrentTask ------------------------------------------------------

extension Task {
Expand Down Expand Up @@ -571,6 +597,10 @@ func getJobFlags(_ task: Builtin.NativeObject) -> Task.JobFlags
@usableFromInline
func _enqueueJobGlobal(_ task: Builtin.Job)

@_silgen_name("swift_task_enqueueGlobalWithDelay")
@usableFromInline
func _enqueueJobGlobalWithDelay(_ delay: UInt64, _ task: Builtin.Job)

@available(*, deprecated)
@_silgen_name("swift_task_runAndBlockThread")
public func runAsyncAndBlock(_ asyncFun: @escaping () async -> ())
Expand Down
8 changes: 1 addition & 7 deletions test/Concurrency/Runtime/actor_counters.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@
// REQUIRES: concurrency
// REQUIRES: libdispatch

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

actor Counter {
private var value = 0
private let scratchBuffer: UnsafeMutableBufferPointer<Int>
Expand Down Expand Up @@ -53,7 +47,7 @@ func runTest(numCounters: Int, numWorkers: Int, numIterations: Int) async {
for i in 0..<numWorkers {
workers.append(
Task.runDetached { [counters] in
usleep(UInt32.random(in: 0..<100) * 1000)
await Task.sleep(UInt64.random(in: 0..<100) * 1_000_000)
await worker(identity: i, counters: counters, numIterations: numIterations)
}
)
Expand Down
10 changes: 2 additions & 8 deletions test/Concurrency/Runtime/async_let_fibonacci.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@
// REQUIRES: concurrency
// REQUIRES: libdispatch

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

func fib(_ n: Int) -> Int {
var first = 0
var second = 1
Expand All @@ -30,12 +24,12 @@ func asyncFib(_ n: Int) async -> Int {
async let second = await asyncFib(n-1)

// Sleep a random amount of time waiting on the result producing a result.
usleep(UInt32.random(in: 0..<100) * 1000)
await Task.sleep(UInt64.random(in: 0..<100) * 1_000_000)

let result = await first + second

// Sleep a random amount of time before producing a result.
usleep(UInt32.random(in: 0..<100) * 1000)
await Task.sleep(UInt64.random(in: 0..<100) * 1_000_000)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@

import Dispatch

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

func test_runDetached_cancel_child_early() async {
print(#function) // CHECK: test_runDetached_cancel_child_early
let h: Task.Handle<Bool, Error> = Task.runDetached {
async let childCancelled: Bool = { () -> Bool in
sleep(2)
await Task.sleep(2_000_000_000)
return Task.isCancelled
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@

import Dispatch

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

func test_runDetached_cancel_while_child_running() async {
let h: Task.Handle<Bool, Error> = Task.runDetached {
async let childCancelled: Bool = { () -> Bool in
sleep(3)
await Task.sleep(3_000_000_000)
return Task.isCancelled
}()

Expand All @@ -27,7 +21,7 @@ func test_runDetached_cancel_while_child_running() async {
}

// sleep here, i.e. give the task a moment to start running
sleep(2)
await Task.sleep(2_000_000_000)

h.cancel()
print("handle cancel")
Expand Down
6 changes: 0 additions & 6 deletions test/Concurrency/Runtime/async_task_equals_hashCode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@
// REQUIRES: concurrency
// UNSUPPORTED: OS=windows-msvc

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

func simple() async {
print("\(#function) -----------------------")
let one = await Task.current()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@

import Dispatch

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

func test_detach() async {
let a1 = Task.currentPriority
print("a1: \(a1)") // CHECK: a1: default
Expand All @@ -32,7 +26,7 @@ func test_multiple_lo_indirectly_escalated() async {
@concurrent
func loopUntil(priority: Task.Priority) async {
while (Task.currentPriority != priority) {
sleep(1)
await Task.sleep(1_000_000_000)
}
}

Expand Down
43 changes: 43 additions & 0 deletions test/Concurrency/Runtime/async_task_sleep.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s --dump-input always
// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: libdispatch

import _Concurrency
// FIXME: should not depend on Dispatch
import Dispatch

@main struct Main {
static let pause = 500_000_000 // 500ms

static func main() async {
await testSleepDuration()
await testSleepDoesNotBlock()
}

static func testSleepDuration() async {
let start = DispatchTime.now()

await Task.sleep(UInt64(pause))

let stop = DispatchTime.now()

// assert that at least the specified time passed since calling `sleep`
assert(stop >= (start + .nanoseconds(pause)))
}

static func testSleepDoesNotBlock() async {
// FIXME: Should run on main executor
let task = Task.runDetached {
print("Run first")
}

await Task.sleep(UInt64(pause))

print("Run second")

// CHECK: Run first
// CHECK: Run second
await task.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@

import Dispatch

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

func asyncEcho(_ value: Int) async -> Int {
value
}
Expand All @@ -23,7 +17,7 @@ func test_taskGroup_cancelAll_onlySpecificGroup() async {

for i in 1...5 {
await group.add {
sleep(1)
await Task.sleep(1_000_000_000)
let c = Task.isCancelled
print("add: \(i) (cancelled: \(c))")
return i
Expand All @@ -48,7 +42,7 @@ func test_taskGroup_cancelAll_onlySpecificGroup() async {
let g2: Int = try! await Task.withGroup(resultType: Int.self) { group in
for i in 1...3 {
await group.add {
sleep(1)
await Task.sleep(1_000_000_000)
let c = Task.isCancelled
print("g1 task \(i) (cancelled: \(c))")
return i
Expand Down
Loading