@@ -7,24 +7,28 @@ import { Slot } from '@phosphor/signaling';
7
7
import { Observable } from 'rxjs/Observable' ;
8
8
import { ReplaySubject } from 'rxjs/ReplaySubject' ;
9
9
import { Event , EventEmitter } from 'vscode' ;
10
+ import { CancellationToken } from 'vscode-jsonrpc' ;
10
11
import { ServerStatus } from '../../datascience-ui/interactive-common/mainState' ;
11
- import { traceError } from '../common/logger' ;
12
+ import { isTestExecution } from '../common/constants' ;
13
+ import { traceError , traceInfo , traceWarning } from '../common/logger' ;
14
+ import { IDisposable } from '../common/types' ;
12
15
import { waitForPromise } from '../common/utils/async' ;
13
16
import * as localize from '../common/utils/localize' ;
17
+ import { noop } from '../common/utils/misc' ;
14
18
import { sendTelemetryEvent } from '../telemetry' ;
15
19
import { Telemetry } from './constants' ;
16
20
import { JupyterWebSockets } from './jupyter/jupyterWebSocket' ;
17
21
import { JupyterKernelPromiseFailedError } from './jupyter/kernels/jupyterKernelPromiseFailedError' ;
22
+ import { KernelSelector } from './jupyter/kernels/kernelSelector' ;
18
23
import { LiveKernelModel } from './jupyter/kernels/types' ;
24
+ import { IKernelProcess } from './kernel-launcher/types' ;
19
25
import { IJupyterKernelSpec , IJupyterSession , KernelSocketInformation } from './types' ;
20
26
21
27
export type ISession = Session . ISession & {
22
- /**
23
- * Whether this is a remote session that we attached to.
24
- *
25
- * @type {boolean }
26
- */
28
+ // Whether this is a remote session that we attached to.
27
29
isRemoteSession ?: boolean ;
30
+ // If a kernel process is associated with this session
31
+ process ?: IKernelProcess ;
28
32
} ;
29
33
30
34
/**
@@ -49,6 +53,24 @@ export abstract class BaseJupyterSession implements IJupyterSession {
49
53
protected set session ( session : ISession | undefined ) {
50
54
const oldSession = this . _session ;
51
55
this . _session = session ;
56
+
57
+ // When setting the session clear our current exit handler and hook up to the
58
+ // new session process
59
+ if ( this . processExitHandler ) {
60
+ this . processExitHandler . dispose ( ) ;
61
+ this . processExitHandler = undefined ;
62
+ }
63
+ if ( session ?. process ) {
64
+ // Watch to see if our process exits
65
+ this . processExitHandler = session . process . exited ( ( exitCode ) => {
66
+ traceError ( `Raw kernel process exited code: ${ exitCode } ` ) ;
67
+ this . shutdown ( ) . catch ( ( reason ) => {
68
+ traceError ( `Error shutting down jupyter session: ${ reason } ` ) ;
69
+ } ) ;
70
+ // Next code the user executes will show a session disposed message
71
+ } ) ;
72
+ }
73
+
52
74
// If we have a new session, then emit the new kernel connection information.
53
75
if ( session && oldSession !== session ) {
54
76
const socket = JupyterWebSockets . get ( session . kernel . id ) ;
@@ -68,6 +90,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {
68
90
} ) ;
69
91
}
70
92
}
93
+ protected kernelSpec : IJupyterKernelSpec | LiveKernelModel | undefined ;
71
94
public get kernelSocket ( ) : Observable < KernelSocketInformation | undefined > {
72
95
return this . _kernelSocket ;
73
96
}
@@ -96,21 +119,53 @@ export abstract class BaseJupyterSession implements IJupyterSession {
96
119
protected onStatusChangedEvent : EventEmitter < ServerStatus > = new EventEmitter < ServerStatus > ( ) ;
97
120
protected statusHandler : Slot < ISession , Kernel . Status > ;
98
121
protected connected : boolean = false ;
122
+ protected restartSessionPromise : Promise < ISession | undefined > | undefined ;
99
123
private _session : ISession | undefined ;
100
124
private _kernelSocket = new ReplaySubject < KernelSocketInformation | undefined > ( ) ;
101
125
private _jupyterLab ?: typeof import ( '@jupyterlab/services' ) ;
102
- constructor ( ) {
126
+ private processExitHandler : IDisposable | undefined ;
127
+
128
+ constructor ( protected readonly kernelSelector : KernelSelector ) {
103
129
this . statusHandler = this . onStatusChanged . bind ( this ) ;
104
130
}
105
131
public dispose ( ) : Promise < void > {
132
+ if ( this . processExitHandler ) {
133
+ this . processExitHandler . dispose ( ) ;
134
+ this . processExitHandler = undefined ;
135
+ }
106
136
return this . shutdown ( ) ;
107
137
}
108
138
// Abstracts for each Session type to implement
109
- public abstract async shutdown ( ) : Promise < void > ;
110
- public abstract async restart ( timeout : number ) : Promise < void > ;
111
139
public abstract async changeKernel ( kernel : IJupyterKernelSpec | LiveKernelModel , timeoutMS : number ) : Promise < void > ;
112
140
public abstract async waitForIdle ( timeout : number ) : Promise < void > ;
113
141
142
+ public async shutdown ( ) : Promise < void > {
143
+ if ( this . processExitHandler ) {
144
+ this . processExitHandler . dispose ( ) ;
145
+ this . processExitHandler = undefined ;
146
+ }
147
+
148
+ if ( this . session ) {
149
+ try {
150
+ traceInfo ( 'Shutdown session - current session' ) ;
151
+ await this . shutdownSession ( this . session , this . statusHandler ) ;
152
+ traceInfo ( 'Shutdown session - get restart session' ) ;
153
+ if ( this . restartSessionPromise ) {
154
+ const restartSession = await this . restartSessionPromise ;
155
+ traceInfo ( 'Shutdown session - shutdown restart session' ) ;
156
+ await this . shutdownSession ( restartSession , undefined ) ;
157
+ }
158
+ } catch {
159
+ noop ( ) ;
160
+ }
161
+ this . session = undefined ;
162
+ this . restartSessionPromise = undefined ;
163
+ }
164
+ if ( this . onStatusChangedEvent ) {
165
+ this . onStatusChangedEvent . dispose ( ) ;
166
+ }
167
+ traceInfo ( 'Shutdown session -- complete' ) ;
168
+ }
114
169
public async interrupt ( timeout : number ) : Promise < void > {
115
170
if ( this . session && this . session . kernel ) {
116
171
// Listen for session status changes
@@ -124,6 +179,48 @@ export abstract class BaseJupyterSession implements IJupyterSession {
124
179
}
125
180
}
126
181
182
+ public async restart ( _timeout : number ) : Promise < void > {
183
+ if ( this . session ?. isRemoteSession ) {
184
+ await this . session . kernel . restart ( ) ;
185
+ return ;
186
+ }
187
+
188
+ // Start the restart session now in case it wasn't started
189
+ if ( ! this . restartSessionPromise ) {
190
+ this . startRestartSession ( ) ;
191
+ }
192
+
193
+ // Just kill the current session and switch to the other
194
+ if ( this . restartSessionPromise && this . session ) {
195
+ traceInfo ( `Restarting ${ this . session . kernel . id } ` ) ;
196
+
197
+ // Save old state for shutdown
198
+ const oldSession = this . session ;
199
+ const oldStatusHandler = this . statusHandler ;
200
+
201
+ // Just switch to the other session. It should already be ready
202
+ this . session = await this . restartSessionPromise ;
203
+ if ( ! this . session ) {
204
+ throw new Error ( localize . DataScience . sessionDisposed ( ) ) ;
205
+ }
206
+ this . kernelSelector . removeKernelFromIgnoreList ( this . session . kernel ) ;
207
+ traceInfo ( `Got new session ${ this . session . kernel . id } ` ) ;
208
+
209
+ // Rewire our status changed event.
210
+ this . session . statusChanged . connect ( this . statusHandler ) ;
211
+
212
+ // After switching, start another in case we restart again.
213
+ this . restartSessionPromise = this . createRestartSession ( this . kernelSpec , oldSession ) ;
214
+ traceInfo ( 'Started new restart session' ) ;
215
+ if ( oldStatusHandler ) {
216
+ oldSession . statusChanged . disconnect ( oldStatusHandler ) ;
217
+ }
218
+ this . shutdownSession ( oldSession , undefined ) . ignoreErrors ( ) ;
219
+ } else {
220
+ throw new Error ( localize . DataScience . sessionDisposed ( ) ) ;
221
+ }
222
+ }
223
+
127
224
public requestExecute (
128
225
content : KernelMessage . IExecuteRequestMsg [ 'content' ] ,
129
226
disposeOnDone ?: boolean ,
@@ -228,6 +325,68 @@ export abstract class BaseJupyterSession implements IJupyterSession {
228
325
}
229
326
}
230
327
328
+ // Sub classes need to implement their own restarting specific code
329
+ protected abstract startRestartSession ( ) : void ;
330
+ protected abstract async createRestartSession (
331
+ kernelSpec : IJupyterKernelSpec | LiveKernelModel | undefined ,
332
+ session : ISession ,
333
+ cancelToken ?: CancellationToken
334
+ ) : Promise < ISession > ;
335
+
336
+ protected async shutdownSession (
337
+ session : ISession | undefined ,
338
+ statusHandler : Slot < ISession , Kernel . Status > | undefined
339
+ ) : Promise < void > {
340
+ if ( session && session . kernel ) {
341
+ const kernelId = session . kernel . id ;
342
+ traceInfo ( `shutdownSession ${ kernelId } - start` ) ;
343
+ try {
344
+ if ( statusHandler ) {
345
+ session . statusChanged . disconnect ( statusHandler ) ;
346
+ }
347
+ // Do not shutdown remote sessions.
348
+ if ( session . isRemoteSession ) {
349
+ session . dispose ( ) ;
350
+ return ;
351
+ }
352
+ try {
353
+ // When running under a test, mark all futures as done so we
354
+ // don't hit this problem:
355
+ // https://github.com/jupyterlab/jupyterlab/issues/4252
356
+ // tslint:disable:no-any
357
+ if ( isTestExecution ( ) ) {
358
+ const defaultKernel = session . kernel as any ;
359
+ if ( defaultKernel && defaultKernel . _futures ) {
360
+ const futures = defaultKernel . _futures as Map < any , any > ;
361
+ if ( futures ) {
362
+ futures . forEach ( ( f ) => {
363
+ if ( f . _status !== undefined ) {
364
+ f . _status |= 4 ;
365
+ }
366
+ } ) ;
367
+ }
368
+ }
369
+ if ( defaultKernel && defaultKernel . _reconnectLimit ) {
370
+ defaultKernel . _reconnectLimit = 0 ;
371
+ }
372
+ await waitForPromise ( session . shutdown ( ) , 1000 ) ;
373
+ } else {
374
+ // Shutdown may fail if the process has been killed
375
+ await waitForPromise ( session . shutdown ( ) , 1000 ) ;
376
+ }
377
+ } catch {
378
+ noop ( ) ;
379
+ }
380
+ if ( session && ! session . isDisposed ) {
381
+ session . dispose ( ) ;
382
+ }
383
+ } catch ( e ) {
384
+ // Ignore, just trace.
385
+ traceWarning ( e ) ;
386
+ }
387
+ traceInfo ( `shutdownSession ${ kernelId } - shutdown complete` ) ;
388
+ }
389
+ }
231
390
private getServerStatus ( ) : ServerStatus {
232
391
if ( this . session ) {
233
392
switch ( this . session . kernel . status ) {
0 commit comments