-
Notifications
You must be signed in to change notification settings - Fork 532
[WIP] Add state/mappers to new workflow syntax #2648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
djarecka
wants to merge
57
commits into
nipy:enh/workflow_syntax
from
djarecka:effigies-enh/workflow_syntax
Closed
Changes from all commits
Commits
Show all changes
57 commits
Select commit
Hold shift + click to select a range
248d39a
Merge branch 'enh/workflow_syntax' of https://github.com/effigies/nip…
djarecka f568d78
Adding auxiliary and state from nipype2_tmp
djarecka 682248a
updating exceptions and __init__; updating logger to the current vers…
djarecka 7b3bba6
starting updating NewNode using mapper and state from nipype2_tmp; ch…
djarecka c944685
adding simple tests
djarecka 4da8737
adding run_interface method for NewNode (still have to add run method)
djarecka 7ff7722
copy submitter and workers from nipype2_tmp; adding SubmitterNode and…
djarecka 502f617
fixing run method for node (previously it didn work for dask and cf):…
djarecka 784ec22
adding reading results; changing node names (node name shouldn have _)
djarecka 18a6176
adding dask to requirements
djarecka 601c15b
adding distributed package
djarecka d5ceb24
skipping tests for py2; sorting results (not sure if the node should …
djarecka 692bc1b
removing printing the bokeh address (doesnt work with travis)
djarecka ccf8926
adding tests for node mappers
djarecka 8300a98
changing requirments to py>3.4 (fails with py3.4, not sure if we want…
djarecka 46684a4
moving many attributes from NewNodeBase/NewNodeCore to NewNode, so I …
djarecka 26496ae
starting NewWorkflow class that inherits from NewNode (for now only a…
djarecka 40562a8
adding add and method methods to NewWorkflow
djarecka 9fb54c3
fixing one test
djarecka 39e6b50
adding add method to NewWOrkflow
djarecka e795961
adding comments to the tests
djarecka e6f445e
Merge remote-tracking branch 'upstream/enh/workflow_syntax' into effi…
effigies 15b4ba8
[wip] rearranging classes - showing the problem with cf/dask when pas…
djarecka d0bdf1d
Revert "[wip] rearranging classes - showing the problem with cf/dask …
djarecka 522f64c
rearranging classes, NewNode and NewWorkflow inherit from NewBase; fi…
djarecka 2e02f69
allowing for kwarg arg in add method instead of connect method
djarecka 730f371
allowing for using mapper from previous nodes, e.g. [_NA, b], addding…
djarecka 1732b88
removing globals mapper2rpn
djarecka f1d603e
adding index_generator to the state class
djarecka 25c501c
changing string representation of directories names and inputs names
djarecka 1e4067c
adding comments
djarecka b93ef44
[wip] adding inner workflow if workflow has a mapper (doesnt work pro…
djarecka c7737d0
[wip] adding some asserts to the test with inner workflows [skip ci]
djarecka 2214404
Adding comments
djarecka d0e8c76
fixing inner_workflows (had to add deepcopy to node), changing syntax…
djarecka dfe7bd5
[skip ci] fixing _check_all_results (bug introduced in f1d603e, but w…
djarecka 409aa84
[skip ci] adding example from #2539 (not running yet); small changes …
djarecka 1bc5350
small update to tests; adding DotDict to auxiliary (but dont use it r…
djarecka a2d33b9
adding tests for inner workflows
djarecka b87782c
small edits
djarecka fa8710b
removing the concept of inner workflow from workflow with mapper
djarecka 2cec408
adding outputs_nm list for NewWorkflow, adding _reading_result for wo…
djarecka 14ebc78
alloing for mapper in workflows (copying workflows in teh submitter);…
djarecka a9407b4
updating tests for mappers
djarecka 63fe2f4
adding inputs and imports to newnode_nero
djarecka 1c48416
introducing workflows that can be treated as a node: adding results, …
djarecka ef2b4cf
moving run node/wf to tests and removing run from Node/Workflow (at l…
djarecka 935c8ea
merging all submitter classes
djarecka 8c8aa29
adding comments; cleaning connect methods
djarecka 970eb16
changing orders of the methods
djarecka 89395f2
changing directory format for node without mapper (one layer less, si…
djarecka a0cdf31
wf has map the same as a node and map_node to change a mapper for the…
djarecka 2f11ad6
adding fixture to change directory in the tests; changing nodedir to …
djarecka bf54454
adding wrapper to the current interfaces (with simple tests for bet);…
djarecka 0becd3a
updating wf.add method to include nipype interfaces
djarecka d61519c
test_newnode_neuro works: fixing CurrentInterface and adding _reading…
djarecka 2e3ea2a
[skip ci] small naming change
djarecka File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,268 @@ | ||
import pdb | ||
import inspect, os | ||
from ... import config, logging | ||
logger = logging.getLogger('nipype.workflow') | ||
from .nodes import Node | ||
|
||
|
||
# dj: might create a new class or move to State | ||
|
||
# Function to change user provided mapper to "reverse polish notation" used in State | ||
def mapper2rpn(mapper, other_mappers=None): | ||
""" Functions that translate mapper to "reverse polish notation.""" | ||
output_mapper = [] | ||
_ordering(mapper, i=0, output_mapper=output_mapper, other_mappers=other_mappers) | ||
return output_mapper | ||
|
||
|
||
def _ordering(el, i, output_mapper, current_sign=None, other_mappers=None): | ||
""" Used in the mapper2rpn to get a proper order of fields and signs. """ | ||
if type(el) is tuple: | ||
# checking if the mapper dont contain mapper from previous nodes, i.e. has str "_NA", etc. | ||
if type(el[0]) is str and el[0].startswith("_"): | ||
node_nm = el[0][1:] | ||
if node_nm not in other_mappers: | ||
raise Exception("can't ask for mapper from {}".format(node_nm)) | ||
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) | ||
el = (mapper_mod, el[1]) | ||
if type(el[1]) is str and el[1].startswith("_"): | ||
node_nm = el[1][1:] | ||
if node_nm not in other_mappers: | ||
raise Exception("can't ask for mapper from {}".format(node_nm)) | ||
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) | ||
el = (el[0], mapper_mod) | ||
_iterate_list(el, ".", other_mappers, output_mapper=output_mapper) | ||
elif type(el) is list: | ||
if type(el[0]) is str and el[0].startswith("_"): | ||
node_nm = el[0][1:] | ||
if node_nm not in other_mappers: | ||
raise Exception("can't ask for mapper from {}".format(node_nm)) | ||
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) | ||
el[0] = mapper_mod | ||
if type(el[1]) is str and el[1].startswith("_"): | ||
node_nm = el[1][1:] | ||
if node_nm not in other_mappers: | ||
raise Exception("can't ask for mapper from {}".format(node_nm)) | ||
mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) | ||
el[1] = mapper_mod | ||
_iterate_list(el, "*", other_mappers, output_mapper=output_mapper) | ||
elif type(el) is str: | ||
output_mapper.append(el) | ||
else: | ||
raise Exception("mapper has to be a string, a tuple or a list") | ||
|
||
if i > 0: | ||
output_mapper.append(current_sign) | ||
|
||
|
||
def _iterate_list(element, sign, other_mappers, output_mapper): | ||
""" Used in the mapper2rpn to get recursion. """ | ||
for i, el in enumerate(element): | ||
_ordering(el, i, current_sign=sign, other_mappers=other_mappers, output_mapper=output_mapper) | ||
|
||
|
||
# functions used in State to know which element should be used for a specific axis | ||
|
||
def mapping_axis(state_inputs, mapper_rpn): | ||
"""Having inputs and mapper (in rpn notation), functions returns the axes of output for every input.""" | ||
axis_for_input = {} | ||
stack = [] | ||
current_axis = None | ||
current_shape = None | ||
#pdb.set_trace() | ||
for el in mapper_rpn: | ||
if el == ".": | ||
right = stack.pop() | ||
left = stack.pop() | ||
if left == "OUT": | ||
if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array? | ||
axis_for_input[right] = current_axis | ||
else: | ||
raise Exception("arrays for scalar operations should have the same size") | ||
|
||
elif right == "OUT": | ||
if state_inputs[left].shape == current_shape: | ||
axis_for_input[left] = current_axis | ||
else: | ||
raise Exception("arrays for scalar operations should have the same size") | ||
|
||
else: | ||
if state_inputs[right].shape == state_inputs[left].shape: | ||
current_axis = list(range(state_inputs[right].ndim)) | ||
current_shape = state_inputs[left].shape | ||
axis_for_input[left] = current_axis | ||
axis_for_input[right] = current_axis | ||
else: | ||
raise Exception("arrays for scalar operations should have the same size") | ||
|
||
stack.append("OUT") | ||
|
||
elif el == "*": | ||
right = stack.pop() | ||
left = stack.pop() | ||
if left == "OUT": | ||
axis_for_input[right] = [i + 1 + current_axis[-1] | ||
for i in range(state_inputs[right].ndim)] | ||
current_axis = current_axis + axis_for_input[right] | ||
current_shape = tuple([i for i in current_shape + state_inputs[right].shape]) | ||
elif right == "OUT": | ||
for key in axis_for_input: | ||
axis_for_input[key] = [i + state_inputs[left].ndim | ||
for i in axis_for_input[key]] | ||
|
||
axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1 | ||
for i in range(state_inputs[left].ndim)] | ||
current_axis = current_axis + [i + 1 + current_axis[-1] | ||
for i in range(state_inputs[left].ndim)] | ||
current_shape = tuple([i for i in state_inputs[left].shape + current_shape]) | ||
else: | ||
axis_for_input[left] = list(range(state_inputs[left].ndim)) | ||
axis_for_input[right] = [i + state_inputs[left].ndim | ||
for i in range(state_inputs[right].ndim)] | ||
current_axis = axis_for_input[left] + axis_for_input[right] | ||
current_shape = tuple([i for i in | ||
state_inputs[left].shape + state_inputs[right].shape]) | ||
stack.append("OUT") | ||
|
||
else: | ||
stack.append(el) | ||
|
||
if len(stack) == 0: | ||
pass | ||
elif len(stack) > 1: | ||
raise Exception("exception from mapping_axis") | ||
elif stack[0] != "OUT": | ||
current_axis = [i for i in range(state_inputs[stack[0]].ndim)] | ||
axis_for_input[stack[0]] = current_axis | ||
|
||
if current_axis: | ||
ndim = max(current_axis) + 1 | ||
else: | ||
ndim = 0 | ||
return axis_for_input, ndim | ||
|
||
|
||
def converting_axis2input(state_inputs, axis_for_input, ndim): | ||
""" Having axes for all the input fields, the function returns fields for each axis. """ | ||
input_for_axis = [] | ||
shape = [] | ||
for i in range(ndim): | ||
input_for_axis.append([]) | ||
shape.append(0) | ||
|
||
for inp, axis in axis_for_input.items(): | ||
for (i, ax) in enumerate(axis): | ||
input_for_axis[ax].append(inp) | ||
shape[ax] = state_inputs[inp].shape[i] | ||
|
||
return input_for_axis, shape | ||
|
||
|
||
# used in the Node to change names in a mapper | ||
|
||
def change_mapper(mapper, name): | ||
"""changing names of mapper: adding names of the node""" | ||
if isinstance(mapper, str): | ||
if "." in mapper or mapper.startswith("_"): | ||
return mapper | ||
else: | ||
return "{}.{}".format(name, mapper) | ||
elif isinstance(mapper, list): | ||
return _add_name(mapper, name) | ||
elif isinstance(mapper, tuple): | ||
mapper_l = list(mapper) | ||
return tuple(_add_name(mapper_l, name)) | ||
|
||
|
||
def _add_name(mlist, name): | ||
for i, elem in enumerate(mlist): | ||
if isinstance(elem, str): | ||
if "." in elem or elem.startswith("_"): | ||
pass | ||
else: | ||
mlist[i] = "{}.{}".format(name, mlist[i]) | ||
elif isinstance(elem, list): | ||
mlist[i] = _add_name(elem, name) | ||
elif isinstance(elem, tuple): | ||
mlist[i] = list(elem) | ||
mlist[i] = _add_name(mlist[i], name) | ||
mlist[i] = tuple(mlist[i]) | ||
return mlist | ||
|
||
|
||
#Function interface | ||
|
||
class FunctionInterface(object): | ||
""" A new function interface """ | ||
def __init__(self, function, output_nm, out_read=False, input_map=None): | ||
self.function = function | ||
if type(output_nm) is list: | ||
self._output_nm = output_nm | ||
else: | ||
raise Exception("output_nm should be a list") | ||
if not input_map: | ||
self.input_map = {} | ||
# TODO use signature | ||
for key in inspect.getargspec(function)[0]: | ||
if key not in self.input_map.keys(): | ||
self.input_map[key] = key | ||
# flags if we want to read the txt file to save in node.output | ||
self.out_read = out_read | ||
|
||
|
||
def run(self, input): | ||
self.output = {} | ||
if self.input_map: | ||
for (key_fun, key_inp) in self.input_map.items(): | ||
try: | ||
input[key_fun] = input.pop(key_inp) | ||
except KeyError: | ||
raise Exception("no {} in the input dictionary".format(key_inp)) | ||
fun_output = self.function(**input) | ||
logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output)) | ||
if type(fun_output) is tuple: | ||
if len(self._output_nm) == len(fun_output): | ||
for i, out in enumerate(fun_output): | ||
self.output[self._output_nm[i]] = out | ||
else: | ||
raise Exception("length of output_nm doesnt match length of the function output") | ||
elif len(self._output_nm)==1: | ||
self.output[self._output_nm[0]] = fun_output | ||
else: | ||
raise Exception("output_nm doesnt match length of the function output") | ||
|
||
return fun_output | ||
|
||
|
||
# want to use to access input as dot, | ||
# but it doesnt work since im using "." within names (using my old syntax with - also cant work) | ||
# https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary | ||
class DotDict(dict): | ||
"""dot.notation access to dictionary attributes""" | ||
def __getattr__(self, attr): | ||
return self.get(attr) | ||
__setattr__= dict.__setitem__ | ||
__delattr__= dict.__delitem__ | ||
|
||
def __getstate__(self): | ||
return self | ||
|
||
def __setstate__(self, state): | ||
self.update(state) | ||
self.__dict__ = self | ||
|
||
|
||
class CurrentInterface(object): | ||
def __init__(self, interface, name): | ||
self.nn = Node(interface=interface, name=name) | ||
self.output = {} | ||
|
||
def run(self, inputs, base_dir, dir_nm_el): | ||
self.nn.base_dir = os.path.join(base_dir, dir_nm_el) | ||
for key, val in inputs.items(): | ||
key = key.split(".")[-1] | ||
setattr(self.nn.inputs, key, val) | ||
#have to set again self._output_dir in case of mapper | ||
self.nn._output_dir = os.path.join(self.nn.base_dir, self.nn.name) | ||
res = self.nn.run() | ||
return res |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I resolved this by changing
NodeError
to derive fromPipelineError
. Unless there's a good reason to separate outEngineError
? I don't have a full imagining of the hierarchy in mind. In any event, I think all of these should derive fromNipypeError
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, I made this change because the
EngineError
was not implemented and it was giving me an error. Didn't have anything in mind about hierarchy, just wanted to remove the error.