13
13
import CAtomics
14
14
import Foundation
15
15
import LSPLogging
16
+ import SKSupport
16
17
17
18
/// See comment on ``TaskDescriptionProtocol/dependencies(to:taskPriority:)``
18
19
public enum TaskDependencyAction < TaskDescription: TaskDescriptionProtocol > {
19
20
case waitAndElevatePriorityOfDependency( TaskDescription )
20
21
case cancelAndRescheduleDependency( TaskDescription )
21
22
}
22
23
24
+ private let taskSchedulerSubsystem = " org.swift.sourcekit-lsp.task-scheduler "
25
+
23
26
public protocol TaskDescriptionProtocol : Identifiable , Sendable , CustomLogStringConvertible {
24
27
/// Execute the task.
25
28
///
@@ -123,10 +126,6 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
123
126
/// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`.
124
127
private let executionTaskCreatedContinuation : AsyncStream < Task < ExecutionTaskFinishStatus , Never > > . Continuation
125
128
126
- /// Placing a new value in this continuation will cause `resultTask` to query its priority and set
127
- /// `QueuedTask.priority`.
128
- private let updatePriorityContinuation : AsyncStream < Void > . Continuation
129
-
130
129
nonisolated ( unsafe) private var _priority : AtomicUInt8
131
130
132
131
/// The latest known priority of the task.
@@ -183,20 +182,14 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
183
182
private let executionStateChangedCallback : ( @Sendable ( QueuedTask, TaskExecutionState) async -> Void ) ?
184
183
185
184
init(
186
- priority: TaskPriority ? = nil ,
185
+ priority: TaskPriority ,
187
186
description: TaskDescription ,
188
187
executionStateChangedCallback: ( @Sendable ( QueuedTask, TaskExecutionState) async -> Void ) ?
189
188
) async {
190
- self . _priority = . init ( initialValue: priority? . rawValue ?? Task . currentPriority . rawValue)
189
+ self . _priority = AtomicUInt8 ( initialValue: priority. rawValue)
191
190
self . description = description
192
191
self . executionStateChangedCallback = executionStateChangedCallback
193
192
194
- var updatePriorityContinuation : AsyncStream < Void > . Continuation !
195
- let updatePriorityStream = AsyncStream {
196
- updatePriorityContinuation = $0
197
- }
198
- self . updatePriorityContinuation = updatePriorityContinuation
199
-
200
193
var executionTaskCreatedContinuation : AsyncStream < Task < ExecutionTaskFinishStatus , Never > > . Continuation !
201
194
let executionTaskCreatedStream = AsyncStream {
202
195
executionTaskCreatedContinuation = $0
@@ -205,31 +198,24 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
205
198
206
199
self . resultTask = Task . detached ( priority: priority) {
207
200
await withTaskCancellationHandler {
208
- await withTaskGroup ( of: Void . self) { taskGroup in
209
- taskGroup. addTask {
210
- for await _ in updatePriorityStream {
211
- self . priority = Task . currentPriority
212
- }
213
- }
214
- taskGroup. addTask {
215
- for await task in executionTaskCreatedStream {
216
- switch await task. valuePropagatingCancellation {
217
- case . cancelledToBeRescheduled:
218
- // Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
219
- break
220
- case . terminated:
221
- // The task finished. We are done with this `QueuedTask`
222
- return
223
- }
201
+ await withTaskPriorityChangedHandler ( initialPriority: self . priority) {
202
+ for await task in executionTaskCreatedStream {
203
+ switch await task. valuePropagatingCancellation {
204
+ case . cancelledToBeRescheduled:
205
+ // Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
206
+ break
207
+ case . terminated:
208
+ // The task finished. We are done with this `QueuedTask`
209
+ return
224
210
}
225
211
}
226
- // The first (update priority) task never finishes, so this waits for the second (wait for execution) task
227
- // to terminate.
228
- // Afterwards we also cancel the update priority task.
229
- for await _ in taskGroup {
230
- taskGroup. cancelAll ( )
231
- return
212
+ } taskPriorityChanged: {
213
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
214
+ logger. debug (
215
+ " Updating priority of \( self . description. forLogging) from \( self . priority. rawValue) to \( Task . currentPriority. rawValue) "
216
+ )
232
217
}
218
+ self . priority = Task . currentPriority
233
219
}
234
220
} onCancel: {
235
221
self . resultTaskCancelled. value = true
@@ -282,20 +268,19 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
282
268
self . executionTask = nil
283
269
}
284
270
285
- /// Trigger `QueuedTask.priority` to be updated with the current priority of the underlying task.
286
- ///
287
- /// This is an asynchronous operation that makes no guarantees when the updated priority will be available.
288
- ///
289
- /// This is needed because tasks can't subscribe to priority updates (ie. there is no `withPriorityHandler` similar to
290
- /// `withCancellationHandler`, https://github.com/apple/swift/issues/73367).
291
- func triggerPriorityUpdate( ) {
292
- updatePriorityContinuation. yield ( )
293
- }
294
-
295
271
/// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning
296
272
/// a new task that depends on it. Otherwise a no-op.
297
273
nonisolated func elevatePriority( to targetPriority: TaskPriority ) {
298
274
if priority < targetPriority {
275
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
276
+ logger. debug (
277
+ " Elevating priority of \( self . description. forLogging) from \( self . priority. rawValue) to \( targetPriority. rawValue) "
278
+ )
279
+ }
280
+ // Awaiting the result task from a higher-priority task will eventually update `priority` through
281
+ // `withTaskPriorityChangedHandler` but that might take a while because `withTaskPriorityChangedHandler` polls.
282
+ // Since we know that the priority will be elevated, set it now. That way we don't try to elevate it again.
283
+ self . priority = targetPriority
299
284
Task ( priority: targetPriority) {
300
285
await self . resultTask. value
301
286
}
@@ -371,7 +356,7 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
371
356
) ? = nil
372
357
) async -> QueuedTask < TaskDescription > {
373
358
let queuedTask = await QueuedTask (
374
- priority: priority,
359
+ priority: priority ?? Task . currentPriority ,
375
360
description: taskDescription,
376
361
executionStateChangedCallback: executionStateChangedCallback
377
362
)
@@ -385,16 +370,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
385
370
return queuedTask
386
371
}
387
372
388
- /// Trigger all queued tasks to update their priority.
389
- ///
390
- /// Should be called occasionally to elevate tasks in the queue whose underlying `Swift.Task` had their priority
391
- /// elevated because a higher-priority task started depending on them.
392
- private func triggerPriorityUpdateOfQueuedTasks( ) async {
393
- for task in pendingTasks {
394
- await task. triggerPriorityUpdate ( )
395
- }
396
- }
397
-
398
373
/// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
399
374
private func maxConcurrentTasks( at priority: TaskPriority ) -> Int {
400
375
for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
@@ -417,9 +392,8 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
417
392
{
418
393
// We don't have any execution slots left. Thus, this poker has nothing to do and is done.
419
394
// When the next task finishes, it calls `poke` again.
420
- // If the low priority task's priority gets elevated, that will be picked up when the next task in the
421
- // `TaskScheduler` finishes, which causes `triggerPriorityUpdateOfQueuedTasks` to be called, which transfers
422
- // the new elevated priority to `QueuedTask.priority` and which can then be picked up by the next `poke` call.
395
+ // If the low priority task's priority gets elevated that task's priority will get elevated and it will be
396
+ // picked up on the next `poke` call.
423
397
return
424
398
}
425
399
let dependencies = task. description. dependencies ( to: currentlyExecutingTasks. map ( \. description) )
@@ -428,13 +402,17 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
428
402
case . cancelAndRescheduleDependency( let taskDescription) :
429
403
guard let dependency = self . currentlyExecutingTasks. first ( where: { $0. description. id == taskDescription. id } )
430
404
else {
431
- logger. fault (
432
- " Cannot find task to wait for \( taskDescription. forLogging) in list of currently executing tasks "
433
- )
405
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
406
+ logger. fault (
407
+ " Cannot find task to wait for \( taskDescription. forLogging) in list of currently executing tasks "
408
+ )
409
+ }
434
410
return nil
435
411
}
436
412
if !taskDescription. isIdempotent {
437
- logger. fault ( " Cannot reschedule task ' \( taskDescription. forLogging) ' since it is not idempotent " )
413
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
414
+ logger. fault ( " Cannot reschedule task ' \( taskDescription. forLogging) ' since it is not idempotent " )
415
+ }
438
416
return dependency
439
417
}
440
418
if dependency. priority > task. priority {
@@ -445,9 +423,11 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
445
423
case . waitAndElevatePriorityOfDependency( let taskDescription) :
446
424
guard let dependency = self . currentlyExecutingTasks. first ( where: { $0. description. id == taskDescription. id } )
447
425
else {
448
- logger. fault (
449
- " Cannot find task to wait for ' \( taskDescription. forLogging) ' in list of currently executing tasks "
450
- )
426
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
427
+ logger. fault (
428
+ " Cannot find task to wait for ' \( taskDescription. forLogging) ' in list of currently executing tasks "
429
+ )
430
+ }
451
431
return nil
452
432
}
453
433
return dependency
@@ -465,9 +445,11 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
465
445
switch taskDependency {
466
446
case . cancelAndRescheduleDependency( let taskDescription) :
467
447
guard let task = self . currentlyExecutingTasks. first ( where: { $0. description. id == taskDescription. id } ) else {
468
- logger. fault (
469
- " Cannot find task to reschedule \( taskDescription. forLogging) in list of currently executing tasks "
470
- )
448
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
449
+ logger. fault (
450
+ " Cannot find task to reschedule \( taskDescription. forLogging) in list of currently executing tasks "
451
+ )
452
+ }
471
453
return nil
472
454
}
473
455
return task
@@ -478,6 +460,9 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
478
460
if !rescheduleTasks. isEmpty {
479
461
Task . detached ( priority: task. priority) {
480
462
for task in rescheduleTasks {
463
+ withLoggingSubsystemAndScope ( subsystem: taskSchedulerSubsystem, scope: nil ) {
464
+ logger. debug ( " Suspending \( task. description. forLogging) " )
465
+ }
481
466
await task. cancelToBeRescheduled ( )
482
467
}
483
468
}
@@ -510,7 +495,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
510
495
case . terminated: break
511
496
case . cancelledToBeRescheduled: pendingTasks. append ( task)
512
497
}
513
- await self . triggerPriorityUpdateOfQueuedTasks ( )
514
498
self . poke ( )
515
499
}
516
500
}
0 commit comments