Skip to content

Commit 2210cd3

Browse files
authored
Optional directory listing (#311)
* Do not expand 'listing' field on Directory objects unless loadListing is provided. loadListing can be "shallow" (expand one level) or "deep" (expand recursively). * Implement mechanism for registering schema extensions in cwltool. * Make sure empty directory literals get created. * Tests for Directory listing behavior extensions.
1 parent 36de782 commit 2210cd3

16 files changed

+274
-94
lines changed

cwltool/builder.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from . import expression
1010
from .errors import WorkflowException
11-
from .pathmapper import PathMapper, adjustFileObjs, normalizeFilesDirs
11+
from .pathmapper import PathMapper, adjustFileObjs, normalizeFilesDirs, get_listing
1212
from .stdfsaccess import StdFsAccess
1313
from .utils import aslist
1414

@@ -42,6 +42,10 @@ def __init__(self): # type: () -> None
4242
self.build_job_script = None # type: Callable[[List[str]], Text]
4343
self.debug = False # type: bool
4444

45+
# One of None, "shallow", "deep"
46+
# Will be default None for CWL v1.1
47+
self.loadListing = "deep" # type: Union[None, str]
48+
4549
def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
4650
# type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]]
4751
if tail_pos is None:
@@ -112,9 +116,10 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
112116

113117
if schema["type"] == "File":
114118
self.files.append(datum)
115-
if binding and binding.get("loadContents"):
116-
with self.fs_access.open(datum["location"], "rb") as f:
117-
datum["contents"] = f.read(CONTENT_LIMIT)
119+
if binding:
120+
if binding.get("loadContents"):
121+
with self.fs_access.open(datum["location"], "rb") as f:
122+
datum["contents"] = f.read(CONTENT_LIMIT)
118123

119124
if "secondaryFiles" in schema:
120125
if "secondaryFiles" not in datum:
@@ -142,6 +147,9 @@ def _capture_files(f):
142147
adjustFileObjs(datum.get("secondaryFiles", []), _capture_files)
143148

144149
if schema["type"] == "Directory":
150+
ll = self.loadListing or (binding and binding.get("loadListing"))
151+
if ll:
152+
get_listing(self.fs_access, datum, (ll == "deep"))
145153
self.files.append(datum)
146154

147155
# Position to front of the sort key

cwltool/draft2tool.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
from .pathmapper import adjustDirObjs
2121
from .errors import WorkflowException
2222
from .job import CommandLineJob
23-
from .pathmapper import PathMapper
24-
from .process import Process, shortname, uniquename, getListing, normalizeFilesDirs, compute_checksums
23+
from .pathmapper import PathMapper, get_listing, trim_listing
24+
from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums
2525
from .stdfsaccess import StdFsAccess
2626
from .utils import aslist
2727

@@ -419,6 +419,7 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
419419
% (shortname(port["id"]), indent(u(str(e)))))
420420

421421
if ret:
422+
adjustDirObjs(ret, trim_listing)
422423
adjustFileObjs(ret,
423424
cast(Callable[[Any], Any], # known bug in mypy
424425
# https://github.com/python/mypy/issues/797
@@ -468,8 +469,10 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
468469
raise
469470

470471
for files in r:
471-
if files["class"] == "Directory" and "listing" not in files:
472-
getListing(fs_access, files)
472+
if files["class"] == "Directory":
473+
ll = builder.loadListing or (binding and binding.get("loadListing"))
474+
if ll:
475+
get_listing(fs_access, files, (ll == "deep"))
473476
else:
474477
with fs_access.open(files["location"], "rb") as f:
475478
contents = ""

cwltool/extensions.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
$base: http://commonwl.org/cwltool#
2+
$namespaces:
3+
cwl: "https://w3id.org/cwl/cwl#"
4+
$graph:
5+
- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
6+
7+
- name: LoadListingRequirement
8+
type: record
9+
extends: cwl:ProcessRequirement
10+
fields:
11+
class:
12+
type: string
13+
doc: "Always 'LoadListingRequirement'"
14+
jsonldPredicate:
15+
"_id": "@type"
16+
"_type": "@vocab"
17+
loadListing:
18+
type:
19+
- "null"
20+
- type: enum
21+
name: LoadListingEnum
22+
symbols: [shallow, deep]

cwltool/job.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,10 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
154154

155155
if img_id:
156156
runtime = ["docker", "run", "-i"]
157-
for src in self.pathmapper.files():
158-
vol = self.pathmapper.mapper(src)
159-
if vol.type == "File":
157+
for src, vol in self.pathmapper.items():
158+
if not vol.staged:
159+
continue
160+
if vol.type in ("File", "Directory"):
160161
runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, vol.target))
161162
if vol.type == "CreateFile":
162163
createtmp = os.path.join(self.stagedir, os.path.basename(vol.target))
@@ -209,7 +210,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,
209210
env["HOME"] = self.outdir
210211
env["TMPDIR"] = self.tmpdir
211212

