Skip to content

Commit 6991437

Browse files
author
Peter Amstutz
committed
Implement targeted "overrides" of requirements on specific tools and steps.
1 parent cc343f0 commit 6991437

File tree

10 files changed

+98
-26
lines changed

10 files changed

+98
-26
lines changed

cwltool/builder.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(self): # type: () -> None
4545
self.build_job_script = None # type: Callable[[List[str]], Text]
4646
self.debug = False # type: bool
4747
self.mutation_manager = None # type: MutationManager
48+
self.tool_id = None # type: Text
4849

4950
# One of "no_listing", "shallow_listing", "deep_listing"
5051
# Will be default "no_listing" for CWL v1.1

cwltool/draft2tool.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def job(self,
8080
j.hints = self.hints
8181
j.outdir = None
8282
j.tmpdir = None
83+
j.tool_id = self.tool["id"]
8384

8485
yield j
8586

@@ -175,9 +176,9 @@ def __init__(self, toolpath_object, **kwargs):
175176
# type: (Dict[Text, Any], **Any) -> None
176177
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)
177178

178-
def makeJobRunner(self, use_container=True): # type: (Optional[bool]) -> JobBase
179-
dockerReq, _ = self.get_requirement("DockerRequirement")
180-
if dockerReq and use_container:
179+
def makeJobRunner(self, **kwargs): # type: (Optional[bool]) -> JobBase
180+
dockerReq, _ = self.get_requirement("DockerRequirement", kwargs)
181+
if dockerReq and kwargs.get("use_container", True):
181182
return DockerCommandLineJob()
182183
else:
183184
for t in reversed(self.requirements):
@@ -215,7 +216,7 @@ def job(self,
215216
("File", "Directory"), _check_adjust)
216217

217218
cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
218-
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
219+
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement", kwargs)
219220
if docker_req and kwargs.get("use_container") is not False:
220221
dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
221222
cmdline = ["docker", "run", dockerimg] + cmdline
@@ -276,7 +277,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
276277

277278
reffiles = copy.deepcopy(builder.files)
278279

279-
j = self.makeJobRunner(kwargs.get("use_container"))
280+
j = self.makeJobRunner(**kwargs)
280281
j.builder = builder
281282
j.joborder = builder.job
282283
j.stdin = None
@@ -288,6 +289,8 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
288289
j.requirements = self.requirements
289290
j.hints = self.hints
290291
j.name = jobname
292+
j.tool_id = self.tool["id"]
293+
j.overrides = kwargs.get("overrides", [])
291294

292295
if _logger.isEnabledFor(logging.DEBUG):
293296
_logger.debug(u"[job %s] initializing from %s%s",
@@ -332,7 +335,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
332335
if _logger.isEnabledFor(logging.DEBUG):
333336
_logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))
334337

335-
dockerReq = self.get_requirement("DockerRequirement")[0]
338+
dockerReq = self.get_requirement("DockerRequirement", kwargs)[0]
336339
if dockerReq and kwargs.get("use_container"):
337340
out_prefix = kwargs.get("tmp_outdir_prefix")
338341
j.outdir = kwargs.get("outdir") or tempfile.mkdtemp(prefix=out_prefix)
@@ -344,7 +347,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
344347
j.tmpdir = builder.tmpdir
345348
j.stagedir = builder.stagedir
346349

347-
initialWorkdir = self.get_requirement("InitialWorkDirRequirement")[0]
350+
initialWorkdir = self.get_requirement("InitialWorkDirRequirement", kwargs)[0]
348351
j.generatefiles = {"class": "Directory", "listing": [], "basename": ""}
349352
if initialWorkdir:
350353
ls = [] # type: List[Dict[Text, Any]]
@@ -380,7 +383,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
380383
ls[i] = t["entry"]
381384
j.generatefiles[u"listing"] = ls
382385

383-
inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")[0]
386+
inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement", kwargs)[0]
384387

385388
if inplaceUpdateReq:
386389
j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
@@ -414,12 +417,12 @@ def register_reader(f):
414417
adjustDirObjs(builder.bindings, register_reader)
415418

416419
j.environment = {}
417-
evr = self.get_requirement("EnvVarRequirement")[0]
420+
evr = self.get_requirement("EnvVarRequirement", kwargs)[0]
418421
if evr:
419422
for t in evr["envDef"]:
420423
j.environment[t["envName"]] = builder.do_eval(t["envValue"])
421424

422-
shellcmd = self.get_requirement("ShellCommandRequirement")[0]
425+
shellcmd = self.get_requirement("ShellCommandRequirement", kwargs)[0]
423426
if shellcmd:
424427
cmd = [] # type: List[Text]
425428
for b in builder.bindings:

cwltool/job.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ def __init__(self): # type: () -> None
127127
self.generatefiles = None # type: Dict[Text, Union[List[Dict[Text, Text]], Dict[Text, Text], Text]]
128128
self.stagedir = None # type: Text
129129
self.inplace_update = None # type: bool
130+
self.overrides = None # type: List[Dict[Text,Dict[Text, Text]]]
130131

