Skip to content

Commit 73d3873

Browse files
author
sir4ur0n
committed
doc: Add documentation on Producer and remaining modules
1 parent b270311 commit 73d3873

File tree

13 files changed

+184
-68
lines changed

13 files changed

+184
-68
lines changed

README.md

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ HaskellWorks Kafka ecosystem is described here: <https://github.com/haskell-work
1313

1414
High level consumers are supported by `librdkafka` starting from version 0.9.
1515
High-level consumers provide an abstraction for consuming messages from multiple
16-
partitions and topics. They are also address scalability (up to a number of partitions)
16+
partitions and topics. They also address scalability (up to a number of partitions)
1717
by providing automatic rebalancing functionality. When a new consumer joins a consumer
18-
group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
18+
group the set of consumers attempts to "rebalance" the load to assign partitions to each consumer.
1919

2020
### Consumer example
2121

@@ -36,8 +36,6 @@ To run an example please compile with the `examples` flag.
3636

3737
```haskell
3838
import Control.Exception (bracket)
39-
import Data.Monoid ((<>))
40-
import Kafka
4139
import Kafka.Consumer
4240

4341
-- Global consumer properties
@@ -67,12 +65,11 @@ runConsumerExample = do
6765
-------------------------------------------------------------------
6866
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
6967
processMessages kafka = do
70-
mapM_ (\_ -> do
71-
msg1 <- pollMessage kafka (Timeout 1000)
72-
putStrLn $ "Message: " <> show msg1
73-
err <- commitAllOffsets OffsetCommit kafka
74-
putStrLn $ "Offsets: " <> maybe "Committed." show err
75-
) [0 .. 10]
68+
replicateM_ 10 $ do
69+
msg <- pollMessage kafka (Timeout 1000)
70+
putStrLn $ "Message: " <> show msg
71+
err <- commitAllOffsets OffsetCommit kafka
72+
putStrLn $ "Offsets: " <> maybe "Committed." show err
7673
return $ Right ()
7774
```
7875

src/Kafka/Callbacks.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import Kafka.Types (KafkaError(..), KafkaLogLevel(..))
1515
--
1616
-- Basic usage:
1717
--
18-
-- > setCallback (errorCallback myErrorCallback)
18+
-- > 'setCallback' ('errorCallback' myErrorCallback)
1919
-- >
20-
-- > myErrorCallback :: KafkaError -> String -> IO ()
20+
-- > myErrorCallback :: 'KafkaError' -> String -> IO ()
2121
-- > myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
2222
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
2323
errorCallback callback k =
@@ -30,9 +30,9 @@ errorCallback callback k =
3030
--
3131
-- Basic usage:
3232
--
33-
-- > setCallback (logCallback myLogCallback)
33+
-- > 'setCallback' ('logCallback' myLogCallback)
3434
-- >
35-
-- > myLogCallback :: KafkaLogLevel -> String -> String -> IO ()
35+
-- > myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO ()
3636
-- > myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
3737
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
3838
logCallback callback k =
@@ -45,7 +45,7 @@ logCallback callback k =
4545
--
4646
-- Basic usage:
4747
--
48-
-- > setCallback (statsCallback myStatsCallback)
48+
-- > 'setCallback' ('statsCallback' myStatsCallback)
4949
-- >
5050
-- > myStatsCallback :: String -> IO ()
5151
-- > myStatsCallback stats = print $ show stats

