Skip to content

Commit cb6ffe8

Browse files
committed
In task-to-thread model, make sure that when we are waiting on an
async-let task, we run it in the context of the parent task.
1 parent f2099c5 commit cb6ffe8

File tree

7 files changed

+133
-6
lines changed

7 files changed

+133
-6
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
// Does the runtime integrate with libdispatch?
4343
#ifndef SWIFT_CONCURRENCY_ENABLE_DISPATCH
44-
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
44+
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR || SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
4545
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
4646
#else
4747
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 1

stdlib/public/Concurrency/Actor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,12 @@ AsyncTask *swift::_swift_task_clearCurrent() {
249249
return task;
250250
}
251251

252+
AsyncTask *swift::_swift_task_setCurrent(AsyncTask *new_task) {
253+
auto task = ActiveTask::get();
254+
ActiveTask::set(new_task);
255+
return task;
256+
}
257+
252258
SWIFT_CC(swift)
253259
static ExecutorRef swift_task_getCurrentExecutorImpl() {
254260
auto currentTracking = ExecutorTrackingInfo::current();

stdlib/public/Concurrency/AsyncLet.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,13 @@ void swift::swift_asyncLet_start(AsyncLet *alet,
167167
void *closureEntryPoint,
168168
HeapObject *closureContext) {
169169
auto flags = TaskCreateFlags();
170+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
171+
// In the task to thread model, we don't want tasks to start running on
172+
// separate threads - they will run in the context of the parent
173+
flags.setEnqueueJob(false);
174+
#else
170175
flags.setEnqueueJob(true);
176+
#endif
171177

172178
AsyncLetTaskOptionRecord asyncLetOptionRecord(alet);
173179
asyncLetOptionRecord.Parent = options;
@@ -191,7 +197,14 @@ void swift::swift_asyncLet_begin(AsyncLet *alet,
191197
resultBuffer);
192198

193199
auto flags = TaskCreateFlags();
200+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
201+
// In the task to thread model, we don't want tasks to start running on
202+
// separate threads - they will run in the context of the parent
203+
flags.setEnqueueJob(false);
204+
#else
194205
flags.setEnqueueJob(true);
206+
#endif
207+
195208

196209
AsyncLetWithBufferTaskOptionRecord asyncLetOptionRecord(alet, resultBuffer);
197210
asyncLetOptionRecord.Parent = options;

stdlib/public/Concurrency/Executor.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func _checkExpectedExecutor(_filenameStart: Builtin.RawPointer,
9393
_filenameStart, _filenameLength, _filenameIsASCII, _line, _executor)
9494
}
9595

96-
#if !SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY
96+
#if !SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
9797
// This must take a DispatchQueueShim, not something like AnyObject,
9898
// or else SILGen will emit a retain/release in unoptimized builds,
9999
// which won't work because DispatchQueues aren't actually

stdlib/public/Concurrency/Task.cpp

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,29 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
187187
escalatedPriority = waitingStatus.getStoredPriority();
188188
}
189189

190+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
191+
// In the task to thread model, we will execute the task that we are waiting
192+
// on, on the current thread itself. As a result, do not bother adding the
193+
// waitingTask to any thread queue. Instead, we will clear the old task, run
194+
// the new one and then reattempt to continue running the old task
195+
196+
auto oldTask = _swift_task_clearCurrent();
197+
assert(oldTask == waitingTask);
198+
199+
SWIFT_TASK_DEBUG_LOG("[RunInline] Switching away from running %p to now running %p", oldTask, this);
200+
// Run the new task on the same thread now - this should run the new task to
201+
// completion. All swift tasks in task-to-thread model run on generic
202+
// executor
203+
swift_job_run(this, ExecutorRef::generic());
204+
205+
SWIFT_TASK_DEBUG_LOG("[RunInline] Switching back from running %p to now running %p", this, oldTask);
206+
207+
// We now are back in the context of the waiting task and need to reevaluate
208+
// our state
209+
_swift_task_setCurrent(oldTask);
210+
queueHead = fragment->waitQueue.load(std::memory_order_acquire);
211+
continue;
212+
#else
190213
// Put the waiting task at the beginning of the wait queue.
191214
waitingTask->getNextWaitingTask() = queueHead.getTask();
192215
auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
@@ -198,6 +221,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
198221
_swift_task_clearCurrent();
199222
return FutureFragment::Status::Executing;
200223
}
224+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
201225
}
202226
}
203227

