Skip to content

Commit db71a2d

Browse files
Implement a new BulkCommand to support the delete operation
1 parent e9a7455 commit db71a2d

File tree

7 files changed

+90
-241
lines changed

7 files changed

+90
-241
lines changed

mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,6 @@ public String getSerializerValueWriterClassName() {
221221
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
222222
}
223223

224-
public Settings setSerializerValueWriterClassName(String className) {
225-
setProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS, className);
226-
return this;
227-
}
228224

229225
public String getSerializerBytesConverterClassName() {
230226
return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
@@ -770,3 +766,4 @@ public String save() {
770766

771767
public abstract Properties asProperties();
772768
}
769+

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818
*/
1919
package org.elasticsearch.hadoop.serialization.bulk;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
23-
import java.util.Date;
24-
2521
import org.apache.commons.logging.Log;
2622
import org.apache.commons.logging.LogFactory;
2723
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
24+
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
2825
import org.elasticsearch.hadoop.cfg.Settings;
2926
import org.elasticsearch.hadoop.rest.Resource;
3027
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
@@ -44,6 +41,10 @@
4441
import org.elasticsearch.hadoop.util.ObjectUtils;
4542
import org.elasticsearch.hadoop.util.StringUtils;
4643

44+
import java.util.ArrayList;
45+
import java.util.Date;
46+
import java.util.List;
47+
4748
public abstract class AbstractBulkFactory implements BulkFactory {
4849

4950
private static Log log = LogFactory.getLog(AbstractBulkFactory.class);
@@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
6061
// used when specifying an index pattern
6162
private IndexExtractor indexExtractor;
6263
private FieldExtractor idExtractor,
63-
typeExtractor,
64-
parentExtractor,
65-
routingExtractor,
66-
versionExtractor,
67-
ttlExtractor,
68-
timestampExtractor,
69-
paramsExtractor;
64+
typeExtractor,
65+
parentExtractor,
66+
routingExtractor,
67+
versionExtractor,
68+
ttlExtractor,
69+
timestampExtractor,
70+
paramsExtractor;
7071

7172
private final FieldExtractor versionTypeExtractor = new FieldExtractor() {
7273

@@ -139,14 +140,10 @@ void doWrite(Object value) {
139140
}
140141

141142
pool.get().bytes(valueString);
142-
}
143-
144-
else if (value instanceof Date) {
145-
String valueString = (value == null ? "null": Long.toString(((Date) value).getTime()));
143+
} else if (value instanceof Date) {
144+
String valueString = (value == null ? "null" : Long.toString(((Date) value).getTime()));
146145
pool.get().bytes(valueString);
147-
}
148-
149-
else if (value instanceof RawJson) {
146+
} else if (value instanceof RawJson) {
150147
pool.get().bytes(((RawJson) value).json());
151148
}
152149
// library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
@@ -249,25 +246,24 @@ private void initExtractorsFromSettings(final Settings settings) {
249246
ttlExtractor = jsonExtractors.ttl();
250247
timestampExtractor = jsonExtractors.timestamp();
251248
paramsExtractor = jsonExtractors.params();
252-
}
253-
else {
249+
} else {
254250
// init extractors (if needed)
255251
if (settings.getMappingId() != null) {
256252
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId());
257-
idExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(),
253+
idExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingIdExtractorClassName(),
258254
settings);
259255
}
260256
if (settings.getMappingParent() != null) {
261257
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent());
262-
parentExtractor = ObjectUtils.<FieldExtractor> instantiate(
258+
parentExtractor = ObjectUtils.<FieldExtractor>instantiate(
263259
settings.getMappingParentExtractorClassName(), settings);
264260
}
265261
// Two different properties can satisfy the routing field extraction
266262
ChainedFieldExtractor.NoValueHandler routingResponse = ChainedFieldExtractor.NoValueHandler.SKIP;
267263
List<FieldExtractor> routings = new ArrayList<FieldExtractor>(2);
268264
if (settings.getMappingRouting() != null) {
269265
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting());
270-
FieldExtractor extractor = ObjectUtils.<FieldExtractor> instantiate(
266+
FieldExtractor extractor = ObjectUtils.<FieldExtractor>instantiate(
271267
settings.getMappingRoutingExtractorClassName(), settings);
272268
// If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
273269
routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND;
@@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) {
286282

287283
if (settings.getMappingTtl() != null) {
288284
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl());
289-
ttlExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingTtlExtractorClassName(),
285+
ttlExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingTtlExtractorClassName(),
290286
settings);
291287
}
292288
if (settings.getMappingVersion() != null) {
293289
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion());
294-
versionExtractor = ObjectUtils.<FieldExtractor> instantiate(
290+
versionExtractor = ObjectUtils.<FieldExtractor>instantiate(
295291
settings.getMappingVersionExtractorClassName(), settings);
296292
}
297293
if (settings.getMappingTimestamp() != null) {
298294
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp());
299-
timestampExtractor = ObjectUtils.<FieldExtractor> instantiate(
295+
timestampExtractor = ObjectUtils.<FieldExtractor>instantiate(
300296
settings.getMappingTimestampExtractorClassName(), settings);
301297
}
302298

