Skip to content

Commit f00a115

Browse files
michaelklishinmkuratczyk
authored andcommitted
Merge pull request #9813 from rabbitmq/super-stream-frames
Add super stream creation/deletion commands
2 parents 2adec32 + 5e1155c commit f00a115

24 files changed

+458
-115
lines changed

deps/rabbitmq_cli/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
PROJECT = rabbitmq_cli
22

33
BUILD_DEPS = rabbit_common
4-
DEPS = csv json observer_cli stdout_formatter
4+
DEPS = csv jason observer_cli stdout_formatter
55
TEST_DEPS = amqp amqp_client temp x509 rabbit
66

77
dep_amqp = hex 3.3.0
88
dep_csv = hex 3.2.0
9-
dep_json = hex 1.4.1
9+
dep_jason = hex 1.4.1
1010
dep_temp = hex 0.4.7
1111
dep_x509 = hex 0.8.8
1212

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/export_definitions_command.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ExportDefinitionsCommand do
159159
end)
160160
end)
161161

162-
{:ok, json} = JSON.encode(map)
162+
{:ok, json} = Jason.encode(map)
163163
json
164164
end
165165

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/import_definitions_command.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ImportDefinitionsCommand do
166166
#
167167

168168
defp deserialise(bin, "json") do
169-
JSON.decode(bin)
169+
Jason.decode(bin)
170170
end
171171

172172
defp deserialise(bin, "erlang") do

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/list_user_limits_command.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUserLimitsCommand do
3636

3737
val ->
3838
Enum.map(val, fn {user, val} ->
39-
{:ok, val_encoded} = JSON.encode(Map.new(val))
39+
{:ok, val_encoded} = Jason.encode(Map.new(val))
4040
[user: user, limits: val_encoded]
4141
end)
4242
end
@@ -56,7 +56,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUserLimitsCommand do
5656
{:badrpc, node}
5757

5858
val when is_list(val) or is_map(val) ->
59-
{:ok, val_encoded} = JSON.encode(Map.new(val))
59+
{:ok, val_encoded} = Jason.encode(Map.new(val))
6060
val_encoded
6161
end
6262
end

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/list_vhost_limits_command.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListVhostLimitsCommand do
3636

3737
val ->
3838
Enum.map(val, fn {vhost, val} ->
39-
{:ok, val_encoded} = JSON.encode(Map.new(val))
39+
{:ok, val_encoded} = Jason.encode(Map.new(val))
4040
[vhost: vhost, limits: val_encoded]
4141
end)
4242
end
@@ -54,7 +54,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListVhostLimitsCommand do
5454
{:badrpc, node}
5555

5656
val when is_list(val) or is_map(val) ->
57-
JSON.encode(Map.new(val))
57+
Jason.encode(Map.new(val))
5858
end
5959
end
6060

deps/rabbitmq_cli/lib/rabbitmq/cli/formatters/json.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ defmodule RabbitMQ.CLI.Formatters.Json do
1818
end
1919

2020
def format_output(output, _opts) do
21-
{:ok, json} = JSON.encode(keys_to_atoms(output))
21+
{:ok, json} = Jason.encode(keys_to_atoms(output))
2222
json
2323
end
2424

deps/rabbitmq_cli/lib/rabbitmq/cli/formatters/json_stream.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ defmodule RabbitMQ.CLI.Formatters.JsonStream do
3131
end
3232

3333
def format_output(output, _opts) do
34-
{:ok, json} = JSON.encode(keys_to_atoms(output))
34+
{:ok, json} = Jason.encode(keys_to_atoms(output))
3535
json
3636
end
3737

deps/rabbitmq_cli/lib/rabbitmqctl.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ defmodule RabbitMQCtl do
558558
end
559559

560560
defp format_error({:error, :check_failed, err}, %{formatter: "json"}, _) when is_map(err) do
561-
{:ok, res} = JSON.encode(err)
561+
{:ok, res} = Jason.encode(err)
562562
{:error, ExitCodes.exit_unavailable(), res}
563563
end
564564

@@ -578,12 +578,12 @@ defmodule RabbitMQCtl do
578578

579579
# Catch all clauses
580580
defp format_error({:error, err}, %{formatter: "json"}, _) when is_map(err) do
581-
{:ok, res} = JSON.encode(err)
581+
{:ok, res} = Jason.encode(err)
582582
{:error, ExitCodes.exit_unavailable(), res}
583583
end
584584

585585
defp format_error({:error, exit_code, err}, %{formatter: "json"}, _) when is_map(err) do
586-
{:ok, res} = JSON.encode(err)
586+
{:ok, res} = Jason.encode(err)
587587
{:error, exit_code, res}
588588
end
589589

