26
26
-compile (nowarn_export_all ).
27
27
-compile (export_all ).
28
28
29
+ -import (rabbit_stream_core , [frame /1 ]).
30
+
29
31
-define (WAIT , 5000 ).
30
32
31
33
all () ->
@@ -37,6 +39,7 @@ groups() ->
37
39
test_stream ,
38
40
test_stream_tls ,
39
41
test_publish_v2 ,
42
+ test_super_stream_creation_deletion ,
40
43
test_gc_consumers ,
41
44
test_gc_publishers ,
42
45
test_update_secret ,
@@ -306,6 +309,54 @@ test_publish_v2(Config) ->
306
309
closed = wait_for_socket_close (Transport , S , 10 ),
307
310
ok .
308
311
312
+
313
+ test_super_stream_creation_deletion (Config ) ->
314
+ T = gen_tcp ,
315
+ Port = get_port (T , Config ),
316
+ Opts = get_opts (T ),
317
+ {ok , S } = T :connect (" localhost" , Port , Opts ),
318
+ C = rabbit_stream_core :init (0 ),
319
+ test_peer_properties (T , S , C ),
320
+ test_authenticate (T , S , C ),
321
+
322
+ Ss = atom_to_binary (? FUNCTION_NAME , utf8 ),
323
+ Partitions = [unicode :characters_to_binary ([Ss , <<" -" >>, integer_to_binary (N )]) || N <- lists :seq (0 , 2 )],
324
+ Rks = [integer_to_binary (N ) || N <- lists :seq (0 , 2 )],
325
+ SsCreationFrame = frame ({request , 1 , {create_super_stream , Ss , Partitions , Rks , #{}}}),
326
+ ok = T :send (S , SsCreationFrame ),
327
+ {Cmd1 , _ } = receive_commands (T , S , C ),
328
+ ? assertMatch ({response , 1 , {create_super_stream , ? RESPONSE_CODE_OK }},
329
+ Cmd1 ),
330
+
331
+ PartitionsFrame = frame ({request , 1 , {partitions , Ss }}),
332
+ ok = T :send (S , PartitionsFrame ),
333
+ {Cmd2 , _ } = receive_commands (T , S , C ),
334
+ ? assertMatch ({response , 1 , {partitions , ? RESPONSE_CODE_OK , Partitions }},
335
+ Cmd2 ),
336
+ [begin
337
+ RouteFrame = frame ({request , 1 , {route , Rk , Ss }}),
338
+ ok = T :send (S , RouteFrame ),
339
+ {Command , _ } = receive_commands (T , S , C ),
340
+ ? assertMatch ({response , 1 , {route , ? RESPONSE_CODE_OK , _ }}, Command ),
341
+ {response , 1 , {route , ? RESPONSE_CODE_OK , [P ]}} = Command ,
342
+ ? assertEqual (unicode :characters_to_binary ([Ss , <<" -" >>, Rk ]), P )
343
+ end || Rk <- Rks ],
344
+
345
+ SsDeletionFrame = frame ({request , 1 , {delete_super_stream , Ss }}),
346
+ ok = T :send (S , SsDeletionFrame ),
347
+ {Cmd3 , _ } = receive_commands (T , S , C ),
348
+ ? assertMatch ({response , 1 , {delete_super_stream , ? RESPONSE_CODE_OK }},
349
+ Cmd3 ),
350
+
351
+ ok = T :send (S , PartitionsFrame ),
352
+ {Cmd4 , _ } = receive_commands (T , S , C ),
353
+ ? assertMatch ({response , 1 , {partitions , ? RESPONSE_CODE_STREAM_DOES_NOT_EXIST , []}},
354
+ Cmd4 ),
355
+
356
+ test_close (T , S , C ),
357
+ closed = wait_for_socket_close (T , S , 10 ),
358
+ ok .
359
+
309
360
test_metadata (Config ) ->
310
361
Stream = atom_to_binary (? FUNCTION_NAME , utf8 ),
311
362
Transport = gen_tcp ,
0 commit comments