Skip to content

Commit c4e459e

Browse files
Merge pull request #8802 from rabbitmq/ik-stomp-frame-size-limit
STOMP: introduce a max frame size limit
2 parents d2762ff + 4cac6a9 commit c4e459e

File tree

10 files changed

+137
-50
lines changed

10 files changed

+137
-50
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ rabbitmq-server-*.tar.xz
8686
rabbitmq-server-*.zip
8787

8888
traces*
89+
deps/rabbitmq_stomp/test/python_SUITE_data/src/deps
8990
callgrand*
9091

9192
/user.bazelrc

deps/rabbitmq_stomp/include/rabbit_stomp.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@
4040
ssl_hash]).
4141

4242
-define(STOMP_GUIDE_URL, <<"https://rabbitmq.com/stomp.html">>).
43+
44+
-define(DEFAULT_MAX_FRAME_SIZE, 4 * 1024 * 1024).

deps/rabbitmq_stomp/priv/schema/rabbitmq_stomp.schema

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,12 @@ end}.
238238

239239
{mapping, "stomp.default_nack_requeue", "rabbitmq_stomp.default_nack_requeue",
240240
[{datatype, {enum, [true, false]}}]}.
241+
242+
243+
244+
%% Maximum frame size.
245+
%%
246+
%% Defaults to 192Kb.
247+
248+
{mapping, "stomp.max_frame_size", "rabbitmq_stomp.max_frame_size",
249+
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,24 @@
2424
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, garbage_collection, state,
2525
timeout]).
2626

27-
-record(reader_state, {socket, conn_name, parse_state, processor_state, state,
28-
conserve_resources, recv_outstanding, stats_timer,
29-
parent, connection, heartbeat_sup, heartbeat,
30-
timeout_sec %% heartbeat timeout value used, 0 means
31-
%% heartbeats are disabled
32-
}).
27+
-record(reader_state, {
28+
socket,
29+
conn_name,
30+
parse_state,
31+
processor_state,
32+
state,
33+
conserve_resources,
34+
recv_outstanding,
35+
max_frame_size,
36+
current_frame_size,
37+
stats_timer,
38+
parent,
39+
connection,
40+
heartbeat_sup, heartbeat,
41+
%% heartbeat timeout value used, 0 means
42+
%% heartbeats are disabled
43+
timeout_sec
44+
}).
3345

3446
%%----------------------------------------------------------------------------
3547

@@ -69,6 +81,7 @@ init([SupHelperPid, Ref, Configuration]) ->
6981
_ = register_resource_alarm(),
7082

7183
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
84+
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
7285
erlang:send_after(LoginTimeout, self(), login_timeout),
7386

