Skip to content

Commit cec71f7

Browse files
committed
Refactor job.py to allow modification of job environment w/shell commands.
1 parent 5fe90dc commit cec71f7

File tree

1 file changed

+167
-28
lines changed

1 file changed

+167
-28
lines changed

cwltool/job.py

Lines changed: 167 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import yaml
88
import logging
99
import sys
10+
import string
1011
import requests
1112
from . import docker
1213
from .process import get_feature, empty_subtree
@@ -25,6 +26,9 @@
2526

2627
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
2728

29+
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1"
30+
31+
2832
def deref_links(outputs): # type: (Any) -> None
2933
if isinstance(outputs, dict):
3034
if outputs.get("class") == "File":
@@ -177,50 +181,36 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
177181
raise Exception("Unhandled type %s", type(entry))
178182

179183
if self.stdin:
180-
stdin = open(self.pathmapper.mapper(self.stdin)[0], "rb")
184+
stdin_path = self.pathmapper.mapper(self.stdin)[0]
181185
else:
182-
stdin = subprocess.PIPE
186+
stdin_path = None
183187

184188
if self.stderr:
185189
abserr = os.path.join(self.outdir, self.stderr)
186190
dnerr = os.path.dirname(abserr)
187191
if dnerr and not os.path.exists(dnerr):
188192
os.makedirs(dnerr)
189-
stderr = open(abserr, "wb")
193+
stderr_path = abserr
190194
else:
191-
stderr = sys.stderr
195+
stderr_path = None
192196

193197
if self.stdout:
194198
absout = os.path.join(self.outdir, self.stdout)
195199
dn = os.path.dirname(absout)
196200
if dn and not os.path.exists(dn):
197201
os.makedirs(dn)
198-
stdout = open(absout, "wb")
202+
stdout_path = absout
199203
else:
200-
stdout = sys.stderr
201-
202-
sp = subprocess.Popen([str(x) for x in runtime + self.command_line],
203-
shell=False,
204-
close_fds=True,
205-
stdin=stdin,
206-
stderr=stderr,
207-
stdout=stdout,
208-
env=env,
209-
cwd=self.outdir)
210-
211-
if sp.stdin:
212-
sp.stdin.close()
213-
214-
rcode = sp.wait()
215-
216-
if isinstance(stdin, file):
217-
stdin.close()
204+
stdout_path = None
218205

219-
if stderr is not sys.stderr:
220-
stderr.close()
221-
222-
if stdout is not sys.stderr:
223-
stdout.close()
206+
rcode = shelled_popen(
207+
[str(x) for x in runtime + self.command_line],
208+
stdin_path=stdin_path,
209+
stdout_path=stdout_path,
210+
stderr_path=stderr_path,
211+
env=env,
212+
cwd=self.outdir,
213+
)
224214

225215
if self.successCodes and rcode in self.successCodes:
226216
processStatus = "success"
@@ -274,3 +264,152 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
274264
if move_outputs and empty_subtree(self.outdir):
275265
_logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
276266
shutil.rmtree(self.outdir, True)
267+
268+
269+
SHELL_COMMAND_TEMPLATE = string.Template("""#!/bin/bash
270+
$prefix
271+
python "run_job.py" "job.json"
272+
""")
273+
PYTHON_RUN_SCRIPT = """
274+
import json
275+
import sys
276+
import subprocess
277+
278+
with open(sys.argv[1], "r") as f:
279+
popen_description = json.load(f)
280+
commands = popen_description["commands"]
281+
cwd = popen_description["cwd"]
282+
env = popen_description["env"]
283+
stdin_path = popen_description["stdin_path"]
284+
stdout_path = popen_description["stdout_path"]
285+
stderr_path = popen_description["stderr_path"]
286+
287+
if stdin_path is not None:
288+
stdin = open(stdin_path, "rd")
289+
else:
290+
stdin = subprocess.PIPE
291+
292+
if stdout_path is not None:
293+
stdout = open(stdout_path, "wb")
294+
else:
295+
stdout = sys.stderr
296+
297+
if stderr_path is not None:
298+
stderr = open(stderr_path, "wb)
299+
else:
300+
stderr = sys.stderr
301+
302+
sp = subprocess.Popen(commands,
303+
shell=False,
304+
close_fds=True,
305+
stdin=stdin,
306+
stdout=stdout,
307+
env=env,
308+
cwd=cwd)
309+
310+
if sp.stdin:
311+
sp.stdin.close()
312+
313+
rcode = sp.wait()
314+
315+
if isinstance(stdin, file):
316+
stdin.close()
317+
318+
if stdout is not sys.stderr:
319+
stdout.close()
320+
321+
if stderr is not sys.stderr:
322+
stderr.close()
323+
324+
sys.exit(rcode)
325+
"""
326+
327+
328+
def shelled_popen(commands,
329+
stdin_path,
330+
stdout_path,
331+
stderr_path,
332+
env,
333+
cwd,
334+
prefix=None):
335+
if prefix is None and not FORCE_SHELLED_POPEN:
336+
if stdin_path is not None:
337+
stdin = open(stdin_path, "rd")
338+
else:
339+
stdin = subprocess.PIPE
340+
341+
if stdout_path is not None:
342+
stdout = open(stdout_path, "wb")
343+
else:
344+
stdout = sys.stderr
345+
346+
if stderr_path is not None:
347+
stderr = open(stderr_path, "wb")
348+
else:
349+
stderr = sys.stderr
350+
351+
sp = subprocess.Popen(commands,
352+
shell=False,
353+
close_fds=True,
354+
stdin=stdin,
355+
stdout=stdout,
356+
stderr=stderr,
357+
env=env,
358+
cwd=cwd)
359+
360+
if sp.stdin:
361+
sp.stdin.close()
362+
363+
rcode = sp.wait()
364+
365+
if isinstance(stdin, file):
366+
stdin.close()
367+
368+
if stdout is not sys.stderr:
369+
stdout.close()
370+
371+
if stderr is not sys.stderr:
372+
stderr.close()
373+
374+
return rcode
375+
else:
376+
template_kwds = dict(
377+
prefix=prefix or '',
378+
)
379+
job_script_contents = SHELL_COMMAND_TEMPLATE.substitute(
380+
**template_kwds
381+
)
382+
job_dir = tempfile.mkdtemp(prefix="cwltooljob")
383+
job_description = dict(
384+
commands=commands,
385+
cwd=cwd,
386+
env=env,
387+
stdout_path=stdout_path,
388+
stderr_path=stderr_path,
389+
stdin_path=stdin_path,
390+
)
391+
with open(os.path.join(job_dir, "job.json"), "w") as f:
392+
json.dump(job_description, f)
393+
try:
394+
job_script = os.path.join(job_dir, "run_job.bash")
395+
with open(job_script, "w") as f:
396+
f.write(job_script_contents)
397+
job_run = os.path.join(job_dir, "run_job.py")
398+
with open(job_run, "w") as f:
399+
f.write(PYTHON_RUN_SCRIPT)
400+
sp = subprocess.Popen(
401+
["bash", job_script],
402+
shell=False,
403+
cwd=job_dir,
404+
stdout=subprocess.PIPE,
405+
stderr=subprocess.PIPE,
406+
stdin=subprocess.PIPE,
407+
)
408+
if sp.stdin:
409+
sp.stdin.close()
410+
411+
rcode = sp.wait()
412+
413+
return rcode
414+
finally:
415+
shutil.rmtree(job_dir)

0 commit comments

Comments
 (0)