Skip to content

Commit 69ad696

Browse files
committed
Add stream.advertised_tls_port setting
1 parent 1af7d7e commit 69ad696

10 files changed

+129
-60
lines changed

deps/rabbitmq_stream/priv/schema/rabbitmq_stream.schema

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,8 @@ end}.
195195

196196
{mapping, "stream.advertised_port", "rabbitmq_stream.advertised_port", [
197197
{datatype, integer}
198+
]}.
199+
200+
{mapping, "stream.advertised_tls_port", "rabbitmq_stream.advertised_tls_port", [
201+
{datatype, integer}
198202
]}.

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919

2020
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
2121

22-
-ignore_xref([
23-
{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
24-
{'Elixir.RabbitMQ.CLI.Core.Helpers', nodes_in_cluster, 1},
25-
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1},
26-
{'Elixir.RabbitMQ.CLI.Ctl.RpcStream', receive_list_items, 7},
27-
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2},
28-
{'Elixir.Enum', join, 2}
29-
]).
22+
-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
23+
{'Elixir.RabbitMQ.CLI.Core.Helpers', nodes_in_cluster, 1},
24+
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1},
25+
{'Elixir.RabbitMQ.CLI.Ctl.RpcStream', receive_list_items, 7},
26+
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2},
27+
{'Elixir.Enum', join, 2}]).
3028

3129
-export([formatter/0,
3230
scopes/0,

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919

2020
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
2121

22-
-ignore_xref([
23-
{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
24-
{'Elixir.RabbitMQ.CLI.Core.Helpers', nodes_in_cluster, 1},
25-
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1},
26-
{'Elixir.RabbitMQ.CLI.Ctl.RpcStream', receive_list_items, 7},
27-
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2},
28-
{'Elixir.Enum', join, 2}
29-
]).
22+
-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
23+
{'Elixir.RabbitMQ.CLI.Core.Helpers', nodes_in_cluster, 1},
24+
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1},
25+
{'Elixir.RabbitMQ.CLI.Ctl.RpcStream', receive_list_items, 7},
26+
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2},
27+
{'Elixir.Enum', join, 2}]).
3028

3129
-export([formatter/0,
3230
scopes/0,

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919

2020
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
2121

22-
-ignore_xref([
23-
{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
24-
{'Elixir.RabbitMQ.CLI.Core.Helpers', nodes_in_cluster, 1},
25-
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1},
26-
{'Elixir.RabbitMQ.CLI.Ctl.RpcStream', receive_list_items, 7},
27-
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2},
28-
{'Elixir.Enum', join, 2}
29-
]).
22+
-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
23+
{'Elixir.RabbitMQ.CLI.Core.Helpers', nodes_in_cluster, 1},
24+
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', prepare_info_keys, 1},
25+
{'Elixir.RabbitMQ.CLI.Ctl.RpcStream', receive_list_items, 7},
26+
{'Elixir.RabbitMQ.CLI.Ctl.InfoKeys', validate_info_keys, 2},
27+
{'Elixir.Enum', join, 2}]).
3028

