Skip to content

Commit 7e997cb

Browse files
authored
Merge pull request #9334 from rabbitmq/revert-8739-mergify/bp/v3.11.x/pr-8736
Revert "Settle unroutable message with released state (backport #8015) (backport #8736)"
2 parents 04bbe52 + 2ed22e6 commit 7e997cb

File tree

9 files changed

+21
-233
lines changed

9 files changed

+21
-233
lines changed

deps/amqp_client/src/amqp_channel.erl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -886,9 +886,6 @@ flush_writer(#state{driver = direct}) ->
886886
ok.
887887
amqp_msg(none) ->
888888
none;
889-
amqp_msg({DTag, Content}) ->
890-
{Props, Payload} = rabbit_basic_common:from_content(Content),
891-
{DTag, #amqp_msg{props = Props, payload = Payload}};
892889
amqp_msg(Content) ->
893890
{Props, Payload} = rabbit_basic_common:from_content(Content),
894891
#amqp_msg{props = Props, payload = Payload}.

deps/rabbit/src/rabbit_channel.erl

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,7 @@
113113
consumer_timeout,
114114
authz_context,
115115
%% defines how ofter gc will be executed
116-
writer_gc_threshold,
117-
%% true with AMQP 1.0 to include the publishing sequence
118-
%% in the return callback, false otherwise
119-
extended_return_callback
116+
writer_gc_threshold
120117
}).
121118

122119
-record(pending_ack, {
@@ -523,7 +520,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
523520
MaxMessageSize = get_max_message_size(),
524521
ConsumerTimeout = get_consumer_timeout(),
525522
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
526-
UseExtendedReturnCallback = use_extended_return_callback(AmqpParams),
527523
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
528524
State = #ch{cfg = #conf{state = starting,
529525
protocol = Protocol,
@@ -542,8 +538,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
542538
max_message_size = MaxMessageSize,
543539
consumer_timeout = ConsumerTimeout,
544540
authz_context = OptionalVariables,
545-
writer_gc_threshold = GCThreshold,
546-
extended_return_callback = UseExtendedReturnCallback
541+
writer_gc_threshold = GCThreshold
547542
},
548543
limiter = Limiter,
549544
tx = none,
@@ -1106,15 +1101,6 @@ extract_variable_map_from_amqp_params([Value]) ->
11061101
extract_variable_map_from_amqp_params(_) ->
11071102
#{}.
11081103

1109-
%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
1110-
%% Used for AMQP 1.0
1111-
use_extended_return_callback({amqp_params_direct,_,_,_,_,
1112-
{amqp_adapter_info,_,_,_,_,_,{'AMQP',"1.0"},_},
1113-
_}) ->
1114-
true;
1115-
use_extended_return_callback(_) ->
1116-
false.
1117-
11181104
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
11191105
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
11201106
case Size of
@@ -1956,8 +1942,9 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
19561942
ok
19571943
end.
19581944

1959-
basic_return(Content, #basic_message{exchange_name = ExchangeName,
1960-
routing_keys = [RoutingKey | _CcRoutes]},
1945+
basic_return(#basic_message{exchange_name = ExchangeName,
1946+
routing_keys = [RoutingKey | _CcRoutes],
1947+
content = Content},
19611948
State = #ch{cfg = #conf{protocol = Protocol,
19621949
writer_pid = WriterPid}},
19631950
Reason) ->
@@ -2193,9 +2180,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21932180
mandatory = Mandatory,
21942181
confirm = Confirm,
21952182
msg_seq_no = MsgSeqNo},
2196-
RoutedToQueueNames = [QName]},
2197-
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
2198-
queue_states = QueueStates0}) -> %% optimisation when there is one queue
2183+
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
21992184
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
22002185
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
22012186
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
@@ -2204,7 +2189,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22042189
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
22052190
%% NB: the order here is important since basic.returns must be
22062191
%% sent before confirms.
2207-
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
2192+
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
22082193
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
22092194
%% Actions must be processed after registering confirms as actions may
22102195
%% contain rejections of publishes
@@ -2232,9 +2217,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22322217
mandatory = Mandatory,
22332218
confirm = Confirm,
22342219
msg_seq_no = MsgSeqNo},
2235-
RoutedToQueueNames},
2236-
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
2237-
queue_states = QueueStates0}) ->
2220+
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
22382221
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
22392222
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
22402223
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
@@ -2243,7 +2226,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22432226
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
22442227
%% NB: the order here is important since basic.returns must be
22452228
%% sent before confirms.
2246-
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
2229+
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
22472230
State1 = process_routing_confirm(Confirm, QueueNames,
22482231
MsgSeqNo, XName, State0),
22492232
%% Actions must be processed after registering confirms as actions may
@@ -2265,32 +2248,19 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22652248
[rabbit_misc:rs(Resource)])
22662249
end.
22672250

