
Commit 078f75d

Setting X-Opaque-ID header for all reads and writes for mapreduce and spark (#1770) (#1771)
This commit adds an X-Opaque-ID header when communicating with Elasticsearch from es-hadoop or es-spark. Relates #1182
1 parent 39c725a commit 078f75d
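
Elasticsearch echoes the X-Opaque-Id header back in its task management API, slow logs, and deprecation logs, so the ID set here makes it possible to trace a search or bulk request back to the Hadoop or Spark job that issued it. A minimal sketch of the new setting (PropertiesSettings is an existing concrete Settings implementation; the job metadata below is hypothetical):

    import org.elasticsearch.hadoop.cfg.PropertiesSettings;
    import org.elasticsearch.hadoop.cfg.Settings;

    public class OpaqueIdSketch {
        public static void main(String[] args) {
            Settings settings = new PropertiesSettings();
            settings.setOpaqueId("[mapreduce] [alice] [log-ingest] [attempt_1624000000000_0001_m_000000_0]");
            // Stored under the generic header prefix, so the REST layer sends it
            // as a plain HTTP header on every read and write:
            System.out.println(settings.getProperty("es.net.http.header.X-Opaque-ID"));
        }
    }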

File tree

9 files changed, +67 −1 lines changed

mr/build.gradle

Lines changed: 4 additions & 0 deletions
@@ -129,6 +129,10 @@ itestJar {
     }
 }
 
+tasks.named("test").configure {
+    jvmArgs "--add-opens=java.base/java.io=ALL-UNNAMED" // Needed for IOUtils's BYTE_ARRAY_BUFFER reflection
+}
+
 eclipse.classpath.file {
     whenMerged { cp ->
         // export all jars (to be used upstream by dependent projects) <-- for some reason Gradle removes all jars
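
The --add-opens flag is needed because the tests reflectively read a private java.io buffer (the IOUtils BYTE_ARRAY_BUFFER noted in the comment); on JDK 16+ the module system denies such access by default. A minimal sketch of the failure mode, using a hypothetical field lookup:

    import java.lang.reflect.Field;

    public class AddOpensSketch {
        public static void main(String[] args) throws Exception {
            // Without --add-opens=java.base/java.io=ALL-UNNAMED this throws
            // InaccessibleObjectException on JDK 16+, because java.io is not
            // open to the unnamed module.
            Field buf = java.io.BufferedInputStream.class.getDeclaredField("buf");
            buf.setAccessible(true);
            System.out.println("reflective access OK: " + buf.getType());
        }
    }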

mr/src/main/java/org/elasticsearch/hadoop/cfg/ConfigurationOptions.java

Lines changed: 1 addition & 0 deletions
@@ -284,6 +284,7 @@ public interface ConfigurationOptions {
     String ES_NET_SSL_CERT_ALLOW_SELF_SIGNED_DEFAULT = "false";
 
     String ES_NET_HTTP_HEADER_PREFIX = "es.net.http.header.";
+    String ES_NET_HTTP_HEADER_OPAQUE_ID = ES_NET_HTTP_HEADER_PREFIX + "X-Opaque-ID";
 
     String ES_NET_HTTP_AUTH_USER = "es.net.http.auth.user";
     String ES_NET_HTTP_AUTH_PASS = "es.net.http.auth.pass";
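
ES_NET_HTTP_HEADER_PREFIX already existed: properties under es.net.http.header. are sent as HTTP request headers, so the opaque ID is a well-known instance of that mechanism rather than a new transport feature. A sketch of setting it by hand through a Hadoop Configuration (the header value is hypothetical):

    import org.apache.hadoop.conf.Configuration;

    public class CustomHeaderSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Same key as ConfigurationOptions.ES_NET_HTTP_HEADER_OPAQUE_ID:
            conf.set("es.net.http.header.X-Opaque-ID", "[custom] [alice] [ad-hoc export]");
        }
    }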

mr/src/main/java/org/elasticsearch/hadoop/cfg/HadoopSettings.java

Lines changed: 7 additions & 0 deletions
@@ -19,9 +19,11 @@
 package org.elasticsearch.hadoop.cfg;
 
 import java.io.InputStream;
+import java.util.Locale;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
 import org.elasticsearch.hadoop.mr.HadoopIOUtils;
 import org.elasticsearch.hadoop.util.Assert;
@@ -33,6 +35,11 @@ public class HadoopSettings extends Settings {
     public HadoopSettings(Configuration cfg) {
         Assert.notNull(cfg, "Non-null properties expected");
         this.cfg = cfg;
+        String jobName = cfg.get(JobContext.JOB_NAME, "");
+        String user = cfg.get(JobContext.USER_NAME, "");
+        String taskAttemptId = cfg.get(JobContext.TASK_ATTEMPT_ID, "");
+        String opaqueId = String.format(Locale.ROOT, "[mapreduce] [%s] [%s] [%s]", user, jobName, taskAttemptId);
+        setOpaqueId(opaqueId);
     }
 
     @Override
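
A sketch of the value the constructor above produces, with hypothetical job metadata (the JobContext constants resolve to mapreduce.job.user.name, mapreduce.job.name, and mapreduce.task.attempt.id):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.elasticsearch.hadoop.cfg.HadoopSettings;

    public class HadoopOpaqueIdSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(JobContext.USER_NAME, "alice");
            conf.set(JobContext.JOB_NAME, "log-ingest");
            conf.set(JobContext.TASK_ATTEMPT_ID, "attempt_1624000000000_0001_m_000000_0");
            // -> "[mapreduce] [alice] [log-ingest] [attempt_1624000000000_0001_m_000000_0]"
            System.out.println(new HadoopSettings(conf).getOpaqueId());
        }
    }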

mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java

Lines changed: 10 additions & 0 deletions
@@ -38,6 +38,7 @@
 import org.elasticsearch.hadoop.util.unit.TimeValue;
 
 import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.*;
+import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_NET_HTTP_HEADER_OPAQUE_ID;
 import static org.elasticsearch.hadoop.cfg.InternalConfigurationOptions.*;
 
 /**
@@ -765,5 +766,14 @@ public String save() {
     }
 
     public abstract Properties asProperties();
+
+    public Settings setOpaqueId(String opaqueId) {
+        setProperty(ES_NET_HTTP_HEADER_OPAQUE_ID, opaqueId);
+        return this;
+    }
+
+    public String getOpaqueId() {
+        return getProperty(ES_NET_HTTP_HEADER_OPAQUE_ID);
+    }
 }

spark/core/build.gradle

Lines changed: 4 additions & 0 deletions
@@ -143,6 +143,10 @@ configurations.matching{ it.name.contains('CompilerPlugin') == false }.all { Con
     conf.exclude group: "org.mortbay.jetty"
 }
 
+tasks.named("test").configure {
+    jvmArgs "--add-opens=java.base/java.io=ALL-UNNAMED" // Needed for IOUtils's BYTE_ARRAY_BUFFER reflection
+}
+
 // Set minimum compatibility and java home for compiler task
 tasks.withType(ScalaCompile) { ScalaCompile task ->
     task.sourceCompatibility = project.ext.minimumRuntimeVersion

spark/core/src/main/scala/org/elasticsearch/spark/cfg/SparkSettings.java

Lines changed: 15 additions & 0 deletions
@@ -18,9 +18,12 @@
  */
 package org.elasticsearch.spark.cfg;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.Locale;
 import java.util.Properties;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.SparkConf;
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.util.Assert;
@@ -36,6 +39,18 @@ public class SparkSettings extends Settings {
     public SparkSettings(SparkConf cfg) {
         Assert.notNull(cfg, "non-null spark configuration expected");
         this.cfg = cfg;
+        String user;
+        try {
+            user = System.getenv("SPARK_USER") == null ?
+                    UserGroupInformation.getCurrentUser().getShortUserName() :
+                    System.getenv("SPARK_USER");
+        } catch (IOException e) {
+            user = "";
+        }
+        String appName = cfg.get("app.name", cfg.get("spark.app.name", ""));
+        String appId = cfg.get("spark.app.id", "");
+        String opaqueId = String.format(Locale.ROOT, "[spark] [%s] [%s] [%s]", user, appName, appId);
+        this.setOpaqueId(opaqueId);
     }
 
     @Override
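
The user is resolved from the SPARK_USER environment variable when present, otherwise from Hadoop's UserGroupInformation login user, and an IOException degrades to an empty string rather than failing job setup. A sketch mirroring the test change further below (app metadata hypothetical):

    import org.apache.spark.SparkConf;
    import org.elasticsearch.spark.cfg.SparkSettingsManager;

    public class SparkOpaqueIdSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .set("spark.app.name", "my app")
                    .set("spark.app.id", "app-20210601000000-0001");
            // -> "[spark] [<current user>] [my app] [app-20210601000000-0001]"
            System.out.println(new SparkSettingsManager().load(conf).getOpaqueId());
        }
    }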

spark/core/src/main/scala/org/elasticsearch/spark/rdd/AbstractEsRDDIterator.scala

Lines changed: 6 additions & 1 deletion
@@ -25,6 +25,8 @@ import org.elasticsearch.hadoop.cfg.Settings
 import org.elasticsearch.hadoop.rest.RestService
 import org.elasticsearch.hadoop.rest.PartitionDefinition
 
+import java.util.Locale
+
 private[spark] abstract class AbstractEsRDDIterator[T](
     val context: TaskContext,
     partition: PartitionDefinition)
@@ -45,7 +47,10 @@ private[spark] abstract class AbstractEsRDDIterator[T](
 
     // initialize mapping/ scroll reader
     initReader(settings, log)
-
+    if (settings.getOpaqueId() != null && settings.getOpaqueId().contains("task attempt") == false) {
+      settings.setOpaqueId(String.format(Locale.ROOT, "%s, stage %s, task attempt %s", settings.getOpaqueId(),
+        context.stageId().toString, context.taskAttemptId.toString))
+    }
     val readr = RestService.createReader(settings, partition, log)
     readr.scrollQuery()
   }
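
The contains("task attempt") guard makes the enrichment idempotent: stage and task-attempt details are appended only the first time, even if the same Settings instance is reused across retries. A standalone sketch of that guard (values hypothetical):

    import java.util.Locale;

    public class OpaqueIdGuardSketch {
        public static void main(String[] args) {
            String id = "[spark] [alice] [my app] [app-20210601000000-0001]";
            // Append stage/task detail only once, mirroring the check above:
            if (id != null && !id.contains("task attempt")) {
                id = String.format(Locale.ROOT, "%s, stage %s, task attempt %s", id, "3", "42");
            }
            System.out.println(id);
        }
    }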

spark/core/src/main/scala/org/elasticsearch/spark/rdd/EsRDDWriter.scala

Lines changed: 5 additions & 0 deletions
@@ -39,6 +39,7 @@ import org.elasticsearch.spark.serialization.ScalaMapFieldExtractor
 import org.elasticsearch.spark.serialization.ScalaMetadataExtractor
 import org.elasticsearch.spark.serialization.ScalaValueWriter
 
+import java.util.Locale
 import scala.reflect.ClassTag
 
 
@@ -63,6 +64,10 @@ private[spark] class EsRDDWriter[T: ClassTag](val serializedSettings: String,
   lazy val metaExtractor = ObjectUtils.instantiate[MetadataExtractor](settings.getMappingMetadataExtractorClassName, settings)
 
   def write(taskContext: TaskContext, data: Iterator[T]): Unit = {
+    if (settings.getOpaqueId() != null && settings.getOpaqueId().contains("] [task attempt ") == false) {
+      settings.setOpaqueId(String.format(Locale.ROOT, "%s [stage %s] [task attempt %s]", settings.getOpaqueId(),
+        taskContext.stageId().toString, taskContext.taskAttemptId.toString))
+    }
     val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)
 
     val listener = new TaskCompletionListener {

spark/core/src/test/scala/org/elasticsearch/spark/cfg/SparkConfigTest.scala

Lines changed: 15 additions & 0 deletions
@@ -18,13 +18,16 @@
  */
 package org.elasticsearch.spark.cfg
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.elasticsearch.spark.serialization.ReflectionUtils._
 import org.junit.Test
 import org.junit.Assert._
 import org.hamcrest.Matchers._
 import org.apache.spark.SparkConf
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
 
+import java.util.Locale
+
 class SparkConfigTest {
 
   @Test
@@ -50,4 +53,16 @@ class SparkConfigTest {
     val props = new PropertiesSettings().load(settings.save())
     assertEquals("win", props.getProperty("type"))
   }
+
+  @Test
+  def testOpaqueId(): Unit = {
+    var cfg = new SparkConf()
+    assertEquals(String.format(Locale.ROOT, "[spark] [%s] [] []", UserGroupInformation.getCurrentUser.getShortUserName),
+      new SparkSettingsManager().load(cfg).getOpaqueId)
+    val appName = "some app"
+    val appdId = "some app id"
+    cfg = new SparkConf().set("spark.app.name", appName).set("spark.app.id", appdId)
+    assertEquals(String.format(Locale.ROOT, "[spark] [%s] [%s] [%s]", UserGroupInformation.getCurrentUser.getShortUserName, appName,
+      appdId), new SparkSettingsManager().load(cfg).getOpaqueId)
+  }
 }
