Skip to content

Commit 14b6e2d

Browse files
authored
Check the logic status before raising the event (#385)
* Fixes: #384 Check the logic status before raising the event * Add RPC timeout configuration. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent dbfcdc9 commit 14b6e2d

File tree

5 files changed

+45
-11
lines changed

5 files changed

+45
-11
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public record ClientParameters
3737
public IDictionary<string, string> Properties { get; } =
3838
new Dictionary<string, string>
3939
{
40-
{"product", "RabbitMQ Stream"},
41-
{"version", Version.VersionString},
42-
{"platform", ".NET"},
40+
{ "product", "RabbitMQ Stream" },
41+
{ "version", Version.VersionString },
42+
{ "platform", ".NET" },
4343
{
4444
"copyright",
4545
"Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."
@@ -48,7 +48,7 @@ public record ClientParameters
4848
"information",
4949
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
5050
},
51-
{"connection_name", "Unknown"}
51+
{ "connection_name", "Unknown" }
5252
};
5353

5454
public string UserName { get; set; } = "guest";
@@ -77,6 +77,8 @@ public string ClientProvidedName
7777

7878
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
7979

80+
public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10);
81+
8082
internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
8183
{
8284
OnMetadataUpdate?.Invoke(metaDataUpdate);
@@ -113,8 +115,6 @@ public class Client : IClient
113115
{
114116
private bool isClosed = true;
115117

116-
private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10);
117-
118118
private uint correlationId = 0; // allow for some pre-amble
119119

120120
private Connection _connection;
@@ -150,7 +150,6 @@ public class Client : IClient
150150

151151
public int IncomingFrames => _connection.NumFrames;
152152

153-
//public int IncomingChannelCount => this.incoming.Reader.Count;
154153
private static readonly object Obj = new();
155154

156155
private readonly ILogger _logger;
@@ -494,7 +493,7 @@ private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSp
494493
var tcs = PooledTaskSource<TOut>.Rent();
495494
requests.TryAdd(corr, tcs);
496495
await Publish(request(corr)).ConfigureAwait(false);
497-
using var cts = new CancellationTokenSource(timeout ?? defaultTimeout);
496+
using var cts = new CancellationTokenSource(timeout ?? Parameters.RpcTimeOut);
498497
await using (cts.Token.Register(
499498
valueTaskSource =>
500499
((ManualResetValueTaskSource<TOut>)valueTaskSource).SetException(
@@ -973,7 +972,16 @@ public bool RunContinuationsAsynchronously
973972
public short Version => _logic.Version;
974973
public void Reset() => _logic.Reset();
975974
public void SetResult(T result) => _logic.SetResult(result);
976-
public void SetException(Exception error) => _logic.SetException(error);
975+
976+
public void SetException(Exception error)
977+
{
978+
// https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/384
979+
// we need to check if the task is pending before setting the exception
980+
if (_logic.GetStatus(_logic.Version) == ValueTaskSourceStatus.Pending)
981+
{
982+
_logic.SetException(error);
983+
}
984+
}
977985

978986
void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);
979987
T IValueTaskSource<T>.GetResult(short token) => _logic.GetResult(token);

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Cli
5454
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
5555
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
5656
RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
57+
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan
58+
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void
5759
RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void
5860
RabbitMQ.Stream.Client.ConnectionItem
5961
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
@@ -324,6 +326,8 @@ RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.C
324326
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
325327
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.get -> RabbitMQ.Stream.Client.ConnectionPoolConfig
326328
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.set -> void
329+
RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.get -> System.TimeSpan
330+
RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.set -> void
327331
RabbitMQ.Stream.Client.SuperStreamSpec
328332
RabbitMQ.Stream.Client.SuperStreamSpec.Args.get -> System.Collections.Generic.IDictionary<string, string>
329333
RabbitMQ.Stream.Client.SuperStreamSpec.LeaderLocator.set -> void

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ internal void Validate()
2020
{
2121
throw new ArgumentException("ConnectionPoolConfig can't be null");
2222
}
23+
24+
if (RpcTimeOut < TimeSpan.FromSeconds(1))
25+
{
26+
throw new ArgumentException("RpcTimeOut must be at least 1 second");
27+
}
2328
}
2429

2530
public string UserName { get; set; } = "guest";
@@ -44,6 +49,13 @@ internal void Validate()
4449
/// Configure the connection pool for producers and consumers.
4550
/// </summary>
4651
public ConnectionPoolConfig ConnectionPoolConfig { get; set; } = new();
52+
53+
/// <summary>
54+
/// The timeout for RPC calls, like PeerProperties, QueryMetadata, etc.
55+
/// Default value is 10 seconds and in most cases it should be enough.
56+
/// Low value can cause false errors in the client.
57+
/// </summary>
58+
public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10);
4759
}
4860

4961
public class StreamSystem
@@ -85,7 +97,8 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
8597
ClientProvidedName = config.ClientProvidedName,
8698
Heartbeat = config.Heartbeat,
8799
Endpoints = config.Endpoints,
88-
AuthMechanism = config.AuthMechanism
100+
AuthMechanism = config.AuthMechanism,
101+
RpcTimeOut = config.RpcTimeOut
89102
};
90103
// create the metadata client connection
91104
foreach (var endPoint in clientParams.Endpoints)

Tests/SystemTests.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,15 @@ await Assert.ThrowsAsync<AuthMechanismNotSupportedException>(
274274
);
275275
}
276276

277+
[Fact]
278+
public async void ValidateRpCtimeOut()
279+
{
280+
var config = new StreamSystemConfig() { RpcTimeOut = TimeSpan.FromMilliseconds(1) };
281+
await Assert.ThrowsAsync<ArgumentException>(
282+
async () => { await StreamSystem.Create(config); }
283+
);
284+
}
285+
277286
[Fact]
278287
public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError()
279288
{

kubernetes/stream_cluster.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ metadata:
1010
namespace: stream-clients-test
1111
spec:
1212
replicas: 3
13-
image: rabbitmq:3.13-rc-management
13+
image: rabbitmq:3.13-management
1414
service:
1515
type: LoadBalancer
1616
# tls:

0 commit comments

Comments
 (0)