Skip to content

Commit 7f16996

Browse files
committed
Khepri: use updated policy on rabbit_amqqueue_process
Similar to quorum queues, use the new record to update the policies in the internal gen_server state. When using Khepri, the projections can have stale data until it has been applied to all nodes. Test cases on `confirm_rejects_SUITE` show this race condition on `rabbit_amqqueue_process` when updating policies. This change uses the Khepri feature flag to hide the API change on `rabbit_amqqueue_process` when handling the `policy_changed` notification.
1 parent 3453f94 commit 7f16996

File tree

3 files changed

+51
-2
lines changed

3 files changed

+51
-2
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1684,6 +1684,30 @@ handle_cast(policy_changed, State = #q{q = Q0}) ->
16841684
{ok, Q} = rabbit_amqqueue:lookup(Name),
16851685
noreply(process_args_policy(State#q{q = Q}));
16861686

1687+
handle_cast({policy_changed, Q0}, State) ->
1688+
Name = amqqueue:get_name(Q0),
1689+
PolicyVersion0 = amqqueue:get_policy_version(Q0),
1690+
%% We depend on the #q.q field being up to date at least WRT
1691+
%% policy (but not mirror pids) in various places, so when it
1692+
%% changes we go and read it from Mnesia again.
1693+
%%
1694+
%% This also has the side effect of waking us up so we emit a
1695+
%% stats event - so event consumers see the changed policy.
1696+
{ok, Q} = rabbit_amqqueue:lookup(Name),
1697+
PolicyVersion = amqqueue:get_policy_version(Q),
1698+
case PolicyVersion >= PolicyVersion0 of
1699+
true ->
1700+
noreply(process_args_policy(State#q{q = Q}));
1701+
false ->
1702+
%% Update just the policy, as pids and mirrors could have been
1703+
%% updated simultaneously. A testcase on the `confirm_rejects_SUITE`
1704+
%% fails consistently if the internal state is updated directly to `Q0`.
1705+
Q1 = amqqueue:set_policy(Q, amqqueue:get_policy(Q0)),
1706+
Q2 = amqqueue:set_operator_policy(Q1, amqqueue:get_operator_policy(Q0)),
1707+
Q3 = amqqueue:set_policy_version(Q2, PolicyVersion0),
1708+
noreply(process_args_policy(State#q{q = Q3}))
1709+
end;
1710+
16871711
handle_cast({sync_start, _, _}, State = #q{q = Q}) ->
16881712
Name = amqqueue:get_name(Q),
16891713
%% Only a mirror should receive this, it means we are a duplicated master

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,19 @@ find_missing_queues([Q1|Rem1], [Q2|Rem2] = Q2s, Acc) ->
169169
-spec policy_changed(amqqueue:amqqueue()) -> ok.
170170
policy_changed(Q) ->
171171
QPid = amqqueue:get_pid(Q),
172-
gen_server2:cast(QPid, policy_changed).
172+
case rabbit_khepri:is_enabled() of
173+
false ->
174+
gen_server2:cast(QPid, policy_changed);
175+
true ->
176+
%% When using Khepri, projections are guaranteed to be atomic on
177+
%% the node that processes them, but there might be a slight delay
178+
%% until they're applied on other nodes. Some test suites fail
179+
%% intermittently, showing that rabbit_amqqueue_process is reading
180+
%% the old policy value. We use the khepri ff to hide this API change,
181+
%% and use the up-to-date record to update the policy on the gen_server
182+
%% state.
183+
gen_server2:cast(QPid, {policy_changed, Q})
184+
end.
173185

174186
stat(Q) ->
175187
delegate:invoke(amqqueue:get_pid(Q),

deps/rabbit/test/confirms_rejects_SUITE.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
-include_lib("common_test/include/ct.hrl").
55
-include_lib("amqp_client/include/amqp_client.hrl").
6+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
67
-compile(export_all).
78

89
all() ->
@@ -278,7 +279,7 @@ policy_resets_to_default(Config) ->
278279
QueueName, QueueName, <<"queues">>,
279280
[{<<"max-length">>, MaxLength}, {<<"overflow">>, XOverflow}]),
280281

281-
timer:sleep(1000),
282+
?awaitMatch([_, _], get_policy_definition(Config, QueueName), 30000),
282283

283284
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
284285
#amqp_msg{payload = <<"HI">>})
@@ -301,6 +302,8 @@ policy_resets_to_default(Config) ->
301302
QueueName, QueueName, <<"queues">>,
302303
[{<<"max-length">>, MaxLength}]),
303304

305+
?awaitMatch([_], get_policy_definition(Config, QueueName), 30000),
306+
304307
NotRejectedMessage = <<"HI-not-rejected">>,
305308
amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
306309
#amqp_msg{payload = NotRejectedMessage}),
@@ -318,6 +321,16 @@ policy_resets_to_default(Config) ->
318321
_ -> ok
319322
end.
320323

324+
get_policy_definition(Config, QueueName) ->
325+
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
326+
[rabbit_misc:r(<<"/">>, queue, QueueName)]),
327+
case amqqueue:get_policy(Q) of
328+
undefined ->
329+
undefined;
330+
Policy ->
331+
proplists:get_value(definition, Policy, [])
332+
end.
333+
321334
consume_all_messages(Ch, QueueName) ->
322335
consume_all_messages(Ch, QueueName, []).
323336

0 commit comments

Comments
 (0)