3129
-export([formatter/0,
3230
scopes/0,

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
-export([start/2,
2222
host/0,
2323
port/0,
24+
tls_port/0,
2425
kill_connection/1]).
2526
-export([stop/1]).
2627
-export([emit_connection_info_local/3,
@@ -78,6 +79,28 @@ port_from_listener() ->
7879
undefined, Listeners),
7980
Port.
8081

82+
tls_port() ->
83+
case application:get_env(rabbitmq_stream, advertised_tls_port,
84+
undefined)
85+
of
86+
undefined ->
87+
tls_port_from_listener();
88+
Port ->
89+
Port
90+
end.
91+
92+
tls_port_from_listener() ->
93+
Listeners = rabbit_networking:node_listeners(node()),
94+
Port =
95+
lists:foldl(fun (#listener{port = Port, protocol = 'stream/ssl'},
96+
_Acc) ->
97+
Port;
98+
(_, Acc) ->
99+
Acc
100+
end,
101+
undefined, Listeners),
102+
Port.
103+
81104
stop(_State) ->
82105
ok.
83106

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
resource_alarm :: boolean(),
8181
send_file_oct ::
8282
atomics:atomics_ref(), % number of bytes sent with send_file (for metrics)
83-
transport :: 'tcp' | 'ssl'}).
83+
transport :: tcp | ssl}).
8484
-record(configuration,
8585
{initial_credits :: integer(),
8686
credits_required_for_unblocking :: integer(),
@@ -1378,9 +1378,13 @@ handle_frame_post_auth(Transport,
13781378
OffsetSpec,
13791379
Credit,
13801380
Properties}}) ->
1381-
QueueResource = #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
1381+
QueueResource =
1382+
#resource{name = Stream,
1383+
kind = queue,
1384+
virtual_host = VirtualHost},
13821385
%% FIXME check the max number of subs is not reached already
1383-
case rabbit_stream_utils:check_read_permitted(QueueResource, User, #{})
1386+
case rabbit_stream_utils:check_read_permitted(QueueResource, User,
1387+
#{})
13841388
of
13851389
ok ->
13861390
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream)
@@ -1418,10 +1422,16 @@ handle_frame_post_auth(Transport,
14181422
OffsetSpec,
14191423
Properties]),
14201424
CounterSpec =
1421-
{{?MODULE, QueueResource, SubscriptionId, self()}, []},
1425+
{{?MODULE,
1426+
QueueResource,
1427+
SubscriptionId,
1428+
self()},
1429+
[]},
14221430
{ok, Segment} =
1423-
osiris:init_reader(LocalMemberPid, OffsetSpec,
1424-
CounterSpec, ConnTransport),
1431+
osiris:init_reader(LocalMemberPid,
1432+
OffsetSpec,
1433+
CounterSpec,
1434+
ConnTransport),
14251435
rabbit_log:info("Next offset for subscription ~p is ~p",
14261436
[SubscriptionId,
14271437
osiris_log:next_offset(Segment)]),
@@ -1473,7 +1483,8 @@ handle_frame_post_auth(Transport,
14731483
ConsumerState1,
14741484

14751485
ConsumerOffset = osiris_log:next_offset(Segment1),
1476-
ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1),
1486+
ConsumerOffsetLag =
1487+
consumer_i(offset_lag, ConsumerState1),
14771488

