@@ -408,10 +408,16 @@ def __init__(self, concurrency=0xffffffff):
408
408
self ._unregistered = []
409
409
self ._stopped_serving = weakref .WeakSet ()
410
410
411
+ def _check_closed (self ):
412
+ if self ._iocp is None :
413
+ raise RuntimeError ('IocpProactor is closed' )
414
+
411
415
def __repr__ (self ):
412
- return ('<%s overlapped#=%s result#=%s>'
413
- % (self .__class__ .__name__ , len (self ._cache ),
414
- len (self ._results )))
416
+ info = ['overlapped#=%s' % len (self ._cache ),
417
+ 'result#=%s' % len (self ._results )]
418
+ if self ._iocp is None :
419
+ info .append ('closed' )
420
+ return '<%s %s>' % (self .__class__ .__name__ , " " .join (info ))
415
421
416
422
def set_loop (self , loop ):
417
423
self ._loop = loop
@@ -618,6 +624,8 @@ def _wait_cancel(self, event, done_callback):
618
624
return fut
619
625
620
626
def _wait_for_handle (self , handle , timeout , _is_cancel ):
627
+ self ._check_closed ()
628
+
621
629
if timeout is None :
622
630
ms = _winapi .INFINITE
623
631
else :
@@ -660,6 +668,8 @@ def _register_with_iocp(self, obj):
660
668
# that succeed immediately.
661
669
662
670
def _register (self , ov , obj , callback ):
671
+ self ._check_closed ()
672
+
663
673
# Return a future which will be set with the result of the
664
674
# operation when it completes. The future's value is actually
665
675
# the value returned by callback().
@@ -696,6 +706,7 @@ def _unregister(self, ov):
696
706
already be signalled (pending in the proactor event queue). It is also
697
707
safe if the event is never signalled (because it was cancelled).
698
708
"""
709
+ self ._check_closed ()
699
710
self ._unregistered .append (ov )
700
711
701
712
def _get_accept_socket (self , family ):
@@ -765,6 +776,10 @@ def _stop_serving(self, obj):
765
776
self ._stopped_serving .add (obj )
766
777
767
778
def close (self ):
779
+ if self ._iocp is None :
780
+ # already closed
781
+ return
782
+
768
783
# Cancel remaining registered operations.
769
784
for address , (fut , ov , obj , callback ) in list (self ._cache .items ()):
770
785
if fut .cancelled ():
@@ -787,14 +802,15 @@ def close(self):
787
802
context ['source_traceback' ] = fut ._source_traceback
788
803
self ._loop .call_exception_handler (context )
789
804
805
+ # wait until all cancelled overlapped future complete
790
806
while self ._cache :
791
807
if not self ._poll (1 ):
792
808
logger .debug ('taking long time to close proactor' )
793
809
794
810
self ._results = []
795
- if self . _iocp is not None :
796
- _winapi .CloseHandle (self ._iocp )
797
- self ._iocp = None
811
+
812
+ _winapi .CloseHandle (self ._iocp )
813
+ self ._iocp = None
798
814
799
815
def __del__ (self ):
800
816
self .close ()
0 commit comments