deps/rabbitmq_cli/mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ defmodule RabbitMQCtl.MixfileBase do
141141

142142
[
143143
{
144-
:json,
145-
path: Path.join(deps_dir, "json")
144+
:jason,
145+
path: Path.join(deps_dir, "jason")
146146
},
147147
{
148148
:csv,

deps/rabbitmq_cli/test/ctl/export_definitions_command_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ defmodule ExportDefinitionsCommandTest do
111111
{:ok, nil} = @command.run([valid_file_path()], context[:opts])
112112

113113
{:ok, bin} = File.read(valid_file_path())
114-
{:ok, map} = JSON.decode(bin)
114+
{:ok, map} = Jason.decode(bin)
115115
assert Map.has_key?(map, "rabbitmq_version")
116116
end
117117

@@ -128,7 +128,7 @@ defmodule ExportDefinitionsCommandTest do
128128
clear_parameter("/", "federation-upstream", "up-1")
129129

130130
{:ok, bin} = File.read(valid_file_path())
131-
{:ok, map} = JSON.decode(bin)
131+
{:ok, map} = Jason.decode(bin)
132132
assert Map.has_key?(map, "rabbitmq_version")
133133
params = map["parameters"]
134134
assert is_map(hd(params)["value"])

deps/rabbitmq_cli/test/ctl/set_user_limits_command_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,6 @@ defmodule SetUserLimitsCommandTest do
131131

132132
defp assert_limits(context, definition) do
133133
limits = get_user_limits(context[:user])
134-
assert {:ok, limits} == JSON.decode(definition)
134+
assert {:ok, limits} == Jason.decode(definition)
135135
end
136136
end

deps/rabbitmq_cli/test/ctl/set_vhost_limits_command_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,6 @@ defmodule SetVhostLimitsCommandTest do
138138

139139
defp assert_limits(context) do
140140
limits = get_vhost_limits(context[:vhost])
141-
assert {:ok, limits} == JSON.decode(context[:definition])
141+
assert {:ok, limits} == Jason.decode(context[:definition])
142142
end
143143
end

deps/rabbitmq_cli/test/json_formatting.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ defmodule JSONFormattingTest do
2929
error_check(command, exit_ok())
3030
end)
3131

32-
{:ok, doc} = JSON.decode(output)
32+
{:ok, doc} = Jason.decode(output)
3333

3434
assert Map.has_key?(doc, "memory")
3535
assert Map.has_key?(doc, "file_descriptors")
@@ -53,7 +53,7 @@ defmodule JSONFormattingTest do
5353
error_check(command, exit_ok())
5454
end)
5555

56-
{:ok, doc} = JSON.decode(output)
56+
{:ok, doc} = Jason.decode(output)
5757

5858
assert Enum.member?(doc["disk_nodes"], node)
5959
assert Map.has_key?(doc["listeners"], node)

