Skip to content

Commit 55841b4

Browse files
authored
Refactor daemon pool factory to allow creation of daemons outs… (#11310)
* Allow creation of dedicated/single use daemons * Refactor * Oops * Fix linter issues * Format py code * Add comments * More refactor * Some more * Fix tests * Linter fixes
1 parent 7076152 commit 55841b4

18 files changed

+630
-469
lines changed

pythonFiles/vscode_datascience_helpers/daemon/__main__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
log = logging.getLogger(__name__)
1313

14-
LOG_FORMAT = "%(asctime)s UTC - %(levelname)s - %(name)s - %(message)s"
14+
LOG_FORMAT = (
15+
"%(asctime)s UTC - %(levelname)s - (PID: %(process)d) - %(name)s - %(message)s"
16+
)
1517
queue_handler = None
1618

1719

@@ -62,7 +64,11 @@ def set_server(self, server):
6264
self.queue = []
6365

6466
def emit(self, record):
65-
data = {"level": record.levelname, "msg": self.format(record)}
67+
data = {
68+
"level": record.levelname,
69+
"msg": self.format(record),
70+
"pid": os.getpid(),
71+
}
6672
# If we don't have the server, then queue it and send it later.
6773
if self.server is None:
6874
self.queue.append(data)
Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
'use strict';
5+
6+
import { ChildProcess } from 'child_process';
7+
import * as os from 'os';
8+
import { Subject } from 'rxjs/Subject';
9+
import * as util from 'util';
10+
import { MessageConnection, NotificationType, RequestType, RequestType0 } from 'vscode-jsonrpc';
11+
import { traceError, traceInfo, traceVerbose, traceWarning } from '../logger';
12+
import { IDisposable } from '../types';
13+
import { createDeferred, Deferred } from '../utils/async';
14+
import { noop } from '../utils/misc';
15+
import {
16+
ExecutionResult,
17+
IPythonExecutionService,
18+
ObservableExecutionResult,
19+
Output,
20+
SpawnOptions,
21+
StdErrError
22+
} from './types';
23+
24+
type ErrorResponse = { error?: string };
25+
26+
export class ConnectionClosedError extends Error {
27+
constructor(public readonly message: string) {
28+
super();
29+
}
30+
}
31+
32+
export class DaemonError extends Error {
33+
constructor(public readonly message: string) {
34+
super();
35+
}
36+
}
37+
export abstract class BasePythonDaemon {
38+
public get isAlive(): boolean {
39+
return this.connectionClosedMessage === '';
40+
}
41+
private connectionClosedMessage: string = '';
42+
private outputObservale = new Subject<Output<string>>();
43+
// tslint:disable-next-line: no-any
44+
private readonly connectionClosedDeferred: Deferred<any>;
45+
private disposables: IDisposable[] = [];
46+
private disposed = false;
47+
constructor(
48+
protected readonly pythonExecutionService: IPythonExecutionService,
49+
protected readonly pythonPath: string,
50+
public readonly proc: ChildProcess,
51+
public readonly connection: MessageConnection
52+
) {
53+
// tslint:disable-next-line: no-any
54+
this.connectionClosedDeferred = createDeferred<any>();
55+
// This promise gets used conditionally, if it doesn't get used, and the promise is rejected,
56+
// then node logs errors. We don't want that, hence add a dummy error handler.
57+
this.connectionClosedDeferred.promise.catch(noop);
58+
this.monitorConnection();
59+
}
60+
public dispose() {
61+
try {
62+
this.disposed = true;
63+
// The daemon should die as a result of this.
64+
this.connection.sendNotification(new NotificationType('exit'));
65+
this.proc.kill();
66+
} catch {
67+
noop();
68+
}
69+
this.disposables.forEach((item) => item.dispose());
70+
}
71+
public execObservable(args: string[], options: SpawnOptions): ObservableExecutionResult<string> {
72+
if (this.isAlive && this.canExecFileUsingDaemon(args, options)) {
73+
try {
74+
return this.execAsObservable({ fileName: args[0] }, args.slice(1), options);
75+
} catch (ex) {
76+
if (ex instanceof DaemonError || ex instanceof ConnectionClosedError) {
77+
traceWarning('Falling back to Python Execution Service due to failure in daemon', ex);
78+
return this.pythonExecutionService.execObservable(args, options);
79+
} else {
80+
throw ex;
81+
}
82+
}
83+
} else {
84+
return this.pythonExecutionService.execObservable(args, options);
85+
}
86+
}
87+
public execModuleObservable(
88+
moduleName: string,
89+
args: string[],
90+
options: SpawnOptions
91+
): ObservableExecutionResult<string> {
92+
if (this.isAlive && this.canExecModuleUsingDaemon(moduleName, args, options)) {
93+
try {
94+
return this.execAsObservable({ moduleName }, args, options);
95+
} catch (ex) {
96+
if (ex instanceof DaemonError || ex instanceof ConnectionClosedError) {
97+
traceWarning('Falling back to Python Execution Service due to failure in daemon', ex);
98+
return this.pythonExecutionService.execModuleObservable(moduleName, args, options);
99+
} else {
100+
throw ex;
101+
}
102+
}
103+
} else {
104+
return this.pythonExecutionService.execModuleObservable(moduleName, args, options);
105+
}
106+
}
107+
public async exec(args: string[], options: SpawnOptions): Promise<ExecutionResult<string>> {
108+
if (this.isAlive && this.canExecFileUsingDaemon(args, options)) {
109+
try {
110+
return await this.execFileWithDaemon(args[0], args.slice(1), options);
111+
} catch (ex) {
112+
if (ex instanceof DaemonError || ex instanceof ConnectionClosedError) {
113+
traceWarning('Falling back to Python Execution Service due to failure in daemon', ex);
114+
return this.pythonExecutionService.exec(args, options);
115+
} else {
116+
throw ex;
117+
}
118+
}
119+
} else {
120+
return this.pythonExecutionService.exec(args, options);
121+
}
122+
}
123+
public async execModule(
124+
moduleName: string,
125+
args: string[],
126+
options: SpawnOptions
127+
): Promise<ExecutionResult<string>> {
128+
if (this.isAlive && this.canExecModuleUsingDaemon(moduleName, args, options)) {
129+
try {
130+
return await this.execModuleWithDaemon(moduleName, args, options);
131+
} catch (ex) {
132+
if (ex instanceof DaemonError || ex instanceof ConnectionClosedError) {
133+
traceWarning('Falling back to Python Execution Service due to failure in daemon', ex);
134+
return this.pythonExecutionService.execModule(moduleName, args, options);
135+
} else {
136+
throw ex;
137+
}
138+
}
139+
} else {
140+
return this.pythonExecutionService.execModule(moduleName, args, options);
141+
}
142+
}
143+
protected canExecFileUsingDaemon(args: string[], options: SpawnOptions): boolean {
144+
return args[0].toLowerCase().endsWith('.py') && this.areOptionsSupported(options);
145+
}
146+
protected canExecModuleUsingDaemon(_moduleName: string, _args: string[], options: SpawnOptions): boolean {
147+
return this.areOptionsSupported(options);
148+
}
149+
protected areOptionsSupported(options: SpawnOptions): boolean {
150+
const daemonSupportedSpawnOptions: (keyof SpawnOptions)[] = [
151+
'cwd',
152+
'env',
153+
'throwOnStdErr',
154+
'token',
155+
'encoding',
156+
'mergeStdOutErr',
157+
'extraVariables'
158+
];
159+
// tslint:disable-next-line: no-any
160+
return Object.keys(options).every((item) => daemonSupportedSpawnOptions.indexOf(item as any) >= 0);
161+
}
162+
protected sendRequestWithoutArgs<R, E, RO>(type: RequestType0<R, E, RO>): Thenable<R> {
163+
return Promise.race([this.connection.sendRequest(type), this.connectionClosedDeferred.promise]);
164+
}
165+
protected sendRequest<P, R, E, RO>(type: RequestType<P, R, E, RO>, params?: P): Thenable<R> {
166+
// Throw an error if the connection has been closed.
167+
return Promise.race([this.connection.sendRequest(type, params), this.connectionClosedDeferred.promise]);
168+
}
169+
protected throwIfRPCConnectionIsDead() {
170+
if (!this.isAlive) {
171+
throw new ConnectionClosedError(this.connectionClosedMessage);
172+
}
173+
}
174+
protected execAsObservable(
175+
moduleOrFile: { moduleName: string } | { fileName: string },
176+
args: string[],
177+
options: SpawnOptions
178+
): ObservableExecutionResult<string> {
179+
const subject = new Subject<Output<string>>();
180+
const start = async () => {
181+
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
182+
let response: ExecResponse;
183+
if ('fileName' in moduleOrFile) {
184+
const request = new RequestType<
185+
// tslint:disable-next-line: no-any
186+
{ file_name: string; args: string[]; cwd?: string; env?: any },
187+
ExecResponse,
188+
void,
189+
void
190+
>('exec_file_observable');
191+
response = await this.sendRequest(request, {
192+
file_name: moduleOrFile.fileName,
193+
args,
194+
cwd: options.cwd,
195+
env: options.env
196+
});
197+
} else {
198+
const request = new RequestType<
199+
// tslint:disable-next-line: no-any
200+
{ module_name: string; args: string[]; cwd?: string; env?: any },
201+
ExecResponse,
202+
void,
203+
void
204+
>('exec_module_observable');
205+
response = await this.sendRequest(request, {
206+
module_name: moduleOrFile.moduleName,
207+
args,
208+
cwd: options.cwd,
209+
env: options.env
210+
});
211+
}
212+
// Might not get a response object back, as its observable.
213+
if (response && response.error) {
214+
throw new DaemonError(response.error);
215+
}
216+
};
217+
let stdErr = '';
218+
this.proc.stderr.on('data', (output: string | Buffer) => (stdErr += output.toString()));
219+
// Wire up stdout/stderr.
220+
const subscription = this.outputObservale.subscribe((out) => {
221+
if (out.source === 'stderr' && options.throwOnStdErr) {
222+
subject.error(new StdErrError(out.out));
223+
} else if (out.source === 'stderr' && options.mergeStdOutErr) {
224+
subject.next({ source: 'stdout', out: out.out });
225+
} else {
226+
subject.next(out);
227+
}
228+
});
229+
start()
230+
.catch((ex) => {
231+
const errorMsg = `Failed to run ${
232+
'fileName' in moduleOrFile ? moduleOrFile.fileName : moduleOrFile.moduleName
233+
} as observable with args ${args.join(' ')}`;
234+
traceError(errorMsg, ex);
235+
subject.next({ source: 'stderr', out: `${errorMsg}\n${stdErr}` });
236+
subject.error(ex);
237+
})
238+
.finally(() => {
239+
// Wait until all messages are received.
240+
setTimeout(() => {
241+
subscription.unsubscribe();
242+
subject.complete();
243+
}, 100);
244+
})
245+
.ignoreErrors();
246+
247+
return {
248+
proc: this.proc,
249+
dispose: () => this.dispose(),
250+
out: subject
251+
};
252+
}
253+
/**
254+
* Process the response.
255+
*
256+
* @private
257+
* @param {{ error?: string | undefined; stdout: string; stderr?: string }} response
258+
* @param {SpawnOptions} options
259+
* @memberof PythonDaemonExecutionService
260+
*/
261+
private processResponse(
262+
response: { error?: string | undefined; stdout: string; stderr?: string },
263+
options: SpawnOptions
264+
) {
265+
if (response.error) {
266+
throw new DaemonError(`Failed to execute using the daemon, ${response.error}`);
267+
}
268+
// Throw an error if configured to do so if there's any output in stderr.
269+
if (response.stderr && options.throwOnStdErr) {
270+
throw new StdErrError(response.stderr);
271+
}
272+
// Merge stdout and stderr into on if configured to do so.
273+
if (response.stderr && options.mergeStdOutErr) {
274+
response.stdout = `${response.stdout || ''}${os.EOL}${response.stderr}`;
275+
}
276+
}
277+
private async execFileWithDaemon(
278+
fileName: string,
279+
args: string[],
280+
options: SpawnOptions
281+
): Promise<ExecutionResult<string>> {
282+
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
283+
const request = new RequestType<
284+
// tslint:disable-next-line: no-any
285+
{ file_name: string; args: string[]; cwd?: string; env?: any },
286+
ExecResponse,
287+
void,
288+
void
289+
>('exec_file');
290+
const response = await this.sendRequest(request, {
291+
file_name: fileName,
292+
args,
293+
cwd: options.cwd,
294+
env: options.env
295+
});
296+
this.processResponse(response, options);
297+
return response;
298+
}
299+
private async execModuleWithDaemon(
300+
moduleName: string,
301+
args: string[],
302+
options: SpawnOptions
303+
): Promise<ExecutionResult<string>> {
304+
type ExecResponse = ErrorResponse & { stdout: string; stderr?: string };
305+
const request = new RequestType<
306+
// tslint:disable-next-line: no-any
307+
{ module_name: string; args: string[]; cwd?: string; env?: any },
308+
ExecResponse,
309+
void,
310+
void
311+
>('exec_module');
312+
const response = await this.sendRequest(request, {
313+
module_name: moduleName,
314+
args,
315+
cwd: options.cwd,
316+
env: options.env
317+
});
318+
this.processResponse(response, options);
319+
return response;
320+
}
321+
private monitorConnection() {
322+
// tslint:disable-next-line: no-any
323+
const logConnectionStatus = (msg: string, ex?: any) => {
324+
if (!this.disposed) {
325+
this.connectionClosedMessage += msg + (ex ? `, With Error: ${util.format(ex)}` : '');
326+
this.connectionClosedDeferred.reject(new ConnectionClosedError(this.connectionClosedMessage));
327+
traceWarning(msg);
328+
if (ex) {
329+
traceError('Connection errored', ex);
330+
}
331+
}
332+
};
333+
this.disposables.push(this.connection.onClose(() => logConnectionStatus('Daemon Connection Closed')));
334+
this.disposables.push(this.connection.onDispose(() => logConnectionStatus('Daemon Connection disposed')));
335+
this.disposables.push(this.connection.onError((ex) => logConnectionStatus('Daemon Connection errored', ex)));
336+
// this.proc.on('error', error => logConnectionStatus('Daemon Processed died with error', error));
337+
this.proc.on('exit', (code) => logConnectionStatus('Daemon Processed died with exit code', code));
338+
// Wire up stdout/stderr.
339+
const OuputNotification = new NotificationType<Output<string>, void>('output');
340+
this.connection.onNotification(OuputNotification, (output) => this.outputObservale.next(output));
341+
const logNotification = new NotificationType<
342+
{ level: 'WARN' | 'WARNING' | 'INFO' | 'DEBUG' | 'NOTSET'; msg: string; pid?: string },
343+
void
344+
>('log');
345+
this.connection.onNotification(logNotification, (output) => {
346+
const pid = output.pid ? ` (pid: ${output.pid})` : '';
347+
const msg = `Python Daemon${pid}: ${output.msg}`;
348+
if (output.level === 'DEBUG' || output.level === 'NOTSET') {
349+
traceVerbose(msg);
350+
} else if (output.level === 'INFO') {
351+
traceInfo(msg);
352+
} else if (output.level === 'WARN' || output.level === 'WARNING') {
353+
traceWarning(msg);
354+
} else {
355+
traceError(msg);
356+
}
357+
});
358+
this.connection.onUnhandledNotification(traceError);
359+
}
360+
}

0 commit comments

Comments
 (0)