Skip to content

Commit 0f3014c

Browse files
committed
reduce loading time for packed documents
Cache the document loaders Skip an unneeded deepcopy Propogate the document metadata now that we aren't reparsing from disk
1 parent 0a5f13d commit 0f3014c

File tree

4 files changed

+79
-58
lines changed

4 files changed

+79
-58
lines changed

cwltool/load_tool.py

Lines changed: 63 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
"""Loads a CWL document."""
12
from __future__ import absolute_import
23
# pylint: disable=unused-import
3-
"""Loads a CWL document."""
44

55
import logging
66
import os
@@ -13,14 +13,14 @@
1313

1414
import requests.sessions
1515
from six import itervalues, string_types
16+
from six.moves import urllib
1617

1718
import schema_salad.schema as schema
1819
from avro.schema import Names
1920
from ruamel.yaml.comments import CommentedMap, CommentedSeq
2021
from schema_salad.ref_resolver import Fetcher, Loader, file_uri
2122
from schema_salad.sourceline import cmap
2223
from schema_salad.validate import ValidationException
23-
from six.moves import urllib
2424

2525
from . import process, update
2626
from .errors import WorkflowException
@@ -53,24 +53,33 @@
5353
}
5454
} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
5555

56+
57+
loaders = {}
58+
59+
def default_loader(fetcher_constructor):
60+
if fetcher_constructor in loaders:
61+
return loaders[fetcher_constructor]
62+
else:
63+
loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor)
64+
loaders[fetcher_constructor] = loader
65+
return loader
66+
5667
def resolve_tool_uri(argsworkflow, # type: Text
5768
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
58-
fetcher_constructor=None,
59-
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
69+
fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
6070
document_loader=None # type: Loader
61-
):
62-
# type: (...) -> Tuple[Text, Text]
71+
): # type: (...) -> Tuple[Text, Text]
6372

6473
uri = None # type: Text
6574
split = urllib.parse.urlsplit(argsworkflow)
6675
# In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
67-
if split.scheme and split.scheme in [u'http',u'https',u'file']:
76+
if split.scheme and split.scheme in [u'http', u'https', u'file']:
6877
uri = argsworkflow
6978
elif os.path.exists(os.path.abspath(argsworkflow)):
7079
uri = file_uri(str(os.path.abspath(argsworkflow)))
7180
elif resolver:
7281
if document_loader is None:
73-
document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
82+
document_loader = default_loader(fetcher_constructor) # type: ignore
7483
uri = resolver(document_loader, argsworkflow)
7584

7685
if uri is None:
@@ -85,18 +94,17 @@ def resolve_tool_uri(argsworkflow, # type: Text
8594

8695
def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
8796
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
88-
fetcher_constructor=None
89-
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
90-
):
91-
# type: (...) -> Tuple[Loader, CommentedMap, Text]
97+
fetcher_constructor=None # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
98+
): # type: (...) -> Tuple[Loader, CommentedMap, Text]
9299
"""Retrieve a CWL document."""
93100

94-
document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
101+
document_loader = default_loader(fetcher_constructor) # type: ignore
95102

96103
uri = None # type: Text
97104
workflowobj = None # type: CommentedMap
98105
if isinstance(argsworkflow, string_types):
99-
uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver, document_loader=document_loader)
106+
uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver,
107+
document_loader=document_loader)
100108
workflowobj = document_loader.fetch(fileuri)
101109
elif isinstance(argsworkflow, dict):
102110
uri = "#" + Text(id(argsworkflow))
@@ -126,7 +134,7 @@ def _convert_stdstreams_to_files(workflowobj):
126134
sort_keys=True).encode('utf-8')).hexdigest())
127135
workflowobj[streamtype] = filename
128136
out['type'] = 'File'
129-
out['outputBinding'] = {'glob': filename}
137+
out['outputBinding'] = cmap({'glob': filename})
130138
for inp in workflowobj.get('inputs', []):
131139
if inp.get('type') == 'stdin':
132140
if 'inputBinding' in inp:
@@ -170,25 +178,25 @@ def validate_document(document_loader, # type: Loader
170178
enable_dev=False, # type: bool
171179
strict=True, # type: bool
172180
preprocess_only=False, # type: bool
173-
fetcher_constructor=None,
174-
skip_schemas=None,
175-
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
176-
overrides=None # type: List[Dict]
181+
fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
182+
skip_schemas=None, # type: bool
183+
overrides=None, # type: List[Dict]
184+
metadata=None, # type: Optional[Dict]
177185
):
178186
# type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
179187
"""Validate a CWL document."""
180188

