Skip to content

Commit c8e3726

Browse files
authored
Add debug info during the login step (#301)
* Add debug info during the login step * releated to #299 * Add feature flag warning --- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 578589a commit c8e3726

File tree

9 files changed

+34
-22
lines changed

9 files changed

+34
-22
lines changed

RabbitMQ.Stream.Client/AvailableFeatures.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ internal class AvailableFeatures
1717

1818
public bool Is311OrMore { get; private set; }
1919

20+
public string BrokerVersion { get; private set; }
21+
2022
private static string ExtractVersion(string fullVersion)
2123
{
2224
const string Pattern = @"(\d+\.\d+\.\d+)";
@@ -30,6 +32,7 @@ private static string ExtractVersion(string fullVersion)
3032
public void SetServerVersion(string brokerVersion)
3133
{
3234
var v = ExtractVersion(brokerVersion);
35+
BrokerVersion = v;
3336
Is311OrMore = new System.Version(v) >= new System.Version("3.11.0");
3437
}
3538

RabbitMQ.Stream.Client/Client.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
213213
// exchange properties
214214
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
215215
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
216-
logger?.LogDebug("Server properties: {@Properties}", peerPropertiesResponse);
216+
logger?.LogDebug("Server properties: {@Properties}", peerPropertiesResponse.Properties);
217217

218218
//auth
219219
var saslHandshakeResponse =
@@ -254,6 +254,8 @@ await client.Publish(new TuneRequest(0,
254254
if (peerPropertiesResponse.Properties.TryGetValue("version", out var version))
255255
{
256256
AvailableFeaturesSingleton.Instance.SetServerVersion(version);
257+
logger?.LogDebug("Extracted BrokerVersion version: {Version}",
258+
AvailableFeaturesSingleton.Instance.BrokerVersion);
257259
if (AvailableFeaturesSingleton.Instance.Is311OrMore)
258260
{
259261
var features = await client.ExchangeVersions().ConfigureAwait(false);

RabbitMQ.Stream.Client/Consts.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ internal static class Consts
2020
internal const string SubscriptionPropertyFilterPrefix = "filter.";
2121
internal const string SubscriptionPropertyMatchUnfiltered = "match-unfiltered";
2222

23+
internal const string FilterNotSupported = "Filtering is not supported by the broker "
24+
+ "(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)";
25+
2326
internal static int RandomShort()
2427
{
2528
return Random.Shared.Next(500, 1500);

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ internal void Validate()
8282

8383
if (IsFiltering && !AvailableFeaturesSingleton.Instance.PublishFilter)
8484
{
85-
throw new UnsupportedOperationException("Broker does not support filtering. You need " +
86-
"RabbitMQ 3.13.0 or later.");
85+
throw new UnsupportedOperationException(Consts.FilterNotSupported);
8786
}
8887

8988
switch (ConsumerFilter)

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ internal void Validate()
4444
{
4545
if (Filter is { FilterValue: not null } && !AvailableFeaturesSingleton.Instance.PublishFilter)
4646
{
47-
throw new UnsupportedOperationException("Broker does not support filtering. " +
48-
"You need RabbitMQ 3.13.0 or later.");
47+
throw new UnsupportedOperationException(Consts.FilterNotSupported);
4948
}
5049
}
5150
}

docs/StreamFilter/StreamFilter/FilterConsumer.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ public static async Task Start(string streamName)
1818
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
1919
});
2020

21-
var logger = loggerFactory.CreateLogger<Consumer>();
22-
var loggerMain = loggerFactory.CreateLogger<FilterConsumer>();
21+
var consumerLogger = loggerFactory.CreateLogger<Consumer>();
22+
var streamLogger = loggerFactory.CreateLogger<StreamSystem>();
23+
var mainLogger = loggerFactory.CreateLogger<FilterConsumer>();
2324

2425

2526
var config = new StreamSystemConfig();
26-
var system = await StreamSystem.Create(config).ConfigureAwait(false);
27+
var system = await StreamSystem.Create(config, streamLogger).ConfigureAwait(false);
2728
await system.CreateStream(new StreamSpec(streamName)).ConfigureAwait(false);
28-
loggerMain.LogInformation("FilterConsumer connected to RabbitMQ. StreamName {StreamName}", streamName);
29+
mainLogger.LogInformation("FilterConsumer connected to RabbitMQ. StreamName {StreamName}", streamName);
2930

3031

3132
// tag::consumer-filter[]
@@ -38,19 +39,19 @@ public static async Task Start(string streamName)
3839
// This is mandatory for enabling the filter
3940
Filter = new ConsumerFilter()
4041
{
41-
Values = new List<string>() {"Alabama"},// <1>
42+
Values = new List<string>() {"Alabama"}, // <1>
4243
PostFilter = message => message.ApplicationProperties["state"].Equals("Alabama"), // <2>
43-
MatchUnfiltered = true
44+
MatchUnfiltered = true
4445
},
4546
MessageHandler = (_, _, _, message) =>
4647
{
47-
logger.LogInformation("Received message with state {State} - consumed {Consumed}",
48+
consumerLogger.LogInformation("Received message with state {State} - consumed {Consumed}",
4849
message.ApplicationProperties["state"], ++consumedMessages);
4950
return Task.CompletedTask;
5051
}
5152
// end::consumer-filter[]
52-
}).ConfigureAwait(false);
53-
53+
}, consumerLogger).ConfigureAwait(false);
54+
5455
await Task.Delay(2000).ConfigureAwait(false);
5556
await consumer.Close().ConfigureAwait(false);
5657
await system.Close().ConfigureAwait(false);

docs/StreamFilter/StreamFilter/FilterProducer.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ public static async Task Start(string streamName)
2020
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
2121
});
2222

