Skip to content

Commit 7dfdec8

Browse files
Merge branch 'master' into rabbitmq-server-1246-master
2 parents 5a9c1af + 269a2c1 commit 7dfdec8

8 files changed

+145
-114
lines changed

docs/rabbitmqctl.8

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,6 +1847,32 @@ For example:
18471847
.sp
18481848
.Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
18491849
.El
1850+
.\" ------------------------------------
1851+
.It Cm decode Ar value Ar passphrase
1852+
.Bl -tag -width Ds
1853+
.It Ar value Ar passphrase
1854+
Value to decrypt (as produced by the encode command) and passphrase.
1855+
.Pp
1856+
For example:
1857+
.sp
1858+
.Dl rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase
1859+
.El
1860+
.\" ------------------------------------
1861+
.It Cm list_hashes
1862+
Lists hash functions supported by encoding commands.
1863+
.Pp
1864+
For example, this command instructs the RabbitMQ broker to list all hash
1865+
functions supported by encoding commands:
1866+
.sp
1867+
.Dl rabbitmqctl list_hashes
1868+
.\" ------------------------------------
1869+
.It Cm list_ciphers
1870+
Lists cipher suites supported by encoding commands.
1871+
.Pp
1872+
For example, this command instructs the RabbitMQ broker to list all
1873+
cipher suites supported by encoding commands:
1874+
.sp
1875+
.Dl rabbitmqctl list_ciphers
18501876
.El
18511877
.\" ------------------------------------------------------------------
18521878
.Sh PLUGIN COMMANDS

src/rabbit_amqqueue.erl

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -627,12 +627,12 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
627627
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
628628

629629
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
630-
info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
630+
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
631631

632632
info(Q = #amqqueue{ state = crashed }, Items) ->
633633
info_down(Q, Items, crashed);
634634
info(#amqqueue{ pid = QPid }, Items) ->
635-
case delegate:call(QPid, {info, Items}) of
635+
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
636636
{ok, Res} -> Res;
637637
{error, Error} -> throw(Error)
638638
end.
@@ -693,7 +693,8 @@ force_event_refresh(Ref) ->
693693
notify_policy_changed(#amqqueue{pid = QPid}) ->
694694
gen_server2:cast(QPid, policy_changed).
695695

696-
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
696+
consumers(#amqqueue{ pid = QPid }) ->
697+
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
697698

698699
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
699700

@@ -721,7 +722,7 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
721722
AckRequired, Prefetch, Args]) ||
722723
{ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
723724

724-
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
725+
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
725726

726727
pid_of(#amqqueue{pid = Pid}) -> Pid.
727728
pid_of(VHost, QueueName) ->
@@ -739,7 +740,7 @@ delete_immediately(QPids) ->
739740
ok.
740741

741742
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
742-
delegate:call(QPid, {delete, IfUnused, IfEmpty, ActingUser}).
743+
delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
743744

744745
delete_crashed(Q) ->
745746
delete_crashed(Q, ?INTERNAL_USER).
@@ -752,21 +753,24 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) ->
752753
BQ:delete_crashed(Q),
753754
ok = internal_delete(QName, ActingUser).
754755

755-
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
756+
purge(#amqqueue{ pid = QPid }) ->
757+
delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}).
756758

757-
requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
759+
requeue(QPid, MsgIds, ChPid) ->
760+
delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}).
758761

759-
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
762+
ack(QPid, MsgIds, ChPid) ->
763+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}).
760764

761765
reject(QPid, Requeue, MsgIds, ChPid) ->
762-
delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
766+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}).
763767

764768
notify_down_all(QPids, ChPid) ->
765769
notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
766770

767771
notify_down_all(QPids, ChPid, Timeout) ->
768-
case rpc:call(node(), delegate, call,
769-
[QPids, {notify_down, ChPid}], Timeout) of
772+
case rpc:call(node(), delegate, invoke,
773+
[QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of
770774
{badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}};
771775
{badrpc, Reason} -> {error, Reason};
772776
{_, Bads} ->
@@ -782,35 +786,37 @@ notify_down_all(QPids, ChPid, Timeout) ->
782786
end.
783787

784788
activate_limit_all(QPids, ChPid) ->
785-
delegate:cast(QPids, {activate_limit, ChPid}).
789+
delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}).
786790

787791
credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
788-
delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
792+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}).
789793

790794
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
791-
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
795+
delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}).
792796

793797
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
794798
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
795799
ExclusiveConsume, Args, OkMsg, ActingUser) ->
796800
ok = check_consume_arguments(QName, Args),
797-
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
798-
ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
799-
Args, OkMsg, ActingUser}).
801+
delegate:invoke(QPid, {gen_server2, call,
802+
[{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
803+
ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
804+
Args, OkMsg, ActingUser}, infinity]}).
800805

