Skip to content

Commit 35ef1e5

Browse files
authored
Merge pull request #3038 from rabbitmq/stream-tls
TLS support for streams
2 parents c0b4fef + 05bd6dd commit 35ef1e5

File tree

6 files changed

+180
-94
lines changed

6 files changed

+180
-94
lines changed

deps/rabbitmq_stream/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ define PROJECT_ENV
88
{num_tcp_acceptors, 10},
99
{tcp_listen_options, [{backlog, 128},
1010
{nodelay, true}]},
11+
{ssl_listeners, []},
12+
{num_ssl_acceptors, 10},
13+
{ssl_listen_options, []},
1114
{initial_credits, 50000},
1215
{credits_required_for_unblocking, 12500},
1316
{frame_max, 1048576},

deps/rabbitmq_stream/priv/schema/rabbitmq_stream.schema

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,46 @@ end}.
128128
{datatype, integer}
129129
]}.
130130

131+
132+
%%
133+
%% TLS
134+
%%
135+
136+
{mapping, "stream.listeners.ssl", "rabbitmq_stream.ssl_listeners",[
137+
{datatype, {enum, [none]}}
138+
]}.
139+
140+
{mapping, "stream.listeners.ssl.$name", "rabbitmq_stream.ssl_listeners",[
141+
{datatype, [integer, ip]}
142+
]}.
143+
144+
{translation, "rabbitmq_stream.ssl_listeners",
145+
fun(Conf) ->
146+
case cuttlefish:conf_get("stream.listeners.ssl", Conf, undefined) of
147+
none -> [];
148+
_ ->
149+
Settings = cuttlefish_variable:filter_by_prefix("stream.listeners.ssl", Conf),
150+
[ V || {_, V} <- Settings ]
151+
end
152+
end}.
153+
154+
%% Number of Erlang processes that will accept connections for the SSL listeners.
155+
%%
156+
%% {num_ssl_acceptors, 10},
157+
158+
{mapping, "stream.num_acceptors.ssl", "rabbitmq_stream.num_ssl_acceptors", [
159+
{datatype, integer}
160+
]}.
161+
162+
%% Additional TLS options
163+
164+
%% Extract a name from the client's certificate when using TLS.
165+
%%
166+
%% Defaults to true.
167+
168+
{mapping, "stream.ssl_cert_login", "rabbitmq_stream.ssl_cert_login",
169+
[{datatype, {enum, [true, false]}}]}.
170+
131171
{mapping, "stream.initial_credits", "rabbitmq_stream.initial_credits", [
132172
{datatype, integer}
133173
]}.

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@
7979
stats_timer :: undefined | reference(),
8080
resource_alarm :: boolean(),
8181
send_file_oct ::
82-
atomics:atomics_ref()}). % number of bytes sent with send_file (for metrics)
82+
atomics:atomics_ref(), % number of bytes sent with send_file (for metrics)
83+
transport :: 'tcp' | 'ssl'}).
8384
-record(configuration,
8485
{initial_credits :: integer(),
8586
credits_required_for_unblocking :: integer(),
@@ -157,7 +158,8 @@ init([KeepaliveSup,
157158
#{initial_credits := InitialCredits,
158159
credits_required_for_unblocking := CreditsRequiredBeforeUnblocking,
159160
frame_max := FrameMax,
160-
heartbeat := Heartbeat}]) ->
161+
heartbeat := Heartbeat,
162+
transport := ConnTransport}]) ->
161163
process_flag(trap_exit, true),
162164
{ok, Sock} =
163165
rabbit_networking:handshake(Ref,
@@ -193,7 +195,8 @@ init([KeepaliveSup,
193195
connection_step = tcp_connected,
194196
frame_max = FrameMax,
195197
resource_alarm = false,
196-
send_file_oct = SendFileOct},
198+
send_file_oct = SendFileOct,
199+
transport = ConnTransport},
197200
State =
198201
#stream_connection_state{consumers = #{},
199202
blocked = false,
@@ -1364,7 +1367,8 @@ handle_frame_post_auth(Transport,
13641367
StreamSubscriptions,
13651368
virtual_host = VirtualHost,
13661369
user = User,
1367-
send_file_oct = SendFileOct} =
1370+
send_file_oct = SendFileOct,
1371+
transport = ConnTransport} =
13681372
Connection,
13691373
#stream_connection_state{consumers = Consumers} = State,
13701374
{request, CorrelationId,
@@ -1417,7 +1421,7 @@ handle_frame_post_auth(Transport,
14171421
{{?MODULE, QueueResource, SubscriptionId, self()}, []},
14181422
{ok, Segment} =
14191423
osiris:init_reader(LocalMemberPid, OffsetSpec,
1420-
CounterSpec),
1424+
CounterSpec, ConnTransport),
14211425
rabbit_log:info("Next offset for subscription ~p is ~p",
14221426
[SubscriptionId,
14231427
osiris_log:next_offset(Segment)]),

deps/rabbitmq_stream/src/rabbit_stream_sup.erl

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,23 @@ init([]) ->
3030
{ok, Listeners} = application:get_env(rabbitmq_stream, tcp_listeners),
3131
NumTcpAcceptors =
3232
application:get_env(rabbitmq_stream, num_tcp_acceptors, 10),
33-
{ok, SocketOpts} =
34-
application:get_env(rabbitmq_stream, tcp_listen_options),
33+
SocketOpts =
34+
application:get_env(rabbitmq_stream, tcp_listen_options, []),
35+
36+
{ok, SslListeners0} = application:get_env(rabbitmq_stream, ssl_listeners),
37+
SslSocketOpts =
38+
application:get_env(rabbitmq_stream, ssl_listen_options, []),
39+
{SslOpts, NumSslAcceptors, SslListeners}
40+
= case SslListeners0 of
41+
[] -> {none, 0, []};
42+
_ -> {rabbit_networking:ensure_ssl(),
43+
application:get_env(rabbitmq_stream, num_ssl_acceptors, 10),
44+
case rabbit_networking:poodle_check('STREAM') of
45+
ok -> SslListeners0;
46+
danger -> []
47+
end}
48+
end,
49+
3550
Nodes = rabbit_mnesia:cluster_nodes(all),
3651
OsirisConf = #{nodes => Nodes},
3752

@@ -64,7 +79,10 @@ init([]) ->
6479
[StreamManager, MetricsGc]
6580
++ listener_specs(fun tcp_listener_spec/1,
6681
[SocketOpts, ServerConfiguration, NumTcpAcceptors],
67-
Listeners)}}.
82+
Listeners)
83+
++ listener_specs(fun ssl_listener_spec/1,
84+
[SslSocketOpts, SslOpts, ServerConfiguration, NumSslAcceptors],
85+
SslListeners)}}.
6886

