Skip to content

Commit ca95dca

Browse files
committed
Cache exchange record
for default and pre-declared exchanges to save copying the #exchange{} record (i.e. save an ETS lookup call) on every received message. The default and pre-declared exchanges are protected from deletion and modification. Exchange routing decorators are not used in tier 1 plugins and in no open source tier 2 plugin.
1 parent 1bc9ddc commit ca95dca

File tree

2 files changed

+50
-26
lines changed

2 files changed

+50
-26
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
}).
9595

9696
-record(incoming_link, {
97-
exchange :: rabbit_exchange:name(),
97+
exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
9898
routing_key :: undefined | rabbit_types:routing_key(),
9999
%% queue_name_bin is only set if the link target address refers to a queue.
100100
queue_name_bin :: undefined | rabbit_misc:resource_name(),
@@ -857,9 +857,9 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
857857
user = User}}) ->
858858
ok = validate_attach(Attach),
859859
case ensure_target(Target, Vhost, User) of
860-
{ok, XName, RoutingKey, QNameBin} ->
860+
{ok, Exchange, RoutingKey, QNameBin} ->
861861
IncomingLink = #incoming_link{
862-
exchange = XName,
862+
exchange = Exchange,
863863
routing_key = RoutingKey,
864864
queue_name_bin = QNameBin,
865865
delivery_count = DeliveryCountInt,
@@ -1757,7 +1757,7 @@ incoming_link_transfer(
17571757
rcv_settle_mode = RcvSettleMode,
17581758
handle = Handle = ?UINT(HandleInt)},
17591759
MsgPart,
1760-
#incoming_link{exchange = XName = #resource{name = XNameBin},
1760+
#incoming_link{exchange = Exchange,
17611761
routing_key = LinkRKey,
17621762
delivery_count = DeliveryCount0,
17631763
incoming_unconfirmed_map = U0,
@@ -1789,20 +1789,20 @@ incoming_link_transfer(
17891789
Sections = amqp10_framing:decode_bin(MsgBin),
17901790
?DEBUG("~s Inbound content:~n ~tp",
17911791
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
1792-
Anns0 = #{?ANN_EXCHANGE => XNameBin},
1793-
Anns = case LinkRKey of
1794-
undefined -> Anns0;
1795-
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
1796-
end,
1797-
Mc0 = mc:init(mc_amqp, Sections, Anns),
1798-
Mc1 = rabbit_message_interceptor:intercept(Mc0),
1799-
{Mc, RoutingKey} = ensure_routing_key(Mc1),
1800-
check_user_id(Mc, User),
1801-
messages_received(Settled),
1802-
case rabbit_exchange:lookup(XName) of
1803-
{ok, Exchange} ->
1804-
check_write_permitted_on_topic(Exchange, User, RoutingKey),
1805-
QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}),
1792+
case rabbit_exchange_lookup(Exchange) of
1793+
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
1794+
Anns0 = #{?ANN_EXCHANGE => XNameBin},
1795+
Anns = case LinkRKey of
1796+
undefined -> Anns0;
1797+
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
1798+
end,
1799+
Mc0 = mc:init(mc_amqp, Sections, Anns),
1800+
Mc1 = rabbit_message_interceptor:intercept(Mc0),
1801+
{Mc, RoutingKey} = ensure_routing_key(Mc1),
1802+
check_user_id(Mc, User),
1803+
messages_received(Settled),
1804+
check_write_permitted_on_topic(X, User, RoutingKey),
1805+
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
18061806
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
18071807
Opts = #{correlation => {HandleInt, DeliveryId}},
18081808
Qs0 = rabbit_amqqueue:lookup_many(QNames),
@@ -1838,6 +1838,11 @@ incoming_link_transfer(
18381838
{error, [Disposition, Detach]}
18391839
end.
18401840

1841+
rabbit_exchange_lookup(X = #exchange{}) ->
1842+
{ok, X};
1843+
rabbit_exchange_lookup(XName = #resource{}) ->
1844+
rabbit_exchange:lookup(XName).
1845+
18411846
ensure_routing_key(Mc) ->
18421847
case mc:routing_keys(Mc) of
18431848
[RoutingKey] ->
@@ -1911,16 +1916,25 @@ ensure_target(#'v1_0.target'{address = Address,
19111916
{ok, Dest} ->
19121917
QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable),
19131918
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
1914-
XName = rabbit_misc:r(Vhost, exchange, list_to_binary(XNameList1)),
1919+
XNameBin = list_to_binary(XNameList1),
1920+
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
19151921
{ok, X} = rabbit_exchange:lookup(XName),
19161922
check_internal_exchange(X),
19171923
check_write_permitted(XName, User),
1924+
%% Pre-declared exchanges are protected against deletion and modification.
1925+
%% Let's cache the whole #exchange{} record to save a
1926+
%% rabbit_exchange:lookup(XName) call each time we receive a message.
1927+
Exchange = case XNameBin of
1928+
<<>> -> X;
1929+
<<"amq.", _/binary>> -> X;
1930+
_ -> XName
1931+
end,
19181932
RoutingKey = case RK of
19191933
undefined -> undefined;
19201934
[] -> undefined;
19211935
_ -> list_to_binary(RK)
19221936
end,
1923-
{ok, XName, RoutingKey, QNameBin};
1937+
{ok, Exchange, RoutingKey, QNameBin};
19241938
{error, _} = E ->
19251939
E
19261940
end;

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -925,35 +925,45 @@ server_closes_link(QType, Config) ->
925925

926926
server_closes_link_exchange(Config) ->
927927
XName = atom_to_binary(?FUNCTION_NAME),
928+
QName = <<"my queue">>,
929+
RoutingKey = <<"my routing key">>,
928930
Ch = rabbit_ct_client_helpers:open_channel(Config),
929931
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),
930-
932+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
933+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
934+
exchange = XName,
935+
routing_key = RoutingKey}),
931936
OpnConf = connection_config(Config),
932937
{ok, Connection} = amqp10_client:open_connection(OpnConf),
933938
{ok, Session} = amqp10_client:begin_session_sync(Connection),
934-
Address = <<"/exchange/", XName/binary, "/some-routing-key">>,
939+
Address = <<"/exchange/", XName/binary, "/", RoutingKey/binary>>,
935940
{ok, Sender} = amqp10_client:attach_sender_link(
936941
Session, <<"test-sender">>, Address),
937942
ok = wait_for_credit(Sender),
938943
?assertMatch(#{publishers := 1}, get_global_counters(Config)),
939944

945+
DTag1 = <<1>>,
946+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
947+
ok = wait_for_settlement(DTag1),
948+
940949
%% Server closes the link endpoint due to some AMQP 1.0 external condition:
941950
%% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange.
942951
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}),
943-
ok = rabbit_ct_client_helpers:close_channel(Ch),
944952

945953
%% When we publish the next message, we expect:
946954
%% 1. that the message is released because the exchange doesn't exist anymore, and
947-
DTag = <<255>>,
948-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<"body">>, false)),
949-
ok = wait_for_settlement(DTag, released),
955+
DTag2 = <<255>>,
956+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
957+
ok = wait_for_settlement(DTag2, released),
950958
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
951959
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
952960
receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok
953961
after 5000 -> ct:fail("server did not close our outgoing link")
954962
end,
955963
?assertMatch(#{publishers := 0}, get_global_counters(Config)),
956964

965+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
966+
ok = rabbit_ct_client_helpers:close_channel(Ch),
957967
ok = end_session_sync(Session),
958968
ok = amqp10_client:close_connection(Connection).
959969

0 commit comments

Comments
 (0)