303299
// create adapter
304-
IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
300+
IndexExtractor iformat = ObjectUtils.<IndexExtractor>instantiate(settings.getMappingIndexExtractorClassName(), settings);
305301
iformat.compile(new Resource(settings, false).toString());
306302

307303
if (iformat.hasPattern()) {
@@ -371,14 +367,15 @@ public BulkCommand createBulk() {
371367
if (!isStatic) {
372368
before.add(new DynamicHeaderRef());
373369
after.add(new DynamicEndRef());
374-
}
375-
else {
370+
} else {
376371
writeObjectHeader(before);
377372
before = compact(before);
378373
writeObjectEnd(after);
379374
after = compact(after);
380375
}
381-
376+
if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) {
377+
return new DeleteTemplatedBulk(before, after);
378+
}
382379
boolean isScriptUpdate = settings.hasUpdateScript();
383380
// compress pieces
384381
if (jsonInput) {
@@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
523520
stringAccumulator.setLength(0);
524521
}
525522
compacted.add(object);
526-
}
527-
else if (object instanceof FieldExtractor) {
523+
} else if (object instanceof FieldExtractor) {
528524
if (stringAccumulator.length() > 0) {
529525
compacted.add(new BytesArray(stringAccumulator.toString()));
530526
stringAccumulator.setLength(0);
531527
}
532528
compacted.add(new FieldWriter((FieldExtractor) object));
533-
}
534-
else {
529+
} else {
535530
stringAccumulator.append(object.toString());
536531
}
537532
}
@@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() {
546541
return paramsExtractor;
547542
}
548543
}
544+

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -42,48 +42,9 @@
4242

4343
public class DeleteBulkFactory extends AbstractBulkFactory {
4444

45-
public static final class NoDataWriter implements ValueWriter<Object> {
46-
47-
@Override
48-
public Result write(Object writable, Generator generator) {
49-
//delete doesn't require any content but it needs to extract metadata associated to a document
50-
if (writable == null || writable instanceof NullWritable) {
51-
generator.writeNull();
52-
} else if (writable instanceof Text) {
53-
Text text = (Text) writable;
54-
generator.writeUTF8String(text.getBytes(), 0, text.getLength());
55-
} else if (writable instanceof UTF8) {
56-
UTF8 utf8 = (UTF8) writable;
57-
generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength());
58-
} else if (writable instanceof IntWritable) {
59-
generator.writeNumber(((IntWritable) writable).get());
60-
} else if (writable instanceof LongWritable) {
61-
generator.writeNumber(((LongWritable) writable).get());
62-
} else if (writable instanceof VLongWritable) {
63-
generator.writeNumber(((VLongWritable) writable).get());
64-
} else if (writable instanceof VIntWritable) {
65-
generator.writeNumber(((VIntWritable) writable).get());
66-
} else if (writable instanceof ByteWritable) {
67-
generator.writeNumber(((ByteWritable) writable).get());
68-
} else if (writable instanceof DoubleWritable) {
69-
generator.writeNumber(((DoubleWritable) writable).get());
70-
} else if (writable instanceof FloatWritable) {
71-
generator.writeNumber(((FloatWritable) writable).get());
72-
} else if (writable instanceof BooleanWritable) {
73-
generator.writeBoolean(((BooleanWritable) writable).get());
74-
} else if (writable instanceof BytesWritable) {
75-
BytesWritable bw = (BytesWritable) writable;
76-
generator.writeBinary(bw.getBytes(), 0, bw.getLength());
77-
} else if (writable instanceof MD5Hash) {
78-
generator.writeString(writable.toString());
79-
}
80-
return Result.SUCCESFUL();
81-
}
82-
}
8345

8446
public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
85-
// we only want a specific serializer for this particular bulk factory
86-
super(settings.copy().setSerializerValueWriterClassName(NoDataWriter.class.getName()), metaExtractor, version);
47+
super(settings, metaExtractor, version);
8748
}
8849

