Skip to content

Commit 6255984

Browse files
authored
feat: Allow to set custom underlying consumer/producer in fs2-kafka (#247)
1 parent 8c218d9 commit 6255984

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

fs2-kafka/src/main/scala/com/avast/sst/fs2kafka/Fs2KafkaModule.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ object Fs2KafkaModule {
77

88
def makeConsumer[F[_]: ConcurrentEffect: ContextShift: Timer, K: Deserializer[F, *], V: Deserializer[F, *]](
99
config: ConsumerConfig,
10-
blocker: Option[Blocker] = None
10+
blocker: Option[Blocker] = None,
11+
createConsumer: Option[Map[String, String] => F[KafkaByteConsumer]] = None
1112
): Resource[F, KafkaConsumer[F, K, V]] = {
1213
def setOpt[A](maybeValue: Option[A])(
1314
setter: (ConsumerSettings[F, K, V], A) => ConsumerSettings[F, K, V]
@@ -42,6 +43,7 @@ object Fs2KafkaModule {
4243
.withSessionTimeout(config.sessionTimeout)
4344
.pipe(setOpt(blocker)(_.withBlocker(_)))
4445
.withProperties(config.properties)
46+
.pipe(setOpt(createConsumer)(_.withCreateConsumer(_)))
4547

4648
makeConsumer(settings)
4749
}
@@ -52,7 +54,8 @@ object Fs2KafkaModule {
5254

5355
def makeProducer[F[_]: ConcurrentEffect: ContextShift, K: Serializer[F, *], V: Serializer[F, *]](
5456
config: ProducerConfig,
55-
blocker: Option[Blocker] = None
57+
blocker: Option[Blocker] = None,
58+
createProducer: Option[Map[String, String] => F[KafkaByteProducer]] = None
5659
): Resource[F, KafkaProducer[F, K, V]] = {
5760
def setOpt[A](maybeValue: Option[A])(
5861
setter: (ProducerSettings[F, K, V], A) => ProducerSettings[F, K, V]
@@ -77,6 +80,7 @@ object Fs2KafkaModule {
7780
.withRetries(config.retries)
7881
.pipe(setOpt(blocker)(_.withBlocker(_)))
7982
.withProperties(config.properties)
83+
.pipe(setOpt(createProducer)(_.withCreateProducer(_)))
8084

8185
makeProducer(settings)
8286
}

0 commit comments

Comments
 (0)