14781489
rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) "
14791490
"distributed after subscription",
@@ -1773,7 +1784,8 @@ handle_frame_post_auth(Transport,
17731784
end;
17741785
handle_frame_post_auth(Transport,
17751786
#stream_connection{socket = S,
1776-
virtual_host = VirtualHost} =
1787+
virtual_host = VirtualHost,
1788+
transport = TransportLayer} =
17771789
Connection,
17781790
State,
17791791
{request, CorrelationId, {metadata, Streams}}) ->
@@ -1816,8 +1828,13 @@ handle_frame_post_auth(Transport,
18161828
maps:keys(NodesMap)),
18171829
NodeEndpoints =
18181830
lists:foldr(fun(Node, Acc) ->
1831+
PortFunction =
1832+
case TransportLayer of
1833+
tcp -> port;
1834+
ssl -> tls_port
1835+
end,
18191836
Host = rpc:call(Node, rabbit_stream, host, []),
1820-
Port = rpc:call(Node, rabbit_stream, port, []),
1837+
Port = rpc:call(Node, rabbit_stream, PortFunction, []),
18211838
case {is_binary(Host), is_integer(Port)} of
18221839
{true, true} -> Acc#{Node => {Host, Port}};
18231840
_ ->
@@ -2308,7 +2325,8 @@ emit_stats(#stream_connection{publishers = Publishers} = Connection,
23082325
subscription_id = Id,
23092326
credit = Credit,
23102327
counters = Counters,
2311-
properties = Properties} = Consumer
2328+
properties = Properties} =
2329+
Consumer
23122330
<- maps:values(Consumers)],
23132331
[rabbit_stream_metrics:publisher_updated(self(),
23142332
stream_r(S, Connection),
@@ -2362,7 +2380,8 @@ consumer_i(messages_consumed, #consumer{counters = Counters}) ->
23622380
messages_consumed(Counters);
23632381
consumer_i(offset, #consumer{counters = Counters}) ->
23642382
consumer_offset(Counters);
2365-
consumer_i(offset_lag, #consumer{counters = Counters, segment = Log}) ->
2383+
consumer_i(offset_lag,
2384+
#consumer{counters = Counters, segment = Log}) ->
23662385
stream_committed_offset(Log) - consumer_offset(Counters);
23672386
consumer_i(connection_pid, _) ->
23682387
self();

deps/rabbitmq_stream/src/rabbit_stream_sup.erl

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,24 @@ init([]) ->
3333
SocketOpts =
3434
application:get_env(rabbitmq_stream, tcp_listen_options, []),
3535

36-
{ok, SslListeners0} = application:get_env(rabbitmq_stream, ssl_listeners),
36+
{ok, SslListeners0} =
37+
application:get_env(rabbitmq_stream, ssl_listeners),
3738
SslSocketOpts =
3839
application:get_env(rabbitmq_stream, ssl_listen_options, []),
39-
{SslOpts, NumSslAcceptors, SslListeners}
40-
= case SslListeners0 of
41-
[] -> {none, 0, []};
42-
_ -> {rabbit_networking:ensure_ssl(),
43-
application:get_env(rabbitmq_stream, num_ssl_acceptors, 10),
44-
case rabbit_networking:poodle_check('STREAM') of
45-
ok -> SslListeners0;
46-
danger -> []
47-
end}
48-
end,
40+
{SslOpts, NumSslAcceptors, SslListeners} =
41+
case SslListeners0 of
42+
[] ->
43+
{none, 0, []};
44+
_ ->
45+
{rabbit_networking:ensure_ssl(),
46+
application:get_env(rabbitmq_stream, num_ssl_acceptors, 10),
47+
case rabbit_networking:poodle_check('STREAM') of
48+
ok ->
49+
SslListeners0;
50+
danger ->
51+
[]
52+
end}
53+
end,
4954

5055
Nodes = rabbit_mnesia:cluster_nodes(all),
5156
OsirisConf = #{nodes => Nodes},
@@ -81,7 +86,10 @@ init([]) ->
8186
[SocketOpts, ServerConfiguration, NumTcpAcceptors],
8287
Listeners)
8388
++ listener_specs(fun ssl_listener_spec/1,
84-
[SslSocketOpts, SslOpts, ServerConfiguration, NumSslAcceptors],
89+
[SslSocketOpts,
90+
SslOpts,
91+
ServerConfiguration,
92+
NumSslAcceptors],
8593
SslListeners)}}.
8694

8795
listener_specs(Fun, Args, Listeners) ->
@@ -99,7 +107,7 @@ tcp_listener_spec([Address,
99107
ranch_tcp,
100108
rabbit_stream_connection_sup,
101109
Configuration#{transport => tcp},
102-
'stream',
110+
stream,
103111
NumAcceptors,
104112
"Stream TCP listener").
105113

@@ -109,7 +117,8 @@ ssl_listener_spec([Address,
109117
Configuration,
110118
NumAcceptors]) ->
111119
rabbit_networking:tcp_listener_spec(rabbit_stream_listener_sup,
112-
Address, SocketOpts ++ SslOpts,
120+
Address,
121+
SocketOpts ++ SslOpts,
113122
ranch_ssl,
114123
rabbit_stream_connection_sup,
115124
Configuration#{transport => ssl},

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,20 @@ subscribe(S, SubId, Stream, C) ->
277277
rabbit_stream_SUITE:test_subscribe(gen_tcp, S, SubId, Stream, C).
278278

279279
declare_publisher(S, PubId, Stream, C) ->
280-
rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PubId, Stream, C).
280+
rabbit_stream_SUITE:test_declare_publisher(gen_tcp,
281+
S,
282+
PubId,
283+
Stream,
284+
C).
281285

282286
delete_stream(S, Stream, C) ->
283287
rabbit_stream_SUITE:test_delete_stream(gen_tcp, S, Stream, C).
284288

285289
metadata_update_stream_deleted(S, Stream, C) ->
286-
rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp, S, Stream, C).
290+
rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp,
291+
S,
292+
Stream,
293+
C).
287294

288295
close(S, C) ->
289296
rabbit_stream_SUITE:test_close(gen_tcp, S, C).

deps/rabbitmq_stream/test/config_schema_SUITE_data/rabbitmq_stream.snippets

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@
5454
[rabbitmq_stream]},
5555
{advertised_host_port,
5656
"stream.advertised_host = some-host
57-
stream.advertised_port = 5556",
57+
stream.advertised_port = 5556
58+
stream.advertised_tls_port = 5553",
5859
[{rabbitmq_stream,[{advertised_host, <<"some-host">>},
59-
{advertised_port, 5556}]}],
60+
{advertised_port, 5556},
61+
{advertised_tls_port, 5553}]}],
6062
[rabbitmq_stream]},
6163
{credits,
6264
"stream.frame_max = 2097152

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ all() ->
3131

3232
groups() ->
3333
[{single_node, [],
34-
[test_stream, test_stream_tls, test_gc_consumers, test_gc_publishers]},
34+
[test_stream,
35+
test_stream_tls,
36+
test_gc_consumers,
37+
test_gc_publishers]},
3538
{cluster, [], [test_stream, test_stream_tls, java]}].
3639