@@ -255,8 +279,13 @@ void AsyncTask::completeFuture(AsyncContext *context) {
255279
// Schedule every waiting task on the executor.
256280
auto waitingTask = queueHead.getTask();
257281

258-
if (!waitingTask)
282+
if (!waitingTask) {
259283
SWIFT_TASK_DEBUG_LOG("task %p had no waiting tasks", this);
284+
} else {
285+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
286+
assert(false && "Task should have no waiters in task-to-thread model");
287+
#endif
288+
}
260289

261290
while (waitingTask) {
262291
// Find the next waiting task before we invalidate it by resuming

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ void asyncLet_addImpl(AsyncTask *task, AsyncLet *asyncLet,
7979

8080
/// Clear the active task reference for the current thread.
8181
AsyncTask *_swift_task_clearCurrent();
82+
/// Set the active task reference for the current thread.
83+
AsyncTask *_swift_task_setCurrent(AsyncTask *newTask);
8284

8385
/// release() establishes a happens-before relation with a preceding acquire()
8486
/// on the same address.
@@ -89,7 +91,7 @@ void _swift_tsan_release(void *addr);
8991
/// executors.
9092
#define DISPATCH_QUEUE_GLOBAL_EXECUTOR (void *)1
9193

92-
#if !SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY
94+
#if SWIFT_CONCURRENCY_ENABLE_DISPATCH
9395
inline SerialExecutorWitnessTable *
9496
_swift_task_getDispatchQueueSerialExecutorWitnessTable() {
9597
extern SerialExecutorWitnessTable wtable
@@ -712,7 +714,9 @@ retry:;
712714
///
713715
/// rdar://88366470 (Direct handoff behaviour when tasks switch executors)
714716
inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
715-
717+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
718+
assert(false && "Should not enqueue any tasks to execute in task-to-thread model");
719+
#else
716720
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor()", this);
717721
auto oldStatus = _private()._status().load(std::memory_order_relaxed);
718722
auto newStatus = oldStatus;
@@ -746,7 +750,7 @@ inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
746750
oldStatus.getStoredPriority(), this);
747751
swift_dispatch_lock_override_end((qos_class_t) oldStatus.getStoredPriority());
748752
}
749-
#endif
753+
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
750754
swift_task_exitThreadLocalContext((char *)&_private().ExclusivityAccessSet[0]);
751755
restoreTaskVoucher(this);
752756
}
@@ -759,6 +763,7 @@ inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
759763
Flags.task_isAsyncLetTask());
760764

761765
swift_task_enqueue(this, newExecutor);
766+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
762767
}
763768

764769
inline void AsyncTask::flagAsSuspended() {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// RUN: %target-run-simple-swift(-parse-as-library -Xfrontend -disable-availability-checking -Xfrontend -concurrency-model=task-to-thread)
2+
3+
// REQUIRES: concurrency
4+
// REQUIRES: executable_test
5+
// REQUIRES: freestanding
6+
// REQUIRES: concurrency_runtime
7+
8+
@_spi(_TaskToThreadModel) import _Concurrency
9+
import StdlibUnittest
10+
import Darwin
11+
12+
var a = 0;
13+
14+
func foo() async -> Int {
15+
a += 1
16+
return a
17+
}
18+
19+
func bar() async -> Int {
20+
a += 2
21+
return a
22+
}
23+
24+
func asyncFib(_ n: Int, _ expected_thread: pthread_t) async -> Int {
25+
expectEqual(pthread_self(), expected_thread);
26+
27+
if n == 0 || n == 1 {
28+
return n
29+
}
30+
31+
async let first = asyncFib(n-2, expected_thread)
32+
async let second = asyncFib(n-1, expected_thread)
33+
34+
return (await second) + (await first)
35+
}
36+
37+
func fib(_ n: Int) -> Int {
38+
var first = 0
39+
var second = 1
40+
for _ in 0..<n {
41+
let temp = first
42+
first = second
43+
second = temp + first
44+
}
45+
return first
46+
}
47+
48+
@main struct Main {
49+
static func main() {
50+
51+
let tests = TestSuite("Execute child task on await")
52+
tests.test("Basic async let only execute on await") {
53+
Task.runInline {
54+
async let a = foo()
55+
async let b = bar()
56+
57+
expectEqual(await b, 2)
58+
expectEqual(await a, 3)
59+
// Re-querying a completed task should return original result and not
60+
// reexecute it
61+
expectEqual(await b, 2)
62+
}
63+
}
64+
65+
tests.test("Nested async lets that only execute on await") {
66+
Task.runInline {
67+
let result = await asyncFib(10, pthread_self())
68+
expectEqual(result, fib(10))
69+
}
70+
}
71+
72+
runAllTests()
73+
}
74+
}

0 commit comments

Comments
 (0)