Skip to content

Commit 6ce9390

Browse files
authored
resolve saveToEs saves case classes fields with NULL values #998 (#1478)
This PR fix the Scala Writer when it processes case classes with null fields. Closes #998
1 parent fd00a53 commit 6ce9390

File tree

3 files changed

+65
-4
lines changed

3 files changed

+65
-4
lines changed

mr/src/main/java/org/elasticsearch/hadoop/serialization/builder/FilteringValueWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import org.elasticsearch.hadoop.serialization.field.FieldFilter;
2828
import org.elasticsearch.hadoop.serialization.field.FieldFilter.NumberedInclude;
2929
import org.elasticsearch.hadoop.util.StringUtils;
30+
import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_SPARK_DATAFRAME_WRITE_NULL_VALUES_DEFAULT;
3031

3132
public abstract class FilteringValueWriter<T> implements ValueWriter<T>, SettingsAware {
3233

3334
private List<NumberedInclude> includes;
3435
private List<String> excludes;
36+
private Boolean writeNullValues = Boolean.parseBoolean(ES_SPARK_DATAFRAME_WRITE_NULL_VALUES_DEFAULT);
3537

3638
@Override
3739
public void setSettings(Settings settings) {
@@ -41,10 +43,15 @@ public void setSettings(Settings settings) {
4143
includes.add(new NumberedInclude(include));
4244
}
4345
excludes = StringUtils.tokenize(settings.getMappingExcludes());
46+
writeNullValues = settings.getDataFrameWriteNullValues();
4447
}
4548

4649
protected boolean shouldKeep(String parentField, String name) {
4750
name = StringUtils.hasText(parentField) ? parentField + "." + name : name;
4851
return FieldFilter.filter(name, includes, excludes).matched;
4952
}
53+
54+
protected boolean hasWriteNullValues() {
55+
return writeNullValues;
56+
}
5057
}

spark/core/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueWriter.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,12 @@ class ScalaValueWriter(writeUnknownTypes: Boolean = false) extends JdkValueWrite
6161
generator.writeBeginObject()
6262
for ((k, v) <- m) {
6363
if (shouldKeep(parentField, k.toString)) {
64-
generator.writeFieldName(k.toString)
65-
val result = doWrite(v, generator, k.toString)
66-
if (!result.isSuccesful) {
67-
return result
64+
if (v != None && v!= null && v != () || hasWriteNullValues) {
65+
generator.writeFieldName(k.toString)
66+
val result = doWrite(v, generator, k.toString)
67+
if (!result.isSuccesful) {
68+
return result
69+
}
6870
}
6971
}
7072
}

spark/core/src/test/scala/org/elasticsearch/spark/serialization/ScalaValueWriterTest.scala

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,58 @@ class ScalaValueWriterTest {
154154
println(serialize(node1))
155155
}
156156

157+
@Test
158+
def testCaseClassWithNone(): Unit = {
159+
160+
case class TestCaseClass(option : Option[String])
161+
val caseClass = TestCaseClass(None)
162+
163+
assertEquals("""{}""", serialize(caseClass))
164+
}
165+
166+
@Test
167+
def testCaseClassWithSome(): Unit = {
168+
169+
case class TestCaseClass(option : Option[String])
170+
val caseClass = TestCaseClass(Some("value"))
171+
172+
assertEquals("""{"option":"value"}""", serialize(caseClass))
173+
}
174+
175+
176+
@Test
177+
def testCaseClassWithSomeAndNone(): Unit = {
178+
179+
case class TestCaseClass(option1 : Option[String], option2 : Option[String])
180+
val caseClass = TestCaseClass(None, Some("value2"))
181+
182+
assertEquals("""{"option2":"value2"}""", serialize(caseClass))
183+
}
184+
185+
186+
@Test
187+
def testCaseClassWithInnerObject(): Unit = {
188+
189+
case class TestCaseClass(option1 : Option[String], option2 : Option[TestCaseClassInner])
190+
case class TestCaseClassInner(option1 : Option[String], option2 : Option[String])
191+
val caseClass = TestCaseClass(None, Some(TestCaseClassInner(option1 = Some("value1") , option2 = None)))
192+
193+
assertEquals("""{"option2":{"option1":"value1"}}""", serialize(caseClass))
194+
}
195+
196+
@Test
197+
def testCaseClassWithInnerObjectAndNullSetting(): Unit = {
198+
199+
case class TestCaseClass(option1: Option[String], option2: Option[TestCaseClassInner])
200+
case class TestCaseClassInner(option1: Option[String], option2: Option[String])
201+
val caseClass = TestCaseClass(None, Some(TestCaseClassInner(option1 = Some("value1"), option2 = None)))
202+
203+
val settings = new TestSettings()
204+
settings.setProperty(ConfigurationOptions.ES_SPARK_DATAFRAME_WRITE_NULL_VALUES, "true")
205+
206+
assertEquals("""{"option1":null,"option2":{"option1":"value1","option2":null}}""", serialize(caseClass, settings))
207+
}
208+
157209
@Test
158210
def testDate(): Unit = {
159211
val date = new Date(1420114230123l)

0 commit comments

Comments
 (0)