@@ -8,11 +8,12 @@ import qualified Data.Text as Text
8
8
import Control.Monad (liftM )
9
9
import Data.Int (Int32 , Int64 )
10
10
import Data.Word (Word8 )
11
+ import Foreign.Concurrent (newForeignPtr )
11
12
import Foreign.Marshal.Alloc (alloca , allocaBytes )
12
13
import Foreign.Marshal.Array (peekArray , allocaArray )
13
14
import Foreign.Storable (Storable (.. ))
14
15
import Foreign.Ptr (Ptr , FunPtr , castPtr , nullPtr )
15
- import Foreign.ForeignPtr (FinalizerPtr , addForeignPtrFinalizer , withForeignPtr , newForeignPtr , newForeignPtr_ )
16
+ import Foreign.ForeignPtr (FinalizerPtr , addForeignPtrFinalizer , newForeignPtr_ , withForeignPtr )
16
17
import Foreign.C.Error (Errno (.. ), getErrno )
17
18
import Foreign.C.String (CString , newCString , withCAString , peekCAString , peekCAStringLen , peekCString )
18
19
import Foreign.C.Types (CFile , CInt (.. ), CSize , CChar )
@@ -267,12 +268,14 @@ instance Storable RdKafkaMetadataT where
267
268
{`Int' } -> `RdKafkaTopicPartitionListTPtr' # }
268
269
269
270
foreign import ccall unsafe " rdkafka.h &rd_kafka_topic_partition_list_destroy"
270
- rdKafkaTopicPartitionListDestroy :: FinalizerPtr RdKafkaTopicPartitionListT
271
+ rdKafkaTopicPartitionListDestroyF :: FinalizerPtr RdKafkaTopicPartitionListT
272
+ foreign import ccall unsafe " rdkafka.h rd_kafka_topic_partition_list_destroy"
273
+ rdKafkaTopicPartitionListDestroy :: Ptr RdKafkaTopicPartitionListT -> IO ()
271
274
272
275
newRdKafkaTopicPartitionListT :: Int -> IO RdKafkaTopicPartitionListTPtr
273
276
newRdKafkaTopicPartitionListT size = do
274
277
ret <- rdKafkaTopicPartitionListNew size
275
- addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy ret
278
+ addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF ret
276
279
return ret
277
280
278
281
{# fun rd_kafka_topic_partition_list_add as ^
@@ -287,7 +290,7 @@ newRdKafkaTopicPartitionListT size = do
287
290
copyRdKafkaTopicPartitionList :: RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr
288
291
copyRdKafkaTopicPartitionList pl = do
289
292
cp <- rdKafkaTopicPartitionListCopy pl
290
- addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy cp
293
+ addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF cp
291
294
return cp
292
295
293
296
{# fun rd_kafka_topic_partition_list_set_offset as ^
@@ -560,7 +563,7 @@ rdKafkaConsumeBatchQueue :: RdKafkaQueueTPtr -> Int -> Int -> IO [RdKafkaMessage
560
563
rdKafkaConsumeBatchQueue qptr timeout batchSize = do
561
564
allocaArray batchSize $ \ pArr -> do
562
565
rSize <- rdKafkaConsumeBatchQueue' qptr timeout pArr (fromIntegral batchSize)
563
- peekArray (fromIntegral rSize) pArr >>= traverse newForeignPtr_
566
+ peekArray (fromIntegral rSize) pArr >>= traverse ( flip newForeignPtr ( return () ))
564
567
565
568
-------------------------------------------------------------------------------------------------
566
569
---- High-level KafkaConsumer
@@ -582,7 +585,7 @@ rdKafkaSubscription k = do
582
585
(err, sub) <- rdKafkaSubscription' k
583
586
case err of
584
587
RdKafkaRespErrNoError ->
585
- Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy sub
588
+ Right <$> newForeignPtr sub ( rdKafkaTopicPartitionListDestroy sub)
586
589
e -> return (Left e)
587
590
588
591
{# fun rd_kafka_consumer_poll as ^
@@ -614,7 +617,7 @@ rdKafkaAssignment k = do
614
617
(err, ass) <- rdKafkaAssignment' k
615
618
case err of
616
619
RdKafkaRespErrNoError ->
617
- Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy ass
620
+ Right <$> newForeignPtr ass ( rdKafkaTopicPartitionListDestroy ass)
618
621
e -> return (Left e)
619
622
620
623
{# fun rd_kafka_commit as ^
@@ -721,7 +724,10 @@ instance Storable RdKafkaGroupListT where
721
724
-> `RdKafkaRespErrT' cIntToEnum # }
722
725
723
726
foreign import ccall " rdkafka.h &rd_kafka_group_list_destroy"
724
- rdKafkaGroupListDestroy :: FinalizerPtr RdKafkaGroupListT
727
+ rdKafkaGroupListDestroyF :: FinalizerPtr RdKafkaGroupListT
728
+
729
+ foreign import ccall " rdkafka.h rd_kafka_group_list_destroy"
730
+ rdKafkaGroupListDestroy :: Ptr RdKafkaGroupListT -> IO ()
725
731
726
732
rdKafkaListGroups :: RdKafkaTPtr -> Maybe String -> Int -> IO (Either RdKafkaRespErrT RdKafkaGroupListTPtr )
727
733
rdKafkaListGroups k g t = case g of
@@ -731,7 +737,7 @@ rdKafkaListGroups k g t = case g of
731
737
listGroups grp = do
732
738
(err, res) <- rdKafkaListGroups' k grp t
733
739
case err of
734
- RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaGroupListDestroy res
740
+ RdKafkaRespErrNoError -> Right <$> newForeignPtr res ( rdKafkaGroupListDestroy res)
735
741
e -> return $ Left e
736
742
-------------------------------------------------------------------------------------------------
737
743
@@ -910,15 +916,15 @@ rdKafkaConsumeStop topicPtr partition = do
910
916
alloca- `Ptr RdKafkaMetadataT' peekPtr* , `Int' }
911
917
-> `RdKafkaRespErrT' cIntToEnum # }
912
918
913
- foreign import ccall unsafe " rdkafka.h & rd_kafka_metadata_destroy"
914
- rdKafkaMetadataDestroy :: FinalizerPtr RdKafkaMetadataT
919
+ foreign import ccall unsafe " rdkafka.h rd_kafka_metadata_destroy"
920
+ rdKafkaMetadataDestroy :: Ptr RdKafkaMetadataT -> IO ()
915
921
916
922
rdKafkaMetadata :: RdKafkaTPtr -> Bool -> Maybe RdKafkaTopicTPtr -> Int -> IO (Either RdKafkaRespErrT RdKafkaMetadataTPtr )
917
923
rdKafkaMetadata k allTopics mt timeout = do
918
924
tptr <- maybe (newForeignPtr_ nullPtr) pure mt
919
925
(err, res) <- rdKafkaMetadata' k allTopics tptr timeout
920
926
case err of
921
- RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaMetadataDestroy res
927
+ RdKafkaRespErrNoError -> Right <$> newForeignPtr res ( rdKafkaMetadataDestroy res)
922
928
e -> return (Left e)
923
929
924
930
{# fun rd_kafka_poll as ^
0 commit comments