6987
listener_specs(Fun, Args, Listeners) ->
7088
[Fun([Address | Args])
@@ -80,7 +98,21 @@ tcp_listener_spec([Address,
8098
SocketOpts,
8199
ranch_tcp,
82100
rabbit_stream_connection_sup,
83-
Configuration,
84-
stream,
101+
Configuration#{transport => tcp},
102+
'stream',
85103
NumAcceptors,
86104
"Stream TCP listener").
105+
106+
ssl_listener_spec([Address,
107+
SocketOpts,
108+
SslOpts,
109+
Configuration,
110+
NumAcceptors]) ->
111+
rabbit_networking:tcp_listener_spec(rabbit_stream_listener_sup,
112+
Address, SocketOpts ++ SslOpts,
113+
ranch_ssl,
114+
rabbit_stream_connection_sup,
115+
Configuration#{transport => ssl},
116+
'stream/ssl',
117+
NumAcceptors,
118+
"Stream TLS listener").

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ list_connections_run(Config) ->
128128
?assertEqual([], Keys -- ?INFO_ITEMS),
129129
?assertEqual([], ?INFO_ITEMS -- Keys),
130130

131-
rabbit_stream_SUITE:test_close(S1, C1),
132-
rabbit_stream_SUITE:test_close(S2, C2),
131+
rabbit_stream_SUITE:test_close(gen_tcp, S1, C1),
132+
rabbit_stream_SUITE:test_close(gen_tcp, S2, C2),
133133
ok.
134134

135135
list_consumers_merge_defaults(_Config) ->
@@ -271,22 +271,22 @@ list_publishers_run(Config) ->
271271
ok.
272272

273273
create_stream(S, Stream, C0) ->
274-
rabbit_stream_SUITE:test_create_stream(S, Stream, C0).
274+
rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0).
275275

276276
subscribe(S, SubId, Stream, C) ->
277-
rabbit_stream_SUITE:test_subscribe(S, SubId, Stream, C).
277+
rabbit_stream_SUITE:test_subscribe(gen_tcp, S, SubId, Stream, C).
278278

279279
declare_publisher(S, PubId, Stream, C) ->
280-
rabbit_stream_SUITE:test_declare_publisher(S, PubId, Stream, C).
280+
rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PubId, Stream, C).
281281

282282
delete_stream(S, Stream, C) ->
283-
rabbit_stream_SUITE:test_delete_stream(S, Stream, C).
283+
rabbit_stream_SUITE:test_delete_stream(gen_tcp, S, Stream, C).
284284

285285
metadata_update_stream_deleted(S, Stream, C) ->
286-
rabbit_stream_SUITE:test_metadata_update_stream_deleted(S, Stream, C).
286+
rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp, S, Stream, C).
287287

288288
close(S, C) ->
289-
rabbit_stream_SUITE:test_close(S, C).
289+
rabbit_stream_SUITE:test_close(gen_tcp, S, C).
290290

291291
options(Config) ->
292292
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -334,8 +334,8 @@ start_stream_connection(Port) ->
334334
{ok, S} =
335335
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
336336
C0 = rabbit_stream_core:init(0),
337-
C1 = rabbit_stream_SUITE:test_peer_properties(S, C0),
338-
C = rabbit_stream_SUITE:test_authenticate(S, C1),
337+
C1 = rabbit_stream_SUITE:test_peer_properties(gen_tcp, S, C0),
338+
C = rabbit_stream_SUITE:test_authenticate(gen_tcp, S, C1),
339339
{S, C}.
340340

341341
start_amqp_connection(Type, Node, Port) ->

0 commit comments

Comments
 (0)