Skip to content

Commit 33c2358

Browse files
authored
Merge pull request #467 from jxtx/patch-1
respect tmp-outdir-prefix when containerless
2 parents d598e2d + 1cdc519 commit 33c2358

File tree

8 files changed

+93
-88
lines changed

8 files changed

+93
-88
lines changed

cwltool/docker.py

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,11 @@ def _check_docker_machine_path(path): # type: (Text) -> None
6767
class DockerCommandLineJob(ContainerCommandLineJob):
6868

6969
@staticmethod
70-
def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False):
71-
# type: (Dict[Text, Text], bool, bool, bool) -> bool
70+
def get_image(dockerRequirement, # type: Dict[Text, Text]
71+
pull_image, # type: bool
72+
force_pull=False, # type: bool
73+
tmp_outdir_prefix=None # type: Text
74+
): # type: (...) -> bool
7275
found = False
7376

7477
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
@@ -108,57 +111,58 @@ def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False):
108111
if "dockerPull" in dockerRequirement:
109112
cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])]
110113
_logger.info(Text(cmd))
111-
if not dry_run:
112-
subprocess.check_call(cmd, stdout=sys.stderr)
113-
found = True
114+
subprocess.check_call(cmd, stdout=sys.stderr)
115+
found = True
114116
elif "dockerFile" in dockerRequirement:
115-
dockerfile_dir = str(tempfile.mkdtemp())
117+
dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix))
116118
with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df:
117119
df.write(dockerRequirement["dockerFile"].encode('utf-8'))
118120
cmd = ["docker", "build", "--tag=%s" %
119121
str(dockerRequirement["dockerImageId"]), dockerfile_dir]
120122
_logger.info(Text(cmd))
121-
if not dry_run:
122-
subprocess.check_call(cmd, stdout=sys.stderr)
123-
found = True
123+
subprocess.check_call(cmd, stdout=sys.stderr)
124+
found = True
124125
elif "dockerLoad" in dockerRequirement:
125126
cmd = ["docker", "load"]
126127
_logger.info(Text(cmd))
127-
if not dry_run:
128-
if os.path.exists(dockerRequirement["dockerLoad"]):
129-
_logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
130-
with open(dockerRequirement["dockerLoad"], "rb") as f:
131-
loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
132-
else:
133-
loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr)
134-
_logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"])
135-
req = requests.get(dockerRequirement["dockerLoad"], stream=True)
136-
n = 0
137-
for chunk in req.iter_content(1024 * 1024):
138-
n += len(chunk)
139-
_logger.info("\r%i bytes" % (n))
140-
loadproc.stdin.write(chunk)
141-
loadproc.stdin.close()
142-
rcode = loadproc.wait()
143-
if rcode != 0:
144-
raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
145-
found = True
128+
if os.path.exists(dockerRequirement["dockerLoad"]):
129+
_logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
130+
with open(dockerRequirement["dockerLoad"], "rb") as f:
131+
loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
132+
else:
133+
loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr)
134+
_logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"])
135+
req = requests.get(dockerRequirement["dockerLoad"], stream=True)
136+
n = 0
137+
for chunk in req.iter_content(1024 * 1024):
138+
n += len(chunk)
139+
_logger.info("\r%i bytes" % (n))
140+
loadproc.stdin.write(chunk)
141+
loadproc.stdin.close()
142+
rcode = loadproc.wait()
143+
if rcode != 0:
144+
raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
145+
found = True
146146
elif "dockerImport" in dockerRequirement:
147147
cmd = ["docker", "import", str(dockerRequirement["dockerImport"]),
148148
str(dockerRequirement["dockerImageId"])]
149149
_logger.info(Text(cmd))
150-
if not dry_run:
151-
subprocess.check_call(cmd, stdout=sys.stderr)
152-
found = True
150+
subprocess.check_call(cmd, stdout=sys.stderr)
151+
found = True
153152

154153
if found:
155154
with found_images_lock:
156155
found_images.add(dockerRequirement["dockerImageId"])
157156

158157
return found
159158

160-
def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=False):
161-
# type: (Dict[Text, Text], bool, bool, bool, bool) -> Text
159+
def get_from_requirements(self,
160+
r, # type: Dict[Text, Text]
161+
req, # type: bool
162+
pull_image, # type: bool
163+
force_pull=False, # type: bool
164+
tmp_outdir_prefix=None # type: Text
165+
): # type: (...) -> Text
162166
if r:
163167
errmsg = None
164168
try:
@@ -174,7 +178,7 @@ def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=Fa
174178
else:
175179
return None
176180

177-
if self.get_image(r, pull_image, dry_run, force_pull=force_pull):
181+
if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix):
178182
return r["dockerImageId"]
179183
else:
180184
if req:

cwltool/executors.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,18 @@ def run_jobs(self,
4343
):
4444
pass
4545

46-
def execute(self, t, # type: Process
46+
def execute(self,
47+
t, # type: Process
4748
job_order_object, # type: Dict[Text, Any]
4849
logger=_logger,
49-
**kwargs # type: Any
50-
):
51-
# type: (...) -> Tuple[Dict[Text, Any], Text]
50+
**kwargs # type: Any
51+
): # type: (...) -> Tuple[Dict[Text, Any], Text]
5252