801806
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg, ActingUser) ->
802-
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}).
807+
delegate:invoke(QPid, {gen_server2, call,
808+
[{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}).
803809

804810
notify_decorators(#amqqueue{pid = QPid}) ->
805-
delegate:cast(QPid, notify_decorators).
811+
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
806812

807813
notify_sent(QPid, ChPid) ->
808814
rabbit_amqqueue_common:notify_sent(QPid, ChPid).
809815

810816
notify_sent_queue_down(QPid) ->
811817
rabbit_amqqueue_common:notify_sent_queue_down(QPid).
812818

813-
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
819+
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}).
814820

815821
internal_delete1(QueueName, OnlyDurable) ->
816822
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -907,12 +913,17 @@ set_ram_duration_target(QPid, Duration) ->
907913
set_maximum_since_use(QPid, Age) ->
908914
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
909915

910-
update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring).
916+
update_mirroring(QPid) ->
917+
ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
911918

912-
sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors);
913-
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
914-
cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors);
915-
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
919+
sync_mirrors(#amqqueue{pid = QPid}) ->
920+
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
921+
sync_mirrors(QPid) ->
922+
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
923+
cancel_sync_mirrors(#amqqueue{pid = QPid}) ->
924+
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
925+
cancel_sync_mirrors(QPid) ->
926+
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
916927

917928
is_mirrored(Q) ->
918929
rabbit_mirror_queue_misc:is_mirrored(Q).
@@ -1031,8 +1042,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
10311042
%% done with it.
10321043
MMsg = {deliver, Delivery, false},
10331044
SMsg = {deliver, Delivery, true},
1034-
delegate:cast(MPids, MMsg),
1035-
delegate:cast(SPids, SMsg),
1045+
delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
1046+
delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
10361047
QPids.
10371048

10381049
qpids([]) -> {[], []}; %% optimisation

src/rabbit_control_pbe.erl

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,61 +16,74 @@
1616

1717
-module(rabbit_control_pbe).
1818

19-
-export([encode/7]).
19+
-export([decode/4, encode/4, list_ciphers/0, list_hashes/0]).
2020

2121
% for testing purposes
2222
-export([evaluate_input_as_term/1]).
2323

24-
encode(ListCiphers, _ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListCiphers ->
25-
{ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])};
24+
list_ciphers() ->
25+
{ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])}.
2626

27-
encode(_ListCiphers, ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListHashes ->
28-
{ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])};
27+
list_hashes() ->
28+
{ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])}.
2929

30-
encode(_ListCiphers, _ListHashes, Decode, Cipher, Hash, Iterations, Args) ->
31-
CipherExists = lists:member(Cipher, rabbit_pbe:supported_ciphers()),
32-
HashExists = lists:member(Hash, rabbit_pbe:supported_hashes()),
33-
encode_encrypt_decrypt(CipherExists, HashExists, Decode, Cipher, Hash, Iterations, Args).
34-
35-
encode_encrypt_decrypt(CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when CipherExists =:= false ->
36-
{error, io_lib:format("The requested cipher is not supported", [])};
37-
38-
encode_encrypt_decrypt(_CipherExists, HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when HashExists =:= false ->
39-
{error, io_lib:format("The requested hash is not supported", [])};
40-
41-
encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, Iterations, _Args) when Iterations =< 0 ->
30+
validate(_Cipher, _Hash, Iterations, _Args) when Iterations =< 0 ->
4231
{error, io_lib:format("The requested number of iterations is incorrect", [])};
32+
validate(_Cipher, _Hash, _Iterations, Args) when length(Args) < 2 ->
33+
{error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])};
34+
validate(_Cipher, _Hash, _Iterations, Args) when length(Args) > 2 ->
35+
{error, io_lib:format("Too many arguments. Please provide a value to encode/decode and a passphrase", [])};
36+
validate(Cipher, Hash, _Iterations, _Args) ->
37+
case lists:member(Cipher, rabbit_pbe:supported_ciphers()) of
38+
false ->
39+
{error, io_lib:format("The requested cipher is not supported", [])};
40+
true ->
41+
case lists:member(Hash, rabbit_pbe:supported_hashes()) of
42+
false ->
43+
{error, io_lib:format("The requested hash is not supported", [])};
44+
true -> ok
45+
end
46+
end.
4347

