Skip to content

Commit f67058a

Browse files
authored
Merge pull request #9830 from rabbitmq/mc-refinements
Message container conversion improvements
2 parents 14ffd57 + 61f13d0 commit f67058a

File tree

15 files changed

+713
-339
lines changed

15 files changed

+713
-339
lines changed

deps/rabbit/include/mc.hrl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
%% good enough for most use cases
1818
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).
1919

20-
%% "Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits, or
21-
%% underlines, to a maximum length of 128 characters." [AMQP 0.9.1 4.2.5.5 Field Tables]
22-
%% Given that the valid chars are ASCII chars, 1 char is encoded as 1 byte.
23-
-define(AMQP_LEGACY_FIELD_NAME_MAX_LEN, 128).
20+
%% "Short strings can carry up to 255 octets of UTF-8 data, but
21+
%% may not contain binary zero octets." [AMQP 0.9.1 $4.2.5.3]
22+
-define(IS_SHORTSTR_LEN(B), byte_size(B) < 256).

deps/rabbit/src/mc.erl

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
-export([
44
init/3,
5+
init/4,
56
size/1,
67
is/1,
78
get_annotation/2,
@@ -19,6 +20,7 @@
1920
routing_headers/2,
2021
%%
2122
convert/2,
23+
convert/3,
2224
protocol_state/1,
2325
prepare/2,
2426
record_death/3,
@@ -36,6 +38,7 @@
3638
-type protocol() :: module().
3739
-type annotations() :: #{internal_ann_key() => term(),
3840
x_ann_key() => x_ann_value()}.
41+
-type environment() :: #{atom() => term()}.
3942
-type ann_key() :: internal_ann_key() | x_ann_key().
4043
-type ann_value() :: term().
4144

@@ -107,12 +110,12 @@
107110

108111
%% Convert state to another protocol
109112
%% all protocols must be able to convert to mc_amqp (AMQP 1.0)
110-
-callback convert_to(Target :: protocol(), proto_state()) ->
113+
-callback convert_to(Target :: protocol(), proto_state(), environment()) ->
111114
proto_state() | not_implemented.
112115

113116
%% Convert from another protocol
114117
%% all protocols must be able to convert from mc_amqp (AMQP 1.0)
115-
-callback convert_from(Source :: protocol(), proto_state()) ->
118+
-callback convert_from(Source :: protocol(), proto_state(), environment()) ->
116119
proto_state() | not_implemented.
117120

118121
%% emit a protocol specific state package
@@ -128,10 +131,21 @@
128131
%%% API
129132

130133
-spec init(protocol(), term(), annotations()) -> state().
131-
init(Proto, Data, Anns)
134+
init(Proto, Data, Anns) ->
135+
init(Proto, Data, Anns, #{}).
136+
137+
-spec init(protocol(), term(), annotations(), environment()) -> state().
138+
init(Proto, Data, Anns0, Env)
132139
when is_atom(Proto)
133-
andalso is_map(Anns) ->
140+
andalso is_map(Anns0)
141+
andalso is_map(Env) ->
134142
{ProtoData, ProtoAnns} = Proto:init(Data),
143+
Anns = case maps:size(Env) == 0 of
144+
true ->
145+
Anns0;
146+
false ->
147+
Anns0#{env => Env}
148+
end,
135149
#?MODULE{protocol = Proto,
136150
data = ProtoData,
137151
annotations = maps:merge(ProtoAnns, Anns)}.
@@ -265,18 +279,26 @@ set_ttl(Value, BasicMsg) ->
265279
mc_compat:set_ttl(Value, BasicMsg).
266280

267281
-spec convert(protocol(), state()) -> state().
268-
convert(Proto, #?MODULE{protocol = Proto} = State) ->
282+
convert(Proto, State) ->
283+
convert(Proto, State, #{}).
284+
285+
-spec convert(protocol(), state(), environment()) -> state().
286+
convert(Proto, #?MODULE{protocol = Proto} = State, _Env) ->
269287
State;
270288
convert(TargetProto, #?MODULE{protocol = SourceProto,
271-
data = Data0} = State) ->
289+
annotations = Anns,
290+
data = Data0} = State,
291+
TargetEnv) ->
272292
Data = SourceProto:prepare(read, Data0),
293+
SourceEnv = maps:get(env, Anns, #{}),
294+
Env = maps:merge(SourceEnv, TargetEnv),
273295
TargetState =
274-
case SourceProto:convert_to(TargetProto, Data) of
296+
case SourceProto:convert_to(TargetProto, Data, Env) of
275297
not_implemented ->
276-
case TargetProto:convert_from(SourceProto, Data) of
298+
case TargetProto:convert_from(SourceProto, Data, Env) of
277299
not_implemented ->
278-
AmqpData = SourceProto:convert_to(mc_amqp, Data),
279-
mc_amqp:convert_to(TargetProto, AmqpData);
300+
AmqpData = SourceProto:convert_to(mc_amqp, Data, Env),
301+
mc_amqp:convert_to(TargetProto, AmqpData, Env);
280302
TargetState0 ->
281303
TargetState0
282304
end;
@@ -285,7 +307,7 @@ convert(TargetProto, #?MODULE{protocol = SourceProto,
285307
end,
286308
State#?MODULE{protocol = TargetProto,
287309
data = TargetState};
288-
convert(Proto, BasicMsg) ->
310+
convert(Proto, BasicMsg, _Env) ->
289311
mc_compat:convert_to(Proto, BasicMsg).
290312

291313
-spec protocol_state(state()) -> term().

deps/rabbit/src/mc_amqp.erl

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
property/2,
1212
routing_headers/2,
1313
get_property/2,
14-
convert_to/2,
15-
convert_from/2,
14+
convert_to/3,
15+
convert_from/3,
1616
protocol_state/2,
1717
serialize/1,
1818
prepare/2
@@ -70,9 +70,9 @@ init(#msg{} = Msg) ->
7070
Anns = essential_properties(Msg),
7171
{Msg, Anns}.
7272

73-
convert_from(?MODULE, Sections) ->
73+
convert_from(?MODULE, Sections, _Env) ->
7474
element(1, init(Sections));
75-
convert_from(_SourceProto, _) ->
75+
convert_from(_SourceProto, _, _Env) ->
7676
not_implemented.
7777

7878
size(#msg{data = Body}) ->
@@ -177,10 +177,12 @@ get_property(priority, Msg) ->
177177
get_property(_P, _Msg) ->
178178
undefined.
179179

180-
convert_to(?MODULE, Msg) ->
180+
convert_to(?MODULE, Msg, _Env) ->
181181
Msg;
182-
convert_to(TargetProto, Msg) ->
183-
TargetProto:convert_from(?MODULE, msg_to_sections(Msg, fun (X) -> X end)).
182+
convert_to(TargetProto, Msg, Env) ->
183+
TargetProto:convert_from(?MODULE,
184+
msg_to_sections(Msg, fun (X) -> X end),
185+
Env).
184186

185187
serialize(Sections) ->
186188
encode_bin(Sections).

deps/rabbit/src/mc_amqpl.erl

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
size/1,
1313
x_header/2,
1414
routing_headers/2,
15-
convert_to/2,
16-
convert_from/2,
15+
convert_to/3,
16+
convert_from/3,
1717
protocol_state/2,
1818
property/2,
1919
set_property/3,
@@ -54,7 +54,7 @@ init(#content{} = Content0) ->
5454
Anns = essential_properties(Content),
5555
{strip_header(Content, ?DELETED_HEADER), Anns}.
5656

57-
convert_from(mc_amqp, Sections) ->
57+
convert_from(mc_amqp, Sections, _Env) ->
5858
{H, MAnn, Prop, AProp, BodyRev} =
5959
lists:foldl(
6060
fun
@@ -144,15 +144,17 @@ convert_from(mc_amqp, Sections) ->
144144
end,
145145

146146
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
147-
byte_size(K) =< ?AMQP_LEGACY_FIELD_NAME_MAX_LEN],
148-
%% Add remaining message annotations as headers?
147+
?IS_SHORTSTR_LEN(K)],
148+
%% Add remaining x- message annotations as headers
149149
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
150150
{true, to_091(<<"CC">>, V)};
151-
({{symbol, K}, V})
152-
when byte_size(K) =< ?AMQP_LEGACY_FIELD_NAME_MAX_LEN ->
151+
({{symbol, <<"x-", _/binary>> = K}, V})
152+
when ?IS_SHORTSTR_LEN(K) ->
153153
case is_internal_header(K) of
154-
false -> {true, to_091(K, V)};
155-
true -> false
154+
false ->
155+
{true, to_091(K, V)};
156+
true ->
157+
false
156158
end;
157159
(_) ->
158160
false
@@ -161,6 +163,8 @@ convert_from(mc_amqp, Sections) ->
161163
{Headers, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),
162164

163165
UserId1 = unwrap(UserId0),
166+
%% user-id is a binary type so we need to validate
167+
%% if we can use it as is
164168
UserId = case mc_util:is_valid_shortstr(UserId1) of
165169
true ->
166170
UserId1;
@@ -177,9 +181,9 @@ convert_from(mc_amqp, Sections) ->
177181
[] -> undefined;
178182
AllHeaders -> AllHeaders
179183
end,
180-
reply_to = unwrap(ReplyTo0),
184+
reply_to = unwrap_shortstr(ReplyTo0),
181185
type = Type,
182-
app_id = unwrap(GroupId),
186+
app_id = unwrap_shortstr(GroupId),
183187
priority = Priority,
184188
correlation_id = CorrId091,
185189
content_type = unwrap(ContentType),
@@ -190,7 +194,7 @@ convert_from(mc_amqp, Sections) ->
190194
properties = BP,
191195
properties_bin = none,
192196
payload_fragments_rev = PayloadRev};
193-
convert_from(_SourceProto, _) ->
197+
convert_from(_SourceProto, _, _) ->
194198
not_implemented.
195199

196200
size(#content{properties_bin = PropsBin,
@@ -263,11 +267,11 @@ prepare(store, Content) ->
263267
rabbit_binary_parser:clear_decoded_content(
264268
rabbit_binary_generator:ensure_content_encoded(Content, ?PROTOMOD)).
265269

266-
convert_to(?MODULE, Content) ->
270+
convert_to(?MODULE, Content, _Env) ->
267271
Content;
268-
convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
272+
convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content, Env) ->
269273
#content{properties = Props} = prepare(read, Content),
270-
#'P_basic'{message_id = MsgId,
274+
#'P_basic'{message_id = MsgId0,
271275
expiration = Expiration,
272276
delivery_mode = DelMode,
273277
headers = Headers0,
@@ -276,7 +280,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
276280
type = Type,
277281
priority = Priority,
278282
app_id = AppId,
279-
correlation_id = CorrId,
283+
correlation_id = CorrId0,
280284
content_type = ContentType,
281285
content_encoding = ContentEncoding,
282286
timestamp = Timestamp} = Props,
@@ -297,21 +301,34 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
297301
undefined ->
298302
undefined;
299303
_ ->
304+
%% Channel already checked for valid integer.
300305
binary_to_integer(Expiration)
301306
end,
302307

303308
H = #'v1_0.header'{durable = DelMode =:= 2,
304309
ttl = wrap(uint, Ttl),
305310
%% TODO: check Priority is a ubyte?
306311
priority = wrap(ubyte, Priority)},
312+
CorrId = case mc_util:urn_string_to_uuid(CorrId0) of
313+
{ok, CorrUUID} ->
314+
{uuid, CorrUUID};
315+
_ ->
316+
wrap(utf8, CorrId0)
317+
end,
318+
MsgId = case mc_util:urn_string_to_uuid(MsgId0) of
319+
{ok, MsgUUID} ->
320+
{uuid, MsgUUID};
321+
_ ->
322+
wrap(utf8, MsgId0)
323+
end,
307324
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
308325
undefined ->
309-
#'v1_0.properties'{message_id = wrap(utf8, MsgId),
326+
#'v1_0.properties'{message_id = MsgId,
310327
user_id = wrap(binary, UserId),
311328
to = undefined,
312329
% subject = wrap(utf8, RKey),
313330
reply_to = wrap(utf8, ReplyTo),
314-
correlation_id = wrap(utf8, CorrId),
331+
correlation_id = CorrId,
315332
content_type = wrap(symbol, ContentType),
316333
content_encoding = wrap(symbol, ContentEncoding),
317334
creation_time = wrap(timestamp, ConvertedTs),
@@ -367,8 +384,8 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
367384
end,
368385

369386
Sections = [H, MA, P, AP | BodySections],
370-
mc_amqp:convert_from(mc_amqp, Sections);
371-
convert_to(_TargetProto, _Content) ->
387+
mc_amqp:convert_from(mc_amqp, Sections, Env);
388+
convert_to(_TargetProto, _Content, _Env) ->
372389
not_implemented.
373390

374391
protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
@@ -606,7 +623,15 @@ unwrap({timestamp, V}) ->
606623
unwrap({_Type, V}) ->
607624
V.
608625

609-
to_091(Key, {utf8, V}) when is_binary(V) -> {Key, longstr, V};
626+
unwrap_shortstr({utf8, V})
627+
when is_binary(V) andalso
628+
?IS_SHORTSTR_LEN(V) ->
629+
V;
630+
unwrap_shortstr(_) ->
631+
undefined.
632+
633+
to_091(Key, {utf8, V}) -> {Key, longstr, V};
634+
to_091(Key, {symbol, V}) -> {Key, longstr, V};
610635
to_091(Key, {long, V}) -> {Key, long, V};
611636
to_091(Key, {ulong, V}) -> {Key, long, V}; %% TODO: we could try to constrain this
612637
to_091(Key, {byte, V}) -> {Key, byte, V};
@@ -630,6 +655,7 @@ to_091(Key, {map, M}) ->
630655
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}.
631656

