Skip to content

Commit f594c5b

Browse files
author
Peter Amstutz
committed
Fill in args with defaults when using a custom argument parser. Pass basedir
through kwargs instead of as a separate option.
1 parent ad3150f commit f594c5b

File tree

7 files changed

+78
-60
lines changed

7 files changed

+78
-60
lines changed

cwltool/draft2tool.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ def run(self, **kwargs): # type: (**Any) -> None
5252
_logger.warn(u"Failed to evaluate expression:\n%s", e, exc_info=(e if kwargs.get('debug') else False))
5353
self.output_callback({}, "permanentFail")
5454

55-
def job(self, joborder, input_basedir, output_callback, **kwargs):
55+
def job(self, joborder, output_callback, **kwargs):
5656
# type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[ExpressionTool.ExpressionJob, None, None]
57-
builder = self._init_job(joborder, input_basedir, **kwargs)
57+
builder = self._init_job(joborder, **kwargs)
5858

5959
j = ExpressionTool.ExpressionJob()
6060
j.builder = builder
@@ -119,19 +119,19 @@ def __init__(self, toolpath_object, **kwargs):
119119
def makeJobRunner(self): # type: () -> CommandLineJob
120120
return CommandLineJob()
121121

122-
def makePathMapper(self, reffiles, input_basedir, **kwargs):
122+
def makePathMapper(self, reffiles, **kwargs):
123123
# type: (Set[str], str, **Any) -> PathMapper
124124
dockerReq, _ = self.get_requirement("DockerRequirement")
125125
try:
126126
if dockerReq and kwargs.get("use_container"):
127-
return DockerPathMapper(reffiles, input_basedir)
127+
return DockerPathMapper(reffiles, kwargs["basedir"])
128128
else:
129-
return PathMapper(reffiles, input_basedir)
129+
return PathMapper(reffiles, kwargs["basedir"])
130130
except OSError as e:
131131
if e.errno == errno.ENOENT:
132132
raise WorkflowException(u"Missing input file %s" % e)
133133

134-
def job(self, joborder, input_basedir, output_callback, **kwargs):
134+
def job(self, joborder, output_callback, **kwargs):
135135
# type: (Dict[str,str], str, Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
136136

137137
jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
@@ -140,9 +140,9 @@ def job(self, joborder, input_basedir, output_callback, **kwargs):
140140
cacheargs = kwargs.copy()
141141
cacheargs["outdir"] = "/out"
142142
cacheargs["tmpdir"] = "/tmp"
143-
cachebuilder = self._init_job(joborder, input_basedir, **cacheargs)
143+
cachebuilder = self._init_job(joborder, **cacheargs)
144144
cachebuilder.pathmapper = PathMapper(set((f["path"] for f in cachebuilder.files)),
145-
input_basedir)
145+
kwargs["basedir"])
146146

147147
cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
148148
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
@@ -198,7 +198,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,
198198
partial(rm_pending_output_callback, output_callback,
199199
jobcachepending))
200200

201-
builder = self._init_job(joborder, input_basedir, **kwargs)
201+
builder = self._init_job(joborder, **kwargs)
202202

203203
reffiles = set((f["path"] for f in builder.files))
204204

@@ -232,7 +232,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,
232232
if os.path.isabs(j.stdout) or ".." in j.stdout:
233233
raise validate.ValidationException("stdout must be a relative path")
234234

235-
builder.pathmapper = self.makePathMapper(reffiles, input_basedir, **kwargs)
235+
builder.pathmapper = self.makePathMapper(reffiles, **kwargs)
236236
builder.requirements = j.requirements
237237

238238
# map files to assigned path inside a container. We need to also explicitly

cwltool/factory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ def __init__(self, t, factory): # type: (Process, Factory) -> None
1313
self.factory = factory
1414

1515
def __call__(self, **kwargs): # type: (**Any) -> Union[str,Dict[str,str]]
16-
return self.factory.executor(self.t, kwargs, os.getcwd(), None, **self.factory.execkwargs)
16+
execkwargs = self.factory.execkwargs.copy()
17+
execkwargs["basedir"] = os.getcwd()
18+
return self.factory.executor(self.t, kwargs, **execkwargs)
1719

