Skip to content

[Concurrency][startSynchronously] Improve in order synchronous enqueue of startSynchronously #80821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_startOnMainActor(AsyncTask* job);

SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_startSynchronously(AsyncTask* job);
void swift_task_startSynchronously(AsyncTask* job, SerialExecutorRef targetExecutor);

/// Donate this thread to the global executor until either the
/// given condition returns true or we've run out of cooperative
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ OVERRIDE_TASK(task_startOnMainActor, void,
// In ACTOR since we need ExecutorTracking info
OVERRIDE_ACTOR(task_startSynchronously, void,
SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (AsyncTask *task), (task))
swift::, (AsyncTask *task, SerialExecutorRef targetExecutor),
(task, targetExecutor))

#undef OVERRIDE
#undef OVERRIDE_ACTOR
Expand Down
35 changes: 22 additions & 13 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2531,22 +2531,31 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
}

SWIFT_CC(swift)
static void swift_task_startSynchronouslyImpl(AsyncTask* task) {
static void
swift_task_startSynchronouslyImpl(AsyncTask *task,
SerialExecutorRef targetExecutor) {
swift_retain(task);

auto currentTracking = ExecutorTrackingInfo::current();
if (currentTracking) {
auto currentExecutor = currentTracking->getActiveExecutor();
AsyncTask * originalTask = _swift_task_clearCurrent();

swift_job_run(task, currentExecutor);
_swift_task_setCurrent(originalTask);
if (targetExecutor.isGeneric()) {
// If the target is generic, it means that the closure did not specify
// an isolation explicitly. According to the "start synchronously" rules,
// we should therefore ignore the global and just start running on the
// caller immediately.
SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart();

auto originalTask = ActiveTask::swap(task);
swift_job_run(task, executor);
_swift_task_setCurrent(originalTask);
} else {
auto originalTask = ActiveTask::swap(task);
assert(!originalTask);
assert(swift_task_isCurrentExecutor(targetExecutor) &&
"startSynchronously must only be invoked when it is correctly in "
"the same isolation already, but wasn't!");

// We can run synchronously, we're on the expected executor so running in
// the caller context is going to be in the same context as the requested
// "caller" context.
AsyncTask *originalTask = _swift_task_clearCurrent();

SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart();
swift_job_run(task, executor);
swift_job_run(task, targetExecutor);
_swift_task_setCurrent(originalTask);
}
}
Expand Down
36 changes: 30 additions & 6 deletions stdlib/public/Concurrency/Task+startSynchronously.swift.gyb
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,30 @@ extension Task where Failure == ${FAILURE_TYPE} {
public static func startSynchronously(
name: String? = nil,
priority: TaskPriority? = nil,
@_inheritActorContext @_implicitSelfCapture _ operation: __owned sending @escaping () async throws -> Success
% # NOTE: This closure cannot be 'sending' because we'll trigger ' pattern that the region based isolation checker does not understand how to check'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: figure out the exact signature we want here

% # In this case: `func syncOnMyGlobalActor() { Task.startSynchronously { @MyGlobalActor in } }`
@_implicitSelfCapture _ operation: __owned @isolated(any) @escaping () async throws -> Success
) -> Task<Success, ${FAILURE_TYPE}> {

let builtinSerialExecutor =
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor

// Determine if we're switching isolation dynamically.
// If not, we can run the task synchronously and therefore MUST NOT "enqueue" it.
let flagsMustNotCrash: UInt64 = 0
let canRunSynchronously: Bool =
if let builtinSerialExecutor {
_taskIsCurrentExecutor(executor: builtinSerialExecutor, flags: flagsMustNotCrash)
} else {
true // if there is not target executor, we can run synchronously
}

let flags = taskCreateFlags(
priority: priority,
isChildTask: false,
copyTaskLocals: true,
inheritContext: true,
enqueueJob: false, // don't enqueue, we'll run it manually
enqueueJob: !canRunSynchronously,
addPendingGroupTaskUnconditionally: false,
isDiscardingTask: false,
isSynchronousStart: true
Expand All @@ -79,6 +95,7 @@ extension Task where Failure == ${FAILURE_TYPE} {
unsafe name.utf8CString.withUnsafeBufferPointer { nameBytes in
Builtin.createTask(
flags: flags,
initialSerialExecutor: builtinSerialExecutor,
taskName: nameBytes.baseAddress!._rawValue,
operation: operation).0
}
Expand All @@ -91,7 +108,9 @@ extension Task where Failure == ${FAILURE_TYPE} {
operation: operation).0
}

_startTaskSynchronously(task!)
if canRunSynchronously {
_startTaskSynchronously(task!, targetExecutor: builtinSerialExecutor)
}
return Task<Success, ${FAILURE_TYPE}>(task!)
}
}
Expand Down Expand Up @@ -161,7 +180,7 @@ extension ${GROUP_TYPE} {
public func ${METHOD_NAME}( // in ${GROUP_TYPE}
name: String? = nil,
priority: TaskPriority? = nil,
operation: sending @escaping () async ${THROWS}-> ${RESULT_TYPE}
@_inheritActorContext @_implicitSelfCapture operation: sending @isolated(any) @escaping () async ${THROWS}-> ${RESULT_TYPE}
) {
let flags = taskCreateFlags(
priority: priority,
Expand All @@ -174,13 +193,18 @@ extension ${GROUP_TYPE} {
isSynchronousStart: true
)

// Create the asynchronous task.
let builtinSerialExecutor =
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor

// Create the task in this group.
let (task, _) = Builtin.createTask(
flags: flags,
initialSerialExecutor: builtinSerialExecutor,
taskGroup: self._group,
operation: operation
)
_startTaskSynchronously(task)
_startTaskSynchronously(task, targetExecutor: builtinSerialExecutor)
}
}
% end # METHOD_NAMES
Expand Down Expand Up @@ -241,4 +265,4 @@ extension Task where Failure == ${FAILURE_TYPE} {
internal func _startTaskOnMainActor(_ task: Builtin.NativeObject)

@_silgen_name("swift_task_startSynchronously")
internal func _startTaskSynchronously(_ task: Builtin.NativeObject)
internal func _startTaskSynchronously(_ task: Builtin.NativeObject, targetExecutor: Builtin.Executor?)
3 changes: 1 addition & 2 deletions stdlib/public/Concurrency/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,7 @@ extension Task where Failure == Error {
unsafe Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor

let (task, _) = Builtin.createTask(flags: flags,
initialSerialExecutor:
builtinSerialExecutor,
initialSerialExecutor: builtinSerialExecutor,
operation: operation)

self._task = task
Expand Down
124 changes: 106 additions & 18 deletions test/Concurrency/Runtime/startSynchronously.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// REQUIRES: rdar145735542
// RUN: %empty-directory(%t)
// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
// RUN: %target-codesign %t/a.out
Expand Down Expand Up @@ -62,6 +61,42 @@ extension ThreadID: @unchecked Sendable {}
@globalActor
actor MyGlobalActor {
static let shared: MyGlobalActor = MyGlobalActor()

@MyGlobalActor
static func test() {}
}

final class NaiveQueueExecutor: SerialExecutor {
let queue: DispatchQueue

init(queue: DispatchQueue) {
self.queue = queue
}

public func enqueue(_ job: consuming ExecutorJob) {
let unowned = UnownedJob(job)
print("NaiveQueueExecutor(\(self.queue.label)) enqueue [thread:\(getCurrentThreadID())]")
queue.async {
unowned.runSynchronously(on: self.asUnownedSerialExecutor())
}
}
}

@globalActor
actor DifferentGlobalActor {
static let queue = DispatchQueue(label: "DifferentGlobalActor-queue")
let executor: NaiveQueueExecutor
nonisolated let unownedExecutor: UnownedSerialExecutor

init() {
self.executor = NaiveQueueExecutor(queue: DifferentGlobalActor.queue)
self.unownedExecutor = executor.asUnownedSerialExecutor()
}

static let shared: DifferentGlobalActor = DifferentGlobalActor()

@DifferentGlobalActor
static func test() {}
}

// Test on all platforms
Expand Down Expand Up @@ -94,6 +129,49 @@ func syncOnMyGlobalActor() -> [Task<Void, Never>] {
return [t1, tt]
}

func syncOnMyGlobalActorHopToDifferentActor() -> [Task<Void, Never>] {
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
print("Confirmed to be on @MyGlobalActor")

// This task must be guaranteed to happen AFTER 'tt' because we are already on this actor
// so this enqueue must happen after we give up the actor.
print("schedule Task { @DifferentGlobalActor }, before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)")
let t1 = Task { @DifferentGlobalActor in
print("inside Task { @DifferentGlobalActor } [thread:\(getCurrentThreadID())] @ :\(#line)")
DifferentGlobalActor.shared.preconditionIsolated("Expected Task{} to be on DifferentGlobalActor")
}

print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)")
let outerTID = getCurrentThreadID()
let tt = Task.startSynchronously { @DifferentGlobalActor in
let innerTID = getCurrentThreadID()
print("inside startSynchronously, outer thread = \(outerTID)")
print("inside startSynchronously, inner thread = \(innerTID)")
if (compareThreadIDs(outerTID, .equal, innerTID)) {
// This case specifically is NOT synchronously run because we specified a different isolation for the closure
// and FORCED a hop to the DifferentGlobalActor executor.
print("ERROR! Outer Thread ID must NOT equal Thread ID inside runSynchronously synchronous part!")
}
// We crucially need to see this task be enqueued on the different global actor,
// so it did not execute "synchronously" after all - it had to hop to the other actor.
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.startSynchronously { @DifferentGlobalActor in } to be on DifferentGlobalActor")

print("inside startSynchronously, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)")
_ = try? await Task.sleep(for: .milliseconds(100))

print("inside startSynchronously, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.startSynchronously { @DifferentGlobalActor in } to be on DifferentGlobalActor")

// do something here
await MyGlobalActor.test()
DifferentGlobalActor.test()
}

return [t1, tt]
}

func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) {
let sem1 = DispatchSemaphore(value: 0)
let sem2 = DispatchSemaphore(value: 0)
Expand Down Expand Up @@ -162,6 +240,33 @@ await Task { @MyGlobalActor in
// resume on some other thread
// CHECK: after sleep, inside startSynchronously

print("\n\n==== ------------------------------------------------------------------")
print("syncOnMyGlobalActorHopToDifferentActor()")

await Task { @MyGlobalActor in
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
for t in syncOnMyGlobalActorHopToDifferentActor() {
await t.value
}
}.value

// Assertion Notes: We expect the task to be on the specified queue as we force the Task.startSynchronously
// task to enqueue on the DifferentGlobalActor, however we CANNOT use threads to verify this behavior,
// because dispatch may still pull tricks and reuse threads. We can only verify that we're on the right
// queue, and that the `enqueue` calls on the target executor happen when we expect them to.
//
// CHECK: syncOnMyGlobalActorHopToDifferentActor()
// CHECK: Confirmed to be on @MyGlobalActor
// CHECK: before startSynchronously

// This IS actually enqueueing on the target actor (not synchronous), as expected:
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
// CHECK: inside startSynchronously, sleep now

// After the sleep we get back onto the specified executor as expected
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
// CHECK: inside startSynchronously, after sleep

print("\n\n==== ------------------------------------------------------------------")
var behavior: SynchronousTaskBehavior = .suspend
print("syncOnNonTaskThread(synchronousTask: \(behavior))")
Expand Down Expand Up @@ -347,23 +452,6 @@ callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue(
// CHECK-NOT: ERROR!
// CHECK: inside startSynchronously, done

final class NaiveQueueExecutor: SerialExecutor {
let queue: DispatchQueue

init(queue: DispatchQueue) {
self.queue = queue
}

public func enqueue(_ job: consuming ExecutorJob) {
let unowned = UnownedJob(job)
print("NaiveQueueExecutor(\(self.queue.label)) enqueue... [thread:\(getCurrentThreadID())]")
queue.async {
print("NaiveQueueExecutor(\(self.queue.label)) enqueue: run [thread:\(getCurrentThreadID())]")
unowned.runSynchronously(on: self.asUnownedSerialExecutor())
}
}
}

actor RecipientOnQueue {
let executor: NaiveQueueExecutor
nonisolated let unownedExecutor: UnownedSerialExecutor
Expand Down
64 changes: 64 additions & 0 deletions test/Concurrency/Runtime/startSynchronously_order.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// RUN: %empty-directory(%t)
// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
// RUN: %target-codesign %t/a.out
// RUN: %target-run %t/a.out | %FileCheck %s

// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: concurrency_runtime

// UNSUPPORTED: back_deployment_runtime
// UNSUPPORTED: back_deploy_concurrency
// UNSUPPORTED: use_os_stdlib
// UNSUPPORTED: freestanding

import _Concurrency

let max = 1000

func bar(x: Int, cc: CheckedContinuation<Void, Never>) {
Task.startSynchronously {
print("Task \(x) started")
try! await Task.sleep(nanoseconds: 10000)
if (x == max) {
cc.resume()
}
}
}

await withCheckedContinuation { (cc: CheckedContinuation<Void, Never>) in
for i in 1...max {
bar(x: i, cc: cc)
}
}

// CHECK: Task 1 started
// CHECK: Task 2 started
// CHECK: Task 3 started
// CHECK: Task 4 started
// CHECK: Task 5 started
// CHECK: Task 6 started
// CHECK: Task 7 started
// CHECK: Task 8 started
// CHECK: Task 9 started
// CHECK: Task 10 started
// CHECK: Task 11 started
// CHECK: Task 12 started
// CHECK: Task 13 started
// CHECK: Task 14 started
// CHECK: Task 15 started
// CHECK: Task 16 started
// CHECK: Task 17 started
// CHECK: Task 18 started
// CHECK: Task 19 started
// CHECK: Task 20 started
// CHECK: Task 21 started
// CHECK: Task 22 started
// CHECK: Task 23 started
// CHECK: Task 24 started
// CHECK: Task 25 started
// CHECK: Task 26 started
// CHECK: Task 27 started
// CHECK: Task 28 started
// CHECK: Task 29 started
// CHECK: Task 30 started
Loading