Skip to content

Share serialized settings and mappings between partition definitions #1477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,33 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe
private final String serializedSettings, serializedMapping;
private final String[] locations;

public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId) {
this(settings, mapping, index, shardId, null, EMPTY_ARRAY);
}
public static class PartitionDefinitionBuilder {
private final String serializedSettings, serializedMapping;

/**
 * Captures the serialized forms of the given settings and mapping so that every
 * partition created through this builder is handed the same two strings.
 *
 * @param settings the settings to serialize via {@code save()}, or null to store null
 * @param resolvedMapping the mapping to serialize to Base64, or null to store null
 */
private PartitionDefinitionBuilder(Settings settings, Mapping resolvedMapping) {
    if (settings == null) {
        this.serializedSettings = null;
    } else {
        this.serializedSettings = settings.save();
    }
    if (resolvedMapping == null) {
        this.serializedMapping = null;
    } else {
        this.serializedMapping = IOUtils.serializeToBase64(resolvedMapping);
    }
}

/**
 * Builds a partition for the given index and shard, with a null slice and the
 * shared {@code EMPTY_ARRAY} as its locations.
 *
 * @param index the index name
 * @param shardId the shard id
 * @return a partition referencing this builder's serialized settings and mapping
 */
public PartitionDefinition build(String index, int shardId) {
    final Slice noSlice = null;
    final String[] defaultLocations = EMPTY_ARRAY;
    return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, noSlice, defaultLocations);
}

/**
 * Builds a partition for the given index and shard with the supplied locations
 * and a null slice.
 *
 * @param index the index name
 * @param shardId the shard id
 * @param locations the locations passed through to the partition unchanged
 * @return a partition referencing this builder's serialized settings and mapping
 */
public PartitionDefinition build(String index, int shardId, String[] locations) {
    final Slice noSlice = null;
    return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, noSlice, locations);
}

/**
 * Builds a partition for the given index, shard and slice, using the shared
 * {@code EMPTY_ARRAY} as its locations.
 *
 * @param index the index name
 * @param shardId the shard id
 * @param slice the slice passed through to the partition unchanged
 * @return a partition referencing this builder's serialized settings and mapping
 */
public PartitionDefinition build(String index, int shardId, Slice slice) {
    final String[] defaultLocations = EMPTY_ARRAY;
    return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, slice, defaultLocations);
}

public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId, String[] locations) {
this(settings, mapping, index, shardId, null, locations);
public PartitionDefinition build(String index, int shardId, Slice slice, String[] locations) {
return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, slice, locations);
}
}

public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId, Slice slice) {
this(settings, mapping, index, shardId, slice, EMPTY_ARRAY);
/**
 * Creates a builder for partition definitions from the given settings and mapping.
 *
 * @param settings the settings handed to the builder, may be null
 * @param resolvedMapping the mapping handed to the builder, may be null
 * @return a new {@code PartitionDefinitionBuilder}
 */
public static PartitionDefinitionBuilder builder(Settings settings, Mapping resolvedMapping) {
    PartitionDefinitionBuilder partitionBuilder = new PartitionDefinitionBuilder(settings, resolvedMapping);
    return partitionBuilder;
}

/**
Expand All @@ -63,19 +80,11 @@ public PartitionDefinition(Settings settings, Mapping mapping, String index, int
* @param slice The slice the partition will be executed on or null
* @param locations The locations where to find nodes (hostname:port or ip:port) that can execute the partition locally
*/
public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId, Slice slice, String[] locations) {
private PartitionDefinition(String serializedSettings, String serializedMapping, String index, int shardId, Slice slice, String[] locations) {
this.index = index;
this.shardId = shardId;
if (settings != null) {
this.serializedSettings = settings.save();
} else {
this.serializedSettings = null;
}
if (mapping != null) {
this.serializedMapping = IOUtils.serializeToBase64(mapping);
} else {
this.serializedMapping = null;
}
this.serializedSettings = serializedSettings;
this.serializedMapping = serializedMapping;
this.slice = slice;
this.locations = locations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ static List<PartitionDefinition> findShardPartitions(Settings settings, MappingS
List<List<Map<String, Object>>> shards, Log log) {
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);
for (List<Map<String, Object>> group : shards) {
String index = null;
int shardId = -1;
Expand All @@ -308,8 +309,7 @@ static List<PartitionDefinition> findShardPartitions(Settings settings, MappingS
"Check your cluster status to see if it is unstable!");
}
} else {
PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
locationList.toArray(new String[0]));
PartitionDefinition partition = partitionBuilder.build(index, shardId, locationList.toArray(new String[0]));
partitions.add(partition);
}
}
Expand All @@ -326,6 +326,7 @@ static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings
Assert.notNull(maxDocsPerPartition, "Attempting to find slice partitions but maximum documents per partition is not set.");
Resource readResource = new Resource(settings, true);
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);

