Commit 5e9bcaa

Set UserProvider before discovery in Spark SQL integrations (#1934)
This PR adds the required configuration step that sets the UserProvider implementation before the Spark SQL integrations attempt to discover cluster information. It also updates the integration tests to cover the Spark SQL write and read paths.
1 parent e0ef266 commit 5e9bcaa
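The heart of the fix is an ordering constraint: a UserProvider must be registered on the connector settings before InitializationUtils.discoverClusterInfo issues its first request, presumably so that discovery against a secured (Kerberos) cluster can resolve the current user and its credentials. A minimal sketch of the pattern each diff below applies (illustrative only; `settings` and `log` stand for whatever Settings object and logger the particular integration already has in scope):

    // Register the Hadoop-backed UserProvider first...
    InitializationUtils.setUserProviderIfNotSet(settings, classOf[HadoopUserProvider], log)
    // ...and only then contact the cluster to discover its version and info.
    InitializationUtils.discoverClusterInfo(settings, log)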

10 files changed: +48, -32 lines


qa/kerberos/build.gradle

Lines changed: 4 additions & 2 deletions

@@ -521,7 +521,7 @@ if (disableTests) {
     // =============================================================================
     // Spark Jobs
     // =============================================================================
-
+
     // Run the Spark job to load data to ES. Ensure Kerberos settings are available.
     SparkApp sparkLoadData = config.createClusterTask('sparkLoadData', SparkApp.class) {
         clusterConfiguration = config
@@ -682,7 +682,9 @@ if (disableTests) {

     Map<String, Task> readJobs = [
             'mr': mrReadData,
-            'spark': sparkReadData,
+            'sparkRDD': sparkReadData,
+            'sparkDF': sparkReadData,
+            'sparkDS': sparkReadData,
             'hive': hiveReadData,
             'pig': pigReadData
     ]

qa/kerberos/src/itest/java/org/elasticsearch/hadoop/qa/kerberos/AbstractClusterVerificationTests.java

Lines changed: 3 additions & 1 deletion

@@ -40,7 +40,9 @@ public class AbstractClusterVerificationTests {
     public static Collection<Object[]> params() {
         List<Object[]> params = new ArrayList<>();
         params.add(new Object[]{"mr", "part-m-", 345, true});
-        params.add(new Object[]{"spark", "part-", 345, true});
+        params.add(new Object[]{"sparkRDD", "part-", 345, true});
+        params.add(new Object[]{"sparkDF", "part-", 345, true});
+        params.add(new Object[]{"sparkDS", "part-", 345, true});
         params.add(new Object[]{"hive", "000000_0", 345, false});
         params.add(new Object[]{"pig", "part-m-", 345, true});
         return params;
qa/kerberos/src/main/scala/org/elasticsearch/hadoop/qa/kerberos/spark/LoadToES.scala

Lines changed: 10 additions & 18 deletions

@@ -20,11 +20,12 @@
 package org.elasticsearch.hadoop.qa.kerberos.spark

 import java.security.PrivilegedExceptionAction
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.elasticsearch.hadoop.qa.kerberos.security.KeytabLogin
 import org.elasticsearch.spark._
+import org.elasticsearch.spark.sql._

 class LoadToES(args: Array[String]) {

@@ -37,25 +38,16 @@ class LoadToES(args: Array[String]) {
     }
     val resource = sparkConf.get("spark.es.resource")
     val fieldNames = sparkConf.get(LoadToES.CONF_FIELD_NAMES).split(",")
+    val schema = StructType(fieldNames.map(StructField(_, StringType)))

-    val df = spark.sqlContext.read.textFile(args(0))
-
-    val parsedData = df.rdd
-      .map(line => {
-        var record: Map[String, Object] = Map()
-        val fields = line.split('\t')
-        var fieldNum = 0
-        for (field <- fields) {
-          if (fieldNum < fieldNames.length) {
-            val fieldName = fieldNames(fieldNum)
-            record = record + (fieldName -> field)
-          }
-          fieldNum = fieldNum + 1
-        }
-        record
-      })
+    val df = spark.sqlContext.read
+      .schema(schema)
+      .option("sep", "\t")
+      .csv(args(0))

-    parsedData.saveToEs(resource)
+    df.rdd.map(row => row.getValuesMap(row.schema.fieldNames)).saveToEs(s"${resource}_rdd")
+    df.saveToEs(s"${resource}_df")
+    df.write.format("es").save(s"${resource}_ds")
   }
 }
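For context, the rewritten load job above exercises three distinct write paths, each targeting a suffixed resource (_rdd, _df, _ds) so the read job can verify them independently. A standalone sketch of the same three paths (illustrative only: the SparkSession setup, the input path, and the "people" index names are assumptions, not part of the commit):

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._        // RDD API: saveToEs on RDDs
    import org.elasticsearch.spark.sql._    // Spark SQL API: saveToEs on DataFrames, "es" data source

    val spark = SparkSession.builder().appName("write-paths").getOrCreate()
    val df = spark.read.option("sep", "\t").csv("/path/to/data.tsv")

    // RDD path: convert each Row to a Map and write through the core Spark integration
    df.rdd.map(row => row.getValuesMap(row.schema.fieldNames)).saveToEs("people_rdd")
    // DataFrame path: implicit saveToEs provided by org.elasticsearch.spark.sql._
    df.saveToEs("people_df")
    // DataSource path: the "es" format registered by DefaultSource
    df.write.format("es").save("people_ds")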

qa/kerberos/src/main/scala/org/elasticsearch/hadoop/qa/kerberos/spark/ReadFromES.scala

Lines changed: 17 additions & 8 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.elasticsearch.hadoop.qa.kerberos.security.KeytabLogin
 import org.elasticsearch.spark._
+import org.elasticsearch.spark.sql._

 class ReadFromES(args: Array[String]) {

@@ -34,14 +35,22 @@ class ReadFromES(args: Array[String]) {
   def run(): Unit = {
     val resource = sparkConf.get("spark.es.resource")

-    spark.sparkContext.esJsonRDD(resource).saveAsTextFile(args(0))
-    // spark.sqlContext
-    //   .read
-    //   .format("es")
-    //   .option("es.output.json", "true")
-    //   .load(resource)
-    //   .write
-    //   .text(args(0))
+    // Expected directory names in :qa:kerberos:build.gradle readJobs
+    val rddOutputDir = s"${args(0)}RDD"
+    val dfOutputDir = s"${args(0)}DF"
+    val dsOutputDir = s"${args(0)}DS"
+
+    spark.sparkContext.esJsonRDD(s"${resource}_rdd").saveAsTextFile(rddOutputDir)
+
+    spark.sqlContext.esDF(s"${resource}_df")
+      .rdd
+      .map(row => row.toString())
+      .saveAsTextFile(dfOutputDir)
+
+    spark.sqlContext.read.format("es").load(s"${resource}_ds")
+      .rdd
+      .map(row => row.toString())
+      .saveAsTextFile(dsOutputDir)
   }
 }
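The read job mirrors those three flavors on the query side, writing each result set to the directory name the Gradle readJobs map expects. A standalone sketch of the same three read paths (illustrative only; the SparkSession setup and index names are assumptions):

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._        // esJsonRDD on SparkContext
    import org.elasticsearch.spark.sql._    // esDF on SQLContext

    val spark = SparkSession.builder().appName("read-paths").getOrCreate()

    // RDD path: raw JSON documents as (id, json) pairs
    val jsonRdd = spark.sparkContext.esJsonRDD("people_rdd")
    // DataFrame helper path
    val df = spark.sqlContext.esDF("people_df")
    // DataSource path: the "es" format
    val ds = spark.sqlContext.read.format("es").load("people_ds")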

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 2 additions & 1 deletion

@@ -151,6 +151,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

   @transient private[sql] lazy val cfg = {
     val conf = new SparkSettingsManager().load(sqlContext.sparkContext.getConf).merge(parameters.asJava)
+    InitializationUtils.setUserProviderIfNotSet(conf, classOf[HadoopUserProvider], LogFactory.getLog(classOf[ElasticsearchRelation]))
     InitializationUtils.discoverClusterInfo(conf, LogFactory.getLog(classOf[ElasticsearchRelation]))
     conf
   }
@@ -533,10 +534,10 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

     // perform a scan-scroll delete
     val cfgCopy = cfg.copy()
+    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     InitializationUtils.discoverClusterInfo(cfgCopy, Utils.LOGGER)
     InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
     InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
-    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 0 deletions

@@ -30,6 +30,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
+import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
 import org.elasticsearch.hadoop.util.ObjectUtils
 import org.elasticsearch.spark.cfg.SparkSettingsManager
 import org.elasticsearch.hadoop.rest.InitializationUtils
@@ -74,6 +75,7 @@ object EsSparkSQL {
     esCfg.merge(cfg.asJava)

     // Need to discover es version before checking index existence
+    InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
     InitializationUtils.discoverClusterInfo(esCfg, LOG)
     InitializationUtils.checkIdForOperation(esCfg)
     InitializationUtils.checkIndexExistence(esCfg)

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 3 additions & 1 deletion

@@ -149,6 +149,7 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro
         s"Cannot continue with [$outputMode].")
     }

+    InitializationUtils.setUserProviderIfNotSet(jobSettings, classOf[HadoopUserProvider], LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.discoverClusterInfo(jobSettings, LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.checkIdForOperation(jobSettings)
     InitializationUtils.checkIndexExistence(jobSettings)
@@ -229,6 +230,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

   @transient lazy val cfg = {
     val conf = new SparkSettingsManager().load(sqlContext.sparkContext.getConf).merge(parameters.asJava)
+    InitializationUtils.setUserProviderIfNotSet(conf, classOf[HadoopUserProvider], LogFactory.getLog(classOf[ElasticsearchRelation]))
     InitializationUtils.discoverClusterInfo(conf, LogFactory.getLog(classOf[ElasticsearchRelation]))
     conf
   }
@@ -611,10 +613,10 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

     // perform a scan-scroll delete
     val cfgCopy = cfg.copy()
+    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     InitializationUtils.discoverClusterInfo(cfgCopy, Utils.LOGGER)
     InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
     InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
-    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 0 deletions

@@ -28,6 +28,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
+import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
 import org.elasticsearch.hadoop.rest.InitializationUtils
 import org.elasticsearch.hadoop.util.ObjectUtils
 import org.elasticsearch.spark.cfg.SparkSettingsManager
@@ -94,6 +95,7 @@ object EsSparkSQL {
     esCfg.merge(cfg.asJava)

     // Need to discover ES Version before checking index existence
+    InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
     InitializationUtils.discoverClusterInfo(esCfg, LOG)
     InitializationUtils.checkIdForOperation(esCfg)
     InitializationUtils.checkIndexExistence(esCfg)

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 3 additions & 1 deletion

@@ -149,6 +149,7 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro
         s"Cannot continue with [$outputMode].")
     }

+    InitializationUtils.setUserProviderIfNotSet(jobSettings, classOf[HadoopUserProvider], LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.discoverClusterInfo(jobSettings, LogFactory.getLog(classOf[DefaultSource]))
     InitializationUtils.checkIdForOperation(jobSettings)
     InitializationUtils.checkIndexExistence(jobSettings)
@@ -229,6 +230,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

   @transient lazy val cfg = {
     val conf = new SparkSettingsManager().load(sqlContext.sparkContext.getConf).merge(parameters.asJava)
+    InitializationUtils.setUserProviderIfNotSet(conf, classOf[HadoopUserProvider], LogFactory.getLog(classOf[ElasticsearchRelation]))
     InitializationUtils.discoverClusterInfo(conf, LogFactory.getLog(classOf[ElasticsearchRelation]))
     conf
   }
@@ -611,10 +613,10 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

     // perform a scan-scroll delete
     val cfgCopy = cfg.copy()
+    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     InitializationUtils.discoverClusterInfo(cfgCopy, Utils.LOGGER)
     InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
     InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
-    InitializationUtils.setUserProviderIfNotSet(cfgCopy, classOf[HadoopUserProvider], null)
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
     cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 0 deletions

@@ -28,6 +28,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
+import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
 import org.elasticsearch.hadoop.rest.InitializationUtils
 import org.elasticsearch.hadoop.util.ObjectUtils
 import org.elasticsearch.spark.cfg.SparkSettingsManager
@@ -94,6 +95,7 @@ object EsSparkSQL {
     esCfg.merge(cfg.asJava)

     // Need to discover ES Version before checking index existence
+    InitializationUtils.setUserProviderIfNotSet(esCfg, classOf[HadoopUserProvider], LOG)
     InitializationUtils.discoverClusterInfo(esCfg, LOG)
     InitializationUtils.checkIdForOperation(esCfg)
     InitializationUtils.checkIndexExistence(esCfg)
