Skip to content

improve process state management #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 170 additions & 80 deletions Sources/TSCBasic/Process.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ public final class Process: ObjectIdentifierProtocol {
}
}

// process execution mutable state
private enum State {
case idle
case readingOutputThread(stdout: Thread, stderr: Thread?)
case readingOutputPipe(sync: DispatchGroup)
case outputReady(stdout: Result<[UInt8], Swift.Error>, stderr: Result<[UInt8], Swift.Error>)
case complete(ProcessResult)
}

/// Typealias for process id type.
#if !os(Windows)
public typealias ProcessID = pid_t
Expand Down Expand Up @@ -219,36 +228,36 @@ public final class Process: ObjectIdentifierProtocol {
public private(set) var processID = ProcessID()
#endif

/// If the subprocess has launched.
/// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be
/// called only once.
public private(set) var launched = false
// process execution mutable state
private var state: State = .idle
private let stateLock = Lock()

/// The result of the process execution. Available after process is terminated.
/// This will block while the process is awaiting result
@available(*, deprecated, message: "use waitUntilExit instead")
public var result: ProcessResult? {
return self.serialQueue.sync {
self._result
return self.stateLock.withLock {
switch self.state {
case .complete(let result):
return result
default:
return nil
}
}
}

/// How process redirects its output.
public let outputRedirection: OutputRedirection
// ideally we would use the state for this, but we need to access it while the waitForExit is locking state
private var _launched = false
private let launchedLock = Lock()

/// The result of the process execution. Available after process is terminated.
private var _result: ProcessResult?

/// If redirected, stdout result and reference to the thread reading the output.
private var stdout: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)

/// If redirected, stderr result and reference to the thread reading the output.
private var stderr: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)

/// Queue to protect concurrent reads.
private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process")
public var launched: Bool {
return self.launchedLock.withLock {
return self._launched
}
}

/// Queue to protect reading/writing on map of validated executables.
private static let executablesQueue = DispatchQueue(
label: "org.swift.swiftpm.process.findExecutable")
/// How process redirects its output.
public let outputRedirection: OutputRedirection

