Skip to content

Commit b0305cd

Browse files
author
Levashov, Mykyta
committed
Bump librdkafka version to 1.6.1
1 parent aa1c2c6 commit b0305cd

File tree

3 files changed

+9
-8
lines changed

3 files changed

+9
-8
lines changed

scripts/build-librdkafka

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/bash
22

3-
RDKAFKA_VER="849c066b559950b02e37a69256f0cb7b04381d0e"
3+
RDKAFKA_VER="1a722553638bba85dbda5050455f7b9a5ef302de"
44

55
PRJ=$PWD
66
DST="$PRJ/.librdkafka"

src/Kafka/Producer.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import qualified Data.Text as Text
7979
import Foreign.C.String (withCString)
8080
import Foreign.ForeignPtr (newForeignPtr_, withForeignPtr)
8181
import Foreign.Marshal.Array (withArrayLen)
82+
import Foreign.Marshal.Utils (withMany)
8283
import Foreign.Ptr (Ptr, nullPtr, plusPtr)
8384
import Foreign.Storable (Storable (..))
8485
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
@@ -279,13 +280,12 @@ flushProducer kp = liftIO $ do
279280
------------------------------------------------------------------------------------
280281

281282
withHeaders :: Headers -> ([RdKafkaVuT] -> IO a) -> IO a
282-
withHeaders hds handle = go (headersToList hds) []
283+
withHeaders hds = withMany allocHeader (headersToList hds)
283284
where
284-
go [] acc = handle acc
285-
go ((nm, val) : xs) acc =
286-
BS.useAsCString nm $ \cnm ->
285+
allocHeader (nm, val) f =
286+
BS.useAsCString nm $ \cnm ->
287287
withBS (Just val) $ \vp vl ->
288-
go xs (Header'RdKafkaVu cnm vp (fromIntegral vl) : acc)
288+
f $ Header'RdKafkaVu cnm vp (fromIntegral vl)
289289

290290
withBS :: Maybe BS.ByteString -> (Ptr a -> Int -> IO b) -> IO b
291291
withBS Nothing f = f nullPtr 0

tests-it/Kafka/IntegrationSpec.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import Control.Monad (forM, forM_, void)
1010
import Control.Monad.Loops
1111
import Data.Either
1212
import Data.Map (fromList)
13+
import qualified Data.Set as Set
1314
import Data.Monoid ((<>))
1415
import Kafka.Consumer
1516
import Kafka.Metadata
@@ -155,7 +156,7 @@ spec = do
155156
describe "Kafka.Headers.Spec" $ do
156157
let testHeaders = headersFromList [("a-header-name", "a-header-value"), ("b-header-name", "b-header-value")]
157158

158-
specWithKafka "Consumer after records with headers are published" consumerProps $ do
159+
specWithKafka "Headers consumer/producer" consumerProps $ do
159160
it "1. sends 2 messages to test topic enriched with headers" $ \(k, prod) -> do
160161
void $ receiveMessages k
161162

@@ -166,7 +167,7 @@ spec = do
166167
(length <$> res) `shouldBe` Right 2
167168

168169
forM_ res $ \rcs ->
169-
forM_ rcs ((`shouldBe` testHeaders) . crHeaders)
170+
forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders)
170171

171172
----------------------------------------------------------------------------------------------------------------
172173

0 commit comments

Comments
 (0)