Skip to content

Restart for raw sessions + rawSession now owns process lifetime #11230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 168 additions & 9 deletions src/client/datascience/baseJupyterSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,28 @@ import { Slot } from '@phosphor/signaling';
import { Observable } from 'rxjs/Observable';
import { ReplaySubject } from 'rxjs/ReplaySubject';
import { Event, EventEmitter } from 'vscode';
import { CancellationToken } from 'vscode-jsonrpc';
import { ServerStatus } from '../../datascience-ui/interactive-common/mainState';
import { traceError } from '../common/logger';
import { isTestExecution } from '../common/constants';
import { traceError, traceInfo, traceWarning } from '../common/logger';
import { IDisposable } from '../common/types';
import { waitForPromise } from '../common/utils/async';
import * as localize from '../common/utils/localize';
import { noop } from '../common/utils/misc';
import { sendTelemetryEvent } from '../telemetry';
import { Telemetry } from './constants';
import { JupyterWebSockets } from './jupyter/jupyterWebSocket';
import { JupyterKernelPromiseFailedError } from './jupyter/kernels/jupyterKernelPromiseFailedError';
import { KernelSelector } from './jupyter/kernels/kernelSelector';
import { LiveKernelModel } from './jupyter/kernels/types';
import { IKernelProcess } from './kernel-launcher/types';
import { IJupyterKernelSpec, IJupyterSession, KernelSocketInformation } from './types';

export type ISession = Session.ISession & {
/**
* Whether this is a remote session that we attached to.
*
* @type {boolean}
*/
// Whether this is a remote session that we attached to.
isRemoteSession?: boolean;
// If a kernel process is associated with this session
process?: IKernelProcess;
};

