Skip to content

Commit 3aeb1a3

Browse files
ktosoxedin
authored andcommitted
[Concurrency] Improve in order synchronous enqueue of startSynchronously
Previously there was still a sneaky hop which caused ordering issues. This introduced a specific test startSynchronously_order which checks that the task enqueues indeed are "immediate" and cleans up how we handle this. This also prepares for the being discussed in SE review direction of this API that it SHOULD be ALLOWED to actually hop and NOT be synchronous at all IF the isolation is specified on the closure and is DIFFERENT than the callers dynamic isolation. This effectively implements "synchronously run right now if dynamically on the exact isolation as requested by the closure; otherwise enqueue the task as usual". resolves rdar://149284186 cc @drexin (cherry picked from commit a24a28c)
1 parent 0fbf660 commit 3aeb1a3

File tree

8 files changed

+228
-26
lines changed

8 files changed

+228
-26
lines changed

include/swift/Runtime/Concurrency.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,7 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
10631063
void swift_task_startOnMainActor(AsyncTask* job);
10641064

10651065
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
1066-
void swift_task_startSynchronously(AsyncTask* job);
1066+
void swift_task_startSynchronously(AsyncTask* job, SerialExecutorRef targetExecutor);
10671067

10681068
/// Donate this thread to the global executor until either the
10691069
/// given condition returns true or we've run out of cooperative

stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,8 @@ OVERRIDE_TASK(task_startOnMainActor, void,
441441
// In ACTOR since we need ExecutorTracking info
442442
OVERRIDE_ACTOR(task_startSynchronously, void,
443443
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
444-
swift::, (AsyncTask *task), (task))
444+
swift::, (AsyncTask *task, SerialExecutorRef targetExecutor),
445+
(task, targetExecutor))
445446

446447
#undef OVERRIDE
447448
#undef OVERRIDE_ACTOR

stdlib/public/Concurrency/Actor.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2510,22 +2510,31 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
25102510
}
25112511

