Skip to content

Add a consumer option to rely on users for polling. #108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import qualified Data.ByteString as BS
import Data.IORef (writeIORef, readIORef)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import Data.Monoid ((<>), Any(Any))
import Foreign hiding (void)
import Kafka.Consumer.Convert
(fromMessagePtr, toNativeTopicPartitionList, topicPartitionFromMessageForCommit
Expand Down Expand Up @@ -113,17 +113,20 @@ newConsumer :: MonadIO m
=> ConsumerProperties
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer props (Subscription ts tp) = liftIO $ do
let cp = setCallback (rebalanceCallback (\_ _ -> return ())) <> props
newConsumer props@ConsumerProperties { cpUserPolls = Any cup } (Subscription ts tp) = liftIO $ do
let cp = case cup of
False -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
True -> props
kc@(KafkaConf kc' qref ct) <- newConsumerConf cp
tp' <- topicConf (TopicProps tp)
_ <- setDefaultTopicConf kc tp'
rdk <- newRdKafkaT RdKafkaConsumer kc'
case rdk of
Left err -> return . Left $ KafkaError err
Right rdk' -> do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
when (not cup) $ do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
let kafka = KafkaConsumer (Kafka rdk') kc
redErr <- redirectCallbacksPoll kafka
case redErr of
Expand All @@ -132,23 +135,26 @@ newConsumer props (Subscription ts tp) = liftIO $ do
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> runConsumerLoop kafka ct (Just $ Timeout 100) >> return (Right kafka)
Nothing -> (when (not cup) $ runConsumerLoop kafka ct (Just $ Timeout 100)) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)

pollMessage :: MonadIO m
=> KafkaConsumer
-> Timeout -- ^ the timeout, in milliseconds
-> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))) -- ^ Left on error or timeout, right for success
pollMessage c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) = liftIO $ do
pollConsumerEvents c Nothing
mbq <- readIORef qr
case mbq of
Nothing -> return . Left $ KafkaBadSpecification "Messages queue is not configured, internal error, fatal."
Just q -> rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr
Nothing -> rdKafkaConsumerPoll (getRdKafka c) ms >>= fromMessagePtr
Just q -> do
pollConsumerEvents c Nothing
rdKafkaConsumeQueue q (fromIntegral ms) >>= fromMessagePtr

-- | Polls up to BatchSize messages.
-- Unlike 'pollMessage' this function does not return the usual "timeout" errors.
-- An empty batch is returned when there are no messages available.
--
-- This API is not available when 'userPolls' is set.
pollMessageBatch :: MonadIO m
=> KafkaConsumer
-> Timeout
Expand All @@ -158,7 +164,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize
pollConsumerEvents c Nothing
mbq <- readIORef qr
case mbq of
Nothing -> return [Left $ KafkaBadSpecification "Messages queue is not configured, internal error, fatal."]
Nothing -> return [Left $ KafkaBadSpecification "userPolls is set when calling pollMessageBatch."]
Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr

-- | Commit message's offset on broker for the message's partition.
Expand Down
18 changes: 16 additions & 2 deletions src/Kafka/Consumer/ConsumerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ module Kafka.Consumer.ConsumerProperties
, extraProp
, debugOptions
, queuedMaxMessagesKBytes
, userPolls
, module X
)
where

import Control.Monad (MonadPlus(mplus))
import Data.Map (Map)
import Data.Monoid (Any)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Data.Text (Text)
Expand All @@ -36,11 +38,12 @@ data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [KafkaConf -> IO ()]
, cpUserPolls :: Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Any means that once switched on it can never be switched off. It could be confusing, so can we have just Bool or Last Bool?

}

