Skip to content

Commit a6f8240

Browse files
authored
Merge pull request #98 from rabbitmq/handle_forced_close
Change the way to decide when the client socket is open
2 parents 5d44375 + f63d30d commit a6f8240

14 files changed

+129
-131
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,6 @@
1717

1818
namespace RabbitMQ.Stream.Client
1919
{
20-
// internal static class TaskExtensions
21-
// {
22-
// public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
23-
// {
24-
// if (task == await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false))
25-
// {
26-
// await task.ConfigureAwait(false);
27-
// }
28-
// else
29-
// {
30-
// var supressErrorTask = task.ContinueWith((t, s) =>
31-
// {
32-
// t.Exception?.Handle(e => true);
33-
// },
34-
// null,
35-
// CancellationToken.None,
36-
// TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
37-
// TaskScheduler.Default);
38-
// throw new TimeoutException();
39-
// }
40-
// }
41-
// }
4220
public record ClientParameters
4321
{
4422
private string _clientProvidedName;
@@ -50,7 +28,10 @@ public record ClientParameters
5028
{"version", Consts.ClientVersion},
5129
{"platform", ".NET"},
5230
{"copyright", "Copyright (c) 2020-2021 VMware, Inc. or its affiliates."},
53-
{"information", "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"},
31+
{
32+
"information",
33+
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
34+
},
5435
{"connection_name", "Unknown"}
5536
};
5637

@@ -60,6 +41,7 @@ public record ClientParameters
6041
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);
6142
public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
6243
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
44+
6345
public string ClientProvidedName
6446
{
6547
get => _clientProvidedName ??= Properties["connection_name"];
@@ -102,6 +84,8 @@ public int Write(Span<byte> span)
10284

10385
public class Client : IClient
10486
{
87+
private bool isClosed = false;
88+
10589
private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10);
10690

10791
private uint correlationId = 0; // allow for some pre-amble
@@ -124,7 +108,6 @@ public class Client : IClient
124108
private readonly IDictionary<byte, Func<Deliver, Task>> consumers =
125109
new ConcurrentDictionary<byte, Func<Deliver, Task>>();
126110

127-
private object closeResponse;
128111
private int publishCommandsSent;
129112

130113
public int PublishCommandsSent => publishCommandsSent;
@@ -154,11 +137,25 @@ private byte GetNextSubscriptionId()
154137
return result;
155138
}
156139

157-
public bool IsClosed => closeResponse != null;
140+
public bool IsClosed
141+
{
142+
get
143+
{
144+
if (connection.IsClosed)
145+
{
146+
isClosed = true;
147+
}
148+
149+
return isClosed;
150+
}
151+
152+
private set => isClosed = value;
153+
}
158154

159155
private Client(ClientParameters parameters)
160156
{
161157
Parameters = parameters;
158+
IsClosed = false;
162159
}
163160

164161
public delegate Task ConnectionCloseHandler(string reason);
@@ -167,12 +164,16 @@ private Client(ClientParameters parameters)
167164

168165
private async Task OnConnectionClosed(string reason)
169166
{
170-
await ConnectionClosed?.Invoke(reason)!;
167+
if (ConnectionClosed != null)
168+
{
169+
await ConnectionClosed?.Invoke(reason)!;
170+
}
171171
}
172172

173173
public static async Task<Client> Create(ClientParameters parameters)
174174
{
175175
var client = new Client(parameters);
176+
176177
client.connection = await Connection.Create(parameters.Endpoint,
177178
client.HandleIncoming, client.HandleClosed, parameters.Ssl);
178179

@@ -308,6 +309,7 @@ private uint NextCorrelationId()
308309

309310
private async Task HandleClosed(string reason)
310311
{
312+
IsClosed = true;
311313
await OnConnectionClosed(reason);
312314
}
313315

@@ -421,6 +423,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
421423
case CloseResponse.Key:
422424
CloseResponse.Read(frame, out var closeResponse);
423425
HandleCorrelatedResponse(closeResponse);
426+
IsClosed = true;
424427
break;
425428
default:
426429
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
@@ -447,24 +450,26 @@ private void HandleCorrelatedResponse<T>(T command) where T : struct, ICommand
447450

448451
public async Task<CloseResponse> Close(string reason)
449452
{
450-
if (closeResponse != null)
453+
if (IsClosed)
451454
{
452-
return (CloseResponse)closeResponse;
455+
return new CloseResponse(0, ResponseCode.Ok);
453456
}
454457

455458
// TODO LRB timeout
456-
var result = await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason), TimeSpan.FromSeconds(30));
457-
closeResponse = result;
459+
var result =
460+
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
461+
TimeSpan.FromSeconds(30));
458462

459463
try
460464
{
461465
connection.Dispose();
462466
}
463467
catch (Exception e)
464468
{
465-
LogEventSource.Log.LogError($"An error occurred while calling { nameof(connection.Dispose) }.", e);
469+
LogEventSource.Log.LogError($"An error occurred while calling {nameof(connection.Dispose)}.", e);
466470
}
467471

472+
IsClosed = true;
468473
return result;
469474
}
470475

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public class Connection : IDisposable
2222
private int numFrames;
2323
private readonly object writeLock = new();
2424
internal int NumFrames => numFrames;
25+
private bool isClosed = false;
26+
27+
public bool IsClosed => isClosed;
2528

