Skip to content

Commit 1cf9311

Browse files
Merge branch 'master' into fix-648
2 parents 4c7cf7d + d348729 commit 1cf9311

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+429
-84
lines changed

appveyor.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ install:
3131
- "python -c \"import struct; print(struct.calcsize('P') * 8)\""
3232

3333
build_script:
34+
- "%CMD_IN_ENV% pip install -U setuptools pip"
3435
- "%CMD_IN_ENV% pip install ."
3536

3637

cwltool/argparser.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
2828
parser.add_argument("--no-container", action="store_false", default=True,
2929
help="Do not execute jobs in a Docker container, even when specified by the CommandLineTool",
3030
dest="use_container")
31-
31+
parser.add_argument("--parallel", action="store_true", default=False,
32+
help="[experimental] Run jobs in parallel. "
33+
"Does not currently keep track of ResourceRequirements like the number of cores"
34+
"or memory and can overload this system")
3235
parser.add_argument("--preserve-environment", type=Text, action="append",
3336
help="Preserve specific environment variable when running CommandLineTools. May be provided multiple times.",
3437
metavar="ENVVAR",
@@ -49,6 +52,27 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
4952
default=True, help="Do not delete Docker container used by jobs after they exit",
5053
dest="rm_container")
5154

55+
cidgroup = parser.add_argument_group("Options for recording the Docker "
56+
"container identifier into a file")
57+
cidgroup.add_argument("--record-container-id", action="store_true",
58+
default=False,
59+
help="If enabled, store the Docker container ID into a file. "
60+
"See --cidfile-dir to specify the directory.",
61+
dest="record_container_id")
62+
63+
cidgroup.add_argument("--cidfile-dir", type=Text,
64+
help="Directory for storing the Docker container ID file. "
65+
"The default is the current directory",
66+
default="",
67+
dest="cidfile_dir")
68+
69+
cidgroup.add_argument("--cidfile-prefix", type=Text,
70+
help="Specify a prefix to the container ID filename. "
71+
"Final file name will be followed by a timestamp. "
72+
"The default is no prefix.",
73+
default="",
74+
dest="cidfile_prefix")
75+
5276
parser.add_argument("--tmpdir-prefix", type=Text,
5377
help="Path prefix for temporary directories",
5478
default="tmp")
@@ -120,13 +144,16 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
120144
default=True, dest="strict")
121145

122146
parser.add_argument("--skip-schemas", action="store_true",
123-
help="Skip loading of schemas", default=True, dest="skip_schemas")
147+
help="Skip loading of schemas", default=False, dest="skip_schemas")
124148

125149
exgroup = parser.add_mutually_exclusive_group()
126150
exgroup.add_argument("--verbose", action="store_true", help="Default logging")
127151
exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
128152
exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
129153

