Skip to content

Commit edfda7d

Browse files
committed
Add support for concurrent queues and dispatch barriers to AsyncQueue
1 parent 0c30951 commit edfda7d

File tree

3 files changed

+71
-31
lines changed

3 files changed

+71
-31
lines changed

Sources/LanguageServerProtocol/AsyncQueue.swift

Lines changed: 69 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,53 +33,93 @@ extension NSLock {
3333
}
3434
}
3535

36-
/// A serial queue that allows the execution of asyncronous blocks of code.
36+
/// A queue that allows the execution of asyncronous blocks of code.
3737
public final class AsyncQueue {
38-
/// Lock guarding `lastTask`.
39-
private let lastTaskLock = NSLock()
38+
public enum QueueKind {
39+
/// A queue that allows concurrent execution of tasks.
40+
case concurrent
4041

41-
/// The last scheduled task if it hasn't finished yet.
42-
///
43-
/// Any newly scheduled tasks need to await this task to ensure that tasks are
44-
/// executed syncronously.
45-
///
46-
/// `id` is a unique value to identify the task. This allows us to set `lastTask`
47-
/// to `nil` if the queue runs empty.
48-
private var lastTask: (task: AnyTask, id: UUID)?
42+
/// A queue that executes one task after the other.
43+
case serial
44+
}
45+
46+
private struct PendingTask {
47+
/// The task that is pending.
48+
let task: any AnyTask
49+
50+
/// Whether the task needs to finish executing befoer any other task can
51+
/// start in executing in the queue.
52+
let isBarrier: Bool
4953

50-
public init() {
51-
self.lastTaskLock.name = "AsyncQueue.lastTaskLock"
54+
/// A unique value used to identify the task. This allows tasks to get
55+
/// removed from `pendingTasks` again after they finished executing.
56+
let id: UUID
57+
}
58+
59+
/// Whether the queue allows concurrent execution of tasks.
60+
private let kind: QueueKind
61+
62+
/// Lock guarding `pendingTasks`.
63+
private let pendingTasksLock = NSLock()
64+
65+
/// Pending tasks that have not finished execution yet.
66+
private var pendingTasks = [PendingTask]()
67+
68+
public init(_ kind: QueueKind) {
69+
self.kind = kind
70+
self.pendingTasksLock.name = "AsyncQueue"
5271
}
5372

5473
/// Schedule a new closure to be executed on the queue.
5574
///
56-
/// All previously added tasks are guaranteed to finished executing before
57-
/// this closure gets executed.
75+
/// If this is a serial queue, all previously added tasks are guaranteed to
76+
/// finished executing before this closure gets executed.
77+
///
78+
/// If this is a barrier, all previously scheduled tasks are guaranteed to
79+
/// finish execution before the barrier is executed and all tasks that are
80+
/// added later will wait until the barrier finishes execution.
5881
@discardableResult
5982
public func async<Success: Sendable>(
6083
priority: TaskPriority? = nil,
84+
barrier isBarrier: Bool = false,
6185
@_inheritActorContext operation: @escaping @Sendable () async -> Success
6286
) -> Task<Success, Never> {
6387
let id = UUID()
6488

65-
return lastTaskLock.withLock {
66-
let task = Task<Success, Never>(priority: priority) { [previousLastTask = lastTask] in
67-
await previousLastTask?.task.waitForCompletion()
68-
69-
defer {
70-
lastTaskLock.withLock {
71-
// If we haven't queued a new task since enquing this one, we can clear
72-
// last task.
73-
if self.lastTask?.id == id {
74-
self.lastTask = nil
75-
}
76-
}
89+
return pendingTasksLock.withLock {
90+
// Build the list of tasks that need to finishe exeuction before this one
91+
// can be executed
92+
let dependencies: [PendingTask]
93+
switch (kind, isBarrier: isBarrier) {
94+
case (.concurrent, isBarrier: true):
95+
// Wait for all tasks after the last barrier.
96+
let lastBarrierIndex = pendingTasks.lastIndex(where: { $0.isBarrier }) ?? pendingTasks.startIndex
97+
dependencies = Array(pendingTasks[lastBarrierIndex...])
98+
case (.concurrent, isBarrier: false):
99+
// If there is a barrier, wait for it.
100+
dependencies = [pendingTasks.last(where: { $0.isBarrier })].compactMap { $0 }
101+
case (.serial, _):
102+
// We are in a serial queue. The last pending task must finish for this one to start.
103+
dependencies = [pendingTasks.last].compactMap { $0 }
104+
}
105+
106+
107+
// Schedule the task.
108+
let task = Task {
109+
for dependency in dependencies {
110+
await dependency.task.waitForCompletion()
111+
}
112+
113+
let result = await operation()
114+
115+
pendingTasksLock.withLock {
116+
pendingTasks.removeAll(where: { $0.id == id })
77117
}
78118

79-
return await operation()
119+
return result
80120
}
81121

82-
lastTask = (task, id)
122+
pendingTasks.append(PendingTask(task: task, isBarrier: isBarrier, id: id))
83123

84124
return task
85125
}

Sources/LanguageServerProtocol/Connection.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public final class LocalConnection {
8787
/// have forwarded the request to clangd.
8888
///
8989
/// The actual semantic handling of the message happens off this queue.
90-
let messageHandlingQueue: AsyncQueue = AsyncQueue()
90+
let messageHandlingQueue: AsyncQueue = AsyncQueue(.serial)
9191

9292
var _nextRequestID: Int = 0
9393

Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public final class JSONRPCConnection {
4040
/// have forwarded the request to clangd.
4141
///
4242
/// The actual semantic handling of the message happens off this queue.
43-
let messageHandlingQueue: AsyncQueue = AsyncQueue()
43+
let messageHandlingQueue: AsyncQueue = AsyncQueue(.serial)
4444
let receiveIO: DispatchIO
4545
let sendIO: DispatchIO
4646
let messageRegistry: MessageRegistry

0 commit comments

Comments
 (0)