Skip to content

Commit ae35ad7

Browse files
authored
cancellation handler (#4211)
* Revert "Revert "cancellation handler (#4173)"" This reverts commit c948a86. * windows fix
1 parent c948a86 commit ae35ad7

31 files changed

+1012
-117
lines changed

Sources/Basics/Archiver+Zip.swift

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,54 +12,83 @@ import TSCBasic
1212
import Dispatch
1313

1414
/// An `Archiver` that handles ZIP archives using the command-line `zip` and `unzip` tools.
15-
public struct ZipArchiver: Archiver {
15+
public struct ZipArchiver: Archiver, Cancellable {
1616
public var supportedExtensions: Set<String> { ["zip"] }
1717

1818
/// The file-system implementation used for various file-system operations and checks.
1919
private let fileSystem: FileSystem
2020

21+
/// Helper for cancelling in-fligh requests
22+
private let cancellator: Cancellator
23+
2124
/// Creates a `ZipArchiver`.
2225
///
2326
/// - Parameters:
2427
/// - fileSystem: The file-system to used by the `ZipArchiver`.
2528
public init(fileSystem: FileSystem) {
2629
self.fileSystem = fileSystem
30+
self.cancellator = Cancellator(observabilityScope: .none)
2731
}
2832

2933
public func extract(
3034
from archivePath: AbsolutePath,
3135
to destinationPath: AbsolutePath,
3236
completion: @escaping (Result<Void, Error>) -> Void
3337
) {
34-
guard fileSystem.exists(archivePath) else {
35-
completion(.failure(FileSystemError(.noEntry, archivePath)))
36-
return
37-
}
38+
do {
39+
guard self.fileSystem.exists(archivePath) else {
40+
throw FileSystemError(.noEntry, archivePath)
41+
}
3842

39-
guard fileSystem.isDirectory(destinationPath) else {
40-
completion(.failure(FileSystemError(.notDirectory, destinationPath)))
41-
return
42-
}
43+
guard self.fileSystem.isDirectory(destinationPath) else {
44+
throw FileSystemError(.notDirectory, destinationPath)
45+
}
46+
47+
let process = Process(arguments: ["unzip", archivePath.pathString, "-d", destinationPath.pathString])
48+
guard let registrationKey = self.cancellator.register(process) else {
49+
throw StringError("cancellation")
50+
}
4351

44-
Process.popen(arguments: ["unzip", archivePath.pathString, "-d", destinationPath.pathString], queue: .sharedConcurrent) { result in
45-
completion(result.tryMap { processResult in
46-
guard processResult.exitStatus == .terminated(code: 0) else {
47-
throw try StringError(processResult.utf8stderrOutput())
48-
}
49-
})
52+
DispatchQueue.sharedConcurrent.async {
53+
defer { self.cancellator.deregister(registrationKey) }
54+
completion(.init(catching: {
55+
try process.launch()
56+
let processResult = try process.waitUntilExit()
57+
guard processResult.exitStatus == .terminated(code: 0) else {
58+
throw try StringError(processResult.utf8stderrOutput())
59+
}
60+
}))
61+
}
62+
} catch {
63+
return completion(.failure(error))
5064
}
5165
}
5266

5367
public func validate(path: AbsolutePath, completion: @escaping (Result<Bool, Error>) -> Void) {
54-
guard fileSystem.exists(path) else {
55-
completion(.failure(FileSystemError(.noEntry, path)))
56-
return
57-
}
68+
do {
69+
guard self.fileSystem.exists(path) else {
70+
throw FileSystemError(.noEntry, path)
71+
}
72+
73+
let process = Process(arguments: ["unzip", "-t", path.pathString])
74+
guard let registrationKey = self.cancellator.register(process) else {
75+
throw StringError("cancellation")
76+
}
5877

59-
Process.popen(arguments: ["unzip", "-t", path.pathString], queue: .sharedConcurrent) { result in
60-
completion(result.tryMap { processResult in
61-
return processResult.exitStatus == .terminated(code: 0)
62-
})
78+
DispatchQueue.sharedConcurrent.async {
79+
defer { self.cancellator.deregister(registrationKey) }
80+
completion(.init(catching: {
81+
try process.launch()
82+
let processResult = try process.waitUntilExit()
83+
return processResult.exitStatus == .terminated(code: 0)
84+
}))
85+
}
86+
} catch {
87+
return completion(.failure(error))
6388
}
6489
}
90+
91+
public func cancel(deadline: DispatchTime) throws {
92+
try self.cancellator.cancel(deadline: deadline)
93+
}
6594
}

Sources/Basics/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ add_library(Basics
1111
Archiver+Zip.swift
1212
AuthorizationProvider.swift
1313
ByteString+Extensions.swift
14+
Cancellator.swift
1415
ConcurrencyHelpers.swift
1516
Dictionary+Extensions.swift
1617
DispatchTimeInterval+Extensions.swift

Sources/Basics/Cancellator.swift

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
This source file is part of the Swift.org open source project
3+
4+
Copyright (c) 2022 Apple Inc. and the Swift project authors
5+
Licensed under Apache License v2.0 with Runtime Library Exception
6+
7+
See http://swift.org/LICENSE.txt for license information
8+
See http://swift.org/CONTRIBUTORS.txt for Swift project authors
9+
*/
10+
11+
import Dispatch
12+
import Foundation
13+
import TSCBasic
14+
15+
public typealias CancellationHandler = (DispatchTime) throws -> Void
16+
17+
public class Cancellator: Cancellable {
18+
public typealias RegistrationKey = String
19+
20+
private let observabilityScope: ObservabilityScope?
21+
private let registry = ThreadSafeKeyValueStore<String, (name: String, handler: CancellationHandler)>()
22+
private let cancelationQueue = DispatchQueue(label: "org.swift.swiftpm.cancellator", qos: .userInteractive, attributes: .concurrent)
23+
private let cancelling = ThreadSafeBox<Bool>(false)
24+
25+
public init(observabilityScope: ObservabilityScope?) {
26+
self.observabilityScope = observabilityScope
27+
}
28+
29+
@discardableResult
30+
public func register(name: String, handler: @escaping CancellationHandler) -> RegistrationKey? {
31+
if self.cancelling.get(default: false) {
32+
self.observabilityScope?.emit(debug: "not registering '\(name)' with terminator, termination in progress")
33+
return .none
34+
}
35+
let key = UUID().uuidString
36+
self.observabilityScope?.emit(debug: "registering '\(name)' with terminator")
37+
self.registry[key] = (name: name, handler: handler)
38+
return key
39+
}
40+
41+
@discardableResult
42+
public func register(name: String, handler: Cancellable) -> RegistrationKey? {
43+
self.register(name: name, handler: handler.cancel(deadline:))
44+
}
45+
46+
@discardableResult
47+
public func register(name: String, handler: @escaping () throws -> Void) -> RegistrationKey? {
48+
self.register(name: name, handler: { _ in try handler() })
49+
}
50+
51+
public func register(_ process: TSCBasic.Process) -> RegistrationKey? {
52+
self.register(name: "\(process.arguments.joined(separator: " "))", handler: process.terminate)
53+
}
54+
55+
public func deregister(_ key: RegistrationKey) {
56+
self.registry[key] = nil
57+
}
58+
59+
public func cancel(deadline: DispatchTime) throws -> Void {
60+
self._cancel(deadline: deadline)
61+
}
62+
63+
// marked internal for testing
64+
@discardableResult
65+
internal func _cancel(deadline: DispatchTime? = .none)-> Int {
66+
self.cancelling.put(true)
67+
68+
self.observabilityScope?.emit(info: "starting cancellation cycle with \(self.registry.count) cancellation handlers registered")
69+
70+
let deadline = deadline ?? .now() + .seconds(30)
71+
// deadline for individual handlers set slightly before overall deadline
72+
let delta: DispatchTimeInterval = .nanoseconds(abs(deadline.distance(to: .now()).nanoseconds() ?? 0) / 5)
73+
let handlersDeadline = deadline - delta
74+
75+
let cancellationHandlers = self.registry.get()
76+
let cancelled = ThreadSafeArrayStore<String>()
77+
let group = DispatchGroup()
78+
for (_, (name, handler)) in cancellationHandlers {
79+
self.cancelationQueue.async(group: group) {
80+
do {
81+
self.observabilityScope?.emit(debug: "cancelling '\(name)'")
82+
try handler(handlersDeadline)
83+
cancelled.append(name)
84+
} catch {
85+
self.observabilityScope?.emit(warning: "failed cancelling '\(name)': \(error)")
86+
}
87+
}
88+
}
89+
90+
if case .timedOut = group.wait(timeout: deadline) {
91+
self.observabilityScope?.emit(warning: "timeout waiting for cancellation with \(cancellationHandlers.count - cancelled.count) cancellation handlers remaining")
92+
} else {
93+
self.observabilityScope?.emit(info: "cancellation cycle completed successfully")
94+
}
95+
96+
self.cancelling.put(false)
97+
98+
return cancelled.count
99+
}
100+
}
101+
102+
public protocol Cancellable {
103+
func cancel(deadline: DispatchTime) throws -> Void
104+
}
105+
106+
public struct CancellationError: Error, CustomStringConvertible {
107+
public let description = "Operation cancelled"
108+
109+
public init() {}
110+
}
111+
112+
extension TSCBasic.Process {
113+
fileprivate func terminate(timeout: DispatchTime) {
114+
// send graceful shutdown signal
115+
self.signal(SIGINT)
116+
117+
// start a thread to see if we need to terminate more forcibly
118+
let forceKillSemaphore = DispatchSemaphore(value: 0)
119+
let forceKillThread = TSCBasic.Thread {
120+
if case .timedOut = forceKillSemaphore.wait(timeout: timeout) {
121+
// send a force-kill signal
122+
#if os(Windows)
123+
self.signal(SIGTERM)
124+
#else
125+
self.signal(SIGKILL)
126+
#endif
127+
}
128+
}
129+
forceKillThread.start()
130+
_ = try? self.waitUntilExit()
131+
forceKillSemaphore.signal() // let the force-kill thread know we do not need it any more
132+
// join the force-kill thread thread so we don't exit before everything terminates
133+
forceKillThread.join()
134+
}
135+
}

Sources/Basics/ConcurrencyHelpers.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ public final class ThreadSafeKeyValueStore<Key, Value> where Key: Hashable {
5555
}
5656
}
5757

58-
public func clear() {
58+
@discardableResult
59+
public func clear() -> [Key: Value] {
5960
self.lock.withLock {
61+
let underlying = self.underlying
6062
self.underlying.removeAll()
63+
return underlying
6164
}
6265
}
6366

@@ -113,9 +116,12 @@ public final class ThreadSafeArrayStore<Value> {
113116
}
114117
}
115118

116-
public func clear() {
119+
@discardableResult
120+
public func clear() -> [Value] {
117121
self.lock.withLock {
118-
self.underlying = []
122+
let underlying = self.underlying
123+
self.underlying.removeAll()
124+
return underlying
119125
}
120126
}
121127

Sources/Basics/DispatchTimeInterval+Extensions.swift

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,21 @@ extension DispatchTimeInterval {
2727
}
2828
}
2929

30+
public func nanoseconds() -> Int? {
31+
switch self {
32+
case .seconds(let value):
33+
return value.multipliedReportingOverflow(by: 1_000_000_000).partialValue
34+
case .milliseconds(let value):
35+
return value.multipliedReportingOverflow(by: 1_000_000).partialValue
36+
case .microseconds(let value):
37+
return value.multipliedReportingOverflow(by: 1000).partialValue
38+
case .nanoseconds(let value):
39+
return value
40+
default:
41+
return nil
42+
}
43+
}
44+
3045
public func milliseconds() -> Int? {
3146
switch self {
3247
case .seconds(let value):
@@ -81,7 +96,7 @@ extension DispatchTimeInterval {
8196
#if os(Linux) || os(Windows) || os(Android) || os(OpenBSD)
8297
extension DispatchTime {
8398
public func distance(to: DispatchTime) -> DispatchTimeInterval {
84-
let duration = to.uptimeNanoseconds - self.uptimeNanoseconds
99+
let duration = to.uptimeNanoseconds.subtractingReportingOverflow(self.uptimeNanoseconds).partialValue
85100
return .nanoseconds(duration >= Int.max ? Int.max : Int(duration))
86101
}
87102
}

0 commit comments

Comments
 (0)