Skip to content

Commit 19c4ac5

Browse files
Merge pull request #11823 from rabbitmq/mergify/bp/v4.0.x/pr-11793
Configure shovels to wait until all sources and destinations are declared (backport #11793)
2 parents e53596b + 2bbd864 commit 19c4ac5

File tree

7 files changed

+370
-33
lines changed

7 files changed

+370
-33
lines changed

deps/rabbitmq_shovel/app.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def all_srcs(name = "all_srcs"):
110110

111111
filegroup(
112112
name = "priv",
113+
srcs = ["priv/schema/rabbitmq_shovel.schema"],
113114
)
114115

115116
filegroup(
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
%% ----------------------------------------------------------------------------
2+
%% RabbitMQ Shovel plugin
3+
%%
4+
%% See https://github.com/rabbitmq/rabbitmq-shovel/blob/stable/README.md
5+
%% for details
6+
%% ----------------------------------------------------------------------------
7+
8+
9+
{mapping, "shovel.topology.predeclared", "rabbitmq_shovel.topology.predeclared", [
10+
[{datatype, {enum, [true, false]}}]
11+
]}.

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
-module(rabbit_amqp091_shovel).
99

10+
-define(APP, rabbitmq_shovel).
11+
1012
-behaviour(rabbit_shovel_behaviour).
1113

1214
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -39,7 +41,7 @@
3941
%% from and can break with the next upgrade. It should not be used by
4042
%% another one that the one who created it or survive a node restart.
4143
%% Thus, function references have been replace by the following MFA.
42-
-export([decl_fun/3, publish_fun/4, props_fun_timestamp_header/4,
44+
-export([decl_fun/3, check_fun/3, publish_fun/4, props_fun_timestamp_header/4,
4345
props_fun_forward_header/5]).
4446

4547
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
@@ -54,7 +56,7 @@ parse(_Name, {source, Source}) ->
5456
CArgs = proplists:get_value(consumer_args, Source, []),
5557
#{module => ?MODULE,
5658
uris => proplists:get_value(uris, Source),
57-
resource_decl => decl_fun(Source),
59+
resource_decl => decl_fun({source, Source}),
5860
queue => Queue,
5961
delete_after => proplists:get_value(delete_after, Source, never),
6062
prefetch_count => Prefetch,
@@ -70,7 +72,7 @@ parse(Name, {destination, Dest}) ->
7072
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
7173
#{module => ?MODULE,
7274
uris => proplists:get_value(uris, Dest),
73-
resource_decl => decl_fun(Dest),
75+
resource_decl => decl_fun({destination, Dest}),
7476
props_fun => PropsFun2,
7577
fields_fun => PubFieldsFun,
7678
add_forward_headers => AFH,
@@ -604,16 +606,31 @@ parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
604606
parse_declaration({[Method | Rest], Acc}) ->
605607
parse_declaration({[{Method, []} | Rest], Acc}).
606608

607-
decl_fun(Endpoint) ->
608-
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []),
609-
[]}),
609+
decl_fun({source, Endpoint}) ->
610+
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
611+
[] ->
612+
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
613+
true -> case proplists:get_value(queue, Endpoint) of
614+
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
615+
Queue -> {?MODULE, check_fun, [Queue]}
616+
end;
617+
false -> {?MODULE, decl_fun, []}
618+
end;
619+
Decl -> {?MODULE, decl_fun, [Decl]}
620+
end;
621+
decl_fun({destination, Endpoint}) ->
622+
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
610623
{?MODULE, decl_fun, [Decl]}.
611-
624+
612625
decl_fun(Decl, _Conn, Ch) ->
613626
[begin
614627
amqp_channel:call(Ch, M)
615628
end || M <- lists:reverse(Decl)].
616629

630+
check_fun(Queue, _Conn, Ch) ->
631+
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
632+
passive = true}).
633+
617634
parse_parameter(Param, Fun, Value) ->
618635
try
619636
Fun(Value)

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-module(rabbit_shovel_parameters).
99
-behaviour(rabbit_runtime_parameter).
1010

11+
-define(APP, rabbitmq_shovel).
12+
1113
-include_lib("amqp_client/include/amqp_client.hrl").
1214
-include("rabbit_shovel.hrl").
1315