2268-
process_routing_mandatory(_ExtendedReturnCallback = false,
2269-
_Mandatory = true,
2270-
_RoutedToQs = [],
2271-
_MsgSeqNo,
2272-
#basic_message{content = Content} = Msg, State) ->
2273-
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
2274-
ok = basic_return(Content, Msg, State, no_route),
2275-
ok;
2276-
process_routing_mandatory(_ExtendedReturnCallback = true,
2277-
_Mandatory = true,
2251+
process_routing_mandatory(_Mandatory = true,
22782252
_RoutedToQs = [],
2279-
MsgSeqNo,
2280-
#basic_message{content = Content} = Msg, State) ->
2253+
Msg, State) ->
22812254
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
2282-
%% providing the publishing sequence for AMQP 1.0
2283-
ok = basic_return({MsgSeqNo, Content}, Msg, State, no_route),
2255+
ok = basic_return(Msg, State, no_route),
22842256
ok;
2285-
process_routing_mandatory(_ExtendedReturnCallback,
2286-
_Mandatory = false,
2257+
process_routing_mandatory(_Mandatory = false,
22872258
_RoutedToQs = [],
2288-
_MsgSeqNo,
22892259
#basic_message{exchange_name = ExchangeName}, State) ->
22902260
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
22912261
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
22922262
ok;
2293-
process_routing_mandatory(_, _, _, _, _, _) ->
2263+
process_routing_mandatory(_, _, _, _) ->
22942264
ok.
22952265

22962266
process_routing_confirm(false, _, _, _, State) ->

deps/rabbit_common/src/rabbit_writer.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,7 @@
105105

106106
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
107107
-spec send_command
108-
(pid(), rabbit_framing:amqp_method_record(),
109-
rabbit_types:content() |
110-
{integer(), rabbit_types:content()} %% publishing sequence for AMQP 1.0 return callback
111-
) ->
108+
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
112109
'ok'.
113110
-spec send_command_sync(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
114111
-spec send_command_sync

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
%% Just make these constant for the time being.
1818
-define(INCOMING_CREDIT, 65536).
1919

20-
-record(incoming_link, {name, exchange, routing_key, mandatory,
20+
-record(incoming_link, {name, exchange, routing_key,
2121
delivery_id = undefined,
2222
delivery_count = 0,
2323
send_settle_mode = undefined,
@@ -53,7 +53,6 @@ attach(#'v1_0.attach'{name = Name,
5353
SndSettleMode == ?V_1_0_SENDER_SETTLE_MODE_MIXED ->
5454
amqp_channel:register_confirm_handler(BCh, self()),
5555
rabbit_amqp1_0_channel:call(BCh, #'confirm.select'{}),
56-
amqp_channel:register_return_handler(BCh, self()),
5756
true
5857
end,
5958
Flow = #'v1_0.flow'{ handle = Handle,
@@ -70,8 +69,7 @@ attach(#'v1_0.attach'{name = Name,
7069
initial_delivery_count = undefined, % must be, I am the receiver
7170
role = ?RECV_ROLE}, %% server is receiver
7271
IncomingLink1 =
73-
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode,
74-
mandatory = Confirm},
72+
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode},
7573
{ok, [Attach, Flow], IncomingLink1, Confirm};
7674
{error, Reason} ->
7775
%% TODO proper link establishment protocol here?
@@ -144,8 +142,7 @@ transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
144142
end,
145143
rabbit_amqp1_0_channel:cast_flow(
146144
BCh, #'basic.publish'{exchange = X,
147-
routing_key = RKey,
148-
mandatory = true}, Msg),
145+
routing_key = RKey}, Msg),
149146
{SendFlow, CreditUsed1} = case CreditUsed - 1 of
150147
C when C =< 0 ->
151148
{true, ?INCOMING_CREDIT div 2};
@@ -209,7 +206,6 @@ ensure_target(Target = #'v1_0.target'{address = Address,
209206
dest, DCh, Dest, DeclareParams,
210207
RouteState)
211208
end),
212-
maybe_ensure_queue(Dest, DCh),
213209
{XName, RK} = rabbit_routing_util:parse_routing(Dest),
214210
{ok, Target, Link#incoming_link{
215211
route_state = RouteState1,
@@ -226,20 +222,6 @@ ensure_target(Target = #'v1_0.target'{address = Address,
226222
{error, {address_not_utf8_string, Address}}
227223
end.
228224

229-
maybe_ensure_queue({amqqueue, Q}, Ch) ->
230-
try
231-
rabbit_amqp1_0_channel:convert_error(
232-
fun () ->
233-
Method = #'queue.declare'{queue = list_to_binary(Q),
234-
passive = true},
235-
amqp_channel:call(Ch, Method)
236-
end)
237-
catch exit:#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED} ->
238-
ok
239-
end;
240-
maybe_ensure_queue(_, _) ->
241-
ok.
242-
243225
incoming_flow(#incoming_link{ delivery_count = Count }, Handle) ->
244226
#'v1_0.flow'{handle = Handle,
245227
delivery_count = {uint, Count},

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
incr_incoming_id/1, next_delivery_id/1, transfers_left/1,
1515
record_transfers/2, bump_outgoing_window/1,
1616
record_outgoing/4, settle/3, flow_fields/2, channel/1,
17-
flow/2, ack/2, return/2, validate_attach/1]).
17+
flow/2, ack/2, validate_attach/1]).
1818

1919
-import(rabbit_amqp1_0_util, [protocol_error/3,
2020
serial_add/2, serial_diff/2, serial_compare/2]).
@@ -396,25 +396,3 @@ acknowledgement(DeliveryIds, Disposition) ->
396396
last = {uint, lists:last(DeliveryIds)},
397397
settled = true,
398398
state = #'v1_0.accepted'{} }.
399-
400-
return(DTag, Session = #session{incoming_unsettled_map = Unsettled}) ->
401-
{DeliveryId,
402-
Unsettled1} = case gb_trees:lookup(DTag, Unsettled) of
403-
{value, #incoming_delivery{ delivery_id = Id }} ->
404-
{Id, gb_trees:delete(DTag, Unsettled)};
405-
none ->
406-
{undefined, Unsettled}
407-
end,
408-
Disposition = case DeliveryId of
409-
undefined -> undefined;
410-
_ -> release(DeliveryId,
411-
#'v1_0.disposition'{role = ?RECV_ROLE})
412-
end,
413-
{Disposition,
414-
Session#session{incoming_unsettled_map = Unsettled1}}.
415-
416-
release(DeliveryId, Disposition) ->
417-
Disposition#'v1_0.disposition'{ first = {uint, DeliveryId},
418-
last = {uint, DeliveryId},
419-
settled = true,
420-
state = #'v1_0.released'{} }.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -130,24 +130,6 @@ handle_info(#'basic.ack'{} = Ack, State = #state{writer_pid = WriterPid,
130130
F <- rabbit_amqp1_0_session:flow_fields(Reply, Session)],
131131
{noreply, state(Session1, State)};
132132

133-
handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
134-
session = Session}) ->
135-
{Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
136-
case Reply of
137-
undefined ->
138-
ok;
139-
_ ->
140-
rabbit_amqp1_0_writer:send_command(
141-
WriterPid,
142-
rabbit_amqp1_0_session:flow_fields(Reply, Session)
143-
)
144-
end,
145-
{noreply, state(Session1, State)};
146-
147-
handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
148-
rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
149-
{noreply, state(Session, State)};
150-
151133
handle_info({bump_credit, Msg}, State) ->
152134
credit_flow:handle_bump_msg(Msg),
153135
{noreply, State};

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 2 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ groups() ->
2424
[
2525
{tests, [], [
2626
reliable_send_receive_with_outcomes,
27-
publishing_to_non_existing_queue_should_settle_with_released,
28-
open_link_to_non_existing_destination_should_end_session,
2927
roundtrip_classic_queue_with_drain,
3028
roundtrip_quorum_queue_with_drain,
3129
roundtrip_stream_queue_with_drain,
@@ -165,68 +163,6 @@ reliable_send_receive(Config, Outcome) ->
165163

166164
ok.
167165

168-
publishing_to_non_existing_queue_should_settle_with_released(Config) ->
169-
Container = atom_to_binary(?FUNCTION_NAME, utf8),
170-
Suffix = <<"foo">>,
171-
%% does not exist
172-
QName = <<Container/binary, Suffix/binary>>,
173-
Host = ?config(rmq_hostname, Config),
174-
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
175-
Address = <<"/exchange/amq.direct/", QName/binary>>,
176-
177-
OpnConf = #{address => Host,
178-
port => Port,
179-
container_id => Container,
180-
sasl => {plain, <<"guest">>, <<"guest">>}},
181-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
182-
{ok, Session} = amqp10_client:begin_session(Connection),
183-
SenderLinkName = <<"test-sender">>,
184-
{ok, Sender} = amqp10_client:attach_sender_link(Session,
185-
SenderLinkName,
186-
Address),
187-
ok = wait_for_credit(Sender),
188-
DTag1 = <<"dtag-1">>,
189-
%% create an unsettled message,
190-
%% link will be in "mixed" mode by default
191-
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
192-
ok = amqp10_client:send_msg(Sender, Msg1),
193-
ok = wait_for_settlement(DTag1, released),
194-
195-
ok = amqp10_client:detach_link(Sender),
196-
ok = amqp10_client:close_connection(Connection),
197-
flush("post sender close"),
198-
ok.
199-
200-
open_link_to_non_existing_destination_should_end_session(Config) ->
201-
Container = atom_to_list(?FUNCTION_NAME),
202-
Name = Container ++ "foo",
203-
Addresses = [
204-
"/exchange/" ++ Name ++ "/bar",
205-
"/amq/queue/" ++ Name
206-
],
207-
Host = ?config(rmq_hostname, Config),
208-
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
209-
OpnConf = #{address => Host,
210-
port => Port,
211-
container_id => list_to_binary(Container),
212-
sasl => {plain, <<"guest">>, <<"guest">>}},
213-
214-
[begin
215-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
216-
{ok, Session} = amqp10_client:begin_session(Connection),
217-
SenderLinkName = <<"test-sender">>,
218-
ct:pal("Address ~p", [Address]),
219-
{ok, _} = amqp10_client:attach_sender_link(Session,
220-
SenderLinkName,
221-
list_to_binary(Address)),
222-
223-
wait_for_session_end(Session),
224-
ok = amqp10_client:close_connection(Connection),
225-
flush("post sender close")
226-
227-
end || Address <- Addresses],
228-
ok.
229-
230166
roundtrip_classic_queue_with_drain(Config) ->
231167
QName = atom_to_binary(?FUNCTION_NAME, utf8),
232168
roundtrip_queue_with_drain(Config, <<"classic">>, QName).
@@ -446,27 +382,14 @@ wait_for_credit(Sender) ->
446382
ct:fail(credited_timeout)
447383
end.
448384

449-
wait_for_session_end(Session) ->
450-
receive
451-
{amqp10_event, {session, Session, {ended, _}}} ->
452-
flush(?FUNCTION_NAME),
453-
ok
454-
after 5000 ->
455-
flush("wait_for_session_end timed out"),
456-
ct:fail(settled_timeout)
457-
end.
458-
459385
wait_for_settlement(Tag) ->
460-
wait_for_settlement(Tag, accepted).
461-
462-
wait_for_settlement(Tag, State) ->
463386
receive
464-
{amqp10_disposition, {State, Tag}} ->
387+
{amqp10_disposition, {accepted, Tag}} ->
465388
flush(?FUNCTION_NAME),
466389
ok
467390
after 5000 ->
468391
flush("wait_for_settlement timed out"),
469-
ct:fail(settled_timeout)
392+
ct:fail(credited_timeout)
470393
end.
471394

472395
wait_for_accepts(0) -> ok;

0 commit comments

Comments
 (0)