Skip to content

Commit 1bc9ddc

Browse files
committed
Detach link pair
1 parent 18e19ff commit 1bc9ddc

File tree

3 files changed

+91
-13
lines changed

3 files changed

+91
-13
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
%% For AMQP management operations, we require a link pair as described in
7979
%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html
8080
-record(management_link_pair, {
81-
client_terminus_address :: tuple(),
81+
client_terminus_address,
8282
incoming_half :: unattached | link_handle(),
8383
outgoing_half :: unattached | link_handle()
8484
}).
@@ -254,6 +254,8 @@
254254
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state()
255255
}).
256256

257+
-type state() :: #state{}.
258+
257259
start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) ->
258260
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame},
259261
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
@@ -1138,10 +1140,11 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
11381140
{Unsettled1, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0),
11391141
{QStates0, Unsettled1, OutgoingLinks0}
11401142
end,
1141-
State = State0#state{queue_states = QStates,
1142-
incoming_links = maps:remove(HandleInt, IncomingLinks),
1143-
outgoing_links = OutgoingLinks,
1144-
outgoing_unsettled_map = Unsettled},
1143+
State1 = State0#state{queue_states = QStates,
1144+
incoming_links = maps:remove(HandleInt, IncomingLinks),
1145+
outgoing_links = OutgoingLinks,
1146+
outgoing_unsettled_map = Unsettled},
1147+
State = maybe_detach_mgmt_link(HandleInt, State1),
11451148
maybe_detach_reply(Detach, State, State0),
11461149
publisher_or_consumer_deleted(State, State0),
11471150
{noreply, State};
@@ -2455,20 +2458,61 @@ publisher_or_consumer_deleted(
24552458

24562459
%% If we previously already sent a detach with an error condition, and the Detach we
24572460
%% receive here is therefore the client's reply, do not reply again with a 3rd detach.
2458-
maybe_detach_reply(Detach,
2459-
#state{incoming_links = NewIncomingLinks,
2460-
outgoing_links = NewOutgoingLinks,
2461-
cfg = #cfg{writer_pid = WriterPid,
2462-
channel_num = Ch}},
2463-
#state{incoming_links = OldIncomingLinks,
2464-
outgoing_links = OldOutgoingLinks})
2461+
maybe_detach_reply(
2462+
Detach,
2463+
#state{incoming_links = NewIncomingLinks,
2464+
outgoing_links = NewOutgoingLinks,
2465+
incoming_management_links = NewIncomingMgmtLinks,
2466+
outgoing_management_links = NewOutgoingMgmtLinks,
2467+
cfg = #cfg{writer_pid = WriterPid,
2468+
channel_num = Ch}},
2469+
#state{incoming_links = OldIncomingLinks,
2470+
outgoing_links = OldOutgoingLinks,
2471+
incoming_management_links = OldIncomingMgmtLinks,
2472+
outgoing_management_links = OldOutgoingMgmtLinks})
24652473
when map_size(NewIncomingLinks) < map_size(OldIncomingLinks) orelse
2466-
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) ->
2474+
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) orelse
2475+
map_size(NewIncomingMgmtLinks) < map_size(OldIncomingMgmtLinks) orelse
2476+
map_size(NewOutgoingMgmtLinks) < map_size(OldOutgoingMgmtLinks) ->
24672477
Reply = Detach#'v1_0.detach'{error = undefined},
24682478
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply);
24692479
maybe_detach_reply(_, _, _) ->
24702480
ok.
24712481

2482+
-spec maybe_detach_mgmt_link(link_handle(), state()) -> state().
2483+
maybe_detach_mgmt_link(
2484+
HandleInt,
2485+
State = #state{management_link_pairs = LinkPairs0,
2486+
incoming_management_links = IncomingLinks0,
2487+
outgoing_management_links = OutgoingLinks0}) ->
2488+
case maps:take(HandleInt, IncomingLinks0) of
2489+
{#management_link{name = Name}, IncomingLinks} ->
2490+
Pair = #management_link_pair{outgoing_half = OutgoingHalf} = maps:get(Name, LinkPairs0),
2491+
LinkPairs = case OutgoingHalf of
2492+
unattached ->
2493+
maps:remove(Name, LinkPairs0);
2494+
_ ->
2495+
maps:update(Name, Pair#management_link_pair{incoming_half = unattached}, LinkPairs0)
2496+
end,
2497+
State#state{incoming_management_links = IncomingLinks,
2498+
management_link_pairs = LinkPairs};
2499+
error ->
2500+
case maps:take(HandleInt, OutgoingLinks0) of
2501+
{#management_link{name = Name}, OutgoingLinks} ->
2502+
Pair = #management_link_pair{incoming_half = IncomingHalf} = maps:get(Name, LinkPairs0),
2503+
LinkPairs = case IncomingHalf of
2504+
unattached ->
2505+
maps:remove(Name, LinkPairs0);
2506+
_ ->
2507+
maps:update(Name, Pair#management_link_pair{outgoing_half = unattached}, LinkPairs0)
2508+
end,
2509+
State#state{outgoing_management_links = OutgoingLinks,
2510+
management_link_pairs = LinkPairs};
2511+
error ->
2512+
State
2513+
end
2514+
end.
2515+
24722516
check_internal_exchange(#exchange{internal = true,
24732517
name = XName}) ->
24742518
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-feature(maybe_expr, enable).
1010

1111
-export[attach_management_link_pair_sync/2,
12+
detach_management_link_pair_sync/1,
1213
declare_queue/2,
1314
declare_exchange/2,
1415
bind_queue/5,
@@ -87,6 +88,38 @@ await_attached(Ref) ->
8788
{error, timeout}
8889
end.
8990

91+
-spec detach_management_link_pair_sync(link_pair()) ->
92+
ok | {error, term()}.
93+
detach_management_link_pair_sync(
94+
#link_pair{outgoing_link = OutgoingLink,
95+
incoming_link = IncomingLink}) ->
96+
maybe
97+
ok ?= detach(OutgoingLink),
98+
ok ?= detach(IncomingLink),
99+
ok ?= await_detached(OutgoingLink),
100+
await_detached(IncomingLink)
101+
end.
102+
103+
-spec detach(amqp10_client:link_ref()) ->
104+
ok | {error, term()}.
105+
detach(Ref) ->
106+
try amqp10_client:detach_link(Ref)
107+
catch exit:Reason ->
108+
{error, Reason}
109+
end.
110+
111+
-spec await_detached(amqp10_client:link_ref()) ->
112+
ok | {error, term()}.
113+
await_detached(Ref) ->
114+
receive
115+
{amqp10_event, {link, Ref, {detached, normal}}} ->
116+
ok;
117+
{amqp10_event, {link, Ref, {detached, Err}}} ->
118+
{error, Err}
119+
after ?TIMEOUT ->
120+
{error, timeout}
121+
end.
122+
90123
-spec declare_queue(link_pair(), queue_properties()) ->
91124
{ok, map()} | {error, term()}.
92125
declare_queue(LinkPair, QueueProperties) ->

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ all_management_operations(Config) ->
155155
?assertEqual({ok, #{message_count => 0}},
156156
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
157157

158+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
158159
ok = amqp10_client:end_session(Session),
159160
ok = amqp10_client:close_connection(Connection).
160161

0 commit comments

Comments
 (0)