/// Indicates if a new progress group is created for the child process.
private let startNewProcessGroup: Bool
Expand All @@ -257,7 +266,8 @@ public final class Process: ObjectIdentifierProtocol {
///
/// Key: Executable name or path.
/// Value: Path to the executable, if found.
static private var validatedExecutablesMap = [String: AbsolutePath?]()
private static var validatedExecutablesMap = [String: AbsolutePath?]()
private static let validatedExecutablesMapLock = Lock()

/// Create a new process instance.
///
Expand Down Expand Up @@ -348,7 +358,7 @@ public final class Process: ObjectIdentifierProtocol {
}
// This should cover the most common cases, i.e. when the cache is most helpful.
if workingDirectory == localFileSystem.currentWorkingDirectory {
return Process.executablesQueue.sync {
return Process.validatedExecutablesMapLock.withLock {
if let value = Process.validatedExecutablesMap[program] {
return value
}
Expand All @@ -367,10 +377,11 @@ public final class Process: ObjectIdentifierProtocol {
@discardableResult
public func launch() throws -> WritableByteStream {
precondition(arguments.count > 0 && !arguments[0].isEmpty, "Need at least one argument to launch the process.")
precondition(!launched, "It is not allowed to launch the same process object again.")

// Set the launch bool to true.
launched = true
self.launchedLock.withLock {
precondition(!self._launched, "It is not allowed to launch the same process object again.")
self._launched = true
}

// Print the arguments if we are verbose.
if self.verbose {
Expand All @@ -393,30 +404,69 @@ public final class Process: ObjectIdentifierProtocol {
let stdinPipe = Pipe()
_process?.standardInput = stdinPipe

let group = DispatchGroup()

var stdout: [UInt8] = []
let stdoutLock = Lock()

var stderr: [UInt8] = []
let stderrLock = Lock()

if outputRedirection.redirectsOutput {
let stdoutPipe = Pipe()
let stderrPipe = Pipe()

group.enter()
stdoutPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
let contents = fh.readDataToEndOfFile()
self.outputRedirection.outputClosures?.stdoutClosure([UInt8](contents))
if case .success(let data) = self.stdout.result {
self.stdout.result = .success(data + contents)
let data = fh.availableData
if (data.count == 0) {
stdoutPipe.fileHandleForReading.readabilityHandler = nil
group.leave()
} else {
let contents = data.withUnsafeBytes { Array<UInt8>($0) }
self.outputRedirection.outputClosures?.stdoutClosure(contents)
stdoutLock.withLock {
stdout += contents
}
}
}

group.enter()
stderrPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
let contents = fh.readDataToEndOfFile()
self.outputRedirection.outputClosures?.stderrClosure([UInt8](contents))
if case .success(let data) = self.stderr.result {
self.stderr.result = .success(data + contents)
let data = fh.availableData
if (data.count == 0) {
stderrPipe.fileHandleForReading.readabilityHandler = nil
group.leave()
} else {
let contents = data.withUnsafeBytes { Array<UInt8>($0) }
self.outputRedirection.outputClosures?.stderrClosure(contents)
stderrLock.withLock {
stderr += contents
}
}
}

_process?.standardOutput = stdoutPipe
_process?.standardError = stderrPipe
}

// first set state then start reading threads
let sync = DispatchGroup()
sync.enter()
self.stateLock.withLock {
self.state = .readingOutputPipe(sync: sync)
}

group.notify(queue: .global()) {
self.stateLock.withLock {
self.state = .outputReady(stdout: .success(stdout), stderr: .success(stderr))
}
sync.leave()
}

try _process?.run()
return stdinPipe.fileHandleForWriting
#else
#else
// Initialize the spawn attributes.
#if canImport(Darwin) || os(Android)
var attributes: posix_spawnattr_t? = nil
Expand Down Expand Up @@ -547,72 +597,112 @@ public final class Process: ObjectIdentifierProtocol {
// Close the local read end of the input pipe.
try close(fd: stdinPipe[0])

if outputRedirection.redirectsOutput {
if !outputRedirection.redirectsOutput {
// no stdout or stderr in this case
self.stateLock.withLock {
self.state = .outputReady(stdout: .success([]), stderr: .success([]))
}
} else {
var pending: Result<[UInt8], Swift.Error>?
let pendingLock = Lock()

let outputClosures = outputRedirection.outputClosures

// Close the local write end of the output pipe.
try close(fd: outputPipe[1])

// Create a thread and start reading the output on it.
var thread = Thread { [weak self] in
let stdoutThread = Thread { [weak self] in
if let readResult = self?.readOutput(onFD: outputPipe[0], outputClosure: outputClosures?.stdoutClosure) {
self?.stdout.result = readResult
pendingLock.withLock {
if let stderrResult = pending {
self?.stateLock.withLock {
self?.state = .outputReady(stdout: readResult, stderr: stderrResult)
}
} else {
pending = readResult
}
}
} else if let stderrResult = (pendingLock.withLock { pending }) {
// TODO: this is more of an error
self?.stateLock.withLock {
self?.state = .outputReady(stdout: .success([]), stderr: stderrResult)
}
}
}
thread.start()
self.stdout.thread = thread

// Only schedule a thread for stderr if no redirect was requested.
var stderrThread: Thread? = nil
if !outputRedirection.redirectStderr {
// Close the local write end of the stderr pipe.
try close(fd: stderrPipe[1])

// Create a thread and start reading the stderr output on it.
thread = Thread { [weak self] in
stderrThread = Thread { [weak self] in
if let readResult = self?.readOutput(onFD: stderrPipe[0], outputClosure: outputClosures?.stderrClosure) {
self?.stderr.result = readResult
pendingLock.withLock {
if let stdoutResult = pending {
self?.stateLock.withLock {
self?.state = .outputReady(stdout: stdoutResult, stderr: readResult)
}
} else {
pending = readResult
}
}
} else if let stdoutResult = (pendingLock.withLock { pending }) {
// TODO: this is more of an error
self?.stateLock.withLock {
self?.state = .outputReady(stdout: stdoutResult, stderr: .success([]))
}
}
}
thread.start()
self.stderr.thread = thread
} else {
pendingLock.withLock {
pending = .success([]) // no stderr in this case
}
}
// first set state then start reading threads
self.stateLock.withLock {
self.state = .readingOutputThread(stdout: stdoutThread, stderr: stderrThread)
}
stdoutThread.start()
stderrThread?.start()
}

return stdinStream
#endif // POSIX implementation
#endif // POSIX implementation
}

/// Blocks the calling process until the subprocess finishes execution.
@discardableResult
public func waitUntilExit() throws -> ProcessResult {
#if os(Windows)
precondition(_process != nil, "The process is not yet launched.")
let p = _process!
p.waitUntilExit()
stdout.thread?.join()
stderr.thread?.join()

let executionResult = ProcessResult(
arguments: arguments,
environment: environment,
exitStatusCode: p.terminationStatus,
output: stdout.result,
stderrOutput: stderr.result
)
return executionResult
#else
return try serialQueue.sync {
precondition(launched, "The process is not yet launched.")

// If the process has already finsihed, return it.
if let existingResult = _result {
return existingResult
}

self.stateLock.lock()
switch self.state {
case .idle:
defer { self.stateLock.unlock() }
preconditionFailure("The process is not yet launched.")
case .complete(let result):
defer { self.stateLock.unlock() }
return result
case .readingOutputThread(let stdoutThread, let stderrThread):
self.stateLock.unlock() // unlock early since output read thread need to change state
// If we're reading output, make sure that is finished.
stdout.thread?.join()
stderr.thread?.join()

stdoutThread.join()
stderrThread?.join()
return try self.waitUntilExit()
case .readingOutputPipe(let sync):
self.stateLock.unlock() // unlock early since output read thread need to change state
sync.wait()
return try self.waitUntilExit()
case .outputReady(let stdoutResult, let stderrResult):
defer { self.stateLock.unlock() }
// Wait until process finishes execution.
#if os(Windows)
precondition(_process != nil, "The process is not yet launched.")
let p = _process!
p.waitUntilExit()
let exitStatusCode = p.terminationStatus
#else
var exitStatusCode: Int32 = 0
var result = waitpid(processID, &exitStatusCode, 0)
while result == -1 && errno == EINTR {
Expand All @@ -621,19 +711,19 @@ public final class Process: ObjectIdentifierProtocol {
if result == -1 {
throw SystemError.waitpid(errno)
}
#endif

// Construct the result.
let executionResult = ProcessResult(
arguments: arguments,
environment: environment,
exitStatusCode: exitStatusCode,
output: stdout.result,
stderrOutput: stderr.result
output: stdoutResult,
stderrOutput: stderrResult
)
self._result = executionResult
self.state = .complete(executionResult)
return executionResult
}
#endif
}

#if !os(Windows)
Expand Down Expand Up @@ -687,12 +777,12 @@ public final class Process: ObjectIdentifierProtocol {
public func signal(_ signal: Int32) {
#if os(Windows)
if signal == SIGINT {
_process?.interrupt()
_process?.interrupt()
} else {
_process?.terminate()
_process?.terminate()
}
#else
assert(launched, "The process is not yet launched.")
assert(self.launched, "The process is not yet launched.")
_ = TSCLibc.kill(startNewProcessGroup ? -processID : processID, signal)
#endif
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/TSCBasicTests/ProcessSetTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ProcessSetTests: XCTestCase {
threadStartCondition.signal()
}
let result = try process.waitUntilExit()
// Ensure we did termiated due to signal.
// Ensure we did terminated due to signal.
switch result.exitStatus {
case .signalled: break
default: XCTFail("Expected to exit via signal")
Expand Down