7487
gen_server2:enter_loop(?MODULE, [],
@@ -80,6 +93,8 @@ init([SupHelperPid, Ref, Configuration]) ->
8093
processor_state = ProcState,
8194
heartbeat_sup = SupHelperPid,
8295
heartbeat = {none, none},
96+
max_frame_size = MaxFrameSize,
97+
current_frame_size = 0,
8398
state = running,
8499
conserve_resources = false,
85100
recv_outstanding = false})), #reader_state.stats_timer),
@@ -222,23 +237,41 @@ process_received_bytes([], State) ->
222237
{ok, State};
223238
process_received_bytes(Bytes,
224239
State = #reader_state{
225-
processor_state = ProcState,
226-
parse_state = ParseState}) ->
240+
max_frame_size = MaxFrameSize,
241+
current_frame_size = FrameLength,
242+
processor_state = ProcState,
243+
parse_state = ParseState}) ->
227244
case rabbit_stomp_frame:parse(Bytes, ParseState) of
228245
{more, ParseState1} ->
229-
{ok, State#reader_state{parse_state = ParseState1}};
246+
FrameLength1 = FrameLength + byte_size(Bytes),
247+
case FrameLength1 > MaxFrameSize of
248+
true ->
249+
log_reason({network_error, {frame_too_big, {FrameLength1, MaxFrameSize}}}, State),
250+
{stop, normal, State};
251+
false ->
252+
{ok, State#reader_state{parse_state = ParseState1,
253+
current_frame_size = FrameLength1}}
254+
end;
230255
{ok, Frame, Rest} ->
231-
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
232-
{ok, NewProcState, Conn} ->
233-
PS = rabbit_stomp_frame:initial_state(),
234-
NextState = maybe_block(State, Frame),
235-
process_received_bytes(Rest, NextState#reader_state{
236-
processor_state = NewProcState,
237-
parse_state = PS,
238-
connection = Conn});
239-
{stop, Reason, NewProcState} ->
240-
{stop, Reason,
241-
processor_state(NewProcState, State)}
256+
FrameLength1 = FrameLength + byte_size(Bytes) - byte_size(Rest),
257+
case FrameLength1 > MaxFrameSize of
258+
true ->
259+
log_reason({network_error, {frame_too_big, {FrameLength1, MaxFrameSize}}}, State),
260+
{stop, normal, State};
261+
false ->
262+
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
263+
{ok, NewProcState, Conn} ->
264+
PS = rabbit_stomp_frame:initial_state(),
265+
NextState = maybe_block(State, Frame),
266+
process_received_bytes(Rest, NextState#reader_state{
267+
current_frame_size = 0,
268+
processor_state = NewProcState,
269+
parse_state = PS,
270+
connection = Conn});
271+
{stop, Reason, NewProcState} ->
272+
{stop, Reason,
273+
processor_state(NewProcState, State)}
274+
end
242275
end;
243276
{error, Reason} ->
244277
%% The parser couldn't parse data. We log the reason right

deps/rabbitmq_stomp/test/connections_SUITE.erl

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ all() ->
2222
stats_are_not_leaked,
2323
stats,
2424
heartbeat,
25-
login_timeout
25+
login_timeout,
26+
frame_size,
27+
frame_size_huge
2628
].
2729

2830
merge_app_env(Config) ->
@@ -174,3 +176,36 @@ login_timeout(Config) ->
174176
Config, 0,
175177
application, unset_env, [rabbitmq_stomp, login_timeout])
176178
end.
179+
180+
frame_size(Config) ->
181+
rabbit_ct_broker_helpers:rpc(
182+
Config, 0,
183+
application, set_env, [rabbitmq_stomp, max_frame_size, 80]),
184+
StompPort = get_stomp_port(Config),
185+
{ok, Client} = rabbit_stomp_client:connect("1.2", "guest", "guest", StompPort,
186+
[{"heart-beat", "5000,7000"}]),
187+
ok = rabbit_stomp_client:send(
188+
Client, "SEND", [{"destination", "qwe"}],
189+
["Lorem ipsum dolor sit amet viverra fusce. "
190+
"Lorem ipsum dolor sit amet viverra fusce. "
191+
"Lorem ipsum dolor sit amet viverra fusce."
192+
"Lorem ipsum dolor sit amet viverra fusce."
193+
"Lorem ipsum dolor sit amet viverra fusce."]),
194+
{S, _} = Client,
195+
{error, closed} = gen_tcp:recv(S, 0, 500),
196+
ok.
197+
198+
199+
frame_size_huge(Config) ->
200+
rabbit_ct_broker_helpers:rpc(
201+
Config, 0,
202+
application, set_env, [rabbitmq_stomp, max_frame_size, 700]),
203+
StompPort = get_stomp_port(Config),
204+
{ok, Client} = rabbit_stomp_client:connect("1.2", "guest", "guest", StompPort,
205+
[{"heart-beat", "5000,7000"}]),
206+
rabbit_stomp_client:send(
207+
Client, "SEND", [{"destination", "qwe"}],
208+
[base64:encode(crypto:strong_rand_bytes(100000000))]),
209+
{S, _} = Client,
210+
{error, closed} = gen_tcp:recv(S, 0, 500),
211+
ok.

deps/rabbitmq_stomp/test/python_SUITE.erl

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ groups() ->
3030
]}
3131
].
3232

