Skip to content

Commit f63d30d

Browse files
Gsantomaggiolukebakken
authored andcommitted
Change the way to decide when the client socket is open
When the tcp connection is forced the client sets the status as closed. Make IsClosed setter private and only set in the Client ctor Remove limitations on test parallelism, add IsClosed to Connection
1 parent 5d44375 commit f63d30d

14 files changed

+129
-131
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,6 @@
1717

1818
namespace RabbitMQ.Stream.Client
1919
{
20-
// internal static class TaskExtensions
21-
// {
22-
// public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
23-
// {
24-
// if (task == await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false))
25-
// {
26-
// await task.ConfigureAwait(false);
27-
// }
28-
// else
29-
// {
30-
// var supressErrorTask = task.ContinueWith((t, s) =>
31-
// {
32-
// t.Exception?.Handle(e => true);
33-
// },
34-
// null,
35-
// CancellationToken.None,
36-
// TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
37-
// TaskScheduler.Default);
38-
// throw new TimeoutException();
39-
// }
40-
// }
41-
// }
4220
public record ClientParameters
4321
{
4422
private string _clientProvidedName;
@@ -50,7 +28,10 @@ public record ClientParameters
5028
{"version", Consts.ClientVersion},
5129
{"platform", ".NET"},
5230
{"copyright", "Copyright (c) 2020-2021 VMware, Inc. or its affiliates."},
53-
{"information", "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"},
31+
{
32+
"information",
33+
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
34+
},
5435
{"connection_name", "Unknown"}
5536
};
5637

@@ -60,6 +41,7 @@ public record ClientParameters
6041
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);
6142
public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
6243
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
44+
6345
public string ClientProvidedName
6446
{
6547
get => _clientProvidedName ??= Properties["connection_name"];
@@ -102,6 +84,8 @@ public int Write(Span<byte> span)
10284

10385
public class Client : IClient
10486
{
87+
private bool isClosed = false;
88+
10589
private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10);
10690

10791
private uint correlationId = 0; // allow for some pre-amble
@@ -124,7 +108,6 @@ public class Client : IClient
124108
private readonly IDictionary<byte, Func<Deliver, Task>> consumers =
125109
new ConcurrentDictionary<byte, Func<Deliver, Task>>();
126110

127-
private object closeResponse;
128111
private int publishCommandsSent;
129112

130113
public int PublishCommandsSent => publishCommandsSent;
@@ -154,11 +137,25 @@ private byte GetNextSubscriptionId()
154137
return result;
155138
}
156139

157-
public bool IsClosed => closeResponse != null;
140+
public bool IsClosed
141+
{
142+
get
143+
{
144+
if (connection.IsClosed)
145+
{
146+
isClosed = true;
147+
}
148+
149+
return isClosed;
150+
}
151+
152+
private set => isClosed = value;
153+
}
158154

159155
private Client(ClientParameters parameters)
160156
{
161157
Parameters = parameters;
158+
IsClosed = false;
162159
}
163160

164161
public delegate Task ConnectionCloseHandler(string reason);
@@ -167,12 +164,16 @@ private Client(ClientParameters parameters)
167164

168165
private async Task OnConnectionClosed(string reason)
169166
{
170-
await ConnectionClosed?.Invoke(reason)!;
167+
if (ConnectionClosed != null)
168+
{
169+
await ConnectionClosed?.Invoke(reason)!;
170+
}
171171
}
172172

173173
public static async Task<Client> Create(ClientParameters parameters)
174174
{
175175
var client = new Client(parameters);
176+
176177
client.connection = await Connection.Create(parameters.Endpoint,
177178
client.HandleIncoming, client.HandleClosed, parameters.Ssl);
178179

@@ -308,6 +309,7 @@ private uint NextCorrelationId()
308309

309310
private async Task HandleClosed(string reason)
310311
{
312+
IsClosed = true;
311313
await OnConnectionClosed(reason);
312314
}
313315

@@ -421,6 +423,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
421423
case CloseResponse.Key:
422424
CloseResponse.Read(frame, out var closeResponse);
423425
HandleCorrelatedResponse(closeResponse);
426+
IsClosed = true;
424427
break;
425428
default:
426429
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
@@ -447,24 +450,26 @@ private void HandleCorrelatedResponse<T>(T command) where T : struct, ICommand
447450

448451
public async Task<CloseResponse> Close(string reason)
449452
{
450-
if (closeResponse != null)
453+
if (IsClosed)
451454
{
452-
return (CloseResponse)closeResponse;
455+
return new CloseResponse(0, ResponseCode.Ok);
453456
}
454457

455458
// TODO LRB timeout
456-
var result = await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason), TimeSpan.FromSeconds(30));
457-
closeResponse = result;
459+
var result =
460+
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
461+
TimeSpan.FromSeconds(30));
458462

459463
try
460464
{
461465
connection.Dispose();
462466
}
463467
catch (Exception e)
464468
{
465-
LogEventSource.Log.LogError($"An error occurred while calling { nameof(connection.Dispose) }.", e);
469+
LogEventSource.Log.LogError($"An error occurred while calling {nameof(connection.Dispose)}.", e);
466470
}
467471

472+
IsClosed = true;
468473
return result;
469474
}
470475

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public class Connection : IDisposable
2222
private int numFrames;
2323
private readonly object writeLock = new();
2424
internal int NumFrames => numFrames;
25+
private bool isClosed = false;
26+
27+
public bool IsClosed => isClosed;
2528

