Skip to content

Commit 6db07e0

Browse files
authored
Merge pull request #112 from phile314/move_autocommit
Move auto-commit from subscription to consumer
2 parents d84bf89 + a17aa35 commit 6db07e0

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

src/Kafka/Consumer/ConsumerProperties.hs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ module Kafka.Consumer.ConsumerProperties
44
( ConsumerProperties(..)
55
, CallbackMode(..)
66
, brokersList
7+
, autoCommit
78
, noAutoCommit
89
, noAutoOffsetStore
910
, groupId
@@ -29,7 +30,7 @@ import Data.Text (Text)
2930
import qualified Data.Text as Text
3031
import Kafka.Consumer.Types (ConsumerGroupId (..))
3132
import Kafka.Internal.Setup (KafkaConf (..))
32-
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText)
33+
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText, Millis(..))
3334

3435
import Kafka.Consumer.Callbacks as X
3536

@@ -65,6 +66,13 @@ brokersList bs =
6566
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
6667
in extraProps $ M.fromList [("bootstrap.servers", bs')]
6768

69+
autoCommit :: Millis -> ConsumerProperties
70+
autoCommit (Millis ms) = extraProps $
71+
M.fromList
72+
[ ("enable.auto.commit", "true")
73+
, ("auto.commit.interval.ms", Text.pack $ show ms)
74+
]
75+
6876
-- | Disables auto commit for the consumer
6977
noAutoCommit :: ConsumerProperties
7078
noAutoCommit =

src/Kafka/Consumer/Subscription.hs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ module Kafka.Consumer.Subscription
44
( Subscription(..)
55
, topics
66
, offsetReset
7-
, autoCommit
87
, extraSubscriptionProps
98
)
109
where
@@ -44,12 +43,5 @@ offsetReset o =
4443
Latest -> "latest"
4544
in Subscription (Set.empty) (M.fromList [("auto.offset.reset", o')])
4645

47-
autoCommit :: Millis -> Subscription
48-
autoCommit (Millis ms) = Subscription (Set.empty) $
49-
M.fromList
50-
[ ("enable.auto.commit", "true")
51-
, ("auto.commit.interval.ms", Text.pack $ show ms)
52-
]
53-
5446
extraSubscriptionProps :: Map Text Text -> Subscription
5547
extraSubscriptionProps = Subscription (Set.empty)

0 commit comments

Comments
 (0)