Skip to content

Commit fd3de47

Browse files
committed
Remove unnecessary poll for events
1 parent 5a6cc78 commit fd3de47

File tree

7 files changed

+98
-126
lines changed

7 files changed

+98
-126
lines changed

src/Kafka/Consumer.hs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ module Kafka.Consumer
1919
where
2020

2121
import Control.Arrow (left, (&&&))
22-
import Control.Concurrent (forkIO, rtsSupportsBoundThreads)
22+
import Control.Concurrent (forkIO, rtsSupportsBoundThreads, withMVar)
2323
import Control.Exception (bracket)
24-
import Control.Monad (forM_, void, when)
24+
import Control.Monad (forM_, mapM_, void, when)
2525
import Control.Monad.IO.Class (MonadIO (liftIO))
2626
import Control.Monad.Trans.Except (ExceptT (ExceptT), runExceptT)
2727
import Data.Bifunctor (bimap, first)
@@ -38,7 +38,7 @@ import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTop
3838
import Kafka.Consumer.Types (KafkaConsumer (..))
3939
import Kafka.Internal.CancellationToken as CToken
4040
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
41-
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getRdKafka, kafkaConf, topicConf)
41+
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf)
4242
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)
4343

4444
import Kafka.Consumer.ConsumerProperties as X
@@ -69,17 +69,17 @@ newConsumer :: MonadIO m
6969
-> Subscription
7070
-> m (Either KafkaError KafkaConsumer)
7171
newConsumer props (Subscription ts tp) = liftIO $ do
72-
let cp = case cpUserPolls props of
73-
CallbackModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
74-
CallbackModeSync -> props
75-
kc@(KafkaConf kc' qref ct) <- newConsumerConf cp
72+
let cp = case cpCallbackPollMode props of
73+
CallbackPollModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
74+
CallbackPollModeSync -> props
75+
kc@(KafkaConf kc' qref _ ct) <- newConsumerConf cp
7676
tp' <- topicConf (TopicProps tp)
7777
_ <- setDefaultTopicConf kc tp'
7878
rdk <- newRdKafkaT RdKafkaConsumer kc'
7979
case rdk of
8080
Left err -> return . Left $ KafkaError err
8181
Right rdk' -> do
82-
when (cpUserPolls props == CallbackModeAsync) $ do
82+
when (cpCallbackPollMode props == CallbackPollModeAsync) $ do
8383
msgq <- rdKafkaQueueNew rdk'
8484
writeIORef qref (Just msgq)
8585
let kafka = KafkaConsumer (Kafka rdk') kc
@@ -90,21 +90,19 @@ newConsumer props (Subscription ts tp) = liftIO $ do
9090
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
9191
sub <- subscribe kafka ts
9292
case sub of
93-
Nothing -> (when (cpUserPolls props == CallbackModeAsync) $
93+
Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync) $
9494
runConsumerLoop kafka ct (Just $ Timeout 100)) >> return (Right kafka)
9595
Just err -> closeConsumer kafka >> return (Left err)
9696

9797
pollMessage :: MonadIO m
9898
=> KafkaConsumer
9999
-> Timeout -- ^ the timeout, in milliseconds
100100
-> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))) -- ^ Left on error or timeout, right for success
101-
pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
101+
pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _ _)) (Timeout ms) = liftIO $ do
102102
mbq <- readIORef qr
103103
case mbq of
104104
Nothing -> rdKafkaConsumerPoll (getRdKafka c) ms >>= fromMessagePtr
105-
Just q -> do
106-
pollConsumerEvents c Nothing
107-
rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr
105+
Just q -> rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr
108106

109107
-- | Polls up to BatchSize messages.
110108
-- Unlike 'pollMessage' this function does not return usual "timeout" errors.
@@ -116,7 +114,7 @@ pollMessageBatch :: MonadIO m
116114
-> Timeout
117115
-> BatchSize
118116
-> m [Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))]
119-
pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize b) = liftIO $ do
117+
pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _ _)) (Timeout ms) (BatchSize b) = liftIO $ do
120118
pollConsumerEvents c Nothing
121119
mbq <- readIORef qr
122120
case mbq of
@@ -206,8 +204,7 @@ seek (KafkaConsumer (Kafka k) _) (Timeout timeout) tps = liftIO $
206204
where
207205
seekAll = runExceptT $ do
208206
tr <- traverse (ExceptT . topicPair) tps
209-
void $ traverse (\(kt, p, o) -> ExceptT (rdSeek kt p o)) tr
210-
return ()
207+
mapM_ (\(kt, p, o) -> ExceptT (rdSeek kt p o)) tr
211208