131132
def _setup(self): # type: () -> None
132133
if not os.path.exists(self.outdir):
@@ -149,7 +150,7 @@ def _setup(self): # type: () -> None
149150
def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"):
150151
# type: (List[Text], MutableMapping[Text, Text], bool, Text) -> None
151152

152-
scr, _ = get_feature(self, "ShellCommandRequirement")
153+
scr, _ = get_feature(self, "ShellCommandRequirement", {})
153154

154155
shouldquote = None # type: Callable[[Any], Any]
155156
if scr:
@@ -320,7 +321,7 @@ def run(self, pull_image=True, rm_container=True,
320321
rm_tmpdir=True, move_outputs="move", **kwargs):
321322
# type: (bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None]
322323

323-
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
324+
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement", kwargs)
324325

325326
img_id = None
326327
env = None # type: MutableMapping[Text, Text]

cwltool/load_tool.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,27 @@
2626

2727
jobloaderctx = {
2828
u"cwl": "https://w3id.org/cwl/cwl#",
29+
u"cwltool": "http://commonwl.org/cwltool#",
2930
u"path": {u"@type": u"@id"},
3031
u"location": {u"@type": u"@id"},
3132
u"format": {u"@type": u"@id"},
3233
u"id": u"@id"
3334
}
3435

36+
overrides_ctx = {
37+
u"overrideTarget": {u"@type": u"@id"},
38+
u"cwltool": "http://commonwl.org/cwltool#",
39+
u"overrides": {
40+
"@id": "cwltool:overrides",
41+
"mapSubject": "overrideTarget",
42+
"mapPredicate": "override"
43+
},
44+
u"override": {
45+
"@id": "cwltool:override",
46+
"mapSubject": "class"
47+
}
48+
}
49+
3550
def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]]
3651
resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text]
3752
fetcher_constructor=None
@@ -276,3 +291,12 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
276291
strict=strict, fetcher_constructor=fetcher_constructor)
277292
return make_tool(document_loader, avsc_names, metadata, uri,
278293
makeTool, kwargs if kwargs else {})
294+
295+
def resolve_overrides(ov, baseurl):
296+
ovloader = Loader(overrides_ctx)
297+
ret, _ = ovloader.resolve_all(ov, baseurl)
298+
return ret["overrides"]
299+
300+
def load_overrides(ov, baseurl):
301+
ovloader = Loader(overrides_ctx)
302+
return resolve_overrides(ovloader.fetch(ov), baseurl)

cwltool/main.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from . import draft2tool, workflow
2323
from .cwlrdf import printdot, printrdf
2424
from .errors import UnsupportedRequirement, WorkflowException
25-
from .load_tool import fetch_document, make_tool, validate_document, jobloaderctx
25+
from .load_tool import fetch_document, make_tool, validate_document, jobloaderctx, resolve_overrides, load_overrides
2626
from .mutation import MutationManager
2727
from .pack import pack
2828
from .pathmapper import (adjustDirObjs, adjustFileObjs, get_listing,
@@ -196,6 +196,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
196196
default=False, help="Relax requirements on path names to permit "
197197
"spaces and hash characters.", dest="relax_path_checks")
198198

199+
parser.add_argument("--overrides", type=str,
200+
default=[], help="Read process requirement overrides from file.")
201+
199202
parser.add_argument("workflow", type=Text, nargs="?", default=None)
200203
parser.add_argument("job_order", nargs=argparse.REMAINDER)
201204

@@ -224,14 +227,18 @@ def output_callback(out, processStatus):
224227
output_dirs.add(kwargs["outdir"])
225228
kwargs["mutation_manager"] = MutationManager()
226229

227-
jobReqs = None
230+
jobReqs = []
228231
if "cwl:requirements" in job_order_object:
229232
jobReqs = job_order_object["cwl:requirements"]
233+
del job_order_object["cwl:requirements"]
230234
elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
231235
jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
232-
if jobReqs:
233-
for req in jobReqs:
234-
t.requirements.append(req)
236+
237+
if "http://commonwl.org/cwltool#overrides" in job_order_object:
238+
kwargs["overrides"] = resolve_overrides(job_order_object, t.tool["id"])
239+
del job_order_object["http://commonwl.org/cwltool#overrides"]
240+
241+
kwargs["requirements"] = jobReqs
235242

