Skip to content

Commit d427d47

Browse files
committed
Simplify rabbit_channel
by removing extra AMQP 1.0 logic for settling unroutable messages with released state. This commit reverts the workaround introduced by PR 8015.
1 parent 9e08c4c commit d427d47

File tree

3 files changed

+10
-58
lines changed

3 files changed

+10
-58
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,7 @@
108108
consumer_timeout,
109109
authz_context,
110110
%% defines how ofter gc will be executed
111-
writer_gc_threshold,
112-
%% TODO delete
113-
%% true with AMQP 1.0 to include the publishing sequence
114-
%% in the return callback, false otherwise
115-
extended_return_callback
111+
writer_gc_threshold
116112
}).
117113

118114
-record(pending_ack, {
@@ -513,7 +509,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
513509
MaxMessageSize = get_max_message_size(),
514510
ConsumerTimeout = get_consumer_timeout(),
515511
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
516-
UseExtendedReturnCallback = use_extended_return_callback(AmqpParams),
517512
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
518513
State = #ch{cfg = #conf{state = starting,
519514
protocol = Protocol,
@@ -532,8 +527,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
532527
max_message_size = MaxMessageSize,
533528
consumer_timeout = ConsumerTimeout,
534529
authz_context = OptionalVariables,
535-
writer_gc_threshold = GCThreshold,
536-
extended_return_callback = UseExtendedReturnCallback
530+
writer_gc_threshold = GCThreshold
537531
},
538532
limiter = Limiter,
539533
tx = none,
@@ -1043,17 +1037,6 @@ extract_variable_map_from_amqp_params([Value]) ->
10431037
extract_variable_map_from_amqp_params(_) ->
10441038
#{}.
10451039

1046-
%%TODO delete
1047-
%%
1048-
%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
1049-
%% Used for AMQP 1.0
1050-
use_extended_return_callback({amqp_params_direct,_,_,_,_,
1051-
{amqp_adapter_info,_,_,_,_,_,{'AMQP',"1.0"},_},
1052-
_}) ->
1053-
true;
1054-
use_extended_return_callback(_) ->
1055-
false.
1056-
10571040
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
10581041
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
10591042
case Size of
@@ -1272,15 +1255,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12721255
check_user_id_header(Props, State),
12731256
check_expiration_header(Props),
12741257
DoConfirm = Tx =/= none orelse ConfirmEnabled,
1275-
{DeliveryOptions, SeqNum, State1} =
1258+
{DeliveryOptions, State1} =
12761259
case DoConfirm of
12771260
false ->
1278-
{maps_put_truthy(flow, Flow, #{}), undefined, State0};
1261+
{maps_put_truthy(flow, Flow, #{}), State0};
12791262
true ->
12801263
rabbit_global_counters:messages_received_confirm(amqp091, 1),
12811264
SeqNo = State0#ch.publish_seqno,
12821265
Opts = maps_put_truthy(flow, Flow, #{correlation => SeqNo}),
1283-
{Opts, SeqNo, State0#ch{publish_seqno = SeqNo + 1}}
1266+
{Opts, State0#ch{publish_seqno = SeqNo + 1}}
12841267
end,
12851268
% rabbit_feature_flags:is_enabled(message_containers),
12861269
Message0 = mc_amqpl:message(ExchangeName,
@@ -1291,7 +1274,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12911274
[rabbit_channel:deliver_reply(RK, Message) ||
12921275
{virtual_reply_queue, RK} <- QNames],
12931276
Queues = rabbit_amqqueue:lookup_many(QNames),
1294-
ok = process_routing_mandatory(Mandatory, Queues, SeqNum, Message, ExchangeName, State0),
1277+
ok = process_routing_mandatory(Mandatory, Queues, Message, ExchangeName, State0),
12951278
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
12961279
Username, TraceState),
12971280
%% TODO: call delivery_to_queues with plain args
@@ -2191,32 +2174,23 @@ deliver_to_queues(XName,
21912174

21922175
process_routing_mandatory(_Mandatory = true,
21932176
_RoutedToQs = [],
2194-
MsgSeqNo,
21952177
Msg,
21962178
XName,
2197-
State = #ch{cfg = #conf{extended_return_callback = ExtRetCallback}}) ->
2179+
State) ->
21982180
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
21992181
?INCR_STATS(exchange_stats, XName, 1, return_unroutable, State),
2200-
Content0 = mc:protocol_state(Msg),
2201-
Content = case ExtRetCallback of
2202-
true ->
2203-
%% providing the publishing sequence for AMQP 1.0
2204-
{MsgSeqNo, Content0};
2205-
false ->
2206-
Content0
2207-
end,
2182+
Content = mc:protocol_state(Msg),
22082183
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
22092184
ok = basic_return(Content, RoutingKey, XName#resource.name, State, no_route);
22102185
process_routing_mandatory(_Mandatory = false,
22112186
_RoutedToQs = [],
2212-
_MsgSeqNo,
22132187
_Msg,
22142188
XName,
22152189
State) ->
22162190
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
22172191
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
22182192
ok;
2219-
process_routing_mandatory(_, _, _, _, _, _) ->
2193+
process_routing_mandatory(_, _, _, _, _) ->
22202194
ok.
22212195

22222196
process_routing_confirm(undefined, _, _, State) ->

deps/rabbit_common/src/rabbit_writer.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,7 @@
105105

106106
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
107107
-spec send_command
108-
(pid(), rabbit_framing:amqp_method_record(),
109-
rabbit_types:content() |
110-
{integer(), rabbit_types:content()} %% publishing sequence for AMQP 1.0 return callback
111-
) ->
108+
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
112109
'ok'.
113110
-spec send_command_sync(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
114111
-spec send_command_sync

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -839,24 +839,6 @@ handle_queue_event({queue_event, QRef, Evt},
839839
% rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
840840
end.
841841

842-
% handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
843-
% session = Session}) ->
844-
% {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
845-
% case Reply of
846-
% undefined ->
847-
% ok;
848-
% _ ->
849-
% rabbit_amqp1_0_writer:send_command(
850-
% WriterPid,
851-
% rabbit_amqp1_0_session:flow_fields(Reply, Session)
852-
% )
853-
% end,
854-
% {noreply, state(Session1, State)};
855-
856-
% handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
857-
% rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
858-
% {noreply, state(Session, State)};
859-
860842
handle_queue_actions(Actions, State0) ->
861843
{ReplyRev, State} =
862844
lists:foldl(
@@ -1100,7 +1082,6 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, _, U) ->
11001082
first = {uint, DeliveryId},
11011083
settled = true,
11021084
state = #'v1_0.released'{}},
1103-
11041085
{U, [Disposition]};
11051086
process_routing_confirm([_|_] = Qs, _SenderSettles = false, DeliveryId, XName, U) ->
11061087
QNames = rabbit_amqqueue:queue_names(Qs),

0 commit comments

Comments
 (0)