Reduce lock contention for frame writes #350
Details discussed on Google Groups https://groups.google.com/forum/?nomobile=true#!topic/rabbitmq-users/OyoYTkAyCSk
Updated code to remove lock contention
}
}
m_reader = new NetworkBinaryReader(new BufferedStream(netstream, m_socket.Client.ReceiveBufferSize));
m_writer = new NetworkBinaryWriter(netstream);
Removed use of bufferedstream on the write side.
Information was being written to the stream and flushed immediately.
The change to chunk information onto the stream means we don't have to buffer.
Since we don't have to buffer, we also don't need to lock the flush.
The write doesn't need to be locked when done as a chunk.
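A minimal sketch of the chunking idea described in this comment, assuming an AMQP 0-9-1 frame layout (type octet, 2-byte channel, 4-byte payload size, payload, 0xCE frame-end octet). `ChunkedFrameWriter` and its parameters are hypothetical names used for illustration only, not the client's actual API. The frame is assembled in a MemoryStream and handed to the destination stream in a single Write call, so nothing is left waiting in a write-side buffer.

```csharp
using System.IO;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
public static class ChunkedFrameWriter
{
    private const byte FrameEnd = 0xCE; // AMQP 0-9-1 frame-end octet

    public static void WriteFrame(Stream destination, byte type, ushort channel, byte[] payload)
    {
        // 7-byte header + payload + 1-byte frame end, all assembled in memory.
        using (var ms = new MemoryStream(7 + payload.Length + 1))
        {
            ms.WriteByte(type);

            // Channel number, big-endian (network byte order).
            ms.WriteByte((byte)(channel >> 8));
            ms.WriteByte((byte)(channel & 0xFF));

            // Payload size, big-endian.
            int size = payload.Length;
            ms.WriteByte((byte)((size >> 24) & 0xFF));
            ms.WriteByte((byte)((size >> 16) & 0xFF));
            ms.WriteByte((byte)((size >> 8) & 0xFF));
            ms.WriteByte((byte)(size & 0xFF));

            ms.Write(payload, 0, payload.Length);
            ms.WriteByte(FrameEnd);

            // One Write call for the whole frame; no Flush is needed because
            // the destination is not wrapped in a BufferedStream.
            byte[] chunk = ms.ToArray();
            destination.Write(chunk, 0, chunk.Length);
        }
    }
}
```

Whether a single Write call is enough to keep frames from different threads apart depends on the underlying stream; the commit notes at the end of this thread report that a lock around socket writes was later re-introduced.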
Hello and thank you for this PR. I have made suggestions with regard to coding style and whitespace. In addition, several files show a large number of diffs via the GitHub web UI so I'm wondering if there are newline issues.
@@ -994,7 +994,7 @@ protected void RecoverQueues()
// anything to recover. MK.
PropagateQueueNameChangeToBindings(oldName, newName);
PropagateQueueNameChangeToConsumers(oldName, newName);
- // see rabbitmq/rabbitmq-dotnet-client#43
+ // see RabbitMQ/RabbitMQ-dotnet-client#43
Please revert this change, thanks!
m_body0 = body;
m_bodyN = null;
Header = header;
if (body != null)
Please use curly braces for single-statement conditionals. That appears to be the style in this codebase.
}
m_bodyN.Add(fragment);
}
if(fragment !=null)
Please use a space after if, as well as curly braces.
@@ -363,7 +363,7 @@ IProtocol IConnection.Protocol
table["platform"] = Encoding.UTF8.GetBytes(".NET");
table["copyright"] = Encoding.UTF8.GetBytes("Copyright (c) 2007-2016 Pivotal Software, Inc.");
table["information"] = Encoding.UTF8.GetBytes("Licensed under the MPL. " +
- "See http://www.rabbitmq.com/");
+ "See http://www.RabbitMQ.com/");
Please revert this change.
@@ -1258,7 +1258,7 @@ void IDisposable.Dispose()
}
catch (OperationInterruptedException)
{
- // ignored, see rabbitmq/rabbitmq-dotnet-client#133
+ // ignored, see RabbitMQ/RabbitMQ-dotnet-client#133
Please revert this change.
namespace RabbitMQ.Client.Impl
{
public class HeaderWriteFrame : WriteFrame {
Please open the curly brace on a new line.
writer.Write((ushort)header.ProtocolClassId);
header.WriteTo(writer, (ulong)bodyLength);
}
}
Please include a blank line here and in similar places.
}
}
}
// This source code is dual-licensed under the Apache License, version
Why is the diff viewer showing a difference here?
The delta between old and new has too many changes for the compare tool to provide a true comparison; the frame class has been extended with many frame super classes.
Also, the base frame class data is now immutable.
It's actually easy to see what the difference is:
git log -p -- projects/client/RabbitMQ.Client/src/client/impl/Frame.cs
Existing line endings are replaced by Windows ones.
}
}
}
// This source code is dual-licensed under the Apache License, version
This file should not be showing this many differences. Is there an issue with newlines?
@YulerB I wanted to take a look at this PR, but there appear to be a bunch of changes that are unrelated to fixing lock contention. For example, lots of things have had their accessibility changed, which is a breaking change. Since this project is following SemVer now, that would mean this would require a 6.0 release of the client, and I know the maintainers aren't going to be interested in doing that any time soon.
@bording is right. We can accept minor breaking changes for a It would also be interesting to see some benchmarks and other metrics that demonstrate the effect of this change.
nbw.Write((byte)Endpoint.Protocol.MajorVersion);
nbw.Write((byte)Endpoint.Protocol.MinorVersion);
}
m_writer.Write(ms.ToArray());
Entire frame is written as one unit to the stream, thus removing the need to worry about frame interleaving.
var nbw = new NetworkBinaryWriter(ms);
frame.WriteTo(nbw);
m_socket.Client.Poll(m_writeableStateTimeout, SelectMode.SelectWrite);
m_writer.Write(ms.ToArray());
Entire frame is written as one unit to the stream, thus removing the need to worry about frame interleaving.
var nbw = new NetworkBinaryWriter(ms);
foreach (var f in frames) f.WriteTo(nbw);
m_socket.Client.Poll(m_writeableStateTimeout, SelectMode.SelectWrite);
m_writer.Write(ms.ToArray());
Entire frame is written as one unit to the stream, thus removing the need to worry about frame interleaving.

public void WriteFrameSet(IList<Frame> frames)
{
lock (m_writer)
Removed this lock
Frame interleaving is fixed by writing the entire frame as one unit.
The lock for the flush is not required when not using BufferedStream.

public void Flush()
{
lock (m_writer)
Removed this lock and flush, since we don't require flush when not using a BufferedStream.

public void SendHeader()
{
lock (m_writer)
Removed this lock and flush, since we don't require flush when not using a BufferedStream and we don't require the lock on write because we will put the full frame on the stream as one unit.
}
// We used to transmit *inside* the lock to avoid interleaving
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
This was the cause of most of the lock contention. I took the command transmission out of the lock, since the issue with frame interleaving is mitigated downstream by writing the entire frame to the stream as one unit.
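The pattern described here (keep the channel lock for bookkeeping only, and perform the transmit outside it) might look roughly like the sketch below. `SketchChannel`, its sequence counter and the `transmit` delegate are hypothetical stand-ins, not the client's real types; the sketch only illustrates where the lock boundary moves.

```csharp
using System;

// Hypothetical channel used only to illustrate the lock boundary.
public sealed class SketchChannel
{
    private readonly object _stateLock = new object();
    private readonly Action<byte[]> _transmit; // stands in for cmd.Transmit(...)
    private ulong _nextSeqNo = 1;

    public SketchChannel(Action<byte[]> transmit)
    {
        _transmit = transmit;
    }

    public ulong Send(byte[] frameBytes)
    {
        ulong seqNo;

        // Only the cheap state update happens while holding the lock.
        lock (_stateLock)
        {
            seqNo = _nextSeqNo++;
        }

        // The potentially slow I/O runs outside the lock, so other threads
        // are not blocked on the channel lock while bytes are being written.
        _transmit(frameBytes);
        return seqNo;
    }
}
```

The trade-off is that nothing in this sketch orders concurrent transmits relative to each other; it relies entirely on the downstream writer to keep each frame intact, which is the assumption this PR makes.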
Updating to use ConfigureAwait on ConnectAsync as per trunk
Made some changes; can we check again whether there are breaking changes?
I tried to change the Frame.cs file; can you check the line endings on that file, please? If they are OK, then I can fix the others too.
@vendre21 according to
Thank you @michaelklishin. I think I could fix the line endings in both files this time.
Is it correct to say that the actual changes here should be separated from the beautifying of the code? It feels to me these are actually two PRs: one that removes the lock contention, and one that introduces ReadFrame, WriteFrame and all the extension methods such as IsMethod and so on. Or am I looking at it from the wrong angle?
@@ -164,54 +134,27 @@ public void Transmit(int channelNumber, Connection connection)
}
}

- public void TransmitAsSingleFrame(int channelNumber, Connection connection)
+ private void TransmitAsSingleFrame(int channelNumber, Connection connection)
This seems to be a public API breaking change
This method is never called by end user apps or libraries that depend on this client. See #350 (comment). So we can undo it but if someone's code depends on this, they are doing something really risky to begin with.
}

- public void TransmitAsFrameSet(int channelNumber, Connection connection)
+ private void TransmitAsFrameSet(int channelNumber, Connection connection)
This seems to be a public API breaking change
@@ -273,7 +273,7 @@ public ushort ChannelMax
public IDictionary<string, object> ClientProperties
{
get { return m_clientProperties; }
- set { m_clientProperties = value; }
+ private set { m_clientProperties = value; }
This seems to be a public API breaking change
@@ -339,7 +339,7 @@ public int RemotePort
get { return m_frameHandler.RemotePort; }
}

- public IDictionary<string, object> ServerProperties { get; set; }
+ public IDictionary<string, object> ServerProperties { get; private set; }
This seems to be a public API breaking change
@@ -465,7 +465,7 @@ public void Close(ShutdownEventArgs reason, bool abort, int timeout)
///<remarks>
/// Loop only used while quiescing. Use only to cleanly close connection
///</remarks>
- public void ClosingLoop()
+ private void ClosingLoop()
This seems to be a public API breaking change
@@ -533,7 +533,7 @@ public void EnsureIsOpen()
}

// Only call at the end of the Mainloop or HeartbeatLoop
- public void FinishClose()
+ private void FinishClose()
This seems to be a public API breaking change
@@ -561,13 +561,13 @@ public void HandleApplicationSuspend(object sender, SuspendingEventArgs suspendi
/// We need to close the socket, otherwise attempting to unload the domain
/// could cause a CannotUnloadAppDomainException
/// </remarks>
- public void HandleDomainUnload(object sender, EventArgs ea)
+ private void HandleDomainUnload(object sender, EventArgs ea)
This seems to be a public API breaking change, as do the following.
@@ -32,7 +32,7 @@ public TcpClientAdapter(Socket socket)
throw new ArgumentException("No ip address could be resolved for " + host);
}
#if CORECLR
await sock.ConnectAsync(ep, port).ConfigureAwait(false);
ConfigureAwait is required
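For readers unfamiliar with the call, here is a small sketch of awaiting a connect with ConfigureAwait(false), which tells the awaiter not to resume on the captured synchronization context (the usual choice in library code). `ConnectSketch` and its method are hypothetical and use TcpClient rather than the adapter shown in the diff.

```csharp
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical example; the real code lives in TcpClientAdapter.
public static class ConnectSketch
{
    public static async Task<bool> ConnectAsync(string host, int port)
    {
        using (var client = new TcpClient())
        {
            // ConfigureAwait(false): the continuation does not need to run
            // on the caller's synchronization context.
            await client.ConnectAsync(host, port).ConfigureAwait(false);
            return client.Connected;
        }
    }
}
```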
Since we cannot make changes on the main branch, we can only make changes from a fork. I like the split of the responsibilities for generating frames and reading frames. I believe @michaelklishin wants to work on this, taking the best out of the PR and adding it to the trunk.
We are considering other PRs. I'll need the rest of my team to create forks though, since it looks like GitHub only allows one per user (I may be wrong, we are new to GitHub). We have a change to the AsyncConsumerWorkService we would also like to include, but this is to simplify some code and also to ensure there are no locking issues in the current code (a little clue: BlockingCollection). I believe @vendre21 will submit soon.
We also see a lot of casting in the application, and have extensions to the NetworkBinaryReader/NetworkBinaryWriter to avoid doing casts. We also see a lot of stuff like (byte) 'S', which really should be a constant such as const byte S = 83; (the ASCII code of 'S'), rather than 2 opcodes to allocate and cast on each method call, for example in WireFormatter.
I'd like to add to my comment above: this PR refactors an area that is not used by end user apps or other libraries. Besides some naming suggestions, I'm personally happy for these changes to be in the same PR, as it makes the intent of the change clearer. I would also like to avoid dragging the PR out for too long over non-essential issues (such as the unfortunate line endings change), as I've been on the contributor side of the table and it can be really frustrating. Therefore, if @bording @danielmarbach and @lukebakken don't see any flaws in the frame serialization approach we have here, I think we should focus on testing this version against more workloads and projects (such as NServiceBus). All other changes mentioned by @YulerB (e.g. switching to constants) are definitely welcome in separate PRs (and perhaps they should be submitted after we are out of the woods with this one, as I suspect they will be building on what's already changed here). @YulerB any user can submit any number of PRs for a repo (that's why some recommend splitting this one into two). It is generally a good idea to submit as few changes as possible at a time, this way
@YulerB - Please disregard the comment I made earlier; I was looking at a stale version of your PR.
You can create an organization in which to make your fork, and then add your team to that organization. As an alternative, one person can fork this repo and add collaborators via the fork's settings ( Thanks!!
The diff is much smaller now, thank you, @YulerB. Some of the unrelated changes (e.g. member visibility) are not ideal from a PR hygiene perspective, but I can see why they were done: this client has a complex history and a fair number of curious (as far as C# coding style goes) decisions that go back all the way to 2007. So it's attracting drive-by changes.
I QA'ed it yesterday and so far it looks promising. I'm going to review our concurrent publishing tests and see if we may need to add a couple more. @bording @danielmarbach it would be great to have a review and some NServiceBus-specific testing from you folks :)
I'm not an expert when it comes to buffered streams. Based on my limited knowledge, they are overrated when it comes to FileStreams, since the file stream itself already has built-in buffering. I don't know about network streams; that is where my knowledge ends. I do think this has to be taken into account and at least verified, since what looks like an optimization for a high number of small messages could turn out to be a major performance drawback for larger message payloads, if the BufferedStream was purposefully introduced for those scenarios. If that scenario can be ruled out, I'd say it is worth proceeding with the reduction of the lock contention, since we know from the history of this client how drastically this can improve performance.
In preparation for #350.
I'm adding concurrent connection access tests in preparation for further QA of this pull request.
@YulerB Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions.
Can you also add payload tests with several payload sizes?
I pushed this branch together with some extra tests in master as #354. Let's continue there.
@danielmarbach good idea. I will do that on the #354 branch.
@YulerB Thank you for signing the Contributor License Agreement!
when transmitting multiple frames at once.
While changes in #350, #354 are supposed to be safe, there's some evidence (#681) that sometimes they are not, which leads to incorrect byte stream interleaving on the wire.
Update API approval test expectations
Re-introduce lock for all socket writes
The official `NetworkStream` docs are confusing: https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.networkstream?view=netframework-4.5.1
> Read and write operations can be performed simultaneously on an instance of the NetworkStream class without the need for synchronization. As long as there is one unique thread for the write operations and one unique thread for the read operations, there will be no cross-interference between read and write threads and no synchronization is required.
This is a poorly-written way to say "multiple threads must be synchronized".
Fixes #681
Update API approval test expectations
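A minimal sketch of what "lock for all socket writes" means in practice, assuming each frame has already been serialized to a byte array. `LockedFrameWriter` is a hypothetical name, and any Stream stands in for the NetworkStream here; this is not the code from the referenced commit.

```csharp
using System.IO;

// Hypothetical wrapper for illustration; not the client's actual class.
public sealed class LockedFrameWriter
{
    private readonly Stream _stream;
    private readonly object _writeLock = new object();

    public LockedFrameWriter(Stream stream)
    {
        _stream = stream;
    }

    public void WriteFrame(byte[] frameBytes)
    {
        // Only one thread at a time may write, so the bytes of one frame
        // can never be interleaved with the bytes of another.
        lock (_writeLock)
        {
            _stream.Write(frameBytes, 0, frameBytes.Length);
        }
    }
}
```

The lock is held only for the duration of a single Write call, so serialization of the frame can still happen outside it, which keeps part of the contention reduction this PR was after.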
Further discussion can be found at:
https://groups.google.com/forum/?nomobile=true#!topic/rabbitmq-users/OyoYTkAyCSk