Skip to content

Commit 4a2b00a

Browse files
committed
rabbit_fifo: tidy up and formatting
1 parent 08f2061 commit 4a2b00a

File tree

3 files changed

+28
-51
lines changed

3 files changed

+28
-51
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -106,21 +106,6 @@
106106
-record(purge_nodes, {nodes :: [node()]}).
107107
-record(update_config, {config :: config()}).
108108
-record(garbage_collection, {}).
109-
%% v2 alternative commands
110-
%% each consumer is assigned an integer index which can be used
111-
%% instead of the consumer id to identify the consumer
112-
-type consumer_idx() :: non_neg_integer().
113-
114-
-record(?SETTLE_V2, {consumer_idx :: consumer_idx(),
115-
msg_ids :: [msg_id()]}).
116-
-record(?RETURN_V2, {consumer_idx :: consumer_idx(),
117-
msg_ids :: [msg_id()]}).
118-
-record(?DISCARD_V2, {consumer_idx :: consumer_idx(),
119-
msg_ids :: [msg_id()]}).
120-
-record(?CREDIT_V2, {consumer_idx :: consumer_idx(),
121-
credit :: non_neg_integer(),
122-
delivery_count :: non_neg_integer(),
123-
drain :: boolean()}).
124109

125110
-opaque protocol() ::
126111
#enqueue{} |
@@ -134,12 +119,7 @@
134119
#purge{} |
135120
#purge_nodes{} |
136121
#update_config{} |
137-
#garbage_collection{} |
138-
% v2
139-
#?SETTLE_V2{} |
140-
#?RETURN_V2{} |
141-
#?DISCARD_V2{} |
142-
#?CREDIT_V2{}.
122+
#garbage_collection{}.
143123

144124
-type command() :: protocol() |
145125
rabbit_fifo_dlx:protocol() |

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@
2929
smallest_raft_index/1
3030
]).
3131

32-
-record(checkout,{
33-
consumer :: pid(),
34-
prefetch :: non_neg_integer()
35-
}).
32+
-record(checkout, {consumer :: pid(),
33+
prefetch :: non_neg_integer()}).
3634
-record(settle, {msg_ids :: [msg_id()]}).
3735
-type protocol() :: {dlx, #checkout{} | #settle{}}.
3836
-opaque state() :: #?MODULE{}.
@@ -93,16 +91,17 @@ stat(#?MODULE{consumer = Con,
9391
apply(_Meta, {dlx, #settle{msg_ids = MsgIds}}, at_least_once,
9492
#?MODULE{consumer = #dlx_consumer{checked_out = Checked0}} = State0) ->
9593
Acked = maps:with(MsgIds, Checked0),
96-
State = maps:fold(fun(MsgId, ?TUPLE(_Rsn, ?MSG(Idx, _) = Msg),
97-
#?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
98-
msg_bytes_checkout = BytesCheckout,
99-
ra_indexes = Indexes0} = S) ->
100-
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
101-
S#?MODULE{consumer = C#dlx_consumer{checked_out =
102-
maps:remove(MsgId, Checked)},
103-
msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg),
104-
ra_indexes = Indexes}
105-
end, State0, Acked),
94+
State = maps:fold(
95+
fun(MsgId, ?TUPLE(_Rsn, ?MSG(Idx, _) = Msg),
96+
#?MODULE{consumer = #dlx_consumer{checked_out = Checked} = C,
97+
msg_bytes_checkout = BytesCheckout,
98+
ra_indexes = Indexes0} = S) ->
99+
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
100+
S#?MODULE{consumer = C#dlx_consumer{checked_out =
101+
maps:remove(MsgId, Checked)},
102+
msg_bytes_checkout = BytesCheckout - size_in_bytes(Msg),
103+
ra_indexes = Indexes}
104+
end, State0, Acked),
106105
{State, [{mod_call, rabbit_global_counters, messages_dead_lettered_confirmed,
107106
[rabbit_quorum_queue, at_least_once, maps:size(Acked)]}]};
108107
apply(_, {dlx, #checkout{consumer = Pid,

deps/rabbit/src/rabbit_fifo_dlx.hrl

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,18 @@
44
%%
55
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
66

7-
-record(dlx_consumer,{
8-
pid :: pid(),
9-
prefetch :: non_neg_integer(),
10-
checked_out = #{} :: #{msg_id() => tuple(rabbit_dead_letter:reason(), msg())},
11-
next_msg_id = 0 :: msg_id()
12-
}).
7+
-record(dlx_consumer,
8+
{pid :: pid(),
9+
prefetch :: non_neg_integer(),
10+
checked_out = #{} :: #{msg_id() => tuple(rabbit_dead_letter:reason(), msg())},
11+
next_msg_id = 0 :: msg_id()}).
1312

14-
-record(rabbit_fifo_dlx,{
15-
consumer :: option(#dlx_consumer{}),
16-
%% Queue of dead-lettered messages.
17-
discards = lqueue:new() :: lqueue:lqueue(tuple(rabbit_dead_letter:reason(), msg())),
18-
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
19-
%% so that we get the smallest ra index in O(1).
20-
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
21-
msg_bytes = 0 :: non_neg_integer(),
22-
msg_bytes_checkout = 0 :: non_neg_integer()
23-
}).
13+
-record(rabbit_fifo_dlx,
14+
{consumer :: option(#dlx_consumer{}),
15+
%% Queue of dead-lettered messages.
16+
discards = lqueue:new() :: lqueue:lqueue(tuple(rabbit_dead_letter:reason(), msg())),
17+
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
18+
%% so that we get the smallest ra index in O(1).
19+
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
20+
msg_bytes = 0 :: non_neg_integer(),
21+
msg_bytes_checkout = 0 :: non_neg_integer()}).

0 commit comments

Comments
 (0)