Skip to content

Commit 384b1c3

Browse files
authored
Merge pull request #732 from stebet/serializationAllocReductions
Reducing allocations when (de)serializing frames/commands.
2 parents 4a7a93b + b1c16b4 commit 384b1c3

File tree

82 files changed

+1428
-4396
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1428
-4396
lines changed

projects/client/Apigen/src/apigen/Apigen.cs

Lines changed: 109 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public static int GetInt(XmlNode n0, string path)
122122
/// <returns>renamed string</returns>
123123
private static string xmlStringMapper(string xmlString)
124124
{
125-
switch(xmlString)
125+
switch (xmlString)
126126
{
127127
case "no-wait":
128128
return "nowait";
@@ -445,12 +445,14 @@ public void EmitPrelude()
445445
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
446446
//---------------------------------------------------------------------------
447447
448+
using System;
448449
using System.Collections.Generic;
449450
using System.Text;
450451
451452
using RabbitMQ.Client;
452453
using RabbitMQ.Client.Exceptions;
453454
using RabbitMQ.Client.Framing.Impl;
455+
using RabbitMQ.Client.Impl;
454456
";
455457
EmitLine(prelude);
456458
}
@@ -681,6 +683,48 @@ public void EmitClassProperties(AmqpClass c)
681683
}
682684
EmitLine(" }");
683685
EmitLine("");
686+
EmitLine(" public override int GetRequiredPayloadBufferSize()");
687+
EmitLine(" {");
688+
EmitLine(" int bufferSize = 0;");
689+
EmitLine(" int fieldCount = 0;");
690+
foreach (AmqpField f in c.m_Fields)
691+
{
692+
switch (MapDomain(f.Domain))
693+
{
694+
case "byte":
695+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize++; }} // _{MangleMethod(f.Name)} in bytes");
696+
break;
697+
case "string":
698+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 1 + Encoding.UTF8.GetByteCount(_{MangleMethod(f.Name)}); }} // _{MangleMethod(f.Name)} in bytes");
699+
break;
700+
case "byte[]":
701+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 4 + _{MangleMethod(f.Name)}.Length; }} // _{MangleMethod(f.Name)} in bytes");
702+
break;
703+
case "ushort":
704+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 2; }} // _{MangleMethod(f.Name)} in bytes");
705+
break;
706+
case "uint":
707+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 4; }} // _{MangleMethod(f.Name)} in bytes");
708+
break;
709+
case "ulong":
710+
case "AmqpTimestamp":
711+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 8; }} // _{MangleMethod(f.Name)} in bytes");
712+
break;
713+
case "bool":
714+
// TODO: implement if used, not used anywhere yet
715+
break;
716+
case "IDictionary<string, object>":
717+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += WireFormatting.GetTableByteCount(_{MangleMethod(f.Name)}); }} // _{MangleMethod(f.Name)} in bytes");
718+
break;
719+
default:
720+
throw new ArgumentOutOfRangeException($"Can't handle size calculations for type = {f.Domain};");
721+
}
722+
}
723+
724+
EmitLine($" bufferSize += Math.Max((int)Math.Ceiling(fieldCount / 15.0), 1) * 2; // number of presence fields in bytes");
725+
EmitLine(" return bufferSize;");
726+
EmitLine(" }");
727+
EmitLine("");
684728
EmitLine(" public override void AppendPropertyDebugStringTo(StringBuilder sb)");
685729
EmitLine(" {");
686730
EmitLine(" sb.Append(\"(\");");
@@ -813,6 +857,51 @@ public void EmitClassMethodImplementations(AmqpClass c)
813857
}
814858
EmitLine(" }");
815859
EmitLine("");
860+
EmitLine(" public override int GetRequiredBufferSize()");
861+
EmitLine(" {");
862+
EmitLine(" int bufferSize = 0;");
863+
int bitCount = 0;
864+
foreach (AmqpField f in m.m_Fields)
865+
{
866+
switch (MapDomain(f.Domain))
867+
{
868+
case "byte":
869+
EmitLine($" bufferSize++; // _{MangleMethod(f.Name)} in bytes");
870+
break;
871+
case "string":
872+
EmitLine($" bufferSize += 1 + Encoding.UTF8.GetByteCount(_{MangleMethod(f.Name)}); // _{MangleMethod(f.Name)} in bytes");
873+
break;
874+
case "byte[]":
875+
EmitLine($" bufferSize += 4 + _{MangleMethod(f.Name)}.Length; // _{MangleMethod(f.Name)} in bytes");
876+
break;
877+
case "ushort":
878+
EmitLine($" bufferSize += 2; // _{MangleMethod(f.Name)} in bytes");
879+
break;
880+
case "uint":
881+
EmitLine($" bufferSize += 4; // _{MangleMethod(f.Name)} in bytes");
882+
break;
883+
case "ulong":
884+
case "AmqpTimestamp":
885+
EmitLine($" bufferSize += 8; // _{MangleMethod(f.Name)} in bytes");
886+
break;
887+
case "bool":
888+
bitCount++;
889+
break;
890+
case "IDictionary<string, object>":
891+
EmitLine($" bufferSize += WireFormatting.GetTableByteCount(_{MangleMethod(f.Name)}); // _{MangleMethod(f.Name)} in bytes");
892+
break;
893+
default:
894+
throw new ArgumentOutOfRangeException($"Can't handle size calculations for type = {f.Domain};");
895+
}
896+
}
897+
898+
if (bitCount > 0)
899+
{
900+
EmitLine($" bufferSize += {Math.Ceiling(bitCount / 8.0)}; // number of bit fields in bytes");
901+
}
902+
EmitLine(" return bufferSize;");
903+
EmitLine(" }");
904+
EmitLine("");
816905
EmitLine(" public override void AppendArgumentDebugStringTo(StringBuilder sb)");
817906
EmitLine(" {");
818907
EmitLine(" sb.Append(\"(\");");
@@ -840,44 +929,41 @@ public void EmitClassMethodImplementations(AmqpClass c)
840929

