Skip to content

bpo-31234: Add test.support.wait_threads_exit() #3578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions Lib/test/lock_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def __init__(self, f, n, wait_before_exit=False):
self.started = []
self.finished = []
self._can_exit = not wait_before_exit
self.wait_thread = support.wait_threads_exit()
self.wait_thread.__enter__()

def task():
tid = threading.get_ident()
self.started.append(tid)
Expand All @@ -40,6 +43,7 @@ def task():
self.finished.append(tid)
while not self._can_exit:
_wait()

try:
for i in range(n):
start_new_thread(task, ())
Expand All @@ -54,13 +58,8 @@ def wait_for_started(self):
def wait_for_finished(self):
while len(self.finished) < self.n:
_wait()
# Wait a little bit longer to prevent the "threading_cleanup()
# failed to cleanup X threads" warning. The loop above is a weak
# synchronization. At the C level, t_bootstrap() can still be
# running and so _thread.count() still accounts the "almost dead"
# thead.
for _ in range(self.n):
_wait()
# Wait for threads exit
self.wait_thread.__exit__(None, None, None)

def do_finish(self):
self._can_exit = True
Expand Down Expand Up @@ -227,20 +226,23 @@ def test_reacquire(self):
# Lock needs to be released before re-acquiring.
lock = self.locktype()
phase = []

def f():
lock.acquire()
phase.append(None)
lock.acquire()
phase.append(None)
start_new_thread(f, ())
while len(phase) == 0:
_wait()
_wait()
self.assertEqual(len(phase), 1)
lock.release()
while len(phase) == 1:

with support.wait_threads_exit():
start_new_thread(f, ())
while len(phase) == 0:
_wait()
_wait()
self.assertEqual(len(phase), 2)
self.assertEqual(len(phase), 1)
lock.release()
while len(phase) == 1:
_wait()
self.assertEqual(len(phase), 2)

def test_different_thread(self):
# Lock can be released from a different thread.
Expand Down
35 changes: 35 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,41 @@ def decorator(*args):
return decorator


@contextlib.contextmanager
def wait_threads_exit(timeout=60.0):
"""
bpo-31234: Context manager to wait until all threads created in the with
statement exit.

Use _thread.count() to check if threads exited. Indirectly, wait until
threads exit the internal t_bootstrap() C function of the _thread module.

threading_setup() and threading_cleanup() are designed to emit a warning
if a test leaves running threads in the background. This context manager
is designed to cleanup threads started by the _thread.start_new_thread()
which doesn't allow to wait for thread exit, whereas thread.Thread has a
join() method.
"""
old_count = _thread._count()
try:
yield
finally:
start_time = time.monotonic()
deadline = start_time + timeout
while True:
count = _thread._count()
if count <= old_count:
break
if time.monotonic() > deadline:
dt = time.monotonic() - start_time
msg = (f"wait_threads() failed to cleanup {count - old_count} "
f"threads after {dt:.1f} seconds "
f"(count: {count}, old count: {old_count})")
raise AssertionError(msg)
time.sleep(0.010)
gc_collect()


