Skip to content

Commit 8647967

Browse files
kjnilssonacogoluegnes
authored andcommitted
Make filter size configurable
as a queue arg and policy
1 parent 58dc8b9 commit 8647967

File tree

7 files changed

+81
-41
lines changed

7 files changed

+81
-41
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
%% This Source Code Form is subject to the terms of the Mozilla Public
1+
% This Source Code Form is subject to the terms of the Mozilla Public
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
@@ -834,26 +834,27 @@ check_arguments_key(QueueName, QueueType, Args, InvalidArgs) ->
834834
end, Args).
835835

836836
declare_args() ->
837-
[{<<"x-expires">>, fun check_expires_arg/2},
838-
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
839-
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
837+
[{<<"x-expires">>, fun check_expires_arg/2},
838+
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
839+
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
840840
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
841-
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
842-
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
843-
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
844-
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
845-
{<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
846-
{<<"x-max-priority">>, fun check_max_priority_arg/2},
847-
{<<"x-overflow">>, fun check_overflow/2},
848-
{<<"x-queue-mode">>, fun check_queue_mode/2},
849-
{<<"x-queue-version">>, fun check_queue_version/2},
850-
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
851-
{<<"x-queue-type">>, fun check_queue_type/2},
852-
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
853-
{<<"x-max-age">>, fun check_max_age_arg/2},
854-
{<<"x-stream-max-segment-size-bytes">>, fun check_non_neg_int_arg/2},
855-
{<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2},
856-
{<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
841+
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
842+
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
843+
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
844+
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
845+
{<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
846+
{<<"x-max-priority">>, fun check_max_priority_arg/2},
847+
{<<"x-overflow">>, fun check_overflow/2},
848+
{<<"x-queue-mode">>, fun check_queue_mode/2},
849+
{<<"x-queue-version">>, fun check_queue_version/2},
850+
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
851+
{<<"x-queue-type">>, fun check_queue_type/2},
852+
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
853+
{<<"x-max-age">>, fun check_max_age_arg/2},
854+
{<<"x-stream-max-segment-size-bytes">>, fun check_non_neg_int_arg/2},
855+
{<<"x-stream-filter-size-bytes">>, fun check_non_neg_int_arg/2},
856+
{<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2},
857+
{<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
857858

858859
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
859860
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2},

deps/rabbit/src/rabbit_policies.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ register() ->
4444
{policy_validator, <<"delivery-limit">>},
4545
{policy_validator, <<"max-age">>},
4646
{policy_validator, <<"stream-max-segment-size-bytes">>},
47+
{policy_validator, <<"stream-filter-size-bytes">>},
4748
{policy_validator, <<"queue-leader-locator">>},
4849
{policy_validator, <<"initial-cluster-size">>},
4950
{operator_policy_validator, <<"expires">>},
@@ -195,7 +196,13 @@ validate_policy0(<<"stream-max-segment-size-bytes">>, Value)
195196
when is_integer(Value), Value >= 0, Value =< ?MAX_STREAM_MAX_SEGMENT_SIZE ->
196197
ok;
197198
validate_policy0(<<"stream-max-segment-size-bytes">>, Value) ->
198-
{error, "~tp is not a valid segment size", [Value]}.
199+
{error, "~tp is not a valid segment size", [Value]};
200+
201+
validate_policy0(<<"stream-filter-size-bytes">>, Value)
202+
when is_integer(Value), Value >= 16, Value =< 255 ->
203+
ok;
204+
validate_policy0(<<"stream-filter-size-bytes">>, Value) ->
205+
{error, "~tp is not a valid filter size. Valid range is 16-255", [Value]}.
199206

200207
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
201208
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ declare(Q0, _Node) when ?amqqueue_is_stream(Q0) ->
110110
[fun rabbit_queue_type_util:check_auto_delete/1,
111111
fun rabbit_queue_type_util:check_exclusive/1,
112112
fun rabbit_queue_type_util:check_non_durable/1,
113-
fun rabbit_stream_queue:check_max_segment_size_bytes/1],
113+
fun check_max_segment_size_bytes/1,
114+
fun check_filter_size/1
115+
],
114116
Q0) of
115117
ok ->
116118
create_stream(Q0);
@@ -130,6 +132,18 @@ check_max_segment_size_bytes(Q) ->
130132
ok
131133
end.
132134

135+
check_filter_size(Q) ->
136+
Args = amqqueue:get_arguments(Q),
137+
case rabbit_misc:table_lookup(Args, <<"x-stream-filter-size-bytes">>) of
138+
undefined ->
139+
ok;
140+
{_Type, Val} when Val > 255 orelse Val < 16 ->
141+
{protocol_error, precondition_failed,
142+
"Invalid value for x-stream-filter-size-bytes", []};
143+
_ ->
144+
ok
145+
end.
146+
133147
create_stream(Q0) ->
134148
Arguments = amqqueue:get_arguments(Q0),
135149
QName = amqqueue:get_name(Q0),
@@ -865,34 +879,29 @@ delete_replica(VHost, Name, Node) ->
865879
make_stream_conf(Q) ->
866880
QName = amqqueue:get_name(Q),
867881
Name = stream_name(QName),
868-
%% MaxLength = args_policy_lookup(<<"max-length">>, policy_precedence/2, Q),
869-
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun policy_precedence/2, Q),
870-
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun policy_precedence/2, Q)),
871-
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>, fun policy_precedence/2, Q),
872882
Formatter = {?MODULE, format_osiris_event, [QName]},
873-
Retention = lists:filter(fun({_, R}) ->
874-
R =/= undefined
875-
end, [{max_bytes, MaxBytes},
876-
{max_age, MaxAge}]),
877-
add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
878-
#{reference => QName,
879-
name => Name,
880-
retention => Retention,
881-
event_formatter => Formatter,
882-
epoch => 1}).
883+
update_stream_conf(Q, #{reference => QName,
884+
name => Name,
885+
event_formatter => Formatter,
886+
epoch => 1}).
883887

