[KYUUBI #7078] Make data masking and row filter configurable #7082

Open · wants to merge 9 commits into base: master
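This PR gates the authz plugin's data masking and row filtering rules behind two new boolean configs, spark.sql.authz.dataMasking.enabled and spark.sql.authz.rowFilter.enabled (both default to true), and adds both keys to the checker's restricted list so they cannot be changed with SET at runtime. A minimal sketch of how an administrator might disable both rules when a session is built; the session setup itself is not part of this PR and the app name is hypothetical:

import org.apache.spark.sql.SparkSession

// Sketch: turn both authz rules off at startup; the keys come from AuthzConf below.
val spark = SparkSession.builder()
  .appName("authz-demo") // hypothetical
  .config("spark.sql.extensions", "org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension")
  .config("spark.sql.authz.dataMasking.enabled", "false") // skip RuleApplyDataMaskingStage0/1
  .config("spark.sql.authz.rowFilter.enabled", "false") // skip RuleApplyRowFilter
  .getOrCreate()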
@@ -17,6 +17,7 @@

 package org.apache.kyuubi.plugin.spark.authz.rule.config

+import org.apache.spark.authz.AuthzConf._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.SetCommand
@@ -28,11 +29,14 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException
  */
 case class AuthzConfigurationChecker(spark: SparkSession) extends (LogicalPlan => Unit) {

-  final val RESTRICT_LIST_KEY = "spark.kyuubi.conf.restricted.list"
-
   private val restrictedConfList: Set[String] =
-    Set(RESTRICT_LIST_KEY, "spark.sql.runSQLOnFiles", "spark.sql.extensions") ++
-      spark.conf.getOption(RESTRICT_LIST_KEY).map(_.split(',').toSet).getOrElse(Set.empty)
+    Set(
+      CONF_RESTRICTED_LIST.key,
+      DATA_MASKING_ENABLED.key,
+      ROW_FILTER_ENABLED.key,
+      "spark.sql.runSQLOnFiles",
+      "spark.sql.extensions") ++
+      confRestrictedList(spark.sparkContext.getConf).map(_.split(',').toSet).getOrElse(Set.empty)

   override def apply(plan: LogicalPlan): Unit = plan match {
     case SetCommand(Some((
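Besides reading the restricted list from the static SparkConf via confRestrictedList, the checker now seeds it with the two new rule toggles, so neither flag can be flipped per statement once the session is running. A short sketch of the intended effect, inferred from the checker above and its test suite (the spark variable and SQL text are illustrative):

// A runtime SET on any restricted key is rejected by AuthzConfigurationChecker.
spark.sql("SET spark.sql.authz.dataMasking.enabled=false")
// => throws org.apache.kyuubi.plugin.spark.authz.AccessControlException
spark.sql("SET spark.sql.authz.rowFilter.enabled=false")
// => rejected as well; both keys are members of restrictedConfList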
@@ -17,6 +17,7 @@

 package org.apache.kyuubi.plugin.spark.authz.rule.datamasking

+import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -45,15 +46,19 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyDataMaskingStage0(spark: SparkSession) extends RuleHelper {

   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val newPlan = mapChildren(plan) {
-      case p: DataMaskingStage0Marker => p
-      case p: DataMaskingStage1Marker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
-      case other => apply(other)
+    if (!dataMaskingEnabled(conf)) {
+      plan
+    } else {
+      val newPlan = mapChildren(plan) {
+        case p: DataMaskingStage0Marker => p
+        case p: DataMaskingStage1Marker => p
+        case scan if isKnownScan(scan) && scan.resolved =>
+          val tables = getScanSpec(scan).tables(scan, spark)
+          tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
+        case other => apply(other)
+      }
+      newPlan
     }
-    newPlan
   }

   private def applyMasking(
@@ -17,6 +17,7 @@

 package org.apache.kyuubi.plugin.spark.authz.rule.datamasking

+import org.apache.spark.authz.AuthzConf.dataMaskingEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -33,25 +34,28 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyDataMaskingStage1(spark: SparkSession) extends RuleHelper {

   override def apply(plan: LogicalPlan): LogicalPlan = {
-
-    plan match {
-      case marker0: DataMaskingStage0Marker => marker0
-      case marker1: DataMaskingStage1Marker => marker1
-      case cmd if isKnownTableCommand(cmd) =>
-        val tableCommandSpec = getTableCommandSpec(cmd)
-        val queries = tableCommandSpec.queries(cmd)
-        cmd.mapChildren {
-          case marker0: DataMaskingStage0Marker => marker0
-          case marker1: DataMaskingStage1Marker => marker1
-          case query if queries.contains(query) && query.resolved =>
-            applyDataMasking(query)
-          case o => o
-        }
-      case cmd: Command if cmd.childrenResolved =>
-        cmd.mapChildren(applyDataMasking)
-      case cmd: Command => cmd
-      case other if other.resolved => applyDataMasking(other)
-      case other => other
+    if (!dataMaskingEnabled(conf)) {
+      plan
+    } else {
+      plan match {
+        case marker0: DataMaskingStage0Marker => marker0
+        case marker1: DataMaskingStage1Marker => marker1
+        case cmd if isKnownTableCommand(cmd) =>
+          val tableCommandSpec = getTableCommandSpec(cmd)
+          val queries = tableCommandSpec.queries(cmd)
+          cmd.mapChildren {
+            case marker0: DataMaskingStage0Marker => marker0
+            case marker1: DataMaskingStage1Marker => marker1
+            case query if queries.contains(query) && query.resolved =>
+              applyDataMasking(query)
+            case o => o
+          }
+        case cmd: Command if cmd.childrenResolved =>
+          cmd.mapChildren(applyDataMasking)
+        case cmd: Command => cmd
+        case other if other.resolved => applyDataMasking(other)
+        case other => other
+      }
     }
   }

@@ -17,6 +17,7 @@

 package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter

+import org.apache.spark.authz.AuthzConf.rowFilterEnabled
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

@@ -29,14 +30,18 @@ import org.apache.kyuubi.plugin.spark.authz.serde._
 case class RuleApplyRowFilter(spark: SparkSession) extends RuleHelper {

   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val newPlan = mapChildren(plan) {
-      case p: RowFilterMarker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
-      case other => apply(other)
+    if (!rowFilterEnabled(conf)) {
+      plan
+    } else {
+      val newPlan = mapChildren(plan) {
+        case p: RowFilterMarker => p
+        case scan if isKnownScan(scan) && scan.resolved =>
+          val tables = getScanSpec(scan).tables(scan, spark)
+          tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
+        case other => apply(other)
+      }
+      newPlan
     }
-    newPlan
   }

   private def applyFilter(
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.authz

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.buildConf

object AuthzConf {
  val CONF_RESTRICTED_LIST =
    ConfigBuilder("spark.kyuubi.conf.restricted.list")
      .doc("Config keys in the restricted list cannot be set dynamically via SET syntax.")
      .version("1.7.0")
      .stringConf
      .createOptional

  val DATA_MASKING_ENABLED =
    buildConf("spark.sql.authz.dataMasking.enabled")
      .doc("Whether to enable the data masking rule for the authz plugin.")
      .version("1.11.0")
      .booleanConf
      .createWithDefault(true)

  val ROW_FILTER_ENABLED =
    buildConf("spark.sql.authz.rowFilter.enabled")
      .doc("Whether to enable the row filter rule for the authz plugin.")
      .version("1.11.0")
      .booleanConf
      .createWithDefault(true)

  def confRestrictedList(conf: SparkConf): Option[String] = {
    conf.get(CONF_RESTRICTED_LIST)
  }

  def dataMaskingEnabled(conf: SQLConf): Boolean = {
    conf.getConf(DATA_MASKING_ENABLED)
  }

  def rowFilterEnabled(conf: SQLConf): Boolean = {
    conf.getConf(ROW_FILTER_ENABLED)
  }
}
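The helpers above read from two different config surfaces: confRestrictedList pulls the optional comma-separated list from the application's SparkConf, while the two rule toggles are ordinary SQLConf booleans that default to true. A small consumption sketch, assuming only the AuthzConf object defined above (the example restricted-list value is the one used in AuthzConfigurationCheckerSuite):

import org.apache.spark.SparkConf
import org.apache.spark.authz.AuthzConf
import org.apache.spark.sql.internal.SQLConf

// Static side: the restricted list comes from SparkConf and is split the same
// way AuthzConfigurationChecker splits it.
val sparkConf = new SparkConf()
  .set("spark.kyuubi.conf.restricted.list", "spark.sql.abc,spark.sql.xyz")
val restricted: Set[String] =
  AuthzConf.confRestrictedList(sparkConf).map(_.split(',').toSet).getOrElse(Set.empty)

// Session side: the rule toggles are plain SQLConf booleans, true by default.
val sqlConf = SQLConf.get
val maskingOn = AuthzConf.dataMaskingEnabled(sqlConf)
val filteringOn = AuthzConf.rowFilterEnabled(sqlConf)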
@@ -23,7 +23,10 @@ import scala.util.Try

 // scalastyle:off
 import org.apache.commons.codec.digest.DigestUtils.md5Hex
+import org.apache.spark.authz.AuthzConf
+import org.apache.spark.authz.AuthzConf.DATA_MASKING_ENABLED
 import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite

@@ -85,6 +88,17 @@ trait DataMaskingTestBase extends AnyFunSuite with SparkSessionProvider with Bef
     super.afterAll()
   }

+  private def withEnabledDataMasking(enabled: Boolean)(f: => Unit): Unit = {
+    val conf = SQLConf.get
+    val oldValue = AuthzConf.dataMaskingEnabled(conf)
+    try {
+      conf.setConf(DATA_MASKING_ENABLED, enabled)
+      f
+    } finally {
+      conf.setConf(DATA_MASKING_ENABLED, oldValue)
+    }
+  }
+
   test("simple query with a user doesn't have mask rules") {
     checkAnswer(
       kent,
@@ -93,17 +107,24 @@
   }

   test("simple query with a user has mask rules") {
-    val result =
-      Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
-    checkAnswer(
-      bob,
-      "SELECT value1, value2, value3, value4, value5 FROM default.src " +
-        "where key = 1",
-      result)
-    checkAnswer(
-      bob,
-      "SELECT value1 as key, value2, value3, value4, value5 FROM default.src where key = 1",
-      result)
+    Seq(true, false).foreach { enabled =>
+      withEnabledDataMasking(enabled) {
+        val result: Seq[Row] = if (enabled) {
+          Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+        } else {
+          Seq(Row(1, "hello", "world", Timestamp.valueOf("2018-11-17 12:34:56"), "World"))
+        }
+        checkAnswer(
+          bob,
+          "SELECT value1, value2, value3, value4, value5 FROM default.src " +
+            "where key = 1",
+          result)
+        checkAnswer(
+          bob,
+          "SELECT value1 as key, value2, value3, value4, value5 FROM default.src where key = 1",
+          result)
+      }
+    }
   }

   test("star") {
@@ -20,7 +20,10 @@ package org.apache.kyuubi.plugin.spark.authz.ranger.rowfiltering
 // scalastyle:off
 import scala.util.Try

+import org.apache.spark.authz.AuthzConf
+import org.apache.spark.authz.AuthzConf.ROW_FILTER_ENABLED
 import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite

@@ -57,19 +60,48 @@ trait RowFilteringTestBase extends AnyFunSuite with SparkSessionProvider with Be
     super.afterAll()
   }

+  private def withEnabledRowFilter(enabled: Boolean)(f: => Unit): Unit = {
+    val conf = SQLConf.get
+    val oldValue = AuthzConf.rowFilterEnabled(conf)
+    try {
+      conf.setConf(ROW_FILTER_ENABLED, enabled)
+      f
+    } finally {
+      conf.setConf(ROW_FILTER_ENABLED, oldValue)
+    }
+  }
+
   test("user without row filtering rule") {
     checkAnswer(
       kent,
-      "SELECT key FROM default.src order order by key",
+      "SELECT key FROM default.src order by key",
       Seq(Row(1), Row(20), Row(30)))
   }

   test("simple query projecting filtering column") {
-    checkAnswer(bob, "SELECT key FROM default.src", Seq(Row(1)))
+    Seq(true, false).foreach { enabled =>
+      withEnabledRowFilter(enabled) {
+        val result = if (enabled) {
+          Seq(Row(1))
+        } else {
+          Seq(Row(1), Row(20), Row(30))
+        }
+        checkAnswer(bob, "SELECT key FROM default.src order by key", result)
+      }
+    }
   }

   test("simple query projecting non filtering column") {
-    checkAnswer(bob, "SELECT value FROM default.src", Seq(Row(1)))
+    Seq(true, false).foreach { enabled =>
+      withEnabledRowFilter(enabled) {
+        val result = if (enabled) {
+          Seq(Row(1))
+        } else {
+          Seq(Row(1), Row(2), Row(3))
+        }
+        checkAnswer(bob, "SELECT value FROM default.src order by key", result)
+      }
+    }
   }

   test("simple query projecting non filtering column with udf max") {
@@ -17,6 +17,9 @@

 package org.apache.kyuubi.plugin.spark.authz.rule

+import org.apache.spark.SparkConf
+import org.apache.spark.authz.AuthzConf.CONF_RESTRICTED_LIST
+import org.apache.spark.sql.AnalysisException
 import org.scalatest.BeforeAndAfterAll
 // scalastyle:off
 import org.scalatest.funsuite.AnyFunSuite
@@ -29,13 +32,16 @@ class AuthzConfigurationCheckerSuite extends AnyFunSuite with SparkSessionProvid
     with BeforeAndAfterAll {

   override protected val catalogImpl: String = "in-memory"
+
+  override protected val extraSparkConf: SparkConf = new SparkConf()
+    .set(CONF_RESTRICTED_LIST.key, "spark.sql.abc,spark.sql.xyz")

   override def afterAll(): Unit = {
     spark.stop()
     super.afterAll()
   }

   test("apply spark configuration restriction rules") {
-    sql("set spark.kyuubi.conf.restricted.list=spark.sql.abc,spark.sql.xyz")
     val extension = AuthzConfigurationChecker(spark)
     val p1 = sql("set spark.sql.runSQLOnFiles=true").queryExecution.analyzed
     intercept[AccessControlException](extension.apply(p1))
@@ -47,8 +53,11 @@
     intercept[AccessControlException](extension.apply(p4))
     val p5 = sql("set spark.sql.xyz=abc").queryExecution.analyzed
     intercept[AccessControlException](extension.apply(p5))
-    val p6 = sql("set spark.kyuubi.conf.restricted.list=123").queryExecution.analyzed
-    intercept[AccessControlException](extension.apply(p6))
+    val e = intercept[AnalysisException] {
+      sql("set spark.kyuubi.conf.restricted.list=123")
+    }
+    assert(e.getMessage.contains("Cannot modify the value of") && e.getMessage.contains(
+      CONF_RESTRICTED_LIST.key))
     val p7 = sql("set spark.sql.efg=hijk").queryExecution.analyzed
     extension.apply(p7)
     val p8 = sql(