1820
class Factory(object):
1921
def __init__(self, makeTool=workflow.defaultMakeTool,

cwltool/job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
149149
_logger.info(u"[job %s] %s$ %s%s%s",
150150
self.name,
151151
self.outdir,
152-
" ".join([shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg) for arg in (runtime + self.command_line)]),
152+
" \\\n ".join([shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg) for arg in (runtime + self.command_line)]),
153153
u' < %s' % (self.stdin) if self.stdin else '',
154154
u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '')
155155

cwltool/main.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
148148
return parser
149149

150150

151-
def single_job_executor(t, job_order, input_basedir, args, **kwargs):
151+
def single_job_executor(t, job_order_object, **kwargs):
152152
# type: (Process, Dict[str,Any], str, argparse.Namespace,**Any) -> Union[str,Dict[str,str]]
153153
final_output = []
154154
final_status = []
@@ -161,15 +161,17 @@ def output_callback(out, processStatus):
161161
_logger.warn(u"Final process status is %s", processStatus)
162162
final_output.append(out)
163163

164+
if "basedir" not in kwargs:
165+
raise WorkflowException("Must provide 'basedir' in kwargs")
166+
164167
if kwargs.get("outdir"):
165168
pass
166169
elif kwargs.get("dry_run"):
167170
kwargs["outdir"] = "/tmp"
168171
else:
169172
kwargs["outdir"] = tempfile.mkdtemp()
170173

