Skip to content

Commit 5db4ae0

Browse files
committed
Replace function references by MFAs
1 parent 4181a91 commit 5db4ae0

File tree

2 files changed

+84
-59
lines changed

2 files changed

+84
-59
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@
3434
forward/4
3535
]).
3636

37+
%% Function references should not be stored on the metadata store.
38+
%% They are only valid for the version of the module they were created
39+
%% from and can break with the next upgrade. It should not be used by
40+
%% another one that the one who created it or survive a node restart.
41+
%% Thus, function references have been replace by the following MFA.
42+
-export([decl_fun/3, publish_fun/4]).
43+
3744
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
3845

3946
parse(_Name, {source, Source}) ->
@@ -77,9 +84,9 @@ init_source(Conf = #{ack_mode := AckMode,
7784
source := #{queue := Queue,
7885
current := {Conn, Chan, _},
7986
prefetch_count := Prefetch,
80-
resource_decl := Decl,
87+
resource_decl := {M, F, MFArgs},
8188
consumer_args := Args} = Src}) ->
82-
Decl(Conn, Chan),
89+
apply(M, F, MFArgs ++ [Conn, Chan]),
8390

8491
NoAck = AckMode =:= no_ack,
8592
case NoAck of
@@ -108,9 +115,9 @@ connect_dest(Conf = #{name := Name, dest := #{uris := Uris} = Dst}) ->
108115

109116
init_dest(Conf = #{ack_mode := AckMode,
110117
dest := #{current := {Conn, Chan, _},
111-
resource_decl := Decl} = Dst}) ->
118+
resource_decl := {M, F, MFArgs}} = Dst}) ->
112119

113-
Decl(Conn, Chan),
120+
apply(M, F, MFArgs ++ [Conn, Chan]),
114121

115122
case AckMode of
116123
on_confirm ->
@@ -187,16 +194,16 @@ forward(IncomingTag, Props, Payload, State) ->
187194
end.
188195

189196
do_forward(IncomingTag, Props, Payload,
190-
State0 = #{dest := #{props_fun := PropsFun,
197+
State0 = #{dest := #{props_fun := {M, F, Args},
191198
current := {_, _, DstUri},
192-
fields_fun := FieldsFun}}) ->
199+
fields_fun := {Mf, Ff, Argsf}}}) ->
193200
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
194201
% do publish
195202
Exchange = maps:get(exchange, Props, undefined),
196203
RoutingKey = maps:get(routing_key, Props, undefined),
197204
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
198-
Method1 = FieldsFun(SrcUri, DstUri, Method),
199-
Msg1 = #amqp_msg{props = PropsFun(SrcUri, DstUri, props_from_map(Props)),
205+
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
206+
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
200207
payload = Payload},
201208
publish(IncomingTag, Method1, Msg1, State0).
202209

@@ -519,11 +526,7 @@ make_publish_fun(Fields, ValidFields) when is_list(Fields) ->
519526
case SuppliedFields -- ValidFields of
520527
[] ->
521528
FieldIndices = make_field_indices(ValidFields, Fields),
522-
fun (_SrcUri, _DestUri, Publish) ->
523-
lists:foldl(fun ({Pos1, Value}, Pub) ->
524-
setelement(Pos1, Pub, Value)
525-
end, Publish, FieldIndices)
526-
end;
529+
{?MODULE, publish_fun, [FieldIndices]};
527530
Unexpected ->
528531
fail({invalid_parameter_value, publish_properties,
529532
{unexpected_fields, Unexpected, ValidFields}})
@@ -532,6 +535,11 @@ make_publish_fun(Fields, _) ->
532535
fail({invalid_parameter_value, publish_properties,
533536
{require_list, Fields}}).
534537

538+
publish_fun(FieldIndices, _SrcUri, _DestUri, Publish) ->
539+
lists:foldl(fun ({Pos1, Value}, Pub) ->
540+
setelement(Pos1, Pub, Value)
541+
end, Publish, FieldIndices).
542+
535543
make_field_indices(Valid, Fields) ->
536544
make_field_indices(Fields, field_map(Valid, 2), []).
537545

@@ -596,11 +604,12 @@ parse_declaration({[Method | Rest], Acc}) ->
596604
decl_fun(Endpoint) ->
597605
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []),
598606
[]}),
599-
fun (_Conn, Ch) ->
600-
[begin
601-
amqp_channel:call(Ch, M)
602-
end || M <- lists:reverse(Decl)]
603-
end.
607+
{?MODULE, decl_fun, [Decl]}.
608+
609+
decl_fun(Decl, _Conn, Ch) ->
610+
[begin
611+
amqp_channel:call(Ch, M)
612+
end || M <- lists:reverse(Decl)].
604613

605614
parse_parameter(Param, Fun, Value) ->
606615
try

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
-export([register/0, unregister/0, parse/3]).
1616
-export([obfuscate_uris_in_definition/1]).
1717

18+
%% Function references should not be stored on the metadata store.
19+
%% They are only valid for the version of the module they were created
20+
%% from and can break with the next upgrade. It should not be used by
21+
%% another one that the one who created it or survive a node restart.
22+
%% Thus, function references have been replace by the following MFA.
23+
-export([dest_decl/4, src_decl_exchange/4, src_decl_queue/4,
24+
fields_fun/5, props_fun/9]).
25+
1826
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
1927

