Skip to content

Commit 40bf970

Browse files
committed
Add configurable stomp frame size limit, default to 192kb.
1 parent 3ce37e1 commit 40bf970

File tree

3 files changed

+82
-17
lines changed

3 files changed

+82
-17
lines changed

deps/rabbitmq_stomp/priv/schema/rabbitmq_stomp.schema

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,12 @@ end}.
238238

239239
{mapping, "stomp.default_nack_requeue", "rabbitmq_stomp.default_nack_requeue",
240240
[{datatype, {enum, [true, false]}}]}.
241+
242+
243+
244+
%% Maximum frame size.
245+
%%
246+
%% Defaults to 192Kb.
247+
248+
{mapping, "stomp.max_frame_size", "rabbitmq_stomp.max_frame_size",
249+
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
timeout]).
2626

2727
-record(reader_state, {socket, conn_name, parse_state, processor_state, state,
28-
conserve_resources, recv_outstanding, stats_timer,
29-
parent, connection, heartbeat_sup, heartbeat,
28+
conserve_resources, recv_outstanding, max_frame_length, frame_length,
29+
stats_timer, parent, connection, heartbeat_sup, heartbeat,
3030
timeout_sec %% heartbeat timeout value used, 0 means
3131
%% heartbeats are disabled
3232
}).
@@ -69,6 +69,7 @@ init([SupHelperPid, Ref, Configuration]) ->
6969
_ = register_resource_alarm(),
7070

7171
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
72+
MaxFrameLength = application:get_env(rabbitmq_stomp, max_frame_length, 192 * 1024),
7273
erlang:send_after(LoginTimeout, self(), login_timeout),
7374

7475
gen_server2:enter_loop(?MODULE, [],
@@ -80,6 +81,8 @@ init([SupHelperPid, Ref, Configuration]) ->
8081
processor_state = ProcState,
8182
heartbeat_sup = SupHelperPid,
8283
heartbeat = {none, none},
84+
max_frame_length = MaxFrameLength,
85+
frame_length = 0,
8386
state = running,
8487
conserve_resources = false,
8588
recv_outstanding = false})), #reader_state.stats_timer),
@@ -222,23 +225,41 @@ process_received_bytes([], State) ->
222225
{ok, State};
223226
process_received_bytes(Bytes,
224227
State = #reader_state{
225-
processor_state = ProcState,
226-
parse_state = ParseState}) ->
228+
max_frame_length = MaxFrameLength,
229+
frame_length = FrameLength,
230+
processor_state = ProcState,
231+
parse_state = ParseState}) ->
227232
case rabbit_stomp_frame:parse(Bytes, ParseState) of
228233
{more, ParseState1} ->
229-
{ok, State#reader_state{parse_state = ParseState1}};
234+
FrameLength1 = FrameLength + byte_size(Bytes),
235+
case FrameLength1 > MaxFrameLength of
236+
true ->
237+
log_reason({network_error, {frame_too_big, {FrameLength1, MaxFrameLength}}}, State),
238+
{stop, normal, State};
239+
false ->
240+
{ok, State#reader_state{parse_state = ParseState1,
241+
frame_length = FrameLength1}}
242+
end;
230243
{ok, Frame, Rest} ->
231-
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
232-
{ok, NewProcState, Conn} ->
233-
PS = rabbit_stomp_frame:initial_state(),
234-
NextState = maybe_block(State, Frame),
235-
process_received_bytes(Rest, NextState#reader_state{
236-
processor_state = NewProcState,
237-
parse_state = PS,
238-
connection = Conn});
239-
{stop, Reason, NewProcState} ->
240-
{stop, Reason,
241-
processor_state(NewProcState, State)}
244+
FrameLength1 = FrameLength + byte_size(Bytes) - byte_size(Rest),
245+
case FrameLength1 > MaxFrameLength of
246+
true ->
247+
log_reason({network_error, {frame_too_big, {FrameLength1, MaxFrameLength}}}, State),
248+
{stop, normal, State};
249+
false ->
250+
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
251+
{ok, NewProcState, Conn} ->
252+
PS = rabbit_stomp_frame:initial_state(),
253+
NextState = maybe_block(State, Frame),
254+
process_received_bytes(Rest, NextState#reader_state{
255+
frame_length = 0,
256+
processor_state = NewProcState,
257+
parse_state = PS,
258+
connection = Conn});
259+
{stop, Reason, NewProcState} ->
260+
{stop, Reason,
261+
processor_state(NewProcState, State)}
262+
end
242263
end;
243264
{error, Reason} ->
244265
%% The parser couldn't parse data. We log the reason right

deps/rabbitmq_stomp/test/connections_SUITE.erl

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ all() ->
2222
stats_are_not_leaked,
2323
stats,
2424
heartbeat,
25-
login_timeout
25+
login_timeout,
26+
frame_size,
27+
frame_size_huge
2628
].
2729

2830
merge_app_env(Config) ->
@@ -174,3 +176,36 @@ login_timeout(Config) ->
174176
Config, 0,
175177
application, unset_env, [rabbitmq_stomp, login_timeout])
176178
end.
179+
180+
frame_size(Config) ->
181+
rabbit_ct_broker_helpers:rpc(
182+
Config, 0,
183+
application, set_env, [rabbitmq_stomp, max_frame_length, 80]),
184+
StompPort = get_stomp_port(Config),
185+
{ok, Client} = rabbit_stomp_client:connect("1.2", "guest", "guest", StompPort,
186+
[{"heart-beat", "5000,7000"}]),
187+
ok = rabbit_stomp_client:send(
188+
Client, "SEND", [{"destination", "qwe"}],
189+
["Lorem ipsum dolor sit amet viverra fusce. "
190+
"Lorem ipsum dolor sit amet viverra fusce. "
191+
"Lorem ipsum dolor sit amet viverra fusce."
192+
"Lorem ipsum dolor sit amet viverra fusce."
193+
"Lorem ipsum dolor sit amet viverra fusce."]),
194+
{S, _} = Client,
195+
{error, closed} = gen_tcp:recv(S, 0, 500),
196+
ok.
197+
198+
199+
frame_size_huge(Config) ->
200+
rabbit_ct_broker_helpers:rpc(
201+
Config, 0,
202+
application, set_env, [rabbitmq_stomp, max_frame_length, 700]),
203+
StompPort = get_stomp_port(Config),
204+
{ok, Client} = rabbit_stomp_client:connect("1.2", "guest", "guest", StompPort,
205+
[{"heart-beat", "5000,7000"}]),
206+
rabbit_stomp_client:send(
207+
Client, "SEND", [{"destination", "qwe"}],
208+
[base64:encode(crypto:strong_rand_bytes(100000000))]),
209+
{S, _} = Client,
210+
{error, closed} = gen_tcp:recv(S, 0, 500),
211+
ok.

0 commit comments

Comments
 (0)