Skip to content

Commit d2f155d

Browse files
committed
Share serialized settings and mappings between partition definitions (elastic#1477)
In situations where large pushdown queries, large mappings, and indices with many shards are combined, we can inflate the driver program memory very quickly as we duplicate the settings and mappings data for each ParititionDefinition we create. This change serializes the settings and mapping data once and shares the immutable state between all PartitionDefinition instances to avoid ballooning memory.
1 parent 2684e55 commit d2f155d

File tree

5 files changed

+70
-36
lines changed

5 files changed

+70
-36
lines changed

mr/src/main/java/org/elasticsearch/hadoop/rest/PartitionDefinition.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,33 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe
4242
private final String serializedSettings, serializedMapping;
4343
private final String[] locations;
4444

45-
public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId) {
46-
this(settings, mapping, index, shardId, null, EMPTY_ARRAY);
47-
}
45+
public static class PartitionDefinitionBuilder {
46+
private final String serializedSettings, serializedMapping;
47+
48+
private PartitionDefinitionBuilder(Settings settings, Mapping resolvedMapping) {
49+
this.serializedSettings = settings == null ? null : settings.save();
50+
this.serializedMapping = resolvedMapping == null ? null : IOUtils.serializeToBase64(resolvedMapping);
51+
}
52+
53+
public PartitionDefinition build(String index, int shardId) {
54+
return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, null, EMPTY_ARRAY);
55+
}
56+
57+
public PartitionDefinition build(String index, int shardId, String[] locations) {
58+
return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, null, locations);
59+
}
60+
61+
public PartitionDefinition build(String index, int shardId, Slice slice) {
62+
return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, slice, EMPTY_ARRAY);
63+
}
4864

49-
public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId, String[] locations) {
50-
this(settings, mapping, index, shardId, null, locations);
65+
public PartitionDefinition build(String index, int shardId, Slice slice, String[] locations) {
66+
return new PartitionDefinition(serializedSettings, serializedMapping, index, shardId, slice, locations);
67+
}
5168
}
5269

53-
public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId, Slice slice) {
54-
this(settings, mapping, index, shardId, slice, EMPTY_ARRAY);
70+
public static PartitionDefinitionBuilder builder(Settings settings, Mapping resolvedMapping) {
71+
return new PartitionDefinitionBuilder(settings, resolvedMapping);
5572
}
5673

5774
/**
@@ -63,19 +80,11 @@ public PartitionDefinition(Settings settings, Mapping mapping, String index, int
6380
* @param slice The slice the partition will be executed on or null
6481
* @param locations The locations where to find nodes (hostname:port or ip:port) that can execute the partition locally
6582
*/
66-
public PartitionDefinition(Settings settings, Mapping mapping, String index, int shardId, Slice slice, String[] locations) {
83+
private PartitionDefinition(String serializedSettings, String serializedMapping, String index, int shardId, Slice slice, String[] locations) {
6784
this.index = index;
6885
this.shardId = shardId;
69-
if (settings != null) {
70-
this.serializedSettings = settings.save();
71-
} else {
72-
this.serializedSettings = null;
73-
}
74-
if (mapping != null) {
75-
this.serializedMapping = IOUtils.serializeToBase64(mapping);
76-
} else {
77-
this.serializedMapping = null;
78-
}
86+
this.serializedSettings = serializedSettings;
87+
this.serializedMapping = serializedMapping;
7988
this.slice = slice;
8089
this.locations = locations;
8190
}

mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ static List<PartitionDefinition> findShardPartitions(Settings settings, MappingS
285285
List<List<Map<String, Object>>> shards, Log log) {
286286
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
287287
List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
288+
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);
288289
for (List<Map<String, Object>> group : shards) {
289290
String index = null;
290291
int shardId = -1;
@@ -308,8 +309,7 @@ static List<PartitionDefinition> findShardPartitions(Settings settings, MappingS
308309
"Check your cluster status to see if it is unstable!");
309310
}
310311
} else {
311-
PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
312-
locationList.toArray(new String[0]));
312+
PartitionDefinition partition = partitionBuilder.build(index, shardId, locationList.toArray(new String[0]));
313313
partitions.add(partition);
314314
}
315315
}
@@ -326,6 +326,7 @@ static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings
326326
Assert.notNull(maxDocsPerPartition, "Attempting to find slice partitions but maximum documents per partition is not set.");
327327
Resource readResource = new Resource(settings, true);
328328
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
329+
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);
329330

