File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -6,11 +6,12 @@ module Kafka.Producer.Callbacks
6
6
where
7
7
8
8
import Control.Monad (void )
9
+ import Control.Exception (bracket )
9
10
import Control.Concurrent (forkIO )
10
11
import Foreign.C.Error (getErrno )
11
12
import Foreign.Ptr (Ptr , nullPtr )
12
13
import Foreign.Storable (Storable (peek ))
13
- import Foreign.StablePtr (castPtrToStablePtr , deRefStablePtr )
14
+ import Foreign.StablePtr (castPtrToStablePtr , deRefStablePtr , freeStablePtr )
14
15
import Kafka.Callbacks as X
15
16
import Kafka.Consumer.Types (Offset (.. ))
16
17
import Kafka.Internal.RdKafka (RdKafkaMessageT (.. ), RdKafkaRespErrT (.. ), rdKafkaConfSetDrMsgCb )
@@ -44,8 +45,8 @@ deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
44
45
callback rep
45
46
if cbPtr == nullPtr then
46
47
pure ()
47
- else do
48
- msgCb <- deRefStablePtr @ (DeliveryReport -> IO () ) $ castPtrToStablePtr $ cbPtr
48
+ else bracket ( pure $ castPtrToStablePtr cbPtr) freeStablePtr $ \ stablePtr -> do
49
+ msgCb <- deRefStablePtr @ (DeliveryReport -> IO () ) stablePtr
49
50
-- Here we fork the callback since it might be a longer action and
50
51
-- blocking here would block librdkafka from continuing its execution
51
52
void . forkIO $ msgCb rep
You can’t perform that action at this time.
0 commit comments