Skip to content

Commit 2a29065

Browse files
author
Philipp Hausmann
committed
Wrap callbacks in newtype to make wrong usage harder
1 parent 18be9e6 commit 2a29065

File tree

7 files changed

+36
-27
lines changed

7 files changed

+36
-27
lines changed

src/Kafka/Callbacks.hs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ module Kafka.Callbacks
22
( errorCallback
33
, logCallback
44
, statsCallback
5+
, Callback
56
)
67
where
78

89
import Kafka.Internal.RdKafka (rdKafkaConfSetErrorCb, rdKafkaConfSetLogCb, rdKafkaConfSetStatsCb)
9-
import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf)
10+
import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf, Callback(..))
1011
import Kafka.Types (KafkaError(..), KafkaLogLevel(..))
1112

1213
-- | Add a callback for errors.
@@ -19,10 +20,10 @@ import Kafka.Types (KafkaError(..), KafkaLogLevel(..))
1920
-- >
2021
-- > myErrorCallback :: 'KafkaError' -> String -> IO ()
2122
-- > myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
22-
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
23-
errorCallback callback k =
23+
errorCallback :: (KafkaError -> String -> IO ()) -> Callback
24+
errorCallback callback =
2425
let realCb _ err = callback (KafkaResponseError err)
25-
in rdKafkaConfSetErrorCb (getRdKafkaConf k) realCb
26+
in Callback $ \k -> rdKafkaConfSetErrorCb (getRdKafkaConf k) realCb
2627

2728
-- | Add a callback for logs.
2829
--
@@ -34,10 +35,10 @@ errorCallback callback k =
3435
-- >
3536
-- > myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO ()
3637
-- > myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
37-
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
38-
logCallback callback k =
38+
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
39+
logCallback callback =
3940
let realCb _ = callback . toEnum
40-
in rdKafkaConfSetLogCb (getRdKafkaConf k) realCb
41+
in Callback $ \k -> rdKafkaConfSetLogCb (getRdKafkaConf k) realCb
4142

4243
-- | Add a callback for stats.
4344
--
@@ -49,7 +50,7 @@ logCallback callback k =
4950
-- >
5051
-- > myStatsCallback :: String -> IO ()
5152
-- > myStatsCallback stats = print $ show stats
52-
statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO ()
53-
statsCallback callback k =
53+
statsCallback :: (String -> IO ()) -> Callback
54+
statsCallback callback =
5455
let realCb _ = callback
55-
in rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb
56+
in Callback $ \k -> rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb

src/Kafka/Consumer.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ import Foreign hiding (void)
8585
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
8686
import Kafka.Consumer.Types (KafkaConsumer (..))
8787
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
88-
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf)
88+
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..))
8989
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)
9090

9191
import Kafka.Consumer.ConsumerProperties as X
@@ -327,7 +327,7 @@ closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr statusVar)) = liftIO $
327327
newConsumerConf :: ConsumerProperties -> IO KafkaConf
328328
newConsumerConf ConsumerProperties {cpProps = m, cpCallbacks = cbs} = do
329329
conf <- kafkaConf (KafkaProps m)
330-
forM_ cbs (\setCb -> setCb conf)
330+
forM_ cbs (\(Callback setCb) -> setCb conf)
331331
return conf
332332

333333
-- | Subscribes to a given list of topics.

src/Kafka/Consumer/Callbacks.hs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@ import Kafka.Callbacks as X
1414
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
1515
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
1616
import Kafka.Internal.RdKafka
17-
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
17+
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..))
1818
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
1919

2020
import qualified Data.Text as Text
2121

2222
-- | Sets a callback that is called when rebalance is needed.
23-
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
24-
rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb conf realCb
23+
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
24+
rebalanceCallback callback =
25+
Callback $ \kc@(KafkaConf con _ _) -> rdKafkaConfSetRebalanceCb con (realCb kc)
2526
where
26-
realCb k err pl = do
27+
realCb kc k err pl = do
2728
k' <- newForeignPtr_ k
2829
pls <- newForeignPtr_ pl
2930
setRebalanceCallback callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls
@@ -36,10 +37,11 @@ rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb c
3637
-- If no partitions had valid offsets to commit this callback will be called
3738
-- with 'KafkaResponseError' 'RdKafkaRespErrNoOffset' which is not to be considered
3839
-- an error.
39-
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
40-
offsetCommitCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetOffsetCommitCb conf realCb
40+
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
41+
offsetCommitCallback callback =
42+
Callback $ \kc@(KafkaConf conf _ _) -> rdKafkaConfSetOffsetCommitCb conf (realCb kc)
4143
where
42-
realCb k err pl = do
44+
realCb kc k err pl = do
4345
k' <- newForeignPtr_ k
4446
pls <- fromNativeTopicPartitionList' pl
4547
callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls

