Skip to content

Commit 7e8c571

Browse files
gh-128340: add thread safe handle for loop.call_soon_threadsafe (#128369)
Adds `_ThreadSafeHandle` to be used for callbacks scheduled with `loop.call_soon_threadsafe`.
1 parent 657d7b7 commit 7e8c571

File tree

4 files changed

+151
-1
lines changed

4 files changed

+151
-1
lines changed

Lib/asyncio/base_events.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,10 @@ def call_soon_threadsafe(self, callback, *args, context=None):
873873
self._check_closed()
874874
if self._debug:
875875
self._check_callback(callback, 'call_soon_threadsafe')
876-
handle = self._call_soon(callback, args, context)
876+
handle = events._ThreadSafeHandle(callback, args, self, context)
877+
self._ready.append(handle)
878+
if handle._source_traceback:
879+
del handle._source_traceback[-1]
877880
if handle._source_traceback:
878881
del handle._source_traceback[-1]
879882
self._write_to_self()

Lib/asyncio/events.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,34 @@ def _run(self):
113113
self._loop.call_exception_handler(context)
114114
self = None # Needed to break cycles when an exception occurs.
115115

116+
# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe
117+
# and is thread safe unlike Handle which is not thread safe.
118+
class _ThreadSafeHandle(Handle):
119+
120+
__slots__ = ('_lock',)
121+
122+
def __init__(self, callback, args, loop, context=None):
123+
super().__init__(callback, args, loop, context)
124+
self._lock = threading.RLock()
125+
126+
def cancel(self):
127+
with self._lock:
128+
return super().cancel()
129+
130+
def cancelled(self):
131+
with self._lock:
132+
return super().cancelled()
133+
134+
def _run(self):
135+
# The event loop checks for cancellation without holding the lock
136+
# It is possible that the handle is cancelled after the check
137+
# but before the callback is called so check it again after acquiring
138+
# the lock and return without calling the callback if it is cancelled.
139+
with self._lock:
140+
if self._cancelled:
141+
return
142+
return super()._run()
143+
116144

117145
class TimerHandle(Handle):
118146
"""Object returned by timed callback registration methods."""

Lib/test/test_asyncio/test_events.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,124 @@ def run_in_thread():
353353
t.join()
354354
self.assertEqual(results, ['hello', 'world'])
355355

356+
def test_call_soon_threadsafe_handle_block_check_cancelled(self):
357+
results = []
358+
359+
callback_started = threading.Event()
360+
callback_finished = threading.Event()
361+
def callback(arg):
362+
callback_started.set()
363+
results.append(arg)
364+
time.sleep(1)
365+
callback_finished.set()
366+
367+
def run_in_thread():
368+
handle = self.loop.call_soon_threadsafe(callback, 'hello')
369+
self.assertIsInstance(handle, events._ThreadSafeHandle)
370+
callback_started.wait()
371+
# callback started so it should block checking for cancellation
372+
# until it finishes
373+
self.assertFalse(handle.cancelled())
374+
self.assertTrue(callback_finished.is_set())
375+
self.loop.call_soon_threadsafe(self.loop.stop)
376+
377+
t = threading.Thread(target=run_in_thread)
378+
t.start()
379+
380+
self.loop.run_forever()
381+
t.join()
382+
self.assertEqual(results, ['hello'])
383+
384+
def test_call_soon_threadsafe_handle_block_cancellation(self):
385+
results = []
386+
387+
callback_started = threading.Event()
388+
callback_finished = threading.Event()
389+
def callback(arg):
390+
callback_started.set()
391+
results.append(arg)
392+
time.sleep(1)
393+
callback_finished.set()
394+
395+
def run_in_thread():
396+
handle = self.loop.call_soon_threadsafe(callback, 'hello')
397+
self.assertIsInstance(handle, events._ThreadSafeHandle)
398+
callback_started.wait()
399+
# callback started so it cannot be cancelled from other thread until
400+
# it finishes
401+
handle.cancel()
402+
self.assertTrue(callback_finished.is_set())
403+
self.loop.call_soon_threadsafe(self.loop.stop)
404+
405+
t = threading.Thread(target=run_in_thread)
406+
t.start()
407+
408+
self.loop.run_forever()
409+
t.join()
410+
self.assertEqual(results, ['hello'])
411+
412+
def test_call_soon_threadsafe_handle_cancel_same_thread(self):
413+
results = []
414+
callback_started = threading.Event()
415+
callback_finished = threading.Event()
416+
417+
fut = concurrent.futures.Future()
418+
def callback(arg):
419+
callback_started.set()
420+
handle = fut.result()
421+
handle.cancel()
422+
results.append(arg)
423+
callback_finished.set()
424+
self.loop.stop()
425+
426+
def run_in_thread():
427+
handle = self.loop.call_soon_threadsafe(callback, 'hello')
428+
fut.set_result(handle)
429+
self.assertIsInstance(handle, events._ThreadSafeHandle)
430+
callback_started.wait()
431+
# callback cancels itself from same thread so it has no effect
432+
# it runs to completion
433+
self.assertTrue(handle.cancelled())
434+
self.assertTrue(callback_finished.is_set())
435+
self.loop.call_soon_threadsafe(self.loop.stop)
436+
437+
t = threading.Thread(target=run_in_thread)
438+
t.start()
439+
440+
self.loop.run_forever()
441+
t.join()
442+
self.assertEqual(results, ['hello'])
443+
444+
def test_call_soon_threadsafe_handle_cancel_other_thread(self):
445+
results = []
446+
ev = threading.Event()
447+
448+
callback_finished = threading.Event()
449+
def callback(arg):
450+
results.append(arg)
451+
callback_finished.set()
452+
self.loop.stop()
453+
454+
def run_in_thread():
455+
handle = self.loop.call_soon_threadsafe(callback, 'hello')
456+
# handle can be cancelled from other thread if not started yet
457+
self.assertIsInstance(handle, events._ThreadSafeHandle)
458+
handle.cancel()
459+
self.assertTrue(handle.cancelled())
460+
self.assertFalse(callback_finished.is_set())
461+
ev.set()
462+
self.loop.call_soon_threadsafe(self.loop.stop)
463+
464+
# block the main loop until the callback is added and cancelled in the
465+
# other thread
466+
self.loop.call_soon(ev.wait)
467+
t = threading.Thread(target=run_in_thread)
468+
t.start()
469+
self.loop.run_forever()
470+
t.join()
471+
self.assertEqual(results, [])
472+
self.assertFalse(callback_finished.is_set())
473+
356474
def test_call_soon_threadsafe_same_thread(self):
357475
results = []
358476

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add internal thread safe handle to be used in :meth:`asyncio.loop.call_soon_threadsafe` for thread safe cancellation.

0 commit comments

Comments
 (0)