
feat: Implement Fs2KafkaModule #246

Merged
7 commits merged on May 25, 2020
24 changes: 24 additions & 0 deletions build.sbt
@@ -11,6 +11,8 @@ lazy val root = project
example,
flyway,
flywayPureConfig,
fs2Kafka,
fs2KafkaPureConfig,
grpcServer,
grpcServerMicrometer,
grpcServerPureConfig,
@@ -167,6 +169,27 @@ lazy val flywayPureConfig = project
libraryDependencies += Dependencies.pureConfig
)

lazy val fs2Kafka = project
.in(file("fs2-kafka"))
.settings(BuildSettings.common)
.settings(
name := "sst-fs2-kafka",
libraryDependencies ++= Seq(
Dependencies.fs2Kafka,
Dependencies.testContainersScalaScalaTest % Test,
Dependencies.testContainersScalaKafka % Test
)
)

lazy val fs2KafkaPureConfig = project
.in(file("fs2-kafka-pureconfig"))
.dependsOn(fs2Kafka)
.settings(BuildSettings.common)
.settings(
name := "sst-fs2-kafka-pureconfig",
libraryDependencies += Dependencies.pureConfig
)

lazy val grpcServer = project
.in(file("grpc-server"))
.settings(BuildSettings.common)
@@ -423,6 +446,7 @@ lazy val site = project
example,
flyway,
flywayPureConfig,
fs2Kafka,
http4sClientBlazePureConfig,
http4sClientMonixCatnap,
monixCatnapPureConfig,
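For downstream builds, depending on the new modules would look roughly like the following sbt snippet — a sketch assuming the `com.avast` organization used by the other sst artifacts, with `<version>` as a placeholder:

libraryDependencies ++= Seq(
  "com.avast" %% "sst-fs2-kafka"            % "<version>", // Fs2KafkaModule, ConsumerConfig, ProducerConfig
  "com.avast" %% "sst-fs2-kafka-pureconfig" % "<version>"  // pureconfig ConfigReader instances
)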
@@ -0,0 +1,45 @@
package com.avast.sst.fs2kafka.pureconfig

import cats.syntax.either._
import com.avast.sst.fs2kafka.{ConsumerConfig, ProducerConfig}
import fs2.kafka.{Acks, AutoOffsetReset, CommitRecovery, IsolationLevel}
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert
import pureconfig.generic.ProductHint
import pureconfig.generic.semiauto.deriveReader

trait ConfigReaders {

implicit protected def hint[T]: ProductHint[T] = ProductHint.default

implicit val fs2KafkaCommitRecoveryConfigReader: ConfigReader[CommitRecovery] = ConfigReader[String].emap {
case s if s.toLowerCase() == "default" => CommitRecovery.Default.asRight
case s if s.toLowerCase() == "none" => CommitRecovery.None.asRight
case value => CannotConvert(value, "CommitRecovery", "default|none").asLeft
}

implicit val fs2KafkaAutoOffsetResetConfigReader: ConfigReader[AutoOffsetReset] = ConfigReader[String].emap {
case s if s.toLowerCase() == "earliest" => AutoOffsetReset.Earliest.asRight
case s if s.toLowerCase() == "latest" => AutoOffsetReset.Latest.asRight
case s if s.toLowerCase() == "none" => AutoOffsetReset.None.asRight
case value => CannotConvert(value, "AutoOffsetReset", "earliest|latest|none").asLeft
}

implicit val fs2KafkaIsolationLevelConfigReader: ConfigReader[IsolationLevel] = ConfigReader[String].emap {
case s if s.toLowerCase() == "read_committed" => IsolationLevel.ReadCommitted.asRight
case s if s.toLowerCase() == "read_uncommitted" => IsolationLevel.ReadUncommitted.asRight
case value => CannotConvert(value, "IsolationLevel", "read_committed|read_uncommitted").asLeft
}

implicit val fs2KafkaAcksConfigReader: ConfigReader[Acks] = ConfigReader[String].emap {
case s if s.toLowerCase() == "0" => Acks.Zero.asRight
case s if s.toLowerCase() == "1" => Acks.One.asRight
case s if s.toLowerCase() == "all" => Acks.All.asRight
case value => CannotConvert(value, "Acks", "0|1|all").asLeft
}

implicit val fs2KafkaConsumerConfigReader: ConfigReader[ConsumerConfig] = deriveReader

implicit val fs2KafkaProducerConfigReader: ConfigReader[ProducerConfig] = deriveReader

}
@@ -0,0 +1,20 @@
package com.avast.sst.fs2kafka.pureconfig

import pureconfig.ConfigFieldMapping
import pureconfig.generic.ProductHint

/** Contains [[pureconfig.ConfigReader]] instances with the default "kebab-case" naming convention. */
object implicits extends ConfigReaders {

/** Contains [[pureconfig.ConfigReader]] instances with the "kebab-case" naming convention.
*
* This is an alias for the default `implicits._` import.
*/
object KebabCase extends ConfigReaders

/** Contains [[pureconfig.ConfigReader]] instances with the "camelCase" naming convention. */
object CamelCase extends ConfigReaders {
implicit override protected def hint[T]: ProductHint[T] = ProductHint(ConfigFieldMapping(pureconfig.CamelCase, pureconfig.CamelCase))
}

}
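For reference, loading a ConsumerConfig through these readers might look like the following sketch; the inline HOCON and its values are hypothetical, and the keys follow the default kebab-case mapping (import `implicits.CamelCase._` instead for camelCase keys):

import com.avast.sst.fs2kafka.ConsumerConfig
import com.avast.sst.fs2kafka.pureconfig.implicits._
import pureconfig.ConfigSource

// Hypothetical inline configuration; unset fields fall back to the case class defaults.
val source = ConfigSource.string(
  """bootstrap-servers = ["localhost:9092"]
    |group-id = "my-group"
    |auto-offset-reset = earliest
    |""".stripMargin
)

val consumerConfig: ConsumerConfig = source.loadOrThrow[ConsumerConfig]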
@@ -0,0 +1,53 @@
package com.avast.sst.fs2kafka

import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

import com.avast.sst.fs2kafka.ConsumerConfig._
import com.github.ghik.silencer.silent
import fs2.kafka.{AutoOffsetReset, CommitRecovery, IsolationLevel}
import org.apache.kafka.clients.consumer.{ConsumerConfig => ApacheConsumerConfig}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

@silent("dead code")
final case class ConsumerConfig(
bootstrapServers: List[String],
groupId: String,
groupInstanceId: Option[String] = None,
clientId: Option[String] = None,
clientRack: Option[String] = None,
autoOffsetReset: AutoOffsetReset = AutoOffsetReset.None,
enableAutoCommit: Boolean = false,
autoCommitInterval: FiniteDuration = defaultMillis(ApacheConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
allowAutoCreateTopics: Boolean = default(ApacheConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
closeTimeout: FiniteDuration = FiniteDuration(20, SECONDS),
commitRecovery: CommitRecovery = CommitRecovery.Default,
commitTimeout: FiniteDuration = FiniteDuration(15, SECONDS),
defaultApiTimeout: FiniteDuration = defaultMillis(ApacheConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG),
heartbeatInterval: FiniteDuration = defaultMillis(ApacheConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
isolationLevel: IsolationLevel = defaultIsolationLevel,
maxPrefetchBatches: Int = 2,
pollInterval: FiniteDuration = FiniteDuration(50, MILLISECONDS),
pollTimeout: FiniteDuration = FiniteDuration(50, MILLISECONDS),
maxPollInterval: FiniteDuration = defaultMillis(ApacheConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
maxPollRecords: Int = default(ApacheConsumerConfig.MAX_POLL_RECORDS_CONFIG),
requestTimeout: FiniteDuration = defaultMillis(ApacheConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
sessionTimeout: FiniteDuration = defaultMillis(ApacheConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
properties: Map[String, String] = Map.empty
)

object ConsumerConfig {

private val officialDefaults = ApacheConsumerConfig.configDef().defaultValues().asScala

private def default[A](key: String): A = officialDefaults(key).asInstanceOf[A]

private def defaultMillis(key: String): FiniteDuration = FiniteDuration(default[Int](key).toLong, MILLISECONDS)

private val defaultIsolationLevel = default[String](ApacheConsumerConfig.ISOLATION_LEVEL_CONFIG) match {
case "read_uncommitted" => IsolationLevel.ReadUncommitted
case "read_committed" => IsolationLevel.ReadCommitted
}

}
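Because the defaults are read from the Kafka client's own ConfigDef at runtime, a plain ConsumerConfig mirrors the official client defaults. A small sketch — the servers and group id are placeholders, and the printed values are the Kafka 2.x client defaults this PR builds against:

import com.avast.sst.fs2kafka.ConsumerConfig

val config = ConsumerConfig(List("localhost:9092"), groupId = "example")
println(config.sessionTimeout) // 10000 milliseconds
println(config.maxPollRecords) // 500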
@@ -0,0 +1,92 @@
package com.avast.sst.fs2kafka

import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Timer}
import fs2.kafka._

object Fs2KafkaModule {

def makeConsumer[F[_]: ConcurrentEffect: ContextShift: Timer, K: Deserializer[F, *], V: Deserializer[F, *]](
config: ConsumerConfig,
blocker: Option[Blocker] = None
): Resource[F, KafkaConsumer[F, K, V]] = {
def setOpt[A](maybeValue: Option[A])(
setter: (ConsumerSettings[F, K, V], A) => ConsumerSettings[F, K, V]
)(initial: ConsumerSettings[F, K, V]): ConsumerSettings[F, K, V] =
maybeValue match {
case Some(value) => setter(initial, value)
case None => initial
}

val settings = ConsumerSettings(implicitly[Deserializer[F, K]], implicitly[Deserializer[F, V]])
.withBootstrapServers(config.bootstrapServers.mkString(","))
.withGroupId(config.groupId)
.pipe(setOpt(config.groupInstanceId)(_.withGroupInstanceId(_)))
.pipe(setOpt(config.clientId)(_.withClientId(_)))
.pipe(setOpt(config.clientRack)(_.withClientRack(_)))
.withAutoOffsetReset(config.autoOffsetReset)
.withEnableAutoCommit(config.enableAutoCommit)
.withAutoCommitInterval(config.autoCommitInterval)
.withAllowAutoCreateTopics(config.allowAutoCreateTopics)
.withCloseTimeout(config.closeTimeout)
.withCommitRecovery(config.commitRecovery)
.withCommitTimeout(config.commitTimeout)
.withDefaultApiTimeout(config.defaultApiTimeout)
.withHeartbeatInterval(config.heartbeatInterval)
.withIsolationLevel(config.isolationLevel)
.withMaxPrefetchBatches(config.maxPrefetchBatches)
.withPollInterval(config.pollInterval)
.withPollTimeout(config.pollTimeout)
.withMaxPollInterval(config.maxPollInterval)
.withMaxPollRecords(config.maxPollRecords)
.withRequestTimeout(config.requestTimeout)
.withSessionTimeout(config.sessionTimeout)
.pipe(setOpt(blocker)(_.withBlocker(_)))
.withProperties(config.properties)

makeConsumer(settings)
}

def makeConsumer[F[_]: ConcurrentEffect: ContextShift: Timer, K, V](
settings: ConsumerSettings[F, K, V]
): Resource[F, KafkaConsumer[F, K, V]] = consumerResource[F].using(settings)

def makeProducer[F[_]: ConcurrentEffect: ContextShift, K: Serializer[F, *], V: Serializer[F, *]](
config: ProducerConfig,
blocker: Option[Blocker] = None
): Resource[F, KafkaProducer[F, K, V]] = {
def setOpt[A](maybeValue: Option[A])(
setter: (ProducerSettings[F, K, V], A) => ProducerSettings[F, K, V]
)(initial: ProducerSettings[F, K, V]): ProducerSettings[F, K, V] =
maybeValue match {
case Some(value) => setter(initial, value)
case None => initial
}

val settings = ProducerSettings(implicitly[Serializer[F, K]], implicitly[Serializer[F, V]])
.withBootstrapServers(config.bootstrapServers.mkString(","))
.pipe(setOpt(config.clientId)(_.withClientId(_)))
.withAcks(config.acks)
.withBatchSize(config.batchSize)
.withCloseTimeout(config.closeTimeout)
.withDeliveryTimeout(config.deliveryTimeout)
.withRequestTimeout(config.requestTimeout)
.withLinger(config.linger)
.withEnableIdempotence(config.enableIdempotence)
.withMaxInFlightRequestsPerConnection(config.maxInFlightRequestsPerConnection)
.withParallelism(config.parallelism)
.withRetries(config.retries)
.pipe(setOpt(blocker)(_.withBlocker(_)))
.withProperties(config.properties)

makeProducer(settings)
}

def makeProducer[F[_]: ConcurrentEffect: ContextShift, K, V](settings: ProducerSettings[F, K, V]): Resource[F, KafkaProducer[F, K, V]] =
producerResource[F].using(settings)

/** Copy of `ChainingOps` from Scala 2.13's `scala.util.chaining` (not available in Scala 2.12). */
implicit private final class ChainingOps[A](private val self: A) extends AnyVal {
def pipe[B](f: A => B): B = f(self)
}

}
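A minimal sketch of wiring both resources together in a cats-effect 2 IOApp (which supplies the required ContextShift and Timer); the bootstrap servers and group id are placeholders:

import cats.effect.{ExitCode, IO, IOApp}
import com.avast.sst.fs2kafka.{ConsumerConfig, Fs2KafkaModule, ProducerConfig}

object ExampleApp extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    val clients = for {
      producer <- Fs2KafkaModule.makeProducer[IO, String, String](ProducerConfig(List("localhost:9092")))
      consumer <- Fs2KafkaModule.makeConsumer[IO, String, String](
                    ConsumerConfig(List("localhost:9092"), groupId = "example")
                  )
    } yield (producer, consumer)

    // Both clients are acquired and released by the combined Resource.
    clients.use { case (producer, consumer) => IO.unit /* produce/consume here */ }
      .map(_ => ExitCode.Success)
  }
}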
@@ -0,0 +1,43 @@
package com.avast.sst.fs2kafka

import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

import com.avast.sst.fs2kafka.ProducerConfig._
import fs2.kafka.Acks
import org.apache.kafka.clients.producer.{ProducerConfig => ApacheProducerConfig}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

final case class ProducerConfig(
bootstrapServers: List[String],
clientId: Option[String] = None,
acks: Acks = defaultAcks,
batchSize: Int = default[Int](ApacheProducerConfig.BATCH_SIZE_CONFIG),
closeTimeout: FiniteDuration = FiniteDuration(60, SECONDS),
deliveryTimeout: FiniteDuration = defaultMillis(ApacheProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
requestTimeout: FiniteDuration = defaultMillis(ApacheProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
linger: FiniteDuration = defaultMillisLong(ApacheProducerConfig.LINGER_MS_CONFIG),
enableIdempotence: Boolean = default[Boolean](ApacheProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
maxInFlightRequestsPerConnection: Int = default[Int](ApacheProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
parallelism: Int = 100,
retries: Int = 0,
properties: Map[String, String] = Map.empty
)

object ProducerConfig {

private val officialDefaults = ApacheProducerConfig.configDef().defaultValues().asScala

private def default[A](key: String): A = officialDefaults(key).asInstanceOf[A]

private def defaultMillis(key: String): FiniteDuration = FiniteDuration(default[Int](key).toLong, MILLISECONDS)
private def defaultMillisLong(key: String): FiniteDuration = FiniteDuration(default[Long](key), MILLISECONDS)

private val defaultAcks = default[String](ApacheProducerConfig.ACKS_CONFIG) match {
case "all" => Acks.All
case "0" => Acks.Zero
case "1" => Acks.One
}

}
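Overriding selected producer defaults is just a matter of named arguments, and anything not modeled explicitly can go through `properties` verbatim; the values below are illustrative:

import com.avast.sst.fs2kafka.ProducerConfig
import fs2.kafka.Acks

val producerConfig = ProducerConfig(
  bootstrapServers = List("localhost:9092"),
  acks = Acks.All,
  retries = 3,
  properties = Map("compression.type" -> "lz4") // passed straight to the underlying Java client
)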
@@ -0,0 +1,32 @@
package com.avast.sst.fs2kafka

import cats.effect.{ContextShift, IO, Resource, Timer}
import cats.syntax.flatMap._
import com.dimafeng.testcontainers.{ForAllTestContainer, KafkaContainer}
import fs2.kafka.{AutoOffsetReset, ProducerRecord, ProducerRecords}
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.ExecutionContext.Implicits.global

class Fs2KafkaModuleTest extends AsyncFunSuite with ForAllTestContainer {

override val container = KafkaContainer()

implicit private val cs: ContextShift[IO] = IO.contextShift(global)
implicit private val timer: Timer[IO] = IO.timer(global)

test("producer") {
val io = for {
producer <- Fs2KafkaModule.makeProducer[IO, String, String](ProducerConfig(List(container.bootstrapServers)))
consumer <- Fs2KafkaModule.makeConsumer[IO, String, String](
ConsumerConfig(List(container.bootstrapServers), groupId = "test", autoOffsetReset = AutoOffsetReset.Earliest)
)
_ <- Resource.liftF(consumer.subscribeTo("test"))
_ <- Resource.liftF(producer.produce(ProducerRecords.one(ProducerRecord("test", "key", "value"))).flatten)
event <- Resource.liftF(consumer.stream.head.compile.toList)
} yield assert(event.head.record.key === "key" && event.head.record.value === "value")

io.use(IO.pure).unsafeToFuture()
}

}
@@ -0,0 +1,17 @@
package com.avast.sst.fs2kafka

import org.scalatest.funsuite.AnyFunSuite

class KafkaConfigTest extends AnyFunSuite {

test("verify ConsumerConfig defaults") {
ConsumerConfig(List.empty, "group.id")
succeed
}

test("verify ProducerConfig defaults") {
ProducerConfig(List.empty)
succeed
}

}
@@ -7,6 +7,7 @@ import io.grpc.{Server, ServerBuilder, ServerInterceptor, ServerServiceDefinition}

import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext

object GrpcServerModule {

/** Makes [[io.grpc.Server]] (Netty) initialized with the given config, services and interceptors.
3 changes: 3 additions & 0 deletions project/Dependencies.scala
@@ -7,6 +7,7 @@ object Dependencies {
val doobie = "org.tpolecat" %% "doobie-core" % Versions.doobie
val doobieHikari = "org.tpolecat" %% "doobie-hikari" % Versions.doobie
val flywayCore = "org.flywaydb" % "flyway-core" % "6.4.2"
val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % "1.0.0"
val grpcNettyShaded = "io.grpc" % "grpc-netty-shaded" % Versions.grpc
val grpcProtobuf = "io.grpc" % "grpc-protobuf" % Versions.grpc
val grpcStub = "io.grpc" % "grpc-stub" % Versions.grpc
@@ -34,6 +35,8 @@ object Dependencies {
val silencerLib = "com.github.ghik" % "silencer-lib" % Versions.silencer cross CrossVersion.full
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.30"
val sslConfig = "com.typesafe" %% "ssl-config-core" % "0.4.2"
val testContainersScalaScalaTest = "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.37.0"
val testContainersScalaKafka = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.37.0"
val zio = "dev.zio" %% "zio" % "1.0.0-RC19-2"
val zioInteropCats = "dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC14"
