Skip to content

Commit 78b7e79

Browse files
committed
work in progress
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent cf8c927 commit 78b7e79

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@ internal void Validate()
8787

8888
switch (ConsumerFilter)
8989
{
90-
case { PostFilter: null }:
90+
case {PostFilter: null}:
9191
throw new ArgumentException("PostFilter must be provided when Filter is set");
92-
case { Values.Count: 0 }:
92+
case {Values.Count: 0}:
9393
throw new ArgumentException("Values must be provided when Filter is set");
9494
}
9595
}
9696

97-
internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
97+
internal bool IsFiltering => ConsumerFilter is {Values.Count: > 0};
9898

9999
// it is needed to be able to add the subscriptions arguments
100100
// see consumerProperties["super-stream"] = SuperStream;
@@ -177,6 +177,7 @@ protected override string GetStream()
177177
{
178178
return _config.Stream;
179179
}
180+
180181
public async Task StoreOffset(ulong offset)
181182
{
182183
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
@@ -528,7 +529,8 @@ private async Task Init()
528529
{
529530
Logger?.LogError(
530531
"CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " +
531-
"Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, DumpEntityConfiguration(),
532+
"Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated,
533+
DumpEntityConfiguration(),
532534
chunkConsumed);
533535

534536
throw new CrcException(
@@ -537,7 +539,20 @@ private async Task Init()
537539
}
538540
}
539541

540-
await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
542+
try
543+
{
544+
await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
545+
}
546+
catch (OperationCanceledException)
547+
{
548+
// The consumer is closing from the user but some chunks are still in the buffer
549+
// simply skip the chunk since the Token.IsCancellationRequested is true
550+
// the catch is needed to avoid to propagate the exception to the socket thread.
551+
Logger?.LogWarning(
552+
"OperationCanceledException. {EntityInfo} has been closed while consuming messages. " +
553+
"Token.IsCancellationRequested: {IsCancellationRequested}",
554+
DumpEntityConfiguration(), Token.IsCancellationRequested);
555+
}
541556
}, async promotedAsActive =>
542557
{
543558
if (_config.ConsumerUpdateListener != null)
@@ -628,11 +643,14 @@ protected override async Task<ResponseCode> DeleteEntityFromTheServer(bool ignor
628643
}
629644

630645
return ResponseCode.Ok;
631-
632646
}
633647

634648
public override async Task<ResponseCode> Close()
635649
{
650+
// when the consumer is closed we must be sure that the
651+
// the subscription is completed to avoid problems with the connection
652+
// It could happen when the closing is called just after the creation
653+
await _completeSubscription.Task.ConfigureAwait(false);
636654
return await Shutdown(_config).ConfigureAwait(false);
637655
}
638656

0 commit comments

Comments
 (0)