171-
jobiter = t.job(job_order,
172-
input_basedir,
174+
jobiter = t.job(job_order_object,
173175
output_callback,
174176
**kwargs)
175177

@@ -447,6 +449,31 @@ def main(argsl=None,
447449

448450
args = parser.parse_args(argsl)
449451

452+
# If caller provided a custom parser, it may be that not every option is
453+
# set, so fill in no-op defaults to avoid crashing when dereferencing
454+
# them in args.
455+
for k,v in {'print_deps': False,
456+
'print_pre': False,
457+
'print_rdf': False,
458+
'print_dot': False,
459+
'relative_deps': False,
460+
'tmp_outdir_prefix': 'tmp',
461+
'tmpdir_prefix': 'tmp',
462+
'print_input_deps': False,
463+
'cachedir': None,
464+
'quiet': False,
465+
'debug': False,
466+
'version': False,
467+
'enable_dev': False,
468+
'strict': True,
469+
'rdf_serializer': None,
470+
'basedir': None,
471+
'tool_help': False,
472+
'workflow': None,
473+
'job_order': None}.iteritems():
474+
if not hasattr(args, k):
475+
setattr(args, k, v)
476+
450477
if args.quiet:
451478
_logger.setLevel(logging.WARN)
452479
if args.debug:
@@ -513,7 +540,7 @@ def main(argsl=None,
513540
# Use user defined temp directory (if it exists)
514541
args.tmp_outdir_prefix = os.path.abspath(args.tmp_outdir_prefix)
515542
if not os.path.exists(args.tmp_outdir_prefix):
516-
_logger.error("Intermediate output directory prefix doesn't exist, reverting to default")
543+
_logger.error("Intermediate output directory prefix doesn't exist.")
517544
return 1
518545

519546
if args.tmpdir_prefix != 'tmp':
@@ -536,25 +563,14 @@ def main(argsl=None,
536563
args.move_outputs = False
537564

538565
try:
566+
args.tmp_outdir_prefix = args.cachedir if args.cachedir else args.tmp_outdir_prefix
567+
args.basedir = job_order_object[1]
568+
del args.workflow
569+
del args.job_order
539570
out = executor(tool, job_order_object[0],
540-
job_order_object[1], args,
541-
conformance_test=args.conformance_test,
542-
dry_run=args.dry_run,
543-
outdir=args.outdir,
544-
tmp_outdir_prefix=args.cachedir if args.cachedir else args.tmp_outdir_prefix,
545-
use_container=args.use_container,
546-
preserve_environment=args.preserve_environment,
547-
pull_image=args.enable_pull,
548-
rm_container=args.rm_container,
549-
tmpdir_prefix=args.tmpdir_prefix,
550-
enable_net=args.enable_net,
551-
rm_tmpdir=args.rm_tmpdir,
552571
makeTool=makeTool,
553-
move_outputs=args.move_outputs,
554572
select_resources=selectResources,
555-
eval_timeout=args.eval_timeout,
556-
cachedir=args.cachedir
557-
)
573+
**vars(args))
558574
# This is the workflow output, it needs to be written
559575
if out is not None:
560576
if isinstance(out, basestring):

cwltool/process.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def __init__(self, toolpath_object, **kwargs):
297297
raise validate.ValidationException(u"Got error `%s` while prcoessing outputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.outputs_record_schema, indent=4)))
298298

299299

300-
def _init_job(self, joborder, input_basedir, **kwargs):
300+
def _init_job(self, joborder, **kwargs):
301301
# type: (Dict[str, str], str, **Any) -> Builder
302302
builder = Builder()
303303
builder.job = copy.deepcopy(joborder)
@@ -326,7 +326,7 @@ def _init_job(self, joborder, input_basedir, **kwargs):
326326
builder.outdir = kwargs.get("outdir") or tempfile.mkdtemp()
327327
builder.tmpdir = kwargs.get("tmpdir") or tempfile.mkdtemp()
328328

329-
builder.fs_access = kwargs.get("fs_access") or StdFsAccess(input_basedir)
329+
builder.fs_access = kwargs.get("fs_access") or StdFsAccess(kwargs["basedir"])
330330

331331
if self.formatgraph:
332332
for i in self.tool["inputs"]:
@@ -426,7 +426,7 @@ def visit(self, op):
426426
op(self.tool)
427427

428428
@abc.abstractmethod
429-
def job(self, job_order, input_basedir, output_callbacks, **kwargs):
429+
def job(self, job_order, output_callbacks, **kwargs):
430430
# type: (Dict[str, str], str, Callable[[Any, Any], Any], **Any) -> Generator[Any, None, None]
431431
return None
432432

cwltool/workflow.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ def __init__(self, step): # type: (Any) -> None
145145
self.iterable = None # type: Iterable
146146
self.name = uniquename(u"step %s" % shortname(self.id))
147147

148-
def job(self, joborder, basedir, output_callback, **kwargs):
148+
def job(self, joborder, output_callback, **kwargs):
149149
# type: (Dict[str,str], str, functools.partial[None], **Any) -> Generator
150150
kwargs["part_of"] = self.name
151151
kwargs["name"] = shortname(self.id)
152-
for j in self.step.job(joborder, basedir, output_callback, **kwargs):
152+
for j in self.step.job(joborder, output_callback, **kwargs):
153153
yield j
154154

155155

@@ -197,7 +197,7 @@ def receive_output(self, step, outputparms, jobout, processStatus):
197197

198198
step.completed = True
199199

200-
def try_make_job(self, step, basedir, **kwargs):
200+
def try_make_job(self, step, **kwargs):
201201
# type: (WorkflowJobStep, str, **Any) -> Generator
202202
inputparms = step.tool["inputs"]
203203
outputparms = step.tool["outputs"]
@@ -241,18 +241,18 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any
241241
for k,v in inputobj.items()}
242242

243243
if method == "dotproduct" or method is None:
244-
jobs = dotproduct_scatter(step, inputobj, basedir, scatter,
244+
jobs = dotproduct_scatter(step, inputobj, scatter,
245245
cast( # known bug with mypy
246246
# https://github.com/python/mypy/issues/797
247247
Callable[[Any], Any],callback), **kwargs)
248248
elif method == "nested_crossproduct":
249-
jobs = nested_crossproduct_scatter(step, inputobj, basedir,
249+
jobs = nested_crossproduct_scatter(step, inputobj,
250250
scatter, cast(Callable[[Any], Any], callback),
251251
# known bug in mypy
252252
# https://github.com/python/mypy/issues/797
253253
**kwargs)
254254
elif method == "flat_crossproduct":
255-
jobs = flat_crossproduct_scatter(step, inputobj, basedir,
255+
jobs = flat_crossproduct_scatter(step, inputobj,
256256
scatter,
257257
cast(Callable[[Any], Any],
258258
# known bug in mypy
@@ -262,7 +262,7 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any
262262
_logger.debug(u"[job %s] job input %s", step.name, json.dumps(inputobj, indent=4))
263263
inputobj = {k: valueFromFunc(k, v) for k,v in inputobj.items()}
264264
_logger.debug(u"[job %s] evaluated job input to %s", step.name, json.dumps(inputobj, indent=4))
265-
jobs = step.job(inputobj, basedir, callback, **kwargs)
265+
jobs = step.job(inputobj, callback, **kwargs)
266266

267267
step.submitted = True
268268

@@ -278,7 +278,7 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any
278278
def run(self, **kwargs):
279279
_logger.debug(u"[%s] workflow starting", self.name)
280280

281-
def job(self, joborder, basedir, output_callback, move_outputs=True, **kwargs):
281+
def job(self, joborder, output_callback, move_outputs=True, **kwargs):
282282
# type: (Dict[str,str], str, Callable[[Any, Any], Any], bool, **Any) -> Generator[WorkflowJob, None, None]
283283
self.state = {}
284284
self.processStatus = "success"
@@ -307,7 +307,7 @@ def job(self, joborder, basedir, output_callback, move_outputs=True, **kwargs):
307307

308308
for step in self.steps:
309309
if not step.submitted:
310-
step.iterable = self.try_make_job(step, basedir, **kwargs)
310+
step.iterable = self.try_make_job(step, **kwargs)
311311

312312
if step.iterable:
313313
for newjob in step.iterable:
@@ -387,15 +387,15 @@ def __init__(self, toolpath_object, **kwargs):
387387

388388
# TODO: statically validate data links instead of doing it at runtime.
389389

390-
def job(self, joborder, basedir, output_callback, **kwargs):
390+
def job(self, joborder, output_callback, **kwargs):
391391
# type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[WorkflowJob, None, None]
392-
builder = self._init_job(joborder, basedir, **kwargs)
392+
builder = self._init_job(joborder, **kwargs)
393393
wj = WorkflowJob(self, **kwargs)
394394
yield wj
395395

396396
kwargs["part_of"] = u"workflow %s" % wj.name
397397

398-
for w in wj.job(builder.job, basedir, output_callback, **kwargs):
398+
for w in wj.job(builder.job, output_callback, **kwargs):
399399
yield w
400400

401401
def visit(self, op):
@@ -497,7 +497,7 @@ def receive_output(self, output_callback, jobout, processStatus):
497497
processStatus = "permanentFail"
498498
output_callback(output, processStatus)
499499

500-
def job(self, joborder, basedir, output_callback, **kwargs):
500+
def job(self, joborder, output_callback, **kwargs):
501501
# type: (Dict[str, Any], str, Callable[...,Any], **Any) -> Generator
502502
for i in self.tool["inputs"]:
503503
p = i["id"]
@@ -509,7 +509,7 @@ def job(self, joborder, basedir, output_callback, **kwargs):
509509
kwargs["hints"] = kwargs.get("hints", []) + self.tool.get("hints", [])
510510

511511
try:
512-
for t in self.embedded_tool.job(joborder, basedir,
512+
for t in self.embedded_tool.job(joborder,
513513
functools.partial(self.receive_output, output_callback),
514514
**kwargs):
515515
yield t
@@ -554,7 +554,7 @@ def setTotal(self, total): # type: (int) -> None
554554
self.output_callback(self.dest, self.processStatus)
555555

556556

557-
def dotproduct_scatter(process, joborder, basedir, scatter_keys, output_callback, **kwargs):
557+
def dotproduct_scatter(process, joborder, scatter_keys, output_callback, **kwargs):
558558
# type: (WorkflowJobStep, Dict[str, Any], str, List[str], Callable[..., Any], **Any) -> Generator[WorkflowJob, None, None]
559559
l = None
560560
for s in scatter_keys:
@@ -574,13 +574,13 @@ def dotproduct_scatter(process, joborder, basedir, scatter_keys, output_callback
574574
for s in scatter_keys:
575575
jo[s] = kwargs["valueFrom"](s, joborder[s][n])
576576

577-
for j in process.job(jo, basedir, functools.partial(rc.receive_scatter_output, n), **kwargs):
577+
for j in process.job(jo, functools.partial(rc.receive_scatter_output, n), **kwargs):
578578
yield j
579579

580580
rc.setTotal(l)
581581

582582

583-
def nested_crossproduct_scatter(process, joborder, basedir, scatter_keys, output_callback, **kwargs):
583+
def nested_crossproduct_scatter(process, joborder, scatter_keys, output_callback, **kwargs):
584584
# type: (WorkflowJobStep, Dict[str, Any], str, List[str], Callable[..., Any], **Any) -> Generator[WorkflowJob, None, None]
585585
scatter_key = scatter_keys[0]
586586
l = len(joborder[scatter_key])
@@ -595,10 +595,10 @@ def nested_crossproduct_scatter(process, joborder, basedir, scatter_keys, output
595595
jo[scatter_key] = kwargs["valueFrom"](scatter_key, joborder[scatter_key][n])
596596

597597
if len(scatter_keys) == 1:
598-
for j in process.job(jo, basedir, functools.partial(rc.receive_scatter_output, n), **kwargs):
598+
for j in process.job(jo, functools.partial(rc.receive_scatter_output, n), **kwargs):
599599
yield j
600600
else:
601-
for j in nested_crossproduct_scatter(process, jo, basedir,
601+
for j in nested_crossproduct_scatter(process, jo,
602602
scatter_keys[1:], cast( # known bug with mypy
603603
# https://github.com/python/mypy/issues/797
604604
Callable[[Any], Any],
@@ -622,7 +622,7 @@ def crossproduct_size(joborder, scatter_keys):
622622
sum += crossproduct_size(joborder, scatter_keys[1:])
623623
return sum
624624

625-
def flat_crossproduct_scatter(process, joborder, basedir, scatter_keys, output_callback, startindex, **kwargs):
625+
def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, startindex, **kwargs):
626626
# type: (WorkflowJobStep, Dict[str, Any], str, List[str], Union[ReceiveScatterOutput,Callable[..., Any]], int, **Any) -> Generator[WorkflowJob, None, None]
627627
scatter_key = scatter_keys[0]
628628
l = len(joborder[scatter_key])
@@ -644,11 +644,11 @@ def flat_crossproduct_scatter(process, joborder, basedir, scatter_keys, output_c
644644
jo[scatter_key] = kwargs["valueFrom"](scatter_key, joborder[scatter_key][n])
645645

646646
if len(scatter_keys) == 1:
647-
for j in process.job(jo, basedir, functools.partial(rc.receive_scatter_output, put), **kwargs):
647+
for j in process.job(jo, functools.partial(rc.receive_scatter_output, put), **kwargs):
648648
yield j
649649
put += 1
650650
else:
651-
for j in flat_crossproduct_scatter(process, jo, basedir, scatter_keys[1:], rc, put, **kwargs):
651+
for j in flat_crossproduct_scatter(process, jo, scatter_keys[1:], rc, put, **kwargs):
652652
if j:
653653
put += 1
654654
yield j

setup.cfg

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[flake8]
2-
ignore=E124,E128,E129,E201,E202,E225,E226,E231,E265,E271,E302,E303,F401,E402,E501,W503,E731,F811,F821,F841
2+
ignore = E124,E128,E129,E201,E202,E225,E226,E231,E265,E271,E302,E303,F401,E402,E501,W503,E731,F811,F821,F841
3+
4+
[easy_install]
35

4-
#[bdist_wheel]
5-
#universal = 1

0 commit comments

Comments
 (0)