Skip to content

Commit 72e6f6d

Browse files
author
Levashov, Mykyta
committed
Remove batch producer & add headers to the record type
1 parent b0305cd commit 72e6f6d

File tree

10 files changed

+77
-162
lines changed

10 files changed

+77
-162
lines changed

example/ProducerExample.hs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ mkMessage k v = ProducerRecord
3131
, prPartition = UnassignedPartition
3232
, prKey = k
3333
, prValue = v
34+
, prHeaders = mempty
3435
}
3536

3637
-- Run an example
@@ -62,13 +63,7 @@ sendMessages prod = do
6263
msg3 <- getLine
6364
err3 <- produceMessage prod (mkMessage (Just "key3") (Just $ pack msg3))
6465

65-
err4 <- produceMessageWithHeaders prod (headersFromList [("fancy", "header")]) (mkMessage (Just "key4") (Just $ pack msg3))
66-
67-
-- errs <- produceMessageBatch prod
68-
-- [ mkMessage (Just "b-1") (Just "batch-1")
69-
-- , mkMessage (Just "b-2") (Just "batch-2")
70-
-- , mkMessage Nothing (Just "batch-3")
71-
-- ]
66+
err4 <- produceMessage prod ((mkMessage (Just "key4") (Just $ pack msg3)) { prHeaders = headersFromList [("fancy", "header")]})
7267

7368
-- forM_ errs (print . snd)
7469

nix/sources.json

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
{
2-
"niv": {
3-
"branch": "master",
4-
"description": "Easy dependency management for Nix projects",
5-
"homepage": "https://github.com/nmattia/niv",
6-
"owner": "nmattia",
7-
"repo": "niv",
8-
"rev": "e0ca65c81a2d7a4d82a189f1e23a48d59ad42070",
9-
"sha256": "1pq9nh1d8nn3xvbdny8fafzw87mj7gsmp6pxkdl65w2g18rmcmzx",
10-
"type": "tarball",
11-
"url": "https://github.com/nmattia/niv/archive/e0ca65c81a2d7a4d82a189f1e23a48d59ad42070.tar.gz",
12-
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
13-
},
14-
"nixpkgs": {
15-
"branch": "nixos-21.05",
16-
"description": "Nix Packages collection",
17-
"homepage": "",
18-
"owner": "NixOS",
19-
"repo": "nixpkgs",
20-
"rev": "d5aadbefd650cb0a05ba9c788a26327afce2396c",
21-
"sha256": "1h9nvs7nknczpz15c41p3irbykc5qzwr8wlcpzxndc25mnqi257w",
22-
"type": "tarball",
23-
"url": "https://github.com/NixOS/nixpkgs/archive/d5aadbefd650cb0a05ba9c788a26327afce2396c.tar.gz",
24-
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
25-
}
2+
"niv": {
3+
"branch": "master",
4+
"description": "Easy dependency management for Nix projects",
5+
"homepage": "https://github.com/nmattia/niv",
6+
"owner": "nmattia",
7+
"repo": "niv",
8+
"rev": "65a61b147f307d24bfd0a5cd56ce7d7b7cc61d2e",
9+
"sha256": "17mirpsx5wyw262fpsd6n6m47jcgw8k2bwcp1iwdnrlzy4dhcgqh",
10+
"type": "tarball",
11+
"url": "https://github.com/nmattia/niv/archive/65a61b147f307d24bfd0a5cd56ce7d7b7cc61d2e.tar.gz",
12+
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
13+
},
14+
"nixpkgs": {
15+
"branch": "nixos-21.05",
16+
"description": "Nix Packages collection",
17+
"homepage": "",
18+
"owner": "NixOS",
19+
"repo": "nixpkgs",
20+
"rev": "ce7a1190a0fa4ba3465b5f5471b08567060ca14c",
21+
"sha256": "1zr1s9gp0h5g4arlba1bpb9yqfaaby5195ydm6a2psaxhm748li9",
22+
"type": "tarball",
23+
"url": "https://github.com/NixOS/nixpkgs/archive/ce7a1190a0fa4ba3465b5f5471b08567060ca14c.tar.gz",
24+
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
25+
}
2626
}