2629
internal Func<Memory<byte>, Task> CommandCallback
2730
{
@@ -141,18 +144,20 @@ private async Task ProcessIncomingFrames()
141144

142145
await reader.CompleteAsync();
143146
}
144-
// The exception is needed mostly to raise the
145-
// closedCallback event.
146-
// It is useful to trace the error, but at this point
147-
// the socket is closed maybe not in the correct way
148147
catch (Exception e)
149148
{
149+
// The exception is needed mostly to raise the
150+
// closedCallback event.
151+
// It is useful to trace the error, but at this point
152+
// the socket is closed maybe not in the correct way
150153
Debug.WriteLine($"Error reading the socket, error: {e}");
151154
}
152-
153-
await closedCallback?.Invoke("TCP Connection Closed")!;
154-
155-
Debug.WriteLine("TCP Connection Closed");
155+
finally
156+
{
157+
isClosed = true;
158+
await closedCallback?.Invoke("TCP Connection Closed")!;
159+
Debug.WriteLine("TCP Connection Closed");
160+
}
156161
}
157162

158163
private static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> frame)

RabbitMQ.Stream.Client/Consumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ await config.MessageHandler(this,
119119

120120
public async Task<ResponseCode> Close()
121121
{
122-
if (_disposed)
122+
if (client.IsClosed)
123123
{
124124
return ResponseCode.Ok;
125125
}

RabbitMQ.Stream.Client/Producer.cs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public record ProducerConfig : INamedEntity
3232

3333
public class Producer : AbstractEntity, IDisposable
3434
{
35-
private bool _disposed;
35+
private readonly bool _disposed;
3636
private byte publisherId;
3737
private readonly ProducerConfig config;
3838
private readonly Channel<OutgoingMsg> messageBuffer;
@@ -82,11 +82,7 @@ private async Task Init()
8282
{
8383
foreach (var id in publishingIds.Span)
8484
{
85-
config.ConfirmHandler(new Confirmation
86-
{
87-
PublishingId = id,
88-
Code = ResponseCode.Ok,
89-
});
85+
config.ConfirmHandler(new Confirmation { PublishingId = id, Code = ResponseCode.Ok, });
9086
}
9187

9288
semaphore.Release(publishingIds.Length);
@@ -95,11 +91,7 @@ private async Task Init()
9591
{
9692
foreach (var (id, code) in errors)
9793
{
98-
config.ConfirmHandler(new Confirmation
99-
{
100-
PublishingId = id,
101-
Code = code,
102-
});
94+
config.ConfirmHandler(new Confirmation { PublishingId = id, Code = code, });
10395
}
10496

10597
semaphore.Release(errors.Length);
@@ -198,7 +190,7 @@ async Task SendMessages(List<(ulong, Message)> messages)
198190

199191
public async Task<ResponseCode> Close()
200192
{
201-
if (_disposed)
193+
if (client.IsClosed)
202194
{
203195
return ResponseCode.Ok;
204196
}
@@ -207,7 +199,6 @@ public async Task<ResponseCode> Close()
207199
var result = deletePublisherResponse.ResponseCode;
208200
var closed = client.MaybeClose($"client-close-publisher: {publisherId}");
209201
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}");
210-
_disposed = true;
211202
return result;
212203
}
213204

Tests/Amqp10Tests.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ namespace Tests
1414
public class Amqp10Tests
1515
{
1616
[Fact]
17-
[WaitTestBeforeAfter]
1817
public void ReadsThrowsExceptionInvalidType()
1918
{
2019
var data = new byte[10];
@@ -119,7 +118,6 @@ public void ReadsThrowsExceptionInvalidType()
119118
}
120119

121120
[Fact]
122-
[WaitTestBeforeAfter]
123121
public void ValidateFormatCode()
124122
{
125123
const bool boolTrue = true;
@@ -256,7 +254,6 @@ private static void RunValidateFormatCode<T>(T value, byte[] result)
256254
}
257255

258256
[Fact]
259-
[WaitTestBeforeAfter]
260257
public void Validate32Bytes8BytesLists()
261258
{
262259
var value32Bin = new byte[] { 0xD0, 0x0, 0x0, 0x0, 0xF, 0x0, 0x0, 0x1, 0xF };
@@ -278,7 +275,6 @@ public void Validate32Bytes8BytesLists()
278275
}
279276

280277
[Fact]
281-
[WaitTestBeforeAfter]
282278
public void ValidateMessagesFromGo()
283279
{
284280
// These files are generated with the Go AMQP 1.0 client
@@ -379,7 +375,6 @@ public void ValidateMessagesFromGo()
379375
}
380376

381377
[Fact]
382-
[WaitTestBeforeAfter]
383378
public void MapEntriesWithAnEmptyKeyShouldNotBeWrittenToTheWire()
384379
{
385380
// Given we have an annotation with a valid key

Tests/ApiApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ namespace RabbitMQ.Stream.Client
266266
}
267267
public class Connection : System.IDisposable
268268
{
269+
public bool IsClosed { get; }
269270
public void Dispose() { }
270271
public System.Threading.Tasks.ValueTask<bool> Write<T>(T command)
271272
where T : struct, RabbitMQ.Stream.Client.ICommand { }

0 commit comments

Comments
 (0)