841930
public void EmitMethodArgumentReader()
842931
{
843-
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(Util.NetworkBinaryReader reader)");
932+
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
844933
EmitLine(" {");
845-
EmitLine(" ushort classId = reader.ReadUInt16();");
846-
EmitLine(" ushort methodId = reader.ReadUInt16();");
847-
EmitLine(" Client.Impl.MethodBase result = null;");
934+
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory);");
935+
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2));");
936+
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
937+
EmitLine(" if(result != null)");
938+
EmitLine(" {");
939+
EmitLine(" result.ReadArgumentsFrom(new Client.Impl.MethodArgumentReader(memory.Slice(4)));");
940+
EmitLine(" return result;");
941+
EmitLine(" }");
848942
EmitLine("");
943+
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, methodId);");
944+
EmitLine(" }");
945+
EmitLine("");
946+
EmitLine(" internal Client.Impl.MethodBase DecodeMethodFrom(ushort classId, ushort methodId)");
947+
EmitLine(" {");
849948
EmitLine(" switch ((classId << 16) | methodId)");
850949
EmitLine(" {");
851950
foreach (AmqpClass c in m_classes)
852951
{
853952
foreach (AmqpMethod m in c.m_Methods)
854953
{
855-
EmitLine($" case (ClassConstants.{MangleConstant(c.Name)} << 16) | {MangleConstant(c.Name)}MethodConstants.{MangleConstant(m.Name)}:");
856-
EmitLine(" {");
857-
EmitLine($" result = new Impl.{MangleMethodClass(c, m)}();");
858-
EmitLine($" break;");
859-
EmitLine(" }");
954+
EmitLine($" case (ClassConstants.{MangleConstant(c.Name)} << 16) | {MangleConstant(c.Name)}MethodConstants.{MangleConstant(m.Name)}: return new Impl.{MangleMethodClass(c, m)}();");
860955
}
861956
}
862-
EmitLine(" default: break;");
957+
EmitLine(" default: return null;");
863958
EmitLine(" }");
864-
EmitLine("");
865-
EmitLine(" if(result != null)");
866-
EmitLine(" {");
867-
EmitLine(" result.ReadArgumentsFrom(new Client.Impl.MethodArgumentReader(reader));");
868-
EmitLine(" return result;");
869-
EmitLine(" }");
870-
EmitLine("");
871-
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, methodId);");
872959
EmitLine(" }");
960+
EmitLine("");
873961
}
874962

