@@ -149,6 +149,7 @@ def _default_kernel_buffers(self):
149
149
150
150
def __init__ (self , ** kwargs ):
151
151
self .pinned_superclass = MultiKernelManager
152
+ self ._pending_kernel_tasks = {}
152
153
self .pinned_superclass .__init__ (self , ** kwargs )
153
154
self .last_kernel_activity = utcnow ()
154
155
@@ -216,9 +217,11 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
216
217
kwargs ["kernel_id" ] = kernel_id
217
218
kernel_id = await ensure_async (self .pinned_superclass .start_kernel (self , ** kwargs ))
218
219
self ._kernel_connections [kernel_id ] = 0
219
- fut = asyncio .ensure_future (self ._finish_kernel_start (kernel_id ))
220
+ task = asyncio .create_task (self ._finish_kernel_start (kernel_id ))
220
221
if not getattr (self , "use_pending_kernels" , None ):
221
- await fut
222
+ await task
223
+ else :
224
+ self ._pending_kernel_tasks [kernel_id ] = task
222
225
# add busy/activity markers:
223
226
kernel = self .get_kernel (kernel_id )
224
227
kernel .execution_state = "starting"
@@ -372,7 +375,7 @@ def stop_buffering(self, kernel_id):
372
375
buffer_info = self ._kernel_buffers .pop (kernel_id )
373
376
# close buffering streams
374
377
for stream in buffer_info ["channels" ].values ():
375
- if not stream .closed () :
378
+ if not stream .socket . closed :
376
379
stream .on_recv (None )
377
380
stream .close ()
378
381
@@ -387,13 +390,19 @@ def stop_buffering(self, kernel_id):
387
390
def shutdown_kernel (self , kernel_id , now = False , restart = False ):
388
391
"""Shutdown a kernel by kernel_id"""
389
392
self ._check_kernel_id (kernel_id )
390
- self .stop_watching_activity (kernel_id )
391
- self .stop_buffering (kernel_id )
392
393
393
394
# Decrease the metric of number of kernels
394
395
# running for the relevant kernel type by 1
395
396
KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = self ._kernels [kernel_id ].kernel_name ).dec ()
396
397
398
+ if kernel_id in self ._pending_kernel_tasks :
399
+ task = self ._pending_kernel_tasks .pop (kernel_id )
400
+ task .cancel ()
401
+ return
402
+
403
+ self .stop_watching_activity (kernel_id )
404
+ self .stop_buffering (kernel_id )
405
+
397
406
self .pinned_superclass .shutdown_kernel (self , kernel_id , now = now , restart = restart )
398
407
399
408
async def restart_kernel (self , kernel_id , now = False ):
@@ -533,7 +542,8 @@ def stop_watching_activity(self, kernel_id):
533
542
"""Stop watching IOPub messages on a kernel for activity."""
534
543
kernel = self ._kernels [kernel_id ]
535
544
if getattr (kernel , "_activity_stream" , None ):
536
- kernel ._activity_stream .close ()
545
+ if not kernel ._activity_stream .socket .closed :
546
+ kernel ._activity_stream .close ()
537
547
kernel ._activity_stream = None
538
548
539
549
def initialize_culler (self ):
@@ -638,19 +648,25 @@ def __init__(self, **kwargs):
638
648
self .pinned_superclass = AsyncMultiKernelManager
639
649
self .pinned_superclass .__init__ (self , ** kwargs )
640
650
self .last_kernel_activity = utcnow ()
651
+ self ._pending_kernel_tasks = {}
641
652
642
653
async def shutdown_kernel (self , kernel_id , now = False , restart = False ):
643
654
"""Shutdown a kernel by kernel_id"""
644
655
self ._check_kernel_id (kernel_id )
645
- self .stop_watching_activity (kernel_id )
646
- self .stop_buffering (kernel_id )
647
656
648
657
# Decrease the metric of number of kernels
649
658
# running for the relevant kernel type by 1
650
659
KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = self ._kernels [kernel_id ].kernel_name ).dec ()
651
660
661
+ if kernel_id in self ._pending_kernel_tasks :
662
+ task = self ._pending_kernel_tasks .pop (kernel_id )
663
+ task .cancel ()
664
+ return
665
+
666
+ self .stop_watching_activity (kernel_id )
667
+ self .stop_buffering (kernel_id )
668
+
652
669
# Finish shutting down the kernel before clearing state to avoid a race condition.
653
- ret = await self .pinned_superclass .shutdown_kernel (
670
+ return await self .pinned_superclass .shutdown_kernel (
654
671
self , kernel_id , now = now , restart = restart
655
672
)
656
- return ret
0 commit comments