Skip to content

Commit caaa465

Browse files
committed
Track requeue history
Support tracking the requeue history as described in rabbitmq/rabbitmq-website#2095 This commit: 1. adds a test case tracing the requeue history via AMQP 1.0 using the modified outcome and 2. fixes bugs in the broker which crashed if a modified message annotation value is an AMQP 1.0 list, map, or array. Modified annotations are stored as tagged values from now on. These modified annotations can be consumed via AMQP 1.0, but not via AMQP 0.9.1, which is okay.
1 parent d9ff6a0 commit caaa465

File tree

5 files changed

+68
-34
lines changed

5 files changed

+68
-34
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,14 +1178,16 @@ wrap_map_value(true) ->
11781178
{boolean, true};
11791179
wrap_map_value(false) ->
11801180
{boolean, false};
1181-
wrap_map_value(V) when is_integer(V) ->
1182-
{uint, V};
1181+
wrap_map_value(V) when is_integer(V) andalso V >= 0 ->
1182+
uint(V);
11831183
wrap_map_value(V) when is_binary(V) ->
11841184
utf8(V);
11851185
wrap_map_value(V) when is_list(V) ->
11861186
utf8(list_to_binary(V));
11871187
wrap_map_value(V) when is_atom(V) ->
1188-
utf8(atom_to_list(V)).
1188+
utf8(atom_to_list(V));
1189+
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
1190+
TaggedValue.
11891191

11901192
utf8(V) -> amqp10_client_types:utf8(V).
11911193

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
-type str() :: atom() | string() | binary().
4545
-type internal_ann_key() :: atom().
4646
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
47-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
47+
-type x_ann_value() :: str() | integer() | float() | tuple() | [x_ann_value()].
4848
-type protocol() :: module().
4949
-type annotations() :: #{internal_ann_key() => term(),
5050
x_ann_key() => x_ann_value()}.

deps/rabbit/src/mc_util.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
5252
{long, V};
5353
infer_type(V) when is_boolean(V) ->
5454
{boolean, V};
55-
infer_type({T, _} = V) when is_atom(T) ->
56-
%% looks like a pre-tagged type
57-
V.
55+
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
56+
TaggedValue.
5857

5958
utf8_string_is_ascii(UTF8String) ->
6059
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1937,8 +1937,8 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
19371937
{map, KVList} ->
19381938
Anns1 = lists:map(
19391939
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
1940-
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
1941-
{K, unwrap(V)}
1940+
fun({{symbol, <<"x-", _/binary>> = K}, TaggedVal}) ->
1941+
{K, TaggedVal}
19421942
end, KVList),
19431943
maps:from_list(Anns1)
19441944
end,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -501,61 +501,94 @@ modified_quorum_queue(Config) ->
501501
ok = amqp10_client:send_msg(Sender, Msg2),
502502
ok = amqp10_client:detach_link(Sender),
503503

504-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
504+
Receiver1Name = <<"receiver 1">>,
505+
Receiver2Name = <<"receiver 2">>,
506+
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
507+
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),
505508

506-
{ok, M1} = amqp10_client:get_msg(Receiver),
509+
{ok, M1} = amqp10_client:get_msg(Receiver1),
507510
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
508511
?assertMatch(#{delivery_count := 0,
509512
first_acquirer := true},
510513
amqp10_msg:headers(M1)),
511-
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
514+
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),
512515

513-
{ok, M2a} = amqp10_client:get_msg(Receiver),
516+
{ok, M2a} = amqp10_client:get_msg(Receiver1),
514517
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
515518
?assertMatch(#{delivery_count := 0,
516519
first_acquirer := true},
517520
amqp10_msg:headers(M2a)),
518-
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
521+
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),
519522

520-
{ok, M2b} = amqp10_client:get_msg(Receiver),
523+
{ok, M2b} = amqp10_client:get_msg(Receiver1),
521524
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
522525
?assertMatch(#{delivery_count := 0,
523526
first_acquirer := false},
524527
amqp10_msg:headers(M2b)),
525-
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
528+
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),
526529

527-
{ok, M2c} = amqp10_client:get_msg(Receiver),
530+
{ok, M2c} = amqp10_client:get_msg(Receiver1),
528531
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
529532
?assertMatch(#{delivery_count := 1,
530533
first_acquirer := false},
531534
amqp10_msg:headers(M2c)),
532-
ok = amqp10_client:settle_msg(Receiver, M2c,
533-
{modified, true, false,
534-
#{<<"x-opt-key">> => <<"val 1">>}}),
535-
536-
{ok, M2d} = amqp10_client:get_msg(Receiver),
535+
ok = amqp10_client:settle_msg(
536+
Receiver1, M2c,
537+
{modified, true, false,
538+
%% Test that a history of requeue events can be tracked as described in
539+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
540+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
541+
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
542+
<<"x-opt-my-map">> => {map, [
543+
{{utf8, <<"k1">>}, {byte, -1}},
544+
{{utf8, <<"k2">>}, {ulong, 2}}
545+
]}}}),
546+
547+
{ok, M2d} = amqp10_client:get_msg(Receiver2),
537548
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
538549
?assertMatch(#{delivery_count := 2,
539550
first_acquirer := false},
540551
amqp10_msg:headers(M2d)),
541-
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
542-
ok = amqp10_client:settle_msg(Receiver, M2d,
543-
{modified, false, false,
544-
#{<<"x-opt-key">> => <<"val 2">>,
545-
<<"x-other">> => 99}}),
546-
547-
{ok, M2e} = amqp10_client:get_msg(Receiver),
552+
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
553+
<<"x-opt-requeue-reason">> := L1,
554+
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
555+
ok = amqp10_client:settle_msg(
556+
Receiver1, M2d,
557+
{modified, false, false,
558+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
559+
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
560+
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
561+
<<"x-other">> => 99}}),
562+
563+
{ok, M2e} = amqp10_client:get_msg(Receiver1),
548564
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
549565
?assertMatch(#{delivery_count := 2,
550566
first_acquirer := false},
551567
amqp10_msg:headers(M2e)),
552-
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
568+
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
569+
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
570+
<<"x-opt-my-map">> := [
571+
{{utf8, <<"k1">>}, {byte, -1}},
572+
{{utf8, <<"k2">>}, {ulong, 2}},
573+
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
574+
],
553575
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
554-
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
576+
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
555577

556-
ok = amqp10_client:detach_link(Receiver),
557-
?assertMatch({ok, #{message_count := 1}},
558-
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
578+
%% Test that we can consume via AMQP 0.9.1
579+
Ch = rabbit_ct_client_helpers:open_channel(Config),
580+
{#'basic.get_ok'{},
581+
#amqp_msg{payload = <<"m2">>,
582+
props = #'P_basic'{headers = Headers}}
583+
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
584+
%% We don't necessarily expect to receive the modified AMQP 1.0 message annotations via AMQP 0.9.1.
585+
ct:pal("AMQP 0.9.1 headers: ~p", [Headers]),
586+
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
587+
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
588+
ok = rabbit_ct_client_helpers:close_channel(Ch),
589+
590+
ok = amqp10_client:detach_link(Receiver1),
591+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
559592
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
560593
ok = end_session_sync(Session),
561594
ok = amqp10_client:close_connection(Connection).

0 commit comments

Comments
 (0)