-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Restart for raw sessions + rawSession now owns process lifetime #11230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fc91279
8772193
9a2d032
f33a4c5
4ecaaf2
21689e3
b746f42
e274ba6
839d076
1d6f18e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,24 +7,28 @@ import { Slot } from '@phosphor/signaling'; | |
import { Observable } from 'rxjs/Observable'; | ||
import { ReplaySubject } from 'rxjs/ReplaySubject'; | ||
import { Event, EventEmitter } from 'vscode'; | ||
import { CancellationToken } from 'vscode-jsonrpc'; | ||
import { ServerStatus } from '../../datascience-ui/interactive-common/mainState'; | ||
import { traceError } from '../common/logger'; | ||
import { isTestExecution } from '../common/constants'; | ||
import { traceError, traceInfo, traceWarning } from '../common/logger'; | ||
import { IDisposable } from '../common/types'; | ||
import { waitForPromise } from '../common/utils/async'; | ||
import * as localize from '../common/utils/localize'; | ||
import { noop } from '../common/utils/misc'; | ||
import { sendTelemetryEvent } from '../telemetry'; | ||
import { Telemetry } from './constants'; | ||
import { JupyterWebSockets } from './jupyter/jupyterWebSocket'; | ||
import { JupyterKernelPromiseFailedError } from './jupyter/kernels/jupyterKernelPromiseFailedError'; | ||
import { KernelSelector } from './jupyter/kernels/kernelSelector'; | ||
import { LiveKernelModel } from './jupyter/kernels/types'; | ||
import { IKernelProcess } from './kernel-launcher/types'; | ||
import { IJupyterKernelSpec, IJupyterSession, KernelSocketInformation } from './types'; | ||
|
||
export type ISession = Session.ISession & { | ||
/** | ||
* Whether this is a remote session that we attached to. | ||
* | ||
* @type {boolean} | ||
*/ | ||
// Whether this is a remote session that we attached to. | ||
isRemoteSession?: boolean; | ||
// If a kernel process is associated with this session | ||
process?: IKernelProcess; | ||
}; | ||
|
||
/** | ||
|
@@ -49,6 +53,24 @@ export abstract class BaseJupyterSession implements IJupyterSession { | |
protected set session(session: ISession | undefined) { | ||
const oldSession = this._session; | ||
this._session = session; | ||
|
||
// When setting the session clear our current exit handler and hook up to the | ||
// new session process | ||
if (this.processExitHandler) { | ||
this.processExitHandler.dispose(); | ||
this.processExitHandler = undefined; | ||
} | ||
if (session?.process) { | ||
// Watch to see if our process exits | ||
this.processExitHandler = session.process.exited((exitCode) => { | ||
traceError(`Raw kernel process exited code: ${exitCode}`); | ||
this.shutdown().catch((reason) => { | ||
traceError(`Error shutting down jupyter session: ${reason}`); | ||
}); | ||
// Next code the user executes will show a session disposed message | ||
}); | ||
} | ||
|
||
// If we have a new session, then emit the new kernel connection information. | ||
if (session && oldSession !== session) { | ||
const socket = JupyterWebSockets.get(session.kernel.id); | ||
|
@@ -68,6 +90,7 @@ export abstract class BaseJupyterSession implements IJupyterSession { | |
}); | ||
} | ||
} | ||
protected kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined; | ||
public get kernelSocket(): Observable<KernelSocketInformation | undefined> { | ||
return this._kernelSocket; | ||
} | ||
|
@@ -96,21 +119,53 @@ export abstract class BaseJupyterSession implements IJupyterSession { | |
protected onStatusChangedEvent: EventEmitter<ServerStatus> = new EventEmitter<ServerStatus>(); | ||
protected statusHandler: Slot<ISession, Kernel.Status>; | ||
protected connected: boolean = false; | ||
protected restartSessionPromise: Promise<ISession | undefined> | undefined; | ||
private _session: ISession | undefined; | ||
private _kernelSocket = new ReplaySubject<KernelSocketInformation | undefined>(); | ||
private _jupyterLab?: typeof import('@jupyterlab/services'); | ||
constructor() { | ||
private processExitHandler: IDisposable | undefined; | ||
|
||
constructor(protected readonly kernelSelector: KernelSelector) { | ||
this.statusHandler = this.onStatusChanged.bind(this); | ||
} | ||
public dispose(): Promise<void> { | ||
if (this.processExitHandler) { | ||
this.processExitHandler.dispose(); | ||
this.processExitHandler = undefined; | ||
} | ||
return this.shutdown(); | ||
} | ||
// Abstracts for each Session type to implement | ||
public abstract async shutdown(): Promise<void>; | ||
public abstract async restart(timeout: number): Promise<void>; | ||
public abstract async changeKernel(kernel: IJupyterKernelSpec | LiveKernelModel, timeoutMS: number): Promise<void>; | ||
public abstract async waitForIdle(timeout: number): Promise<void>; | ||
|
||
public async shutdown(): Promise<void> { | ||
if (this.processExitHandler) { | ||
this.processExitHandler.dispose(); | ||
this.processExitHandler = undefined; | ||
} | ||
|
||
if (this.session) { | ||
try { | ||
traceInfo('Shutdown session - current session'); | ||
await this.shutdownSession(this.session, this.statusHandler); | ||
traceInfo('Shutdown session - get restart session'); | ||
if (this.restartSessionPromise) { | ||
const restartSession = await this.restartSessionPromise; | ||
traceInfo('Shutdown session - shutdown restart session'); | ||
await this.shutdownSession(restartSession, undefined); | ||
} | ||
} catch { | ||
noop(); | ||
} | ||
this.session = undefined; | ||
this.restartSessionPromise = undefined; | ||
} | ||
if (this.onStatusChangedEvent) { | ||
this.onStatusChangedEvent.dispose(); | ||
} | ||
traceInfo('Shutdown session -- complete'); | ||
} | ||
public async interrupt(timeout: number): Promise<void> { | ||
if (this.session && this.session.kernel) { | ||
// Listen for session status changes | ||
|
@@ -124,6 +179,48 @@ export abstract class BaseJupyterSession implements IJupyterSession { | |
} | ||
} | ||
|
||
public async restart(_timeout: number): Promise<void> { | ||
if (this.session?.isRemoteSession) { | ||
await this.session.kernel.restart(); | ||
return; | ||
} | ||
|
||
// Start the restart session now in case it wasn't started | ||
if (!this.restartSessionPromise) { | ||
this.startRestartSession(); | ||
} | ||
|
||
// Just kill the current session and switch to the other | ||
if (this.restartSessionPromise && this.session) { | ||
traceInfo(`Restarting ${this.session.kernel.id}`); | ||
|
||
// Save old state for shutdown | ||
const oldSession = this.session; | ||
const oldStatusHandler = this.statusHandler; | ||
|
||
// Just switch to the other session. It should already be ready | ||
this.session = await this.restartSessionPromise; | ||
if (!this.session) { | ||
throw new Error(localize.DataScience.sessionDisposed()); | ||
} | ||
this.kernelSelector.removeKernelFromIgnoreList(this.session.kernel); | ||
traceInfo(`Got new session ${this.session.kernel.id}`); | ||
|
||
// Rewire our status changed event. | ||
this.session.statusChanged.connect(this.statusHandler); | ||
|
||
// After switching, start another in case we restart again. | ||
this.restartSessionPromise = this.createRestartSession(this.kernelSpec, oldSession); | ||
traceInfo('Started new restart session'); | ||
if (oldStatusHandler) { | ||
oldSession.statusChanged.disconnect(oldStatusHandler); | ||
} | ||
this.shutdownSession(oldSession, undefined).ignoreErrors(); | ||
} else { | ||
throw new Error(localize.DataScience.sessionDisposed()); | ||
} | ||
} | ||
|
||
public requestExecute( | ||
content: KernelMessage.IExecuteRequestMsg['content'], | ||
disposeOnDone?: boolean, | ||
|
@@ -228,6 +325,68 @@ export abstract class BaseJupyterSession implements IJupyterSession { | |
} | ||
} | ||
|
||
// Sub classes need to implement their own restarting specific code | ||
protected abstract startRestartSession(): void; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The restart code above is common and unchanged from previous jupyter code, but there are specific start and restart functions for the jupyter / raw subclasses. |
||
protected abstract async createRestartSession( | ||
kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined, | ||
session: ISession, | ||
cancelToken?: CancellationToken | ||
): Promise<ISession>; | ||
|
||
protected async shutdownSession( | ||
session: ISession | undefined, | ||
statusHandler: Slot<ISession, Kernel.Status> | undefined | ||
): Promise<void> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous JupyterSession code for shutting down a session There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for comment, was trying to figure out what waas old and new. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I did that with the notebook provider refactor and it was really helpful. But I didn't here, will try to make sure that I do that in the future as much as I can. 👍 👍 For smaller PRs as well. |
||
if (session && session.kernel) { | ||
const kernelId = session.kernel.id; | ||
traceInfo(`shutdownSession ${kernelId} - start`); | ||
try { | ||
if (statusHandler) { | ||
session.statusChanged.disconnect(statusHandler); | ||
} | ||
// Do not shutdown remote sessions. | ||
if (session.isRemoteSession) { | ||
session.dispose(); | ||
return; | ||
} | ||
try { | ||
// When running under a test, mark all futures as done so we | ||
// don't hit this problem: | ||
// https://github.com/jupyterlab/jupyterlab/issues/4252 | ||
// tslint:disable:no-any | ||
if (isTestExecution()) { | ||
const defaultKernel = session.kernel as any; | ||
if (defaultKernel && defaultKernel._futures) { | ||
const futures = defaultKernel._futures as Map<any, any>; | ||
if (futures) { | ||
futures.forEach((f) => { | ||
if (f._status !== undefined) { | ||
f._status |= 4; | ||
} | ||
}); | ||
} | ||
} | ||
if (defaultKernel && defaultKernel._reconnectLimit) { | ||
defaultKernel._reconnectLimit = 0; | ||
} | ||
await waitForPromise(session.shutdown(), 1000); | ||
} else { | ||
// Shutdown may fail if the process has been killed | ||
await waitForPromise(session.shutdown(), 1000); | ||
} | ||
} catch { | ||
noop(); | ||
} | ||
if (session && !session.isDisposed) { | ||
session.dispose(); | ||
} | ||
} catch (e) { | ||
// Ignore, just trace. | ||
traceWarning(e); | ||
} | ||
traceInfo(`shutdownSession ${kernelId} - shutdown complete`); | ||
} | ||
} | ||
private getServerStatus(): ServerStatus { | ||
if (this.session) { | ||
switch (this.session.kernel.status) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the big chunks of changes in this file are not new changes, but refactors. Previous to this change I had restart and shutdown just implemented in the raw and jupyter session classes. Now they have moved into the base class so that they can share code. So shutdown and restart code was pulled from the jupyter specific class into the base class and changes were made to support that.