212-
stageFiles(self.pathmapper, os.symlink)
213+
stageFiles(self.pathmapper, os.symlink, ignoreWritable=True)
213214

214215
scr, _ = get_feature(self, "ShellCommandRequirement")
215216

cwltool/main.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020

2121
from . import draft2tool
2222
from . import workflow
23-
from .builder import adjustFileObjs
24-
from .pathmapper import adjustDirObjs
23+
from .pathmapper import adjustDirObjs, get_listing, adjustFileObjs, trim_listing
2524
from .cwlrdf import printrdf, printdot
2625
from .errors import WorkflowException, UnsupportedRequirement
2726
from .load_tool import fetch_document, validate_document, make_tool
2827
from .pack import pack
29-
from .process import shortname, Process, getListing, relocateOutputs, cleanIntermediate, scandeps, normalizeFilesDirs
28+
from .process import (shortname, Process, relocateOutputs, cleanIntermediate,
29+
scandeps, normalizeFilesDirs, use_custom_schema, use_standard_schema)
3030
from .resolver import tool_resolver
3131
from .stdfsaccess import StdFsAccess
3232

@@ -148,9 +148,13 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
148148
"relative to primary file or current working directory.")
149149

150150
parser.add_argument("--enable-dev", action="store_true",
151-
help="Allow loading and running development versions "
151+
help="Enable loading and running development versions "
152152
"of CWL spec.", default=False)
153153

154+
parser.add_argument("--enable-ext", action="store_true",
155+
help="Enable loading and running cwltool extensions "
156+
"to CWL spec.", default=False)
157+
154158
parser.add_argument("--default-container",
155159
help="Specify a default docker container that will be used if the workflow fails to specify one.")
156160
parser.add_argument("--no-match-user", action="store_true",
@@ -175,8 +179,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
175179
dest="compute_checksum")
176180

177181
parser.add_argument("--relax-path-checks", action="store_true",
178-
default=False, help="Relax requirements on path names. Currently "
179-
"allows spaces.", dest="relax_path_checks")
182+
default=False, help="Relax requirements on path names to permit "
183+
"spaces and hash characters.", dest="relax_path_checks")
184+
180185
parser.add_argument("workflow", type=Text, nargs="?", default=None)
181186
parser.add_argument("job_order", nargs=argparse.REMAINDER)
182187

@@ -234,7 +239,8 @@ def output_callback(out, processStatus):
234239

235240
if final_output and final_output[0] and finaloutdir:
236241
final_output[0] = relocateOutputs(final_output[0], finaloutdir,
237-
output_dirs, kwargs.get("move_outputs"))
242+
output_dirs, kwargs.get("move_outputs"),
243+
kwargs["make_fs_access"](""))
238244

239245
if kwargs.get("rm_tmpdir"):
240246
cleanIntermediate(output_dirs)
@@ -488,9 +494,8 @@ def pathToLoc(p):
488494

489495
adjustDirObjs(job_order_object, pathToLoc)
490496
adjustFileObjs(job_order_object, pathToLoc)
497+
adjustDirObjs(job_order_object, trim_listing)
491498
normalizeFilesDirs(job_order_object)
492-
adjustDirObjs(job_order_object, cast(Callable[..., Any],
493-
functools.partial(getListing, make_fs_access(input_basedir))))
494499

