Skip to content

[3.5] bpo-28699: fix a bug that exception at the very first of a iterable would cause pools behave abnormaly #884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 55 additions & 24 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
try:
result = (True, func(*args, **kwds))
except Exception as e:
if wrap_exception:
if wrap_exception and func is not _helper_reraises_exception:
e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
try:
Expand All @@ -133,6 +133,10 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
completed += 1
util.debug('worker exiting after %d tasks' % completed)

def _helper_reraises_exception(ex):
'Pickle-able helper function for use by _guarded_task_generation.'
raise ex

#
# Class representing a process pool
#
Expand Down Expand Up @@ -277,6 +281,17 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
return self._map_async(func, iterable, starmapstar, chunksize,
callback, error_callback)

def _guarded_task_generation(self, result_job, func, iterable):
'''Provides a generator of tasks for imap and imap_unordered with
appropriate handling for iterables which throw exceptions during
iteration.'''
try:
i = -1
for i, x in enumerate(iterable):
yield (result_job, i, func, (x,), {})
except Exception as e:
yield (result_job, i+1, _helper_reraises_exception, (e,), {})

def imap(self, func, iterable, chunksize=1):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
Expand All @@ -285,15 +300,23 @@ def imap(self, func, iterable, chunksize=1):
raise ValueError("Pool not running")
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._set_length))
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
result._set_length
))
return result
else:
assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._set_length))
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)

def imap_unordered(self, func, iterable, chunksize=1):
Expand All @@ -304,15 +327,23 @@ def imap_unordered(self, func, iterable, chunksize=1):
raise ValueError("Pool not running")
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._set_length))
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
result._set_length
))
return result
else:
assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._set_length))
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)

def apply_async(self, func, args=(), kwds={}, callback=None,
Expand All @@ -323,7 +354,7 @@ def apply_async(self, func, args=(), kwds={}, callback=None,
if self._state != RUN:
raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result

def map_async(self, func, iterable, chunksize=None, callback=None,
Expand Down Expand Up @@ -354,8 +385,14 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mapper,
task_batches),
None
)
)
return result

@staticmethod
Expand All @@ -377,33 +414,27 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):

for taskseq, set_length in iter(taskqueue.get, None):
task = None
i = -1
try:
for i, task in enumerate(taskseq):
# iterating taskseq cannot fail
for task in taskseq:
if thread._state:
util.debug('task handler found thread._state != RUN')
break
try:
put(task)
except Exception as e:
job, ind = task[:2]
job, idx = task[:2]
try:
cache[job]._set(ind, (False, e))
cache[job]._set(idx, (False, e))
except KeyError:
pass
else:
if set_length:
util.debug('doing set_length()')
set_length(i+1)
idx = task[1] if task else -1
set_length(idx + 1)
continue
break
except Exception as ex:
job, ind = task[:2] if task else (0, 0)
if job in cache:
cache[job]._set(ind + 1, (False, ex))
if set_length:
util.debug('doing set_length()')
set_length(i+1)
finally:
task = taskseq = job = None
else:
Expand Down
59 changes: 58 additions & 1 deletion Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,8 @@ def __del__(self):
class SayWhenError(ValueError): pass

def exception_throwing_generator(total, when):
if when == -1:
raise SayWhenError("Somebody said when")
for i in range(total):
if i == when:
raise SayWhenError("Somebody said when")
Expand Down Expand Up @@ -1763,6 +1765,32 @@ def test_map_chunksize(self):
except multiprocessing.TimeoutError:
self.fail("pool.map_async with chunksize stalled on null list")

def test_map_handle_iterable_exception(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))

# SayWhenError seen at the very first of the iterable
with self.assertRaises(SayWhenError):
self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
# again, make sure it's reentrant
with self.assertRaises(SayWhenError):
self.pool.map(sqr, exception_throwing_generator(1, -1), 1)

with self.assertRaises(SayWhenError):
self.pool.map(sqr, exception_throwing_generator(10, 3), 1)

class SpecialIterable:
def __iter__(self):
return self
def __next__(self):
raise SayWhenError
def __len__(self):
return 1
with self.assertRaises(SayWhenError):
self.pool.map(sqr, SpecialIterable(), 1)
with self.assertRaises(SayWhenError):
self.pool.map(sqr, SpecialIterable(), 1)

def test_async(self):
res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
get = TimingWrapper(res.get)
Expand Down Expand Up @@ -1793,6 +1821,13 @@ def test_imap_handle_iterable_exception(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))

# SayWhenError seen at the very first of the iterable
it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
self.assertRaises(SayWhenError, it.__next__)
# again, make sure it's reentrant
it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
self.assertRaises(SayWhenError, it.__next__)

it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
for i in range(3):
self.assertEqual(next(it), i*i)
Expand All @@ -1819,6 +1854,17 @@ def test_imap_unordered_handle_iterable_exception(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))

# SayWhenError seen at the very first of the iterable
it = self.pool.imap_unordered(sqr,
exception_throwing_generator(1, -1),
1)
self.assertRaises(SayWhenError, it.__next__)
# again, make sure it's reentrant
it = self.pool.imap_unordered(sqr,
exception_throwing_generator(1, -1),
1)
self.assertRaises(SayWhenError, it.__next__)

it = self.pool.imap_unordered(sqr,
exception_throwing_generator(10, 3),
1)
Expand Down Expand Up @@ -1900,7 +1946,7 @@ def test_traceback(self):
except Exception as e:
exc = e
else:
raise AssertionError('expected RuntimeError')
self.fail('expected RuntimeError')
self.assertIs(type(exc), RuntimeError)
self.assertEqual(exc.args, (123,))
cause = exc.__cause__
Expand All @@ -1914,6 +1960,17 @@ def test_traceback(self):
sys.excepthook(*sys.exc_info())
self.assertIn('raise RuntimeError(123) # some comment',
f1.getvalue())
# _helper_reraises_exception should not make the error
# a remote exception
with self.Pool(1) as p:
try:
p.map(sqr, exception_throwing_generator(1, -1), 1)
except Exception as e:
exc = e
else:
self.fail('expected SayWhenError')
self.assertIs(type(exc), SayWhenError)
self.assertIs(exc.__cause__, None)

@classmethod
def _test_wrapped_exception(cls):
Expand Down
4 changes: 4 additions & 0 deletions Misc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ Extension Modules
Library
-------

- bpo-28699: Fixed a bug in pools in multiprocessing.pool that raising an
exception at the very first of an iterable may swallow the exception or
make the program hang. Patch by Davin Potts and Xiang Zhang.

- bpo-25803: Avoid incorrect errors raised by Path.mkdir(exist_ok=True)
when the OS gives priority to errors such as EACCES over EEXIST.

Expand Down