Skip to content

Commit 6206585

Browse files
authored
Merge pull request #849 from ahoppen/ahoppen/async-request-result-return
Asynchronously return the request result for folding range requests
2 parents e1548a0 + 1b6015f commit 6206585

File tree

9 files changed

+345
-256
lines changed

9 files changed

+345
-256
lines changed

Sources/LanguageServerProtocol/AsyncQueue.swift

Lines changed: 69 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,53 +33,93 @@ extension NSLock {
3333
}
3434
}
3535

36-
/// A serial queue that allows the execution of asyncronous blocks of code.
36+
/// A queue that allows the execution of asyncronous blocks of code.
3737
public final class AsyncQueue {
38-
/// Lock guarding `lastTask`.
39-
private let lastTaskLock = NSLock()
38+
public enum QueueKind {
39+
/// A queue that allows concurrent execution of tasks.
40+
case concurrent
4041

41-
/// The last scheduled task if it hasn't finished yet.
42-
///
43-
/// Any newly scheduled tasks need to await this task to ensure that tasks are
44-
/// executed syncronously.
45-
///
46-
/// `id` is a unique value to identify the task. This allows us to set `lastTask`
47-
/// to `nil` if the queue runs empty.
48-
private var lastTask: (task: AnyTask, id: UUID)?
42+
/// A queue that executes one task after the other.
43+
case serial
44+
}
45+
46+
private struct PendingTask {
47+
/// The task that is pending.
48+
let task: any AnyTask
49+
50+
/// Whether the task needs to finish executing befoer any other task can
51+
/// start in executing in the queue.
52+
let isBarrier: Bool
4953

50-
public init() {
51-
self.lastTaskLock.name = "AsyncQueue.lastTaskLock"
54+
/// A unique value used to identify the task. This allows tasks to get
55+
/// removed from `pendingTasks` again after they finished executing.
56+
let id: UUID
57+
}
58+
59+
/// Whether the queue allows concurrent execution of tasks.
60+
private let kind: QueueKind
61+
62+
/// Lock guarding `pendingTasks`.
63+
private let pendingTasksLock = NSLock()
64+
65+
/// Pending tasks that have not finished execution yet.
66+
private var pendingTasks = [PendingTask]()
67+
68+
public init(_ kind: QueueKind) {
69+
self.kind = kind
70+
self.pendingTasksLock.name = "AsyncQueue"
5271
}
5372

5473
/// Schedule a new closure to be executed on the queue.
5574
///
56-
/// All previously added tasks are guaranteed to finished executing before
57-
/// this closure gets executed.
75+
/// If this is a serial queue, all previously added tasks are guaranteed to
76+
/// finished executing before this closure gets executed.
77+
///
78+
/// If this is a barrier, all previously scheduled tasks are guaranteed to
79+
/// finish execution before the barrier is executed and all tasks that are
80+
/// added later will wait until the barrier finishes execution.
5881
@discardableResult
5982
public func async<Success: Sendable>(
6083
priority: TaskPriority? = nil,
84+
barrier isBarrier: Bool = false,
6185
@_inheritActorContext operation: @escaping @Sendable () async -> Success
6286
) -> Task<Success, Never> {
6387
let id = UUID()
6488

65-
return lastTaskLock.withLock {
66-
let task = Task<Success, Never>(priority: priority) { [previousLastTask = lastTask] in
67-
await previousLastTask?.task.waitForCompletion()
68-
69-
defer {
70-
lastTaskLock.withLock {
71-
// If we haven't queued a new task since enquing this one, we can clear
72-
// last task.
73-
if self.lastTask?.id == id {
74-
self.lastTask = nil
75-
}
76-
}
89+
return pendingTasksLock.withLock {
90+
// Build the list of tasks that need to finishe exeuction before this one
91+
// can be executed
92+
let dependencies: [PendingTask]
93+
switch (kind, isBarrier: isBarrier) {
94+
case (.concurrent, isBarrier: true):
95+
// Wait for all tasks after the last barrier.
96+
let lastBarrierIndex = pendingTasks.lastIndex(where: { $0.isBarrier }) ?? pendingTasks.startIndex
97+
dependencies = Array(pendingTasks[lastBarrierIndex...])
98+
case (.concurrent, isBarrier: false):
99+
// If there is a barrier, wait for it.
100+
dependencies = [pendingTasks.last(where: { $0.isBarrier })].compactMap { $0 }
101+
case (.serial, _):
102+
// We are in a serial queue. The last pending task must finish for this one to start.
103+
dependencies = [pendingTasks.last].compactMap { $0 }
104+
}
105+
106+
107+
// Schedule the task.
108+
let task = Task {
109+
for dependency in dependencies {
110+
await dependency.task.waitForCompletion()
111+
}
112+
113+
let result = await operation()
114+
115+
pendingTasksLock.withLock {
116+
pendingTasks.removeAll(where: { $0.id == id })
77117
}
78118

79-
return await operation()
119+
return result
80120
}
81121

82-
lastTask = (task, id)
122+
pendingTasks.append(PendingTask(task: task, isBarrier: isBarrier, id: id))
83123

84124
return task
85125
}

Sources/LanguageServerProtocol/Connection.swift

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ public protocol MessageHandler: AnyObject {
4646
/// The method should return as soon as the notification has been sufficiently
4747
/// handled to avoid out-of-order requests, e.g. once the notification has
4848
/// been forwarded to clangd.
49-
func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async
49+
func handle(_ params: some NotificationType, from clientID: ObjectIdentifier)
5050

5151
/// Handle a request and (asynchronously) receive a reply.
5252
///
5353
/// The method should return as soon as the request has been sufficiently
5454
/// handled to avoid out-of-order requests, e.g. once the corresponding
5555
/// request has been sent to sourcekitd. The actual semantic computation
5656
/// should occur after the method returns and report the result via `reply`.
57-
func handle<Request: RequestType>(_ params: Request, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult<Request.Response>) -> Void) async
57+
func handle<Request: RequestType>(_ params: Request, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult<Request.Response>) -> Void)
5858
}
5959

6060
/// A connection between two message handlers in the same process.
@@ -77,18 +77,7 @@ public final class LocalConnection {
7777

7878
/// The queue guarding `_nextRequestID`.
7979
let queue: DispatchQueue = DispatchQueue(label: "local-connection-queue")
80-
81-
/// The queue on which all messages (notifications, requests, responses) are
82-
/// handled.
83-
///
84-
/// The queue is blocked until the message has been sufficiently handled to
85-
/// avoid out-of-order handling of messages. For sourcekitd, this means that
86-
/// a request has been sent to sourcekitd and for clangd, this means that we
87-
/// have forwarded the request to clangd.
88-
///
89-
/// The actual semantic handling of the message happens off this queue.
90-
let messageHandlingQueue: AsyncQueue = AsyncQueue()
91-
80+
9281
var _nextRequestID: Int = 0
9382

9483
var state: State = .ready
@@ -125,9 +114,7 @@ public final class LocalConnection {
125114

126115
extension LocalConnection: Connection {
127116
public func send<Notification>(_ notification: Notification) where Notification: NotificationType {
128-
messageHandlingQueue.async {
129-
await self.handler?.handle(notification, from: ObjectIdentifier(self))
130-
}
117+
self.handler?.handle(notification, from: ObjectIdentifier(self))
131118
}
132119

133120
public func send<Request: RequestType>(
@@ -137,19 +124,17 @@ extension LocalConnection: Connection {
137124
) -> RequestID {
138125
let id = nextRequestID()
139126

140-
messageHandlingQueue.async {
141-
guard let handler = self.handler else {
142-
queue.async {
143-
reply(.failure(.serverCancelled))
144-
}
145-
return
127+
guard let handler = self.handler else {
128+
queue.async {
129+
reply(.failure(.serverCancelled))
146130
}
131+
return id
132+
}
147133

148-
precondition(self.state == .started)
149-
await handler.handle(request, id: id, from: ObjectIdentifier(self)) { result in
150-
queue.async {
151-
reply(result)
152-
}
134+
precondition(self.state == .started)
135+
handler.handle(request, id: id, from: ObjectIdentifier(self)) { result in
136+
queue.async {
137+
reply(result)
153138
}
154139
}
155140

Sources/LanguageServerProtocol/Message.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public protocol _RequestType: MessageType {
2828
id: RequestID,
2929
connection: Connection,
3030
reply: @escaping (LSPResult<ResponseType>, RequestID) -> Void
31-
) async
31+
)
3232
}
3333

3434
/// A request, which must have a unique `method` name as well as an associated response type.
@@ -54,16 +54,16 @@ extension RequestType {
5454
id: RequestID,
5555
connection: Connection,
5656
reply: @escaping (LSPResult<ResponseType>, RequestID) -> Void
57-
) async {
58-
await handler.handle(self, id: id, from: ObjectIdentifier(connection)) { response in
57+
) {
58+
handler.handle(self, id: id, from: ObjectIdentifier(connection)) { response in
5959
reply(response.map({ $0 as ResponseType }), id)
6060
}
6161
}
6262
}
6363

6464
extension NotificationType {
65-
public func _handle(_ handler: MessageHandler, connection: Connection) async {
66-
await handler.handle(self, from: ObjectIdentifier(connection))
65+
public func _handle(_ handler: MessageHandler, connection: Connection) {
66+
handler.handle(self, from: ObjectIdentifier(connection))
6767
}
6868
}
6969

Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,6 @@ public final class JSONRPCConnection {
3131
/// The queue on which we send data.
3232
let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated)
3333

34-
/// The queue on which all messages (notifications, requests, responses) are
35-
/// handled.
36-
///
37-
/// The queue is blocked until the message has been sufficiently handled to
38-
/// avoid out-of-order handling of messages. For sourcekitd, this means that
39-
/// a request has been sent to sourcekitd and for clangd, this means that we
40-
/// have forwarded the request to clangd.
41-
///
42-
/// The actual semantic handling of the message happens off this queue.
43-
let messageHandlingQueue: AsyncQueue = AsyncQueue()
4434
let receiveIO: DispatchIO
4535
let sendIO: DispatchIO
4636
let messageRegistry: MessageRegistry
@@ -282,17 +272,14 @@ public final class JSONRPCConnection {
282272
func handle(_ message: JSONRPCMessage) {
283273
switch message {
284274
case .notification(let notification):
285-
messageHandlingQueue.async {
286-
await notification._handle(self.receiveHandler!, connection: self)
287-
}
275+
notification._handle(self.receiveHandler!, connection: self)
288276
case .request(let request, id: let id):
289277
let semaphore: DispatchSemaphore? = syncRequests ? .init(value: 0) : nil
290-
messageHandlingQueue.async {
291-
await request._handle(self.receiveHandler!, id: id, connection: self) { (response, id) in
292-
self.sendReply(response, id: id)
293-
semaphore?.signal()
294-
}
278+
request._handle(self.receiveHandler!, id: id, connection: self) { (response, id) in
279+
self.sendReply(response, id: id)
280+
semaphore?.signal()
295281
}
282+
296283
semaphore?.wait()
297284

298285
case .response(let response, id: let id):

Sources/SKCore/BuildServerBuildSystem.swift

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,17 @@ public actor BuildServerBuildSystem: MessageHandler {
5151

5252
var buildServer: JSONRPCConnection?
5353

54+
/// The queue on which all messages that originate from the build server are
55+
/// handled.
56+
///
57+
/// These are requests and notifications sent *from* the build server,
58+
/// not replies from the build server.
59+
///
60+
/// This ensures that messages from the build server are handled in the order
61+
/// they were received. Swift concurrency does not guarentee in-order
62+
/// execution of tasks.
63+
public let bspMessageHandlingQueue = AsyncQueue(.serial)
64+
5465
let searchPaths: [AbsolutePath]
5566

5667
public private(set) var indexDatabasePath: AbsolutePath?
@@ -167,18 +178,20 @@ public actor BuildServerBuildSystem: MessageHandler {
167178
/// the build server has sent us a notification.
168179
///
169180
/// We need to notify the delegate about any updated build settings.
170-
public func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async {
171-
if let params = params as? BuildTargetsChangedNotification {
172-
await self.handleBuildTargetsChanged(Notification(params, clientID: clientID))
173-
} else if let params = params as? FileOptionsChangedNotification {
174-
await self.handleFileOptionsChanged(Notification(params, clientID: clientID))
181+
public nonisolated func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) {
182+
bspMessageHandlingQueue.async {
183+
if let params = params as? BuildTargetsChangedNotification {
184+
await self.handleBuildTargetsChanged(Notification(params, clientID: clientID))
185+
} else if let params = params as? FileOptionsChangedNotification {
186+
await self.handleFileOptionsChanged(Notification(params, clientID: clientID))
187+
}
175188
}
176189
}
177190

178191
/// Handler for requests received **from** the build server.
179192
///
180193
/// We currently can't handle any requests sent from the build server to us.
181-
public func handle<R: RequestType>(
194+
public nonisolated func handle<R: RequestType>(
182195
_ params: R,
183196
id: RequestID,
184197
from clientID: ObjectIdentifier,

0 commit comments

Comments
 (0)