Skip to content

Commit fb405a1

Browse files
authored
Merge pull request #124 from felixmulder/sync-example
Add example of synchronous production of messages to examples and README
2 parents ec707c3 + 347c300 commit fb405a1

File tree

2 files changed

+88
-5
lines changed

2 files changed

+88
-5
lines changed

README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,56 @@ mkMessage k v = ProducerRecord
165165
}
166166
```
167167

168+
### Synchronous sending of messages
169+
Because of the asynchronous nature of librdkafka. It there is no API to provide
170+
synchronous production of messages. It is, however, possible to combine the
171+
delivery reports feature with that of callbacks. This can be done using the
172+
`Kafka.Producer.produceMessage'` function.
173+
174+
```haskell
175+
produceMessage' :: MonadIO m
176+
=> KafkaProducer
177+
-> ProducerRecord
178+
-> (DeliveryReport -> IO ())
179+
-> m (Either ImmediateError ())
180+
```
181+
182+
Using this function, you can provide a callback which will be invoked upon the
183+
produced message's delivery report. With a little help of `MVar`s or similar,
184+
you can in fact, create a synchronous-like interface.
185+
186+
```haskell
187+
sendMessageSync :: MonadIO m
188+
=> KafkaProducer
189+
-> ProducerRecord
190+
-> m (Either KafkaError Offset)
191+
sendMessageSync producer record = liftIO $ do
192+
-- Create an empty MVar:
193+
var <- newEmptyMVar
194+
195+
-- Produce the message and use the callback to put the delivery report in the
196+
-- MVar:
197+
res <- produceMessage' producer record (putMVar var)
198+
199+
case res of
200+
Left (ImmediateError err) ->
201+
pure (Left err)
202+
Right () -> do
203+
-- Flush producer queue to make sure you don't get stuck waiting for the
204+
-- message to send:
205+
flushProducer producer
206+
207+
-- Wait for the message's delivery report and map accordingly:
208+
takeMVar var >>= return . \case
209+
DeliverySuccess _ offset -> Right offset
210+
DeliveryFailure _ err -> Left err
211+
NoMessageError err -> Left err
212+
```
213+
214+
_Note:_ this is a semi-naive solution as this waits forever (or until
215+
librdkafka times out). You should make sure that your configuration reflects
216+
the behavior you want out of this functionality.
217+
168218
# Installation
169219

170220
## Installing librdkafka

example/ProducerExample.hs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
{-# LANGUAGE OverloadedStrings #-}
2+
{-# LANGUAGE LambdaCase #-}
23

34
module ProducerExample
45
where
56

6-
import Control.Exception (bracket)
7-
import Control.Monad (forM_)
8-
import Data.ByteString (ByteString)
9-
import Data.ByteString.Char8 (pack)
7+
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
8+
import Control.Exception (bracket)
9+
import Control.Monad (forM_)
10+
import Control.Monad.IO.Class (MonadIO(..))
11+
import Data.ByteString (ByteString)
12+
import Data.ByteString.Char8 (pack)
1013
import Data.Monoid
14+
import Kafka.Consumer (Offset)
1115
import Kafka.Producer
12-
import Data.Text (Text)
16+
import Data.Text (Text)
1317

1418
-- Global producer properties
1519
producerProps :: ProducerProperties
@@ -69,3 +73,32 @@ sendMessages prod = do
6973

7074
putStrLn "Thank you."
7175
return $ Right ()
76+
77+
-- | An example for sending messages synchronously using the 'produceMessage''
78+
-- function
79+
--
80+
sendMessageSync :: MonadIO m
81+
=> KafkaProducer
82+
-> ProducerRecord
83+
-> m (Either KafkaError Offset)
84+
sendMessageSync producer record = liftIO $ do
85+
-- Create an empty MVar:
86+
var <- newEmptyMVar
87+
88+
-- Produce the message and use the callback to put the delivery report in the
89+
-- MVar:
90+
res <- produceMessage' producer record (putMVar var)
91+
92+
case res of
93+
Left (ImmediateError err) ->
94+
pure (Left err)
95+
Right () -> do
96+
-- Flush producer queue to make sure you don't get stuck waiting for the
97+
-- message to send:
98+
flushProducer producer
99+
100+
-- Wait for the message's delivery report and map accordingly:
101+
takeMVar var >>= return . \case
102+
DeliverySuccess _ offset -> Right offset
103+
DeliveryFailure _ err -> Left err
104+
NoMessageError err -> Left err

0 commit comments

Comments
 (0)