Skip to content

Commit 83b8be8

Browse files
committed
Add a withTaskPriorityChangedHandler function and use it in TaskScheduler
We need this function anyway to escalate process priorities when we set `nice`ness values for them. It also simplifies the task scheduler and I’m hoping that it fixes a non-deterministic failure that causes task priority elevation to not work properly.
1 parent 8a413d2 commit 83b8be8

File tree

3 files changed

+80
-64
lines changed

3 files changed

+80
-64
lines changed

Sources/SKCore/TaskScheduler.swift

Lines changed: 19 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import CAtomics
1414
import Foundation
1515
import LSPLogging
16+
import SKSupport
1617

1718
/// See comment on ``TaskDescriptionProtocol/dependencies(to:taskPriority:)``
1819
public enum TaskDependencyAction<TaskDescription: TaskDescriptionProtocol> {
@@ -125,10 +126,6 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
125126
/// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`.
126127
private let executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation
127128

128-
/// Placing a new value in this continuation will cause `resultTask` to query its priority and set
129-
/// `QueuedTask.priority`.
130-
private let updatePriorityContinuation: AsyncStream<Void>.Continuation
131-
132129
nonisolated(unsafe) private var _priority: AtomicUInt8
133130

134131
/// The latest known priority of the task.
@@ -189,16 +186,10 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
189186
description: TaskDescription,
190187
executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?
191188
) async {
192-
self._priority = .init(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue)
189+
self._priority = AtomicUInt8(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue)
193190
self.description = description
194191
self.executionStateChangedCallback = executionStateChangedCallback
195192

196-
var updatePriorityContinuation: AsyncStream<Void>.Continuation!
197-
let updatePriorityStream = AsyncStream {
198-
updatePriorityContinuation = $0
199-
}
200-
self.updatePriorityContinuation = updatePriorityContinuation
201-
202193
var executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation!
203194
let executionTaskCreatedStream = AsyncStream {
204195
executionTaskCreatedContinuation = $0
@@ -207,38 +198,24 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
207198

208199
self.resultTask = Task.detached(priority: priority) {
209200
await withTaskCancellationHandler {
210-
await withTaskGroup(of: Void.self) { taskGroup in
211-
taskGroup.addTask {
212-
for await _ in updatePriorityStream {
213-
if Task.currentPriority != self.priority {
214-
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
215-
logger.debug(
216-
"Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)"
217-
)
218-
}
219-
self.priority = Task.currentPriority
220-
}
201+
await withTaskPriorityChangedHandler(initialPriority: self.priority) {
202+
for await task in executionTaskCreatedStream {
203+
switch await task.valuePropagatingCancellation {
204+
case .cancelledToBeRescheduled:
205+
// Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
206+
break
207+
case .terminated:
208+
// The task finished. We are done with this `QueuedTask`
209+
return
221210
}
222211
}
223-
taskGroup.addTask {
224-
for await task in executionTaskCreatedStream {
225-
switch await task.valuePropagatingCancellation {
226-
case .cancelledToBeRescheduled:
227-
// Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
228-
break
229-
case .terminated:
230-
// The task finished. We are done with this `QueuedTask`
231-
return
232-
}
233-
}
234-
}
235-
// The first (update priority) task never finishes, so this waits for the second (wait for execution) task
236-
// to terminate.
237-
// Afterwards we also cancel the update priority task.
238-
for await _ in taskGroup {
239-
taskGroup.cancelAll()
240-
return
212+
} taskPriorityChanged: {
213+
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
214+
logger.debug(
215+
"Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)"
216+
)
241217
}
218+
self.priority = Task.currentPriority
242219
}
243220
} onCancel: {
244221
self.resultTaskCancelled.value = true
@@ -291,16 +268,6 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
291268
self.executionTask = nil
292269
}
293270

294-
/// Trigger `QueuedTask.priority` to be updated with the current priority of the underlying task.
295-
///
296-
/// This is an asynchronous operation that makes no guarantees when the updated priority will be available.
297-
///
298-
/// This is needed because tasks can't subscribe to priority updates (ie. there is no `withPriorityHandler` similar to
299-
/// `withCancellationHandler`, https://github.com/apple/swift/issues/73367).
300-
func triggerPriorityUpdate() {
301-
updatePriorityContinuation.yield()
302-
}
303-
304271
/// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning
305272
/// a new task that depends on it. Otherwise a no-op.
306273
nonisolated func elevatePriority(to targetPriority: TaskPriority) {
@@ -399,16 +366,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
399366
return queuedTask
400367
}
401368

402-
/// Trigger all queued tasks to update their priority.
403-
///
404-
/// Should be called occasionally to elevate tasks in the queue whose underlying `Swift.Task` had their priority
405-
/// elevated because a higher-priority task started depending on them.
406-
private func triggerPriorityUpdateOfQueuedTasks() async {
407-
for task in pendingTasks {
408-
await task.triggerPriorityUpdate()
409-
}
410-
}
411-
412369
/// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
413370
private func maxConcurrentTasks(at priority: TaskPriority) -> Int {
414371
for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
@@ -431,9 +388,8 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
431388
{
432389
// We don't have any execution slots left. Thus, this poker has nothing to do and is done.
433390
// When the next task finishes, it calls `poke` again.
434-
// If the low priority task's priority gets elevated, that will be picked up when the next task in the
435-
// `TaskScheduler` finishes, which causes `triggerPriorityUpdateOfQueuedTasks` to be called, which transfers
436-
// the new elevated priority to `QueuedTask.priority` and which can then be picked up by the next `poke` call.
391+
// If the low priority task's priority gets elevated that task's priority will get elevated and it will be
392+
// picked up on the next `poke` call.
437393
return
438394
}
439395
let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description))
@@ -535,7 +491,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
535491
case .terminated: break
536492
case .cancelledToBeRescheduled: pendingTasks.append(task)
537493
}
538-
await self.triggerPriorityUpdateOfQueuedTasks()
539494
self.poke()
540495
}
541496
}

Sources/SKSupport/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ add_library(SKSupport STATIC
1717
Result.swift
1818
Sequence+AsyncMap.swift
1919
SwitchableProcessResultExitStatus.swift
20+
Task+WithPriorityChangedHandler.swift
2021
ThreadSafeBox.swift
2122
WorkspaceType.swift
2223
)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
/// Runs `operation`. If the task's priority changes while the operation is running, calls `taskPriorityChanged`.
14+
///
15+
/// Since Swift Concurrency doesn't support direct observation of a task's priority, this polls the task's priority at
16+
/// `pollingInterval`.
17+
/// The function assumes that the original priority of the task is `initialPriority`. If the task priority changed
18+
/// compared to `initialPriority`, the `taskPriorityChanged` will be called.
19+
public func withTaskPriorityChangedHandler(
20+
initialPriority: TaskPriority = Task.currentPriority,
21+
pollingInterval: Duration = .seconds(0.1),
22+
@_inheritActorContext operation: @escaping @Sendable () async -> Void,
23+
taskPriorityChanged: @escaping @Sendable () -> Void
24+
) async {
25+
let lastPriority = ThreadSafeBox(initialValue: initialPriority)
26+
await withTaskGroup(of: Void.self) { taskGroup in
27+
taskGroup.addTask {
28+
while true {
29+
if Task.isCancelled {
30+
return
31+
}
32+
let newPriority = Task.currentPriority
33+
let didChange = lastPriority.withLock { lastPriority in
34+
if newPriority != lastPriority {
35+
lastPriority = newPriority
36+
return true
37+
}
38+
return false
39+
}
40+
if didChange {
41+
taskPriorityChanged()
42+
}
43+
do {
44+
try await Task.sleep(for: pollingInterval)
45+
} catch {
46+
break
47+
}
48+
}
49+
}
50+
taskGroup.addTask {
51+
await operation()
52+
}
53+
// The first task that watches the priority never finishes, so we are effectively await the `operation` task here
54+
// and cancelling the priority observation task once the operation task is done.
55+
// We do need to await the observation task as well so that priority escalation also affects the observation task.
56+
for await _ in taskGroup {
57+
taskGroup.cancelAll()
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)