Skip to content

Commit 7316846

Browse files
committed
Use amqp-value section instead of data section
1 parent 06314d4 commit 7316846

File tree

5 files changed

+61
-74
lines changed

5 files changed

+61
-74
lines changed

deps/amqp10_common/src/amqp10_framing.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,10 @@ decode({described, Descriptor, {map, Fields}}) ->
120120
Else ->
121121
fill_from_map(Else, Fields)
122122
end;
123-
decode({described, Descriptor, {binary, Field}}) ->
123+
decode({described, Descriptor, {binary, Field} = BinaryField}) ->
124124
case amqp10_framing0:record_for(Descriptor) of
125125
#'v1_0.amqp_value'{} ->
126-
#'v1_0.amqp_value'{content = {binary, Field}};
126+
#'v1_0.amqp_value'{content = BinaryField};
127127
#'v1_0.data'{} ->
128128
#'v1_0.data'{content = Field}
129129
end;

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,22 @@
55

66
-export([handle_request/4]).
77

8-
%% An HTTP message mapped to AMQP using projected mode
9-
%% [HTTP over AMQP Working Draft 06 §4.1]
10-
-record(msg,
11-
{
12-
properties :: #'v1_0.properties'{},
13-
application_properties :: list(),
14-
data = [] :: [#'v1_0.data'{}]
15-
}).
16-
178
-spec handle_request(binary(), rabbit_types:vhost(), rabbit_types:user(), pid()) -> iolist().
189
handle_request(Request, Vhost, User, ConnectionPid) ->
1910
ReqSections = amqp10_framing:decode_bin(Request),
2011
?DEBUG("~s Inbound request:~n ~tp",
2112
[?MODULE, [amqp10_framing:pprint(Section) || Section <- ReqSections]]),
22-
#msg{properties = #'v1_0.properties'{
23-
message_id = MessageId,
24-
to = {utf8, HttpRequestTarget},
25-
subject = {utf8, HttpMethod},
26-
%% see Link Pair CS 01 §2.1
27-
%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331305
28-
reply_to = {utf8, <<"$me">>}},
29-
application_properties = _OtherHttpHeaders,
30-
data = ReqBody
31-
} = decode_req(ReqSections, {undefined, undefined, []}),
13+
{#'v1_0.properties'{
14+
message_id = MessageId,
15+
to = {utf8, HttpRequestTarget},
16+
subject = {utf8, HttpMethod},
17+
%% see Link Pair CS 01 §2.1
18+
%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331305
19+
reply_to = {utf8, <<"$me">>}},
20+
ReqBin
21+
} = decode_req(ReqSections, {undefined, undefined}),
3222
{PathSegments, QueryMap} = parse_uri(HttpRequestTarget),
33-
ReqPayload = amqp10_framing:decode_bin(list_to_binary(ReqBody)),
23+
ReqPayload = amqp10_framing:decode_bin(ReqBin),
3424
{RespProps0,
3525
RespAppProps0 = #'v1_0.application_properties'{content = C},
3626
RespPayload} = handle_http_req(HttpMethod,
@@ -47,7 +37,8 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
4737
correlation_id = MessageId},
4838
RespAppProps = RespAppProps0#'v1_0.application_properties'{
4939
content = [{{utf8, <<"http:response">>}, {utf8, <<"1.1">>}} | C]},
50-
RespDataSect = #'v1_0.data'{content = iolist_to_binary(amqp10_framing:encode_bin(RespPayload))},
40+
RespBody = iolist_to_binary(amqp10_framing:encode_bin(RespPayload)),
41+
RespDataSect = #'v1_0.amqp_value'{content = {binary, RespBody}},
5142
RespSections = [RespProps, RespAppProps, RespDataSect],
5243
[amqp10_framing:encode_bin(Sect) || Sect <- RespSections].
5344

