Skip to content

Commit 48e2119

Browse files
committed
Rename unconfirmed to incoming_unsettled_map
to better match the AMQP spec terminology.
1 parent 0542696 commit 48e2119

File tree

1 file changed

+34
-64
lines changed

1 file changed

+34
-64
lines changed

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 34 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-behaviour(gen_server).
1111

12-
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("rabbit_common/include/rabbit.hrl").
1313
-include("rabbit_amqp1_0.hrl").
1414

1515
-define(MAX_SESSION_WINDOW_SIZE, 65_535).
@@ -26,7 +26,7 @@
2626
?V_1_0_SYMBOL_MODIFIED]).
2727

2828
%% Just make these constant for the time being.
29-
-define(INCOMING_CREDIT, 65536).
29+
-define(INCOMING_CREDIT, 65_536).
3030

3131
-define(MAX_PERMISSION_CACHE_SIZE, 12).
3232

@@ -40,7 +40,6 @@
4040
get_info/1]).
4141
-export([init/1,
4242
terminate/2,
43-
code_change/3,
4443
handle_call/3,
4544
handle_cast/2,
4645
handle_info/2]).
@@ -53,17 +52,16 @@
5352
exchange :: rabbit_exchange:name(),
5453
routing_key :: undefined | rabbit_types:routing_key(),
5554
delivery_id :: undefined | delivery_number(),
56-
delivery_count = 0,
55+
delivery_count = 0 :: sequence_no(),
5756
send_settle_mode = undefined,
5857
recv_settle_mode = undefined,
5958
credit_used = ?INCOMING_CREDIT div 2,
6059
msg_acc = []}).
6160

6261
-record(outgoing_link, {
6362
queue :: undefined | rabbit_misc:resource_name(),
64-
delivery_count = 0,
65-
%% TODO below field is not needed?
66-
send_settled}).
63+
delivery_count = 0 :: sequence_no(),
64+
send_settled :: boolean()}).
6765

