Skip to content

Commit 1ecaaad

Browse files
authored
Merge pull request #10964 from rabbitmq/amqp-parser
4.x: change AMQP on disk message format & speed up the AMQP parser
2 parents d486563 + d42e110 commit 1ecaaad

39 files changed

+2520
-1827
lines changed

deps/amqp10_client/src/amqp10_client_frame_reader.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,17 @@ handle_input(expecting_frame_body, Data,
262262
{<<_:BodyLength/binary, Rest/binary>>, 0} ->
263263
% heartbeat
264264
handle_input(expecting_frame_header, Rest, State);
265-
{<<FrameBody:BodyLength/binary, Rest/binary>>, _} ->
265+
{<<Body:BodyLength/binary, Rest/binary>>, _} ->
266266
State1 = State#state{frame_state = undefined},
267-
{PerfDesc, Payload} = amqp10_binary_parser:parse(FrameBody),
268-
Perf = amqp10_framing:decode(PerfDesc),
269-
State2 = route_frame(Channel, FrameType, {Perf, Payload}, State1),
267+
BytesBody = size(Body),
268+
{DescribedPerformative, BytesParsed} = amqp10_binary_parser:parse(Body),
269+
Performative = amqp10_framing:decode(DescribedPerformative),
270+
Payload = if BytesParsed < BytesBody ->
271+
binary_part(Body, BytesParsed, BytesBody - BytesParsed);
272+
BytesParsed =:= BytesBody ->
273+
no_payload
274+
end,
275+
State2 = route_frame(Channel, FrameType, {Performative, Payload}, State1),
270276
handle_input(expecting_frame_header, Rest, State2);
271277
_ ->
272278
{ok, expecting_frame_body, Data, State}
@@ -294,8 +300,8 @@ route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
294300
State0),
295301
?DBG("FRAME -> ~tp ~tp~n ~tp", [Channel, DestinationPid, Performative]),
296302
case Payload of
297-
<<>> -> ok = gen_statem:cast(DestinationPid, Performative);
298-
_ -> ok = gen_statem:cast(DestinationPid, Frame)
303+
no_payload -> gen_statem:cast(DestinationPid, Performative);
304+
_ -> gen_statem:cast(DestinationPid, Frame)
299305
end,
300306
State.
301307

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 157 additions & 49 deletions
Large diffs are not rendered by default.

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,16 @@
108108
-spec from_amqp_records([amqp10_client_types:amqp10_msg_record()]) ->
109109
amqp10_msg().
110110
from_amqp_records([#'v1_0.transfer'{} = Transfer | Records]) ->
111-
lists:foldl(fun parse_from_amqp/2, #amqp10_msg{transfer = Transfer,
112-
body = unset}, Records).
111+
case lists:foldl(fun parse_from_amqp/2,
112+
#amqp10_msg{transfer = Transfer,
113+
body = unset},
114+
Records) of
115+
#amqp10_msg{body = Body} = Msg
116+
when is_list(Body) ->
117+
Msg#amqp10_msg{body = lists:reverse(Body)};
118+
Msg ->
119+
Msg
120+
end.
113121

