Skip to content

Commit f99b761

Browse files
committed
termination handler
motivation: allow clients of SwiftPM to terminate background activities changes: * introduce new terminator class that is a registry for termination handlers * use the terminator instad of ProcessSet to manage the termination of active processes (eg git, tests) and build system * add cancel method to GitRepositoryProvider protocol so that it can be interuppted * add cancel methods to RepositoryManager, RegistryDownloadManager and HTTPClient so that it can be interuppted * change workspace initializer to take a terminator so that CLI and other consumer of libSwiftPM can interrupt * register intrupption points mentioned above in workspace * adjust related call sites, ie CLI signal handler now uses the workspace terminator to interuupt TODO: * add more tests * integrate into dependency resolition rdar://64900054 rdar://63723896
1 parent c06e69f commit f99b761

22 files changed

+320
-89
lines changed

Sources/Basics/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ add_library(Basics
2828
Triple+Extensions.swift
2929
SwiftVersion.swift
3030
SQLiteBackedCache.swift
31+
Terminator.swift
3132
Version+Extensions.swift)
3233
target_link_libraries(Basics PUBLIC
3334
SwiftCollections::OrderedCollections

Sources/Basics/ConcurrencyHelpers.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ public final class ThreadSafeBox<Value> {
194194
}
195195
}
196196