/**
Expand All @@ -49,6 +53,24 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected set session(session: ISession | undefined) {
const oldSession = this._session;
this._session = session;

// When setting the session clear our current exit handler and hook up to the
// new session process
if (this.processExitHandler) {
this.processExitHandler.dispose();
this.processExitHandler = undefined;
}
if (session?.process) {
// Watch to see if our process exits
this.processExitHandler = session.process.exited((exitCode) => {
traceError(`Raw kernel process exited code: ${exitCode}`);
this.shutdown().catch((reason) => {
traceError(`Error shutting down jupyter session: ${reason}`);
});
// Next code the user executes will show a session disposed message
});
}

// If we have a new session, then emit the new kernel connection information.
if (session && oldSession !== session) {
const socket = JupyterWebSockets.get(session.kernel.id);
Expand All @@ -68,6 +90,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {
});
}
}
protected kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined;
public get kernelSocket(): Observable<KernelSocketInformation | undefined> {
return this._kernelSocket;
}
Expand Down Expand Up @@ -96,21 +119,53 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected onStatusChangedEvent: EventEmitter<ServerStatus> = new EventEmitter<ServerStatus>();
protected statusHandler: Slot<ISession, Kernel.Status>;
protected connected: boolean = false;
protected restartSessionPromise: Promise<ISession | undefined> | undefined;
private _session: ISession | undefined;
private _kernelSocket = new ReplaySubject<KernelSocketInformation | undefined>();
private _jupyterLab?: typeof import('@jupyterlab/services');
constructor() {
private processExitHandler: IDisposable | undefined;

constructor(protected readonly kernelSelector: KernelSelector) {
this.statusHandler = this.onStatusChanged.bind(this);
}
public dispose(): Promise<void> {
if (this.processExitHandler) {
this.processExitHandler.dispose();
this.processExitHandler = undefined;
}
return this.shutdown();
}
// Abstracts for each Session type to implement
public abstract async shutdown(): Promise<void>;
public abstract async restart(timeout: number): Promise<void>;
public abstract async changeKernel(kernel: IJupyterKernelSpec | LiveKernelModel, timeoutMS: number): Promise<void>;
public abstract async waitForIdle(timeout: number): Promise<void>;

public async shutdown(): Promise<void> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the big chunks of changes in this file are not new changes, but refactors. Previous to this change I had restart and shutdown just implemented in the raw and jupyter session classes. Now they have moved into the base class so that they can share code. So shutdown and restart code was pulled from the jupyter specific class into the base class and changes were made to support that.

if (this.processExitHandler) {
this.processExitHandler.dispose();
this.processExitHandler = undefined;
}

if (this.session) {
try {
traceInfo('Shutdown session - current session');
await this.shutdownSession(this.session, this.statusHandler);
traceInfo('Shutdown session - get restart session');
if (this.restartSessionPromise) {
const restartSession = await this.restartSessionPromise;
traceInfo('Shutdown session - shutdown restart session');
await this.shutdownSession(restartSession, undefined);
}
} catch {
noop();
}
this.session = undefined;
this.restartSessionPromise = undefined;
}
if (this.onStatusChangedEvent) {
this.onStatusChangedEvent.dispose();
}
traceInfo('Shutdown session -- complete');
}
public async interrupt(timeout: number): Promise<void> {
if (this.session && this.session.kernel) {
// Listen for session status changes
Expand All @@ -124,6 +179,48 @@ export abstract class BaseJupyterSession implements IJupyterSession {
}
}

public async restart(_timeout: number): Promise<void> {
if (this.session?.isRemoteSession) {
await this.session.kernel.restart();
return;
}

// Start the restart session now in case it wasn't started
if (!this.restartSessionPromise) {
this.startRestartSession();
}

// Just kill the current session and switch to the other
if (this.restartSessionPromise && this.session) {
traceInfo(`Restarting ${this.session.kernel.id}`);

// Save old state for shutdown
const oldSession = this.session;
const oldStatusHandler = this.statusHandler;

// Just switch to the other session. It should already be ready
this.session = await this.restartSessionPromise;
if (!this.session) {
throw new Error(localize.DataScience.sessionDisposed());
}
this.kernelSelector.removeKernelFromIgnoreList(this.session.kernel);
traceInfo(`Got new session ${this.session.kernel.id}`);

// Rewire our status changed event.
this.session.statusChanged.connect(this.statusHandler);

// After switching, start another in case we restart again.
this.restartSessionPromise = this.createRestartSession(this.kernelSpec, oldSession);
traceInfo('Started new restart session');
if (oldStatusHandler) {
oldSession.statusChanged.disconnect(oldStatusHandler);
}
this.shutdownSession(oldSession, undefined).ignoreErrors();
} else {
throw new Error(localize.DataScience.sessionDisposed());
}
}

public requestExecute(
content: KernelMessage.IExecuteRequestMsg['content'],
disposeOnDone?: boolean,
Expand Down Expand Up @@ -228,6 +325,68 @@ export abstract class BaseJupyterSession implements IJupyterSession {
}
}

// Sub classes need to implement their own restarting specific code
protected abstract startRestartSession(): void;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restart code above is common and unchanged from previous jupyter code, but there are specific start and restart functions for the jupyter / raw subclasses.

protected abstract async createRestartSession(
kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined,
session: ISession,
cancelToken?: CancellationToken
): Promise<ISession>;

protected async shutdownSession(
session: ISession | undefined,
statusHandler: Slot<ISession, Kernel.Status> | undefined
): Promise<void> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous JupyterSession code for shutting down a session

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for comment, was trying to figure out what waas old and new.
This is one of those cases where I feel splitting PR into two helps, one for refactoring and other for new changes. This way the refactoring code requires less review, as we're merely moving stuff around.
Also makes for smaller PRs..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did that with the notebook provider refactor and it was really helpful. But I didn't here, will try to make sure that I do that in the future as much as I can. 👍 👍 For smaller PRs as well.

if (session && session.kernel) {
const kernelId = session.kernel.id;
traceInfo(`shutdownSession ${kernelId} - start`);
try {
if (statusHandler) {
session.statusChanged.disconnect(statusHandler);
}
// Do not shutdown remote sessions.
if (session.isRemoteSession) {
session.dispose();
return;
}
try {
// When running under a test, mark all futures as done so we
// don't hit this problem:
// https://github.com/jupyterlab/jupyterlab/issues/4252
// tslint:disable:no-any
if (isTestExecution()) {
const defaultKernel = session.kernel as any;
if (defaultKernel && defaultKernel._futures) {
const futures = defaultKernel._futures as Map<any, any>;
if (futures) {
futures.forEach((f) => {
if (f._status !== undefined) {
f._status |= 4;
}
});
}
}
if (defaultKernel && defaultKernel._reconnectLimit) {
defaultKernel._reconnectLimit = 0;
}
await waitForPromise(session.shutdown(), 1000);
} else {
// Shutdown may fail if the process has been killed
await waitForPromise(session.shutdown(), 1000);
}
} catch {
noop();
}
if (session && !session.isDisposed) {
session.dispose();
}
} catch (e) {
// Ignore, just trace.
traceWarning(e);
}
traceInfo(`shutdownSession ${kernelId} - shutdown complete`);
}
}
private getServerStatus(): ServerStatus {
if (this.session) {
switch (this.session.kernel.status) {
Expand Down
Loading