Skip to content

Commit 2dd6348

Browse files
authored
Merge pull request #80821 from ktoso/wip-refine-scheduling-of-task-start-synchronously
[Concurrency] Improve in order synchronous enqueue of startSynchronously
2 parents 198a802 + b76abb4 commit 2dd6348

File tree

10 files changed

+254
-63
lines changed

10 files changed

+254
-63
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1069,7 +1069,7 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
10691069
void swift_task_startOnMainActor(AsyncTask* job);
10701070

10711071
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
1072-
void swift_task_startSynchronously(AsyncTask* job);
1072+
void swift_task_startSynchronously(AsyncTask* job, SerialExecutorRef targetExecutor);
10731073

10741074
/// Donate this thread to the global executor until either the
10751075
/// given condition returns true or we've run out of cooperative

stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,8 @@ OVERRIDE_TASK(task_startOnMainActor, void,
441441
// In ACTOR since we need ExecutorTracking info
442442
OVERRIDE_ACTOR(task_startSynchronously, void,
443443
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
444-
swift::, (AsyncTask *task), (task))
444+
swift::, (AsyncTask *task, SerialExecutorRef targetExecutor),
445+
(task, targetExecutor))
445446

446447
#undef OVERRIDE
447448
#undef OVERRIDE_ACTOR

stdlib/public/Concurrency/Actor.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2531,22 +2531,31 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
25312531
}
25322532

