Skip to content

Commit ee7c6d9

Browse files
Merge pull request #9374 from rabbitmq/rabbitmq-server-9371
Use pg_local to track AMQP 1.0 connections
2 parents ed88e38 + c94d22a commit ee7c6d9

File tree

8 files changed

+73
-39
lines changed

8 files changed

+73
-39
lines changed

deps/amqp_client/src/amqp_connection.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@
6565
-export([error_atom/1]).
6666
-export([info/2, info_keys/1, info_keys/0]).
6767
-export([connection_name/1, update_secret/3]).
68-
-export([socket_adapter_info/2]).
68+
-export([socket_adapter_info/2,
69+
socket_adapter_info/3]).
6970

7071
-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
7172

@@ -379,7 +380,12 @@ info_keys() ->
379380
%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
380381
%% based on the socket for the protocol given.
381382
socket_adapter_info(Sock, Protocol) ->
382-
amqp_direct_connection:socket_adapter_info(Sock, Protocol).
383+
socket_adapter_info(Sock, Protocol, undefined).
384+
385+
%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
386+
%% based on the socket for the protocol given.
387+
socket_adapter_info(Sock, Protocol, UniqueId) ->
388+
amqp_direct_connection:socket_adapter_info(Sock, Protocol, UniqueId).
383389

384390
%% @spec (ConnectionPid) -> ConnectionName
385391
%% where

deps/amqp_client/src/amqp_direct_connection.erl

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
1818
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
1919

20-
-export([socket_adapter_info/2]).
20+
-export([socket_adapter_info/2,
21+
socket_adapter_info/3]).
2122

2223
-record(state, {node,
2324
user,
@@ -176,17 +177,26 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
176177
ensure_adapter_info(Info) -> Info.
177178

178179
socket_adapter_info(Sock, Protocol) ->
180+
socket_adapter_info(Sock, Protocol, undefined).
181+
182+
socket_adapter_info(Sock, Protocol, UniqueId) ->
179183
{PeerHost, PeerPort, Host, Port} =
180-
case rabbit_net:socket_ends(Sock, inbound) of
181-
{ok, Res} -> Res;
182-
_ -> {unknown, unknown, unknown, unknown}
183-
end,
184-
Name = case rabbit_net:connection_string(Sock, inbound) of
185-
{ok, Res1} -> Res1;
186-
_Error -> "(unknown)"
184+
case rabbit_net:socket_ends(Sock, inbound) of
185+
{ok, Res} -> Res;
186+
_ -> {unknown, unknown, unknown, unknown}
187+
end,
188+
ConnectionString = case rabbit_net:connection_string(Sock, inbound) of
189+
{ok, Res1} -> Res1;
190+
_Error -> "(unknown)"
191+
end,
192+
Name = case UniqueId of
193+
undefined ->
194+
rabbit_data_coercion:to_binary(ConnectionString);
195+
_ ->
196+
rabbit_data_coercion:to_binary(rabbit_misc:format("~s (~tp)", [ConnectionString, UniqueId]))
187197
end,
188198
#amqp_adapter_info{protocol = Protocol,
189-
name = list_to_binary(Name),
199+
name = Name,
190200
host = Host,
191201
port = Port,
192202
peer_host = PeerHost,

deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,13 @@ connection_metrics(Config) ->
127127

128128
DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []),
129129

130+
Infos = [{info0, foo}, {info1, bar}, {info2, baz},
131+
{authz_backends, [rabbit_auth_backend_oauth2,rabbit_auth_backend_http]}],
132+
130133
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
131-
connection_created, [DeadPid, infos]),
134+
connection_created, [DeadPid, Infos]),
132135
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
133-
connection_stats, [DeadPid, infos]),
136+
connection_stats, [DeadPid, Infos]),
134137
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
135138
connection_stats, [DeadPid, 1, 1, 1]),
136139

deps/rabbit_common/src/rabbit_core_metrics.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,9 @@ terminate() ->
120120
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
121121
ok.
122122