src/Kafka/Consumer.hs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
--
1111
-- @
1212
-- import Control.Exception (bracket)
13-
-- import Data.Monoid ((<>))
14-
-- import Kafka
13+
-- import Control.Monad (replicateM_)
1514
-- import Kafka.Consumer
1615
--
1716
-- -- Global consumer properties
@@ -33,21 +32,20 @@
3332
-- print res
3433
-- where
3534
-- mkConsumer = 'newConsumer' consumerProps consumerSub
36-
-- clConsumer (Left err) = return (Left err)
35+
-- clConsumer (Left err) = pure (Left err)
3736
-- clConsumer (Right kc) = (maybe (Right ()) Left) \<$\> 'closeConsumer' kc
38-
-- runHandler (Left err) = return (Left err)
37+
-- runHandler (Left err) = pure (Left err)
3938
-- runHandler (Right kc) = processMessages kc
4039
--
4140
-- -- Example polling 10 times before stopping
4241
-- processMessages :: 'KafkaConsumer' -> IO (Either 'KafkaError' ())
4342
-- processMessages kafka = do
44-
-- mapM_ (\_ -> do
45-
-- msg1 <- 'pollMessage' kafka ('Timeout' 1000)
46-
-- putStrLn $ "Message: " <> show msg1
47-
-- err <- 'commitAllOffsets' 'OffsetCommit' kafka
48-
-- putStrLn $ "Offsets: " <> maybe "Committed." show err
49-
-- ) [0 .. 10]
50-
-- return $ Right ()
43+
-- replicateM_ 10 $ do
44+
-- msg <- 'pollMessage' kafka ('Timeout' 1000)
45+
-- putStrLn $ "Message: " <> show msg
46+
-- err <- 'commitAllOffsets' 'OffsetCommit' kafka
47+
-- putStrLn $ "Offsets: " <> maybe "Committed." show err
48+
-- pure $ Right ()
5149
-- @
5250
-----------------------------------------------------------------------------
5351
module Kafka.Consumer
@@ -159,7 +157,7 @@ pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
159157
-- Unlike 'pollMessage' this function does not return usual "timeout" errors.
160158
-- An empty batch is returned when there are no messages available.
161159
--
162-
-- This API is not available when 'userPolls' is set.
160+
-- This API is not available when 'CallbackPollMode' is set to 'CallbackPollModeSync'.
163161
pollMessageBatch :: MonadIO m
164162
=> KafkaConsumer
165163
-> Timeout
@@ -169,7 +167,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize
169167
pollConsumerEvents c Nothing
170168
mbq <- readIORef qr
171169
case mbq of
172-
Nothing -> return [Left $ KafkaBadSpecification "userPolls is set when calling pollMessageBatch."]
170+
Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."]
173171
Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
174172

175173
-- | Commit message's offset on broker for the message's partition.

src/Kafka/Consumer/Callbacks.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb c
3131
-- | Sets a callback that is called when rebalance is needed.
3232
--
3333
-- The results of automatic or manual offset commits will be scheduled
34-
-- for this callback and is served by `pollMessage`.
34+
-- for this callback and is served by 'Kafka.Consumer.pollMessage'.
3535
--
3636
-- If no partitions had valid offsets to commit this callback will be called
37-
-- with `KafkaError` == `KafkaResponseError` `RdKafkaRespErrNoOffset` which is not to be considered
37+
-- with 'KafkaResponseError' 'RdKafkaRespErrNoOffset' which is not to be considered
3838
-- an error.
3939
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
4040
offsetCommitCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetOffsetCommitCb conf realCb