instance Sem.Semigroup ConsumerProperties where
(ConsumerProperties m1 ll1 cb1) <> (ConsumerProperties m2 ll2 cb2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2)
(ConsumerProperties m1 ll1 cb1 cup1) <> (ConsumerProperties m2 ll2 cb2 cup2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) (cup1 <> cup2)
{-# INLINE (<>) #-}

-- | /Right biased/ so we prefer newer properties over older ones.
Expand All @@ -49,6 +52,7 @@ instance Monoid ConsumerProperties where
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
, cpUserPolls = Any False
}
{-# INLINE mempty #-}
mappend = (Sem.<>)
Expand Down Expand Up @@ -123,3 +127,13 @@ queuedMaxMessagesKBytes :: Int -> ConsumerProperties
-- | Sets the @queued.max.messages.kbytes@ consumer property: the size limit
-- (in kilobytes) of the locally queued pre-fetched messages.
queuedMaxMessagesKBytes kBytes =
  extraProp "queued.max.messages.kbytes" (Text.pack $ show kBytes)
{-# INLINE queuedMaxMessagesKBytes #-}

-- | The user will poll the consumer frequently to handle both new
-- messages and rebalance events.
--
-- By default hw-kafka-client handles polling rebalance events for you
-- in a background thread. With this property set, you can simplify
-- hw-kafka-client's footprint and have full control over when polling
-- happens, at the cost of having to manage this yourself.
userPolls :: ConsumerProperties
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name userPolls kind of suggests that the user should somehow poll manually.

Should we instead do something like:

data CallbackMode = CallbackModeAsync | CallbackModeSync

callbackMode :: CallbackMode -> ConsumerProperties

It looks a bit more explicit to me, do you agree?

userPolls = mempty { cpUserPolls = Any True }
197 changes: 101 additions & 96 deletions tests-it/Kafka/IntegrationSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -113,103 +113,9 @@ spec = do
res <- sendMessages (testMessages testTopic) prod
res `shouldBe` Right ()

specWithConsumer "Run consumer" consumerProps $ do
it "should get committed" $ \k -> do
res <- committed k (Timeout 1000) [(testTopic, PartitionId 0)]
res `shouldSatisfy` isRight
specWithConsumer "Run consumer" consumerProps runConsumerSpec

it "should get position" $ \k -> do
res <- position k [(testTopic, PartitionId 0)]
res `shouldSatisfy` isRight

it "should receive messages" $ \k -> do
res <- receiveMessages k
length <$> res `shouldBe` Right 2

let timestamps = crTimestamp <$> either (const []) id res
forM_ timestamps $ \ts ->
ts `shouldNotBe` NoTimestamp

comRes <- commitAllOffsets OffsetCommit k
comRes `shouldBe` Nothing

it "should get watermark offsets" $ \k -> do
res <- sequence <$> watermarkOffsets k (Timeout 1000) testTopic
res `shouldSatisfy` isRight
length <$> res `shouldBe` (Right 1)

it "should return subscription" $ \k -> do
res <- subscription k
res `shouldSatisfy` isRight
length <$> res `shouldBe` Right 1

it "should return assignment" $ \k -> do
res <- assignment k
res `shouldSatisfy` isRight
res `shouldBe` Right (fromList [(testTopic, [PartitionId 0])])

it "should return all topics metadata" $ \k -> do
res <- allTopicsMetadata k (Timeout 1000)
res `shouldSatisfy` isRight
let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) }
let res' = fmap filterUserTopics res
length . kmBrokers <$> res' `shouldBe` Right 1
length . kmTopics <$> res' `shouldBe` Right 1

it "should return topic metadata" $ \k -> do
res <- topicMetadata k (Timeout 1000) testTopic
res `shouldSatisfy` isRight
(length . kmBrokers) <$> res `shouldBe` Right 1
(length . kmTopics) <$> res `shouldBe` Right 1

it "should describe all consumer groups" $ \k -> do
res <- allConsumerGroupsInfo k (Timeout 1000)
fmap giGroup <$> res `shouldBe` Right [testGroupId]

it "should describe a given consumer group" $ \k -> do
res <- consumerGroupInfo k (Timeout 1000) testGroupId
fmap giGroup <$> res `shouldBe` Right [testGroupId]

it "should describe non-existent consumer group" $ \k -> do
res <- consumerGroupInfo k (Timeout 1000) (ConsumerGroupId "does-not-exist")
res `shouldBe` Right []

it "should read topic offsets for time" $ \k -> do
res <- topicOffsetsForTime k (Timeout 1000) (Millis 1904057189508) testTopic
res `shouldSatisfy` isRight
fmap tpOffset <$> res `shouldBe` Right [PartitionOffsetEnd]

it "should seek and return no error" $ \k -> do
res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) (PartitionOffset 1)]
res `shouldBe` Nothing
msg <- pollMessage k (Timeout 1000)
crOffset <$> msg `shouldBe` Right (Offset 1)

it "should seek to the beginning" $ \k -> do
res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) PartitionOffsetBeginning]
res `shouldBe` Nothing
msg <- pollMessage k (Timeout 1000)
crOffset <$> msg `shouldBe` Right (Offset 0)

it "should seek to the end" $ \k -> do
res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) PartitionOffsetEnd]
res `shouldBe` Nothing
msg <- pollMessage k (Timeout 1000)
crOffset <$> msg `shouldSatisfy` (\x ->
x == Left (KafkaResponseError RdKafkaRespErrPartitionEof)
|| x == Left (KafkaResponseError RdKafkaRespErrTimedOut))

it "should respect out-of-bound offsets (invalid offset)" $ \k -> do
res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) PartitionOffsetInvalid]
res `shouldBe` Nothing
msg <- pollMessage k (Timeout 1000)
crOffset <$> msg `shouldBe` Right (Offset 0)

it "should respect out-of-bound offsets (huge offset)" $ \k -> do
res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) (PartitionOffset 123456)]
res `shouldBe` Nothing
msg <- pollMessage k (Timeout 1000)
crOffset <$> msg `shouldBe` Right (Offset 0)
specWithConsumer "Run consumer with user polling" (consumerProps <> userPolls) runConsumerSpec

describe "Kafka.Consumer.BatchSpec" $ do
specWithConsumer "Batch consumer" (consumerProps <> groupId (ConsumerGroupId "batch-consumer")) $ do
Expand Down Expand Up @@ -254,3 +160,102 @@ testMessages t =
-- | Produce each record through the given producer and flush it afterwards.
-- Per-message results from 'produceMessage' are discarded, so this always
-- yields @Right ()@.
sendMessages :: [ProducerRecord] -> KafkaProducer -> IO (Either KafkaError ())
sendMessages records producer = do
  mapM_ (produceMessage producer) records
  flushProducer producer
  pure (Right ())

