Skip to content

Commit 33e832c

Browse files
Reliable Producer (#104)
- Implement Reliable Producer - See #104 Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent 950eaab commit 33e832c

12 files changed

+915
-22
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,11 +254,18 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
254254

255255
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
256256
{
257-
var result =
258-
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
259-
new DeletePublisherRequest(corr, publisherId));
260-
publishers.Remove(publisherId);
261-
return result;
257+
try
258+
{
259+
var result =
260+
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
261+
new DeletePublisherRequest(corr, publisherId));
262+
263+
return result;
264+
}
265+
finally
266+
{
267+
publishers.Remove(publisherId);
268+
}
262269
}
263270

264271
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@ private async Task ProcessIncomingFrames()
127127
{
128128
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.
129129

130-
// Console.WriteLine(
131-
// $"B TryReadFrame {buffer.Length} {result.IsCompleted} {result.Buffer.IsEmpty} {frame.Length}");
132-
133130
var memory =
134131
ArrayPool<byte>.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length);
135132
frame.CopyTo(memory.Span);

RabbitMQ.Stream.Client/Producer.cs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public record ProducerConfig : INamedEntity
3232

3333
public class Producer : AbstractEntity, IDisposable
3434
{
35-
private readonly bool _disposed;
35+
private bool _disposed;
3636
private byte publisherId;
3737
private readonly ProducerConfig config;
3838
private readonly Channel<OutgoingMsg> messageBuffer;
@@ -131,7 +131,7 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,
131131

132132
private async Task SemaphoreWait()
133133
{
134-
if (!semaphore.Wait(0))
134+
if (!semaphore.Wait(0) && !client.IsClosed)
135135
{
136136
// Nope, we have maxed our In-Flight messages, let's asynchronously wait for confirms
137137
if (!await semaphore.WaitAsync(1000).ConfigureAwait(false))
@@ -159,7 +159,7 @@ private async Task ProcessBuffer()
159159
{
160160
// TODO: make the batch size configurable.
161161
var messages = new List<(ulong, Message)>(100);
162-
while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false))
162+
while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false) && !client.IsClosed)
163163
{
164164
while (messageBuffer.Reader.TryRead(out var msg))
165165
{
@@ -188,18 +188,35 @@ async Task SendMessages(List<(ulong, Message)> messages)
188188
}
189189
}
190190

191-
public async Task<ResponseCode> Close()
191+
public Task<ResponseCode> Close()
192192
{
193193
if (client.IsClosed)
194194
{
195-
return ResponseCode.Ok;
195+
return Task.FromResult(ResponseCode.Ok);
196+
}
197+
198+
var result = ResponseCode.Ok;
199+
try
200+
{
201+
var deletePublisherResponseTask = client.DeletePublisher(publisherId);
202+
// The default timeout is usually 10 seconds
203+
// in this case we reduce the waiting time
204+
// the producer could be removed because of stream deleted
205+
// so it is not necessary to wait.
206+
deletePublisherResponseTask.Wait(TimeSpan.FromSeconds(3));
207+
if (deletePublisherResponseTask.IsCompletedSuccessfully)
208+
{
209+
result = deletePublisherResponseTask.Result.ResponseCode;
210+
}
211+
}
212+
catch (Exception e)
213+
{
214+
LogEventSource.Log.LogError($"Error removing the producer id: {publisherId} from the server. {e}");
196215
}
197216

198-
var deletePublisherResponse = await client.DeletePublisher(publisherId);
199-
var result = deletePublisherResponse.ResponseCode;
200217
var closed = client.MaybeClose($"client-close-publisher: {publisherId}");
201218
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}");
202-
return result;
219+
return Task.FromResult(result);
203220
}
204221

205222
public static async Task<Producer> Create(ClientParameters clientParameters,
@@ -228,6 +245,7 @@ private void Dispose(bool disposing)
228245
closeProducer.Wait(1000);
229246
ClientExceptions.MaybeThrowException(closeProducer.Result,
230247
$"Error during remove producer. Producer: {publisherId}");
248+
_disposed = true;
231249
}
232250