154+
parser.add_argument("--timestamps", action="store_true", help="Add "
155+
"timestamps to the errors, warnings, and "
156+
"notifications.")
130157
parser.add_argument("--js-console", action="store_true", help="Enable javascript console output")
131158
parser.add_argument("--user-space-docker-cmd",
132159
help="(Linux/OS X only) Specify a user space docker "
@@ -370,4 +397,4 @@ def generate_parser(toolparser, tool, namemap, records):
370397
default = inp.get("default", None)
371398
add_argument(toolparser, name, inptype, records, description, default)
372399

373-
return toolparser
400+
return toolparser

cwltool/executors.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import logging
2+
import tempfile
3+
import threading
4+
5+
import os
6+
from abc import ABCMeta, abstractmethod
7+
8+
from typing import Dict, Text, Any, Tuple, Set, List
9+
10+
from cwltool.builder import Builder
11+
from cwltool.errors import WorkflowException
12+
from cwltool.mutation import MutationManager
13+
from cwltool.job import JobBase
14+
from cwltool.process import relocateOutputs, cleanIntermediate, Process
15+
16+
17+
# Module-level logger shared by the executors.  A handler and level are
# installed here so messages are emitted even when the embedding
# application configures no logging of its own.
_logger = logging.getLogger("cwltool")

defaultStreamHandler = logging.StreamHandler()
_logger.addHandler(defaultStreamHandler)
_logger.setLevel(logging.INFO)
22+
23+
24+
class JobExecutor(object):
    """Abstract base class for executing the jobs of a CWL Process.

    Subclasses implement run_jobs() to decide *how* jobs are scheduled
    (serially, in parallel threads, ...).  execute() wraps that with the
    shared bookkeeping: temporary output-directory setup, merging of
    ``cwl:requirements`` into the tool, relocating final outputs, and
    cleaning up intermediate directories.
    """

    # NOTE(review): Python 2 spelling; on Python 3 this assignment has no
    # effect, so the class is not actually abstract there -- confirm the
    # supported interpreter versions.
    __metaclass__ = ABCMeta

    def __init__(self):
        # type: (...) -> None
        self.final_output = []  # type: List
        self.final_status = []  # type: List
        self.output_dirs = set()  # type: Set

    def __call__(self, *args, **kwargs):
        # Allow an executor instance to be invoked directly as a callable.
        return self.execute(*args, **kwargs)

    def output_callback(self, out, processStatus):
        # Invoked by the top-level job with the workflow's final result.
        self.final_status.append(processStatus)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(self,
                 t,  # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,
                 **kwargs  # type: Any
                 ):
        """Schedule and run every job produced by `t`."""
        pass

    def execute(self, t,  # type: Process
                job_order_object,  # type: Dict[Text, Any]
                logger=_logger,
                **kwargs  # type: Any
                ):
        # type: (...) -> Tuple[Dict[Text, Any], Text]
        """Run Process `t` on `job_order_object`.

        Returns a (final_output, final_status) tuple, or
        (None, "permanentFail") if no output was produced.
        Raises WorkflowException if 'basedir' is missing from kwargs.
        """
        if "basedir" not in kwargs:
            raise WorkflowException("Must provide 'basedir' in kwargs")

        # Remember the requested output directory, then point the run at a
        # fresh scratch directory; outputs are relocated afterwards.
        finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
        tmp_prefix = kwargs.get("tmp_outdir_prefix")
        kwargs["outdir"] = tempfile.mkdtemp(prefix=tmp_prefix) if tmp_prefix \
            else tempfile.mkdtemp()
        self.output_dirs.add(kwargs["outdir"])
        kwargs["mutation_manager"] = MutationManager()

        # Fold any cwl:requirements supplied with the job order (or via the
        # tool's cwl:defaults) into the tool's requirement list.
        job_reqs = None
        if "cwl:requirements" in job_order_object:
            job_reqs = job_order_object["cwl:requirements"]
        elif ("cwl:defaults" in t.metadata
              and "cwl:requirements" in t.metadata["cwl:defaults"]):
            job_reqs = t.metadata["cwl:defaults"]["cwl:requirements"]
        if job_reqs:
            for req in job_reqs:
                t.requirements.append(req)

        self.run_jobs(t, job_order_object, logger, **kwargs)

        if self.final_output and self.final_output[0] and finaloutdir:
            self.final_output[0] = relocateOutputs(
                self.final_output[0], finaloutdir, self.output_dirs,
                kwargs.get("move_outputs"), kwargs["make_fs_access"](""))

        if kwargs.get("rm_tmpdir"):
            cleanIntermediate(self.output_dirs)

        if self.final_output and self.final_status:
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")
88+
89+
90+
class SingleJobExecutor(JobExecutor):
    """Execute all jobs serially on the calling thread."""

    def run_jobs(self,
                 t,  # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,
                 **kwargs  # type: Any
                 ):
        jobiter = t.job(job_order_object,
                        self.output_callback,
                        **kwargs)

        try:
            for job in jobiter:
                # A falsy item from the iterator means the workflow is
                # blocked waiting for results; running serially, none can
                # arrive, so stop.
                if not job:
                    logger.error("Workflow cannot make any more progress.")
                    break
                builder = kwargs.get("builder", None)  # type: Builder
                if builder is not None:
                    job.builder = builder
                if job.outdir:
                    self.output_dirs.add(job.outdir)
                job.run(**kwargs)
        except WorkflowException:
            # Already a domain error; propagate untouched.
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(Text(err))
118+
119+
120+
class MultithreadedJobExecutor(JobExecutor):
    """[experimental] Run jobs in parallel, one daemon thread per job.

    Exceptions raised inside job threads are collected in self.exceptions
    and re-raised on the scheduling thread by wait_for_next_completion().
    """

    def __init__(self):
        super(MultithreadedJobExecutor, self).__init__()
        self.threads = set()
        self.exceptions = []
        # Guards self.threads and is notified whenever a job thread
        # finishes.  Fixes two defects in the original: unsynchronized
        # mutation of self.threads from worker threads, and a
        # wait_for_next_completion() that returned immediately, making
        # run_jobs() busy-spin at full CPU while jobs were executing.
        self.finished = threading.Condition()

    def run_job(self,
                job,  # type: JobBase
                **kwargs  # type: Any
                ):
        # type: (...) -> None
        """Start `job` on a new daemon thread, recording any failure."""
        def runner():
            try:
                job.run(**kwargs)
            except WorkflowException as e:
                self.exceptions.append(e)
            except Exception as e:
                self.exceptions.append(WorkflowException(Text(e)))
            finally:
                # Always deregister this thread and wake any waiter, even
                # if job.run() raised something unexpected; otherwise the
                # scheduler would wait on it forever.
                with self.finished:
                    self.threads.remove(thread)
                    self.finished.notify_all()

        thread = threading.Thread(target=runner)
        thread.daemon = True
        with self.finished:
            self.threads.add(thread)
        thread.start()

    def wait_for_next_completion(self):  # type: () -> None
        """Block until at least one running job has finished, then re-raise
        the first recorded job exception, if any."""
        with self.finished:
            # Checking under the lock prevents a lost wakeup: a thread
            # cannot finish between the check and the wait().
            if self.threads and not self.exceptions:
                self.finished.wait()
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(self,
                 t,  # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,
                 **kwargs  # type: Any
                 ):

        jobiter = t.job(job_order_object, self.output_callback, **kwargs)

        for r in jobiter:
            if r:
                builder = kwargs.get("builder", None)  # type: Builder
                if builder is not None:
                    r.builder = builder
                if r.outdir:
                    self.output_dirs.add(r.outdir)
                self.run_job(r, **kwargs)
            else:
                if len(self.threads):
                    # Workflow is blocked on pending jobs; sleep until one
                    # completes instead of spinning.
                    self.wait_for_next_completion()
                else:
                    logger.error("Workflow cannot make any more progress.")
                    break

        # Drain remaining jobs before returning control to execute().
        while len(self.threads) > 0:
            self.wait_for_next_completion()

cwltool/expression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def interpolate(scan, rootvars,
206206
e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, fullJS=fullJS,
207207
timeout=timeout, force_docker_pull=force_docker_pull,
208208
debug=debug, js_console=js_console)
209-
if w[0] == 0 and w[1] == len(scan):
209+
if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
210210
return e
211211
leaf = json.dumps(e, sort_keys=True)
212212
if leaf[0] == '"':

cwltool/factory.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
from typing import Callable as tCallable
44
from typing import Any, Dict, Text, Tuple, Union
55

6-
from . import load_tool, main, workflow
6+
from . import load_tool, workflow
77
from .argparser import get_default_args
8+
from .executors import SingleJobExecutor
89
from .process import Process
910

1011

@@ -36,11 +37,13 @@ class Factory(object):
3637
def __init__(self,
3738
makeTool=workflow.defaultMakeTool, # type: tCallable[[Any], Process]
3839
# should be tCallable[[Dict[Text, Any], Any], Process] ?
39-
executor=main.single_job_executor, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
40+
executor=None, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
4041
**execkwargs # type: Any
4142
):
4243
# type: (...) -> None
4344
self.makeTool = makeTool
45+
if executor is None:
46+
executor = SingleJobExecutor()
4447
self.executor = executor
4548

4649
kwargs = get_default_args()

cwltool/job.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import absolute_import
2+
23
import codecs
4+
import datetime
35
import functools
46
import io
57
import json
@@ -12,25 +14,28 @@
1214
import sys
1315
import tempfile
1416
from io import open
17+
from threading import Lock
1518
from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text,
16-
Tuple, Union, cast)
19+
Union, cast)
1720