236243
if kwargs.get("default_container"):
237244
t.requirements.insert(0, {
@@ -644,7 +651,8 @@ def main(argsl=None, # type: List[str]
644651
'relax_path_checks': False,
645652
'validate': False,
646653
'enable_ga4gh_tool_registry': False,
647-
'ga4gh_tool_registries': []
654+
'ga4gh_tool_registries': [],
655+
'overrides': []
648656
}.iteritems():
649657
if not hasattr(args, k):
650658
setattr(args, k, v)
@@ -775,6 +783,9 @@ def main(argsl=None, # type: List[str]
775783
if isinstance(job_order_object, int):
776784
return job_order_object
777785

786+
if args.overrides:
787+
args.overrides = load_overrides(file_uri(os.path.abspath(args.overrides)), tool.tool["id"])
788+
778789
try:
779790
setattr(args, 'basedir', job_order_object[1])
780791
del args.workflow
@@ -830,8 +841,8 @@ def locToPath(p):
830841

831842
finally:
832843
_logger.removeHandler(stderr_handler)
833-
_logger.addHandler(defaultStreamHandler)
834-
844+
_logger.addHandler(defaultStreamHandler
845+
)
835846

836847
if __name__ == "__main__":
837848
sys.exit(main(sys.argv[1:]))

cwltool/process.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,8 @@ def __init__(self, toolpath_object, **kwargs):
426426
self.requirements = kwargs.get("requirements", []) + self.tool.get("requirements", [])
427427
self.hints = kwargs.get("hints", []) + self.tool.get("hints", [])
428428
self.formatgraph = None # type: Graph
429+
self.tool_id = self.tool["id"]
430+
429431
if "loader" in kwargs:
430432
self.formatgraph = kwargs["loader"].graph
431433

@@ -532,11 +534,11 @@ def _init_job(self, joborder, **kwargs):
532534
builder.debug = kwargs.get("debug")
533535
builder.mutation_manager = kwargs.get("mutation_manager")
534536

535-
dockerReq, is_req = self.get_requirement("DockerRequirement")
537+
dockerReq, is_req = self.get_requirement("DockerRequirement", kwargs)
536538
builder.make_fs_access = kwargs.get("make_fs_access") or StdFsAccess
537539
builder.fs_access = builder.make_fs_access(kwargs["basedir"])
538540

539-
loadListingReq, _ = self.get_requirement("http://commonwl.org/cwltool#LoadListingRequirement")
541+
loadListingReq, _ = self.get_requirement("http://commonwl.org/cwltool#LoadListingRequirement", kwargs)
540542
if loadListingReq:
541543
builder.loadListing = loadListingReq.get("loadListing")
542544

@@ -602,7 +604,7 @@ def _init_job(self, joborder, **kwargs):
602604

603605
def evalResources(self, builder, kwargs):
604606
# type: (Builder, Dict[AnyStr, Any]) -> Dict[Text, Union[int, Text]]
605-
resourceReq, _ = self.get_requirement("ResourceRequirement")
607+
resourceReq, _ = self.get_requirement("ResourceRequirement", kwargs)
606608
if resourceReq is None:
607609
resourceReq = {}
608610
request = {
@@ -655,8 +657,8 @@ def validate_hints(self, avsc_names, hints, strict):
655657
else:
656658
_logger.info(sl.makeError(u"Unknown hint %s" % (r["class"])))
657659

658-
def get_requirement(self, feature): # type: (Any) -> Tuple[Any, bool]
659-
return get_feature(self, feature)
660+
def get_requirement(self, feature, kwargs={}): # type: (Any) -> Tuple[Any, bool]
661+
return get_feature(self, feature, kwargs)
660662

661663
def visit(self, op): # type: (Callable[[Dict[Text, Any]], None]) -> None
662664
op(self.tool)

cwltool/utils.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ def aslist(l): # type: (Any) -> List[Any]
1010
return [l]
1111

1212

13-
def get_feature(self, feature): # type: (Any, Any) -> Tuple[Any, bool]
13+
def get_feature(self, feature, kwargs): # type: (Any, Any, Any) -> Tuple[Any, bool]
14+
for ov in reversed(kwargs.get("overrides", [])):
15+
if ov.get("overrideTarget") == self.tool_id:
16+
for t in ov.get("override", []):
17+
if t["class"] == feature:
18+
return (t, True)
1419
for t in reversed(self.requirements):
1520
if t["class"] == feature:
1621
return (t, True)

tests/test_ext.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,10 @@ def test_updatedir_inplace(self):
141141
finally:
142142
shutil.rmtree(tmp)
143143
shutil.rmtree(out)
144+
145+
class TestOverride(unittest.TestCase):
146+
def test_job_override(self):
147+
self.assertEquals(main([get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-ovr-job.json')]), 0)
148+
149+
def test_cmdline_override(self):
150+
self.assertEquals(main(["--overrides", "overrides.json", get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-job.json')]), 0)

tests/wf/overrides.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"cwltool:overrides": {
2+
"revtool.cwl": [{
3+
"class": "DockerRequirement",
4+
"dockerPull": "ubuntu:14.04"
5+
}]
6+
}

tests/wf/revsort-ovr-job.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"input": {
3+
"class": "File",
4+
"location": "whale.txt"
5+
},
6+
"cwltool:overrides": {
7+
"revtool.cwl": [{
8+
"class": "DockerRequirement",
9+
"dockerPull": "ubuntu:14.04"
10+
}]
11+
}
12+
}

0 commit comments

Comments
 (0)