179
179
open /3 ,
180
180
close_sent /3 ]).
181
181
182
- % % not called by gen_statem since gen_statem:enter_loop/4 is used
183
-
184
- % % states
185
-
186
182
callback_mode () ->
187
183
[state_functions , state_enter ].
188
184
@@ -1747,7 +1743,7 @@ handle_frame_post_auth(Transport,
1747
1743
{declare_publisher , PublisherId , WriterRef , Stream }}) ->
1748
1744
case rabbit_stream_utils :check_write_permitted (stream_r (Stream ,
1749
1745
Connection0 ),
1750
- User , #{} )
1746
+ User )
1751
1747
of
1752
1748
ok ->
1753
1749
case {maps :is_key (PublisherId , Publishers0 ),
@@ -1876,7 +1872,6 @@ handle_frame_post_auth(Transport,
1876
1872
handle_frame_post_auth (Transport ,
1877
1873
# stream_connection {socket = S ,
1878
1874
credits = Credits ,
1879
- virtual_host = VirtualHost ,
1880
1875
user = User ,
1881
1876
publishers = Publishers } =
1882
1877
Connection ,
@@ -1890,15 +1885,8 @@ handle_frame_post_auth(Transport,
1890
1885
message_counters = Counters } =
1891
1886
Publisher ,
1892
1887
increase_messages_received (Counters , MessageCount ),
1893
- case rabbit_stream_utils :check_write_permitted (# resource {name =
1894
- Stream ,
1895
- kind =
1896
- queue ,
1897
- virtual_host
1898
- =
1899
- VirtualHost },
1900
- User , #{})
1901
- of
1888
+ case rabbit_stream_utils :check_write_permitted (stream_r (Stream , Connection ),
1889
+ User ) of
1902
1890
ok ->
1903
1891
rabbit_stream_utils :write_messages (Version , Leader ,
1904
1892
Reference ,
@@ -2294,18 +2282,11 @@ handle_frame_post_auth(Transport,
2294
2282
{Connection , State }
2295
2283
end ;
2296
2284
handle_frame_post_auth (_Transport ,
2297
- # stream_connection {virtual_host = VirtualHost ,
2298
- user = User } =
2299
- Connection ,
2285
+ # stream_connection {user = User } = Connection ,
2300
2286
State ,
2301
2287
{store_offset , Reference , Stream , Offset }) ->
2302
- case rabbit_stream_utils :check_write_permitted (# resource {name =
2303
- Stream ,
2304
- kind = queue ,
2305
- virtual_host =
2306
- VirtualHost },
2307
- User , #{})
2308
- of
2288
+ case rabbit_stream_utils :check_write_permitted (stream_r (Stream , Connection ),
2289
+ User ) of
2309
2290
ok ->
2310
2291
case lookup_leader (Stream , Connection ) of
2311
2292
{error , Error } ->
@@ -2398,24 +2379,13 @@ handle_frame_post_auth(Transport,
2398
2379
end ;
2399
2380
handle_frame_post_auth (Transport ,
2400
2381
# stream_connection {virtual_host = VirtualHost ,
2401
- user =
2402
- # user {username = Username } =
2403
- User } =
2404
- Connection ,
2382
+ user = # user {username = Username } = User } = Connection ,
2405
2383
State ,
2406
2384
{request , CorrelationId ,
2407
2385
{create_stream , Stream , Arguments }}) ->
2408
2386
case rabbit_stream_utils :enforce_correct_name (Stream ) of
2409
2387
{ok , StreamName } ->
2410
- case rabbit_stream_utils :check_configure_permitted (# resource {name =
2411
- StreamName ,
2412
- kind =
2413
- queue ,
2414
- virtual_host
2415
- =
2416
- VirtualHost },
2417
- User , #{})
2418
- of
2388
+ case rabbit_stream_utils :check_configure_permitted (stream_r (StreamName , Connection ), User ) of
2419
2389
ok ->
2420
2390
case rabbit_stream_manager :create (VirtualHost ,
2421
2391
StreamName ,
@@ -2489,19 +2459,10 @@ handle_frame_post_auth(Transport,
2489
2459
handle_frame_post_auth (Transport ,
2490
2460
# stream_connection {socket = S ,
2491
2461
virtual_host = VirtualHost ,
2492
- user =
2493
- # user {username = Username } =
2494
- User } =
2495
- Connection ,
2462
+ user = # user {username = Username } = User } = Connection ,
2496
2463
State ,
2497
2464
{request , CorrelationId , {delete_stream , Stream }}) ->
2498
- case rabbit_stream_utils :check_configure_permitted (# resource {name =
2499
- Stream ,
2500
- kind = queue ,
2501
- virtual_host =
2502
- VirtualHost },
2503
- User , #{})
2504
- of
2465
+ case rabbit_stream_utils :check_configure_permitted (stream_r (Stream , Connection ), User ) of
2505
2466
ok ->
2506
2467
case rabbit_stream_manager :delete (VirtualHost , Stream , Username ) of
2507
2468
{ok , deleted } ->
@@ -2917,6 +2878,154 @@ handle_frame_post_auth(Transport,
2917
2878
Frame = rabbit_stream_core :frame ({response , CorrelationId , Response }),
2918
2879
send (Transport , S , Frame ),
2919
2880
{Connection , State };
2881
+ handle_frame_post_auth (Transport ,
2882
+ # stream_connection {virtual_host = VirtualHost ,
2883
+ user = # user {username = Username } = User } = Connection ,
2884
+ State ,
2885
+ {request , CorrelationId ,
2886
+ {create_super_stream , SuperStream , Partitions , RoutingKeys , Arguments }}) ->
2887
+ case rabbit_stream_utils :enforce_correct_name (SuperStream ) of
2888
+ {ok , SuperStreamName } ->
2889
+ case rabbit_stream_utils :check_super_stream_management_permitted (VirtualHost ,
2890
+ SuperStreamName ,
2891
+ Partitions ,
2892
+ User ) of
2893
+ ok ->
2894
+ case rabbit_stream_manager :create_super_stream (VirtualHost ,
2895
+ SuperStreamName ,
2896
+ Partitions ,
2897
+ Arguments ,
2898
+ RoutingKeys ,
2899
+ Username ) of
2900
+ ok ->
2901
+ rabbit_log :debug (" Created super stream ~tp " , [SuperStreamName ]),
2902
+ response_ok (Transport ,
2903
+ Connection ,
2904
+ create_super_stream ,
2905
+ CorrelationId ),
2906
+ {Connection , State };
2907
+ {error , {validation_failed , Msg }} ->
2908
+ rabbit_log :warning (" Error while trying to create super stream ~tp : ~tp " ,
2909
+ [SuperStreamName , Msg ]),
2910
+ response (Transport ,
2911
+ Connection ,
2912
+ create_super_stream ,
2913
+ CorrelationId ,
2914
+ ? RESPONSE_CODE_PRECONDITION_FAILED ),
2915
+ rabbit_global_counters :increase_protocol_counter (stream ,
2916
+ ? PRECONDITION_FAILED ,
2917
+ 1 ),
2918
+ {Connection , State };
2919
+ {error , {reference_already_exists , Msg }} ->
2920
+ rabbit_log :warning (" Error while trying to create super stream ~tp : ~tp " ,
2921
+ [SuperStreamName , Msg ]),
2922
+ response (Transport ,
2923
+ Connection ,
2924
+ create_super_stream ,
2925
+ CorrelationId ,
2926
+ ? RESPONSE_CODE_STREAM_ALREADY_EXISTS ),
2927
+ rabbit_global_counters :increase_protocol_counter (stream ,
2928
+ ? STREAM_ALREADY_EXISTS ,
2929
+ 1 ),
2930
+ {Connection , State };
2931
+ {error , Error } ->
2932
+ rabbit_log :warning (" Error while trying to create super stream ~tp : ~tp " ,
2933
+ [SuperStreamName , Error ]),
2934
+ response (Transport ,
2935
+ Connection ,
2936
+ create_super_stream ,
2937
+ CorrelationId ,
2938
+ ? RESPONSE_CODE_INTERNAL_ERROR ),
2939
+ rabbit_global_counters :increase_protocol_counter (stream ,
2940
+ ? INTERNAL_ERROR ,
2941
+ 1 ),
2942
+ {Connection , State }
2943
+ end ;
2944
+ error ->
2945
+ response (Transport ,
2946
+ Connection ,
2947
+ create_super_stream ,
2948
+ CorrelationId ,
2949
+ ? RESPONSE_CODE_ACCESS_REFUSED ),
2950
+ rabbit_global_counters :increase_protocol_counter (stream ,
2951
+ ? ACCESS_REFUSED ,
2952
+ 1 ),
2953
+ {Connection , State }
2954
+ end ;
2955
+ _ ->
2956
+ response (Transport ,
2957
+ Connection ,
2958
+ create_super_stream ,
2959
+ CorrelationId ,
2960
+ ? RESPONSE_CODE_PRECONDITION_FAILED ),
2961
+ rabbit_global_counters :increase_protocol_counter (stream ,
2962
+ ? PRECONDITION_FAILED ,
2963
+ 1 ),
2964
+ {Connection , State }
2965
+ end ;
2966
+ handle_frame_post_auth (Transport ,
2967
+ # stream_connection {socket = S ,
2968
+ virtual_host = VirtualHost ,
2969
+ user = # user {username = Username } = User } = Connection ,
2970
+ State ,
2971
+ {request , CorrelationId , {delete_super_stream , SuperStream }}) ->
2972
+ Partitions = case rabbit_stream_manager :partitions (VirtualHost , SuperStream ) of
2973
+ {ok , Ps } ->
2974
+ Ps ;
2975
+ _ ->
2976
+ []
2977
+ end ,
2978
+ case rabbit_stream_utils :check_super_stream_management_permitted (VirtualHost ,
2979
+ SuperStream ,
2980
+ Partitions ,
2981
+ User ) of
2982
+ ok ->
2983
+ case rabbit_stream_manager :delete_super_stream (VirtualHost , SuperStream , Username ) of
2984
+ ok ->
2985
+ response_ok (Transport ,
2986
+ Connection ,
2987
+ delete_super_stream ,
2988
+ CorrelationId ),
2989
+ {Connection1 , State1 } = clean_state_after_super_stream_deletion (Partitions ,
2990
+ Connection ,
2991
+ State ,
2992
+ Transport , S ),
2993
+ {Connection1 , State1 };
2994
+ {error , stream_not_found } ->
2995
+ response (Transport ,
2996
+ Connection ,
2997
+ delete_super_stream ,
2998
+ CorrelationId ,
2999
+ ? RESPONSE_CODE_STREAM_DOES_NOT_EXIST ),
3000
+ rabbit_global_counters :increase_protocol_counter (stream ,
3001
+ ? STREAM_DOES_NOT_EXIST ,
3002
+ 1 ),
3003
+ {Connection , State };
3004
+ {error , Error } ->
3005
+ rabbit_log :warning (" Error while trying to delete super stream ~tp : ~tp " ,
3006
+ [SuperStream , Error ]),
3007
+ response (Transport ,
3008
+ Connection ,
3009
+ delete_super_stream ,
3010
+ CorrelationId ,
3011
+ ? RESPONSE_CODE_PRECONDITION_FAILED ),
3012
+ rabbit_global_counters :increase_protocol_counter (stream ,
3013
+ ? PRECONDITION_FAILED ,
3014
+ 1 ),
3015
+ {Connection , State }
3016
+
3017
+ end ;
3018
+ error ->
3019
+ response (Transport ,
3020
+ Connection ,
3021
+ delete_stream ,
3022
+ CorrelationId ,
3023
+ ? RESPONSE_CODE_ACCESS_REFUSED ),
3024
+ rabbit_global_counters :increase_protocol_counter (stream ,
3025
+ ? ACCESS_REFUSED ,
3026
+ 1 ),
3027
+ {Connection , State }
3028
+ end ;
2920
3029
handle_frame_post_auth (Transport ,
2921
3030
# stream_connection {socket = S } = Connection ,
2922
3031
State ,
@@ -3248,6 +3357,27 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
3248
3357
kind = queue ,
3249
3358
virtual_host = VHost }.
3250
3359
3360
+ clean_state_after_super_stream_deletion (Partitions , Connection , State , Transport , S ) ->
3361
+ lists :foldl (fun (Partition , {Conn , St }) ->
3362
+ case
3363
+ clean_state_after_stream_deletion_or_failure (undefined , Partition ,
3364
+ Conn ,
3365
+ St )
3366
+ of
3367
+ {cleaned , NewConnection , NewState } ->
3368
+ Command = {metadata_update , Partition ,
3369
+ ? RESPONSE_CODE_STREAM_NOT_AVAILABLE },
3370
+ Frame = rabbit_stream_core :frame (Command ),
3371
+ send (Transport , S , Frame ),
3372
+ rabbit_global_counters :increase_protocol_counter (stream ,
3373
+ ? STREAM_NOT_AVAILABLE ,
3374
+ 1 ),
3375
+ {NewConnection , NewState };
3376
+ {not_cleaned , SameConnection , SameState } ->
3377
+ {SameConnection , SameState }
3378
+ end
3379
+ end , {Connection , State }, Partitions ).
3380
+
3251
3381
clean_state_after_stream_deletion_or_failure (MemberPid , Stream ,
3252
3382
# stream_connection {virtual_host =
3253
3383
VirtualHost ,
0 commit comments