Skip to content

Commit ad9e3f7

Browse files
Merge pull request #11784 from rabbitmq/mergify/bp/v4.0.x/pr-11685
Handle timeouts possible in Khepri minority in `rabbit_db_binding` (backport #11685)
2 parents 2abe52f + f4d2452 commit ad9e3f7

File tree

7 files changed

+53
-13
lines changed

7 files changed

+53
-13
lines changed

deps/rabbit/src/rabbit_binding.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
-type bind_ok_or_error() :: 'ok' | bind_errors() |
4242
rabbit_types:error({'binding_invalid', string(), [any()]}) |
4343
%% inner_fun() result
44-
rabbit_types:error(rabbit_types:amqp_error()).
44+
rabbit_types:error(rabbit_types:amqp_error()) |
45+
rabbit_khepri:timeout_error().
4546
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
4647
-type inner_fun() ::
4748
fun((rabbit_types:exchange(),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1822,6 +1822,9 @@ binding_action(Action, Binding, Username, ConnPid) ->
18221822
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
18231823
{error, #amqp_error{} = Error} ->
18241824
rabbit_misc:protocol_error(Error);
1825+
{error, timeout} ->
1826+
rabbit_misc:protocol_error(
1827+
internal_error, "Could not ~s binding due to timeout", [Action]);
18251828
ok ->
18261829
ok
18271830
end.

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ exists_in_khepri(#binding{source = SrcName,
120120
Errs ->
121121
Errs
122122
end
123-
end) of
123+
end, ro) of
124124
{ok, not_found} -> false;
125125
{ok, Set} -> sets:is_element(Binding, Set);
126126
Errs -> not_found_errs_in_khepri(not_found(Errs, SrcName, DstName))
@@ -150,8 +150,9 @@ not_found({[], []}, SrcName, DstName) ->
150150
Binding :: rabbit_types:binding(),
151151
Src :: rabbit_types:binding_source(),
152152
Dst :: rabbit_types:binding_destination(),
153-
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
154-
Ret :: ok | {error, Reason :: any()}.
153+
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
154+
ChecksErrReason :: any(),
155+
Ret :: ok | {error, ChecksErrReason} | rabbit_khepri:timeout_error().
155156
%% @doc Writes a binding if it doesn't exist already and passes the validation in
156157
%% `ChecksFun' i.e. exclusive access
157158
%%
@@ -255,8 +256,12 @@ serial_in_khepri(true, X) ->
255256
Binding :: rabbit_types:binding(),
256257
Src :: rabbit_types:binding_source(),
257258
Dst :: rabbit_types:binding_destination(),
258-
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
259-
Ret :: ok | {ok, rabbit_binding:deletions()} | {error, Reason :: any()}.
259+
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
260+
ChecksErrReason :: any(),
261+
Ret :: ok |
262+
{ok, rabbit_binding:deletions()} |
263+
{error, ChecksErrReason} |
264+
rabbit_khepri:timeout_error().
260265
%% @doc Deletes a binding record from the database if it passes the validation in
261266
%% `ChecksFun'. It also triggers the deletion of auto-delete exchanges if needed.
262267
%%

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -883,12 +883,17 @@ add_binding(VHost, Binding, ActingUser) ->
883883
rv(VHost, DestType, destination, Binding), ActingUser).
884884

885885
add_binding_int(Binding, Source, Destination, ActingUser) ->
886-
rabbit_binding:add(
887-
#binding{source = Source,
888-
destination = Destination,
889-
key = maps:get(routing_key, Binding, undefined),
890-
args = args(maps:get(arguments, Binding, undefined))},
891-
ActingUser).
886+
case rabbit_binding:add(
887+
#binding{source = Source,
888+
destination = Destination,
889+
key = maps:get(routing_key, Binding, undefined),
890+
args = args(maps:get(arguments, Binding, undefined))},
891+
ActingUser) of
892+
ok ->
893+
ok;
894+
{error, _} = Err ->
895+
throw(Err)
896+
end.
892897

893898
dest_type(Binding) ->
894899
rabbit_data_coercion:to_atom(maps:get(destination_type, Binding, undefined)).

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,17 @@
180180
clear_forced_metadata_store/0]).
181181
-endif.
182182

183+
-type timeout_error() :: khepri:error(timeout).
184+
%% Commands like 'put'/'delete' etc. might time out in Khepri. It might take
185+
%% the leader longer to apply the command and reply to the caller than the
186+
%% configured timeout. This error is easy to reproduce - a cluster which is
187+
%% only running a minority of nodes will consistently return `{error, timeout}`
188+
%% for commands until the cluster majority can be re-established. Commands
189+
%% returning `{error, timeout}` are a likely (but not certain) indicator that
190+
%% the node which submitted the command is running in a minority.
191+
192+
-export_type([timeout_error/0]).
193+
183194
-compile({no_auto_import, [get/1, get/2, nodes/0]}).
184195

185196
-define(RA_SYSTEM, coordination).

deps/rabbit/test/cluster_minority_SUITE.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ groups() ->
2525
open_channel,
2626
declare_exchange,
2727
declare_binding,
28+
delete_binding,
2829
declare_queue,
2930
publish_to_exchange,
3031
publish_and_consume_to_local_classic_queue,
@@ -85,7 +86,7 @@ init_per_group(Group, Config0) when Group == client_operations;
8586
{skip, _} ->
8687
Config1;
8788
_ ->
88-
%% Before partitioning the cluster, create a policy and queue that can be used in
89+
%% Before partitioning the cluster, create resources that can be used in
8990
%% the test cases. They're needed for delete and consume operations, which can list
9091
%% them but fail to operate anything else.
9192
%%
@@ -95,6 +96,10 @@ init_per_group(Group, Config0) when Group == client_operations;
9596
%% To be used in consume_from_queue
9697
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>,
9798
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
99+
%% To be used in delete_binding
100+
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
101+
source = <<"amq.direct">>,
102+
routing_key = <<"binding-to-be-deleted">>}),
98103

99104
%% Lower the default Khepri command timeout. By default this is set
100105
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
@@ -160,6 +165,14 @@ declare_binding(Config) ->
160165
source = <<"amq.direct">>,
161166
routing_key = <<"key">>})).
162167

168+
delete_binding(Config) ->
169+
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
170+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
171+
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
172+
amqp_channel:call(Ch, #'exchange.unbind'{destination = <<"amq.fanout">>,
173+
source = <<"amq.direct">>,
174+
routing_key = <<"binding-to-be-deleted">>})).
175+
163176
declare_queue(Config) ->
164177
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
165178
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,8 @@ add_super_stream_binding(VirtualHost,
879879
{error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
880880
{error, #amqp_error{} = Error} ->
881881
{error, {internal_error, rabbit_misc:format("~tp", [Error])}};
882+
{error, timeout} ->
883+
{error, {internal_error, "failed to add binding due to a timeout"}};
882884
ok ->
883885
ok
884886
end.

0 commit comments

Comments
 (0)