Commit 5160a05

Fixing support for empty and null arrays (#1827)
This commit makes a null array in Elasticsearch come into es-hadoop/Spark as null. Previously it showed up as either [] or [null], depending on the Spark version. Closes #1527
1 parent 56a0d13 · commit 5160a05
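In Spark SQL terms, the change looks roughly like this (a minimal sketch, assuming a SparkSession named `spark` and an index named `idx`; neither name comes from the commit itself):

```scala
// Read semantics for a field configured via es.read.field.as.array.include:
//   _source {"status_code": null}  -> row value is null   (previously [] or [null], by Spark version)
//   _source {"status_code": []}    -> row value is an empty array
//   _source {"status_code": [123]} -> row value is the array [123]
val df = spark.read.format("es")
  .option("es.read.field.as.array.include", "status_code")
  .load("idx")
df.select("status_code").show()
```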

File tree: 7 files changed (+169, -1 lines)

mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java

Lines changed: 6 additions & 1 deletion
```diff
@@ -895,7 +895,12 @@ else if (t == Token.START_OBJECT) {
             String rawValue = parser.text();
             try {
                 if (isArrayField(fieldMapping)) {
-                    return singletonList(fieldMapping, parseValue(parser, esType), parser);
+                    Object parsedValue = parseValue(parser, esType);
+                    if (parsedValue == null) {
+                        return null; // There is not a null element in the array. The array itself is null.
+                    } else {
+                        return singletonList(fieldMapping, parsedValue, parser);
+                    }
                 } else {
                     return parseValue(parser, esType);
                 }
```
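Restated as a minimal standalone sketch (illustrative Scala; wrapArrayField is a hypothetical helper, not the actual ScrollReader API): for a field configured as an array, a non-null scalar is wrapped in a singleton list, while a null parse result now means the array itself is null rather than [null].

```scala
import java.util.Collections

// Hypothetical helper mirroring the rule in the hunk above; not the real ScrollReader API.
def wrapArrayField(parsedValue: AnyRef): AnyRef =
  if (parsedValue == null) null // the whole array is null, not a [null] singleton
  else Collections.singletonList(parsedValue) // a non-null scalar becomes a one-element array
```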

mr/src/test/java/org/elasticsearch/hadoop/serialization/ScrollReaderTest.java

Lines changed: 19 additions & 0 deletions
```diff
@@ -53,6 +53,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -431,6 +432,24 @@ public void testScrollWithNestedArrays() throws IOException {
         assertEquals(10L, JsonUtils.query("a").get(0).get(0).get(0).apply(scroll.getHits().get(2)[1]));
     }

+    @Test
+    public void testScrollWithEmptyArrays() throws IOException {
+        MappingSet mappings = getMappingSet("empty-list");
+        InputStream stream = getClass().getResourceAsStream(scrollData("empty-list"));
+        Settings testSettings = new TestSettings();
+        testSettings.setProperty(ConfigurationOptions.ES_READ_FIELD_AS_ARRAY_INCLUDE, "status_code");
+        testSettings.setProperty(ConfigurationOptions.ES_READ_METADATA, "" + readMetadata);
+        testSettings.setProperty(ConfigurationOptions.ES_READ_METADATA_FIELD, "" + metadataField);
+        testSettings.setProperty(ConfigurationOptions.ES_OUTPUT_JSON, "" + readAsJson);
+        JdkValueReader valueReader = ObjectUtils.instantiate(JdkValueReader.class.getName(), testSettings);
+        ScrollReader reader = new ScrollReader(ScrollReaderConfigBuilder.builder(valueReader, mappings.getResolvedView(), testSettings));
+        ScrollReader.Scroll scroll = reader.read(stream);
+        // The first entry is null, the second is an array with the single element 123, and the third is an empty array.
+        assertNull(JsonUtils.query("status_code").apply(scroll.getHits().get(0)[1]));
+        assertEquals(123L, JsonUtils.query("status_code").get(0).apply(scroll.getHits().get(1)[1]));
+        assertEquals(Collections.emptyList(), JsonUtils.query("status_code").apply(scroll.getHits().get(2)[1]));
+    }
+
     @Test(expected = EsHadoopParsingException.class)
     public void testScrollWithBreakOnInvalidMapping() throws IOException {
         MappingSet mappings = getMappingSet("numbers-as-strings");
```
New file (test resource): the "empty-list" mapping used by testScrollWithEmptyArrays

Lines changed: 29 additions & 0 deletions

```diff
@@ -0,0 +1,29 @@
+{
+  "artists" : {
+    "mappings" : {
+      "properties" : {
+        "name" : {
+          "type" : "text",
+          "fields" : {
+            "keyword" : {
+              "type" : "keyword",
+              "ignore_above" : 256
+            }
+          }
+        },
+        "status_code" : {
+          "type" : "long",
+          "fields" : {
+            "keyword" : {
+              "type" : "keyword",
+              "ignore_above" : 256
+            }
+          }
+        },
+        "ts" : {
+          "type" : "long"
+        }
+      }
+    }
+  }
+}
```
New file (test resource): the "empty-list" scroll response used by testScrollWithEmptyArrays

Lines changed: 55 additions & 0 deletions

```diff
@@ -0,0 +1,55 @@
+{
+  "_scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFmJUOHBDZ0pjU1dPUm9CbThxUWp1M3cAAAAAAABtCxZzRFItZkxtWFRjZWxXZ3pZamhZZWZ3",
+  "took" : 0,
+  "timed_out" : false,
+  "_shards" : {
+    "total" : 1,
+    "successful" : 1,
+    "skipped" : 0,
+    "failed" : 0
+  },
+  "hits" : {
+    "total" : {
+      "value" : 3,
+      "relation" : "eq"
+    },
+    "max_score" : 1.0,
+    "hits" : [
+      {
+        "_index" : "empty_list",
+        "_type" : "_doc",
+        "_id" : "1",
+        "_score" : 1.0,
+        "_source" : {
+          "status_code" : null,
+          "ts" : 12345678910,
+          "name" : "john"
+        }
+      },
+      {
+        "_index" : "empty_list",
+        "_type" : "_doc",
+        "_id" : "2",
+        "_score" : 1.0,
+        "_source" : {
+          "status_code" : [
+            123
+          ],
+          "ts" : null,
+          "name" : "johnny"
+        }
+      },
+      {
+        "_index" : "empty_list",
+        "_type" : "_doc",
+        "_id" : "3",
+        "_score" : 1.0,
+        "_source" : {
+          "status_code" : [ ],
+          "ts" : 12345678910,
+          "name" : null
+        }
+      }
+    ]
+  }
+}
```

spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -2242,6 +2242,26 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(2, df.count())
   }

+  @Test
+  def testArraysAndNulls() {
+    val index = wrapIndex("sparksql-test-arrays-and-nulls")
+    val typed = "data"
+    val (target, docPath) = makeTargets(index, typed)
+    RestUtils.touch(index)
+    val document1 = """{ "id": 1, "status_code" : [123]}""".stripMargin
+    val document2 = """{ "id" : 2, "status_code" : []}""".stripMargin
+    val document3 = """{ "id" : 3, "status_code" : null}""".stripMargin
+    sc.makeRDD(Seq(document1, document2, document3)).saveJsonToEs(target)
+    RestUtils.refresh(index)
+    val df = sqc.read.format("es").option("es.read.field.as.array.include","status_code").load(index)
+      .select("id", "status_code")
+    var result = df.where("id = 1").first().getList(1)
+    assertEquals(123, result.get(0))
+    result = df.where("id = 2").first().getList(1)
+    assertTrue(result.isEmpty)
+    assertTrue(df.where("id = 3").first().isNullAt(1))
+  }
+
   @Test
   def testReadFieldInclude(): Unit = {
     val data = Seq(
```
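For reference, after the fix the DataFrame built by this test should look roughly as follows (a sketch of the expected output, not captured from a run; the same expectation applies to the identical sql-20 and sql-30 tests below):

```text
df.select("id", "status_code").show()
+---+-----------+
| id|status_code|
+---+-----------+
|  1|      [123]|
|  2|         []|
|  3|       null|
+---+-----------+
```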

spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -2324,6 +2324,26 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(2, df.count())
   }

+  @Test
+  def testArraysAndNulls() {
+    val index = wrapIndex("sparksql-test-arrays-and-nulls")
+    val typed = "data"
+    val (target, docPath) = makeTargets(index, typed)
+    RestUtils.touch(index)
+    val document1 = """{ "id": 1, "status_code" : [123]}""".stripMargin
+    val document2 = """{ "id" : 2, "status_code" : []}""".stripMargin
+    val document3 = """{ "id" : 3, "status_code" : null}""".stripMargin
+    sc.makeRDD(Seq(document1, document2, document3)).saveJsonToEs(target)
+    RestUtils.refresh(index)
+    val df = sqc.read.format("es").option("es.read.field.as.array.include","status_code").load(index)
+      .select("id", "status_code")
+    var result = df.where("id = 1").first().getList(1)
+    assertEquals(123, result.get(0))
+    result = df.where("id = 2").first().getList(1)
+    assertTrue(result.isEmpty)
+    assertTrue(df.where("id = 3").first().isNullAt(1))
+  }
+
   @Test
   def testReadFieldInclude(): Unit = {
     val data = Seq(
```

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -2325,6 +2325,26 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(2, df.count())
   }

+  @Test
+  def testArraysAndNulls() {
+    val index = wrapIndex("sparksql-test-arrays-and-nulls")
+    val typed = "data"
+    val (target, docPath) = makeTargets(index, typed)
+    RestUtils.touch(index)
+    val document1 = """{ "id": 1, "status_code" : [123]}""".stripMargin
+    val document2 = """{ "id" : 2, "status_code" : []}""".stripMargin
+    val document3 = """{ "id" : 3, "status_code" : null}""".stripMargin
+    sc.makeRDD(Seq(document1, document2, document3)).saveJsonToEs(target)
+    RestUtils.refresh(index)
+    val df = sqc.read.format("es").option("es.read.field.as.array.include","status_code").load(index)
+      .select("id", "status_code")
+    var result = df.where("id = 1").first().getList(1)
+    assertEquals(123, result.get(0))
+    result = df.where("id = 2").first().getList(1)
+    assertTrue(result.isEmpty)
+    assertTrue(df.where("id = 3").first().isNullAt(1))
+  }
+
   @Test
   def testReadFieldInclude(): Unit = {
     val data = Seq(
```
