Skip to content

Commit 328bd47

Browse files
authored
Merge pull request #34818 from DougGregor/future-async-wait
[Concurrency] Implement Task.Handle.get() in terms of an async runtime call
2 parents abea46e + a015e4c commit 328bd47

File tree

6 files changed

+180
-267
lines changed

6 files changed

+180
-267
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,31 +110,31 @@ swift_task_escalate(AsyncTask *task, JobPriority newPriority);
110110

111111
/// The result of waiting for a task future.
112112
struct TaskFutureWaitResult {
113-
enum Kind : uintptr_t {
114-
/// The waiting task has been added to the future's wait queue, and will
115-
/// be scheduled once the future has completed.
116-
Waiting,
117-
118-
/// The future succeeded and produced a result value. \c storage points
119-
/// at that value.
120-
Success,
121-
122-
/// The future finished by throwing an error. \c storage is that error
123-
/// existential.
124-
Error,
125-
};
126-
127-
Kind kind;
113+
/// Whether the storage represents the error result vs. the successful
114+
/// result.
115+
bool hadErrorResult;
116+
117+
/// Storage for the result of the future.
118+
///
119+
/// When the future completed normally, this is a pointer to the storage
120+
/// of the result value, which lives inside the future task itself.
121+
///
122+
/// When the future completed by throwing an error, this is the error
123+
/// object itself.
128124
OpaqueValue *storage;
129125
};
130126

131127
/// Wait for a future task to complete.
132128
///
133-
/// This can be called from any thread.
129+
/// This can be called from any thread. Its Swift signature is
134130
///
131+
/// \code
132+
/// func swift_task_future_wait(on task: Builtin.NativeObject) async
133+
/// -> TaskFutureWaitResult
134+
/// \endcode
135135
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
136-
TaskFutureWaitResult
137-
swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask);
136+
AsyncFunctionType<TaskFutureWaitResult(AsyncTask *task)>
137+
swift_task_future_wait;
138138

139139
/// Add a status record to a task. The record should not be
140140
/// modified while it is registered with a task.

stdlib/public/Concurrency/Task.cpp

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,56 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask) {
7373
}
7474
}
7575

76+
77+
namespace {
78+
79+
/// An asynchronous context within a task that describes a general "Future".
80+
/// task.
81+
///
82+
/// This type matches the ABI of a function `<T> () async throws -> T`, which
83+
/// is the type used by `Task.runDetached` and `Task.group.add` to create
84+
/// futures.
85+
class TaskFutureWaitAsyncContext : public AsyncContext {
86+
public:
87+
// Error result is always present.
88+
SwiftError *errorResult = nullptr;
89+
90+
// No indirect results.
91+
92+
TaskFutureWaitResult result;
93+
94+
// FIXME: Currently, this is always here, but it isn't technically
95+
// necessary.
96+
void* Self;
97+
98+
// Arguments.
99+
AsyncTask *task;
100+
101+
using AsyncContext::AsyncContext;
102+
};
103+
104+
}
105+
106+
/// Run the given task, privoding it with the result of the future.
107+
static void runTaskWithFutureResult(
108+
AsyncTask *waitingTask, ExecutorRef executor,
109+
FutureFragment *futureFragment, bool hadErrorResult) {
110+
auto waitingTaskContext =
111+
static_cast<TaskFutureWaitAsyncContext *>(waitingTask->ResumeContext);
112+
113+
waitingTaskContext->result.hadErrorResult = hadErrorResult;
114+
if (hadErrorResult) {
115+
waitingTaskContext->result.storage =
116+
reinterpret_cast<OpaqueValue *>(futureFragment->getError());
117+
} else {
118+
waitingTaskContext->result.storage = futureFragment->getStoragePtr();
119+
}
120+
121+
// TODO: schedule this task on the executor rather than running it
122+
// directly.
123+
waitingTask->run(executor);
124+
}
125+
76126
void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
77127
using Status = FutureFragment::Status;
78128
using WaitQueueItem = FutureFragment::WaitQueueItem;
@@ -103,9 +153,8 @@ void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
103153
// Find the next waiting task.
104154
auto nextWaitingTask = waitingTask->getNextWaitingTask();
105155

106-
// TODO: schedule this task on the executor rather than running it
107-
// directly.
108-
waitingTask->run(executor);
156+
// Run the task.
157+
runTaskWithFutureResult(waitingTask, executor, fragment, hadErrorResult);
109158

110159
// Move to the next task.
111160
waitingTask = nextWaitingTask;
@@ -264,21 +313,37 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
264313
return {task, initialContext};
265314
}
266315

