Skip to content

Reuse nodejs subprocess for faster expressions #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cwltool/cwlNodeEngine.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"use strict";
process.stdin.setEncoding('utf8');
var incoming = "";
process.stdin.on('data', function(chunk) {
incoming += chunk;
var i = incoming.indexOf("\n");
if (i > -1) {
var fn = JSON.parse(incoming.substr(0, i));
incoming = incoming.substr(i+1);
process.stdout.write(JSON.stringify(require("vm").runInNewContext(fn, {})) + "\n");
}
});
process.stdin.on('end', process.exit);
74 changes: 61 additions & 13 deletions cwltool/sandboxjs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@
import threading
import errno
import logging
from typing import Any, Dict, List, Mapping, Text, TypeVar, Union
import select
import os

import cStringIO
from cStringIO import StringIO
from typing import Any, Dict, List, Mapping, Text, TypeVar, Union
from pkg_resources import resource_stream

class JavascriptException(Exception):
pass

_logger = logging.getLogger("cwltool")

JSON = Union[Dict[Any,Any], List[Any], Text, int, long, float, bool, None]
JSON = Union[Dict[Text,Any], List[Any], Text, int, long, float, bool, None]

localdata = threading.local()

have_node_slim = False

def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON
def new_js_proc():
# type: () -> subprocess.Popen

res = resource_stream(__name__, 'cwlNodeEngine.js')
nodecode = res.read()

nodejs = None
trynodes = ("nodejs", "node")
for n in trynodes:
try:
nodejs = subprocess.Popen([n], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
nodejs = subprocess.Popen([n, "--eval", nodecode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
break
except OSError as e:
if e.errno == errno.ENOENT:
Expand All @@ -39,7 +51,7 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -
nodejs = subprocess.Popen(["docker", "run",
"--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
"--sig-proxy=true", "--interactive",
"--rm", nodeimg],
"--rm", nodeimg, "node", "--eval", nodecode],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
if e.errno == errno.ENOENT:
Expand All @@ -55,15 +67,24 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -
"expressions, but couldn't find it. Tried %s, docker run "
"node:slim" % u", ".join(trynodes))

return nodejs


def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON

if not hasattr(localdata, "proc") or localdata.proc.poll() is not None:
localdata.proc = new_js_proc()

nodejs = localdata.proc

fn = u"\"use strict\";\n%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js))
script = u"console.log(JSON.stringify(require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn)

killed = []

def term():
try:
nodejs.kill()
killed.append(True)
nodejs.kill()
except OSError:
pass

Expand All @@ -73,17 +94,44 @@ def term():
tm = threading.Timer(timeout, term)
tm.start()

stdoutdata, stderrdata = nodejs.communicate(script)
stdin_buf = StringIO(json.dumps(fn)+"\n")
stdout_buf = StringIO()
stderr_buf = StringIO()

completed = [] # type: List[Union[cStringIO.InputType, cStringIO.OutputType]]
while len(completed) < 3:
rready, wready, _ = select.select([nodejs.stdout, nodejs.stderr], [nodejs.stdin], [])
if nodejs.stdin in wready:
b = stdin_buf.read(select.PIPE_BUF)
if b:
os.write(nodejs.stdin.fileno(), b)
elif stdin_buf not in completed:
completed.append(stdin_buf)
for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
if pipes[0] in rready:
b = os.read(pipes[0].fileno(), select.PIPE_BUF)
if b:
pipes[1].write(b)
elif pipes[1] not in completed:
completed.append(pipes[1])
if stdout_buf.getvalue().endswith("\n"):
for buf in (stdout_buf, stderr_buf):
if buf not in completed:
completed.append(buf)
tm.cancel()

stdin_buf.close()
stdoutdata = stdout_buf.getvalue()
stderrdata = stderr_buf.getvalue()

def fn_linenum(): # type: () -> Text
return u"\n".join(u"%04i %s" % (i+1, b) for i, b in enumerate(fn.split("\n")))

if killed:
raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum()))

if nodejs.returncode != 0:
raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata))
if nodejs.poll() not in (None, 0):
if killed:
raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum()))
else:
raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata))
else:
try:
return json.loads(stdoutdata)
Expand Down
46 changes: 0 additions & 46 deletions node-expr-engine/cwlNodeEngine.js

This file was deleted.

3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
'schemas/v1.0/*.yml',
'schemas/v1.0/*.md',
'schemas/v1.0/salad/schema_salad/metaschema/*.yml',
'schemas/v1.0/salad/schema_salad/metaschema/*.md']},
'schemas/v1.0/salad/schema_salad/metaschema/*.md',
'cwlNodeEngine.js']},
install_requires=[
'requests',
'ruamel.yaml == 0.12.4',
Expand Down