Skip to content

Scoped ref and typedsl #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class Builder(object):
def __init__(self): # type: () -> None
self.names = None # type: avro.schema.Names
self.schemaDefs = None # type: Dict[str,Dict[unicode, Any]]
self.files = None # type: List[Dict[str, str]]
self.files = None # type: List[Dict[unicode, unicode]]
self.fs_access = None # type: StdFsAccess
self.job = None # type: Dict[str, Any]
self.job = None # type: Dict[unicode, Union[Dict[unicode, Any], List, unicode]]
self.requirements = None # type: List[Dict[str,Any]]
self.outdir = None # type: str
self.tmpdir = None # type: str
Expand Down
12 changes: 8 additions & 4 deletions cwltool/cwlrdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Union, Dict, IO

def makerdf(workflow, wf, ctx):
# type: (Union[str, unicode], Dict[str,Any], Loader.ContextType) -> Graph
# type: (Union[str, unicode], Union[List[Dict[unicode, Any]], Dict[unicode, Any]], Loader.ContextType) -> Graph
prefixes = {}
for k,v in ctx.iteritems():
if isinstance(v, dict):
Expand All @@ -18,7 +18,11 @@ def makerdf(workflow, wf, ctx):
p, _ = frg.split("/")
prefixes[p] = u"%s#%s/" % (doc_url, p)

wf["@context"] = ctx
if isinstance(wf, list):
for entry in wf:
entry["@context"] = ctx
else:
wf["@context"] = ctx
g = Graph().parse(data=json.dumps(wf), format='json-ld', location=workflow)

# Bug in json-ld loader causes @id fields to be added to the graph
Expand All @@ -31,7 +35,7 @@ def makerdf(workflow, wf, ctx):
return g

def printrdf(workflow, wf, ctx, sr, stdout):
# type: (Union[str, unicode], Dict[str, Any], Loader.ContextType, str, IO[Any]) -> None
# type: (Union[str, unicode], Union[List[Dict[unicode, Any]], Dict[unicode, Any]], Loader.ContextType, str, IO[Any]) -> None
stdout.write(makerdf(workflow, wf, ctx).serialize(format=sr))

def lastpart(uri): # type: (Any) -> str
Expand Down Expand Up @@ -172,7 +176,7 @@ def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None


def printdot(workflow, wf, ctx, stdout, include_parameters=False):
# type: (Union[str, unicode], Dict[str, Any], Loader.ContextType, Any, bool) -> None
# type: (Union[str, unicode], Union[List[Dict[unicode, Any]], Dict[unicode, Any]], Loader.ContextType, Any, bool) -> None
g = makerdf(workflow, wf, ctx)

stdout.write("digraph {")
Expand Down
20 changes: 10 additions & 10 deletions cwltool/draft2tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

class ExpressionTool(Process):
def __init__(self, toolpath_object, **kwargs):
# type: (Dict[str,List[None]], **Any) -> None
# type: (Dict[unicode, Any], **Any) -> None
super(ExpressionTool, self).__init__(toolpath_object, **kwargs)

class ExpressionJob(object):
Expand All @@ -53,7 +53,7 @@ def run(self, **kwargs): # type: (**Any) -> None
self.output_callback({}, "permanentFail")

def job(self, joborder, output_callback, **kwargs):
# type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[ExpressionTool.ExpressionJob, None, None]
# type: (Dict[unicode, unicode], Callable[[Any, Any], Any], **Any) -> Generator[ExpressionTool.ExpressionJob, None, None]
builder = self._init_job(joborder, **kwargs)

j = ExpressionTool.ExpressionJob()
Expand Down Expand Up @@ -113,14 +113,14 @@ def run(self, **kwargs):

class CommandLineTool(Process):
def __init__(self, toolpath_object, **kwargs):
# type: (Dict[str,Any], **Any) -> None
# type: (Dict[unicode, Any], **Any) -> None
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)

def makeJobRunner(self): # type: () -> CommandLineJob
return CommandLineJob()

