Skip to content

Commit bcba9e4

Browse files
authored
Merge pull request #1753 from rabbitmq/rabbitmq-dotnet-client-1750
Handle `OperationCanceledException` in RPC continuations
2 parents db37681 + cc16b10 commit bcba9e4

File tree

7 files changed

+351
-172
lines changed

7 files changed

+351
-172
lines changed

.github/workflows/build-test.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ jobs:
6767
id: install-start-rabbitmq
6868
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
6969
- name: Integration Tests
70-
timeout-minutes: 25
70+
timeout-minutes: 45
7171
run: |
7272
Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" | Out-File -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt }; `
7373
dotnet test `
@@ -113,7 +113,7 @@ jobs:
113113
id: install-start-rabbitmq
114114
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
115115
- name: Sequential Integration Tests
116-
timeout-minutes: 25
116+
timeout-minutes: 45
117117
run: dotnet test `
118118
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
119119
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `

projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -83,61 +83,65 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
8383

8484
public ushort Concurrency => _concurrency;
8585

86-
public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
86+
public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
8787
{
88+
cancellationToken.ThrowIfCancellationRequested();
89+
8890
if (false == _disposed && false == _quiesce)
8991
{
90-
AddConsumer(consumer, consumerTag);
91-
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
92-
return _writer.WriteAsync(work, cancellationToken);
93-
}
94-
else
95-
{
96-
return default;
92+
try
93+
{
94+
AddConsumer(consumer, consumerTag);
95+
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
96+
await _writer.WriteAsync(work, cancellationToken)
97+
.ConfigureAwait(false);
98+
}
99+
catch
100+
{
101+
_ = GetAndRemoveConsumer(consumerTag);
102+
throw;
103+
}
97104
}
98105
}
99106

100-
public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
107+
public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
101108
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
102109
CancellationToken cancellationToken)
103110
{
111+
cancellationToken.ThrowIfCancellationRequested();
112+
104113
if (false == _disposed && false == _quiesce)
105114
{
106115
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
107116
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
108-
return _writer.WriteAsync(work, cancellationToken);
109-
}
110-
else
111-
{
112-
return default;
117+
await _writer.WriteAsync(work, cancellationToken)
118+
.ConfigureAwait(false);
113119
}
114120
}
115121

116-
public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
122+
public async ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
117123
{
124+
cancellationToken.ThrowIfCancellationRequested();
125+
118126
if (false == _disposed && false == _quiesce)
119127
{
120128
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
121129
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
122-
return _writer.WriteAsync(work, cancellationToken);
123-
}
124-
else
125-
{
126-
return default;
130+
await _writer.WriteAsync(work, cancellationToken)
131+
.ConfigureAwait(false);
127132
}
128133
}
129134

130-
public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
135+
public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
131136
{
137+
cancellationToken.ThrowIfCancellationRequested();
138+
132139
if (false == _disposed && false == _quiesce)
133140
{
134141
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
135142
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
136-
return _writer.WriteAsync(work, cancellationToken);
137-
}
138-
else
139-
{
140-
return default;
143+
await _writer.WriteAsync(work, cancellationToken)
144+
.ConfigureAwait(false);
141145
}
142146
}
143147

0 commit comments

Comments
 (0)