Skip to content

Commit d3d5352

Browse files
committed
Allow for list and map in amqp-value section
1 parent 15ee8d6 commit d3d5352

File tree

3 files changed

+46
-48
lines changed

3 files changed

+46
-48
lines changed

deps/amqp10_common/src/amqp10_framing.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,16 @@ symbolify(FieldName) when is_atom(FieldName) ->
100100

101101
%% A sequence comes as an arbitrary list of values; it's not a
102102
%% composite type.
103-
decode({described, Descriptor, {list, Fields}}) ->
103+
decode({described, Descriptor, {list, Fields} = Type}) ->
104104
case amqp10_framing0:record_for(Descriptor) of
105105
#'v1_0.amqp_sequence'{} ->
106106
#'v1_0.amqp_sequence'{content = [decode(F) || F <- Fields]};
107+
#'v1_0.amqp_value'{} ->
108+
#'v1_0.amqp_value'{content = Type};
107109
Else ->
108110
fill_from_list(Else, Fields)
109111
end;
110-
decode({described, Descriptor, {map, Fields}}) ->
112+
decode({described, Descriptor, {map, Fields} = Type}) ->
111113
case amqp10_framing0:record_for(Descriptor) of
112114
#'v1_0.application_properties'{} ->
113115
#'v1_0.application_properties'{content = decode_map(Fields)};
@@ -117,13 +119,15 @@ decode({described, Descriptor, {map, Fields}}) ->
117119
#'v1_0.message_annotations'{content = decode_map(Fields)};
118120
#'v1_0.footer'{} ->
119121
#'v1_0.footer'{content = decode_map(Fields)};
122+
#'v1_0.amqp_value'{} ->
123+
#'v1_0.amqp_value'{content = Type};
120124
Else ->
121125
fill_from_map(Else, Fields)
122126
end;
123-
decode({described, Descriptor, {binary, Field} = BinaryField}) ->
127+
decode({described, Descriptor, {binary, Field} = Type}) ->
124128
case amqp10_framing0:record_for(Descriptor) of
125129
#'v1_0.amqp_value'{} ->
126-
#'v1_0.amqp_value'{content = BinaryField};
130+
#'v1_0.amqp_value'{content = Type};
127131
#'v1_0.data'{} ->
128132
#'v1_0.data'{content = Field}
129133
end;

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,26 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
1717
%% see Link Pair CS 01 §2.1
1818
%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331305
1919
reply_to = {utf8, <<"$me">>}},
20-
ReqBin
20+
ReqPayload
2121
} = decode_req(ReqSections, {undefined, undefined}),
2222
{PathSegments, QueryMap} = parse_uri(HttpRequestTarget),
23-
ReqPayload = amqp10_framing:decode_bin(ReqBin),
2423
{RespProps0,
2524
RespAppProps0 = #'v1_0.application_properties'{content = C},
26-
RespPayload} = handle_http_req(HttpMethod,
27-
PathSegments,
28-
QueryMap,
29-
ReqPayload,
30-
Vhost,
31-
User,
32-
ConnectionPid),
25+
RespBody} = handle_http_req(HttpMethod,
26+
PathSegments,
27+
QueryMap,
28+
ReqPayload,
29+
Vhost,
30+
User,
31+
ConnectionPid),
3332
RespProps = RespProps0#'v1_0.properties'{
3433
%% "To associate a response with a request, the correlation-id value of the response
3534
%% properties MUST be set to the message-id value of the request properties."
3635
%% [HTTP over AMQP WD 06 §5.1]
3736
correlation_id = MessageId},
3837
RespAppProps = RespAppProps0#'v1_0.application_properties'{
3938
content = [{{utf8, <<"http:response">>}, {utf8, <<"1.1">>}} | C]},
40-
RespBody = iolist_to_binary(amqp10_framing:encode_bin(RespPayload)),
41-
RespDataSect = #'v1_0.amqp_value'{content = {binary, RespBody}},
39+
RespDataSect = #'v1_0.amqp_value'{content = RespBody},
4240
RespSections = [RespProps, RespAppProps, RespDataSect],
4341
[amqp10_framing:encode_bin(Sect) || Sect <- RespSections].
4442

