Skip to content

Add headers to consumed/produced records #179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
6 commits merged on Oct 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions example/ProducerExample.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mkMessage k v = ProducerRecord
, prPartition = UnassignedPartition
, prKey = k
, prValue = v
, prHeaders = mempty
}

-- Run an example
Expand Down Expand Up @@ -61,12 +62,8 @@ sendMessages prod = do
putStrLn "And the last one..."
msg3 <- getLine
err3 <- produceMessage prod (mkMessage (Just "key3") (Just $ pack msg3))

-- errs <- produceMessageBatch prod
-- [ mkMessage (Just "b-1") (Just "batch-1")
-- , mkMessage (Just "b-2") (Just "batch-2")
-- , mkMessage Nothing (Just "batch-3")
-- ]

err4 <- produceMessage prod ((mkMessage (Just "key4") (Just $ pack msg3)) { prHeaders = headersFromList [("fancy", "header")]})

-- forM_ errs (print . snd)

Expand Down
14 changes: 7 additions & 7 deletions nix/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
"homepage": "https://github.com/nmattia/niv",
"owner": "nmattia",
"repo": "niv",
"rev": "af958e8057f345ee1aca714c1247ef3ba1c15f5e",
"sha256": "1qjavxabbrsh73yck5dcq8jggvh3r2jkbr6b5nlz5d9yrqm9255n",
"rev": "65a61b147f307d24bfd0a5cd56ce7d7b7cc61d2e",
"sha256": "17mirpsx5wyw262fpsd6n6m47jcgw8k2bwcp1iwdnrlzy4dhcgqh",
"type": "tarball",
"url": "https://github.com/nmattia/niv/archive/af958e8057f345ee1aca714c1247ef3ba1c15f5e.tar.gz",
"url": "https://github.com/nmattia/niv/archive/65a61b147f307d24bfd0a5cd56ce7d7b7cc61d2e.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
},
"nixpkgs": {
"branch": "release-19.03",
"branch": "nixos-21.05",
"description": "Nix Packages collection",
"homepage": "",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "da0c385a691d38b56b17eb18b852c4cec2050c24",
"sha256": "0svhqn139cy2nlgv4kqv1bsxza2dcm0yylrhnmanw4p73gv85caf",
"rev": "ce7a1190a0fa4ba3465b5f5471b08567060ca14c",
"sha256": "1zr1s9gp0h5g4arlba1bpb9yqfaaby5195ydm6a2psaxhm748li9",
"type": "tarball",
"url": "https://github.com/NixOS/nixpkgs/archive/da0c385a691d38b56b17eb18b852c4cec2050c24.tar.gz",
"url": "https://github.com/NixOS/nixpkgs/archive/ce7a1190a0fa4ba3465b5f5471b08567060ca14c.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
}
}
2 changes: 1 addition & 1 deletion scripts/build-librdkafka
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

RDKAFKA_VER="849c066b559950b02e37a69256f0cb7b04381d0e"
RDKAFKA_VER="1a722553638bba85dbda5050455f7b9a5ef302de"

PRJ=$PWD
DST="$PRJ/.librdkafka"
Expand Down
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pkgs.mkShell {
rdkafka
nettools
niv
gmp
];

shellHook = ''
Expand Down
9 changes: 6 additions & 3 deletions src/Kafka/Consumer/Convert.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ where

import Control.Monad ((>=>))
import qualified Data.ByteString as BS
import Data.Either (fromRight)
import Data.Int (Int64)
import Data.Map.Strict (Map, fromListWith)
import qualified Data.Set as S
Expand All @@ -41,7 +42,7 @@ import Kafka.Internal.RdKafka
, rdKafkaTopicPartitionListNew
, peekCText
)
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload, readTimestamp)
import Kafka.Internal.Shared (kafkaRespErr, readHeaders, readTopic, readKey, readPayload, readTimestamp)
import Kafka.Types (KafkaError(..), PartitionId(..), TopicName(..))

-- | Converts offsets sync policy to integer (the way Kafka understands it):
Expand Down Expand Up @@ -158,20 +159,22 @@ fromMessagePtr ptr =
s <- peek realPtr
msg <- if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
then return . Left . KafkaResponseError $ err'RdKafkaMessageT s
else Right <$> mkRecord s
else Right <$> mkRecord s realPtr
rdKafkaMessageDestroy realPtr
return msg
where
mkRecord msg = do
mkRecord msg rptr = do
topic <- readTopic msg
key <- readKey msg
payload <- readPayload msg
timestamp <- readTimestamp ptr
headers <- fromRight mempty <$> readHeaders rptr
return ConsumerRecord
{ crTopic = TopicName topic
, crPartition = PartitionId $ partition'RdKafkaMessageT msg
, crOffset = Offset $ offset'RdKafkaMessageT msg
, crTimestamp = timestamp
, crHeaders = headers
, crKey = key
, crValue = payload
}
Expand Down
5 changes: 3 additions & 2 deletions src/Kafka/Consumer/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Data.Text (Text)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..))
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..), Headers)

