Skip to content

Misc changes from lukebakken/amqp-string #1575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions build.ps1
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
[CmdletBinding(PositionalBinding=$false)]
param(
[switch]$RunTests
[switch]$RunTests,
[switch]$RunTestsUntilFailure
)

$ErrorActionPreference = 'Stop'
Set-StrictMode -Version Latest
$PSNativeCommandUseErrorActionPreference = $true

Write-Host "Run Parameters:" -ForegroundColor Cyan
Write-Host "`tPSScriptRoot: $PSScriptRoot"
Write-Host "`tRunTests: $RunTests"
Write-Host "`tRunTestsUntilFailure: $RunTestsUntilFailure"
Write-Host "`tdotnet --version: $(dotnet --version)"

Write-Host "[INFO] building all projects (Build.csproj traversal)..." -ForegroundColor "Magenta"
dotnet build "$PSScriptRoot\Build.csproj"
Write-Host "[INFO] done building." -ForegroundColor "Green"

if ($RunTests)
if ($RunTests -or $RunTestsUntilFailure)
{
$tests_dir = Join-Path -Path $PSScriptRoot -ChildPath 'projects' | Join-Path -ChildPath 'Test'
$unit_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Unit' | Join-Path -ChildPath 'Unit.csproj')
$integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Integration' | Join-Path -ChildPath 'Integration.csproj')
$sequential_integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'SequentialIntegration' | Join-Path -ChildPath 'SequentialIntegration.csproj')

foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file)
Do
{
Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta"
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
}
else
foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file)
{
Write-Host "[INFO] tests passed" -ForegroundColor "Green"
Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta"
& dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
}
else
{
Write-Host "[INFO] tests passed" -ForegroundColor "Green"
}
}
}
} While ($RunTestsUntilFailure)
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consu
if (false == _disposed && false == _quiesce)
{
AddConsumer(consumer, consumerTag);
return _writer.WriteAsync(new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag), cancellationToken);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
Expand All @@ -78,7 +79,8 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
{
if (false == _disposed && false == _quiesce)
{
var work = new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
IBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
return _writer.WriteAsync(work, cancellationToken);
}
else
Expand All @@ -91,7 +93,9 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken
{
if (false == _disposed && false == _quiesce)
{
return _writer.WriteAsync(new WorkStruct(WorkType.CancelOk, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken);
IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
Expand All @@ -103,7 +107,9 @@ public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken ca
{
if (false == _disposed && false == _quiesce)
{
return _writer.WriteAsync(new WorkStruct(WorkType.Cancel, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken);
IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
Expand Down Expand Up @@ -226,7 +232,7 @@ await _worker

protected sealed override void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason)
{
_writer.TryWrite(new WorkStruct(consumer, reason));
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
}

protected override void InternalShutdown()
Expand Down Expand Up @@ -258,23 +264,23 @@ protected override Task InternalShutdownAsync()
public readonly ShutdownEventArgs? Reason;
public readonly WorkType WorkType;

public WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag)
private WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag)
: this()
{
WorkType = type;
Consumer = consumer;
ConsumerTag = consumerTag;
}

public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
private WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
: this()
{
WorkType = WorkType.Shutdown;
Consumer = consumer;
Reason = reason;
}

public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
private WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body)
{
WorkType = WorkType.Deliver;
Expand All @@ -289,6 +295,33 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag
Reason = default;
}

public static WorkStruct CreateCancel(IBasicConsumer consumer, string consumerTag)
{
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
}

public static WorkStruct CreateCancelOk(IBasicConsumer consumer, string consumerTag)
{
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
}

public static WorkStruct CreateConsumeOk(IBasicConsumer consumer, string consumerTag)
{
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
}

public static WorkStruct CreateShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
return new WorkStruct(consumer, reason);
}

public static WorkStruct CreateDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body)
{
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
exchange, routingKey, basicProperties, body);
}

public void Dispose() => Body.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void Enqueue(IRpcContinuation k)
IRpcContinuation result = Interlocked.CompareExchange(ref _outstandingRpc, k, s_tmp);
if (!(result is EmptyRpcContinuation))
{
throw new NotSupportedException("Pipelining of requests forbidden");
throw new NotSupportedException($"Pipelining of requests forbidden (attempted: {k.GetType()}, enqueued: {result.GetType()})");
}
}

Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ protected ConnectionFactory CreateConnectionFactory()

protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
IConnection conn = (IConnection)sender;
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
Expand All @@ -547,7 +547,7 @@ protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)

protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
}
Expand All @@ -556,7 +556,7 @@ protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args

protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
IChannel ch = (IChannel)sender;
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
Expand All @@ -565,7 +565,7 @@ protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)

protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
{
if (args.Initiator == ShutdownInitiator.Peer)
if (args.Initiator != ShutdownInitiator.Application)
{
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel
{
public class TestRecoveryEventHandlers : TestConnectionRecoveryBase
{
public TestRecoveryEventHandlers(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task TestRecoveryEventHandlers_Called()
{
int counter = 0;
((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter);

await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
Assert.True(counter >= 3);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel
{
public class TestShutdownEventHandlers : TestConnectionRecoveryBase
{
public TestShutdownEventHandlers(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task TestShutdownEventHandlersOnChannel_Called()
{
int counter = 0;
_channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter);

Assert.True(_channel.IsOpen);
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);

Assert.True(counter >= 3);
}
}
}
Loading