1821
import shellescape
1922

20-
from .utils import copytree_with_merge, docker_windows_path_adjust, onWindows
2123
from . import docker
2224
from .builder import Builder
2325
from .docker_id import docker_vm_id
2426
from .errors import WorkflowException
2527
from .pathmapper import PathMapper, ensure_writable
26-
from .process import (UnsupportedRequirement, empty_subtree, get_feature,
28+
from .process import (UnsupportedRequirement, get_feature,
2729
stageFiles)
2830
from .utils import bytes2str_in_dicts
31+
from .utils import copytree_with_merge, docker_windows_path_adjust, onWindows
2932

3033
_logger = logging.getLogger("cwltool")
3134

3235
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
3336

37+
job_output_lock = Lock()
38+
3439
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
3540

3641
SHELL_COMMAND_TEMPLATE = """#!/bin/bash
@@ -266,7 +271,8 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"):
266271
if _logger.isEnabledFor(logging.DEBUG):
267272
_logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))
268273

269-
self.output_callback(outputs, processStatus)
274+
with job_output_lock:
275+
self.output_callback(outputs, processStatus)
270276

271277
if self.stagedir and os.path.exists(self.stagedir):
272278
_logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
@@ -363,8 +369,10 @@ def add_volumes(self, pathmapper, runtime):
363369
docker_windows_path_adjust(vol.target)))
364370

365371
def run(self, pull_image=True, rm_container=True,
372+
record_container_id=False, cidfile_dir="",
373+
cidfile_prefix="",
366374
rm_tmpdir=True, move_outputs="move", **kwargs):
367-
# type: (bool, bool, bool, Text, **Any) -> None
375+
# type: (bool, bool, bool, Text, Text, bool, Text, **Any) -> None
368376

369377
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
370378

@@ -463,6 +471,26 @@ def run(self, pull_image=True, rm_container=True,
463471
# directory." but spec might change to designated temp directory.
464472
# runtime.append("--env=HOME=/tmp")
465473
runtime.append(u"--env=HOME=%s" % self.builder.outdir)
474+
475+
# add parameters to docker to write a container ID file
476+
if record_container_id:
477+
if cidfile_dir != "":
478+
if not os.path.isdir(cidfile_dir):
479+
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
480+
cidfile_dir + " is not a directory or "
481+
"directory doesn't exist, please check it first")
482+
exit(2)
483+
if not os.path.exists(cidfile_dir):
484+
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
485+
"directory doesn't exist, please create it first")
486+
exit(2)
487+
else:
488+
cidfile_dir = os.getcwd()
489+
cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f")+".cid"
490+
if cidfile_prefix != "":
491+
cidfile_name = str(cidfile_prefix + "-" + cidfile_name)
492+
cidfile_path = os.path.join(cidfile_dir, cidfile_name)
493+
runtime.append(u"--cidfile=%s" % cidfile_path)
466494

467495
for t, v in self.environment.items():
468496
runtime.append(u"--env=%s=%s" % (t, v))

0 commit comments

Comments
 (0)