Skip to content

Packing refactor #177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ eggs/
.eggs/
*.egg-info/
*.egg
.tox/

# Editor Temps
.*.sw?
Expand Down
70 changes: 3 additions & 67 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from . import draft2tool
from .builder import adjustFileObjs, adjustDirObjs
from .stdfsaccess import StdFsAccess
from .pack import pack

_logger = logging.getLogger("cwltool")

Expand Down Expand Up @@ -495,74 +496,9 @@ def makeRelative(ob):

stdout.write(json.dumps(deps, indent=4))

def flatten_deps(d, files): # type: (Any, Set[Text]) -> None
if isinstance(d, list):
for s in d:
flatten_deps(s, files)
elif isinstance(d, dict):
files.add(d["location"])
if "secondaryFiles" in d:
flatten_deps(d["secondaryFiles"], files)

def find_run(d, runs): # type: (Any, Set[Text]) -> None
if isinstance(d, list):
for s in d:
find_run(s, runs)
elif isinstance(d, dict):
if "run" in d and isinstance(d["run"], (Text, Text)):
runs.add(d["run"])
for s in d.values():
find_run(s, runs)

def replace_refs(d, rewrite, stem, newstem):
# type: (Any, Dict[Text, Text], Text, Text) -> None
if isinstance(d, list):
for s,v in enumerate(d):
if isinstance(v, (str, Text)) and v.startswith(stem):
d[s] = newstem + v[len(stem):]
else:
replace_refs(v, rewrite, stem, newstem)
elif isinstance(d, dict):
if "run" in d and isinstance(d["run"], (str, Text)):
d["run"] = rewrite[d["run"]]
for s,v in d.items():
if isinstance(v, (str, Text)) and v.startswith(stem):
d[s] = newstem + v[len(stem):]
replace_refs(v, rewrite, stem, newstem)

def print_pack(document_loader, processobj, uri, metadata):
# type: (Loader, Any, Text, Dict[Text, Text]) -> Text
def loadref(b, u):
# type: (Text, Text) -> Union[Dict, List, Text]
return document_loader.resolve_ref(u, base_url=b)[0]
deps = scandeps(uri, processobj, set(("run",)), set(), loadref)

fdeps = set((uri,))
flatten_deps(deps, fdeps)

runs = set() # type: Set[Text]
for f in fdeps:
find_run(document_loader.idx[f], runs)

rewrite = {}
if isinstance(processobj, list):
for p in processobj:
rewrite[p["id"]] = "#" + shortname(p["id"])
else:
rewrite[uri] = "#main"

for r in runs:
rewrite[r] = "#" + shortname(r)

packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"]
} # type: Dict[Text, Any]
for r,v in rewrite.items():
dc = cast(Dict[Text, Any], copy.deepcopy(document_loader.idx[r]))
dc["id"] = v
dc["name"] = v
replace_refs(dc, rewrite, r+"/" if "#" in r else r+"#", v+"/")
packed["$graph"].append(dc)

# type: (Loader, Union[Dict[unicode, Any], List[Dict[unicode, Any]]], unicode, Dict[unicode, Any]) -> str
packed = pack(document_loader, processobj, uri, metadata)
if len(packed["$graph"]) > 1:
return json.dumps(packed, indent=4)
else:
Expand Down
84 changes: 84 additions & 0 deletions cwltool/pack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import copy
import json

from schema_salad.ref_resolver import Loader

from .process import scandeps, shortname

from typing import Union, Any, cast, Callable, Dict, Tuple, Type, IO, Text

def flatten_deps(d, files):  # type: (Any, Set[Text]) -> None
    """Recursively collect every "location" in a dependency tree into *files*.

    *d* is the structure produced by scandeps(): nested lists and File
    dicts, where each dict may carry its own "secondaryFiles" subtree.
    """
    if isinstance(d, dict):
        files.add(d["location"])
        secondary = d.get("secondaryFiles")
        if secondary is not None:
            flatten_deps(secondary, files)
    elif isinstance(d, list):
        for entry in d:
            flatten_deps(entry, files)

def find_run(d, runs):  # type: (Any, Set[Text]) -> None
    """Recursively collect all "run" fields that are string references.

    A "run" field may also be an embedded process (a dict); only plain
    string references (URIs to external documents) are added to *runs*.
    """
    if isinstance(d, list):
        for s in d:
            find_run(s, runs)
    elif isinstance(d, dict):
        # Text is `unicode` on Python 2 and `str` on Python 3, so this
        # check is portable; the bare `unicode` builtin previously used
        # here is a NameError on Python 3.
        if "run" in d and isinstance(d["run"], (str, Text)):
            runs.add(d["run"])
        for s in d.values():
            find_run(s, runs)