23-
var logger = loggerFactory.CreateLogger<Producer>();
24-
var loggerMain = loggerFactory.CreateLogger<FilterProducer>();
23+
var producerLogger = loggerFactory.CreateLogger<Producer>();
24+
var streamLogger = loggerFactory.CreateLogger<StreamSystem>();
25+
var mainLogger = loggerFactory.CreateLogger<FilterConsumer>();
2526

2627

2728
var config = new StreamSystemConfig();
28-
var system = await StreamSystem.Create(config).ConfigureAwait(false);
29+
var system = await StreamSystem.Create(config, streamLogger).ConfigureAwait(false);
2930
await system.CreateStream(new StreamSpec(streamName)).ConfigureAwait(false);
30-
loggerMain.LogInformation("FilterProducer connected to RabbitMQ. StreamName {StreamName}", streamName);
31+
mainLogger.LogInformation("FilterProducer connected to RabbitMQ. StreamName {StreamName}", streamName);
3132

3233
var producer = await Producer.Create(new ProducerConfig(system, streamName)
3334
{
@@ -39,7 +40,7 @@ public static async Task Start(string streamName)
3940
FilterValue = message => message.ApplicationProperties["state"].ToString(), // <1>
4041
}
4142
// end::producer-filter[]
42-
}).ConfigureAwait(false);
43+
},producerLogger).ConfigureAwait(false);
4344

4445
const int ToSend = 100;
4546

@@ -62,14 +63,14 @@ async Task SendTo(string state)
6263
// Send the first 200 messages with state "New York"
6364
// then we wait a bit to be sure that all the messages will go in a chunk
6465
await SendTo("New York").ConfigureAwait(false);
65-
loggerMain.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "New York");
66+
mainLogger.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "New York");
6667

6768
// Wait a bit to be sure that all the messages will go in a chunk
6869
await Task.Delay(2000).ConfigureAwait(false);
6970

7071
// Send the second 200 messages with the Alabama state
7172
await SendTo("Alabama").ConfigureAwait(false);
72-
loggerMain.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "Alabama");
73+
mainLogger.LogInformation("Sent: {MessagesSent} - filter value: {FilerValue}", ToSend * 2, "Alabama");
7374
await Task.Delay(1000).ConfigureAwait(false);
7475
await producer.Close().ConfigureAwait(false);
7576
await system.Close().ConfigureAwait(false);

docs/StreamFilter/StreamFilter/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
Server Side Filter Example
22
--------------------------
33

4+
WARNING: Filtering requires *RabbitMQ 3.13* or more and the stream_filter feature flag enabled.
5+
6+
47
This example shows how to use the server side filter to filter the data on the server side.
58

9+
610
### Running the Producer
711

812
```

docs/asciidoc/advanced-topics.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
==== Filtering
66

7-
WARNING: Filtering requires *RabbitMQ 3.13* or more.
7+
WARNING: Filtering requires *RabbitMQ 3.13* or more and the stream_filter feature flag enabled.
88

99
RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side.
1010
This helps to save network bandwidth when a consuming application needs only a subset of messages, e.g. the messages from a given geographical region.

0 commit comments

Comments
 (0)