212209
rdSeek kt (PartitionId p) o =
213210
rdKafkaErrorToEither <$> rdKafkaSeek kt (fromIntegral p) (offsetToInt64 o) timeout
@@ -252,13 +249,14 @@ position (KafkaConsumer (Kafka k) _) tps = liftIO $ do
252249
-- when polling for events on each 'pollMessage' is not
253250
-- frequent enough.
254251
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
255-
pollConsumerEvents k timeout =
252+
pollConsumerEvents k timeout = do
256253
let (Timeout tm) = fromMaybe (Timeout 0) timeout
257-
in void $ rdKafkaConsumerPoll (getRdKafka k) tm
254+
let (KafkaConf _ _ rv _) = getKafkaConf k
255+
withMVar rv . const $ void $ rdKafkaConsumerPoll (getRdKafka k) tm
258256

259257
-- | Closes the consumer.
260258
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
261-
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr ct)) = liftIO $ do
259+
closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr _ ct)) = liftIO $ do
262260
CToken.cancel ct
263261
mbq <- readIORef qr
264262
void $ traverse rdKafkaQueueDestroy mbq
@@ -285,7 +283,7 @@ subscribe (KafkaConsumer (Kafka k) _) ts = do
285283
return $ kafkaErrorToMaybe res
286284

287285
setDefaultTopicConf :: KafkaConf -> TopicConf -> IO ()
288-
setDefaultTopicConf (KafkaConf kc _ _) (TopicConf tc) =
286+
setDefaultTopicConf (KafkaConf kc _ _ _) (TopicConf tc) =
289287
rdKafkaTopicConfDup tc >>= rdKafkaConfSetDefaultTopicConf kc
290288

291289
commitOffsets :: OffsetCommit -> KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)

src/Kafka/Consumer/Callbacks.hs

Lines changed: 16 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,17 @@ import Data.Monoid ((<>))
1212
import Foreign.ForeignPtr (newForeignPtr_)
1313
import Foreign.Ptr (nullPtr)
1414
import Kafka.Callbacks as X
15-
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
15+
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
1616
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
1717
import Kafka.Internal.RdKafka
1818
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
19-
import Kafka.Internal.Shared (kafkaErrorToMaybe)
2019
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
2120

2221
import qualified Data.Text as Text
2322