def replace_refs(d, rewrite, stem, newstem):
    # type: (Any, Dict[Text, Text], Text, Text) -> None
    """Rewrite identifiers in-place throughout a packed document.

    Any string value starting with *stem* is rebased onto *newstem*;
    string-valued "run" fields are replaced via the *rewrite* table.
    Mutates *d* in place and returns None.
    """
    if isinstance(d, list):
        for i, v in enumerate(d):
            # Text covers unicode (py2) / str (py3); the bare `unicode`
            # builtin previously used here breaks on Python 3.
            if isinstance(v, (str, Text)) and v.startswith(stem):
                d[i] = newstem + v[len(stem):]
            else:
                replace_refs(v, rewrite, stem, newstem)
    elif isinstance(d, dict):
        # NOTE(review): a leftover debug `raise` on "package" keys was
        # removed here; pack() deletes "package" before calling us, so
        # the assertion was unreachable in normal operation and its
        # profane message was unsuitable for a user-facing error.
        if "run" in d and isinstance(d["run"], (str, Text)):
            d["run"] = rewrite[d["run"]]
        for k, v in d.items():
            if isinstance(v, (str, Text)) and v.startswith(stem):
                d[k] = newstem + v[len(stem):]
            replace_refs(v, rewrite, stem, newstem)

def pack(document_loader, processobj, uri, metadata):
    # type: (Loader, Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Dict[Text, Text]) -> Dict[Text, Any]
    """Flatten a workflow and every tool it runs into one "$graph" document.

    Returns a new dict with a "cwlVersion" and a "$graph" list holding a
    deep copy of each referenced document, with all identifiers rewritten
    to short "#..." fragment ids.
    """

    def loadref(base, ref):
        # type: (Text, Text) -> Union[Dict, List, Text]
        return document_loader.resolve_ref(ref, base_url=base)[0]

    # Every document reachable from the top-level process via "run".
    document_ids = set((uri,))
    flatten_deps(scandeps(uri, processobj, set(("run",)), set(), loadref),
                 document_ids)

    # All string-valued "run" references found inside those documents.
    run_refs = set()  # type: Set[Text]
    for doc_id in document_ids:
        find_run(document_loader.idx[doc_id], run_refs)

    # Map each original id to its short form.  A single top-level
    # process becomes "#main"; a list keeps each member's shortname.
    rewrite = {}
    if isinstance(processobj, list):
        for p in processobj:
            rewrite[p["id"]] = "#" + shortname(p["id"])
    else:
        rewrite[uri] = "#main"
    for ref in run_refs:
        rewrite[ref] = "#" + shortname(ref)

    packed = {"$graph": [],
              "cwlVersion": metadata["cwlVersion"]}  # type: Dict[Text, Any]

    # Sorted for a deterministic $graph ordering.
    for original_id in sorted(rewrite.keys()):
        new_id = rewrite[original_id]
        dc = cast(Dict[Text, Any],
                  copy.deepcopy(document_loader.idx[original_id]))
        dc["id"] = new_id
        # Strip per-document bookkeeping that must not appear in the
        # packed graph (cwlVersion lives only at the top level).
        for field in ("name", "package", "cwlVersion"):
            if field in dc:
                del dc[field]
        # Rebase all intra-document references onto the new id.
        old_stem = original_id + "/" if "#" in original_id else original_id + "#"
        replace_refs(dc, rewrite, old_stem, new_id + "/")
        packed["$graph"].append(dc)

    return packed
30 changes: 30 additions & 0 deletions cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,17 @@ class Process(object):

def __init__(self, toolpath_object, **kwargs):
# type: (Dict[Text, Any], **Any) -> None
"""
kwargs:

metadata: tool document metadata
requirements: inherited requirements
hints: inherited hints
loader: schema_salad.ref_resolver.Loader used to load tool document
avsc_names: CWL Avro schema object used to validate document
strict: flag to determine strict validation (fail on unrecognized fields)
"""

self.metadata = kwargs.get("metadata", {}) # type: Dict[Text,Any]
self.names = None # type: avro.schema.Names

Expand All @@ -338,6 +349,9 @@ def __init__(self, toolpath_object, **kwargs):
if "loader" in kwargs:
self.formatgraph = kwargs["loader"].graph

self.doc_loader = kwargs["loader"]
self.doc_schema = kwargs["avsc_names"]

checkRequirements(self.tool, supportedProcessRequirements)
self.validate_hints(kwargs["avsc_names"], self.tool.get("hints", []),
strict=kwargs.get("strict"))
Expand Down Expand Up @@ -395,6 +409,22 @@ def __init__(self, toolpath_object, **kwargs):