197+
public func get(`default`: Value) -> Value {
198+
self.lock.withLock {
199+
self.underlying ?? `default`
200+
}
201+
}
202+
197203
public func put(_ newValue: Value) {
198204
self.lock.withLock {
199205
self.underlying = newValue

Sources/Basics/HTTPClient.swift

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import struct Foundation.Date
1414
import class Foundation.JSONDecoder
1515
import class Foundation.NSError
1616
import struct Foundation.URL
17+
import struct Foundation.UUID
1718
import TSCBasic
1819

1920
#if canImport(Glibc)
@@ -34,7 +35,7 @@ public enum HTTPClientError: Error, Equatable {
3435

3536
// MARK: - HTTPClient
3637

37-
public struct HTTPClient {
38+
public struct HTTPClient: Cancellable {
3839
public typealias Configuration = HTTPClientConfiguration
3940
public typealias Request = HTTPClientRequest
4041
public typealias Response = HTTPClientResponse
@@ -45,6 +46,9 @@ public struct HTTPClient {
4546
public var configuration: HTTPClientConfiguration
4647
private let underlying: Handler
4748

49+
// tracks outstanding callback for cancellation
50+
private var pendingCallbacks = ThreadSafeKeyValueStore<UUID, (completion: CompletionHandler, progress: ProgressHandler?, queue: DispatchQueue)>()
51+
4852
// static to share across instances of the http client
4953
private static var hostsErrorsLock = Lock()
5054
private static var hostsErrors = [String: [Date]]()
@@ -92,27 +96,44 @@ public struct HTTPClient {
9296
if let authorization = request.options.authorizationProvider?(request.url), !request.headers.contains("Authorization") {
9397
request.headers.add(name: "Authorization", value: authorization)
9498
}
95-
// execute
99+
// records outstanding requests for cancellation purposes
96100
let callbackQueue = request.options.callbackQueue ?? self.configuration.callbackQueue
101+
let pendingKey = UUID()
102+
self.pendingCallbacks[pendingKey] = (completion: completion, progress: progress, queue: callbackQueue)
103+
// execute
97104
self._execute(
98105
request: request,
99106
requestNumber: 0,
100107
observabilityScope: observabilityScope,
101-
progress: progress.map { handler in
102-
{ received, expected in
103-
callbackQueue.async {
104-
handler(received, expected)
105-
}
108+
progress: { received, expected in
109+
// call back on the request queue
110+
// if the callback is no longer on the pending lists it has been canceled already
111+
if let (_, progress, queue) = self.pendingCallbacks[pendingKey], let progress = progress {
112+
queue.async { progress(received, expected) }
106113
}
107114
},
108115
completion: { result in
109-
callbackQueue.async {
110-
completion(result)
116+
// call back on the request queue
117+
// if the callback is no longer on the pending lists it has been canceled already
118+
if let (callback, _, queue) = self.pendingCallbacks[pendingKey] {
119+
queue.async { callback(result) }
120+
self.pendingCallbacks[pendingKey] = nil
111121
}
112122
}
113123
)
114124
}
115125

126+
/// Cancel any outstanding requests
127+
public func cancel(deadline: DispatchTime) {
128+
let outstanding = self.pendingCallbacks.get()
129+
self.pendingCallbacks.clear()
130+
for (completion, _, queue) in outstanding.values {
131+
queue.async {
132+
completion(.failure(CancellationError()))
133+
}
134+
}
135+
}
136+
116137
private func _execute(request: Request, requestNumber: Int, observabilityScope: ObservabilityScope?, progress: ProgressHandler?, completion: @escaping CompletionHandler) {
117138
if self.shouldCircuitBreak(request: request) {
118139
observabilityScope?.emit(warning: "Circuit breaker triggered for \(request.url)")

Sources/Basics/Terminator.swift

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
This source file is part of the Swift.org open source project
3+
4+
Copyright (c) 2022 Apple Inc. and the Swift project authors
5+
Licensed under Apache License v2.0 with Runtime Library Exception
6+
7+
See http://swift.org/LICENSE.txt for license information
8+
See http://swift.org/CONTRIBUTORS.txt for Swift project authors
9+
*/
10+
11+
import Dispatch
12+
import Foundation
13+
import TSCBasic
14+
15+
public typealias TerminationHandler = (DispatchTime) throws -> Void
16+
17+
public class Terminator {
18+
public typealias RegistrationKey = String
19+
20+
private let observabilityScope: ObservabilityScope?
21+
private let registry = ThreadSafeKeyValueStore<String, (name: String, handler: TerminationHandler)>()
22+
private let terminationQueue = DispatchQueue(label: "org.swift.swiftpm.terminator", attributes: .concurrent)
23+
private let terminating = ThreadSafeBox<Bool>(false)
24+
25+
public init(observabilityScope: ObservabilityScope?) {
26+
self.observabilityScope = observabilityScope
27+
}
28+
29+
@discardableResult
30+
public func register(name: String, handler: @escaping TerminationHandler) -> RegistrationKey? {
31+
if self.terminating.get() ?? false {
32+
self.observabilityScope?.emit(debug: "not registering '\(name)' with terminator, termination in progress")
33+
return .none
34+
}
35+
let key = UUID().uuidString
36+
self.observabilityScope?.emit(debug: "registering '\(name)' with terminator")
37+
self.registry[key] = (name: name, handler: handler)
38+
return key
39+
}
40+
41+
@discardableResult
42+
public func register(name: String, handler: Cancellable) -> RegistrationKey? {
43+
self.register(name: name, handler: handler.cancel(deadline:))
44+
}
45+
46+
@discardableResult
47+
public func register(name: String, handler: @escaping () throws -> Void) -> RegistrationKey? {
48+
self.register(name: name, handler: { _ in try handler() })
49+
}
50+
51+
public func register(_ process: TSCBasic.Process) -> RegistrationKey? {
52+
self.register(name: "\(process.arguments.joined(separator: " "))", handler: process.terminate)
53+
}
54+
55+
public func deregister(_ key: RegistrationKey) {
56+
self.registry[key] = nil
57+
}
58+
59+
public func terminate(deadline: DispatchTime? = .none) {
60+
self.terminating.put(true)
61+
62+
self.observabilityScope?.emit(info: "starting termination cycle with \(self.registry.count) termination handlers registered")
63+
64+
let deadline = deadline ?? .now() + .seconds(30)
65+
let success = ThreadSafeArrayStore<String>()
66+
let group = DispatchGroup()
67+
for (_, (name, handler)) in self.registry.get() {
68+
group.enter()
69+
self.terminationQueue.async {
70+
defer { group.leave() }
71+
do {
72+
self.observabilityScope?.emit(debug: "terminating '\(name)'")
73+
try handler(deadline)
74+
success.append(name)
75+
} catch {
76+
self.observabilityScope?.emit(warning: "failed terminating '\(name)': \(error)")
77+
}
78+
}
79+
}
80+
81+
if case .timedOut = group.wait(timeout: deadline) {
82+
self.observabilityScope?.emit(warning: "timeout waiting for termination with \(self.registry.count - success.count) termination handlers remaining")
83+
} else {
84+
self.observabilityScope?.emit(info: "termination cycle completed successfully")
85+
}
86+
87+
self.terminating.put(false)
88+
}
89+
}
90+
91+
public protocol Cancellable {
92+
func cancel(deadline: DispatchTime) throws -> Void
93+
}
94+
95+
public struct CancellationError: Error, CustomStringConvertible {
96+
public let description = "Operation cancelled"
97+
98+
public init() {}
99+
}
100+
101+
extension TSCBasic.Process {
102+
fileprivate func terminate(timeout: DispatchTime) {
103+
// send graceful shutdown signal
104+
self.signal(SIGINT)
105+
106+
// start a thread to see if we need to terminate more forcibly
107+
let forceKillSemaphore = DispatchSemaphore(value: 1)
108+
let forceKillThread = TSCBasic.Thread {
109+
if case .timedOut = forceKillSemaphore.wait(timeout: timeout) {
110+
// send a force-kill signal
111+
#if os(Windows)
112+
self.signal(SIGTERM)
113+
#else
114+
self.signal(SIGKILL)
115+
#endif
116+
}
117+
}
118+
forceKillThread.start()
119+
_ = try? self.waitUntilExit()
120+
forceKillSemaphore.signal() // let the force-kill thread know we do not need it any more
121+
// join the force-kill thread thread so we don't exit before everything terminates
122+
forceKillThread.join()
123+
}
124+
}

Sources/Commands/APIDigester.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ struct APIDigesterBaselineDumper {
102102
try workingCopy.checkout(revision: baselineRevision)
103103

104104
// Create the workspace for this package.
105-
let workspace = try Workspace(forRootPackage: baselinePackageRoot)
105+
let workspace = try Workspace(
106+
forRootPackage: baselinePackageRoot,
107+
terminator: swiftTool.terminator
108+
)
106109

107110
let graph = try workspace.loadPackageGraph(
108111
rootPath: baselinePackageRoot,

Sources/Commands/SwiftPackageTool.swift

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,21 +1170,14 @@ final class PluginDelegate: PluginInvocationDelegate {
11701170

11711171
// Create a build operation. We have to disable the cache in order to get a build plan created.
11721172
let outputStream = BufferedOutputByteStream()
1173-
let buildOperation = BuildOperation(
1174-
buildParameters: buildParameters,
1173+
let buildOperation = try swiftTool.createBuildOperation(
1174+
explicitProduct: explicitProduct,
11751175
cacheBuildManifest: false,
1176-
packageGraphLoader: { try self.swiftTool.loadPackageGraph(explicitProduct: explicitProduct) },
1177-
pluginScriptRunner: try self.swiftTool.getPluginScriptRunner(),
1178-
pluginWorkDirectory: try self.swiftTool.getActiveWorkspace().location.pluginWorkingDirectory,
1179-
outputStream: outputStream,
1180-
logLevel: logLevel,
1181-
fileSystem: swiftTool.fileSystem,
1182-
observabilityScope: self.swiftTool.observabilityScope
1176+
customBuildParameters: buildParameters,
1177+
customOutputStream: outputStream,
1178+
customLogLevel: logLevel
11831179
)
11841180

1185-
// Save the instance so it can be canceled from the interrupt handler.
1186-
self.swiftTool.buildSystemRef.buildSystem = buildOperation
1187-
11881181
// Get or create the build description and plan the build.
11891182
let _ = try buildOperation.getBuildDescription()
11901183
let buildPlan = buildOperation.buildPlan!
@@ -1280,7 +1273,7 @@ final class PluginDelegate: PluginInvocationDelegate {
12801273
let testRunner = TestRunner(
12811274
bundlePaths: [testProduct.bundlePath],
12821275
xctestArg: testSpecifier,
1283-
processSet: swiftTool.processSet,
1276+
terminator: swiftTool.terminator,
12841277
toolchain: toolchain,
12851278
testEnv: testEnvironment,
12861279
observabilityScope: swiftTool.observabilityScope)
@@ -1564,6 +1557,7 @@ extension SwiftPackageTool {
15641557
projectName: projectName,
15651558
xcodeprojPath: xcodeprojPath,
15661559
graph: graph,
1560+
repositoryProvider: GitRepositoryProvider(),
15671561
options: genOptions,
15681562
fileSystem: swiftTool.fileSystem,
15691563
observabilityScope: swiftTool.observabilityScope

Sources/Commands/SwiftTestTool.swift

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public struct SwiftTestTool: SwiftCommand {
283283
let runner = TestRunner(
284284
bundlePaths: testProducts.map { $0.bundlePath },
285285
xctestArg: xctestArg,
286-
processSet: swiftTool.processSet,
286+
terminator: swiftTool.terminator,
287287
toolchain: toolchain,
288288
testEnv: testEnv,
289289
observabilityScope: swiftTool.observabilityScope
@@ -327,7 +327,7 @@ public struct SwiftTestTool: SwiftCommand {
327327
// Run the tests using the parallel runner.
328328
let runner = ParallelTestRunner(
329329
bundlePaths: testProducts.map { $0.bundlePath },
330-
processSet: swiftTool.processSet,
330+
terminator: swiftTool.terminator,
331331
toolchain: toolchain,
332332
numJobs: options.numberOfWorkers ?? ProcessInfo.processInfo.activeProcessorCount,
333333
options: swiftOptions,
@@ -514,7 +514,7 @@ final class TestRunner {
514514
/// Arguments to pass to XCTest if any.
515515
private let xctestArg: String?
516516

517-
private let processSet: ProcessSet
517+
private let terminator: Terminator
518518

519519
// The toolchain to use.
520520
private let toolchain: UserToolchain
@@ -532,14 +532,14 @@ final class TestRunner {
532532
init(
533533
bundlePaths: [AbsolutePath],
534534
xctestArg: String? = nil,
535-
processSet: ProcessSet,
535+
terminator: Terminator,
536536
toolchain: UserToolchain,
537537
testEnv: [String: String],
538538
observabilityScope: ObservabilityScope
539539
) {
540540
self.bundlePaths = bundlePaths
541541
self.xctestArg = xctestArg
542-
self.processSet = processSet
542+
self.terminator = terminator
543543
self.toolchain = toolchain
544544
self.testEnv = testEnv
545545
self.observabilityScope = observabilityScope.makeChildScope(description: "Test Runner")
@@ -591,7 +591,10 @@ final class TestRunner {
591591
stderr: outputHandler
592592
)
593593
let process = Process(arguments: try args(forTestAt: path), environment: self.testEnv, outputRedirection: outputRedirection)
594-
try self.processSet.add(process)
594+
guard let terminationKey = self.terminator.register(process) else {
595+
return false // terminating
596+
}
597+
defer { self.terminator.deregister(terminationKey) }
595598
try process.launch()
596599
let result = try process.waitUntilExit()
597600
switch result.exitStatus {
@@ -605,8 +608,6 @@ final class TestRunner {
605608
default:
606609
return false
607610
}
608-
} catch ProcessSetError.cancelled {
609-
return false
610611
} catch {
611612
testObservabilityScope.emit(error)
612613
return false
@@ -644,7 +645,7 @@ final class ParallelTestRunner {
644645
/// True if all tests executed successfully.
645646
private(set) var ranSuccessfully = true
646647

647-
private let processSet: ProcessSet
648+
private let terminator: Terminator
648649

649650
private let toolchain: UserToolchain
650651

@@ -662,7 +663,7 @@ final class ParallelTestRunner {
662663

663664
init(
664665
bundlePaths: [AbsolutePath],
665-
processSet: ProcessSet,
666+
terminator: Terminator,
666667
toolchain: UserToolchain,
667668
numJobs: Int,
668669
options: SwiftToolOptions,
@@ -671,7 +672,7 @@ final class ParallelTestRunner {
671672
observabilityScope: ObservabilityScope
672673
) {
673674
self.bundlePaths = bundlePaths
674-
self.processSet = processSet
675+
self.terminator = terminator
675676
self.toolchain = toolchain
676677
self.numJobs = numJobs
677678
self.shouldOutputSuccess = shouldOutputSuccess
@@ -727,7 +728,7 @@ final class ParallelTestRunner {
727728
let testRunner = TestRunner(
728729
bundlePaths: [test.productPath],
729730
xctestArg: test.specifier,
730-
processSet: self.processSet,
731+
terminator: self.terminator,
731732
toolchain: self.toolchain,
732733
testEnv: testEnv,
733734
observabilityScope: self.observabilityScope

0 commit comments

Comments
 (0)