Skip to content

Commit 9cf18e8

Browse files
Merge branch 'RentTheRunway-falconertc/fix_duplicate_binding_exchange'
2 parents 1e7df8c + 87325b0 commit 9cf18e8

File tree

2 files changed

+99
-5
lines changed

2 files changed

+99
-5
lines changed

deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ remove_bindings(none, X, Bindings) ->
227227
ok.
228228

229229
remove_binding(#binding{source = S, destination = D, key = RK}) ->
230-
Weight = rabbit_data_coercion:to_integer(RK),
231230
rabbit_log:debug("Consistent hashing exchange: removing binding "
232231
"from exchange '~p' to destination '~p' with routing key '~s'",
233232
[rabbit_misc:rs(S), rabbit_misc:rs(D), RK]),
@@ -237,7 +236,7 @@ remove_binding(#binding{source = S, destination = D, key = RK}) ->
237236
next_bucket_number = NexN0}] ->
238237
%% Buckets with lower numbers stay as is; buckets that
239238
%% belong to this binding are removed; buckets with
240-
%% greater numbers are updated (their numbers are adjusted downwards by weight)
239+
%% greater numbers are updated (their numbers are adjusted downwards)
241240
BucketsOfThisBinding = maps:filter(fun (_K, V) -> V =:= D end, BM0),
242241
case maps:size(BucketsOfThisBinding) of
243242
0 -> ok;
@@ -251,10 +250,10 @@ remove_binding(#binding{source = S, destination = D, key = RK}) ->
251250
%% final state with "down the ring" buckets updated
252251
NewBucketsDownTheRing = maps:fold(
253252
fun(K0, V, Acc) ->
254-
maps:put(K0 - Weight, V, Acc)
253+
maps:put(K0 - N, V, Acc)
255254
end, #{}, BucketsDownTheRing),
256255
BM1 = maps:merge(UnchangedBuckets, NewBucketsDownTheRing),
257-
NextN = NexN0 - Weight,
256+
NextN = NexN0 - N,
258257
State = State0#chx_hash_ring{bucket_map = BM1,
259258
next_bucket_number = NextN},
260259

deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ groups() ->
4141
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case5,
4242
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case6,
4343
test_hash_ring_updates_when_exchange_is_deleted,
44-
test_hash_ring_updates_when_queue_is_unbound
44+
test_hash_ring_updates_when_queue_is_unbound,
45+
test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted,
46+
test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted
4547
]}
4648
].
4749

@@ -557,6 +559,99 @@ test_hash_ring_updates_when_queue_is_unbound(Config) ->
557559
rabbit_ct_client_helpers:close_channel(Chan),
558560
ok.
559561

562+
test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) ->
563+
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
564+
565+
X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>,
566+
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
567+
568+
Declare = #'exchange.declare'{exchange = X,
569+
type = <<"x-consistent-hash">>},
570+
#'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare),
571+
572+
Q1 = <<"f-q1">>,
573+
#'queue.declare_ok'{} =
574+
amqp_channel:call(Chan, #'queue.declare'{
575+
queue = Q1, durable = true, exclusive = false}),
576+
#'queue.bind_ok'{} =
577+
amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
578+
exchange = X,
579+
routing_key = <<"2">>}),
580+
581+
#'queue.bind_ok'{} =
582+
amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
583+
exchange = X,
584+
routing_key = <<"3">>}),
585+
586+
?assertEqual(5, count_buckets_of_exchange(Config, X)),
587+
assert_ring_consistency(Config, X),
588+
589+
Q2 = <<"f-q2">>,
590+
#'queue.declare_ok'{} =
591+
amqp_channel:call(Chan, #'queue.declare'{
592+
queue = Q2, durable = true, exclusive = false}),
593+
#'queue.bind_ok'{} =
594+
amqp_channel:call(Chan, #'queue.bind'{queue = Q2,
595+
exchange = X,
596+
routing_key = <<"4">>}),
597+
598+
?assertEqual(9, count_buckets_of_exchange(Config, X)),
599+
assert_ring_consistency(Config, X),
600+
601+
amqp_channel:call(Chan, #'queue.delete' {queue = Q1}),
602+
?assertEqual(4, count_buckets_of_exchange(Config, X)),
603+
assert_ring_consistency(Config, X),
604+
605+
clean_up_test_topology(Config, X, [Q1, Q2]),
606+
rabbit_ct_client_helpers:close_channel(Chan),
607+
ok.
608+
609+
test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) ->
610+
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
611+
612+
X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>,
613+
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
614+
615+
Declare = #'exchange.declare'{exchange = X,
616+
type = <<"x-consistent-hash">>},
617+
#'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare),
618+
619+
Q1 = <<"f-q1">>,
620+
#'queue.declare_ok'{} =
621+
amqp_channel:call(Chan, #'queue.declare'{
622+
queue = Q1, durable = true, exclusive = false}),
623+
#'queue.bind_ok'{} =
624+
amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
625+
exchange = X,
626+
routing_key = <<"2">>}),
627+
628+
#'queue.bind_ok'{} =
629+
amqp_channel:call(Chan, #'queue.bind'{queue = Q1,
630+
exchange = X,
631+
routing_key = <<"3">>}),
632+
633+
Q2 = <<"f-q2">>,
634+
#'queue.declare_ok'{} =
635+
amqp_channel:call(Chan, #'queue.declare'{
636+
queue = Q2, durable = true, exclusive = false}),
637+
#'queue.bind_ok'{} =
638+
amqp_channel:call(Chan, #'queue.bind'{queue = Q2,
639+
exchange = X,
640+
routing_key = <<"4">>}),
641+
642+
?assertEqual(9, count_buckets_of_exchange(Config, X)),
643+
assert_ring_consistency(Config, X),
644+
645+
%% Both bindings to Q1 will be deleted
646+
amqp_channel:call(Chan, #'queue.unbind'{queue = Q1,
647+
exchange = X,
648+
routing_key = <<"3">>}),
649+
?assertEqual(4, count_buckets_of_exchange(Config, X)),
650+
assert_ring_consistency(Config, X),
651+
652+
clean_up_test_topology(Config, X, [Q1, Q2]),
653+
rabbit_ct_client_helpers:close_channel(Chan),
654+
ok.
560655

561656
%%
562657
%% Helpers

0 commit comments

Comments
 (0)