Skip to content

Commit 97ba318

Browse files
committed
add the OnConnectionClosed
function on the Producer class Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 578589a commit 97ba318

File tree

3 files changed

+10
-4
lines changed

3 files changed

+10
-4
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
8888
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
8989
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
9090
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
91+
RabbitMQ.Stream.Client.Reliable.ProducerConfig.OnConnectionClosed.get -> System.Func<string, System.Threading.Tasks.Task>
92+
RabbitMQ.Stream.Client.Reliable.ProducerConfig.OnConnectionClosed.set -> void
9193
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
9294
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
9395
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public TimeSpan TimeoutMessageAfter
104104
}
105105
}
106106

107+
public Func<string, Task> OnConnectionClosed { get; set; } = null;
108+
107109
public ProducerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
108110
{
109111
}

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ private async Task<IProducer> SuperStreamProducer()
6262

6363
private async Task<IProducer> StandardProducer()
6464
{
65+
var onConnectionClosed = _producerConfig.OnConnectionClosed ?? (async _ =>
66+
{
67+
await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false);
68+
});
69+
6570
return await _producerConfig.StreamSystem.CreateRawProducer(new RawProducerConfig(_producerConfig.Stream)
6671
{
6772
ClientProvidedName = _producerConfig.ClientProvidedName,
@@ -80,10 +85,7 @@ private async Task<IProducer> StandardProducer()
8085
_producerConfig.StreamSystem).WaitAsync(CancellationToken.None);
8186
});
8287
},
83-
ConnectionClosedHandler = async _ =>
84-
{
85-
await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false);
86-
},
88+
ConnectionClosedHandler = onConnectionClosed,
8789
ConfirmHandler = confirmation =>
8890
{
8991
var confirmationStatus = confirmation.Code switch

0 commit comments

Comments
 (0)