-- | The main type for Kafka consumption, used e.g. to poll and commit messages.
--
Expand Down Expand Up @@ -143,13 +143,14 @@ data ConsumerRecord k v = ConsumerRecord
, crPartition :: !PartitionId -- ^ Kafka partition this message was received from
, crOffset :: !Offset -- ^ Offset within the 'crPartition' Kafka partition
, crTimestamp :: !Timestamp -- ^ Message timestamp
, crHeaders :: !Headers -- ^ Message headers
, crKey :: !k -- ^ Message key
, crValue :: !v -- ^ Message value
}
deriving (Eq, Show, Read, Typeable, Generic)

instance Bifunctor ConsumerRecord where
bimap f g (ConsumerRecord t p o ts k v) = ConsumerRecord t p o ts (f k) (g v)
bimap f g (ConsumerRecord t p o ts hds k v) = ConsumerRecord t p o ts hds (f k) (g v)
{-# INLINE bimap #-}

instance Functor (ConsumerRecord k) where
Expand Down
118 changes: 115 additions & 3 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import Data.Word (Word8)
import Foreign.Concurrent (newForeignPtr)
import qualified Foreign.Concurrent as Concurrent
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Marshal.Array (peekArray, allocaArray)
import Foreign.Marshal.Array (peekArray, allocaArray, withArrayLen)
import Foreign.Storable (Storable(..))
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr)
import Foreign.C.Error (Errno(..), getErrno)
import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong)
import System.IO (Handle, stdin, stdout, stderr)
import System.Posix.IO (handleToFd)
import System.Posix.Types (Fd(..))
Expand Down Expand Up @@ -972,6 +972,118 @@ newRdKafkaTopicT kafkaPtr topic topicConfPtr = do
_ <- traverse (addForeignPtrFinalizer rdKafkaTopicDestroy') res
return res

-------------------------------------------------------------------------------------------------
---- Errors

-- | Opaque librdkafka rich error object (@rd_kafka_error_t@),
-- as returned by the produceva API.
data RdKafkaErrorT
{#pointer *rd_kafka_error_t as RdKafkaErrorTPtr -> RdKafkaErrorT #}

-- | Extract the plain error code from a rich error object.
{#fun rd_kafka_error_code as ^
    {`RdKafkaErrorTPtr'} -> `RdKafkaRespErrT' cIntToEnum #}

-- | Free a rich error object returned by librdkafka.
{#fun rd_kafka_error_destroy as ^
    {`RdKafkaErrorTPtr'} -> `()' #}
-------------------------------------------------------------------------------------------------
---- Headers

-- | Opaque librdkafka message-header list (@rd_kafka_headers_t@).
data RdKafkaHeadersT
{#pointer *rd_kafka_headers_t as RdKafkaHeadersTPtr -> RdKafkaHeadersT #}

-- | Fetch the header at the given index; the out-parameters receive the
-- header name, a pointer to the value, and the value size.  Returns
-- NOENT once the index is past the last header (see 'readHeaders' in
-- Kafka.Internal.Shared, which iterates until NOENT).
{#fun rd_kafka_header_get_all as ^
    {`RdKafkaHeadersTPtr', cIntConv `CSize', castPtr `Ptr CString', castPtr `Ptr Word8Ptr', `CSizePtr'} -> `RdKafkaRespErrT' cIntToEnum #}

-- | Get the header list attached to a message, if any
-- (NOENT when the message carries no headers).
{#fun rd_kafka_message_headers as ^
    {castPtr `Ptr RdKafkaMessageT', alloca- `RdKafkaHeadersTPtr' peekPtr*} -> `RdKafkaRespErrT' cIntToEnum #}

--- Produceva api

{#enum rd_kafka_vtype_t as ^ {underscoreToCase} deriving (Show, Eq) #}

-- | Haskell view of librdkafka's @rd_kafka_vu_t@ tagged union used by
-- the produceva API; each constructor mirrors one RD_KAFKA_VTYPE_* tag
-- (see the 'Storable' instance below for the exact union mapping).
data RdKafkaVuT
    = Topic'RdKafkaVu CString                   -- ^ topic name (cstr)
    | TopicHandle'RdKafkaVu (Ptr RdKafkaTopicT) -- ^ topic handle (rkt)
    | Partition'RdKafkaVu CInt32T               -- ^ target partition
    | Value'RdKafkaVu Word8Ptr CSize            -- ^ payload pointer + size
    | Key'RdKafkaVu Word8Ptr CSize              -- ^ key pointer + size
    | MsgFlags'RdKafkaVu CInt                   -- ^ RD_KAFKA_MSG_F_* flags
    | Timestamp'RdKafkaVu CInt64T               -- ^ message timestamp
    | Opaque'RdKafkaVu (Ptr ())                 -- ^ per-message opaque pointer
    | Header'RdKafkaVu CString Word8Ptr CSize   -- ^ single header: name, value, size
    | Headers'RdKafkaVu (Ptr RdKafkaHeadersT) -- The message object will assume ownership of the headers (unless produceva() fails)
    | End'RdKafkaVu                             -- ^ array terminator sentinel

{#pointer *rd_kafka_vu_t as RdKafkaVuTPtr foreign -> RdKafkaVuT #}

-- Marshalling between 'RdKafkaVuT' and the C @rd_kafka_vu_t@ struct.
-- peek: read the vtype discriminant, then the matching union member.
-- poke: set the vtype tag, then write the matching union member.
instance Storable RdKafkaVuT where
    alignment _ = {#alignof rd_kafka_vu_t #}
    sizeOf _ = {#sizeof rd_kafka_vu_t #}
    peek p = {#get rd_kafka_vu_t->vtype #} p >>= \a -> case cIntToEnum a of
        RdKafkaVtypeEnd -> return End'RdKafkaVu
        RdKafkaVtypeTopic -> Topic'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.cstr #} p)
        RdKafkaVtypeMsgflags -> MsgFlags'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.i #} p)
        RdKafkaVtypeTimestamp -> Timestamp'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.i64 #} p)
        RdKafkaVtypePartition -> Partition'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.i32 #} p)
        RdKafkaVtypeHeaders -> Headers'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.headers #} p)
        RdKafkaVtypeValue -> do
            -- u.mem is a (ptr, size) pair; castPtr because c2hs hands
            -- back a Ptr () while the constructor stores Word8Ptr.
            nm <- liftM castPtr ({#get rd_kafka_vu_t->u.mem.ptr #} p)
            sz <- ({#get rd_kafka_vu_t->u.mem.size #} p)
            return $ Value'RdKafkaVu nm (cIntConv sz)
        RdKafkaVtypeKey -> do
            nm <- liftM castPtr ({#get rd_kafka_vu_t->u.mem.ptr #} p)
            sz <- ({#get rd_kafka_vu_t->u.mem.size #} p)
            return $ Key'RdKafkaVu nm (cIntConv sz)
        RdKafkaVtypeRkt -> TopicHandle'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.rkt #} p)
        RdKafkaVtypeOpaque -> Opaque'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.ptr #} p)
        RdKafkaVtypeHeader -> do
            -- A single header is (name cstring, value ptr, value size).
            nm <- ({#get rd_kafka_vu_t->u.header.name #} p)
            val' <- liftM castPtr ({#get rd_kafka_vu_t->u.header.val #} p)
            sz <- ({#get rd_kafka_vu_t->u.header.size #} p)
            return $ Header'RdKafkaVu nm val' (cIntConv sz)
    poke p End'RdKafkaVu =
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeEnd)
    poke p (Topic'RdKafkaVu str) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeTopic)
        {#set rd_kafka_vu_t.u.cstr #} p str
    poke p (Timestamp'RdKafkaVu tms) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeTimestamp)
        {#set rd_kafka_vu_t.u.i64 #} p tms
    poke p (Partition'RdKafkaVu prt) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypePartition)
        {#set rd_kafka_vu_t.u.i32 #} p prt
    poke p (MsgFlags'RdKafkaVu flags) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeMsgflags)
        {#set rd_kafka_vu_t.u.i #} p flags
    poke p (Headers'RdKafkaVu headers) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeHeaders)
        {#set rd_kafka_vu_t.u.headers #} p headers
    poke p (TopicHandle'RdKafkaVu tphandle) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeRkt)
        {#set rd_kafka_vu_t.u.rkt #} p tphandle
    poke p (Value'RdKafkaVu pl sz) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeValue)
        {#set rd_kafka_vu_t.u.mem.size #} p (cIntConv sz)
        {#set rd_kafka_vu_t.u.mem.ptr #} p (castPtr pl)
    poke p (Key'RdKafkaVu pl sz) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeKey)
        {#set rd_kafka_vu_t.u.mem.size #} p (cIntConv sz)
        {#set rd_kafka_vu_t.u.mem.ptr #} p (castPtr pl)
    poke p (Opaque'RdKafkaVu ptr') = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeOpaque)
        {#set rd_kafka_vu_t.u.ptr #} p ptr'
    poke p (Header'RdKafkaVu nm val' sz) = do
        {#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeHeader)
        {#set rd_kafka_vu_t.u.header.size #} p (cIntConv sz)
        {#set rd_kafka_vu_t.u.header.name #} p nm
        {#set rd_kafka_vu_t.u.header.val #} p (castPtr val')

-- | Raw c2hs binding to @rd_kafka_produceva()@: takes a pointer to an
-- array of @rd_kafka_vu_t@ plus the array length.
{#fun rd_kafka_produceva as rdKafkaMessageProduceVa'
    {`RdKafkaTPtr', `RdKafkaVuTPtr', `CLong'} -> `RdKafkaErrorTPtr' #}

-- | Produce a message described by a list of 'RdKafkaVuT' va-args.
-- The list is marshalled into a temporary C array that only lives for
-- the duration of the call; the 'newForeignPtr_' wrapper attaches no
-- finalizer, so the ForeignPtr must not escape this scope (it doesn't).
-- NOTE(review): presumably librdkafka copies what it needs before
-- returning — confirm against rd_kafka_produceva() docs.
rdKafkaMessageProduceVa :: RdKafkaTPtr -> [RdKafkaVuT] -> IO RdKafkaErrorTPtr
rdKafkaMessageProduceVa kafkaPtr vts = withArrayLen vts $ \i arrPtr -> do
    fptr <- newForeignPtr_ arrPtr
    rdKafkaMessageProduceVa' kafkaPtr fptr (cIntConv i)

-- Marshall / Unmarshall
enumToCInt :: Enum a => a -> CInt
enumToCInt = fromIntegral . fromEnum
Expand Down Expand Up @@ -1013,4 +1125,4 @@ c_stdin = handleToCFile stdin "r"
c_stdout :: IO CFilePtr
c_stdout = handleToCFile stdout "w"
c_stderr :: IO CFilePtr
c_stderr = handleToCFile stderr "w"
c_stderr = handleToCFile stderr "w"
31 changes: 29 additions & 2 deletions src/Kafka/Internal/Shared.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE LambdaCase #-}

module Kafka.Internal.Shared
( pollEvents
, word8PtrToBS
Expand All @@ -8,6 +10,7 @@ module Kafka.Internal.Shared
, kafkaErrorToEither
, kafkaErrorToMaybe
, maybeToLeft
, readHeaders
, readPayload
, readTopic
, readKey
Expand All @@ -29,9 +32,9 @@ import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (Storable (peek))
import Kafka.Consumer.Types (Timestamp (..))
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName)
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName, rdKafkaHeaderGetAll, rdKafkaMessageHeaders)
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..))
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..), Headers, headersFromList)

pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents a tm =
Expand Down Expand Up @@ -102,6 +105,30 @@ readTimestamp msg =
RdKafkaTimestampLogAppendTime -> LogAppendTime (Millis ts)
RdKafkaTimestampNotAvailable -> NoTimestamp


-- | Read the headers attached to a Kafka message.
--
-- Returns @Right mempty@ when the message carries no headers
-- ('RdKafkaRespErrNoent' from librdkafka), the decoded 'Headers' on
-- success, and @Left e@ with the raw librdkafka error code on any
-- other failure — including an unexpected code while iterating
-- (previously that case called 'error' and crashed the caller).
readHeaders :: Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
readHeaders msg = do
    (err, headersPtr) <- rdKafkaMessageHeaders msg
    case err of
        RdKafkaRespErrNoent   -> return $ Right mempty
        RdKafkaRespErrNoError -> fmap headersFromList <$> extractHeaders headersPtr
        e                     -> return . Left $ e
    where extractHeaders ptHeaders =
            alloca $ \nptr ->
              alloca $ \vptr ->
                alloca $ \szptr ->
                  -- Walk the header list by index until librdkafka
                  -- reports NOENT (one past the last header).
                  let go acc idx = rdKafkaHeaderGetAll ptHeaders idx nptr vptr szptr >>= \case
                        RdKafkaRespErrNoent -> return $ Right acc
                        RdKafkaRespErrNoError -> do
                            cstr <- peek nptr
                            wptr <- peek vptr
                            csize <- peek szptr
                            hn <- BS.packCString cstr
                            hv <- word8PtrToBS (fromIntegral csize) wptr
                            go ((hn, hv) : acc) (idx + 1)
                        -- Propagate unexpected codes instead of the
                        -- previous partial 'error' call.
                        e -> return $ Left e
                  in go [] 0

readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)
readBS flen fdata s = if fdata s == nullPtr
then return Nothing
Expand Down
Loading