Skip to content

Commit 28cd723

Browse files
committed
Use assign/pause/redirect/unpause pattern
1 parent d84bf89 commit 28cd723

File tree

2 files changed

+43
-37
lines changed

2 files changed

+43
-37
lines changed

src/Kafka/Consumer/Callbacks.hs

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ module Kafka.Consumer.Callbacks
66
)
77
where
88

9-
import Control.Arrow ((&&&))
10-
import Control.Concurrent (threadDelay)
11-
import Control.Monad (forM_, void)
12-
import Data.Monoid ((<>))
13-
import qualified Data.Text as Text
14-
import Foreign.ForeignPtr (newForeignPtr_)
15-
import Foreign.Ptr (nullPtr)
16-
import Kafka.Callbacks as X
17-
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
18-
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
19-
import Kafka.Internal.RdKafka
20-
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
21-
import Kafka.Internal.Shared (kafkaErrorToMaybe)
22-
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
9+
import Control.Arrow ((&&&))
10+
import Control.Monad (forM_, void)
11+
import Data.Monoid ((<>))
12+
import Foreign.ForeignPtr (newForeignPtr_)
13+
import Foreign.Ptr (nullPtr)
14+
import Kafka.Callbacks as X
15+
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
16+
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
17+
import Kafka.Internal.RdKafka
18+
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
19+
import Kafka.Internal.Shared (kafkaErrorToMaybe)
20+
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
21+
22+
import qualified Data.Text as Text
2323

2424
-- | Sets a callback that is called when rebalance is needed.
2525
--
@@ -81,22 +81,24 @@ setRebalanceCallback f k e pls = do
8181
case mbq of
8282
Nothing -> pure ()
8383
Just mq -> do
84+
let (Kafka kptr) = getKafka k
85+
f k (RebalanceBeforeAssign assignment)
86+
-- Magnus Edenhill:
87+
-- If you redirect after assign() it means some messages may be forwarded to the single consumer queue,
88+
-- so either do it before assign() or do: assign(); pause(); redirect; resume()
89+
void $ rdKafkaAssign kptr pls
90+
void $ rdKafkaPausePartitions kptr pls
8491
forM_ ps (\tp -> redirectPartitionQueue (getKafka k) (tpTopicName tp) (tpPartition tp) mq)
85-
-- sleep for 1 second.
86-
-- it looks like without it there is not enough time for redirect to happen
87-
-- or something similarly strange. I don't understand it.
88-
-- If you know WTH is going on PLEASE let me know because the current "fix" is ugly
89-
-- and is completely unreasonable :(
90-
threadDelay 1000000
91-
f k (RebalanceBeforeAssign assignment)
92-
void $ assign' k pls -- pass as pointer to avoid possible serialisation issues
92+
void $ rdKafkaResumePartitions kptr pls
93+
-- void $ assign' k pls -- pass as pointer to avoid possible serialisation issues
9394
f k (RebalanceAssign assignment)
9495
KafkaResponseError RdKafkaRespErrRevokePartitions -> do
9596
f k (RebalanceBeforeRevoke assignment)
9697
void $ assign k []
9798
f k (RebalanceRevoke assignment)
9899
x -> error $ "Rebalance: UNKNOWN response: " <> show x
99100

101+
100102
-- | Assigns specified partitions to a current consumer.
101103
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
102104
assign :: KafkaConsumer -> [TopicPartition] -> IO (Maybe KafkaError)
@@ -107,9 +109,13 @@ assign (KafkaConsumer (Kafka k) _) ps =
107109
er = KafkaResponseError <$> (pl >>= rdKafkaAssign k)
108110
in kafkaErrorToMaybe <$> er
109111

110-
-- | Assigns specified partitions to a current consumer.
111-
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
112-
assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
113-
assign' (KafkaConsumer (Kafka k) _) pls =
114-
(kafkaErrorToMaybe . KafkaResponseError) <$> rdKafkaAssign k pls
112+
-- -- | Assigns specified partitions to a current consumer.
113+
-- -- Assigning an empty list means unassigning from all partitions that are currently assigned.
114+
-- assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
115+
-- assign' (KafkaConsumer (Kafka k) _) pls = do
116+
117+
118+
119+
-- where
120+
-- asExcept f = ExceptT $ rdKafkaErrorToEither <$> f
115121

src/Kafka/Internal/Shared.hs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,24 @@ module Kafka.Internal.Shared
1717
)
1818
where
1919

20-
import Data.Text (Text)
21-
import qualified Data.Text as Text
2220
import Control.Concurrent (forkIO, rtsSupportsBoundThreads)
2321
import Control.Exception (throw)
2422
import Control.Monad (void, when)
2523
import qualified Data.ByteString as BS
2624
import qualified Data.ByteString.Internal as BSI
25+
import Data.Text (Text)
26+
import qualified Data.Text as Text
2727
import Data.Word (Word8)
28-
import Foreign.Ptr (Ptr, nullPtr)
29-
import Foreign.Marshal.Alloc (alloca)
28+
import Foreign.C.Error (Errno (..))
3029
import Foreign.ForeignPtr (newForeignPtr_)
31-
import Foreign.Storable (Storable(peek))
32-
import Foreign.C.Error (Errno(..))
33-
import Kafka.Consumer.Types (Timestamp(..))
30+
import Foreign.Marshal.Alloc (alloca)
31+
import Foreign.Ptr (Ptr, nullPtr)
32+
import Foreign.Storable (Storable (peek))
33+
import Kafka.Consumer.Types (Timestamp (..))
3434
import Kafka.Internal.CancellationToken as CToken
35-
import Kafka.Internal.RdKafka (RdKafkaTimestampTypeT(..), RdKafkaMessageTPtr, RdKafkaMessageT(..), RdKafkaRespErrT(..), Word8Ptr, rdKafkaPoll, rdKafkaErrno2err, rdKafkaTopicName, rdKafkaMessageTimestamp)
36-
import Kafka.Internal.Setup (HasKafka(..), Kafka(..))
37-
import Kafka.Types (KafkaError(..), Timeout(..), Millis(..))
35+
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName)
36+
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
37+
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..))
3838

3939
runEventLoop :: HasKafka a => a -> CancellationToken -> Maybe Timeout -> IO ()
4040
runEventLoop k ct timeout =

0 commit comments

Comments
 (0)