181189
if isinstance(workflowobj, list):
182-
workflowobj = {
190+
workflowobj = cmap({
183191
"$graph": workflowobj
184-
}
192+
}, fn=uri)
185193

186194
if not isinstance(workflowobj, dict):
187195
raise ValueError("workflowjobj must be a dict, got '%s': %s" % (type(workflowobj), workflowobj))
188196

189197
jobobj = None
190198
if "cwl:tool" in workflowobj:
191-
job_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
199+
job_loader = default_loader(fetcher_constructor) # type: ignore
192200
jobobj, _ = job_loader.resolve_all(workflowobj, uri)
193201
uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
194202
del cast(dict, jobobj)["https://w3id.org/cwl/cwl#tool"]
@@ -200,22 +208,25 @@ def validate_document(document_loader, # type: Loader
200208
workflowobj = fetch_document(uri, fetcher_constructor=fetcher_constructor)[1]
201209

202210
fileuri = urllib.parse.urldefrag(uri)[0]
203-
204-
if "cwlVersion" in workflowobj:
205-
if not isinstance(workflowobj["cwlVersion"], (str, Text)):
206-
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
207-
# strip out version
208-
workflowobj["cwlVersion"] = re.sub(
209-
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
210-
workflowobj["cwlVersion"])
211-
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
212-
# print out all the Supported Versions of cwlVersion
213-
versions = list(ALLUPDATES) # ALLUPDATES is a dict
214-
versions.sort()
215-
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))
216-
else:
217-
raise ValidationException("No cwlVersion found."
218-
"Use the following syntax in your CWL workflow to declare version: cwlVersion: <version>")
211+
if "cwlVersion" not in workflowobj:
212+
if metadata and 'cwlVersion' in metadata:
213+
workflowobj['cwlVersion'] = metadata['cwlVersion']
214+
else:
215+
raise ValidationException("No cwlVersion found."
216+
"Use the following syntax in your CWL document to declare "
217+
"the version: cwlVersion: <version>")
218+
219+
if not isinstance(workflowobj["cwlVersion"], (str, Text)):
220+
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
221+
# strip out version
222+
workflowobj["cwlVersion"] = re.sub(
223+
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
224+
workflowobj["cwlVersion"])
225+
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
226+
# print out all the Supported Versions of cwlVersion
227+
versions = list(ALLUPDATES) # ALLUPDATES is a dict
228+
versions.sort()
229+
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))
219230

220231
if workflowobj["cwlVersion"] == "draft-2":
221232
workflowobj = cast(CommentedMap, cmap(update._draft2toDraft3dev1(
@@ -238,36 +249,36 @@ def validate_document(document_loader, # type: Loader
238249
_add_blank_ids(workflowobj)
239250

240251
workflowobj["id"] = fileuri
241-
processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
252+
processobj, new_metadata = document_loader.resolve_all(workflowobj, fileuri)
242253
if not isinstance(processobj, (CommentedMap, CommentedSeq)):
243254
raise ValidationException("Workflow must be a dict or list.")
244255

245-
if not metadata:
256+
if not new_metadata:
246257
if not isinstance(processobj, dict):
247258
raise ValidationException("Draft-2 workflows must be a dict.")
248-
metadata = cast(CommentedMap, cmap({"$namespaces": processobj.get("$namespaces", {}),
249-
"$schemas": processobj.get("$schemas", []),
250-
"cwlVersion": processobj["cwlVersion"]},
251-
fn=fileuri))
259+
new_metadata = cast(CommentedMap, cmap(
260+
{"$namespaces": processobj.get("$namespaces", {}),
261+
"$schemas": processobj.get("$schemas", []),
262+
"cwlVersion": processobj["cwlVersion"]}, fn=fileuri))
252263

253264
_convert_stdstreams_to_files(workflowobj)
254265

255266
if preprocess_only:
256-
return document_loader, avsc_names, processobj, metadata, uri
267+
return document_loader, avsc_names, processobj, new_metadata, uri
257268

258269
schema.validate_doc(avsc_names, processobj, document_loader, strict)
259270

260-
if metadata.get("cwlVersion") != update.LATEST:
271+
if new_metadata.get("cwlVersion") != update.LATEST:
261272
processobj = cast(CommentedMap, cmap(update.update(
262-
processobj, document_loader, fileuri, enable_dev, metadata)))
273+
processobj, document_loader, fileuri, enable_dev, new_metadata)))
263274

264275
if jobobj:
265-
metadata[u"cwl:defaults"] = jobobj
276+
new_metadata[u"cwl:defaults"] = jobobj
266277

267278
if overrides:
268-
metadata[u"cwltool:overrides"] = overrides
279+
new_metadata[u"cwltool:overrides"] = overrides
269280

270-
return document_loader, avsc_names, processobj, metadata, uri
281+
return document_loader, avsc_names, processobj, new_metadata, uri
271282

272283

273284
def make_tool(document_loader, # type: Loader
@@ -332,7 +343,8 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
332343
document_loader, avsc_names, processobj, metadata, uri = validate_document(
333344
document_loader, workflowobj, uri, enable_dev=enable_dev,
334345
strict=strict, fetcher_constructor=fetcher_constructor,
335-
overrides=overrides)
346+
overrides=overrides, metadata=kwargs.get('metadata', None)
347+
if kwargs else None)
336348
return make_tool(document_loader, avsc_names, metadata, uri,
337349
makeTool, kwargs if kwargs else {})
338350

cwltool/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,8 +544,8 @@ def load_job_order(args, # type: argparse.Namespace
544544
job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)
545545

546546
if job_order_object and "http://commonwl.org/cwltool#overrides" in job_order_object:
547-
overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri))
548-
del job_order_object["http://commonwl.org/cwltool#overrides"]
547+
overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri))
548+
del job_order_object["http://commonwl.org/cwltool#overrides"]
549549

