Skip to content

Commit 76971e4

Browse files
committed
termination handler
motivation: allow clients of SwiftPM to terminate background activities changes: * introduce new terminator class that is a registry for termination handlers * use the terminator instad of ProcessSet to manage the termination of active processes (eg git, tests) and build system * add cancel method to GitRepositoryProvider protocol so that it can be interuppted * add cancel methods to RepositoryManager, RegistryDownloadManager and HTTPClient so that it can be interuppted * change workspace initializer to take a terminator so that CLI and other consumer of libSwiftPM can interrupt * register intrupption points mentioned above in workspace * adjust related call sites, ie CLI signal handler now uses the workspace terminator to interuupt * add tests rdar://64900054 rdar://63723896
1 parent 2d04d7e commit 76971e4

28 files changed

+865
-92
lines changed

Sources/Basics/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ add_library(Basics
1111
Archiver+Zip.swift
1212
AuthorizationProvider.swift
1313
ByteString+Extensions.swift
14+
Cancellator.swift
1415
ConcurrencyHelpers.swift
1516
Dictionary+Extensions.swift
1617
DispatchTimeInterval+Extensions.swift

Sources/Basics/Cancellator.swift

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
This source file is part of the Swift.org open source project
3+
4+
Copyright (c) 2022 Apple Inc. and the Swift project authors
5+
Licensed under Apache License v2.0 with Runtime Library Exception
6+
7+
See http://swift.org/LICENSE.txt for license information
8+
See http://swift.org/CONTRIBUTORS.txt for Swift project authors
9+
*/
10+
11+
import Dispatch
12+
import Foundation
13+
import TSCBasic
14+
15+
public typealias CancellationHandler = (DispatchTime) throws -> Void
16+
17+
public class Cancellator: Cancellable {
18+
public typealias RegistrationKey = String
19+
20+
private let observabilityScope: ObservabilityScope?
21+
private let registry = ThreadSafeKeyValueStore<String, (name: String, handler: CancellationHandler)>()
22+
private let cancelationQueue = DispatchQueue(label: "org.swift.swiftpm.cancellator", qos: .userInteractive, attributes: .concurrent)
23+
private let cancelling = ThreadSafeBox<Bool>(false)
24+
25+
public init(observabilityScope: ObservabilityScope?) {
26+
self.observabilityScope = observabilityScope
27+
}
28+
29+
@discardableResult
30+
public func register(name: String, handler: @escaping CancellationHandler) -> RegistrationKey? {
31+
if self.cancelling.get(default: false) {
32+
self.observabilityScope?.emit(debug: "not registering '\(name)' with terminator, termination in progress")
33+
return .none
34+
}
35+
let key = UUID().uuidString
36+
self.observabilityScope?.emit(debug: "registering '\(name)' with terminator")
37+
self.registry[key] = (name: name, handler: handler)
38+
return key
39+
}
40+
41+
@discardableResult
42+
public func register(name: String, handler: Cancellable) -> RegistrationKey? {
43+
self.register(name: name, handler: handler.cancel(deadline:))
44+
}
45+
46+
@discardableResult
47+
public func register(name: String, handler: @escaping () throws -> Void) -> RegistrationKey? {
48+
self.register(name: name, handler: { _ in try handler() })
49+
}
50+
51+
public func register(_ process: TSCBasic.Process) -> RegistrationKey? {
52+
self.register(name: "\(process.arguments.joined(separator: " "))", handler: process.terminate)
53+
}
54+
55+
public func deregister(_ key: RegistrationKey) {
56+
self.registry[key] = nil
57+
}
58+
59+
public func cancel(deadline: DispatchTime) throws -> Void {
60+
self._cancel(deadline: deadline)
61+
}
62+
63+
// marked internal for testing
64+
@discardableResult
65+
internal func _cancel(deadline: DispatchTime? = .none)-> Int {
66+
self.cancelling.put(true)
67+
68+
self.observabilityScope?.emit(info: "starting cancellation cycle with \(self.registry.count) cancellation handlers registered")
69+
70+
let deadline = deadline ?? .now() + .seconds(30)
71+
// deadline for individual handlers set slightly before overall deadline
72+
let delta: DispatchTimeInterval = .nanoseconds(abs(deadline.distance(to: .now()).nanoseconds() ?? 0) / 5)
73+
let handlersDeadline = deadline - delta
74+
75+
let cancellationHandlers = self.registry.get()
76+
let cancelled = ThreadSafeArrayStore<String>()
77+
let group = DispatchGroup()
78+
for (_, (name, handler)) in cancellationHandlers {
79+
self.cancelationQueue.async(group: group) {
80+
do {
81+
self.observabilityScope?.emit(debug: "cancelling '\(name)'")
82+
try handler(handlersDeadline)
83+
cancelled.append(name)
84+
} catch {
85+
self.observabilityScope?.emit(warning: "failed cancelling '\(name)': \(error)")
86+
}
87+
}
88+
}
89+
90+
if case .timedOut = group.wait(timeout: deadline) {
91+
self.observabilityScope?.emit(warning: "timeout waiting for cancellation with \(cancellationHandlers.count - cancelled.count) cancellation handlers remaining")
92+
} else {
93+
self.observabilityScope?.emit(info: "cancellation cycle completed successfully")
94+
}
95+
96+
self.cancelling.put(false)
97+
98+
return cancelled.count
99+
}
100+
}
101+
102+
public protocol Cancellable {
103+
func cancel(deadline: DispatchTime) throws -> Void
104+
}
105+
106+
public struct CancellationError: Error, CustomStringConvertible {
107+
public let description = "Operation cancelled"
108+
109+
public init() {}
110+
}
111+
112+
extension TSCBasic.Process {
113+
fileprivate func terminate(timeout: DispatchTime) {
114+
// send graceful shutdown signal
115+
self.signal(SIGINT)
116+
117+
// start a thread to see if we need to terminate more forcibly
118+
let forceKillSemaphore = DispatchSemaphore(value: 0)
119+
let forceKillThread = TSCBasic.Thread {
120+
if case .timedOut = forceKillSemaphore.wait(timeout: timeout) {
121+
// send a force-kill signal
122+
#if os(Windows)
123+
self.signal(SIGTERM)
124+
#else
125+
self.signal(SIGKILL)
126+
#endif
127+
}
128+
}
129+
forceKillThread.start()
130+
_ = try? self.waitUntilExit()
131+
forceKillSemaphore.signal() // let the force-kill thread know we do not need it any more
132+
// join the force-kill thread thread so we don't exit before everything terminates
133+
forceKillThread.join()
134+
}
135+
}

Sources/Basics/ConcurrencyHelpers.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ public final class ThreadSafeKeyValueStore<Key, Value> where Key: Hashable {
5555
}
5656
}
5757

