Skip to content

Commit 4642879

Browse files
committed
Bind queue
1 parent 9153532 commit 4642879

File tree

7 files changed

+129
-17
lines changed

7 files changed

+129
-17
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
-include_lib("rabbit_common/include/rabbit.hrl").
55

66
-export([process_request/4]).
7+
-export([args_hash/1]).
78

89
%% An HTTP message mapped to AMQP using projected mode
910
%% [HTTP over AMQP Working Draft 06 §4.1]
@@ -84,6 +85,47 @@ process_http_request(<<"POST">>,
8485
]},
8586
{Props, AppProps, RespPayload};
8687

88+
process_http_request(<<"POST">>,
89+
<<"/$management/queues/", QNamePath/binary>>,
90+
{symbol, <<"application/amqp-management+amqp", _OptionalType/binary>>},
91+
[ReqPayload],
92+
Vhost,
93+
#user{username = Username},
94+
_) ->
95+
[QNameBin, <<>>] = re:split(QNamePath, <<"/\\$management/entities$">>, [{return, binary}]),
96+
%% TODO add a top level binding key so that other entity types can be created in future by posting
97+
%% to the queue's $management/entities URI
98+
#{source_exchange := XNameBin,
99+
binding_key := BindingKey,
100+
arguments := Args} = decode_binding(ReqPayload),
101+
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
102+
QName = rabbit_misc:r(Vhost, queue, QNameBin),
103+
Binding = #binding{source = XName,
104+
destination = QName,
105+
key = BindingKey,
106+
args = Args},
107+
%%TODO If the binding already exists, return 303 with location.
108+
ok = rabbit_binding:add(Binding, Username),
109+
Props = #'v1_0.properties'{
110+
subject = {utf8, <<"201">>},
111+
content_type = {symbol, <<"application/amqp-management+amqp;type=entity-collection">>}
112+
},
113+
%% TODO URI encode, which is against the HTTP over AMQP spec [3.1].
114+
%% How else to escape "/" in exchange names and binding keys?
115+
ArgsHash = args_hash(Args),
116+
Self = {utf8, <<"/$management/queues/",
117+
QNameBin/binary, "/bindings/",
118+
XNameBin/binary, "/",
119+
BindingKey/binary, "/",
120+
ArgsHash/binary>>},
121+
AppProps = #'v1_0.application_properties'{
122+
content = [{{utf8, <<"location">>}, Self}]},
123+
%% TODO Include source_exchange, binding key, and binding arguments in the response?
124+
RespPayload = {map, [{{utf8, <<"type">>}, {utf8, <<"binding">>}},
125+
{{utf8, <<"self">>}, Self}
126+
]},
127+
{Props, AppProps, RespPayload};
128+
87129
process_http_request(<<"POST">>,
88130
<<"/$management/queues/", QNamePath/binary>>,
89131
undefined,
@@ -149,6 +191,18 @@ decode_queue({map, KVList}) ->
149191
Acc#{arguments => Args}
150192
end, #{}, KVList).
151193

194+
decode_binding({map, KVList}) ->
195+
lists:foldl(
196+
fun({{utf8, <<"source_exchange">>}, {utf8, V}}, Acc) ->
197+
Acc#{source_exchange => V};
198+
({{utf8, <<"binding_key">>}, {utf8, V}}, Acc) ->
199+
Acc#{binding_key => V};
200+
({{utf8, <<"arguments">>}, {map, List}}, Acc) ->
201+
Args = [mc_amqpl:to_091(Key, TypeVal)
202+
|| {{utf8, Key}, TypeVal} <- List],
203+
Acc#{arguments => Args}
204+
end, #{}, KVList).
205+
152206
decode_req([], {Props, AppProps, DataRev}) ->
153207
#msg{properties = Props,
154208
application_properties = AppProps,
@@ -161,3 +215,7 @@ decode_req([#'v1_0.data'{content = C} | Rem], {Props, AppProps, DataRev}) ->
161215
decode_req(Rem, {Props, AppProps, [C | DataRev]});
162216
decode_req([_IgnoreOtherSection | Rem], Acc) ->
163217
decode_req(Rem, Acc).
218+
219+
-spec args_hash(rabbit_framing:amqp_table()) -> binary().
220+
args_hash(Args) ->
221+
list_to_binary(rabbit_misc:base64url(<<(erlang:phash2(Args, 1 bsl 32)):32>>)).

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
-export[attach_management_link_pair_sync/2,
1212
declare_queue/2,
13+
bind_queue/5,
14+
% unbind_queue/5,
1315
purge_queue/2,
1416
delete_queue/2].
1517

@@ -126,6 +128,50 @@ declare_queue(#link_pair{outgoing_link = OutgoingLink,
126128
{ok, proplists:to_map(KVList)}
127129
end.
128130

131+
-spec bind_queue(link_pair(), binary(), binary(), binary(), #{binary() => {atom(), term()}}) ->
132+
ok | {error, term()}.
133+
bind_queue(#link_pair{outgoing_link = OutgoingLink,
134+
incoming_link = IncomingLink},
135+
QueueName, ExchangeName, BindingKey, BindingArguments) ->
136+
KVList = maps:fold(
137+
fun(Key, TaggedVal = {T, _}, L)
138+
when is_binary(Key) andalso is_atom(T) ->
139+
[{{utf8, Key}, TaggedVal} | L]
140+
end, [], BindingArguments),
141+
Body0 = {map, [
142+
{{utf8, <<"source_exchange">>}, {utf8, ExchangeName}},
143+
{{utf8, <<"binding_key">>}, {utf8, BindingKey}},
144+
{{utf8, <<"arguments">>}, {map, KVList}}
145+
]},
146+
Body = iolist_to_binary(amqp10_framing:encode_bin(Body0)),
147+
148+
MessageId = message_id(),
149+
HttpMethod = <<"POST">>,
150+
HttpRequestTarget = <<"/$management/queues/", QueueName/binary, "/$management/entities">>,
151+
ContentType = <<"application/amqp-management+amqp;type=entity">>,
152+
Props = #{message_id => {binary, MessageId},
153+
to => HttpRequestTarget,
154+
subject => HttpMethod,
155+
reply_to => <<"$me">>,
156+
content_type => ContentType},
157+
Req0 = amqp10_msg:new(<<>>, Body, true),
158+
Req = amqp10_msg:set_properties(Props, Req0),
159+
160+
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
161+
maybe
162+
ok ?= amqp10_client:send_msg(OutgoingLink, Req),
163+
{ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message}
164+
after ?TIMEOUT -> {error, response_timeout}
165+
end,
166+
#{correlation_id := MessageId,
167+
subject := <<"201">>
168+
} = amqp10_msg:properties(Resp),
169+
#{<<"http:response">> := <<"1.1">>,
170+
<<"location">> := _BindingRUI
171+
} = amqp10_msg:application_properties(Resp),
172+
ok
173+
end.
174+
129175
-spec purge_queue(link_pair(), binary()) ->
130176
{ok, map()} | {error, term()}.
131177
purge_queue(#link_pair{outgoing_link = OutgoingLink,

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,30 @@ queue(Config) ->
7272
exclusive => false,
7373
auto_delete => false,
7474
arguments => #{<<"x-queue-type">> => {symbol, <<"quorum">>}}},
75-
{ok, #{{utf8, <<"target">>} := {utf8, Address}} } = rabbitmq_amqp_client:declare_queue(LinkPair, Q),
75+
{ok, #{{utf8, <<"target">>} := {utf8, TargetAddr1}} } = rabbitmq_amqp_client:declare_queue(LinkPair, Q),
7676

77-
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address),
78-
ok = wait_for_credit(Sender),
77+
{ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, TargetAddr1),
78+
ok = wait_for_credit(Sender1),
7979
flush(credited),
80-
DTag = <<"tag 1">>,
81-
Msg1 = amqp10_msg:new(DTag, <<"m1">>, false),
82-
ok = amqp10_client:send_msg(Sender, Msg1),
83-
ok = wait_for_accepted(DTag),
80+
DTag1 = <<"tag 1">>,
81+
Msg1 = amqp10_msg:new(DTag1, <<"m1">>, false),
82+
ok = amqp10_client:send_msg(Sender1, Msg1),
83+
ok = wait_for_accepted(DTag1),
84+
85+
RoutingKey = BindingKey = <<"key 1">>,
86+
SourceExchange = <<"amq.direct">>,
87+
?assertEqual(ok, rabbitmq_amqp_client:bind_queue(LinkPair, QName, SourceExchange, BindingKey, #{})),
88+
TargetAddr2 = <<"/exchange/", SourceExchange/binary, "/", RoutingKey/binary>>,
89+
90+
{ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, TargetAddr2),
91+
ok = wait_for_credit(Sender2),
92+
flush(credited),
93+
DTag2 = <<"tag 2">>,
94+
Msg2 = amqp10_msg:new(DTag2, <<"m2">>, false),
95+
ok = amqp10_client:send_msg(Sender2, Msg2),
96+
ok = wait_for_accepted(DTag2),
8497

85-
?assertEqual({ok, #{message_count => 1}},
98+
?assertEqual({ok, #{message_count => 2}},
8699
rabbitmq_amqp_client:purge_queue(LinkPair, QName)),
87100

88101
?assertEqual({ok, #{message_count => 0}},

deps/rabbitmq_management/src/rabbit_mgmt_wm_binding.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ lookup(RoutingKey, Hash, [#binding{args = Args} | Rest]) ->
127127
end.
128128

129129
args_hash(Args) ->
130-
rabbit_mgmt_format:args_hash(Args).
130+
rabbit_amqp_management:args_hash(Args).
131131

132132
unquote(Name) ->
133133
list_to_binary(rabbit_http_util:unquote(Name)).

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2094,7 +2094,7 @@ arguments_test(Config) ->
20942094
passed.
20952095

20962096
table_hash(Table) ->
2097-
binary_to_list(rabbit_mgmt_format:args_hash(Table)).
2097+
binary_to_list(rabbit_amqp_management:args_hash(Table)).
20982098

20992099
arguments_table_test(Config) ->
21002100
Args = #{'upstreams' => [<<"amqp://localhost/%2F/upstream1">>,

deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ arguments_test(Config) ->
892892
passed.
893893

894894
table_hash(Table) ->
895-
binary_to_list(rabbit_mgmt_format:args_hash(Table)).
895+
binary_to_list(rabbit_amqp_management:args_hash(Table)).
896896

897897
queue_actions_test(Config) ->
898898
http_put(Config, "/queues/%2F/q", #{}, {group, '2xx'}),

deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
-export([clean_consumer_details/1, clean_channel_details/1]).
3030

31-
-export([args_hash/1]).
32-
3331
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
3432
-import(rabbit_data_coercion, [to_binary/1]).
3533

@@ -340,7 +338,7 @@ pack_binding_props(<<"">>, []) ->
340338
pack_binding_props(Key, []) ->
341339
list_to_binary(quote_binding(Key));
342340
pack_binding_props(Key, Args) ->
343-
ArgsEnc = args_hash(Args),
341+
ArgsEnc = rabbit_amqp_management:args_hash(Args),
344342
list_to_binary(quote_binding(Key) ++ "~" ++ quote_binding(ArgsEnc)).
345343

346344
quote_binding(Name) ->
@@ -595,6 +593,3 @@ parse_bool(true) -> true;
595593
parse_bool(false) -> false;
596594
parse_bool(undefined) -> undefined;
597595
parse_bool(V) -> throw({error, {not_boolean, V}}).
598-
599-
args_hash(Args) ->
600-
list_to_binary(rabbit_misc:base64url(<<(erlang:phash2(Args, 1 bsl 32)):32>>)).

0 commit comments

Comments
 (0)