550550
if not job_order_object:
551551
input_basedir = args.basedir if args.basedir else os.getcwd()
@@ -641,6 +641,7 @@ def addSizes(p):
641641
ns = {} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
642642
ns.update(t.metadata.get("$namespaces", {}))
643643
ld = Loader(ns)
644+
644645
def expand_formats(p):
645646
if "format" in p:
646647
p["format"] = ld.expand_url(p["format"], "")

cwltool/workflow.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,15 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):
7070
elif isinstance(src.parameter["type"], list):
7171
# Source is union type
7272
# Check that at least one source type is compatible with the sink.
73-
for st in src.parameter["type"]:
74-
srccopy = copy.deepcopy(src)
75-
srccopy.parameter["type"] = st
76-
if match_types(sinktype, srccopy, iid, inputobj, linkMerge, valueFrom):
73+
original_types = src.parameter["type"]
74+
for source_type in original_types:
75+
src.parameter["type"] = source_type
76+
match = match_types(
77+
sinktype, src, iid, inputobj, linkMerge, valueFrom)
78+
if match:
79+
src.parameter["type"] = original_types
7780
return True
81+
src.parameter["type"] = original_types
7882
return False
7983
elif linkMerge:
8084
if iid not in inputobj:

tests/test_pack.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
import cwltool.pack
1414
import cwltool.workflow
15+
from cwltool.resolver import tool_resolver
16+
from cwltool import load_tool
1517
from cwltool.load_tool import fetch_document, validate_document
1618
from cwltool.main import makeRelative, main, print_pack
1719
from cwltool.pathmapper import adjustDirObjs, adjustFileObjs
@@ -23,6 +25,7 @@ class TestPack(unittest.TestCase):
2325
maxDiff = None
2426

2527
def test_pack(self):
28+
load_tool.loaders = {}
2629

2730
document_loader, workflowobj, uri = fetch_document(
2831
get_data("tests/wf/revsort.cwl"))
@@ -97,10 +100,11 @@ def _pack_idempotently(self, document):
97100
reason="Instance of cwltool is used, on Windows it invokes a default docker container"
98101
"which is not supported on AppVeyor")
99102
def test_packed_workflow_execution(self):
103+
load_tool.loaders = {}
100104
test_wf = "tests/wf/count-lines1-wf.cwl"
101105
test_wf_job = "tests/wf/wc-job.json"
102106
document_loader, workflowobj, uri = fetch_document(
103-
get_data(test_wf))
107+
get_data(test_wf), resolver=tool_resolver)
104108
document_loader, avsc_names, processobj, metadata, uri = validate_document(
105109
document_loader, workflowobj, uri)
106110
packed = json.loads(print_pack(document_loader, processobj, uri, metadata))

0 commit comments

Comments
 (0)