Skip to content

Commit 7db9ea0

Browse files
authored
Merge pull request #121 from felixmulder/feature/add-message-specific-callbacks
Implement support for callbacks on individual messages
2 parents 0b9c539 + f81bec4 commit 7db9ea0

File tree

6 files changed

+118
-25
lines changed

6 files changed

+118
-25
lines changed

src/Kafka/Internal/RdKafka.chs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ data RdKafkaMessageT = RdKafkaMessageT
147147
, offset'RdKafkaMessageT :: Int64
148148
, payload'RdKafkaMessageT :: Word8Ptr
149149
, key'RdKafkaMessageT :: Word8Ptr
150+
, opaque'RdKafkaMessageT :: Ptr ()
150151
}
151152
deriving (Show, Eq)
152153

@@ -162,6 +163,7 @@ instance Storable RdKafkaMessageT where
162163
<*> liftM fromIntegral ({#get rd_kafka_message_t->offset #} p)
163164
<*> liftM castPtr ({#get rd_kafka_message_t->payload #} p)
164165
<*> liftM castPtr ({#get rd_kafka_message_t->key #} p)
166+
<*> liftM castPtr ({#get rd_kafka_message_t->_private #} p)
165167
poke p x = do
166168
{#set rd_kafka_message_t.err#} p (enumToCInt $ err'RdKafkaMessageT x)
167169
{#set rd_kafka_message_t.rkt#} p (castPtr $ topic'RdKafkaMessageT x)
@@ -171,6 +173,7 @@ instance Storable RdKafkaMessageT where
171173
{#set rd_kafka_message_t.offset#} p (fromIntegral $ offset'RdKafkaMessageT x)
172174
{#set rd_kafka_message_t.payload#} p (castPtr $ payload'RdKafkaMessageT x)
173175
{#set rd_kafka_message_t.key#} p (castPtr $ key'RdKafkaMessageT x)
176+
{#set rd_kafka_message_t._private#} p (castPtr $ opaque'RdKafkaMessageT x)
174177

175178
{#pointer *rd_kafka_message_t as RdKafkaMessageTPtr foreign -> RdKafkaMessageT #}
176179

@@ -893,7 +896,7 @@ rdKafkaConsumeStop topicPtr partition = do
893896

894897
{#fun rd_kafka_produce as ^
895898
{`RdKafkaTopicTPtr', cIntConv `CInt32T', `Int', castPtr `Word8Ptr',
896-
cIntConv `CSize', castPtr `Word8Ptr', cIntConv `CSize', castPtr `Word8Ptr'}
899+
cIntConv `CSize', castPtr `Word8Ptr', cIntConv `CSize', castPtr `Ptr ()'}
897900
-> `Int' #}
898901

899902
{#fun rd_kafka_produce_batch as ^

src/Kafka/Producer.hs

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
{-# LANGUAGE TupleSections #-}
1+
{-# LANGUAGE TupleSections #-}
2+
{-# LANGUAGE LambdaCase #-}
23
module Kafka.Producer
34
( module X
45
, runProducer
56
, newProducer
67
, produceMessage, produceMessageBatch
8+
, produceMessage'
79
, flushProducer
810
, closeProducer
911
, KafkaProducer
@@ -25,11 +27,12 @@ import Foreign.ForeignPtr (newForeignPtr_, withForeignPtr)
2527
import Foreign.Marshal.Array (withArrayLen)
2628
import Foreign.Ptr (Ptr, nullPtr, plusPtr)
2729
import Foreign.Storable (Storable (..))
30+
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
2831
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel)
2932
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf)
3033
import Kafka.Internal.Shared (pollEvents)
31-
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr, producePartitionCInt, producePartitionInt)
32-
import Kafka.Producer.Types (KafkaProducer (..))
34+
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt)
35+
import Kafka.Producer.Types (KafkaProducer (..), ImmediateError(..))
3336

3437
import Kafka.Producer.ProducerProperties as X
3538
import Kafka.Producer.Types as X hiding (KafkaProducer)
@@ -60,6 +63,9 @@ newProducer pps = liftIO $ do
6063
kc@(KafkaConf kc' _ _) <- kafkaConf (KafkaProps $ (ppKafkaProps pps))
6164
tc <- topicConf (TopicProps $ (ppTopicProps pps))
6265

66+
-- add default delivery report callback
67+
deliveryCallback (const mempty) kc
68+
6369
-- set callbacks
6470
forM_ (ppCallbacks pps) (\setCb -> setCb kc)
6571

@@ -78,23 +84,51 @@ produceMessage :: MonadIO m
7884
=> KafkaProducer
7985
-> ProducerRecord
8086
-> m (Maybe KafkaError)
81-
produceMessage kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) m = liftIO $ do
82-
pollEvents kp (Just $ Timeout 0) -- fire callbacks if any exist (handle delivery reports)
83-
bracket (mkTopic $ prTopic m) clTopic withTopic
84-
where
85-
mkTopic (TopicName tn) = newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc)
86-
87-
clTopic = either (return . const ()) destroyUnmanagedRdKafkaTopic
88-
89-
withTopic (Left err) = return . Just . KafkaError $ Text.pack err
90-
withTopic (Right t) =
91-
withBS (prValue m) $ \payloadPtr payloadLength ->
92-
withBS (prKey m) $ \keyPtr keyLength ->
93-
handleProduceErr =<<
94-
rdKafkaProduce t (producePartitionCInt (prPartition m))
95-
copyMsgFlags payloadPtr (fromIntegral payloadLength)
96-
keyPtr (fromIntegral keyLength) nullPtr
97-
87+
produceMessage kp m = produceMessage' kp m (pure . mempty) >>= adjustRes
88+
where
89+
adjustRes = \case
90+
Right () -> pure Nothing
91+
Left (ImmediateError err) -> pure (Just err)
92+
93+
-- | Sends a single message with a registered callback.
94+
--
95+
-- The callback can be a long running process, as it is forked by the thread
96+
-- that handles the delivery reports.
97+
--
98+
produceMessage' :: MonadIO m
99+
=> KafkaProducer
100+
-> ProducerRecord
101+
-> (DeliveryReport -> IO ())
102+
-> m (Either ImmediateError ())
103+
produceMessage' kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) msg cb = liftIO $
104+
fireCallbacks >> bracket (mkTopic . prTopic $ msg) closeTopic withTopic
105+
where
106+
fireCallbacks =
107+
pollEvents kp . Just . Timeout $ 0
108+
109+
mkTopic (TopicName tn) =
110+
newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc)
111+
112+
closeTopic = either mempty destroyUnmanagedRdKafkaTopic
113+
114+
withTopic (Left err) = return . Left . ImmediateError . KafkaError . Text.pack $ err
115+
withTopic (Right topic) =
116+
withBS (prValue msg) $ \payloadPtr payloadLength ->
117+
withBS (prKey msg) $ \keyPtr keyLength -> do
118+
callbackPtr <- newStablePtr cb
119+
res <- handleProduceErr' =<< rdKafkaProduce
120+
topic
121+
(producePartitionCInt (prPartition msg))
122+
copyMsgFlags
123+
payloadPtr
124+
(fromIntegral payloadLength)
125+
keyPtr
126+
(fromIntegral keyLength)
127+
(castStablePtrToPtr callbackPtr)
128+
129+
pure $ case res of
130+
Left err -> Left . ImmediateError $ err
131+
Right () -> Right ()
98132

99133
-- | Sends a batch of messages.
100134
-- Returns a list of messages which it was unable to send with corresponding errors.
@@ -146,6 +180,7 @@ produceMessageBatch kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) messages = lif
146180
, offset'RdKafkaMessageT = 0
147181
, keyLen'RdKafkaMessageT = keyLength
148182
, key'RdKafkaMessageT = keyPtr
183+
, opaque'RdKafkaMessageT = nullPtr
149184
}
150185

151186
-- | Closes the producer.

src/Kafka/Producer/Callbacks.hs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1+
{-# LANGUAGE TypeApplications #-}
12
module Kafka.Producer.Callbacks
23
( deliveryCallback
34
, module X
45
)
56
where
67

8+
import Control.Monad (void)
9+
import Control.Concurrent (forkIO)
710
import Foreign.C.Error (getErrno)
811
import Foreign.Ptr (Ptr, nullPtr)
912
import Foreign.Storable (Storable(peek))
13+
import Foreign.StablePtr (castPtrToStablePtr, deRefStablePtr)
1014
import Kafka.Callbacks as X
1115
import Kafka.Consumer.Types (Offset(..))
1216
import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
@@ -16,6 +20,12 @@ import Kafka.Producer.Types (ProducerRecord(..), DeliveryReport(..),
1620
import Kafka.Types (KafkaError(..), TopicName(..))
1721

1822
-- | Sets the callback for delivery reports.
23+
--
24+
-- /Note: A callback should not be a long-running process as it blocks
25+
-- librdkafka from continuing on the thread that handles the delivery
26+
-- callbacks. For callbacks to individual messsages see
27+
-- 'Kafka.Producer.produceMessage\''./
28+
--
1929
deliveryCallback :: (DeliveryReport -> IO ()) -> KafkaConf -> IO ()
2030
deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
2131
where
@@ -25,9 +35,20 @@ deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
2535
then getErrno >>= (callback . NoMessageError . kafkaRespErr)
2636
else do
2737
s <- peek mptr
38+
let cbPtr = opaque'RdKafkaMessageT s
2839
if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
29-
then mkErrorReport s >>= callback
30-
else mkSuccessReport s >>= callback
40+
then mkErrorReport s >>= callbacks cbPtr
41+
else mkSuccessReport s >>= callbacks cbPtr
42+
43+
callbacks cbPtr rep = do
44+
callback rep
45+
if cbPtr == nullPtr then
46+
pure ()
47+
else do
48+
msgCb <- deRefStablePtr @(DeliveryReport -> IO ()) $ castPtrToStablePtr $ cbPtr
49+
-- Here we fork the callback since it might be a longer action and
50+
-- blocking here would block librdkafka from continuing its execution
51+
void . forkIO $ msgCb rep
3152

3253
mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport
3354
mkErrorReport msg = do

src/Kafka/Producer/Convert.hs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Kafka.Producer.Convert
33
, producePartitionInt
44
, producePartitionCInt
55
, handleProduceErr
6+
, handleProduceErr'
67
)
78
where
89

@@ -31,3 +32,9 @@ handleProduceErr (- 1) = (Just . kafkaRespErr) <$> getErrno
3132
handleProduceErr 0 = return Nothing
3233
handleProduceErr _ = return $ Just KafkaInvalidReturnValue
3334
{-# INLINE handleProduceErr #-}
35+
36+
handleProduceErr' :: Int -> IO (Either KafkaError ())
37+
handleProduceErr' (- 1) = (Left . kafkaRespErr) <$> getErrno
38+
handleProduceErr' 0 = return (Right ())
39+
handleProduceErr' _ = return $ Left KafkaInvalidReturnValue
40+
{-# INLINE handleProduceErr' #-}

src/Kafka/Producer/Types.hs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
{-# LANGUAGE DeriveDataTypeable #-}
2-
{-# LANGUAGE DeriveGeneric #-}
1+
{-# LANGUAGE DeriveDataTypeable #-}
2+
{-# LANGUAGE DeriveGeneric #-}
3+
{-# LANGUAGE DerivingStrategies #-}
4+
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
35
module Kafka.Producer.Types
46
( KafkaProducer(..)
57
, ProducerRecord(..)
68
, ProducePartition(..)
79
, DeliveryReport(..)
10+
, ImmediateError(..)
811
)
912
where
1013

@@ -47,6 +50,10 @@ data ProducePartition =
4750
| UnassignedPartition
4851
deriving (Show, Eq, Ord, Typeable, Generic)
4952

53+
-- | Data type representing an error that is caused by pre-flight conditions not being met
54+
newtype ImmediateError = ImmediateError KafkaError
55+
deriving newtype (Eq, Show)
56+
5057
data DeliveryReport
5158
= DeliverySuccess ProducerRecord Offset
5259
| DeliveryFailure ProducerRecord KafkaError

tests-it/Kafka/IntegrationSpec.hs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE LambdaCase #-}
12
{-# LANGUAGE OverloadedStrings #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34

@@ -6,6 +7,7 @@ where
67

78
import Control.Monad (forM, forM_)
89
import Control.Monad.Loops
10+
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
911
import qualified Data.ByteString as BS
1012
import Data.Either
1113
import Data.Map (fromList)
@@ -113,6 +115,24 @@ spec = do
113115
res <- sendMessages (testMessages testTopic) prod
114116
res `shouldBe` Right ()
115117

118+
it "sends messages with callback to test topic" $ \prod -> do
119+
var <- newEmptyMVar
120+
let
121+
msg = ProducerRecord
122+
{ prTopic = TopicName "callback-topic"
123+
, prPartition = UnassignedPartition
124+
, prKey = Nothing
125+
, prValue = Just "test from producer"
126+
}
127+
128+
res <- produceMessage' prod msg (putMVar var)
129+
res `shouldBe` Right ()
130+
callbackRes <- flushProducer prod *> takeMVar var
131+
callbackRes `shouldSatisfy` \case
132+
DeliverySuccess _ _ -> True
133+
DeliveryFailure _ _ -> False
134+
NoMessageError _ -> False
135+
116136
specWithConsumer "Run consumer with async polling" (consumerProps <> groupId (makeGroupId "async")) runConsumerSpec
117137
specWithConsumer "Run consumer with sync polling" (consumerProps <> groupId (makeGroupId "sync") <> callbackPollMode CallbackPollModeSync) runConsumerSpec
118138

0 commit comments

Comments
 (0)