Skip to content

Commit c3d8e55

Browse files
Update secret functionality (#342)
* add update secret functionality. * Update the secrets for all the pool connections Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent d70458a commit c3d8e55

File tree

7 files changed

+107
-11
lines changed

7 files changed

+107
-11
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public class Client : IClient
117117

118118
private uint correlationId = 0; // allow for some pre-amble
119119

120-
private Connection connection;
120+
private Connection _connection;
121121

122122
private readonly ConcurrentDictionary<uint, IValueTaskSource> requests = new();
123123

@@ -148,7 +148,7 @@ public class Client : IClient
148148

149149
public int ConfirmFrames => confirmFrames;
150150

151-
public int IncomingFrames => connection.NumFrames;
151+
public int IncomingFrames => _connection.NumFrames;
152152

153153
//public int IncomingChannelCount => this.incoming.Reader.Count;
154154
private static readonly object Obj = new();
@@ -176,7 +176,7 @@ public bool IsClosed
176176
{
177177
get
178178
{
179-
if (connection.IsClosed)
179+
if (_connection.IsClosed)
180180
{
181181
isClosed = true;
182182
}
@@ -208,10 +208,10 @@ private async Task OnConnectionClosed(string reason)
208208
public static async Task<Client> Create(ClientParameters parameters, ILogger logger = null)
209209
{
210210
var client = new Client(parameters, logger);
211-
client.connection = await Connection
211+
client._connection = await Connection
212212
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
213213
.ConfigureAwait(false);
214-
client.connection.ClientId = client.ClientId;
214+
client._connection.ClientId = client.ClientId;
215215
// exchange properties
216216
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
217217
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
@@ -283,6 +283,23 @@ await client.Publish(new TuneRequest(0,
283283
return client;
284284
}
285285

286+
public async Task UpdateSecret(string newSecret)
287+
{
288+
var saslData = Encoding.UTF8.GetBytes($"\0{Parameters.UserName}\0{newSecret}");
289+
290+
var authResponse =
291+
await Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
292+
new SaslAuthenticateRequest(
293+
corr,
294+
Parameters.AuthMechanism.ToString().ToUpperInvariant(),
295+
saslData))
296+
.ConfigureAwait(false);
297+
298+
ClientExceptions.MaybeThrowException(
299+
authResponse.ResponseCode,
300+
"Error while updating secret: the secret will not be updated.");
301+
}
302+
286303
public async ValueTask<bool> Publish(Publish publishMsg)
287304
{
288305
var publishTask = await Publish<Publish>(publishMsg).ConfigureAwait(false);
@@ -296,7 +313,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
296313
{
297314
try
298315
{
299-
return connection.Write(msg);
316+
return _connection.Write(msg);
300317
}
301318
catch (Exception e)
302319
{
@@ -757,7 +774,7 @@ public async Task<CloseResponse> Close(string reason)
757774
InternalClose();
758775
try
759776
{
760-
connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
777+
_connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
761778
var result =
762779
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
763780
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
@@ -771,11 +788,11 @@ public async Task<CloseResponse> Close(string reason)
771788
}
772789
catch (Exception e)
773790
{
774-
_logger.LogError(e, "An error occurred while calling {CalledFunction}", nameof(connection.Dispose));
791+
_logger.LogError(e, "An error occurred while calling {CalledFunction}", nameof(_connection.Dispose));
775792
}
776793
finally
777794
{
778-
connection.Dispose();
795+
_connection.Dispose();
779796
}
780797

781798
return new CloseResponse(0, ResponseCode.Ok);

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public ConnectionsPool(int maxConnections, byte idsPerConnection)
130130
/// Value is the connection item
131131
/// The Connections contains all the connections created by the pool
132132
/// </summary>
133-
internal ConcurrentDictionary<string, ConnectionItem> Connections { get; } = new();
133+
private ConcurrentDictionary<string, ConnectionItem> Connections { get; } = new();
134134

135135
/// <summary>
136136
/// GetOrCreateClient returns a client for the given brokerInfo.
@@ -162,7 +162,8 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
162162
// let's remove it from the pool
163163
Connections.TryRemove(connectionItem.Client.ClientId, out _);
164164
// let's create a new one
165-
connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection, await createClient().ConfigureAwait(false));
165+
connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection,
166+
await createClient().ConfigureAwait(false));
166167
Connections.TryAdd(connectionItem.Client.ClientId, connectionItem);
167168

168169
return connectionItem.Client;
@@ -185,6 +186,7 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
185186
_semaphoreSlim.Release();
186187
}
187188
}
189+
188190
public void Remove(string clientId)
189191
{
190192
_semaphoreSlim.Wait();
@@ -202,6 +204,23 @@ public void Remove(string clientId)
202204
}
203205
}
204206

207+
public async Task UpdateSecrets(string newSecret)
208+
{
209+
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
210+
try
211+
{
212+
foreach (var connectionItem in Connections.Values)
213+
{
214+
await connectionItem.Client.UpdateSecret(newSecret).ConfigureAwait(false);
215+
connectionItem.Client.Parameters.Password = newSecret;
216+
}
217+
}
218+
finally
219+
{
220+
_semaphoreSlim.Release();
221+
}
222+
}
223+
205224
public void MaybeClose(string clientId, string reason)
206225
{
207226
_semaphoreSlim.Wait();

RabbitMQ.Stream.Client/IClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface IClient
2828
IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))> Publishers { get; }
2929
IDictionary<byte, (string, ConsumerEvents)> Consumers { get; }
3030

31+
Task UpdateSecret(string newSecret);
32+
3133
public bool IsClosed { get; }
3234
}
3335
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey)
4141
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
4242
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
4343
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
44+
RabbitMQ.Stream.Client.Client.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
4445
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
4546
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
4647
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
@@ -68,6 +69,7 @@ RabbitMQ.Stream.Client.ConnectionsPool.MaybeClose(string clientId, string reason
6869
RabbitMQ.Stream.Client.ConnectionsPool.Remove(string clientId) -> void
6970
RabbitMQ.Stream.Client.ConnectionsPool.RemoveConsumerEntityFromStream(string clientId, byte id, string stream) -> void
7071
RabbitMQ.Stream.Client.ConnectionsPool.RemoveProducerEntityFromStream(string clientId, byte id, string stream) -> void
72+
RabbitMQ.Stream.Client.ConnectionsPool.UpdateSecrets(string newSecret) -> System.Threading.Tasks.Task
7173
RabbitMQ.Stream.Client.ConsumerEvents
7274
RabbitMQ.Stream.Client.ConsumerEvents.ConsumerEvents() -> void
7375
RabbitMQ.Stream.Client.ConsumerEvents.ConsumerEvents(System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> void
@@ -103,6 +105,7 @@ RabbitMQ.Stream.Client.IClient.ClientId.init -> void
103105
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
104106
RabbitMQ.Stream.Client.IClient.IsClosed.get -> bool
105107
RabbitMQ.Stream.Client.IClient.Publishers.get -> System.Collections.Generic.IDictionary<byte, (string, (System.Action<System.ReadOnlyMemory<ulong>>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))>
108+
RabbitMQ.Stream.Client.IClient.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
106109
RabbitMQ.Stream.Client.IClosable
107110
RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
108111
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
@@ -276,6 +279,7 @@ RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream
276279
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamConsumer>
277280
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
278281
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
282+
RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
279283
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
280284
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
281285
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.get -> RabbitMQ.Stream.Client.ConnectionPoolConfig
@@ -286,6 +290,8 @@ RabbitMQ.Stream.Client.UnknownCommandException
286290
RabbitMQ.Stream.Client.UnknownCommandException.UnknownCommandException(string s) -> void
287291
RabbitMQ.Stream.Client.UnsupportedOperationException
288292
RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationException(string s) -> void
293+
RabbitMQ.Stream.Client.UpdateSecretFailureException
294+
RabbitMQ.Stream.Client.UpdateSecretFailureException.UpdateSecretFailureException(string s) -> void
289295
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
290296
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
291297
static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,17 @@ private async Task MayBeReconnectLocator()
153153
}
154154
}
155155

