Skip to content

Commit 9f7278a

Browse files
committed
Add super stream add/delete protocol commands
1 parent bd9546b commit 9f7278a

File tree

6 files changed

+321
-71
lines changed

6 files changed

+321
-71
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 179 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,6 @@
179179
open/3,
180180
close_sent/3]).
181181

182-
%% not called by gen_statem since gen_statem:enter_loop/4 is used
183-
184-
%% states
185-
186182
callback_mode() ->
187183
[state_functions, state_enter].
188184

@@ -1747,7 +1743,7 @@ handle_frame_post_auth(Transport,
17471743
{declare_publisher, PublisherId, WriterRef, Stream}}) ->
17481744
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
17491745
Connection0),
1750-
User, #{})
1746+
User)
17511747
of
17521748
ok ->
17531749
case {maps:is_key(PublisherId, Publishers0),
@@ -1876,7 +1872,6 @@ handle_frame_post_auth(Transport,
18761872
handle_frame_post_auth(Transport,
18771873
#stream_connection{socket = S,
18781874
credits = Credits,
1879-
virtual_host = VirtualHost,
18801875
user = User,
18811876
publishers = Publishers} =
18821877
Connection,
@@ -1890,15 +1885,8 @@ handle_frame_post_auth(Transport,
18901885
message_counters = Counters} =
18911886
Publisher,
18921887
increase_messages_received(Counters, MessageCount),
1893-
case rabbit_stream_utils:check_write_permitted(#resource{name =
1894-
Stream,
1895-
kind =
1896-
queue,
1897-
virtual_host
1898-
=
1899-
VirtualHost},
1900-
User, #{})
1901-
of
1888+
case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection),
1889+
User) of
19021890
ok ->
19031891
rabbit_stream_utils:write_messages(Version, Leader,
19041892
Reference,
@@ -2294,18 +2282,11 @@ handle_frame_post_auth(Transport,
22942282
{Connection, State}
22952283
end;
22962284
handle_frame_post_auth(_Transport,
2297-
#stream_connection{virtual_host = VirtualHost,
2298-
user = User} =
2299-
Connection,
2285+
#stream_connection{user = User} = Connection,
23002286
State,
23012287
{store_offset, Reference, Stream, Offset}) ->
2302-
case rabbit_stream_utils:check_write_permitted(#resource{name =
2303-
Stream,
2304-
kind = queue,
2305-
virtual_host =
2306-
VirtualHost},
2307-
User, #{})
2308-
of
2288+
case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection),
2289+
User) of
23092290
ok ->
23102291
case lookup_leader(Stream, Connection) of
23112292
{error, Error} ->
@@ -2398,24 +2379,13 @@ handle_frame_post_auth(Transport,
23982379
end;
23992380
handle_frame_post_auth(Transport,
24002381
#stream_connection{virtual_host = VirtualHost,
2401-
user =
2402-
#user{username = Username} =
2403-
User} =
2404-
Connection,
2382+
user = #user{username = Username} = User} = Connection,
24052383
State,
24062384
{request, CorrelationId,
24072385
{create_stream, Stream, Arguments}}) ->
24082386
case rabbit_stream_utils:enforce_correct_name(Stream) of
24092387
{ok, StreamName} ->
2410-
case rabbit_stream_utils:check_configure_permitted(#resource{name =
2411-
StreamName,
2412-
kind =
2413-
queue,
2414-
virtual_host
2415-
=
2416-
VirtualHost},
2417-
User, #{})
2418-
of
2388+
case rabbit_stream_utils:check_configure_permitted(stream_r(StreamName, Connection), User) of
24192389
ok ->
24202390
case rabbit_stream_manager:create(VirtualHost,
24212391
StreamName,
@@ -2489,19 +2459,10 @@ handle_frame_post_auth(Transport,
24892459
handle_frame_post_auth(Transport,
24902460
#stream_connection{socket = S,
24912461
virtual_host = VirtualHost,
2492-
user =
2493-
#user{username = Username} =
2494-
User} =
2495-
Connection,
2462+
user = #user{username = Username} = User} = Connection,
24962463
State,
24972464
{request, CorrelationId, {delete_stream, Stream}}) ->
2498-
case rabbit_stream_utils:check_configure_permitted(#resource{name =
2499-
Stream,
2500-
kind = queue,
2501-
virtual_host =
2502-
VirtualHost},
2503-
User, #{})
2504-
of
2465+
case rabbit_stream_utils:check_configure_permitted(stream_r(Stream, Connection), User) of
25052466
ok ->
25062467
case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
25072468
{ok, deleted} ->
@@ -2917,6 +2878,154 @@ handle_frame_post_auth(Transport,
29172878
Frame = rabbit_stream_core:frame({response, CorrelationId, Response}),
29182879
send(Transport, S, Frame),
29192880
{Connection, State};
2881+
handle_frame_post_auth(Transport,
2882+
#stream_connection{virtual_host = VirtualHost,
2883+
user = #user{username = Username} = User} = Connection,
2884+
State,
2885+
{request, CorrelationId,
2886+
{create_super_stream, SuperStream, Partitions, RoutingKeys, Arguments}}) ->
2887+
case rabbit_stream_utils:enforce_correct_name(SuperStream) of
2888+
{ok, SuperStreamName} ->
2889+
case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost,
2890+
SuperStreamName,
2891+
Partitions,
2892+
User) of
2893+
ok ->
2894+
case rabbit_stream_manager:create_super_stream(VirtualHost,
2895+
SuperStreamName,
2896+
Partitions,
2897+
Arguments,
2898+
RoutingKeys,
2899+
Username) of
2900+
ok ->
2901+
rabbit_log:debug("Created super stream ~tp", [SuperStreamName]),
2902+
response_ok(Transport,
2903+
Connection,
2904+
create_super_stream,
2905+
CorrelationId),
2906+
{Connection, State};
2907+
{error, {validation_failed, Msg}} ->
2908+
rabbit_log:warning("Error while trying to create super stream ~tp: ~tp",
2909+
[SuperStreamName, Msg]),
2910+
response(Transport,
2911+
Connection,
2912+
create_super_stream,
2913+
CorrelationId,
2914+
?RESPONSE_CODE_PRECONDITION_FAILED),
2915+
rabbit_global_counters:increase_protocol_counter(stream,
2916+
?PRECONDITION_FAILED,
2917+
1),
2918+
{Connection, State};
2919+
{error, {reference_already_exists, Msg}} ->
2920+
rabbit_log:warning("Error while trying to create super stream ~tp: ~tp",
2921+
[SuperStreamName, Msg]),
2922+
response(Transport,
2923+
Connection,
2924+
create_super_stream,
2925+
CorrelationId,
2926+
?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
2927+
rabbit_global_counters:increase_protocol_counter(stream,
2928+
?STREAM_ALREADY_EXISTS,
2929+
1),
2930+
{Connection, State};
2931+
{error, Error} ->
2932+
rabbit_log:warning("Error while trying to create super stream ~tp: ~tp",
2933+
[SuperStreamName, Error]),
2934+
response(Transport,
2935+
Connection,
2936+
create_super_stream,
2937+
CorrelationId,
2938+
?RESPONSE_CODE_INTERNAL_ERROR),
2939+
rabbit_global_counters:increase_protocol_counter(stream,
2940+
?INTERNAL_ERROR,
2941+
1),
2942+
{Connection, State}
2943+
end;
2944+
error ->
2945+
response(Transport,
2946+
Connection,
2947+
create_super_stream,
2948+
CorrelationId,
2949+
?RESPONSE_CODE_ACCESS_REFUSED),
2950+
rabbit_global_counters:increase_protocol_counter(stream,
2951+
?ACCESS_REFUSED,
2952+
1),
2953+
{Connection, State}
2954+
end;
2955+
_ ->
2956+
response(Transport,
2957+
Connection,
2958+
create_super_stream,
2959+
CorrelationId,
2960+
?RESPONSE_CODE_PRECONDITION_FAILED),
2961+
rabbit_global_counters:increase_protocol_counter(stream,
2962+
?PRECONDITION_FAILED,
2963+
1),
2964+
{Connection, State}
2965+
end;
2966+
handle_frame_post_auth(Transport,
2967+
#stream_connection{socket = S,
2968+
virtual_host = VirtualHost,
2969+
user = #user{username = Username} = User} = Connection,
2970+
State,
2971+
{request, CorrelationId, {delete_super_stream, SuperStream}}) ->
2972+
Partitions = case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of
2973+
{ok, Ps} ->
2974+
Ps;
2975+
_ ->
2976+
[]
2977+
end,
2978+
case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost,
2979+
SuperStream,
2980+
Partitions,
2981+
User) of
2982+
ok ->
2983+
case rabbit_stream_manager:delete_super_stream(VirtualHost, SuperStream, Username) of
2984+
ok ->
2985+
response_ok(Transport,
2986+
Connection,
2987+
delete_super_stream,
2988+
CorrelationId),
2989+
{Connection1, State1} = clean_state_after_super_stream_deletion(Partitions,
2990+
Connection,
2991+
State,
2992+
Transport, S),
2993+
{Connection1, State1};
2994+
{error, stream_not_found} ->
2995+
response(Transport,
2996+
Connection,
2997+
delete_super_stream,
2998+
CorrelationId,
2999+
?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
3000+
rabbit_global_counters:increase_protocol_counter(stream,
3001+
?STREAM_DOES_NOT_EXIST,
3002+
1),
3003+
{Connection, State};
3004+
{error, Error} ->
3005+
rabbit_log:warning("Error while trying to delete super stream ~tp: ~tp",
3006+
[SuperStream, Error]),
3007+
response(Transport,
3008+
Connection,
3009+
delete_super_stream,
3010+
CorrelationId,
3011+
?RESPONSE_CODE_PRECONDITION_FAILED),
3012+
rabbit_global_counters:increase_protocol_counter(stream,
3013+
?PRECONDITION_FAILED,
3014+
1),
3015+
{Connection, State}
3016+
3017+
end;
3018+
error ->
3019+
response(Transport,
3020+
Connection,
3021+
delete_stream,
3022+
CorrelationId,
3023+
?RESPONSE_CODE_ACCESS_REFUSED),
3024+
rabbit_global_counters:increase_protocol_counter(stream,
3025+
?ACCESS_REFUSED,
3026+
1),
3027+
{Connection, State}
3028+
end;
29203029
handle_frame_post_auth(Transport,
29213030
#stream_connection{socket = S} = Connection,
29223031
State,
@@ -3248,6 +3357,27 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
32483357
kind = queue,
32493358
virtual_host = VHost}.
32503359

3360+
clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport, S) ->
3361+
lists:foldl(fun(Partition, {Conn, St}) ->
3362+
case
3363+
clean_state_after_stream_deletion_or_failure(undefined, Partition,
3364+
Conn,
3365+
St)
3366+
of
3367+
{cleaned, NewConnection, NewState} ->
3368+
Command = {metadata_update, Partition,
3369+
?RESPONSE_CODE_STREAM_NOT_AVAILABLE},
3370+
Frame = rabbit_stream_core:frame(Command),
3371+
send(Transport, S, Frame),
3372+
rabbit_global_counters:increase_protocol_counter(stream,
3373+
?STREAM_NOT_AVAILABLE,
3374+
1),
3375+
{NewConnection, NewState};
3376+
{not_cleaned, SameConnection, SameState} ->
3377+
{SameConnection, SameState}
3378+
end
3379+
end, {Connection, State}, Partitions).
3380+
32513381
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
32523382
#stream_connection{virtual_host =
32533383
VirtualHost,

0 commit comments

Comments
 (0)