def makePathMapper(self, reffiles, **kwargs):
# type: (Set[str], str, **Any) -> PathMapper
# type: (Set[unicode], **Any) -> PathMapper
dockerReq, _ = self.get_requirement("DockerRequirement")
try:
if dockerReq and kwargs.get("use_container"):
Expand All @@ -132,7 +132,7 @@ def makePathMapper(self, reffiles, **kwargs):
raise WorkflowException(u"Missing input file %s" % e)

def job(self, joborder, output_callback, **kwargs):
# type: (Dict[str,str], str, Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
# type: (Dict[unicode, unicode], Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]

jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))

Expand All @@ -149,7 +149,7 @@ def job(self, joborder, output_callback, **kwargs):
if docker_req and kwargs.get("use_container") is not False:
dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
cmdline = ["docker", "run", dockerimg] + cmdline
keydict = {"cmdline": cmdline}
keydict = {u"cmdline": cmdline}

for _,f in cachebuilder.pathmapper.items():
st = os.stat(f[0])
Expand Down Expand Up @@ -200,7 +200,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,

builder = self._init_job(joborder, **kwargs)

reffiles = set((f["path"] for f in builder.files))
reffiles = set((f[u"path"] for f in builder.files))

j = self.makeJobRunner()
j.builder = builder
Expand Down Expand Up @@ -292,9 +292,9 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
yield j

def collect_output_ports(self, ports, builder, outdir):
# type: (Set[Dict[str,Any]], Builder, str) -> Dict[str,Union[str,List[Any],Dict[str,Any]]]
# type: (Set[Dict[str,Any]], Builder, str) -> Dict[unicode, Union[unicode, List[Any], Dict[unicode, Any]]]
try:
ret = {} # type: Dict[str,Union[str,List[Any],Dict[str,Any]]]
ret = {} # type: Dict[unicode, Union[unicode, List[Any], Dict[unicode, Any]]]
custom_output = os.path.join(outdir, "cwl.output.json")
if builder.fs_access.exists(custom_output):
with builder.fs_access.open(custom_output, "r") as f:
Expand Down Expand Up @@ -323,7 +323,7 @@ def collect_output_ports(self, ports, builder, outdir):
raise WorkflowException("Error validating output record, " + str(e) + "\n in " + json.dumps(ret, indent=4))

def collect_output(self, schema, builder, outdir):
# type: (Dict[str,Any], Builder, str) -> Union[Dict[str, Any], List[Union[Dict[str, Any], str]]]
# type: (Dict[str,Any], Builder, str) -> Union[Dict[unicode, Any], List[Union[Dict[unicode, Any], unicode]]]
r = [] # type: List[Any]
if "outputBinding" in schema:
binding = schema["outputBinding"]
Expand Down
6 changes: 3 additions & 3 deletions cwltool/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def jshead(engineConfig, rootvars):
return u"\n".join(engineConfig + [u"var %s = %s;" % (k, json.dumps(v, indent=4)) for k, v in rootvars.items()])

def exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image):
# type: (Dict[str,Any], Dict[str,str], List[Dict[str, Any]], str, str, Any, bool) -> sandboxjs.JSON
# type: (Dict[str, Any], Dict[unicode, Union[Dict, List, unicode]], List[Dict[str, Any]], str, str, Any, bool) -> sandboxjs.JSON

if ex["engine"] == "https://w3id.org/cwl/cwl#JavascriptEngine":
engineConfig = [] # type: List[unicode]
Expand Down Expand Up @@ -126,7 +126,7 @@ def param_interpolate(ex, obj, strip=True):

def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
context=None, pull_image=True, timeout=None):
# type: (Any, Dict[str,str], List[Dict[str,Any]], str, str, Dict[str, Union[int, str]], Any, bool, int) -> Any
# type: (Union[dict, unicode], Dict[unicode, Union[Dict, List, unicode]], List[Dict[str, Any]], str, str, Dict[str, Union[int, str]], Any, bool, int) -> Any

