Skip to content

Commit 80910a7

Browse files
committed
* Finish porting last two tests from SourceFiltersTest.java
1 parent 075c691 commit 80910a7

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

Tests/Consumer/StreamConsumerTests.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,4 +901,60 @@ public async Task FilterExpressionPropertiesAndApplicationProperties()
901901
Assert.Equal(body3, mb);
902902
}
903903
}
904+
905+
/// <summary>
906+
/// Port of filterExpressionFilterFewMessagesFromManyToTestFlowControl
907+
/// </summary>
908+
[SkippableFact]
909+
public async Task FilterExpressionFilterFewMessagesFromManyToTestFlowControl()
910+
{
911+
Assert.NotNull(_connection);
912+
Assert.NotNull(_management);
913+
SkipIfNotRabbitMQ_4_1_0();
914+
915+
string groupId = RandomString();
916+
917+
IQueueSpecification q = _management.Queue(_queueName).Stream().Queue();
918+
await q.DeclareAsync();
919+
920+
await PublishAsync(q, 1, (_, msg) => msg.GroupId(groupId));
921+
await PublishAsync(q, 1000);
922+
await PublishAsync(q, 1, (_, msg) => msg.GroupId(groupId));
923+
924+
IEnumerable<IMessage> msgs = await ConsumeAsync(2, options => options.GroupId(groupId));
925+
foreach (IMessage m in msgs)
926+
{
927+
Assert.Equal(groupId, m.GroupId());
928+
}
929+
}
930+
931+
/// <summary>
932+
/// Port of filterExpressionStringModifier
933+
/// </summary>
934+
[SkippableFact]
935+
public async Task FilterExpressionStringModifier()
936+
{
937+
Assert.NotNull(_connection);
938+
Assert.NotNull(_management);
939+
SkipIfNotRabbitMQ_4_1_0();
940+
941+
IQueueSpecification q = _management.Queue(_queueName).Stream().Queue();
942+
await q.DeclareAsync();
943+
944+
await PublishAsync(q, 1, (_, msg) => msg.Subject("abc 123"));
945+
await PublishAsync(q, 1, (_, msg) => msg.Subject("foo bar"));
946+
await PublishAsync(q, 1, (_, msg) => msg.Subject("ab 12"));
947+
948+
IEnumerable<IMessage> msgs = await ConsumeAsync(2, options => options.Subject("$p:ab"));
949+
foreach (IMessage m in msgs)
950+
{
951+
Assert.StartsWith("ab", m.Subject());
952+
}
953+
954+
msgs = await ConsumeAsync(1, options => options.Subject("$s:bar"));
955+
foreach (IMessage m in msgs)
956+
{
957+
Assert.Equal("foo bar", m.Subject());
958+
}
959+
}
904960
}

0 commit comments

Comments
 (0)