Skip to content

Reducing allocations when (de)serializing frames/commands. #732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 109 additions & 24 deletions projects/client/Apigen/src/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static int GetInt(XmlNode n0, string path)
/// <returns>renamed string</returns>
private static string xmlStringMapper(string xmlString)
{
switch(xmlString)
switch (xmlString)
{
case "no-wait":
return "nowait";
Expand Down Expand Up @@ -445,12 +445,14 @@ public void EmitPrelude()
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;
";
EmitLine(prelude);
}
Expand Down Expand Up @@ -681,6 +683,48 @@ public void EmitClassProperties(AmqpClass c)
}
EmitLine(" }");
EmitLine("");
EmitLine(" public override int GetRequiredPayloadBufferSize()");
EmitLine(" {");
EmitLine(" int bufferSize = 0;");
EmitLine(" int fieldCount = 0;");
foreach (AmqpField f in c.m_Fields)
{
switch (MapDomain(f.Domain))
{
case "byte":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize++; }} // _{MangleMethod(f.Name)} in bytes");
break;
case "string":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 1 + Encoding.UTF8.GetByteCount(_{MangleMethod(f.Name)}); }} // _{MangleMethod(f.Name)} in bytes");
break;
case "byte[]":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 4 + _{MangleMethod(f.Name)}.Length; }} // _{MangleMethod(f.Name)} in bytes");
break;
case "ushort":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 2; }} // _{MangleMethod(f.Name)} in bytes");
break;
case "uint":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 4; }} // _{MangleMethod(f.Name)} in bytes");
break;
case "ulong":
case "AmqpTimestamp":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 8; }} // _{MangleMethod(f.Name)} in bytes");
break;
case "bool":
// TODO: implement if used, not used anywhere yet
break;
case "IDictionary<string, object>":
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += WireFormatting.GetTableByteCount(_{MangleMethod(f.Name)}); }} // _{MangleMethod(f.Name)} in bytes");
break;
default:
throw new ArgumentOutOfRangeException($"Can't handle size calculations for type = {f.Domain};");
}
}

EmitLine($" bufferSize += Math.Max((int)Math.Ceiling(fieldCount / 15.0), 1) * 2; // number of presence fields in bytes");
EmitLine(" return bufferSize;");
EmitLine(" }");
EmitLine("");
EmitLine(" public override void AppendPropertyDebugStringTo(StringBuilder sb)");
EmitLine(" {");
EmitLine(" sb.Append(\"(\");");
Expand Down Expand Up @@ -813,6 +857,51 @@ public void EmitClassMethodImplementations(AmqpClass c)
}
EmitLine(" }");
EmitLine("");
EmitLine(" public override int GetRequiredBufferSize()");
EmitLine(" {");
EmitLine(" int bufferSize = 0;");
int bitCount = 0;
foreach (AmqpField f in m.m_Fields)
{
switch (MapDomain(f.Domain))
{
case "byte":
EmitLine($" bufferSize++; // _{MangleMethod(f.Name)} in bytes");
break;
case "string":
EmitLine($" bufferSize += 1 + Encoding.UTF8.GetByteCount(_{MangleMethod(f.Name)}); // _{MangleMethod(f.Name)} in bytes");
break;
case "byte[]":
EmitLine($" bufferSize += 4 + _{MangleMethod(f.Name)}.Length; // _{MangleMethod(f.Name)} in bytes");
break;
case "ushort":
EmitLine($" bufferSize += 2; // _{MangleMethod(f.Name)} in bytes");
break;
case "uint":
EmitLine($" bufferSize += 4; // _{MangleMethod(f.Name)} in bytes");
break;
case "ulong":
case "AmqpTimestamp":
EmitLine($" bufferSize += 8; // _{MangleMethod(f.Name)} in bytes");
break;
case "bool":
bitCount++;
break;
case "IDictionary<string, object>":
EmitLine($" bufferSize += WireFormatting.GetTableByteCount(_{MangleMethod(f.Name)}); // _{MangleMethod(f.Name)} in bytes");
break;
default:
throw new ArgumentOutOfRangeException($"Can't handle size calculations for type = {f.Domain};");
}
}

