Skip to content

Commit ce44b82

Browse files
committed
improve process state management
motivation: reports on hang process (mostly tests) changes: * refactor Process to use a state machine to track the process execution state * replace use of DispatchQueue with Locks to protect state rdar://76087764
1 parent 435a270 commit ce44b82

File tree

2 files changed

+133
-60
lines changed

2 files changed

+133
-60
lines changed

Sources/TSCBasic/Process.swift

Lines changed: 132 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,14 @@ public final class Process: ObjectIdentifierProtocol {
178178
}
179179
}
180180

181+
// process execution mutable state
182+
private enum State {
183+
case idle
184+
case readingOutput(stdout: Thread, stderr: Thread?)
185+
case outputReady(stdout: Result<[UInt8], Swift.Error>, stderr: Result<[UInt8], Swift.Error>)
186+
case complete(ProcessResult)
187+
}
188+
181189
/// Typealias for process id type.
182190
#if !os(Windows)
183191
public typealias ProcessID = pid_t
@@ -219,36 +227,36 @@ public final class Process: ObjectIdentifierProtocol {
219227
public private(set) var processID = ProcessID()
220228
#endif
221229

222-
/// If the subprocess has launched.
223-
/// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be
224-
/// called only once.
225-
public private(set) var launched = false
230+
// process execution mutable state
231+
private var state: State = .idle
232+
private let stateLock = Lock()
226233

227234
/// The result of the process execution. Available after process is terminated.
235+
/// This will block while the process is running, as such equivalent to `waitUntilExit`
236+
@available(*, deprecated, message: "use waitUntilExit instead")
228237
public var result: ProcessResult? {
229-
return self.serialQueue.sync {
230-
self._result
238+
return self.stateLock.withLock {
239+
switch self.state {
240+
case .complete(let result):
241+
return result
242+
default:
243+
return nil
244+
}
231245
}
232246
}
233247

234-
/// How process redirects its output.
235-
public let outputRedirection: OutputRedirection
236-
237-
/// The result of the process execution. Available after process is terminated.
238-
private var _result: ProcessResult?
239-
240-
/// If redirected, stdout result and reference to the thread reading the output.
241-
private var stdout: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)
242-
243-
/// If redirected, stderr result and reference to the thread reading the output.
244-
private var stderr: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)
248+
// ideally we would use the state for this, but we need to access it while the waitForExit is locking state
249+
private var _launched = false
250+
private let launchedLock = Lock()
245251

246-
/// Queue to protect concurrent reads.
247-
private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process")
252+
public var launched: Bool {
253+
return self.launchedLock.withLock {
254+
return self._launched
255+
}
256+
}
248257

249-
/// Queue to protect reading/writing on map of validated executables.
250-
private static let executablesQueue = DispatchQueue(
251-
label: "org.swift.swiftpm.process.findExecutable")
258+
/// How process redirects its output.
259+
public let outputRedirection: OutputRedirection
252260

