Skip to content

Commit 8945280

Browse files
committed
[Concurrency] Implement Task.Handle.get() in terms of an async runtime call.
Switch the contract between the runtime operation `swift_future_task_wait` and Task.Handle.get() pver to an asynchronous call, so that the compiler will set up the resumption frame for us. This allows us to correctly wait on futures. Update our "basic" future test to perform both normal returns and throwing returns from a future, either having to wait on the queue or coming by afterward.
1 parent 632c9ff commit 8945280

File tree

4 files changed

+180
-100
lines changed

4 files changed

+180
-100
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,31 +110,31 @@ swift_task_escalate(AsyncTask *task, JobPriority newPriority);
110110

111111
/// The result of waiting for a task future.
112112
struct TaskFutureWaitResult {
113-
enum Kind : uintptr_t {
114-
/// The waiting task has been added to the future's wait queue, and will
115-
/// be scheduled once the future has completed.
116-
Waiting,
117-
118-
/// The future succeeded and produced a result value. \c storage points
119-
/// at that value.
120-
Success,
121-
122-
/// The future finished by throwing an error. \c storage is that error
123-
/// existential.
124-
Error,
125-
};
126-
127-
Kind kind;
113+
/// Whether the storage represents the error result vs. the successful
114+
/// result.
115+
bool hadErrorResult;
116+
117+
/// Storage for the result of the future.
118+
///
119+
/// When the future completed normally, this is a pointer to the storage
120+
/// of the result value, which lives inside the future task itself.
121+
///
122+
/// When the future completed by throwing an error, this is the error
123+
/// object itself.
128124
OpaqueValue *storage;
129125
};
130126

131127
/// Wait for a future task to complete.
132128
///
133-
/// This can be called from any thread.
129+
/// This can be called from any thread. Its Swift signature is
134130
///
131+
/// \code
132+
/// func swift_task_future_wait(on task: Builtin.NativeObject) async
133+
/// -> TaskFutureWaitResult
134+
/// \endcode
135135
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
136-
TaskFutureWaitResult
137-
swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask);
136+
AsyncFunctionType<TaskFutureWaitResult(AsyncTask *task)>
137+
swift_task_future_wait;
138138

139139
/// Add a status record to a task. The record should not be
140140
/// modified while it is registered with a task.

stdlib/public/Concurrency/Task.cpp

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,56 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask) {
7373
}
7474
}
7575

76+
77+
namespace {
78+
79+
/// An asynchronous context within a task that describes a general "Future".
80+
/// task.
81+
///
82+
/// This type matches the ABI of a function `<T> () async throws -> T`, which
83+
/// is the type used by `Task.runDetached` and `Task.group.add` to create
84+
/// futures.
85+
class TaskFutureWaitAsyncContext : public AsyncContext {
86+
public:
87+
// Error result is always present.
88+
SwiftError *errorResult = nullptr;
89+
90+
// No indirect results.
91+
92+
TaskFutureWaitResult result;
93+
94+
// FIXME: Currently, this is always here, but it isn't technically
95+
// necessary.
96+
void* Self;
97+
98+
// Arguments.
99+
AsyncTask *task;
100+
101+
using AsyncContext::AsyncContext;
102+
};
103+
104+
}
105+
106+
/// Run the given task, privoding it with the result of the future.
107+
static void runTaskWithFutureResult(
108+
AsyncTask *waitingTask, ExecutorRef executor,
109+
FutureFragment *futureFragment, bool hadErrorResult) {
110+
auto waitingTaskContext =
111+
static_cast<TaskFutureWaitAsyncContext *>(waitingTask->ResumeContext);
112+
113+
waitingTaskContext->result.hadErrorResult = hadErrorResult;
114+
if (hadErrorResult) {
115+
waitingTaskContext->result.storage =
116+
reinterpret_cast<OpaqueValue *>(futureFragment->getError());
117+
} else {
118+
waitingTaskContext->result.storage = futureFragment->getStoragePtr();
119+
}
120+
121+
// TODO: schedule this task on the executor rather than running it
122+
// directly.
123+
waitingTask->run(executor);
124+
}
125+
76126
void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
77127
using Status = FutureFragment::Status;
78128
using WaitQueueItem = FutureFragment::WaitQueueItem;
@@ -103,9 +153,8 @@ void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
103153
// Find the next waiting task.
104154
auto nextWaitingTask = waitingTask->getNextWaitingTask();
105155

106-
// TODO: schedule this task on the executor rather than running it
107-
// directly.
108-
waitingTask->run(executor);
156+
// Run the task.
157+
runTaskWithFutureResult(waitingTask, executor, fragment, hadErrorResult);
109158