if (bitCount > 0)
{
EmitLine($" bufferSize += {Math.Ceiling(bitCount / 8.0)}; // number of bit fields in bytes");
}
EmitLine(" return bufferSize;");
EmitLine(" }");
EmitLine("");
EmitLine(" public override void AppendArgumentDebugStringTo(StringBuilder sb)");
EmitLine(" {");
EmitLine(" sb.Append(\"(\");");
Expand Down Expand Up @@ -840,44 +929,41 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(Util.NetworkBinaryReader reader)");
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
EmitLine(" {");
EmitLine(" ushort classId = reader.ReadUInt16();");
EmitLine(" ushort methodId = reader.ReadUInt16();");
EmitLine(" Client.Impl.MethodBase result = null;");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2));");
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" result.ReadArgumentsFrom(new Client.Impl.MethodArgumentReader(memory.Slice(4)));");
EmitLine(" return result;");
EmitLine(" }");
EmitLine("");
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, methodId);");
EmitLine(" }");
EmitLine("");
EmitLine(" internal Client.Impl.MethodBase DecodeMethodFrom(ushort classId, ushort methodId)");
EmitLine(" {");
EmitLine(" switch ((classId << 16) | methodId)");
EmitLine(" {");
foreach (AmqpClass c in m_classes)
{
foreach (AmqpMethod m in c.m_Methods)
{
EmitLine($" case (ClassConstants.{MangleConstant(c.Name)} << 16) | {MangleConstant(c.Name)}MethodConstants.{MangleConstant(m.Name)}:");
EmitLine(" {");
EmitLine($" result = new Impl.{MangleMethodClass(c, m)}();");
EmitLine($" break;");
EmitLine(" }");
EmitLine($" case (ClassConstants.{MangleConstant(c.Name)} << 16) | {MangleConstant(c.Name)}MethodConstants.{MangleConstant(m.Name)}: return new Impl.{MangleMethodClass(c, m)}();");
}
}
EmitLine(" default: break;");
EmitLine(" default: return null;");
EmitLine(" }");
EmitLine("");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" result.ReadArgumentsFrom(new Client.Impl.MethodArgumentReader(reader));");
EmitLine(" return result;");
EmitLine(" }");
EmitLine("");
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, methodId);");
EmitLine(" }");
EmitLine("");
}

public void EmitContentHeaderReader()
{
EmitLine(" internal override Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(Util.NetworkBinaryReader reader)");
EmitLine(" internal override Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(ushort classId)");
EmitLine(" {");
EmitLine(" ushort classId = reader.ReadUInt16();");
EmitLine("");
EmitLine(" switch (classId)");
EmitLine(" {");
foreach (AmqpClass c in m_classes)
Expand All @@ -887,9 +973,8 @@ public void EmitContentHeaderReader()
EmitLine($" case {c.Index}: return new {MangleClass(c.Name)}Properties();");
}
}
EmitLine(" default: break;");
EmitLine(" default: throw new Client.Impl.UnknownClassOrMethodException(classId, 0);");
EmitLine(" }");
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, 0);");
EmitLine(" }");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
ReadOnlyMemory<byte> body)
{
// Nothing to do here.
return TaskExtensions.CompletedTask;
Expand Down Expand Up @@ -167,7 +167,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
throw new InvalidOperationException("Should never be called.");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
throw new InvalidOperationException("Should never be called.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;

namespace RabbitMQ.Client
{
/// <summary>Represents Basic.GetOk responses from the server.</summary>
Expand All @@ -57,7 +59,7 @@ public class BasicGetResult
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body"></param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
string routingKey, uint messageCount, IBasicProperties basicProperties, byte[] body)
string routingKey, uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Expand All @@ -76,7 +78,7 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
/// <summary>
/// Retrieves the body of this message.
/// </summary>
public byte[] Body { get; private set; }
public ReadOnlyMemory<byte> Body { get; private set; }

/// <summary>
/// Retrieve the delivery tag for this message. See also <see cref="IModel.BasicAck"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
{
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
{
NoDelay = true
NoDelay = true,
ReceiveBufferSize = 65536,
SendBufferSize = 65536
};
return new TcpClientAdapter(socket);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ namespace RabbitMQ.Client
/// </remarks>
public class DefaultBasicConsumer : IBasicConsumer
{
private readonly object _eventLock = new object();
private readonly HashSet<string> _consumerTags = new HashSet<string>();

/// <summary>
Expand Down Expand Up @@ -161,7 +160,7 @@ public virtual void HandleBasicDeliver(string consumerTag,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
ReadOnlyMemory<byte> body)
{
// Nothing to do here.
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
Expand Down Expand Up @@ -51,7 +52,7 @@ Task HandleBasicDeliver(string consumerTag,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body);
ReadOnlyMemory<byte> body);

/// <summary>
/// Called when the model shuts down.
Expand All @@ -60,4 +61,4 @@ Task HandleBasicDeliver(string consumerTag,
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void HandleBasicDeliver(string consumerTag,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body);
ReadOnlyMemory<byte> body);

/// <summary>
/// Called when the model shuts down.
Expand Down
Loading