Skip to content

improve process state management #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 140 additions & 81 deletions Sources/TSCBasic/Process.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,20 @@ public final class Process: ObjectIdentifierProtocol {
}
}

// process execution mutable state
private enum State {
case idle
case readingOutput(stdout: Thread, stderr: Thread?)
case outputReady(stdout: Result<[UInt8], Swift.Error>, stderr: Result<[UInt8], Swift.Error>)
case complete(ProcessResult)
}

/// Typealias for process id type.
#if !os(Windows)
#if !os(Windows)
public typealias ProcessID = pid_t
#else
#else
public typealias ProcessID = DWORD
#endif
#endif

/// Typealias for stdout/stderr output closure.
public typealias OutputClosure = ([UInt8]) -> Void
Expand All @@ -210,45 +218,47 @@ public final class Process: ObjectIdentifierProtocol {
public let workingDirectory: AbsolutePath?

/// The process id of the spawned process, available after the process is launched.
#if os(Windows)
#if os(Windows)
private var _process: Foundation.Process?
public var processID: ProcessID {
return DWORD(_process?.processIdentifier ?? 0)
}
#else
#else
public private(set) var processID = ProcessID()
#endif
#endif

// process execution mutable state
private var state: State = .idle

/// If the subprocess has launched.
/// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be
/// called only once.
public private(set) var launched = false
/// Lock to protect execution state
private let stateLock = Lock()

/// The result of the process execution. Available after process is terminated.
/// This will block while the process is running, as such equivalent to `waitUntilExit`
@available(*, deprecated, message: "use waitUntilExit instead")
public var result: ProcessResult? {
return self.serialQueue.sync {
self._result
return self.stateLock.withLock {
switch self.state {
case .complete(let result):
return result
default:
return nil
}
}
}

/// How process redirects its output.
public let outputRedirection: OutputRedirection
// ideally we would use the state for this, but we need to access it while the waitForExit is locking state
private var _launched = false
private let launchedLock = Lock()

/// The result of the process execution. Available after process is terminated.
private var _result: ProcessResult?

/// If redirected, stdout result and reference to the thread reading the output.
private var stdout: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)

/// If redirected, stderr result and reference to the thread reading the output.
private var stderr: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)

/// Queue to protect concurrent reads.
private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process")
public var launched: Bool {
return self.launchedLock.withLock {
return self._launched
}
}

/// Queue to protect reading/writing on map of validated executables.
private static let executablesQueue = DispatchQueue(
label: "org.swift.swiftpm.process.findExecutable")
/// How process redirects its output.
public let outputRedirection: OutputRedirection

