Skip to content

Commit 3abfe38

Browse files
authored
Merge pull request #1788 from rabbitmq/delivery-count-header
Include redelivery counts for quorum queues
2 parents 0e4e0ca + cd070ee commit 3abfe38

File tree

4 files changed

+116
-4
lines changed

4 files changed

+116
-4
lines changed

src/rabbit_basic.erl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
2424
header_routes/1, parse_expiration/1, header/2, header/3]).
2525
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
26+
-export([add_header/4]).
2627

2728
%%----------------------------------------------------------------------------
2829

@@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) ->
300301

301302
msg_size(Content) ->
302303
rabbit_writer:msg_size(Content).
304+
305+
add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
306+
Content = rabbit_basic:map_headers(
307+
fun(undefined) ->
308+
rabbit_misc:set_table_value([], Name, Type, Value);
309+
(Headers) ->
310+
rabbit_misc:set_table_value(Headers, Name, Type, Value)
311+
end, Content0),
312+
Msg#basic_message{content = Content}.

src/rabbit_channel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
649649
State = lists:foldl(
650650
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
651651
IsDelivered = maps:is_key(delivery_count, MsgHeader),
652+
Msg1 = add_delivery_count_header(MsgHeader, Msg),
652653
handle_deliver(CTag, AckRequired,
653-
{QName, From, MsgId, IsDelivered, Msg},
654+
{QName, From, MsgId, IsDelivered, Msg1},
654655
Acc)
655656
end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs),
656657
noreply(State);
@@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) ->
24882489
maybe_monitor_all([], S) -> S; %% optimisation
24892490
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
24902491
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
2492+
2493+
add_delivery_count_header(MsgHeader, Msg) ->
2494+
Count = maps:get(delivery_count, MsgHeader, 0),
2495+
rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).

src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
335335
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
336336
{ok, empty, QState} ->
337337
{ok, empty, QState};
338-
{ok, {MsgId, {MsgHeader, Msg}}, QState} ->
339-
IsDelivered = maps:is_key(delivery_count, MsgHeader),
338+
{ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
339+
Count = maps:get(delivery_count, MsgHeader, 0),
340+
IsDelivered = Count > 0,
341+
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
340342
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
341343
{timeout, _} ->
342344
{error, timeout}

test/quorum_queue_SUITE.erl

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ all_tests() ->
113113
cancel_sync_queue,
114114
basic_recover,
115115
idempotent_recover,
116-
vhost_with_quorum_queue_is_deleted
116+
vhost_with_quorum_queue_is_deleted,
117+
consume_redelivery_count,
118+
subscribe_redelivery_count
117119
].
118120

119121
%% -------------------------------------------------------------------
@@ -1820,6 +1822,99 @@ basic_recover(Config) ->
18201822
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
18211823
wait_for_messages_ready(Servers, RaName, 1),
18221824
wait_for_messages_pending_ack(Servers, RaName, 0).
1825+
1826+
subscribe_redelivery_count(Config) ->
1827+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1828+
1829+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1830+
QQ = ?config(queue_name, Config),
1831+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1832+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1833+
1834+
RaName = ra_name(QQ),
1835+
publish(Ch, QQ),
1836+
wait_for_messages_ready(Servers, RaName, 1),
1837+
wait_for_messages_pending_ack(Servers, RaName, 0),
1838+
subscribe(Ch, QQ, false),
1839+
1840+
DTag = <<"x-delivery-count">>,
1841+
receive
1842+
{#'basic.deliver'{delivery_tag = DeliveryTag,
1843+
redelivered = false},
1844+
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
1845+
?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
1846+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
1847+
multiple = false,
1848+
requeue = true})
1849+
end,
1850+
1851+
receive
1852+
{#'basic.deliver'{delivery_tag = DeliveryTag1,
1853+
redelivered = true},
1854+
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
1855+
?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
1856+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
1857+
multiple = false,
1858+
requeue = true})
1859+
end,
1860+
1861+
receive
1862+
{#'basic.deliver'{delivery_tag = DeliveryTag2,
1863+
redelivered = true},
1864+
#amqp_msg{props = #'P_basic'{headers = H2}}} ->
1865+
?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
1866+
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
1867+
multiple = false}),
1868+
wait_for_messages_ready(Servers, RaName, 0),
1869+
wait_for_messages_pending_ack(Servers, RaName, 0)
1870+
end.
1871+
1872+
consume_redelivery_count(Config) ->
1873+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1874+
1875+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
1876+
QQ = ?config(queue_name, Config),
1877+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1878+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1879+
1880+
RaName = ra_name(QQ),
1881+
publish(Ch, QQ),
1882+
wait_for_messages_ready(Servers, RaName, 1),
1883+
wait_for_messages_pending_ack(Servers, RaName, 0),
1884+
1885+
DTag = <<"x-delivery-count">>,
1886+
1887+
{#'basic.get_ok'{delivery_tag = DeliveryTag,
1888+
redelivered = false},
1889+
#amqp_msg{props = #'P_basic'{headers = H0}}} =
1890+
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
1891+
no_ack = false}),
1892+
?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
1893+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
1894+
multiple = false,
1895+
requeue = true}),
1896+
1897+
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
1898+
redelivered = true},
1899+
#amqp_msg{props = #'P_basic'{headers = H1}}} =
1900+
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
1901+
no_ack = false}),
1902+
?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
1903+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
1904+
multiple = false,
1905+
requeue = true}),
1906+
1907+
{#'basic.get_ok'{delivery_tag = DeliveryTag2,
1908+
redelivered = true},
1909+
#amqp_msg{props = #'P_basic'{headers = H2}}} =
1910+
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
1911+
no_ack = false}),
1912+
?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
1913+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
1914+
multiple = false,
1915+
requeue = true}).
1916+
1917+
18231918
%%----------------------------------------------------------------------------
18241919

18251920
declare(Ch, Q) ->

0 commit comments

Comments
 (0)