def reap_children():
"""Use this function at the end of test_main() whenever sub-processes
are started. This will help ensure that no extra children (zombies)
Expand Down
4 changes: 4 additions & 0 deletions Lib/test/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ def serverExplicitReady(self):
self.server_ready.set()

def _setUp(self):
self.wait_threads = support.wait_threads_exit()
self.wait_threads.__enter__()

self.server_ready = threading.Event()
self.client_ready = threading.Event()
self.done = threading.Event()
Expand All @@ -297,6 +300,7 @@ def _setUp(self):
def _tearDown(self):
self.__tearDown()
self.done.wait()
self.wait_threads.__exit__(None, None, None)

if self.queue.qsize():
exc = self.queue.get()
Expand Down
93 changes: 47 additions & 46 deletions Lib/test/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ def task(self, ident):
self.done_mutex.release()

def test_starting_threads(self):
# Basic test for thread creation.
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for tasks to complete...")
self.done_mutex.acquire()
verbose_print("all tasks done")
with support.wait_threads_exit():
# Basic test for thread creation.
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for tasks to complete...")
self.done_mutex.acquire()
verbose_print("all tasks done")

def test_stack_size(self):
# Various stack size tests.
Expand Down Expand Up @@ -94,12 +95,13 @@ def test_nt_and_posix_stack_size(self):
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
for i in range(NUMTASKS):
self.newtask()
with support.wait_threads_exit():
for i in range(NUMTASKS):
self.newtask()

verbose_print("waiting for all tasks to complete")
self.done_mutex.acquire()
verbose_print("all tasks done")
verbose_print("waiting for all tasks to complete")
self.done_mutex.acquire()
verbose_print("all tasks done")

thread.stack_size(0)

Expand All @@ -109,25 +111,28 @@ def test__count(self):
mut = thread.allocate_lock()
mut.acquire()
started = []

def task():
started.append(None)
mut.acquire()
mut.release()
thread.start_new_thread(task, ())
while not started:
time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig + 1)
# Allow the task to finish.
mut.release()
# The only reliable way to be sure that the thread ended from the
# interpreter's point of view is to wait for the function object to be
# destroyed.
done = []
wr = weakref.ref(task, lambda _: done.append(None))
del task
while not done:
time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig)

with support.wait_threads_exit():
thread.start_new_thread(task, ())
while not started:
time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig + 1)
# Allow the task to finish.
mut.release()
# The only reliable way to be sure that the thread ended from the
# interpreter's point of view is to wait for the function object to be
# destroyed.
done = []
wr = weakref.ref(task, lambda _: done.append(None))
del task
while not done:
time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig)

def test_save_exception_state_on_error(self):
# See issue #14474
Expand All @@ -140,16 +145,14 @@ def mywrite(self, *args):
except ValueError:
pass
real_write(self, *args)
c = thread._count()
started = thread.allocate_lock()
with support.captured_output("stderr") as stderr:
real_write = stderr.write
stderr.write = mywrite
started.acquire()
thread.start_new_thread(task, ())
started.acquire()
while thread._count() > c:
time.sleep(POLL_SLEEP)
with support.wait_threads_exit():
thread.start_new_thread(task, ())
started.acquire()
self.assertIn("Traceback", stderr.getvalue())


Expand Down Expand Up @@ -181,13 +184,14 @@ def enter(self):
class BarrierTest(BasicThreadTest):

def test_barrier(self):
self.bar = Barrier(NUMTASKS)
self.running = NUMTASKS
for i in range(NUMTASKS):
thread.start_new_thread(self.task2, (i,))
verbose_print("waiting for tasks to end")
self.done_mutex.acquire()
verbose_print("tasks done")
with support.wait_threads_exit():
self.bar = Barrier(NUMTASKS)
self.running = NUMTASKS
for i in range(NUMTASKS):
thread.start_new_thread(self.task2, (i,))
verbose_print("waiting for tasks to end")
self.done_mutex.acquire()
verbose_print("tasks done")

def task2(self, ident):
for i in range(NUMTRIPS):
Expand Down Expand Up @@ -225,11 +229,10 @@ def setUp(self):
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
@support.reap_threads
def test_forkinthread(self):
running = True
status = "not set"

def thread1():
nonlocal running, status
nonlocal status

# fork in a thread
pid = os.fork()
Expand All @@ -244,13 +247,11 @@ def thread1():
# parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
running = False

thread.start_new_thread(thread1, ())
self.assertEqual(os.read(self.read_fd, 2), b"OK",
"Unable to fork() in thread")
while running:
time.sleep(POLL_SLEEP)
with support.wait_threads_exit():
thread.start_new_thread(thread1, ())
self.assertEqual(os.read(self.read_fd, 2), b"OK",
"Unable to fork() in thread")
self.assertEqual(status, 0)

def tearDown(self):
Expand Down
14 changes: 8 additions & 6 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ def f():
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertIsNotNone(ident[0])
with support.wait_threads_exit():
tid = _thread.start_new_thread(f, ())
done.wait()
self.assertEqual(ident[0], tid)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]

Expand Down Expand Up @@ -165,9 +166,10 @@ def f(mutex):

mutex = threading.Lock()
mutex.acquire()
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
with support.wait_threads_exit():
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
#Issue 29376
Expand Down
Loading