Skip to content

Commit 8dd0cf3

Browse files
author
Peter Amstutz
committed
Add --pack feature, combine CWL workflow into single graph with tool dependencies.
1 parent f1901b9 commit 8dd0cf3

File tree

3 files changed

+88
-10
lines changed

3 files changed

+88
-10
lines changed

cwltool/load_tool.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import schema_salad.schema as schema
1010
from . import update
1111
from . import process
12+
from .errors import WorkflowException
1213

1314
_logger = logging.getLogger("cwltool")
1415

@@ -54,6 +55,7 @@ def validate_document(document_loader, workflowobj, uri,
5455
if "cwlVersion" in workflowobj:
5556
workflowobj["cwlVersion"] = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", workflowobj["cwlVersion"])
5657
else:
58+
_logger.warn("No cwlVersion found, treating this file as draft-2.")
5759
workflowobj["cwlVersion"] = "draft-2"
5860

5961
if workflowobj["cwlVersion"] == "draft-2":
@@ -70,17 +72,17 @@ def validate_document(document_loader, workflowobj, uri,
7072
workflowobj["id"] = fileuri
7173
processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
7274

75+
if not metadata:
76+
metadata = {"$namespaces": processobj.get("$namespaces", {}),
77+
"$schemas": processobj.get("$schemas", []),
78+
"cwlVersion": processobj["cwlVersion"]}
79+
7380
if preprocess_only:
7481
return document_loader, avsc_names, processobj, metadata, uri
7582

7683
document_loader.validate_links(processobj)
7784
schema.validate_doc(avsc_names, processobj, document_loader, strict)
7885

79-
if not metadata:
80-
metadata = {"$namespaces": processobj.get("$namespaces", {}),
81-
"$schemas": processobj.get("$schemas", []),
82-
"cwlVersion": processobj["cwlVersion"]}
83-
8486
if metadata.get("cwlVersion") != update.latest:
8587
processobj = update.update(processobj, document_loader, fileuri, enable_dev, metadata)
8688

cwltool/main.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import os
99
import sys
1010
import logging
11+
import copy
1112
from . import workflow
1213
from .errors import WorkflowException
1314
import schema_salad.validate as validate
@@ -116,6 +117,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
116117
exgroup.add_argument("--print-pre", action="store_true", help="Print CWL document after preprocessing.")
117118
exgroup.add_argument("--print-deps", action="store_true", help="Print CWL document dependencies.")
118119
exgroup.add_argument("--print-input-deps", action="store_true", help="Print input object document dependencies.")
120+
exgroup.add_argument("--pack", action="store_true", help="Combine components into single document and print.")
119121
exgroup.add_argument("--version", action="store_true", help="Print version and exit")
120122

121123
exgroup = parser.add_mutually_exclusive_group()
@@ -411,6 +413,76 @@ def makeRelative(u):
411413

412414
stdout.write(json.dumps(deps, indent=4))
413415

416+
def flatten_deps(d, files):
    """Add the ``path`` of every dependency record in *d* to the set *files*.

    *d* may be a dict, a list of such structures, or anything else.  For a
    dict, its ``"path"`` value is added to *files* and, when the dict has a
    ``"secondaryFiles"`` entry, that entry is walked the same way.  Lists are
    walked element by element.  Values of any other type are ignored.

    *files* is mutated in place; nothing is returned.  A dict without a
    ``"path"`` key raises ``KeyError``.
    """
    if isinstance(d, dict):
        files.add(d["path"])
        if "secondaryFiles" in d:
            flatten_deps(d["secondaryFiles"], files)
        return

    if isinstance(d, list):
        for entry in d:
            flatten_deps(entry, files)
424+
425+
def find_run(d, runs):
    """Collect every string-valued ``run`` reference found anywhere in *d*.

    *d* is walked recursively: lists element by element, dicts value by
    value.  Whenever a dict has a ``"run"`` entry whose value is a string,
    that string is added to the set *runs*.  A ``"run"`` entry that is itself
    a dict or list is not added, but is still searched like any other value.
    Values that are neither lists nor dicts are ignored.

    *runs* is mutated in place; nothing is returned.
    """
    # ``basestring`` only exists on Python 2.  This tuple is (str, unicode)
    # there and (str, str) on Python 3, so the check works on both.
    text_types = (str, type(u""))

    if isinstance(d, list):
        for item in d:
            find_run(item, runs)
    elif isinstance(d, dict):
        run = d.get("run")
        if isinstance(run, text_types):
            runs.add(run)
        for value in d.values():
            find_run(value, runs)
434+
435+
def replace_refs(d, rewrite, stem, newstem):
    """Rewrite identifier strings inside *d* in place.

    *d* is walked recursively (lists and dicts; other values are ignored):

    * In a dict, a string-valued ``"run"`` entry is replaced by
      ``rewrite[<old value>]``.  A missing mapping raises ``KeyError``.
    * Any string value in a list or dict that starts with *stem* has that
      prefix replaced by *newstem*.

    Nothing is returned; *d* is mutated.
    """
    # ``basestring`` only exists on Python 2.  This tuple is (str, unicode)
    # there and (str, str) on Python 3, so the check works on both.
    text_types = (str, type(u""))

    if isinstance(d, list):
        for index, value in enumerate(d):
            if isinstance(value, text_types) and value.startswith(stem):
                d[index] = newstem + value[len(stem):]
            else:
                replace_refs(value, rewrite, stem, newstem)
    elif isinstance(d, dict):
        run = d.get("run")
        if isinstance(run, text_types):
            d["run"] = rewrite[run]
        # Iterate over a snapshot: values are reassigned while walking.
        for key, value in list(d.items()):
            if isinstance(value, text_types) and value.startswith(stem):
                d[key] = newstem + value[len(stem):]
            else:
                # Recursing into a non-matching string is a no-op, so only
                # containers are affected here.
                replace_refs(value, rewrite, stem, newstem)
449+
450+
def print_pack(document_loader, processobj, uri, metadata):
    """Return a JSON string combining a document and its ``run`` dependencies.

    The ``run`` dependencies of *processobj* are discovered with
    ``process.scandeps``; every string ``run`` reference in the documents
    found that way is collected.  Each collected reference, plus the main
    document (or each entry of *processobj* when it is a list), is given a
    new fragment identifier, deep-copied out of ``document_loader.idx`` and
    has its internal references rewritten with ``replace_refs``.

    When more than one object results, they are emitted under ``"$graph"``
    together with ``metadata["cwlVersion"]``; a single object is emitted on
    its own.
    """
    def loadref(b, u):
        return document_loader.resolve_ref(u, base_url=b)[0]

    deps = process.scandeps(uri, processobj, set(("run",)), set(), loadref)

    # Start with the main document, then add every file it depends on.
    fdeps = set((uri,))
    flatten_deps(deps, fdeps)

    runs = set()
    for dep_uri in fdeps:
        find_run(document_loader.idx[dep_uri], runs)

    # Map each original identifier to its identifier in the packed output.
    rewrite = {}
    if not isinstance(processobj, list):
        rewrite[uri] = "#main"
    else:
        for proc in processobj:
            proc_id = proc["id"]
            rewrite[proc_id] = "#" + shortname(proc_id)

    for run_ref in runs:
        rewrite[run_ref] = "#" + shortname(run_ref)

    packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"]}
    graph = packed["$graph"]
    for source_id, packed_id in rewrite.items():
        entry = copy.deepcopy(document_loader.idx[source_id])
        entry["id"] = packed_id
        entry["name"] = packed_id
        # Identifiers that already carry a fragment nest children with "/",
        # whole-file identifiers nest them with "#".
        if "#" in source_id:
            stem = source_id + "/"
        else:
            stem = source_id + "#"
        replace_refs(entry, rewrite, stem, packed_id + "/")
        graph.append(entry)

    if len(graph) > 1:
        return json.dumps(packed, indent=4)
    return json.dumps(graph[0], indent=4)
485+
414486
def versionstring():
415487
# type: () -> unicode
416488
pkg = pkg_resources.require("cwltool")
@@ -470,7 +542,11 @@ def main(argsl=None,
470542
workflowobj, uri,
471543
enable_dev=args.enable_dev,
472544
strict=args.strict,
473-
preprocess_only=args.print_pre)
545+
preprocess_only=args.print_pre or args.pack)
546+
547+
if args.pack:
548+
stdout.write(print_pack(document_loader, processobj, uri, metadata))
549+
return 0
474550

475551
if args.print_pre:
476552
stdout.write(json.dumps(processobj, indent=4))

cwltool/process.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def __init__(self, toolpath_object, **kwargs):
238238
self.formatgraph = kwargs["loader"].graph
239239

240240
checkRequirements(self.tool, supportedProcessRequirements)
241-
self.validate_hints(self.tool.get("hints", []), strict=kwargs.get("strict"))
241+
self.validate_hints(self.tool.get("hints", []), strict=kwargs.get("strict"), avsc_names=kwargs["avsc_names"])
242242

243243
self.schemaDefs = {} # type: Dict[str,Dict[unicode, Any]]
244244

@@ -401,12 +401,12 @@ def evalResources(self, builder, kwargs):
401401
"outdirSize": request["outdirMin"],
402402
}
403403

404-
def validate_hints(self, hints, strict):
404+
def validate_hints(self, hints, strict, avsc_names):
405405
# type: (List[Dict[str, Any]], bool, Any) -> None
406406
for r in hints:
407407
try:
408-
if self.names.get_name(r["class"], "") is not None:
409-
validate.validate_ex(self.names.get_name(r["class"], ""), r, strict=strict)
408+
if avsc_names.get_name(r["class"], "") is not None:
409+
validate.validate_ex(avsc_names.get_name(r["class"], ""), r, strict=strict)
410410
else:
411411
_logger.info(str(validate.ValidationException(
412412
u"Unknown hint %s" % (r["class"]))))

0 commit comments

Comments
 (0)