Skip to content

Commit 590ef12

Browse files
committed
Increase the timeout for confirmation to 10s
3s was too aggressive. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent cd46117 commit 590ef12

File tree

4 files changed

+42
-3
lines changed

4 files changed

+42
-3
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod
109109
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
110110
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
111111
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
112+
RabbitMQ.Stream.Client.Reliable.ReconnectingArgs
113+
RabbitMQ.Stream.Client.Reliable.ReconnectingArgs.IsReconnecting.get -> bool
114+
RabbitMQ.Stream.Client.Reliable.ReconnectingArgs.ReconnectingArgs(bool isReconnecting) -> void
115+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Reconnecting -> RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectingEventHandler
116+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectingEventHandler
112117
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
113118
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
114119
RabbitMQ.Stream.Client.RouteNotFoundException
@@ -146,3 +151,4 @@ RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationExcepti
146151
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
147152
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
148153
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
154+
virtual RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnecting(RabbitMQ.Stream.Client.Reliable.ReconnectingArgs e) -> void

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public record SuperStreamConfig
2222

2323
public record ProducerConfig : ReliableConfig
2424
{
25-
private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(3);
25+
private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(10);
2626

2727
/// <summary>
2828
/// Reference used for deduplication.

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,32 @@
1010

1111
namespace RabbitMQ.Stream.Client.Reliable;
1212

13+
public class ReconnectingArgs : EventArgs
14+
{
15+
public ReconnectingArgs(bool isReconnecting)
16+
{
17+
IsReconnecting = isReconnecting;
18+
}
19+
20+
public bool IsReconnecting { get; }
21+
}
22+
1323
public record ReliableConfig
1424
{
1525
public IReconnectStrategy ReconnectStrategy { get; set; }
1626
public StreamSystem StreamSystem { get; }
1727
public string Stream { get; }
1828

29+
30+
public delegate void ReconnectingEventHandler(object sender, ReconnectingArgs e);
31+
32+
public event ReconnectingEventHandler Reconnecting;
33+
34+
internal virtual void OnReconnecting(ReconnectingArgs e)
35+
{
36+
Reconnecting?.Invoke(this, e);
37+
}
38+
1939
protected ReliableConfig(StreamSystem streamSystem, string stream)
2040
{
2141
if (string.IsNullOrWhiteSpace(stream))
@@ -41,6 +61,15 @@ public abstract class ReliableBase
4161
protected bool _isOpen;
4262
protected bool _inReconnection;
4363

64+
internal delegate void ReconnectingEventHandler(object sender, ReconnectingArgs e);
65+
66+
internal event ReconnectingEventHandler Reconnecting;
67+
68+
protected virtual void OnReconnecting(ReconnectingArgs e)
69+
{
70+
Reconnecting?.Invoke(this, e);
71+
}
72+
4473
protected abstract ILogger BaseLogger { get; }
4574

4675
internal async Task Init(IReconnectStrategy reconnectStrategy)
@@ -105,6 +134,7 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
105134
protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
106135
{
107136
_inReconnection = true;
137+
OnReconnecting(new ReconnectingArgs(true));
108138
try
109139
{
110140
switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && _isOpen)
@@ -122,6 +152,7 @@ protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
122152
finally
123153
{
124154
_inReconnection = false;
155+
OnReconnecting(new ReconnectingArgs(false));
125156
}
126157
}
127158

@@ -143,7 +174,8 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
143174
await Task.Delay(500).ConfigureAwait(false);
144175
if (await system.StreamExists(stream).ConfigureAwait(false))
145176
{
146-
BaseLogger.LogInformation("Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
177+
BaseLogger.LogInformation(
178+
"Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
147179
stream,
148180
ToString()
149181
);

docs/Documentation/Documentation.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
<MinVerSkip>true</MinVerSkip>
77
<Nullable>enable</Nullable>
88
<OutputType>Exe</OutputType>
9-
<TargetFramework>net6.0</TargetFramework>
9+
<TargetFramework>net7.0</TargetFramework>
10+
<TargetFrameworks />
1011
</PropertyGroup>
1112

1213
<ItemGroup>

0 commit comments

Comments
 (0)