Commit e99d687

Merge pull request #71 from common-workflow-language/cache-jobs

Cache jobs

2 parents (531358e + d32dd68), commit e99d687

File tree: 4 files changed, +135 / -40 lines changed

cwltool/draft2tool.py

Lines changed: 92 additions & 36 deletions
@@ -2,7 +2,7 @@
 import json
 import copy
 from .flatten import flatten
-import functools
+from functools import partial
 import os
 from .pathmapper import PathMapper, DockerPathMapper
 from .job import CommandLineJob
@@ -23,6 +23,8 @@
 import shellescape
 import errno
 from typing import Callable, Any, Union, Generator, cast
+import hashlib
+import shutil
 
 _logger = logging.getLogger("cwltool")

@@ -94,6 +96,20 @@ def revmap_file(builder, outdir, f):
     else:
         raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input file pass through." % (f["path"], builder.outdir))
 
+class CallbackJob(object):
+    def __init__(self, job, output_callback, cachebuilder, jobcache):
+        # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, str) -> None
+        self.job = job
+        self.output_callback = output_callback
+        self.cachebuilder = cachebuilder
+        self.outdir = jobcache
+
+    def run(self, **kwargs):
+        # type: (**Any) -> None
+        self.output_callback(self.job.collect_output_ports(self.job.tool["outputs"],
+                                                           self.cachebuilder, self.outdir),
+                             "success")
+
 
 class CommandLineTool(Process):
     def __init__(self, toolpath_object, **kwargs):
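(Illustrative note, not part of the diff.) CallbackJob is a lightweight stand-in that the job() generator yields on a cache hit: its run() re-collects the tool's output ports from the cached directory and reports "success", so the caller can treat cache hits and real CommandLineJob executions uniformly. A hypothetical driver loop, with made-up names (my_callback, joborder, input_basedir) and cache path:

    # Sketch only: both CommandLineJob and CallbackJob expose run(), so the
    # caller does not need to know whether the step was served from cache.
    def my_callback(outputs, processStatus):
        print(processStatus, outputs)

    for runnable in tool.job(joborder, input_basedir, my_callback,
                             cachedir="/tmp/cwl-cache"):
        runnable.run()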
@@ -116,34 +132,73 @@ def makePathMapper(self, reffiles, input_basedir, **kwargs):
             raise WorkflowException(u"Missing input file %s" % e)
 
     def job(self, joborder, input_basedir, output_callback, **kwargs):
-        # type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[CommandLineJob, None, None]
-        builder = self._init_job(joborder, input_basedir, **kwargs)
-
-        if self.tool["baseCommand"]:
-            for n, b in enumerate(aslist(self.tool["baseCommand"])):
-                builder.bindings.append({
-                    "position": [-1000000, n],
-                    "valueFrom": b
-                })
-
-        if self.tool.get("arguments"):
-            for i, a in enumerate(self.tool["arguments"]):
-                if isinstance(a, dict):
-                    a = copy.copy(a)
-                    if a.get("position"):
-                        a["position"] = [a["position"], i]
-                    else:
-                        a["position"] = [0, i]
-                    a["do_eval"] = a["valueFrom"]
-                    a["valueFrom"] = None
-                    builder.bindings.append(a)
+        # type: (Dict[str,str], str, Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
+
+        jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
+
+        if kwargs.get("cachedir"):
+            cacheargs = kwargs.copy()
+            cacheargs["outdir"] = "/out"
+            cacheargs["tmpdir"] = "/tmp"
+            cachebuilder = self._init_job(joborder, input_basedir, **cacheargs)
+            cachebuilder.pathmapper = PathMapper(set((f["path"] for f in cachebuilder.files)),
+                                                 input_basedir)
+
+            cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
+            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+            if docker_req and kwargs.get("use_container") is not False:
+                dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
+                cmdline = ["docker", "run", dockerimg] + cmdline
+            keydict = {"cmdline": cmdline}
+
+            for _,f in cachebuilder.pathmapper.items():
+                st = os.stat(f[0])
+                keydict[f[0]] = [st.st_size, int(st.st_mtime * 1000)]
+
+            interesting = {"DockerRequirement",
+                           "EnvVarRequirement",
+                           "CreateFileRequirement",
+                           "ShellCommandRequirement"}
+            for rh in (self.requirements, self.hints):
+                for r in reversed(rh):
+                    if r["class"] in interesting and r["class"] not in keydict:
+                        keydict[r["class"]] = r
+
+            keydictstr = json.dumps(keydict, separators=(',',':'), sort_keys=True)
+            cachekey = hashlib.md5(keydictstr).hexdigest()
+
+            _logger.debug("[job %s] keydictstr is %s -> %s", jobname, keydictstr, cachekey)
+
+            jobcache = os.path.join(kwargs["cachedir"], cachekey)
+            jobcachepending = jobcache + ".pending"
+
+            if os.path.isdir(jobcache) and not os.path.isfile(jobcachepending):
+                if docker_req and kwargs.get("use_container") is not False:
+                    cachebuilder.outdir = kwargs.get("docker_outdir") or "/var/spool/cwl"
                 else:
-                    builder.bindings.append({
-                        "position": [0, i],
-                        "valueFrom": a
-                    })
+                    cachebuilder.outdir = jobcache
 
-        builder.bindings.sort(key=lambda a: a["position"])
+                _logger.info("[job %s] Using cached output in %s", jobname, jobcache)
+                yield CallbackJob(self, output_callback, cachebuilder, jobcache)
+                return
+            else:
+                _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
+                shutil.rmtree(jobcache, True)
+                os.makedirs(jobcache)
+                kwargs["outdir"] = jobcache
+                open(jobcachepending, "w").close()
+                def rm_pending_output_callback(output_callback, jobcachepending,
+                                               outputs, processStatus):
+                    if processStatus == "success":
+                        os.remove(jobcachepending)
+                    output_callback(outputs, processStatus)
+                output_callback = cast(
+                    Callable[..., Any],  # known bug in mypy
+                    # https://github.com/python/mypy/issues/797
+                    partial(rm_pending_output_callback, output_callback,
+                            jobcachepending))
+
+        builder = self._init_job(joborder, input_basedir, **kwargs)
 
         reffiles = set((f["path"] for f in builder.files))
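The cache key computed above is an MD5 digest of a canonical JSON dump covering the concrete command line (including the "docker run <image>" prefix when a container is used), the size and millisecond mtime of every mapped input file, and any requirements or hints that can affect execution. A rough standalone sketch of the same idea (the function name and the .encode() call are illustrative; the diff hashes the Python 2 str directly):

    import hashlib
    import json
    import os

    def job_cache_key(cmdline, input_paths, relevant_requirements):
        # Command line first, then one entry per input file recording its
        # size and millisecond mtime, then execution-affecting requirements.
        keydict = {"cmdline": cmdline}
        for p in input_paths:
            st = os.stat(p)
            keydict[p] = [st.st_size, int(st.st_mtime * 1000)]
        for r in relevant_requirements:
            keydict.setdefault(r["class"], r)
        keydictstr = json.dumps(keydict, separators=(',', ':'), sort_keys=True)
        return hashlib.md5(keydictstr.encode("utf-8")).hexdigest()

Any change to the command line, to an input file's size or timestamp, or to a relevant requirement yields a different key and therefore a different <cachedir>/<key> directory; the <key>.pending sentinel keeps a half-finished job from being mistaken for a valid cache entry.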

@@ -157,7 +212,7 @@ def job(self, joborder, input_basedir, output_callback, **kwargs):
         j.permanentFailCodes = self.tool.get("permanentFailCodes")
         j.requirements = self.requirements
         j.hints = self.hints
-        j.name = uniquename(kwargs.get("name", str(id(j))))
+        j.name = jobname
 
         _logger.debug(u"[job %s] initializing from %s%s",
                       j.name,
@@ -195,7 +250,7 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
 
         _logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))
 
-        dockerReq, _ = self.get_requirement("DockerRequirement")
+        dockerReq = self.get_requirement("DockerRequirement")[0]
         if dockerReq and kwargs.get("use_container"):
             out_prefix = kwargs.get("tmp_outdir_prefix")
             j.outdir = kwargs.get("outdir") or tempfile.mkdtemp(prefix=out_prefix)
@@ -205,19 +260,19 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
             j.outdir = builder.outdir
             j.tmpdir = builder.tmpdir
 
-        createFiles, _ = self.get_requirement("CreateFileRequirement")
+        createFiles = self.get_requirement("CreateFileRequirement")[0]
         j.generatefiles = {}
         if createFiles:
            for t in createFiles["fileDef"]:
                j.generatefiles[builder.do_eval(t["filename"])] = copy.deepcopy(builder.do_eval(t["fileContent"]))
 
         j.environment = {}
-        evr, _ = self.get_requirement("EnvVarRequirement")
+        evr = self.get_requirement("EnvVarRequirement")[0]
         if evr:
             for t in evr["envDef"]:
                 j.environment[t["envName"]] = builder.do_eval(t["envValue"])
 
-        shellcmd, _ = self.get_requirement("ShellCommandRequirement")
+        shellcmd = self.get_requirement("ShellCommandRequirement")[0]
         if shellcmd:
             cmd = []  # type: List[str]
             for b in builder.bindings:
@@ -230,7 +285,8 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
             j.command_line = flatten(map(builder.generate_arg, builder.bindings))
 
         j.pathmapper = builder.pathmapper
-        j.collect_outputs = functools.partial(self.collect_output_ports, self.tool["outputs"], builder)
+        j.collect_outputs = partial(
+            self.collect_output_ports, self.tool["outputs"], builder)
         j.output_callback = output_callback
 
         yield j
@@ -246,9 +302,9 @@ def collect_output_ports(self, ports, builder, outdir):
                 _logger.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4))
             adjustFileObjs(ret, remove_hostfs)
             adjustFileObjs(ret,
-                    cast(Callable[[Any], Any],  # known bug in mypy
+                           cast(Callable[[Any], Any],  # known bug in mypy
                         # https://github.com/python/mypy/issues/797
-                        functools.partial(revmap_file, builder, outdir)))
+                        partial(revmap_file, builder, outdir)))
             adjustFileObjs(ret, remove_hostfs)
             validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
             return ret
@@ -273,7 +329,7 @@ def collect_output(self, schema, builder, outdir):
             binding = schema["outputBinding"]
             globpatterns = []  # type: List[str]
 
-            revmap = functools.partial(revmap_file, builder, outdir)
+            revmap = partial(revmap_file, builder, outdir)
 
             if "glob" in binding:
                 for gb in aslist(binding["glob"]):

cwltool/main.py

Lines changed: 14 additions & 3 deletions
@@ -3,6 +3,7 @@
 from . import draft2tool
 import argparse
 from schema_salad.ref_resolver import Loader
+import string
 import json
 import os
 import sys
@@ -21,6 +22,7 @@
 from . import update
 from .process import shortname, Process
 import rdflib
+import hashlib
 from .utils import aslist
 from typing import Union, Any, cast, Callable, Dict, Tuple, IO

@@ -61,10 +63,14 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
                         help="Path prefix for temporary directories",
                         default="tmp")
 
-    parser.add_argument("--tmp-outdir-prefix", type=str,
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--tmp-outdir-prefix", type=str,
                         help="Path prefix for intermediate output directories",
                         default="tmp")
 
+    exgroup.add_argument("--cachedir", type=str, default="",
+                         help="Directory to cache intermediate workflow outputs to avoid recomputing steps.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--rm-tmpdir", action="store_true", default=True,
                          help="Delete intermediate temporary directories (default)",
@@ -597,13 +603,17 @@ def main(argsl=None,
         if isinstance(job_order_object, int):
             return job_order_object
 
+        if args.cachedir:
+            args.cachedir = os.path.abspath(args.cachedir)
+            args.move_outputs = False
+
         try:
             out = executor(t, job_order_object[0],
                            job_order_object[1], args,
                            conformance_test=args.conformance_test,
                            dry_run=args.dry_run,
                            outdir=args.outdir,
-                           tmp_outdir_prefix=args.tmp_outdir_prefix,
+                           tmp_outdir_prefix=args.cachedir if args.cachedir else args.tmp_outdir_prefix,
                            use_container=args.use_container,
                            preserve_environment=args.preserve_environment,
                            pull_image=args.enable_pull,
@@ -614,7 +624,8 @@ def main(argsl=None,
                            makeTool=makeTool,
                            move_outputs=args.move_outputs,
                            select_resources=selectResources,
-                           eval_timeout=args.eval_timeout
+                           eval_timeout=args.eval_timeout,
+                           cachedir=args.cachedir
                            )
         # This is the workflow output, it needs to be written
         if out is not None:
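Usage note (not part of the diff): --cachedir and --tmp-outdir-prefix are now mutually exclusive. When a cache directory is supplied, for example

    cwltool --cachedir /path/to/cache workflow.cwl job.json

the path is made absolute, move_outputs is forced off so cached files stay where they were produced, the cache path is used as the prefix for intermediate output directories, and each step writes into <cachedir>/<md5 key> as described in draft2tool.py above.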

cwltool/pathmapper.py

Lines changed: 3 additions & 0 deletions
@@ -42,6 +42,9 @@ def mapper(self, src): # type: (str) -> Tuple[str,str]
     def files(self):  # type: () -> List[str]
         return self._pathmap.keys()
 
+    def items(self):  # type: () -> List[Tuple[str,Tuple[str,str]]]
+        return self._pathmap.items()
+
     def reversemap(self, target):  # type: (str) -> Tuple[str, str]
         for k, v in self._pathmap.items():
             if v[1] == target:
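A small illustration of the new accessor (hypothetical path; each value is assumed to be the (local path, target path) pair the mapper stores): the caching code in draft2tool.py iterates items() to stat every mapped input file while building the cache key.

    pm = PathMapper(set(["/data/input.txt"]), "/data")
    for src, (local_path, target) in pm.items():
        print(src, local_path, target)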

cwltool/process.py

Lines changed: 26 additions & 1 deletion
@@ -31,7 +31,6 @@
 _logger = logging.getLogger("cwltool")
 
 supportedProcessRequirements = ["DockerRequirement",
-                                "ExpressionEngineRequirement",
                                 "SchemaDefRequirement",
                                 "EnvVarRequirement",
                                 "CreateFileRequirement",
@@ -310,6 +309,32 @@ def _init_job(self, joborder, input_basedir, **kwargs):
 
         builder.bindings.extend(builder.bind_input(self.inputs_record_schema, builder.job))
 
+        if self.tool.get("baseCommand"):
+            for n, b in enumerate(aslist(self.tool["baseCommand"])):
+                builder.bindings.append({
+                    "position": [-1000000, n],
+                    "valueFrom": b
+                })
+
+        if self.tool.get("arguments"):
+            for i, a in enumerate(self.tool["arguments"]):
+                if isinstance(a, dict):
+                    a = copy.copy(a)
+                    if a.get("position"):
+                        a["position"] = [a["position"], i]
+                    else:
+                        a["position"] = [0, i]
+                    a["do_eval"] = a["valueFrom"]
+                    a["valueFrom"] = None
+                    builder.bindings.append(a)
+                else:
+                    builder.bindings.append({
+                        "position": [0, i],
+                        "valueFrom": a
+                    })
+
+        builder.bindings.sort(key=lambda a: a["position"])
+
         builder.resources = self.evalResources(builder, kwargs)
 
         return builder
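Moving the baseCommand/arguments binding construction from CommandLineTool.job() into _init_job() means the cache pre-builder in draft2tool.py sees a complete command line to hash. A tiny illustration (made-up bindings) of how the two-element "position" keys sort: baseCommand entries get a very low first element so they always precede arguments and input bindings.

    bindings = [
        {"position": [0, 1], "valueFrom": "--verbose"},    # an "arguments" entry
        {"position": [-1000000, 0], "valueFrom": "echo"},  # baseCommand word 0
        {"position": [2, 0], "valueFrom": "input.txt"},    # e.g. an input binding
    ]
    bindings.sort(key=lambda a: a["position"])
    # sorted order: echo, --verbose, input.txt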
