Introduce CallbackMode #110

Merged
merged 1 commit on Oct 6, 2019
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -9,7 +9,7 @@ zookeeper:

kafka:
image: confluentinc/cp-kafka:latest
hostname: kafka
hostname: localhost
ports:
- 9092:9092
links:
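(Presumably the hostname change lets the broker advertise an address that resolves both inside the compose network and from the host running the integration tests.)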
1 change: 1 addition & 0 deletions hw-kafka-client.cabal
@@ -107,6 +107,7 @@ test-suite integration-tests
, hspec
, hw-kafka-client
, monad-loops
, random
, text
, transformers
other-modules:
88 changes: 22 additions & 66 deletions src/Kafka/Consumer.hs
@@ -1,5 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TupleSections #-}
module Kafka.Consumer
( module X
, runConsumer
@@ -18,73 +18,28 @@ module Kafka.Consumer
)
where

import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Control.Arrow ((&&&), left)
import Control.Arrow (left, (&&&))
import Control.Concurrent (forkIO, rtsSupportsBoundThreads)
import Control.Exception (bracket)
import Control.Monad (forM_, void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Trans.Except (ExceptT(ExceptT), runExceptT)
import Data.Bifunctor (first, bimap)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Except (ExceptT (ExceptT), runExceptT)
import Data.Bifunctor (bimap, first)
import qualified Data.ByteString as BS
import Data.IORef (writeIORef, readIORef)
import Data.IORef (readIORef, writeIORef)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>), Any(Any))
import Data.Monoid ((<>))
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Foreign hiding (void)
import Kafka.Consumer.Convert
(fromMessagePtr, toNativeTopicPartitionList, topicPartitionFromMessageForCommit
, toNativeTopicPartitionListNoDispose, toNativeTopicPartitionList, fromNativeTopicPartitionList''
, toMap, offsetToInt64, toNativeTopicPartitionList', offsetCommitToBool
)
import Kafka.Consumer.Types (KafkaConsumer(..))
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import Kafka.Consumer.Types (KafkaConsumer (..))
import Kafka.Internal.CancellationToken as CToken
import Kafka.Internal.RdKafka
( RdKafkaRespErrT(..)
, RdKafkaTopicPartitionListTPtr
, RdKafkaTypeT(..)
, newRdKafkaT
, rdKafkaQueueNew
, rdKafkaConsumeQueue
, rdKafkaPollSetConsumer
, rdKafkaSetLogLevel
, rdKafkaOffsetsStore
, rdKafkaCommit
, rdKafkaConfSetDefaultTopicConf
, rdKafkaTopicConfDup
, rdKafkaSubscribe
, rdKafkaTopicPartitionListAdd
, newRdKafkaTopicPartitionListT
, rdKafkaConsumerClose
, rdKafkaQueueDestroy
, rdKafkaConsumerPoll
, rdKafkaPosition
, rdKafkaCommitted
, rdKafkaSeek
, rdKafkaResumePartitions
, rdKafkaPausePartitions
, rdKafkaSubscription
, rdKafkaAssignment
, rdKafkaConsumeBatchQueue
, newRdKafkaTopicT
)
import Kafka.Internal.Setup
( Kafka(..)
, KafkaConf(..)
, TopicConf(..)
, KafkaProps(..)
, TopicProps(..)
, kafkaConf
, topicConf
, getRdKafka
)
import Kafka.Internal.Shared
( kafkaErrorToMaybe
, maybeToLeft
, rdKafkaErrorToEither
)
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getRdKafka, kafkaConf, topicConf)
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)

import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription as X
@@ -113,18 +68,18 @@ newConsumer :: MonadIO m
=> ConsumerProperties
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer props@ConsumerProperties { cpUserPolls = Any cup } (Subscription ts tp) = liftIO $ do
let cp = case cup of
False -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
True -> props
newConsumer props (Subscription ts tp) = liftIO $ do
let cp = case cpUserPolls props of
CallbackModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
CallbackModeSync -> props
kc@(KafkaConf kc' qref ct) <- newConsumerConf cp
tp' <- topicConf (TopicProps tp)
_ <- setDefaultTopicConf kc tp'
rdk <- newRdKafkaT RdKafkaConsumer kc'
case rdk of
Left err -> return . Left $ KafkaError err
Right rdk' -> do
when (not cup) $ do
when (cpUserPolls props == CallbackModeAsync) $ do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
let kafka = KafkaConsumer (Kafka rdk') kc
@@ -135,7 +90,8 @@ newConsumer props@ConsumerProperties { cpUserPolls = Any cup } (Subscription ts
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> (when (not cup) $ runConsumerLoop kafka ct (Just $ Timeout 100)) >> return (Right kafka)
Nothing -> (when (cpUserPolls props == CallbackModeAsync) $
runConsumerLoop kafka ct (Just $ Timeout 100)) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)

pollMessage :: MonadIO m
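To make the new control flow concrete: in the default CallbackModeAsync mode, newConsumer installs a no-op rebalance callback, attaches a dedicated message queue, and services rebalance and keep-alive events on a background poll loop, so application code only calls pollMessage. A minimal sketch of that default path follows; the broker address, group id, and topic name are placeholders, not part of this PR:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import Control.Exception (bracket)
import Kafka.Consumer

-- Placeholder connection details; adjust for a real broker.
props :: ConsumerProperties
props = brokersList [BrokerAddress "localhost:9092"]
     <> groupId (ConsumerGroupId "example-group")

main :: IO ()
main = bracket mkConsumer clConsumer runHandler >>= print
  where
    mkConsumer = newConsumer props (topics [TopicName "example-topic"])
    clConsumer (Left err) = return (Left err)
    clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
    runHandler (Left err) = return (Left err)
    runHandler (Right kc) = do
      -- In CallbackModeAsync this is plain message polling; callbacks
      -- are already being serviced by the library's background thread.
      msg <- pollMessage kc (Timeout 1000)
      print msg
      return (Right ())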
6 changes: 3 additions & 3 deletions src/Kafka/Consumer/Callbacks.hs
@@ -15,11 +15,11 @@ import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
import Kafka.Consumer.Types (KafkaConsumer(..), TopicPartition(..), RebalanceEvent(..))
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (Kafka(..), KafkaConf(..), HasKafka(..), HasKafkaConf(..), getRdMsgQueue)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
import Kafka.Internal.Shared (kafkaErrorToMaybe)
import Kafka.Types (KafkaError(..), PartitionId(..), TopicName(..))
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))

-- | Sets a callback that is called when rebalance is needed.
--
38 changes: 22 additions & 16 deletions src/Kafka/Consumer/ConsumerProperties.hs
@@ -2,6 +2,7 @@

module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, CallbackMode(..)
, brokersList
, noAutoCommit
, noAutoOffsetStore
@@ -15,35 +16,36 @@ module Kafka.Consumer.ConsumerProperties
, extraProp
, debugOptions
, queuedMaxMessagesKBytes
, userPolls
, callbackPollMode
, module X
)
where

import Control.Monad (MonadPlus(mplus))
import Control.Monad (MonadPlus (mplus))
import Data.Map (Map)
import Data.Monoid (Any)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Data.Text (Text)
import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId(..))
import Kafka.Internal.Setup (KafkaConf(..))
import Kafka.Types (KafkaDebug(..), KafkaCompressionCodec(..), KafkaLogLevel(..), ClientId(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText)
import Kafka.Consumer.Types (ConsumerGroupId (..))
import Kafka.Internal.Setup (KafkaConf (..))
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), kafkaCompressionCodecToText, kafkaDebugToText)

import Kafka.Consumer.Callbacks as X

data CallbackMode = CallbackModeSync | CallbackModeAsync deriving (Show, Eq)

-- | Properties to create 'KafkaConsumer'.
data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [KafkaConf -> IO ()]
, cpUserPolls :: Any
, cpUserPolls :: CallbackMode
}

instance Sem.Semigroup ConsumerProperties where
(ConsumerProperties m1 ll1 cb1 cup1) <> (ConsumerProperties m2 ll2 cb2 cup2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) (cup1 <> cup2)
(ConsumerProperties m1 ll1 cb1 _) <> (ConsumerProperties m2 ll2 cb2 cup2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) cup2
{-# INLINE (<>) #-}

-- | /Right biased/ so we prefer newer properties over older ones.
@@ -52,7 +54,7 @@ instance Monoid ConsumerProperties where
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
, cpUserPolls = Any False
, cpUserPolls = CallbackModeAsync
}
{-# INLINE mempty #-}
mappend = (Sem.<>)
@@ -128,12 +130,16 @@ queuedMaxMessagesKBytes kBytes =
extraProp "queued.max.messages.kbytes" (Text.pack $ show kBytes)
{-# INLINE queuedMaxMessagesKBytes #-}

-- | The user will poll the consumer frequently to handle both new
-- messages and rebalance events.
-- | Sets the callback poll mode.
--
-- The default 'CallbackModeAsync' mode handles polling rebalance
-- and keep-alive events for you in a background thread.
--
-- By default hw-kafka-client handles polling rebalance events for you
-- in a background thread, with this property set you can simplify
-- With 'CallbackModeSync' the user must poll the consumer
-- frequently to handle new messages as well as rebalance and keep-alive events.
-- 'CallbackModeSync' lets you simplify
-- hw-kafka-client's footprint and have full control over when polling
-- happens at the cost of having to manage this yourself.
userPolls :: ConsumerProperties
userPolls = mempty { cpUserPolls = Any True }
callbackPollMode :: CallbackMode -> ConsumerProperties
callbackPollMode mode = mempty { cpUserPolls = mode }
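Since ConsumerProperties merges right-biased, the last callbackPollMode applied wins (the new Semigroup instance keeps only the right-hand cpUserPolls). A sketch of opting into synchronous polling; the broker address and group id are illustrative only:

{-# LANGUAGE OverloadedStrings #-}
import Kafka.Consumer

-- Illustrative values; only callbackPollMode is the point here.
syncProps :: ConsumerProperties
syncProps = brokersList [BrokerAddress "localhost:9092"]
         <> groupId (ConsumerGroupId "manual-poller")
         <> callbackPollMode CallbackModeSync  -- overrides the CallbackModeAsync default

In this mode no background thread is started, so the application's own pollMessage calls must run often enough to also service rebalance and keep-alive callbacks.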
37 changes: 23 additions & 14 deletions tests-it/Kafka/IntegrationSpec.hs
@@ -113,9 +113,8 @@ spec = do
res <- sendMessages (testMessages testTopic) prod
res `shouldBe` Right ()

specWithConsumer "Run consumer" consumerProps runConsumerSpec

specWithConsumer "Run consumer with user polling" (consumerProps <> userPolls) runConsumerSpec
specWithConsumer "Run consumer with async polling" (consumerProps <> groupId (makeGroupId "async")) runConsumerSpec
specWithConsumer "Run consumer with sync polling" (consumerProps <> groupId (makeGroupId "sync") <> callbackPollMode CallbackModeSync) runConsumerSpec

describe "Kafka.Consumer.BatchSpec" $ do
specWithConsumer "Batch consumer" (consumerProps <> groupId (ConsumerGroupId "batch-consumer")) $ do
@@ -163,17 +162,10 @@ sendMessages msgs prod =

runConsumerSpec :: SpecWith KafkaConsumer
runConsumerSpec = do
it "should get committed" $ \k -> do
res <- committed k (Timeout 1000) [(testTopic, PartitionId 0)]
res `shouldSatisfy` isRight

it "should get position" $ \k -> do
res <- position k [(testTopic, PartitionId 0)]
res `shouldSatisfy` isRight

it "should receive messages" $ \k -> do
res <- receiveMessages k
length <$> res `shouldBe` Right 2
let msgsLen = either (const 0) length res
msgsLen `shouldSatisfy` (> 0)

let timestamps = crTimestamp <$> either (const []) id res
forM_ timestamps $ \ts ->
@@ -182,6 +174,14 @@ runConsumerSpec = do
comRes <- commitAllOffsets OffsetCommit k
comRes `shouldBe` Nothing

it "should get committed" $ \k -> do
res <- committed k (Timeout 1000) [(testTopic, PartitionId 0)]
res `shouldSatisfy` isRight

it "should get position" $ \k -> do
res <- position k [(testTopic, PartitionId 0)]
res `shouldSatisfy` isRight

it "should get watermark offsets" $ \k -> do
res <- sequence <$> watermarkOffsets k (Timeout 1000) testTopic
res `shouldSatisfy` isRight
@@ -203,7 +203,12 @@ runConsumerSpec = do
let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) }
let res' = fmap filterUserTopics res
length . kmBrokers <$> res' `shouldBe` Right 1
length . kmTopics <$> res' `shouldBe` Right 1

let topicsLen = either (const 0) (length . kmTopics) res'
let hasTopic = either (const False) (any (\t -> tmTopicName t == testTopic) . kmTopics) res'

topicsLen `shouldSatisfy` (> 0)
hasTopic `shouldBe` True

it "should return topic metadata" $ \k -> do
res <- topicMetadata k (Timeout 1000) testTopic
@@ -213,7 +218,11 @@ runConsumerSpec = do

it "should describe all consumer groups" $ \k -> do
res <- allConsumerGroupsInfo k (Timeout 1000)
fmap giGroup <$> res `shouldBe` Right [testGroupId]
let groups = either (const []) (fmap giGroup) res
let prefixedGroups = filter isTestGroupId groups
let resLen = length prefixedGroups
resLen `shouldSatisfy` (> 0)

it "should describe a given consumer group" $ \k -> do
res <- consumerGroupInfo k (Timeout 1000) testGroupId
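The spec above uses makeGroupId and isTestGroupId helpers whose definitions sit outside this diff; together with the new random dependency in the cabal file, they presumably give each run a unique consumer group so the async and sync consumers don't interfere. A hypothetical sketch of what they could look like (names, signatures, and the prefix are assumptions):

{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId (..))
import System.IO.Unsafe (unsafePerformIO)
import System.Random (randomRIO)

-- One random suffix per test run; the pure use sites
-- (e.g. groupId (makeGroupId "async")) suggest something like this.
runSuffix :: Int
runSuffix = unsafePerformIO (randomRIO (0, 100000))
{-# NOINLINE runSuffix #-}

makeGroupId :: Text.Text -> ConsumerGroupId  -- hypothetical signature
makeGroupId label =
  ConsumerGroupId ("it-group-" <> label <> "-" <> Text.pack (show runSuffix))

isTestGroupId :: ConsumerGroupId -> Bool  -- hypothetical
isTestGroupId (ConsumerGroupId g) = "it-group-" `Text.isPrefixOf` g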