Commit 141284e

[7.17] Set UserProvider before discovery in Spark SQL integrations (#1934) (#1935)
This PR adds the required UserProvider configuration step before cluster information discovery in the Spark SQL integrations. It additionally updates the Kerberos integration tests to add coverage for the Spark SQL write and read paths.
1 parent 49a3e91 commit 141284e
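
In short, the Spark SQL code paths were calling cluster-info discovery before any UserProvider had been registered, presumably leaving discovery unable to authenticate in secured (Kerberos) deployments. The fix applied throughout the files below is to set the provider first. A minimal sketch of the ordering, using the same InitializationUtils calls as the diff (the sparkConf and parameters values here are illustrative placeholders, not code from the repository):

    import org.apache.commons.logging.LogFactory
    import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
    import org.elasticsearch.hadoop.rest.InitializationUtils
    import org.elasticsearch.spark.cfg.SparkSettingsManager
    import scala.collection.JavaConverters._

    // Illustrative placeholders: a SparkConf plus per-relation options, as in DefaultSource.
    val settings = new SparkSettingsManager().load(sparkConf).merge(parameters.asJava)

    // Register the user provider first ...
    InitializationUtils.setUserProviderIfNotSet(settings, classOf[HadoopUserProvider], LogFactory.getLog(getClass))
    // ... then discover cluster info, which may need to authenticate against the cluster.
    InitializationUtils.discoverClusterInfo(settings, LogFactory.getLog(getClass))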

File tree: 10 files changed, 48 additions (+), 32 deletions (−)


qa/kerberos/build.gradle

Lines changed: 4 additions & 2 deletions
@@ -517,7 +517,7 @@ if (disableTests) {
     // =============================================================================
     // Spark Jobs
     // =============================================================================
-
+
     // Run the Spark job to load data to ES. Ensure Kerberos settings are available.
     SparkApp sparkLoadData = config.createClusterTask('sparkLoadData', SparkApp.class) {
         clusterConfiguration = config
@@ -674,7 +674,9 @@ if (disableTests) {

     Map<String, Task> readJobs = [
             'mr': mrReadData,
-            'spark': sparkReadData,
+            'sparkRDD': sparkReadData,
+            'sparkDF': sparkReadData,
+            'sparkDS': sparkReadData,
             'hive': hiveReadData,
             'pig': pigReadData
     ]

qa/kerberos/src/itest/java/org/elasticsearch/hadoop/qa/kerberos/AbstractClusterVerificationTests.java

Lines changed: 3 additions & 1 deletion
@@ -40,7 +40,9 @@ public class AbstractClusterVerificationTests {
     public static Collection<Object[]> params() {
         List<Object[]> params = new ArrayList<>();
         params.add(new Object[]{"mr", "part-m-", 345, true});
-        params.add(new Object[]{"spark", "part-", 345, true});
+        params.add(new Object[]{"sparkRDD", "part-", 345, true});
+        params.add(new Object[]{"sparkDF", "part-", 345, true});
+        params.add(new Object[]{"sparkDS", "part-", 345, true});
         params.add(new Object[]{"hive", "000000_0", 345, false});
         params.add(new Object[]{"pig", "part-m-", 345, true});
         return params;

qa/kerberos/src/main/scala/org/elasticsearch/hadoop/qa/kerberos/spark/LoadToES.scala

Lines changed: 10 additions & 18 deletions
@@ -20,11 +20,12 @@
 package org.elasticsearch.hadoop.qa.kerberos.spark

 import java.security.PrivilegedExceptionAction
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.elasticsearch.hadoop.qa.kerberos.security.KeytabLogin
 import org.elasticsearch.spark._
+import org.elasticsearch.spark.sql._

 class LoadToES(args: Array[String]) {

@@ -37,25 +38,16 @@ class LoadToES(args: Array[String]) {
       }
       val resource = sparkConf.get("spark.es.resource")
       val fieldNames = sparkConf.get(LoadToES.CONF_FIELD_NAMES).split(",")
+      val schema = StructType(fieldNames.map(StructField(_, StringType)))

-      val df = spark.sqlContext.read.textFile(args(0))
-
-      val parsedData = df.rdd
-        .map(line => {
-          var record: Map[String, Object] = Map()
-          val fields = line.split('\t')
-          var fieldNum = 0
-          for (field <- fields) {
-            if (fieldNum < fieldNames.length) {
-              val fieldName = fieldNames(fieldNum)
-              record = record + (fieldName -> field)
-            }
-            fieldNum = fieldNum + 1
-          }
-          record
-        })
+      val df = spark.sqlContext.read
+        .schema(schema)
+        .option("sep", "\t")
+        .csv(args(0))

-      parsedData.saveToEs(resource)
+      df.rdd.map(row => row.getValuesMap(row.schema.fieldNames)).saveToEs(s"${resource}_rdd")
+      df.saveToEs(s"${resource}_df")
+      df.write.format("es").save(s"${resource}_ds")
     }
   }
 }
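
For reference, the three Spark SQL write paths the reworked load job now exercises, shown as a standalone sketch (the SparkSession setup, input path, and index names here are illustrative; the job itself derives its resource names from spark.es.resource):

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._      // saveToEs on RDDs
    import org.elasticsearch.spark.sql._  // saveToEs on DataFrames

    val spark = SparkSession.builder().appName("load-sketch").getOrCreate()
    val df = spark.read.option("sep", "\t").csv("/path/to/input.tsv")  // illustrative input

    df.rdd.map(row => row.getValuesMap(row.schema.fieldNames)).saveToEs("qa_write_rdd")  // RDD write path
    df.saveToEs("qa_write_df")                                                           // DataFrame write path
    df.write.format("es").save("qa_write_ds")                                            // DataSource write path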

qa/kerberos/src/main/scala/org/elasticsearch/hadoop/qa/kerberos/spark/ReadFromES.scala

Lines changed: 17 additions & 8 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.elasticsearch.hadoop.qa.kerberos.security.KeytabLogin
 import org.elasticsearch.spark._
+import org.elasticsearch.spark.sql._

 class ReadFromES(args: Array[String]) {

@@ -34,14 +35,22 @@ class ReadFromES(args: Array[String]) {
   def run(): Unit = {
     val resource = sparkConf.get("spark.es.resource")

-    spark.sparkContext.esJsonRDD(resource).saveAsTextFile(args(0))
-    // spark.sqlContext
-    //   .read
-    //   .format("es")
-    //   .option("es.output.json", "true")
-    //   .load(resource)
-    //   .write
-    //   .text(args(0))
+    // Expected directory names in :qa:kerberos:build.gradle readJobs
+    val rddOutputDir = s"${args(0)}RDD"
+    val dfOutputDir = s"${args(0)}DF"
+    val dsOutputDir = s"${args(0)}DS"
+
+    spark.sparkContext.esJsonRDD(s"${resource}_rdd").saveAsTextFile(rddOutputDir)
+
+    spark.sqlContext.esDF(s"${resource}_df")
+      .rdd
+      .map(row => row.toString())
+      .saveAsTextFile(dfOutputDir)
+
+    spark.sqlContext.read.format("es").load(s"${resource}_ds")
+      .rdd
+      .map(row => row.toString())
+      .saveAsTextFile(dsOutputDir)
   }
 }
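
The matching read paths, again as a standalone sketch (index names and output directories are illustrative placeholders):

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._      // esJsonRDD on SparkContext
    import org.elasticsearch.spark.sql._  // esDF on SQLContext

    val spark = SparkSession.builder().appName("read-sketch").getOrCreate()

    // RDD read path
    spark.sparkContext.esJsonRDD("qa_write_rdd").saveAsTextFile("/out/readRDD")
    // DataFrame read path
    spark.sqlContext.esDF("qa_write_df").rdd.map(_.toString).saveAsTextFile("/out/readDF")
    // DataSource read path
    spark.sqlContext.read.format("es").load("qa_write_ds").rdd.map(_.toString).saveAsTextFile("/out/readDS")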

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 2 additions & 1 deletion
@@ -151,6 +151,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

   @transient private[sql] lazy val cfg = {
     val conf = new SparkSettingsManager().load(sqlContext.sparkContext.getConf).merge(parameters.asJava)
+    InitializationUtils.setUserProviderIfNotSet(conf, classOf[HadoopUserProvider], LogFactory.getLog(classOf[ElasticsearchRelation]))
     InitializationUtils.discoverClusterInfo(conf, LogFactory.getLog(classOf[ElasticsearchRelation]))
     conf
   }
@@ -533,10 +534,10 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

     // perform a scan-scroll delete
     val cfgCopy = cfg.copy()
+    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     InitializationUtils.discoverClusterInfo(cfgCopy, Utils.LOGGER)
     InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
     InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
-    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
+import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
 import org.elasticsearch.hadoop.util.ObjectUtils
 import org.elasticsearch.spark.cfg.SparkSettingsManager
 import org.elasticsearch.hadoop.rest.InitializationUtils
@@ -73,6 +74,7 @@ object EsSparkSQL {
     esCfg.merge(cfg.asJava)

     // Need to discover es version before checking index existence
+    InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
     InitializationUtils.discoverClusterInfo(esCfg, LOG)
     InitializationUtils.checkIdForOperation(esCfg)
     InitializationUtils.checkIndexExistence(esCfg)

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 3 additions & 1 deletion
@@ -142,6 +142,7 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro
       .load(sqlContext.sparkContext.getConf)
       .merge(streamParams(mapConfig.toMap, sparkSession).asJava)

+    InitializationUtils.setUserProviderIfNotSet(jobSettings, classOf[HadoopUserProvider], LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.discoverClusterInfo(jobSettings, LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.checkIdForOperation(jobSettings)
     InitializationUtils.checkIndexExistence(jobSettings)
@@ -222,6 +223,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

   @transient lazy val cfg = {
     val conf = new SparkSettingsManager().load(sqlContext.sparkContext.getConf).merge(parameters.asJava)
+    InitializationUtils.setUserProviderIfNotSet(conf, classOf[HadoopUserProvider], LogFactory.getLog(classOf[ElasticsearchRelation]))
     InitializationUtils.discoverClusterInfo(conf, LogFactory.getLog(classOf[ElasticsearchRelation]))
     conf
   }
@@ -604,10 +606,10 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

     // perform a scan-scroll delete
     val cfgCopy = cfg.copy()
+    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     InitializationUtils.discoverClusterInfo(cfgCopy, Utils.LOGGER)
     InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
     InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
-    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
+import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
 import org.elasticsearch.hadoop.rest.InitializationUtils
 import org.elasticsearch.hadoop.util.ObjectUtils
 import org.elasticsearch.spark.cfg.SparkSettingsManager
@@ -94,6 +95,7 @@ object EsSparkSQL {
     esCfg.merge(cfg.asJava)

     // Need to discover ES Version before checking index existence
+    InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
     InitializationUtils.discoverClusterInfo(esCfg, LOG)
     InitializationUtils.checkIdForOperation(esCfg)
     InitializationUtils.checkIndexExistence(esCfg)

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 3 additions & 1 deletion
@@ -142,6 +142,7 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro
       .load(sqlContext.sparkContext.getConf)
       .merge(streamParams(mapConfig.toMap, sparkSession).asJava)

+    InitializationUtils.setUserProviderIfNotSet(jobSettings, classOf[HadoopUserProvider], LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.discoverClusterInfo(jobSettings, LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.checkIdForOperation(jobSettings)
     InitializationUtils.checkIndexExistence(jobSettings)
@@ -222,6 +223,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

   @transient lazy val cfg = {
     val conf = new SparkSettingsManager().load(sqlContext.sparkContext.getConf).merge(parameters.asJava)
+    InitializationUtils.setUserProviderIfNotSet(conf, classOf[HadoopUserProvider], LogFactory.getLog(classOf[ElasticsearchRelation]))
     InitializationUtils.discoverClusterInfo(conf, LogFactory.getLog(classOf[ElasticsearchRelation]))
     conf
   }
@@ -604,10 +606,10 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

     // perform a scan-scroll delete
     val cfgCopy = cfg.copy()
+    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     InitializationUtils.discoverClusterInfo(cfgCopy, Utils.LOGGER)
     InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
     InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
-    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
+import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
 import org.elasticsearch.hadoop.rest.InitializationUtils
 import org.elasticsearch.hadoop.util.ObjectUtils
 import org.elasticsearch.spark.cfg.SparkSettingsManager
@@ -94,6 +95,7 @@ object EsSparkSQL {
     esCfg.merge(cfg.asJava)

     // Need to discover ES Version before checking index existence
+    InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
     InitializationUtils.discoverClusterInfo(esCfg, LOG)
     InitializationUtils.checkIdForOperation(esCfg)
     InitializationUtils.checkIndexExistence(esCfg)
