Skip to content

Commit ea6a585

Browse files
authored
Merge pull request #1335 from ahoppen/task-scheduler-logging
Add some logging to `TaskScheduler` again
2 parents 6103661 + cb24ce9 commit ea6a585

File tree

5 files changed

+122
-69
lines changed

5 files changed

+122
-69
lines changed

Sources/SKCore/TaskScheduler.swift

Lines changed: 47 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@
1313
import CAtomics
1414
import Foundation
1515
import LSPLogging
16+
import SKSupport
1617

1718
/// See comment on ``TaskDescriptionProtocol/dependencies(to:taskPriority:)``
1819
public enum TaskDependencyAction<TaskDescription: TaskDescriptionProtocol> {
1920
case waitAndElevatePriorityOfDependency(TaskDescription)
2021
case cancelAndRescheduleDependency(TaskDescription)
2122
}
2223

24+
private let taskSchedulerSubsystem = "org.swift.sourcekit-lsp.task-scheduler"
25+
2326
public protocol TaskDescriptionProtocol: Identifiable, Sendable, CustomLogStringConvertible {
2427
/// Execute the task.
2528
///
@@ -123,10 +126,6 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
123126
/// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`.
124127
private let executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation
125128

126-
/// Placing a new value in this continuation will cause `resultTask` to query its priority and set
127-
/// `QueuedTask.priority`.
128-
private let updatePriorityContinuation: AsyncStream<Void>.Continuation
129-
130129
nonisolated(unsafe) private var _priority: AtomicUInt8
131130

132131
/// The latest known priority of the task.
@@ -187,16 +186,10 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
187186
description: TaskDescription,
188187
executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?
189188
) async {
190-
self._priority = .init(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue)
189+
self._priority = AtomicUInt8(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue)
191190
self.description = description
192191
self.executionStateChangedCallback = executionStateChangedCallback
193192

194-
var updatePriorityContinuation: AsyncStream<Void>.Continuation!
195-
let updatePriorityStream = AsyncStream {
196-
updatePriorityContinuation = $0
197-
}
198-
self.updatePriorityContinuation = updatePriorityContinuation
199-
200193
var executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation!
201194
let executionTaskCreatedStream = AsyncStream {
202195
executionTaskCreatedContinuation = $0
@@ -205,31 +198,24 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
205198

206199
self.resultTask = Task.detached(priority: priority) {
207200
await withTaskCancellationHandler {
208-
await withTaskGroup(of: Void.self) { taskGroup in
209-
taskGroup.addTask {
210-
for await _ in updatePriorityStream {
211-
self.priority = Task.currentPriority
201+
await withTaskPriorityChangedHandler(initialPriority: self.priority) {
202+
for await task in executionTaskCreatedStream {
203+
switch await task.valuePropagatingCancellation {
204+
case .cancelledToBeRescheduled:
205+
// Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
206+
break
207+
case .terminated:
208+
// The task finished. We are done with this `QueuedTask`
209+
return
212210
}
213211
}
214-
taskGroup.addTask {
215-
for await task in executionTaskCreatedStream {
216-
switch await task.valuePropagatingCancellation {
217-
case .cancelledToBeRescheduled:
218-
// Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
219-
break
220-
case .terminated:
221-
// The task finished. We are done with this `QueuedTask`
222-
return
223-
}
224-
}
225-
}
226-
// The first (update priority) task never finishes, so this waits for the second (wait for execution) task
227-
// to terminate.
228-
// Afterwards we also cancel the update priority task.
229-
for await _ in taskGroup {
230-
taskGroup.cancelAll()
231-
return
212+
} taskPriorityChanged: {
213+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
214+
logger.debug(
215+
"Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)"
216+
)
232217
}
218+
self.priority = Task.currentPriority
233219
}
234220
} onCancel: {
235221
self.resultTaskCancelled.value = true
@@ -282,20 +268,15 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
282268
self.executionTask = nil
283269
}
284270

285-
/// Trigger `QueuedTask.priority` to be updated with the current priority of the underlying task.
286-
///
287-
/// This is an asynchronous operation that makes no guarantees when the updated priority will be available.
288-
///
289-
/// This is needed because tasks can't subscribe to priority updates (ie. there is no `withPriorityHandler` similar to
290-
/// `withCancellationHandler`, https://github.com/apple/swift/issues/73367).
291-
func triggerPriorityUpdate() {
292-
updatePriorityContinuation.yield()
293-
}
294-
295271
/// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning
296272
/// a new task that depends on it. Otherwise a no-op.
297273
nonisolated func elevatePriority(to targetPriority: TaskPriority) {
298274
if priority < targetPriority {
275+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
276+
logger.debug(
277+
"Elevating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(targetPriority.rawValue)"
278+
)
279+
}
299280
Task(priority: targetPriority) {
300281
await self.resultTask.value
301282
}
@@ -385,16 +366,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
385366
return queuedTask
386367
}
387368

388-
/// Trigger all queued tasks to update their priority.
389-
///
390-
/// Should be called occasionally to elevate tasks in the queue whose underlying `Swift.Task` had their priority
391-
/// elevated because a higher-priority task started depending on them.
392-
private func triggerPriorityUpdateOfQueuedTasks() async {
393-
for task in pendingTasks {
394-
await task.triggerPriorityUpdate()
395-
}
396-
}
397-
398369
/// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
399370
private func maxConcurrentTasks(at priority: TaskPriority) -> Int {
400371
for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
@@ -417,9 +388,8 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
417388
{
418389
// We don't have any execution slots left. Thus, this poker has nothing to do and is done.
419390
// When the next task finishes, it calls `poke` again.
420-
// If the low priority task's priority gets elevated, that will be picked up when the next task in the
421-
// `TaskScheduler` finishes, which causes `triggerPriorityUpdateOfQueuedTasks` to be called, which transfers
422-
// the new elevated priority to `QueuedTask.priority` and which can then be picked up by the next `poke` call.
391+
// If the low priority task's priority gets elevated that task's priority will get elevated and it will be
392+
// picked up on the next `poke` call.
423393
return
424394
}
425395
let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description))
@@ -428,13 +398,17 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
428398
case .cancelAndRescheduleDependency(let taskDescription):
429399
guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
430400
else {
431-
logger.fault(
432-
"Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks"
433-
)
401+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
402+
logger.fault(
403+
"Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks"
404+
)
405+
}
434406
return nil
435407
}
436408
if !taskDescription.isIdempotent {
437-
logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent")
409+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
410+
logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent")
411+
}
438412
return dependency
439413
}
440414
if dependency.priority > task.priority {
@@ -445,9 +419,11 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
445419
case .waitAndElevatePriorityOfDependency(let taskDescription):
446420
guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
447421
else {
448-
logger.fault(
449-
"Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks"
450-
)
422+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
423+
logger.fault(
424+
"Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks"
425+
)
426+
}
451427
return nil
452428
}
453429
return dependency
@@ -465,9 +441,11 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
465441
switch taskDependency {
466442
case .cancelAndRescheduleDependency(let taskDescription):
467443
guard let task = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id }) else {
468-
logger.fault(
469-
"Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks"
470-
)
444+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
445+
logger.fault(
446+
"Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks"
447+
)
448+
}
471449
return nil
472450
}
473451
return task
@@ -478,6 +456,9 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
478456
if !rescheduleTasks.isEmpty {
479457
Task.detached(priority: task.priority) {
480458
for task in rescheduleTasks {
459+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
460+
logger.debug("Suspending \(task.description.forLogging)")
461+
}
481462
await task.cancelToBeRescheduled()
482463
}
483464
}
@@ -510,7 +491,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
510491
case .terminated: break
511492
case .cancelledToBeRescheduled: pendingTasks.append(task)
512493
}
513-
await self.triggerPriorityUpdateOfQueuedTasks()
514494
self.poke()
515495
}
516496
}