58-
public func clear() {
58+
@discardableResult
59+
public func clear() -> [Key: Value] {
5960
self.lock.withLock {
61+
let underlying = self.underlying
6062
self.underlying.removeAll()
63+
return underlying
6164
}
6265
}
6366

@@ -113,9 +116,12 @@ public final class ThreadSafeArrayStore<Value> {
113116
}
114117
}
115118

116-
public func clear() {
119+
@discardableResult
120+
public func clear() -> [Value] {
117121
self.lock.withLock {
118-
self.underlying = []
122+
let underlying = self.underlying
123+
self.underlying.removeAll()
124+
return underlying
119125
}
120126
}
121127

Sources/Basics/DispatchTimeInterval+Extensions.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,21 @@ extension DispatchTimeInterval {
2727
}
2828
}
2929

30+
public func nanoseconds() -> Int? {
31+
switch self {
32+
case .seconds(let value):
33+
return value.multipliedReportingOverflow(by: 1_000_000_000).partialValue
34+
case .milliseconds(let value):
35+
return value.multipliedReportingOverflow(by: 1_000_000).partialValue
36+
case .microseconds(let value):
37+
return value.multipliedReportingOverflow(by: 1000).partialValue
38+
case .nanoseconds(let value):
39+
return value
40+
default:
41+
return nil
42+
}
43+
}
44+
3045
public func milliseconds() -> Int? {
3146
switch self {
3247
case .seconds(let value):

Sources/Basics/HTTPClient.swift

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import class Foundation.JSONDecoder
1515
import class Foundation.NSError
1616
import class Foundation.OperationQueue
1717
import struct Foundation.URL
18+
import struct Foundation.UUID
1819
import TSCBasic
1920

2021
#if canImport(Glibc)
@@ -35,7 +36,7 @@ public enum HTTPClientError: Error, Equatable {
3536

3637
// MARK: - HTTPClient
3738

38-
public struct HTTPClient {
39+
public struct HTTPClient: Cancellable {
3940
public typealias Configuration = HTTPClientConfiguration
4041
public typealias Request = HTTPClientRequest
4142
public typealias Response = HTTPClientResponse
@@ -47,9 +48,12 @@ public struct HTTPClient {
4748
private let underlying: Handler
4849

4950
/// DispatchSemaphore to restrict concurrent operations on manager.
50-
private let concurrencySemaphore: DispatchSemaphore
51-
/// OperationQueue to park pending requests
52-
private let requestsQueue: OperationQueue
51+
private let concurrencySemaphore: DispatchSemaphore
52+
/// OperationQueue to park pending requests
53+
private let requestsQueue: OperationQueue
54+
55+
// tracks outstanding requests for cancellation
56+
private var outstandingRequests = ThreadSafeKeyValueStore<UUID, (url: URL, completion: CompletionHandler, progress: ProgressHandler?, queue: DispatchQueue)>()
5357

5458
// static to share across instances of the http client
5559
private static var hostsErrorsLock = Lock()
@@ -76,7 +80,12 @@ public struct HTTPClient {
7680
/// - observabilityScope: the observability scope to emit diagnostics on
7781
/// - progress: A progress handler to handle progress for example for downloads
7882
/// - completion: A completion handler to be notified of the completion of the request.
79-
public func execute(_ request: Request, observabilityScope: ObservabilityScope? = nil, progress: ProgressHandler? = nil, completion: @escaping CompletionHandler) {
83+
public func execute(
84+
_ request: Request,
85+
observabilityScope: ObservabilityScope? = nil,
86+
progress: ProgressHandler? = nil,
87+
completion: @escaping CompletionHandler
88+
) {
8089
// merge configuration
8190
var request = request
8291
if request.options.callbackQueue == nil {
@@ -107,7 +116,9 @@ public struct HTTPClient {
107116
request.headers.add(name: "Authorization", value: authorization)
108117
}
109118
// execute
110-
let callbackQueue = request.options.callbackQueue ?? self.configuration.callbackQueue
119+
guard let callbackQueue = request.options.callbackQueue else {
120+
return completion(.failure(InternalError("unknown callback queue")))
121+
}
111122
self._execute(
112123
request: request,
113124
requestNumber: 0,
@@ -129,13 +140,42 @@ public struct HTTPClient {
129140
)
130141
}
131142

132-
private func _execute(request: Request, requestNumber: Int, observabilityScope: ObservabilityScope?, progress: ProgressHandler?, completion: @escaping CompletionHandler) {
143+
/// Cancel any outstanding requests
144+
public func cancel(deadline: DispatchTime) throws {
145+
let outstanding = self.outstandingRequests.clear()
146+
for (_, callback, _, queue) in outstanding.values {
147+
queue.async {
148+
callback(.failure(CancellationError()))
149+
}
150+
}
151+
}
152+
153+
private func _execute(
154+
request: Request,
155+
requestNumber: Int,
156+
observabilityScope: ObservabilityScope?,
157+
progress: ProgressHandler?,
158+
completion: @escaping CompletionHandler
159+
) {
160+
// records outstanding requests for cancellation purposes
161+
guard let callbackQueue = request.options.callbackQueue else {
162+
return completion(.failure(InternalError("unknown callback queue")))
163+
}
164+
let requestKey = UUID()
165+
self.outstandingRequests[requestKey] = (url: request.url, completion: completion, progress: progress, queue: callbackQueue)
166+
133167
// wrap completion handler with concurrency control cleanup
134168
let originalCompletion = completion
135169
let completion: CompletionHandler = { result in
136170
// free concurrency control semaphore
137171
self.concurrencySemaphore.signal()
138-
originalCompletion(result)
172+
// cancellation support
173+
// if the callback is no longer on the pending lists it has been canceled already
174+
// read + remove from outstanding requests atomically
175+
if let (_, callback, _, queue) = self.outstandingRequests.removeValue(forKey: requestKey) {
176+
// call back on the request queue
177+
queue.async { callback(result) }
178+
}
139179
}
140180

141181
// we must not block the calling thread (for concurrency control) so nesting this in a queue
@@ -172,9 +212,11 @@ public struct HTTPClient {
172212
// handle retry strategy
173213
if let retryDelay = self.shouldRetry(response: response, request: request, requestNumber: requestNumber) {
174214
observabilityScope?.emit(warning: "\(request.url) failed, retrying in \(retryDelay)")
175-
// free concurrency control semaphore, since we re-submitting the request with the original completion handler
176-
// using the wrapped completion handler may lead to starving the mac concurrent requests
215+
// free concurrency control semaphore and outstanding request,
216+
// since we re-submitting the request with the original completion handler
217+
// using the wrapped completion handler may lead to starving the max concurrent requests
177218
self.concurrencySemaphore.signal()
219+
self.outstandingRequests[requestKey] = nil
178220
// TODO: dedicated retry queue?
179221
return self.configuration.callbackQueue.asyncAfter(deadline: .now() + retryDelay) {
180222
self._execute(request: request, requestNumber: requestNumber + 1, observabilityScope: observabilityScope, progress: progress, completion: originalCompletion)

Sources/Build/BuildOperation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public final class BuildOperation: PackageStructureDelegate, SPMBuildCore.BuildS
140140
}
141141

142142
/// Cancel the active build operation.
143-
public func cancel() {
143+
public func cancel(deadline: DispatchTime) throws {
144144
buildSystem?.cancel()
145145
}
146146

Sources/Commands/APIDigester.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ struct APIDigesterBaselineDumper {
102102
try workingCopy.checkout(revision: baselineRevision)
103103

104104
// Create the workspace for this package.
105-
let workspace = try Workspace(forRootPackage: baselinePackageRoot)
105+
let workspace = try Workspace(
106+
forRootPackage: baselinePackageRoot,
107+
cancellator: swiftTool.cancellator
108+
)
106109

107110
let graph = try workspace.loadPackageGraph(
108111
rootPath: baselinePackageRoot,

Sources/Commands/SwiftPackageTool.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1272,7 +1272,7 @@ final class PluginDelegate: PluginInvocationDelegate {
12721272
let testRunner = TestRunner(
12731273
bundlePaths: [testProduct.bundlePath],
12741274
xctestArg: testSpecifier,
1275-
processSet: swiftTool.processSet,
1275+
cancellator: swiftTool.cancellator,
12761276
toolchain: toolchain,
12771277
testEnv: testEnvironment,
12781278
observabilityScope: swiftTool.observabilityScope)
@@ -1562,6 +1562,7 @@ extension SwiftPackageTool {
15621562
projectName: projectName,
15631563
xcodeprojPath: xcodeprojPath,
15641564
graph: graph,
1565+
repositoryProvider: GitRepositoryProvider(),
15651566
options: genOptions,
15661567
fileSystem: swiftTool.fileSystem,
15671568
observabilityScope: swiftTool.observabilityScope

0 commit comments

Comments
 (0)