Skip to content

FIX: Disallow returning None in pipeline.utils.load_resultfile #3023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def interface(self):
def result(self):
"""Get result from result file (do not hold it in memory)"""
return _load_resultfile(
op.join(self.output_dir(), 'result_%s.pklz' % self.name))[0]
op.join(self.output_dir(), 'result_%s.pklz' % self.name))

@property
def inputs(self):
Expand Down Expand Up @@ -518,7 +518,7 @@ def _get_inputs(self):
logger.debug('input: %s', key)
results_file = info[0]
logger.debug('results file: %s', results_file)
outputs = _load_resultfile(results_file)[0].outputs
outputs = _load_resultfile(results_file).outputs
if outputs is None:
raise RuntimeError("""\
Error populating the input "%s" of node "%s": the results file of the source node \
Expand Down Expand Up @@ -565,34 +565,42 @@ def _run_interface(self, execute=True, updatehash=False):

def _load_results(self):
cwd = self.output_dir()
result, aggregate, attribute_error = _load_resultfile(
op.join(cwd, 'result_%s.pklz' % self.name))

try:
result = _load_resultfile(
op.join(cwd, 'result_%s.pklz' % self.name))
except (traits.TraitError, EOFError):
logger.debug(
'Error populating inputs/outputs, (re)aggregating results...')
except (AttributeError, ImportError) as err:
logger.debug('attribute error: %s probably using '
'different trait pickled file', str(err))
old_inputs = loadpkl(op.join(cwd, '_inputs.pklz'))
self.inputs.trait_set(**old_inputs)
else:
return result

# try aggregating first
if aggregate:
logger.debug('aggregating results')
if attribute_error:
old_inputs = loadpkl(op.join(cwd, '_inputs.pklz'))
self.inputs.trait_set(**old_inputs)
if not isinstance(self, MapNode):
self._copyfiles_to_wd(linksonly=True)
aggouts = self._interface.aggregate_outputs(
needed_outputs=self.needed_outputs)
runtime = Bunch(
cwd=cwd,
returncode=0,
environ=dict(os.environ),
hostname=socket.gethostname())
result = InterfaceResult(
interface=self._interface.__class__,
runtime=runtime,
inputs=self._interface.inputs.get_traitsfree(),
outputs=aggouts)
_save_resultfile(
result, cwd, self.name,
rebase=str2bool(self.config['execution']['use_relative_paths']))
else:
logger.debug('aggregating mapnode results')
result = self._run_interface()
if not isinstance(self, MapNode):
self._copyfiles_to_wd(linksonly=True)
aggouts = self._interface.aggregate_outputs(
needed_outputs=self.needed_outputs)
runtime = Bunch(
cwd=cwd,
returncode=0,
environ=dict(os.environ),
hostname=socket.gethostname())
result = InterfaceResult(
interface=self._interface.__class__,
runtime=runtime,
inputs=self._interface.inputs.get_traitsfree(),
outputs=aggouts)
_save_resultfile(
result, cwd, self.name,
rebase=str2bool(self.config['execution']['use_relative_paths']))
else:
logger.debug('aggregating mapnode results')
result = self._run_interface()
return result

def _run_command(self, execute, copyfiles=True):
Expand Down
49 changes: 48 additions & 1 deletion nipype/pipeline/engine/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from ....interfaces import base as nib
from ....interfaces import utility as niu
from .... import config
from ..utils import clean_working_directory, write_workflow_prov
from ..utils import (clean_working_directory, write_workflow_prov,
load_resultfile)


class InputSpec(nib.TraitedSpec):
Expand Down Expand Up @@ -283,3 +284,49 @@ def test_modify_paths_bug(tmpdir):
assert outputs.out_dict_path == {out_str: out_path}
assert outputs.out_dict_str == {out_str: out_str}
assert outputs.out_list == [out_str] * 2


@pytest.mark.xfail(sys.version_info < (3, 4),
reason="rebase does not fully work with Python 2.7")
@pytest.mark.parametrize("use_relative", [True, False])
def test_save_load_resultfile(tmpdir, use_relative):
"""Test minimally the save/load functions for result files."""
from shutil import copytree, rmtree
tmpdir.chdir()

old_use_relative = config.getboolean('execution', 'use_relative_paths')
config.set('execution', 'use_relative_paths', use_relative)

spc = pe.Node(StrPathConfuser(in_str='2'), name='spc')
spc.base_dir = tmpdir.mkdir('node').strpath

result = spc.run()

loaded_result = load_resultfile(
tmpdir.join('node').join('spc').join('result_spc.pklz').strpath)

assert result.runtime.dictcopy() == loaded_result.runtime.dictcopy()
assert result.inputs == loaded_result.inputs
assert result.outputs.get() == loaded_result.outputs.get()

# Test the mobility of the result file.
copytree(tmpdir.join('node').strpath, tmpdir.join('node2').strpath)
rmtree(tmpdir.join('node').strpath)

if use_relative:
loaded_result2 = load_resultfile(
tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath)

assert result.runtime.dictcopy() == loaded_result2.runtime.dictcopy()
assert result.inputs == loaded_result2.inputs
assert loaded_result2.outputs.get() != result.outputs.get()
newpath = result.outputs.out_path.replace('/node/', '/node2/')
assert loaded_result2.outputs.out_path == newpath
assert loaded_result2.outputs.out_tuple[0] == newpath
assert loaded_result2.outputs.out_dict_path['2'] == newpath
else:
with pytest.raises(nib.TraitError):
load_resultfile(
tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath)

config.set('execution', 'use_relative_paths', old_use_relative)
79 changes: 35 additions & 44 deletions nipype/pipeline/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@
write_rst_header,
write_rst_dict,
write_rst_list,
FileNotFoundError,
)
from ...utils.misc import str2bool
from ...utils.functions import create_function_from_source
from ...interfaces.base.traits_extension import (
rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined, traits)
rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined)
from ...interfaces.base.support import Bunch, InterfaceResult
from ...interfaces.base import CommandLine
from ...interfaces.utility import IdentityInterface
Expand Down Expand Up @@ -249,6 +250,10 @@ def save_resultfile(result, cwd, name, rebase=None):
savepkl(resultsfile, result)
return

if not rebase:
savepkl(resultsfile, result)
return

backup_traits = {}
try:
with indirectory(cwd):
Expand All @@ -273,58 +278,44 @@ def load_resultfile(results_file, resolve=True):
"""
Load InterfaceResult file from path.

Parameter
---------
path : base_dir of node
name : name of node
Parameters
----------
results_file : pathlike
Path to an existing pickle (``result_<interface name>.pklz``) created with
``save_resultfile``.
Raises ``FileNotFoundError`` if ``results_file`` does not exist.
resolve : bool
Determines whether relative paths will be resolved to absolute (default is ``True``).

Returns
-------
result : InterfaceResult structure
aggregate : boolean indicating whether node should aggregate_outputs
attribute error : boolean indicating whether there was some mismatch in
versions of traits used to store result and hence node needs to
rerun
result : InterfaceResult
A Nipype object containing the runtime, inputs, outputs and other interface information
such as a traceback in the case of errors.

"""
results_file = Path(results_file)
aggregate = True
result = None
attribute_error = False

if not results_file.exists():
return result, aggregate, attribute_error
raise FileNotFoundError(results_file)

with indirectory(str(results_file.parent)):
result = loadpkl(results_file)
if resolve and result.outputs:
try:
result = loadpkl(results_file)
except (traits.TraitError, EOFError):
logger.debug(
'some file does not exist. hence trait cannot be set')
except (AttributeError, ImportError) as err:
attribute_error = True
logger.debug('attribute error: %s probably using '
'different trait pickled file', str(err))
else:
aggregate = False

if resolve and result.outputs:
try:
outputs = result.outputs.get()
except TypeError: # This is a Bunch
return result, aggregate, attribute_error

logger.debug('Resolving paths in outputs loaded from results file.')
for trait_name, old in list(outputs.items()):
if isdefined(old):
if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath):
old = result.outputs.trait(trait_name).handler.get_value(
result.outputs, trait_name)
value = resolve_path_traits(result.outputs.trait(trait_name), old,
results_file.parent)
setattr(result.outputs, trait_name, value)

return result, aggregate, attribute_error
outputs = result.outputs.get()
except TypeError: # This is a Bunch
logger.debug('Outputs object of loaded result %s is a Bunch.', results_file)
return result

logger.debug('Resolving paths in outputs loaded from results file.')
for trait_name, old in list(outputs.items()):
if isdefined(old):
if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath):
old = result.outputs.trait(trait_name).handler.get_value(
result.outputs, trait_name)
value = resolve_path_traits(result.outputs.trait(trait_name), old,
results_file.parent)
setattr(result.outputs, trait_name, value)
return result


def strip_temp(files, wd):
Expand Down
31 changes: 22 additions & 9 deletions nipype/pipeline/plugins/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from traceback import format_exception

from ... import logging
from ...utils.filemanip import savepkl, crash2txt, makedirs
from ...utils.filemanip import savepkl, crash2txt, makedirs, FileNotFoundError

logger = logging.getLogger('nipype.workflow')

Expand All @@ -26,17 +26,30 @@ def report_crash(node, traceback=None, hostname=None):
"""
name = node._id
host = None
if node.result and getattr(node.result, 'runtime'):
if isinstance(node.result.runtime, list):
host = node.result.runtime[0].hostname
else:
host = node.result.runtime.hostname
traceback = traceback or format_exception(*sys.exc_info())

try:
result = node.result
except FileNotFoundError:
traceback += """

When creating this crashfile, the results file corresponding
to the node could not be found.""".splitlines(keepends=True)
except Exception as exc:
traceback += """

During the creation of this crashfile triggered by the above exception,
another exception occurred:\n\n{}.""".format(exc).splitlines(keepends=True)
else:
if getattr(result, 'runtime', None):
if isinstance(result.runtime, list):
host = result.runtime[0].hostname
else:
host = result.runtime.hostname

# Try everything to fill in the host
host = host or hostname or gethostname()
logger.error('Node %s failed to run on host %s.', name, host)
if not traceback:
traceback = format_exception(*sys.exc_info())
timeofcrash = strftime('%Y%m%d-%H%M%S')
try:
login_name = getpass.getuser()
Expand All @@ -49,7 +62,7 @@ def report_crash(node, traceback=None, hostname=None):
makedirs(crashdir, exist_ok=True)
crashfile = os.path.join(crashdir, crashfile)

if node.config['execution']['crashfile_format'].lower() in ['text', 'txt']:
if node.config['execution']['crashfile_format'].lower() in ('text', 'txt', '.txt'):
crashfile += '.txt'
else:
crashfile += '.pklz'
Expand Down
2 changes: 1 addition & 1 deletion nipype/utils/filemanip.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ def loadpkl(infile):
Attempted to open a results file generated by Nipype version %s, \
with an incompatible Nipype version (%s)""", pkl_metadata['version'], version)
raise e
fmlogger.error("""\
fmlogger.warning("""\
No metadata was found in the pkl file. Make sure you are currently using \
the same Nipype version from the generated pkl.""")
raise e
Expand Down