Skip to content

Asynchronously return the request result for folding range requests #849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 69 additions & 29 deletions Sources/LanguageServerProtocol/AsyncQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,93 @@ extension NSLock {
}
}

/// A serial queue that allows the execution of asyncronous blocks of code.
/// A queue that allows the execution of asyncronous blocks of code.
public final class AsyncQueue {
/// Lock guarding `lastTask`.
private let lastTaskLock = NSLock()
public enum QueueKind {
/// A queue that allows concurrent execution of tasks.
case concurrent

/// The last scheduled task if it hasn't finished yet.
///
/// Any newly scheduled tasks need to await this task to ensure that tasks are
/// executed syncronously.
///
/// `id` is a unique value to identify the task. This allows us to set `lastTask`
/// to `nil` if the queue runs empty.
private var lastTask: (task: AnyTask, id: UUID)?
/// A queue that executes one task after the other.
case serial
}

private struct PendingTask {
/// The task that is pending.
let task: any AnyTask

/// Whether the task needs to finish executing befoer any other task can
/// start in executing in the queue.
let isBarrier: Bool

public init() {
self.lastTaskLock.name = "AsyncQueue.lastTaskLock"
/// A unique value used to identify the task. This allows tasks to get
/// removed from `pendingTasks` again after they finished executing.
let id: UUID
}

/// Whether the queue allows concurrent execution of tasks.
private let kind: QueueKind

/// Lock guarding `pendingTasks`.
private let pendingTasksLock = NSLock()

/// Pending tasks that have not finished execution yet.
private var pendingTasks = [PendingTask]()

public init(_ kind: QueueKind) {
self.kind = kind
self.pendingTasksLock.name = "AsyncQueue"
}

/// Schedule a new closure to be executed on the queue.
///
/// All previously added tasks are guaranteed to finished executing before
/// this closure gets executed.
/// If this is a serial queue, all previously added tasks are guaranteed to
/// finished executing before this closure gets executed.
///
/// If this is a barrier, all previously scheduled tasks are guaranteed to
/// finish execution before the barrier is executed and all tasks that are
/// added later will wait until the barrier finishes execution.
@discardableResult
public func async<Success: Sendable>(
priority: TaskPriority? = nil,
barrier isBarrier: Bool = false,
@_inheritActorContext operation: @escaping @Sendable () async -> Success
) -> Task<Success, Never> {
let id = UUID()

return lastTaskLock.withLock {
let task = Task<Success, Never>(priority: priority) { [previousLastTask = lastTask] in
await previousLastTask?.task.waitForCompletion()

defer {
lastTaskLock.withLock {
// If we haven't queued a new task since enquing this one, we can clear
// last task.
if self.lastTask?.id == id {
self.lastTask = nil
}
}
return pendingTasksLock.withLock {
// Build the list of tasks that need to finishe exeuction before this one
// can be executed
let dependencies: [PendingTask]
switch (kind, isBarrier: isBarrier) {
case (.concurrent, isBarrier: true):
// Wait for all tasks after the last barrier.
let lastBarrierIndex = pendingTasks.lastIndex(where: { $0.isBarrier }) ?? pendingTasks.startIndex
dependencies = Array(pendingTasks[lastBarrierIndex...])
case (.concurrent, isBarrier: false):
// If there is a barrier, wait for it.
dependencies = [pendingTasks.last(where: { $0.isBarrier })].compactMap { $0 }
case (.serial, _):
// We are in a serial queue. The last pending task must finish for this one to start.
dependencies = [pendingTasks.last].compactMap { $0 }
}


// Schedule the task.
let task = Task {
for dependency in dependencies {
await dependency.task.waitForCompletion()
}

let result = await operation()

pendingTasksLock.withLock {
pendingTasks.removeAll(where: { $0.id == id })
}

return await operation()
return result
}

lastTask = (task, id)
pendingTasks.append(PendingTask(task: task, isBarrier: isBarrier, id: id))

return task
}
Expand Down
41 changes: 13 additions & 28 deletions Sources/LanguageServerProtocol/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ public protocol MessageHandler: AnyObject {
/// The method should return as soon as the notification has been sufficiently
/// handled to avoid out-of-order requests, e.g. once the notification has
/// been forwarded to clangd.
func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async
func handle(_ params: some NotificationType, from clientID: ObjectIdentifier)

/// Handle a request and (asynchronously) receive a reply.
///
/// The method should return as soon as the request has been sufficiently
/// handled to avoid out-of-order requests, e.g. once the corresponding
/// request has been sent to sourcekitd. The actual semantic computation
/// should occur after the method returns and report the result via `reply`.
func handle<Request: RequestType>(_ params: Request, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult<Request.Response>) -> Void) async
func handle<Request: RequestType>(_ params: Request, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult<Request.Response>) -> Void)
}

/// A connection between two message handlers in the same process.
Expand All @@ -77,18 +77,7 @@ public final class LocalConnection {

/// The queue guarding `_nextRequestID`.
let queue: DispatchQueue = DispatchQueue(label: "local-connection-queue")

/// The queue on which all messages (notifications, requests, responses) are
/// handled.
///
/// The queue is blocked until the message has been sufficiently handled to
/// avoid out-of-order handling of messages. For sourcekitd, this means that
/// a request has been sent to sourcekitd and for clangd, this means that we
/// have forwarded the request to clangd.
///
/// The actual semantic handling of the message happens off this queue.
let messageHandlingQueue: AsyncQueue = AsyncQueue()


var _nextRequestID: Int = 0

var state: State = .ready
Expand Down Expand Up @@ -125,9 +114,7 @@ public final class LocalConnection {

extension LocalConnection: Connection {
public func send<Notification>(_ notification: Notification) where Notification: NotificationType {
messageHandlingQueue.async {
await self.handler?.handle(notification, from: ObjectIdentifier(self))
}
self.handler?.handle(notification, from: ObjectIdentifier(self))
}

public func send<Request: RequestType>(
Expand All @@ -137,19 +124,17 @@ extension LocalConnection: Connection {
) -> RequestID {
let id = nextRequestID()

messageHandlingQueue.async {
guard let handler = self.handler else {
queue.async {
reply(.failure(.serverCancelled))
}
return
guard let handler = self.handler else {
queue.async {
reply(.failure(.serverCancelled))
}
return id
}

precondition(self.state == .started)
await handler.handle(request, id: id, from: ObjectIdentifier(self)) { result in
queue.async {
reply(result)
}
precondition(self.state == .started)
handler.handle(request, id: id, from: ObjectIdentifier(self)) { result in
queue.async {
reply(result)
}
}

Expand Down
10 changes: 5 additions & 5 deletions Sources/LanguageServerProtocol/Message.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public protocol _RequestType: MessageType {
id: RequestID,
connection: Connection,
reply: @escaping (LSPResult<ResponseType>, RequestID) -> Void
) async
)
}

/// A request, which must have a unique `method` name as well as an associated response type.
Expand All @@ -54,16 +54,16 @@ extension RequestType {
id: RequestID,
connection: Connection,
reply: @escaping (LSPResult<ResponseType>, RequestID) -> Void
) async {
await handler.handle(self, id: id, from: ObjectIdentifier(connection)) { response in
) {
handler.handle(self, id: id, from: ObjectIdentifier(connection)) { response in
reply(response.map({ $0 as ResponseType }), id)
}
}
}

extension NotificationType {
public func _handle(_ handler: MessageHandler, connection: Connection) async {
await handler.handle(self, from: ObjectIdentifier(connection))
public func _handle(_ handler: MessageHandler, connection: Connection) {
handler.handle(self, from: ObjectIdentifier(connection))
}
}

Expand Down
23 changes: 5 additions & 18 deletions Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@ public final class JSONRPCConnection {
/// The queue on which we send data.
let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated)

/// The queue on which all messages (notifications, requests, responses) are
/// handled.
///
/// The queue is blocked until the message has been sufficiently handled to
/// avoid out-of-order handling of messages. For sourcekitd, this means that
/// a request has been sent to sourcekitd and for clangd, this means that we
/// have forwarded the request to clangd.
///
/// The actual semantic handling of the message happens off this queue.
let messageHandlingQueue: AsyncQueue = AsyncQueue()
let receiveIO: DispatchIO
let sendIO: DispatchIO
let messageRegistry: MessageRegistry
Expand Down Expand Up @@ -282,17 +272,14 @@ public final class JSONRPCConnection {
func handle(_ message: JSONRPCMessage) {
switch message {
case .notification(let notification):
messageHandlingQueue.async {
await notification._handle(self.receiveHandler!, connection: self)
}
notification._handle(self.receiveHandler!, connection: self)
case .request(let request, id: let id):
let semaphore: DispatchSemaphore? = syncRequests ? .init(value: 0) : nil
messageHandlingQueue.async {
await request._handle(self.receiveHandler!, id: id, connection: self) { (response, id) in
self.sendReply(response, id: id)
semaphore?.signal()
}
request._handle(self.receiveHandler!, id: id, connection: self) { (response, id) in
self.sendReply(response, id: id)
semaphore?.signal()
}

semaphore?.wait()

case .response(let response, id: let id):
Expand Down
25 changes: 19 additions & 6 deletions Sources/SKCore/BuildServerBuildSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ public actor BuildServerBuildSystem: MessageHandler {

var buildServer: JSONRPCConnection?

/// The queue on which all messages that originate from the build server are
/// handled.
///
/// These are requests and notifications sent *from* the build server,
/// not replies from the build server.
///
/// This ensures that messages from the build server are handled in the order
/// they were received. Swift concurrency does not guarentee in-order
/// execution of tasks.
public let bspMessageHandlingQueue = AsyncQueue(.serial)

let searchPaths: [AbsolutePath]

public private(set) var indexDatabasePath: AbsolutePath?
Expand Down Expand Up @@ -167,18 +178,20 @@ public actor BuildServerBuildSystem: MessageHandler {
/// the build server has sent us a notification.
///
/// We need to notify the delegate about any updated build settings.
public func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async {
if let params = params as? BuildTargetsChangedNotification {
await self.handleBuildTargetsChanged(Notification(params, clientID: clientID))
} else if let params = params as? FileOptionsChangedNotification {
await self.handleFileOptionsChanged(Notification(params, clientID: clientID))
public nonisolated func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) {
bspMessageHandlingQueue.async {
if let params = params as? BuildTargetsChangedNotification {
await self.handleBuildTargetsChanged(Notification(params, clientID: clientID))
} else if let params = params as? FileOptionsChangedNotification {
await self.handleFileOptionsChanged(Notification(params, clientID: clientID))
}
}
}

/// Handler for requests received **from** the build server.
///
/// We currently can't handle any requests sent from the build server to us.
public func handle<R: RequestType>(
public nonisolated func handle<R: RequestType>(
_ params: R,
id: RequestID,
from clientID: ObjectIdentifier,
Expand Down
Loading