deps/rabbitmq_cli/test/test_helper.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ true = Code.append_path(Path.join([System.get_env("DEPS_DIR"), "rabbit_common",
1919
true = Code.append_path(Path.join([System.get_env("DEPS_DIR"), "rabbit", "ebin"]))
2020

2121
true = Code.append_path(Path.join(["_build", Atom.to_string(Mix.env()), "lib", "amqp", "ebin"]))
22-
true = Code.append_path(Path.join(["_build", Atom.to_string(Mix.env()), "lib", "json", "ebin"]))
22+
true = Code.append_path(Path.join(["_build", Atom.to_string(Mix.env()), "lib", "jason", "ebin"]))
2323
true = Code.append_path(Path.join(["_build", Atom.to_string(Mix.env()), "lib", "x509", "ebin"]))
2424

2525
if function_exported?(Mix, :ensure_application!, 1) do

deps/rabbitmq_stream/docs/PROTOCOL.adoc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,16 @@ used to make the difference between a request (0) and a response (1). Example fo
230230
|0x001c
231231
|Yes
232232

233+
|<<createsuperstream>>
234+
|Client
235+
|0x001d
236+
|Yes
237+
238+
|<<deletesuperstream>>
239+
|Client
240+
|0x001e
241+
|Yes
242+
233243
|===
234244

235245
=== DeclarePublisher
@@ -754,6 +764,31 @@ StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
754764
Value => int64
755765
```
756766

767+
=== CreateSuperStream
768+
769+
```
770+
CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments
771+
Key => uint16 // 0x001d
772+
Version => uint16
773+
CorrelationId => uint32
774+
Name => string
775+
Partition => string
776+
BindingKey => string
777+
Arguments => [Argument]
778+
Argument => Key Value
779+
Key => string
780+
Value => string
781+
```
782+
783+
=== DeleteSuperStream
784+
785+
```
786+
Delete => Key Version CorrelationId Name
787+
Key => uint16 // 0x001e
788+
Version => uint16
789+
CorrelationId => uint32
790+
Name => string
791+
```
757792

758793
== Authentication
759794

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,15 @@ create_super_stream(VirtualHost,
7373
Name,
7474
Partitions,
7575
Arguments,
76-
RoutingKeys,
76+
BindingKeys,
7777
Username) ->
7878
gen_server:call(?MODULE,
7979
{create_super_stream,
8080
VirtualHost,
8181
Name,
8282
Partitions,
8383
Arguments,
84-
RoutingKeys,
84+
BindingKeys,
8585
Username}).
8686

8787
-spec delete_super_stream(binary(), binary(), binary()) ->
@@ -226,10 +226,10 @@ handle_call({create_super_stream,
226226
Name,
227227
Partitions,
228228
Arguments,
229-
RoutingKeys,
229+
BindingKeys,
230230
Username},
231231
_From, State) ->
232-
case validate_super_stream_creation(VirtualHost, Name, Partitions) of
232+
case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of
233233
{error, Reason} ->
234234
{reply, {error, Reason}, State};
235235
ok ->
@@ -273,7 +273,7 @@ handle_call({create_super_stream,
273273
add_super_stream_bindings(VirtualHost,
274274
Name,
275275
Partitions,
276-
RoutingKeys,
276+
BindingKeys,
277277
Username),
278278
case BindingsResult of
279279
ok ->
@@ -445,8 +445,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
445445
end
446446
catch
447447
exit:Error ->
448-
rabbit_log:error("Error while looking up exchange ~tp, ~tp",
449-
[rabbit_misc:rs(ExchangeName), Error]),
448+
rabbit_log:warning("Error while looking up exchange ~tp, ~tp",
449+
[rabbit_misc:rs(ExchangeName), Error]),
450450
{error, stream_not_found}
451451
end,
452452
{reply, Res, State};
@@ -655,7 +655,10 @@ super_stream_partitions(VirtualHost, SuperStream) ->
655655
{error, stream_not_found}
656656
end.
657657

658-
validate_super_stream_creation(VirtualHost, Name, Partitions) ->
658+
validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys)
659+
when length(Partitions) =/= length(BindingKeys) ->
660+
{error, {validation_failed, "There must be the same number of partitions and binding keys"}};
661+
validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
659662
case exchange_exists(VirtualHost, Name) of
660663
{error, validation_failed} ->
661664
{error,
@@ -758,15 +761,15 @@ declare_super_stream_exchange(VirtualHost, Name, Username) ->
758761
add_super_stream_bindings(VirtualHost,
759762
Name,
760763
Partitions,
761-
RoutingKeys,
764+
BindingKeys,
762765
Username) ->
763-
PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys),
766+
PartitionsBindingKeys = lists:zip(Partitions, BindingKeys),
764767
BindingsResult =
765-
lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) ->
768+
lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) ->
766769
case add_super_stream_binding(VirtualHost,
767770
Name,
768771
Partition,
769-
RoutingKey,
772+
BindingKey,
770773
Order,
771774
Username)
772775
of
@@ -778,7 +781,7 @@ add_super_stream_bindings(VirtualHost,
778781
(_, {{error, _Reason}, _Order} = Acc) ->
779782
Acc
780783
end,
781-
{ok, 0}, PartitionsRoutingKeys),
784+
{ok, 0}, PartitionsBindingKeys),
782785
case BindingsResult of
783786
{ok, _} ->
784787
ok;
@@ -789,7 +792,7 @@ add_super_stream_bindings(VirtualHost,
789792
add_super_stream_binding(VirtualHost,
790793
SuperStream,
791794
Partition,
792-
RoutingKey,
795+
BindingKey,
793796
Order,
794797
Username) ->
795798
{ok, ExchangeNameBin} =
@@ -806,7 +809,7 @@ add_super_stream_binding(VirtualHost,
806809
Order),
807810
case rabbit_binding:add(#binding{source = ExchangeName,
808811
destination = QueueName,
809-
key = RoutingKey,
812+
key = BindingKey,
810813
args = Arguments},
811814
fun (_X, Q) when ?is_amqqueue(Q) ->
812815
try

0 commit comments

Comments
 (0)