875963
public void EmitContentHeaderReader()
876964
{
877-
EmitLine(" internal override Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(Util.NetworkBinaryReader reader)");
965+
EmitLine(" internal override Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(ushort classId)");
878966
EmitLine(" {");
879-
EmitLine(" ushort classId = reader.ReadUInt16();");
880-
EmitLine("");
881967
EmitLine(" switch (classId)");
882968
EmitLine(" {");
883969
foreach (AmqpClass c in m_classes)
@@ -887,9 +973,8 @@ public void EmitContentHeaderReader()
887973
EmitLine($" case {c.Index}: return new {MangleClass(c.Name)}Properties();");
888974
}
889975
}
890-
EmitLine(" default: break;");
976+
EmitLine(" default: throw new Client.Impl.UnknownClassOrMethodException(classId, 0);");
891977
EmitLine(" }");
892-
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, 0);");
893978
EmitLine(" }");
894979
}
895980

projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
113113
string exchange,
114114
string routingKey,
115115
IBasicProperties properties,
116-
byte[] body)
116+
ReadOnlyMemory<byte> body)
117117
{
118118
// Nothing to do here.
119119
return TaskExtensions.CompletedTask;
@@ -167,7 +167,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
167167
throw new InvalidOperationException("Should never be called.");
168168
}
169169

170-
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
170+
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
171171
{
172172
throw new InvalidOperationException("Should never be called.");
173173
}

projects/client/RabbitMQ.Client/src/client/api/BasicGetResult.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
42+
4143
namespace RabbitMQ.Client
4244
{
4345
/// <summary>Represents Basic.GetOk responses from the server.</summary>
@@ -57,7 +59,7 @@ public class BasicGetResult
5759
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
5860
/// <param name="body"></param>
5961
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
60-
string routingKey, uint messageCount, IBasicProperties basicProperties, byte[] body)
62+
string routingKey, uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
6163
{
6264
DeliveryTag = deliveryTag;
6365
Redelivered = redelivered;
@@ -76,7 +78,7 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
7678
/// <summary>
7779
/// Retrieves the body of this message.
7880
/// </summary>
79-
public byte[] Body { get; private set; }
81+
public ReadOnlyMemory<byte> Body { get; private set; }
8082

8183
/// <summary>
8284
/// Retrieve the delivery tag for this message. See also <see cref="IModel.BasicAck"/>.

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactoryBase.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
5959
{
6060
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
6161
{
62-
NoDelay = true
62+
NoDelay = true,
63+
ReceiveBufferSize = 65536,
64+
SendBufferSize = 65536
6365
};
6466
return new TcpClientAdapter(socket);
6567
}

projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ namespace RabbitMQ.Client
5757
/// </remarks>
5858
public class DefaultBasicConsumer : IBasicConsumer
5959
{
60-
private readonly object _eventLock = new object();
6160
private readonly HashSet<string> _consumerTags = new HashSet<string>();
6261

6362
/// <summary>
@@ -161,7 +160,7 @@ public virtual void HandleBasicDeliver(string consumerTag,
161160
string exchange,
162161
string routingKey,
163162
IBasicProperties properties,
164-
byte[] body)
163+
ReadOnlyMemory<byte> body)
165164
{
166165
// Nothing to do here.
167166
}

projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Threading.Tasks;
23

34
using RabbitMQ.Client.Events;
@@ -51,7 +52,7 @@ Task HandleBasicDeliver(string consumerTag,
5152
string exchange,
5253
string routingKey,
5354
IBasicProperties properties,
54-
byte[] body);
55+
ReadOnlyMemory<byte> body);
5556

5657
/// <summary>
5758
/// Called when the model shuts down.
@@ -60,4 +61,4 @@ Task HandleBasicDeliver(string consumerTag,
6061
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
6162
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
6263
}
63-
}
64+
}

projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ void HandleBasicDeliver(string consumerTag,
104104
string exchange,
105105
string routingKey,
106106
IBasicProperties properties,
107-
byte[] body);
107+
ReadOnlyMemory<byte> body);
108108

109109
/// <summary>
110110
/// Called when the model shuts down.

0 commit comments

Comments
 (0)