6866
-record(outgoing_unsettled, {
6967
%% The queue sent us this consumer scoped sequence number.
@@ -85,8 +83,8 @@
8583

8684
%%TODO put rarely used fields into separate #cfg{}
8785
-record(state, {frame_max,
88-
reader_pid,
89-
writer_pid,
86+
reader_pid :: pid(),
87+
writer_pid :: pid(),
9088
%% These messages were received from queues thanks to sufficient link credit.
9189
%% However, they are buffered here due to session flow control before being sent to the client.
9290
pending_transfers = queue:new() :: queue:queue(#pending_transfer{}),
@@ -103,15 +101,14 @@
103101
outgoing_window_max,
104102
next_publish_id, %% the 0-9-1-side counter for confirms
105103
next_delivery_id = 0 :: delivery_number(),
106-
incoming_unsettled_map, %%TODO delete
107-
outgoing_unsettled_map = gb_trees:empty() :: gb_trees:tree(delivery_number(), #outgoing_unsettled{}),
108-
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
109104
%% TRANSFER delivery IDs published to queues but not yet confirmed by queues
110105
%% TODO Use a different data structure because
111106
%% 1. we don't need to record exchanges since we don't emit channel stats,
112107
%% 2. mixed mode can result in large gaps across delivery_ids that need to be confirmed. Use a tree?
113108
%% 3. handle wrap around of 32-bit RFC-1982 serial number
114-
unconfirmed = rabbit_confirms:init() :: rabbit_confirms:state()
109+
incoming_unsettled_map = rabbit_confirms:init() :: rabbit_confirms:state(),
110+
outgoing_unsettled_map = gb_trees:empty() :: gb_trees:tree(delivery_number(), #outgoing_unsettled{}),
111+
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state()
115112
%% TRANSFER delivery IDs confirmed by queues but yet to be sent to the client
116113
%%TODO accumulate confirms and send DISPOSITIONs after processing the mailbox
117114
%%(see rabbit_channel:noreply_coalesce/1
@@ -138,63 +135,18 @@ init({Channel, ReaderPid, WriterPid, User, Vhost, FrameMax}) ->
138135
user = User,
139136
vhost = Vhost,
140137
channel_num = Channel,
141-
next_publish_id = 0,
142-
incoming_unsettled_map = gb_trees:empty()
138+
next_publish_id = 0
143139
}}.
144140

145141
terminate(_Reason, _State) ->
146142
ok.
147143

148-
code_change(_OldVsn, State, _Extra) ->
149-
{ok, State}.
150-
151144
handle_call(info, _From, #state{reader_pid = ReaderPid} = State) ->
152145
Info = [{reader, ReaderPid}],
153146
{reply, Info, State};
154147
handle_call(Msg, _From, State) ->
155148
{reply, {error, not_understood, Msg}, State}.
156149

157-
handle_info(#'basic.consume_ok'{}, State) ->
158-
%% Handled above
159-
{noreply, State};
160-
161-
handle_info(#'basic.cancel_ok'{}, State) ->
162-
%% just ignore this for now,
163-
%% At some point we should send the detach here but then we'd need to track
164-
%% consumer tags -> link handle somewhere
165-
{noreply, State};
166-
167-
%% A message from the queue saying that there are no more messages
168-
% handle_info(#'basic.credit_drained'{consumer_tag = CTag} = CreditDrained,
169-
% State = #state{writer_pid = WriterPid,
170-
% session = Session}) ->
171-
% Handle = ctag_to_handle(CTag),
172-
% Link = get({out, Handle}),
173-
% {Flow0, Link1} = rabbit_amqp1_0_session:outgoing_link_credit_drained(
174-
% CreditDrained, Handle, Link),
175-
% Flow = rabbit_amqp1_0_session:flow_fields(Flow0, Session),
176-
% rabbit_amqp1_0_writer:send_command(WriterPid, Flow),
177-
% put({out, Handle}, Link1),
178-
% {noreply, State};
179-
180-
% handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
181-
% session = Session}) ->
182-
% {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
183-
% case Reply of
184-
% undefined ->
185-
% ok;
186-
% _ ->
187-
% rabbit_amqp1_0_writer:send_command(
188-
% WriterPid,
189-
% rabbit_amqp1_0_session:flow_fields(Reply, Session)
190-
% )
191-
% end,
192-
% {noreply, state(Session1, State)};
193-
194-
% handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
195-
% rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
196-
% {noreply, state(Session, State)};
197-
198150
handle_info({bump_credit, Msg}, State) ->
199151
credit_flow:handle_bump_msg(Msg),
200152
{noreply, State};
@@ -893,12 +845,30 @@ handle_queue_event({queue_event, QRef, Evt},
893845
% rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
894846
end.
895847

848+
% handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
849+
% session = Session}) ->
850+
% {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
851+
% case Reply of
852+
% undefined ->
853+
% ok;
854+
% _ ->
855+
% rabbit_amqp1_0_writer:send_command(
856+
% WriterPid,
857+
% rabbit_amqp1_0_session:flow_fields(Reply, Session)
858+
% )
859+
% end,
860+
% {noreply, state(Session1, State)};
861+
862+
% handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
863+
% rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
864+
% {noreply, state(Session, State)};
865+
896866
handle_queue_actions(Actions, State0) ->
897867
{ReplyRev, State} =
898868
lists:foldl(
899-
fun ({settled, QName, DelIds}, {Reply, S0 = #state{unconfirmed = U0}}) ->
869+
fun ({settled, QName, DelIds}, {Reply, S0 = #state{incoming_unsettled_map = U0}}) ->
900870
{ConfirmMXs, U} = rabbit_confirms:confirm(DelIds, QName, U0),
901-
S = S0#state{unconfirmed = U},
871+
S = S0#state{incoming_unsettled_map = U},
902872
R = if ConfirmMXs =:= [] ->
903873
Reply;
904874
ConfirmMXs =/= [] ->
@@ -960,7 +930,7 @@ handle_deliver(ConsumerTag, AckRequired,
960930
Dtag = if is_integer(MsgId) ->
961931
%% delivery-tag must be unique only per link (not per session)
962932
<<MsgId:64>>;
963-
MsgId =:= undefined ->
933+
MsgId =:= undefined andalso SendSettled ->
964934
%% Both ends of the link will always consider this message settled because
965935
%% "the sender will send all deliveries settled to the receiver" [3.8.2].
966936
%% Hence, the delivery tag does not have to be unique on this link.
@@ -1039,7 +1009,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
10391009
send_settle_mode = SSM,
10401010
recv_settle_mode = RSM} = Link,
10411011
#state{queue_states = QStates0,
1042-
unconfirmed = U0,
1012+
incoming_unsettled_map = U0,
10431013
next_publish_id = NextPublishId0
10441014
} = State0) ->
10451015
MsgBin = iolist_to_binary(lists:reverse([MsgPart | MsgAcc])),
@@ -1094,7 +1064,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
10941064
{U, Reply0} = process_routing_confirm(Qs, EffectiveSendSettleMode, DeliveryId, XName, U0),
10951065
State1 = State0#state{queue_states = QStates,
10961066
next_publish_id = NextPublishId,
1097-
unconfirmed = U},
1067+
incoming_unsettled_map = U},
10981068
{Reply1, State} = handle_queue_actions(Actions, State1),
10991069
{SendFlow, CreditUsed1} = case CreditUsed - 1 of
11001070
C when C =< 0 ->

0 commit comments

Comments
 (0)