110159
// Move to the next task.
111160
waitingTask = nextWaitingTask;
@@ -264,21 +313,37 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
264313
return {task, initialContext};
265314
}
266315

267-
TaskFutureWaitResult
268-
swift::swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask) {
316+
void swift::swift_task_future_wait(
317+
AsyncTask *waitingTask, ExecutorRef executor,
318+
AsyncContext *rawContext) {
319+
// Suspend the waiting task.
320+
waitingTask->ResumeTask = rawContext->ResumeParent;
321+
waitingTask->ResumeContext = rawContext;
322+
323+
// Wait on the future.
324+
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
325+
auto task = context->task;
269326
assert(task->isFuture());
270327
switch (task->waitFuture(waitingTask)) {
271328
case FutureFragment::Status::Executing:
272-
return TaskFutureWaitResult{TaskFutureWaitResult::Waiting, nullptr};
329+
// The waiting task has been queued on the future.
330+
return;
273331

274332
case FutureFragment::Status::Success:
275-
return TaskFutureWaitResult{
276-
TaskFutureWaitResult::Success, task->futureFragment()->getStoragePtr()};
277-
278-
case FutureFragment::Status::Error:
279-
return TaskFutureWaitResult{
280-
TaskFutureWaitResult::Error,
281-
reinterpret_cast<OpaqueValue *>(task->futureFragment()->getError())};
333+
// Run the task with a successful result.
334+
// FIXME: Want to guarantee a tail call here
335+
runTaskWithFutureResult(
336+
waitingTask, executor, task->futureFragment(),
337+
/*hadErrorResult=*/false);
338+
return;
339+
340+
case FutureFragment::Status::Error:
341+
// Run the task with an error result.
342+
// FIXME: Want to guarantee a tail call here
343+
runTaskWithFutureResult(
344+
waitingTask, executor, task->futureFragment(),
345+
/*hadErrorResult=*/true);
346+
return;
282347
}
283348
}
284349

stdlib/public/Concurrency/Task.swift

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,16 @@ extension Task {
121121
/// and throwing a specific error or using `checkCancellation` the error
122122
/// thrown out of the task will be re-thrown here.
123123
public func get() async throws -> Success {
124-
let rawResult = taskFutureWait(
125-
on: task, waiting: Builtin.getCurrentAsyncTask())
126-
switch TaskFutureWaitResult<Success>(raw: rawResult) {
127-
case .executing:
128-
fatalError("don't know how to synchronously return")
129-
130-
case .success(let result):
131-
return result
132-
133-
case .failure(let error):
134-
throw error
124+
let rawResult = await taskFutureWait(on: task)
125+
if rawResult.hadErrorResult {
126+
// Throw the result on error.
127+
throw unsafeBitCast(rawResult.storage, to: Error.self)
135128
}
129+
130+
// Take the value on success
131+
let storagePtr =
132+
rawResult.storage.bindMemory(to: Success.self, capacity: 1)
133+
return UnsafeMutablePointer<Success>(mutating: storagePtr).pointee
136134
}
137135

138136
/// Attempt to cancel the task.
@@ -269,8 +267,7 @@ extension Task {
269267
flags.isFuture = true
270268

271269
// Create the asynchronous task future.
272-
let (task, context) =
273-
Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
270+
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
274271

275272
return Handle<T>(task: task)
276273
}
@@ -316,8 +313,7 @@ extension Task {
316313
flags.isFuture = true
317314

318315
// Create the asynchronous task future.
319-
let (task, context) =
320-
Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
316+
let (task, _) = Builtin.createAsyncTaskFuture(flags.bits, nil, operation)
321317

322318
return Handle<T>(task: task)
323319
}
@@ -423,47 +419,12 @@ public func runAsync(_ asyncFun: @escaping () async -> ()) {
423419
runTask(childTask.0)
424420
}
425421

426-
/// Describes the result of waiting for a future.
427-
enum TaskFutureWaitResult<T> {
428-
/// The future is still executing, and our waiting task has been placed
429-
/// on its queue for when the future completes.
430-
case executing
431-
432-
/// The future has succeeded with the given value.
433-
case success(T)
434-
435-
/// The future has thrown the given error.
436-
case failure(Error)
437-
438-
/// Initialize this instance from a raw result, taking any instance within
439-
/// that result.
440-
init(raw: RawTaskFutureWaitResult) {
441-
switch raw.kind {
442-
case 0:
443-
self = .executing
444-
445-
case 1:
446-
// Take the value on success
447-
let storagePtr = raw.storage.bindMemory(to: T.self, capacity: 1)
448-
self = .success(UnsafeMutablePointer<T>(mutating: storagePtr).move())
449-
450-
case 2:
451-
// Take the error on error.
452-
self = .failure(unsafeBitCast(raw.storage, to: Error.self))
453-
454-
default:
455-
assert(false)
456-
self = .executing
457-
}
458-
}
459-
}
460-
461422
struct RawTaskFutureWaitResult {
462-
let kind: Int
423+
let hadErrorResult: Bool
463424
let storage: UnsafeRawPointer
464425
}
465426

466427
@_silgen_name("swift_task_future_wait")
467428
func taskFutureWait(
468-
on task: Builtin.NativeObject, waiting waitingTask: Builtin.NativeObject
469-
) -> RawTaskFutureWaitResult
429+
on task: Builtin.NativeObject
430+
) async -> RawTaskFutureWaitResult
Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency) | %FileCheck %s
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency)
2+
23
// REQUIRES: executable_test
34
// REQUIRES: concurrency
45
// REQUIRES: OS=macosx
56

