Skip to content

Commit 936cf86

Browse files
authored
Merge pull request #168 from phile314/fix_delivery_cb
Fix delivery callback (fallout from #161)
2 parents e17af27 + 817bc96 commit 936cf86

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

src/Kafka/Producer.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ newProducer pps = liftIO $ do
117117
tc <- topicConf (TopicProps $ (ppTopicProps pps))
118118

119119
-- add default delivery report callback
120-
deliveryCallback (const mempty) kc
120+
let Callback setDeliveryCallback = deliveryCallback (const mempty)
121+
setDeliveryCallback kc
121122

122123
-- set callbacks
123124
forM_ (ppCallbacks pps) (\(Callback setCb) -> setCb kc)

src/Kafka/Producer/Callbacks.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import Foreign.StablePtr (castPtrToStablePtr, deRefStablePtr, fre
1515
import Kafka.Callbacks as X
1616
import Kafka.Consumer.Types (Offset(..))
1717
import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
18-
import Kafka.Internal.Setup (KafkaConf(..), getRdKafkaConf)
18+
import Kafka.Internal.Setup (KafkaConf(..), getRdKafkaConf, Callback(..))
1919
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload)
2020
import Kafka.Producer.Types (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
2121
import Kafka.Types (KafkaError(..), TopicName(..))
@@ -27,8 +27,8 @@ import Kafka.Types (KafkaError(..), TopicName(..))
2727
-- callbacks. For callbacks to individual messsages see
2828
-- 'Kafka.Producer.produceMessage\''./
2929
--
30-
deliveryCallback :: (DeliveryReport -> IO ()) -> KafkaConf -> IO ()
31-
deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
30+
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
31+
deliveryCallback callback = Callback $ \kc -> rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
3232
where
3333
realCb :: t -> Ptr RdKafkaMessageT -> IO ()
3434
realCb _ mptr =

0 commit comments

Comments
 (0)