267-
TaskFutureWaitResult
268-
swift::swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask) {
316+
void swift::swift_task_future_wait(
317+
AsyncTask *waitingTask, ExecutorRef executor,
318+
AsyncContext *rawContext) {
319+
// Suspend the waiting task.
320+
waitingTask->ResumeTask = rawContext->ResumeParent;
321+
waitingTask->ResumeContext = rawContext;
322+
323+
// Wait on the future.
324+
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
325+
auto task = context->task;
269326
assert(task->isFuture());
270327
switch (task->waitFuture(waitingTask)) {
271328
case FutureFragment::Status::Executing:
272-
return TaskFutureWaitResult{TaskFutureWaitResult::Waiting, nullptr};
329+
// The waiting task has been queued on the future.
330+
return;
273331

274332
case FutureFragment::Status::Success:
275-
return TaskFutureWaitResult{
276-
TaskFutureWaitResult::Success, task->futureFragment()->getStoragePtr()};
277-
278-
case FutureFragment::Status::Error:
279-
return TaskFutureWaitResult{
280-
TaskFutureWaitResult::Error,
281-
reinterpret_cast<OpaqueValue *>(task->futureFragment()->getError())};
333+
// Run the task with a successful result.
334+
// FIXME: Want to guarantee a tail call here
335+
runTaskWithFutureResult(
336+
waitingTask, executor, task->futureFragment(),
337+
/*hadErrorResult=*/false);
338+
return;
339+
340+
case FutureFragment::Status::Error:
341+
// Run the task with an error result.
342+
// FIXME: Want to guarantee a tail call here
343+
runTaskWithFutureResult(
344+
waitingTask, executor, task->futureFragment(),
345+
/*hadErrorResult=*/true);
346+
return;
282347
}
283348
}
284349

stdlib/public/Concurrency/Task.swift

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,16 @@ extension Task {
121121
/// and throwing a specific error or using `checkCancellation` the error
122122
/// thrown out of the task will be re-thrown here.
123123
public func get() async throws -> Success {
124-
let rawResult = taskFutureWait(
125-
on: task, waiting: Builtin.getCurrentAsyncTask())
126-
switch TaskFutureWaitResult<Success>(raw: rawResult) {
127-
case .executing:
128-
fatalError("don't know how to synchronously return")
129-
130-
case .success(let result):
131-
return result
132-
133-
case .failure(let error):
134-
throw error
124+
let rawResult = await taskFutureWait(on: task)
125+
if rawResult.hadErrorResult {
126+
// Throw the result on error.
127+
throw unsafeBitCast(rawResult.storage, to: Error.self)
135128
}
129+
130+
// Take the value on success
131+
let storagePtr =
132+
rawResult.storage.bindMemory(to: Success.self, capacity: 1)
133+
return UnsafeMutablePointer<Success>(mutating: storagePtr).pointee
136134
}
137135

138136
/// Attempt to cancel the task.
@@ -269,8 +267,7 @@ extension Task {
269267
flags.isFuture = true
270268

271269
// Create the asynchronous task future.
272-
let (task, context) =
273-
Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
270+
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
274271

275272
return Handle<T>(task: task)
276273
}
@@ -316,8 +313,7 @@ extension Task {
316313
flags.isFuture = true
317314

318315
// Create the asynchronous task future.
319-
let (task, context) =
320-
Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
316+
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
321317

322318
return Handle<T>(task: task)
323319
}
@@ -423,47 +419,12 @@ public func runAsync(_ asyncFun: @escaping () async -> ()) {
423419
runTask(childTask.0)
424420
}
425421

426-
/// Describes the result of waiting for a future.
427-
enum TaskFutureWaitResult<T> {
428-
/// The future is still executing, and our waiting task has been placed
429-
/// on its queue for when the future completes.
430-
case executing
431-
432-
/// The future has succeeded with the given value.
433-
case success(T)
434-
435-
/// The future has thrown the given error.
436-
case failure(Error)
437-
438-
/// Initialize this instance from a raw result, taking any instance within
439-
/// that result.
440-
init(raw: RawTaskFutureWaitResult) {
441-
switch raw.kind {
442-
case 0:
443-
self = .executing
444-
445-
case 1:
446-
// Take the value on success
447-
let storagePtr = raw.storage.bindMemory(to: T.self, capacity: 1)
448-
self = .success(UnsafeMutablePointer<T>(mutating: storagePtr).move())
449-
450-
case 2:
451-
// Take the error on error.
452-
self = .failure(unsafeBitCast(raw.storage, to: Error.self))
453-
454-
default:
455-
assert(false)
456-
self = .executing
457-
}
458-
}
459-
}
460-
461422
struct RawTaskFutureWaitResult {
462-
let kind: Int
423+
let hadErrorResult: Bool
463424
let storage: UnsafeRawPointer
464425
}
465426

466427
@_silgen_name("swift_task_future_wait")
467428
func taskFutureWait(
468-
on task: Builtin.NativeObject, waiting waitingTask: Builtin.NativeObject
469-
) -> RawTaskFutureWaitResult
429+
on task: Builtin.NativeObject
430+
) async -> RawTaskFutureWaitResult
Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency) | %FileCheck %s
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency)
2+
23
// REQUIRES: executable_test
34
// REQUIRES: concurrency
45
// REQUIRES: OS=macosx
56