@@ -20,7 +22,8 @@
2022
%% from and can break with the next upgrade. It should not be used by
2123
%% another one that the one who created it or survive a node restart.
2224
%% Thus, function references have been replace by the following MFA.
23-
-export([dest_decl/4, src_decl_exchange/4, src_decl_queue/4,
25+
-export([dest_decl/4, dest_check/4,
26+
src_decl_exchange/4, src_decl_queue/4, src_check_queue/4,
2427
fields_fun/5, props_fun/9]).
2528

2629
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
@@ -146,7 +149,8 @@ amqp091_src_validation(_Def, User) ->
146149
%% a deprecated pre-3.7 setting
147150
{<<"delete-after">>, fun validate_delete_after/2, optional},
148151
%% currently used multi-protocol friend name, introduced in 3.7
149-
{<<"src-delete-after">>, fun validate_delete_after/2, optional}
152+
{<<"src-delete-after">>, fun validate_delete_after/2, optional},
153+
{<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
150154
].
151155

152156
dest_validation(Def0, User) ->
@@ -178,7 +182,8 @@ amqp091_dest_validation(_Def, User) ->
178182
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
179183
{<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
180184
{<<"publish-properties">>, fun validate_properties/2, optional},
181-
{<<"dest-publish-properties">>, fun validate_properties/2, optional}
185+
{<<"dest-publish-properties">>, fun validate_properties/2, optional},
186+
{<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
182187
].
183188

184189
validate_uri_fun(User) ->
@@ -329,7 +334,13 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
329334
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
330335
DestQ = pget(<<"dest-queue">>, Def, none),
331336
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
332-
DestDeclFun = {?MODULE, dest_decl, [DestQ, DestQArgs]},
337+
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
338+
Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared),
339+
DestDeclFun = case Predeclared of
340+
true -> {?MODULE, dest_check, [DestQ, DestQArgs]};
341+
false -> {?MODULE, dest_decl, [DestQ, DestQArgs]}
342+
end,
343+
333344
{X, Key} = case DestQ of
334345
none -> {DestX, DestXKey};
335346
_ -> {<<>>, DestQ}
@@ -394,6 +405,11 @@ dest_decl(DestQ, DestQArgs, Conn, _Ch) ->
394405
none -> ok;
395406
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
396407
end.
408+
dest_check(DestQ, DestQArgs, Conn, _Ch) ->
409+
case DestQ of
410+
none -> ok;
411+
_ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
412+
end.
397413

398414
parse_amqp10_source(Def) ->
399415
Uris = deobfuscated_uris(<<"src-uri">>, Def),
@@ -415,13 +431,21 @@ parse_amqp091_source(Def) ->
415431
SrcQ = pget(<<"src-queue">>, Def, none),
416432
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
417433
SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])),
434+
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
435+
Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared),
418436
{SrcDeclFun, Queue, DestHeaders} =
419437
case SrcQ of
420438
none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>,
421439
[{<<"src-exchange">>, SrcX},
422440
{<<"src-exchange-key">>, SrcXKey}]};
423-
_ -> {{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]},
424-
SrcQ, [{<<"src-queue">>, SrcQ}]}
441+
_ -> case Predeclared of
442+
false ->
443+
{{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]},
444+
SrcQ, [{<<"src-queue">>, SrcQ}]};
445+
true ->
446+
{{?MODULE, src_check_queue, [SrcQ, SrcQArgs]},
447+
SrcQ, [{<<"src-queue">>, SrcQ}]}
448+
end
425449
end,
426450
DeleteAfter = pget(<<"src-delete-after">>, Def,
427451
pget(<<"delete-after">>, Def, <<"never">>)),
@@ -450,6 +474,9 @@ src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) ->
450474
src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
451475
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).
452476

477+
src_check_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
478+
check_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).
479+
453480
get_uris(Key, Def) ->
454481
URIs = case pget(Key, Def) of
455482
B when is_binary(B) -> [B];
@@ -481,7 +508,14 @@ ensure_queue(Conn, Queue, XArgs) ->
481508
after
482509
catch amqp_channel:close(Ch)
483510
end.
484-
511+
check_queue(Conn, Queue, _XArgs) ->
512+
{ok, Ch} = amqp_connection:open_channel(Conn),
513+
try
514+
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
515+
passive = true})
516+
after
517+
catch amqp_channel:close(Ch)
518+
end.
485519
opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B));
486520
opt_b2a(N) -> N.
487521

deps/rabbitmq_shovel/test/configuration_SUITE.erl

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
-compile(export_all).
1414

15+
-define(QUEUE, <<"test_queue">>).
1516
-define(EXCHANGE, <<"test_exchange">>).
1617
-define(TO_SHOVEL, <<"to_the_shovel">>).
1718
-define(FROM_SHOVEL, <<"from_the_shovel">>).
@@ -21,7 +22,8 @@
2122

2223
all() ->
2324
[
24-
{group, non_parallel_tests}
25+
{group, non_parallel_tests},
26+
{group, with_predefined_topology}
2527
].
2628