3740
init_per_suite(Config) ->
@@ -44,7 +47,9 @@ end_per_suite(Config) ->
4447
init_per_group(single_node, Config) ->
4548
Config1 =
4649
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
47-
Config2 = rabbit_ct_helpers:set_config(Config1, {rabbitmq_ct_tls_verify, verify_none}),
50+
Config2 =
51+
rabbit_ct_helpers:set_config(Config1,
52+
{rabbitmq_ct_tls_verify, verify_none}),
4853
rabbit_ct_helpers:run_setup_steps(Config2,
4954
[fun(StepConfig) ->
5055
rabbit_ct_helpers:merge_app_env(StepConfig,
@@ -61,7 +66,9 @@ init_per_group(cluster = Group, Config) ->
6166
[{rmq_nodes_count, 3},
6267
{rmq_nodename_suffix, Group},
6368
{tcp_ports_base}]),
64-
Config3 = rabbit_ct_helpers:set_config(Config2, {rabbitmq_ct_tls_verify, verify_none}),
69+
Config3 =
70+
rabbit_ct_helpers:set_config(Config2,
71+
{rabbitmq_ct_tls_verify, verify_none}),
6572
rabbit_ct_helpers:run_setup_steps(Config3,
6673
[fun(StepConfig) ->
6774
rabbit_ct_helpers:merge_app_env(StepConfig,
@@ -153,11 +160,13 @@ java(Config) ->
153160
rabbit_ct_helpers:make(Config, DataDir,
154161
["tests",
155162
{"NODE1_STREAM_PORT=~b", [StreamPortNode1]},
156-
{"NODE1_STREAM_PORT_TLS=~b", [StreamPortTlsNode1]},
163+
{"NODE1_STREAM_PORT_TLS=~b",
164+
[StreamPortTlsNode1]},
157165
{"NODE1_NAME=~p", [Node1Name]},
158166
{"NODE2_NAME=~p", [Node2Name]},
159167
{"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
160-
{"NODE2_STREAM_PORT_TLS=~b", [StreamPortTlsNode2]},
168+
{"NODE2_STREAM_PORT_TLS=~b",
169+
[StreamPortTlsNode2]},
161170
{"RABBITMQCTL=~p", [RabbitMqCtl]}]),
162171
{ok, _} = MakeResult.
163172

@@ -186,7 +195,8 @@ get_node_name(Config, Node) ->
186195

187196
test_server(Transport, Port) ->
188197
{ok, S} =
189-
Transport:connect("localhost", Port, [{active, false}, {mode, binary}]),
198+
Transport:connect("localhost", Port,
199+
[{active, false}, {mode, binary}]),
190200
C0 = rabbit_stream_core:init(0),
191201
C1 = test_peer_properties(Transport, S, C0),
192202
C2 = test_authenticate(Transport, S, C1),
@@ -325,7 +335,8 @@ test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
325335
C.
326336

327337
test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
328-
{C, [{deliver, SubscriptionId, Chunk}]} = receive_commands(Transport, S, C0),
338+
{C, [{deliver, SubscriptionId, Chunk}]} =
339+
receive_commands(Transport, S, C0),
329340
<<5:4/unsigned,
330341
0:4/unsigned,
331342
0:8,

0 commit comments

Comments
 (0)