9
9
10
10
-behaviour (gen_server ).
11
11
12
- -include_lib (" amqp_client /include/amqp_client .hrl" ).
12
+ -include_lib (" rabbit_common /include/rabbit .hrl" ).
13
13
-include (" rabbit_amqp1_0.hrl" ).
14
14
15
15
-define (MAX_SESSION_WINDOW_SIZE , 65_535 ).
26
26
? V_1_0_SYMBOL_MODIFIED ]).
27
27
28
28
% % Just make these constant for the time being.
29
- -define (INCOMING_CREDIT , 65536 ).
29
+ -define (INCOMING_CREDIT , 65_536 ).
30
30
31
31
-define (MAX_PERMISSION_CACHE_SIZE , 12 ).
32
32
40
40
get_info /1 ]).
41
41
-export ([init /1 ,
42
42
terminate /2 ,
43
- code_change /3 ,
44
43
handle_call /3 ,
45
44
handle_cast /2 ,
46
45
handle_info /2 ]).
53
52
exchange :: rabbit_exchange :name (),
54
53
routing_key :: undefined | rabbit_types :routing_key (),
55
54
delivery_id :: undefined | delivery_number (),
56
- delivery_count = 0 ,
55
+ delivery_count = 0 :: sequence_no () ,
57
56
send_settle_mode = undefined ,
58
57
recv_settle_mode = undefined ,
59
58
credit_used = ? INCOMING_CREDIT div 2 ,
60
59
msg_acc = []}).
61
60
62
61
-record (outgoing_link , {
63
62
queue :: undefined | rabbit_misc :resource_name (),
64
- delivery_count = 0 ,
65
- % % TODO below field is not needed?
66
- send_settled }).
63
+ delivery_count = 0 :: sequence_no (),
64
+ send_settled :: boolean ()}).
67
65
68
66
-record (outgoing_unsettled , {
69
67
% % The queue sent us this consumer scoped sequence number.
85
83
86
84
% %TODO put rarely used fields into separate #cfg{}
87
85
-record (state , {frame_max ,
88
- reader_pid ,
89
- writer_pid ,
86
+ reader_pid :: pid () ,
87
+ writer_pid :: pid () ,
90
88
% % These messages were received from queues thanks to sufficient link credit.
91
89
% % However, they are buffered here due to session flow control before being sent to the client.
92
90
pending_transfers = queue :new () :: queue :queue (# pending_transfer {}),
103
101
outgoing_window_max ,
104
102
next_publish_id , % % the 0-9-1-side counter for confirms
105
103
next_delivery_id = 0 :: delivery_number (),
106
- incoming_unsettled_map , % %TODO delete
107
- outgoing_unsettled_map = gb_trees :empty () :: gb_trees :tree (delivery_number (), # outgoing_unsettled {}),
108
- queue_states = rabbit_queue_type :init () :: rabbit_queue_type :state (),
109
104
% % TRANSFER delivery IDs published to queues but not yet confirmed by queues
110
105
% % TODO Use a different data structure because
111
106
% % 1. we don't need to record exchanges since we don't emit channel stats,
112
107
% % 2. mixed mode can result in large gaps across delivery_ids that need to be confirmed. Use a tree?
113
108
% % 3. handle wrap around of 32-bit RFC-1982 serial number
114
- unconfirmed = rabbit_confirms :init () :: rabbit_confirms :state ()
109
+ incoming_unsettled_map = rabbit_confirms :init () :: rabbit_confirms :state (),
110
+ outgoing_unsettled_map = gb_trees :empty () :: gb_trees :tree (delivery_number (), # outgoing_unsettled {}),
111
+ queue_states = rabbit_queue_type :init () :: rabbit_queue_type :state ()
115
112
% % TRANSFER delivery IDs confirmed by queues but yet to be sent to the client
116
113
% %TODO accumulate confirms and send DISPOSITIONs after processing the mailbox
117
114
% %(see rabbit_channel:noreply_coalesce/1
@@ -138,63 +135,18 @@ init({Channel, ReaderPid, WriterPid, User, Vhost, FrameMax}) ->
138
135
user = User ,
139
136
vhost = Vhost ,
140
137
channel_num = Channel ,
141
- next_publish_id = 0 ,
142
- incoming_unsettled_map = gb_trees :empty ()
138
+ next_publish_id = 0
143
139
}}.
144
140
145
141
terminate (_Reason , _State ) ->
146
142
ok .
147
143
148
- code_change (_OldVsn , State , _Extra ) ->
149
- {ok , State }.
150
-
151
144
handle_call (info , _From , # state {reader_pid = ReaderPid } = State ) ->
152
145
Info = [{reader , ReaderPid }],
153
146
{reply , Info , State };
154
147
handle_call (Msg , _From , State ) ->
155
148
{reply , {error , not_understood , Msg }, State }.
156
149
157
- handle_info (# 'basic.consume_ok' {}, State ) ->
158
- % % Handled above
159
- {noreply , State };
160
-
161
- handle_info (# 'basic.cancel_ok' {}, State ) ->
162
- % % just ignore this for now,
163
- % % At some point we should send the detach here but then we'd need to track
164
- % % consumer tags -> link handle somewhere
165
- {noreply , State };
166
-
167
- % % A message from the queue saying that there are no more messages
168
- % handle_info(#'basic.credit_drained'{consumer_tag = CTag} = CreditDrained,
169
- % State = #state{writer_pid = WriterPid,
170
- % session = Session}) ->
171
- % Handle = ctag_to_handle(CTag),
172
- % Link = get({out, Handle}),
173
- % {Flow0, Link1} = rabbit_amqp1_0_session:outgoing_link_credit_drained(
174
- % CreditDrained, Handle, Link),
175
- % Flow = rabbit_amqp1_0_session:flow_fields(Flow0, Session),
176
- % rabbit_amqp1_0_writer:send_command(WriterPid, Flow),
177
- % put({out, Handle}, Link1),
178
- % {noreply, State};
179
-
180
- % handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
181
- % session = Session}) ->
182
- % {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
183
- % case Reply of
184
- % undefined ->
185
- % ok;
186
- % _ ->
187
- % rabbit_amqp1_0_writer:send_command(
188
- % WriterPid,
189
- % rabbit_amqp1_0_session:flow_fields(Reply, Session)
190
- % )
191
- % end,
192
- % {noreply, state(Session1, State)};
193
-
194
- % handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
195
- % rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
196
- % {noreply, state(Session, State)};
197
-
198
150
handle_info ({bump_credit , Msg }, State ) ->
199
151
credit_flow :handle_bump_msg (Msg ),
200
152
{noreply , State };
@@ -830,12 +782,6 @@ flow(#'v1_0.flow'{next_incoming_id = FlowNextIn0,
830
782
end
831
783
end .
832
784
833
- acknowledgement (DeliveryIds , Disposition ) ->
834
- Disposition # 'v1_0.disposition' {first = {uint , hd (DeliveryIds )},
835
- last = {uint , lists :last (DeliveryIds )},
836
- settled = true ,
837
- state = # 'v1_0.accepted' {}}.
838
-
839
785
set_delivery_id ({uint , D }, # incoming_link {delivery_id = undefined } = Link ) ->
840
786
% % "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery.
841
787
Link # incoming_link {delivery_id = D };
@@ -893,20 +839,43 @@ handle_queue_event({queue_event, QRef, Evt},
893
839
% rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
894
840
end .
895
841
842
+ % handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
843
+ % session = Session}) ->
844
+ % {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
845
+ % case Reply of
846
+ % undefined ->
847
+ % ok;
848
+ % _ ->
849
+ % rabbit_amqp1_0_writer:send_command(
850
+ % WriterPid,
851
+ % rabbit_amqp1_0_session:flow_fields(Reply, Session)
852
+ % )
853
+ % end,
854
+ % {noreply, state(Session1, State)};
855
+
856
+ % handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
857
+ % rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
858
+ % {noreply, state(Session, State)};
859
+
896
860
handle_queue_actions (Actions , State0 ) ->
897
861
{ReplyRev , State } =
898
862
lists :foldl (
899
- fun ({settled , QName , DelIds }, {Reply , S0 = # state {unconfirmed = U0 }}) ->
863
+ fun ({settled , QName , DelIds }, {Reply , S0 = # state {incoming_unsettled_map = U0 }}) ->
900
864
{ConfirmMXs , U } = rabbit_confirms :confirm (DelIds , QName , U0 ),
901
- S = S0 # state {unconfirmed = U },
865
+ S = S0 # state {incoming_unsettled_map = U },
902
866
R = if ConfirmMXs =:= [] ->
903
867
Reply ;
904
868
ConfirmMXs =/= [] ->
905
869
ConfirmDelIds = lists :map (fun ({Id , _Exchange }) -> Id end , ConfirmMXs ),
906
870
Ids = lists :usort (ConfirmDelIds ),
907
871
% %TODO defer sending confirms as done in rabbit_channel
908
872
% record_confirms(ConfirmDelIds, S)
909
- Disposition = acknowledgement (Ids , # 'v1_0.disposition' {role = ? RECV_ROLE }),
873
+ Disposition = # 'v1_0.disposition' {
874
+ role = ? RECV_ROLE ,
875
+ settled = true ,
876
+ state = # 'v1_0.accepted' {},
877
+ first = {uint , hd (Ids )},
878
+ last = {uint , lists :last (Ids )}},
910
879
[Disposition | Reply ]
911
880
end ,
912
881
{R , S };
@@ -960,7 +929,7 @@ handle_deliver(ConsumerTag, AckRequired,
960
929
Dtag = if is_integer (MsgId ) ->
961
930
% % delivery-tag must be unique only per link (not per session)
962
931
<<MsgId :64 >>;
963
- MsgId =:= undefined ->
932
+ MsgId =:= undefined andalso SendSettled ->
964
933
% % Both ends of the link will always consider this message settled because
965
934
% % "the sender will send all deliveries settled to the receiver" [3.8.2].
966
935
% % Hence, the delivery tag does not have to be unique on this link.
@@ -1039,7 +1008,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
1039
1008
send_settle_mode = SSM ,
1040
1009
recv_settle_mode = RSM } = Link ,
1041
1010
# state {queue_states = QStates0 ,
1042
- unconfirmed = U0 ,
1011
+ incoming_unsettled_map = U0 ,
1043
1012
next_publish_id = NextPublishId0
1044
1013
} = State0 ) ->
1045
1014
MsgBin = iolist_to_binary (lists :reverse ([MsgPart | MsgAcc ])),
@@ -1094,7 +1063,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
1094
1063
{U , Reply0 } = process_routing_confirm (Qs , EffectiveSendSettleMode , DeliveryId , XName , U0 ),
1095
1064
State1 = State0 # state {queue_states = QStates ,
1096
1065
next_publish_id = NextPublishId ,
1097
- unconfirmed = U },
1066
+ incoming_unsettled_map = U },
1098
1067
{Reply1 , State } = handle_queue_actions (Actions , State1 ),
1099
1068
{SendFlow , CreditUsed1 } = case CreditUsed - 1 of
1100
1069
C when C =< 0 ->
0 commit comments