Skip to content

Commit 4ed7ea5

Browse files
Merge pull request #10049 from rabbitmq/mergify/bp/v3.12.x/pr-10034
QQ: ensure members that are started late have right config. (backport #10034)
2 parents 70de184 + 0b3913f commit 4ed7ea5

File tree

2 files changed

+40
-20
lines changed

2 files changed

+40
-20
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,7 @@ start_cluster(Q) ->
202202
[QuorumSize, rabbit_misc:rs(QName), Leader]),
203203
case rabbit_amqqueue:internal_declare(NewQ1, false) of
204204
{created, NewQ} ->
205-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
206-
?TICK_TIMEOUT),
207-
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
208-
?SNAPSHOT_INTERVAL),
209-
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
205+
RaConfs = [make_ra_conf(NewQ, ServerId)
210206
|| ServerId <- members(NewQ)],
211207
try erpc_call(Leader, ra, start_cluster,
212208
[?RA_SYSTEM, RaConfs, ?START_CLUSTER_TIMEOUT],
@@ -571,11 +567,10 @@ recover(_Vhost, Queues) ->
571567
lists:foldl(
572568
fun (Q0, {R0, F0}) ->
573569
{Name, _} = amqqueue:get_pid(Q0),
570+
ServerId = {Name, node()},
574571
QName = amqqueue:get_name(Q0),
575-
Nodes = get_nodes(Q0),
576-
Formatter = {?MODULE, format_ra_event, [QName]},
577-
Res = case ra:restart_server(?RA_SYSTEM, {Name, node()},
578-
#{ra_event_formatter => Formatter}) of
572+
MutConf = make_mutable_config(Q0),
573+
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
579574
ok ->
580575
% queue was restarted, good
581576
ok;
@@ -587,10 +582,7 @@ recover(_Vhost, Queues) ->
587582
[rabbit_misc:rs(QName), Err1]),
588583
% queue was never started on this node
589584
% so needs to be started from scratch.
590-
Machine = ra_machine(Q0),
591-
RaNodes = [{Name, Node} || Node <- Nodes],
592-
case ra:start_server(?RA_SYSTEM, Name, {Name, node()},
593-
Machine, RaNodes) of
585+
case start_server(make_ra_conf(Q0, ServerId)) of
594586
ok -> ok;
595587
Err2 ->
596588
rabbit_log:warning("recover: quorum queue ~w could not"
@@ -1105,11 +1097,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
11051097
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
11061098
ServerId = {RaName, Node},
11071099
Members = members(Q),
1108-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1109-
?TICK_TIMEOUT),
1110-
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
1111-
?SNAPSHOT_INTERVAL),
1112-
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval),
1100+
Conf = make_ra_conf(Q, ServerId),
11131101
case ra:start_server(?RA_SYSTEM, Conf) of
11141102
ok ->
11151103
case ra:add_member(Members, ServerId, Timeout) of
@@ -1609,6 +1597,13 @@ members(Q) when ?amqqueue_is_quorum(Q) ->
16091597
format_ra_event(ServerId, Evt, QRef) ->
16101598
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
16111599

1600+
make_ra_conf(Q, ServerId) ->
1601+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1602+
?TICK_TIMEOUT),
1603+
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
1604+
?SNAPSHOT_INTERVAL),
1605+
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval).
1606+
16121607
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
16131608
QName = amqqueue:get_name(Q),
16141609
RaMachine = ra_machine(Q),
@@ -1628,6 +1623,16 @@ make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
16281623
machine => RaMachine,
16291624
ra_event_formatter => Formatter}.
16301625

1626+
make_mutable_config(Q) ->
1627+
QName = amqqueue:get_name(Q),
1628+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1629+
?TICK_TIMEOUT),
1630+
Formatter = {?MODULE, format_ra_event, [QName]},
1631+
#{tick_timeout => TickTimeout,
1632+
ra_event_formatter => Formatter}.
1633+
1634+
1635+
16311636
get_nodes(Q) when ?is_amqqueue(Q) ->
16321637
#{nodes := Nodes} = amqqueue:get_type_state(Q),
16331638
Nodes.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,7 +1650,7 @@ channel_handles_ra_event(Config) ->
16501650

16511651
declare_during_node_down(Config) ->
16521652
[Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(
1653-
Config, nodename),
1653+
Config, nodename),
16541654

16551655
stop_node(Config, DownServer),
16561656
Running = Servers -- [DownServer],
@@ -1676,7 +1676,20 @@ declare_during_node_down(Config) ->
16761676

16771677
publish(Ch, QQ),
16781678
wait_for_messages_ready(Servers, RaName, 1),
1679-
ok.
1679+
1680+
case rabbit_ct_helpers:is_mixed_versions() of
1681+
true ->
1682+
%% stop here if mixexd
1683+
ok;
1684+
false ->
1685+
%% further assertions that we can consume from the newly
1686+
%% started member
1687+
SubCh = rabbit_ct_client_helpers:open_channel(Config, DownServer),
1688+
subscribe(SubCh, QQ, false),
1689+
receive_and_ack(Ch),
1690+
wait_for_messages_ready(Servers, RaName, 0),
1691+
ok
1692+
end.
16801693

16811694
simple_confirm_availability_on_leader_change(Config) ->
16821695
[Node1, Node2, _Node3] = Servers =
@@ -2772,6 +2785,8 @@ receive_and_ack(Ch) ->
27722785
redelivered = false}, _} ->
27732786
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
27742787
multiple = false})
2788+
after 5000 ->
2789+
ct:fail("receive_and_ack timed out", [])
27752790
end.
27762791

27772792
message_ttl_policy(Config) ->

0 commit comments

Comments
 (0)