Skip to content

Refactor daemon pool to fallback to python service #8515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 46 additions & 35 deletions src/client/common/process/pythonDaemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as util from 'util';
import { MessageConnection, NotificationType, RequestType, RequestType0 } from 'vscode-jsonrpc';
import { traceError, traceWarning } from '../logger';
import { IDisposable } from '../types';
import { createDeferred, Deferred } from '../utils/async';
import { noop } from '../utils/misc';
import { Architecture } from '../utils/platform';
import { parsePythonVersion } from '../utils/version';
Expand All @@ -27,22 +28,37 @@ import {

type ErrorResponse = { error?: string };

export class ConnectionClosedError extends Error {
constructor(public readonly message: string){
super();
}
}
export class PythonDaemonExecutionService implements IPythonDaemonExecutionService {
private connectionClosedMessage?: string;
private connectionClosedMessage: string = '';
private outputObservale = new Subject<Output<string>>();
// tslint:disable-next-line: no-any
private readonly connectionClosedDeferred: Deferred<any>;
private disposables: IDisposable[] = [];
public get isAlive(): boolean {
return this.connectionClosedMessage === '';
}
constructor(
protected readonly pythonExecutionService: IPythonExecutionService,
protected readonly pythonPath: string,
protected readonly daemonProc: ChildProcess,
protected readonly connection: MessageConnection
public readonly proc: ChildProcess,
public readonly connection: MessageConnection
) {
// tslint:disable-next-line: no-any
this.connectionClosedDeferred = createDeferred<any>();
// This promise gets used conditionally, if it doesn't get used, and the promise is rejected,
// then node logs errors. We don't want that, hence add a dummy error handler.
this.connectionClosedDeferred.promise.catch(noop);
this.monitorConnection();
}
public dispose() {
try {
this.connection.dispose();
this.daemonProc.kill();
this.proc.kill();
} catch {
noop();
}
Expand All @@ -53,7 +69,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
try {
type InterpreterInfoResponse = ErrorResponse & { versionInfo: PythonVersionInfo; sysPrefix: string; sysVersion: string; is64Bit: boolean };
const request = new RequestType0<InterpreterInfoResponse, void, void>('get_interpreter_information');
const response = await this.connection.sendRequest(request);
const response = await this.sendRequestWithoutArgs(request);
const versionValue = response.versionInfo.length === 4 ? `${response.versionInfo.slice(0, 3).join('.')}-${response.versionInfo[3]}` : response.versionInfo.join('.');
return {
architecture: response.is64Bit ? Architecture.x64 : Architecture.x86,
Expand All @@ -71,7 +87,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
try {
type ExecutablePathResponse = ErrorResponse & { path: string };
const request = new RequestType0<ExecutablePathResponse, void, void>('get_executable');
const response = await this.connection.sendRequest(request);
const response = await this.sendRequestWithoutArgs(request);
if (response.error) {
throw new Error(response.error);
}
Expand All @@ -85,7 +101,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
try {
type ModuleInstalledResponse = ErrorResponse & { exists: boolean };
const request = new RequestType<{ module_name: string }, ModuleInstalledResponse, void, void>('is_module_installed');
const response = await this.connection.sendRequest(request, { module_name: moduleName });
const response = await this.sendRequest(request, { module_name: moduleName });
if (response.error) {
throw new Error(response.error);
}
Expand All @@ -112,31 +128,17 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
}
public async exec(args: string[], options: SpawnOptions): Promise<ExecutionResult<string>> {
this.throwIfRPCConnectionIsDead();
if (!this.canExecFileUsingDaemon(args, options)) {
return this.pythonExecutionService.exec(args, options);
}
try {
return await this.execFileWithDaemon(args[0], args.slice(1), options);
} catch (ex) {
// This is a handled error (error from user code that must be bubbled up).
if (ex instanceof StdErrError){
throw ex;
}
if (this.canExecFileUsingDaemon(args, options)) {
return this.execFileWithDaemon(args[0], args.slice(1), options);
} else {
return this.pythonExecutionService.exec(args, options);
}
}
public async execModule(moduleName: string, args: string[], options: SpawnOptions): Promise<ExecutionResult<string>> {
this.throwIfRPCConnectionIsDead();
if (!this.canExecModuleUsingDaemon(moduleName, args, options)) {
return this.pythonExecutionService.execModule(moduleName, args, options);
}
try {
return await this.execModuleWithDaemon(moduleName, args, options);
} catch (ex) {
// This is a handled error (error from user code that must be bubbled up).
if (ex instanceof StdErrError){
throw ex;
}
if (this.canExecModuleUsingDaemon(moduleName, args, options)) {
return this.execModuleWithDaemon(moduleName, args, options);
} else {
return this.pythonExecutionService.execModule(moduleName, args, options);
}
}
Expand All @@ -151,6 +153,13 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
// tslint:disable-next-line: no-any
return Object.keys(options).every(item => daemonSupportedSpawnOptions.indexOf(item as any) >= 0);
}
private sendRequestWithoutArgs<R, E, RO>(type: RequestType0<R, E, RO>): Thenable<R> {
return Promise.race([this.connection.sendRequest(type), this.connectionClosedDeferred.promise]);
}
private sendRequest<P, R, E, RO>(type: RequestType<P, R, E, RO>, params?: P): Thenable<R> {
// Throw an error if the connection has been closed.
return Promise.race([this.connection.sendRequest(type, params), this.connectionClosedDeferred.promise]);
}
/**
* Process the response.
*
Expand All @@ -161,7 +170,6 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
*/
private processResponse(response: { error?: string | undefined; stdout: string; stderr?: string }, options: SpawnOptions) {
if (response.error) {
traceError('Failed to execute file using the daemon', response.error);
throw new StdErrError(`Failed to execute using the daemon, ${response.error}`);
}
// Throw an error if configured to do so if there's any output in stderr.
Expand All @@ -177,7 +185,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
// tslint:disable-next-line: no-any
const request = new RequestType<{ file_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_file');
const response = await this.connection.sendRequest(request, { file_name: fileName, args, cwd: options.cwd, env: options.env });
const response = await this.sendRequest(request, { file_name: fileName, args, cwd: options.cwd, env: options.env });
this.processResponse(response, options);
return response;
}
Expand All @@ -188,7 +196,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
// tslint:disable-next-line: no-any
const request = new RequestType<{ module_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_module');
const response = await this.connection.sendRequest(request, { module_name: moduleName, args, cwd: options.cwd, env: options.env });
const response = await this.sendRequest(request, { module_name: moduleName, args, cwd: options.cwd, env: options.env });
this.processResponse(response, options);
return response;
}
Expand All @@ -203,19 +211,19 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
if ('fileName' in moduleOrFile) {
// tslint:disable-next-line: no-any
const request = new RequestType<{ file_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_file_observable');
response = await this.connection.sendRequest(request, { file_name: moduleOrFile.fileName, args, cwd: options.cwd, env: options.env });
response = await this.sendRequest(request, { file_name: moduleOrFile.fileName, args, cwd: options.cwd, env: options.env });
} else {
// tslint:disable-next-line: no-any
const request = new RequestType<{ module_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_module_observable');
response = await this.connection.sendRequest(request, { module_name: moduleOrFile.moduleName, args, cwd: options.cwd, env: options.env });
response = await this.sendRequest(request, { module_name: moduleOrFile.moduleName, args, cwd: options.cwd, env: options.env });
}
// Might not get a response object back, as its observable.
if (response && response.error){
throw new StdErrError(response.error);
}
};
let stdErr = '';
this.daemonProc.stderr.on('data', (output: string | Buffer) => (stdErr += output.toString()));
this.proc.stderr.on('data', (output: string | Buffer) => (stdErr += output.toString()));
// Wire up stdout/stderr.
const subscription = this.outputObservale.subscribe(out => {
if (out.source === 'stderr' && options.throwOnStdErr) {
Expand Down Expand Up @@ -243,15 +251,16 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
.ignoreErrors();

return {
proc: this.daemonProc,
proc: this.proc,
dispose: () => this.dispose(),
out: subject
};
}
private monitorConnection() {
// tslint:disable-next-line: no-any
const logConnectionStatus = (msg: string, ex?: any) => {
this.connectionClosedMessage = msg + (ex ? `, With Error: ${util.format(ex)}` : '');
this.connectionClosedMessage += msg + (ex ? `, With Error: ${util.format(ex)}` : '');
this.connectionClosedDeferred.reject(new ConnectionClosedError(this.connectionClosedMessage));
traceWarning(msg);
if (ex) {
traceError('Connection errored', ex);
Expand All @@ -260,6 +269,8 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
this.disposables.push(this.connection.onClose(() => logConnectionStatus('Daemon Connection Closed')));
this.disposables.push(this.connection.onDispose(() => logConnectionStatus('Daemon Connection disposed')));
this.disposables.push(this.connection.onError(ex => logConnectionStatus('Daemon Connection errored', ex)));
// this.proc.on('error', error => logConnectionStatus('Daemon Processed died with error', error));
this.proc.on('exit', code => logConnectionStatus('Daemon Processed died with exit code', code));
// Wire up stdout/stderr.
const OuputNotification = new NotificationType<Output<string>, void>('output');
this.connection.onNotification(OuputNotification, output => this.outputObservale.next(output));
Expand Down
Loading