@@ -78,6 +78,7 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
78
78
lead_pos = []
79
79
bindings = [] # type: List[Dict[Text,Text]]
80
80
binding = None # type: Dict[Text,Any]
81
+ value_from_expression = False
81
82
if "inputBinding" in schema and isinstance (schema ["inputBinding" ], dict ):
82
83
binding = copy .copy (schema ["inputBinding" ])
83
84
@@ -87,29 +88,33 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
87
88
binding ["position" ] = aslist (lead_pos ) + [0 ] + aslist (tail_pos )
88
89
89
90
binding ["datum" ] = datum
91
+ if "valueFrom" in binding :
92
+ value_from_expression = True
90
93
91
94
# Handle union types
92
95
if isinstance (schema ["type" ], list ):
93
- for t in schema ["type" ]:
94
- if isinstance (t , (str , Text )) and self .names .has_name (t , "" ):
95
- avsc = self .names .get_name (t , "" )
96
- elif isinstance (t , dict ) and "name" in t and self .names .has_name (t ["name" ], "" ):
97
- avsc = self .names .get_name (t ["name" ], "" )
98
- else :
99
- avsc = AvroSchemaFromJSONData (t , self .names )
100
- if validate .validate (avsc , datum ):
101
- schema = copy .deepcopy (schema )
102
- schema ["type" ] = t
103
- return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
104
- raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
96
+ if not value_from_expression :
97
+ for t in schema ["type" ]:
98
+ if isinstance (t , (str , Text )) and self .names .has_name (t , "" ):
99
+ avsc = self .names .get_name (t , "" )
100
+ elif isinstance (t , dict ) and "name" in t and self .names .has_name (t ["name" ], "" ):
101
+ avsc = self .names .get_name (t ["name" ], "" )
102
+ else :
103
+ avsc = AvroSchemaFromJSONData (t , self .names )
104
+ if validate .validate (avsc , datum ):
105
+ schema = copy .deepcopy (schema )
106
+ schema ["type" ] = t
107
+ return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
108
+ raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
105
109
elif isinstance (schema ["type" ], dict ):
106
- st = copy .deepcopy (schema ["type" ])
107
- if binding and "inputBinding" not in st and st ["type" ] == "array" and "itemSeparator" not in binding :
108
- st ["inputBinding" ] = {}
109
- for k in ("secondaryFiles" , "format" , "streamable" ):
110
- if k in schema :
111
- st [k ] = schema [k ]
112
- bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
110
+ if not value_from_expression :
111
+ st = copy .deepcopy (schema ["type" ])
112
+ if binding and "inputBinding" not in st and st ["type" ] == "array" and "itemSeparator" not in binding :
113
+ st ["inputBinding" ] = {}
114
+ for k in ("secondaryFiles" , "format" , "streamable" ):
115
+ if k in schema :
116
+ st [k ] = schema [k ]
117
+ bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
113
118
else :
114
119
if schema ["type" ] in self .schemaDefs :
115
120
schema = self .schemaDefs [schema ["type" ]]
@@ -212,15 +217,18 @@ def generate_arg(self, binding): # type: (Dict[Text,Any]) -> List[Text]
212
217
213
218
prefix = binding .get ("prefix" )
214
219
sep = binding .get ("separate" , True )
220
+ if prefix is None and not sep :
221
+ with SourceLine (binding , "separate" , WorkflowException , _logger .isEnabledFor (logging .DEBUG )):
222
+ raise WorkflowException ("'separate' option can not be specified without prefix" )
215
223
216
224
l = [] # type: List[Dict[Text,Text]]
217
225
if isinstance (value , list ):
218
- if binding .get ("itemSeparator" ):
226
+ if binding .get ("itemSeparator" ) and value :
219
227
l = [binding ["itemSeparator" ].join ([self .tostr (v ) for v in value ])]
220
228
elif binding .get ("valueFrom" ):
221
229
value = [self .tostr (v ) for v in value ]
222
230
return ([prefix ] if prefix else []) + value
223
- elif prefix :
231
+ elif prefix and value :
224
232
return [prefix ]
225
233
else :
226
234
return []
@@ -244,8 +252,8 @@ def generate_arg(self, binding): # type: (Dict[Text,Any]) -> List[Text]
244
252
245
253
return [a for a in args if a is not None ]
246
254
247
- def do_eval (self , ex , context = None , pull_image = True , recursive = False ):
248
- # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
255
+ def do_eval (self , ex , context = None , pull_image = True , recursive = False , strip_whitespace = True ):
256
+ # type: (Union[Dict[Text, Text], Text], Any, bool, bool, bool ) -> Any
249
257
if recursive :
250
258
if isinstance (ex , dict ):
251
259
return {k : self .do_eval (v , context , pull_image , recursive ) for k , v in iteritems (ex )}
@@ -260,4 +268,5 @@ def do_eval(self, ex, context=None, pull_image=True, recursive=False):
260
268
timeout = self .timeout ,
261
269
debug = self .debug ,
262
270
js_console = self .js_console ,
263
- force_docker_pull = self .force_docker_pull )
271
+ force_docker_pull = self .force_docker_pull ,
272
+ strip_whitespace = strip_whitespace )
0 commit comments