Skip to content

Commit 075c691

Browse files
committed
* Remove dupicated code around validating message annotation key
* Move code out of `Utils` that is only used for tests
1 parent 9a82e0a commit 075c691

File tree

5 files changed

+114
-95
lines changed

5 files changed

+114
-95
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -233,15 +233,15 @@ public IDictionary<object, object> Properties()
233233
public IMessage Annotation(string key, object value)
234234
{
235235
EnsureAnnotations();
236-
ValidateMessageAnnotationKey(key);
236+
Utils.ValidateMessageAnnotationKey(key);
237237
NativeMessage.MessageAnnotations[new Symbol(key)] = value;
238238
return this;
239239
}
240240

241241
public object Annotation(string key)
242242
{
243243
ThrowIfAnnotationsNotSet();
244-
ValidateMessageAnnotationKey(key);
244+
Utils.ValidateMessageAnnotationKey(key);
245245
return NativeMessage.MessageAnnotations[new Symbol(key)];
246246
}
247247

@@ -321,13 +321,5 @@ private void EnsureApplicationProperties()
321321
{
322322
NativeMessage.ApplicationProperties ??= new ApplicationProperties();
323323
}
324-
325-
private static void ValidateMessageAnnotationKey(string key)
326-
{
327-
if (false == key.StartsWith("x-"))
328-
{
329-
throw new ArgumentOutOfRangeException($"Message annotation key must start with 'x-': {key}");
330-
}
331-
}
332324
}
333325
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void Discard(Dictionary<string, object> annotations)
6969
Utils.ValidateMessageAnnotations(annotations);
7070

7171
Fields messageAnnotations = new();
72-
foreach (var kvp in annotations)
72+
foreach (KeyValuePair<string, object> kvp in annotations)
7373
{
7474
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
7575
}

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System;
66
using System.Collections.Generic;
7-
using System.Linq;
87
using System.Security.Cryptography;
98
using System.Text;
109
using System.Threading.Tasks;
@@ -40,34 +39,6 @@ internal static int RandomNext(int minValue = 0, int maxValue = 1024)
4039
#endif
4140
}
4241

43-
internal static byte[] RandomBytes(uint length = 16)
44-
{
45-
byte[] buffer = new byte[length];
46-
#if NET6_0_OR_GREATER
47-
s_random.NextBytes(buffer);
48-
#else
49-
s_random ??= new Random();
50-
s_random.NextBytes(buffer);
51-
#endif
52-
return buffer;
53-
}
54-
55-
internal static string RandomString(uint length = 16)
56-
{
57-
const string str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_";
58-
int strLen = str.Length;
59-
60-
StringBuilder sb = new((int)length);
61-
62-
int idx;
63-
for (int i = 0; i < length; i++)
64-
{
65-
idx = RandomNext(0, strLen);
66-
sb.Append(str[idx]);
67-
}
68-
return sb.ToString();
69-
}
70-
7142
internal static string GenerateQueueName()
7243
{
7344
return GenerateName(DefaultPrefix);
@@ -213,10 +184,17 @@ internal static bool CompareMap(Map map1, Map map2)
213184

214185
internal static void ValidateMessageAnnotations(Dictionary<string, object> annotations)
215186
{
216-
foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-")))
187+
foreach (KeyValuePair<string, object> kvp in annotations)
188+
{
189+
ValidateMessageAnnotationKey(kvp.Key);
190+
}
191+
}
192+
193+
internal static void ValidateMessageAnnotationKey(string key)
194+
{
195+
if (false == key.StartsWith("x-"))
217196
{
218-
throw new ArgumentException(
219-
$"Message annotation keys must start with 'x-': {kvp.Key}");
197+
throw new ArgumentOutOfRangeException($"Message annotation key must start with 'x-': {key}");
220198
}
221199
}
222200

Tests/Consumer/StreamConsumerTests.cs

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ public async Task FilterExpressionApplicationProperties()
569569
Guid expected4 = Guid.NewGuid();
570570
await PublishAsync(q, messageCount, (_, msg) => msg.Property("foo", expected4));
571571

572-
byte[] expected5 = Utils.RandomBytes(128);
572+
byte[] expected5 = RandomBytes(128);
573573
await PublishAsync(q, messageCount, (_, msg) => msg.Property("foo", expected5));
574574

575575
const string expected6 = "bar";
@@ -856,12 +856,12 @@ public async Task FilterExpressionPropertiesAndApplicationProperties()
856856
SkipIfNotRabbitMQ_4_1_0();
857857

858858
const int messageCount = 10;
859-
string subject = Utils.RandomString();
860-
string appKey = Utils.RandomString();
861-
int appValue = Utils.RandomNext();
862-
byte[] body1 = Utils.RandomBytes();
863-
byte[] body2 = Utils.RandomBytes();
864-
byte[] body3 = Utils.RandomBytes();
859+
string subject = RandomString();
860+
string appKey = RandomString();
861+
int appValue = RandomNext();
862+
byte[] body1 = RandomBytes();
863+
byte[] body2 = RandomBytes();
864+
byte[] body3 = RandomBytes();
865865

866866
IQueueSpecification q = _management.Queue(_queueName).Stream().Queue();
867867
await q.DeclareAsync();
@@ -901,49 +901,4 @@ public async Task FilterExpressionPropertiesAndApplicationProperties()
901901
Assert.Equal(body3, mb);
902902
}
903903
}
904-
905-
// TODO nested interfaces are weird
906-
private async Task<IEnumerable<IMessage>> ConsumeAsync(ulong expectedMessageCount,
907-
Action<IConsumerBuilder.IStreamFilterOptions> streamFilterOptionsLogic)
908-
{
909-
Assert.NotNull(_connection);
910-
911-
TaskCompletionSource<bool> allMessagesConsumedTcs = CreateTaskCompletionSource();
912-
int receivedMessageCount = 0;
913-
914-
var messages = new List<IMessage>();
915-
SemaphoreSlim messagesSemaphore = new(1, 1);
916-
async Task MessageHandler(IContext cxt, IMessage msg)
917-
{
918-
await messagesSemaphore.WaitAsync();
919-
try
920-
{
921-
messages.Add(msg);
922-
if (Interlocked.Increment(ref receivedMessageCount) == (int)expectedMessageCount)
923-
{
924-
allMessagesConsumedTcs.SetResult(true);
925-
}
926-
cxt.Accept();
927-
}
928-
catch (Exception ex)
929-
{
930-
allMessagesConsumedTcs.SetException(ex);
931-
}
932-
finally
933-
{
934-
messagesSemaphore.Release();
935-
}
936-
}
937-
938-
IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder().Queue(_queueName).MessageHandler(MessageHandler);
939-
streamFilterOptionsLogic(consumerBuilder.Stream().Offset(StreamOffsetSpecification.First).Filter());
940-
941-
using (IConsumer consumer = await consumerBuilder.BuildAndStartAsync())
942-
{
943-
await WhenTcsCompletes(allMessagesConsumedTcs);
944-
await consumer.CloseAsync();
945-
}
946-
947-
return messages;
948-
}
949904
}