runtime = resources.copy()
runtime["tmpdir"] = tmpdir
Expand All @@ -140,7 +140,7 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,

if isinstance(ex, dict) and "engine" in ex and "script" in ex:
return exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image)
if isinstance(ex, basestring):
if isinstance(ex, (str, unicode)):
for r in requirements:
if r["class"] == "InlineJavascriptRequirement":
return sandboxjs.interpolate(str(ex), jshead(r.get("expressionLib", []), rootvars),
Expand Down
2 changes: 1 addition & 1 deletion cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CommandLineJob(object):

def __init__(self): # type: () -> None
self.builder = None # type: Builder
self.joborder = None # type: Dict[str,str]
self.joborder = None # type: Dict[unicode, Union[Dict[unicode, Any], List, unicode]]
self.stdin = None # type: str
self.stdout = None # type: str
self.successCodes = None # type: Iterable[int]
Expand Down
19 changes: 11 additions & 8 deletions cwltool/load_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def fetch_document(argsworkflow):

def validate_document(document_loader, workflowobj, uri,
enable_dev=False, strict=True, preprocess_only=False):
# type: (Loader, Dict[unicode, Any], unicode, bool, bool, bool) -> Tuple[Loader, Names, Any, Dict[str, str], unicode]
# type: (Loader, Dict[unicode, Any], unicode, bool, bool, bool) -> Tuple[Loader, Names, Union[Dict[unicode, Any], List[Dict[unicode, Any]]], Dict[unicode, Any], unicode]
"""Validate a CWL document."""
jobobj = None
if "cwl:tool" in workflowobj:
Expand Down Expand Up @@ -83,31 +83,34 @@ def validate_document(document_loader, workflowobj, uri,

workflowobj["id"] = fileuri
processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
if not isinstance(processobj, (dict, list)):
raise validate.ValidationException("Workflow must be a dict or list.")

if not metadata:
if not isinstance(processobj, dict):
raise validate.ValidationException(
"Draft-2 workflows must be a dict.")
metadata = {"$namespaces": processobj.get("$namespaces", {}),
"$schemas": processobj.get("$schemas", []),
"cwlVersion": processobj["cwlVersion"]}

if preprocess_only:
return document_loader, avsc_names, processobj, metadata, uri

document_loader.validate_links(processobj)
schema.validate_doc(avsc_names, processobj, document_loader, strict)

if metadata.get("cwlVersion") != update.LATEST:
processobj = update.update(
processobj, document_loader, fileuri, enable_dev, metadata)

if jobobj:
metadata["cwl:defaults"] = jobobj
metadata[u"cwl:defaults"] = jobobj

return document_loader, avsc_names, processobj, metadata, uri


def make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool,
kwargs):
# type: (Loader, Names, Dict[str, Any], Dict[str, Any], unicode, Callable[..., Process], Dict[str, Any]) -> Process
def make_tool(document_loader, avsc_names, metadata, uri, makeTool, kwargs):
# type: (Loader, Names, Dict[unicode, Any], unicode, Callable[..., Process], Dict[str, Any]) -> Process
"""Make a Python CWL object."""
resolveduri = document_loader.resolve_ref(uri)[0]

Expand All @@ -121,7 +124,7 @@ def make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool,
urlparse.urldefrag(i["id"])[1] for i in resolveduri
if "id" in i))
else:
processobj = cast(Dict[str, Any], resolveduri)
processobj = resolveduri

kwargs = kwargs.copy()
kwargs.update({
Expand Down Expand Up @@ -149,5 +152,5 @@ def load_tool(argsworkflow, makeTool, kwargs=None,
document_loader, avsc_names, processobj, metadata, uri = validate_document(
document_loader, workflowobj, uri, enable_dev=enable_dev,
strict=strict)
return make_tool(document_loader, avsc_names, processobj, metadata, uri,
return make_tool(document_loader, avsc_names, metadata, uri,
makeTool, kwargs if kwargs else {})
Loading