2729
groups() ->
@@ -31,7 +33,10 @@ groups() ->
3133
invalid_legacy_configuration,
3234
valid_legacy_configuration,
3335
valid_configuration
34-
]}
36+
]},
37+
{with_predefined_topology, [], [
38+
valid_configuration_with_predefined_resources
39+
]}
3540
].
3641

3742
%% -------------------------------------------------------------------
@@ -41,7 +46,9 @@ groups() ->
4146
init_per_suite(Config) ->
4247
rabbit_ct_helpers:log_environment(),
4348
Config1 = rabbit_ct_helpers:set_config(Config, [
44-
{rmq_nodename_suffix, ?MODULE}
49+
{rmq_nodename_suffix, ?MODULE},
50+
{ignored_crashes,
51+
["server_initiated_close,404"]}
4552
]),
4653
rabbit_ct_helpers:run_setup_steps(Config1,
4754
rabbit_ct_broker_helpers:setup_steps() ++
@@ -53,9 +60,19 @@ end_per_suite(Config) ->
5360
rabbit_ct_client_helpers:teardown_steps() ++
5461
rabbit_ct_broker_helpers:teardown_steps()).
5562

63+
init_per_group(with_predefined_topology, Config) ->
64+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
65+
[rabbitmq_shovel, topology, [{predeclared, true}]]),
66+
Config;
67+
5668
init_per_group(_, Config) ->
5769
Config.
5870

71+
end_per_group(with_predefined_topology, Config) ->
72+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env,
73+
[rabbitmq_shovel, topology]),
74+
Config;
75+
5976
end_per_group(_, Config) ->
6077
Config.
6178

@@ -209,6 +226,12 @@ valid_configuration(Config) ->
209226
ok = setup_shovels(Config),
210227
run_valid_test(Config).
211228

229+
valid_configuration_with_predefined_resources(Config) ->
230+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovels2, [Config]),
231+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_terminated_shovel, [test_shovel]),
232+
declare_queue(Config),
233+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_running_shovel, [test_shovel]).
234+
212235
run_valid_test(Config) ->
213236
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
214237

@@ -271,6 +294,12 @@ run_valid_test(Config) ->
271294

272295
rabbit_ct_client_helpers:close_channel(Chan).
273296

297+
declare_queue(Config) ->
298+
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
299+
amqp_channel:call(Chan, #'queue.declare'{queue = ?QUEUE,
300+
durable = true}),
301+
rabbit_ct_client_helpers:close_channel(Chan).
302+
274303
setup_legacy_shovels(Config) ->
275304
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
276305
?MODULE, setup_legacy_shovels1, [Config]).
@@ -349,6 +378,34 @@ setup_shovels1(Config) ->
349378
ok = application:start(rabbitmq_shovel),
350379
await_running_shovel(test_shovel).
351380

381+
setup_shovels2(Config) ->
382+
_ = application:stop(rabbitmq_shovel),
383+
Hostname = ?config(rmq_hostname, Config),
384+
TcpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0,
385+
tcp_port_amqp),
386+
%% a working config
387+
application:set_env(
388+
rabbitmq_shovel,
389+
shovels,
390+
[{test_shovel,
391+
[{source,
392+
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
393+
[Hostname, TcpPort])]},
394+
{queue, ?QUEUE}]},
395+
{destination,
396+
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f",
397+
[Hostname, TcpPort])]},
398+
{publish_fields, [{exchange, ?EXCHANGE}, {routing_key, ?FROM_SHOVEL}]},
399+
{publish_properties, [{delivery_mode, 2},
400+
{cluster_id, <<"my-cluster">>},
401+
{content_type, ?SHOVELLED}]},
402+
{add_forward_headers, true},
403+
{add_timestamp_header, true}]},
404+
{ack_mode, on_confirm}]}],
405+
infinity),
406+
407+
ok = application:start(rabbitmq_shovel).
408+
352409
await_running_shovel(Name) ->
353410
case [N || {N, _, {running, _}, _}
354411
<- rabbit_shovel_status:status(),
@@ -357,3 +414,11 @@ await_running_shovel(Name) ->
357414
_ -> timer:sleep(100),
358415
await_running_shovel(Name)
359416
end.
417+
await_terminated_shovel(Name) ->
418+
case [N || {N, _, {terminated, _}, _}
419+
<- rabbit_shovel_status:status(),
420+
N =:= Name] of
421+
[_] -> ok;
422+
_ -> timer:sleep(100),
423+
await_terminated_shovel(Name)
424+
end.

0 commit comments

Comments
 (0)