Skip to content

Commit 4ccd7bc

Browse files
authored
Merge pull request #140 from Sir4ur0n/doc/KafkaConsumer
doc(kafka): Add documentation
2 parents 9011a63 + 297d662 commit 4ccd7bc

File tree

5 files changed

+129
-21
lines changed

5 files changed

+129
-21
lines changed

src/Kafka/Consumer.hs

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,58 @@
11
{-# LANGUAGE LambdaCase #-}
22
{-# LANGUAGE OverloadedStrings #-}
33
{-# LANGUAGE TupleSections #-}
4+
5+
-----------------------------------------------------------------------------
6+
-- |
7+
-- Module to consume messages from Kafka topics.
8+
--
9+
-- Here's an example of code to consume messages from a topic:
10+
--
11+
-- @
12+
-- import Control.Exception (bracket)
13+
-- import Data.Monoid ((<>))
14+
-- import Kafka
15+
-- import Kafka.Consumer
16+
--
17+
-- -- Global consumer properties
18+
-- consumerProps :: 'ConsumerProperties'
19+
-- consumerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
20+
-- <> 'groupId' ('ConsumerGroupId' "consumer_example_group")
21+
-- <> 'noAutoCommit'
22+
-- <> 'logLevel' 'KafkaLogInfo'
23+
--
24+
-- -- Subscription to topics
25+
-- consumerSub :: 'Subscription'
26+
-- consumerSub = 'topics' ['TopicName' "kafka-client-example-topic"]
27+
-- <> 'offsetReset' 'Earliest'
28+
--
29+
-- -- Running an example
30+
-- runConsumerExample :: IO ()
31+
-- runConsumerExample = do
32+
-- res <- bracket mkConsumer clConsumer runHandler
33+
-- print res
34+
-- where
35+
-- mkConsumer = 'newConsumer' consumerProps consumerSub
36+
-- clConsumer (Left err) = return (Left err)
37+
-- clConsumer (Right kc) = (maybe (Right ()) Left) \<$\> 'closeConsumer' kc
38+
-- runHandler (Left err) = return (Left err)
39+
-- runHandler (Right kc) = processMessages kc
40+
--
41+
-- -- Example polling 10 times before stopping
42+
-- processMessages :: 'KafkaConsumer' -> IO (Either 'KafkaError' ())
43+
-- processMessages kafka = do
44+
-- mapM_ (\_ -> do
45+
-- msg1 <- 'pollMessage' kafka ('Timeout' 1000)
46+
-- putStrLn $ "Message: " <> show msg1
47+
-- err <- 'commitAllOffsets' 'OffsetCommit' kafka
48+
-- putStrLn $ "Offsets: " <> maybe "Committed." show err
49+
-- ) [1 .. 10]
50+
-- return $ Right ()
51+
-- @
52+
-----------------------------------------------------------------------------
453
module Kafka.Consumer
5-
( module X
54+
( KafkaConsumer
55+
, module X
656
, runConsumer
757
, newConsumer
858
, assign, assignment, subscription
@@ -14,7 +64,6 @@ module Kafka.Consumer
1464
, storeOffsets, storeOffsetMessage
1565
, closeConsumer
1666
-- ReExport Types
17-
, KafkaConsumer
1867
, RdKafkaRespErrT (..)
1968
)
2069
where
@@ -48,7 +97,7 @@ import Kafka.Types as X
4897

4998
-- | Runs high-level kafka consumer.
5099
-- A callback provided is expected to call 'pollMessage' when convenient.
51-
{-# DEPRECATED runConsumer "Use newConsumer/closeConsumer instead" #-}
100+
{-# DEPRECATED runConsumer "Use 'newConsumer'/'closeConsumer' instead" #-}
52101
runConsumer :: ConsumerProperties
53102
-> Subscription
54103
-> (KafkaConsumer -> IO (Either KafkaError a)) -- ^ A callback function to poll and handle messages
@@ -64,6 +113,7 @@ runConsumer cp sub f =
64113
runHandler (Left err) = return (Left err)
65114
runHandler (Right kc) = f kc
66115

116+
-- | Create a 'KafkaConsumer'. This consumer must be correctly released using 'closeConsumer'.
67117
newConsumer :: MonadIO m
68118
=> ConsumerProperties
69119
-> Subscription
@@ -94,6 +144,7 @@ newConsumer props (Subscription ts tp) = liftIO $ do
94144
runConsumerLoop kafka (Just $ Timeout 100)) >> return (Right kafka)
95145
Just err -> closeConsumer kafka >> return (Left err)
96146

147+
-- | Polls a single message
97148
pollMessage :: MonadIO m
98149
=> KafkaConsumer
99150
-> Timeout -- ^ the timeout, in milliseconds
@@ -104,7 +155,7 @@ pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
104155
Nothing -> rdKafkaConsumerPoll (getRdKafka c) ms >>= fromMessagePtr
105156
Just q -> rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr
106157

107-
-- | Polls up to BatchSize messages.
158+
-- | Polls up to 'BatchSize' messages.
108159
-- Unlike 'pollMessage' this function does not return usual "timeout" errors.
109160
-- An empty batch is returned when there are no messages available.
110161
--
@@ -205,6 +256,7 @@ resumePartitions (KafkaConsumer (Kafka k) _) ps = liftIO $ do
205256
mapM_ (\(TopicName topicName, PartitionId partitionId) -> rdKafkaTopicPartitionListAdd pl (Text.unpack topicName) partitionId) ps
206257
KafkaResponseError <$> rdKafkaResumePartitions k pl
207258

259+
-- | Seek a particular offset for each provided 'TopicPartition'
208260
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
209261
seek (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $
210262
either Just (const Nothing) <$> seekAll
@@ -244,7 +296,7 @@ position (KafkaConsumer (Kafka k) _) tps = liftIO $ do
244296
--
245297
-- Events will cause application provided callbacks to be called.
246298
--
247-
-- The \p timeout_ms argument specifies the maximum amount of time
299+
-- The 'Timeout' argument specifies the maximum amount of time
248300
-- (in milliseconds) that the call will block waiting for events.
249301
--
250302
-- This function is called on each 'pollMessage' and, if runtime allows
@@ -260,6 +312,8 @@ pollConsumerEvents k timeout =
260312
void . withCallbackPollEnabled k $ pollConsumerEvents' k timeout
261313

262314
-- | Closes the consumer.
315+
--
316+
-- See 'newConsumer'
263317
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
264318
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr statusVar)) = liftIO $
265319
-- because closing the consumer will raise callbacks,
@@ -280,7 +334,7 @@ newConsumerConf ConsumerProperties {cpProps = m, cpCallbacks = cbs} = do
280334

281335
-- | Subscribes to a given list of topics.
282336
--
283-
-- Wildcard (regex) topics are supported by the librdkafka assignor:
337+
-- Wildcard (regex) topics are supported by the /librdkafka/ assignor:
284338
-- any topic name in the topics list that is prefixed with @^@ will
285339
-- be regex-matched to the full list of topics in the cluster and matching
286340
-- topics will be added to the subscription list.

src/Kafka/Consumer/Subscription.hs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@ import Data.Text (Text)
1717
import Kafka.Consumer.Types (OffsetReset (..))
1818
import Kafka.Types (TopicName (..))
1919

20+
-- | A consumer subscription to a topic.
21+
--
22+
-- ==== __Examples__
23+
--
24+
-- Typically you don't call the constructor directly, but combine settings:
25+
--
26+
-- @
27+
-- consumerSub :: 'Subscription'
28+
-- consumerSub = 'topics' ['TopicName' "kafka-client-example-topic"]
29+
-- <> 'offsetReset' 'Earliest'
30+
-- <> 'extraSubscriptionProps' (fromList [("prop1", "value 1"), ("prop2", "value 2")])
31+
-- @
2032
data Subscription = Subscription (Set TopicName) (Map Text Text)
2133

2234
instance Sem.Semigroup Subscription where
@@ -32,15 +44,18 @@ instance Monoid Subscription where
3244
mappend = (Sem.<>)
3345
{-# INLINE mappend #-}
3446

47+
-- | Build a subscription by giving the list of topic names only
3548
topics :: [TopicName] -> Subscription
3649
topics ts = Subscription (Set.fromList ts) M.empty
3750

51+
-- | Build a subscription by giving the offset reset parameter only
3852
offsetReset :: OffsetReset -> Subscription
3953
offsetReset o =
4054
let o' = case o of
4155
Earliest -> "earliest"
4256
Latest -> "latest"
4357
in Subscription Set.empty (M.fromList [("auto.offset.reset", o')])
4458

59+
-- | Build a subscription by giving extra properties only
4560
extraSubscriptionProps :: Map Text Text -> Subscription
4661
extraSubscriptionProps = Subscription Set.empty

src/Kafka/Consumer/Types.hs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@ where
3030

3131
import Data.Bifoldable (Bifoldable (..))
3232
import Data.Bifunctor (Bifunctor (..))
33-
import Data.Bitraversable (Bitraversable (..), bimapM, bisequenceA)
33+
import Data.Bitraversable (Bitraversable (..), bimapM, bisequence)
3434
import Data.Int (Int64)
3535
import Data.Text (Text)
3636
import Data.Typeable (Typeable)
3737
import GHC.Generics (Generic)
3838
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
3939
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..))
4040

41+
-- | The main type for Kafka consumption, used e.g. to poll and commit messages.
42+
--
43+
-- Its constructor is intentionally not exposed; instead, one should use 'newConsumer' to acquire such a value.
4144
data KafkaConsumer = KafkaConsumer
4245
{ kcKafkaPtr :: !Kafka
4346
, kcKafkaConf :: !KafkaConf
@@ -51,8 +54,17 @@ instance HasKafkaConf KafkaConsumer where
5154
getKafkaConf = kcKafkaConf
5255
{-# INLINE getKafkaConf #-}
5356

57+
-- | Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.
58+
--
59+
-- See <https://kafka.apache.org/documentation/#group.id Kafka documentation on consumer group>
5460
newtype ConsumerGroupId = ConsumerGroupId { unConsumerGroupId :: Text } deriving (Show, Ord, Eq, Generic)
61+
62+
-- | A message offset in a partition
5563
newtype Offset = Offset { unOffset :: Int64 } deriving (Show, Eq, Ord, Read, Generic)
64+
65+
-- | Where to reset the offset when there is no initial offset in Kafka
66+
--
67+
-- See <https://kafka.apache.org/documentation/#auto.offset.reset Kafka documentation on offset reset>
5668
data OffsetReset = Earliest | Latest deriving (Show, Eq, Generic)
5769

5870
-- | A set of events which happen during the rebalancing process
@@ -67,6 +79,7 @@ data RebalanceEvent =
6779
| RebalanceRevoke [(TopicName, PartitionId)]
6880
deriving (Eq, Show, Generic)
6981

82+
-- | The partition offset
7083
data PartitionOffset =
7184
PartitionOffsetBeginning
7285
| PartitionOffsetEnd
@@ -75,11 +88,13 @@ data PartitionOffset =
7588
| PartitionOffsetInvalid
7689
deriving (Eq, Show, Generic)
7790

91+
-- | Partitions subscribed by a consumer
7892
data SubscribedPartitions
79-
= SubscribedPartitions [PartitionId]
80-
| SubscribedPartitionsAll
93+
= SubscribedPartitions [PartitionId] -- ^ Subscribe only to those partitions
94+
| SubscribedPartitionsAll -- ^ Subscribe to all partitions
8195
deriving (Show, Eq, Generic)
8296

97+
-- | Consumer record timestamp
8398
data Timestamp =
8499
CreateTime !Millis
85100
| LogAppendTime !Millis
@@ -119,8 +134,8 @@ data ConsumerRecord k v = ConsumerRecord
119134
, crPartition :: !PartitionId -- ^ Kafka partition this message was received from
120135
, crOffset :: !Offset -- ^ Offset within the 'crPartition' Kafka partition
121136
, crTimestamp :: !Timestamp -- ^ Message timestamp
122-
, crKey :: !k
123-
, crValue :: !v
137+
, crKey :: !k -- ^ Message key
138+
, crValue :: !v -- ^ Message value
124139
}
125140
deriving (Eq, Show, Read, Typeable, Generic)
126141

@@ -148,53 +163,56 @@ instance Bitraversable ConsumerRecord where
148163
bitraverse f g r = (\k v -> bimap (const k) (const v) r) <$> f (crKey r) <*> g (crValue r)
149164
{-# INLINE bitraverse #-}
150165

166+
{-# DEPRECATED crMapKey "Isn't concern of this library. Use 'first'" #-}
151167
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
152168
crMapKey = first
153169
{-# INLINE crMapKey #-}
154170

171+
{-# DEPRECATED crMapValue "Isn't concern of this library. Use 'second'" #-}
155172
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
156173
crMapValue = second
157174
{-# INLINE crMapValue #-}
158175

176+
{-# DEPRECATED crMapKV "Isn't concern of this library. Use 'bimap'" #-}
159177
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
160178
crMapKV = bimap
161179
{-# INLINE crMapKV #-}
162180

163-
{-# DEPRECATED sequenceFirst "Isn't concern of this library. Use 'bitraverse id pure'" #-}
181+
{-# DEPRECATED sequenceFirst "Isn't concern of this library. Use @'bitraverse' 'id' 'pure'@" #-}
164182
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
165183
sequenceFirst = bitraverse id pure
166184
{-# INLINE sequenceFirst #-}
167185

168-
{-# DEPRECATED traverseFirst "Isn't concern of this library. Use 'bitraverse f pure'" #-}
186+
{-# DEPRECATED traverseFirst "Isn't concern of this library. Use @'bitraverse' f 'pure'@" #-}
169187
traverseFirst :: (Bitraversable t, Applicative f)
170188
=> (k -> f k')
171189
-> t k v
172190
-> f (t k' v)
173191
traverseFirst f = bitraverse f pure
174192
{-# INLINE traverseFirst #-}
175193

176-
{-# DEPRECATED traverseFirstM "Isn't concern of this library. Use 'bitraverse id pure <$> bitraverse f pure r'" #-}
194+
{-# DEPRECATED traverseFirstM "Isn't concern of this library. Use @'bitraverse' 'id' 'pure' '<$>' 'bitraverse' f 'pure' r@" #-}
177195
traverseFirstM :: (Bitraversable t, Applicative f, Monad m)
178196
=> (k -> m (f k'))
179197
-> t k v
180198
-> m (f (t k' v))
181199
traverseFirstM f r = bitraverse id pure <$> bitraverse f pure r
182200
{-# INLINE traverseFirstM #-}
183201

184-
{-# DEPRECATED traverseM "Isn't concern of this library. Use 'sequenceA <$> traverse f r'" #-}
202+
{-# DEPRECATED traverseM "Isn't concern of this library. Use @'sequenceA' '<$>' 'traverse' f r@" #-}
185203
traverseM :: (Traversable t, Applicative f, Monad m)
186204
=> (v -> m (f v'))
187205
-> t v
188206
-> m (f (t v'))
189207
traverseM f r = sequenceA <$> traverse f r
190208
{-# INLINE traverseM #-}
191209

192-
{-# DEPRECATED bitraverseM "Isn't concern of this library. Use 'bisequenceA <$> bimapM f g r'" #-}
210+
{-# DEPRECATED bitraverseM "Isn't concern of this library. Use @'bisequence' '<$>' 'bimapM' f g r@" #-}
193211
bitraverseM :: (Bitraversable t, Applicative f, Monad m)
194212
=> (k -> m (f k'))
195213
-> (v -> m (f v'))
196214
-> t k v
197215
-> m (f (t k' v'))
198-
bitraverseM f g r = bisequenceA <$> bimapM f g r
216+
bitraverseM f g r = bisequence <$> bimapM f g r
199217
{-# INLINE bitraverseM #-}
200218

src/Kafka/Producer.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import Kafka.Types as X
4141
-- | Runs Kafka Producer.
4242
-- The callback provided is expected to call 'produceMessage'
4343
-- or/and 'produceMessageBatch' to send messages to Kafka.
44-
{-# DEPRECATED runProducer "Use newProducer/closeProducer instead" #-}
44+
{-# DEPRECATED runProducer "Use 'newProducer'/'closeProducer' instead" #-}
4545
runProducer :: ProducerProperties
4646
-> (KafkaProducer -> IO (Either KafkaError a))
4747
-> IO (Either KafkaError a)

0 commit comments

Comments
 (0)