@@ -79,7 +70,7 @@ handle_http_req(<<"PUT">>,
7970
{new, _Q} = rabbit_queue_type:declare(Q0, node()),
8071
Props = #'v1_0.properties'{subject = {utf8, <<"201">>}},
8172
AppProps = #'v1_0.application_properties'{content = []},
82-
RespPayload = {map, []},
73+
RespPayload = undefined,
8374
{Props, AppProps, RespPayload};
8475

8576
handle_http_req(<<"PUT">>,
@@ -107,14 +98,13 @@ handle_http_req(<<"PUT">>,
10798
Username),
10899
Props = #'v1_0.properties'{subject = {utf8, <<"201">>} },
109100
AppProps = #'v1_0.application_properties'{content = []},
110-
111-
RespPayload = {map, []},
101+
RespPayload = undefined,
112102
{Props, AppProps, RespPayload};
113103

114104
handle_http_req(<<"DELETE">>,
115105
[<<"queues">>, QNameBinQ, <<"messages">>],
116106
_Query,
117-
[],
107+
[undefined],
118108
Vhost,
119109
_User,
120110
ConnPid) ->
@@ -133,7 +123,7 @@ handle_http_req(<<"DELETE">>,
133123
handle_http_req(<<"DELETE">>,
134124
[<<"queues">>, QNameBinQ],
135125
_Query,
136-
[],
126+
[undefined],
137127
Vhost,
138128
#user{username = Username},
139129
ConnPid) ->
@@ -148,7 +138,7 @@ handle_http_req(<<"DELETE">>,
148138
handle_http_req(<<"DELETE">>,
149139
[<<"exchanges">>, XNameBinQ],
150140
_Query,
151-
[],
141+
[undefined],
152142
Vhost,
153143
#user{username = Username},
154144
_ConnPid) ->
@@ -164,7 +154,7 @@ handle_http_req(<<"DELETE">>,
164154
end,
165155
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
166156
AppProps = #'v1_0.application_properties'{content = []},
167-
RespPayload = {map, []},
157+
RespPayload = undefined,
168158
{Props, AppProps, RespPayload};
169159

170160
handle_http_req(<<"POST">>,
@@ -195,13 +185,13 @@ handle_http_req(<<"POST">>,
195185
Location = compose_binding_uri(SrcXNameBin, DstKind, DstNameBin, BindingKey, Args),
196186
AppProps = #'v1_0.application_properties'{
197187
content = [{{utf8, <<"location">>}, {utf8, Location}}]},
198-
RespPayload = {map, []},
188+
RespPayload = undefined,
199189
{Props, AppProps, RespPayload};
200190

201191
handle_http_req(<<"DELETE">>,
202192
[<<"bindings">>, BindingSegment],
203193
_Query,
204-
[],
194+
[undefined],
205195
Vhost,
206196
#user{username = Username},
207197
_ConnPid) ->
@@ -217,14 +207,14 @@ handle_http_req(<<"DELETE">>,
217207
end,
218208
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
219209
AppProps = #'v1_0.application_properties'{content = []},
220-
RespPayload = {map, []},
210+
RespPayload = undefined,
221211
{Props, AppProps, RespPayload};
222212

223213
handle_http_req(<<"GET">>,
224214
[<<"bindings">>],
225215
QueryMap = #{<<"src">> := SrcXNameBin,
226216
<<"key">> := Key},
227-
[],
217+
[undefined],
228218
Vhost,
229219
_User,
230220
_ConnPid) ->
@@ -326,17 +316,13 @@ encode_bindings(Bindings) ->
326316
end, Bindings),
327317
{list, Bs}.
328318

329-
decode_req([], {Props, AppProps, DataRev}) ->
330-
#msg{properties = Props,
331-
application_properties = AppProps,
332-
data = lists:reverse(DataRev)};
319+
decode_req([], Acc) ->
320+
Acc;
333321
decode_req([#'v1_0.properties'{} = P | Rem], Acc) ->
334322
decode_req(Rem, setelement(1, Acc, P));
335-
decode_req([#'v1_0.application_properties'{content = Content} | Rem], Acc) ->
336-
decode_req(Rem, setelement(2, Content, Acc));
337-
decode_req([#'v1_0.data'{content = C} | Rem], {Props, AppProps, DataRev}) ->
338-
decode_req(Rem, {Props, AppProps, [C | DataRev]});
339-
decode_req([_IgnoreOtherSection | Rem], Acc) ->
323+
decode_req([#'v1_0.amqp_value'{content = {binary, B}} | Rem], Acc) ->
324+
decode_req(Rem, setelement(2, Acc, B));
325+
decode_req([_IgnoreSection | Rem], Acc) ->
340326
decode_req(Rem, Acc).
341327

342328
parse_uri(Uri) ->

deps/rabbitmq_amqp_client/app.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def all_beam_files(name = "all_beam_files"):
1313
app_name = "rabbitmq_amqp_client",
1414
dest = "ebin",
1515
erlc_opts = "//:erlc_opts",
16+
deps = ["//deps/amqp10_common:erlang_app"],
1617
)
1718

1819
def all_srcs(name = "all_srcs"):
@@ -48,6 +49,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
4849
app_name = "rabbitmq_amqp_client",
4950
dest = "test",
5051
erlc_opts = "//:test_erlc_opts",
52+
deps = ["//deps/amqp10_common:erlang_app"],
5153
)
5254
filegroup(
5355
name = "test_beam_files",

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
-feature(maybe_expr, enable).
1010

11+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
12+
1113
-export[attach_management_link_pair_sync/2,
1214
detach_management_link_pair_sync/1,
1315
declare_queue/2,
@@ -140,9 +142,7 @@ declare_queue(LinkPair, QueueProperties) ->
140142
end, [], V),
141143
{N, [{{utf8, <<"arguments">>}, {map, KVList}} | L0]}
142144
end, {undefined, []}, QueueProperties),
143-
Body1 = {map, Body0},
144-
Body = iolist_to_binary(amqp10_framing:encode_bin(Body1)),
145-
145+
Body = {map, Body0},
146146
QNameQuoted = uri_string:quote(QName),
147147
Props = #{subject => <<"PUT">>,
148148
to => <<"/queues/", QNameQuoted/binary>>},
@@ -151,9 +151,10 @@ declare_queue(LinkPair, QueueProperties) ->
151151
{ok, Resp} ->
152152
case amqp10_msg:properties(Resp) of
153153
#{subject := <<"201">>} ->
154-
RespBody = amqp10_msg:body_bin(Resp),
155-
[{map, KVList}] = amqp10_framing:decode_bin(RespBody),
156-
{ok, proplists:to_map(KVList)};
154+
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
155+
[undefined] = amqp10_framing:decode_bin(RespBin),
156+
% {ok, proplists:to_map(KVList)};
157+
{ok, #{}};
157158
Other ->
158159
{error, Other}
159160
end;
@@ -179,14 +180,12 @@ bind(DestinationKind, LinkPair, Destination, Source, BindingKey, BindingArgument
179180
when is_binary(Key) ->
180181
[{{utf8, Key}, TaggedVal} | L]
181182
end, [], BindingArguments),
182-
Body0 = {map, [
183-
{{utf8, <<"source">>}, {utf8, Source}},
184-
{{utf8, DestinationKind}, {utf8, Destination}},
185-
{{utf8, <<"binding_key">>}, {utf8, BindingKey}},
186-
{{utf8, <<"arguments">>}, {map, KVList}}
187-
]},
188-
Body = iolist_to_binary(amqp10_framing:encode_bin(Body0)),
189-
183+
Body = {map, [
184+
{{utf8, <<"source">>}, {utf8, Source}},
185+
{{utf8, DestinationKind}, {utf8, Destination}},
186+
{{utf8, <<"binding_key">>}, {utf8, BindingKey}},
187+
{{utf8, <<"arguments">>}, {map, KVList}}
188+
]},
190189
Props = #{subject => <<"POST">>,
191190
to => <<"/bindings">>},
192191

@@ -235,12 +234,12 @@ unbind(DestinationChar, LinkPair, Destination, Source, BindingKey, BindingArgume
235234
Props = #{subject => <<"GET">>,
236235
to => Uri0},
237236

238-
case request(LinkPair, Props, <<>>) of
237+
case request(LinkPair, Props, undefined) of
239238
{ok, Resp} ->
240239
case amqp10_msg:properties(Resp) of
241240
#{subject := <<"200">>} ->
242-
RespBody = amqp10_msg:body_bin(Resp),
243-
[{list, Bindings}] = amqp10_framing:decode_bin(RespBody),
241+
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
242+
[{list, Bindings}] = amqp10_framing:decode_bin(RespBin),
244243
case search_binding_uri(BindingArguments, Bindings) of
245244
{ok, Uri} ->
246245
delete_binding(LinkPair, Uri);
@@ -278,7 +277,7 @@ search_binding_uri(BindingArguments, [{map, Binding} | Bindings]) ->
278277
delete_binding(LinkPair, BindingUri) ->
279278
Props = #{subject => <<"DELETE">>,
280279
to => BindingUri},
281-
case request(LinkPair, Props, <<>>) of
280+
case request(LinkPair, Props, undefined) of
282281
{ok, Resp} ->
283282
case amqp10_msg:properties(Resp) of
284283
#{subject := <<"204">>} ->
@@ -307,15 +306,15 @@ purge_or_delete_queue(LinkPair, QueueName, PathSuffix) ->
307306
HttpRequestTarget = <<"/queues/", QNameQuoted/binary, PathSuffix/binary>>,
308307
Props = #{subject => <<"DELETE">>,
309308
to => HttpRequestTarget},
310-
case request(LinkPair, Props, <<>>) of
309+
case request(LinkPair, Props, undefined) of
311310
{ok, Resp} ->
312311
case amqp10_msg:properties(Resp) of
313312
#{subject := <<"200">>} ->
314-
RespBody = amqp10_msg:body_bin(Resp),
313+
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
315314
[{map, [
316315
{{utf8, <<"message_count">>}, {ulong, Count}}
317316
]
318-
}] = amqp10_framing:decode_bin(RespBody),
317+
}] = amqp10_framing:decode_bin(RespBin),
319318
{ok, #{message_count => Count}};
320319
_ ->
321320
{error, Resp}
@@ -325,7 +324,7 @@ purge_or_delete_queue(LinkPair, QueueName, PathSuffix) ->
325324
end.
326325

327326
-spec declare_exchange(link_pair(), exchange_properties()) ->
328-
{ok, map()} | {error, term()}.
327+
ok | {error, term()}.
329328
declare_exchange(LinkPair, ExchangeProperties) ->
330329
{XName, Body0} = maps:fold(
331330
fun(name, V, {undefined, L}) when is_binary(V) ->
@@ -346,8 +345,7 @@ declare_exchange(LinkPair, ExchangeProperties) ->
346345
end, [], V),
347346
{N, [{{utf8, <<"arguments">>}, {map, KVList}} | L0]}
348347
end, {undefined, []}, ExchangeProperties),
349-
Body1 = {map, Body0},
350-
Body = iolist_to_binary(amqp10_framing:encode_bin(Body1)),
348+
Body = {map, Body0},
351349

352350
XNameQuoted = uri_string:quote(XName),
353351
Props = #{subject => <<"PUT">>,
@@ -357,9 +355,9 @@ declare_exchange(LinkPair, ExchangeProperties) ->
357355
{ok, Resp} ->
358356
case amqp10_msg:properties(Resp) of
359357
#{subject := <<"201">>} ->
360-
RespBody = amqp10_msg:body_bin(Resp),
361-
[{map, KVList}] = amqp10_framing:decode_bin(RespBody),
362-
{ok, proplists:to_map(KVList)};
358+
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
359+
[undefined] = amqp10_framing:decode_bin(RespBin),
360+
ok;
363361
_ ->
364362
{error, Resp}
365363
end;
@@ -373,7 +371,7 @@ delete_exchange(LinkPair, ExchangeName) ->
373371
XNameQuoted = uri_string:quote(ExchangeName),
374372
Props = #{subject => <<"DELETE">>,
375373
to => <<"/exchanges/", XNameQuoted/binary>>},
376-
case request(LinkPair, Props, <<>>) of
374+
case request(LinkPair, Props, undefined) of
377375
{ok, Resp} ->
378376
case amqp10_msg:properties(Resp) of
379377
#{subject := <<"204">>} ->
@@ -385,14 +383,15 @@ delete_exchange(LinkPair, ExchangeName) ->
385383
Err
386384
end.
387385

388-
-spec request(link_pair(), amqp10_msg:amqp10_properties(), binary()) ->
386+
-spec request(link_pair(), amqp10_msg:amqp10_properties(), undefined | amqp10_prim()) ->
389387
{ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}.
390388
request(#link_pair{outgoing_link = OutgoingLink,
391389
incoming_link = IncomingLink}, Properties, Body) ->
392390
MessageId = message_id(),
393391
Properties1 = Properties#{message_id => {binary, MessageId},
394392
reply_to => <<"$me">>},
395-
Request = amqp10_msg:new(<<>>, Body, true),
393+
BodyBin = iolist_to_binary(amqp10_framing:encode_bin(Body)),
394+
Request = amqp10_msg:new(<<>>, #'v1_0.amqp_value'{content = {binary, BodyBin}}, true),
396395
Request1 = amqp10_msg:set_properties(Properties1, Request),
397396
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
398397
case amqp10_client:send_msg(OutgoingLink, Request1) of

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ all_management_operations(Config) ->
117117
auto_delete => false,
118118
internal => false,
119119
arguments => #{}},
120-
{ok, #{}} = rabbitmq_amqp_client:declare_exchange(LinkPair, X),
120+
?assertEqual(ok, rabbitmq_amqp_client:declare_exchange(LinkPair, X)),
121121

122122
TargetAddr3 = <<"/exchange/", XName/binary>>,
123123
SourceExchange = <<"amq.direct">>,

0 commit comments

Comments
 (0)