Skip to content

Commit e974b7e

Browse files
authored
Await Begin response for new sessions (#45)
* Sync-up `CloseAsync` for publishers and consumers
1 parent 953b581 commit e974b7e

File tree

4 files changed

+77
-35
lines changed

4 files changed

+77
-35
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,11 @@ void onAttached(ILink argLink, Attach argAttach)
6969
}
7070

7171
ReceiverLink? tmpReceiverLink = null;
72-
Task receiverLinkTask = Task.Run(() =>
72+
Task receiverLinkTask = Task.Run(async () =>
7373
{
74-
tmpReceiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), _id.ToString(), attach, onAttached);
74+
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
75+
.ConfigureAwait(false);
76+
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
7577
});
7678

7779
// TODO configurable timeout
@@ -214,19 +216,26 @@ public void Unpause()
214216
}
215217
}
216218

219+
// TODO cancellation token
217220
public override async Task CloseAsync()
218221
{
219-
// TODO make this method similar to publisher CloseAsync
220-
if (_receiverLink == null)
222+
if (_receiverLink is null)
221223
{
222224
return;
223225
}
224226

225227
OnNewStatus(State.Closing, null);
226228

227-
// TODO global timeout for closing, other async actions?
228-
await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
229-
.ConfigureAwait(false);
229+
try
230+
{
231+
// TODO global timeout for closing, other async actions?
232+
await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
233+
.ConfigureAwait(false);
234+
}
235+
catch (Exception ex)
236+
{
237+
Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway", ex);
238+
}
230239

231240
_receiverLink = null;
232241
OnNewStatus(State.Closed, null);

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ void onAttached(ILink argLink, Attach argAttach)
5656
}
5757

5858
SenderLink? tmpSenderLink = null;
59-
Task senderLinkTask = Task.Run(() =>
59+
Task senderLinkTask = Task.Run(async () =>
6060
{
61-
tmpSenderLink = new SenderLink(_connection._nativePubSubSessions.GetOrCreateSession(), _id.ToString(), attach, onAttached);
61+
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
62+
.ConfigureAwait(false);
63+
tmpSenderLink = new SenderLink(session, _id.ToString(), attach, onAttached);
6264
});
6365

6466
// TODO configurable timeout
@@ -164,8 +166,7 @@ void OutcomeCallback(ILink sender, Message message, Outcome outcome, object stat
164166

165167
public override async Task CloseAsync()
166168
{
167-
// TODO make this method similar to consumer CloseAsync
168-
if (State == State.Closed)
169+
if (_senderLink is null)
169170
{
170171
return;
171172
}
@@ -174,17 +175,13 @@ public override async Task CloseAsync()
174175

175176
try
176177
{
177-
if (_senderLink != null)
178-
{
179-
await _senderLink.DetachAsync()
180-
.ConfigureAwait(false);
181-
await _senderLink.CloseAsync()
182-
.ConfigureAwait(false);
183-
}
178+
// TODO global timeout for closing, other async actions?
179+
await _senderLink.CloseAsync(TimeSpan.FromSeconds(5))
180+
.ConfigureAwait(false);
184181
}
185-
catch (Exception e)
182+
catch (Exception ex)
186183
{
187-
Trace.WriteLine(TraceLevel.Warning, "Failed to close sender link. The publisher will be closed anyway", e);
184+
Trace.WriteLine(TraceLevel.Warning, "Failed to close sender link. The publisher will be closed anyway", ex);
188185
}
189186

190187
_senderLink = null;
Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,66 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System.Collections.Concurrent;
16
using Amqp;
7+
using Amqp.Framing;
28

39
namespace RabbitMQ.AMQP.Client.Impl;
410

5-
public class AmqpSessionManagement(AmqpConnection amqpConnection, int maxSessionsPerItem)
11+
internal class AmqpSessionManagement
612
{
7-
private int MaxSessionsPerItem { get; } = maxSessionsPerItem;
8-
private List<Session> Sessions { get; } = [];
13+
private readonly AmqpConnection _amqpConnection;
14+
private readonly int _maxSessionsPerItem;
15+
private readonly ConcurrentBag<Session> _sessions = [];
16+
17+
internal AmqpSessionManagement(AmqpConnection amqpConnection, int maxSessionsPerItem)
18+
{
19+
_amqpConnection = amqpConnection;
20+
_maxSessionsPerItem = maxSessionsPerItem;
21+
}
922

10-
public Session GetOrCreateSession()
23+
// TODO cancellation token
24+
internal async Task<Session> GetOrCreateSessionAsync()
1125
{
12-
if (Sessions.Count >= MaxSessionsPerItem)
26+
Session rv;
27+
28+
if (_sessions.Count >= _maxSessionsPerItem)
29+
{
30+
rv = _sessions.First();
31+
}
32+
else
1333
{
14-
return Sessions.First();
34+
TaskCompletionSource<ISession> sessionBeginTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
35+
void OnBegin(ISession session, Begin peerBegin)
36+
{
37+
sessionBeginTcs.SetResult(session);
38+
}
39+
40+
rv = new Session(_amqpConnection.NativeConnection, GetDefaultBegin(), OnBegin);
41+
ISession awaitedSession = await sessionBeginTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
42+
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(rv, awaitedSession));
43+
_sessions.Add(rv);
1544
}
1645

17-
var session = new Session(amqpConnection.NativeConnection);
18-
Sessions.Add(session);
19-
return session;
46+
return rv;
47+
}
48+
49+
internal void ClearSessions()
50+
{
51+
// TODO close open sessions?
52+
_sessions.Clear();
2053
}
2154

22-
public void ClearSessions()
55+
// Note: these values come from Amqp.NET
56+
static Begin GetDefaultBegin()
2357
{
24-
Sessions.Clear();
58+
return new Begin()
59+
{
60+
IncomingWindow = 2048,
61+
OutgoingWindow = 2048,
62+
HandleMax = 1024,
63+
NextOutgoingId = uint.MaxValue - 2u
64+
};
2565
}
2666
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM
309309
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
310310
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
311311
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
312-
RabbitMQ.AMQP.Client.Impl.AmqpSessionManagement
313-
RabbitMQ.AMQP.Client.Impl.AmqpSessionManagement.AmqpSessionManagement(RabbitMQ.AMQP.Client.Impl.AmqpConnection! amqpConnection, int maxSessionsPerItem) -> void
314-
RabbitMQ.AMQP.Client.Impl.AmqpSessionManagement.ClearSessions() -> void
315-
RabbitMQ.AMQP.Client.Impl.AmqpSessionManagement.GetOrCreateSession() -> Amqp.Session!
316312
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification
317313
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void
318314
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!

0 commit comments

Comments
 (0)