Skip to content

Commit 317945c

Browse files
authored
Merge pull request #1681 from danielmarbach/reset-event
Async flow control
2 parents fd36d23 + a15c1f8 commit 317945c

File tree

5 files changed

+226
-51
lines changed

5 files changed

+226
-51
lines changed

projects/Directory.Packages.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
<PackageVersion Include="System.Threading.Channels" Version="6.0.0" />
3535
<PackageVersion Include="System.Text.Json" Version="6.0.0" />
3636
<PackageVersion Include="System.Net.Http.Json" Version="6.0.0" />
37+
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
3738
</ItemGroup>
3839
<ItemGroup Condition="$(TargetFramework)=='net472'">
3940
<PackageVersion Include="System.Text.Json" Version="6.0.0" />
@@ -46,4 +47,4 @@
4647
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
4748
<GlobalPackageReference Include="MinVer" Version="5.0.0" />
4849
</ItemGroup>
49-
</Project>
50+
</Project>

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,6 @@
7777
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
7878
<PackageReference Include="System.Memory" />
7979
<PackageReference Include="System.Threading.Channels" />
80+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
8081
</ItemGroup>
8182
</Project>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,21 @@ public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
4747
CancellationToken cancellationToken)
4848
{
4949
var method = new BasicAck(deliveryTag, multiple);
50-
return ModelSendAsync(method, cancellationToken);
50+
return ModelSendAsync(in method, cancellationToken);
5151
}
5252

5353
public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue,
5454
CancellationToken cancellationToken)
5555
{
5656
var method = new BasicNack(deliveryTag, multiple, requeue);
57-
return ModelSendAsync(method, cancellationToken);
57+
return ModelSendAsync(in method, cancellationToken);
5858
}
5959

6060
public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
6161
CancellationToken cancellationToken)
6262
{
6363
var method = new BasicReject(deliveryTag, requeue);
64-
return ModelSendAsync(method, cancellationToken);
64+
return ModelSendAsync(in method, cancellationToken);
6565
}
6666

6767
/// <summary>
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Diagnostics.CodeAnalysis;
34+
using System.Threading;
35+
using System.Threading.Tasks;
36+
using System.Threading.Tasks.Sources;
37+
38+
namespace RabbitMQ.Client.client.impl
39+
{
40+
sealed class AsyncManualResetEvent : IValueTaskSource
41+
{
42+
private ManualResetValueTaskSourceCore<bool> _valueTaskSource;
43+
private bool _isSet;
44+
45+
public AsyncManualResetEvent(bool initialState = false)
46+
{
47+
_isSet = initialState;
48+
_valueTaskSource.Reset();
49+
if (initialState)
50+
{
51+
_valueTaskSource.SetResult(true);
52+
}
53+
}
54+
55+
public bool IsSet => Volatile.Read(ref _isSet);
56+
57+
public async ValueTask WaitAsync(CancellationToken cancellationToken)
58+
{
59+
if (IsSet)
60+
{
61+
return;
62+
}
63+
64+
cancellationToken.ThrowIfCancellationRequested();
65+
66+
CancellationTokenRegistration tokenRegistration =
67+
#if NET6_0_OR_GREATER
68+
cancellationToken.UnsafeRegister(
69+
static state =>
70+
{
71+
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
72+
source.SetException(new OperationCanceledException(token));
73+
}, (_valueTaskSource, cancellationToken));
74+
#else
75+
cancellationToken.Register(
76+
static state =>
77+
{
78+
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
79+
source.SetException(new OperationCanceledException(token));
80+
},
81+
state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false);
82+
#endif
83+
try
84+
{
85+
await new ValueTask(this, _valueTaskSource.Version)
86+
.ConfigureAwait(false);
87+
}
88+
finally
89+
{
90+
#if NET6_0_OR_GREATER
91+
await tokenRegistration.DisposeAsync()
92+
.ConfigureAwait(false);
93+
#else
94+
tokenRegistration.Dispose();
95+
#endif
96+
}
97+
}
98+
99+
public void Set()
100+
{
101+
if (IsSet)
102+
{
103+
return;
104+
}
105+
106+
Volatile.Write(ref _isSet, true);
107+
_valueTaskSource.SetResult(true);
108+
}
109+
110+
public void Reset()
111+
{
112+
if (!IsSet)
113+
{
114+
return;
115+
}
116+
117+
Volatile.Write(ref _isSet, false);
118+
_valueTaskSource.Reset();
119+
}
120+
121+
void IValueTaskSource.GetResult(short token)
122+
{
123+
if (token != _valueTaskSource.Version)
124+
{
125+
ThrowIncorrectTokenException();
126+
}
127+
128+
_valueTaskSource.GetResult(token);
129+
}
130+
131+
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
132+
{
133+
if (token != _valueTaskSource.Version)
134+
{
135+
ThrowIncorrectTokenException();
136+
}
137+
138+
return _valueTaskSource.GetStatus(token);
139+
}
140+
141+
void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
142+
{
143+
if (token != _valueTaskSource.Version)
144+
{
145+
ThrowIncorrectTokenException();
146+
}
147+
148+
_valueTaskSource.OnCompleted(continuation, state, token, flags);
149+
}
150+
151+
[DoesNotReturn]
152+
static void ThrowIncorrectTokenException() =>
153+
throw new InvalidOperationException("ValueTask cannot be awaited multiple times.");
154+
}
155+
}

0 commit comments

Comments
 (0)