253261
/// Indicates if a new progress group is created for the child process.
254262
private let startNewProcessGroup: Bool
@@ -257,7 +265,8 @@ public final class Process: ObjectIdentifierProtocol {
257265
///
258266
/// Key: Executable name or path.
259267
/// Value: Path to the executable, if found.
260-
static private var validatedExecutablesMap = [String: AbsolutePath?]()
268+
private static var validatedExecutablesMap = [String: AbsolutePath?]()
269+
private static let validatedExecutablesMapLock = Lock()
261270

262271
/// Create a new process instance.
263272
///
@@ -348,7 +357,7 @@ public final class Process: ObjectIdentifierProtocol {
348357
}
349358
// This should cover the most common cases, i.e. when the cache is most helpful.
350359
if workingDirectory == localFileSystem.currentWorkingDirectory {
351-
return Process.executablesQueue.sync {
360+
return Process.validatedExecutablesMapLock.withLock {
352361
if let value = Process.validatedExecutablesMap[program] {
353362
return value
354363
}
@@ -367,10 +376,11 @@ public final class Process: ObjectIdentifierProtocol {
367376
@discardableResult
368377
public func launch() throws -> WritableByteStream {
369378
precondition(arguments.count > 0 && !arguments[0].isEmpty, "Need at least one argument to launch the process.")
370-
precondition(!launched, "It is not allowed to launch the same process object again.")
371379

372-
// Set the launch bool to true.
373-
launched = true
380+
self.launchedLock.withLock {
381+
precondition(!self._launched, "It is not allowed to launch the same process object again.")
382+
self._launched = true
383+
}
374384

375385
// Print the arguments if we are verbose.
376386
if self.verbose {
@@ -396,27 +406,44 @@ public final class Process: ObjectIdentifierProtocol {
396406
if outputRedirection.redirectsOutput {
397407
let stdoutPipe = Pipe()
398408
let stderrPipe = Pipe()
409+
410+
var pending: [UInt8]?
411+
let pendingLock = Lock()
412+
399413
stdoutPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
400414
let contents = fh.readDataToEndOfFile()
401415
self.outputRedirection.outputClosures?.stdoutClosure([UInt8](contents))
402-
if case .success(let data) = self.stdout.result {
403-
self.stdout.result = .success(data + contents)
416+
pendingLock.withLock {
417+
if let stderr = pending {
418+
self.stateLock.withLock {
419+
self?.state = .outputReady(stdout: .success(contents), stderr: .success(stderr))
420+
}
421+
} else {
422+
pending = contents
423+
}
404424
}
405425
}
406426
stderrPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
407427
let contents = fh.readDataToEndOfFile()
408428
self.outputRedirection.outputClosures?.stderrClosure([UInt8](contents))
409-
if case .success(let data) = self.stderr.result {
410-
self.stderr.result = .success(data + contents)
429+
pendingLock.withLock {
430+
if let stdout = pending {
431+
self.stateLock.withLock {
432+
self?.state = .outputReady(stdout: .success(stdout), stderr: .success(contents))
433+
}
434+
} else {
435+
pending = contents
436+
}
411437
}
412438
}
439+
413440
_process?.standardOutput = stdoutPipe
414441
_process?.standardError = stderrPipe
415442
}
416443

417444
try _process?.run()
418445
return stdinPipe.fileHandleForWriting
419-
#else
446+
#else
420447
// Initialize the spawn attributes.
421448
#if canImport(Darwin) || os(Android)
422449
var attributes: posix_spawnattr_t? = nil
@@ -547,38 +574,80 @@ public final class Process: ObjectIdentifierProtocol {
547574
// Close the local read end of the input pipe.
548575
try close(fd: stdinPipe[0])
549576

550-
if outputRedirection.redirectsOutput {
577+
if !outputRedirection.redirectsOutput {
578+
// no stdout or stderr in this case
579+
self.stateLock.withLock {
580+
self.state = .outputReady(stdout: .success([]), stderr: .success([]))
581+
}
582+
} else {
583+
var pending: Result<[UInt8], Swift.Error>?
584+
let pendingLock = Lock()
585+
551586
let outputClosures = outputRedirection.outputClosures
552587

553588
// Close the local write end of the output pipe.
554589
try close(fd: outputPipe[1])
555590

556591
// Create a thread and start reading the output on it.
557-
var thread = Thread { [weak self] in
592+
let stdoutThread = Thread { [weak self] in
558593
if let readResult = self?.readOutput(onFD: outputPipe[0], outputClosure: outputClosures?.stdoutClosure) {
559-
self?.stdout.result = readResult
594+
pendingLock.withLock {
595+
if let stderrResult = pending {
596+
self?.stateLock.withLock {
597+
self?.state = .outputReady(stdout: readResult, stderr: stderrResult)
598+
}
599+
} else {
600+
pending = readResult
601+
}
602+
}
603+
} else if let stderrResult = (pendingLock.withLock { pending }) {
604+
// TODO: this is more of an error
605+
self?.stateLock.withLock {
606+
self?.state = .outputReady(stdout: .success([]), stderr: stderrResult)
607+
}
560608
}
561609
}
562-
thread.start()
563-
self.stdout.thread = thread
564610

565611
// Only schedule a thread for stderr if no redirect was requested.
612+
var stderrThread: Thread? = nil
566613
if !outputRedirection.redirectStderr {
567614
// Close the local write end of the stderr pipe.
568615
try close(fd: stderrPipe[1])
569616

570617
// Create a thread and start reading the stderr output on it.
571-
thread = Thread { [weak self] in
618+
stderrThread = Thread { [weak self] in
572619
if let readResult = self?.readOutput(onFD: stderrPipe[0], outputClosure: outputClosures?.stderrClosure) {
573-
self?.stderr.result = readResult
620+
pendingLock.withLock {
621+
if let stdoutResult = pending {
622+
self?.stateLock.withLock {
623+
self?.state = .outputReady(stdout: stdoutResult, stderr: readResult)
624+
}
625+
} else {
626+
pending = readResult
627+
}
628+
}
629+
} else if let stdoutResult = (pendingLock.withLock { pending }) {
630+
// TODO: this is more of an error
631+
self?.stateLock.withLock {
632+
self?.state = .outputReady(stdout: stdoutResult, stderr: .success([]))
633+
}
574634
}
575635
}
576-
thread.start()
577-
self.stderr.thread = thread
636+
} else {
637+
pendingLock.withLock {
638+
pending = .success([]) // no stderr in this case
639+
}
578640
}
641+
// first set state then start reading threads
642+
self.stateLock.withLock {
643+
self.state = .readingOutput(stdout: stdoutThread, stderr: stderrThread)
644+
}
645+
stdoutThread.start()
646+
stderrThread?.start()
579647
}
648+
580649
return stdinStream
581-
#endif // POSIX implementation
650+
#endif // POSIX implementation
582651
}
583652

584653
/// Blocks the calling process until the subprocess finishes execution.
@@ -600,18 +669,22 @@ public final class Process: ObjectIdentifierProtocol {
600669
)
601670
return executionResult
602671
#else
603-
return try serialQueue.sync {
604-
precondition(launched, "The process is not yet launched.")
605-
606-
// If the process has already finsihed, return it.
607-
if let existingResult = _result {
608-
return existingResult
609-
}
610-
672+
self.stateLock.lock()
673+
switch self.state {
674+
case .idle:
675+
defer { self.stateLock.unlock() }
676+
preconditionFailure("The process is not yet launched.")
677+
case .complete(let result):
678+
defer { self.stateLock.unlock() }
679+
return result
680+
case .readingOutput(let stdoutThread, let stderrThread):
681+
self.stateLock.unlock() // unlock early since output read thread need to change state
611682
// If we're reading output, make sure that is finished.
612-
stdout.thread?.join()
613-
stderr.thread?.join()
614-
683+
stdoutThread.join()
684+
stderrThread?.join()
685+
return try self.waitUntilExit()
686+
case .outputReady(let stdoutResult, let stderrResult):
687+
defer { self.stateLock.unlock() }
615688
// Wait until process finishes execution.
616689
var exitStatusCode: Int32 = 0
617690
var result = waitpid(processID, &exitStatusCode, 0)
@@ -627,10 +700,10 @@ public final class Process: ObjectIdentifierProtocol {
627700
arguments: arguments,
628701
environment: environment,
629702
exitStatusCode: exitStatusCode,
630-
output: stdout.result,
631-
stderrOutput: stderr.result
703+
output: stdoutResult,
704+
stderrOutput: stderrResult
632705
)
633-
self._result = executionResult
706+
self.state = .complete(executionResult)
634707
return executionResult
635708
}
636709
#endif
@@ -687,12 +760,12 @@ public final class Process: ObjectIdentifierProtocol {
687760
public func signal(_ signal: Int32) {
688761
#if os(Windows)
689762
if signal == SIGINT {
690-
_process?.interrupt()
763+
_process?.interrupt()
691764
} else {
692-
_process?.terminate()
765+
_process?.terminate()
693766
}
694767
#else
695-
assert(launched, "The process is not yet launched.")
768+
assert(self.launched, "The process is not yet launched.")
696769
_ = TSCLibc.kill(startNewProcessGroup ? -processID : processID, signal)
697770
#endif
698771
}

Tests/TSCBasicTests/ProcessSetTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class ProcessSetTests: XCTestCase {
5555
threadStartCondition.signal()
5656
}
5757
let result = try process.waitUntilExit()
58-
// Ensure we did termiated due to signal.
58+
// Ensure we did terminated due to signal.
5959
switch result.exitStatus {
6060
case .signalled: break
6161
default: XCTFail("Expected to exit via signal")

0 commit comments

Comments
 (0)