src/Kafka/Consumer/Convert.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,16 @@ fromMessagePtr ptr =
159159
s <- peek realPtr
160160
msg <- if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
161161
then return . Left . KafkaResponseError $ err'RdKafkaMessageT s
162-
else Right <$> mkRecord s
162+
else Right <$> mkRecord s realPtr
163163
rdKafkaMessageDestroy realPtr
164164
return msg
165165
where
166-
mkRecord msg = do
166+
mkRecord msg rptr = do
167167
topic <- readTopic msg
168168
key <- readKey msg
169169
payload <- readPayload msg
170170
timestamp <- readTimestamp ptr
171-
headers <- fromRight mempty <$> readHeaders ptr
171+
headers <- fromRight mempty <$> readHeaders rptr
172172
return ConsumerRecord
173173
{ crTopic = TopicName topic
174174
, crPartition = PartitionId $ partition'RdKafkaMessageT msg

src/Kafka/Internal/RdKafka.chs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,7 @@ data RdKafkaHeadersT
993993
{`RdKafkaHeadersTPtr', cIntConv `CSize', castPtr `Ptr CString', castPtr `Ptr Word8Ptr', `CSizePtr'} -> `RdKafkaRespErrT' cIntToEnum #}
994994

995995
{#fun rd_kafka_message_headers as ^
996-
{`RdKafkaMessageTPtr', alloca- `RdKafkaHeadersTPtr' peekPtr*} -> `RdKafkaRespErrT' cIntToEnum #}
996+
{castPtr `Ptr RdKafkaMessageT', alloca- `RdKafkaHeadersTPtr' peekPtr*} -> `RdKafkaRespErrT' cIntToEnum #}
997997

998998
--- Produceva api
999999

src/Kafka/Internal/Shared.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,13 @@ readTimestamp msg =
106106
RdKafkaTimestampNotAvailable -> NoTimestamp
107107

108108

109-
readHeaders :: RdKafkaMessageTPtr -> IO (Either RdKafkaRespErrT Headers)
109+
readHeaders :: Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
110110
readHeaders msg = do
111111
(err, headersPtr) <- rdKafkaMessageHeaders msg
112112
case err of
113113
RdKafkaRespErrNoent -> return $ Right mempty
114114
RdKafkaRespErrNoError -> fmap headersFromList <$> extractHeaders headersPtr
115-
e -> return $ Left e
115+
e -> return . Left $ e
116116
where extractHeaders ptHeaders =
117117
alloca $ \nptr ->
118118
alloca $ \vptr ->
@@ -126,7 +126,7 @@ readHeaders msg = do
126126
hn <- BS.packCString cstr
127127
hv <- word8PtrToBS (fromIntegral csize) wptr
128128
go ((hn, hv) : acc) (idx + 1)
129-
_ -> error "Unexpected error code"
129+
_ -> error "Unexpected error code while extracting headers"
130130
in go [] 0
131131

132132
readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)

src/Kafka/Producer.hs

Lines changed: 10 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -58,35 +58,29 @@ module Kafka.Producer
5858
, module X
5959
, runProducer
6060
, newProducer
61-
, produceMessage, produceMessageBatch, produceMessageWithHeaders
62-
, produceMessage', produceMessageWithHeaders'
61+
, produceMessage
62+
, produceMessage'
6363
, flushProducer
6464
, closeProducer
6565
, RdKafkaRespErrT (..)
6666
)
6767
where
6868

69-
import Control.Arrow ((&&&))
7069
import Control.Exception (bracket)
71-
import Control.Monad (forM, forM_, (<=<))
70+
import Control.Monad (forM_)
7271
import Control.Monad.IO.Class (MonadIO (liftIO))
7372
import qualified Data.ByteString as BS
7473
import qualified Data.ByteString.Internal as BSI
75-
import Data.Function (on)
76-
import Data.List (groupBy, sortBy)
77-
import Data.Ord (comparing)
7874
import qualified Data.Text as Text
7975
import Foreign.C.String (withCString)
80-
import Foreign.ForeignPtr (newForeignPtr_, withForeignPtr)
81-
import Foreign.Marshal.Array (withArrayLen)
76+
import Foreign.ForeignPtr (withForeignPtr)
8277
import Foreign.Marshal.Utils (withMany)
8378
import Foreign.Ptr (Ptr, nullPtr, plusPtr)
84-
import Foreign.Storable (Storable (..))
8579
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
86-
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), RdKafkaVuT(..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaErrorCode, rdKafkaErrorDestroy, rdKafkaOutqLen, rdKafkaProduceBatch, rdKafkaMessageProduceVa, rdKafkaSetLogLevel)
87-
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
80+
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTypeT (..), RdKafkaVuT(..), newRdKafkaT, rdKafkaErrorCode, rdKafkaErrorDestroy, rdKafkaOutqLen, rdKafkaMessageProduceVa, rdKafkaSetLogLevel)
81+
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
8882
import Kafka.Internal.Shared (pollEvents)
89-
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErrT, producePartitionCInt, producePartitionInt)
83+
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErrT, producePartitionCInt)
9084
import Kafka.Producer.Types (KafkaProducer (..))
9185

