Skip to content

Commit 0c5ad2e

Browse files
committed
Do not batch poll and rebalance in parallel
If we allow a batch poll and rebalance event to run in parallel then there is no way to tell if a poll that starts before a rebalance and returns after the rebalance completes returns messages from the old assigned partition set or the new one. The hw-kafka-client user will then not know whether they should disregard the returned message set or process it. This makes it so the batch message polling runs in the same lock the consumer event callback does. That way we can be sure a poll for messages and a rebalance event will never be running in parallel.
1 parent bfc7420 commit 0c5ad2e

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

src/Kafka/Consumer.hs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize
168168
mbq <- readIORef qr
169169
case mbq of
170170
Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."]
171-
Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
171+
Just q -> whileNoCallbackRunning c $ rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
172172

173173
-- | Commit message's offset on broker for the message's partition.
174174
commitOffsetMessage :: MonadIO m
@@ -373,6 +373,11 @@ runConsumerLoop k timeout =
373373
CallbackPollEnabled -> go
374374
CallbackPollDisabled -> pure ()
375375

376+
whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a
377+
whileNoCallbackRunning k f = do
378+
let statusVar = kcfgCallbackPollStatus (getKafkaConf k)
379+
withMVar statusVar $ \_ -> f
380+
376381
withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
377382
withCallbackPollEnabled k f = do
378383
let statusVar = kcfgCallbackPollStatus (getKafkaConf k)

0 commit comments

Comments
 (0)