5353
if "basedir" not in kwargs:
5454
raise WorkflowException("Must provide 'basedir' in kwargs")
5555

5656
finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
57-
kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
58-
"tmp_outdir_prefix") else tempfile.mkdtemp()
57+
kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs.get("tmp_outdir_prefix"))
5958
self.output_dirs.add(kwargs["outdir"])
6059
kwargs["mutation_manager"] = MutationManager()
6160
kwargs["toplevel"] = True

cwltool/job.py

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,14 @@ def _setup(self, kwargs): # type: (Dict) -> None
175175
_logger.debug(u"[job %s] initial work dir %s", self.name,
176176
json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4))
177177

178-
def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move", secret_store=None):
179-
# type: (List[Text], MutableMapping[Text, Text], bool, Text, SecretStore) -> None
178+
def _execute(self,
179+
runtime, # type:List[Text]
180+
env, # type: MutableMapping[Text, Text]
181+
rm_tmpdir=True, # type: bool
182+
move_outputs="move", # type: Text
183+
secret_store=None, # type: SecretStore
184+
tmp_outdir_prefix=None # type: Text
185+
): # type: (...) -> None
180186

181187
scr, _ = get_feature(self, "ShellCommandRequirement")
182188

@@ -228,14 +234,9 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move", secret_sto
228234
if builder is not None:
229235
job_script_contents = builder.build_job_script(commands)
230236
rcode = _job_popen(
231-
commands,
232-
stdin_path=stdin_path,
233-
stdout_path=stdout_path,
234-
stderr_path=stderr_path,
235-
env=env,
236-
cwd=self.outdir,
237-
job_script_contents=job_script_contents,
238-
)
237+
commands, stdin_path, stdout_path, stderr_path, env,
238+
self.outdir, tempfile.mkdtemp(prefix=tmp_outdir_prefix),
239+
job_script_contents)
239240

240241
if self.successCodes and rcode in self.successCodes:
241242
processStatus = "success"
@@ -334,15 +335,22 @@ def run(self, pull_image=True, rm_container=True,
334335
stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, symLink=True, secret_store=kwargs.get("secret_store"))
335336
relink_initialworkdir(self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update)
336337

337-
self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs, secret_store=kwargs.get("secret_store"))
338+
self._execute(
339+
[], env, rm_tmpdir, move_outputs, kwargs.get("secret_store"),
340+
kwargs.get("tmp_outdir_prefix"))
338341

339342

340343
class ContainerCommandLineJob(JobBase):
341344
__metaclass__ = ABCMeta
342345

343346
@abstractmethod
344-
def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=False):
345-
# type: (Dict[Text, Text], bool, bool, bool, bool) -> Text
347+
def get_from_requirements(self,
348+
r, # type: Dict[Text, Text]
349+
req, # type: bool
350+
pull_image, # type: bool
351+
force_pull=False, # type: bool
352+
tmp_outdir_prefix=None # type: Text
353+
): # type: (...) -> Text
346354
pass
347355

348356
@abstractmethod
@@ -377,7 +385,9 @@ def run(self, pull_image=True, rm_container=True,
377385
try:
378386
env = cast(MutableMapping[Text, Text], os.environ)
379387
if docker_req and kwargs.get("use_container"):
380-
img_id = str(self.get_from_requirements(docker_req, True, pull_image, force_pull=kwargs.get("force_docker_pull")))
388+
img_id = str(self.get_from_requirements(docker_req, True,
389+
pull_image, kwargs.get("force_docker_pull"),
390+
kwargs.get('tmp_outdir_prefix')))
381391
if img_id is None:
382392
if self.builder.find_default_container:
383393
default_container = self.builder.find_default_container()
@@ -404,20 +414,21 @@ def run(self, pull_image=True, rm_container=True,
404414
runtime = self.create_runtime(env, rm_container, record_container_id, cidfile_dir, cidfile_prefix, **kwargs)
405415
runtime.append(img_id)
406416

407-
self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs, secret_store=kwargs.get("secret_store"))
417+
self._execute(
418+
runtime, env, rm_tmpdir, move_outputs, kwargs.get("secret_store"),
419+
kwargs.get("tmp_outdir_prefix"))
408420

409421

410422
def _job_popen(
411-
commands, # type: List[Text]
412-
stdin_path, # type: Text
413-
stdout_path, # type: Text
414-
stderr_path, # type: Text
423+
commands, # type: List[Text]
424+
stdin_path, # type: Text
425+
stdout_path, # type: Text
426+
stderr_path, # type: Text
415427
env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]]
416-
cwd, # type: Text
417-
job_dir=None, # type: Text
428+
cwd, # type: Text
429+
job_dir, # type: Text
418430
job_script_contents=None, # type: Text
419-
):
420-
# type: (...) -> int
431+
): # type: (...) -> int
421432
if not job_script_contents and not FORCE_SHELLED_POPEN:
422433

