Skip to content

Commit aa1c2c6

Browse files
author
Levashov, Mykyta
committed
Produce headers
1 parent e822010 commit aa1c2c6

File tree

8 files changed

+147
-81
lines changed

8 files changed

+147
-81
lines changed

example/ProducerExample.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ sendMessages prod = do
6161
putStrLn "And the last one..."
6262
msg3 <- getLine
6363
err3 <- produceMessage prod (mkMessage (Just "key3") (Just $ pack msg3))
64+
65+
err4 <- produceMessageWithHeaders prod (headersFromList [("fancy", "header")]) (mkMessage (Just "key4") (Just $ pack msg3))
6466

6567
-- errs <- produceMessageBatch prod
6668
-- [ mkMessage (Just "b-1") (Just "batch-1")

src/Kafka/Consumer/Types.hs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,13 @@ where
3737
import Data.Bifoldable (Bifoldable (..))
3838
import Data.Bifunctor (Bifunctor (..))
3939
import Data.Bitraversable (Bitraversable (..), bimapM, bisequence)
40-
import Data.ByteString (ByteString)
4140
import Data.Int (Int64)
4241
import Data.String (IsString)
4342
import Data.Text (Text)
4443
import Data.Typeable (Typeable)
4544
import GHC.Generics (Generic)
4645
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
47-
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..))
46+
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..), Headers)
4847

4948
-- | The main type for Kafka consumption, used e.g. to poll and commit messages.
5049
--
@@ -140,13 +139,13 @@ data TopicPartition = TopicPartition
140139