@@ -49,7 +47,7 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
4947
handle_http_req(<<"PUT">>,
5048
[<<"queues">>, QNameBinQ],
5149
_Query,
52-
[ReqPayload],
50+
ReqPayload,
5351
Vhost,
5452
#user{username = Username},
5553
ConnPid) ->
@@ -70,13 +68,13 @@ handle_http_req(<<"PUT">>,
7068
{new, _Q} = rabbit_queue_type:declare(Q0, node()),
7169
Props = #'v1_0.properties'{subject = {utf8, <<"201">>}},
7270
AppProps = #'v1_0.application_properties'{content = []},
73-
RespPayload = undefined,
71+
RespPayload = null,
7472
{Props, AppProps, RespPayload};
7573

7674
handle_http_req(<<"PUT">>,
7775
[<<"exchanges">>, XNameBinQ],
7876
_Query,
79-
[ReqPayload],
77+
ReqPayload,
8078
Vhost,
8179
#user{username = Username},
8280
_ConnPid) ->
@@ -98,13 +96,13 @@ handle_http_req(<<"PUT">>,
9896
Username),
9997
Props = #'v1_0.properties'{subject = {utf8, <<"201">>} },
10098
AppProps = #'v1_0.application_properties'{content = []},
101-
RespPayload = undefined,
99+
RespPayload = null,
102100
{Props, AppProps, RespPayload};
103101

104102
handle_http_req(<<"DELETE">>,
105103
[<<"queues">>, QNameBinQ, <<"messages">>],
106104
_Query,
107-
[undefined],
105+
null,
108106
Vhost,
109107
_User,
110108
ConnPid) ->
@@ -123,7 +121,7 @@ handle_http_req(<<"DELETE">>,
123121
handle_http_req(<<"DELETE">>,
124122
[<<"queues">>, QNameBinQ],
125123
_Query,
126-
[undefined],
124+
null,
127125
Vhost,
128126
#user{username = Username},
129127
ConnPid) ->
@@ -138,7 +136,7 @@ handle_http_req(<<"DELETE">>,
138136
handle_http_req(<<"DELETE">>,
139137
[<<"exchanges">>, XNameBinQ],
140138
_Query,
141-
[undefined],
139+
null,
142140
Vhost,
143141
#user{username = Username},
144142
_ConnPid) ->
@@ -154,13 +152,13 @@ handle_http_req(<<"DELETE">>,
154152
end,
155153
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
156154
AppProps = #'v1_0.application_properties'{content = []},
157-
RespPayload = undefined,
155+
RespPayload = null,
158156
{Props, AppProps, RespPayload};
159157

160158
handle_http_req(<<"POST">>,
161159
[<<"bindings">>],
162160
_Query,
163-
[ReqPayload],
161+
ReqPayload,
164162
Vhost,
165163
#user{username = Username},
166164
_ConnPid) ->
@@ -185,13 +183,13 @@ handle_http_req(<<"POST">>,
185183
Location = compose_binding_uri(SrcXNameBin, DstKind, DstNameBin, BindingKey, Args),
186184
AppProps = #'v1_0.application_properties'{
187185
content = [{{utf8, <<"location">>}, {utf8, Location}}]},
188-
RespPayload = undefined,
186+
RespPayload = null,
189187
{Props, AppProps, RespPayload};
190188

191189
handle_http_req(<<"DELETE">>,
192190
[<<"bindings">>, BindingSegment],
193191
_Query,
194-
[undefined],
192+
null,
195193
Vhost,
196194
#user{username = Username},
197195
_ConnPid) ->
@@ -207,14 +205,14 @@ handle_http_req(<<"DELETE">>,
207205
end,
208206
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
209207
AppProps = #'v1_0.application_properties'{content = []},
210-
RespPayload = undefined,
208+
RespPayload = null,
211209
{Props, AppProps, RespPayload};
212210

213211
handle_http_req(<<"GET">>,
214212
[<<"bindings">>],
215213
QueryMap = #{<<"src">> := SrcXNameBin,
216214
<<"key">> := Key},
217-
[undefined],
215+
null,
218216
Vhost,
219217
_User,
220218
_ConnPid) ->
@@ -320,8 +318,8 @@ decode_req([], Acc) ->
320318
Acc;
321319
decode_req([#'v1_0.properties'{} = P | Rem], Acc) ->
322320
decode_req(Rem, setelement(1, Acc, P));
323-
decode_req([#'v1_0.amqp_value'{content = {binary, B}} | Rem], Acc) ->
324-
decode_req(Rem, setelement(2, Acc, B));
321+
decode_req([#'v1_0.amqp_value'{content = C} | Rem], Acc) ->
322+
decode_req(Rem, setelement(2, Acc, C));
325323
decode_req([_IgnoreSection | Rem], Acc) ->
326324
decode_req(Rem, Acc).
327325

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,7 @@ declare_queue(LinkPair, QueueProperties) ->
151151
{ok, Resp} ->
152152
case amqp10_msg:properties(Resp) of
153153
#{subject := <<"201">>} ->
154-
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
155-
[undefined] = amqp10_framing:decode_bin(RespBin),
154+
#'v1_0.amqp_value'{content = null} = amqp10_msg:body(Resp),
156155
% {ok, proplists:to_map(KVList)};
157156
{ok, #{}};
158157
Other ->
@@ -234,12 +233,11 @@ unbind(DestinationChar, LinkPair, Destination, Source, BindingKey, BindingArgume
234233
Props = #{subject => <<"GET">>,
235234
to => Uri0},
236235

237-
case request(LinkPair, Props, undefined) of
236+
case request(LinkPair, Props, null) of
238237
{ok, Resp} ->
239238
case amqp10_msg:properties(Resp) of
240239
#{subject := <<"200">>} ->
241-
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
242-
[{list, Bindings}] = amqp10_framing:decode_bin(RespBin),
240+
#'v1_0.amqp_value'{content = {list, Bindings}} = amqp10_msg:body(Resp),
243241
case search_binding_uri(BindingArguments, Bindings) of
244242
{ok, Uri} ->
245243
delete_binding(LinkPair, Uri);
@@ -277,7 +275,7 @@ search_binding_uri(BindingArguments, [{map, Binding} | Bindings]) ->
277275
delete_binding(LinkPair, BindingUri) ->
278276
Props = #{subject => <<"DELETE">>,
279277
to => BindingUri},
280-
case request(LinkPair, Props, undefined) of
278+
case request(LinkPair, Props, null) of
281279
{ok, Resp} ->
282280
case amqp10_msg:properties(Resp) of
283281
#{subject := <<"204">>} ->
@@ -306,15 +304,15 @@ purge_or_delete_queue(LinkPair, QueueName, PathSuffix) ->
306304
HttpRequestTarget = <<"/queues/", QNameQuoted/binary, PathSuffix/binary>>,
307305
Props = #{subject => <<"DELETE">>,
308306
to => HttpRequestTarget},
309-
case request(LinkPair, Props, undefined) of
307+
case request(LinkPair, Props, null) of
310308
{ok, Resp} ->
311309
case amqp10_msg:properties(Resp) of
312310
#{subject := <<"200">>} ->
313-
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
314-
[{map, [
315-
{{utf8, <<"message_count">>}, {ulong, Count}}
316-
]
317-
}] = amqp10_framing:decode_bin(RespBin),
311+
#'v1_0.amqp_value'{content = Content} = amqp10_msg:body(Resp),
312+
{map, [
313+
{{utf8, <<"message_count">>}, {ulong, Count}}
314+
]
315+
} = Content,
318316
{ok, #{message_count => Count}};
319317
_ ->
320318
{error, Resp}
@@ -355,8 +353,7 @@ declare_exchange(LinkPair, ExchangeProperties) ->
355353
{ok, Resp} ->
356354
case amqp10_msg:properties(Resp) of
357355
#{subject := <<"201">>} ->
358-
#'v1_0.amqp_value'{content = {binary, RespBin}} = amqp10_msg:body(Resp),
359-
[undefined] = amqp10_framing:decode_bin(RespBin),
356+
#'v1_0.amqp_value'{content = null} = amqp10_msg:body(Resp),
360357
ok;
361358
_ ->
362359
{error, Resp}
@@ -371,7 +368,7 @@ delete_exchange(LinkPair, ExchangeName) ->
371368
XNameQuoted = uri_string:quote(ExchangeName),
372369
Props = #{subject => <<"DELETE">>,
373370
to => <<"/exchanges/", XNameQuoted/binary>>},
374-
case request(LinkPair, Props, undefined) of
371+
case request(LinkPair, Props, null) of
375372
{ok, Resp} ->
376373
case amqp10_msg:properties(Resp) of
377374
#{subject := <<"204">>} ->
@@ -383,15 +380,14 @@ delete_exchange(LinkPair, ExchangeName) ->
383380
Err
384381
end.
385382

386-
-spec request(link_pair(), amqp10_msg:amqp10_properties(), undefined | amqp10_prim()) ->
383+
-spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) ->
387384
{ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}.
388385
request(#link_pair{outgoing_link = OutgoingLink,
389386
incoming_link = IncomingLink}, Properties, Body) ->
390387
MessageId = message_id(),
391388
Properties1 = Properties#{message_id => {binary, MessageId},
392389
reply_to => <<"$me">>},
393-
BodyBin = iolist_to_binary(amqp10_framing:encode_bin(Body)),
394-
Request = amqp10_msg:new(<<>>, #'v1_0.amqp_value'{content = {binary, BodyBin}}, true),
390+
Request = amqp10_msg:new(<<>>, #'v1_0.amqp_value'{content = Body}, true),
395391
Request1 = amqp10_msg:set_properties(Properties1, Request),
396392
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
397393
case amqp10_client:send_msg(OutgoingLink, Request1) of

0 commit comments

Comments
 (0)