Skip to content

Commit 41cfc5e

Browse files
Implement delete operation (#1324)
This PR adds support for delete operations. Document ID's must be present for each document for this to work. A new bulk command factory has been added which will render a delete operation with the id and no document body in the bulk output.
1 parent fbc52bc commit 41cfc5e

File tree

9 files changed

+248
-95
lines changed

9 files changed

+248
-95
lines changed

mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public String getSerializerValueWriterClassName() {
221221
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
222222
}
223223

224+
224225
public String getSerializerBytesConverterClassName() {
225226
return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
226227
}
@@ -765,3 +766,4 @@ public String save() {
765766

766767
public abstract Properties asProperties();
767768
}
769+

mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public static void filterNonDataNodesIfNeeded(Settings settings, Log log) {
152152
}
153153

154154
RestClient bootstrap = new RestClient(settings);
155-
try {
155+
try {
156156
String message = "No data nodes with HTTP-enabled available";
157157
List<NodeInfo> dataNodes = bootstrap.getHttpDataNodes();
158158
if (dataNodes.isEmpty()) {
@@ -253,10 +253,18 @@ public static void validateSettings(Settings settings) {
253253
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
254254
}
255255

256+
//check the configuration is coherent in order to use the delete operation
257+
if (ConfigurationOptions.ES_OPERATION_DELETE.equals(settings.getOperation())) {
258+
Assert.isTrue(!settings.getInputAsJson(), "When using delete operation, providing data as JSON is not coherent because this operation does not need document as a payload. This is most likely not what the user intended. Bailing out...");
259+
Assert.isTrue(settings.getMappingIncludes().isEmpty(), "When using delete operation, the field inclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
260+
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When using delete operation, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
261+
Assert.isTrue(settings.getMappingId() != null && !StringUtils.EMPTY.equals(settings.getMappingId()), "When using delete operation, the property " + ConfigurationOptions.ES_MAPPING_ID + " must be set and must not be empty since we need the document id in order to delete it. Bailing out...");
262+
}
263+
256264
// Check to make sure user doesn't specify more than one script type
257265
boolean hasScript = false;
258266
String[] scripts = {settings.getUpdateScriptInline(), settings.getUpdateScriptFile(), settings.getUpdateScriptStored()};
259-
for (String script: scripts) {
267+
for (String script : scripts) {
260268
boolean isSet = StringUtils.hasText(script);
261269
Assert.isTrue((hasScript && isSet) == false, "Multiple scripts are specified. Please specify only one via [es.update.script.inline], [es.update.script.file], or [es.update.script.stored]");
262270
hasScript = hasScript || isSet;
@@ -468,4 +476,4 @@ public static boolean setUserProviderIfNotSet(Settings settings, Class<? extends
468476
}
469477
return false;
470478
}
471-
}
479+
}

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818
*/
1919
package org.elasticsearch.hadoop.serialization.bulk;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
23-
import java.util.Date;
24-
2521
import org.apache.commons.logging.Log;
2622
import org.apache.commons.logging.LogFactory;
2723
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
24+
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
2825
import org.elasticsearch.hadoop.cfg.Settings;
2926
import org.elasticsearch.hadoop.rest.Resource;
3027
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
@@ -44,6 +41,10 @@
4441
import org.elasticsearch.hadoop.util.ObjectUtils;
4542
import org.elasticsearch.hadoop.util.StringUtils;
4643

44+
import java.util.ArrayList;
45+
import java.util.Date;
46+
import java.util.List;
47+
4748
public abstract class AbstractBulkFactory implements BulkFactory {
4849

4950
private static Log log = LogFactory.getLog(AbstractBulkFactory.class);
@@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
6061
// used when specifying an index pattern
6162
private IndexExtractor indexExtractor;
6263
private FieldExtractor idExtractor,
63-
typeExtractor,
64-
parentExtractor,
65-
routingExtractor,
66-
versionExtractor,
67-
ttlExtractor,
68-
timestampExtractor,
69-
paramsExtractor;
64+
typeExtractor,
65+
parentExtractor,
66+
routingExtractor,
67+
versionExtractor,
68+
ttlExtractor,
69+
timestampExtractor,
70+
paramsExtractor;
7071

7172
private final FieldExtractor versionTypeExtractor = new FieldExtractor() {
7273

@@ -139,14 +140,10 @@ void doWrite(Object value) {
139140
}
140141

141142
pool.get().bytes(valueString);
142-
}
143-
144-
else if (value instanceof Date) {
145-
String valueString = (value == null ? "null": Long.toString(((Date) value).getTime()));
143+
} else if (value instanceof Date) {
144+
String valueString = (value == null ? "null" : Long.toString(((Date) value).getTime()));
146145
pool.get().bytes(valueString);
147-
}
148-
149-
else if (value instanceof RawJson) {
146+
} else if (value instanceof RawJson) {
150147
pool.get().bytes(((RawJson) value).json());
151148
}
152149
// library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
@@ -249,25 +246,24 @@ private void initExtractorsFromSettings(final Settings settings) {
249246
ttlExtractor = jsonExtractors.ttl();
250247
timestampExtractor = jsonExtractors.timestamp();
251248
paramsExtractor = jsonExtractors.params();
252-
}
253-
else {
249+
} else {
254250
// init extractors (if needed)
255251
if (settings.getMappingId() != null) {
256252
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId());
257-
idExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(),
253+
idExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingIdExtractorClassName(),
258254
settings);
259255
}
260256
if (settings.getMappingParent() != null) {
261257
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent());
262-
parentExtractor = ObjectUtils.<FieldExtractor> instantiate(
258+
parentExtractor = ObjectUtils.<FieldExtractor>instantiate(
263259
settings.getMappingParentExtractorClassName(), settings);
264260
}
265261
// Two different properties can satisfy the routing field extraction
266262
ChainedFieldExtractor.NoValueHandler routingResponse = ChainedFieldExtractor.NoValueHandler.SKIP;
267263
List<FieldExtractor> routings = new ArrayList<FieldExtractor>(2);
268264
if (settings.getMappingRouting() != null) {
269265
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting());
270-
FieldExtractor extractor = ObjectUtils.<FieldExtractor> instantiate(
266+
FieldExtractor extractor = ObjectUtils.<FieldExtractor>instantiate(
271267
settings.getMappingRoutingExtractorClassName(), settings);
272268
// If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
273269
routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND;
@@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) {
286282

287283
if (settings.getMappingTtl() != null) {
288284
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl());
289-
ttlExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingTtlExtractorClassName(),
285+
ttlExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingTtlExtractorClassName(),
290286
settings);
291287
}
292288
if (settings.getMappingVersion() != null) {
293289
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion());
294-
versionExtractor = ObjectUtils.<FieldExtractor> instantiate(
290+
versionExtractor = ObjectUtils.<FieldExtractor>instantiate(
295291
settings.getMappingVersionExtractorClassName(), settings);
296292
}
297293
if (settings.getMappingTimestamp() != null) {
298294
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp());
299-
timestampExtractor = ObjectUtils.<FieldExtractor> instantiate(
295+
timestampExtractor = ObjectUtils.<FieldExtractor>instantiate(
300296
settings.getMappingTimestampExtractorClassName(), settings);
301297
}
302298

303299
// create adapter
304-
IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
300+
IndexExtractor iformat = ObjectUtils.<IndexExtractor>instantiate(settings.getMappingIndexExtractorClassName(), settings);
305301
iformat.compile(new Resource(settings, false).toString());
306302

307303
if (iformat.hasPattern()) {
@@ -371,14 +367,15 @@ public BulkCommand createBulk() {
371367
if (!isStatic) {
372368
before.add(new DynamicHeaderRef());
373369
after.add(new DynamicEndRef());
374-
}
375-
else {
370+
} else {
376371
writeObjectHeader(before);
377372
before = compact(before);
378373
writeObjectEnd(after);
379374
after = compact(after);
380375
}
381-
376+
if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) {
377+
return new DeleteTemplatedBulk(before, after);
378+
}
382379
boolean isScriptUpdate = settings.hasUpdateScript();
383380
// compress pieces
384381
if (jsonInput) {
@@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
523520
stringAccumulator.setLength(0);
524521
}
525522
compacted.add(object);
526-
}
527-
else if (object instanceof FieldExtractor) {
523+
} else if (object instanceof FieldExtractor) {
528524
if (stringAccumulator.length() > 0) {
529525
compacted.add(new BytesArray(stringAccumulator.toString()));
530526
stringAccumulator.setLength(0);
531527
}
532528
compacted.add(new FieldWriter((FieldExtractor) object));
533-
}
534-
else {
529+
} else {
535530
stringAccumulator.append(object.toString());
536531
}
537532
}
@@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() {
546541
return paramsExtractor;
547542
}
548543
}
544+

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/BulkCommands.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@ else if (ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation)) {
4545
else if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation)) {
4646
factory = new UpdateBulkFactory(settings, true, metaExtractor, version);
4747
}
48+
else if (ConfigurationOptions.ES_OPERATION_DELETE.equals(operation)) {
49+
factory = new DeleteBulkFactory(settings, metaExtractor, version);
50+
}
4851
else {
49-
throw new EsHadoopIllegalArgumentException("Unknown operation " + operation);
52+
throw new EsHadoopIllegalArgumentException("Unsupported bulk operation " + operation);
5053
}
5154