List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
for (List<Map<String, Object>> group : shards) {
Expand Down Expand Up @@ -362,7 +363,7 @@ static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
for (int i = 0; i < numPartitions; i++) {
PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
partitions.add(partitionBuilder.build(index, shardId, slice, locations));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ public class FindPartitionsTest {

private static final PartitionDefinition[] EXPECTED_SHARDS_PARTITIONS;
static {
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(null, null);
List<PartitionDefinition> expected =
new ArrayList<PartitionDefinition>();
for (int i = 0; i < 15; i++) {
expected.add(new PartitionDefinition(null, null, "index1", i));
expected.add(partitionBuilder.build("index1", i));
}
for (int i = 0; i < 18; i++) {
expected.add(new PartitionDefinition(null, null, "index2", i));
expected.add(partitionBuilder.build("index2", i));
}
for (int i = 0; i < 1; i++) {
expected.add(new PartitionDefinition(null, null, "index3", i));
expected.add(partitionBuilder.build("index3", i));
}
Collections.sort(expected);
EXPECTED_SHARDS_PARTITIONS = expected.toArray(new PartitionDefinition[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.hadoop.cfg.PropertiesSettings;
import org.elasticsearch.hadoop.rest.PartitionDefinition.PartitionDefinitionBuilder;
import org.elasticsearch.hadoop.serialization.dto.mapping.FieldParser;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.elasticsearch.hadoop.util.BytesArray;
Expand All @@ -35,8 +36,7 @@
import java.io.ObjectOutputStream;
import java.util.Map;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

public class PartitionDefinitionTest {

Expand All @@ -55,7 +55,7 @@ public void testWritable() throws IOException {
PropertiesSettings settings = new PropertiesSettings();
settings.setProperty("setting1", "value1");
settings.setProperty("setting2", "value2");
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "foo", 12,
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("foo", 12,
new String[] {"localhost:9200", "otherhost:9200"});
BytesArray bytes = writeWritablePartition(expected);
PartitionDefinition def = readWritablePartition(bytes);
Expand All @@ -68,7 +68,7 @@ public void testSerializable() throws IOException, ClassNotFoundException {
PropertiesSettings settings = new PropertiesSettings();
settings.setProperty("setting1", "value1");
settings.setProperty("setting2", "value2");
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "bar", 37,
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("bar", 37,
new String[] {"localhost:9200", "otherhost:9200"});
BytesArray bytes = writeSerializablePartition(expected);
PartitionDefinition def = readSerializablePartition(bytes);
Expand All @@ -81,7 +81,7 @@ public void testWritableWithSlice() throws IOException {
PropertiesSettings settings = new PropertiesSettings();
settings.setProperty("setting1", "value1");
settings.setProperty("setting2", "value2");
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "foo", 12, new PartitionDefinition.Slice(10, 27),
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("foo", 12, new PartitionDefinition.Slice(10, 27),
new String[] {"localhost:9200", "otherhost:9200"});
BytesArray bytes = writeWritablePartition(expected);
PartitionDefinition def = readWritablePartition(bytes);
Expand All @@ -95,13 +95,35 @@ public void testSerializableWithSlice() throws IOException, ClassNotFoundExcepti
settings.setProperty("setting1", "value1");
settings.setProperty("setting2", "value2");

PartitionDefinition expected = new PartitionDefinition(settings, mapping, "bar", 37,
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("bar", 37,
new PartitionDefinition.Slice(13, 35), new String[] {"localhost:9200", "otherhost:9200"});
BytesArray bytes = writeSerializablePartition(expected);
PartitionDefinition def = readSerializablePartition(bytes);
assertPartitionEquals(expected, def);
}

/**
 * Checks that two partitions built from one builder hold the very same serialized
 * settings and mapping instances, while a partition read back from its written
 * form is equal to the original but holds distinct string instances.
 */
@Test
public void testNonDuplicationOfConfiguration() throws IOException {
    Mapping testMapping = getTestMapping();
    PropertiesSettings props = new PropertiesSettings();
    props.setProperty("setting1", "value1");
    props.setProperty("setting2", "value2");
    PartitionDefinitionBuilder sharedBuilder = PartitionDefinition.builder(props, testMapping);

    // Two partitions that differ only in shard id, each given its own locations array.
    String[] firstLocations = new String[] {"localhost:9200", "otherhost:9200"};
    PartitionDefinition first = sharedBuilder.build("foo", 11, firstLocations);
    String[] secondLocations = new String[] {"localhost:9200", "otherhost:9200"};
    PartitionDefinition second = sharedBuilder.build("foo", 12, secondLocations);

    assertNotEquals(first, second);
    // Identity, not just equality: both partitions must reference the same strings.
    assertSame(first.getSerializedSettings(), second.getSerializedSettings());
    assertSame(first.getSerializedMapping(), second.getSerializedMapping());

    // Round-trip the first partition through its writable form.
    PartitionDefinition roundTripped = readWritablePartition(writeWritablePartition(first));
    assertPartitionEquals(first, roundTripped);
    assertNotSame(first.getSerializedSettings(), roundTripped.getSerializedSettings());
    assertNotSame(first.getSerializedMapping(), roundTripped.getSerializedMapping());
}

static PartitionDefinition readSerializablePartition(BytesArray bytes) throws IOException, ClassNotFoundException {
FastByteArrayInputStream in = new FastByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ public void setup() {

ShardInfo sh6 = new ShardInfo(info);

pd1 = new PartitionDefinition(null, null, sh1.getIndex(), sh1.getName());
pd2 = new PartitionDefinition(null, null, sh2.getIndex(), sh2.getName());
pd3 = new PartitionDefinition(null, null, sh3.getIndex(), sh3.getName());
pd4 = new PartitionDefinition(null, null, sh4.getIndex(), sh4.getName());
pd5 = new PartitionDefinition(null, null, sh5.getIndex(), sh5.getName());
pd6 = new PartitionDefinition(null, null, sh6.getIndex(), sh6.getName());
PartitionDefinition.PartitionDefinitionBuilder bldr = PartitionDefinition.builder(null, null);
pd1 = bldr.build(sh1.getIndex(), sh1.getName());
pd2 = bldr.build(sh2.getIndex(), sh2.getName());
pd3 = bldr.build(sh3.getIndex(), sh3.getName());
pd4 = bldr.build(sh4.getIndex(), sh4.getName());
pd5 = bldr.build(sh5.getIndex(), sh5.getName());
pd6 = bldr.build(sh6.getIndex(), sh6.getName());

pds = Arrays.asList(pd1, pd2, pd3, pd4, pd5, pd6);
}
Expand Down