423434
stdin = None # type: Union[IO[Any], int]
@@ -464,9 +475,6 @@ def _job_popen(
464475

465476
return rcode
466477
else:
467-
if job_dir is None:
468-
job_dir = tempfile.mkdtemp(prefix="cwltooljob")
469-
470478
if not job_script_contents:
471479
job_script_contents = SHELL_COMMAND_TEMPLATE
472480

cwltool/process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ def _init_job(self, joborder, **kwargs):
566566
select_resources: callback to select compute resources
567567
debug: enable debugging output
568568
js_console: enable javascript console output
569+
tmp_outdir_prefix: Path prefix for intermediate output directories
569570
"""
570571

571572
builder = Builder()
@@ -622,7 +623,8 @@ def _init_job(self, joborder, **kwargs):
622623
builder.tmpdir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_tmpdir") or "/tmp")
623624
builder.stagedir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_stagedir") or "/var/lib/cwl")
624625
else:
625-
builder.outdir = builder.fs_access.realpath(kwargs.get("outdir") or tempfile.mkdtemp())
626+
builder.outdir = builder.fs_access.realpath(kwargs.get("outdir")
627+
or tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]))
626628
if self.tool[u"class"] != 'Workflow':
627629
builder.tmpdir = builder.fs_access.realpath(kwargs.get("tmpdir") or tempfile.mkdtemp())
628630
builder.stagedir = builder.fs_access.realpath(kwargs.get("stagedir") or tempfile.mkdtemp())

cwltool/singularity.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ class SingularityCommandLineJob(ContainerCommandLineJob):
5050
@staticmethod
5151
def get_image(dockerRequirement, # type: Dict[Text, Text]
5252
pull_image, # type: bool
53-
dry_run=False, # type: bool
5453
force_pull=False # type: bool
5554
):
5655
# type: (...) -> bool
@@ -95,9 +94,8 @@ def get_image(dockerRequirement, # type: Dict[Text, Text]
9594
str(dockerRequirement["dockerImageId"]),
9695
str(dockerRequirement["dockerPull"])]
9796
_logger.info(Text(cmd))
98-
if not dry_run:
99-
check_call(cmd, stdout=sys.stderr)
100-
found = True
97+
check_call(cmd, stdout=sys.stderr)
98+
found = True
10199
elif "dockerFile" in dockerRequirement:
102100
raise WorkflowException(SourceLine(
103101
dockerRequirement, 'dockerFile').makeError(
@@ -117,13 +115,12 @@ def get_image(dockerRequirement, # type: Dict[Text, Text]
117115
return found
118116

119117
def get_from_requirements(self,
120-
r, # type: Optional[Dict[Text, Text]]
121-
req, # type: bool
122-
pull_image, # type: bool
123-
dry_run=False, # type: bool
124-
force_pull=False # type: bool
125-
):
126-
# type: (...) -> Text
118+
r, # type: Optional[Dict[Text, Text]]
119+
req, # type: bool
120+
pull_image, # type: bool
121+
force_pull=False, # type: bool
122+
tmp_outdir_prefix=None # type: Text
123+
): # type: (...) -> Text
127124
"""
128125
Returns the filename of the Singularity image (e.g.
129126
hello-world-latest.img).
@@ -144,7 +141,7 @@ def get_from_requirements(self,
144141
else:
145142
return None
146143

147-
if self.get_image(r, pull_image, dry_run, force_pull):
144+
if self.get_image(r, pull_image, force_pull):
148145
return os.path.abspath(r["dockerImageId"])
149146
else:
150147
if req:

cwltool/workflow.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,8 @@ def __init__(self, workflow, **kwargs):
179179
self.state = None # type: Dict[Text, WorkflowStateItem]
180180
self.processStatus = None # type: Text
181181
self.did_callback = False
182-
183-
if "outdir" in kwargs:
184-
self.outdir = kwargs["outdir"]
185-
elif "tmp_outdir_prefix" in kwargs:
186-
self.outdir = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"])
187-
else:
188-
# tmp_outdir_prefix defaults to tmp, so this is unlikely to be used
189-
self.outdir = tempfile.mkdtemp()
182+
self.outdir = kwargs.get("outdir")
183+
self.outdir = tempfile.mkdtemp(prefix=kwargs.get("tmp_outdir_prefix"))
190184

191185
self.name = uniquename(u"workflow %s" % kwargs.get("name", shortname(self.workflow.tool.get("id", "embedded"))))
192186

jenkins.bat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ set PATH=%PATH%;"C:\\Program Files\\Docker Toolbox\\"
22
docker-machine start default
33
REM Set the environment variables to use docker-machine and docker commands
44
FOR /f "tokens=*" %%i IN ('docker-machine env --shell cmd default') DO %%i
5-
5+
docker version
66
python setup.py test --addopts "--junit-xml=tests.xml --cov-report xml --cov cwltool"
77
pip install codecov
88
codecov

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ passenv = CI TRAVIS TRAVIS_*
2121
deps =
2222
-rrequirements.txt
2323
py{27,34,35,36}-unit: codecov
24+
py{27,34,35,36}-unit: pytest-xdist
2425
py{27,34,35,36}-lint: flake8
2526

2627
commands =

0 commit comments

Comments
 (0)