-
Notifications
You must be signed in to change notification settings - Fork 54
Add a consumer option to rely on users for polling. #108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,14 @@ module Kafka.Consumer.ConsumerProperties | |
, extraProp | ||
, debugOptions | ||
, queuedMaxMessagesKBytes | ||
, userPolls | ||
, module X | ||
) | ||
where | ||
|
||
import Control.Monad (MonadPlus(mplus)) | ||
import Data.Map (Map) | ||
import Data.Monoid (Any) | ||
import qualified Data.Map as M | ||
import Data.Semigroup as Sem | ||
import Data.Text (Text) | ||
|
@@ -36,11 +38,12 @@ data ConsumerProperties = ConsumerProperties | |
{ cpProps :: Map Text Text | ||
, cpLogLevel :: Maybe KafkaLogLevel | ||
, cpCallbacks :: [KafkaConf -> IO ()] | ||
, cpUserPolls :: Any | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using |
||
} | ||
|
||
instance Sem.Semigroup ConsumerProperties where | ||
(ConsumerProperties m1 ll1 cb1) <> (ConsumerProperties m2 ll2 cb2) = | ||
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) | ||
(ConsumerProperties m1 ll1 cb1 cup1) <> (ConsumerProperties m2 ll2 cb2 cup2) = | ||
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) (cup1 <> cup2) | ||
{-# INLINE (<>) #-} | ||
|
||
-- | /Right biased/ so we prefer newer properties over older ones. | ||
|
@@ -49,6 +52,7 @@ instance Monoid ConsumerProperties where | |
{ cpProps = M.empty | ||
, cpLogLevel = Nothing | ||
, cpCallbacks = [] | ||
, cpUserPolls = Any False | ||
} | ||
{-# INLINE mempty #-} | ||
mappend = (Sem.<>) | ||
|
@@ -123,3 +127,13 @@ queuedMaxMessagesKBytes :: Int -> ConsumerProperties | |
queuedMaxMessagesKBytes kBytes = | ||
extraProp "queued.max.messages.kbytes" (Text.pack $ show kBytes) | ||
{-# INLINE queuedMaxMessagesKBytes #-} | ||
|
||
-- | The user will poll the consumer frequently to handle both new | ||
-- messages and rebalance events. | ||
-- | ||
-- By default hw-kafka-client handles polling rebalance events for you | ||
-- in a background thread, with this property set you can simplify | ||
-- hw-kafka-client's footprint and have full control over when polling | ||
-- happens at the cost of having to manage this yourself. | ||
userPolls :: ConsumerProperties | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The name Should we instead do something like:
It looks a bit more explicit to me, do you agree? |
||
userPolls = mempty { cpUserPolls = Any True } |
Uh oh!
There was an error while loading. Please reload this page.