2
2
import json
3
3
import copy
4
4
from .flatten import flatten
5
- import functools
5
+ from functools import partial
6
6
import os
7
7
from .pathmapper import PathMapper , DockerPathMapper
8
8
from .job import CommandLineJob
@@ -96,6 +96,20 @@ def revmap_file(builder, outdir, f):
96
96
else :
97
97
raise WorkflowException (u"Output file path %s must be within designated output directory (%s) or an input file pass through." % (f ["path" ], builder .outdir ))
98
98
99
+ class CallbackJob (object ):
100
+ def __init__ (self , job , output_callback , cachebuilder , jobcache ):
101
+ # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, str) -> None
102
+ self .job = job
103
+ self .output_callback = output_callback
104
+ self .cachebuilder = cachebuilder
105
+ self .outdir = jobcache
106
+
107
+ def run (self , ** kwargs ):
108
+ # type: (**Any) -> None
109
+ self .output_callback (self .job .collect_output_ports (self .job .tool ["outputs" ],
110
+ self .cachebuilder , self .outdir ),
111
+ "success" )
112
+
99
113
100
114
class CommandLineTool (Process ):
101
115
def __init__ (self , toolpath_object , ** kwargs ):
@@ -118,7 +132,7 @@ def makePathMapper(self, reffiles, input_basedir, **kwargs):
118
132
raise WorkflowException (u"Missing input file %s" % e )
119
133
120
134
def job (self , joborder , input_basedir , output_callback , ** kwargs ):
121
- # type: (Dict[str,str], str, Callable[[Any , Any], Any], **Any) -> Generator[CommandLineJob, None, None]
135
+ # type: (Dict[str,str], str, Callable[... , Any], **Any) -> Generator[Union[ CommandLineJob, CallbackJob] , None, None]
122
136
123
137
jobname = uniquename (kwargs .get ("name" , shortname (self .tool .get ("id" , "job" ))))
124
138
@@ -164,16 +178,6 @@ def job(self, joborder, input_basedir, output_callback, **kwargs):
164
178
else :
165
179
cachebuilder .outdir = jobcache
166
180
167
- class CallbackJob (object ):
168
- def __init__ (self , job , output_callback , cachebuilder , jobcache ):
169
- self .job = job
170
- self .output_callback = output_callback
171
- self .cachebuilder = cachebuilder
172
- self .outdir = jobcache
173
- def run (self , ** kwargs ):
174
- self .output_callback (self .job .collect_output_ports (self .job .tool ["outputs" ],
175
- self .cachebuilder , self .outdir ),
176
- "success" )
177
181
_logger .info ("[job %s] Using cached output in %s" , jobname , jobcache )
178
182
yield CallbackJob (self , output_callback , cachebuilder , jobcache )
179
183
return
@@ -188,7 +192,11 @@ def rm_pending_output_callback(output_callback, jobcachepending,
188
192
if processStatus == "success" :
189
193
os .remove (jobcachepending )
190
194
output_callback (outputs , processStatus )
191
- output_callback = functools .partial (rm_pending_output_callback , output_callback , jobcachepending )
195
+ output_callback = cast (
196
+ Callable [..., Any ], # known bug in mypy
197
+ # https://github.com/python/mypy/issues/797
198
+ partial (rm_pending_output_callback , output_callback ,
199
+ jobcachepending ))
192
200
193
201
builder = self ._init_job (joborder , input_basedir , ** kwargs )
194
202
@@ -242,7 +250,7 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
242
250
243
251
_logger .debug (u"[job %s] command line bindings is %s" , j .name , json .dumps (builder .bindings , indent = 4 ))
244
252
245
- dockerReq , _ = self .get_requirement ("DockerRequirement" )
253
+ dockerReq = self .get_requirement ("DockerRequirement" )[ 0 ]
246
254
if dockerReq and kwargs .get ("use_container" ):
247
255
out_prefix = kwargs .get ("tmp_outdir_prefix" )
248
256
j .outdir = kwargs .get ("outdir" ) or tempfile .mkdtemp (prefix = out_prefix )
@@ -252,19 +260,19 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
252
260
j .outdir = builder .outdir
253
261
j .tmpdir = builder .tmpdir
254
262
255
- createFiles , _ = self .get_requirement ("CreateFileRequirement" )
263
+ createFiles = self .get_requirement ("CreateFileRequirement" )[ 0 ]
256
264
j .generatefiles = {}
257
265
if createFiles :
258
266
for t in createFiles ["fileDef" ]:
259
267
j .generatefiles [builder .do_eval (t ["filename" ])] = copy .deepcopy (builder .do_eval (t ["fileContent" ]))
260
268
261
269
j .environment = {}
262
- evr , _ = self .get_requirement ("EnvVarRequirement" )
270
+ evr = self .get_requirement ("EnvVarRequirement" )[ 0 ]
263
271
if evr :
264
272
for t in evr ["envDef" ]:
265
273
j .environment [t ["envName" ]] = builder .do_eval (t ["envValue" ])
266
274
267
- shellcmd , _ = self .get_requirement ("ShellCommandRequirement" )
275
+ shellcmd = self .get_requirement ("ShellCommandRequirement" )[ 0 ]
268
276
if shellcmd :
269
277
cmd = [] # type: List[str]
270
278
for b in builder .bindings :
@@ -277,7 +285,8 @@ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
277
285
j .command_line = flatten (map (builder .generate_arg , builder .bindings ))
278
286
279
287
j .pathmapper = builder .pathmapper
280
- j .collect_outputs = functools .partial (self .collect_output_ports , self .tool ["outputs" ], builder )
288
+ j .collect_outputs = partial (
289
+ self .collect_output_ports , self .tool ["outputs" ], builder )
281
290
j .output_callback = output_callback
282
291
283
292
yield j
@@ -295,7 +304,7 @@ def collect_output_ports(self, ports, builder, outdir):
295
304
adjustFileObjs (ret ,
296
305
cast (Callable [[Any ], Any ], # known bug in mypy
297
306
# https://github.com/python/mypy/issues/797
298
- functools . partial (revmap_file , builder , outdir )))
307
+ partial (revmap_file , builder , outdir )))
299
308
adjustFileObjs (ret , remove_hostfs )
300
309
validate .validate_ex (self .names .get_name ("outputs_record_schema" , "" ), ret )
301
310
return ret
@@ -320,7 +329,7 @@ def collect_output(self, schema, builder, outdir):
320
329
binding = schema ["outputBinding" ]
321
330
globpatterns = [] # type: List[str]
322
331
323
- revmap = functools . partial (revmap_file , builder , outdir )
332
+ revmap = partial (revmap_file , builder , outdir )
324
333
325
334
if "glob" in binding :
326
335
for gb in aslist (binding ["glob" ]):
0 commit comments