Skip to content

Commit 803bd70

Browse files
committed
Use node with latest machine version to connect
References #4133
1 parent 11dfc3a commit 803bd70

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,7 @@ register_listener(Q) when ?is_amqqueue(Q)->
260260
#{name := StreamId} = amqqueue:get_type_state(Q),
261261
process_command({register_listener,
262262
#{pid => self(),
263-
node => node(self()),
264-
stream_id => StreamId,
265-
type => leader}}).
263+
stream_id => StreamId}}).
266264

267265
-spec register_local_member_listener(amqqueue:amqqueue()) ->
268266
{error, term()} | {ok, ok | stream_not_found, atom() | {atom(), atom()}}.
@@ -462,11 +460,22 @@ apply(#{machine_version := MachineVersion} = Meta,
462460

463461
apply(#{machine_version := MachineVersion} = Meta,
464462
{register_listener, #{pid := Pid,
465-
node := Node,
466-
stream_id := StreamId,
467-
type := Type}},
463+
stream_id := StreamId} = Args},
468464
#?MODULE{streams = Streams,
469465
monitors = Monitors0} = State0) when MachineVersion >= 2 ->
466+
Node = case Args of
467+
#{node := N} ->
468+
N;
469+
_ ->
470+
node(Pid)
471+
end,
472+
Type = case Args of
473+
#{type := T} ->
474+
T;
475+
_ ->
476+
leader
477+
end,
478+
470479
case Streams of
471480
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
472481
{LKey, LValue} =

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ groups() ->
5454
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
5555
{cluster_size_3_2, [], [recover,
5656
declare_with_node_down]},
57-
{cluster_size_3_parallel_1, [parallel], [delete_replica,
57+
{cluster_size_3_parallel_1, [parallel], [
58+
delete_replica,
5859
delete_last_replica,
5960
delete_classic_replica,
6061
delete_quorum_replica,
@@ -65,7 +66,8 @@ groups() ->
6566
leader_locator_client_local,
6667
declare_delete_same_stream,
6768
leader_locator_random,
68-
leader_locator_least_leaders]},
69+
leader_locator_least_leaders
70+
]},
6971
{cluster_size_3_parallel_2, [parallel], all_tests()},
7072
{unclustered_size_3_1, [], [add_replica]},
7173
{unclustered_size_3_2, [], [consume_without_local_replica]},
@@ -1322,7 +1324,7 @@ consume_from_relative_time_offset(Config) ->
13221324
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
13231325

13241326
consume_from_replica(Config) ->
1325-
[Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1327+
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
13261328

13271329
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
13281330
Q = ?config(queue_name, Config),
@@ -1338,7 +1340,7 @@ consume_from_replica(Config) ->
13381340
length(proplists:get_value(online, Info)) == 3
13391341
end),
13401342

1341-
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
1343+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
13421344
qos(Ch2, 10, false),
13431345

13441346
subscribe(Ch2, Q, false, 0),

0 commit comments

Comments
 (0)