Skip to content

Commit 4aece57

Browse files
committed
termination handler
motivation: allow clients of SwiftPM to terminate background activities changes: * introduce new terminator class that is a registry for termination handlers * use the terminator instad of ProcessSet to manage the termination of active processes (eg git, tests) and build system * add cancel method to GitRepositoryProvider protocol so that it can be interuppted * add cancel methods to RepositoryManager, RegistryDownloadManager and HTTPClient so that it can be interuppted * change workspace initializer to take a terminator so that CLI and other consumer of libSwiftPM can interrupt * register intrupption points mentioned above in workspace * adjust related call sites, ie CLI signal handler now uses the workspace terminator to interuupt * add tests rdar://64900054 rdar://63723896
1 parent 511ffb3 commit 4aece57

26 files changed

+659
-99
lines changed

Sources/Basics/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ add_library(Basics
2828
Triple+Extensions.swift
2929
SwiftVersion.swift
3030
SQLiteBackedCache.swift
31+
Terminator.swift
3132
Version+Extensions.swift)
3233
target_link_libraries(Basics PUBLIC
3334
SwiftCollections::OrderedCollections

Sources/Basics/HTTPClient.swift

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import class Foundation.JSONDecoder
1515
import class Foundation.NSError
1616
import class Foundation.OperationQueue
1717
import struct Foundation.URL
18+
import struct Foundation.UUID
1819
import TSCBasic
1920

2021
#if canImport(Glibc)
@@ -35,7 +36,7 @@ public enum HTTPClientError: Error, Equatable {
3536

3637
// MARK: - HTTPClient
3738

38-
public struct HTTPClient {
39+
public struct HTTPClient: Cancellable {
3940
public typealias Configuration = HTTPClientConfiguration
4041
public typealias Request = HTTPClientRequest
4142
public typealias Response = HTTPClientResponse
@@ -47,9 +48,12 @@ public struct HTTPClient {
4748
private let underlying: Handler
4849

4950
/// DispatchSemaphore to restrict concurrent operations on manager.
50-
private let concurrencySemaphore: DispatchSemaphore
51-
/// OperationQueue to park pending requests
52-
private let requestsQueue: OperationQueue
51+
private let concurrencySemaphore: DispatchSemaphore
52+
/// OperationQueue to park pending requests
53+
private let requestsQueue: OperationQueue
54+
55+
// tracks outstanding requests for cancellation
56+
private var outstandingRequests = ThreadSafeKeyValueStore<UUID, (completion: CompletionHandler, progress: ProgressHandler?, queue: DispatchQueue)>()
5357

5458
// static to share across instances of the http client
5559
private static var hostsErrorsLock = Lock()
@@ -76,7 +80,12 @@ public struct HTTPClient {
7680
/// - observabilityScope: the observability scope to emit diagnostics on
7781
/// - progress: A progress handler to handle progress for example for downloads
7882
/// - completion: A completion handler to be notified of the completion of the request.
79-
public func execute(_ request: Request, observabilityScope: ObservabilityScope? = nil, progress: ProgressHandler? = nil, completion: @escaping CompletionHandler) {
83+
public func execute(
84+
_ request: Request,
85+
observabilityScope: ObservabilityScope? = nil,
86+
progress: ProgressHandler? = nil,
87+
completion: @escaping CompletionHandler
88+
) {
8089
// merge configuration
8190
var request = request
8291
if request.options.callbackQueue == nil {
@@ -107,7 +116,9 @@ public struct HTTPClient {
107116
request.headers.add(name: "Authorization", value: authorization)
108117
}
109118
// execute
110-
let callbackQueue = request.options.callbackQueue ?? self.configuration.callbackQueue
119+
guard let callbackQueue = request.options.callbackQueue else {
120+
return completion(.failure(InternalError("unknown callback queue")))
121+
}
111122
self._execute(
112123
request: request,
113124
requestNumber: 0,
@@ -129,13 +140,44 @@ public struct HTTPClient {
129140
)
130141
}
131142

132-
private func _execute(request: Request, requestNumber: Int, observabilityScope: ObservabilityScope?, progress: ProgressHandler?, completion: @escaping CompletionHandler) {
143+
/// Cancel any outstanding requests
144+
public func cancel(deadline: DispatchTime) throws {
145+
let outstanding = self.outstandingRequests.get()
146+
self.outstandingRequests.clear()
147+
for (completion, _, queue) in outstanding.values {
148+
queue.async {
149+
completion(.failure(CancellationError()))
150+
}
151+
}
152+
}
153+
154+
private func _execute(
155+
request: Request,
156+
requestNumber: Int,
157+
observabilityScope: ObservabilityScope?,
158+
progress: ProgressHandler?,
159+
completion: @escaping CompletionHandler
160+
) {
161+
// records outstanding requests for cancellation purposes
162+
guard let callbackQueue = request.options.callbackQueue else {
163+
return completion(.failure(InternalError("unknown callback queue")))
164+
}
165+
let requestKey = UUID()
166+
self.outstandingRequests[requestKey] = (completion: completion, progress: progress, queue: callbackQueue)
167+
133168
// wrap completion handler with concurrency control cleanup
134169
let originalCompletion = completion
135170
let completion: CompletionHandler = { result in
136171
// free concurrency control semaphore
137172
self.concurrencySemaphore.signal()
138-
originalCompletion(result)
173+
// cancellation support
174+
// if the callback is no longer on the pending lists it has been canceled already
175+
if let (callback, _, queue) = self.outstandingRequests[requestKey] {
176+
// remove from outstanding requests
177+
self.outstandingRequests[requestKey] = nil
178+
// call back on the request queue
179+
queue.async { callback(result) }
180+
}
139181
}
140182

141183
// we must not block the calling thread (for concurrency control) so nesting this in a queue
@@ -172,9 +214,11 @@ public struct HTTPClient {
172214
// handle retry strategy
173215
if let retryDelay = self.shouldRetry(response: response, request: request, requestNumber: requestNumber) {
174216
observabilityScope?.emit(warning: "\(request.url) failed, retrying in \(retryDelay)")
175-
// free concurrency control semaphore, since we re-submitting the request with the original completion handler
176-
// using the wrapped completion handler may lead to starving the mac concurrent requests
217+
// free concurrency control semaphore and outstanding request,
218+
// since we re-submitting the request with the original completion handler
219+
// using the wrapped completion handler may lead to starving the max concurrent requests
177220
self.concurrencySemaphore.signal()
221+
self.outstandingRequests[requestKey] = nil
178222
// TODO: dedicated retry queue?
179223
return self.configuration.callbackQueue.asyncAfter(deadline: .now() + retryDelay) {
180224
self._execute(request: request, requestNumber: requestNumber + 1, observabilityScope: observabilityScope, progress: progress, completion: originalCompletion)

Sources/Basics/Terminator.swift

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
This source file is part of the Swift.org open source project
3+
4+
Copyright (c) 2022 Apple Inc. and the Swift project authors
5+
Licensed under Apache License v2.0 with Runtime Library Exception
6+
7+
See http://swift.org/LICENSE.txt for license information
8+
See http://swift.org/CONTRIBUTORS.txt for Swift project authors
9+
*/
10+
11+
import Dispatch
12+
import Foundation
13+
import TSCBasic
14+
15+
public typealias TerminationHandler = (DispatchTime) throws -> Void
16+
17+
public class Terminator: Cancellable {
18+
public typealias RegistrationKey = String
19+
20+
private let observabilityScope: ObservabilityScope?
21+
private let registry = ThreadSafeKeyValueStore<String, (name: String, handler: TerminationHandler)>()
22+
private let terminationQueue = DispatchQueue(label: "org.swift.swiftpm.terminator", qos: .userInteractive, attributes: .concurrent)
23+
private let terminating = ThreadSafeBox<Bool>(false)
24+
25+
public init(observabilityScope: ObservabilityScope?) {
26+
self.observabilityScope = observabilityScope
27+
}
28+
29+
@discardableResult
30+
public func register(name: String, handler: @escaping TerminationHandler) -> RegistrationKey? {
31+
if self.terminating.get(default: false) {
32+
self.observabilityScope?.emit(debug: "not registering '\(name)' with terminator, termination in progress")
33+
return .none
34+
}
35+
let key = UUID().uuidString
36+
self.observabilityScope?.emit(debug: "registering '\(name)' with terminator")
37+
self.registry[key] = (name: name, handler: handler)
38+
return key
39+
}
40+
41+
@discardableResult
42+
public func register(name: String, handler: Cancellable) -> RegistrationKey? {
43+
self.register(name: name, handler: handler.cancel(deadline:))
44+
}
45+
46+
@discardableResult
47+
public func register(name: String, handler: @escaping () throws -> Void) -> RegistrationKey? {
48+
self.register(name: name, handler: { _ in try handler() })
49+
}
50+
51+
public func register(_ process: TSCBasic.Process) -> RegistrationKey? {
52+
self.register(name: "\(process.arguments.joined(separator: " "))", handler: process.terminate)
53+
}
54+
55+
public func deregister(_ key: RegistrationKey) {
56+
self.registry[key] = nil
57+
}
58+
59+
@discardableResult
60+
public func terminate(deadline: DispatchTime? = .none) -> Int {
61+
self.terminating.put(true)
62+
63+
self.observabilityScope?.emit(info: "starting termination cycle with \(self.registry.count) termination handlers registered")
64+
65+
let deadline = deadline ?? .now() + .seconds(30)
66+
let terminationHandlers = self.registry.get()
67+
let terminated = ThreadSafeArrayStore<String>()
68+
let group = DispatchGroup()
69+
for (_, (name, handler)) in terminationHandlers {
70+
self.terminationQueue.async(group: group) {
71+
do {
72+
self.observabilityScope?.emit(debug: "terminating '\(name)'")
73+
// TODO: consider making this configurable
74+
// deadline for individual handlers set slightly before overall deadline
75+
let handlerDeadline = deadline - .seconds(5)
76+
try handler(handlerDeadline)
77+
terminated.append(name)
78+
} catch {
79+
self.observabilityScope?.emit(warning: "failed terminating '\(name)': \(error)")
80+
}
81+
}
82+
}
83+
84+
if case .timedOut = group.wait(timeout: deadline) {
85+
self.observabilityScope?.emit(warning: "timeout waiting for termination with \(terminationHandlers.count - terminated.count) termination handlers remaining")
86+
} else {
87+
self.observabilityScope?.emit(info: "termination cycle completed successfully")
88+
}
89+
90+
self.terminating.put(false)
91+
92+
return terminated.count
93+
}
94+
95+
public func cancel(deadline: DispatchTime) throws {
96+
self.terminate(deadline: deadline)
97+
}
98+
}
99+
100+
public protocol Cancellable {
101+
func cancel(deadline: DispatchTime) throws -> Void
102+
}
103+
104+
public struct CancellationError: Error, CustomStringConvertible {
105+
public let description = "Operation cancelled"
106+
107+
public init() {}
108+
}
109+
110+
extension TSCBasic.Process {
111+
fileprivate func terminate(timeout: DispatchTime) {
112+
// send graceful shutdown signal
113+
self.signal(SIGINT)
114+
115+
// start a thread to see if we need to terminate more forcibly
116+
let forceKillSemaphore = DispatchSemaphore(value: 0)
117+
let forceKillThread = TSCBasic.Thread {
118+
if case .timedOut = forceKillSemaphore.wait(timeout: timeout) {
119+
// send a force-kill signal
120+
#if os(Windows)
121+
self.signal(SIGTERM)
122+
#else
123+
self.signal(SIGKILL)
124+
#endif
125+
}
126+
}
127+
forceKillThread.start()
128+
_ = try? self.waitUntilExit()
129+
forceKillSemaphore.signal() // let the force-kill thread know we do not need it any more
130+
// join the force-kill thread thread so we don't exit before everything terminates
131+
forceKillThread.join()
132+
}
133+
}

Sources/Build/BuildOperation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public final class BuildOperation: PackageStructureDelegate, SPMBuildCore.BuildS
140140
}
141141

142142
/// Cancel the active build operation.
143-
public func cancel() {
143+
public func cancel(deadline: DispatchTime) throws {
144144
buildSystem?.cancel()
145145
}
146146

Sources/Commands/APIDigester.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ struct APIDigesterBaselineDumper {
102102
try workingCopy.checkout(revision: baselineRevision)
103103

104104
// Create the workspace for this package.
105-
let workspace = try Workspace(forRootPackage: baselinePackageRoot)
105+
let workspace = try Workspace(
106+
forRootPackage: baselinePackageRoot,
107+
terminator: swiftTool.terminator
108+
)
106109

107110
let graph = try workspace.loadPackageGraph(
108111
rootPath: baselinePackageRoot,

Sources/Commands/SwiftPackageTool.swift

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,21 +1170,14 @@ final class PluginDelegate: PluginInvocationDelegate {
11701170

11711171
// Create a build operation. We have to disable the cache in order to get a build plan created.
11721172
let outputStream = BufferedOutputByteStream()
1173-
let buildOperation = BuildOperation(
1174-
buildParameters: buildParameters,
1173+
let buildOperation = try swiftTool.createBuildOperation(
1174+
explicitProduct: explicitProduct,
11751175
cacheBuildManifest: false,
1176-
packageGraphLoader: { try self.swiftTool.loadPackageGraph(explicitProduct: explicitProduct) },
1177-
pluginScriptRunner: try self.swiftTool.getPluginScriptRunner(),
1178-
pluginWorkDirectory: try self.swiftTool.getActiveWorkspace().location.pluginWorkingDirectory,
1179-
outputStream: outputStream,
1180-
logLevel: logLevel,
1181-
fileSystem: swiftTool.fileSystem,
1182-
observabilityScope: self.swiftTool.observabilityScope
1176+
customBuildParameters: buildParameters,
1177+
customOutputStream: outputStream,
1178+
customLogLevel: logLevel
11831179
)
11841180

1185-
// Save the instance so it can be canceled from the interrupt handler.
1186-
self.swiftTool.buildSystemRef.buildSystem = buildOperation
1187-
11881181
// Get or create the build description and plan the build.
11891182
let _ = try buildOperation.getBuildDescription()
11901183
let buildPlan = buildOperation.buildPlan!
@@ -1280,7 +1273,7 @@ final class PluginDelegate: PluginInvocationDelegate {
12801273
let testRunner = TestRunner(
12811274
bundlePaths: [testProduct.bundlePath],
12821275
xctestArg: testSpecifier,
1283-
processSet: swiftTool.processSet,
1276+
terminator: swiftTool.terminator,
12841277
toolchain: toolchain,
12851278
testEnv: testEnvironment,
12861279
observabilityScope: swiftTool.observabilityScope)
@@ -1564,6 +1557,7 @@ extension SwiftPackageTool {
15641557
projectName: projectName,
15651558
xcodeprojPath: xcodeprojPath,
15661559
graph: graph,
1560+
repositoryProvider: GitRepositoryProvider(),
15671561
options: genOptions,
15681562
fileSystem: swiftTool.fileSystem,
15691563
observabilityScope: swiftTool.observabilityScope

0 commit comments

Comments
 (0)