114122
-spec to_amqp_records(amqp10_msg()) -> [amqp10_client_types:amqp10_msg_record()].
115123
to_amqp_records(#amqp10_msg{transfer = T,
@@ -327,7 +335,7 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
327335
P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse
328336
T =:= uuid orelse
329337
T =:= binary orelse
330-
T =:= uf8 ->
338+
T =:= utf8 ->
331339
Acc#'v1_0.properties'{message_id = TypeVal};
332340
(message_id, V, Acc) when is_binary(V) ->
333341
%% backward compat clause
@@ -414,15 +422,17 @@ wrap_ap_value(true) ->
414422
{boolean, true};
415423
wrap_ap_value(false) ->
416424
{boolean, false};
417-
wrap_ap_value(V) when is_integer(V) ->
418-
{uint, V};
419425
wrap_ap_value(V) when is_binary(V) ->
420426
utf8(V);
421427
wrap_ap_value(V) when is_list(V) ->
422428
utf8(list_to_binary(V));
423429
wrap_ap_value(V) when is_atom(V) ->
424-
utf8(atom_to_list(V)).
425-
430+
utf8(atom_to_binary(V));
431+
wrap_ap_value(V) when is_integer(V) ->
432+
case V < 0 of
433+
true -> {int, V};
434+
false -> {uint, V}
435+
end.
426436

427437
%% LOCAL
428438
header_value(durable, undefined) -> false;
@@ -444,10 +454,16 @@ parse_from_amqp(#'v1_0.application_properties'{} = APs, AmqpMsg) ->
444454
AmqpMsg#amqp10_msg{application_properties = APs};
445455
parse_from_amqp(#'v1_0.amqp_value'{} = Value, AmqpMsg) ->
446456
AmqpMsg#amqp10_msg{body = Value};
447-
parse_from_amqp(#'v1_0.amqp_sequence'{} = Seq, AmqpMsg) ->
448-
AmqpMsg#amqp10_msg{body = [Seq]};
449-
parse_from_amqp(#'v1_0.data'{} = Data, AmqpMsg) ->
450-
AmqpMsg#amqp10_msg{body = [Data]};
457+
parse_from_amqp(#'v1_0.amqp_sequence'{} = Seq, AmqpMsg = #amqp10_msg{body = Body0}) ->
458+
Body = if Body0 =:= unset -> [Seq];
459+
is_list(Body0) -> [Seq | Body0]
460+
end,
461+
AmqpMsg#amqp10_msg{body = Body};
462+
parse_from_amqp(#'v1_0.data'{} = Data, AmqpMsg = #amqp10_msg{body = Body0}) ->
463+
Body = if Body0 =:= unset -> [Data];
464+
is_list(Body0) -> [Data | Body0]
465+
end,
466+
AmqpMsg#amqp10_msg{body = Body};
451467
parse_from_amqp(#'v1_0.footer'{} = Header, AmqpMsg) ->
452468
AmqpMsg#amqp10_msg{footer = Header}.
453469

deps/amqp10_client/test/mock_server.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,15 @@ recv(Sock) ->
5252
{ok, <<Length:32/unsigned, 2:8/unsigned,
5353
_/unsigned, Ch:16/unsigned>>} = gen_tcp:recv(Sock, 8),
5454
{ok, Data} = gen_tcp:recv(Sock, Length - 8),
55-
{PerfDesc, Payload} = amqp10_binary_parser:parse(Data),
55+
{PerfDesc, BytesParsed} = amqp10_binary_parser:parse(Data),
5656
Perf = amqp10_framing:decode(PerfDesc),
57+
Payload = binary_part(Data, BytesParsed, size(Data) - BytesParsed),
5758
{Ch, Perf, Payload}.
5859

5960
amqp_step(Fun) ->
6061
fun (Sock) ->
6162
Recv = recv(Sock),
62-
ct:pal("AMQP Step receieved ~tp~n", [Recv]),
63+
ct:pal("AMQP Step received ~tp~n", [Recv]),
6364
case Fun(Recv) of
6465
{_Ch, []} -> ok;
6566
{Ch, {multi, Records}} ->
@@ -81,4 +82,4 @@ send_amqp_header_step(Sock) ->
8182
recv_amqp_header_step(Sock) ->
8283
ct:pal("Receiving AMQP protocol header"),
8384
{ok, R} = gen_tcp:recv(Sock, 8),
84-
ct:pal("handshake Step receieved ~tp~n", [R]).
85+
ct:pal("handshake Step received ~tp~n", [R]).

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,17 @@ roundtrip(OpenConf, Body) ->
313313
await_link(Sender, credited, link_credit_timeout),
314314

315315
Now = os:system_time(millisecond),
316-
Props = #{creation_time => Now},
316+
Props = #{creation_time => Now,
317+
message_id => <<"my message ID">>,
318+
correlation_id => <<"my correlation ID">>,
319+
content_type => <<"my content type">>,
320+
content_encoding => <<"my content encoding">>,
321+
group_id => <<"my group ID">>},
317322
Msg0 = amqp10_msg:new(<<"my-tag">>, Body, true),
318-
Msg1 = amqp10_msg:set_properties(Props, Msg0),
319-
Msg2 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg1),
320-
Msg3 = amqp10_msg:set_message_annotations(#{<<"x_key">> => "x_value"}, Msg2),
321-
Msg = amqp10_msg:set_delivery_annotations(#{<<"y_key">> => "y_value"}, Msg3),
323+
Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0),
324+
Msg2 = amqp10_msg:set_properties(Props, Msg1),
325+
Msg = amqp10_msg:set_message_annotations(#{<<"x-key">> => "x-value",
326+
<<"x_key">> => "x_value"}, Msg2),
322327
ok = amqp10_client:send_msg(Sender, Msg),
323328
ok = amqp10_client:detach_link(Sender),
324329
await_link(Sender, {detached, normal}, link_detach_timeout),
@@ -331,10 +336,10 @@ roundtrip(OpenConf, Body) ->
331336
ok = amqp10_client:close_connection(Connection),
332337

333338
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
334-
#{creation_time := Now} = amqp10_msg:properties(OutMsg),
335-
#{<<"a_key">> := <<"a_value">>} = amqp10_msg:application_properties(OutMsg),
336-
#{<<"x_key">> := <<"x_value">>} = amqp10_msg:message_annotations(OutMsg),
337-
#{<<"y_key">> := <<"y_value">>} = amqp10_msg:delivery_annotations(OutMsg),
339+
?assertMatch(Props, amqp10_msg:properties(OutMsg)),
340+
?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)),
341+
?assertMatch(#{<<"x-key">> := <<"x-value">>,
342+
<<"x_key">> := <<"x_value">>}, amqp10_msg:message_annotations(OutMsg)),
338343
?assertEqual([Body], amqp10_msg:body(OutMsg)),
339344
ok.
340345

deps/amqp10_common/.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
/logs/
1515
/plugins/
1616
/plugins.lock
17-
/rebar.config
1817
/rebar.lock
1918
/sbin/
2019
/sbin.lock
2120
/test/ct.cover.spec
2221
/xrefr
22+
_build
2323

2424
/amqp10_common.d
2525
/*.plt

deps/amqp10_common/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,26 @@ dialyze(
110110

111111
rabbitmq_suite(
112112
name = "binary_generator_SUITE",
113+
size = "small",
113114
)
114115

115116
rabbitmq_suite(
116117
name = "binary_parser_SUITE",
118+
size = "small",
117119
)
118120

119121
rabbitmq_suite(
120122
name = "serial_number_SUITE",
121123
size = "small",
122124
)
123125

126+
rabbitmq_suite(
127+
name = "prop_SUITE",
128+
deps = [
129+
"//deps/rabbitmq_ct_helpers:erlang_app",
130+
],
131+
)
132+
124133
assert_suites()
125134

126135
alias(

deps/amqp10_common/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ endef
2626

2727
DIALYZER_OPTS += --src -r test -DTEST
2828
BUILD_DEPS = rabbit_common
29+
TEST_DEPS = rabbitmq_ct_helpers proper
2930

3031
# Variables and recipes in development.*.mk are meant to be used from
3132
# any Git clone. They are excluded from the files published to Hex.pm.

deps/amqp10_common/app.bzl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,13 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
110110
app_name = "amqp10_common",
111111
erlc_opts = "//:test_erlc_opts",
112112
)
113+
erlang_bytecode(
114+
name = "prop_SUITE_beam_files",
115+
testonly = True,
116+
srcs = ["test/prop_SUITE.erl"],
117+
outs = ["test/prop_SUITE.beam"],
118+
hdrs = ["include/amqp10_framing.hrl"],
119+
app_name = "amqp10_common",
120+
erlc_opts = "//:test_erlc_opts",
121+
deps = ["@proper//:erlang_app"],
122+
)

deps/amqp10_common/rebar.config

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{profiles,
2+
[{test, [{deps, [proper
3+
]}]}
4+
]
5+
}.

deps/amqp10_common/src/amqp10_binary_generator.erl

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
{symbol, binary()} |
4141
{binary, binary()} |
4242
{list, [amqp10_type()]} |
43-
{map, [{amqp10_prim(), amqp10_prim()}]} | %% TODO: make map a map
43+
{map, [{amqp10_prim(), amqp10_prim()}]} |
4444
{array, amqp10_ctor(), [amqp10_type()]}.
4545

4646
-type amqp10_described() ::
@@ -113,16 +113,20 @@ generate1({long, V}) when V<128 andalso V>-129 -> <<16#55,V:8/signed>>;
113113
generate1({long, V}) -> <<16#81,V:64/signed>>;
114114
generate1({float, V}) -> <<16#72,V:32/float>>;
115115
generate1({double, V}) -> <<16#82,V:64/float>>;
116-
generate1({char, V}) -> <<16#73,V:4/binary>>;
116+
generate1({char,V}) when V>=0 andalso V=<16#10ffff -> <<16#73,V:32>>;
117+
%% AMQP timestamp is "64-bit two's-complement integer representing milliseconds since the unix epoch".
118+
%% For small integers (i.e. values that can be stored in a single word),
119+
%% Erlang uses two’s complement to represent the signed integers.
117120
generate1({timestamp,V}) -> <<16#83,V:64/signed>>;
118121
generate1({uuid, V}) -> <<16#98,V:16/binary>>;
119122

120-
generate1({utf8, V}) when size(V) < ?VAR_1_LIMIT -> [16#a1, size(V), V];
121-
generate1({utf8, V}) -> [<<16#b1, (size(V)):32>>, V];
122-
generate1({symbol, V}) -> [16#a3, size(V), V];
123+
generate1({utf8, V}) when size(V) =< ?VAR_1_LIMIT -> [16#a1, size(V), V];
124+
generate1({utf8, V}) -> [<<16#b1, (size(V)):32>>, V];
125+
generate1({symbol, V}) when size(V) =< ?VAR_1_LIMIT -> [16#a3, size(V), V];
126+
generate1({symbol, V}) -> [<<16#b3, (size(V)):32>>, V];
123127
generate1({binary, V}) ->
124128
Size = iolist_size(V),
125-
case Size < ?VAR_1_LIMIT of
129+
case Size =< ?VAR_1_LIMIT of
126130
true ->
127131
[16#a0, Size, V];
128132
false ->
@@ -145,12 +149,12 @@ generate1({list, List}) ->
145149
[16#c0, S + 1, Count, Compound]
146150
end;
147151

148-
generate1({map, ListOfPairs}) ->
149-
Count = length(ListOfPairs) * 2,
152+
generate1({map, KvList}) ->
153+
Count = length(KvList) * 2,
150154
Compound = lists:map(fun ({Key, Val}) ->
151155
[(generate1(Key)),
152156
(generate1(Val))]
153-
end, ListOfPairs),
157+
end, KvList),
154158
S = iolist_size(Compound),
155159
%% See generate1({list, ...}) for an explanation of this test.
156160
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
@@ -168,16 +172,12 @@ generate1({array, Type, List}) ->
168172
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
169173
[<<16#f0, (S + 4):32, Count:32>>, Array];
170174
true ->
171-
[16#e0, S + 1, Count, Array]
175+
[16#e0, S + 1, Count, Array]
172176
end;
173177

174178
generate1({as_is, TypeCode, Bin}) ->
175179
<<TypeCode, Bin>>.
176180

177-
%% TODO again these are a stub to get SASL working. New codec? Will
178-
%% that ever happen? If not we really just need to split generate/1
179-
%% up into things like these...
180-
%% for these constructors map straight-forwardly
181181
constructor(symbol) -> 16#b3;
182182
constructor(ubyte) -> 16#50;
183183
constructor(ushort) -> 16#60;
@@ -194,18 +194,23 @@ constructor(timestamp) -> 16#83;
194194
constructor(uuid) -> 16#98;
195195
constructor(null) -> 16#40;
196196
constructor(boolean) -> 16#56;
197-
constructor(array) -> 16#f0; % use large array type for all nested arrays
197+
constructor(binary) -> 16#b0;
198198
constructor(utf8) -> 16#b1;
199+
constructor(list) -> 16#d0; % use large list type for all array elements
200+
constructor(map) -> 16#d1; % use large map type for all array elements
201+
constructor(array) -> 16#f0; % use large array type for all nested arrays
199202
constructor({described, Descriptor, Primitive}) ->
200203
[16#00, generate1(Descriptor), constructor(Primitive)].
201204

202-
% returns io_list
203205
generate2(symbol, {symbol, V}) -> [<<(size(V)):32>>, V];
204206
generate2(utf8, {utf8, V}) -> [<<(size(V)):32>>, V];
207+
generate2(binary, {binary, V}) -> [<<(size(V)):32>>, V];
205208
generate2(boolean, true) -> 16#01;
206209
generate2(boolean, false) -> 16#00;
207210
generate2(boolean, {boolean, true}) -> 16#01;
208211
generate2(boolean, {boolean, false}) -> 16#00;
212+
generate2(null, null) -> 16#40;
213+
generate2(char, {char,V}) when V>=0 andalso V=<16#10ffff -> <<V:32>>;
209214
generate2(ubyte, {ubyte, V}) -> V;
210215
generate2(byte, {byte, V}) -> <<V:8/signed>>;
211216
generate2(ushort, {ushort, V}) -> <<V:16/unsigned>>;
@@ -214,12 +219,28 @@ generate2(uint, {uint, V}) -> <<V:32/unsigned>>;
214219
generate2(int, {int, V}) -> <<V:32/signed>>;
215220
generate2(ulong, {ulong, V}) -> <<V:64/unsigned>>;
216221
generate2(long, {long, V}) -> <<V:64/signed>>;
222+
generate2(float, {float, V}) -> <<V:32/float>>;
223+
generate2(double, {double, V}) -> <<V:64/float>>;
224+
generate2(timestamp, {timestamp,V}) -> <<V:64/signed>>;
225+
generate2(uuid, {uuid, V}) -> <<V:16/binary>>;
217226
generate2({described, D, P}, {described, D, V}) ->
218227
generate2(P, V);
228+
generate2(list, {list, List}) ->
229+
Count = length(List),
230+
Compound = lists:map(fun generate1/1, List),
231+
S = iolist_size(Compound),
232+
[<<(S + 4):32, Count:32>>, Compound];
233+
generate2(map, {map, KvList}) ->
234+
Count = length(KvList) * 2,
235+
Compound = lists:map(fun ({Key, Val}) ->
236+
[(generate1(Key)),
237+
(generate1(Val))]
238+
end, KvList),
239+
S = iolist_size(Compound),
240+
[<<(S + 4):32, Count:32>>, Compound];
219241
generate2(array, {array, Type, List}) ->
220242
Count = length(List),
221243
Array = [constructor(Type),
222244
[generate2(Type, I) || I <- List]],
223245
S = iolist_size(Array),
224-
%% See generate1({list, ...}) for an explanation of this test.
225246
[<<(S + 4):32, Count:32>>, Array].

0 commit comments

Comments
 (0)