233251
public void Dispose()
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2020 VMware, Inc.
4+
5+
using System;
6+
using System.Collections.Concurrent;
7+
using System.Collections.Generic;
8+
using System.Linq;
9+
using System.Threading.Tasks;
10+
using System.Threading.Tasks.Dataflow;
11+
using System.Timers;
12+
using Timer = System.Timers.Timer;
13+
14+
namespace RabbitMQ.Stream.Client.Reliable;
15+
16+
/// <summary>
17+
/// ConfirmationStatus can be:
18+
/// </summary>
19+
public enum ConfirmationStatus : ushort
20+
{
21+
WaitForConfirmation = 0,
22+
Confirmed = 1,
23+
TimeoutError = 2,
24+
StreamNotAvailable = 6,
25+
InternalError = 15,
26+
AccessRefused = 16,
27+
PreconditionFailed = 17,
28+
PublisherDoesNotExist = 18,
29+
UndefinedError = 200,
30+
}
31+
32+
/// <summary>
33+
/// MessagesConfirmation is a wrapper around the message/s
34+
/// This class is returned to the user to understand
35+
/// the message status.
36+
/// </summary>
37+
public class MessagesConfirmation
38+
{
39+
public ulong PublishingId { get; internal set; }
40+
public List<Message> Messages { get; internal init; }
41+
public DateTime InsertDateTime { get; init; }
42+
public ConfirmationStatus Status { get; internal set; }
43+
}
44+
45+
/// <summary>
46+
/// ConfirmationPipe maintains the status for the sent and received messages.
47+
/// TPL Action block sends the confirmation to the user in async way
48+
/// So the send/1 is not blocking.
49+
/// </summary>
50+
public class ConfirmationPipe
51+
{
52+
private ActionBlock<Tuple<ConfirmationStatus, ulong>> _waitForConfirmationActionBlock;
53+
private readonly ConcurrentDictionary<ulong, MessagesConfirmation> _waitForConfirmation = new();
54+
private readonly Timer _invalidateTimer = new();
55+
private Func<MessagesConfirmation, Task> ConfirmHandler { get; }
56+
57+
public ConfirmationPipe(Func<MessagesConfirmation, Task> confirmHandler)
58+
{
59+
ConfirmHandler = confirmHandler;
60+
}
61+
62+
public void Start()
63+
{
64+
_waitForConfirmationActionBlock = new ActionBlock<Tuple<ConfirmationStatus, ulong>>(
65+
request =>
66+
{
67+
var (confirmationStatus, publishingId) = request;
68+
69+
_waitForConfirmation.TryRemove(publishingId, out var message);
70+
if (message == null)
71+
{
72+
return;
73+
}
74+
75+
message.Status = confirmationStatus;
76+
ConfirmHandler?.Invoke(message);
77+
}, new ExecutionDataflowBlockOptions
78+
{
79+
MaxDegreeOfParallelism = 1,
80+
// throttling
81+
BoundedCapacity = 50_000
82+
});
83+
84+
_invalidateTimer.Elapsed += OnTimedEvent;
85+
_invalidateTimer.Interval = 2000;
86+
_invalidateTimer.Enabled = true;
87+
}
88+
89+
public void Stop()
90+
{
91+
_invalidateTimer.Enabled = false;
92+
_waitForConfirmationActionBlock.Complete();
93+
}
94+
95+
private async void OnTimedEvent(object? sender, ElapsedEventArgs e)
96+
{
97+
{
98+
foreach (var pair in _waitForConfirmation.Where(pair =>
99+
(DateTime.Now - pair.Value.InsertDateTime).Seconds > 2))
100+
{
101+
await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError);
102+
}
103+
}
104+
}
105+
106+
public void AddUnConfirmedMessage(ulong publishingId, Message message)
107+
{
108+
AddUnConfirmedMessage(publishingId, new List<Message>() { message });
109+
}
110+
111+
public void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
112+
{
113+
_waitForConfirmation.TryAdd(publishingId,
114+
new MessagesConfirmation()
115+
{
116+
Messages = messages,
117+
PublishingId = publishingId,
118+
InsertDateTime = DateTime.Now
119+
});
120+
}
121+
122+
public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus)
123+
{
124+
return _waitForConfirmationActionBlock.SendAsync(
125+
Tuple.Create(confirmationStatus, publishingId));
126+
}
127+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2020 VMware, Inc.
4+
5+
namespace RabbitMQ.Stream.Client.Reliable;
6+
7+
public interface IReconnectStrategy
8+
{
9+
void WhenDisconnected(out bool reconnect);
10+
void WhenConnected();
11+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2020 VMware, Inc.
4+
5+
using System.Threading.Tasks;
6+
7+
namespace RabbitMQ.Stream.Client.Reliable;
8+
9+
/// <summary>
10+
/// Define PublishingId Strategy.
11+
/// Can be automatic, so the ReliableProducer will provide
12+
/// the PublishingId
13+
/// </summary>
14+
public interface IPublishingIdStrategy
15+
{
16+
ulong GetPublishingId();
17+
Task InitPublishingId();
18+
}

0 commit comments

Comments
 (0)