Skip to content

Commit d70458a

Browse files
authored
Add event status changed (#349)
Add StatusChangedHandler StatusChanged to the Producer and Consumer classes. It exposes the internal status with `From` and `To` and the reason for the status change. The user is aware of what is happening internally and makes decisions based on the internal status. The Producer and Consumer classes are considered `Open` even if the status is: `Reconnecting`. --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 85c220a commit d70458a

16 files changed

+812
-123
lines changed

Directory.Packages.props

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,16 @@
2121
<PackageVersion Include="Microsoft.Extensions.Logging" Version="7.0.0" />
2222
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
2323
</ItemGroup>
24-
2524
<ItemGroup Label=".NET 6 Specific" Condition="'$(TargetFramework)' == 'net6.0'">
2625
<!-- RabbitMQ.Stream.Client -->
2726
<PackageVersion Include="System.IO.Hashing" Version="6.0.0" />
2827
<PackageVersion Include="System.IO.Pipelines" Version="6.0.0" />
2928
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
3029
</ItemGroup>
31-
3230
<ItemGroup Label=".NET 7 Specific" Condition="'$(TargetFramework)' == 'net7.0'">
3331
<!-- RabbitMQ.Stream.Client -->
3432
<PackageVersion Include="System.IO.Hashing" Version="7.0.0" />
3533
<PackageVersion Include="System.IO.Pipelines" Version="7.0.0" />
3634
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
37-
</ItemGroup>
38-
</Project>
35+
</ItemGroup>
36+
</Project>

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get
184184
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
185185
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
186186
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
187+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
188+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
189+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
190+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
191+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
192+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
193+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
187194
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
188195
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
189196
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
@@ -209,18 +216,35 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
209216
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
210217
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
211218
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
212-
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
219+
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason reason, string partition = null) -> void
213220
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
214221
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string
215222
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.set -> void
223+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.OnStatusChanged(RabbitMQ.Stream.Client.Reliable.StatusInfo statusInfo) -> void
216224
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
217225
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
218226
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.set -> void
227+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChanged -> RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
228+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
219229
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
220230
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
221231
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Initialization = 0 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
222232
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Open = 1 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
223-
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Reconnecting = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
233+
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Reconnection = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
234+
RabbitMQ.Stream.Client.Reliable.StatusInfo
235+
RabbitMQ.Stream.Client.Reliable.StatusInfo.From.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
236+
RabbitMQ.Stream.Client.Reliable.StatusInfo.From.init -> void
237+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.get -> string
238+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.init -> void
239+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.get -> string
240+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.init -> void
241+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.get -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
242+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.init -> void
243+
RabbitMQ.Stream.Client.Reliable.StatusInfo.StatusInfo(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus From, RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus To, string Stream, string Identifier, string Partition, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason Reason = RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None) -> void
244+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.get -> string
245+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.init -> void
246+
RabbitMQ.Stream.Client.Reliable.StatusInfo.To.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
247+
RabbitMQ.Stream.Client.Reliable.StatusInfo.To.init -> void
224248
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
225249
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
226250
RabbitMQ.Stream.Client.RouteNotFoundException

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger
172172
consumerConfig.ReconnectStrategy ??= new BackOffReconnectStrategy(logger);
173173
consumerConfig.ResourceAvailableReconnectStrategy ??= new ResourceAvailableBackOffReconnectStrategy(logger);
174174
var rConsumer = new Consumer(consumerConfig, logger);
175-
await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy)
175+
await rConsumer.Init(consumerConfig)
176176
.ConfigureAwait(false);
177177
return rConsumer;
178178
}
@@ -204,11 +204,11 @@ public override async Task Close()
204204
{
205205
if (_status == ReliableEntityStatus.Initialization)
206206
{
207-
UpdateStatus(ReliableEntityStatus.Closed);
207+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
208208
return;
209209
}
210210

211-
UpdateStatus(ReliableEntityStatus.Closed);
211+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
212212
await CloseEntity().ConfigureAwait(false);
213213
_logger?.LogDebug("Consumer {Identity} closed", ToString());
214214
}

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,18 @@ private async Task<IConsumer> StandardConsumer(bool boot)
6262
{
6363
if (closeReason == ConnectionClosedReason.Normal)
6464
{
65+
// we don't update the status here since it happens when Close() is called in a normal way
6566
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
6667
return;
6768
}
6869

69-
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream).ConfigureAwait(false);
70+
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
71+
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
7072
},
7173
MetadataHandler = async _ =>
7274
{
73-
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream).ConfigureAwait(false);
75+
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
76+
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
7477
},
7578
MessageHandler = async (consumer, ctx, message) =>
7679
{
@@ -127,19 +130,22 @@ private async Task<IConsumer> SuperConsumer(bool boot)
127130
await RandomWait().ConfigureAwait(false);
128131
if (closeReason == ConnectionClosedReason.Normal)
129132
{
133+
// we don't update the status here since it happens when Close() is called in a normal way
130134
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
131135
return;
132136
}
133137

134138
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
135-
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r)
139+
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
140+
ChangeStatusReason.UnexpectedlyDisconnected)
136141
.ConfigureAwait(false);
137142
},
138143
MetadataHandler = async update =>
139144
{
140145
await RandomWait().ConfigureAwait(false);
141146
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
142-
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r)
147+
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
148+
ChangeStatusReason.MetaDataUpdate)
143149
.ConfigureAwait(false);
144150
},
145151
MessageHandler = async (partitionStream, consumer, ctx, message) =>

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public static async Task<Producer> Create(ProducerConfig producerConfig, ILogger
159159
producerConfig.ReconnectStrategy ??= new BackOffReconnectStrategy(logger);
160160
producerConfig.ResourceAvailableReconnectStrategy ??= new ResourceAvailableBackOffReconnectStrategy(logger);
161161
var rProducer = new Producer(producerConfig, logger);
162-
await rProducer.Init(producerConfig.ReconnectStrategy, producerConfig.ResourceAvailableReconnectStrategy)
162+
await rProducer.Init(producerConfig)
163163
.ConfigureAwait(false);
164164
return rProducer;
165165
}
@@ -201,11 +201,11 @@ public override async Task Close()
201201
{
202202
if (ReliableEntityStatus.Initialization == _status)
203203
{
204-
UpdateStatus(ReliableEntityStatus.Closed);
204+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
205205
return;
206206
}
207207

208-
UpdateStatus(ReliableEntityStatus.Closed);
208+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
209209
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
210210
try
211211
{

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
3434
{
3535
if (boot)
3636
{
37-
3837
return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer(
3938
new RawSuperStreamProducerConfig(_producerConfig.Stream)
4039
{
@@ -56,14 +55,16 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
5655
}
5756

5857
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
59-
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r)
58+
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
59+
ChangeStatusReason.UnexpectedlyDisconnected)
6060
.ConfigureAwait(false);
6161
},
6262
MetadataHandler = async update =>
6363
{
6464
await RandomWait().ConfigureAwait(false);
6565
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
66-
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r)
66+
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
67+
ChangeStatusReason.MetaDataUpdate)
6768
.ConfigureAwait(false);
6869
},
6970
ConfirmHandler = confirmationHandler =>
@@ -104,7 +105,8 @@ private async Task<IProducer> StandardProducer()
104105
MetadataHandler = async _ =>
105106
{
106107
await RandomWait().ConfigureAwait(false);
107-
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream).ConfigureAwait(false);
108+
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
109+
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
108110
},
109111
ConnectionClosedHandler = async (closeReason) =>
110112
{
@@ -115,7 +117,8 @@ private async Task<IProducer> StandardProducer()
115117
return;
116118
}
117119

118-
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream).ConfigureAwait(false);
120+
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
121+
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
119122
},
120123
ConfirmHandler = confirmation =>
121124
{

0 commit comments

Comments
 (0)