24
24
import errno
25
25
from typing import Callable , Any , Union , Generator , cast
26
26
import hashlib
27
+ import shutil
27
28
28
29
_logger = logging .getLogger ("cwltool" )
29
30
@@ -119,20 +120,30 @@ def makePathMapper(self, reffiles, input_basedir, **kwargs):
119
120
def job (self , joborder , input_basedir , output_callback , ** kwargs ):
120
121
# type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[CommandLineJob, None, None]
121
122
123
+ jobname = uniquename (kwargs .get ("name" , shortname (self .tool ["id" ])))
124
+
122
125
if kwargs .get ("cachedir" ):
123
126
cacheargs = kwargs .copy ()
124
127
cacheargs ["outdir" ] = "/out"
125
128
cacheargs ["tmpdir" ] = "/tmp"
126
129
cachebuilder = self ._init_job (joborder , input_basedir , ** cacheargs )
130
+ cachebuilder .pathmapper = PathMapper (set ((f ["path" ] for f in cachebuilder .files )),
131
+ input_basedir )
127
132
cmdline = flatten (map (cachebuilder .generate_arg , cachebuilder .bindings ))
128
133
(docker_req , docker_is_req ) = self .get_requirement ("DockerRequirement" )
129
134
if docker_req and kwargs .get ("use_container" ) is not False :
130
135
dockerimg = docker_req .get ("dockerImageId" ) or docker_req .get ("dockerPull" )
131
136
cmdline = ["docker" , "run" , dockerimg ] + cmdline
132
- cmdlinestr = json .dumps (cmdline )
133
- cachekey = hashlib .md5 (cmdlinestr ).hexdigest ()
137
+ keydict = {"cmdline" : cmdline }
138
+ for _ ,f in cachebuilder .pathmapper .items ():
139
+ st = os .stat (f [0 ])
140
+ keydict [f [0 ]] = [st .st_size , int (st .st_mtime * 1000 )]
141
+ keydictstr = json .dumps (keydict , separators = (',' ,':' ), sort_keys = True )
142
+ cachekey = hashlib .md5 (keydictstr ).hexdigest ()
143
+ _logger .debug ("[job %s] keydictstr is %s -> %s" , jobname , keydictstr , cachekey )
134
144
jobcache = os .path .join (kwargs ["cachedir" ], cachekey )
135
- if os .path .isdir (jobcache ):
145
+ jobcachepending = jobcache + ".pending"
146
+ if os .path .isdir (jobcache ) and not os .path .isfile (jobcachepending ):
136
147
class CallbackJob (object ):
137
148
def __init__ (self , job , output_callback , cachebuilder , jobcache ):
138
149
self .job = job
@@ -143,11 +154,20 @@ def run(self, **kwargs):
143
154
self .output_callback (self .job .collect_output_ports (self .job .tool ["outputs" ],
144
155
self .cachebuilder , self .outdir ),
145
156
"success" )
157
+ _logger .info ("[job %s] Using cached output in %s" , jobname , jobcache )
146
158
yield CallbackJob (self , output_callback , cachebuilder , jobcache )
147
159
return
148
160
else :
161
+ shutil .rmtree (jobcache , True )
149
162
os .makedirs (jobcache )
150
163
kwargs ["outdir" ] = jobcache
164
+ open (jobcachepending , "w" ).close ()
165
+ def rm_pending_output_callback (output_callback , jobcachepending ,
166
+ outputs , processStatus ):
167
+ if processStatus == "success" :
168
+ os .remove (jobcachepending )
169
+ output_callback (outputs , processStatus )
170
+ output_callback = functools .partial (rm_pending_output_callback , output_callback , jobcachepending )
151
171
152
172
builder = self ._init_job (joborder , input_basedir , ** kwargs )
153
173
@@ -163,7 +183,7 @@ def run(self, **kwargs):
163
183
j .permanentFailCodes = self .tool .get ("permanentFailCodes" )
164
184
j .requirements = self .requirements
165
185
j .hints = self .hints
166
- j .name = uniquename ( kwargs . get ( "name" , str ( id ( j ))))
186
+ j .name = jobname
167
187
168
188
_logger .debug (u"[job %s] initializing from %s%s" ,
169
189
j .name ,
0 commit comments