src/Kafka/Consumer/ConsumerProperties.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import Data.Semigroup as Sem
3333
import Data.Text (Text)
3434
import qualified Data.Text as Text
3535
import Kafka.Consumer.Types (ConsumerGroupId (..))
36-
import Kafka.Internal.Setup (KafkaConf (..))
36+
import Kafka.Internal.Setup (KafkaConf (..), Callback(..))
3737
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)
3838

3939
import Kafka.Consumer.Callbacks as X
@@ -52,7 +52,7 @@ data CallbackPollMode =
5252
data ConsumerProperties = ConsumerProperties
5353
{ cpProps :: Map Text Text
5454
, cpLogLevel :: Maybe KafkaLogLevel
55-
, cpCallbacks :: [KafkaConf -> IO ()]
55+
, cpCallbacks :: [Callback]
5656
, cpCallbackPollMode :: CallbackPollMode
5757
}
5858

@@ -116,7 +116,7 @@ clientId (ClientId cid) =
116116
-- * 'errorCallback'
117117
-- * 'logCallback'
118118
-- * 'statsCallback'
119-
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
119+
setCallback :: Callback -> ConsumerProperties
120120
setCallback cb = mempty { cpCallbacks = [cb] }
121121

122122
-- | Set the logging level.

src/Kafka/Internal/Setup.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module Kafka.Internal.Setup
77
, HasKafka(..)
88
, HasKafkaConf(..)
99
, HasTopicConf(..)
10+
, Callback(..)
1011
, CallbackPollStatus(..)
1112
, getRdKafka
1213
, getRdKafkaConf
@@ -46,6 +47,11 @@ newtype TopicProps = TopicProps (Map Text Text) deriving (Show, Eq)
4647
newtype Kafka = Kafka RdKafkaTPtr deriving Show
4748
newtype TopicConf = TopicConf RdKafkaTopicConfTPtr deriving Show
4849

50+
-- | Callbacks allow retrieving various information like error occurences, statistics
51+
-- and log messages.
52+
-- See `Kafka.Consumer.setCallback` (Consumer) and `Kafka.Producer.setCallback` (Producer) for more details.
53+
newtype Callback = Callback (KafkaConf -> IO ())
54+
4955
data CallbackPollStatus = CallbackPollEnabled | CallbackPollDisabled deriving (Show, Eq)
5056

5157
data KafkaConf = KafkaConf

src/Kafka/Producer.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ import Foreign.Ptr (Ptr, nullPtr, plusPtr)
8282
import Foreign.Storable (Storable (..))
8383
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
8484
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel)
85-
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf)
85+
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
8686
import Kafka.Internal.Shared (pollEvents)
8787
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt)
8888
import Kafka.Producer.Types (KafkaProducer (..), ImmediateError(..))
@@ -120,7 +120,7 @@ newProducer pps = liftIO $ do
120120
deliveryCallback (const mempty) kc
121121

122122
-- set callbacks
123-
forM_ (ppCallbacks pps) (\setCb -> setCb kc)
123+
forM_ (ppCallbacks pps) (\(Callback setCb) -> setCb kc)
124124

125125
mbKafka <- newRdKafkaT RdKafkaProducer kc'
126126
case mbKafka of

src/Kafka/Producer/ProducerProperties.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import Control.Monad (MonadPlus(mplus))
2626
import Data.Map (Map)
2727
import qualified Data.Map as M
2828
import Data.Semigroup as Sem
29-
import Kafka.Internal.Setup (KafkaConf(..))
29+
import Kafka.Internal.Setup (KafkaConf(..), Callback(..))
3030
import Kafka.Types (KafkaDebug(..), Timeout(..), KafkaCompressionCodec(..), KafkaLogLevel(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText)
3131

3232
import Kafka.Producer.Callbacks
@@ -36,7 +36,7 @@ data ProducerProperties = ProducerProperties
3636
{ ppKafkaProps :: Map Text Text
3737
, ppTopicProps :: Map Text Text
3838
, ppLogLevel :: Maybe KafkaLogLevel
39-
, ppCallbacks :: [KafkaConf -> IO ()]
39+
, ppCallbacks :: [Callback]
4040
}
4141

4242
instance Sem.Semigroup ProducerProperties where
@@ -69,7 +69,7 @@ brokersList bs =
6969
-- * 'errorCallback'
7070
-- * 'logCallback'
7171
-- * 'statsCallback'
72-
setCallback :: (KafkaConf -> IO ()) -> ProducerProperties
72+
setCallback :: Callback -> ProducerProperties
7373
setCallback cb = mempty { ppCallbacks = [cb] }
7474

7575
-- | Sets the logging level.

0 commit comments

Comments
 (0)