Skip to content

Add TryQueryOffset API #413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
<PackageVersion Include="coverlet.collector" Version="3.2.0" />
<!-- docs/**/*.csproj -->
<PackageVersion Include="K4os.Compression.LZ4.Streams" Version="1.2.16" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="7.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
</ItemGroup>
<ItemGroup Label="net8.0 specific" Condition="'$(TargetFramework)' == 'net8.0'">
<!-- RabbitMQ.Stream.Client -->
Expand All @@ -37,4 +37,4 @@
<!-- RabbitMQ.Stream.Client.PerfTest -->
<PackageVersion Include="FSharp.Core" Version="9.0.201" />
</ItemGroup>
</Project>
</Project>
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The library requires .NET 6, .NET 7 or .NET 8.
- [Best practices to write a reliable client](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient/)
- [Super Stream example](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/docs/SuperStream)
- [Stream Performance Test](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/RabbitMQ.Stream.Client.PerfTest)
- [Single Active consumer for a stream](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/SingleActiveConsumer)



Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ RabbitMQ.Stream.Client.StreamSystem.StoreOffset(string reference, string stream,
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystem.SuperStreamExists(string superStream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.StreamSystem.TryQueryOffset(string reference, string stream) -> System.Threading.Tasks.Task<ulong?>
RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
Expand Down
20 changes: 16 additions & 4 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,22 @@ private async Task Init()
{
// in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
// since the user decided to override the default behavior
_config.StoredOffsetSpec = await _config.ConsumerUpdateListener(
_config.Reference,
_config.Stream,
promotedAsActive).ConfigureAwait(false);

try
{
_config.StoredOffsetSpec = await _config.ConsumerUpdateListener(
_config.Reference,
_config.Stream,
promotedAsActive).ConfigureAwait(false);
}
catch (Exception e)
{
Logger?.LogError(e,
"Error while calling the ConsumerUpdateListener. OffsetTypeNext will be used. {EntityInfo}",
DumpEntityConfiguration());
// in this case the default behavior is to use the OffsetTypeNext
_config.StoredOffsetSpec = new OffsetTypeNext();
}
}

// Here we set the promotion status
Expand Down
19 changes: 19 additions & 0 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,29 @@ private static void MaybeThrowQueryException(string reference, string stream)
/// <param name="stream">Stream name</param>
/// <returns></returns>
public async Task<ulong> QueryOffset(string reference, string stream)
{
var offset = await TryQueryOffset(reference, stream).ConfigureAwait(false);
return offset ??
throw new OffsetNotFoundException($"QueryOffset stream: {stream}, reference: {reference}");
}

/// <summary>
/// TryQueryOffset tries to retrieve the last consumer offset stored
/// given a consumer name and stream name.
/// Returns null if the offset is not found.
/// </summary>
public async Task<ulong?> TryQueryOffset(string reference, string stream)
{
MaybeThrowQueryException(reference, stream);

var response = await _client.QueryOffset(reference, stream).ConfigureAwait(false);

// Offset do not exist so just return null. There is no need to throw an OffsetNotFoundException and capture it.
if (response.ResponseCode == ResponseCode.OffsetNotFound)
{
return null;
}

ClientExceptions.MaybeThrowException(response.ResponseCode,
$"QueryOffset stream: {stream}, reference: {reference}");
return response.Offset;
Expand Down
9 changes: 9 additions & 0 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,18 @@ public async Task ConsumerQueryOffset()
Assert.Equal((ulong)(NumberOfMessagesToStore - 1),
await system.QueryOffset(Reference, stream));

Assert.Equal((ulong)(NumberOfMessagesToStore - 1),
await system.TryQueryOffset(Reference, stream));

// this has to raise OffsetNotFoundException in case the offset
// does not exist like in this case.
await Assert.ThrowsAsync<OffsetNotFoundException>(() =>
system.QueryOffset("reference_does_not_exist", stream));

Assert.Null(await system.TryQueryOffset("reference_does_not_exist", stream));
await Assert.ThrowsAsync<GenericProtocolException>(() =>
(system.TryQueryOffset(Reference, "stream_does_not_exist")));

await rawConsumer.Close();
await system.DeleteStream(stream);
await system.Close();
Expand Down Expand Up @@ -579,8 +586,10 @@ public async Task ShouldConsumeFromStoredOffset()

// new consumer that should start from stored offset
var offset = await system.QueryOffset(Reference, stream);
var tryOffset = await system.TryQueryOffset(Reference, stream);
// the offset received must be the same from the last stored
Assert.Equal(offset, await storedOffset.Task);
Assert.Equal(offset, tryOffset);
var messagesConsumed = new TaskCompletionSource<ulong>();
var rawConsumerWithOffset = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
Expand Down
4 changes: 3 additions & 1 deletion Tests/SystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ await Assert.ThrowsAsync<ArgumentException>(
await Assert.ThrowsAsync<QueryException>(
async () => { await system.QueryPartition("stream_does_not_exist"); }
);

await system.Close();
}

Expand Down Expand Up @@ -297,7 +298,8 @@ public async Task CloseProducerConsumerAfterForceCloseShouldNotRaiseError()
var system = await StreamSystem.Create(config);
await system.CreateStream(new StreamSpec(stream));
var producer =
await system.CreateRawProducer(new RawProducerConfig(stream) { ClientProvidedName = clientProvidedName });
await system.CreateRawProducer(
new RawProducerConfig(stream) { ClientProvidedName = clientProvidedName });
await SystemUtils.WaitAsync();
var consumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream) { ClientProvidedName = clientProvidedName });
Expand Down
25 changes: 25 additions & 0 deletions docs/SingleActiveConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// See https://aka.ms/new-console-template for more information

using SingleActiveConsumer;

internal class Program
{
static async Task Main(string[] args)
{
if (args.Length > 0)
{
switch (args[0])
{
case "--producer":
await SaCProducer.Start().ConfigureAwait(false);
break;
case "--consumer":
await SacConsumer.Start().ConfigureAwait(false);
break;
default:
Console.WriteLine("Unknown option, valid options: --producer / --consumer");
break;
}
}
}
}
44 changes: 44 additions & 0 deletions docs/SingleActiveConsumer/SaCProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using Microsoft.Extensions.Logging;

namespace SingleActiveConsumer;

using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

public class SaCProducer
{
public static async Task Start()
{
var loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole();
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
});

var loggerProducer = loggerFactory.CreateLogger<Producer>();
var loggerMain = loggerFactory.CreateLogger<StreamSystem>();


var streamSystem = await StreamSystem.Create(new StreamSystemConfig(), loggerMain).ConfigureAwait(false);
await streamSystem.CreateStream(new StreamSpec("my-sac-stream")).ConfigureAwait(false);
var producer = await Producer.Create(new ProducerConfig(streamSystem, "my-sac-stream"), loggerProducer)
.ConfigureAwait(false);
for (var i = 0; i < 5000; i++)
{
var body = Encoding.UTF8.GetBytes($"Message #{i}");
var message = new Message(body);
await producer.Send(message).ConfigureAwait(false);
Thread.Sleep(2000);
loggerProducer.LogInformation($"Message {i} sent");
}

Console.WriteLine("Sending 50 messages to my-sac-stream");
await producer.Close().ConfigureAwait(false);
await streamSystem.Close().ConfigureAwait(false);
}
}
69 changes: 69 additions & 0 deletions docs/SingleActiveConsumer/SacConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Buffers;
using System.Text;
using Microsoft.Extensions.Logging;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace SingleActiveConsumer;