def _init_job(self, joborder, **kwargs):
# type: (Dict[Text, Text], **Any) -> Builder
"""
kwargs:

eval_timeout: javascript evaluation timeout
use_container: do/don't use Docker when DockerRequirement hint provided
make_fs_access: make an FsAccess() object with given basedir
basedir: basedir for FsAccess
docker_outdir: output directory inside docker for this job
docker_tmpdir: tmpdir inside docker for this job
docker_stagedir: stagedir inside docker for this job
outdir: outdir on host for this job
tmpdir: tmpdir on host for this job
stagedir: stagedir on host for this job
select_resources: callback to select compute resources
"""

builder = Builder()
builder.job = cast(Dict[Text, Union[Dict[Text, Any], List,
Text]], copy.deepcopy(joborder))
Expand Down
1 change: 1 addition & 0 deletions cwltool/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ def __init__(self, toolpath_object, pos, **kwargs):
u"Tool definition %s failed validation:\n%s" %
(toolpath_object["run"], validate.indent(str(v))))

self.tool = toolpath_object = copy.deepcopy(toolpath_object)
for stepfield, toolfield in (("in", "inputs"), ("out", "outputs")):
toolpath_object[toolfield] = []
for step_entry in toolpath_object[stepfield]:
Expand Down
17 changes: 17 additions & 0 deletions tests/test_pack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import unittest
import json
from cwltool.load_tool import fetch_document, validate_document
import cwltool.pack
import cwltool.workflow

class TestPack(unittest.TestCase):
    def test_pack(self):
        """Packing tests/wf/revsort.cwl must match the golden packed file."""
        # Show full diffs on mismatch; the packed document is large.
        self.maxDiff = None

        document_loader, workflowobj, uri = fetch_document(
            "tests/wf/revsort.cwl")
        (document_loader, avsc_names, processobj,
         metadata, uri) = validate_document(document_loader, workflowobj, uri)

        packed = cwltool.pack.pack(document_loader, processobj, uri, metadata)

        with open("tests/wf/expect_packed.cwl") as f:
            expect_packed = json.load(f)
        self.assertEqual(expect_packed, packed)
125 changes: 125 additions & 0 deletions tests/wf/expect_packed.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
{
"cwlVersion": "v1.0",
"$graph": [
{
"inputs": [
{
"doc": "The input file to be processed.",
"type": "File",
"id": "#main/input"
},
{
"default": true,
"doc": "If true, reverse (decending) sort",
"type": "boolean",
"id": "#main/reverse_sort"
}
],
"doc": "Reverse the lines in a document, then sort those lines.",
"class": "Workflow",
"steps": [
{
"out": [
"#main/rev/output"
],
"run": "#revtool.cwl",
"id": "#main/rev",
"in": [
{
"source": "#main/input",
"id": "#main/rev/input"
}
]
},
{
"out": [
"#main/sorted/output"
],
"run": "#sorttool.cwl",
"id": "#main/sorted",
"in": [
{
"source": "#main/rev/output",
"id": "#main/sorted/input"
},
{
"source": "#main/reverse_sort",
"id": "#main/sorted/reverse"
}
]
}
],
"outputs": [
{
"outputSource": "#main/sorted/output",
"type": "File",
"id": "#main/output",
"doc": "The output with the lines reversed and sorted."
}
],
"id": "#main",
"hints": [
{
"dockerPull": "debian:8",
"class": "DockerRequirement"
}
]
},
{
"inputs": [
{
"inputBinding": {},
"type": "File",
"id": "#revtool.cwl/input"
}
],
"stdout": "output.txt",
"doc": "Reverse each line using the `rev` command",
"baseCommand": "rev",
"class": "CommandLineTool",
"outputs": [
{
"outputBinding": {
"glob": "output.txt"
},
"type": "File",
"id": "#revtool.cwl/output"
}
],
"id": "#revtool.cwl"
},
{
"inputs": [
{
"inputBinding": {
"position": 1,
"prefix": "--reverse"
},
"type": "boolean",
"id": "#sorttool.cwl/reverse"
},
{
"inputBinding": {
"position": 2
},
"type": "File",
"id": "#sorttool.cwl/input"
}
],
"stdout": "output.txt",
"doc": "Sort lines using the `sort` command",
"baseCommand": "sort",
"class": "CommandLineTool",
"outputs": [
{
"outputBinding": {
"glob": "output.txt"
},
"type": "File",
"id": "#sorttool.cwl/output"
}
],
"id": "#sorttool.cwl"
}
]
}
6 changes: 6 additions & 0 deletions tests/wf/revsort-job.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"input": {
"class": "File",
"location": "whale.txt"
}
}
Loading