44-
encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode =:= false ->
45-
[Value, PassPhrase] = Args,
46-
try begin
47-
TermValue = evaluate_input_as_term(Value),
48-
Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermValue),
49-
{ok, io_lib:format("~p", [{encrypted, Result}])}
50-
end
51-
catch
52-
_:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
53-
end;
54-
55-
encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode ->
56-
[Value, PassPhrase] = Args,
57-
try begin
58-
TermValue = evaluate_input_as_term(Value),
59-
TermToDecrypt = case TermValue of
60-
{encrypted, EncryptedTerm} ->
61-
EncryptedTerm;
62-
_ ->
63-
TermValue
64-
end,
65-
Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermToDecrypt),
66-
{ok, io_lib:format("~p", [Result])}
67-
end
68-
catch
69-
_:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
70-
end;
48+
encode(Cipher, Hash, Iterations, Args) ->
49+
case validate(Cipher, Hash, Iterations, Args) of
50+
{error, Err} -> {error, Err};
51+
ok ->
52+
[Value, PassPhrase] = Args,
53+
try begin
54+
TermValue = evaluate_input_as_term(Value),
55+
Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations,
56+
list_to_binary(PassPhrase),
57+
TermValue),
58+
{ok, io_lib:format("~p", [{encrypted, Result}])}
59+
end
60+
catch
61+
_:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
62+
end
63+
end.
7164

72-
encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) ->
73-
{error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])}.
65+
decode(Cipher, Hash, Iterations, Args) ->
66+
case validate(Cipher, Hash, Iterations, Args) of
67+
{error, Err} -> {error, Err};
68+
ok ->
69+
[Value, PassPhrase] = Args,
70+
try begin
71+
TermValue = evaluate_input_as_term(Value),
72+
TermToDecrypt = case TermValue of
73+
{encrypted, EncryptedTerm} ->
74+
EncryptedTerm;
75+
_ ->
76+
TermValue
77+
end,
78+
Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations,
79+
list_to_binary(PassPhrase),
80+
TermToDecrypt),
81+
{ok, io_lib:format("~p", [Result])}
82+
end
83+
catch
84+
_:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
85+
end
86+
end.
7487

7588
evaluate_input_as_term(Input) ->
7689
{ok,Tokens,_EndLine} = erl_scan:string(Input ++ "."),

src/rabbit_exchange_parameters.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
-export([register/0]).
2424
-export([validate/5, notify/5, notify_clear/4]).
2525

26-
-import(rabbit_misc, [pget/2]).
27-
2826
-rabbit_boot_step({?MODULE,
2927
[{description, "exchange parameters"},
3028
{mfa, {rabbit_exchange_parameters, register, []}},
@@ -36,7 +34,8 @@ register() ->
3634
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?MODULE),
3735
%% ensure there are no leftovers from before node restart/crash
3836
rabbit_runtime_parameters:clear_component(
39-
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT),
37+
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
38+
?INTERNAL_USER),
4039
ok.
4140

4241
validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) ->

src/rabbit_looking_glass.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
-module(rabbit_looking_glass).
1818

19+
-ignore_xref([{lg, trace, 4}]).
20+
1921
-export([boot/0]).
2022
-export([connections/0]).
2123

src/rabbit_runtime_parameters.erl

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353

5454
-export([parse_set/5, set/5, set_any/5, clear/4, clear_any/4, list/0, list/1,
5555
list_component/1, list/2, list_formatted/1, list_formatted/3,
56-
lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
56+
lookup/3, value/3, value/4, info_keys/0, clear_component/2]).
5757

5858
-export([parse_set_global/3, set_global/3, value_global/1, value_global/2,
5959
list_global/0, list_global_formatted/0, list_global_formatted/2,
@@ -95,7 +95,7 @@
9595

9696
%%---------------------------------------------------------------------------
9797

98-
-import(rabbit_misc, [pget/2, pset/3]).
98+
-import(rabbit_misc, [pget/2]).
9999

100100
-define(TABLE, rabbit_runtime_parameters).
101101

@@ -228,14 +228,15 @@ clear_global(Key, ActingUser) ->
228228
end
229229
end.
230230

231-
clear_component(Component) ->
232-
case rabbit_runtime_parameters:list_component(Component) of
231+
clear_component(Component, ActingUser) ->
232+
case list_component(Component) of
233233
[] ->
234234
ok;
235235
Xs ->
236-
[rabbit_runtime_parameters:clear(pget(vhost, X),
237-
pget(component, X),
238-
pget(name, X))|| X <- Xs],
236+
[clear(pget(vhost, X),
237+
pget(component, X),
238+
pget(name, X),
239+
ActingUser) || X <- Xs],
239240
ok
240241
end.
241242

test/cluster_SUITE.erl

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,6 @@ delegates_async1(_Config, SecondaryNode) ->
123123
ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
124124
await_response(2),
125125

126-
LocalPids = spawn_responders(node(), Responder, 10),
127-
RemotePids = spawn_responders(SecondaryNode, Responder, 10),
128-
ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
129-
await_response(20),
130-
131126
passed.
132127

133128
delegates_sync(Config) ->

0 commit comments

Comments
 (0)