Skip to content

Add debug info during the login step #301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/AvailableFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ internal class AvailableFeatures

public bool Is311OrMore { get; private set; }

public string BrokerVersion { get; private set; }

private static string ExtractVersion(string fullVersion)
{
const string Pattern = @"(\d+\.\d+\.\d+)";
Expand All @@ -30,6 +32,7 @@ private static string ExtractVersion(string fullVersion)
public void SetServerVersion(string brokerVersion)
{
var v = ExtractVersion(brokerVersion);
BrokerVersion = v;
Is311OrMore = new System.Version(v) >= new System.Version("3.11.0");
}

Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
// exchange properties
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
logger?.LogDebug("Server properties: {@Properties}", peerPropertiesResponse);
logger?.LogDebug("Server properties: {@Properties}", peerPropertiesResponse.Properties);

//auth
var saslHandshakeResponse =
Expand Down Expand Up @@ -254,6 +254,8 @@ await client.Publish(new TuneRequest(0,
if (peerPropertiesResponse.Properties.TryGetValue("version", out var version))
{
AvailableFeaturesSingleton.Instance.SetServerVersion(version);
logger?.LogDebug("Extracted BrokerVersion version: {Version}",
AvailableFeaturesSingleton.Instance.BrokerVersion);
if (AvailableFeaturesSingleton.Instance.Is311OrMore)
{
var features = await client.ExchangeVersions().ConfigureAwait(false);
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ internal static class Consts
internal const string SubscriptionPropertyFilterPrefix = "filter.";
internal const string SubscriptionPropertyMatchUnfiltered = "match-unfiltered";

internal const string FilterNotSupported = "Filtering is not supported by the broker "
+ "(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)";

internal static int RandomShort()
{
return Random.Shared.Next(500, 1500);
Expand Down
3 changes: 1 addition & 2 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ internal void Validate()

if (IsFiltering && !AvailableFeaturesSingleton.Instance.PublishFilter)
{
throw new UnsupportedOperationException("Broker does not support filtering. You need " +
"RabbitMQ 3.13.0 or later.");
throw new UnsupportedOperationException(Consts.FilterNotSupported);
}

switch (ConsumerFilter)
Expand Down
3 changes: 1 addition & 2 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ internal void Validate()
{
if (Filter is { FilterValue: not null } && !AvailableFeaturesSingleton.Instance.PublishFilter)
{
throw new UnsupportedOperationException("Broker does not support filtering. " +
"You need RabbitMQ 3.13.0 or later.");
throw new UnsupportedOperationException(Consts.FilterNotSupported);
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions docs/StreamFilter/StreamFilter/FilterConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ public static async Task Start(string streamName)
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
});

var logger = loggerFactory.CreateLogger<Consumer>();
var loggerMain = loggerFactory.CreateLogger<FilterConsumer>();
var consumerLogger = loggerFactory.CreateLogger<Consumer>();
var streamLogger = loggerFactory.CreateLogger<StreamSystem>();
var mainLogger = loggerFactory.CreateLogger<FilterConsumer>();


var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config).ConfigureAwait(false);
var system = await StreamSystem.Create(config, streamLogger).ConfigureAwait(false);
await system.CreateStream(new StreamSpec(streamName)).ConfigureAwait(false);
loggerMain.LogInformation("FilterConsumer connected to RabbitMQ. StreamName {StreamName}", streamName);
mainLogger.LogInformation("FilterConsumer connected to RabbitMQ. StreamName {StreamName}", streamName);


// tag::consumer-filter[]
Expand All @@ -38,19 +39,19 @@ public static async Task Start(string streamName)
// This is mandatory for enabling the filter
Filter = new ConsumerFilter()
{
Values = new List<string>() {"Alabama"},// <1>
Values = new List<string>() {"Alabama"}, // <1>
PostFilter = message => message.ApplicationProperties["state"].Equals("Alabama"), // <2>
MatchUnfiltered = true
MatchUnfiltered = true
},
MessageHandler = (_, _, _, message) =>
{
logger.LogInformation("Received message with state {State} - consumed {Consumed}",
consumerLogger.LogInformation("Received message with state {State} - consumed {Consumed}",
message.ApplicationProperties["state"], ++consumedMessages);
return Task.CompletedTask;
}
// end::consumer-filter[]
}).ConfigureAwait(false);
}, consumerLogger).ConfigureAwait(false);

await Task.Delay(2000).ConfigureAwait(false);
await consumer.Close().ConfigureAwait(false);
await system.Close().ConfigureAwait(false);
Expand Down
15 changes: 8 additions & 7 deletions docs/StreamFilter/StreamFilter/FilterProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ public static async Task Start(string streamName)
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
});

var logger = loggerFactory.CreateLogger<Producer>();
var loggerMain = loggerFactory.CreateLogger<FilterProducer>();
var producerLogger = loggerFactory.CreateLogger<Producer>();
var streamLogger = loggerFactory.CreateLogger<StreamSystem>();
var mainLogger = loggerFactory.CreateLogger<FilterConsumer>();


var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config).ConfigureAwait(false);
var system = await StreamSystem.Create(config, streamLogger).ConfigureAwait(false);
await system.CreateStream(new StreamSpec(streamName)).ConfigureAwait(false);
loggerMain.LogInformation("FilterProducer connected to RabbitMQ. StreamName {StreamName}", streamName);
mainLogger.LogInformation("FilterProducer connected to RabbitMQ. StreamName {StreamName}", streamName);

var producer = await Producer.Create(new ProducerConfig(system, streamName)
{
Expand All @@ -39,7 +40,7 @@ public static async Task Start(string streamName)
FilterValue = message => message.ApplicationProperties["state"].ToString(), // <1>
}
// end::producer-filter[]
}).ConfigureAwait(false);
},producerLogger).ConfigureAwait(false);

const int ToSend = 100;

Expand All @@ -62,14 +63,14 @@ async Task SendTo(string state)
// Send the first 200 messages with state "New York"
// then we wait a bit to be sure that all the messages will go in a chunk
await SendTo("New York").ConfigureAwait(false);
loggerMain.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "New York");
mainLogger.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "New York");

// Wait a bit to be sure that all the messages will go in a chunk
await Task.Delay(2000).ConfigureAwait(false);

// Send the second 200 messages with the Alabama state
await SendTo("Alabama").ConfigureAwait(false);
loggerMain.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "Alabama");
mainLogger.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "Alabama");
await Task.Delay(1000).ConfigureAwait(false);
await producer.Close().ConfigureAwait(false);
await system.Close().ConfigureAwait(false);
Expand Down
4 changes: 4 additions & 0 deletions docs/StreamFilter/StreamFilter/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
Server Side Filter Example
--------------------------

WARNING: Filtering requires *RabbitMQ 3.13* or more and the stream_filter feature flag enabled.


This example shows how to use the server side filter to filter the data on the server side.


### Running the Producer

```
Expand Down
2 changes: 1 addition & 1 deletion docs/asciidoc/advanced-topics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

==== Filtering

WARNING: Filtering requires *RabbitMQ 3.13* or more.
WARNING: Filtering requires *RabbitMQ 3.13* or more and the stream_filter feature flag enabled.

RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side.
This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.
Expand Down