Skip to content

Commit 437d6b5

Browse files
authored
Merge 616cb81 into 398145f
2 parents 398145f + 616cb81 commit 437d6b5

File tree

5 files changed

+61
-8
lines changed

5 files changed

+61
-8
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
namespace RabbitMQ.Stream.Client
2020
{
21+
public enum AuthMechanism
22+
{
23+
Plain,
24+
External,
25+
}
26+
2127
public record ClientParameters
2228
{
2329
// internal list of endpoints where the client will try to connect
@@ -63,6 +69,8 @@ public string ClientProvidedName
6369
public SslOption Ssl { get; set; } = new SslOption();
6470

6571
public AddressResolver AddressResolver { get; set; } = null;
72+
73+
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
6674
}
6775

6876
internal readonly struct OutgoingMsg : ICommand
@@ -214,11 +222,20 @@ await client
214222
.ConfigureAwait(false);
215223
logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms);
216224

225+
var isValid = saslHandshakeResponse.Mechanisms.Contains(parameters.AuthMechanism.ToString().ToUpper(),
226+
StringComparer.OrdinalIgnoreCase);
227+
if (!isValid)
228+
{
229+
throw new AuthMechanismNotSupportedException(
230+
$"Sasl mechanism {parameters.AuthMechanism} is not supported by the server");
231+
}
232+
217233
var saslData = Encoding.UTF8.GetBytes($"\0{parameters.UserName}\0{parameters.Password}");
218234
var authResponse =
219235
await client
220236
.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
221-
new SaslAuthenticateRequest(corr, "PLAIN", saslData)).ConfigureAwait(false);
237+
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpper(), saslData))
238+
.ConfigureAwait(false);
222239
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);
223240

224241
//tune

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,12 @@ public RouteNotFoundException(string s)
9393
{
9494
}
9595
}
96+
97+
public class AuthMechanismNotSupportedException : Exception
98+
{
99+
public AuthMechanismNotSupportedException(string s)
100+
: base(s)
101+
{
102+
}
103+
}
96104
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,17 @@ const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
22
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
33
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
44
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
5+
RabbitMQ.Stream.Client.AuthMechanism
6+
RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.AuthMechanism
7+
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
8+
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
9+
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
510
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
611
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
712
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
813
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
14+
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
15+
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
916
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
1017
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
1118
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
@@ -53,6 +60,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void
5360
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary<string, long> statistic) -> void
5461
RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
5562
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
63+
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
64+
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
5665
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
5766
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
5867
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public record StreamSystemConfig : INamedEntity
2828

2929
public AddressResolver AddressResolver { get; set; }
3030
public string ClientProvidedName { get; set; } = "dotnet-stream-locator";
31+
32+
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
3133
}
3234

3335
public class StreamSystem
@@ -56,7 +58,8 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
5658
AddressResolver = config.AddressResolver,
5759
ClientProvidedName = config.ClientProvidedName,
5860
Heartbeat = config.Heartbeat,
59-
Endpoints = config.Endpoints
61+
Endpoints = config.Endpoints,
62+
AuthMechanism = config.AuthMechanism
6063
};
6164
// create the metadata client connection
6265
foreach (var endPoint in clientParams.Endpoints)
@@ -73,14 +76,19 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
7376
}
7477
catch (Exception e)
7578
{
76-
if (e is ProtocolException or SslException)
79+
switch (e)
7780
{
78-
logger?.LogError(e, "ProtocolException or SslException to {@EndPoint}", endPoint);
79-
throw;
81+
case ProtocolException or SslException:
82+
logger?.LogError(e, "ProtocolException or SslException to {@EndPoint}", endPoint);
83+
throw;
84+
case AuthMechanismNotSupportedException:
85+
logger?.LogError(e, "SalsNotSupportedException to {@EndPoint}", endPoint);
86+
throw;
87+
default:
88+
// hopefully all implementations of endpoint have a nice ToString()
89+
logger?.LogError(e, "Error connecting to {@TargetEndpoint}. Trying next endpoint", endPoint);
90+
break;
8091
}
81-
82-
// hopefully all implementations of endpoint have a nice ToString()
83-
logger?.LogError(e, "Error connecting to {@TargetEndpoint}. Trying next endpoint", endPoint);
8492
}
8593
}
8694

Tests/SystemTests.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,17 @@ await Assert.ThrowsAsync<ArgumentException>(
245245
await system.Close();
246246
}
247247

248+
[Fact]
249+
public async void ValidateSalsExternalConfiguration()
250+
{
251+
// the user can set the SALs configuration externally
252+
// this test validates that the configuration is supported by the server
253+
var config = new StreamSystemConfig() { AuthMechanism = AuthMechanism.External };
254+
await Assert.ThrowsAsync<AuthMechanismNotSupportedException>(
255+
async () => { await StreamSystem.Create(config); }
256+
);
257+
}
258+
248259
[Fact]
249260
public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError()
250261
{

0 commit comments

Comments
 (0)