25332533
SWIFT_CC(swift)
2534-
static void swift_task_startSynchronouslyImpl(AsyncTask* task) {
2534+
static void
2535+
swift_task_startSynchronouslyImpl(AsyncTask *task,
2536+
SerialExecutorRef targetExecutor) {
25352537
swift_retain(task);
2536-
2537-
auto currentTracking = ExecutorTrackingInfo::current();
2538-
if (currentTracking) {
2539-
auto currentExecutor = currentTracking->getActiveExecutor();
2540-
AsyncTask * originalTask = _swift_task_clearCurrent();
2541-
2542-
swift_job_run(task, currentExecutor);
2543-
_swift_task_setCurrent(originalTask);
2538+
if (targetExecutor.isGeneric()) {
2539+
// If the target is generic, it means that the closure did not specify
2540+
// an isolation explicitly. According to the "start synchronously" rules,
2541+
// we should therefore ignore the global and just start running on the
2542+
// caller immediately.
2543+
SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart();
2544+
2545+
auto originalTask = ActiveTask::swap(task);
2546+
swift_job_run(task, executor);
2547+
_swift_task_setCurrent(originalTask);
25442548
} else {
2545-
auto originalTask = ActiveTask::swap(task);
2546-
assert(!originalTask);
2549+
assert(swift_task_isCurrentExecutor(targetExecutor) &&
2550+
"startSynchronously must only be invoked when it is correctly in "
2551+
"the same isolation already, but wasn't!");
2552+
2553+
// We can run synchronously, we're on the expected executor so running in
2554+
// the caller context is going to be in the same context as the requested
2555+
// "caller" context.
2556+
AsyncTask *originalTask = _swift_task_clearCurrent();
25472557

2548-
SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart();
2549-
swift_job_run(task, executor);
2558+
swift_job_run(task, targetExecutor);
25502559
_swift_task_setCurrent(originalTask);
25512560
}
25522561
}

stdlib/public/Concurrency/Task+startSynchronously.swift.gyb

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,30 @@ extension Task where Failure == ${FAILURE_TYPE} {
5959
public static func startSynchronously(
6060
name: String? = nil,
6161
priority: TaskPriority? = nil,
62-
@_inheritActorContext @_implicitSelfCapture _ operation: __owned sending @escaping () async throws -> Success
62+
% # NOTE: This closure cannot be 'sending' because we'll trigger ' pattern that the region based isolation checker does not understand how to check'
63+
% # In this case: `func syncOnMyGlobalActor() { Task.startSynchronously { @MyGlobalActor in } }`
64+
@_implicitSelfCapture _ operation: __owned @isolated(any) @escaping () async throws -> Success
6365
) -> Task<Success, ${FAILURE_TYPE}> {
66+
67+
let builtinSerialExecutor =
68+
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor
69+
70+
// Determine if we're switching isolation dynamically.
71+
// If not, we can run the task synchronously and therefore MUST NOT "enqueue" it.
72+
let flagsMustNotCrash: UInt64 = 0
73+
let canRunSynchronously: Bool =
74+
if let builtinSerialExecutor {
75+
_taskIsCurrentExecutor(executor: builtinSerialExecutor, flags: flagsMustNotCrash)
76+
} else {
77+
true // if there is not target executor, we can run synchronously
78+
}
79+
6480
let flags = taskCreateFlags(
6581
priority: priority,
6682
isChildTask: false,
6783
copyTaskLocals: true,
6884
inheritContext: true,
69-
enqueueJob: false, // don't enqueue, we'll run it manually
85+
enqueueJob: !canRunSynchronously,
7086
addPendingGroupTaskUnconditionally: false,
7187
isDiscardingTask: false,
7288
isSynchronousStart: true
@@ -79,6 +95,7 @@ extension Task where Failure == ${FAILURE_TYPE} {
7995
unsafe name.utf8CString.withUnsafeBufferPointer { nameBytes in
8096
Builtin.createTask(
8197
flags: flags,
98+
initialSerialExecutor: builtinSerialExecutor,
8299
taskName: nameBytes.baseAddress!._rawValue,
83100
operation: operation).0
84101
}
@@ -91,7 +108,9 @@ extension Task where Failure == ${FAILURE_TYPE} {
91108
operation: operation).0
92109
}
93110

94-
_startTaskSynchronously(task!)
111+
if canRunSynchronously {
112+
_startTaskSynchronously(task!, targetExecutor: builtinSerialExecutor)
113+
}
95114
return Task<Success, ${FAILURE_TYPE}>(task!)
96115
}
97116
}
@@ -161,7 +180,7 @@ extension ${GROUP_TYPE} {
161180
public func ${METHOD_NAME}( // in ${GROUP_TYPE}
162181
name: String? = nil,
163182
priority: TaskPriority? = nil,
164-
operation: sending @escaping () async ${THROWS}-> ${RESULT_TYPE}
183+
@_inheritActorContext @_implicitSelfCapture operation: sending @isolated(any) @escaping () async ${THROWS}-> ${RESULT_TYPE}
165184
) {
166185
let flags = taskCreateFlags(
167186
priority: priority,
@@ -174,13 +193,18 @@ extension ${GROUP_TYPE} {
174193
isSynchronousStart: true
175194
)
176195

196+
// Create the asynchronous task.
197+
let builtinSerialExecutor =
198+
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor
199+
177200
// Create the task in this group.
178201
let (task, _) = Builtin.createTask(
179202
flags: flags,
203+
initialSerialExecutor: builtinSerialExecutor,
180204
taskGroup: self._group,
181205
operation: operation
182206
)
183-
_startTaskSynchronously(task)
207+
_startTaskSynchronously(task, targetExecutor: builtinSerialExecutor)
184208
}
185209
}
186210
% end # METHOD_NAMES
@@ -241,4 +265,4 @@ extension Task where Failure == ${FAILURE_TYPE} {
241265
internal func _startTaskOnMainActor(_ task: Builtin.NativeObject)
242266

243267
@_silgen_name("swift_task_startSynchronously")
244-
internal func _startTaskSynchronously(_ task: Builtin.NativeObject)
268+
internal func _startTaskSynchronously(_ task: Builtin.NativeObject, targetExecutor: Builtin.Executor?)

stdlib/public/Concurrency/Task.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -807,8 +807,7 @@ extension Task where Failure == Error {
807807
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor
808808

809809
let (task, _) = Builtin.createTask(flags: flags,
810-
initialSerialExecutor:
811-
builtinSerialExecutor,
810+
initialSerialExecutor: builtinSerialExecutor,
812811
operation: operation)
813812

814813
self._task = task

test/Concurrency/Runtime/startSynchronously.swift

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// REQUIRES: rdar145735542
21
// RUN: %empty-directory(%t)
32
// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
43
// RUN: %target-codesign %t/a.out
@@ -62,6 +61,42 @@ extension ThreadID: @unchecked Sendable {}
6261
@globalActor
6362
actor MyGlobalActor {
6463
static let shared: MyGlobalActor = MyGlobalActor()
64+
65+
@MyGlobalActor
66+
static func test() {}
67+
}
68+
69+
final class NaiveQueueExecutor: SerialExecutor {
70+
let queue: DispatchQueue
71+
72+
init(queue: DispatchQueue) {
73+
self.queue = queue
74+
}
75+
76+
public func enqueue(_ job: consuming ExecutorJob) {
77+
let unowned = UnownedJob(job)
78+
print("NaiveQueueExecutor(\(self.queue.label)) enqueue [thread:\(getCurrentThreadID())]")
79+
queue.async {
80+
unowned.runSynchronously(on: self.asUnownedSerialExecutor())
81+
}
82+
}
83+
}
84+
85+
@globalActor
86+
actor DifferentGlobalActor {
87+
static let queue = DispatchQueue(label: "DifferentGlobalActor-queue")
88+
let executor: NaiveQueueExecutor
89+
nonisolated let unownedExecutor: UnownedSerialExecutor
90+
91+
init() {
92+
self.executor = NaiveQueueExecutor(queue: DifferentGlobalActor.queue)
93+
self.unownedExecutor = executor.asUnownedSerialExecutor()
94+
}
95+
96+
static let shared: DifferentGlobalActor = DifferentGlobalActor()
97+
98+
@DifferentGlobalActor
99+
static func test() {}
65100
}
66101

67102
// Test on all platforms
@@ -94,6 +129,49 @@ func syncOnMyGlobalActor() -> [Task<Void, Never>] {
94129
return [t1, tt]
95130
}
96131

132+
func syncOnMyGlobalActorHopToDifferentActor() -> [Task<Void, Never>] {
133+
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
134+
print("Confirmed to be on @MyGlobalActor")
135+
136+
// This task must be guaranteed to happen AFTER 'tt' because we are already on this actor
137+
// so this enqueue must happen after we give up the actor.
138+
print("schedule Task { @DifferentGlobalActor }, before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)")
139+
let t1 = Task { @DifferentGlobalActor in
140+
print("inside Task { @DifferentGlobalActor } [thread:\(getCurrentThreadID())] @ :\(#line)")
141+
DifferentGlobalActor.shared.preconditionIsolated("Expected Task{} to be on DifferentGlobalActor")
142+
}
143+
144+
print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)")
145+
let outerTID = getCurrentThreadID()
146+
let tt = Task.startSynchronously { @DifferentGlobalActor in
147+
let innerTID = getCurrentThreadID()
148+
print("inside startSynchronously, outer thread = \(outerTID)")
149+
print("inside startSynchronously, inner thread = \(innerTID)")
150+
if (compareThreadIDs(outerTID, .equal, innerTID)) {
151+
// This case specifically is NOT synchronously run because we specified a different isolation for the closure
152+
// and FORCED a hop to the DifferentGlobalActor executor.
153+
print("ERROR! Outer Thread ID must NOT equal Thread ID inside runSynchronously synchronous part!")
154+
}
155+
// We crucially need to see this task be enqueued on the different global actor,
156+
// so it did not execute "synchronously" after all - it had to hop to the other actor.
157+
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
158+
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.startSynchronously { @DifferentGlobalActor in } to be on DifferentGlobalActor")
159+
160+
print("inside startSynchronously, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)")
161+
_ = try? await Task.sleep(for: .milliseconds(100))
162+
163+
print("inside startSynchronously, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
164+
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
165+
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.startSynchronously { @DifferentGlobalActor in } to be on DifferentGlobalActor")
166+
167+
// do something here
168+
await MyGlobalActor.test()
169+
DifferentGlobalActor.test()
170+
}
171+
172+
return [t1, tt]
173+
}
174+
97175
func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) {
98176
let sem1 = DispatchSemaphore(value: 0)
99177
let sem2 = DispatchSemaphore(value: 0)
@@ -162,6 +240,33 @@ await Task { @MyGlobalActor in
162240
// resume on some other thread
163241
// CHECK: after sleep, inside startSynchronously
164242

243+
print("\n\n==== ------------------------------------------------------------------")
244+
print("syncOnMyGlobalActorHopToDifferentActor()")
245+
246+
await Task { @MyGlobalActor in
247+
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
248+
for t in syncOnMyGlobalActorHopToDifferentActor() {
249+
await t.value
250+
}
251+
}.value
252+
253+
// Assertion Notes: We expect the task to be on the specified queue as we force the Task.startSynchronously
254+
// task to enqueue on the DifferentGlobalActor, however we CANNOT use threads to verify this behavior,
255+
// because dispatch may still pull tricks and reuse threads. We can only verify that we're on the right
256+
// queue, and that the `enqueue` calls on the target executor happen when we expect them to.
257+
//
258+
// CHECK: syncOnMyGlobalActorHopToDifferentActor()
259+
// CHECK: Confirmed to be on @MyGlobalActor
260+
// CHECK: before startSynchronously
261+
262+
// This IS actually enqueueing on the target actor (not synchronous), as expected:
263+
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
264+
// CHECK: inside startSynchronously, sleep now
265+
266+
// After the sleep we get back onto the specified executor as expected
267+
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
268+
// CHECK: inside startSynchronously, after sleep
269+
165270
print("\n\n==== ------------------------------------------------------------------")
166271
var behavior: SynchronousTaskBehavior = .suspend
167272
print("syncOnNonTaskThread(synchronousTask: \(behavior))")
@@ -347,23 +452,6 @@ callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue(
347452
// CHECK-NOT: ERROR!
348453
// CHECK: inside startSynchronously, done
349454

350-
final class NaiveQueueExecutor: SerialExecutor {
351-
let queue: DispatchQueue
352-
353-
init(queue: DispatchQueue) {
354-
self.queue = queue
355-
}
356-
357-
public func enqueue(_ job: consuming ExecutorJob) {
358-
let unowned = UnownedJob(job)
359-
print("NaiveQueueExecutor(\(self.queue.label)) enqueue... [thread:\(getCurrentThreadID())]")
360-
queue.async {
361-
print("NaiveQueueExecutor(\(self.queue.label)) enqueue: run [thread:\(getCurrentThreadID())]")
362-
unowned.runSynchronously(on: self.asUnownedSerialExecutor())
363-
}
364-
}
365-
}
366-
367455
actor RecipientOnQueue {
368456
let executor: NaiveQueueExecutor
369457
nonisolated let unownedExecutor: UnownedSerialExecutor
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// RUN: %empty-directory(%t)
2+
// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
3+
// RUN: %target-codesign %t/a.out
4+
// RUN: %target-run %t/a.out | %FileCheck %s
5+
6+
// REQUIRES: executable_test
7+
// REQUIRES: concurrency
8+
// REQUIRES: concurrency_runtime
9+
10+
// UNSUPPORTED: back_deployment_runtime
11+
// UNSUPPORTED: back_deploy_concurrency
12+
// UNSUPPORTED: use_os_stdlib
13+
// UNSUPPORTED: freestanding
14+
15+
import _Concurrency
16+
17+
let max = 1000
18+
19+
func bar(x: Int, cc: CheckedContinuation<Void, Never>) {
20+
Task.startSynchronously {
21+
print("Task \(x) started")
22+
try! await Task.sleep(nanoseconds: 10000)
23+
if (x == max) {
24+
cc.resume()
25+
}
26+
}
27+
}
28+
29+
await withCheckedContinuation { (cc: CheckedContinuation<Void, Never>) in
30+
for i in 1...max {
31+
bar(x: i, cc: cc)
32+
}
33+
}
34+
35+
// CHECK: Task 1 started
36+
// CHECK: Task 2 started
37+
// CHECK: Task 3 started
38+
// CHECK: Task 4 started
39+
// CHECK: Task 5 started
40+
// CHECK: Task 6 started
41+
// CHECK: Task 7 started
42+
// CHECK: Task 8 started
43+
// CHECK: Task 9 started
44+
// CHECK: Task 10 started
45+
// CHECK: Task 11 started
46+
// CHECK: Task 12 started
47+
// CHECK: Task 13 started
48+
// CHECK: Task 14 started
49+
// CHECK: Task 15 started
50+
// CHECK: Task 16 started
51+
// CHECK: Task 17 started
52+
// CHECK: Task 18 started
53+
// CHECK: Task 19 started
54+
// CHECK: Task 20 started
55+
// CHECK: Task 21 started
56+
// CHECK: Task 22 started
57+
// CHECK: Task 23 started
58+
// CHECK: Task 24 started
59+
// CHECK: Task 25 started
60+
// CHECK: Task 26 started
61+
// CHECK: Task 27 started
62+
// CHECK: Task 28 started
63+
// CHECK: Task 29 started
64+
// CHECK: Task 30 started

0 commit comments

Comments
 (0)