Skip to content

Commit 3c4d3aa

Browse files
authored
Merge pull request #1264 from stebet/newpipeline
Minimal System.IO.Pipelines integration to prepare for full-async work
2 parents cbe6a8c + 74b6830 commit 3c4d3aa

File tree

12 files changed

+242
-166
lines changed

12 files changed

+242
-166
lines changed

.ci/versions.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"erlang": "25.2",
3-
"rabbitmq": "3.11.6"
2+
"erlang": "25.2.3",
3+
"rabbitmq": "3.11.9"
44
}

projects/Benchmarks/Benchmarks.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="BenchmarkDotNet" Version="0.13.4" />
11+
<PackageReference Include="BenchmarkDotNet" Version="0.13.5" />
1212
<PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
1313
</ItemGroup>
1414

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
6464
<PackageReference Include="System.Memory" Version="4.5.5" />
6565
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
66+
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
6667
</ItemGroup>
6768

6869
</Project>

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 60 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ internal sealed partial class Connection
4343
private readonly IFrameHandler _frameHandler;
4444
private readonly Task _mainLoopTask;
4545

46-
private void MainLoop()
46+
private async Task MainLoop()
4747
{
4848
try
4949
{
50-
ReceiveLoop();
50+
await ReceiveLoop().ConfigureAwait(false);
5151
}
5252
catch (EndOfStreamException eose)
5353
{
@@ -56,7 +56,7 @@ private void MainLoop()
5656
}
5757
catch (HardProtocolException hpe)
5858
{
59-
HardProtocolExceptionHandler(hpe);
59+
await HardProtocolExceptionHandler(hpe).ConfigureAwait(false);
6060
}
6161
catch (Exception ex)
6262
{
@@ -66,57 +66,70 @@ private void MainLoop()
6666
FinishClose();
6767
}
6868

69-
private void ReceiveLoop()
69+
private async Task ReceiveLoop()
7070
{
7171
while (!_closed)
7272
{
73-
InboundFrame frame = _frameHandler.ReadFrame();
73+
while (_frameHandler.TryReadFrame(out InboundFrame frame))
74+
{
75+
NotifyHeartbeatListener();
76+
ProcessFrame(frame);
77+
}
78+
79+
// Done reading frames synchronously, go async
80+
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync()
81+
.ConfigureAwait(false);
7482
NotifyHeartbeatListener();
83+
ProcessFrame(asyncFrame);
84+
}
85+
}
7586

76-
bool shallReturn = true;
77-
if (frame.Channel == 0)
87+
private void ProcessFrame(InboundFrame frame)
88+
{
89+
bool shallReturnPayload = true;
90+
if (frame.Channel == 0)
91+
{
92+
if (frame.Type == FrameType.FrameHeartbeat)
7893
{
79-
if (frame.Type == FrameType.FrameHeartbeat)
80-
{
81-
// Ignore it: we've already just reset the heartbeat
82-
}
83-
else
84-
{
85-
// In theory, we could get non-connection.close-ok
86-
// frames here while we're quiescing (m_closeReason !=
87-
// null). In practice, there's a limited number of
88-
// things the server can ask of us on channel 0 -
89-
// essentially, just connection.close. That, combined
90-
// with the restrictions on pipelining, mean that
91-
// we're OK here to handle channel 0 traffic in a
92-
// quiescing situation, even though technically we
93-
// should be ignoring everything except
94-
// connection.close-ok.
95-
shallReturn = _session0.HandleFrame(in frame);
96-
}
94+
// Ignore it: we've already recently reset the heartbeat
9795
}
9896
else
9997
{
100-
// If we're still m_running, but have a m_closeReason,
101-
// then we must be quiescing, which means any inbound
102-
// frames for non-zero channels (and any inbound
103-
// commands on channel zero that aren't
104-
// Connection.CloseOk) must be discarded.
105-
if (_closeReason is null)
106-
{
107-
// No close reason, not quiescing the
108-
// connection. Handle the frame. (Of course, the
109-
// Session itself may be quiescing this particular
110-
// channel, but that's none of our concern.)
111-
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
112-
}
98+
// In theory, we could get non-connection.close-ok
99+
// frames here while we're quiescing (m_closeReason !=
100+
// null). In practice, there's a limited number of
101+
// things the server can ask of us on channel 0 -
102+
// essentially, just connection.close. That, combined
103+
// with the restrictions on pipelining, mean that
104+
// we're OK here to handle channel 0 traffic in a
105+
// quiescing situation, even though technically we
106+
// should be ignoring everything except
107+
// connection.close-ok.
108+
shallReturnPayload = _session0.HandleFrame(in frame);
113109
}
114-
115-
if (shallReturn)
110+
}
111+
else
112+
{
113+
// If we're still m_running, but have a m_closeReason,
114+
// then we must be quiescing, which means any inbound
115+
// frames for non-zero channels (and any inbound
116+
// commands on channel zero that aren't
117+
// Connection.CloseOk) must be discarded.
118+
if (_closeReason is null)
116119
{
117-
frame.ReturnPayload();
120+
// No close reason, not quiescing the
121+
// connection. Handle the frame. (Of course, the
122+
// Session itself may be quiescing this particular
123+
// channel, but that's none of our concern.)
124+
ISession session = _sessionManager.Lookup(frame.Channel);
125+
shallReturnPayload = session.HandleFrame(in frame);
118126
}
119127
}
128+
129+
if (shallReturnPayload)
130+
{
131+
frame.ReturnPayload();
132+
}
120133
}
121134

122135
///<remarks>
@@ -139,7 +152,7 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
139152
LogCloseError($"Unexpected connection closure: {reason}", new Exception(reason.ToString()));
140153
}
141154

142-
private void HardProtocolExceptionHandler(HardProtocolException hpe)
155+
private async Task HardProtocolExceptionHandler(HardProtocolException hpe)
143156
{
144157
if (SetCloseReason(hpe.ShutdownReason))
145158
{
@@ -151,7 +164,8 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
151164
_session0.Transmit(in cmd);
152165
if (hpe.CanShutdownCleanly)
153166
{
154-
ClosingLoop();
167+
await ClosingLoop()
168+
.ConfigureAwait(false);
155169
}
156170
}
157171
catch (IOException ioe)
@@ -168,13 +182,14 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
168182
///<remarks>
169183
/// Loop only used while quiescing. Use only to cleanly close connection
170184
///</remarks>
171-
private void ClosingLoop()
185+
private async Task ClosingLoop()
172186
{
173187
try
174188
{
175189
_frameHandler.ReadTimeout = TimeSpan.Zero;
176190
// Wait for response/socket closure or timeout
177-
ReceiveLoop();
191+
await ReceiveLoop()
192+
.ConfigureAwait(false);
178193
}
179194
catch (ObjectDisposedException ode)
180195
{

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Connection(ConnectionConfig config, IFrameHandler frameHandler)
7979
["connection_name"] = ClientProvidedName
8080
};
8181

82-
_mainLoopTask = Task.Factory.StartNew(MainLoop, TaskCreationOptions.LongRunning);
82+
_mainLoopTask = Task.Run(MainLoop);
8383
try
8484
{
8585
Open();

0 commit comments

Comments
 (0)