8950
@Override
@@ -97,3 +58,4 @@ protected void writeObjectEnd(List<Object> list) {
9758
list.add(StringUtils.EMPTY);
9859
}
9960
}
61+
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.hadoop.serialization.bulk;
20+
21+
import org.elasticsearch.hadoop.util.BytesRef;
22+
23+
import java.util.Collection;
24+
25+
public class DeleteTemplatedBulk extends TemplatedBulk {
26+
27+
DeleteTemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject) {
28+
super(beforeObject, afterObject, null);
29+
}
30+
31+
@Override
32+
public BytesRef write(Object object) {
33+
ref.reset();
34+
scratchPad.reset();
35+
Object processed = preProcess(object, scratchPad);
36+
writeTemplate(beforeObject, processed);
37+
writeTemplate(afterObject, processed);
38+
return ref;
39+
}
40+
}
41+

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@
3232

3333
class TemplatedBulk implements BulkCommand {
3434

35-
private final Collection<Object> beforeObject;
36-
private final Collection<Object> afterObject;
35+
protected final Collection<Object> beforeObject;
36+
protected final Collection<Object> afterObject;
3737

38-
private BytesArray scratchPad = new BytesArray(1024);
39-
private BytesRef ref = new BytesRef();
38+
protected BytesArray scratchPad = new BytesArray(1024);
39+
protected BytesRef ref = new BytesRef();
4040

4141
private final ValueWriter valueWriter;
4242

@@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> w
7171
ContentBuilder.generate(bos, writer).value(object).flush().close();
7272
}
7373

74-
private void writeTemplate(Collection<Object> template, Object object) {
74+
protected void writeTemplate(Collection<Object> template, Object object) {
7575
for (Object item : template) {
7676
if (item instanceof BytesArray) {
7777
ref.add((BytesArray) item);
@@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) {
8989
}
9090
}
9191
}
92-
}
92+
}

mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,6 @@
1818
*/
1919
package org.elasticsearch.hadoop.serialization;
2020

21-
import java.util.ArrayList;
22-
import java.util.Arrays;
23-
import java.util.Collection;
24-
import java.util.Collections;
25-
import java.util.LinkedHashMap;
26-
import java.util.Map;
27-
2821
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
2922
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
3023
import org.elasticsearch.hadoop.cfg.Settings;
@@ -33,7 +26,6 @@
3326
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
3427
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
3528
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
36-
import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory;
3729
import org.elasticsearch.hadoop.util.BytesArray;
3830
import org.elasticsearch.hadoop.util.EsMajorVersion;
3931
import org.elasticsearch.hadoop.util.StringUtils;
@@ -44,8 +36,12 @@
4436
import org.junit.runners.Parameterized;
4537
import org.junit.runners.Parameterized.Parameters;
4638

47-
import static org.junit.Assert.assertEquals;
39+
import java.util.ArrayList;
40+
import java.util.Collection;
41+
import java.util.LinkedHashMap;
42+
import java.util.Map;
4843

44+
import static org.junit.Assert.assertEquals;
4945
import static org.junit.Assume.assumeFalse;
5046
import static org.junit.Assume.assumeTrue;
5147

@@ -84,7 +80,7 @@ public static Collection<Object[]> data() {
8480
for (EsMajorVersion version : versions) {
8581
for (boolean asJson : asJsons) {
8682
for (String operation : operations) {
87-
result.add(new Object[]{ operation, asJson, version });
83+
result.add(new Object[]{operation, asJson, version});
8884
}
8985
}
9086
}
@@ -317,7 +313,7 @@ public void testUpdateOnlyInlineScript5X() throws Exception {
317313
create(set).write(data).copyTo(ba);
318314
String result =
319315
"{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" +
320-
"{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
316+
"{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
321317
assertEquals(result, ba.toString());
322318
}
323319

@@ -447,7 +443,7 @@ public void testUpdateOnlyParamInlineScript5X() throws Exception {
447443

448444
String result =
449445
"{\"" + operation + "\":{\"_id\":1}}\n" +
450-
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";
446+
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";
451447

452448
assertEquals(result, ba.toString());
453449
}
@@ -523,11 +519,7 @@ private Settings settings() {
523519

524520
set.setInternalVersion(version);
525521
set.setProperty(ConfigurationOptions.ES_INPUT_JSON, Boolean.toString(jsonInput));
526-
if (isDeleteOP()) {
527-
InitializationUtils.setValueWriterIfNotSet(set, DeleteBulkFactory.NoDataWriter.class, null);
528-
} else {
529-
InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null);
530-
}
522+
InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null);
531523
InitializationUtils.setFieldExtractorIfNotSet(set, MapFieldExtractor.class, null);
532524
InitializationUtils.setBytesConverterIfNeeded(set, JdkBytesConverter.class, null);
533525
InitializationUtils.setUserProviderIfNotSet(set, HadoopUserProvider.class, null);
@@ -577,3 +569,4 @@ private boolean isDeleteOP() {
577569
return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation);
578570
}
579571
}
572+

0 commit comments

Comments
 (0)