Skip to content

Add super stream creation/deletion commands #9813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 15, 2023
35 changes: 35 additions & 0 deletions deps/rabbitmq_stream/docs/PROTOCOL.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ used to make the difference between a request (0) and a response (1). Example fo
|0x001c
|Yes

|<<createsuperstream>>
|Client
|0x001d
|Yes

|<<deletesuperstream>>
|Client
|0x001e
|Yes

|===

=== DeclarePublisher
Expand Down Expand Up @@ -754,6 +764,31 @@ StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
Value => int64
```

=== CreateSuperStream

```
CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments
Key => uint16 // 0x001d
Version => uint16
CorrelationId => uint32
Name => string
Partition => string
BindingKey => string
Arguments => [Argument]
Argument => Key Value
Key => string
Value => string
```

=== DeleteSuperStream

```
Delete => Key Version CorrelationId Name
Key => uint16 // 0x001e
Version => uint16
CorrelationId => uint32
Name => string
```

== Authentication

Expand Down
33 changes: 18 additions & 15 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ create_super_stream(VirtualHost,
Name,
Partitions,
Arguments,
RoutingKeys,
BindingKeys,
Username) ->
gen_server:call(?MODULE,
{create_super_stream,
VirtualHost,
Name,
Partitions,
Arguments,
RoutingKeys,
BindingKeys,
Username}).

-spec delete_super_stream(binary(), binary(), binary()) ->
Expand Down Expand Up @@ -226,10 +226,10 @@ handle_call({create_super_stream,
Name,
Partitions,
Arguments,
RoutingKeys,
BindingKeys,
Username},
_From, State) ->
case validate_super_stream_creation(VirtualHost, Name, Partitions) of
case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of
{error, Reason} ->
{reply, {error, Reason}, State};
ok ->
Expand Down Expand Up @@ -273,7 +273,7 @@ handle_call({create_super_stream,
add_super_stream_bindings(VirtualHost,
Name,
Partitions,
RoutingKeys,
BindingKeys,
Username),
case BindingsResult of
ok ->
Expand Down Expand Up @@ -445,8 +445,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
end
catch
exit:Error ->
rabbit_log:error("Error while looking up exchange ~tp, ~tp",
[rabbit_misc:rs(ExchangeName), Error]),
rabbit_log:warning("Error while looking up exchange ~tp, ~tp",
[rabbit_misc:rs(ExchangeName), Error]),
{error, stream_not_found}
end,
{reply, Res, State};
Expand Down Expand Up @@ -655,7 +655,10 @@ super_stream_partitions(VirtualHost, SuperStream) ->
{error, stream_not_found}
end.

validate_super_stream_creation(VirtualHost, Name, Partitions) ->
validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys)
when length(Partitions) =/= length(BindingKeys) ->
{error, {validation_failed, "There must be the same number of partitions and binding keys"}};
validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
case exchange_exists(VirtualHost, Name) of
{error, validation_failed} ->
{error,
Expand Down Expand Up @@ -758,15 +761,15 @@ declare_super_stream_exchange(VirtualHost, Name, Username) ->
add_super_stream_bindings(VirtualHost,
Name,
Partitions,
RoutingKeys,
BindingKeys,
Username) ->
PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys),
PartitionsBindingKeys = lists:zip(Partitions, BindingKeys),
BindingsResult =
lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) ->
lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) ->
case add_super_stream_binding(VirtualHost,
Name,
Partition,
RoutingKey,
BindingKey,
Order,
Username)
of
Expand All @@ -778,7 +781,7 @@ add_super_stream_bindings(VirtualHost,
(_, {{error, _Reason}, _Order} = Acc) ->
Acc
end,
{ok, 0}, PartitionsRoutingKeys),
{ok, 0}, PartitionsBindingKeys),
case BindingsResult of
{ok, _} ->
ok;
Expand All @@ -789,7 +792,7 @@ add_super_stream_bindings(VirtualHost,
add_super_stream_binding(VirtualHost,
SuperStream,
Partition,
RoutingKey,
BindingKey,
Order,
Username) ->
{ok, ExchangeNameBin} =
Expand All @@ -806,7 +809,7 @@ add_super_stream_binding(VirtualHost,
Order),
case rabbit_binding:add(#binding{source = ExchangeName,
destination = QueueName,
key = RoutingKey,
key = BindingKey,
args = Arguments},
fun (_X, Q) when ?is_amqqueue(Q) ->
try
Expand Down
Loading