Skip to content

Commit c4c8977

Browse files
Implement delete operation (#1)
Implement delete operation and tests
1 parent edbb3e8 commit c4c8977

File tree

7 files changed

+369
-52
lines changed

7 files changed

+369
-52
lines changed

mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,11 @@ public String getSerializerValueWriterClassName() {
221221
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
222222
}
223223

224+
/**
 * Stores the given class name under {@code ES_SERIALIZATION_WRITER_VALUE_CLASS},
 * the same property read by {@code getSerializerValueWriterClassName()}.
 *
 * @param className value to store for the value-writer class property
 * @return this instance, so calls can be chained
 */
public Settings setSerializerValueWriterClassName(String className) {
225+
setProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS, className);
226+
return this;
227+
}
228+
224229
/**
 * Looks up the value stored under {@code ES_SERIALIZATION_WRITER_BYTES_CLASS}.
 *
 * @return whatever {@code getProperty} yields for that key
 */
public String getSerializerBytesConverterClassName() {
225230
return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
226231
}

mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public static void filterNonDataNodesIfNeeded(Settings settings, Log log) {
152152
}
153153

154154
RestClient bootstrap = new RestClient(settings);
155-
try {
155+
try {
156156
String message = "No data nodes with HTTP-enabled available";
157157
List<NodeInfo> dataNodes = bootstrap.getHttpDataNodes();
158158
if (dataNodes.isEmpty()) {
@@ -253,10 +253,18 @@ public static void validateSettings(Settings settings) {
253253
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
254254
}
255255

256+
//check the configuration is coherent in order to use the delete operation
257+
if (ConfigurationOptions.ES_OPERATION_DELETE.equals(settings.getOperation())) {
258+
Assert.isTrue(!settings.getInputAsJson(), "When using delete operation, providing data as JSON is not coherent because this operation does not need document as a payload. This is most likely not what the user intended. Bailing out...");
259+
Assert.isTrue(settings.getMappingIncludes().isEmpty(), "When using delete operation, the field inclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
260+
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When using delete operation, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
261+
Assert.isTrue(settings.getMappingId() != null && !StringUtils.EMPTY.equals(settings.getMappingId()), "When using delete operation, the property " + ConfigurationOptions.ES_MAPPING_ID + " must be set and must not be empty since we need the document id in order to delete it. Bailing out...");
262+
}
263+
256264
// Check to make sure user doesn't specify more than one script type
257265
boolean hasScript = false;
258266
String[] scripts = {settings.getUpdateScriptInline(), settings.getUpdateScriptFile(), settings.getUpdateScriptStored()};
259-
for (String script: scripts) {
267+
for (String script : scripts) {
260268
boolean isSet = StringUtils.hasText(script);
261269
Assert.isTrue((hasScript && isSet) == false, "Multiple scripts are specified. Please specify only one via [es.update.script.inline], [es.update.script.file], or [es.update.script.stored]");
262270
hasScript = hasScript || isSet;
@@ -291,7 +299,8 @@ public static void validateSettingsForWriting(Settings settings) {
291299
throw new EsHadoopIllegalArgumentException("Cannot use TTL on index/update requests in ES 6.x and " +
292300
"above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TTL + "] setting.");
293301
}
294-
} else {
302+
}
303+
else {
295304
if (StringUtils.hasText(settings.getMappingTtl())) {
296305
LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TTL + "] is deprecated! Support for [ttl] on " +
297306
"indexing and update requests has been removed in ES 6.x and above!");
@@ -378,7 +387,7 @@ private static void doCheckIndexExistence(Settings settings, RestRepository clie
378387
settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
379388
}
380389
}
381-
390+
382391
public static boolean setMetadataExtractorIfNotSet(Settings settings, Class<? extends MetadataExtractor> clazz, Log log) {
383392
if (!StringUtils.hasText(settings.getMappingMetadataExtractorClassName())) {
384393
Log logger = (log != null ? log : LogFactory.getLog(clazz));
@@ -468,4 +477,4 @@ public static boolean setUserProviderIfNotSet(Settings settings, Class<? extends
468477
}
469478
return false;
470479
}
471-
}
480+
}

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/BulkCommands.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@ else if (ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation)) {
4545
else if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation)) {
4646
factory = new UpdateBulkFactory(settings, true, metaExtractor, version);
4747
}
48+
else if (ConfigurationOptions.ES_OPERATION_DELETE.equals(operation)) {
49+
factory = new DeleteBulkFactory(settings, metaExtractor, version);
50+
}
4851
else {
49-
throw new EsHadoopIllegalArgumentException("Unknown operation " + operation);
52+
throw new EsHadoopIllegalArgumentException("Unsupported bulk operation " + operation);
5053
}
5154

5255
return factory.createBulk();
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.hadoop.serialization.bulk;
20+
21+
import org.apache.hadoop.io.*;
22+
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
23+
import org.elasticsearch.hadoop.cfg.Settings;
24+
import org.elasticsearch.hadoop.serialization.Generator;
25+
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
26+
import org.elasticsearch.hadoop.util.EsMajorVersion;
27+
import org.elasticsearch.hadoop.util.StringUtils;
28+
29+
import java.util.List;
30+
31+
/**
 * Bulk factory for the {@code delete} operation.
 * <p>
 * It reports {@link ConfigurationOptions#ES_OPERATION_DELETE} as its operation,
 * forces {@link NoDataWriter} as the value writer, and overrides
 * {@link #writeObjectEnd(List)} so that an empty string is appended after each
 * entity.
 */
public class DeleteBulkFactory extends AbstractBulkFactory {
32+
33+
/**
 * Value writer installed by {@link DeleteBulkFactory}.
 * <p>
 * Only {@code null} and the scalar Hadoop writables listed in
 * {@link #write(Object, Generator)} produce output. Any other type is
 * silently ignored and the call still reports success.
 */
public static final class NoDataWriter implements ValueWriter<Object> {
34+
35+
/**
 * Emits the given value through the generator when it is {@code null} or one
 * of the recognised Hadoop writable types.
 *
 * @param writable  value to emit; may be {@code null}
 * @param generator sink receiving the emitted value
 * @return always {@code Result.SUCCESFUL()}, including when {@code writable}
 *         is of an unrecognised type and nothing was written
 */
@Override
36+
public Result write(Object writable, Generator generator) {
37+
// delete doesn't require any document content, but the metadata associated with the document still has to be extracted
38+
if (writable == null || writable instanceof NullWritable) {
39+
generator.writeNull();
40+
}
41+
else if (writable instanceof Text) {
42+
Text text = (Text) writable;
43+
// getLength() bounds the write: the backing array may be larger than the valid content
44+
generator.writeUTF8String(text.getBytes(), 0, text.getLength());
45+
}
46+
else if (writable instanceof UTF8) {
47+
UTF8 utf8 = (UTF8) writable;
48+
generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength());
49+
}
50+
else if (writable instanceof IntWritable) {
51+
generator.writeNumber(((IntWritable) writable).get());
52+
}
53+
else if (writable instanceof LongWritable) {
54+
generator.writeNumber(((LongWritable) writable).get());
55+
}
56+
else if (writable instanceof VLongWritable) {
57+
generator.writeNumber(((VLongWritable) writable).get());
58+
}
59+
else if (writable instanceof VIntWritable) {
60+
generator.writeNumber(((VIntWritable) writable).get());
61+
}
62+
else if (writable instanceof ByteWritable) {
63+
generator.writeNumber(((ByteWritable) writable).get());
64+
}
65+
else if (writable instanceof DoubleWritable) {
66+
generator.writeNumber(((DoubleWritable) writable).get());
67+
}
68+
else if (writable instanceof FloatWritable) {
69+
generator.writeNumber(((FloatWritable) writable).get());
70+
}
71+
else if (writable instanceof BooleanWritable) {
72+
generator.writeBoolean(((BooleanWritable) writable).get());
73+
}
74+
else if (writable instanceof BytesWritable) {
75+
BytesWritable bw = (BytesWritable) writable;
76+
// same as Text: only the first getLength() bytes of the buffer are written
77+
generator.writeBinary(bw.getBytes(), 0, bw.getLength());
78+
}
79+
else if (writable instanceof MD5Hash) {
80+
generator.writeString(writable.toString());
81+
}
82+
// NOTE(review): no final else - every other type writes nothing and still
83+
// reports success; presumably intentional since delete carries no payload
84+
return Result.SUCCESFUL();
85+
}
86+
}
87+
88+
/**
 * Creates the factory with {@link NoDataWriter} forced as the value writer.
 *
 * @param settings      configuration to derive from; the override is applied
 *                      to the result of {@code settings.copy()}, presumably so
 *                      the caller's instance is left untouched
 * @param metaExtractor metadata extractor handed to the parent factory
 * @param version       Elasticsearch major version handed to the parent factory
 */
public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
89+
// we only want a specific serializer for this particular bulk factory
90+
super(settings.copy().setSerializerValueWriterClassName(NoDataWriter.class.getName()), metaExtractor, version);
91+
}
92+
93+
/**
 * @return {@link ConfigurationOptions#ES_OPERATION_DELETE}
 */
@Override
94+
protected String getOperation() {
95+
return ConfigurationOptions.ES_OPERATION_DELETE;
96+
}
97+
98+
/**
 * Appends an empty string to the given list as the end-of-object marker.
 *
 * @param list list the marker is appended to
 */
@Override
99+
protected void writeObjectEnd(List<Object> list) {
100+
// skip adding a new-line for each entity, as delete doesn't need entity output
101+
list.add(StringUtils.EMPTY);
102+
}
103+
}