330331
List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
331332
for (List<Map<String, Object>> group : shards) {
@@ -362,7 +363,7 @@ static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings
362363
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
363364
for (int i = 0; i < numPartitions; i++) {
364365
PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
365-
partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
366+
partitions.add(partitionBuilder.build(index, shardId, slice, locations));
366367
}
367368
}
368369
}

mr/src/test/java/org/elasticsearch/hadoop/rest/FindPartitionsTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,17 @@ public class FindPartitionsTest {
5151

5252
private static final PartitionDefinition[] EXPECTED_SHARDS_PARTITIONS;
5353
static {
54+
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(null, null);
5455
List<PartitionDefinition> expected =
5556
new ArrayList<PartitionDefinition>();
5657
for (int i = 0; i < 15; i++) {
57-
expected.add(new PartitionDefinition(null, null, "index1", i));
58+
expected.add(partitionBuilder.build("index1", i));
5859
}
5960
for (int i = 0; i < 18; i++) {
60-
expected.add(new PartitionDefinition(null, null, "index2", i));
61+
expected.add(partitionBuilder.build("index2", i));
6162
}
6263
for (int i = 0; i < 1; i++) {
63-
expected.add(new PartitionDefinition(null, null, "index3", i));
64+
expected.add(partitionBuilder.build("index3", i));
6465
}
6566
Collections.sort(expected);
6667
EXPECTED_SHARDS_PARTITIONS = expected.toArray(new PartitionDefinition[0]);

mr/src/test/java/org/elasticsearch/hadoop/rest/PartitionDefinitionTest.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.codehaus.jackson.JsonParser;
2222
import org.codehaus.jackson.map.ObjectMapper;
2323
import org.elasticsearch.hadoop.cfg.PropertiesSettings;
24+
import org.elasticsearch.hadoop.rest.PartitionDefinition.PartitionDefinitionBuilder;
2425
import org.elasticsearch.hadoop.serialization.dto.mapping.FieldParser;
2526
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
2627
import org.elasticsearch.hadoop.util.BytesArray;
@@ -35,8 +36,7 @@
3536
import java.io.ObjectOutputStream;
3637
import java.util.Map;
3738

38-
import static org.junit.Assert.assertArrayEquals;
39-
import static org.junit.Assert.assertEquals;
39+
import static org.junit.Assert.*;
4040

4141
public class PartitionDefinitionTest {
4242

@@ -55,7 +55,7 @@ public void testWritable() throws IOException {
5555
PropertiesSettings settings = new PropertiesSettings();
5656
settings.setProperty("setting1", "value1");
5757
settings.setProperty("setting2", "value2");
58-
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "foo", 12,
58+
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("foo", 12,
5959
new String[] {"localhost:9200", "otherhost:9200"});
6060
BytesArray bytes = writeWritablePartition(expected);
6161
PartitionDefinition def = readWritablePartition(bytes);
@@ -68,7 +68,7 @@ public void testSerializable() throws IOException, ClassNotFoundException {
6868
PropertiesSettings settings = new PropertiesSettings();
6969
settings.setProperty("setting1", "value1");
7070
settings.setProperty("setting2", "value2");
71-
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "bar", 37,
71+
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("bar", 37,
7272
new String[] {"localhost:9200", "otherhost:9200"});
7373
BytesArray bytes = writeSerializablePartition(expected);
7474
PartitionDefinition def = readSerializablePartition(bytes);
@@ -81,7 +81,7 @@ public void testWritableWithSlice() throws IOException {
8181
PropertiesSettings settings = new PropertiesSettings();
8282
settings.setProperty("setting1", "value1");
8383
settings.setProperty("setting2", "value2");
84-
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "foo", 12, new PartitionDefinition.Slice(10, 27),
84+
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("foo", 12, new PartitionDefinition.Slice(10, 27),
8585
new String[] {"localhost:9200", "otherhost:9200"});
8686
BytesArray bytes = writeWritablePartition(expected);
8787
PartitionDefinition def = readWritablePartition(bytes);
@@ -95,13 +95,35 @@ public void testSerializableWithSlice() throws IOException, ClassNotFoundExcepti
9595
settings.setProperty("setting1", "value1");
9696
settings.setProperty("setting2", "value2");
9797

98-
PartitionDefinition expected = new PartitionDefinition(settings, mapping, "bar", 37,
98+
PartitionDefinition expected = PartitionDefinition.builder(settings, mapping).build("bar", 37,
9999
new PartitionDefinition.Slice(13, 35), new String[] {"localhost:9200", "otherhost:9200"});
100100
BytesArray bytes = writeSerializablePartition(expected);
101101
PartitionDefinition def = readSerializablePartition(bytes);
102102
assertPartitionEquals(expected, def);
103103
}
104104

105+
@Test
106+
public void testNonDuplicationOfConfiguration() throws IOException {
107+
Mapping mapping = getTestMapping();
108+
PropertiesSettings settings = new PropertiesSettings();
109+
settings.setProperty("setting1", "value1");
110+
settings.setProperty("setting2", "value2");
111+
PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, mapping);
112+
113+
PartitionDefinition first = partitionBuilder.build("foo", 11,
114+
new String[] {"localhost:9200", "otherhost:9200"});
115+
PartitionDefinition second = partitionBuilder.build("foo", 12,
116+
new String[] {"localhost:9200", "otherhost:9200"});
117+
assertNotEquals(first, second);
118+
assertSame(first.getSerializedSettings(), second.getSerializedSettings());
119+
assertSame(first.getSerializedMapping(), second.getSerializedMapping());
120+
BytesArray bytes = writeWritablePartition(first);
121+
PartitionDefinition def = readWritablePartition(bytes);
122+
assertPartitionEquals(first, def);
123+
assertNotSame(first.getSerializedSettings(), def.getSerializedSettings());
124+
assertNotSame(first.getSerializedMapping(), def.getSerializedMapping());
125+
}
126+
105127
static PartitionDefinition readSerializablePartition(BytesArray bytes) throws IOException, ClassNotFoundException {
106128
FastByteArrayInputStream in = new FastByteArrayInputStream(bytes);
107129
ObjectInputStream ois = new ObjectInputStream(in);

mr/src/test/java/org/elasticsearch/hadoop/rest/RestServiceTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ public void setup() {
7575

7676
ShardInfo sh6 = new ShardInfo(info);
7777

78-
pd1 = new PartitionDefinition(null, null, sh1.getIndex(), sh1.getName());
79-
pd2 = new PartitionDefinition(null, null, sh2.getIndex(), sh2.getName());
80-
pd3 = new PartitionDefinition(null, null, sh3.getIndex(), sh3.getName());
81-
pd4 = new PartitionDefinition(null, null, sh4.getIndex(), sh4.getName());
82-
pd5 = new PartitionDefinition(null, null, sh5.getIndex(), sh5.getName());
83-
pd6 = new PartitionDefinition(null, null, sh6.getIndex(), sh6.getName());
78+
PartitionDefinition.PartitionDefinitionBuilder bldr = PartitionDefinition.builder(null, null);
79+
pd1 = bldr.build(sh1.getIndex(), sh1.getName());
80+
pd2 = bldr.build(sh2.getIndex(), sh2.getName());
81+
pd3 = bldr.build(sh3.getIndex(), sh3.getName());
82+
pd4 = bldr.build(sh4.getIndex(), sh4.getName());
83+
pd5 = bldr.build(sh5.getIndex(), sh5.getName());
84+
pd6 = bldr.build(sh6.getIndex(), sh6.getName());
8485

8586
pds = Arrays.asList(pd1, pd2, pd3, pd4, pd5, pd6);
8687
}

0 commit comments

Comments
 (0)