Tests/IntegrationTest.cs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using System.Collections.Generic;
77
using System.Globalization;
88
using System.Reflection;
9+
using System.Text;
10+
using System.Threading;
911
using System.Threading.Tasks;
1012
using RabbitMQ.AMQP.Client;
1113
using RabbitMQ.AMQP.Client.Impl;
@@ -17,6 +19,12 @@ namespace Tests;
1719

1820
public abstract class IntegrationTest : IAsyncLifetime
1921
{
22+
#if NET6_0_OR_GREATER
23+
private static readonly Random s_random = Random.Shared;
24+
#else
25+
[ThreadStatic] private static Random? s_random;
26+
#endif
27+
2028
protected readonly ITestOutputHelper _testOutputHelper;
2129
protected readonly string _testDisplayName = nameof(IntegrationTest);
2230
protected readonly TimeSpan _waitSpan = TimeSpan.FromSeconds(5);
@@ -105,6 +113,19 @@ public virtual async Task DisposeAsync()
105113
}
106114
}
107115

116+
protected static Random S_Random
117+
{
118+
get
119+
{
120+
#if NET6_0_OR_GREATER
121+
return s_random;
122+
#else
123+
s_random ??= new Random();
124+
return s_random;
125+
#endif
126+
}
127+
}
128+
108129
protected void SkipIfNotRabbitMQ_4_1_0()
109130
{
110131
Skip.If(_rabbitmqVersion.WithoutPrereleaseOrMetadata().ComparePrecedenceTo(s_rmq4_1_0) < 0, "At least RabbitMQ 4.1.0 required");
@@ -203,6 +224,79 @@ protected async Task PublishAsync(IQueueSpecification queueSpecification, ulong
203224
}
204225
}
205226

227+
protected async Task<IEnumerable<IMessage>> ConsumeAsync(ulong expectedMessageCount,
228+
Action<IConsumerBuilder.IStreamFilterOptions> streamFilterOptionsLogic)
229+
{
230+
Assert.NotNull(_connection);
231+
232+
TaskCompletionSource<bool> allMessagesConsumedTcs = CreateTaskCompletionSource();
233+
int receivedMessageCount = 0;
234+
235+
var messages = new List<IMessage>();
236+
SemaphoreSlim messagesSemaphore = new(1, 1);
237+
async Task MessageHandler(IContext cxt, IMessage msg)
238+
{
239+
await messagesSemaphore.WaitAsync();
240+
try
241+
{
242+
messages.Add(msg);
243+
receivedMessageCount++;
244+
if (receivedMessageCount == (int)expectedMessageCount)
245+
{
246+
allMessagesConsumedTcs.SetResult(true);
247+
}
248+
cxt.Accept();
249+
}
250+
catch (Exception ex)
251+
{
252+
allMessagesConsumedTcs.SetException(ex);
253+
}
254+
finally
255+
{
256+
messagesSemaphore.Release();
257+
}
258+
}
259+
260+
IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder().Queue(_queueName).MessageHandler(MessageHandler);
261+
streamFilterOptionsLogic(consumerBuilder.Stream().Offset(StreamOffsetSpecification.First).Filter());
262+
263+
using (IConsumer consumer = await consumerBuilder.BuildAndStartAsync())
264+
{
265+
await WhenTcsCompletes(allMessagesConsumedTcs);
266+
await consumer.CloseAsync();
267+
}
268+
269+
return messages;
270+
}
271+
272+
protected static byte[] RandomBytes(uint length = 128)
273+
{
274+
byte[] buffer = new byte[length];
275+
S_Random.NextBytes(buffer);
276+
return buffer;
277+
}
278+
279+
protected static int RandomNext(int minValue = 0, int maxValue = 1024)
280+
{
281+
return S_Random.Next(minValue, maxValue);
282+
}
283+
284+
protected static string RandomString(uint length = 128)
285+
{
286+
const string str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_";
287+
int strLen = str.Length;
288+
289+
StringBuilder sb = new((int)length);
290+
291+
int idx;
292+
for (int i = 0; i < length; i++)
293+
{
294+
idx = RandomNext(0, strLen);
295+
sb.Append(str[idx]);
296+
}
297+
return sb.ToString();
298+
}
299+
206300
private string InitTestDisplayName()
207301
{
208302
string rv = _testDisplayName;

0 commit comments

Comments
 (0)