Skip to content

Commit 9f2bade

Browse files
committed
generic dialect
1 parent 186cc69 commit 9f2bade

File tree

3 files changed

+47
-158
lines changed

3 files changed

+47
-158
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala

Lines changed: 8 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -419,32 +419,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
419419
}
420420
}
421421

422-
// Visible for testing.
423-
private[kyuubi] def insertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = {
424-
val insertQuery =
425-
s"""
426-
|INSERT INTO $KUBERNETES_ENGINE_INFO_TABLE(
427-
|identifier,
428-
|context,
429-
|namespace,
430-
|pod_name,
431-
|pod_state,
432-
|container_state,
433-
|engine_id,
434-
|engine_name,
435-
|engine_state,
436-
|engine_error,
437-
|update_time
438-
|)
439-
|SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
440-
|WHERE NOT EXISTS (
441-
| SELECT 1 FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ?
442-
|)
443-
|""".stripMargin
422+
override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = {
423+
val query = dialect.insertOrReplace(
424+
KUBERNETES_ENGINE_INFO_TABLE,
425+
KUBERNETES_ENGINE_INFO_COLUMNS,
426+
KUBERNETES_ENGINE_INFO_KEY_COLUMN,
427+
engineInfo.identifier)
444428
JdbcUtils.withConnection { connection =>
445429
execute(
446430
connection,
447-
insertQuery,
431+
query,
448432
engineInfo.identifier,
449433
engineInfo.context.orNull,
450434
engineInfo.namespace.orNull,
@@ -455,99 +439,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
455439
engineInfo.engineName,
456440
engineInfo.engineState,
457441
engineInfo.engineError.orNull,
458-
System.currentTimeMillis(),
459-
engineInfo.identifier)
460-
}
461-
}
462-
463-
// Visible for testing.
464-
private[kyuubi] def updateKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = {
465-
val queryBuilder = new StringBuilder
466-
val params = ListBuffer[Any]()
467-
468-
queryBuilder.append(s"UPDATE $KUBERNETES_ENGINE_INFO_TABLE")
469-
val setClauses = ListBuffer[String]()
470-
engineInfo.context.foreach { context =>
471-
setClauses += "context = ?"
472-
params += context
473-
}
474-
engineInfo.namespace.foreach { namespace =>
475-
setClauses += "namespace = ?"
476-
params += namespace
477-
}
478-
Option(engineInfo.podName).foreach { pod =>
479-
setClauses += "pod_name = ?"
480-
params += pod
481-
}
482-
Option(engineInfo.podState).foreach { podState =>
483-
setClauses += "pod_state = ?"
484-
params += podState
485-
}
486-
Option(engineInfo.containerState).foreach { containerState =>
487-
setClauses += "container_state = ?"
488-
params += containerState
489-
}
490-
Option(engineInfo.engineId).foreach { appId =>
491-
setClauses += "engine_id = ?"
492-
params += appId
493-
}
494-
Option(engineInfo.engineName).foreach { appName =>
495-
setClauses += "engine_name = ?"
496-
params += appName
497-
}
498-
Option(engineInfo.engineState).foreach { appState =>
499-
setClauses += "engine_state = ?"
500-
params += appState
501-
}
502-
engineInfo.engineError.foreach { appError =>
503-
setClauses += "engine_error = ?"
504-
params += appError
505-
}
506-
setClauses += "update_time = ?"
507-
params += System.currentTimeMillis()
508-
509-
queryBuilder.append(setClauses.mkString(" SET ", ", ", ""))
510-
queryBuilder.append(" WHERE identifier = ?")
511-
params += engineInfo.identifier
512-
513-
val query = queryBuilder.toString()
514-
JdbcUtils.withConnection { connection =>
515-
withUpdateCount(connection, query, params.toSeq: _*) { updateCount =>
516-
if (updateCount == 0) {
517-
throw new KyuubiException(
518-
s"Error updating kubernetes engine info for ${engineInfo.identifier} by SQL: $query, " +
519-
s"with params: ${params.mkString(", ")}")
520-
}
521-
}
522-
}
523-
524-
}
525-
526-
override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = {
527-
dialect.insertOrReplace(
528-
KUBERNETES_ENGINE_INFO_TABLE,
529-
KUBERNETES_ENGINE_INFO_COLUMNS,
530-
KUBERNETES_ENGINE_INFO_KEY_COLUMN) match {
531-
case Some(query) =>
532-
JdbcUtils.withConnection { connection =>
533-
execute(
534-
connection,
535-
query,
536-
engineInfo.identifier,
537-
engineInfo.context.orNull,
538-
engineInfo.namespace.orNull,
539-
engineInfo.podName,
540-
engineInfo.podState,
541-
engineInfo.containerState,
542-
engineInfo.engineId,
543-
engineInfo.engineName,
544-
engineInfo.engineState,
545-
engineInfo.engineError.orNull,
546-
System.currentTimeMillis())
547-
}
548-
case None =>
549-
insertKubernetesEngineInfo(engineInfo)
550-
updateKubernetesEngineInfo(engineInfo)
442+
System.currentTimeMillis())
551443
}
552444
}
553445

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,53 +22,68 @@ trait JdbcDatabaseDialect {
2222
def insertOrReplace(
2323
table: String,
2424
cols: Seq[String],
25-
keyCol: String): Option[String] = None
25+
keyCol: String,
26+
keyVal: String): String
2627
}
2728