2629
internal Func<Memory<byte>, Task> CommandCallback
2730
{
@@ -141,18 +144,20 @@ private async Task ProcessIncomingFrames()
141144

142145
await reader.CompleteAsync();
143146
}
144-
// The exception is needed mostly to raise the
145-
// closedCallback event.
146-
// It is useful to trace the error, but at this point
147-
// the socket is closed maybe not in the correct way
148147
catch (Exception e)
149148
{
149+
// The exception is needed mostly to raise the
150+
// closedCallback event.
151+
// It is useful to trace the error, but at this point
152+
// the socket is closed maybe not in the correct way
150153
Debug.WriteLine($"Error reading the socket, error: {e}");
151154
}
152-
153-
await closedCallback?.Invoke("TCP Connection Closed")!;
154-
155-
Debug.WriteLine("TCP Connection Closed");
155+
finally
156+
{
157+
isClosed = true;
158+
await closedCallback?.Invoke("TCP Connection Closed")!;
159+
Debug.WriteLine("TCP Connection Closed");
160+
}
156161
}
157162

158163
private static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> frame)

RabbitMQ.Stream.Client/Consumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ await config.MessageHandler(this,
119119

120120
public async Task<ResponseCode> Close()
121121
{
122-
if (_disposed)
122+
if (client.IsClosed)
123123
{
124124
return ResponseCode.Ok;
125125
}

RabbitMQ.Stream.Client/Producer.cs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public record ProducerConfig : INamedEntity
3232

3333
public class Producer : AbstractEntity, IDisposable
3434
{
35-
private bool _disposed;
35+
private readonly bool _disposed;
3636
private byte publisherId;
3737
private readonly ProducerConfig config;
3838
private readonly Channel<OutgoingMsg> messageBuffer;
@@ -82,11 +82,7 @@ private async Task Init()
8282
{
8383
foreach (var id in publishingIds.Span)
8484
{
85-
config.ConfirmHandler(new Confirmation
86-
{
87-
PublishingId = id,
88-
Code = ResponseCode.Ok,
89-
});
85+
config.ConfirmHandler(new Confirmation { PublishingId = id, Code = ResponseCode.Ok, });
9086
}
9187

9288
semaphore.Release(publishingIds.Length);
@@ -95,11 +91,7 @@ private async Task Init()
9591
{
9692
foreach (var (id, code) in errors)
9793
{
98-
config.ConfirmHandler(new Confirmation
99-
{
100-
PublishingId = id,
101-
Code = code,
102-
});
94+
config.ConfirmHandler(new Confirmation { PublishingId = id, Code = code, });
10395
}
10496

10597
semaphore.Release(errors.Length);
@@ -198,7 +190,7 @@ async Task SendMessages(List<(ulong, Message)> messages)
198190

199191
public async Task<ResponseCode> Close()
200192
{
201-
if (_disposed)
193+
if (client.IsClosed)
202194
{
203195
return ResponseCode.Ok;
204196
}
@@ -207,7 +199,6 @@ public async Task<ResponseCode> Close()
207199
var result = deletePublisherResponse.ResponseCode;
208200
var closed = client.MaybeClose($"client-close-publisher: {publisherId}");
209201
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}");
210-
_disposed = true;
211202
return result;
212203
}
213204

Tests/Amqp10Tests.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ namespace Tests
1414
public class Amqp10Tests
1515
{
1616
[Fact]
17-
[WaitTestBeforeAfter]
1817
public void ReadsThrowsExceptionInvalidType()
1918
{
2019
var data = new byte[10];
@@ -119,7 +118,6 @@ public void ReadsThrowsExceptionInvalidType()
119118
}
120119

121120
[Fact]
122-
[WaitTestBeforeAfter]
123121
public void ValidateFormatCode()
124122
{
125123
const bool boolTrue = true;
@@ -256,7 +254,6 @@ private static void RunValidateFormatCode<T>(T value, byte[] result)
256254
}
257255

258256
[Fact]
259-
[WaitTestBeforeAfter]
260257
public void Validate32Bytes8BytesLists()
261258
{
262259
var value32Bin = new byte[] { 0xD0, 0x0, 0x0, 0x0, 0xF, 0x0, 0x0, 0x1, 0xF };
@@ -278,7 +275,6 @@ public void Validate32Bytes8BytesLists()
278275
}
279276

280277
[Fact]
281-
[WaitTestBeforeAfter]
282278
public void ValidateMessagesFromGo()
283279
{
284280
// These files are generated with the Go AMQP 1.0 client
@@ -379,7 +375,6 @@ public void ValidateMessagesFromGo()
379375
}
380376

381377
[Fact]
382-
[WaitTestBeforeAfter]
383378
public void MapEntriesWithAnEmptyKeyShouldNotBeWrittenToTheWire()
384379
{
385380
// Given we have an annotation with a valid key

Tests/ApiApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ namespace RabbitMQ.Stream.Client
266266
}
267267
public class Connection : System.IDisposable
268268
{
269+
public bool IsClosed { get; }
269270
public void Dispose() { }
270271
public System.Threading.Tasks.ValueTask<bool> Write<T>(T command)
271272
where T : struct, RabbitMQ.Stream.Client.ICommand { }

0 commit comments

Comments
 (0)