Skip to content

Commit 065844a

Browse files
committed
Support stream-filter-size-bytes in stream protocol
1 parent 8647967 commit 065844a

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,12 @@ stream_queue_arguments(ArgumentsAcc,
170170
Value}]
171171
++ ArgumentsAcc,
172172
maps:remove(<<"queue-leader-locator">>, Arguments));
173+
stream_queue_arguments(ArgumentsAcc,
174+
#{<<"stream-filter-size-bytes">> := Value} = Arguments) ->
175+
stream_queue_arguments([{<<"x-stream-filter-size-bytes">>, long,
176+
binary_to_integer(Value)}]
177+
++ ArgumentsAcc,
178+
maps:remove(<<"stream-filter-size-bytes">>, Arguments));
173179
stream_queue_arguments(ArgumentsAcc, _Arguments) ->
174180
ArgumentsAcc.
175181

@@ -191,6 +197,11 @@ validate_stream_queue_arguments([{<<"x-queue-leader-locator">>,
191197
false ->
192198
error
193199
end;
200+
validate_stream_queue_arguments([{<<"x-stream-filter-size-bytes">>, long,
201+
FilterSize}
202+
| _])
203+
when FilterSize < 16 orelse FilterSize > 255 ->
204+
error;
194205
validate_stream_queue_arguments([_ | T]) ->
195206
validate_stream_queue_arguments(T).
196207

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ groups() ->
4646
timeout_authenticating,
4747
timeout_close_sent,
4848
max_segment_size_bytes_validation,
49-
close_connection_on_consumer_update_timeout]},
49+
close_connection_on_consumer_update_timeout,
50+
set_filter_size]},
5051
%% Run `test_global_counters` on its own so the global metrics are
5152
%% initialised to 0 for each testcase
5253
{single_node_1, [], [test_global_counters]},
@@ -433,6 +434,37 @@ close_connection_on_consumer_update_timeout(Config) ->
433434
closed = wait_for_socket_close(Transport, Sb, 10),
434435
ok.
435436

437+
set_filter_size(Config) ->
438+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
439+
Transport = gen_tcp,
440+
Port = get_stream_port(Config),
441+
Opts = [{active, false}, {mode, binary}],
442+
{ok, S} = Transport:connect("localhost", Port, Opts),
443+
C0 = rabbit_stream_core:init(0),
444+
C1 = test_peer_properties(Transport, S, C0),
445+
C2 = test_authenticate(Transport, S, C1),
446+
447+
Tests = [
448+
{128, ?RESPONSE_CODE_OK},
449+
{15, ?RESPONSE_CODE_PRECONDITION_FAILED},
450+
{256, ?RESPONSE_CODE_PRECONDITION_FAILED}
451+
],
452+
453+
C3 = lists:foldl(fun({Size, ExpectedResponseCode}, Conn0) ->
454+
Frame = rabbit_stream_core:frame(
455+
{request, 1,
456+
{create_stream, Stream,
457+
#{<<"stream-filter-size-bytes">> => integer_to_binary(Size)}}}),
458+
ok = Transport:send(S, Frame),
459+
{Cmd, Conn1} = receive_commands(Transport, S, Conn0),
460+
?assertMatch({response, 1, {create_stream, ExpectedResponseCode}}, Cmd),
461+
Conn1
462+
end, C2, Tests),
463+
464+
_ = test_close(Transport, S, C3),
465+
closed = wait_for_socket_close(Transport, S, 10),
466+
ok.
467+
436468
consumer_count(Config) ->
437469
ets_count(Config, ?TABLE_CONSUMER).
438470

0 commit comments

Comments
 (0)