Skip to content

Commit 6a13741

Browse files
committed
Merge branch 'stable'
2 parents a55ba69 + b875222 commit 6a13741

File tree

3 files changed

+64
-22
lines changed

3 files changed

+64
-22
lines changed

src/inet_proxy_dist.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
6969
undefined -> []
7070
end,
7171
ProxyPort = case inet_tcp_proxy:is_enabled() of
72-
true -> proplists:get_value(TcpPort, PortsMap, TcpPort);
72+
true -> P = proplists:get_value(TcpPort, PortsMap, TcpPort),
73+
error_logger:info_msg(
74+
"Using inet_tcp_proxy to connect to ~s (remote port changed from ~b to ~b)~n",
75+
[Node, TcpPort, P]),
76+
P;
7377
false -> TcpPort
7478
end,
7579
case inet_tcp:connect(Ip, ProxyPort,

src/inet_tcp_proxy.erl

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121
-export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]).
2222

23-
-define(TABLE, ?MODULE).
23+
-define(NODES_TO_BLOCK, inet_tcp_proxy__nodes_to_block).
24+
-define(NODES_BLOCKED, inet_tcp_proxy__nodes_blocked).
2425

2526
%% This can't start_link because there's no supervision hierarchy we
2627
%% can easily fit it into (we need to survive all application
@@ -44,16 +45,16 @@ reconnect(Nodes) ->
4445
ok.
4546

4647
is_enabled() ->
47-
lists:member(?TABLE, ets:all()).
48+
lists:member(?NODES_TO_BLOCK, ets:all()).
4849

4950
allow(Node) ->
50-
error_logger:info_msg("(~s) Allowing distribution between ~s and ~s~n",
51+
rabbit_log:warning("(~s) Allowing distribution between ~s and ~s~n",
5152
[?MODULE, node(), Node]),
52-
ets:delete(?TABLE, Node).
53+
true = ets:delete(?NODES_TO_BLOCK, Node).
5354
block(Node) ->
54-
error_logger:info_msg("(~s) BLOCKING distribution between ~s and ~s~n",
55+
rabbit_log:warning("(~s) BLOCKING distribution between ~s and ~s~n",
5556
[?MODULE, node(), Node]),
56-
ets:insert(?TABLE, {Node, block}).
57+
true = ets:insert(?NODES_TO_BLOCK, {Node, block}).
5758

5859
%%----------------------------------------------------------------------------
5960

@@ -67,14 +68,23 @@ error_handler(Thunk) ->
6768
%% over; die quietly.
6869
ok;
6970
_:X ->
70-
io:format(user, "TCP proxy died with ~p~n At ~p~n",
71+
error_logger:error_msg(
72+
"TCP proxy died with ~p~n At ~p~n",
73+
[X, erlang:get_stacktrace()]),
74+
io:format(standard_error,
75+
"TCP proxy died with ~p~n At ~p~n",
7176
[X, erlang:get_stacktrace()]),
77+
timer:sleep(1000),
7278
erlang:halt(1)
7379
end
7480
end.
7581

7682
go(Parent, Port, ProxyPort) ->
77-
ets:new(?TABLE, [public, named_table]),
83+
ets:new(?NODES_TO_BLOCK, [public, named_table]),
84+
ets:new(?NODES_BLOCKED, [public, named_table]),
85+
rabbit_log:info(
86+
"(~s) Listening on proxy port ~p~n",
87+
[?MODULE, ProxyPort]),
7888
{ok, Sock} = gen_tcp:listen(ProxyPort, [inet,
7989
{reuseaddr, true}]),
8090
Parent ! ready,
@@ -88,8 +98,12 @@ accept_loop(ListenSock, Port) ->
8898

8999
run_it(SockIn, Port) ->
90100
case {inet:peername(SockIn), inet:sockname(SockIn)} of
91-
{{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} ->
101+
{{ok, {_Addr, SrcPort}}, {ok, {Addr, OtherPort}}} ->
92102
{ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort),
103+
rabbit_log:info(
104+
"(~s) => Incoming proxied connection from node ~s (port ~b) "
105+
"to node ~s (port ~b)~n",
106+
[?MODULE, Remote, SrcPort, This, OtherPort]),
93107
case node() of
94108
This -> ok;
95109
_ -> exit({not_me, node(), This})
@@ -101,18 +115,20 @@ run_it(SockIn, Port) ->
101115
end.
102116

103117
run_loop(Sockets, RemoteNode, Buf0) ->
104-
Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode),
118+
Block = [{RemoteNode, block}] =:= ets:lookup(?NODES_TO_BLOCK, RemoteNode),
119+
WasBlocked = [{RemoteNode, blocked}] =:= ets:lookup(?NODES_BLOCKED,
120+
RemoteNode),
105121
receive
106122
{tcp, Sock, Data} ->
107123
Buf = [Data | Buf0],
108-
case {Block, get(dist_was_blocked)} of
109-
{true, false} ->
110-
put(dist_was_blocked, Block),
124+
case {WasBlocked, Block} of
125+
{false, true} ->
126+
true = ets:insert(?NODES_BLOCKED, {RemoteNode, blocked}),
111127
rabbit_log:warning(
112128
"(~s) Distribution BLOCKED between ~s and ~s~n",
113129
[?MODULE, node(), RemoteNode]);
114-
{false, S} when S =:= true orelse S =:= undefined ->
115-
put(dist_was_blocked, Block),
130+
{true, false} ->
131+
true = ets:delete(?NODES_BLOCKED, RemoteNode),
116132
rabbit_log:warning(
117133
"(~s) Distribution allowed between ~s and ~s~n",
118134
[?MODULE, node(), RemoteNode]);
@@ -125,6 +141,9 @@ run_loop(Sockets, RemoteNode, Buf0) ->
125141
true -> run_loop(Sockets, RemoteNode, Buf)
126142
end;
127143
{tcp_closed, Sock} ->
144+
rabbit_log:info(
145+
"(~s) Distribution closed between ~s and ~s~n",
146+
[?MODULE, node(), RemoteNode]),
128147
gen_tcp:close(other(Sock, Sockets));
129148
X ->
130149
exit({weirdness, X})

src/rabbit_ct_broker_helpers.erl

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
set_partition_handling_mode_globally/2,
5656
enable_dist_proxy_manager/1,
5757
enable_dist_proxy/1,
58-
enable_dist_proxy_on_node/3,
58+
start_dist_proxy_on_node/2,
59+
disconnect_from_other_nodes/2,
5960
block_traffic_between/2,
6061
allow_traffic_between/2,
6162

@@ -634,20 +635,38 @@ enable_dist_proxy(Config) ->
634635
NodeConfigs = rabbit_ct_broker_helpers:get_node_configs(Config),
635636
Nodes = [?config(nodename, NodeConfig) || NodeConfig <- NodeConfigs],
636637
ManagerNode = node(),
638+
%% We first start the proxy process on all nodes, then we close the
639+
%% existing connection.
640+
%%
641+
%% If we do that in a single loop, i.e. start the proxy on node 1
642+
%% and disconnect it, then, start the proxy on node 2 and disconnect
643+
%% it, etc., there is a chance that the connection is reopened
644+
%% by a node where the proxy is still disabled. Therefore, that
645+
%% connection would bypass the proxy process even though we believe
646+
%% it to be enabled.
637647
ok = lists:foreach(
638648
fun(NodeConfig) ->
639649
ok = rabbit_ct_broker_helpers:rpc(Config,
640650
?config(nodename, NodeConfig),
641-
?MODULE, enable_dist_proxy_on_node,
642-
[NodeConfig, ManagerNode, Nodes])
651+
?MODULE, start_dist_proxy_on_node,
652+
[NodeConfig, ManagerNode])
653+
end, NodeConfigs),
654+
ok = lists:foreach(
655+
fun(NodeConfig) ->
656+
ok = rabbit_ct_broker_helpers:rpc(Config,
657+
?config(nodename, NodeConfig),
658+
?MODULE, disconnect_from_other_nodes,
659+
[NodeConfig, Nodes])
643660
end, NodeConfigs),
644661
Config.
645662

646-
enable_dist_proxy_on_node(NodeConfig, ManagerNode, Nodes) ->
647-
Nodename = ?config(nodename, NodeConfig),
663+
start_dist_proxy_on_node(NodeConfig, ManagerNode) ->
648664
DistPort = ?config(tcp_port_erlang_dist, NodeConfig),
649665
ProxyPort = ?config(tcp_port_erlang_dist_proxy, NodeConfig),
650-
ok = inet_tcp_proxy:start(ManagerNode, DistPort, ProxyPort),
666+
ok = inet_tcp_proxy:start(ManagerNode, DistPort, ProxyPort).
667+
668+
disconnect_from_other_nodes(NodeConfig, Nodes) ->
669+
Nodename = ?config(nodename, NodeConfig),
651670
ok = inet_tcp_proxy:reconnect(Nodes -- [Nodename]).
652671

653672
block_traffic_between(NodeA, NodeB) ->

0 commit comments

Comments
 (0)