src/Kafka/Consumer/ConsumerProperties.hs

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
{-# LANGUAGE OverloadedStrings #-}
22

3+
-----------------------------------------------------------------------------
4+
-- |
5+
-- Module with consumer properties types and functions.
6+
-----------------------------------------------------------------------------
37
module Kafka.Consumer.ConsumerProperties
48
( ConsumerProperties(..)
59
, CallbackPollMode(..)
@@ -34,9 +38,17 @@ import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaC
3438

3539
import Kafka.Consumer.Callbacks as X
3640

37-
data CallbackPollMode = CallbackPollModeSync | CallbackPollModeAsync deriving (Show, Eq)
38-
39-
-- | Properties to create 'KafkaConsumer'.
41+
-- | Whether the callback polling should be done synchronously or not.
42+
data CallbackPollMode =
43+
-- | You have to poll the consumer frequently to handle new messages
44+
-- as well as rebalance and keep alive events.
45+
-- This enables lowering the footprint and having full control over when polling
46+
-- happens, at the cost of manually managing those events.
47+
CallbackPollModeSync
48+
-- | Handle polling rebalance and keep alive events for you in a background thread.
49+
| CallbackPollModeAsync deriving (Show, Eq)
50+
51+
-- | Properties to create 'Kafka.Consumer.Types.KafkaConsumer'.
4052
data ConsumerProperties = ConsumerProperties
4153
{ cpProps :: Map Text Text
4254
, cpLogLevel :: Maybe KafkaLogLevel
@@ -61,93 +73,96 @@ instance Monoid ConsumerProperties where
6173
mappend = (Sem.<>)
6274
{-# INLINE mappend #-}
6375

76+
-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
6477
brokersList :: [BrokerAddress] -> ConsumerProperties
6578
brokersList bs =
6679
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
6780
in extraProps $ M.fromList [("bootstrap.servers", bs')]
6881

82+
-- | Set the <https://kafka.apache.org/documentation/#auto.commit.interval.ms auto commit interval> and enables <https://kafka.apache.org/documentation/#enable.auto.commit auto commit>.
6983
autoCommit :: Millis -> ConsumerProperties
7084
autoCommit (Millis ms) = extraProps $
7185
M.fromList
7286
[ ("enable.auto.commit", "true")
7387
, ("auto.commit.interval.ms", Text.pack $ show ms)
7488
]
7589

76-
-- | Disables auto commit for the consumer
90+
-- | Disable <https://kafka.apache.org/documentation/#enable.auto.commit auto commit> for the consumer.
7791
noAutoCommit :: ConsumerProperties
7892
noAutoCommit =
7993
extraProps $ M.fromList [("enable.auto.commit", "false")]
8094

81-
-- | Disables auto offset store for the consumer
95+
-- | Disable auto offset store for the consumer.
96+
--
97+
-- See <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md enable.auto.offset.store> for more information.
8298
noAutoOffsetStore :: ConsumerProperties
8399
noAutoOffsetStore =
84100
extraProps $ M.fromList [("enable.auto.offset.store", "false")]
85101

86-
-- | Consumer group id
102+
-- | Set the consumer <https://kafka.apache.org/documentation/#group.id group id>.
87103
groupId :: ConsumerGroupId -> ConsumerProperties
88104
groupId (ConsumerGroupId cid) =
89105
extraProps $ M.fromList [("group.id", cid)]
90106

107+
-- | Set the <https://kafka.apache.org/documentation/#client.id consumer identifier>.
91108
clientId :: ClientId -> ConsumerProperties
92109
clientId (ClientId cid) =
93110
extraProps $ M.fromList [("client.id", cid)]
94111

112+
-- | Set the consumer callback.
113+
--
114+
-- For examples of use, see:
115+
--
116+
-- * 'errorCallback'
117+
-- * 'logCallback'
118+
-- * 'statsCallback'
95119
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
96120
setCallback cb = mempty { cpCallbacks = [cb] }
97121

98-
-- | Sets the logging level.
122+
-- | Set the logging level.
99123
-- Usually is used with 'debugOptions' to configure which logs are needed.
100124
logLevel :: KafkaLogLevel -> ConsumerProperties
101125
logLevel ll = mempty { cpLogLevel = Just ll }
102126

103-
-- | Sets the compression codec for the consumer.
127+
-- | Set the <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md compression.codec> for the consumer.
104128
compression :: KafkaCompressionCodec -> ConsumerProperties
105129
compression c =
106130
extraProps $ M.singleton "compression.codec" (kafkaCompressionCodecToText c)
107131

108-
-- | Suppresses consumer disconnects logs.
132+
-- | Suppresses consumer <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md log.connection.close>.
109133
--
110134
-- It might be useful to turn this off when interacting with brokers
111-
-- with an aggressive connection.max.idle.ms value.
135+
-- with an aggressive @connection.max.idle.ms@ value.
112136
suppressDisconnectLogs :: ConsumerProperties
113137
suppressDisconnectLogs =
114138
extraProps $ M.fromList [("log.connection.close", "false")]
115139

116-
-- | Any configuration options that are supported by /librdkafka/.
140+
-- | Set any configuration options that are supported by /librdkafka/.
117141
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md here>
118142
extraProps :: Map Text Text -> ConsumerProperties
119143
extraProps m = mempty { cpProps = m }
120144
{-# INLINE extraProps #-}
121145

122-
-- | Any configuration options that are supported by /librdkafka/.
146+
-- | Set any configuration option that is supported by /librdkafka/.
123147
-- The full list can be found <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md here>
124148
extraProp :: Text -> Text -> ConsumerProperties
125149
extraProp k v = mempty { cpProps = M.singleton k v }
126150
{-# INLINE extraProp #-}
127151

128-
-- | Sets debug features for the consumer.
129-
-- Usually is used with 'consumerLogLevel'.
152+
-- | Set <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md debug> features for the consumer.
153+
-- Usually is used with 'logLevel'.
130154
debugOptions :: [KafkaDebug] -> ConsumerProperties
131155
debugOptions [] = extraProps M.empty
132156
debugOptions d =
133157
let points = Text.intercalate "," (kafkaDebugToText <$> d)
134158
in extraProps $ M.fromList [("debug", points)]
135159

160+
-- | Set <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md queued.max.messages.kbytes>
136161
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
137162
queuedMaxMessagesKBytes kBytes =
138163
extraProp "queued.max.messages.kbytes" (Text.pack $ show kBytes)
139164
{-# INLINE queuedMaxMessagesKBytes #-}
140165

141-
-- | Sets the callback poll mode.
142-
--
143-
-- The default 'CallbackPollModeAsync' mode handles polling rebalance
144-
-- and keep alive events for you
145-
-- in a background thread.
146-
--
147-
-- With 'CallbacPollModeSync' the user will poll the consumer
148-
-- frequently to handle new messages as well as rebalance and keep alive events.
149-
-- 'CallbacPollModeSync' lets you can simplify
150-
-- hw-kafka-client's footprint and have full control over when polling
151-
-- happens at the cost of having to manage this yourself.
166+
-- | Set the callback poll mode. Default value is 'CallbackPollModeAsync'.
152167
callbackPollMode :: CallbackPollMode -> ConsumerProperties
153168
callbackPollMode mode = mempty { cpCallbackPollMode = mode }

src/Kafka/Consumer/Subscription.hs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
{-# LANGUAGE OverloadedStrings #-}
22

3+
-----------------------------------------------------------------------------
4+
-- |
5+
-- Module with subscription types and functions.
6+
-----------------------------------------------------------------------------
37
module Kafka.Consumer.Subscription
48
( Subscription(..)
59
, topics
@@ -26,8 +30,8 @@ import Kafka.Types (TopicName (..))
2630
-- @
2731
-- consumerSub :: 'Subscription'
2832
-- consumerSub = 'topics' ['TopicName' "kafka-client-example-topic"]
29-
-- <> 'offsetReset' 'Earliest'
30-
-- <> 'extraSubscriptionProps' (fromList [("prop1", "value 1"), ("prop2", "value 2")])
33+
-- <> 'offsetReset' 'Earliest'
34+
-- <> 'extraSubscriptionProps' (fromList [("prop1", "value 1"), ("prop2", "value 2")])
3135
-- @
3236
data Subscription = Subscription (Set TopicName) (Map Text Text)
3337

src/Kafka/Consumer/Types.hs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
{-# LANGUAGE DeriveDataTypeable #-}
22
{-# LANGUAGE DeriveGeneric #-}
3+
4+
-----------------------------------------------------------------------------
5+
-- |
6+
-- Module holding consumer types.
7+
-----------------------------------------------------------------------------
38
module Kafka.Consumer.Types
49
( KafkaConsumer(..)
510
, ConsumerGroupId(..)
@@ -40,7 +45,7 @@ import Kafka.Types (Millis (..), PartitionId (..), TopicName (..))
4045

4146
-- | The main type for Kafka consumption, used e.g. to poll and commit messages.
4247
--
43-
-- Its constructor is intentionally not exposed, instead, one should used 'newConsumer' to acquire such a value.
48+
-- Its constructor is intentionally not exposed, instead, one should use 'Kafka.Consumer.newConsumer' to acquire such a value.
4449
data KafkaConsumer = KafkaConsumer
4550
{ kcKafkaPtr :: !Kafka
4651
, kcKafkaConf :: !KafkaConf
@@ -207,7 +212,7 @@ traverseM :: (Traversable t, Applicative f, Monad m)
207212
traverseM f r = sequenceA <$> traverse f r
208213
{-# INLINE traverseM #-}
209214

210-
{-# DEPRECATED bitraverseM "Isn't concern of this library. Use @'bisequenceA' '<$>' 'bimapM' f g r@" #-}
215+
{-# DEPRECATED bitraverseM "Isn't concern of this library. Use @'Data.Bitraversable.bisequenceA' '<$>' 'bimapM' f g r@" #-}
211216
bitraverseM :: (Bitraversable t, Applicative f, Monad m)
212217
=> (k -> m (f k'))
213218
-> (v -> m (f v'))

src/Kafka/Dump.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
-----------------------------------------------------------------------------
2+
-- |
3+
-- Module providing various functions to dump information. These may be useful for
4+
-- debug/investigation but should probably not be used on production applications.
5+
-----------------------------------------------------------------------------
16
module Kafka.Dump
27
( hPrintSupportedKafkaConf
38
, hPrintKafka

src/Kafka/Metadata.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
{-# LANGUAGE DeriveGeneric #-}
22
{-# LANGUAGE OverloadedStrings #-}
3+
4+
-----------------------------------------------------------------------------
5+
-- |
6+
-- Module with metadata types and functions.
7+
-----------------------------------------------------------------------------
38
module Kafka.Metadata
49
( KafkaMetadata(..), BrokerMetadata(..), TopicMetadata(..), PartitionMetadata(..)
510
, WatermarkOffsets(..)

0 commit comments

Comments
 (0)