Skip to content

Commit bcc25fd

Browse files
committed
[FIX] MultiProc starting workers at dubious wd
When ``maxtasksperchild`` was set at a very low value, workers are created very often, sometimes at working directories deleted after the interface cleanup. That would trigger an ``OSError`` when calling ``os.getcwd()`` during ``nipype.config`` import. This PR sets an initializer for the workers that just changes to the appropriate working directory before the worker is spun up. Fixes nipreps/fmriprep#868
1 parent 00ef467 commit bcc25fd

File tree

1 file changed

+24
-4
lines changed

1 file changed

+24
-4
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from __future__ import print_function, division, unicode_literals, absolute_import
1010

1111
# Import packages
12+
import os
1213
from multiprocessing import Process, Pool, cpu_count, pool
1314
from traceback import format_exception
1415
import sys
@@ -47,6 +48,8 @@ def run_node(node, updatehash, taskid):
4748
the node to run
4849
updatehash : boolean
4950
flag for updating hash
51+
taskid : int
52+
an identifier for this task
5053
5154
Returns
5255
-------
@@ -68,6 +71,10 @@ def run_node(node, updatehash, taskid):
6871
return result
6972

7073

74+
def _init_worker(cwd):
75+
os.chdir(cwd)
76+
77+
7178
class NonDaemonProcess(Process):
7279
"""A non-daemon process to support internal multiprocessing.
7380
"""
@@ -128,6 +135,10 @@ def __init__(self, plugin_args=None):
128135
self._task_obj = {}
129136
self._taskid = 0
130137

138+
# Cache current working directory and make sure we
139+
# change to it when workers are set up
140+
self._cwd = os.getcwd()
141+
131142
# Read in options or set defaults.
132143
non_daemon = self.plugin_args.get('non_daemon', True)
133144
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
@@ -140,19 +151,28 @@ def __init__(self, plugin_args=None):
140151

141152
# Instantiate different thread pools for non-daemon processes
142153
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
143-
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
144-
self.memory_gb)
154+
'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
155+
self.processors, self.memory_gb, self._cwd)
145156

146157
NipypePool = NonDaemonPool if non_daemon else Pool
147158
try:
148-
self.pool = NipypePool(processes=self.processors,
149-
maxtasksperchild=maxtasks)
159+
self.pool = NipypePool(
160+
processes=self.processors,
161+
maxtasksperchild=maxtasks,
162+
initializer=_init_worker,
163+
initargs=(self._cwd,)
164+
)
150165
except TypeError:
166+
# Python < 3.2 does not have maxtasksperchild
167+
# When maxtasksperchild is not set, initializer is not to be
168+
# called.
151169
self.pool = NipypePool(processes=self.processors)
152170

153171
self._stats = None
154172

155173
def _async_callback(self, args):
174+
# Make sure runtime is not left at a dubious working directory
175+
os.chdir(self._cwd)
156176
self._taskresult[args['taskid']] = args
157177

158178
def _get_result(self, taskid):

0 commit comments

Comments
 (0)