Skip to content

Commit 34fb4bc

Browse files
author
Stefán J. Sigurðarson
committed
Improving the connection logic.
1 parent 78784ed commit 34fb4bc

File tree

6 files changed

+76
-18
lines changed

6 files changed

+76
-18
lines changed

RabbitMQTest/Program.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class Program
1515
private static int itemsPerBatch = 500;
1616
static async Task Main(string[] args)
1717
{
18+
ThreadPool.SetMinThreads(16 * Environment.ProcessorCount, 16 * Environment.ProcessorCount);
1819
var connectionString = new Uri("amqp://guest:guest@localhost/");
1920

2021
var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
@@ -23,7 +24,7 @@ static async Task Main(string[] args)
2324
var publisher = connection.CreateModel();
2425
var subscriber = connection2.CreateModel();
2526
publisher.ConfirmSelect();
26-
subscriber.ConfirmSelect();
27+
//subscriber.ConfirmSelect();
2728

2829
publisher.ExchangeDeclare("test", ExchangeType.Topic, true);
2930

@@ -47,8 +48,8 @@ static async Task Main(string[] args)
4748
batch.Add("test", "myawesome.routing.key", false, properties, payload);
4849
}
4950
batch.Publish();
50-
await publisher.WaitForConfirmsOrDieAsync();
5151
messagesSent += itemsPerBatch;
52+
await publisher.WaitForConfirmsOrDieAsync();
5253
}
5354
});
5455

projects/client/RabbitMQ.Client/src/client/api/SslHelper.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@ public static Stream TcpUpgrade(Stream tcpStream, SslOption sslOption)
7272

7373
var sslStream = new SslStream(tcpStream, false, remoteCertValidator, localCertSelector);
7474

75-
sslStream.AuthenticateAsClientAsync(sslOption.ServerName, sslOption.Certs, sslOption.Version,
76-
sslOption.CheckCertificateRevocation).GetAwaiter().GetResult();
77-
75+
sslStream.AuthenticateAsClient(sslOption.ServerName, sslOption.Certs, sslOption.Version, sslOption.CheckCertificateRevocation);
7876
return sslStream;
7977
}
8078

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using RabbitMQ.Client.Exceptions;
8+
9+
namespace RabbitMQ.Client.Impl
10+
{
11+
public class AsyncRpcContinuation : IRpcContinuation
12+
{
13+
private readonly TaskCompletionSource<Command> _taskCompletionSource = new TaskCompletionSource<Command>(TaskCreationOptions.RunContinuationsAsynchronously);
14+
15+
public virtual async ValueTask<Command> GetReplyAsync(CancellationToken cancellationToken = default)
16+
{
17+
try
18+
{
19+
if (cancellationToken != default)
20+
{
21+
using (cancellationToken.Register(() => _taskCompletionSource.TrySetCanceled(cancellationToken)))
22+
{
23+
return await _taskCompletionSource.Task.ConfigureAwait(false);
24+
}
25+
}
26+
27+
return await _taskCompletionSource.Task.ConfigureAwait(false);
28+
}
29+
catch (OperationInterruptedException)
30+
{
31+
throw;
32+
}
33+
}
34+
35+
public virtual async ValueTask<Command> GetReplyAsync(TimeSpan timeout)
36+
{
37+
using (CancellationTokenSource cts = new CancellationTokenSource(timeout))
38+
{
39+
return await GetReplyAsync(cts.Token).ConfigureAwait(false);
40+
}
41+
}
42+
43+
public void HandleCommand(Command cmd)
44+
{
45+
_taskCompletionSource.TrySetResult(cmd);
46+
}
47+
48+
public void HandleModelShutdown(ShutdownEventArgs reason)
49+
{
50+
_taskCompletionSource.TrySetException(new OperationInterruptedException(reason));
51+
}
52+
}
53+
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,19 +1181,25 @@ Command ChannelCloseWrapper(ushort reasonCode, string reasonText)
11811181

11821182
void StartAndTune()
11831183
{
1184-
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
1185-
m_model0.m_connectionStartCell = connectionStartCell;
1184+
m_model0.m_connectionStartCell = new TaskCompletionSource<ConnectionStartDetails>(TaskCreationOptions.RunContinuationsAsynchronously);
11861185
m_model0.HandshakeContinuationTimeout = m_factory.HandshakeContinuationTimeout;
11871186
m_frameHandler.ReadTimeout = m_factory.HandshakeContinuationTimeout;
11881187
m_frameHandler.SendHeader();
1188+
try
1189+
{
1190+
m_model0.m_connectionStartCell.Task.Wait(m_factory.HandshakeContinuationTimeout);
1191+
}
1192+
catch (Exception)
1193+
{
1194+
m_model0.m_connectionStartCell.TrySetCanceled();
1195+
}
11891196

1190-
var connectionStart = connectionStartCell.WaitForValue();
1191-
1192-
if (connectionStart == null)
1197+
if (m_model0.m_connectionStartCell.Task.IsCanceled)
11931198
{
11941199
throw new IOException("connection.start was never received, likely due to a network timeout");
11951200
}
11961201

1202+
var connectionStart = m_model0.m_connectionStartCell.Task.Result;
11971203
ServerProperties = connectionStart.m_serverProperties;
11981204

11991205
var serverVersion = new AmqpVersion(connectionStart.m_versionMajor,

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public abstract class ModelBase : IFullModel, IRecoverable
6060

6161
///<summary>Only used to kick-start a connection open
6262
///sequence. See <see cref="Connection.Open"/> </summary>
63-
public BlockingCell<ConnectionStartDetails> m_connectionStartCell = null;
63+
//public BlockingCell<ConnectionStartDetails> m_connectionStartCell = null;
64+
public TaskCompletionSource<ConnectionStartDetails> m_connectionStartCell;
6465

6566
private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
6667
private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20);
@@ -401,7 +402,7 @@ public void FinishClose()
401402
}
402403
if (m_connectionStartCell != null)
403404
{
404-
m_connectionStartCell.ContinueWithValue(null);
405+
m_connectionStartCell.TrySetResult(null);
405406
}
406407
}
407408

