Skip to content

Commit ef2e818

Browse files
committed
Refactor job.py to allow modification of job environment w/shell commands.
1 parent 68bfef3 commit ef2e818

File tree

1 file changed

+144
-22
lines changed

1 file changed

+144
-22
lines changed

cwltool/job.py

Lines changed: 144 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import yaml
88
import logging
99
import sys
10+
import string
1011
import requests
1112
from . import docker
1213
from .process import get_feature, empty_subtree
@@ -25,6 +26,9 @@
2526

2627
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
2728

29+
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1"
30+
31+
2832
def deref_links(outputs): # type: (Any) -> None
2933
if isinstance(outputs, dict):
3034
if outputs.get("class") == "File":
@@ -174,37 +178,26 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
174178
raise Exception("Unhandled type")
175179

176180
if self.stdin:
177-
stdin = open(self.pathmapper.mapper(self.stdin)[0], "rb")
181+
stdin_path = self.pathmapper.mapper(self.stdin)[0]
178182
else:
179-
stdin = subprocess.PIPE
183+
stdin_path = None
180184

181185
if self.stdout:
182186
absout = os.path.join(self.outdir, self.stdout)
183187
dn = os.path.dirname(absout)
184188
if dn and not os.path.exists(dn):
185189
os.makedirs(dn)
186-
stdout = open(absout, "wb")
190+
stdout_path = absout
187191
else:
188-
stdout = sys.stderr
189-
190-
sp = subprocess.Popen([str(x) for x in runtime + self.command_line],
191-
shell=False,
192-
close_fds=True,
193-
stdin=stdin,
194-
stdout=stdout,
195-
env=env,
196-
cwd=self.outdir)
197-
198-
if sp.stdin:
199-
sp.stdin.close()
192+
stdout_path = None
200193

201-
rcode = sp.wait()
202-
203-
if isinstance(stdin, file):
204-
stdin.close()
205-
206-
if stdout is not sys.stderr:
207-
stdout.close()
194+
rcode = shelled_popen(
195+
[str(x) for x in runtime + self.command_line],
196+
stdin_path=stdin_path,
197+
stdout_path=stdout_path,
198+
env=env,
199+
cwd=self.outdir,
200+
)
208201

209202
if self.successCodes and rcode in self.successCodes:
210203
processStatus = "success"
@@ -258,3 +251,132 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
258251
if move_outputs and empty_subtree(self.outdir):
259252
_logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
260253
shutil.rmtree(self.outdir, True)
254+
255+
256+
SHELL_COMMAND_TEMPLATE = string.Template("""#!/bin/bash
257+
$prefix
258+
python "run_job.py" "job.json"
259+
""")
260+
PYTHON_RUN_SCRIPT = """
261+
import json
262+
import sys
263+
import subprocess
264+
265+
with open(sys.argv[1], "r") as f:
266+
popen_description = json.load(f)
267+
commands = popen_description["commands"]
268+
cwd = popen_description["cwd"]
269+
env = popen_description["env"]
270+
stdin_path = popen_description["stdin_path"]
271+
stdout_path = popen_description["stdout_path"]
272+
273+
if stdin_path is not None:
274+
stdin = open(stdin_path, "rd")
275+
else:
276+
stdin = subprocess.PIPE
277+
278+
if stdout_path is not None:
279+
stdout = open(stdout_path, "wb")
280+
else:
281+
stdout = sys.stderr
282+
283+
sp = subprocess.Popen(commands,
284+
shell=False,
285+
close_fds=True,
286+
stdin=stdin,
287+
stdout=stdout,
288+
env=env,
289+
cwd=cwd)
290+
291+
if sp.stdin:
292+
sp.stdin.close()
293+
294+
rcode = sp.wait()
295+
296+
if isinstance(stdin, file):
297+
stdin.close()
298+
299+
if stdout is not sys.stderr:
300+
stdout.close()
301+
302+
sys.exit(rcode)
303+
"""
304+
305+
306+
def shelled_popen(commands,
307+
stdin_path,
308+
stdout_path,
309+
env,
310+
cwd,
311+
prefix=None):
312+
if prefix is None and not FORCE_SHELLED_POPEN:
313+
if stdin_path is not None:
314+
stdin = open(stdin_path, "rd")
315+
else:
316+
stdin = subprocess.PIPE
317+
318+
if stdout_path is not None:
319+
stdout = open(stdout_path, "wb")
320+
else:
321+
stdout = sys.stderr
322+
323+
sp = subprocess.Popen(commands,
324+
shell=False,
325+
close_fds=True,
326+
stdin=stdin,
327+
stdout=stdout,
328+
env=env,
329+
cwd=cwd)
330+
331+
if sp.stdin:
332+
sp.stdin.close()
333+
334+
rcode = sp.wait()
335+
336+
if isinstance(stdin, file):
337+
stdin.close()
338+
339+
if stdout is not sys.stderr:
340+
stdout.close()
341+
342+
return rcode
343+
else:
344+
template_kwds = dict(
345+
prefix=prefix or '',
346+
)
347+
job_script_contents = SHELL_COMMAND_TEMPLATE.substitute(
348+
**template_kwds
349+
)
350+
job_dir = tempfile.mkdtemp(prefix="cwltooljob")
351+
job_description = dict(
352+
commands=commands,
353+
cwd=cwd,
354+
env=env,
355+
stdout_path=stdout_path,
356+
stdin_path=stdin_path,
357+
)
358+
with open(os.path.join(job_dir, "job.json"), "w") as f:
359+
json.dump(job_description, f)
360+
try:
361+
job_script = os.path.join(job_dir, "run_job.bash")
362+
with open(job_script, "w") as f:
363+
f.write(job_script_contents)
364+
job_run = os.path.join(job_dir, "run_job.py")
365+
with open(job_run, "w") as f:
366+
f.write(PYTHON_RUN_SCRIPT)
367+
sp = subprocess.Popen(
368+
["bash", job_script],
369+
shell=False,
370+
cwd=job_dir,
371+
stdout=subprocess.PIPE,
372+
stderr=subprocess.PIPE,
373+
stdin=subprocess.PIPE,
374+
)
375+
if sp.stdin:
376+
sp.stdin.close()
377+
378+
rcode = sp.wait()
379+
380+
return rcode
381+
finally:
382+
shutil.rmtree(job_dir)

0 commit comments

Comments
 (0)