Skip to content

Commit a17aa35

Browse files
author
Philipp Hausmann
committed
Move auto-commit from subscription to consumer
Fixes a librdkafka deprecation warning: Configuration property auto.commit.enable is deprecated: [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). **NOTE:** There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method.
1 parent d84bf89 commit a17aa35

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

src/Kafka/Consumer/ConsumerProperties.hs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ module Kafka.Consumer.ConsumerProperties
44
( ConsumerProperties(..)
55
, CallbackMode(..)
66
, brokersList
7+
, autoCommit
78
, noAutoCommit
89
, noAutoOffsetStore
910
, groupId
@@ -29,7 +30,7 @@ import Data.Text (Text)
2930
import qualified Data.Text as Text
3031
import Kafka.Consumer.Types (ConsumerGroupId (..))
3132
import Kafka.Internal.Setup (KafkaConf (..))
32-
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText)
33+
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText, Millis(..))
3334

3435
import Kafka.Consumer.Callbacks as X
3536

@@ -65,6 +66,13 @@ brokersList bs =
6566
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
6667
in extraProps $ M.fromList [("bootstrap.servers", bs')]
6768

69+
autoCommit :: Millis -> ConsumerProperties
70+
autoCommit (Millis ms) = extraProps $
71+
M.fromList
72+
[ ("enable.auto.commit", "true")
73+
, ("auto.commit.interval.ms", Text.pack $ show ms)
74+
]
75+
6876
-- | Disables auto commit for the consumer
6977
noAutoCommit :: ConsumerProperties
7078
noAutoCommit =

src/Kafka/Consumer/Subscription.hs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ module Kafka.Consumer.Subscription
44
( Subscription(..)
55
, topics
66
, offsetReset
7-
, autoCommit
87
, extraSubscriptionProps
98
)
109
where
@@ -44,12 +43,5 @@ offsetReset o =
4443
Latest -> "latest"
4544
in Subscription (Set.empty) (M.fromList [("auto.offset.reset", o')])
4645

47-
autoCommit :: Millis -> Subscription
48-
autoCommit (Millis ms) = Subscription (Set.empty) $
49-
M.fromList
50-
[ ("enable.auto.commit", "true")
51-
, ("auto.commit.interval.ms", Text.pack $ show ms)
52-
]
53-
5446
extraSubscriptionProps :: Map Text Text -> Subscription
5547
extraSubscriptionProps = Subscription (Set.empty)

0 commit comments

Comments
 (0)