Skip to content

Commit 62009dc

Browse files
authored
Translate AMQP 0.9.1 CC headers to AMQP 1.0 x-cc (#9321)
* Translate AMQP 0.9.1 CC headers to AMQP 1.0 x-cc Translate AMQP 0.9.1 CC headers to AMQP 1.0 x-cc message annotations. We want CC headers to be kept an AMQP legacy feature and therefore special case its conversion to AMQP 1.0. * Translate x-cc from 1.0 message annotation to 091 CC header for the case where you publish via 091 with CC to a stream and consume via 091 from a stream in which case the 091 consuming client would like to know the original CC headers.
1 parent c56f2e2 commit 62009dc

File tree

3 files changed

+61
-18
lines changed

3 files changed

+61
-18
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,17 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
441441
lists:foldl(
442442
fun ({{symbol, <<"x-routing-key">>},
443443
{utf8, Key}}, Acc) ->
444-
Acc#{routing_keys => [Key]};
444+
maps:update_with(routing_keys,
445+
fun(L) -> [Key | L] end,
446+
[Key],
447+
Acc);
448+
({{symbol, <<"x-cc">>},
449+
{list, CCs0}}, Acc) ->
450+
CCs = [CC || {_T, CC} <- CCs0],
451+
maps:update_with(routing_keys,
452+
fun(L) -> L ++ CCs end,
453+
CCs,
454+
Acc);
445455
({{symbol, <<"x-exchange">>},
446456
{utf8, Exchange}}, Acc) ->
447457
Acc#{exchange => Exchange};

deps/rabbit/src/mc_amqpl.erl

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,17 @@ convert_from(mc_amqp, Sections) ->
143143
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
144144
byte_size(K) =< ?AMQP_LEGACY_FIELD_NAME_MAX_LEN],
145145
%% Add remaining message annotations as headers?
146-
XHeaders = [to_091(K, V) || {{symbol, K}, V} <- MA,
147-
not is_internal_header(K),
148-
byte_size(K) =< ?AMQP_LEGACY_FIELD_NAME_MAX_LEN],
146+
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
147+
{true, to_091(<<"CC">>, V)};
148+
({{symbol, K}, V})
149+
when byte_size(K) =< ?AMQP_LEGACY_FIELD_NAME_MAX_LEN ->
150+
case is_internal_header(K) of
151+
false -> {true, to_091(K, V)};
152+
true -> false
153+
end;
154+
(_) ->
155+
false
156+
end, MA),
149157
{Headers1, MsgId091} = message_id(MsgId, <<"x-message-id">>, Headers0),
150158
{Headers, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),
151159

@@ -327,14 +335,18 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
327335
%% x- headers are stored as message annotations
328336
MA = case amqp10_section_header(?AMQP10_MESSAGE_ANNOTATIONS_HEADER, Headers) of
329337
undefined ->
330-
MAC0 = [{{symbol, K}, from_091(T, V)}
331-
|| {K, T, V} <- Headers,
332-
mc_util:is_x_header(K),
333-
%% all message annotation keys need to be either a symbol or ulong
334-
%% but 0.9.1 field-table names are always strings
335-
is_binary(K)
336-
],
337-
338+
MAC0 = lists:filtermap(
339+
fun({<<"x-", _/binary>> = K, T, V}) ->
340+
%% All message annotation keys need to be either a symbol or ulong
341+
%% but 0.9.1 field-table names are always strings.
342+
{true, {{symbol, K}, from_091(T, V)}};
343+
({<<"CC">>, T = array, V}) ->
344+
%% Special case the 0.9.1 CC header into 1.0 message annotations because
345+
%% 1.0 application properties must not contain list or array values.
346+
{true, {{symbol, <<"x-cc">>}, from_091(T, V)}};
347+
(_) ->
348+
false
349+
end, Headers),
338350
%% `type' doesn't have a direct equivalent so adding as
339351
%% a message annotation here
340352
MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, MAC0),

deps/rabbit/test/mc_SUITE.erl

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ all() ->
1717
{group, tests}
1818
].
1919

20+
groups() ->
21+
[
22+
{tests, [shuffle], all_tests()}
23+
].
2024

2125
all_tests() ->
2226
[
@@ -26,16 +30,12 @@ all_tests() ->
2630
amqpl_table_x_header_array_of_tbls,
2731
amqpl_death_records,
2832
amqpl_amqp_bin_amqpl,
33+
amqpl_cc_amqp_bin_amqpl,
2934
amqp_amqpl,
3035
amqp_to_amqpl_data_body,
3136
amqp_amqpl_amqp_bodies
3237
].
3338

34-
groups() ->
35-
[
36-
{tests, [], all_tests()}
37-
].
38-
3939
%%%===================================================================
4040
%%% Test cases
4141
%%%===================================================================
@@ -325,9 +325,30 @@ amqpl_amqp_bin_amqpl(_Config) ->
325325
?assertEqual(1, mc:ttl(MsgL2)),
326326
?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, MsgL2)),
327327
?assertEqual(RoutingHeaders, mc:routing_headers(MsgL2, [])),
328-
329328
ok.
330329

330+
amqpl_cc_amqp_bin_amqpl(_Config) ->
331+
Headers = [{<<"CC">>, array, [{longstr, <<"q1">>},
332+
{longstr, <<"q2">>}]}],
333+
Props = #'P_basic'{headers = Headers},
334+
Content = #content{properties = Props,
335+
payload_fragments_rev = [<<"data">>]},
336+
X = rabbit_misc:r(<<"/">>, exchange, <<"exch">>),
337+
Msg = mc_amqpl:message(X, <<"apple">>, Content, #{}, true),
338+
339+
RoutingKeys = [<<"apple">>, <<"q1">>, <<"q2">>],
340+
?assertEqual(RoutingKeys, mc:get_annotation(routing_keys, Msg)),
341+
342+
Msg10Pre = mc:convert(mc_amqp, Msg),
343+
Sections = amqp10_framing:decode_bin(
344+
iolist_to_binary(amqp_serialize(Msg10Pre))),
345+
Msg10 = mc:init(mc_amqp, Sections, #{}),
346+
?assertEqual(RoutingKeys, mc:get_annotation(routing_keys, Msg10)),
347+
348+
MsgL2 = mc:convert(mc_amqpl, Msg10),
349+
?assertEqual(RoutingKeys, mc:get_annotation(routing_keys, MsgL2)),
350+
?assertMatch(#content{properties = #'P_basic'{headers = Headers}},
351+
mc:protocol_state(MsgL2)).
331352

332353
thead2(T, Value) ->
333354
{symbol(atom_to_binary(T)), {T, Value}}.

0 commit comments

Comments
 (0)