Skip to content

Commit c928915

Browse files
Merge pull request #11786 from rabbitmq/mergify/bp/v3.13.x/pr-11784
Handle timeouts possible in Khepri minority in `rabbit_db_binding` (backport #11685) (backport #11784)
2 parents ea3c42e + 1749a9e commit c928915

File tree

7 files changed

+68
-28
lines changed

7 files changed

+68
-28
lines changed

deps/rabbit/src/rabbit_binding.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
-type bind_ok_or_error() :: 'ok' | bind_errors() |
4242
rabbit_types:error({'binding_invalid', string(), [any()]}) |
4343
%% inner_fun() result
44-
rabbit_types:error(rabbit_types:amqp_error()).
44+
rabbit_types:error(rabbit_types:amqp_error()) |
45+
rabbit_khepri:timeout_error().
4546
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
4647
-type inner_fun() ::
4748
fun((rabbit_types:exchange(),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,7 +1876,7 @@ queue_down_consumer_action(CTag, CMap) ->
18761876
_ -> {recover, ConsumeSpec}
18771877
end.
18781878

1879-
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
1879+
binding_action(Action, SourceNameBin0, DestinationType, DestinationNameBin0,
18801880
RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext,
18811881
#user{username = Username} = User) ->
18821882
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
@@ -1892,10 +1892,10 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
18921892
{ok, Exchange} ->
18931893
check_read_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext)
18941894
end,
1895-
case Fun(#binding{source = ExchangeName,
1896-
destination = DestinationName,
1897-
key = RoutingKey,
1898-
args = Arguments},
1895+
case rabbit_binding:Action(#binding{source = ExchangeName,
1896+
destination = DestinationName,
1897+
key = RoutingKey,
1898+
args = Arguments},
18991899
fun (_X, Q) when ?is_amqqueue(Q) ->
19001900
try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
19011901
catch exit:Reason -> {error, Reason}
@@ -1912,6 +1912,9 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
19121912
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
19131913
{error, #amqp_error{} = Error} ->
19141914
rabbit_misc:protocol_error(Error);
1915+
{error, timeout} ->
1916+
rabbit_misc:protocol_error(
1917+
internal_error, "Could not ~s binding due to timeout", [Action]);
19151918
ok ->
19161919
ok
19171920
end.
@@ -2381,15 +2384,15 @@ i(Item, _) ->
23812384
throw({bad_argument, Item}).
23822385

23832386
pending_raft_commands(QStates) ->
2384-
Fun = fun(_, V, Acc) ->
2387+
Action = fun(_, V, Acc) ->
23852388
case rabbit_queue_type:state_info(V) of
23862389
#{pending_raft_commands := P} ->
23872390
Acc + P;
23882391
_ ->
23892392
Acc
23902393
end
23912394
end,
2392-
rabbit_queue_type:fold_state(Fun, 0, QStates).
2395+
rabbit_queue_type:fold_state(Action, 0, QStates).
23932396

23942397
name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) ->
23952398
list_to_binary(rabbit_misc:format("~ts (~tp)", [ConnName, Channel])).
@@ -2441,31 +2444,31 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
24412444
routing_key = RoutingKey,
24422445
arguments = Arguments},
24432446
ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
2444-
binding_action(fun rabbit_binding:add/3,
2447+
binding_action(add,
24452448
SourceNameBin, exchange, DestinationNameBin,
24462449
RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
24472450
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
24482451
source = SourceNameBin,
24492452
routing_key = RoutingKey,
24502453
arguments = Arguments},
24512454
ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
2452-
binding_action(fun rabbit_binding:remove/3,
2453-
SourceNameBin, exchange, DestinationNameBin,
2454-
RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
2455+
binding_action(remove,
2456+
SourceNameBin, exchange, DestinationNameBin,
2457+
RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
24552458
handle_method(#'queue.unbind'{queue = QueueNameBin,
24562459
exchange = ExchangeNameBin,
24572460
routing_key = RoutingKey,
24582461
arguments = Arguments},
24592462
ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
2460-
binding_action(fun rabbit_binding:remove/3,
2463+
binding_action(remove,
24612464
ExchangeNameBin, queue, QueueNameBin,
24622465
RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
24632466
handle_method(#'queue.bind'{queue = QueueNameBin,
24642467
exchange = ExchangeNameBin,
24652468
routing_key = RoutingKey,
24662469
arguments = Arguments},
24672470
ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
2468-
binding_action(fun rabbit_binding:add/3,
2471+
binding_action(add,
24692472
ExchangeNameBin, queue, QueueNameBin,
24702473
RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
24712474
%% Note that all declares to these are effectively passive. If it
@@ -2580,12 +2583,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
25802583
ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
25812584
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
25822585
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
2583-
Fun = fun (Q0) ->
2586+
Action = fun (Q0) ->
25842587
QStat = maybe_stat(NoWait, Q0),
25852588
{QStat, Q0}
25862589
end,
25872590
%% Note: no need to check if Q is an #amqqueue, with_or_die does it
2588-
{{ok, MessageCount, ConsumerCount}, Q} = rabbit_amqqueue:with_or_die(QueueName, Fun),
2591+
{{ok, MessageCount, ConsumerCount}, Q} = rabbit_amqqueue:with_or_die(QueueName, Action),
25892592
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
25902593
{ok, QueueName, MessageCount, ConsumerCount};
25912594
handle_method(#'queue.delete'{queue = QueueNameBin,

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ exists_in_khepri(#binding{source = SrcName,
120120
Errs ->
121121
Errs
122122
end
123-
end) of
123+
end, ro) of
124124
{ok, not_found} -> false;
125125
{ok, Set} -> sets:is_element(Binding, Set);
126126
Errs -> not_found_errs_in_khepri(not_found(Errs, SrcName, DstName))
@@ -150,8 +150,9 @@ not_found({[], []}, SrcName, DstName) ->
150150
Binding :: rabbit_types:binding(),
151151
Src :: rabbit_types:binding_source(),
152152
Dst :: rabbit_types:binding_destination(),
153-
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
154-
Ret :: ok | {error, Reason :: any()}.
153+
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
154+
ChecksErrReason :: any(),
155+
Ret :: ok | {error, ChecksErrReason} | rabbit_khepri:timeout_error().
155156
%% @doc Writes a binding if it doesn't exist already and passes the validation in
156157
%% `ChecksFun' i.e. exclusive access
157158
%%
@@ -255,8 +256,12 @@ serial_in_khepri(true, X) ->
255256
Binding :: rabbit_types:binding(),
256257
Src :: rabbit_types:binding_source(),
257258
Dst :: rabbit_types:binding_destination(),
258-
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
259-
Ret :: ok | {ok, rabbit_binding:deletions()} | {error, Reason :: any()}.
259+
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
260+
ChecksErrReason :: any(),
261+
Ret :: ok |
262+
{ok, rabbit_binding:deletions()} |
263+
{error, ChecksErrReason} |
264+
rabbit_khepri:timeout_error().
260265
%% @doc Deletes a binding record from the database if it passes the validation in
261266
%% `ChecksFun'. It also triggers the deletion of auto-delete exchanges if needed.
262267
%%

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -886,12 +886,17 @@ add_binding(VHost, Binding, ActingUser) ->
886886
rv(VHost, DestType, destination, Binding), ActingUser).
887887

888888
add_binding_int(Binding, Source, Destination, ActingUser) ->
889-
rabbit_binding:add(
890-
#binding{source = Source,
891-
destination = Destination,
892-
key = maps:get(routing_key, Binding, undefined),
893-
args = args(maps:get(arguments, Binding, undefined))},
894-
ActingUser).
889+
case rabbit_binding:add(
890+
#binding{source = Source,
891+
destination = Destination,
892+
key = maps:get(routing_key, Binding, undefined),
893+
args = args(maps:get(arguments, Binding, undefined))},
894+
ActingUser) of
895+
ok ->
896+
ok;
897+
{error, _} = Err ->
898+
throw(Err)
899+
end.
895900

896901
dest_type(Binding) ->
897902
rabbit_data_coercion:to_atom(maps:get(destination_type, Binding, undefined)).

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,17 @@
180180
clear_forced_metadata_store/0]).
181181
-endif.
182182

183+
-type timeout_error() :: khepri:error(timeout).
184+
%% Commands like 'put'/'delete' etc. might time out in Khepri. It might take
185+
%% the leader longer to apply the command and reply to the caller than the
186+
%% configured timeout. This error is easy to reproduce - a cluster which is
187+
%% only running a minority of nodes will consistently return `{error, timeout}`
188+
%% for commands until the cluster majority can be re-established. Commands
189+
%% returning `{error, timeout}` are a likely (but not certain) indicator that
190+
%% the node which submitted the command is running in a minority.
191+
192+
-export_type([timeout_error/0]).
193+
183194
-compile({no_auto_import, [get/1, get/2, nodes/0]}).
184195

185196
-define(RA_SYSTEM, coordination).

deps/rabbit/test/cluster_minority_SUITE.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ groups() ->
2626
open_channel,
2727
declare_exchange,
2828
declare_binding,
29+
delete_binding,
2930
declare_queue,
3031
publish_to_exchange,
3132
publish_and_consume_to_local_classic_queue,
@@ -86,7 +87,7 @@ init_per_group(Group, Config0) when Group == client_operations;
8687
{skip, _} ->
8788
Config1;
8889
_ ->
89-
%% Before partitioning the cluster, create a policy and queue that can be used in
90+
%% Before partitioning the cluster, create resources that can be used in
9091
%% the test cases. They're needed for delete and consume operations, which can list
9192
%% them but fail to operate anything else.
9293
%%
@@ -96,6 +97,10 @@ init_per_group(Group, Config0) when Group == client_operations;
9697
%% To be used in consume_from_queue
9798
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>,
9899
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
100+
%% To be used in delete_binding
101+
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
102+
source = <<"amq.direct">>,
103+
routing_key = <<"binding-to-be-deleted">>}),
99104

100105
%% Lower the default Khepri command timeout. By default this is set
101106
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
@@ -161,6 +166,14 @@ declare_binding(Config) ->
161166
source = <<"amq.direct">>,
162167
routing_key = <<"key">>})).
163168

169+
delete_binding(Config) ->
170+
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
171+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
172+
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
173+
amqp_channel:call(Ch, #'exchange.unbind'{destination = <<"amq.fanout">>,
174+
source = <<"amq.direct">>,
175+
routing_key = <<"binding-to-be-deleted">>})).
176+
164177
declare_queue(Config) ->
165178
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
166179
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,8 @@ add_super_stream_binding(VirtualHost,
879879
{error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
880880
{error, #amqp_error{} = Error} ->
881881
{error, {internal_error, rabbit_misc:format("~tp", [Error])}};
882+
{error, timeout} ->
883+
{error, {internal_error, "failed to add binding due to a timeout"}};
882884
ok ->
883885
ok
884886
end.

0 commit comments

Comments
 (0)