25122512
SWIFT_CC(swift)
2513-
static void swift_task_startSynchronouslyImpl(AsyncTask* task) {
2513+
static void
2514+
swift_task_startSynchronouslyImpl(AsyncTask *task,
2515+
SerialExecutorRef targetExecutor) {
25142516
swift_retain(task);
2515-
2516-
auto currentTracking = ExecutorTrackingInfo::current();
2517-
if (currentTracking) {
2518-
auto currentExecutor = currentTracking->getActiveExecutor();
2519-
AsyncTask * originalTask = _swift_task_clearCurrent();
2520-
2521-
swift_job_run(task, currentExecutor);
2522-
_swift_task_setCurrent(originalTask);
2517+
if (targetExecutor.isGeneric()) {
2518+
// If the target is generic, it means that the closure did not specify
2519+
// an isolation explicitly. According to the "start synchronously" rules,
2520+
// we should therefore ignore the global and just start running on the
2521+
// caller immediately.
2522+
SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart();
2523+
2524+
auto originalTask = ActiveTask::swap(task);
2525+
swift_job_run(task, executor);
2526+
_swift_task_setCurrent(originalTask);
25232527
} else {
2524-
auto originalTask = ActiveTask::swap(task);
2525-
assert(!originalTask);
2528+
assert(swift_task_isCurrentExecutor(targetExecutor) &&
2529+
"startSynchronously must only be invoked when it is correctly in "
2530+
"the same isolation already, but wasn't!");
2531+
2532+
// We can run synchronously, we're on the expected executor so running in
2533+
// the caller context is going to be in the same context as the requested
2534+
// "caller" context.
2535+
AsyncTask *originalTask = _swift_task_clearCurrent();
25262536

2527-
SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart();
2528-
swift_job_run(task, executor);
2537+
swift_job_run(task, targetExecutor);
25292538
_swift_task_setCurrent(originalTask);
25302539
}
25312540
}

stdlib/public/Concurrency/Task+startSynchronously.swift.gyb

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,30 @@ extension Task where Failure == ${FAILURE_TYPE} {
5959
public static func startSynchronously(
6060
name: String? = nil,
6161
priority: TaskPriority? = nil,
62-
@_inheritActorContext @_implicitSelfCapture _ operation: __owned sending @escaping () async throws -> Success
62+
% # NOTE: This closure cannot be 'sending' because we'll trigger ' pattern that the region based isolation checker does not understand how to check'
63+
% # In this case: `func syncOnMyGlobalActor() { Task.startSynchronously { @MyGlobalActor in } }`
64+
@_implicitSelfCapture _ operation: __owned @isolated(any) @escaping () async throws -> Success
6365
) -> Task<Success, ${FAILURE_TYPE}> {
66+
67+
let builtinSerialExecutor =
68+
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor
69+
70+
// Determine if we're switching isolation dynamically.
71+
// If not, we can run the task synchronously and therefore MUST NOT "enqueue" it.
72+
let flagsMustNotCrash: UInt64 = 0
73+
let canRunSynchronously: Bool =
74+
if let builtinSerialExecutor {
75+
_taskIsCurrentExecutor(executor: builtinSerialExecutor, flags: flagsMustNotCrash)
76+
} else {
77+
true // if there is not target executor, we can run synchronously
78+
}
79+
6480
let flags = taskCreateFlags(
6581
priority: priority,
6682
isChildTask: false,
6783
copyTaskLocals: true,
6884
inheritContext: true,
69-
enqueueJob: false, // don't enqueue, we'll run it manually
85+
enqueueJob: !canRunSynchronously,
7086
addPendingGroupTaskUnconditionally: false,
7187
isDiscardingTask: false,
7288
isSynchronousStart: true
@@ -79,6 +95,7 @@ extension Task where Failure == ${FAILURE_TYPE} {
7995
unsafe name.utf8CString.withUnsafeBufferPointer { nameBytes in
8096
Builtin.createTask(
8197
flags: flags,
98+
initialSerialExecutor: builtinSerialExecutor,
8299
taskName: nameBytes.baseAddress!._rawValue,
83100
operation: operation).0
84101
}
@@ -91,7 +108,9 @@ extension Task where Failure == ${FAILURE_TYPE} {
91108
operation: operation).0
92109
}
93110

94-
_startTaskSynchronously(task!)
111+
if canRunSynchronously {
112+
_startTaskSynchronously(task!, targetExecutor: builtinSerialExecutor)
113+
}
95114
return Task<Success, ${FAILURE_TYPE}>(task!)
96115
}
97116
}
@@ -161,7 +180,7 @@ extension ${GROUP_TYPE} {
161180
public func ${METHOD_NAME}( // in ${GROUP_TYPE}
162181
name: String? = nil,
163182
priority: TaskPriority? = nil,
164-
operation: sending @escaping () async ${THROWS}-> ${RESULT_TYPE}
183+
@_inheritActorContext @_implicitSelfCapture operation: sending @isolated(any) @escaping () async ${THROWS}-> ${RESULT_TYPE}
165184
) {
166185
let flags = taskCreateFlags(
167186
priority: priority,
@@ -174,13 +193,18 @@ extension ${GROUP_TYPE} {
174193
isSynchronousStart: true
175194
)
176195

196+
// Create the asynchronous task.
197+
let builtinSerialExecutor =
198+
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor
199+
177200
// Create the task in this group.
178201
let (task, _) = Builtin.createTask(
179202
flags: flags,
203+
initialSerialExecutor: builtinSerialExecutor,
180204
taskGroup: self._group,
181205
operation: operation
182206
)
183-
_startTaskSynchronously(task)
207+
_startTaskSynchronously(task, targetExecutor: builtinSerialExecutor)
184208
}
185209
}
186210
% end # METHOD_NAMES
@@ -241,4 +265,4 @@ extension Task where Failure == ${FAILURE_TYPE} {
241265
internal func _startTaskOnMainActor(_ task: Builtin.NativeObject)
242266

243267
@_silgen_name("swift_task_startSynchronously")
244-
internal func _startTaskSynchronously(_ task: Builtin.NativeObject)
268+
internal func _startTaskSynchronously(_ task: Builtin.NativeObject, targetExecutor: Builtin.Executor?)

stdlib/public/Concurrency/Task.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -807,8 +807,7 @@ extension Task where Failure == Error {
807807
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor
808808

809809
let (task, _) = Builtin.createTask(flags: flags,
810-
initialSerialExecutor:
811-
builtinSerialExecutor,
810+
initialSerialExecutor: builtinSerialExecutor,
812811
operation: operation)
813812

814813
self._task = task

test/Concurrency/Runtime/startSynchronously.swift

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// REQUIRES: rdar145735542
21
// RUN: %empty-directory(%t)
32
// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
43
// RUN: %target-codesign %t/a.out
@@ -62,6 +61,42 @@ extension ThreadID: @unchecked Sendable {}
6261
@globalActor
6362
actor MyGlobalActor {
6463
static let shared: MyGlobalActor = MyGlobalActor()
64+
65+
@MyGlobalActor
66+
static func test() {}
67+
}
68+
69+
final class NaiveQueueExecutor: SerialExecutor {
70+
let queue: DispatchQueue
71+
72+
init(queue: DispatchQueue) {
73+
self.queue = queue
74+
}
75+
76+
public func enqueue(_ job: consuming ExecutorJob) {
77+
let unowned = UnownedJob(job)
78+
print("NaiveQueueExecutor(\(self.queue.label)) enqueue [thread:\(getCurrentThreadID())]")
79+
queue.async {
80+
unowned.runSynchronously(on: self.asUnownedSerialExecutor())
81+
}
82+
}
83+
}
84+
85+
@globalActor
86+
actor DifferentGlobalActor {
87+
static let queue = DispatchQueue(label: "DifferentGlobalActor-queue")
88+
let executor: NaiveQueueExecutor
89+
nonisolated let unownedExecutor: UnownedSerialExecutor
90+
91+
init() {
92+
self.executor = NaiveQueueExecutor(queue: DifferentGlobalActor.queue)
93+
self.unownedExecutor = executor.asUnownedSerialExecutor()
94+
}
95+
96+
static let shared: DifferentGlobalActor = DifferentGlobalActor()
97+
98+
@DifferentGlobalActor
99+
static func test() {}
65100
}
66101

67102
final class NaiveQueueExecutor: SerialExecutor {
@@ -111,6 +146,49 @@ func syncOnMyGlobalActor() -> [Task<Void, Never>] {
111146
return [t1, tt]
112147
}
113148

149+
func syncOnMyGlobalActorHopToDifferentActor() -> [Task<Void, Never>] {
150+
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
151+
print("Confirmed to be on @MyGlobalActor")
152+
153+
// This task must be guaranteed to happen AFTER 'tt' because we are already on this actor
154+
// so this enqueue must happen after we give up the actor.
155+
print("schedule Task { @DifferentGlobalActor }, before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)")
156+
let t1 = Task { @DifferentGlobalActor in
157+
print("inside Task { @DifferentGlobalActor } [thread:\(getCurrentThreadID())] @ :\(#line)")
158+
DifferentGlobalActor.shared.preconditionIsolated("Expected Task{} to be on DifferentGlobalActor")
159+
}
160+
161+
print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)")
162+
let outerTID = getCurrentThreadID()
163+
let tt = Task.startSynchronously { @DifferentGlobalActor in
164+
let innerTID = getCurrentThreadID()
165+
print("inside startSynchronously, outer thread = \(outerTID)")
166+
print("inside startSynchronously, inner thread = \(innerTID)")
167+
if (compareThreadIDs(outerTID, .equal, innerTID)) {
168+
// This case specifically is NOT synchronously run because we specified a different isolation for the closure
169+
// and FORCED a hop to the DifferentGlobalActor executor.
170+
print("ERROR! Outer Thread ID must NOT equal Thread ID inside runSynchronously synchronous part!")
171+
}
172+
// We crucially need to see this task be enqueued on the different global actor,
173+
// so it did not execute "synchronously" after all - it had to hop to the other actor.
174+
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
175+
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.startSynchronously { @DifferentGlobalActor in } to be on DifferentGlobalActor")
176+
177+
print("inside startSynchronously, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)")
178+
_ = try? await Task.sleep(for: .milliseconds(100))
179+
180+
print("inside startSynchronously, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
181+
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
182+
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.startSynchronously { @DifferentGlobalActor in } to be on DifferentGlobalActor")
183+
184+
// do something here
185+
await MyGlobalActor.test()
186+
DifferentGlobalActor.test()
187+
}
188+
189+
return [t1, tt]
190+
}
191+
114192
func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) {
115193
let sem1 = DispatchSemaphore(value: 0)
116194
let sem2 = DispatchSemaphore(value: 0)
@@ -179,6 +257,33 @@ await Task { @MyGlobalActor in
179257
// resume on some other thread
180258
// CHECK: after sleep, inside startSynchronously
181259

260+
print("\n\n==== ------------------------------------------------------------------")
261+
print("syncOnMyGlobalActorHopToDifferentActor()")
262+
263+
await Task { @MyGlobalActor in
264+
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
265+
for t in syncOnMyGlobalActorHopToDifferentActor() {
266+
await t.value
267+
}
268+
}.value
269+
270+
// Assertion Notes: We expect the task to be on the specified queue as we force the Task.startSynchronously
271+
// task to enqueue on the DifferentGlobalActor, however we CANNOT use threads to verify this behavior,
272+
// because dispatch may still pull tricks and reuse threads. We can only verify that we're on the right
273+
// queue, and that the `enqueue` calls on the target executor happen when we expect them to.
274+
//
275+
// CHECK: syncOnMyGlobalActorHopToDifferentActor()
276+
// CHECK: Confirmed to be on @MyGlobalActor
277+
// CHECK: before startSynchronously
278+
279+
// This IS actually enqueueing on the target actor (not synchronous), as expected:
280+
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
281+
// CHECK: inside startSynchronously, sleep now
282+
283+
// After the sleep we get back onto the specified executor as expected
284+
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
285+
// CHECK: inside startSynchronously, after sleep
286+
182287
print("\n\n==== ------------------------------------------------------------------")
183288
var behavior: SynchronousTaskBehavior = .suspend
184289
print("syncOnNonTaskThread(synchronousTask: \(behavior))")
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// RUN: %empty-directory(%t)
2+
// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
3+
// RUN: %target-codesign %t/a.out
4+
// RUN: %target-run %t/a.out | %FileCheck %s
5+
6+
// REQUIRES: executable_test
7+
// REQUIRES: concurrency
8+
// REQUIRES: concurrency_runtime
9+
10+
// UNSUPPORTED: back_deployment_runtime
11+
// UNSUPPORTED: back_deploy_concurrency
12+
// UNSUPPORTED: use_os_stdlib
13+
// UNSUPPORTED: freestanding
14+
15+
import _Concurrency
16+
17+
let max = 1000
18+
19+
func bar(x: Int, cc: CheckedContinuation<Void, Never>) {
20+
Task.startSynchronously {
21+
print("Task \(x) started")
22+
try! await Task.sleep(nanoseconds: 10000)
23+
if (x == max) {
24+
cc.resume()
25+
}
26+
}
27+
}
28+
29+
await withCheckedContinuation { (cc: CheckedContinuation<Void, Never>) in
30+
for i in 1...max {
31+
bar(x: i, cc: cc)
32+
}
33+
}
34+
35+
// CHECK: Task 1 started
36+
// CHECK: Task 2 started
37+
// CHECK: Task 3 started
38+
// CHECK: Task 4 started
39+
// CHECK: Task 5 started
40+
// CHECK: Task 6 started
41+
// CHECK: Task 7 started
42+
// CHECK: Task 8 started
43+
// CHECK: Task 9 started
44+
// CHECK: Task 10 started
45+
// CHECK: Task 11 started
46+
// CHECK: Task 12 started
47+
// CHECK: Task 13 started
48+
// CHECK: Task 14 started
49+
// CHECK: Task 15 started
50+
// CHECK: Task 16 started
51+
// CHECK: Task 17 started
52+
// CHECK: Task 18 started
53+
// CHECK: Task 19 started
54+
// CHECK: Task 20 started
55+
// CHECK: Task 21 started
56+
// CHECK: Task 22 started
57+
// CHECK: Task 23 started
58+
// CHECK: Task 24 started
59+
// CHECK: Task 25 started
60+
// CHECK: Task 26 started
61+
// CHECK: Task 27 started
62+
// CHECK: Task 28 started
63+
// CHECK: Task 29 started
64+
// CHECK: Task 30 started

unittests/runtime/CompatibilityOverrideConcurrency.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ static void swift_task_startOnMainActor_override(AsyncTask* task) {
118118
}
119119

120120
SWIFT_CC(swift)
121-
static void swift_task_startSynchronously_override(AsyncTask* task) {
121+
static void swift_task_startSynchronously_override(AsyncTask* task, SerialExecutorRef targetExecutor) {
122122
Ran = true;
123123
}
124124

@@ -351,7 +351,7 @@ TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_startOnMainActorImpl) {
351351
}
352352

353353
TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_startSynchronously) {
354-
swift_task_startSynchronously(nullptr);
354+
swift_task_startSynchronously(nullptr, SerialExecutorRef::generic());
355355
}
356356

357357
TEST_F(CompatibilityOverrideConcurrencyTest,

0 commit comments

Comments
 (0)