Skip to content

Commit 3fefa8e

Browse files
dcorbachokjnilsson
authored andcommitted
Use ssl option when initialising data reader
1 parent 8f54150 commit 3fefa8e

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@
7979
stats_timer :: undefined | reference(),
8080
resource_alarm :: boolean(),
8181
send_file_oct ::
82-
atomics:atomics_ref()}). % number of bytes sent with send_file (for metrics)
82+
atomics:atomics_ref(),
83+
transport :: 'tcp' | 'ssl'}). % number of bytes sent with send_file (for metrics)
8384
-record(configuration,
8485
{initial_credits :: integer(),
8586
credits_required_for_unblocking :: integer(),
@@ -157,7 +158,8 @@ init([KeepaliveSup,
157158
#{initial_credits := InitialCredits,
158159
credits_required_for_unblocking := CreditsRequiredBeforeUnblocking,
159160
frame_max := FrameMax,
160-
heartbeat := Heartbeat}]) ->
161+
heartbeat := Heartbeat,
162+
transport := ConnTransport}]) ->
161163
process_flag(trap_exit, true),
162164
{ok, Sock} =
163165
rabbit_networking:handshake(Ref,
@@ -193,7 +195,8 @@ init([KeepaliveSup,
193195
connection_step = tcp_connected,
194196
frame_max = FrameMax,
195197
resource_alarm = false,
196-
send_file_oct = SendFileOct},
198+
send_file_oct = SendFileOct,
199+
transport = ConnTransport},
197200
State =
198201
#stream_connection_state{consumers = #{},
199202
blocked = false,
@@ -1364,7 +1367,8 @@ handle_frame_post_auth(Transport,
13641367
StreamSubscriptions,
13651368
virtual_host = VirtualHost,
13661369
user = User,
1367-
send_file_oct = SendFileOct} =
1370+
send_file_oct = SendFileOct,
1371+
transport = ConnTransport} =
13681372
Connection,
13691373
#stream_connection_state{consumers = Consumers} = State,
13701374
{request, CorrelationId,
@@ -1417,7 +1421,7 @@ handle_frame_post_auth(Transport,
14171421
{{?MODULE, QueueResource, SubscriptionId, self()}, []},
14181422
{ok, Segment} =
14191423
osiris:init_reader(LocalMemberPid, OffsetSpec,
1420-
CounterSpec),
1424+
CounterSpec, ConnTransport),
14211425
rabbit_log:info("Next offset for subscription ~p is ~p",
14221426
[SubscriptionId,
14231427
osiris_log:next_offset(Segment)]),

deps/rabbitmq_stream/src/rabbit_stream_sup.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ tcp_listener_spec([Address,
9898
SocketOpts,
9999
ranch_tcp,
100100
rabbit_stream_connection_sup,
101-
Configuration,
101+
Configuration#{transport => tcp},
102102
'stream',
103103
NumAcceptors,
104104
"Stream TCP listener").
@@ -112,7 +112,7 @@ ssl_listener_spec([Address,
112112
Address, SocketOpts ++ SslOpts,
113113
ranch_ssl,
114114
rabbit_stream_connection_sup,
115-
Configuration,
115+
Configuration#{transport => ssl},
116116
'stream/ssl',
117117
NumAcceptors,
118118
"Stream TLS listener").

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ all() ->
3232
groups() ->
3333
[{single_node, [],
3434
[test_stream, test_stream_tls, test_gc_consumers, test_gc_publishers]},
35-
{cluster, [], [test_stream, java]}].
35+
{cluster, [], [test_stream, test_stream_tls, java]}].
3636

3737
init_per_suite(Config) ->
3838
rabbit_ct_helpers:log_environment(),
@@ -61,7 +61,8 @@ init_per_group(cluster = Group, Config) ->
6161
[{rmq_nodes_count, 3},
6262
{rmq_nodename_suffix, Group},
6363
{tcp_ports_base}]),
64-
rabbit_ct_helpers:run_setup_steps(Config2,
64+
Config3 = rabbit_ct_helpers:set_config(Config2, {rabbitmq_ct_tls_verify, verify_none}),
65+
rabbit_ct_helpers:run_setup_steps(Config3,
6566
[fun(StepConfig) ->
6667
rabbit_ct_helpers:merge_app_env(StepConfig,
6768
{aten,
@@ -342,6 +343,9 @@ test_deliver(Transport, S, SubscriptionId, Body, C0) ->
342343
Chunk,
343344
C.
344345

346+
test_metadata_update_stream_deleted(S, Stream) ->
347+
test_metadata_update_stream_deleted(gen_tcp, S, Stream).
348+
345349
test_metadata_update_stream_deleted(Transport, S, Stream) ->
346350
StreamSize = byte_size(Stream),
347351
FrameSize = 2 + 2 + 2 + 2 + StreamSize,

0 commit comments

Comments
 (0)