123-
connection_created(Pid, Infos) ->
124-
ets:insert(connection_created, {Pid, Infos}),
123+
connection_created(Pid, Infos0) ->
124+
Infos1 = maybe_cleanup_infos(Infos0),
125+
ets:insert(connection_created, {Pid, Infos1}),
125126
ets:update_counter(connection_churn_metrics, node(), {2, 1},
126127
?CONNECTION_CHURN_METRICS),
127128
ok.
@@ -446,3 +447,14 @@ format_auth_attempt({{RemoteAddress, Username, Protocol}, Total, Succeeded, Fail
446447
format_auth_attempt({Protocol, Total, Succeeded, Failed}) ->
447448
[{protocol, atom_to_binary(Protocol, utf8)}, {auth_attempts, Total},
448449
{auth_attempts_failed, Failed}, {auth_attempts_succeeded, Succeeded}].
450+
451+
maybe_cleanup_infos(Infos0) when is_list(Infos0) ->
452+
%% Note: authz_backends is added in rabbit_amqp1_0_session_sup:adapter_info/3
453+
%% We delete it here, if present, because it should not be stored in the
454+
%% connection_created table.
455+
%%
456+
%% TODO @ansd this will no longer be necessary once this PR is merged:
457+
%% https://github.com/rabbitmq/rabbitmq-server/pull/9022
458+
proplists:delete(authz_backends, Infos0);
459+
maybe_cleanup_infos(Infos) ->
460+
Infos.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
%%
77
-module(rabbit_amqp1_0).
88

9-
-export([connection_info_local/1,
10-
emit_connection_info_local/3,
9+
-export([emit_connection_info_local/3,
1110
emit_connection_info_all/4,
12-
list/0]).
11+
list/0,
12+
register_connection/1,
13+
unregister_connection/1]).
1314

1415
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
1516
Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local,
@@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
2627
end,
2728
list()).
2829

29-
connection_info_local(Items) ->
30-
Connections = list(),
31-
[rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections].
32-
30+
-spec list() -> [pid()].
3331
list() ->
34-
[ReaderPid
35-
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
36-
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
37-
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
38-
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
39-
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
40-
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
41-
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
42-
].
32+
pg_local:get_members(rabbit_amqp10_connections).
33+
34+
-spec register_connection(pid()) -> ok.
35+
register_connection(Pid) ->
36+
pg_local:join(rabbit_amqp10_connections, Pid).
37+
38+
-spec unregister_connection(pid()) -> ok.
39+
unregister_connection(Pid) ->
40+
pg_local:leave(rabbit_amqp10_connections, Pid).

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) ->
238238

239239
close_connection(State = #v1{connection = #v1_connection{
240240
timeout_sec = TimeoutSec}}) ->
241+
Pid = self(),
241242
erlang:send_after((if TimeoutSec > 0 andalso
242243
TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
243244
true -> ?CLOSING_TIMEOUT
244-
end) * 1000, self(), terminate_connection),
245+
end) * 1000, Pid, terminate_connection),
246+
rabbit_amqp1_0:unregister_connection(Pid),
245247
State#v1{connection_state = closed}.
246248

247249
handle_dependent_exit(ChPid, Reason, State) ->
@@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
434436
container_id = {utf8, rabbit_nodes:cluster_name()},
435437
properties = server_properties()}),
436438
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
439+
rabbit_amqp1_0:register_connection(self()),
437440
control_throttle(
438441
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});
439442

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ start_link({amqp10_framing, Sock, Channel, FrameMax, ReaderPid,
6262
start =>
6363
{rabbit_amqp1_0_session_process, start_link, [
6464
{Channel, ReaderPid, WriterPid, User, VHost, FrameMax,
65-
adapter_info(User, SocketForAdapterInfo), Collector}
65+
adapter_info(User, SocketForAdapterInfo, Channel), Collector}
6666
]},
6767
restart => transient,
6868
significant => true,
@@ -98,7 +98,7 @@ init([]) ->
9898
%% See rabbit_direct.erl to see how `authz_bakends` is propagated from
9999
% amqp_adapter_info.additional_info to the rabbit_access_control module
100100

101-
adapter_info(User, Sock) ->
102-
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}),
101+
adapter_info(User, Sock, UniqueId) ->
102+
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}, UniqueId),
103103
AdapterInfo#amqp_adapter_info{additional_info =
104104
AdapterInfo#amqp_adapter_info.additional_info ++ [{authz_backends, User#user.authz_backends}]}.

deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ proxy_protocol_v1(Config) ->
6565
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
6666
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
6767
?MODULE, connection_name, []),
68-
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]),
68+
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81 \\(\\d\\)">>, [{capture, none}]),
6969
gen_tcp:close(Socket),
7070
ok.
7171

@@ -82,7 +82,7 @@ proxy_protocol_v1_tls(Config) ->
8282
timer:sleep(1000),
8383
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
8484
?MODULE, connection_name, []),
85-
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]),
85+
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81 \\(\\d\\)$">>, [{capture, none}]),
8686
gen_tcp:close(Socket),
8787
ok.
8888

@@ -100,7 +100,7 @@ proxy_protocol_v2_local(Config) ->
100100
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
101101
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
102102
?MODULE, connection_name, []),
103-
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
103+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+ \\(\\d\\)$">>, [{capture, none}]),
104104
gen_tcp:close(Socket),
105105
ok.
106106

@@ -144,7 +144,9 @@ connection_name() ->
144144
end.
145145

146146
connection_registered() ->
147-
length(ets:tab2list(connection_created)) > 0.
147+
I = ets:info(connection_created),
148+
Size = proplists:get_value(size, I),
149+
Size > 0.
148150

149151
retry(_Function, 0) ->
150152
false;

0 commit comments

Comments
 (0)