Sources/SKSupport/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ add_library(SKSupport STATIC
1717
Result.swift
1818
Sequence+AsyncMap.swift
1919
SwitchableProcessResultExitStatus.swift
20+
Task+WithPriorityChangedHandler.swift
2021
ThreadSafeBox.swift
2122
WorkspaceType.swift
2223
)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
/// Runs `operation`. If the task's priority changes while the operation is running, calls `taskPriorityChanged`.
14+
///
15+
/// Since Swift Concurrency doesn't support direct observation of a task's priority, this polls the task's priority at
16+
/// `pollingInterval`.
17+
/// The function assumes that the original priority of the task is `initialPriority`. If the task priority changed
18+
/// compared to `initialPriority`, the `taskPriorityChanged` will be called.
19+
public func withTaskPriorityChangedHandler(
20+
initialPriority: TaskPriority = Task.currentPriority,
21+
pollingInterval: Duration = .seconds(0.1),
22+
@_inheritActorContext operation: @escaping @Sendable () async -> Void,
23+
taskPriorityChanged: @escaping @Sendable () -> Void
24+
) async {
25+
let lastPriority = ThreadSafeBox(initialValue: initialPriority)
26+
await withTaskGroup(of: Void.self) { taskGroup in
27+
taskGroup.addTask {
28+
while true {
29+
if Task.isCancelled {
30+
return
31+
}
32+
let newPriority = Task.currentPriority
33+
let didChange = lastPriority.withLock { lastPriority in
34+
if newPriority != lastPriority {
35+
lastPriority = newPriority
36+
return true
37+
}
38+
return false
39+
}
40+
if didChange {
41+
taskPriorityChanged()
42+
}
43+
do {
44+
try await Task.sleep(for: pollingInterval)
45+
} catch {
46+
break
47+
}
48+
}
49+
}
50+
taskGroup.addTask {
51+
await operation()
52+
}
53+
// The first task that watches the priority never finishes, so we are effectively await the `operation` task here
54+
// and cancelling the priority observation task once the operation task is done.
55+
// We do need to await the observation task as well so that priority escalation also affects the observation task.
56+
for await _ in taskGroup {
57+
taskGroup.cancelAll()
58+
}
59+
}
60+
}