/// Indicates if a new progress group is created for the child process.
private let startNewProcessGroup: Bool
Expand All @@ -257,7 +267,10 @@ public final class Process: ObjectIdentifierProtocol {
///
/// Key: Executable name or path.
/// Value: Path to the executable, if found.
static private var validatedExecutablesMap = [String: AbsolutePath?]()
private static var validatedExecutablesMap = [String: AbsolutePath?]()

// Lock to protect reading/writing on validatedExecutablesMap.
private static let validatedExecutablesMapLock = Lock()

/// Create a new process instance.
///
Expand Down Expand Up @@ -348,7 +361,7 @@ public final class Process: ObjectIdentifierProtocol {
}
// This should cover the most common cases, i.e. when the cache is most helpful.
if workingDirectory == localFileSystem.currentWorkingDirectory {
return Process.executablesQueue.sync {
return Process.validatedExecutablesMapLock.withLock {
if let value = Process.validatedExecutablesMap[program] {
return value
}
Expand All @@ -364,10 +377,11 @@ public final class Process: ObjectIdentifierProtocol {
/// Launch the subprocess.
public func launch() throws {
precondition(arguments.count > 0 && !arguments[0].isEmpty, "Need at least one argument to launch the process.")
precondition(!launched, "It is not allowed to launch the same process object again.")

// Set the launch bool to true.
launched = true
self.launchedLock.withLock {
precondition(!self._launched, "It is not allowed to launch the same process object again.")
self._launched = true
}

// Print the arguments if we are verbose.
if self.verbose {
Expand All @@ -381,7 +395,7 @@ public final class Process: ObjectIdentifierProtocol {
throw Process.Error.missingExecutableProgram(program: executable)
}

#if os(Windows)
#if os(Windows)
_process = Foundation.Process()
_process?.arguments = Array(arguments.dropFirst()) // Avoid including the executable URL twice.
_process?.executableURL = executablePath.asURL
Expand Down Expand Up @@ -409,13 +423,13 @@ public final class Process: ObjectIdentifierProtocol {
}

try _process?.run()
#else
#else
// Initialize the spawn attributes.
#if canImport(Darwin) || os(Android)
#if canImport(Darwin) || os(Android)
var attributes: posix_spawnattr_t? = nil
#else
#else
var attributes = posix_spawnattr_t()
#endif
#endif
posix_spawnattr_init(&attributes)
defer { posix_spawnattr_destroy(&attributes) }

Expand All @@ -425,13 +439,13 @@ public final class Process: ObjectIdentifierProtocol {
posix_spawnattr_setsigmask(&attributes, &noSignals)

// Reset all signals to default behavior.
#if os(macOS)
#if os(macOS)
var mostSignals = sigset_t()
sigfillset(&mostSignals)
sigdelset(&mostSignals, SIGKILL)
sigdelset(&mostSignals, SIGSTOP)
posix_spawnattr_setsigdefault(&attributes, &mostSignals)
#else
#else
// On Linux, this can only be used to reset signals that are legal to
// modify, so we have to take care about the set we use.
var mostSignals = sigset_t()
Expand All @@ -443,7 +457,7 @@ public final class Process: ObjectIdentifierProtocol {
sigaddset(&mostSignals, i)
}
posix_spawnattr_setsigdefault(&attributes, &mostSignals)
#endif
#endif

// Set the attribute flags.
var flags = POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF
Expand All @@ -456,31 +470,31 @@ public final class Process: ObjectIdentifierProtocol {
posix_spawnattr_setflags(&attributes, Int16(flags))

// Setup the file actions.
#if canImport(Darwin) || os(Android)
#if canImport(Darwin) || os(Android)
var fileActions: posix_spawn_file_actions_t? = nil
#else
#else
var fileActions = posix_spawn_file_actions_t()
#endif
#endif
posix_spawn_file_actions_init(&fileActions)
defer { posix_spawn_file_actions_destroy(&fileActions) }

if let workingDirectory = workingDirectory?.pathString {
#if os(macOS)
#if os(macOS)
// The only way to set a workingDirectory is using an availability-gated initializer, so we don't need
// to handle the case where the posix_spawn_file_actions_addchdir_np method is unavailable. This check only
// exists here to make the compiler happy.
if #available(macOS 10.15, *) {
posix_spawn_file_actions_addchdir_np(&fileActions, workingDirectory)
}
#elseif os(Linux)
#elseif os(Linux)
guard SPM_posix_spawn_file_actions_addchdir_np_supported() else {
throw Process.Error.workingDirectoryNotSupported
}

SPM_posix_spawn_file_actions_addchdir_np(&fileActions, workingDirectory)
#else
#else
throw Process.Error.workingDirectoryNotSupported
#endif
#endif
}

// Workaround for https://sourceware.org/git/gitweb.cgi?p=glibc.git;h=89e435f3559c53084498e9baad22172b64429362
Expand Down Expand Up @@ -534,43 +548,84 @@ public final class Process: ObjectIdentifierProtocol {
throw SystemError.posix_spawn(rv, arguments)
}

if outputRedirection.redirectsOutput {
if !outputRedirection.redirectsOutput {
// no stdout or stderr in this case
self.stateLock.withLock {
self.state = .outputReady(stdout: .success([]), stderr: .success([]))
}
} else {
var outputResult: (stdout: Result<[UInt8], Swift.Error>?, stderr: Result<[UInt8], Swift.Error>?)
let outputResultLock = Lock()

let outputClosures = outputRedirection.outputClosures

// Close the write end of the output pipe.
try close(fd: &outputPipe[1])

// Create a thread and start reading the output on it.
var thread = Thread { [weak self] in
let stdoutThread = Thread { [weak self] in
if let readResult = self?.readOutput(onFD: outputPipe[0], outputClosure: outputClosures?.stdoutClosure) {
self?.stdout.result = readResult
outputResultLock.withLock {
if let stderrResult = outputResult.stderr {
self?.stateLock.withLock {
self?.state = .outputReady(stdout: readResult, stderr: stderrResult)
}
} else {
outputResult.stdout = readResult
}
}
} else if let stderrResult = (outputResultLock.withLock { outputResult.stderr }) {
// TODO: this is more of an error
self?.stateLock.withLock {
self?.state = .outputReady(stdout: .success([]), stderr: stderrResult)
}
}
}
thread.start()
self.stdout.thread = thread

// Only schedule a thread for stderr if no redirect was requested.
var stderrThread: Thread? = nil
if !outputRedirection.redirectStderr {
// Close the write end of the stderr pipe.
try close(fd: &stderrPipe[1])

// Create a thread and start reading the stderr output on it.
thread = Thread { [weak self] in
stderrThread = Thread { [weak self] in
if let readResult = self?.readOutput(onFD: stderrPipe[0], outputClosure: outputClosures?.stderrClosure) {
self?.stderr.result = readResult
outputResultLock.withLock {
if let stdoutResult = outputResult.stdout {
self?.stateLock.withLock {
self?.state = .outputReady(stdout: stdoutResult, stderr: readResult)
}
} else {
outputResult.stderr = readResult
}
}
} else if let stdoutResult = (outputResultLock.withLock { outputResult.stdout }) {
// TODO: this is more of an error
self?.stateLock.withLock {
self?.state = .outputReady(stdout: stdoutResult, stderr: .success([]))
}
}
}
thread.start()
self.stderr.thread = thread
} else {
outputResultLock.withLock {
outputResult.stderr = .success([]) // no stderr in this case
}
}
// first set state then start reading threads
self.stateLock.withLock {
self.state = .readingOutput(stdout: stdoutThread, stderr: stderrThread)
}
stdoutThread.start()
stderrThread?.start()
}
#endif // POSIX implementation
#endif // POSIX implementation
}

/// Blocks the calling process until the subprocess finishes execution.
@discardableResult
public func waitUntilExit() throws -> ProcessResult {
#if os(Windows)
#if os(Windows)
precondition(_process != nil, "The process is not yet launched.")
let p = _process!
p.waitUntilExit()
Expand All @@ -585,19 +640,23 @@ public final class Process: ObjectIdentifierProtocol {
stderrOutput: stderr.result
)
return executionResult
#else
return try serialQueue.sync {
precondition(launched, "The process is not yet launched.")

// If the process has already finsihed, return it.
if let existingResult = _result {
return existingResult
}

#else
self.stateLock.lock()
switch self.state {
case .idle:
defer { self.stateLock.unlock() }
preconditionFailure("The process is not yet launched.")
case .complete(let result):
defer { self.stateLock.unlock() }
return result
case .readingOutput(let stdoutThread, let stderrThread):
self.stateLock.unlock() // unlock early since output read thread need to change state
// If we're reading output, make sure that is finished.
stdout.thread?.join()
stderr.thread?.join()

stdoutThread.join()
stderrThread?.join()
return try self.waitUntilExit()
case .outputReady(let stdoutResult, let stderrResult):
defer { self.stateLock.unlock() }
// Wait until process finishes execution.
var exitStatusCode: Int32 = 0
var result = waitpid(processID, &exitStatusCode, 0)
Expand All @@ -613,13 +672,13 @@ public final class Process: ObjectIdentifierProtocol {
arguments: arguments,
environment: environment,
exitStatusCode: exitStatusCode,
output: stdout.result,
stderrOutput: stderr.result
output: stdoutResult,
stderrOutput: stderrResult
)
self._result = executionResult
self.state = .complete(executionResult)
return executionResult
}
#endif
#endif
}

#if !os(Windows)
Expand Down Expand Up @@ -671,16 +730,16 @@ public final class Process: ObjectIdentifierProtocol {
///
/// Note: This will signal all processes in the process group.
public func signal(_ signal: Int32) {
#if os(Windows)
#if os(Windows)
if signal == SIGINT {
_process?.interrupt()
_process?.interrupt()
} else {
_process?.terminate()
_process?.terminate()
}
#else
assert(launched, "The process is not yet launched.")
#else
assert(self.launched, "The process is not yet launched.")
_ = TSCLibc.kill(startNewProcessGroup ? -processID : processID, signal)
#endif
#endif
}
}

Expand Down
Loading