5255
return factory.createBulk();
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.hadoop.serialization.bulk;
20+
21+
import org.apache.hadoop.io.BooleanWritable;
22+
import org.apache.hadoop.io.ByteWritable;
23+
import org.apache.hadoop.io.BytesWritable;
24+
import org.apache.hadoop.io.DoubleWritable;
25+
import org.apache.hadoop.io.FloatWritable;
26+
import org.apache.hadoop.io.IntWritable;
27+
import org.apache.hadoop.io.LongWritable;
28+
import org.apache.hadoop.io.MD5Hash;
29+
import org.apache.hadoop.io.NullWritable;
30+
import org.apache.hadoop.io.Text;
31+
import org.apache.hadoop.io.UTF8;
32+
import org.apache.hadoop.io.VIntWritable;
33+
import org.apache.hadoop.io.VLongWritable;
34+
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
35+
import org.elasticsearch.hadoop.cfg.Settings;
36+
import org.elasticsearch.hadoop.serialization.Generator;
37+
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
38+
import org.elasticsearch.hadoop.util.EsMajorVersion;
39+
import org.elasticsearch.hadoop.util.StringUtils;
40+
41+
import java.util.List;
42+
43+
public class DeleteBulkFactory extends AbstractBulkFactory {
44+
45+
46+
public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
47+
super(settings, metaExtractor, version);
48+
}
49+
50+
@Override
51+
protected String getOperation() {
52+
return ConfigurationOptions.ES_OPERATION_DELETE;
53+
}
54+
55+
@Override
56+
protected void writeObjectEnd(List<Object> list) {
57+
// skip adding new-line for each entity as delete doesn't need entity output
58+
list.add(StringUtils.EMPTY);
59+
}
60+
}
61+
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.hadoop.serialization.bulk;
20+
21+
import org.elasticsearch.hadoop.util.BytesRef;
22+
23+
import java.util.Collection;
24+
25+
public class DeleteTemplatedBulk extends TemplatedBulk {
26+
27+
DeleteTemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject) {
28+
super(beforeObject, afterObject, null);
29+
}
30+
31+
@Override
32+
public BytesRef write(Object object) {
33+
ref.reset();
34+
scratchPad.reset();
35+
Object processed = preProcess(object, scratchPad);
36+
writeTemplate(beforeObject, processed);
37+
writeTemplate(afterObject, processed);
38+
return ref;
39+
}
40+
}
41+

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@
3232

3333
class TemplatedBulk implements BulkCommand {
3434

35-
private final Collection<Object> beforeObject;
36-
private final Collection<Object> afterObject;
35+
protected final Collection<Object> beforeObject;
36+
protected final Collection<Object> afterObject;
3737

38-
private BytesArray scratchPad = new BytesArray(1024);
39-
private BytesRef ref = new BytesRef();
38+
protected BytesArray scratchPad = new BytesArray(1024);
39+
protected BytesRef ref = new BytesRef();
4040

4141
private final ValueWriter valueWriter;
4242

@@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> w
7171
ContentBuilder.generate(bos, writer).value(object).flush().close();
7272
}
7373

74-
private void writeTemplate(Collection<Object> template, Object object) {
74+
protected void writeTemplate(Collection<Object> template, Object object) {
7575
for (Object item : template) {
7676
if (item instanceof BytesArray) {
7777
ref.add((BytesArray) item);
@@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) {
8989
}
9090
}
9191
}
92-
}
92+
}

0 commit comments

Comments
 (0)