-- | Shared consumer test suite, run against an already-subscribed
-- 'KafkaConsumer'.  Kept as a reusable 'SpecWith' so the same checks can be
-- run under more than one consumer configuration.
runConsumerSpec :: SpecWith KafkaConsumer
runConsumerSpec = do
  it "should get committed" $ \k -> do
    res <- committed k (Timeout 1000) [(testTopic, PartitionId 0)]
    res `shouldSatisfy` isRight

  it "should get position" $ \k -> do
    res <- position k [(testTopic, PartitionId 0)]
    res `shouldSatisfy` isRight

  it "should receive messages" $ \k -> do
    res <- receiveMessages k
    length <$> res `shouldBe` Right 2

    -- every received message must carry a real timestamp
    let timestamps = crTimestamp <$> either (const []) id res
    forM_ timestamps $ \ts ->
      ts `shouldNotBe` NoTimestamp

    comRes <- commitAllOffsets OffsetCommit k
    comRes `shouldBe` Nothing

  it "should get watermark offsets" $ \k -> do
    res <- sequence <$> watermarkOffsets k (Timeout 1000) testTopic
    res `shouldSatisfy` isRight
    length <$> res `shouldBe` (Right 1)

  it "should return subscription" $ \k -> do
    res <- subscription k
    res `shouldSatisfy` isRight
    length <$> res `shouldBe` Right 1

  it "should return assignment" $ \k -> do
    res <- assignment k
    res `shouldSatisfy` isRight
    res `shouldBe` Right (fromList [(testTopic, [PartitionId 0])])

  it "should return all topics metadata" $ \k -> do
    res <- allTopicsMetadata k (Timeout 1000)
    res `shouldSatisfy` isRight
    -- keep only user topics, dropping any non-user (internal) ones
    let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) }
    let res' = fmap filterUserTopics res
    length . kmBrokers <$> res' `shouldBe` Right 1
    length . kmTopics <$> res' `shouldBe` Right 1

  it "should return topic metadata" $ \k -> do
    res <- topicMetadata k (Timeout 1000) testTopic
    res `shouldSatisfy` isRight
    (length . kmBrokers) <$> res `shouldBe` Right 1
    (length . kmTopics) <$> res `shouldBe` Right 1

  it "should describe all consumer groups" $ \k -> do
    res <- allConsumerGroupsInfo k (Timeout 1000)
    fmap giGroup <$> res `shouldBe` Right [testGroupId]

  it "should describe a given consumer group" $ \k -> do
    res <- consumerGroupInfo k (Timeout 1000) testGroupId
    fmap giGroup <$> res `shouldBe` Right [testGroupId]

  it "should describe non-existent consumer group" $ \k -> do
    res <- consumerGroupInfo k (Timeout 1000) (ConsumerGroupId "does-not-exist")
    res `shouldBe` Right []

  it "should read topic offsets for time" $ \k -> do
    -- timestamp lies in the future, so the end offset is expected back
    res <- topicOffsetsForTime k (Timeout 1000) (Millis 1904057189508) testTopic
    res `shouldSatisfy` isRight
    fmap tpOffset <$> res `shouldBe` Right [PartitionOffsetEnd]

  it "should seek and return no error" $ \k -> do
    res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) (PartitionOffset 1)]
    res `shouldBe` Nothing
    msg <- pollMessage k (Timeout 1000)
    crOffset <$> msg `shouldBe` Right (Offset 1)

  it "should seek to the beginning" $ \k -> do
    res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) PartitionOffsetBeginning]
    res `shouldBe` Nothing
    msg <- pollMessage k (Timeout 1000)
    crOffset <$> msg `shouldBe` Right (Offset 0)

  it "should seek to the end" $ \k -> do
    res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) PartitionOffsetEnd]
    res `shouldBe` Nothing
    -- nothing to read at the end: either partition EOF or a poll timeout
    msg <- pollMessage k (Timeout 1000)
    crOffset <$> msg `shouldSatisfy` (\x ->
      x == Left (KafkaResponseError RdKafkaRespErrPartitionEof)
      || x == Left (KafkaResponseError RdKafkaRespErrTimedOut))

  it "should respect out-of-bound offsets (invalid offset)" $ \k -> do
    res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) PartitionOffsetInvalid]
    res `shouldBe` Nothing
    msg <- pollMessage k (Timeout 1000)
    crOffset <$> msg `shouldBe` Right (Offset 0)

  it "should respect out-of-bound offsets (huge offset)" $ \k -> do
    res <- seek k (Timeout 1000) [TopicPartition testTopic (PartitionId 0) (PartitionOffset 123456)]
    res `shouldBe` Nothing
    msg <- pollMessage k (Timeout 1000)
    crOffset <$> msg `shouldBe` Right (Offset 0)