Skip to content

Commit 3147ab5

Browse files
committed
improve
1 parent b4d63bc commit 3147ab5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2145
-2090
lines changed

RabbitMQTest/Program.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ static async Task Main(string[] args)
3535
subscriber.BasicConsume("testqueue", false, "testconsumer", asyncListener);
3636

3737
byte[] payload = new byte[16384];
38-
var batchPublish = Task.Run(async () =>
38+
var batchPublish = Task.Run(() =>
3939
{
4040
while (messagesSent < batchesToSend * itemsPerBatch)
4141
{
@@ -81,7 +81,7 @@ static async Task Main(string[] args)
8181
Console.ReadLine();
8282
}
8383

84-
private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
84+
private static ValueTask AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
8585
{
8686
// Doing things in parallel here is what will eventually trigger the deadlock,
8787
// probably due to a race condition in AsyncConsumerWorkService.Loop, although
@@ -92,7 +92,7 @@ private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs
9292
// is standard practice as well to maximize core utilization and reduce overhead of Thread creation
9393
Interlocked.Increment(ref messagesReceived);
9494
(sender as AsyncDefaultBasicConsumer).Model.BasicAck(@event.DeliveryTag, true);
95-
return Task.CompletedTask;
95+
return default;
9696
}
9797
}
9898
}

build.sh

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ else
99
readonly script_dir="$(cd "$(dirname "$0")" && pwd)"
1010
fi
1111

12-
dotnet restore "$script_dir/RabbitMQDotNetClient.sln"
13-
dotnet run -p "$script_dir/projects/client/Apigen/Apigen.csproj" --apiName:AMQP_0_9_1 \
14-
"$script_dir/docs/specs/amqp0-9-1.stripped.xml" \
15-
"$script_dir/gensrc/autogenerated-api-0-9-1.cs"
16-
dotnet build "$script_dir/RabbitMQDotNetClient.sln"
12+
cd "$script_dir"
13+
14+
dotnet restore ./RabbitMQDotNetClient.sln
15+
dotnet run -p ./projects/client/Apigen/Apigen.csproj --apiName:AMQP_0_9_1 \
16+
./docs/specs/amqp0-9-1.stripped.xml \
17+
./gensrc/autogenerated-api-0-9-1.cs
18+
dotnet build ./RabbitMQDotNetClient.sln

projects/client/Apigen/src/apigen/Apigen.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ public void EmitClassProperties(AmqpClass c)
613613
EmitLine(" public override ushort ProtocolClassId { get { return " + c.Index + "; } }");
614614
EmitLine(" public override string ProtocolClassName { get { return \"" + c.Name + "\"; } }");
615615
EmitLine("");
616-
EmitLine(" public override void ReadPropertiesFrom(RabbitMQ.Client.Impl.ContentHeaderPropertyReader reader) {");
616+
EmitLine(" public override void ReadPropertiesFrom(ref RabbitMQ.Client.Impl.ContentHeaderPropertyReader reader) {");
617617
foreach (AmqpField f in c.m_Fields)
618618
{
619619
if (IsBoolean(f))
@@ -635,7 +635,7 @@ public void EmitClassProperties(AmqpClass c)
635635
}
636636
EmitLine(" }");
637637
EmitLine("");
638-
EmitLine(" public override void WritePropertiesTo(RabbitMQ.Client.Impl.ContentHeaderPropertyWriter writer) {");
638+
EmitLine(" public override void WritePropertiesTo(ref RabbitMQ.Client.Impl.ContentHeaderPropertyWriter writer) {");
639639
foreach (AmqpField f in c.m_Fields)
640640
{
641641
if (IsBoolean(f))
@@ -770,14 +770,14 @@ public void EmitClassMethodImplementations(AmqpClass c)
770770
EmitLine(" public override bool HasContent { get { return "
771771
+ (m.HasContent ? "true" : "false") + "; } }");
772772
EmitLine("");
773-
EmitLine(" public override void ReadArgumentsFrom(RabbitMQ.Client.Impl.MethodArgumentReader reader) {");
773+
EmitLine(" public override void ReadArgumentsFrom(ref RabbitMQ.Client.Impl.MethodArgumentReader reader) {");
774774
foreach (AmqpField f in m.m_Fields)
775775
{
776776
EmitLine(" m_" + MangleMethod(f.Name) + " = reader.Read" + MangleClass(ResolveDomain(f.Domain)) + "();");
777777
}
778778
EmitLine(" }");
779779
EmitLine("");
780-
EmitLine(" public override void WriteArgumentsTo(RabbitMQ.Client.Impl.MethodArgumentWriter writer) {");
780+
EmitLine(" public override void WriteArgumentsTo(ref RabbitMQ.Client.Impl.MethodArgumentWriter writer) {");
781781
foreach (AmqpField f in m.m_Fields)
782782
{
783783
EmitLine(" writer.Write" + MangleClass(ResolveDomain(f.Domain))
@@ -811,7 +811,7 @@ public void EmitClassMethodImplementations(AmqpClass c)
811811

812812
public void EmitMethodArgumentReader()
813813
{
814-
EmitLine(" public override RabbitMQ.Client.Impl.MethodBase DecodeMethodFrom(RabbitMQ.Util.NetworkBinaryReader reader) {");
814+
EmitLine(" public override RabbitMQ.Client.Impl.MethodBase DecodeMethodFrom(RabbitMQ.Util.BinaryBufferReader reader) {");
815815
EmitLine(" ushort classId = reader.ReadUInt16();");
816816
EmitLine(" ushort methodId = reader.ReadUInt16();");
817817
EmitLine("");
@@ -824,7 +824,8 @@ public void EmitMethodArgumentReader()
824824
{
825825
EmitLine(" case " + m.Index + ": {");
826826
EmitLine(" " + ImplNamespaceBase + "." + MangleMethodClass(c, m) + " result = new " + ImplNamespaceBase + "." + MangleMethodClass(c, m) + "();");
827-
EmitLine(" result.ReadArgumentsFrom(new RabbitMQ.Client.Impl.MethodArgumentReader(reader));");
827+
EmitLine(" var methodReader = new RabbitMQ.Client.Impl.MethodArgumentReader(reader);");
828+
EmitLine(" result.ReadArgumentsFrom(ref methodReader);");
828829
EmitLine(" return result;");
829830
EmitLine(" }");
830831
}
@@ -841,7 +842,7 @@ public void EmitMethodArgumentReader()
841842

842843
public void EmitContentHeaderReader()
843844
{
844-
EmitLine(" public override RabbitMQ.Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(RabbitMQ.Util.NetworkBinaryReader reader) {");
845+
EmitLine(" public override RabbitMQ.Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(RabbitMQ.Util.BinaryBufferReader reader) {");
845846
EmitLine(" ushort classId = reader.ReadUInt16();");
846847
EmitLine("");
847848
EmitLine(" switch (classId) {");

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<PropertyGroup>
33
<Description>The RabbitMQ .NET client is the official client library for C# (and, implicitly, other .NET languages)</Description>
44
<VersionPrefix>6.0.0</VersionPrefix>
5-
<TargetFrameworks>net461;netstandard2.0</TargetFrameworks>
5+
<TargetFrameworks>net461;netstandard2.0;netstandard2.1</TargetFrameworks>
66
<NoWarn>$(NoWarn);CS1591</NoWarn>
77
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
88
<GenerateDocumentationFile>true</GenerateDocumentationFile>
@@ -19,8 +19,17 @@
1919
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
2020
<GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute>
2121
<GenerateAssemblyVersionAttribute>false</GenerateAssemblyVersionAttribute>
22+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
23+
<EmbedUntrackedSources>true</EmbedUntrackedSources>
24+
<IncludeSymbols>true</IncludeSymbols>
25+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
26+
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
2227
</PropertyGroup>
2328

29+
<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' Or '$(TargetFramework)' == 'net461'">
30+
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
31+
</PropertyGroup>
32+
2433
<ItemGroup>
2534
<Compile Remove="build\**\*" />
2635
<Compile Include="..\..\..\gensrc\autogenerated-api-0-9-1.cs" Exclude="build\**\*;bin\**;obj\**;**\*.xproj;packages\**" />
@@ -33,7 +42,7 @@
3342
</None>
3443
</ItemGroup>
3544

36-
<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
45+
<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' Or '$(TargetFramework)' == 'netstandard2.1'">
3746
<DefineConstants>$(DefineConstants);CORECLR</DefineConstants>
3847
</PropertyGroup>
3948

@@ -46,9 +55,9 @@
4655
</PropertyGroup>
4756

4857
<ItemGroup>
58+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.2" />
4959
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.1.1" />
5060
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
51-
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="1.3.2" />
5261
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" Condition="$('TargetFramework') == ('net461')" />
5362
</ItemGroup>
5463

projects/client/RabbitMQ.Client/src/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Linq;
44
using System.Threading.Tasks;
55
using RabbitMQ.Client.Events;
6-
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;
76

87
namespace RabbitMQ.Client
98
{
@@ -72,7 +71,7 @@ public string[] ConsumerTags
7271
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
7372
/// </summary>
7473
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
75-
public virtual Task HandleBasicCancel(string consumerTag)
74+
public virtual ValueTask HandleBasicCancel(string consumerTag)
7675
{
7776
return OnCancel(consumerTag);
7877
}
@@ -81,7 +80,7 @@ public virtual Task HandleBasicCancel(string consumerTag)
8180
/// Called upon successful deregistration of the consumer from the broker.
8281
/// </summary>
8382
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
84-
public virtual Task HandleBasicCancelOk(string consumerTag)
83+
public virtual ValueTask HandleBasicCancelOk(string consumerTag)
8584
{
8685
return OnCancel(consumerTag);
8786
}
@@ -90,11 +89,11 @@ public virtual Task HandleBasicCancelOk(string consumerTag)
9089
/// Called upon successful registration of the consumer with the broker.
9190
/// </summary>
9291
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
93-
public virtual Task HandleBasicConsumeOk(string consumerTag)
92+
public virtual ValueTask HandleBasicConsumeOk(string consumerTag)
9493
{
9594
m_consumerTags.Add(consumerTag);
9695
IsRunning = true;
97-
return TaskExtensions.CompletedTask;
96+
return default;
9897
}
9998

10099
/// <summary>
@@ -105,7 +104,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
105104
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
106105
/// The implementation of this method in this class does NOT acknowledge such messages.
107106
/// </remarks>
108-
public virtual Task HandleBasicDeliver(string consumerTag,
107+
public virtual ValueTask HandleBasicDeliver(string consumerTag,
109108
ulong deliveryTag,
110109
bool redelivered,
111110
string exchange,
@@ -114,15 +113,15 @@ public virtual Task HandleBasicDeliver(string consumerTag,
114113
byte[] body)
115114
{
116115
// Nothing to do here.
117-
return TaskExtensions.CompletedTask;
116+
return default;
118117
}
119118

120119
/// <summary>
121120
/// Called when the model shuts down.
122121
/// </summary>
123122
/// <param name="model"> Common AMQP model.</param>
124123
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
125-
public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
124+
public virtual ValueTask HandleModelShutdown(object model, ShutdownEventArgs reason)
126125
{
127126
ShutdownReason = reason;
128127
return OnCancel(m_consumerTags.ToArray());
@@ -135,7 +134,7 @@ public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
135134
/// This default implementation simply sets the <see cref="IsRunning"/>
136135
/// property to false, and takes no further action.
137136
/// </remarks>
138-
public virtual async Task OnCancel(params string[] consumerTags)
137+
public virtual async ValueTask OnCancel(params string[] consumerTags)
139138
{
140139
IsRunning = false;
141140
var handler = ConsumerCancelled;
@@ -184,4 +183,4 @@ void IBasicConsumer.HandleBasicCancel(string consumerTag)
184183
throw new InvalidOperationException("Should never be called.");
185184
}
186185
}
187-
}
186+
}

projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class DefaultBasicConsumer : IBasicConsumer
5959
{
6060
private readonly object m_eventLock = new object();
6161
private readonly HashSet<string> m_consumerTags = new HashSet<string>();
62-
public EventHandler<ConsumerEventArgs> m_consumerCancelled;
62+
private EventHandler<ConsumerEventArgs> m_consumerCancelled;
6363

6464
/// <summary>
6565
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.

projects/client/RabbitMQ.Client/src/client/api/IAsyncBasicConsumer.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ public interface IAsyncBasicConsumer
2222
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
2323
/// </summary>
2424
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
25-
Task HandleBasicCancel(string consumerTag);
25+
ValueTask HandleBasicCancel(string consumerTag);
2626

2727
/// <summary>
2828
/// Called upon successful deregistration of the consumer from the broker.
2929
/// </summary>
3030
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
31-
Task HandleBasicCancelOk(string consumerTag);
31+
ValueTask HandleBasicCancelOk(string consumerTag);
3232

3333
/// <summary>
3434
/// Called upon successful registration of the consumer with the broker.
3535
/// </summary>
3636
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
37-
Task HandleBasicConsumeOk(string consumerTag);
37+
ValueTask HandleBasicConsumeOk(string consumerTag);
3838

3939
/// <summary>
4040
/// Called each time a message arrives for this consumer.
@@ -44,7 +44,7 @@ public interface IAsyncBasicConsumer
4444
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
4545
/// The implementation of this method in this class does NOT acknowledge such messages.
4646
/// </remarks>
47-
Task HandleBasicDeliver(string consumerTag,
47+
ValueTask HandleBasicDeliver(string consumerTag,
4848
ulong deliveryTag,
4949
bool redelivered,
5050
string exchange,
@@ -57,6 +57,6 @@ Task HandleBasicDeliver(string consumerTag,
5757
/// </summary>
5858
/// <param name="model"> Common AMQP model.</param>
5959
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
60-
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
60+
ValueTask HandleModelShutdown(object model, ShutdownEventArgs reason);
6161
}
62-
}
62+
}

projects/client/RabbitMQ.Client/src/client/content/BasicMessageBuilder.cs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,9 @@ public class BasicMessageBuilder : IMessageBuilder
5353
/// <summary>
5454
/// By default, new instances of BasicMessageBuilder and its subclasses will have this much initial buffer space.
5555
/// </summary>
56-
public const int DefaultAccumulatorSize = 1024;
56+
public const int DefaultAccumulatorSize = 4096;
5757

58-
protected MemoryStream m_accumulator;
59-
protected NetworkBinaryWriter m_writer;
58+
protected BinaryBufferWriter m_writer;
6059

6160
/// <summary>
6261
/// Construct an instance ready for writing.
@@ -74,7 +73,7 @@ public BasicMessageBuilder(IModel model) : this(model, DefaultAccumulatorSize)
7473
public BasicMessageBuilder(IModel model, int initialAccumulatorSize)
7574
{
7675
Properties = model.CreateBasicProperties();
77-
m_accumulator = PooledMemoryStream.GetMemoryStream(initialAccumulatorSize);
76+
m_writer = new BinaryBufferWriter(initialAccumulatorSize);
7877

7978
string contentType = GetDefaultContentType();
8079
if (contentType != null)
@@ -89,33 +88,22 @@ public BasicMessageBuilder(IModel model, int initialAccumulatorSize)
8988
public IBasicProperties Properties { get; protected set; }
9089

9190
/// <summary>
92-
/// Retrieve this instance's <see cref="NetworkBinaryWriter"/> writing to BodyStream.
91+
/// Retrieve this instance's <see cref="BinaryBufferWriter"/> writing to BodyStream.
9392
/// </summary>
9493
/// <remarks>
95-
/// If no <see cref="NetworkBinaryWriter"/> instance exists, one is created,
94+
/// If no <see cref="BinaryBufferWriter"/> instance exists, one is created,
9695
/// pointing at the beginning of the accumulator. If one
9796
/// already exists, the existing instance is returned. The
9897
/// instance is not reset.
9998
/// </remarks>
100-
public NetworkBinaryWriter Writer
99+
public BinaryBufferWriter Writer
101100
{
102101
get
103102
{
104-
if (m_writer == null)
105-
{
106-
m_writer = new NetworkBinaryWriter(m_accumulator);
107-
}
108103
return m_writer;
109104
}
110105
}
111106

112-
/// <summary>
113-
/// Retrieve the <see cref="Stream"/> being used to construct the message body.
114-
/// </summary>
115-
public Stream BodyStream
116-
{
117-
get { return m_accumulator; }
118-
}
119107

120108
/// <summary>
121109
/// Retrieves the dictionary that will be used to construct the message header table.
@@ -133,12 +121,17 @@ public IDictionary<string, object> Headers
133121
}
134122
}
135123

124+
public void Dispose()
125+
{
126+
m_writer.Dispose();
127+
}
128+
136129
/// <summary>
137130
/// Finish and retrieve the content body for transmission.
138131
/// </summary>
139132
public virtual byte[] GetContentBody()
140133
{
141-
return m_accumulator.ToArray();
134+
return m_writer.Buffer.ToArray();
142135
}
143136

144137
/// <summary>
@@ -164,7 +157,7 @@ public virtual string GetDefaultContentType()
164157
/// </summary>
165158
public IMessageBuilder RawWrite(byte value)
166159
{
167-
BodyStream.WriteByte(value);
160+
m_writer.Write(value);
168161
return this;
169162
}
170163

@@ -181,7 +174,7 @@ public IMessageBuilder RawWrite(byte[] bytes)
181174
/// </summary>
182175
public IMessageBuilder RawWrite(byte[] bytes, int offset, int length)
183176
{
184-
BodyStream.Write(bytes, offset, length);
177+
m_writer.Write(bytes, offset, length);
185178
return this;
186179
}
187180
}

0 commit comments

Comments
 (0)