Deprecate Spark 2.x #2305

Merged (6 commits, Jan 3, 2025)

11 changes: 11 additions & 0 deletions docs/src/reference/asciidoc/appendix/breaking.adoc
@@ -8,6 +8,17 @@ For clarity, we always list any breaking changes at the top of the
//NOTE: The notable-breaking-changes tagged regions are re-used in the
//Installation and Upgrade Guide

=== Deprecations in 8.18

The following functionality has been deprecated in {eh} 8.18 and will be removed
in a future version. While this won’t have an immediate impact on your
applications, we strongly encourage you to take the described steps to update your
code after upgrading to 8.18.

==== Spark 2.x support is deprecated

Spark 2.x is no longer maintained upstream, so its support in {eh} is deprecated. Spark 3 continues to be supported.

[[breaking-changes-8.9]]
=== Breaking Changes in 8.9

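To act on that advice, the main step for most builds is swapping the Spark 2.x connector artifact for the Spark 3.x one. A minimal sbt sketch, assuming the usual elasticsearch-hadoop artifact naming; the version number is a placeholder:

// build.sbt: move from the Spark 2.x connector to the Spark 3.x one.
// Before (Spark 2.x, now deprecated):
//   libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "8.18.0"
// After (Spark 3.x):
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-30" % "8.18.0"
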
2 changes: 2 additions & 0 deletions docs/src/reference/asciidoc/core/intro/requirements.adoc
@@ -81,6 +81,8 @@ Hive version {hv-v}
[[requirements-spark]]
=== Apache Spark

deprecated::[9.0,Support for Spark 2.x in {eh} is deprecated.]

Spark 2.0 or higher. We recommend using the latest release of Spark (currently {sp-v}). As {eh} provides native integration (which is recommended) with {sp}, it does not matter what binary one is using.
The same applies when using the Hadoop layer to integrate the two as {eh} supports the majority of
Hadoop distributions out there.
@@ -854,10 +854,10 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
val target = resource(index, typename, version)

    val rawCore = List( Map("colint" -> 1, "colstr" -> "s"),
-                        Map("colint" -> null, "colstr" -> null) )
+                        Map("colint" -> 9, "colstr" -> null) )
sc.parallelize(rawCore, 1).saveToEs(target)
    val qjson =
-      """{"query":{"range":{"colint":{"from":null,"to":"9","include_lower":true,"include_upper":true}}}}"""
+      """{"query":{"range":{"colint":{"lte":"9"}}}}"""
Member:
I assume this was some change in range query syntax in Elasticsearch, not related to deprecation?

Contributor Author:
Exactly, this is to fix the broken test.

val esRDD = EsSpark.esRDD(sc, target, qjson)
val scRDD = sc.esRDD(target, qjson)
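As the exchange above notes, the updated query string tracks a change in the Elasticsearch query DSL: the legacy from/to/include_lower/include_upper range parameters are no longer accepted by recent Elasticsearch versions, which use gt/gte/lt/lte instead. A side-by-side sketch (variable names are illustrative):

// Legacy range syntax, rejected by current Elasticsearch versions:
val legacyQuery = """{"query":{"range":{"colint":{"from":null,"to":"9","include_lower":true,"include_upper":true}}}}"""
// Current equivalent of "colint <= 9", expressed with lte:
val currentQuery = """{"query":{"range":{"colint":{"lte":"9"}}}}"""
// Used with the test's SparkContext and target, e.g.:
//   val esRDD = EsSpark.esRDD(sc, target, currentQuery)
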
@@ -37,6 +37,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.Map

@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
object EsSparkSQL {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }
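For callers, the effect of this annotation is a compile-time warning rather than a behavior change. A minimal sketch, assuming the connector is on the classpath; the index name is a placeholder and the warning text is approximate:

import org.apache.spark.sql.DataFrame
import org.elasticsearch.spark.sql.EsSparkSQL

def writeUsers(df: DataFrame): Unit = {
  // Still compiles and runs, but with -deprecation scalac reports something like:
  //   warning: object EsSparkSQL is deprecated: Support for Apache Spark 2 is deprecated. Use Spark 3.
  EsSparkSQL.saveToEs(df, "users")
}

Projects that compile with -Xfatal-warnings will need to migrate or suppress the warning before upgrading.
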
@@ -31,6 +31,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
import org.elasticsearch.spark.sql.EsSparkSQL

@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
object JavaEsSparkSQL {

// specify the return types to make sure the bytecode is generated properly (w/o any scala.collections in it)
@@ -29,8 +29,10 @@ import scala.reflect.ClassTag

package object sql {

@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
implicit def sqlContextFunctions(sc: SQLContext)= new SQLContextFunctions(sc)

@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
class SQLContextFunctions(sc: SQLContext) extends Serializable {
def esDF() = EsSparkSQL.esDF(sc)
def esDF(resource: String) = EsSparkSQL.esDF(sc, resource)
@@ -42,16 +44,20 @@ package object sql {

// the sparkDatasetFunctions already takes care of this
// but older clients might still import it hence why it's still here
@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
implicit def sparkDataFrameFunctions(df: DataFrame) = new SparkDataFrameFunctions(df)

@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
class SparkDataFrameFunctions(df: DataFrame) extends Serializable {
def saveToEs(resource: String): Unit = { EsSparkSQL.saveToEs(df, resource) }
def saveToEs(resource: String, cfg: scala.collection.Map[String, String]): Unit = { EsSparkSQL.saveToEs(df, resource, cfg) }
def saveToEs(cfg: scala.collection.Map[String, String]): Unit = { EsSparkSQL.saveToEs(df, cfg) }
}


@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
implicit def sparkSessionFunctions(ss: SparkSession)= new SparkSessionFunctions(ss)


@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
class SparkSessionFunctions(ss: SparkSession) extends Serializable {
def esDF() = EsSparkSQL.esDF(ss)
def esDF(resource: String) = EsSparkSQL.esDF(ss, resource)
@@ -61,8 +67,10 @@ package object sql {
def esDF(resource: String, query: String, cfg: scala.collection.Map[String, String]) = EsSparkSQL.esDF(ss, resource, query, cfg)
}

@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
implicit def sparkDatasetFunctions[T : ClassTag](ds: Dataset[T]) = new SparkDatasetFunctions(ds)


@deprecated("Support for Apache Spark 2 is deprecated. Use Spark 3.")
class SparkDatasetFunctions[T : ClassTag](ds: Dataset[T]) extends Serializable {
def saveToEs(resource: String): Unit = { EsSparkSQL.saveToEs(ds, resource) }
def saveToEs(resource: String, cfg: scala.collection.Map[String, String]): Unit = { EsSparkSQL.saveToEs(ds, resource, cfg) }
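Since these implicits back the common import-and-call style, existing Spark 2.x applications pick up the deprecation warnings without any code change. A self-contained sketch; the app name, input path, and index are placeholders:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._  // brings the implicit conversions above into scope

object EsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es-demo").getOrCreate()
    val df = spark.read.json("people.json")
    // Resolved through the deprecated implicit conversions above, so
    // compiling with -deprecation surfaces the Spark 2 deprecation message.
    df.saveToEs("people")
    spark.stop()
  }
}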