Skip to content

[WIP] Add state/mappers to new workflow syntax #2648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
248d39a
Merge branch 'enh/workflow_syntax' of https://github.com/effigies/nip…
djarecka Jul 10, 2018
f568d78
Adding auxiliary and state from nipype2_tmp
djarecka Jul 12, 2018
682248a
updating exceptions and __init__; updating logger to the current vers…
djarecka Jul 13, 2018
7b3bba6
starting updating NewNode using mapper and state from nipype2_tmp; ch…
djarecka Jul 13, 2018
c944685
adding simple tests
djarecka Jul 14, 2018
4da8737
adding run_interface method for NewNode (still have to add run method)
djarecka Jul 15, 2018
7ff7722
copy submitter and workers from nipype2_tmp; adding SubmitterNode and…
djarecka Jul 15, 2018
502f617
fixing run method for node (previously it didn work for dask and cf):…
djarecka Jul 16, 2018
784ec22
adding reading results; changing node names (node name shouldn have _)
djarecka Jul 16, 2018
18a6176
adding dask to requirements
djarecka Jul 16, 2018
601c15b
adding distributed package
djarecka Jul 16, 2018
d5ceb24
skipping tests for py2; sorting results (not sure if the node should …
djarecka Jul 16, 2018
692bc1b
removing printing the bokeh address (doesnt work with travis)
djarecka Jul 17, 2018
ccf8926
adding tests for node mappers
djarecka Jul 17, 2018
8300a98
changing requirments to py>3.4 (fails with py3.4, not sure if we want…
djarecka Jul 17, 2018
46684a4
moving many attributes from NewNodeBase/NewNodeCore to NewNode, so I …
djarecka Jul 22, 2018
26496ae
starting NewWorkflow class that inherits from NewNode (for now only a…
djarecka Jul 24, 2018
40562a8
adding add and method methods to NewWorkflow
djarecka Jul 24, 2018
9fb54c3
fixing one test
djarecka Jul 24, 2018
39e6b50
adding add method to NewWOrkflow
djarecka Jul 24, 2018
e795961
adding comments to the tests
djarecka Jul 24, 2018
e6f445e
Merge remote-tracking branch 'upstream/enh/workflow_syntax' into effi…
effigies Jul 25, 2018
15b4ba8
[wip] rearranging classes - showing the problem with cf/dask when pas…
djarecka Jul 26, 2018
d0bdf1d
Revert "[wip] rearranging classes - showing the problem with cf/dask …
djarecka Jul 27, 2018
522f64c
rearranging classes, NewNode and NewWorkflow inherit from NewBase; fi…
djarecka Jul 30, 2018
2e02f69
allowing for kwarg arg in add method instead of connect method
djarecka Jul 31, 2018
730f371
allowing for using mapper from previous nodes, e.g. [_NA, b], addding…
djarecka Jul 31, 2018
1732b88
removing globals mapper2rpn
djarecka Aug 15, 2018
f1d603e
adding index_generator to the state class
djarecka Aug 15, 2018
25c501c
changing string representation of directories names and inputs names
djarecka Aug 29, 2018
1e4067c
adding comments
djarecka Aug 29, 2018
b93ef44
[wip] adding inner workflow if workflow has a mapper (doesnt work pro…
djarecka Aug 29, 2018
c7737d0
[wip] adding some asserts to the test with inner workflows [skip ci]
djarecka Aug 29, 2018
2214404
Adding comments
djarecka Aug 29, 2018
d0e8c76
fixing inner_workflows (had to add deepcopy to node), changing syntax…
djarecka Sep 4, 2018
dfe7bd5
[skip ci] fixing _check_all_results (bug introduced in f1d603e, but w…
djarecka Sep 4, 2018
409aa84
[skip ci] adding example from #2539 (not running yet); small changes …
djarecka Sep 5, 2018
1bc5350
small update to tests; adding DotDict to auxiliary (but dont use it r…
djarecka Sep 19, 2018
a2d33b9
adding tests for inner workflows
djarecka Sep 19, 2018
b87782c
small edits
djarecka Sep 24, 2018
fa8710b
removing the concept of inner workflow from workflow with mapper
djarecka Sep 24, 2018
2cec408
adding outputs_nm list for NewWorkflow, adding _reading_result for wo…
djarecka Sep 24, 2018
14ebc78
alloing for mapper in workflows (copying workflows in teh submitter);…
djarecka Sep 30, 2018
a9407b4
updating tests for mappers
djarecka Sep 30, 2018
63fe2f4
adding inputs and imports to newnode_nero
djarecka Oct 1, 2018
1c48416
introducing workflows that can be treated as a node: adding results, …
djarecka Oct 3, 2018
ef2b4cf
moving run node/wf to tests and removing run from Node/Workflow (at l…
djarecka Oct 4, 2018
935c8ea
merging all submitter classes
djarecka Oct 4, 2018
8c8aa29
adding comments; cleaning connect methods
djarecka Oct 4, 2018
970eb16
changing orders of the methods
djarecka Oct 4, 2018
89395f2
changing directory format for node without mapper (one layer less, si…
djarecka Oct 4, 2018
a0cdf31
wf has map the same as a node and map_node to change a mapper for the…
djarecka Oct 4, 2018
2f11ad6
adding fixture to change directory in the tests; changing nodedir to …
djarecka Oct 4, 2018
bf54454
adding wrapper to the current interfaces (with simple tests for bet);…
djarecka Oct 5, 2018
0becd3a
updating wf.add method to include nipype interfaces
djarecka Oct 6, 2018
d61519c
test_newnode_neuro works: fixing CurrentInterface and adding _reading…
djarecka Oct 8, 2018
2e3ea2a
[skip ci] small naming change
djarecka Oct 8, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions nipype/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ class NipypeError(Exception):
pass


class EngineError(Exception):
pass


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I resolved this by changing NodeError to derive from PipelineError. Unless there's a good reason to separate out EngineError? I don't have a full imagining of the hierarchy in mind. In any event, I think all of these should derive from NipypeError.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I made this change because the EngineError was not implemented and it was giving me an error. Didn't have anything in mind about hierarchy, just wanted to remove the error.

class PipelineError(NipypeError):
pass

Expand Down
1 change: 1 addition & 0 deletions nipype/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def get_nipype_gitversion():
'pydot>=%s' % PYDOT_MIN_VERSION,
'packaging',
'futures; python_version == "2.7"',
'dask',
]

if sys.version_info <= (3, 4):
Expand Down
2 changes: 1 addition & 1 deletion nipype/pipeline/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

from __future__ import absolute_import
__docformat__ = 'restructuredtext'
from .workflows import Workflow
from .workflows import Workflow, NewNode, NewWorkflow
from .nodes import Node, MapNode, JoinNode
from .utils import generate_expanded_graph
268 changes: 268 additions & 0 deletions nipype/pipeline/engine/auxiliary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
import pdb
import inspect, os
from ... import config, logging
logger = logging.getLogger('nipype.workflow')
from .nodes import Node


# dj: might create a new class or move to State

# Function to change user provided mapper to "reverse polish notation" used in State
def mapper2rpn(mapper, other_mappers=None):
""" Functions that translate mapper to "reverse polish notation."""
output_mapper = []
_ordering(mapper, i=0, output_mapper=output_mapper, other_mappers=other_mappers)
return output_mapper


def _ordering(el, i, output_mapper, current_sign=None, other_mappers=None):
""" Used in the mapper2rpn to get a proper order of fields and signs. """
if type(el) is tuple:
# checking if the mapper dont contain mapper from previous nodes, i.e. has str "_NA", etc.
if type(el[0]) is str and el[0].startswith("_"):
node_nm = el[0][1:]
if node_nm not in other_mappers:
raise Exception("can't ask for mapper from {}".format(node_nm))
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm)
el = (mapper_mod, el[1])
if type(el[1]) is str and el[1].startswith("_"):
node_nm = el[1][1:]
if node_nm not in other_mappers:
raise Exception("can't ask for mapper from {}".format(node_nm))
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm)
el = (el[0], mapper_mod)
_iterate_list(el, ".", other_mappers, output_mapper=output_mapper)
elif type(el) is list:
if type(el[0]) is str and el[0].startswith("_"):
node_nm = el[0][1:]
if node_nm not in other_mappers:
raise Exception("can't ask for mapper from {}".format(node_nm))
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm)
el[0] = mapper_mod
if type(el[1]) is str and el[1].startswith("_"):
node_nm = el[1][1:]
if node_nm not in other_mappers:
raise Exception("can't ask for mapper from {}".format(node_nm))
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm)
el[1] = mapper_mod
_iterate_list(el, "*", other_mappers, output_mapper=output_mapper)
elif type(el) is str:
output_mapper.append(el)
else:
raise Exception("mapper has to be a string, a tuple or a list")

if i > 0:
output_mapper.append(current_sign)


def _iterate_list(element, sign, other_mappers, output_mapper):
""" Used in the mapper2rpn to get recursion. """
for i, el in enumerate(element):
_ordering(el, i, current_sign=sign, other_mappers=other_mappers, output_mapper=output_mapper)


# functions used in State to know which element should be used for a specific axis

def mapping_axis(state_inputs, mapper_rpn):
"""Having inputs and mapper (in rpn notation), functions returns the axes of output for every input."""
axis_for_input = {}
stack = []
current_axis = None
current_shape = None
#pdb.set_trace()
for el in mapper_rpn:
if el == ".":
right = stack.pop()
left = stack.pop()
if left == "OUT":
if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array?
axis_for_input[right] = current_axis
else:
raise Exception("arrays for scalar operations should have the same size")

elif right == "OUT":
if state_inputs[left].shape == current_shape:
axis_for_input[left] = current_axis
else:
raise Exception("arrays for scalar operations should have the same size")

else:
if state_inputs[right].shape == state_inputs[left].shape:
current_axis = list(range(state_inputs[right].ndim))
current_shape = state_inputs[left].shape
axis_for_input[left] = current_axis
axis_for_input[right] = current_axis
else:
raise Exception("arrays for scalar operations should have the same size")

stack.append("OUT")

elif el == "*":
right = stack.pop()
left = stack.pop()
if left == "OUT":
axis_for_input[right] = [i + 1 + current_axis[-1]
for i in range(state_inputs[right].ndim)]
current_axis = current_axis + axis_for_input[right]
current_shape = tuple([i for i in current_shape + state_inputs[right].shape])
elif right == "OUT":
for key in axis_for_input:
axis_for_input[key] = [i + state_inputs[left].ndim
for i in axis_for_input[key]]

axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1
for i in range(state_inputs[left].ndim)]
current_axis = current_axis + [i + 1 + current_axis[-1]
for i in range(state_inputs[left].ndim)]
current_shape = tuple([i for i in state_inputs[left].shape + current_shape])
else:
axis_for_input[left] = list(range(state_inputs[left].ndim))
axis_for_input[right] = [i + state_inputs[left].ndim
for i in range(state_inputs[right].ndim)]
current_axis = axis_for_input[left] + axis_for_input[right]
current_shape = tuple([i for i in
state_inputs[left].shape + state_inputs[right].shape])
stack.append("OUT")

else:
stack.append(el)

if len(stack) == 0:
pass
elif len(stack) > 1:
raise Exception("exception from mapping_axis")
elif stack[0] != "OUT":
current_axis = [i for i in range(state_inputs[stack[0]].ndim)]
axis_for_input[stack[0]] = current_axis

if current_axis:
ndim = max(current_axis) + 1
else:
ndim = 0
return axis_for_input, ndim


def converting_axis2input(state_inputs, axis_for_input, ndim):
""" Having axes for all the input fields, the function returns fields for each axis. """
input_for_axis = []
shape = []
for i in range(ndim):
input_for_axis.append([])
shape.append(0)

for inp, axis in axis_for_input.items():
for (i, ax) in enumerate(axis):
input_for_axis[ax].append(inp)
shape[ax] = state_inputs[inp].shape[i]

return input_for_axis, shape


# used in the Node to change names in a mapper

def change_mapper(mapper, name):
"""changing names of mapper: adding names of the node"""
if isinstance(mapper, str):
if "." in mapper or mapper.startswith("_"):
return mapper
else:
return "{}.{}".format(name, mapper)
elif isinstance(mapper, list):
return _add_name(mapper, name)
elif isinstance(mapper, tuple):
mapper_l = list(mapper)
return tuple(_add_name(mapper_l, name))


def _add_name(mlist, name):
for i, elem in enumerate(mlist):
if isinstance(elem, str):
if "." in elem or elem.startswith("_"):
pass
else:
mlist[i] = "{}.{}".format(name, mlist[i])
elif isinstance(elem, list):
mlist[i] = _add_name(elem, name)
elif isinstance(elem, tuple):
mlist[i] = list(elem)
mlist[i] = _add_name(mlist[i], name)
mlist[i] = tuple(mlist[i])
return mlist


#Function interface

class FunctionInterface(object):
""" A new function interface """
def __init__(self, function, output_nm, out_read=False, input_map=None):
self.function = function
if type(output_nm) is list:
self._output_nm = output_nm
else:
raise Exception("output_nm should be a list")
if not input_map:
self.input_map = {}
# TODO use signature
for key in inspect.getargspec(function)[0]:
if key not in self.input_map.keys():
self.input_map[key] = key
# flags if we want to read the txt file to save in node.output
self.out_read = out_read


def run(self, input):
self.output = {}
if self.input_map:
for (key_fun, key_inp) in self.input_map.items():
try:
input[key_fun] = input.pop(key_inp)
except KeyError:
raise Exception("no {} in the input dictionary".format(key_inp))
fun_output = self.function(**input)
logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output))
if type(fun_output) is tuple:
if len(self._output_nm) == len(fun_output):
for i, out in enumerate(fun_output):
self.output[self._output_nm[i]] = out
else:
raise Exception("length of output_nm doesnt match length of the function output")
elif len(self._output_nm)==1:
self.output[self._output_nm[0]] = fun_output
else:
raise Exception("output_nm doesnt match length of the function output")

return fun_output


# want to use to access input as dot,
# but it doesnt work since im using "." within names (using my old syntax with - also cant work)
# https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary
class DotDict(dict):
"""dot.notation access to dictionary attributes"""
def __getattr__(self, attr):
return self.get(attr)
__setattr__= dict.__setitem__
__delattr__= dict.__delitem__

def __getstate__(self):
return self

def __setstate__(self, state):
self.update(state)
self.__dict__ = self


class CurrentInterface(object):
def __init__(self, interface, name):
self.nn = Node(interface=interface, name=name)
self.output = {}

def run(self, inputs, base_dir, dir_nm_el):
self.nn.base_dir = os.path.join(base_dir, dir_nm_el)
for key, val in inputs.items():
key = key.split(".")[-1]
setattr(self.nn.inputs, key, val)
#have to set again self._output_dir in case of mapper
self.nn._output_dir = os.path.join(self.nn.base_dir, self.nn.name)
res = self.nn.run()
return res
Loading