Sources/SKTestSupport/SkipUnless.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,16 @@ public actor SkipUnless {
272272
public static func platformIsDarwin(_ message: String) throws {
273273
try XCTSkipUnless(Platform.current == .darwin, message)
274274
}
275+
276+
public static func platformSupportsTaskPriorityElevation() throws {
277+
#if os(macOS)
278+
guard #available(macOS 14.0, *) else {
279+
// Priority elevation was implemented by https://github.com/apple/swift/pull/63019, which is available in the
280+
// Swift 5.9 runtime included in macOS 14.0+
281+
throw XCTSkip("Priority elevation of tasks is only supported on macOS 14 and above")
282+
}
283+
#endif
284+
}
275285
}
276286

277287
// MARK: - Parsing Swift compiler version

Tests/SKCoreTests/TaskSchedulerTests.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//
1111
//===----------------------------------------------------------------------===//
1212

13+
import LSPLogging
1314
import SKCore
1415
import SKTestSupport
1516
import XCTest
@@ -54,8 +55,7 @@ final class TaskSchedulerTests: XCTestCase {
5455
}
5556

5657
func testTasksWithElevatedPrioritiesGetExecutedFirst() async throws {
57-
try XCTSkipIf(true, "rdar://128601797")
58-
58+
try SkipUnless.platformSupportsTaskPriorityElevation()
5959
await runTaskScheduler(
6060
scheduleTasks: { scheduler, taskExecutionRecorder in
6161
for i in 0..<20 {
@@ -262,7 +262,9 @@ fileprivate final class ClosureTaskDescription: TaskDescriptionProtocol {
262262
}
263263

264264
func execute() async {
265+
logger.debug("Starting execution of \(self) with priority \(Task.currentPriority.rawValue)")
265266
await closure()
267+
logger.debug("Finished executing \(self) with priority \(Task.currentPriority.rawValue)")
266268
}
267269

268270
func dependencies(

0 commit comments

Comments
 (0)