Skip to content

AMQP 1.0 Erlang client: integrate credentials_obfuscation #9767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ rabbitmq_app(
],
license_files = [":license_files"],
priv = [":priv"],
deps = ["//deps/amqp10_common:erlang_app"],
deps = [
"//deps/amqp10_common:erlang_app",
"@credentials_obfuscation//:erlang_app",
],
)

xref(
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ endef
PACKAGES_DIR ?= $(abspath PACKAGES)

BUILD_DEPS = rabbit_common elvis_mk
DEPS = amqp10_common
DEPS = amqp10_common credentials_obfuscation
TEST_DEPS = rabbit rabbitmq_amqp1_0 rabbitmq_ct_helpers
LOCAL_DEPS = ssl inets crypto public_key

Expand Down
12 changes: 10 additions & 2 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
-spec open_connection(inet:socket_address() | inet:hostname(),
inet:port_number()) -> supervisor:startchild_ret().
open_connection(Addr, Port) ->
_ = ensure_started(),
open_connection(#{address => Addr, port => Port, notify => self(),
sasl => anon}).

Expand All @@ -97,14 +98,19 @@ open_connection(Addr, Port) ->
-spec open_connection(connection_config()) ->
supervisor:startchild_ret().
open_connection(ConnectionConfig0) ->
_ = ensure_started(),

Notify = maps:get(notify, ConnectionConfig0, self()),
NotifyWhenOpened = maps:get(notify_when_opened, ConnectionConfig0, self()),
NotifyWhenClosed = maps:get(notify_when_closed, ConnectionConfig0, self()),
amqp10_client_connection:open(ConnectionConfig0#{
ConnectionConfig1 = ConnectionConfig0#{
notify => Notify,
notify_when_opened => NotifyWhenOpened,
notify_when_closed => NotifyWhenClosed
}).
},
Sasl = maps:get(sasl, ConnectionConfig1),
ConnectionConfig2 = ConnectionConfig1#{sasl => amqp10_client_connection:encrypt_sasl(Sasl)},
amqp10_client_connection:open(ConnectionConfig2).

%% @doc Opens a connection using a connection_config map
%% This is asynchronous and will notify completion to the caller using
Expand Down Expand Up @@ -497,6 +503,8 @@ try_to_existing_atom(L) when is_list(L) ->
throw({non_existent_atom, L})
end.

ensure_started() ->
_ = application:ensure_all_started(credentials_obfuscation).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down
46 changes: 35 additions & 11 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
socket_ready/2,
protocol_header_received/5,
begin_session/1,
heartbeat/1]).
heartbeat/1,
encrypt_sasl/1,
decrypt_sasl/1]).

%% gen_fsm callbacks.
-export([init/1,
Expand All @@ -56,6 +58,9 @@

-type address() :: inet:socket_address() | inet:hostname().

-type encrypted_sasl() :: {plaintext, binary()} | {encrypted, binary()}.
-type decrypted_sasl() :: none | anon | {plain, User :: binary(), Pwd :: binary()}.

-type connection_config() ::
#{container_id => binary(), % AMQP container id
hostname => binary(), % the dns name of the target host
Expand All @@ -72,7 +77,9 @@
% set to a negative value to allow a sender to "overshoot" the flow
% control by this margin
transfer_limit_margin => 0 | neg_integer(),
sasl => none | anon | {plain, User :: binary(), Pwd :: binary()},
%% These credentials_obfuscation-wrapped values have the type of
%% decrypted_sasl/0
sasl => encrypted_sasl() | decrypted_sasl(),
notify => pid(),
notify_when_opened => pid() | none,
notify_when_closed => pid() | none
Expand All @@ -92,7 +99,9 @@
}).

-export_type([connection_config/0,
amqp10_socket/0]).
amqp10_socket/0,
encrypted_sasl/0,
decrypted_sasl/0]).

%% -------------------------------------------------------------------
%% Public API.
Expand Down Expand Up @@ -125,6 +134,18 @@ open(Config) ->
close(Pid, Reason) ->
gen_statem:cast(Pid, {close, Reason}).

-spec encrypt_sasl(decrypted_sasl()) -> encrypted_sasl().
encrypt_sasl(none) ->
credentials_obfuscation:encrypt(none);
encrypt_sasl(DecryptedSasl) ->
credentials_obfuscation:encrypt(term_to_binary(DecryptedSasl)).

-spec decrypt_sasl(encrypted_sasl()) -> decrypted_sasl().
decrypt_sasl(none) ->
credentials_obfuscation:decrypt(none);
decrypt_sasl(EncryptedSasl) ->
binary_to_term(credentials_obfuscation:decrypt(EncryptedSasl)).

%% -------------------------------------------------------------------
%% Private API.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -166,8 +187,9 @@ init([Sup, Config0]) ->
expecting_socket(_EvtType, {socket_ready, Socket},
State = #state{config = Cfg}) ->
State1 = State#state{socket = Socket},
case Cfg of
#{sasl := none} ->
Sasl = credentials_obfuscation:decrypt(maps:get(sasl, Cfg)),
case Sasl of
none ->
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
{next_state, hdr_sent, State1};
_ ->
Expand All @@ -193,16 +215,17 @@ sasl_hdr_sent({call, From}, begin_session,

sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{
sasl_server_mechanisms = {array, symbol, Mechs}},
State = #state{config = #{sasl := Sasl}}) ->
SaslBin = {symbol, sasl_to_bin(Sasl)},
State = #state{config = #{sasl := EncryptedSasl}}) ->
DecryptedSasl = decrypt_sasl(EncryptedSasl),
SaslBin = {symbol, decrypted_sasl_to_bin(DecryptedSasl)},
case lists:any(fun(S) when S =:= SaslBin -> true;
(_) -> false
end, Mechs) of
true ->
ok = send_sasl_init(State, Sasl),
ok = send_sasl_init(State, DecryptedSasl),
{next_state, sasl_init_sent, State};
false ->
{stop, {sasl_not_supported, Sasl}, State}
{stop, {sasl_not_supported, DecryptedSasl}, State}
end;
sasl_hdr_rcvds({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
Expand Down Expand Up @@ -522,8 +545,9 @@ translate_err(#'v1_0.error'{condition = Cond, description = Desc}) ->
amqp10_event(Evt) ->
{amqp10_event, {connection, self(), Evt}}.

sasl_to_bin({plain, _, _}) -> <<"PLAIN">>;
sasl_to_bin(anon) -> <<"ANONYMOUS">>.
decrypted_sasl_to_bin({plain, _, _}) -> <<"PLAIN">>;
decrypted_sasl_to_bin(anon) -> <<"ANONYMOUS">>;
decrypted_sasl_to_bin(none) -> <<"ANONYMOUS">>.

config_defaults() ->
#{sasl => none,
Expand Down