Skip to content

Commit e2e92d3

Browse files
MarcialRosalesmichaelklishin
authored andcommitted
Support predeclared feature in static shovels
1 parent 55bc5a2 commit e2e92d3

File tree

4 files changed

+87
-11
lines changed

4 files changed

+87
-11
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
%% from and can break with the next upgrade. It should not be used by
4242
%% another one that the one who created it or survive a node restart.
4343
%% Thus, function references have been replace by the following MFA.
44-
-export([decl_fun/3, publish_fun/4, props_fun_timestamp_header/4,
44+
-export([decl_fun/3, check_fun/3, publish_fun/4, props_fun_timestamp_header/4,
4545
props_fun_forward_header/5]).
4646

4747
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
@@ -56,7 +56,7 @@ parse(_Name, {source, Source}) ->
5656
CArgs = proplists:get_value(consumer_args, Source, []),
5757
#{module => ?MODULE,
5858
uris => proplists:get_value(uris, Source),
59-
resource_decl => decl_fun(Source),
59+
resource_decl => decl_fun({source, Source}),
6060
queue => Queue,
6161
delete_after => proplists:get_value(delete_after, Source, never),
6262
prefetch_count => Prefetch,
@@ -72,7 +72,7 @@ parse(Name, {destination, Dest}) ->
7272
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
7373
#{module => ?MODULE,
7474
uris => proplists:get_value(uris, Dest),
75-
resource_decl => decl_fun(Dest),
75+
resource_decl => decl_fun({destination, Dest}),
7676
props_fun => PropsFun2,
7777
fields_fun => PubFieldsFun,
7878
add_forward_headers => AFH,
@@ -606,16 +606,35 @@ parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
606606
parse_declaration({[Method | Rest], Acc}) ->
607607
parse_declaration({[{Method, []} | Rest], Acc}).
608608

609-
decl_fun(Endpoint) ->
610-
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []),
611-
[]}),
609+
decl_fun({source, Endpoint}) ->
610+
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
611+
[] ->
612+
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
613+
true -> case proplists:get_value(queue, Endpoint) of
614+
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
615+
Queue -> {?MODULE, check_fun, [Queue]}
616+
end;
617+
false -> {?MODULE, decl_fun, []}
618+
end;
619+
Decl -> {?MODULE, decl_fun, [Decl]}
620+
end;
621+
decl_fun({destination, Endpoint}) ->
622+
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
612623
{?MODULE, decl_fun, [Decl]}.
613-
624+
614625
decl_fun(Decl, _Conn, Ch) ->
615626
[begin
616627
amqp_channel:call(Ch, M)
617628
end || M <- lists:reverse(Decl)].
618629

630+
check_fun(Queue, _Conn, Ch) ->
631+
try
632+
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
633+
passive = true})
634+
after
635+
catch amqp_channel:close(Ch)
636+
end.
637+
619638
parse_parameter(Param, Fun, Value) ->
620639
try
621640
Fun(Value)

deps/rabbitmq_shovel/src/rabbit_shovel_config.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,13 @@ convert_from_legacy(Config) ->
6666
{reconnect_delay, RD}].
6767

6868
parse(ShovelName, Config0) ->
69+
rabbit_log:debug("rabbit_shovel_config:parse ~p ~p", [ShovelName, Config0]),
6970
try
7071
validate(Config0),
7172
case is_legacy(Config0) of
72-
true ->
73+
true ->
7374
Config = convert_from_legacy(Config0),
75+
rabbit_log:debug("rabbit_shovel_config:parse is_legacy -> ~p", [Config]),
7476
parse_current(ShovelName, Config);
7577
false ->
7678
parse_current(ShovelName, Config0)
@@ -124,8 +126,9 @@ validate_uris0([Uri | Uris]) ->
124126
validate_uris0([]) -> ok.
125127

126128
parse_current(ShovelName, Config) ->
129+
rabbit_log:debug("rabbit_shovel_config:parse_current ~p", [ShovelName]),
127130
{source, Source} = proplists:lookup(source, Config),
128-
validate(Source),
131+
validate(Source),
129132
SrcMod = resolve_module(proplists:get_value(protocol, Source, amqp091)),
130133
{destination, Destination} = proplists:lookup(destination, Config),
131134
validate(Destination),

deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ parse_configuration(_Defaults, [], Acc) ->
7070
parse_configuration(Defaults, [{ShovelName, ShovelConfig} | Env], Acc) when
7171
is_atom(ShovelName) andalso is_list(ShovelConfig)
7272
->
73+
rabbit_log:debug("rabbit_shovel:parse_configuration ~p ~p", [ShovelName, ShovelConfig]),
7374
case dict:is_key(ShovelName, Acc) of
7475
true ->
7576
{error, {duplicate_shovel_definition, ShovelName}};

deps/rabbitmq_shovel/test/configuration_SUITE.erl

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
-compile(export_all).
1414

15+
-define(QUEUE, <<"test_queue">>).
1516
-define(EXCHANGE, <<"test_exchange">>).
1617
-define(TO_SHOVEL, <<"to_the_shovel">>).
1718
-define(FROM_SHOVEL, <<"from_the_shovel">>).
@@ -31,7 +32,10 @@ groups() ->
3132
invalid_legacy_configuration,
3233
valid_legacy_configuration,
3334
valid_configuration
34-
]}
35+
]},
36+
{with_predefined_topology, [], [
37+
valid_configuration_with_predefined_resources
38+
]}
3539
].
3640

3741
%% -------------------------------------------------------------------
@@ -53,9 +57,19 @@ end_per_suite(Config) ->
5357
rabbit_ct_client_helpers:teardown_steps() ++
5458
rabbit_ct_broker_helpers:teardown_steps()).
5559

60+
init_per_group(with_predefined_topology, Config) ->
61+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
62+
[rabbitmq_shovel, topology, [{predeclared, true}]]),
63+
Config;
64+
5665
init_per_group(_, Config) ->
5766
Config.
5867

68+
end_per_group(with_predefined_topology, Config) ->
69+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env,
70+
[rabbitmq_shovel, topology]),
71+
Config;
72+
5973
end_per_group(_, Config) ->
6074
Config.
6175

@@ -209,6 +223,11 @@ valid_configuration(Config) ->
209223
ok = setup_shovels(Config),
210224
run_valid_test(Config).
211225

226+
valid_configuration_with_predefined_resources(Config) ->
227+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovels2, [Config]),
228+
run_valid_test2(Config),
229+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_running_shovel, [test_shovel]).
230+
212231
run_valid_test(Config) ->
213232
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
214233

@@ -271,14 +290,20 @@ run_valid_test(Config) ->
271290

272291
rabbit_ct_client_helpers:close_channel(Chan).
273292

293+
run_valid_test2(Config) ->
294+
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
295+
amqp_channel:call(Chan, #'queue.declare'{queue = ?QUEUE,
296+
durable = true}),
297+
rabbit_ct_client_helpers:close_channel(Chan).
298+
274299
setup_legacy_shovels(Config) ->
275300
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
276301
?MODULE, setup_legacy_shovels1, [Config]).
277302

278303
setup_shovels(Config) ->
279304
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
280305
?MODULE, setup_shovels1, [Config]).
281-
306+
282307
setup_legacy_shovels1(Config) ->
283308
_ = application:stop(rabbitmq_shovel),
284309
Hostname = ?config(rmq_hostname, Config),
@@ -349,6 +374,34 @@ setup_shovels1(Config) ->
349374
ok = application:start(rabbitmq_shovel),
350375
await_running_shovel(test_shovel).
351376

377+
setup_shovels2(Config) ->
378+
_ = application:stop(rabbitmq_shovel),
379+
Hostname = ?config(rmq_hostname, Config),
380+
TcpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0,
381+
tcp_port_amqp),
382+
%% a working config
383+
application:set_env(
384+
rabbitmq_shovel,
385+
shovels,
386+
[{test_shovel,
387+
[{source,
388+
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
389+
[Hostname, TcpPort])]},
390+
{queue, ?QUEUE}]},
391+
{destination,
392+
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f",
393+
[Hostname, TcpPort])]},
394+
{publish_fields, [{exchange, ?EXCHANGE}, {routing_key, ?FROM_SHOVEL}]},
395+
{publish_properties, [{delivery_mode, 2},
396+
{cluster_id, <<"my-cluster">>},
397+
{content_type, ?SHOVELLED}]},
398+
{add_forward_headers, true},
399+
{add_timestamp_header, true}]},
400+
{ack_mode, on_confirm}]}],
401+
infinity),
402+
403+
ok = application:start(rabbitmq_shovel).
404+
352405
await_running_shovel(Name) ->
353406
case [N || {N, _, {running, _}, _}
354407
<- rabbit_shovel_status:status(),

0 commit comments

Comments
 (0)