Skip to content

Commit 3af97ef

Browse files
author
Stefán J. Sigurðarson
committed
Adding single and multi-threaded message integrity tests.
1 parent 8dedb6f commit 3af97ef

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed

projects/Unit/TestMessageIntegrity.cs

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
using System.Security.Cryptography;
35+
using System.Threading;
36+
using System.Threading.Tasks;
37+
38+
using NUnit.Framework;
39+
40+
using RabbitMQ.Client.Events;
41+
42+
namespace RabbitMQ.Client.Unit
43+
{
44+
[TestFixture]
45+
public class TestMessageIntegrity
46+
{
47+
[ThreadStatic]
48+
private static SHA1 _hasher = SHA1.Create();
49+
50+
[TestCase(100)]
51+
[TestCase(1000)]
52+
[TestCase(10000)]
53+
[TestCase(100000)]
54+
public void SingleThreaded(int messagesToSend)
55+
{
56+
_hasher = _hasher ?? SHA1.Create();
57+
Random _randomizer = new Random();
58+
ManualResetEventSlim allMessagesReceived = new ManualResetEventSlim();
59+
int messagesReceived = 0;
60+
var connFactory = new ConnectionFactory()
61+
{
62+
RequestedHeartbeat = TimeSpan.FromSeconds(60),
63+
AutomaticRecoveryEnabled = false,
64+
DispatchConsumersAsync = true
65+
};
66+
67+
using (IConnection conn = connFactory.CreateConnection())
68+
using (IConnection subConn = connFactory.CreateConnection())
69+
{
70+
using (IModel subModel = subConn.CreateModel())
71+
{
72+
string exchange = $"SingleThreaderTest_{Guid.NewGuid()}";
73+
string queueName = $"SingleThreaderTest_{Guid.NewGuid()}";
74+
var subscriber = new AsyncEventingBasicConsumer(subModel);
75+
subscriber.Received += Subscriber_Received;
76+
subModel.ExchangeDeclare(exchange, ExchangeType.Topic, false, true);
77+
subModel.QueueDeclare(queueName, false, false, true);
78+
subModel.QueueBind(queueName, exchange, "");
79+
subModel.BasicConsume(queueName, true, subscriber);
80+
81+
using (IModel model = conn.CreateModel())
82+
{
83+
for (int i = 0; i < messagesToSend; i++)
84+
{
85+
byte[] messagePayload = new byte[_randomizer.Next(64, 512)];
86+
_randomizer.NextBytes(messagePayload);
87+
byte[] hash = _hasher.ComputeHash(messagePayload);
88+
IBasicProperties props = model.CreateBasicProperties();
89+
props.Headers = new Dictionary<string, object>();
90+
props.Headers.Add("hash", hash);
91+
model.BasicPublish(exchange, "", props, messagePayload);
92+
}
93+
}
94+
95+
allMessagesReceived.Wait();
96+
}
97+
}
98+
99+
Task Subscriber_Received(object sender, BasicDeliverEventArgs @event)
100+
{
101+
_hasher = _hasher ?? SHA1.Create();
102+
byte[] hash = @event.BasicProperties.Headers["hash"] as byte[];
103+
Assert.IsTrue(hash.AsSpan().SequenceEqual(_hasher.ComputeHash(@event.Body.ToArray())));
104+
if (messagesToSend == Interlocked.Increment(ref messagesReceived))
105+
{
106+
allMessagesReceived.Set();
107+
}
108+
109+
return Task.CompletedTask;
110+
}
111+
}
112+
113+
114+
115+
[TestCase(100, 2)]
116+
[TestCase(100, 4)]
117+
[TestCase(100, 8)]
118+
[TestCase(100, 16)]
119+
[TestCase(1000, 2)]
120+
[TestCase(1000, 4)]
121+
[TestCase(1000, 8)]
122+
[TestCase(1000, 16)]
123+
[TestCase(10000, 2)]
124+
[TestCase(10000, 4)]
125+
[TestCase(10000, 8)]
126+
[TestCase(10000, 16)]
127+
public void MultiThreaded(int messagesToSend, int threads)
128+
{
129+
Random _randomizer = new Random();
130+
ThreadPool.SetMinThreads(16 * Environment.ProcessorCount, 16 * Environment.ProcessorCount);
131+
ManualResetEventSlim allMessagesReceived = new ManualResetEventSlim();
132+
int messagesReceived = 0;
133+
var connFactory = new ConnectionFactory()
134+
{
135+
RequestedHeartbeat = TimeSpan.FromSeconds(60),
136+
AutomaticRecoveryEnabled = false,
137+
DispatchConsumersAsync = true
138+
};
139+
140+
using (IConnection conn = connFactory.CreateConnection())
141+
using (IConnection subConn = connFactory.CreateConnection())
142+
using (IModel setupModel = conn.CreateModel())
143+
{
144+
string exchange = $"MultiThreadedTest_{Guid.NewGuid()}";
145+
string queueName = $"MultiThreadedTest_{Guid.NewGuid()}";
146+
147+
setupModel.ExchangeDeclare(exchange, ExchangeType.Topic, false, true);
148+
setupModel.QueueDeclare(queueName, false, false, true);
149+
setupModel.QueueBind(queueName, exchange, "");
150+
151+
List<Task> consumerTasks = new List<Task>();
152+
List<Task> publisherTasks = new List<Task>();
153+
154+
for (int i = 0; i < threads; i++)
155+
{
156+
consumerTasks.Add(Task.Run(() =>
157+
{
158+
using (IModel subModel = subConn.CreateModel())
159+
{
160+
var subscriber = new AsyncEventingBasicConsumer(subModel);
161+
subscriber.Received += Subscriber_Received;
162+
subModel.BasicConsume(queueName, true, subscriber);
163+
allMessagesReceived.Wait();
164+
}
165+
}));
166+
167+
publisherTasks.Add(Task.Run(() =>
168+
{
169+
using (IModel model = conn.CreateModel())
170+
{
171+
_hasher = _hasher ?? SHA1.Create();
172+
for (int x = 0; x < messagesToSend; x++)
173+
{
174+
byte[] messagePayload = new byte[_randomizer.Next(64, 512)];
175+
_randomizer.NextBytes(messagePayload);
176+
byte[] hash = _hasher.ComputeHash(messagePayload);
177+
IBasicProperties props = model.CreateBasicProperties();
178+
props.Headers = new Dictionary<string, object>();
179+
props.Headers.Add("hash", hash);
180+
model.BasicPublish(exchange, "", props, messagePayload);
181+
}
182+
}
183+
}));
184+
}
185+
allMessagesReceived.Wait();
186+
}
187+
188+
Task Subscriber_Received(object sender, BasicDeliverEventArgs @event)
189+
{
190+
_hasher = _hasher ?? SHA1.Create();
191+
byte[] hash = @event.BasicProperties.Headers["hash"] as byte[];
192+
Assert.IsTrue(hash.AsSpan().SequenceEqual(_hasher.ComputeHash(@event.Body.ToArray())));
193+
if ((messagesToSend * threads) == Interlocked.Increment(ref messagesReceived))
194+
{
195+
allMessagesReceived.Set();
196+
}
197+
198+
return Task.CompletedTask;
199+
}
200+
}
201+
}
202+
}

0 commit comments

Comments
 (0)