Skip to content

Commit 832b6c1

Browse files
authored
Merge pull request #1662 from rabbitmq/lukebakken/confirms
Fix `NextPublishSeqNo` when retrieved concurrently
2 parents f63c9c8 + a583dd1 commit 832b6c1

File tree

10 files changed

+418
-81
lines changed

10 files changed

+418
-81
lines changed

.github/workflows/build-test.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ jobs:
6767
id: install-start-rabbitmq
6868
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
6969
- name: Integration Tests
70-
timeout-minutes: 15
70+
timeout-minutes: 25
7171
run: |
7272
$tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
7373
Start-Sleep -Seconds 1; `
@@ -115,7 +115,7 @@ jobs:
115115
id: install-start-rabbitmq
116116
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
117117
- name: Sequential Integration Tests
118-
timeout-minutes: 15
118+
timeout-minutes: 25
119119
run: dotnet test `
120120
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
121121
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `

Build.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<ProjectReference Include="projects/Test/Common/Common.csproj" />
1515
<ProjectReference Include="projects/Test/Applications/CreateChannel/CreateChannel.csproj" />
1616
<ProjectReference Include="projects/Test/Applications/MassPublish/MassPublish.csproj" />
17+
<ProjectReference Include="projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj" />
1718
<ProjectReference Include="projects/Test/Integration/Integration.csproj" />
1819
<ProjectReference Include="projects/Test/SequentialIntegration/SequentialIntegration.csproj" />
1920
<ProjectReference Include="projects/Test/Unit/Unit.csproj" />

RabbitMQDotNetClient.sln

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "project
4242
EndProject
4343
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
4444
EndProject
45-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
45+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
46+
EndProject
47+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Test\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
4648
EndProject
4749
Global
4850
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -102,6 +104,10 @@ Global
102104
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
103105
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
104106
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU
107+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
108+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
109+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
110+
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
105111
EndGlobalSection
106112
GlobalSection(SolutionProperties) = preSolution
107113
HideSolutionNode = FALSE
@@ -117,6 +123,7 @@ Global
117123
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
118124
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
119125
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
126+
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
120127
EndGlobalSection
121128
GlobalSection(ExtensibilityGlobals) = postSolution
122129
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
813813
~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
814814
~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
815815
~RabbitMQ.Client.IChannel.CloseAsync(ushort replyCode, string replyText, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
816-
~RabbitMQ.Client.IChannel.ConfirmSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
817816
~RabbitMQ.Client.IChannel.ConsumerCountAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
818817
~RabbitMQ.Client.IChannel.ExchangeBindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
819818
~RabbitMQ.Client.IChannel.ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments = null, bool passive = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
@@ -897,3 +896,4 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel
897896
RabbitMQ.Client.ICredentialsProvider.Name.get -> string!
898897
RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<byte[]!>!
899898
readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider!
899+
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,13 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
265265
Task CloseAsync(ShutdownEventArgs reason, bool abort,
266266
CancellationToken cancellationToken = default);
267267

268-
/// <summary>Asynchronously enable publisher confirmations.</summary>
268+
/// <summary>
269+
/// Asynchronously enable publisher confirmations.
270+
/// </summary>
271+
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcks"/> and <see cref="BasicNacks"/> yourself.</param>
269272
/// <param name="cancellationToken">CancellationToken for this operation.</param>
270-
Task ConfirmSelectAsync(CancellationToken cancellationToken = default);
273+
Task ConfirmSelectAsync(bool trackConfirmations = true,
274+
CancellationToken cancellationToken = default);
271275

272276
/// <summary>Asynchronously declare an exchange.</summary>
273277
/// <param name="exchange">The name of the exchange.</param>

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
5050
private ushort _prefetchCountConsumer;
5151
private ushort _prefetchCountGlobal;
5252
private bool _usesPublisherConfirms;
53+
private bool _tracksPublisherConfirmations;
5354
private bool _usesTransactions;
5455

5556
internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;
@@ -177,7 +178,7 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken)
177178

178179
if (_usesPublisherConfirms)
179180
{
180-
await newChannel.ConfirmSelectAsync(cancellationToken)
181+
await newChannel.ConfirmSelectAsync(_tracksPublisherConfirmations, cancellationToken)
181182
.ConfigureAwait(false);
182183
}
183184

@@ -334,11 +335,12 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
334335
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
335336
}
336337

337-
public async Task ConfirmSelectAsync(CancellationToken cancellationToken)
338+
public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default)
338339
{
339-
await InnerChannel.ConfirmSelectAsync(cancellationToken)
340+
await InnerChannel.ConfirmSelectAsync(trackConfirmations, cancellationToken)
340341
.ConfigureAwait(false);
341342
_usesPublisherConfirms = true;
343+
_tracksPublisherConfirmations = trackConfirmations;
342344
}
343345

344346
public async Task ExchangeBindAsync(string destination, string source, string routingKey,

0 commit comments

Comments
 (0)