Skip to content

Commit 5a6cc78

Browse files
committed
Use assign/pause/redirect/unpause pattern
1 parent 6db07e0 commit 5a6cc78

File tree

2 files changed

+42
-37
lines changed

2 files changed

+42
-37
lines changed

src/Kafka/Consumer/Callbacks.hs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ module Kafka.Consumer.Callbacks
66
)
77
where
88

9-
import Control.Arrow ((&&&))
10-
import Control.Concurrent (threadDelay)
11-
import Control.Monad (forM_, void)
12-
import Data.Monoid ((<>))
13-
import qualified Data.Text as Text
14-
import Foreign.ForeignPtr (newForeignPtr_)
15-
import Foreign.Ptr (nullPtr)
16-
import Kafka.Callbacks as X
17-
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
18-
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
19-
import Kafka.Internal.RdKafka
20-
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
21-
import Kafka.Internal.Shared (kafkaErrorToMaybe)
22-
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
9+
import Control.Arrow ((&&&))
10+
import Control.Monad (forM_, void)
11+
import Data.Monoid ((<>))
12+
import Foreign.ForeignPtr (newForeignPtr_)
13+
import Foreign.Ptr (nullPtr)
14+
import Kafka.Callbacks as X
15+
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList)
16+
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
17+
import Kafka.Internal.RdKafka
18+
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
19+
import Kafka.Internal.Shared (kafkaErrorToMaybe)
20+
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
21+
22+
import qualified Data.Text as Text
2323

2424
-- | Sets a callback that is called when rebalance is needed.
2525
--
@@ -78,25 +78,26 @@ setRebalanceCallback f k e pls = do
7878
case e of
7979
KafkaResponseError RdKafkaRespErrAssignPartitions -> do
8080
mbq <- getRdMsgQueue $ getKafkaConf k
81+
f k (RebalanceBeforeAssign assignment)
8182
case mbq of
8283
Nothing -> pure ()
8384
Just mq -> do
85+
let (Kafka kptr) = getKafka k
86+
-- Magnus Edenhill:
87+
-- If you redirect after assign() it means some messages may be forwarded to the single consumer queue,
88+
-- so either do it before assign() or do: assign(); pause(); redirect; resume()
89+
void $ rdKafkaAssign kptr pls
90+
void $ rdKafkaPausePartitions kptr pls
8491
forM_ ps (\tp -> redirectPartitionQueue (getKafka k) (tpTopicName tp) (tpPartition tp) mq)
85-
-- sleep for 1 second.
86-
-- it looks like without it there is not enough time for redirect to happen
87-
-- or something similarly strange. I don't understand it.
88-
-- If you know WTH is going on PLEASE let me know because the current "fix" is ugly
89-
-- and is completely unreasonable :(
90-
threadDelay 1000000
91-
f k (RebalanceBeforeAssign assignment)
92-
void $ assign' k pls -- pass as pointer to avoid possible serialisation issues
92+
void $ rdKafkaResumePartitions kptr pls
9393
f k (RebalanceAssign assignment)
9494
KafkaResponseError RdKafkaRespErrRevokePartitions -> do
9595
f k (RebalanceBeforeRevoke assignment)
9696
void $ assign k []
9797
f k (RebalanceRevoke assignment)
9898
x -> error $ "Rebalance: UNKNOWN response: " <> show x
9999

100+
100101
-- | Assigns specified partitions to a current consumer.
101102
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
102103
assign :: KafkaConsumer -> [TopicPartition] -> IO (Maybe KafkaError)
@@ -107,9 +108,13 @@ assign (KafkaConsumer (Kafka k) _) ps =
107108
er = KafkaResponseError <$> (pl >>= rdKafkaAssign k)
108109
in kafkaErrorToMaybe <$> er
109110

110-
-- | Assigns specified partitions to a current consumer.
111-
-- Assigning an empty list means unassigning from all partitions that are currently assigned.
112-
assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
113-
assign' (KafkaConsumer (Kafka k) _) pls =
114-
(kafkaErrorToMaybe . KafkaResponseError) <$> rdKafkaAssign k pls
111+
-- -- | Assigns specified partitions to a current consumer.
112+
-- -- Assigning an empty list means unassigning from all partitions that are currently assigned.
113+
-- assign' :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
114+
-- assign' (KafkaConsumer (Kafka k) _) pls = do
115+
116+
117+
118+
-- where
119+
-- asExcept f = ExceptT $ rdKafkaErrorToEither <$> f
115120

src/Kafka/Internal/Shared.hs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,24 @@ module Kafka.Internal.Shared
1717
)
1818
where
1919

20-
import Data.Text (Text)
21-
import qualified Data.Text as Text
2220
import Control.Concurrent (forkIO, rtsSupportsBoundThreads)
2321
import Control.Exception (throw)
2422
import Control.Monad (void, when)
2523
import qualified Data.ByteString as BS
2624
import qualified Data.ByteString.Internal as BSI
25+
import Data.Text (Text)
26+
import qualified Data.Text as Text
2727
import Data.Word (Word8)
28-
import Foreign.Ptr (Ptr, nullPtr)
29-
import Foreign.Marshal.Alloc (alloca)
28+
import Foreign.C.Error (Errno (..))
3029
import Foreign.ForeignPtr (newForeignPtr_)
31-
import Foreign.Storable (Storable(peek))
32-
import Foreign.C.Error (Errno(..))
33-
import Kafka.Consumer.Types (Timestamp(..))
30+
import Foreign.Marshal.Alloc (alloca)
31+
import Foreign.Ptr (Ptr, nullPtr)
32+
import Foreign.Storable (Storable (peek))
33+
import Kafka.Consumer.Types (Timestamp (..))
3434
import Kafka.Internal.CancellationToken as CToken
35-
import Kafka.Internal.RdKafka (RdKafkaTimestampTypeT(..), RdKafkaMessageTPtr, RdKafkaMessageT(..), RdKafkaRespErrT(..), Word8Ptr, rdKafkaPoll, rdKafkaErrno2err, rdKafkaTopicName, rdKafkaMessageTimestamp)
36-
import Kafka.Internal.Setup (HasKafka(..), Kafka(..))
37-
import Kafka.Types (KafkaError(..), Timeout(..), Millis(..))
35+
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName)
36+
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
37+
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..))
3838

3939
runEventLoop :: HasKafka a => a -> CancellationToken -> Maybe Timeout -> IO ()
4040
runEventLoop k ct timeout =

0 commit comments

Comments
 (0)