Skip to content

Reliable Producer #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,18 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand

public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId));
publishers.Remove(publisherId);
return result;
try
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId));

return result;
}
finally
{
publishers.Remove(publisherId);
}
}

public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
Expand Down
3 changes: 0 additions & 3 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ private async Task ProcessIncomingFrames()
{
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.

// Console.WriteLine(
// $"B TryReadFrame {buffer.Length} {result.IsCompleted} {result.Buffer.IsEmpty} {frame.Length}");

var memory =
ArrayPool<byte>.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length);
frame.CopyTo(memory.Span);
Expand Down
34 changes: 26 additions & 8 deletions RabbitMQ.Stream.Client/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public record ProducerConfig : INamedEntity

public class Producer : AbstractEntity, IDisposable
{
private readonly bool _disposed;
private bool _disposed;
private byte publisherId;
private readonly ProducerConfig config;
private readonly Channel<OutgoingMsg> messageBuffer;
Expand Down Expand Up @@ -131,7 +131,7 @@ public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages,

private async Task SemaphoreWait()
{
if (!semaphore.Wait(0))
if (!semaphore.Wait(0) && !client.IsClosed)
{
// Nope, we have maxed our In-Flight messages, let's asynchronously wait for confirms
if (!await semaphore.WaitAsync(1000).ConfigureAwait(false))
Expand Down Expand Up @@ -159,7 +159,7 @@ private async Task ProcessBuffer()
{
// TODO: make the batch size configurable.
var messages = new List<(ulong, Message)>(100);
while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false))
while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false) && !client.IsClosed)
{
while (messageBuffer.Reader.TryRead(out var msg))
{
Expand Down Expand Up @@ -188,18 +188,35 @@ async Task SendMessages(List<(ulong, Message)> messages)
}
}

public async Task<ResponseCode> Close()
public Task<ResponseCode> Close()
{
if (client.IsClosed)
{
return ResponseCode.Ok;
return Task.FromResult(ResponseCode.Ok);
}

var result = ResponseCode.Ok;
try
{
var deletePublisherResponseTask = client.DeletePublisher(publisherId);
// The default timeout is usually 10 seconds
// in this case we reduce the waiting time
// the producer could be removed because of stream deleted
// so it is not necessary to wait.
deletePublisherResponseTask.Wait(TimeSpan.FromSeconds(3));
if (deletePublisherResponseTask.IsCompletedSuccessfully)
{
result = deletePublisherResponseTask.Result.ResponseCode;
}
}
catch (Exception e)
{
LogEventSource.Log.LogError($"Error removing the producer id: {publisherId} from the server. {e}");
}

var deletePublisherResponse = await client.DeletePublisher(publisherId);
var result = deletePublisherResponse.ResponseCode;
var closed = client.MaybeClose($"client-close-publisher: {publisherId}");
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}");
return result;
return Task.FromResult(result);
}

public static async Task<Producer> Create(ClientParameters clientParameters,
Expand Down Expand Up @@ -228,6 +245,7 @@ private void Dispose(bool disposing)
closeProducer.Wait(1000);
ClientExceptions.MaybeThrowException(closeProducer.Result,
$"Error during remove producer. Producer: {publisherId}");
_disposed = true;
}

public void Dispose()
Expand Down
127 changes: 127 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2020 VMware, Inc.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;
using Timer = System.Timers.Timer;

namespace RabbitMQ.Stream.Client.Reliable;

/// <summary>
/// ConfirmationStatus can be:
/// </summary>
public enum ConfirmationStatus : ushort
{
WaitForConfirmation = 0,
Confirmed = 1,
TimeoutError = 2,
StreamNotAvailable = 6,
InternalError = 15,
AccessRefused = 16,
PreconditionFailed = 17,
PublisherDoesNotExist = 18,
UndefinedError = 200,
}

/// <summary>
/// MessagesConfirmation is a wrapper around the message/s
/// This class is returned to the user to understand
/// the message status.
/// </summary>
public class MessagesConfirmation
{
public ulong PublishingId { get; internal set; }
public List<Message> Messages { get; internal init; }
public DateTime InsertDateTime { get; init; }
public ConfirmationStatus Status { get; internal set; }
}

/// <summary>
/// ConfirmationPipe maintains the status for the sent and received messages.
/// TPL Action block sends the confirmation to the user in async way
/// So the send/1 is not blocking.
/// </summary>
public class ConfirmationPipe
{
private ActionBlock<Tuple<ConfirmationStatus, ulong>> _waitForConfirmationActionBlock;
private readonly ConcurrentDictionary<ulong, MessagesConfirmation> _waitForConfirmation = new();
private readonly Timer _invalidateTimer = new();
private Func<MessagesConfirmation, Task> ConfirmHandler { get; }

public ConfirmationPipe(Func<MessagesConfirmation, Task> confirmHandler)
{
ConfirmHandler = confirmHandler;
}

public void Start()
{
_waitForConfirmationActionBlock = new ActionBlock<Tuple<ConfirmationStatus, ulong>>(
request =>
{
var (confirmationStatus, publishingId) = request;

_waitForConfirmation.TryRemove(publishingId, out var message);
if (message == null)
{
return;
}

message.Status = confirmationStatus;
ConfirmHandler?.Invoke(message);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
// throttling
BoundedCapacity = 50_000
});

_invalidateTimer.Elapsed += OnTimedEvent;
_invalidateTimer.Interval = 2000;
_invalidateTimer.Enabled = true;
}

public void Stop()
{
_invalidateTimer.Enabled = false;
_waitForConfirmationActionBlock.Complete();
}

private async void OnTimedEvent(object? sender, ElapsedEventArgs e)
{
{
foreach (var pair in _waitForConfirmation.Where(pair =>
(DateTime.Now - pair.Value.InsertDateTime).Seconds > 2))
{
await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError);
}
}
}

public void AddUnConfirmedMessage(ulong publishingId, Message message)
{
AddUnConfirmedMessage(publishingId, new List<Message>() { message });
}

public void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
{
_waitForConfirmation.TryAdd(publishingId,
new MessagesConfirmation()
{
Messages = messages,
PublishingId = publishingId,
InsertDateTime = DateTime.Now
});
}

public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus)
{
return _waitForConfirmationActionBlock.SendAsync(
Tuple.Create(confirmationStatus, publishingId));
}
}
11 changes: 11 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2020 VMware, Inc.

namespace RabbitMQ.Stream.Client.Reliable;

public interface IReconnectStrategy
{
void WhenDisconnected(out bool reconnect);
void WhenConnected();
}
18 changes: 18 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2020 VMware, Inc.

using System.Threading.Tasks;

namespace RabbitMQ.Stream.Client.Reliable;

/// <summary>
/// Define PublishingId Strategy.
/// Can be automatic, so the ReliableProducer will provide
/// the PublishingId
/// </summary>
public interface IPublishingIdStrategy
{
ulong GetPublishingId();
Task InitPublishingId();
}
Loading