2829
class GenericDatabaseDialect extends JdbcDatabaseDialect {
2930
override def limitClause(limit: Int, offset: Int): String = {
3031
s"LIMIT $limit OFFSET $offset"
3132
}
33+
34+
override def insertOrReplace(
35+
table: String,
36+
cols: Seq[String],
37+
keyCol: String,
38+
keyVal: String): String = {
39+
s"""
40+
|INSERT INTO $table (${cols.mkString(",")})
41+
|SELECT ${cols.map(_ => "?").mkString(",")}
42+
|WHERE NOT EXISTS (
43+
| SELECT 1 FROM $table WHERE $keyCol = '$keyVal')
44+
|)
45+
|""".stripMargin
46+
}
3247
}
3348

3449
class SQLiteDatabaseDialect extends GenericDatabaseDialect {
3550
override def insertOrReplace(
3651
table: String,
3752
cols: Seq[String],
38-
keyCol: String): Option[String] = {
39-
Some(
40-
s"""
41-
|INSERT OR REPLACE INTO $table (${cols.mkString(",")})
42-
|VALUES (${cols.map(_ => "?").mkString(",")})
43-
|""".stripMargin)
53+
keyCol: String,
54+
keyVal: String): String = {
55+
s"""
56+
|INSERT OR REPLACE INTO $table (${cols.mkString(",")})
57+
|VALUES (${cols.map(_ => "?").mkString(",")})
58+
|""".stripMargin
4459
}
4560
}
4661
class MySQLDatabaseDialect extends GenericDatabaseDialect {
4762
override def insertOrReplace(
4863
table: String,
4964
cols: Seq[String],
50-
keyCol: String): Option[String] = {
51-
Some(
52-
s"""
53-
|INSERT INTO $table (${cols.mkString(",")})
54-
|VALUES (${cols.map(_ => "?").mkString(",")}) AS new
55-
|ON DUPLICATE KEY UPDATE
56-
|${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")}
57-
|""".stripMargin)
65+
keyCol: String,
66+
keyVal: String): String = {
67+
s"""
68+
|INSERT INTO $table (${cols.mkString(",")})
69+
|VALUES (${cols.map(_ => "?").mkString(",")}) AS new
70+
|ON DUPLICATE KEY UPDATE
71+
|${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")}
72+
|""".stripMargin
5873
}
5974
}
6075
class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {
6176
override def insertOrReplace(
6277
table: String,
6378
cols: Seq[String],
64-
keyCol: String): Option[String] = {
65-
Some(
66-
s"""
67-
|INSERT INTO $table (${cols.mkString(",")})
68-
|VALUES (${cols.map(_ => "?").mkString(",")})
69-
|ON CONFLICT ($keyCol)
70-
|DO UPDATE SET
71-
|${cols.filterNot(_ == keyCol).map(c => s"$c = EXCLUDED.$c").mkString(",")}
72-
|""".stripMargin)
79+
keyCol: String,
80+
keyVal: String): String = {
81+
s"""
82+
|INSERT INTO $table (${cols.mkString(",")})
83+
|VALUES (${cols.map(_ => "?").mkString(",")})
84+
|ON CONFLICT ($keyCol)
85+
|DO UPDATE SET
86+
|${cols.filterNot(_ == keyCol).map(c => s"$c = EXCLUDED.$c").mkString(",")}
87+
|""".stripMargin
7388
}
7489
}

kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,6 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
312312
engineError = Some("appError2"))
313313
jdbcMetadataStore.upsertKubernetesEngineInfo(metadata3)
314314

315-
// test generic insert if not exist
316-
jdbcMetadataStore.insertKubernetesEngineInfo(metadata)
317-
318315
val metadata4 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag)
319316
assert(metadata4.identifier == metadata3.identifier)
320317
assert(metadata4.context == metadata3.context)
@@ -336,21 +333,6 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
336333
assert(applicationInfo.error == Some("appError2"))
337334
assert(applicationInfo.podName == Some("podName2"))
338335

339-
// test generic update
340-
jdbcMetadataStore.updateKubernetesEngineInfo(metadata)
341-
val metadata5 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag)
342-
assert(metadata5.identifier == metadata.identifier)
343-
assert(metadata5.context == metadata.context)
344-
assert(metadata5.namespace == metadata.namespace)
345-
assert(metadata5.podName == metadata.podName)
346-
assert(metadata5.podState == metadata.podState)
347-
assert(metadata5.containerState == metadata.containerState)
348-
assert(metadata5.engineId == metadata.engineId)
349-
assert(metadata5.engineName == metadata.engineName)
350-
assert(metadata5.engineState == metadata.engineState)
351-
assert(metadata5.engineError == metadata.engineError)
352-
assert(metadata5.updateTime > 0)
353-
354336
jdbcMetadataStore.cleanupKubernetesEngineInfoByIdentifier(tag)
355337
assert(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) == null)
356338
}

0 commit comments

Comments
 (0)