Skip to content

Commit 56a0d13

Browse files
authored
Adding support for wildcard fields (#1823)
Adding support for wildcard fields in Elasticsearch mappings.
1 parent 2a63ea1 commit 56a0d13

File tree

10 files changed: +93 additions, -0 deletions

mr/src/main/java/org/elasticsearch/hadoop/serialization/FieldType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public enum FieldType {
4444
TOKEN_COUNT,
4545
// ES 5.x
4646
TEXT, KEYWORD, HALF_FLOAT, SCALED_FLOAT,
47+
WILDCARD,
4748

4849
GEO_POINT,
4950
GEO_SHAPE,
@@ -81,6 +82,7 @@ public enum FieldType {
8182
CAST_HIERARCHY.put(TOKEN_COUNT, new LinkedHashSet<FieldType>(Arrays.asList(LONG, KEYWORD)));
8283
CAST_HIERARCHY.put(TEXT, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
8384
CAST_HIERARCHY.put(KEYWORD, new LinkedHashSet<FieldType>());
85+
CAST_HIERARCHY.put(WILDCARD, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
8486
CAST_HIERARCHY.put(HALF_FLOAT, new LinkedHashSet<FieldType>(Arrays.asList(FLOAT, DOUBLE, KEYWORD)));
8587
CAST_HIERARCHY.put(SCALED_FLOAT, new LinkedHashSet<FieldType>(Arrays.asList(DOUBLE, KEYWORD)));
8688
CAST_HIERARCHY.put(GEO_POINT, new LinkedHashSet<FieldType>());

mr/src/test/java/org/elasticsearch/hadoop/serialization/dto/mapping/MappingTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static org.elasticsearch.hadoop.serialization.FieldType.SHORT;
5555
import static org.elasticsearch.hadoop.serialization.FieldType.STRING;
5656
import static org.elasticsearch.hadoop.serialization.FieldType.TEXT;
57+
import static org.elasticsearch.hadoop.serialization.FieldType.WILDCARD;
5758

5859
import static org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.findTypos;
5960
import static org.junit.Assert.assertEquals;
@@ -167,6 +168,8 @@ public void testPrimitivesParsing() throws Exception {
167168
assertEquals(SCALED_FLOAT, props[13].type());
168169
assertEquals("field15", props[14].name());
169170
assertEquals(DATE_NANOS, props[14].type());
171+
assertEquals("field16", props[15].name());
172+
assertEquals(WILDCARD, props[15].type());
170173
}
171174

172175
@Test

mr/src/test/resources/org/elasticsearch/hadoop/serialization/dto/mapping/typed/primitives.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
},
4949
"field15" : {
5050
"type" : "date_nanos"
51+
},
52+
"field16" : {
53+
"type" : "wildcard"
5154
}
5255
}
5356
}

spark/core/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueReader.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.elasticsearch.hadoop.serialization.FieldType.SHORT
4747
import org.elasticsearch.hadoop.serialization.FieldType.STRING
4848
import org.elasticsearch.hadoop.serialization.FieldType.TEXT
4949
import org.elasticsearch.hadoop.serialization.FieldType.TOKEN_COUNT
50+
import org.elasticsearch.hadoop.serialization.FieldType.WILDCARD
5051
import org.elasticsearch.hadoop.serialization.Parser
5152
import org.elasticsearch.hadoop.serialization.Parser.Token.VALUE_BOOLEAN
5253
import org.elasticsearch.hadoop.serialization.Parser.Token.VALUE_NULL
@@ -82,6 +83,7 @@ class ScalaValueReader extends AbstractValueReader with SettingsAware {
8283
case STRING => textValue(value, parser)
8384
case TEXT => textValue(value, parser)
8485
case KEYWORD => textValue(value, parser)
86+
case WILDCARD => textValue(value, parser)
8587
case BYTE => byteValue(value, parser)
8688
case SHORT => shortValue(value, parser)
8789
case INTEGER => intValue(value, parser)

spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2286,6 +2286,32 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
22862286
assertEquals(0, result(0).size)
22872287
}
22882288

2289+
@Test
def testWildcard() {
  // ES "wildcard" fields should surface as plain strings in the Spark SQL schema.
  // NOTE: plain """ (not s"""): the literal has no interpolations, and dropping the
  // interpolator avoids surprises if a "$" ever appears inside the JSON mapping.
  val mapping = wrapMapping("data", """{
    | "properties": {
    |   "name": {
    |     "type": "wildcard"
    |   }
    | }
    | }
  """.stripMargin)

  val index = wrapIndex("sparksql-test-wildcard")
  val typed = "data"
  // docPath is not used by this test; bind only the write target.
  val (target, _) = makeTargets(index, typed)
  RestUtils.touch(index)
  RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
  // Single-line literal: no margin characters, so no stripMargin needed.
  val wildcardDocument = """{ "name" : "Chipotle Mexican Grill"}"""
  sc.makeRDD(Seq(wildcardDocument)).saveJsonToEs(target)
  RestUtils.refresh(index)

  val df = sqc.read.format("es").load(index)
  // Schema check: wildcard maps to StringType.
  val dataType = df.schema("name").dataType
  assertEquals("string", dataType.typeName)
  // Value check: the indexed document round-trips as a readable string.
  val head = df.head()
  assertThat(head.getString(0), containsString("Chipotle"))
}
2314+
22892315
/**
22902316
* Take advantage of the fixed method order and clear out all created indices.
22912317
* The indices will last in Elasticsearch for all parameters of this test suite.

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/SchemaUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import org.elasticsearch.hadoop.serialization.FieldType.OBJECT
6868
import org.elasticsearch.hadoop.serialization.FieldType.SHORT
6969
import org.elasticsearch.hadoop.serialization.FieldType.STRING
7070
import org.elasticsearch.hadoop.serialization.FieldType.TEXT
71+
import org.elasticsearch.hadoop.serialization.FieldType.WILDCARD
7172
import org.elasticsearch.hadoop.serialization.dto.mapping.Field
7273
import org.elasticsearch.hadoop.serialization.dto.mapping.GeoField
7374
import org.elasticsearch.hadoop.serialization.dto.mapping.GeoPointType
@@ -163,6 +164,7 @@ private[sql] object SchemaUtils {
163164
case STRING => StringType
164165
case TEXT => StringType
165166
case KEYWORD => StringType
167+
case WILDCARD => StringType
166168
case HALF_FLOAT => FloatType
167169
case SCALED_FLOAT => FloatType
168170
case DATE => if (cfg.getMappingDateRich) TimestampType else StringType

spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,6 +2368,32 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
23682368
assertEquals(0, result(0).size)
23692369
}
23702370

2371+
@Test
def testWildcard() {
  // ES "wildcard" fields should surface as plain strings in the Spark SQL schema.
  // NOTE: plain """ (not s"""): the literal has no interpolations, and dropping the
  // interpolator avoids surprises if a "$" ever appears inside the JSON mapping.
  val mapping = wrapMapping("data", """{
    | "properties": {
    |   "name": {
    |     "type": "wildcard"
    |   }
    | }
    | }
  """.stripMargin)

  val index = wrapIndex("sparksql-test-wildcard")
  val typed = "data"
  // docPath is not used by this test; bind only the write target.
  val (target, _) = makeTargets(index, typed)
  RestUtils.touch(index)
  RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
  // Single-line literal: no margin characters, so no stripMargin needed.
  val wildcardDocument = """{ "name" : "Chipotle Mexican Grill"}"""
  sc.makeRDD(Seq(wildcardDocument)).saveJsonToEs(target)
  RestUtils.refresh(index)

  val df = sqc.read.format("es").load(index)
  // Schema check: wildcard maps to StringType.
  val dataType = df.schema("name").dataType
  assertEquals("string", dataType.typeName)
  // Value check: the indexed document round-trips as a readable string.
  val head = df.head()
  assertThat(head.getString(0), containsString("Chipotle"))
}
2396+
23712397
/**
23722398
* Take advantage of the fixed method order and clear out all created indices.
23732399
* The indices will last in Elasticsearch for all parameters of this test suite.

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/SchemaUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import org.elasticsearch.hadoop.serialization.FieldType.OBJECT
6868
import org.elasticsearch.hadoop.serialization.FieldType.SHORT
6969
import org.elasticsearch.hadoop.serialization.FieldType.STRING
7070
import org.elasticsearch.hadoop.serialization.FieldType.TEXT
71+
import org.elasticsearch.hadoop.serialization.FieldType.WILDCARD
7172
import org.elasticsearch.hadoop.serialization.dto.mapping.Field
7273
import org.elasticsearch.hadoop.serialization.dto.mapping.GeoField
7374
import org.elasticsearch.hadoop.serialization.dto.mapping.GeoPointType
@@ -165,6 +166,7 @@ private[sql] object SchemaUtils {
165166
case STRING => StringType
166167
case TEXT => StringType
167168
case KEYWORD => StringType
169+
case WILDCARD => StringType
168170
case DATE => if (cfg.getMappingDateRich) TimestampType else StringType
169171
case DATE_NANOS => if (cfg.getMappingDateRich) TimestampType else StringType
170172
case OBJECT => convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes, cfg)

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2369,6 +2369,31 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
23692369
assertEquals(0, result(0).size)
23702370
}
23712371

2372+
@Test
def testWildcard() {
  // ES "wildcard" fields should surface as plain strings in the Spark SQL schema.
  // NOTE: plain """ (not s"""): the literal has no interpolations, and dropping the
  // interpolator avoids surprises if a "$" ever appears inside the JSON mapping.
  val mapping = wrapMapping("data", """{
    | "properties": {
    |   "name": {
    |     "type": "wildcard"
    |   }
    | }
    | }
  """.stripMargin)

  val index = wrapIndex("sparksql-test-wildcard")
  val typed = "data"
  // docPath is not used by this test; bind only the write target.
  val (target, _) = makeTargets(index, typed)
  RestUtils.touch(index)
  RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
  // Single-line literal: no margin characters, so no stripMargin needed.
  val wildcardDocument = """{ "name" : "Chipotle Mexican Grill"}"""
  sc.makeRDD(Seq(wildcardDocument)).saveJsonToEs(target)
  RestUtils.refresh(index)

  val df = sqc.read.format("es").load(index)
  // Schema check: wildcard maps to StringType.
  val dataType = df.schema("name").dataType
  assertEquals("string", dataType.typeName)
  // Value check: the indexed document round-trips as a readable string.
  val head = df.head()
  assertThat(head.getString(0), containsString("Chipotle"))
}
23722397

23732398
/**
23742399
* Take advantage of the fixed method order and clear out all created indices.

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/SchemaUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import org.elasticsearch.hadoop.serialization.FieldType.OBJECT
6969
import org.elasticsearch.hadoop.serialization.FieldType.SHORT
7070
import org.elasticsearch.hadoop.serialization.FieldType.STRING
7171
import org.elasticsearch.hadoop.serialization.FieldType.TEXT
72+
import org.elasticsearch.hadoop.serialization.FieldType.WILDCARD
7273
import org.elasticsearch.hadoop.serialization.dto.mapping.Field
7374
import org.elasticsearch.hadoop.serialization.dto.mapping.GeoField
7475
import org.elasticsearch.hadoop.serialization.dto.mapping.GeoPointType
@@ -165,6 +166,7 @@ private[sql] object SchemaUtils {
165166
case STRING => StringType
166167
case TEXT => StringType
167168
case KEYWORD => StringType
169+
case WILDCARD => StringType
168170
case DATE => if (cfg.getMappingDateRich) TimestampType else StringType
169171
case DATE_NANOS => if (cfg.getMappingDateRich) TimestampType else StringType
170172
case OBJECT => convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes, cfg)

0 commit comments

Comments (0)