495500
if "cwl:tool" in job_order_object:
496501
del job_order_object["cwl:tool"]
@@ -601,6 +606,7 @@ def main(argsl=None, # type: List[str]
601606
'debug': False,
602607
'version': False,
603608
'enable_dev': False,
609+
'enable_ext': False,
604610
'strict': True,
605611
'rdf_serializer': None,
606612
'basedir': None,
@@ -636,6 +642,13 @@ def main(argsl=None, # type: List[str]
636642
if args.relax_path_checks:
637643
draft2tool.ACCEPTLIST_RE = draft2tool.ACCEPTLIST_EN_RELAXED_RE
638644

645+
if args.enable_ext:
646+
res = pkg_resources.resource_stream(__name__, 'extensions.yml')
647+
use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
648+
res.close()
649+
else:
650+
use_standard_schema("v1.0")
651+
639652
try:
640653
document_loader, workflowobj, uri = fetch_document(args.workflow, resolver=resolver,
641654
fetcher_constructor=fetcher_constructor)
@@ -708,13 +721,16 @@ def main(argsl=None, # type: List[str]
708721
setattr(args, 'move_outputs', "copy")
709722
setattr(args, "tmp_outdir_prefix", args.cachedir)
710723

711-
if job_order_object is None:
712-
job_order_object = load_job_order(args, tool, stdin,
713-
print_input_deps=args.print_input_deps,
714-
relative_deps=args.relative_deps,
715-
stdout=stdout,
716-
make_fs_access=make_fs_access,
717-
fetcher_constructor=fetcher_constructor)
724+
try:
725+
if job_order_object is None:
726+
job_order_object = load_job_order(args, tool, stdin,
727+
print_input_deps=args.print_input_deps,
728+
relative_deps=args.relative_deps,
729+
stdout=stdout,
730+
make_fs_access=make_fs_access,
731+
fetcher_constructor=fetcher_constructor)
732+
except SystemExit as e:
733+
return e.code
718734

719735
if isinstance(job_order_object, int):
720736
return job_order_object
@@ -731,6 +747,7 @@ def main(argsl=None, # type: List[str]
731747

732748
# This is the workflow output, it needs to be written
733749
if out is not None:
750+
734751
def locToPath(p):
735752
if p["location"].startswith("file://"):
736753
p["path"] = uri_file_path(p["location"])

cwltool/pathmapper.py

Lines changed: 63 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
from typing import Any, Callable, Set, Text, Tuple, Union
1212
from six.moves import urllib
1313

14+
from .stdfsaccess import abspath, StdFsAccess
15+
1416
_logger = logging.getLogger("cwltool")
1517

16-
MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type"])
18+
MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type", "staged"])
1719

1820

1921
def adjustFiles(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
@@ -77,12 +79,6 @@ def addLocation(d):
7779
adjustDirObjs(job, addLocation)
7880

7981

80-
def abspath(src, basedir): # type: (Text, Text) -> Text
81-
if src.startswith(u"file://"):
82-
ab = unicode(uri_file_path(str(src)))
83-
else:
84-
ab = src if os.path.isabs(src) else os.path.join(basedir, src)
85-
return ab
8682

8783

8884
def dedup(listing): # type: (List[Any]) -> List[Any]
@@ -106,6 +102,38 @@ def mark(d):
106102

107103
return dd
108104

105+
def get_listing(fs_access, rec, recursive=True):
106+
# type: (StdFsAccess, Dict[Text, Any], bool) -> None
107+
if "listing" in rec:
108+
return
109+
listing = []
110+
loc = rec["location"]
111+
for ld in fs_access.listdir(loc):
112+
parse = urllib.parse.urlparse(ld)
113+
bn = os.path.basename(urllib.request.url2pathname(parse.path))
114+
if fs_access.isdir(ld):
115+
ent = {u"class": u"Directory",
116+
u"location": ld,
117+
u"basename": bn}
118+
if recursive:
119+
get_listing(fs_access, ent, recursive)
120+
listing.append(ent)
121+
else:
122+
listing.append({"class": "File", "location": ld, "basename": bn})
123+
rec["listing"] = listing
124+
125+
def trim_listing(obj):
126+
"""Remove 'listing' field from Directory objects that are file references.
127+
128+
It redundant and potentially expensive to pass fully enumerated Directory
129+
objects around if not explicitly needed, so delete the 'listing' field when
130+
it is safe to do so.
131+
132+
"""
133+
134+
if obj.get("location", "").startswith("file://") and "listing" in obj:
135+
del obj["listing"]
136+
109137

110138
class PathMapper(object):
111139
"""Mapping of files from relative path provided in the file to a tuple of
@@ -148,44 +176,42 @@ def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
148176
self.separateDirs = separateDirs
149177
self.setup(dedup(referenced_files), basedir)
150178

151-
def visitlisting(self, listing, stagedir, basedir, copy=False):
152-
# type: (List[Dict[Text, Any]], Text, Text, bool) -> None
179+
def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False):
180+
# type: (List[Dict[Text, Any]], Text, Text, bool, bool) -> None
153181
for ld in listing:
154-
tgt = os.path.join(stagedir, ld["basename"])
155-
if ld["class"] == "Directory":
156-
self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy))
157-
else:
158-
self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy))
182+
self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy), staged=staged)
159183

160-
def visit(self, obj, stagedir, basedir, copy=False):
161-
# type: (Dict[Text, Any], Text, Text, bool) -> None
184+
def visit(self, obj, stagedir, basedir, copy=False, staged=False):
185+
# type: (Dict[Text, Any], Text, Text, bool, bool) -> None
162186
tgt = os.path.join(stagedir, obj["basename"])
187+
if obj["location"] in self._pathmap:
188+
return
163189
if obj["class"] == "Directory":
164-
self._pathmap[obj["location"]] = MapperEnt(obj["location"], tgt, "Directory")
165-
self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy)
190+
if obj["location"].startswith("file://"):
191+
resolved = uri_file_path(obj["location"])
192+
else:
193+
resolved = obj["location"]
194+
self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged)
195+
if obj["location"].startswith("file://"):
196+
staged = False
197+
self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged)
166198
elif obj["class"] == "File":
167199
path = obj["location"]
168-
if path in self._pathmap:
169-
return
170200
ab = abspath(path, basedir)
171201
if "contents" in obj and obj["location"].startswith("_:"):
172-
self._pathmap[obj["location"]] = MapperEnt(obj["contents"], tgt, "CreateFile")
202+
self._pathmap[obj["location"]] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
173203
else:
174-
if copy:
175-
self._pathmap[path] = MapperEnt(ab, tgt, "WritableFile")
176-
else:
177-
with SourceLine(obj, "location", validate.ValidationException):
178-
# Dereference symbolic links
179-
deref = ab
204+
with SourceLine(obj, "location", validate.ValidationException):
205+
# Dereference symbolic links
206+
deref = ab
207+
st = os.lstat(deref)
208+
while stat.S_ISLNK(st.st_mode):
209+
rl = os.readlink(deref)
210+
deref = rl if os.path.isabs(rl) else os.path.join(
211+
os.path.dirname(deref), rl)
180212
st = os.lstat(deref)
181-
while stat.S_ISLNK(st.st_mode):
182-
rl = os.readlink(deref)
183-
deref = rl if os.path.isabs(rl) else os.path.join(
184-
os.path.dirname(deref), rl)
185-
st = os.lstat(deref)
186-
187-
self._pathmap[path] = MapperEnt(deref, tgt, "File")
188-
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir, copy=copy)
213+
self._pathmap[path] = MapperEnt(deref, tgt, "WritableFile" if copy else "File", staged)
214+
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir, copy=copy, staged=staged)
189215

190216
def setup(self, referenced_files, basedir):
191217
# type: (List[Any], Text) -> None
@@ -196,13 +222,13 @@ def setup(self, referenced_files, basedir):
196222
for fob in referenced_files:
197223
if self.separateDirs:
198224
stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
199-
self.visit(fob, stagedir, basedir)
225+
self.visit(fob, stagedir, basedir, staged=True)
200226

201227
def mapper(self, src): # type: (Text) -> MapperEnt
202228
if u"#" in src:
203229
i = src.index(u"#")
204230
p = self._pathmap[src[:i]]
205-
return MapperEnt(p.resolved, p.target + src[i:], None)
231+
return MapperEnt(p.resolved, p.target + src[i:], p.type, p.staged)
206232
else:
207233
return self._pathmap[src]
208234

0 commit comments

Comments
 (0)