Skip to content

Use Foreign.Concurrent.newForeignPtr where applicable #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import qualified Data.Text as Text
import Control.Monad (liftM)
import Data.Int (Int32, Int64)
import Data.Word (Word8)
import Foreign.Concurrent (newForeignPtr)
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Marshal.Array (peekArray, allocaArray)
import Foreign.Storable (Storable(..))
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, withForeignPtr, newForeignPtr, newForeignPtr_)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr)
import Foreign.C.Error (Errno(..), getErrno)
import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCAStringLen, peekCString)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar)
Expand Down Expand Up @@ -267,12 +268,14 @@ instance Storable RdKafkaMetadataT where
{`Int'} -> `RdKafkaTopicPartitionListTPtr' #}

foreign import ccall unsafe "rdkafka.h &rd_kafka_topic_partition_list_destroy"
rdKafkaTopicPartitionListDestroy :: FinalizerPtr RdKafkaTopicPartitionListT
rdKafkaTopicPartitionListDestroyF :: FinalizerPtr RdKafkaTopicPartitionListT
foreign import ccall unsafe "rdkafka.h rd_kafka_topic_partition_list_destroy"
rdKafkaTopicPartitionListDestroy :: Ptr RdKafkaTopicPartitionListT -> IO ()

newRdKafkaTopicPartitionListT :: Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT size = do
ret <- rdKafkaTopicPartitionListNew size
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy ret
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF ret
return ret

{# fun rd_kafka_topic_partition_list_add as ^
Expand All @@ -287,7 +290,7 @@ newRdKafkaTopicPartitionListT size = do
copyRdKafkaTopicPartitionList :: RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr
copyRdKafkaTopicPartitionList pl = do
cp <- rdKafkaTopicPartitionListCopy pl
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy cp
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF cp
return cp

{# fun rd_kafka_topic_partition_list_set_offset as ^
Expand Down Expand Up @@ -560,7 +563,7 @@ rdKafkaConsumeBatchQueue :: RdKafkaQueueTPtr -> Int -> Int -> IO [RdKafkaMessage
rdKafkaConsumeBatchQueue qptr timeout batchSize = do
allocaArray batchSize $ \pArr -> do
rSize <- rdKafkaConsumeBatchQueue' qptr timeout pArr (fromIntegral batchSize)
peekArray (fromIntegral rSize) pArr >>= traverse newForeignPtr_
peekArray (fromIntegral rSize) pArr >>= traverse (flip newForeignPtr (return ()))

-------------------------------------------------------------------------------------------------
---- High-level KafkaConsumer
Expand All @@ -582,7 +585,7 @@ rdKafkaSubscription k = do
(err, sub) <- rdKafkaSubscription' k
case err of
RdKafkaRespErrNoError ->
Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy sub
Right <$> newForeignPtr sub (rdKafkaTopicPartitionListDestroy sub)
e -> return (Left e)

{#fun rd_kafka_consumer_poll as ^
Expand Down Expand Up @@ -614,7 +617,7 @@ rdKafkaAssignment k = do
(err, ass) <- rdKafkaAssignment' k
case err of
RdKafkaRespErrNoError ->
Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy ass
Right <$> newForeignPtr ass (rdKafkaTopicPartitionListDestroy ass)
e -> return (Left e)

{#fun rd_kafka_commit as ^
Expand Down Expand Up @@ -721,7 +724,10 @@ instance Storable RdKafkaGroupListT where
-> `RdKafkaRespErrT' cIntToEnum #}

foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy"
rdKafkaGroupListDestroy :: FinalizerPtr RdKafkaGroupListT
rdKafkaGroupListDestroyF :: FinalizerPtr RdKafkaGroupListT

foreign import ccall "rdkafka.h rd_kafka_group_list_destroy"
rdKafkaGroupListDestroy :: Ptr RdKafkaGroupListT -> IO ()

rdKafkaListGroups :: RdKafkaTPtr -> Maybe String -> Int -> IO (Either RdKafkaRespErrT RdKafkaGroupListTPtr)
rdKafkaListGroups k g t = case g of
Expand All @@ -731,7 +737,7 @@ rdKafkaListGroups k g t = case g of
listGroups grp = do
(err, res) <- rdKafkaListGroups' k grp t
case err of
RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaGroupListDestroy res
RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaGroupListDestroy res)
e -> return $ Left e
-------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -910,15 +916,15 @@ rdKafkaConsumeStop topicPtr partition = do
alloca- `Ptr RdKafkaMetadataT' peekPtr*, `Int'}
-> `RdKafkaRespErrT' cIntToEnum #}

foreign import ccall unsafe "rdkafka.h &rd_kafka_metadata_destroy"
rdKafkaMetadataDestroy :: FinalizerPtr RdKafkaMetadataT
foreign import ccall unsafe "rdkafka.h rd_kafka_metadata_destroy"
rdKafkaMetadataDestroy :: Ptr RdKafkaMetadataT -> IO ()

rdKafkaMetadata :: RdKafkaTPtr -> Bool -> Maybe RdKafkaTopicTPtr -> Int -> IO (Either RdKafkaRespErrT RdKafkaMetadataTPtr)
rdKafkaMetadata k allTopics mt timeout = do
tptr <- maybe (newForeignPtr_ nullPtr) pure mt
(err, res) <- rdKafkaMetadata' k allTopics tptr timeout
case err of
RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaMetadataDestroy res
RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaMetadataDestroy res)
e -> return (Left e)

{#fun rd_kafka_poll as ^
Expand Down