141140
-- | Represents a /received/ message from Kafka (i.e. used in a consumer)
142141
data ConsumerRecord k v = ConsumerRecord
143-
{ crTopic :: !TopicName -- ^ Kafka topic this message was received from
144-
, crPartition :: !PartitionId -- ^ Kafka partition this message was received from
145-
, crOffset :: !Offset -- ^ Offset within the 'crPartition' Kafka partition
146-
, crTimestamp :: !Timestamp -- ^ Message timestamp
147-
, crHeaders :: ![(ByteString, ByteString)] -- ^ Message headers
148-
, crKey :: !k -- ^ Message key
149-
, crValue :: !v -- ^ Message value
142+
{ crTopic :: !TopicName -- ^ Kafka topic this message was received from
143+
, crPartition :: !PartitionId -- ^ Kafka partition this message was received from
144+
, crOffset :: !Offset -- ^ Offset within the 'crPartition' Kafka partition
145+
, crTimestamp :: !Timestamp -- ^ Message timestamp
146+
, crHeaders :: !Headers -- ^ Message headers
147+
, crKey :: !k -- ^ Message key
148+
, crValue :: !v -- ^ Message value
150149
}
151150
deriving (Eq, Show, Read, Typeable, Generic)
152151

src/Kafka/Internal/RdKafka.chs

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
{-# LANGUAGE ForeignFunctionInterface #-}
22
{-# LANGUAGE EmptyDataDecls #-}
3-
{-# LANGUAGE TypeApplications #-}
4-
{-# LANGUAGE CApiFFI #-}
53

64
module Kafka.Internal.RdKafka where
75

@@ -978,57 +976,40 @@ newRdKafkaTopicT kafkaPtr topic topicConfPtr = do
978976
---- Errors
979977

980978
data RdKafkaErrorT
981-
{#pointer *rd_kafka_error_t as RdKafkaErrorTPtr foreign -> RdKafkaErrorT #}
979+
{#pointer *rd_kafka_error_t as RdKafkaErrorTPtr -> RdKafkaErrorT #}
982980

983-
foreign import ccall "rdkafka.h &rd_kafka_error_destroy"
984-
rdKafkaErrorDestroyF :: FinalizerPtr RdKafkaErrorT
985-
986-
foreign import ccall "rdkafka.h rd_kafka_error_destroy"
987-
rdKafkaErrorDestroy :: Ptr RdKafkaErrorT -> IO ()
981+
{#fun rd_kafka_error_code as ^
982+
{`RdKafkaErrorTPtr'} -> `RdKafkaRespErrT' cIntToEnum #}
988983

984+
{#fun rd_kafka_error_destroy as ^
985+
{`RdKafkaErrorTPtr'} -> `()' #}
989986
-------------------------------------------------------------------------------------------------
990987
---- Headers
991988

992989
data RdKafkaHeadersT
993990
{#pointer *rd_kafka_headers_t as RdKafkaHeadersTPtr -> RdKafkaHeadersT #}
994991

995-
{#fun rd_kafka_headers_new as ^
996-
{`Int'} -> `RdKafkaHeadersTPtr' #}
997-
998-
foreign import ccall "rdkafka.h &rd_kafka_headers_destroy"
999-
rdKafkaHeadersDestroyF :: FinalizerPtr RdKafkaHeadersT
1000-
1001-
foreign import ccall "rdkafka.h rd_kafka_headers_destroy"
1002-
rdKafkaHeadersDestroy :: Ptr RdKafkaHeadersT -> IO ()
1003-
1004-
{#fun rd_kafka_header_cnt as ^
1005-
{`RdKafkaHeadersTPtr'} -> `CSize' cIntConv #}
1006-
1007-
{#fun rd_kafka_header_add as ^
1008-
{`RdKafkaHeadersTPtr', `CString', cIntConv `CSize', `Ptr ()', `CLong'} -> `CSize' cIntConv #}
1009-
1010992
{#fun rd_kafka_header_get_all as ^
1011993
{`RdKafkaHeadersTPtr', cIntConv `CSize', castPtr `Ptr CString', castPtr `Ptr Word8Ptr', `CSizePtr'} -> `RdKafkaRespErrT' cIntToEnum #}
1012994

1013-
{#fun rd_kafka_message_set_headers as ^
1014-
{`RdKafkaMessageTPtr', `RdKafkaHeadersTPtr'} -> `()' #}
1015-
1016995
{#fun rd_kafka_message_headers as ^
1017996
{`RdKafkaMessageTPtr', alloca- `RdKafkaHeadersTPtr' peekPtr*} -> `RdKafkaRespErrT' cIntToEnum #}
1018997

1019-
--- Producev api
998+
--- Produceva api
1020999

10211000
{#enum rd_kafka_vtype_t as ^ {underscoreToCase} deriving (Show, Eq) #}
10221001

10231002
data RdKafkaVuT
10241003
= Topic'RdKafkaVu CString
10251004
| TopicHandle'RdKafkaVu (Ptr RdKafkaTopicT)
1026-
| Partition'RdKafkaVu CInt
1005+
| Partition'RdKafkaVu CInt32T
10271006
| Value'RdKafkaVu Word8Ptr CSize
10281007
| Key'RdKafkaVu Word8Ptr CSize
10291008
| MsgFlags'RdKafkaVu CInt
1030-
| Timestamp'RdKafkaVu CLong
1031-
| Headers'RdKafkaVu (Ptr RdKafkaHeadersT) -- The message object will assume ownership of the headers (unless producev() fails)
1009+
| Timestamp'RdKafkaVu CInt64T
1010+
| Opaque'RdKafkaVu (Ptr ())
1011+
| Header'RdKafkaVu CString Word8Ptr CSize
1012+
| Headers'RdKafkaVu (Ptr RdKafkaHeadersT) -- The message object will assume ownership of the headers (unless produceva() fails)
10321013
| End'RdKafkaVu
10331014

10341015
{#pointer *rd_kafka_vu_t as RdKafkaVuTPtr foreign -> RdKafkaVuT #}
@@ -1052,8 +1033,12 @@ instance Storable RdKafkaVuT where
10521033
sz <- ({#get rd_kafka_vu_t->u.mem.size #} p)
10531034
return $ Key'RdKafkaVu nm (cIntConv sz)
10541035
RdKafkaVtypeRkt -> TopicHandle'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.rkt #} p)
1055-
-- RdKafkaVtypeOpaque
1056-
-- RdKafkaVtypeHeader
1036+
RdKafkaVtypeOpaque -> Opaque'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.ptr #} p)
1037+
RdKafkaVtypeHeader -> do
1038+
nm <- ({#get rd_kafka_vu_t->u.header.name #} p)
1039+
val' <- liftM castPtr ({#get rd_kafka_vu_t->u.header.val #} p)
1040+
sz <- ({#get rd_kafka_vu_t->u.header.size #} p)
1041+
return $ Header'RdKafkaVu nm val' (cIntConv sz)
10571042
poke p End'RdKafkaVu =
10581043
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeEnd)
10591044
poke p (Topic'RdKafkaVu str) = do
@@ -1082,6 +1067,14 @@ instance Storable RdKafkaVuT where
10821067
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeKey)
10831068
{#set rd_kafka_vu_t.u.mem.size #} p (cIntConv sz)
10841069
{#set rd_kafka_vu_t.u.mem.ptr #} p (castPtr pl)
1070+
poke p (Opaque'RdKafkaVu ptr') = do
1071+
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeOpaque)
1072+
{#set rd_kafka_vu_t.u.ptr #} p ptr'
1073+
poke p (Header'RdKafkaVu nm val' sz) = do
1074+
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeHeader)
1075+
{#set rd_kafka_vu_t.u.header.size #} p (cIntConv sz)
1076+
{#set rd_kafka_vu_t.u.header.name #} p nm
1077+
{#set rd_kafka_vu_t.u.header.val #} p (castPtr val')
10851078

10861079
{#fun rd_kafka_produceva as rdKafkaMessageProduceVa'
10871080
{`RdKafkaTPtr', `RdKafkaVuTPtr', `CLong'} -> `RdKafkaErrorTPtr' #}

src/Kafka/Internal/Shared.hs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import Foreign.Storable (Storable (peek))
3434
import Kafka.Consumer.Types (Timestamp (..))
3535
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName, rdKafkaHeaderGetAll, rdKafkaMessageHeaders)
3636
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
37-
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..))
37+
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..), Headers, headersFromList)
3838

3939
pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
4040
pollEvents a tm =
@@ -106,17 +106,17 @@ readTimestamp msg =
106106
RdKafkaTimestampNotAvailable -> NoTimestamp
107107

108108

109-
readHeaders :: RdKafkaMessageTPtr -> IO (Either RdKafkaRespErrT [(BS.ByteString, BS.ByteString)])
109+
readHeaders :: RdKafkaMessageTPtr -> IO (Either RdKafkaRespErrT Headers)
110110
readHeaders msg = do
111111
(err, headersPtr) <- rdKafkaMessageHeaders msg
112-
case err of
113-
RdKafkaRespErrNoent -> return $ Right []
114-
RdKafkaRespErrNoError -> extractHeaders headersPtr
112+
case err of
113+
RdKafkaRespErrNoent -> return $ Right mempty
114+
RdKafkaRespErrNoError -> fmap headersFromList <$> extractHeaders headersPtr
115115
e -> return $ Left e
116-
where extractHeaders ptHeaders =
117-
alloca $ \nptr ->
118-
alloca $ \vptr ->
119-
alloca $ \szptr ->
116+
where extractHeaders ptHeaders =
117+
alloca $ \nptr ->
118+
alloca $ \vptr ->
119+
alloca $ \szptr ->
120120
let go acc idx = rdKafkaHeaderGetAll ptHeaders idx nptr vptr szptr >>= \case
121121
RdKafkaRespErrNoent -> return $ Right acc
122122
RdKafkaRespErrNoError -> do
@@ -132,4 +132,4 @@ readHeaders msg = do
132132
readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)
133133
readBS flen fdata s = if fdata s == nullPtr
134134
then return Nothing
135-
else Just <$> word8PtrToBS (flen s) (fdata s)
135+
else Just <$> word8PtrToBS (flen s) (fdata s)

src/Kafka/Producer.hs

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ module Kafka.Producer
5858
, module X
5959
, runProducer
6060
, newProducer
61-
, produceMessage, produceMessageBatch
62-
, produceMessage'
61+
, produceMessage, produceMessageBatch, produceMessageWithHeaders
62+
, produceMessage', produceMessageWithHeaders'
6363
, flushProducer
6464
, closeProducer
6565
, RdKafkaRespErrT (..)
@@ -76,15 +76,16 @@ import Data.Function (on)
7676
import Data.List (groupBy, sortBy)
7777
import Data.Ord (comparing)
7878
import qualified Data.Text as Text
79+
import Foreign.C.String (withCString)
7980
import Foreign.ForeignPtr (newForeignPtr_, withForeignPtr)
8081
import Foreign.Marshal.Array (withArrayLen)
8182
import Foreign.Ptr (Ptr, nullPtr, plusPtr)
8283
import Foreign.Storable (Storable (..))
8384
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
84-
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel)
85+
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), RdKafkaVuT(..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaErrorCode, rdKafkaErrorDestroy, rdKafkaOutqLen, rdKafkaProduceBatch, rdKafkaMessageProduceVa, rdKafkaSetLogLevel)
8586
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
8687
import Kafka.Internal.Shared (pollEvents)
87-
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt)
88+
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErrT, producePartitionCInt, producePartitionInt)
8889
import Kafka.Producer.Types (KafkaProducer (..))
8990

9091
import Kafka.Producer.ProducerProperties as X
@@ -144,6 +145,18 @@ produceMessage kp m = produceMessage' kp m (pure . mempty) >>= adjustRes
144145
Right () -> pure Nothing
145146
Left (ImmediateError err) -> pure (Just err)
146147

148+
-- | Sends a single message with a registered callback and headers.
149+
produceMessageWithHeaders :: MonadIO m
150+
=> KafkaProducer
151+
-> Headers
152+
-> ProducerRecord
153+
-> m (Maybe KafkaError)
154+
produceMessageWithHeaders kp headers msg = produceMessageWithHeaders' kp headers msg (pure . mempty) >>= adjustRes
155+
where
156+
adjustRes = \case
157+
Right () -> pure Nothing
158+
Left (ImmediateError err) -> pure (Just err)
159+
147160
-- | Sends a single message with a registered callback.
148161
--
149162
-- The callback can be a long running process, as it is forked by the thread
@@ -154,35 +167,45 @@ produceMessage' :: MonadIO m
154167
-> ProducerRecord
155168
-> (DeliveryReport -> IO ())
156169
-> m (Either ImmediateError ())
157-
produceMessage' kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) msg cb = liftIO $
158-
fireCallbacks >> bracket (mkTopic . prTopic $ msg) closeTopic withTopic
170+
produceMessage' kp = produceMessageWithHeaders' kp mempty
171+
172+
-- | Sends a single message with a registered callback and headers.
173+
--
174+
-- The callback can be a long running process, as it is forked by the thread
175+
-- that handles the delivery reports.
176+
--
177+
produceMessageWithHeaders' :: MonadIO m
178+
=> KafkaProducer
179+
-> Headers
180+
-> ProducerRecord
181+
-> (DeliveryReport -> IO ())
182+
-> m (Either ImmediateError ())
183+
produceMessageWithHeaders' kp@(KafkaProducer (Kafka k) _ _) headers msg cb = liftIO $
184+
fireCallbacks >> produceIt
159185
where
160186
fireCallbacks =
161187
pollEvents kp . Just . Timeout $ 0
162188

163-
mkTopic (TopicName tn) =
164-
newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc)
165-
166-
closeTopic = either mempty destroyUnmanagedRdKafkaTopic
167-
168-
withTopic (Left err) = return . Left . ImmediateError . KafkaError . Text.pack $ err
169-
withTopic (Right topic) =
189+
produceIt =
170190
withBS (prValue msg) $ \payloadPtr payloadLength ->
171-
withBS (prKey msg) $ \keyPtr keyLength -> do
172-
callbackPtr <- newStablePtr cb
173-
res <- handleProduceErr' =<< rdKafkaProduce
174-
topic
175-
(producePartitionCInt (prPartition msg))
176-
copyMsgFlags
177-
payloadPtr
178-
(fromIntegral payloadLength)
179-
keyPtr
180-
(fromIntegral keyLength)
181-
(castStablePtrToPtr callbackPtr)
191+
withBS (prKey msg) $ \keyPtr keyLength ->
192+
withHeaders headers $ \hdrs ->
193+
withCString (Text.unpack . unTopicName . prTopic $ msg) $ \topicName -> do
194+
callbackPtr <- newStablePtr cb
195+
let opts = [
196+
Topic'RdKafkaVu topicName
197+
, Partition'RdKafkaVu . producePartitionCInt . prPartition $ msg
198+
, MsgFlags'RdKafkaVu (fromIntegral copyMsgFlags)
199+
, Value'RdKafkaVu payloadPtr (fromIntegral payloadLength)
200+
, Key'RdKafkaVu keyPtr (fromIntegral keyLength)
201+
, Opaque'RdKafkaVu (castStablePtrToPtr callbackPtr)
202+
]
182203

183-
pure $ case res of
184-
Left err -> Left . ImmediateError $ err
185-
Right () -> Right ()
204+
code <- bracket (rdKafkaMessageProduceVa k (hdrs ++ opts)) rdKafkaErrorDestroy rdKafkaErrorCode
205+
res <- handleProduceErrT code
206+
pure $ case res of
207+
Just err -> Left . ImmediateError $ err
208+
Nothing -> Right ()
186209

187210
-- | Sends a batch of messages.
188211
-- Returns a list of messages which it was unable to send with corresponding errors.
@@ -255,6 +278,15 @@ flushProducer kp = liftIO $ do
255278

256279
------------------------------------------------------------------------------------
257280

281+
withHeaders :: Headers -> ([RdKafkaVuT] -> IO a) -> IO a
282+
withHeaders hds handle = go (headersToList hds) []
283+
where
284+
go [] acc = handle acc
285+
go ((nm, val) : xs) acc =
286+
BS.useAsCString nm $ \cnm ->
287+
withBS (Just val) $ \vp vl ->
288+
go xs (Header'RdKafkaVu cnm vp (fromIntegral vl) : acc)
289+
258290
withBS :: Maybe BS.ByteString -> (Ptr a -> Int -> IO b) -> IO b
259291
withBS Nothing f = f nullPtr 0
260292
withBS (Just bs) f =

src/Kafka/Producer/Convert.hs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ module Kafka.Producer.Convert
44
, producePartitionCInt
55
, handleProduceErr
66
, handleProduceErr'
7+
, handleProduceErrT
78
)
89
where
910

1011
import Foreign.C.Error (getErrno)
1112
import Foreign.C.Types (CInt)
12-
import Kafka.Internal.RdKafka (rdKafkaMsgFlagCopy)
13+
import Kafka.Internal.RdKafka (RdKafkaRespErrT(..), rdKafkaMsgFlagCopy)
1314
import Kafka.Internal.Shared (kafkaRespErr)
1415
import Kafka.Types (KafkaError(..))
1516
import Kafka.Producer.Types (ProducePartition(..))
@@ -33,6 +34,12 @@ handleProduceErr 0 = return Nothing
3334
handleProduceErr _ = return $ Just KafkaInvalidReturnValue
3435
{-# INLINE handleProduceErr #-}
3536

37+
handleProduceErrT :: RdKafkaRespErrT -> IO (Maybe KafkaError)
38+
handleProduceErrT RdKafkaRespErrUnknown = Just . kafkaRespErr <$> getErrno
39+
handleProduceErrT RdKafkaRespErrNoError = return Nothing
40+
handleProduceErrT e = return $ Just (KafkaResponseError e)
41+
{-# INLINE handleProduceErrT #-}
42+
3643
handleProduceErr' :: Int -> IO (Either KafkaError ())
3744
handleProduceErr' (- 1) = Left . kafkaRespErr <$> getErrno
3845
handleProduceErr' 0 = return (Right ())

src/Kafka/Types.hs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ module Kafka.Types
2121
, KafkaDebug(..)
2222
, KafkaCompressionCodec(..)
2323
, TopicType(..)
24+
, Headers(unHeaders), headersFromList, headersToList
2425
, topicType
2526
, kafkaDebugToText
2627
, kafkaCompressionCodecToText
@@ -34,6 +35,7 @@ import Data.Text (Text, isPrefixOf)
3435
import Data.Typeable (Typeable)
3536
import GHC.Generics (Generic)
3637
import Kafka.Internal.RdKafka (RdKafkaRespErrT, rdKafkaErr2name, rdKafkaErr2str)
38+
import qualified Data.ByteString as BS
3739

3840
-- | Kafka broker ID
3941
newtype BrokerId = BrokerId { unBrokerId :: Int } deriving (Show, Eq, Ord, Read, Generic)
@@ -158,4 +160,14 @@ kafkaCompressionCodecToText c = case c of
158160
NoCompression -> "none"
159161
Gzip -> "gzip"
160162
Snappy -> "snappy"
161-
Lz4 -> "lz4"
163+
Lz4 -> "lz4"
164+
165+
-- | Headers that might be passed along with a record
166+
newtype Headers = Headers { unHeaders :: [(BS.ByteString, BS.ByteString)] }
167+
deriving (Eq, Show, Semigroup, Monoid, Read, Typeable, Generic)
168+
169+
headersFromList :: [(BS.ByteString, BS.ByteString)] -> Headers
170+
headersFromList = Headers
171+
172+
headersToList :: Headers -> [(BS.ByteString, BS.ByteString)]
173+
headersToList = unHeaders

0 commit comments

Comments
 (0)