public class SacConsumer
{
public static async Task Start()
{
var loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole();
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
});

var loggerConsumer = loggerFactory.CreateLogger<Consumer>();
var loggerMain = loggerFactory.CreateLogger<StreamSystem>();


var streamSystem = await StreamSystem.Create(new StreamSystemConfig(), loggerMain).ConfigureAwait(false);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, "my-sac-stream")
{
Reference = "sac_consumer",
OffsetSpec = new OffsetTypeFirst(),
IsSingleActiveConsumer = true,
MessageHandler = async (_, consumer, context, message) =>
{
var text = Encoding.UTF8.GetString(message.Data.Contents.ToArray());
loggerConsumer.LogInformation($"The message {text} was received");

// Store the offset of the message.
// store offset for each message is not a good practice
// here is only for demo purpose
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);

await Task.CompletedTask.ConfigureAwait(false);
},
ConsumerUpdateListener = async (consumerRef, stream, isActive) =>
{
var status = isActive ? "active" : "inactive";
loggerConsumer.LogInformation($"Consumer {consumerRef} is {status} on stream {stream}");
if (!isActive) return new OffsetTypeNext();

var offset = await streamSystem.TryQueryOffset(consumerRef, stream).ConfigureAwait(false);
if (offset != null)
{
loggerConsumer.LogInformation($"The offset for {consumerRef} on stream {stream} is {offset}");
return new OffsetTypeOffset(offset.Value);
}

loggerConsumer.LogWarning(
$"The offset for {consumerRef} on stream {stream} is not available, OffsetNext will be used.");
return new OffsetTypeNext();

}
}, loggerConsumer).ConfigureAwait(false);
Console.WriteLine("Consumer is running. Press [enter] to exit.");
Console.ReadLine();
await consumer.Close().ConfigureAwait(false);
await streamSystem.Close().ConfigureAwait(false);
}
}
19 changes: 19 additions & 0 deletions docs/SingleActiveConsumer/SingleActiveConsumer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\RabbitMQ.Stream.Client\RabbitMQ.Stream.Client.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" />
</ItemGroup>

</Project>
2 changes: 2 additions & 0 deletions docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,8 @@ include::{test-examples}/ConsumerUsage.cs[tag=manual-tracking-defaults]
<2> Store the current offset on some condition

The snippet above uses `consumer.StoreOffset(context.Offset)` to store at the offset of the current message.
It is possible to store the offset in a more generic way with `StreamSystem.StoreOffset(reference,stream, offsetValue)`


====== Considerations On Offset Tracking

Expand Down
6 changes: 5 additions & 1 deletion docs/asciidoc/query-stream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ The following methods are available:


|`QueryOffset(string reference, string stream)`
|Retrieves retrieves the last consumer offset stored for a given consumer Reference and stream. Useful for as consumer wants to know the last stored offset.
|Retrieves the last consumer offset stored for a given consumer Reference and stream. Useful for as consumer wants to know the last stored offset.
| Stream

|`TryQueryOffset(string reference, string stream)`
|Like `QueryOffset` but returns `null` if the offset was not found.
| Stream

|`QueryPartition(string superStream)`
Expand Down