9286
import Kafka.Producer.ProducerProperties as X
@@ -95,7 +89,7 @@ import Kafka.Types as X
9589

9690
-- | Runs Kafka Producer.
9791
-- The callback provided is expected to call 'produceMessage'
98-
-- or/and 'produceMessageBatch' to send messages to Kafka.
92+
-- to send messages to Kafka.
9993
{-# DEPRECATED runProducer "Use 'newProducer'/'closeProducer' instead" #-}
10094
runProducer :: ProducerProperties
10195
-> (KafkaProducer -> IO (Either KafkaError a))
@@ -146,42 +140,16 @@ produceMessage kp m = produceMessage' kp m (pure . mempty) >>= adjustRes
146140
Right () -> pure Nothing
147141
Left (ImmediateError err) -> pure (Just err)
148142

149-
-- | Sends a single message with a registered callback and headers.
150-
produceMessageWithHeaders :: MonadIO m
151-
=> KafkaProducer
152-
-> Headers
153-
-> ProducerRecord
154-
-> m (Maybe KafkaError)
155-
produceMessageWithHeaders kp headers msg = produceMessageWithHeaders' kp headers msg (pure . mempty) >>= adjustRes
156-
where
157-
adjustRes = \case
158-
Right () -> pure Nothing
159-
Left (ImmediateError err) -> pure (Just err)
160-
161143
-- | Sends a single message with a registered callback.
162144
--
163145
-- The callback can be a long running process, as it is forked by the thread
164146
-- that handles the delivery reports.
165-
--
166147
produceMessage' :: MonadIO m
167148
=> KafkaProducer
168149
-> ProducerRecord
169150
-> (DeliveryReport -> IO ())
170151
-> m (Either ImmediateError ())
171-
produceMessage' kp = produceMessageWithHeaders' kp mempty
172-
173-
-- | Sends a single message with a registered callback and headers.
174-
--
175-
-- The callback can be a long running process, as it is forked by the thread
176-
-- that handles the delivery reports.
177-
--
178-
produceMessageWithHeaders' :: MonadIO m
179-
=> KafkaProducer
180-
-> Headers
181-
-> ProducerRecord
182-
-> (DeliveryReport -> IO ())
183-
-> m (Either ImmediateError ())
184-
produceMessageWithHeaders' kp@(KafkaProducer (Kafka k) _ _) headers msg cb = liftIO $
152+
produceMessage' kp@(KafkaProducer (Kafka k) _ _) msg cb = liftIO $
185153
fireCallbacks >> produceIt
186154
where
187155
fireCallbacks =
@@ -190,7 +158,7 @@ produceMessageWithHeaders' kp@(KafkaProducer (Kafka k) _ _) headers msg cb = lif
190158
produceIt =
191159
withBS (prValue msg) $ \payloadPtr payloadLength ->
192160
withBS (prKey msg) $ \keyPtr keyLength ->
193-
withHeaders headers $ \hdrs ->
161+
withHeaders (prHeaders msg) $ \hdrs ->
194162
withCString (Text.unpack . unTopicName . prTopic $ msg) $ \topicName -> do
195163
callbackPtr <- newStablePtr cb
196164
let opts = [
@@ -208,59 +176,6 @@ produceMessageWithHeaders' kp@(KafkaProducer (Kafka k) _ _) headers msg cb = lif
208176
Just err -> Left . ImmediateError $ err
209177
Nothing -> Right ()
210178

211-
-- | Sends a batch of messages.
212-
-- Returns a list of messages which it was unable to send with corresponding errors.
213-
-- Since librdkafka is backed by a queue, this function can return before messages are sent. See
214-
-- 'flushProducer' to wait for queue to empty.
215-
produceMessageBatch :: MonadIO m
216-
=> KafkaProducer
217-
-> [ProducerRecord]
218-
-> m [(ProducerRecord, KafkaError)]
219-
-- ^ An empty list when the operation is successful,
220-
-- otherwise a list of "failed" messages with corresponsing errors.
221-
produceMessageBatch kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) messages = liftIO $ do
222-
pollEvents kp (Just $ Timeout 0) -- fire callbacks if any exist (handle delivery reports)
223-
concat <$> forM (mkBatches messages) sendBatch
224-
where
225-
mkSortKey = prTopic &&& prPartition
226-
mkBatches = groupBy ((==) `on` mkSortKey) . sortBy (comparing mkSortKey)
227-
228-
mkTopic (TopicName tn) = newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc)
229-
230-
clTopic = either (return . const ()) destroyUnmanagedRdKafkaTopic
231-
232-
sendBatch [] = return []
233-
sendBatch batch = bracket (mkTopic $ prTopic (head batch)) clTopic (withTopic batch)
234-
235-
withTopic ms (Left err) = return $ (, KafkaError (Text.pack err)) <$> ms
236-
withTopic ms (Right t) = do
237-
let (partInt, partCInt) = (producePartitionInt &&& producePartitionCInt) $ prPartition (head ms)
238-
withForeignPtr t $ \topicPtr -> do
239-
nativeMs <- forM ms (toNativeMessage topicPtr partInt)
240-
withArrayLen nativeMs $ \len batchPtr -> do
241-
batchPtrF <- newForeignPtr_ batchPtr
242-
numRet <- rdKafkaProduceBatch t partCInt copyMsgFlags batchPtrF len
243-
if numRet == len then return []
244-
else do
245-
errs <- mapM (return . err'RdKafkaMessageT <=< peekElemOff batchPtr)
246-
[0..(fromIntegral $ len - 1)]
247-
return [(m, KafkaResponseError e) | (m, e) <- zip messages errs, e /= RdKafkaRespErrNoError]
248-
249-
toNativeMessage t p m =
250-
withBS (prValue m) $ \payloadPtr payloadLength ->
251-
withBS (prKey m) $ \keyPtr keyLength ->
252-
return RdKafkaMessageT
253-
{ err'RdKafkaMessageT = RdKafkaRespErrNoError
254-
, topic'RdKafkaMessageT = t
255-
, partition'RdKafkaMessageT = p
256-
, len'RdKafkaMessageT = payloadLength
257-
, payload'RdKafkaMessageT = payloadPtr
258-
, offset'RdKafkaMessageT = 0
259-
, keyLen'RdKafkaMessageT = keyLength
260-
, key'RdKafkaMessageT = keyPtr
261-
, opaque'RdKafkaMessageT = nullPtr
262-
}
263-
264179
-- | Closes the producer.
265180
-- Will wait until the outbound queue is drained before returning the control.
266181
closeProducer :: MonadIO m => KafkaProducer -> m ()

src/Kafka/Producer/Callbacks.hs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE TypeApplications #-}
2+
{-# LANGUAGE LambdaCase #-}
23
module Kafka.Producer.Callbacks
34
( deliveryCallback
45
, module X
@@ -15,10 +16,11 @@ import Foreign.StablePtr (castPtrToStablePtr, deRefStablePtr, fre
1516
import Kafka.Callbacks as X
1617
import Kafka.Consumer.Types (Offset(..))
1718
import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
18-
import Kafka.Internal.Setup (KafkaConf(..), getRdKafkaConf, Callback(..))
19-
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload)
19+
import Kafka.Internal.Setup (getRdKafkaConf, Callback(..))
20+
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload, readHeaders)
2021
import Kafka.Producer.Types (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
2122
import Kafka.Types (KafkaError(..), TopicName(..))
23+
import Data.Either (fromRight)
2224

2325
-- | Sets the callback for delivery reports.
2426
--
@@ -36,10 +38,12 @@ deliveryCallback callback = Callback $ \kc -> rdKafkaConfSetDrMsgCb (getRdKafkaC
3638
then getErrno >>= (callback . NoMessageError . kafkaRespErr)
3739
else do
3840
s <- peek mptr
41+
prodRec <- mkProdRec mptr
3942
let cbPtr = opaque'RdKafkaMessageT s
40-
if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
41-
then mkErrorReport s >>= callbacks cbPtr
42-
else mkSuccessReport s >>= callbacks cbPtr
43+
callbacks cbPtr $
44+
if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
45+
then mkErrorReport s prodRec
46+
else mkSuccessReport s prodRec
4347

4448
callbacks cbPtr rep = do
4549
callback rep
@@ -51,24 +55,23 @@ deliveryCallback callback = Callback $ \kc -> rdKafkaConfSetDrMsgCb (getRdKafkaC
5155
-- blocking here would block librdkafka from continuing its execution
5256
void . forkIO $ msgCb rep
5357

54-
mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport
55-
mkErrorReport msg = do
56-
prodRec <- mkProdRec msg
57-
pure $ DeliveryFailure prodRec (KafkaResponseError (err'RdKafkaMessageT msg))
58+
mkErrorReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
59+
mkErrorReport msg prodRec = DeliveryFailure prodRec (KafkaResponseError (err'RdKafkaMessageT msg))
5860

59-
mkSuccessReport :: RdKafkaMessageT -> IO DeliveryReport
60-
mkSuccessReport msg = do
61-
prodRec <- mkProdRec msg
62-
pure $ DeliverySuccess prodRec (Offset $ offset'RdKafkaMessageT msg)
61+
mkSuccessReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
62+
mkSuccessReport msg prodRec = DeliverySuccess prodRec (Offset $ offset'RdKafkaMessageT msg)
6363

64-
mkProdRec :: RdKafkaMessageT -> IO ProducerRecord
65-
mkProdRec msg = do
66-
topic <- readTopic msg
67-
key <- readKey msg
68-
payload <- readPayload msg
69-
pure ProducerRecord
70-
{ prTopic = TopicName topic
71-
, prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg)
72-
, prKey = key
73-
, prValue = payload
74-
}
64+
mkProdRec :: Ptr RdKafkaMessageT -> IO ProducerRecord
65+
mkProdRec pmsg = do
66+
msg <- peek pmsg
67+
topic <- readTopic msg
68+
key <- readKey msg
69+
payload <- readPayload msg
70+
flip fmap (fromRight mempty <$> readHeaders pmsg) $ \headers ->
71+
ProducerRecord
72+
{ prTopic = TopicName topic
73+
, prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg)
74+
, prKey = key
75+
, prValue = payload
76+
, prHeaders = headers
77+
}

src/Kafka/Producer/Types.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import Data.Typeable (Typeable)
2121
import GHC.Generics (Generic)
2222
import Kafka.Consumer.Types (Offset (..))
2323
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), HasTopicConf (..), Kafka (..), KafkaConf (..), TopicConf (..))
24-
import Kafka.Types (KafkaError (..), TopicName (..))
24+
import Kafka.Types (KafkaError (..), TopicName (..), Headers)
2525

2626
-- | The main type for Kafka message production, used e.g. to send messages.
2727
--
@@ -50,6 +50,7 @@ data ProducerRecord = ProducerRecord
5050
, prPartition :: !ProducePartition
5151
, prKey :: Maybe ByteString
5252
, prValue :: Maybe ByteString
53+
, prHeaders :: !Headers
5354
} deriving (Eq, Show, Typeable, Generic)
5455

5556
-- |

src/Kafka/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ module Kafka.Types
2121
, KafkaDebug(..)
2222
, KafkaCompressionCodec(..)
2323
, TopicType(..)
24-
, Headers(unHeaders), headersFromList, headersToList
24+
, Headers, headersFromList, headersToList
2525
, topicType
2626
, kafkaDebugToText
2727
, kafkaCompressionCodecToText

0 commit comments

Comments
 (0)