Skip to content

Reduce lock contention for frame writes #350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RabbitMQDotNetClient.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,6 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsParsFormattingSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsWrapperSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
103 changes: 26 additions & 77 deletions projects/client/RabbitMQ.Client/src/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ public class Command
// - 2 bytes of channel number
// - 4 bytes of frame payload length
// - 1 byte of payload trailer FrameEnd byte
public const int EmptyFrameSize = 8;

public byte[] m_body0;
public IList<byte[]> m_bodyN;
private const int EmptyFrameSize = 8;
private readonly MemoryStream m_body;
private static readonly byte[] m_emptyByteArray = new byte[0];

static Command()
Expand All @@ -68,18 +66,26 @@ static Command()

public Command() : this(null, null, null)
{
m_body = new MemoryStream();
}

public Command(MethodBase method) : this(method, null, null)
{
m_body = new MemoryStream();
}

public Command(MethodBase method, ContentHeaderBase header, byte[] body)
{
Method = method;
Header = header;
m_body0 = body;
m_bodyN = null;
if (body != null)
{
m_body = new MemoryStream(body);
}
else
{
m_body = new MemoryStream();
}
}

public byte[] Body
Expand All @@ -93,7 +99,7 @@ public byte[] Body

public static void CheckEmptyFrameSize()
{
var f = new Frame(Constants.FrameBody, 0, m_emptyByteArray);
var f = new EmptyWriteFrame();
var stream = new MemoryStream();
var writer = new NetworkBinaryWriter(stream);
f.WriteTo(writer);
Expand All @@ -111,50 +117,20 @@ public static void CheckEmptyFrameSize()

public void AppendBodyFragment(byte[] fragment)
{
if (m_body0 == null)
if (fragment != null)
{
m_body0 = fragment;
}
else
{
if (m_bodyN == null)
{
m_bodyN = new List<byte[]>();
}
m_bodyN.Add(fragment);
m_body.Write(fragment, 0, fragment.Length);
}
}

public byte[] ConsolidateBody()
{
if (m_bodyN == null)
{
return m_body0 ?? m_emptyByteArray;
}
else
{
int totalSize = m_body0.Length;
foreach (byte[] fragment in m_bodyN)
{
totalSize += fragment.Length;
}
var result = new byte[totalSize];
Array.Copy(m_body0, 0, result, 0, m_body0.Length);
int offset = m_body0.Length;
foreach (byte[] fragment in m_bodyN)
{
Array.Copy(fragment, 0, result, offset, fragment.Length);
offset += fragment.Length;
}
m_body0 = result;
m_bodyN = null;
return m_body0;
}
return m_body.Length == 0 ? m_emptyByteArray : m_body.ToArray();
}

public void Transmit(int channelNumber, Connection connection)
{
if(Method.HasContent)
if (Method.HasContent)
{
TransmitAsFrameSet(channelNumber, connection);
}
Expand All @@ -166,52 +142,25 @@ public void Transmit(int channelNumber, Connection connection)

public void TransmitAsSingleFrame(int channelNumber, Connection connection)
{
var frame = new Frame(Constants.FrameMethod, channelNumber);
NetworkBinaryWriter writer = frame.GetWriter();
writer.Write((ushort)Method.ProtocolClassId);
writer.Write((ushort)Method.ProtocolMethodId);
var argWriter = new MethodArgumentWriter(writer);
Method.WriteArgumentsTo(argWriter);
argWriter.Flush();
connection.WriteFrame(frame);
connection.WriteFrame(new MethodWriteFrame(channelNumber, Method));
}

public void TransmitAsFrameSet(int channelNumber, Connection connection)
{
var frame = new Frame(Constants.FrameMethod, channelNumber);
NetworkBinaryWriter writer = frame.GetWriter();
writer.Write((ushort)Method.ProtocolClassId);
writer.Write((ushort)Method.ProtocolMethodId);
var argWriter = new MethodArgumentWriter(writer);
Method.WriteArgumentsTo(argWriter);
argWriter.Flush();

var frames = new List<Frame>();
frames.Add(frame);

var frames = new List<WriteFrame>();
frames.Add(new MethodWriteFrame(channelNumber, Method));
if (Method.HasContent)
{
byte[] body = Body;

frame = new Frame(Constants.FrameHeader, channelNumber);
writer = frame.GetWriter();
writer.Write((ushort)Header.ProtocolClassId);
Header.WriteTo(writer, (ulong)body.Length);
frames.Add(frame);
var body = ConsolidateBody(); // Cache, since the property is compiled.

frames.Add(new HeaderWriteFrame(channelNumber, Header, body.Length));
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0)
? body.Length
: frameMax - EmptyFrameSize;
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
{
int remaining = body.Length - offset;

frame = new Frame(Constants.FrameBody, channelNumber);
writer = frame.GetWriter();
writer.Write(body, offset,
(remaining < bodyPayloadMax) ? remaining : bodyPayloadMax);
frames.Add(frame);
var remaining = body.Length - offset;
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentWriteFrame(channelNumber, body, offset, count));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ public CommandAssembler(ProtocolBase protocol)
Reset();
}

public Command HandleFrame(Frame f)
public Command HandleFrame(ReadFrame f)
{
switch (m_state)
{
case AssemblyState.ExpectingMethod:
{
if (f.Type != Constants.FrameMethod)
if (!f.IsMethod())
{
throw new UnexpectedFrameException(f);
}
Expand All @@ -86,7 +86,7 @@ public Command HandleFrame(Frame f)
}
case AssemblyState.ExpectingContentHeader:
{
if (f.Type != Constants.FrameHeader)
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f);
}
Expand All @@ -98,20 +98,19 @@ public Command HandleFrame(Frame f)
}
case AssemblyState.ExpectingContentBody:
{
if (f.Type != Constants.FrameBody)
if (!f.IsBody())
{
throw new UnexpectedFrameException(f);
}
byte[] fragment = f.Payload;
m_command.AppendBodyFragment(fragment);
if ((ulong)fragment.Length > m_remainingBodyBytes)
m_command.AppendBodyFragment(f.Payload);
if ((ulong)f.Payload.Length > m_remainingBodyBytes)
{
throw new MalformedFrameException
(string.Format("Overlong content body received - {0} bytes remaining, {1} bytes received",
m_remainingBodyBytes,
fragment.Length));
f.Payload.Length));
}
m_remainingBodyBytes -= (ulong)fragment.Length;
m_remainingBodyBytes -= (ulong)f.Payload.Length;
UpdateContentBodyState();
return CompletedCommand();
}
Expand Down
14 changes: 7 additions & 7 deletions projects/client/RabbitMQ.Client/src/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class Connection : IConnection
private readonly object m_eventLock = new object();