@@ -958,8 +959,7 @@ public void HandleConnectionStart(byte versionMajor,
958959
m_mechanisms = mechanisms,
959960
m_locales = locales
960961
};
961-
m_connectionStartCell.ContinueWithValue(details);
962-
m_connectionStartCell = null;
962+
m_connectionStartCell.TrySetResult(details);
963963
}
964964

965965
///<summary>Handle incoming Connection.Tune

projects/client/RabbitMQ.Client/src/client/impl/SimpleBlockingRpcContinuation.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42-
using System.Diagnostics;
42+
4343
using RabbitMQ.Client.Exceptions;
4444
using RabbitMQ.Util;
4545

@@ -79,19 +79,19 @@ public virtual Command GetReply(TimeSpan timeout)
7979
}
8080
}
8181

82-
private static void ReportInvalidInvariant(Either<Command,ShutdownEventArgs> result)
82+
private static void ReportInvalidInvariant(Either<Command, ShutdownEventArgs> result)
8383
{
8484
string error = "Illegal EitherAlternative " + result.Alternative;
8585
}
8686

8787
public virtual void HandleCommand(Command cmd)
8888
{
89-
m_cell.ContinueWithValue(Either<Command,ShutdownEventArgs>.Left(cmd));
89+
m_cell.ContinueWithValue(Either<Command, ShutdownEventArgs>.Left(cmd));
9090
}
9191

9292
public virtual void HandleModelShutdown(ShutdownEventArgs reason)
9393
{
94-
m_cell.ContinueWithValue(Either<Command,ShutdownEventArgs>.Right(reason));
94+
m_cell.ContinueWithValue(Either<Command, ShutdownEventArgs>.Right(reason));
9595
}
9696
}
9797
}

0 commit comments

Comments
 (0)