Skip to content

Minimal System.IO.Pipelines integration to prepare for full-async work #1264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
49c61dc
Minimal System.IO.Pipelines integration to prepare for full-async work.
stebet Oct 11, 2022
c31b776
Reverting test change.
stebet Oct 13, 2022
5febffa
Adding .ConfigureAwait(false)
stebet Oct 13, 2022
037eba3
Fixing lints.
stebet Oct 13, 2022
339c0d1
Adding sync TryReadFrame method.
stebet Oct 26, 2022
3083f29
Reverting task changes.
stebet Oct 27, 2022
d2b847a
Use System.IO.Pipelines version 7.0.0
lukebakken Feb 7, 2023
7af52f2
Update projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
lukebakken Feb 7, 2023
b8bfc0d
Update projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
lukebakken Feb 7, 2023
1f9d148
PR review by @danielmarbach
lukebakken Feb 7, 2023
bf8c1b5
Update projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
lukebakken Feb 7, 2023
e439df5
Update projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
lukebakken Feb 7, 2023
8f555c7
Apply suggestions from code review
lukebakken Feb 7, 2023
4d8fff1
No need to copy data around here. Add a comment explaining the purpos…
lukebakken Feb 8, 2023
bcde29e
Simplify writer code, and throw an exception if the unthinkable happens
lukebakken Feb 8, 2023
65b7a23
Add static method to check for end of stream and throw exception if n…
lukebakken Feb 8, 2023
9c79e7f
In the case of an exception, clear the rented array.
lukebakken Feb 8, 2023
0a49ae8
Preserve frame end marker for the exception prior to returning the bu…
lukebakken Feb 8, 2023
58b2d52
Bump rabbitmq version on Windows
lukebakken Feb 8, 2023
390d503
AsStream() is very necessary since it must flush the data
lukebakken Feb 8, 2023
06ef4ec
Do not try to write when the frame handler is closed
lukebakken Feb 8, 2023
0a9e405
Complete pipes when closing
lukebakken Mar 1, 2023
74b6830
Update package versions, update RMQ and Erlang version for Windows te…
lukebakken Mar 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .ci/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "25.2",
"rabbitmq": "3.11.6"
"erlang": "25.2.3",
"rabbitmq": "3.11.9"
}
2 changes: 1 addition & 1 deletion projects/Benchmarks/Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.4" />
<PackageReference Include="BenchmarkDotNet" Version="0.13.5" />
<PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
</ItemGroup>

Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
</ItemGroup>

</Project>
105 changes: 60 additions & 45 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ internal sealed partial class Connection
private readonly IFrameHandler _frameHandler;
private readonly Task _mainLoopTask;

private void MainLoop()
private async Task MainLoop()
{
try
{
ReceiveLoop();
await ReceiveLoop().ConfigureAwait(false);
}
catch (EndOfStreamException eose)
{
Expand All @@ -56,7 +56,7 @@ private void MainLoop()
}
catch (HardProtocolException hpe)
{
HardProtocolExceptionHandler(hpe);
await HardProtocolExceptionHandler(hpe).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -66,57 +66,70 @@ private void MainLoop()
FinishClose();
}

private void ReceiveLoop()
private async Task ReceiveLoop()
{
while (!_closed)
{
InboundFrame frame = _frameHandler.ReadFrame();
while (_frameHandler.TryReadFrame(out InboundFrame frame))
{
NotifyHeartbeatListener();
ProcessFrame(frame);
}

// Done reading frames synchronously, go async
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync()
.ConfigureAwait(false);
NotifyHeartbeatListener();
ProcessFrame(asyncFrame);
}
}

bool shallReturn = true;
if (frame.Channel == 0)
private void ProcessFrame(InboundFrame frame)
{
bool shallReturnPayload = true;
if (frame.Channel == 0)
{
if (frame.Type == FrameType.FrameHeartbeat)
{
if (frame.Type == FrameType.FrameHeartbeat)
{
// Ignore it: we've already just reset the heartbeat
}
else
{
// In theory, we could get non-connection.close-ok
// frames here while we're quiescing (m_closeReason !=
// null). In practice, there's a limited number of
// things the server can ask of us on channel 0 -
// essentially, just connection.close. That, combined
// with the restrictions on pipelining, mean that
// we're OK here to handle channel 0 traffic in a
// quiescing situation, even though technically we
// should be ignoring everything except
// connection.close-ok.
shallReturn = _session0.HandleFrame(in frame);
}
// Ignore it: we've already recently reset the heartbeat
}
else
{
// If we're still m_running, but have a m_closeReason,
// then we must be quiescing, which means any inbound
// frames for non-zero channels (and any inbound
// commands on channel zero that aren't
// Connection.CloseOk) must be discarded.
if (_closeReason is null)
{
// No close reason, not quiescing the
// connection. Handle the frame. (Of course, the
// Session itself may be quiescing this particular
// channel, but that's none of our concern.)
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
}
// In theory, we could get non-connection.close-ok
// frames here while we're quiescing (m_closeReason !=
// null). In practice, there's a limited number of
// things the server can ask of us on channel 0 -
// essentially, just connection.close. That, combined
// with the restrictions on pipelining, mean that
// we're OK here to handle channel 0 traffic in a
// quiescing situation, even though technically we
// should be ignoring everything except
// connection.close-ok.
shallReturnPayload = _session0.HandleFrame(in frame);
}

if (shallReturn)
}
else
{
// If we're still m_running, but have a m_closeReason,
// then we must be quiescing, which means any inbound
// frames for non-zero channels (and any inbound
// commands on channel zero that aren't
// Connection.CloseOk) must be discarded.
if (_closeReason is null)
{
frame.ReturnPayload();
// No close reason, not quiescing the
// connection. Handle the frame. (Of course, the
// Session itself may be quiescing this particular
// channel, but that's none of our concern.)
ISession session = _sessionManager.Lookup(frame.Channel);
shallReturnPayload = session.HandleFrame(in frame);
}
}

if (shallReturnPayload)
{
frame.ReturnPayload();
}
}

///<remarks>
Expand All @@ -139,7 +152,7 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
LogCloseError($"Unexpected connection closure: {reason}", new Exception(reason.ToString()));
}

private void HardProtocolExceptionHandler(HardProtocolException hpe)
private async Task HardProtocolExceptionHandler(HardProtocolException hpe)
{
if (SetCloseReason(hpe.ShutdownReason))
{
Expand All @@ -151,7 +164,8 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.Transmit(in cmd);
if (hpe.CanShutdownCleanly)
{
ClosingLoop();
await ClosingLoop()
.ConfigureAwait(false);
}
}
catch (IOException ioe)
Expand All @@ -168,13 +182,14 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
///<remarks>
/// Loop only used while quiescing. Use only to cleanly close connection
///</remarks>
private void ClosingLoop()
private async Task ClosingLoop()
{
try
{
_frameHandler.ReadTimeout = TimeSpan.Zero;
// Wait for response/socket closure or timeout
ReceiveLoop();
await ReceiveLoop()
.ConfigureAwait(false);
}
catch (ObjectDisposedException ode)
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Connection(ConnectionConfig config, IFrameHandler frameHandler)
["connection_name"] = ClientProvidedName
};

_mainLoopTask = Task.Factory.StartNew(MainLoop, TaskCreationOptions.LongRunning);
_mainLoopTask = Task.Run(MainLoop);
try
{
Open();
Expand Down
Loading