Skip to content

Commit c988bb6

Browse files
committed
Refactor job.py to allow modification of job environment w/shell commands.
1 parent 5c02ee7 commit c988bb6

File tree

1 file changed

+167
-28
lines changed

1 file changed

+167
-28
lines changed

cwltool/job.py

Lines changed: 167 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import json
77
import logging
88
import sys
9+
import string
910
import requests
1011
from . import docker
1112
from .process import get_feature, empty_subtree, stageFiles
@@ -24,6 +25,9 @@
2425

2526
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
2627

28+
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1"
29+
30+
2731
def deref_links(outputs): # type: (Any) -> None
2832
if isinstance(outputs, dict):
2933
if outputs.get("class") == "File":
@@ -188,50 +192,36 @@ def linkoutdir(src, tgt):
188192
stageFiles(generatemapper, linkoutdir)
189193

190194
if self.stdin:
191-
stdin = open(self.pathmapper.reversemap(self.stdin)[1], "rb")
195+
stdin_path = self.pathmapper.reversemap(self.stdin)[1]
192196
else:
193-
stdin = subprocess.PIPE
197+
stdin_path = None
194198

195199
if self.stderr:
196200
abserr = os.path.join(self.outdir, self.stderr)
197201
dnerr = os.path.dirname(abserr)
198202
if dnerr and not os.path.exists(dnerr):
199203
os.makedirs(dnerr)
200-
stderr = open(abserr, "wb")
204+
stderr_path = abserr
201205
else:
202-
stderr = sys.stderr
206+
stderr_path = None
203207

204208
if self.stdout:
205209
absout = os.path.join(self.outdir, self.stdout)
206210
dn = os.path.dirname(absout)
207211
if dn and not os.path.exists(dn):
208212
os.makedirs(dn)
209-
stdout = open(absout, "wb")
213+
stdout_path = absout
210214
else:
211-
stdout = sys.stderr
212-
213-
sp = subprocess.Popen([unicode(x).encode('utf-8') for x in runtime + self.command_line],
214-
shell=False,
215-
close_fds=True,
216-
stdin=stdin,
217-
stderr=stderr,
218-
stdout=stdout,
219-
env=env,
220-
cwd=self.outdir)
221-
222-
if sp.stdin:
223-
sp.stdin.close()
224-
225-
rcode = sp.wait()
226-
227-
if isinstance(stdin, file):
228-
stdin.close()
215+
stdout_path = None
229216

230-
if stderr is not sys.stderr:
231-
stderr.close()
232-
233-
if stdout is not sys.stderr:
234-
stdout.close()
217+
rcode = shelled_popen(
218+
[unicode(x).encode('utf-8') for x in runtime + self.command_line],
219+
stdin_path=stdin_path,
220+
stdout_path=stdout_path,
221+
stderr_path=stderr_path,
222+
env=env,
223+
cwd=self.outdir,
224+
)
235225

236226
if self.successCodes and rcode in self.successCodes:
237227
processStatus = "success"
@@ -290,3 +280,152 @@ def linkoutdir(src, tgt):
290280
if move_outputs == "move" and empty_subtree(self.outdir):
291281
_logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
292282
shutil.rmtree(self.outdir, True)
283+
284+
285+
SHELL_COMMAND_TEMPLATE = string.Template("""#!/bin/bash
286+
$prefix
287+
python "run_job.py" "job.json"
288+
""")
289+
PYTHON_RUN_SCRIPT = """
290+
import json
291+
import sys
292+
import subprocess
293+
294+
with open(sys.argv[1], "r") as f:
295+
popen_description = json.load(f)
296+
commands = popen_description["commands"]
297+
cwd = popen_description["cwd"]
298+
env = popen_description["env"]
299+
stdin_path = popen_description["stdin_path"]
300+
stdout_path = popen_description["stdout_path"]
301+
stderr_path = popen_description["stderr_path"]
302+
303+
if stdin_path is not None:
304+
stdin = open(stdin_path, "rd")
305+
else:
306+
stdin = subprocess.PIPE
307+
308+
if stdout_path is not None:
309+
stdout = open(stdout_path, "wb")
310+
else:
311+
stdout = sys.stderr
312+
313+
if stderr_path is not None:
314+
stderr = open(stderr_path, "wb)
315+
else:
316+
stderr = sys.stderr
317+
318+
sp = subprocess.Popen(commands,
319+
shell=False,
320+
close_fds=True,
321+
stdin=stdin,
322+
stdout=stdout,
323+
env=env,
324+
cwd=cwd)
325+
326+
if sp.stdin:
327+
sp.stdin.close()
328+
329+
rcode = sp.wait()
330+
331+
if isinstance(stdin, file):
332+
stdin.close()
333+
334+
if stdout is not sys.stderr:
335+
stdout.close()
336+
337+
if stderr is not sys.stderr:
338+
stderr.close()
339+
340+
sys.exit(rcode)
341+
"""
342+
343+
344+
def shelled_popen(commands,
345+
stdin_path,
346+
stdout_path,
347+
stderr_path,
348+
env,
349+
cwd,
350+
prefix=None):
351+
if prefix is None and not FORCE_SHELLED_POPEN:
352+
if stdin_path is not None:
353+
stdin = open(stdin_path, "rd")
354+
else:
355+
stdin = subprocess.PIPE
356+
357+
if stdout_path is not None:
358+
stdout = open(stdout_path, "wb")
359+
else:
360+
stdout = sys.stderr
361+
362+
if stderr_path is not None:
363+
stderr = open(stderr_path, "wb")
364+
else:
365+
stderr = sys.stderr
366+
367+
sp = subprocess.Popen(commands,
368+
shell=False,
369+
close_fds=True,
370+
stdin=stdin,
371+
stdout=stdout,
372+
stderr=stderr,
373+
env=env,
374+
cwd=cwd)
375+
376+
if sp.stdin:
377+
sp.stdin.close()
378+
379+
rcode = sp.wait()
380+
381+
if isinstance(stdin, file):
382+
stdin.close()
383+
384+
if stdout is not sys.stderr:
385+
stdout.close()
386+
387+
if stderr is not sys.stderr:
388+
stderr.close()
389+
390+
return rcode
391+
else:
392+
template_kwds = dict(
393+
prefix=prefix or '',
394+
)
395+
job_script_contents = SHELL_COMMAND_TEMPLATE.substitute(
396+
**template_kwds
397+
)
398+
job_dir = tempfile.mkdtemp(prefix="cwltooljob")
399+
job_description = dict(
400+
commands=commands,
401+
cwd=cwd,
402+
env=env,
403+
stdout_path=stdout_path,
404+
stderr_path=stderr_path,
405+
stdin_path=stdin_path,
406+
)
407+
with open(os.path.join(job_dir, "job.json"), "w") as f:
408+
json.dump(job_description, f)
409+
try:
410+
job_script = os.path.join(job_dir, "run_job.bash")
411+
with open(job_script, "w") as f:
412+
f.write(job_script_contents)
413+
job_run = os.path.join(job_dir, "run_job.py")
414+
with open(job_run, "w") as f:
415+
f.write(PYTHON_RUN_SCRIPT)
416+
sp = subprocess.Popen(
417+
["bash", job_script],
418+
shell=False,
419+
cwd=job_dir,
420+
stdout=subprocess.PIPE,
421+
stderr=subprocess.PIPE,
422+
stdin=subprocess.PIPE,
423+
)
424+
if sp.stdin:
425+
sp.stdin.close()
426+
427+
rcode = sp.wait()
428+
429+
return rcode
430+
finally:
431+
shutil.rmtree(job_dir)

0 commit comments

Comments
 (0)