Skip to content

Commit 58dc8b9

Browse files
committed
Add test for stream filtering
1 parent 62c83b0 commit 58dc8b9

File tree

1 file changed

+47
-1
lines changed

1 file changed

+47
-1
lines changed

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ groups() ->
3535
[{single_node, [],
3636
[test_stream,
3737
test_stream_tls,
38+
test_publish_v2,
3839
test_gc_consumers,
3940
test_gc_publishers,
4041
unauthenticated_client_rejected_tcp_connected,
@@ -172,6 +173,36 @@ test_stream_tls(Config) ->
172173
test_server(ssl, Stream, Config),
173174
ok.
174175

176+
test_publish_v2(Config) ->
177+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
178+
Transport = gen_tcp,
179+
Port = get_stream_port(Config),
180+
Opts = [{active, false}, {mode, binary}],
181+
{ok, S} =
182+
Transport:connect("localhost", Port, Opts),
183+
C0 = rabbit_stream_core:init(0),
184+
C1 = test_peer_properties(Transport, S, C0),
185+
C2 = test_authenticate(Transport, S, C1),
186+
C3 = test_create_stream(Transport, S, Stream, C2),
187+
PublisherId = 42,
188+
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
189+
Body = <<"hello">>,
190+
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, C4),
191+
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, C5),
192+
SubscriptionId = 42,
193+
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
194+
#{<<"filter.0">> => <<"foo">>},
195+
C6),
196+
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
197+
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
198+
199+
C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),
200+
201+
C10 = test_delete_stream(Transport, S, Stream, C9),
202+
_C11 = test_close(Transport, S, C10),
203+
closed = wait_for_socket_close(Transport, S, 10),
204+
ok.
205+
175206
test_metadata(Config) ->
176207
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
177208
Transport = gen_tcp,
@@ -618,10 +649,25 @@ test_declare_publisher(Transport, S, PublisherId, Stream, C0) ->
618649
C.
619650

620651
test_publish_confirm(Transport, S, PublisherId, Body, C0) ->
652+
test_publish_confirm(Transport, S, publish, PublisherId, Body, C0).
653+
654+
test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId, Body, C0) ->
621655
BodySize = byte_size(Body),
622656
Messages = [<<1:64, 0:1, BodySize:31, Body:BodySize/binary>>],
623657
PublishFrame =
624-
rabbit_stream_core:frame({publish, PublisherId, 1, Messages}),
658+
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
659+
ok = Transport:send(S, PublishFrame),
660+
{Cmd, C} = receive_commands(Transport, S, C0),
661+
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),
662+
C;
663+
test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body, C0) ->
664+
BodySize = byte_size(Body),
665+
FilterValue = <<"foo">>,
666+
FilterValueSize = byte_size(FilterValue),
667+
Messages = [<<1:64, FilterValueSize:16, FilterValue:FilterValueSize/binary,
668+
0:1, BodySize:31, Body:BodySize/binary>>],
669+
PublishFrame =
670+
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
625671
ok = Transport:send(S, PublishFrame),
626672
{Cmd, C} = receive_commands(Transport, S, C0),
627673
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),

0 commit comments

Comments
 (0)