
Commit 441ebf4

Author: Anton Khodak (committed)
Merge branch 'master' into deprecate_drafts
2 parents 559d0b8 + 32fd7cb commit 441ebf4


75 files changed: +1064 −520 lines

.travis.yml

Lines changed: 6 additions & 1 deletion
@@ -1,5 +1,6 @@
 sudo: false
 language: python
+cache: pip
 python:
 - 2.7
 - 3.3
@@ -10,9 +11,13 @@ os:
 - linux
 install:
 - pip install tox-travis
+jobs:
+  include:
+    - stage: release-test
+      script: RELEASE_SKIP=head ./release-test.sh
 script: tox
 branches:
   only:
   - master
 notifications:
-  email: false
+  email: false

Makefile

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ install: FORCE
 dist: dist/${MODULE}-$(VERSION).tar.gz

 dist/${MODULE}-$(VERSION).tar.gz: $(SOURCES)
-	./setup.py sdist
+	./setup.py sdist bdist_wheel

 ## clean : clean up all temporary / machine-generated files
 clean: FORCE

appveyor.yml

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ install:
 - "python -c \"import struct; print(struct.calcsize('P') * 8)\""

 build_script:
+- "%CMD_IN_ENV% pip install -U setuptools pip"
 - "%CMD_IN_ENV% pip install ."


cwltool/argparser.py

Lines changed: 399 additions & 0 deletions
Large diffs are not rendered by default.

cwltool/builder.py

Lines changed: 33 additions & 24 deletions
@@ -78,6 +78,7 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
             lead_pos = []
         bindings = []  # type: List[Dict[Text,Text]]
         binding = None  # type: Dict[Text,Any]
+        value_from_expression = False
         if "inputBinding" in schema and isinstance(schema["inputBinding"], dict):
             binding = copy.copy(schema["inputBinding"])

@@ -87,29 +88,33 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
                 binding["position"] = aslist(lead_pos) + [0] + aslist(tail_pos)

             binding["datum"] = datum
+            if "valueFrom" in binding:
+                value_from_expression = True

         # Handle union types
         if isinstance(schema["type"], list):
-            for t in schema["type"]:
-                if isinstance(t, (str, Text)) and self.names.has_name(t, ""):
-                    avsc = self.names.get_name(t, "")
-                elif isinstance(t, dict) and "name" in t and self.names.has_name(t["name"], ""):
-                    avsc = self.names.get_name(t["name"], "")
-                else:
-                    avsc = AvroSchemaFromJSONData(t, self.names)
-                if validate.validate(avsc, datum):
-                    schema = copy.deepcopy(schema)
-                    schema["type"] = t
-                    return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos)
-            raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
+            if not value_from_expression:
+                for t in schema["type"]:
+                    if isinstance(t, (str, Text)) and self.names.has_name(t, ""):
+                        avsc = self.names.get_name(t, "")
+                    elif isinstance(t, dict) and "name" in t and self.names.has_name(t["name"], ""):
+                        avsc = self.names.get_name(t["name"], "")
+                    else:
+                        avsc = AvroSchemaFromJSONData(t, self.names)
+                    if validate.validate(avsc, datum):
+                        schema = copy.deepcopy(schema)
+                        schema["type"] = t
+                        return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos)
+                raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
         elif isinstance(schema["type"], dict):
-            st = copy.deepcopy(schema["type"])
-            if binding and "inputBinding" not in st and st["type"] == "array" and "itemSeparator" not in binding:
-                st["inputBinding"] = {}
-            for k in ("secondaryFiles", "format", "streamable"):
-                if k in schema:
-                    st[k] = schema[k]
-            bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos))
+            if not value_from_expression:
+                st = copy.deepcopy(schema["type"])
+                if binding and "inputBinding" not in st and st["type"] == "array" and "itemSeparator" not in binding:
+                    st["inputBinding"] = {}
+                for k in ("secondaryFiles", "format", "streamable"):
+                    if k in schema:
+                        st[k] = schema[k]
+                bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos))
         else:
             if schema["type"] in self.schemaDefs:
                 schema = self.schemaDefs[schema["type"]]
@@ -212,15 +217,18 @@ def generate_arg(self, binding):  # type: (Dict[Text,Any]) -> List[Text]

         prefix = binding.get("prefix")
         sep = binding.get("separate", True)
+        if prefix is None and not sep:
+            with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
+                raise WorkflowException("'separate' option can not be specified without prefix")

         l = []  # type: List[Dict[Text,Text]]
         if isinstance(value, list):
-            if binding.get("itemSeparator"):
+            if binding.get("itemSeparator") and value:
                 l = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
             elif binding.get("valueFrom"):
                 value = [self.tostr(v) for v in value]
                 return ([prefix] if prefix else []) + value
-            elif prefix:
+            elif prefix and value:
                 return [prefix]
             else:
                 return []
@@ -244,8 +252,8 @@ def generate_arg(self, binding):  # type: (Dict[Text,Any]) -> List[Text]

         return [a for a in args if a is not None]

-    def do_eval(self, ex, context=None, pull_image=True, recursive=False):
-        # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
+    def do_eval(self, ex, context=None, pull_image=True, recursive=False, strip_whitespace=True):
+        # type: (Union[Dict[Text, Text], Text], Any, bool, bool, bool) -> Any
         if recursive:
             if isinstance(ex, dict):
                 return {k: self.do_eval(v, context, pull_image, recursive) for k, v in iteritems(ex)}
@@ -260,4 +268,5 @@ def do_eval(self, ex, context=None, pull_image=True, recursive=False):
                              timeout=self.timeout,
                              debug=self.debug,
                              js_console=self.js_console,
-                             force_docker_pull=self.force_docker_pull)
+                             force_docker_pull=self.force_docker_pull,
+                             strip_whitespace=strip_whitespace)
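The two behavioural changes above: bind_input now skips union and array expansion when the input binding carries a valueFrom expression, and generate_arg rejects "separate: false" without a prefix and no longer emits anything for empty lists. Below is a minimal standalone sketch of the generate_arg part (illustrative names only, not cwltool's actual method; the valueFrom and separate-joining branches are omitted):

# Sketch of the list-handling logic in generate_arg after this commit;
# 'binding' is a plain dict standing in for a CWL CommandLineBinding.
def generate_arg_sketch(binding, value):
    prefix = binding.get("prefix")
    sep = binding.get("separate", True)
    # New guard from the diff: 'separate: false' is meaningless without a prefix.
    if prefix is None and not sep:
        raise ValueError("'separate' option can not be specified without prefix")
    if isinstance(value, list):
        if binding.get("itemSeparator") and value:      # join only non-empty lists
            joined = binding["itemSeparator"].join(str(v) for v in value)
            return [prefix, joined] if prefix else [joined]
        elif prefix and value:                          # bare prefix only for non-empty lists
            return [prefix]
        else:                                           # empty list contributes nothing
            return []
    return [prefix, str(value)] if prefix else [str(value)]

print(generate_arg_sketch({"prefix": "-I", "itemSeparator": ","}, ["a", "b"]))  # ['-I', 'a,b']
print(generate_arg_sketch({"prefix": "-I", "itemSeparator": ","}, []))          # []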

cwltool/command_line_tool.py

Lines changed: 3 additions & 3 deletions
@@ -150,8 +150,8 @@ def revmap_file(builder, outdir, f):
                               u"file pass through." % (path, builder.outdir))
             return f

-
-    raise WorkflowException(u"Output File object is missing both `location` and `path` fields: %s" % f)
+    raise WorkflowException(u"Output File object is missing both 'location' "
+                            "and 'path' fields: %s" % f)


 class CallbackJob(object):
@@ -382,7 +382,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
             else:
                 for t in initialWorkdir["listing"]:
                     if "entry" in t:
-                        et = {u"entry": builder.do_eval(t["entry"])}
+                        et = {u"entry": builder.do_eval(t["entry"], strip_whitespace=False)}
                         if "entryname" in t:
                             et["entryname"] = builder.do_eval(t["entryname"])
                         else:
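Passing strip_whitespace=False when evaluating InitialWorkDirRequirement entry text matters because a staged file literal may legitimately begin or end with whitespace; with the default stripping, a trailing newline would be silently dropped. A tiny illustration with a hypothetical entry value:

# Hypothetical 'entry' content for an InitialWorkDir listing item; the
# trailing newline is part of the file literal and must survive evaluation.
entry_text = "#!/bin/sh\necho hello\n"
stripped = entry_text.strip()
assert stripped == "#!/bin/sh\necho hello"   # old path: trailing newline lost
assert entry_text.endswith("\n")             # new path keeps the literal intact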

cwltool/executors.py

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
+import logging
+import tempfile
+import threading
+
+import os
+from abc import ABCMeta, abstractmethod
+
+from typing import Dict, Text, Any, Tuple, Set, List
+
+from .builder import Builder
+from .errors import WorkflowException
+from .mutation import MutationManager
+from .job import JobBase
+from .process import relocateOutputs, cleanIntermediate, Process
+from . import loghandler
+
+_logger = logging.getLogger("cwltool")
+
+class JobExecutor(object):
+    __metaclass__ = ABCMeta
+
+    def __init__(self):
+        # type: (...) -> None
+        self.final_output = []  # type: List
+        self.final_status = []  # type: List
+        self.output_dirs = set()  # type: Set
+
+    def __call__(self, *args, **kwargs):
+        return self.execute(*args, **kwargs)
+
+    def output_callback(self, out, processStatus):
+        self.final_status.append(processStatus)
+        self.final_output.append(out)
+
+    @abstractmethod
+    def run_jobs(self,
+                 t,  # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,
+                 **kwargs  # type: Any
+                 ):
+        pass
+
+    def execute(self, t,  # type: Process
+                job_order_object,  # type: Dict[Text, Any]
+                logger=_logger,
+                **kwargs  # type: Any
+                ):
+        # type: (...) -> Tuple[Dict[Text, Any], Text]
+
+        if "basedir" not in kwargs:
+            raise WorkflowException("Must provide 'basedir' in kwargs")
+
+        finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
+        kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
+            "tmp_outdir_prefix") else tempfile.mkdtemp()
+        self.output_dirs.add(kwargs["outdir"])
+        kwargs["mutation_manager"] = MutationManager()
+
+        jobReqs = None
+        if "cwl:requirements" in job_order_object:
+            jobReqs = job_order_object["cwl:requirements"]
+        elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
+            jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
+        if jobReqs:
+            for req in jobReqs:
+                t.requirements.append(req)
+
+        self.run_jobs(t, job_order_object, logger, **kwargs)
+
+        if self.final_output and self.final_output[0] and finaloutdir:
+            self.final_output[0] = relocateOutputs(self.final_output[0], finaloutdir,
+                                                   self.output_dirs, kwargs.get("move_outputs"),
+                                                   kwargs["make_fs_access"](""),
+                                                   kwargs["compute_checksum"])
+
+        if kwargs.get("rm_tmpdir"):
+            cleanIntermediate(self.output_dirs)
+
+        if self.final_output and self.final_status:
+            return (self.final_output[0], self.final_status[0])
+        else:
+            return (None, "permanentFail")
+
+
+class SingleJobExecutor(JobExecutor):
+    def run_jobs(self,
+                 t,  # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,
+                 **kwargs  # type: Any
+                 ):
+        jobiter = t.job(job_order_object,
+                        self.output_callback,
+                        **kwargs)
+
+        try:
+            for r in jobiter:
+                if r:
+                    builder = kwargs.get("builder", None)  # type: Builder
+                    if builder is not None:
+                        r.builder = builder
+                    if r.outdir:
+                        self.output_dirs.add(r.outdir)
+                    r.run(**kwargs)
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+        except WorkflowException:
+            raise
+        except Exception as e:
+            logger.exception("Got workflow error")
+            raise WorkflowException(Text(e))
+
+
+class MultithreadedJobExecutor(JobExecutor):
+    def __init__(self):
+        super(MultithreadedJobExecutor, self).__init__()
+        self.threads = set()
+        self.exceptions = []
+
+    def run_job(self,
+                job,  # type: JobBase
+                **kwargs  # type: Any
+                ):
+        # type: (...) -> None
+        def runner():
+            try:
+                job.run(**kwargs)
+            except WorkflowException as e:
+                self.exceptions.append(e)
+            except Exception as e:
+                self.exceptions.append(WorkflowException(Text(e)))
+
+            self.threads.remove(thread)
+
+        thread = threading.Thread(target=runner)
+        thread.daemon = True
+        self.threads.add(thread)
+        thread.start()
+
+    def wait_for_next_completion(self):  # type: () -> None
+        if self.exceptions:
+            raise self.exceptions[0]
+
+    def run_jobs(self,
+                 t,  # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,
+                 **kwargs  # type: Any
+                 ):
+
+        jobiter = t.job(job_order_object, self.output_callback, **kwargs)
+
+        for r in jobiter:
+            if r:
+                builder = kwargs.get("builder", None)  # type: Builder
+                if builder is not None:
+                    r.builder = builder
+                if r.outdir:
+                    self.output_dirs.add(r.outdir)
+                self.run_job(r, **kwargs)
+            else:
+                if len(self.threads):
+                    self.wait_for_next_completion()
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+
+        while len(self.threads) > 0:
+            self.wait_for_next_completion()
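The new module factors the job-running loop out into executor classes: JobExecutor.execute() validates basedir, sets up the temporary output directory and MutationManager, delegates to run_jobs(), then relocates outputs; SingleJobExecutor runs each job inline, while MultithreadedJobExecutor runs each job in a daemon thread and tracks failures in self.exceptions. A rough usage sketch follows; the tool/job objects and most keyword arguments are placeholders for whatever cwltool's main() builds (execute() also expects things like make_fs_access and compute_checksum, which are not shown):

# Hedged usage sketch of the new executor classes.
from cwltool.executors import SingleJobExecutor

executor = SingleJobExecutor()

# JobExecutor.__call__ forwards to execute(), so both spellings work:
#   out, status = executor(tool, job_order, basedir=".", ...)
#   out, status = executor.execute(tool, job_order, basedir=".", ...)
#
# execute() raises WorkflowException("Must provide 'basedir' in kwargs")
# when 'basedir' is missing, and returns (None, "permanentFail") when no
# output was produced.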

cwltool/expression.py

Lines changed: 10 additions & 7 deletions
@@ -173,9 +173,10 @@ def evaluator(ex, jslib, obj, fullJS=False, timeout=None, force_docker_pull=False,

 def interpolate(scan, rootvars,
                 timeout=None, fullJS=None, jslib="", force_docker_pull=False,
-                debug=False, js_console=False):
-    # type: (Text, Dict[Text, Any], int, bool, Union[str, Text], bool, bool, bool) -> JSON
-    scan = scan.strip()
+                debug=False, js_console=False, strip_whitespace=True):
+    # type: (Text, Dict[Text, Any], int, bool, Union[str, Text], bool, bool, bool, bool) -> JSON
+    if strip_whitespace:
+        scan = scan.strip()
     parts = []
     w = scanner(scan)
     while w:
@@ -185,7 +186,7 @@ def interpolate(scan, rootvars,
             e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, fullJS=fullJS,
                           timeout=timeout, force_docker_pull=force_docker_pull,
                           debug=debug, js_console=js_console)
-            if w[0] == 0 and w[1] == len(scan):
+            if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
                 return e
             leaf = json.dumps(e, sort_keys=True)
             if leaf[0] == '"':
@@ -202,8 +203,9 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,


 def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
-            context=None, pull_image=True, timeout=None, force_docker_pull=False, debug=False, js_console=False):
-    # type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool, bool, bool) -> Any
+            context=None, pull_image=True, timeout=None, force_docker_pull=False,
+            debug=False, js_console=False, strip_whitespace=True):
+    # type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool, bool, bool, bool) -> Any

     runtime = copy.copy(resources)
     runtime["tmpdir"] = docker_windows_path_adjust(tmpdir)
@@ -231,7 +233,8 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
                            jslib=jslib,
                            force_docker_pull=force_docker_pull,
                            debug=debug,
-                           js_console=js_console)
+                           js_console=js_console,
+                           strip_whitespace=strip_whitespace)

     except Exception as e:
         raise WorkflowException("Expression evaluation error:\n%s" % e)
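The interpolator change is twofold: stripping of the scanned text is now optional (strip_whitespace=True keeps the old default), and a whole-string expression only short-circuits to the raw evaluation result when nothing has been emitted before it (len(parts) <= 1). A small sketch of the first part, assuming a checkout of cwltool at this commit; the expected values follow from the diff rather than from captured output:

# Sketch of the new strip_whitespace flag on interpolate(); no parameter
# references appear in the text, so it passes through the scanner unchanged.
from cwltool.expression import interpolate

rootvars = {}  # no $(...) or ${...} below, so rootvars are unused

print(repr(interpolate("  plain text  ", rootvars)))
# default strips surrounding whitespace: 'plain text'

print(repr(interpolate("  plain text  ", rootvars, strip_whitespace=False)))
# whitespace preserved: '  plain text  '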