156+
public async Task UpdateSecret(string newSecret)
157+
{
158+
if (_client.IsClosed)
159+
throw new UpdateSecretFailureException("Cannot update a closed connection.");
160+
161+
await _client.UpdateSecret(newSecret).ConfigureAwait(false);
162+
_clientParameters.Password = newSecret;
163+
_client.Parameters.Password = newSecret;
164+
165+
}
166+
156167
public async Task<ISuperStreamProducer> CreateRawSuperStreamProducer(
157168
RawSuperStreamProducerConfig rawSuperStreamProducerConfig, ILogger logger = null)
158169
{
@@ -542,4 +553,11 @@ public StreamSystemInitialisationException(string error) : base(error)
542553
{
543554
}
544555
}
556+
public class UpdateSecretFailureException : ProtocolException
557+
{
558+
public UpdateSecretFailureException(string s)
559+
: base(s)
560+
{
561+
}
562+
}
545563
}

Tests/SystemTests.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,38 @@ await Assert.ThrowsAsync<AuthenticationFailureException>(
159159
);
160160
}
161161

162+
[Fact]
163+
public async void UpdateSecretWithValidSecretShouldNoRaiseExceptions()
164+
{
165+
var config = new StreamSystemConfig { UserName = "guest", Password = "guest" }; // specified for readability
166+
var streamSystem = await StreamSystem.Create(config);
167+
168+
await streamSystem.UpdateSecret("guest");
169+
}
170+
171+
[Fact]
172+
public async void UpdateSecretWithInvalidSecretShouldThrowAuthenticationFailureException()
173+
{
174+
var config = new StreamSystemConfig { UserName = "guest", Password = "guest" }; // specified for readability
175+
var streamSystem = await StreamSystem.Create(config);
176+
177+
await Assert.ThrowsAsync<AuthenticationFailureException>(
178+
async () => { await streamSystem.UpdateSecret("not_valid_secret"); }
179+
);
180+
}
181+
182+
[Fact]
183+
public async void UpdateSecretForClosedConnectionShouldThrowUpdateSecretFailureException()
184+
{
185+
var config = new StreamSystemConfig { UserName = "guest", Password = "guest" }; // specified for readability
186+
var streamSystem = await StreamSystem.Create(config);
187+
188+
await streamSystem.Close();
189+
await Assert.ThrowsAsync<UpdateSecretFailureException>(
190+
async () => { await streamSystem.UpdateSecret("guest"); }
191+
);
192+
}
193+
162194
[Fact]
163195
public async void CreateExistStreamIdempotentShouldNoRaiseExceptions()
164196
{

Tests/UnitTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public Task<CloseResponse> Close(string reason)
3232
}
3333

3434
public IDictionary<byte, (string, ConsumerEvents)> Consumers { get; }
35+
public Task UpdateSecret(string newSecret) => throw new NotImplementedException();
36+
3537
public bool IsClosed { get; }
3638

3739
public FakeClient(ClientParameters clientParameters)

0 commit comments

Comments
 (0)