67
import Dispatch
78

89
extension DispatchQueue {
9-
func async<R>(execute: @escaping () async -> R) -> Task.Handle<R> {
10+
func async<R>(execute: @escaping () async throws -> R) -> Task.Handle<R> {
1011
let handle = Task.runDetached(operation: execute)
1112

1213
// Run the task
@@ -16,30 +17,83 @@ extension DispatchQueue {
1617
}
1718
}
1819

20+
enum HomeworkError: Error, Equatable {
21+
case dogAteIt(String)
22+
}
23+
1924
func formGreeting(name: String) async -> String {
2025
return "Hello \(name) from async world"
2126
}
2227

23-
func test(name: String) {
24-
let taskHandle = DispatchQueue.main.async { () async -> String in
28+
func testSimple(
29+
name: String, dogName: String, shouldThrow: Bool, doSuspend: Bool
30+
) {
31+
print("Testing name: \(name), dog: \(dogName), shouldThrow: \(shouldThrow) doSuspend: \(doSuspend)")
32+
33+
let queue = DispatchQueue(label: "concurrent", attributes: .concurrent)
34+
let group = DispatchGroup()
35+
var completed = false
36+
37+
group.enter()
38+
let taskHandle = queue.async { () async throws -> String in
39+
defer {
40+
group.leave()
41+
}
42+
2543
let greeting = await formGreeting(name: name)
44+
45+
// If the intent is to test suspending, wait a bit so the second task
46+
// can complete.
47+
if doSuspend {
48+
print("- Future sleeping")
49+
sleep(1)
50+
}
51+
52+
if (shouldThrow) {
53+
print("- Future throwing")
54+
throw HomeworkError.dogAteIt(dogName + " the dog")
55+
}
56+
57+
print("- Future returning normally")
2658
return greeting + "!"
2759
}
2860

29-
_ = DispatchQueue.main.async { () async in
30-
// CHECK: Sleeping
31-
print("Sleeping...")
32-
sleep(2)
33-
let result = await try! taskHandle.get()
34-
// CHECK: Hello Ted from async world
35-
print(result)
36-
assert(result == "Hello Ted from async world!")
37-
exit(0)
61+
group.enter()
62+
_ = queue.async { () async in
63+
defer {
64+
group.leave()
65+
}
66+
67+
// If the intent is not to test suspending, wait a bit so the first task
68+
// can complete.
69+
if !doSuspend {
70+
print("+ Reader sleeping")
71+
sleep(1)
72+
}
73+
74+
do {
75+
print("+ Reader waiting for the result")
76+
let result = await try taskHandle.get()
77+
completed = true
78+
print("+ Normal return: \(result)")
79+
assert(result == "Hello \(name) from async world!")
80+
} catch HomeworkError.dogAteIt(let badDog) {
81+
completed = true
82+
print("+ Error return: HomeworkError.dogAteIt(\(badDog))")
83+
assert(badDog == dogName + " the dog")
84+
} catch {
85+
fatalError("Caught a different exception?")
86+
}
3887
}
3988

40-
print("Main task")
89+
group.wait()
90+
assert(completed)
91+
print("Finished test")
4192
}
4293

43-
test(name: "Ted")
94+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: false, doSuspend: false)
95+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: true, doSuspend: false)
96+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: false, doSuspend: true)
97+
testSimple(name: "Ted", dogName: "Hazel", shouldThrow: true, doSuspend: true)
4498

45-
dispatchMain()
99+
print("Done")

0 commit comments

Comments
 (0)