Commit de332de

[KYUUBI #7078] Make data masking and row filter configurable
1 parent cad5a39 commit de332de

5 files changed (+109 −38 lines)

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/config/AuthzConfigurationChecker.scala

Lines changed: 3 additions & 4 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.config
 
+import org.apache.spark.authz.AuthzConf._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.SetCommand
@@ -28,11 +29,9 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException
  */
 case class AuthzConfigurationChecker(spark: SparkSession) extends (LogicalPlan => Unit) {
 
-  final val RESTRICT_LIST_KEY = "spark.kyuubi.conf.restricted.list"
-
   private val restrictedConfList: Set[String] =
-    Set(RESTRICT_LIST_KEY, "spark.sql.runSQLOnFiles", "spark.sql.extensions") ++
-      spark.conf.getOption(RESTRICT_LIST_KEY).map(_.split(',').toSet).getOrElse(Set.empty)
+    Set(CONF_RESTRICTED_LIST.key, "spark.sql.runSQLOnFiles", "spark.sql.extensions") ++
+      confRestrictedList(spark.sparkContext.getConf).map(_.split(',').toSet).getOrElse(Set.empty)
 
   override def apply(plan: LogicalPlan): Unit = plan match {
     case SetCommand(Some((
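
Net effect: the restricted list is now read once from the static SparkConf through AuthzConf.confRestrictedList, rather than from the mutable runtime session conf. A minimal usage sketch (illustrative only; the extra keys and the builder wiring are assumed, not part of the commit):

    // Supply extra restricted keys at launch; the value is comma-separated,
    // matching the split(',') in the rule above.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf()
      .set("spark.kyuubi.conf.restricted.list",
        "spark.driver.extraClassPath,spark.executor.extraClassPath")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // With AuthzConfigurationChecker installed, SET on a restricted key
    // (built-in or user-supplied) should fail with AccessControlException:
    spark.sql("SET spark.sql.runSQLOnFiles=true")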

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage0.scala

Lines changed: 13 additions & 8 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.datamasking
 
+import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -45,15 +46,19 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyDataMaskingStage0(spark: SparkSession) extends RuleHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val newPlan = mapChildren(plan) {
-      case p: DataMaskingStage0Marker => p
-      case p: DataMaskingStage1Marker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
-      case other => apply(other)
+    if (!dataMaskingEnabled(conf)) {
+      plan
+    } else {
+      val newPlan = mapChildren(plan) {
+        case p: DataMaskingStage0Marker => p
+        case p: DataMaskingStage1Marker => p
+        case scan if isKnownScan(scan) && scan.resolved =>
+          val tables = getScanSpec(scan).tables(scan, spark)
+          tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
+        case other => apply(other)
+      }
+      newPlan
     }
-    newPlan
   }
 
   private def applyMasking(
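
The same enabled-flag guard is applied to RuleApplyDataMaskingStage1 and RuleApplyRowFilter below. Distilled, the pattern is an identity short-circuit in front of the rewrite (sketch only; GatedRule is a made-up name, not part of the commit):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Gate a rule on a feature flag: when disabled, return the plan untouched,
    // so the rule costs one boolean check and rewrites nothing.
    abstract class GatedRule extends Rule[LogicalPlan] {
      def enabled: Boolean
      def rewrite(plan: LogicalPlan): LogicalPlan

      override def apply(plan: LogicalPlan): LogicalPlan =
        if (!enabled) plan else rewrite(plan)
    }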

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/datamasking/RuleApplyDataMaskingStage1.scala

Lines changed: 23 additions & 19 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.datamasking
 
+import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -33,25 +34,28 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyDataMaskingStage1(spark: SparkSession) extends RuleHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-
-    plan match {
-      case marker0: DataMaskingStage0Marker => marker0
-      case marker1: DataMaskingStage1Marker => marker1
-      case cmd if isKnownTableCommand(cmd) =>
-        val tableCommandSpec = getTableCommandSpec(cmd)
-        val queries = tableCommandSpec.queries(cmd)
-        cmd.mapChildren {
-          case marker0: DataMaskingStage0Marker => marker0
-          case marker1: DataMaskingStage1Marker => marker1
-          case query if queries.contains(query) && query.resolved =>
-            applyDataMasking(query)
-          case o => o
-        }
-      case cmd: Command if cmd.childrenResolved =>
-        cmd.mapChildren(applyDataMasking)
-      case cmd: Command => cmd
-      case other if other.resolved => applyDataMasking(other)
-      case other => other
+    if (!dataMaskingEnabled(conf)) {
+      plan
+    } else {
+      plan match {
+        case marker0: DataMaskingStage0Marker => marker0
+        case marker1: DataMaskingStage1Marker => marker1
+        case cmd if isKnownTableCommand(cmd) =>
+          val tableCommandSpec = getTableCommandSpec(cmd)
+          val queries = tableCommandSpec.queries(cmd)
+          cmd.mapChildren {
+            case marker0: DataMaskingStage0Marker => marker0
+            case marker1: DataMaskingStage1Marker => marker1
+            case query if queries.contains(query) && query.resolved =>
+              applyDataMasking(query)
+            case o => o
+          }
+        case cmd: Command if cmd.childrenResolved =>
+          cmd.mapChildren(applyDataMasking)
+        case cmd: Command => cmd
+        case other if other.resolved => applyDataMasking(other)
+        case other => other
+      }
     }
   }
 
extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/RuleApplyRowFilter.scala

Lines changed: 12 additions & 7 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter
 
+import org.apache.spark.authz.AuthzConf.rowFilterEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 
@@ -29,14 +30,18 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyRowFilter(spark: SparkSession) extends RuleHelper {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val newPlan = mapChildren(plan) {
-      case p: RowFilterMarker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
-      case other => apply(other)
+    if (!rowFilterEnabled(conf)) {
+      plan
+    } else {
+      val newPlan = mapChildren(plan) {
+        case p: RowFilterMarker => p
+        case scan if isKnownScan(scan) && scan.resolved =>
+          val tables = getScanSpec(scan).tables(scan, spark)
+          tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
+        case other => apply(other)
+      }
+      newPlan
    }
-    newPlan
   }
 
   private def applyFilter(
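
When the flag is on, the rule wraps each recognized table scan in a Filter node carrying the policy predicate; when off, reads pass through untouched. Illustratively (the table t, column id, and the id > 0 policy are assumed for the example), a row-filter policy on t makes

    spark.sql("SELECT * FROM t")

behave roughly like

    spark.sql("SELECT * FROM (SELECT * FROM t WHERE id > 0) t")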
extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/spark/authz/AuthzConf.scala

Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.authz
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.internal.SQLConf
+
+object AuthzConf {
+
+  def confRestrictedList(conf: SparkConf): Option[String] = {
+    conf.get(CONF_RESTRICTED_LIST)
+  }
+
+  def dataMaskingEnabled(conf: SQLConf): Boolean = {
+    conf.getConf(DATA_MASKING_ENABLED)
+  }
+
+  def rowFilterEnabled(conf: SQLConf): Boolean = {
+    conf.getConf(ROW_FILTER_ENABLED)
+  }
+
+  val CONF_RESTRICTED_LIST =
+    ConfigBuilder("spark.kyuubi.conf.restricted.list")
+      .doc("The config key in the restricted list cannot set dynamic configuration via SET syntax.")
+      .version("1.7.0")
+      .stringConf
+      .createOptional
+
+  val DATA_MASKING_ENABLED =
+    ConfigBuilder("spark.sql.authz.dataMasking.enabled")
+      .doc("")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val ROW_FILTER_ENABLED =
+    ConfigBuilder("spark.sql.authz.rowFilter.enabled")
+      .doc("")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
+
+}
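
Both new flags default to true, so the always-on behavior of earlier releases is preserved unless a deployment opts out. A hedged sketch of disabling the features at launch (only the config keys come from the commit; the builder wiring is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      // skip RuleApplyDataMaskingStage0/Stage1
      .config("spark.sql.authz.dataMasking.enabled", "false")
      // skip RuleApplyRowFilter
      .config("spark.sql.authz.rowFilter.enabled", "false")
      .getOrCreate()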