2423
-- | Sets a callback that is called when rebalance is needed.
25-
--
26-
-- Callback implementations suppose to watch for 'KafkaResponseError' 'RdKafkaRespErrAssignPartitions' and
27-
-- for 'KafkaResponseError' 'RdKafkaRespErrRevokePartitions'. Other error codes are not expected and would indicate
28-
-- something really bad happening in a system, or bugs in @librdkafka@ itself.
29-
--
30-
-- A callback is expected to call 'assign' according to the error code it receives.
31-
--
32-
-- * When 'RdKafkaRespErrAssignPartitions' happens 'assign' should be called with all the partitions it was called with.
33-
-- It is OK to alter partitions offsets before calling 'assign'.
34-
--
35-
-- * When 'RdKafkaRespErrRevokePartitions' happens 'assign' should be called with an empty list of partitions.
36-
-- rebalanceCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
3724
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
38-
rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb conf realCb
25+
rebalanceCallback callback kc@(KafkaConf conf _ _ _) = rdKafkaConfSetRebalanceCb conf realCb
3926
where
4027
realCb k err pl = do
4128
k' <- newForeignPtr_ k
@@ -47,13 +34,11 @@ rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb c
4734
-- The results of automatic or manual offset commits will be scheduled
4835
-- for this callback and is served by `pollMessage`.
4936
--
50-
-- A callback is expected to call 'assign' according to the error code it receives.
51-
--
5237
-- If no partitions had valid offsets to commit this callback will be called
5338
-- with `KafkaError` == `KafkaResponseError` `RdKafkaRespErrNoOffset` which is not to be considered
5439
-- an error.
5540
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
56-
offsetCommitCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetOffsetCommitCb conf realCb
41+
offsetCommitCallback callback kc@(KafkaConf conf _ _ _) = rdKafkaConfSetOffsetCommitCb conf realCb
5742
where
5843
realCb k err pl = do
5944
k' <- newForeignPtr_ k
@@ -75,46 +60,30 @@ setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
7560
setRebalanceCallback f k e pls = do
7661
ps <- fromNativeTopicPartitionList'' pls
7762
let assignment = (tpTopicName &&& tpPartition) <$> ps
63+
let (Kafka kptr) = getKafka k
64+
7865
case e of
7966
KafkaResponseError RdKafkaRespErrAssignPartitions -> do
80-
mbq <- getRdMsgQueue $ getKafkaConf k
8167
f k (RebalanceBeforeAssign assignment)
68+
void $ rdKafkaAssign kptr pls
69+
70+
mbq <- getRdMsgQueue $ getKafkaConf k
8271
case mbq of
8372
Nothing -> pure ()
8473
Just mq -> do
85-
let (Kafka kptr) = getKafka k
86-
-- Magnus Edenhill:
87-
-- If you redirect after assign() it means some messages may be forwarded to the single consumer queue,
88-
-- so either do it before assign() or do: assign(); pause(); redirect; resume()
89-
void $ rdKafkaAssign kptr pls
74+
{- Magnus Edenhill:
75+
If you redirect after assign() it means some messages may be forwarded to the single consumer queue,
76+
so either do it before assign() or do: assign(); pause(); redirect; resume()
77+
-}
9078
void $ rdKafkaPausePartitions kptr pls
9179
forM_ ps (\tp -> redirectPartitionQueue (getKafka k) (tpTopicName tp) (tpPartition tp) mq)
9280
void $ rdKafkaResumePartitions kptr pls
81+
9382
f k (RebalanceAssign assignment)
83+
9484
KafkaResponseError RdKafkaRespErrRevokePartitions -> do
9585
f k (RebalanceBeforeRevoke assignment)
96-
void $ assign k []
86+
void $ newForeignPtr_ nullPtr >>= rdKafkaAssign kptr
87+
-- void $ assign k []
9788
f k (RebalanceRevoke assignment)
9889
x -> error $ "Rebalance: UNKNOWN response: " <> show x
99-
100-
101-
-- | Assigns specified partitions to a current consumer.
102-
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
103-
assign :: KafkaConsumer -> [TopicPartition] -> IO (Maybe KafkaError)
104-
assign (KafkaConsumer (Kafka k) _) ps =
105-
let pl = if null ps
106-
then newForeignPtr_ nullPtr
107-
else toNativeTopicPartitionList ps
108-
er = KafkaResponseError <$> (pl >>= rdKafkaAssign k)
109-
in kafkaErrorToMaybe <$> er
110-
111-
-- -- | Assigns specified partitions to a current consumer.
112-
-- -- Assigning an empty list means unassigning from all partitions that are currently assigned.
113-
-- assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
114-
-- assign' (KafkaConsumer (Kafka k) _) pls = do
115-
116-
117-
118-
-- where
119-
-- asExcept f = ExceptT $ rdKafkaErrorToEither <$> f
120-

src/Kafka/Consumer/ConsumerProperties.hs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module Kafka.Consumer.ConsumerProperties
44
( ConsumerProperties(..)
5-
, CallbackMode(..)
5+
, CallbackPollMode(..)
66
, brokersList
77
, autoCommit
88
, noAutoCommit
@@ -30,18 +30,18 @@ import Data.Text (Text)
3030
import qualified Data.Text as Text
3131
import Kafka.Consumer.Types (ConsumerGroupId (..))
3232
import Kafka.Internal.Setup (KafkaConf (..))
33-
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText, Millis(..))
33+
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)
3434

3535
import Kafka.Consumer.Callbacks as X
3636