632657
to_091({utf8, V}) -> {longstr, V};
658+
to_091({symbol, V}) -> {longstr, V};
633659
to_091({long, V}) -> {long, V};
634660
to_091({byte, V}) -> {byte, V};
635661
to_091({ubyte, V}) -> {unsignedbyte, V};
@@ -652,17 +678,17 @@ to_091({map, M}) ->
652678
{table, [to_091(unwrap(K), V) || {K, V} <- M]}.
653679

654680
message_id({uuid, UUID}, _HKey, H0) ->
655-
{H0, mc_util:uuid_to_string(UUID)};
681+
{H0, mc_util:uuid_to_urn_string(UUID)};
656682
message_id({ulong, N}, _HKey, H0) ->
657683
{H0, erlang:integer_to_binary(N)};
658684
message_id({binary, B}, HKey, H0) ->
659-
{[{HKey, longstr, B} | H0], undefined};
685+
{[{HKey, binary, B} | H0], undefined};
660686
message_id({utf8, S}, HKey, H0) ->
661-
case byte_size(S) > 255 of
687+
case ?IS_SHORTSTR_LEN(S) of
662688
true ->
663-
{[{HKey, longstr, S} | H0], undefined};
689+
{H0, S};
664690
false ->
665-
{H0, S}
691+
{[{HKey, longstr, S} | H0], undefined}
666692
end;
667693
message_id(undefined, _HKey, H) ->
668694
{H, undefined}.

0 commit comments

Comments
 (0)