18
18
*/
19
19
package org .elasticsearch .hadoop .serialization .bulk ;
20
20
21
- import java .util .ArrayList ;
22
- import java .util .List ;
23
- import java .util .Date ;
24
-
25
21
import org .apache .commons .logging .Log ;
26
22
import org .apache .commons .logging .LogFactory ;
27
23
import org .elasticsearch .hadoop .EsHadoopIllegalArgumentException ;
24
+ import org .elasticsearch .hadoop .cfg .ConfigurationOptions ;
28
25
import org .elasticsearch .hadoop .cfg .Settings ;
29
26
import org .elasticsearch .hadoop .rest .Resource ;
30
27
import org .elasticsearch .hadoop .serialization .builder .ValueWriter ;
44
41
import org .elasticsearch .hadoop .util .ObjectUtils ;
45
42
import org .elasticsearch .hadoop .util .StringUtils ;
46
43
44
+ import java .util .ArrayList ;
45
+ import java .util .Date ;
46
+ import java .util .List ;
47
+
47
48
public abstract class AbstractBulkFactory implements BulkFactory {
48
49
49
50
private static Log log = LogFactory .getLog (AbstractBulkFactory .class );
@@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
60
61
// used when specifying an index pattern
61
62
private IndexExtractor indexExtractor ;
62
63
private FieldExtractor idExtractor ,
63
- typeExtractor ,
64
- parentExtractor ,
65
- routingExtractor ,
66
- versionExtractor ,
67
- ttlExtractor ,
68
- timestampExtractor ,
69
- paramsExtractor ;
64
+ typeExtractor ,
65
+ parentExtractor ,
66
+ routingExtractor ,
67
+ versionExtractor ,
68
+ ttlExtractor ,
69
+ timestampExtractor ,
70
+ paramsExtractor ;
70
71
71
72
private final FieldExtractor versionTypeExtractor = new FieldExtractor () {
72
73
@@ -139,14 +140,10 @@ void doWrite(Object value) {
139
140
}
140
141
141
142
pool .get ().bytes (valueString );
142
- }
143
-
144
- else if (value instanceof Date ) {
145
- String valueString = (value == null ? "null" : Long .toString (((Date ) value ).getTime ()));
143
+ } else if (value instanceof Date ) {
144
+ String valueString = (value == null ? "null" : Long .toString (((Date ) value ).getTime ()));
146
145
pool .get ().bytes (valueString );
147
- }
148
-
149
- else if (value instanceof RawJson ) {
146
+ } else if (value instanceof RawJson ) {
150
147
pool .get ().bytes (((RawJson ) value ).json ());
151
148
}
152
149
// library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
@@ -249,25 +246,24 @@ private void initExtractorsFromSettings(final Settings settings) {
249
246
ttlExtractor = jsonExtractors .ttl ();
250
247
timestampExtractor = jsonExtractors .timestamp ();
251
248
paramsExtractor = jsonExtractors .params ();
252
- }
253
- else {
249
+ } else {
254
250
// init extractors (if needed)
255
251
if (settings .getMappingId () != null ) {
256
252
settings .setProperty (ConstantFieldExtractor .PROPERTY , settings .getMappingId ());
257
- idExtractor = ObjectUtils .<FieldExtractor > instantiate (settings .getMappingIdExtractorClassName (),
253
+ idExtractor = ObjectUtils .<FieldExtractor >instantiate (settings .getMappingIdExtractorClassName (),
258
254
settings );
259
255
}
260
256
if (settings .getMappingParent () != null ) {
261
257
settings .setProperty (ConstantFieldExtractor .PROPERTY , settings .getMappingParent ());
262
- parentExtractor = ObjectUtils .<FieldExtractor > instantiate (
258
+ parentExtractor = ObjectUtils .<FieldExtractor >instantiate (
263
259
settings .getMappingParentExtractorClassName (), settings );
264
260
}
265
261
// Two different properties can satisfy the routing field extraction
266
262
ChainedFieldExtractor .NoValueHandler routingResponse = ChainedFieldExtractor .NoValueHandler .SKIP ;
267
263
List <FieldExtractor > routings = new ArrayList <FieldExtractor >(2 );
268
264
if (settings .getMappingRouting () != null ) {
269
265
settings .setProperty (ConstantFieldExtractor .PROPERTY , settings .getMappingRouting ());
270
- FieldExtractor extractor = ObjectUtils .<FieldExtractor > instantiate (
266
+ FieldExtractor extractor = ObjectUtils .<FieldExtractor >instantiate (
271
267
settings .getMappingRoutingExtractorClassName (), settings );
272
268
// If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
273
269
routingResponse = ChainedFieldExtractor .NoValueHandler .NOT_FOUND ;
@@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) {
286
282
287
283
if (settings .getMappingTtl () != null ) {
288
284
settings .setProperty (ConstantFieldExtractor .PROPERTY , settings .getMappingTtl ());
289
- ttlExtractor = ObjectUtils .<FieldExtractor > instantiate (settings .getMappingTtlExtractorClassName (),
285
+ ttlExtractor = ObjectUtils .<FieldExtractor >instantiate (settings .getMappingTtlExtractorClassName (),
290
286
settings );
291
287
}
292
288
if (settings .getMappingVersion () != null ) {
293
289
settings .setProperty (ConstantFieldExtractor .PROPERTY , settings .getMappingVersion ());
294
- versionExtractor = ObjectUtils .<FieldExtractor > instantiate (
290
+ versionExtractor = ObjectUtils .<FieldExtractor >instantiate (
295
291
settings .getMappingVersionExtractorClassName (), settings );
296
292
}
297
293
if (settings .getMappingTimestamp () != null ) {
298
294
settings .setProperty (ConstantFieldExtractor .PROPERTY , settings .getMappingTimestamp ());
299
- timestampExtractor = ObjectUtils .<FieldExtractor > instantiate (
295
+ timestampExtractor = ObjectUtils .<FieldExtractor >instantiate (
300
296
settings .getMappingTimestampExtractorClassName (), settings );
301
297
}
302
298
303
299
// create adapter
304
- IndexExtractor iformat = ObjectUtils .<IndexExtractor > instantiate (settings .getMappingIndexExtractorClassName (), settings );
300
+ IndexExtractor iformat = ObjectUtils .<IndexExtractor >instantiate (settings .getMappingIndexExtractorClassName (), settings );
305
301
iformat .compile (new Resource (settings , false ).toString ());
306
302
307
303
if (iformat .hasPattern ()) {
@@ -371,14 +367,15 @@ public BulkCommand createBulk() {
371
367
if (!isStatic ) {
372
368
before .add (new DynamicHeaderRef ());
373
369
after .add (new DynamicEndRef ());
374
- }
375
- else {
370
+ } else {
376
371
writeObjectHeader (before );
377
372
before = compact (before );
378
373
writeObjectEnd (after );
379
374
after = compact (after );
380
375
}
381
-
376
+ if (ConfigurationOptions .ES_OPERATION_DELETE .equals (getOperation ())) {
377
+ return new DeleteTemplatedBulk (before , after );
378
+ }
382
379
boolean isScriptUpdate = settings .hasUpdateScript ();
383
380
// compress pieces
384
381
if (jsonInput ) {
@@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
523
520
stringAccumulator .setLength (0 );
524
521
}
525
522
compacted .add (object );
526
- }
527
- else if (object instanceof FieldExtractor ) {
523
+ } else if (object instanceof FieldExtractor ) {
528
524
if (stringAccumulator .length () > 0 ) {
529
525
compacted .add (new BytesArray (stringAccumulator .toString ()));
530
526
stringAccumulator .setLength (0 );
531
527
}
532
528
compacted .add (new FieldWriter ((FieldExtractor ) object ));
533
- }
534
- else {
529
+ } else {
535
530
stringAccumulator .append (object .toString ());
536
531
}
537
532
}
@@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() {
546
541
return paramsExtractor ;
547
542
}
548
543
}
544
+
0 commit comments