///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
private readonly Frame m_heartbeatFrame = new Frame(Constants.FrameHeartbeat, 0, new byte[0]);
private readonly EmptyWriteFrame m_heartbeatFrame = new EmptyWriteFrame();

private ManualResetEvent m_appContinuation = new ManualResetEvent(false);
private EventHandler<CallbackExceptionEventArgs> m_callbackException;
Expand Down Expand Up @@ -716,11 +716,11 @@ public void MainLoop()

public void MainLoopIteration()
{
Frame frame = m_frameHandler.ReadFrame();
ReadFrame frame = m_frameHandler.ReadFrame();

NotifyHeartbeatListener();
// We have received an actual frame.
if (frame.Type == Constants.FrameHeartbeat)
if (frame.IsHeartbeat())
{
// Ignore it: we've already just reset the heartbeat
// latch.
Expand Down Expand Up @@ -767,7 +767,7 @@ public void MainLoopIteration()
}
}

public void NotifyHeartbeatListener()
public void NotifyHeartbeatListener()
{
if (m_heartbeat != 0)
{
Expand Down Expand Up @@ -1099,7 +1099,7 @@ public void HeartbeatWriteTimerCallback(object state)
if (!m_closed)
{
WriteFrame(m_heartbeatFrame);
m_frameHandler.Flush();
//m_frameHandler.Flush();
}
}
catch (Exception e)
Expand Down Expand Up @@ -1168,13 +1168,13 @@ public override string ToString()
return string.Format("Connection({0},{1})", m_id, Endpoint);
}

public void WriteFrame(Frame f)
public void WriteFrame(WriteFrame f)
{
m_frameHandler.WriteFrame(f);
m_heartbeatWrite.Set();
}

public void WriteFrameSet(IList<Frame> f)
public void WriteFrameSet(IList<WriteFrame> f)
{
m_frameHandler.WriteFrameSet(f);
m_heartbeatWrite.Set();
Expand Down
Loading