884888
update_stream_conf(undefined, #{} = Conf) ->
885889
Conf;
886890
update_stream_conf(Q, #{} = Conf) when ?is_amqqueue(Q) ->
887891
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun policy_precedence/2, Q),
888892
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun policy_precedence/2, Q)),
889-
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>, fun policy_precedence/2, Q),
893+
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>,
894+
fun policy_precedence/2, Q),
895+
FilterSizeBytes = args_policy_lookup(<<"stream-filter-size-bytes">>,
896+
fun policy_precedence/2, Q),
890897
Retention = lists:filter(fun({_, R}) ->
891898
R =/= undefined
892899
end, [{max_bytes, MaxBytes},
893900
{max_age, MaxAge}]),
894-
add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
895-
Conf#{retention => Retention}).
901+
add_if_defined(
902+
filter_size, FilterSizeBytes,
903+
add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
904+
Conf#{retention => Retention})).
896905

897906
add_if_defined(_, undefined, Map) ->
898907
Map;

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ all_tests() ->
8888
declare_invalid_properties,
8989
declare_server_named,
9090
declare_invalid_arg,
91+
declare_invalid_filter_size,
9192
consume_invalid_arg,
9293
declare_queue,
9394
delete_queue,
@@ -261,7 +262,11 @@ declare_args(Config) ->
261262
Q = ?config(queue_name, Config),
262263
?assertEqual({'queue.declare_ok', Q, 0, 0},
263264
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
264-
{<<"x-max-length-bytes">>, long, 2_000_000}])),
265+
{<<"x-max-length-bytes">>, long, 2_000_000},
266+
{<<"x-max-age">>, longstr, <<"10D">>},
267+
{<<"x-stream-max-segment-size-bytes">>, long, 5_000_000},
268+
{<<"x-stream-filter-size-bytes">>, long, 32}
269+
])),
265270
assert_queue_type(Server, Q, rabbit_stream_queue),
266271
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
267272

@@ -331,6 +336,17 @@ declare_invalid_arg(Config) ->
331336
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
332337
{<<"x-overflow">>, longstr, <<"reject-publish">>}])).
333338

339+
declare_invalid_filter_size(Config) ->
340+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
341+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
342+
Q = ?config(queue_name, Config),
343+
344+
ExpectedError = <<"PRECONDITION_FAILED - Invalid value for x-stream-filter-size-bytes">>,
345+
?assertExit(
346+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
347+
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
348+
{<<"x-stream-filter-size-bytes">>, long, 256}])).
349+
334350
consume_invalid_arg(Config) ->
335351
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
336352
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -2232,7 +2248,8 @@ update_retention_policy(Config) ->
22322248
Q = ?config(queue_name, Config),
22332249
?assertEqual({'queue.declare_ok', Q, 0, 0},
22342250
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
2235-
{<<"x-stream-max-segment-size-bytes">>, long, 200}
2251+
{<<"x-stream-max-segment-size-bytes">>, long, 200},
2252+
{<<"x-stream-filter-size-bytes">>, long, 32}
22362253
])),
22372254
check_leader_and_replicas(Config, Servers),
22382255

deps/rabbitmq_management/priv/www/js/global.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ var HELP = {
209209
'queue-stream-max-segment-size-bytes':
210210
'Total segment size for stream segments on disk.<br/>(Sets the x-stream-max-segment-size-bytes argument.)',
211211

212+
'queue-stream-max-segment-size-bytes':
213+
'Size of the filter data attached to each stream chunk.<br/>(Sets the x-stream-filter-size-bytes argument.)',
214+
212215
'queue-auto-delete':
213216
'If yes, the queue will delete itself after at least one consumer has connected, and then all consumers have disconnected.',
214217

deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@
141141
<span class="help" id="queue-max-age"></span> |
142142
<span class="argument-link" field="definition" key="stream-max-segment-size-bytes" type="number">Max segment size in bytes</span>
143143
<span class="help" id="queue-stream-max-segment-size-bytes"></span> |
144+
<span class="argument-link" field="definition" key="stream-filter-size-bytes" type="number">Filter size in bytes. Valid range: 16-255</span>
145+
<span class="help" id="queue-stream-filter-size-bytes"></span> |
144146
<span class="argument-link" field="definition" key="queue-leader-locator" type="string">Leader locator</span>
145147
<span class="help" id="queue-leader-locator"></span>
146148
</td>

deps/rabbitmq_management/priv/www/js/tmpl/queues.ejs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@
341341
<% if (queue_type == "stream") { %>
342342
<span class="argument-link" field="arguments" key="x-max-age" type="string">Max time retention</span><span class="help" id="queue-max-age"></span>
343343
| <span class="argument-link" field="arguments" key="x-stream-max-segment-size-bytes" type="number">Max segment size in bytes</span><span class="help" id="queue-stream-max-segment-size-bytes"></span>
344+
| <span class="argument-link" field="arguments" key="x-stream-filter-size-bytes" type="number">Filter size (per chunk) in bytes</span><span class="help" id="queue-stream-filter-size-bytes"></span>
344345
| <span class="argument-link" field="arguments" key="x-initial-cluster-size" type="number">Initial cluster size</span><span class="help" id="queue-initial-cluster-size"></span>
345346
<% } %>
346347
<% if (queue_type != "classic") { %>

0 commit comments

Comments
 (0)