Skip to content

Commit 4303d8e

Browse files
authored
Refactor daemon pool to fallback to python service (#8515)
* Refactor daemon pool to fallback to python service * Address code review comments * Linter fixes
1 parent c634ffd commit 4303d8e

13 files changed

+1139
-126
lines changed

src/client/common/process/pythonDaemon.ts

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import * as util from 'util';
1010
import { MessageConnection, NotificationType, RequestType, RequestType0 } from 'vscode-jsonrpc';
1111
import { traceError, traceWarning } from '../logger';
1212
import { IDisposable } from '../types';
13+
import { createDeferred, Deferred } from '../utils/async';
1314
import { noop } from '../utils/misc';
1415
import { Architecture } from '../utils/platform';
1516
import { parsePythonVersion } from '../utils/version';
@@ -27,22 +28,37 @@ import {
2728

2829
type ErrorResponse = { error?: string };
2930

31+
export class ConnectionClosedError extends Error {
32+
constructor(public readonly message: string){
33+
super();
34+
}
35+
}
3036
export class PythonDaemonExecutionService implements IPythonDaemonExecutionService {
31-
private connectionClosedMessage?: string;
37+
private connectionClosedMessage: string = '';
3238
private outputObservale = new Subject<Output<string>>();
39+
// tslint:disable-next-line: no-any
40+
private readonly connectionClosedDeferred: Deferred<any>;
3341
private disposables: IDisposable[] = [];
42+
public get isAlive(): boolean {
43+
return this.connectionClosedMessage === '';
44+
}
3445
constructor(
3546
protected readonly pythonExecutionService: IPythonExecutionService,
3647
protected readonly pythonPath: string,
37-
protected readonly daemonProc: ChildProcess,
38-
protected readonly connection: MessageConnection
48+
public readonly proc: ChildProcess,
49+
public readonly connection: MessageConnection
3950
) {
51+
// tslint:disable-next-line: no-any
52+
this.connectionClosedDeferred = createDeferred<any>();
53+
// This promise gets used conditionally, if it doesn't get used, and the promise is rejected,
54+
// then node logs errors. We don't want that, hence add a dummy error handler.
55+
this.connectionClosedDeferred.promise.catch(noop);
4056
this.monitorConnection();
4157
}
4258
public dispose() {
4359
try {
4460
this.connection.dispose();
45-
this.daemonProc.kill();
61+
this.proc.kill();
4662
} catch {
4763
noop();
4864
}
@@ -53,7 +69,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
5369
try {
5470
type InterpreterInfoResponse = ErrorResponse & { versionInfo: PythonVersionInfo; sysPrefix: string; sysVersion: string; is64Bit: boolean };
5571
const request = new RequestType0<InterpreterInfoResponse, void, void>('get_interpreter_information');
56-
const response = await this.connection.sendRequest(request);
72+
const response = await this.sendRequestWithoutArgs(request);
5773
const versionValue = response.versionInfo.length === 4 ? `${response.versionInfo.slice(0, 3).join('.')}-${response.versionInfo[3]}` : response.versionInfo.join('.');
5874
return {
5975
architecture: response.is64Bit ? Architecture.x64 : Architecture.x86,
@@ -71,7 +87,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
7187
try {
7288
type ExecutablePathResponse = ErrorResponse & { path: string };
7389
const request = new RequestType0<ExecutablePathResponse, void, void>('get_executable');
74-
const response = await this.connection.sendRequest(request);
90+
const response = await this.sendRequestWithoutArgs(request);
7591
if (response.error) {
7692
throw new Error(response.error);
7793
}
@@ -85,7 +101,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
85101
try {
86102
type ModuleInstalledResponse = ErrorResponse & { exists: boolean };
87103
const request = new RequestType<{ module_name: string }, ModuleInstalledResponse, void, void>('is_module_installed');
88-
const response = await this.connection.sendRequest(request, { module_name: moduleName });
104+
const response = await this.sendRequest(request, { module_name: moduleName });
89105
if (response.error) {
90106
throw new Error(response.error);
91107
}
@@ -112,31 +128,17 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
112128
}
113129
public async exec(args: string[], options: SpawnOptions): Promise<ExecutionResult<string>> {
114130
this.throwIfRPCConnectionIsDead();
115-
if (!this.canExecFileUsingDaemon(args, options)) {
116-
return this.pythonExecutionService.exec(args, options);
117-
}
118-
try {
119-
return await this.execFileWithDaemon(args[0], args.slice(1), options);
120-
} catch (ex) {
121-
// This is a handled error (error from user code that must be bubbled up).
122-
if (ex instanceof StdErrError){
123-
throw ex;
124-
}
131+
if (this.canExecFileUsingDaemon(args, options)) {
132+
return this.execFileWithDaemon(args[0], args.slice(1), options);
133+
} else {
125134
return this.pythonExecutionService.exec(args, options);
126135
}
127136
}
128137
public async execModule(moduleName: string, args: string[], options: SpawnOptions): Promise<ExecutionResult<string>> {
129138
this.throwIfRPCConnectionIsDead();
130-
if (!this.canExecModuleUsingDaemon(moduleName, args, options)) {
131-
return this.pythonExecutionService.execModule(moduleName, args, options);
132-
}
133-
try {
134-
return await this.execModuleWithDaemon(moduleName, args, options);
135-
} catch (ex) {
136-
// This is a handled error (error from user code that must be bubbled up).
137-
if (ex instanceof StdErrError){
138-
throw ex;
139-
}
139+
if (this.canExecModuleUsingDaemon(moduleName, args, options)) {
140+
return this.execModuleWithDaemon(moduleName, args, options);
141+
} else {
140142
return this.pythonExecutionService.execModule(moduleName, args, options);
141143
}
142144
}
@@ -151,6 +153,13 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
151153
// tslint:disable-next-line: no-any
152154
return Object.keys(options).every(item => daemonSupportedSpawnOptions.indexOf(item as any) >= 0);
153155
}
156+
private sendRequestWithoutArgs<R, E, RO>(type: RequestType0<R, E, RO>): Thenable<R> {
157+
return Promise.race([this.connection.sendRequest(type), this.connectionClosedDeferred.promise]);
158+
}
159+
private sendRequest<P, R, E, RO>(type: RequestType<P, R, E, RO>, params?: P): Thenable<R> {
160+
// Throw an error if the connection has been closed.
161+
return Promise.race([this.connection.sendRequest(type, params), this.connectionClosedDeferred.promise]);
162+
}
154163
/**
155164
* Process the response.
156165
*
@@ -161,7 +170,6 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
161170
*/
162171
private processResponse(response: { error?: string | undefined; stdout: string; stderr?: string }, options: SpawnOptions) {
163172
if (response.error) {
164-
traceError('Failed to execute file using the daemon', response.error);
165173
throw new StdErrError(`Failed to execute using the daemon, ${response.error}`);
166174
}
167175
// Throw an error if configured to do so if there's any output in stderr.
@@ -177,7 +185,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
177185
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
178186
// tslint:disable-next-line: no-any
179187
const request = new RequestType<{ file_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_file');
180-
const response = await this.connection.sendRequest(request, { file_name: fileName, args, cwd: options.cwd, env: options.env });
188+
const response = await this.sendRequest(request, { file_name: fileName, args, cwd: options.cwd, env: options.env });
181189
this.processResponse(response, options);
182190
return response;
183191
}
@@ -188,7 +196,7 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
188196
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
189197
// tslint:disable-next-line: no-any
190198
const request = new RequestType<{ module_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_module');
191-
const response = await this.connection.sendRequest(request, { module_name: moduleName, args, cwd: options.cwd, env: options.env });
199+
const response = await this.sendRequest(request, { module_name: moduleName, args, cwd: options.cwd, env: options.env });
192200
this.processResponse(response, options);
193201
return response;
194202
}
@@ -203,19 +211,19 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
203211
if ('fileName' in moduleOrFile) {
204212
// tslint:disable-next-line: no-any
205213
const request = new RequestType<{ file_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_file_observable');
206-
response = await this.connection.sendRequest(request, { file_name: moduleOrFile.fileName, args, cwd: options.cwd, env: options.env });
214+
response = await this.sendRequest(request, { file_name: moduleOrFile.fileName, args, cwd: options.cwd, env: options.env });
207215
} else {
208216
// tslint:disable-next-line: no-any
209217
const request = new RequestType<{ module_name: string; args: string[]; cwd?: string; env?: any }, ExecResponse, void, void>('exec_module_observable');
210-
response = await this.connection.sendRequest(request, { module_name: moduleOrFile.moduleName, args, cwd: options.cwd, env: options.env });
218+
response = await this.sendRequest(request, { module_name: moduleOrFile.moduleName, args, cwd: options.cwd, env: options.env });
211219
}
212220
// Might not get a response object back, as its observable.
213221
if (response && response.error){
214222
throw new StdErrError(response.error);
215223
}
216224
};
217225
let stdErr = '';
218-
this.daemonProc.stderr.on('data', (output: string | Buffer) => (stdErr += output.toString()));
226+
this.proc.stderr.on('data', (output: string | Buffer) => (stdErr += output.toString()));
219227
// Wire up stdout/stderr.
220228
const subscription = this.outputObservale.subscribe(out => {
221229
if (out.source === 'stderr' && options.throwOnStdErr) {
@@ -243,15 +251,16 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
243251
.ignoreErrors();
244252

245253
return {
246-
proc: this.daemonProc,
254+
proc: this.proc,
247255
dispose: () => this.dispose(),
248256
out: subject
249257
};
250258
}
251259
private monitorConnection() {
252260
// tslint:disable-next-line: no-any
253261
const logConnectionStatus = (msg: string, ex?: any) => {
254-
this.connectionClosedMessage = msg + (ex ? `, With Error: ${util.format(ex)}` : '');
262+
this.connectionClosedMessage += msg + (ex ? `, With Error: ${util.format(ex)}` : '');
263+
this.connectionClosedDeferred.reject(new ConnectionClosedError(this.connectionClosedMessage));
255264
traceWarning(msg);
256265
if (ex) {
257266
traceError('Connection errored', ex);
@@ -260,6 +269,8 @@ export class PythonDaemonExecutionService implements IPythonDaemonExecutionServi
260269
this.disposables.push(this.connection.onClose(() => logConnectionStatus('Daemon Connection Closed')));
261270
this.disposables.push(this.connection.onDispose(() => logConnectionStatus('Daemon Connection disposed')));
262271
this.disposables.push(this.connection.onError(ex => logConnectionStatus('Daemon Connection errored', ex)));
272+
// this.proc.on('error', error => logConnectionStatus('Daemon Processed died with error', error));
273+
this.proc.on('exit', code => logConnectionStatus('Daemon Processed died with exit code', code));
263274
// Wire up stdout/stderr.
264275
const OuputNotification = new NotificationType<Output<string>, void>('output');
265276
this.connection.onNotification(OuputNotification, output => this.outputObservale.next(output));

0 commit comments

Comments
 (0)