67
import Dispatch
78

89
extension DispatchQueue {
9-
func async<R>(execute: @escaping () async -> R) -> Task.Handle<R> {
10+
func async<R>(execute: @escaping () async throws -> R) -> Task.Handle<R> {
1011
let handle = Task.runDetached(operation: execute)
1112

1213
// Run the task
@@ -16,30 +17,83 @@ extension DispatchQueue {
1617
}
1718
}
1819

20+
enum HomeworkError: Error, Equatable {
21+
case dogAteIt(String)
22+
}
23+
1924
func formGreeting(name: String) async -> String {
2025
return "Hello \(name) from async world"
2126
}
2227

23-
func test(name: String) {
24-
let taskHandle = DispatchQueue.main.async { () async -> String in
28+
func testSimple(
29+
name: String, dogName: String, shouldThrow: Bool, doSuspend: Bool
30+
) {
31+
print("Testing name: \(name), dog: \(dogName), shouldThrow: \(shouldThrow) doSuspend: \(doSuspend)")
32+
33+
let queue = DispatchQueue(label: "concurrent", attributes: .concurrent)
34+
let group = DispatchGroup()
35+
var completed = false
36+
37+
group.enter()
38+
let taskHandle = queue.async { () async throws -> String in
39+
defer {
40+
group.leave()
41+
}
42+
2543
let greeting = await formGreeting(name: name)
44+
45+
// If the intent is to test suspending, wait a bit so the second task
46+
// can complete.
47+
if doSuspend {
48+
print("- Future sleeping")
49+
sleep(1)
50+
}
51+
52+
if (shouldThrow) {
53+
print("- Future throwing")
54+
throw HomeworkError.dogAteIt(dogName + " the dog")
55+
}
56+
57+
print("- Future returning normally")
2658
return greeting + "!"
2759
}
2860

29-
_ = DispatchQueue.main.async { () async in
30-
// CHECK: Sleeping
31-
print("Sleeping...")
32-
sleep(2)
33-
let result = await try! taskHandle.get()
34-
// CHECK: Hello Ted from async world
35-
print(result)
36-
assert(result == "Hello Ted from async world!")
37-
exit(0)
61+
group.enter()
62+
_ = queue.async { () async in
63+
defer {
64+
group.leave()
65+
}
66+
67+
// If the intent is not to test suspending, wait a bit so the first task
68+
// can complete.
69+
if !doSuspend {
70+
print("+ Reader sleeping")
71+
sleep(1)
72+
}
73+
74+
do {
75+
print("+ Reader waiting for the result")
76+
let result = await try taskHandle.get()
77+
completed = true
78+
print("+ Normal return: \(result)")
79+
assert(result == "Hello \(name) from async world!")
80+
} catch HomeworkError.dogAteIt(let badDog) {
81+
completed = true
82+
print("+ Error return: HomeworkError.dogAteIt(\(badDog))")
83+
assert(badDog == dogName + " the dog")
84+
} catch {
85+
fatalError("Caught a different exception?")
86+
}
3887
}
3988

40-
print("Main task")
89+
group.wait()
90+
assert(completed)
91+
print("Finished test")
4192
}
4293

43-
test(name: "Ted")
94+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: false, doSuspend: false)
95+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: true, doSuspend: false)
96+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: false, doSuspend: true)
97+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: true, doSuspend: true)
4498

45-
dispatchMain()
99+
print("Done")

unittests/runtime/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ if(("${SWIFT_HOST_VARIANT_SDK}" STREQUAL "${SWIFT_PRIMARY_VARIANT_SDK}") AND
5555

5656
if(SWIFT_ENABLE_EXPERIMENTAL_CONCURRENCY)
5757
list(APPEND PLATFORM_SOURCES
58-
TaskFuture.cpp
5958
TaskStatus.cpp
6059
)
6160
list(APPEND PLATFORM_TARGET_LINK_LIBRARIES
@@ -67,7 +66,6 @@ if(("${SWIFT_HOST_VARIANT_SDK}" STREQUAL "${SWIFT_PRIMARY_VARIANT_SDK}") AND
6766
set(LLVM_OPTIONAL_SOURCES
6867
weak.mm
6968
Refcounting.mm
70-
TaskFuture.cpp
7169
TaskStatus.cpp)
7270

7371
add_swift_unittest(SwiftRuntimeTests

0 commit comments

Comments
 (0)