Skip to content

WIP: New workflow syntax #2629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions nipype/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
class NipypeError(Exception):
pass


class PipelineError(NipypeError):
pass


class NodeError(PipelineError):
pass


class WorkflowError(NodeError):
pass


class MappingError(NodeError):
pass


class JoinError(NodeError):
pass


class InterfaceError(NipypeError):
pass
109 changes: 108 additions & 1 deletion nipype/pipeline/engine/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import networkx as nx

from ... import config, logging
from ...exceptions import NodeError, WorkflowError, MappingError, JoinError
from ...utils.misc import str2bool
from ...utils.functions import (getsource, create_function_from_source)

Expand All @@ -33,7 +34,7 @@
get_print_name, merge_dict, format_node)

from .base import EngineBase
from .nodes import MapNode
from .nodes import MapNode, Node

# Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict
from future import standard_library
Expand Down Expand Up @@ -1043,3 +1044,109 @@ def _get_dot(self,
vname1.replace('.', '_')))
logger.debug('cross connection: %s', dotlist[-1])
return ('\n' + prefix).join(dotlist)

def add(self, name, node_like):
if is_interface(node_like):
node = Node(node_like, name=name)
elif is_node(node_like):
node = node_like

self.add_nodes([node])


class Map(Node):
pass


class Join(Node):
pass

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we planning to change the current Node class and define Map and Join?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just prep for my syntax suggestion from #2539, where Join(node, ...) is equivalent to node.join(...).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


class MapState(object):
pass

class NewNode(EngineBase):
def __init__(self, inputs={}, map_on=None, join_by=None,
*args, **kwargs):
self._mappers = {}
self._joiners = {}

def map(self, field, values=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that field is a "mapper" itself (either list, tuple or string), is that right? Not sure about values, do you want to be a dictionary? From the Satra example I understand that you can provide values of some of the fields from the mapper (i.e. field in your syntax).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not actually positive of what I was doing here. Probably should have responded more promptly. :-) In any event, I clearly didn't finish this function (see L1076).

I believe what I was suggesting is that you could do:

node.map('a', [1, 2, 3, 4])

# or, where node.inputs.a takes the output of another node
node.map('a')

if isinstance(field, list):
for field_
if values is not None:
if len(values != len(field)):
elif isinstance(field, tuple):
pass
if values is None:
values = getattr(self._inputs, field)
if values is None:
raise MappingError('Cannot map unassigned input field')
self._mappers[field] = values

def join(self, field):
pass


class NewWorkflow(NewNode):
def __init__(self, inputs={}, *args, **kwargs):
super(NewWorkflow, self).__init__(*args, **kwargs)

self._nodes = {}

mro = self.__class__.mro()
wf_klasses = mro[:mro.index(NewWorkflow)][::-1]
items = {}
for klass in wf_klasses:
items.update(klass.__dict__)
for name, runnable in items.items():
if name in ('__module__', '__doc__'):
continue

self.add(name, value)

def add(self, name, runnable):
if is_function(runnable):
node = Node(Function(function=runnable), name=name)
elif is_interface(runnable):
node = Node(runnable, name=name)
elif is_node(runnable):
node = runnable if runnable.name == name else runnable.clone(name=name)
else:
raise ValueError("Unknown workflow element: {!r}".format(runnable))
setattr(self, name, node)
self._nodes[name] = node
self._last_added = name

def map(self, field, node=None, values=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what is the meaning of node. it shouldn't be just self._last_added?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I'm almost certain the idea is that I could do:

wf.add(node).map(<something over node>)

# or
wf.add(node)
...
wf.map(<something over node>, node=node)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see. Didn't think about this syntax.

if node is None:
if '.' in field:
node, field = field.rsplit('.', 1)
else:
node = self._last_added

if '.' in node:
subwf, node = node.split('.', 1)
self._nodes[subwf].map(field, node, values)
return

if node in self._mappers:
raise WorkflowError("Cannot assign two mappings to the same input")

self._mappers[node] = (field, values)

def join(self, field, node=None):
pass


def is_function(obj):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is nitpicking, but the builtin callable can be used here instead of defining another function.

return hasattr(obj, '__call__')


def is_interface(obj):
return all(hasattr(obj, protocol)
for protocol in ('input_spec', 'output_spec', 'run'))


def is_node(obj):
return hasattr(obj, itername)