37-
data CallbackMode = CallbackModeSync | CallbackModeAsync deriving (Show, Eq)
37+
data CallbackPollMode = CallbackPollModeSync | CallbackPollModeAsync deriving (Show, Eq)
3838

3939
-- | Properties to create 'KafkaConsumer'.
4040
data ConsumerProperties = ConsumerProperties
41-
{ cpProps :: Map Text Text
42-
, cpLogLevel :: Maybe KafkaLogLevel
43-
, cpCallbacks :: [KafkaConf -> IO ()]
44-
, cpUserPolls :: CallbackMode
41+
{ cpProps :: Map Text Text
42+
, cpLogLevel :: Maybe KafkaLogLevel
43+
, cpCallbacks :: [KafkaConf -> IO ()]
44+
, cpCallbackPollMode :: CallbackPollMode
4545
}
4646

4747
instance Sem.Semigroup ConsumerProperties where
@@ -52,10 +52,10 @@ instance Sem.Semigroup ConsumerProperties where
5252
-- | /Right biased/ so we prefer newer properties over older ones.
5353
instance Monoid ConsumerProperties where
5454
mempty = ConsumerProperties
55-
{ cpProps = M.empty
56-
, cpLogLevel = Nothing
57-
, cpCallbacks = []
58-
, cpUserPolls = CallbackModeAsync
55+
{ cpProps = M.empty
56+
, cpLogLevel = Nothing
57+
, cpCallbacks = []
58+
, cpCallbackPollMode = CallbackPollModeAsync
5959
}
6060
{-# INLINE mempty #-}
6161
mappend = (Sem.<>)
@@ -140,14 +140,14 @@ queuedMaxMessagesKBytes kBytes =
140140

141141
-- | Sets the callback poll mode.
142142
--
143-
-- The default 'CallbackModeAsync' mode handles polling rebalance
143+
-- The default 'CallbackPollModeAsync' mode handles polling rebalance
144144
-- and keep alive events for you
145145
-- in a background thread.
146146
--
147-
-- With 'CalalcacModeSync' the user will poll the consumer
147+
-- With 'CallbacPollModeSync' the user will poll the consumer
148148
-- frequently to handle new messages as well as rebalance and keep alive events.
149-
-- 'CalalcacModeSync' lets you can simplify
149+
-- 'CallbacPollModeSync' lets you can simplify
150150
-- hw-kafka-client's footprint and have full control over when polling
151151
-- happens at the cost of having to manage this yourself.
152-
callbackPollMode :: CallbackMode -> ConsumerProperties
153-
callbackPollMode mode = mempty { cpUserPolls = mode }
152+
callbackPollMode :: CallbackPollMode -> ConsumerProperties
153+
callbackPollMode mode = mempty { cpCallbackPollMode = mode }

src/Kafka/Consumer/Subscription.hs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@ module Kafka.Consumer.Subscription
88
)
99
where
1010

11-
import qualified Data.Text as Text
12-
import Data.Text (Text)
1311
import Data.Map (Map)
1412
import qualified Data.Map as M
1513
import Data.Semigroup as Sem
16-
import Kafka.Consumer.Types (OffsetReset(..))
17-
import Kafka.Types (TopicName(..), Millis(..))
1814
import Data.Set (Set)
1915
import qualified Data.Set as Set
16+
import Data.Text (Text)
17+
import Kafka.Consumer.Types (OffsetReset (..))
18+
import Kafka.Types (TopicName (..))
2019

2120
data Subscription = Subscription (Set TopicName) (Map Text Text)
2221

@@ -41,7 +40,7 @@ offsetReset o =
4140
let o' = case o of
4241
Earliest -> "earliest"
4342
Latest -> "latest"
44-
in Subscription (Set.empty) (M.fromList [("auto.offset.reset", o')])
43+
in Subscription Set.empty (M.fromList [("auto.offset.reset", o')])
4544

4645
extraSubscriptionProps :: Map Text Text -> Subscription
47-
extraSubscriptionProps = Subscription (Set.empty)
46+
extraSubscriptionProps = Subscription Set.empty

0 commit comments

Comments
 (0)