33+
init_per_suite(Config) ->
34+
DataDir = ?config(data_dir, Config),
35+
{ok, _} = rabbit_ct_helpers:exec(["pip", "install", "-r", requirements_path(Config),
36+
"--target", deps_path(Config)]),
37+
Config.
38+
39+
end_per_suite(Config) ->
40+
DataDir = ?config(data_dir, Config),
41+
ok = file:del_dir_r(deps_path(Config)),
42+
Config.
43+
3344
init_per_group(_, Config) ->
3445
Config1 = rabbit_ct_helpers:set_config(Config,
3546
[
@@ -54,6 +65,9 @@ end_per_testcase(Test, Config) ->
5465

5566

5667
main(Config) ->
68+
rabbit_ct_broker_helpers:rpc(
69+
Config, 0,
70+
application, set_env, [rabbitmq_stomp, max_frame_size, 17 * 1024 * 1024]),
5771
run(Config, filename:join("src", "main_runner.py")).
5872

5973
implicit_connect(Config) ->
@@ -64,7 +78,6 @@ tls_connections(Config) ->
6478

6579

6680
run(Config, Test) ->
67-
DataDir = ?config(data_dir, Config),
6881
CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir),
6982
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
7083
StompPortTls = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls),
@@ -75,8 +88,26 @@ run(Config, Test) ->
7588
os:putenv("STOMP_PORT_TLS", integer_to_list(StompPortTls)),
7689
os:putenv("RABBITMQ_NODENAME", atom_to_list(NodeName)),
7790
os:putenv("SSL_CERTS_PATH", CertsDir),
78-
{ok, _} = rabbit_ct_helpers:exec([filename:join(DataDir, Test)]).
91+
run_python(Config, Test).
92+
93+
run_python(Config, What) ->
94+
DataDir = ?config(data_dir, Config),
95+
os:putenv("PYTHONPATH", python_path(Config)),
96+
{ok, _} = rabbit_ct_helpers:exec([filename:join(DataDir, What)]).
97+
98+
deps_path(Config) ->
99+
DataDir = ?config(data_dir, Config),
100+
filename:join([DataDir, "src", "deps"]).
101+
102+
requirements_path(Config) ->
103+
DataDir = ?config(data_dir, Config),
104+
filename:join([DataDir, "src", "requirements.txt"]).
79105

106+
python_path(Config) ->
107+
case os:getenv("PYTHONPATH") of
108+
false -> deps_path(Config);
109+
P -> deps_path(Config) ++ ":" ++ P
110+
end.
80111

81112
cur_dir() ->
82113
{ok, Src} = filelib:find_source(?MODULE),

deps/rabbitmq_stomp/test/python_SUITE_data/src/main_runner.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,5 @@
11
#!/usr/bin/env python3
22

3-
import sys
4-
import subprocess
5-
6-
# implement pip as a subprocess:
7-
subprocess.check_call([sys.executable, '-m', 'pip', 'install',
8-
'stomp.py==8.1.0'])
9-
subprocess.check_call([sys.executable, '-m', 'pip', 'install',
10-
'pika==1.1.0'])
11-
123
import test_runner
134

145
if __name__ == '__main__':
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
stomp.py==8.1.0
2+
pika==1.1.0
3+

deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,6 @@
77
## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
88
##
99

10-
import sys
11-
import subprocess
12-
13-
# implement pip as a subprocess:
14-
subprocess.check_call([sys.executable, '-m', 'pip', 'install',
15-
'stomp.py==8.1.0'])
16-
subprocess.check_call([sys.executable, '-m', 'pip', 'install',
17-
'pika==1.1.0'])
18-
1910
import unittest
2011
import sys
2112
import os

deps/rabbitmq_stomp/test/python_SUITE_data/src/tls_runner.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,6 @@
77
## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
88
##
99

10-
import sys
11-
import subprocess
12-
13-
# implement pip as a subprocess:
14-
subprocess.check_call([sys.executable, '-m', 'pip', 'install',
15-
'stomp.py==8.1.0'])
16-
subprocess.check_call([sys.executable, '-m', 'pip', 'install',
17-
'pika==1.1.0'])
18-
1910
import test_runner
2011
import test_util
2112

0 commit comments

Comments
 (0)