2028
-rabbit_boot_step({?MODULE,
@@ -321,12 +329,7 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
321329
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
322330
DestQ = pget(<<"dest-queue">>, Def, none),
323331
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
324-
DestDeclFun = fun (Conn, _Ch) ->
325-
case DestQ of
326-
none -> ok;
327-
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
328-
end
329-
end,
332+
DestDeclFun = {?MODULE, dest_decl, [DestQ, DestQArgs]},
330333
{X, Key} = case DestQ of
331334
none -> {DestX, DestXKey};
332335
_ -> {<<>>, DestQ}
@@ -335,16 +338,6 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
335338
{<<"dest-exchange-key">>, DestXKey},
336339
{<<"dest-queue">>, DestQ}],
337340
V =/= none],
338-
PubFun = fun (_SrcURI, _DestURI, P0) ->
339-
P1 = case X of
340-
none -> P0;
341-
_ -> P0#'basic.publish'{exchange = X}
342-
end,
343-
case Key of
344-
none -> P1;
345-
_ -> P1#'basic.publish'{routing_key = Key}
346-
end
347-
end,
348341
AddHeadersLegacy = pget(<<"add-forward-headers">>, Def, false),
349342
AddHeaders = pget(<<"dest-add-forward-headers">>, Def, AddHeadersLegacy),
350343
Table0 = [{<<"shovelled-by">>, ClusterName},
@@ -357,19 +350,6 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
357350
AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false),
358351
AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def,
359352
AddTimestampHeaderLegacy),
360-
PubPropsFun = fun (SrcURI, DestURI, P0) ->
361-
P = set_properties(P0, SetProps),
362-
P1 = case AddHeaders of
363-
true -> rabbit_shovel_util:update_headers(
364-
Table0, SourceHeaders ++ Table2,
365-
SrcURI, DestURI, P);
366-
false -> P
367-
end,
368-
case AddTimestampHeader of
369-
true -> rabbit_shovel_util:add_timestamp_header(P1);
370-
false -> P1
371-
end
372-
end,
373353
%% Details are only used for status report in rabbitmqctl, as vhost is not
374354
%% available to query the runtime parameters.
375355
Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX},
@@ -379,10 +359,42 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
379359
maps:merge(#{module => rabbit_amqp091_shovel,
380360
uris => DestURIs,
381361
resource_decl => DestDeclFun,
382-
fields_fun => PubFun,
383-
props_fun => PubPropsFun
362+
fields_fun => {?MODULE, fields_fun, [X, Key]},
363+
props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps,
364+
AddHeaders, SourceHeaders,
365+
AddTimestampHeader]}
384366
}, Details).
385367

368+
fields_fun(X, Key, _SrcURI, _DestURI, P0) ->
369+
P1 = case X of
370+
none -> P0;
371+
_ -> P0#'basic.publish'{exchange = X}
372+
end,
373+
case Key of
374+
none -> P1;
375+
_ -> P1#'basic.publish'{routing_key = Key}
376+
end.
377+
378+
props_fun(Table0, Table2, SetProps, AddHeaders, SourceHeaders, AddTimestampHeader,
379+
SrcURI, DestURI, P0) ->
380+
P = set_properties(P0, SetProps),
381+
P1 = case AddHeaders of
382+
true -> rabbit_shovel_util:update_headers(
383+
Table0, SourceHeaders ++ Table2,
384+
SrcURI, DestURI, P);
385+
false -> P
386+
end,
387+
case AddTimestampHeader of
388+
true -> rabbit_shovel_util:add_timestamp_header(P1);
389+
false -> P1
390+
end.
391+
392+
dest_decl(DestQ, DestQArgs, Conn, _Ch) ->
393+
case DestQ of
394+
none -> ok;
395+
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
396+
end.
397+
386398
parse_amqp10_source(Def) ->
387399
Uris = deobfuscated_uris(<<"src-uri">>, Def),
388400
Address = pget(<<"src-address">>, Def),
@@ -405,16 +417,11 @@ parse_amqp091_source(Def) ->
405417
SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])),
406418
{SrcDeclFun, Queue, DestHeaders} =
407419
case SrcQ of
408-
none -> {fun (_Conn, Ch) ->
409-
Ms = [#'queue.declare'{exclusive = true},
410-
#'queue.bind'{routing_key = SrcXKey,
411-
exchange = SrcX}],
412-
[amqp_channel:call(Ch, M) || M <- Ms]
413-
end, <<>>, [{<<"src-exchange">>, SrcX},
414-
{<<"src-exchange-key">>, SrcXKey}]};
415-
_ -> {fun (Conn, _Ch) ->
416-
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs))
417-
end, SrcQ, [{<<"src-queue">>, SrcQ}]}
420+
none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>,
421+
[{<<"src-exchange">>, SrcX},
422+
{<<"src-exchange-key">>, SrcXKey}]};
423+
_ -> {{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]},
424+
SrcQ, [{<<"src-queue">>, SrcQ}]}
418425
end,
419426
DeleteAfter = pget(<<"src-delete-after">>, Def,
420427
pget(<<"delete-after">>, Def, <<"never">>)),
@@ -434,6 +441,15 @@ parse_amqp091_source(Def) ->
434441
consumer_args => SrcCArgs
435442
}, Details), DestHeaders}.
436443

444+
src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) ->
445+
Ms = [#'queue.declare'{exclusive = true},
446+
#'queue.bind'{routing_key = SrcXKey,
447+
exchange = SrcX}],
448+
[amqp_channel:call(Ch, M) || M <- Ms].
449+
450+
src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
451+
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).
452+
437453
get_uris(Key, Def) ->
438454
URIs = case pget(Key, Def) of
439455
B when is_binary(B) -> [B];

0 commit comments

Comments
 (0)