mr/src/test/java/org/elasticsearch/hadoop/rest/InitializationUtilsTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,43 @@ public void testValidateWriteV6PlusTimestampRemoved() throws Exception {
136136
set.setProperty(ES_MAPPING_TIMESTAMP, "1000");
137137
validateSettingsForWriting(set);
138138
}
139-
}
139+
140+
// Delete operation combined with JSON input must be rejected by validateSettings.
@Test(expected = EsHadoopIllegalArgumentException.class)
141+
public void testValidateDeleteOperationVsInputAsJson() {
142+
Settings set = new TestSettings();
143+
set.setProperty(ES_WRITE_OPERATION, "delete");
144+
set.setProperty(ES_INPUT_JSON, "true");
145+
validateSettings(set);
146+
}
147+
148+
// Delete operation combined with a field-inclusion mapping must be rejected by validateSettings.
@Test(expected = EsHadoopIllegalArgumentException.class)
149+
public void testValidateDeleteOperationVsIncludeFields() {
150+
Settings set = new TestSettings();
151+
set.setProperty(ES_WRITE_OPERATION, "delete");
152+
set.setProperty(ES_MAPPING_INCLUDE, "field");
153+
validateSettings(set);
154+
}
155+
156+
// Delete operation combined with a field-exclusion mapping must be rejected by validateSettings.
@Test(expected = EsHadoopIllegalArgumentException.class)
157+
public void testValidateDeleteOperationVsExcludeFields() {
158+
Settings set = new TestSettings();
159+
set.setProperty(ES_WRITE_OPERATION, "delete");
160+
set.setProperty(ES_MAPPING_EXCLUDE, "field");
161+
validateSettings(set);
162+
}
163+
164+
// Delete operation with no ES_MAPPING_ID set by the test must be rejected by validateSettings.
@Test(expected = EsHadoopIllegalArgumentException.class)
165+
public void testValidateDeleteOperationVsIdNotSet() {
166+
Settings set = new TestSettings();
167+
set.setProperty(ES_WRITE_OPERATION, "delete");
168+
validateSettings(set);
169+
}
170+
171+
// Delete operation with ES_MAPPING_ID explicitly set to the empty string must be rejected by validateSettings.
@Test(expected = EsHadoopIllegalArgumentException.class)
172+
public void testValidateDeleteOperationVsEmptyId() {
173+
Settings set = new TestSettings();
174+
set.setProperty(ES_WRITE_OPERATION, "delete");
175+
set.setProperty(ES_MAPPING_ID, "");
176+
validateSettings(set);
177+
}
178+
}

0 commit comments

Comments
 (0)