Skip to content

Commit efabd50

Browse files
Merge branch 'master' into apigenimprovements
2 parents 87aeb07 + 28c2362 commit efabd50

40 files changed

+1367
-884
lines changed

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
## Changes Between 6.1.0 and 6.1.1
1+
## Changes Between 6.1.0 and 6.2.0
22

3-
A full list of changes can be found in the GitHub milestone: [`6.1.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/50?closed=1).
3+
A full list of changes can be found in the GitHub milestone: [`6.2.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/49?closed=1).
44

55
## Changes Between 6.0.0 and 6.1.0
66

RUNNING_TESTS.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,16 @@ Two options to accomplish this are covered below.
4242

4343
### Option One: Using a RabbitMQ Release
4444

45-
It is possible to install and run a node using any [binary build](https://www.rabbitmq.com/download.html) suitable
46-
for the platform. Its [CLI tools]() then must be added to `PATH` so that `rabbitmqctl` (`rabbitmqctl.bat` on Windows)
47-
can be invoked directly without using an absolute file path.
45+
It is possible to install and run a node using any [binary build](https://www.rabbitmq.com/download.html)
46+
suitable for the platform. Its [CLI tools]() then must be added to `PATH` so that `rabbitmqctl` can be
47+
invoked directly without using an absolute file path. Note that this method does *not* work on Windows.
4848

49+
On Windows, you must run unit tests as follows (replace `X.Y.Z` with your RabbitMQ version):
50+
51+
```
52+
set RABBITMQ_RABBITMQCTL_PATH=C:\Program Files\RabbitMQ Server\rabbitmq_server-X.Y.Z\sbin\rabbitmqctl.bat
53+
.\run-test.bat
54+
```
4955

5056
### Option Two: Using RabbitMQ Umbrella Repository
5157

projects/Apigen/apigen/Apigen.cs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -821,9 +821,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
821821
EmitLine("");
822822
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
823823
EmitLine(" {");
824+
var lastWasBitClass = false;
824825
foreach (AmqpField f in m.m_Fields)
825826
{
826-
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
827+
string mangleClass = MangleClass(ResolveDomain(f.Domain));
828+
if (mangleClass != "Bit")
829+
{
830+
if (lastWasBitClass)
831+
{
832+
EmitLine($" writer.EndBits();");
833+
lastWasBitClass = false;
834+
}
835+
}
836+
else
837+
{
838+
lastWasBitClass = true;
839+
}
840+
841+
EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
842+
}
843+
if (lastWasBitClass)
844+
{
845+
EmitLine($" writer.EndBits();");
827846
}
828847
EmitLine(" }");
829848
EmitLine("");
@@ -944,14 +963,14 @@ public void EmitClassMethodImplementations(AmqpClass c)
944963

945964
public void EmitMethodArgumentReader()
946965
{
947-
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
966+
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
948967
EmitLine(" {");
949-
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
950-
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
968+
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
969+
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
951970
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
952971
EmitLine(" if(result != null)");
953972
EmitLine(" {");
954-
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
973+
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
955974
EmitLine(" result.ReadArgumentsFrom(ref reader);");
956975
EmitLine(" return result;");
957976
EmitLine(" }");

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<MinVerVerbosity>minimal</MinVerVerbosity>
2727
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2828
<PackageOutputPath>..\..\packages</PackageOutputPath>
29+
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
2930
</PropertyGroup>
3031

3132
<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">

projects/RabbitMQ.Client/client/api/IBasicPublishBatch.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@
3737
// The Initial Developer of the Original Code is Pivotal Software, Inc.
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
40+
4041
using System;
4142

4243
namespace RabbitMQ.Client
4344
{
4445
public interface IBasicPublishBatch
4546
{
46-
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
47+
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, ReadOnlyMemory<byte> body);
4748
void Publish();
4849
}
4950
}

projects/RabbitMQ.Client/client/api/IModel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,12 @@ void BasicPublish(string exchange, string routingKey, bool mandatory,
291291
[AmqpMethodDoNotImplement(null)]
292292
IBasicPublishBatch CreateBasicPublishBatch();
293293

294+
/// <summary>
295+
/// Creates a BasicPublishBatch instance
296+
/// </summary>
297+
[AmqpMethodDoNotImplement(null)]
298+
IBasicPublishBatch CreateBasicPublishBatch(int sizeHint);
299+
294300
/// <summary>
295301
/// Construct a completely empty content header for use with the Basic content class.
296302
/// </summary>

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim lim
137137
}
138138
catch (Exception)
139139
{
140-
140+
// ignored
141141
}
142142
finally
143143
{

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1960,5 +1960,15 @@ public IBasicPublishBatch CreateBasicPublishBatch()
19601960

19611961
return ((IFullModel)_delegate).CreateBasicPublishBatch();
19621962
}
1963+
1964+
public IBasicPublishBatch CreateBasicPublishBatch(int sizeHint)
1965+
{
1966+
if (_disposed)
1967+
{
1968+
throw new ObjectDisposedException(GetType().FullName);
1969+
}
1970+
1971+
return ((IFullModel)_delegate).CreateBasicPublishBatch(sizeHint);
1972+
}
19631973
}
19641974
}

projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,33 +39,39 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42-
using System.Buffers;
4342
using System.Collections.Generic;
4443

4544
using RabbitMQ.Client.Framing.Impl;
4645

4746
namespace RabbitMQ.Client.Impl
4847
{
49-
class BasicPublishBatch : IBasicPublishBatch
48+
internal sealed class BasicPublishBatch : IBasicPublishBatch
5049
{
51-
private readonly List<Command> _commands = new List<Command>();
50+
private readonly List<Command> _commands;
5251
private readonly ModelBase _model;
52+
5353
internal BasicPublishBatch (ModelBase model)
5454
{
5555
_model = model;
56+
_commands = new List<Command>();
57+
}
58+
59+
internal BasicPublishBatch (ModelBase model, int sizeHint)
60+
{
61+
_model = model;
62+
_commands = new List<Command>(sizeHint);
5663
}
5764

58-
public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
65+
public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
5966
{
60-
IBasicProperties bp = basicProperties ?? _model.CreateBasicProperties();
6167
var method = new BasicPublish
6268
{
6369
_exchange = exchange,
6470
_routingKey = routingKey,
6571
_mandatory = mandatory
6672
};
6773

68-
_commands.Add(new Command(method, (ContentHeaderBase)bp, body, false));
74+
_commands.Add(new Command(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body, false));
6975
}
7076

7177
public void Publish()

projects/RabbitMQ.Client/client/impl/Command.cs

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@
4040

4141
using System;
4242
using System.Buffers;
43-
using System.Collections.Generic;
4443
using System.Runtime.InteropServices;
45-
using RabbitMQ.Client.Exceptions;
4644
using RabbitMQ.Client.Framing.Impl;
4745

4846
namespace RabbitMQ.Client.Impl
@@ -57,11 +55,6 @@ class Command : IDisposable
5755
private const int EmptyFrameSize = 8;
5856
private readonly bool _returnBufferOnDispose;
5957

60-
static Command()
61-
{
62-
CheckEmptyFrameSize();
63-
}
64-
6558
internal Command(MethodBase method) : this(method, null, null, false)
6659
{
6760
}
@@ -80,38 +73,57 @@ public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte>
8073

8174
internal MethodBase Method { get; private set; }
8275

83-
public static void CheckEmptyFrameSize()
76+
internal void Transmit(ushort channelNumber, Connection connection)
8477
{
85-
var f = new EmptyOutboundFrame();
86-
byte[] b = new byte[f.GetMinimumBufferSize()];
87-
f.WriteTo(b);
88-
long actualLength = f.ByteCount;
78+
int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize);
79+
var size = GetMaxSize(maxBodyPayloadBytes);
8980

90-
if (EmptyFrameSize != actualLength)
81+
// Will be returned by SocketFrameWriter.WriteLoop
82+
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(size), 0, size);
83+
var span = memory.Span;
84+
85+
var offset = Framing.Method.WriteTo(span, channelNumber, Method);
86+
if (Method.HasContent)
9187
{
92-
string message =
93-
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
94-
EmptyFrameSize,
95-
actualLength);
96-
throw new ProtocolViolationException(message);
88+
int remainingBodyBytes = Body.Length;
89+
offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, Header, remainingBodyBytes);
90+
var bodySpan = Body.Span;
91+
while (remainingBodyBytes > 0)
92+
{
93+
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
94+
offset += Framing.BodySegment.WriteTo(span.Slice(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
95+
remainingBodyBytes -= frameSize;
96+
}
9797
}
98+
99+
if (offset != size)
100+
{
101+
throw new InvalidOperationException($"Serialized to wrong size, expect {size}, offset {offset}");
102+
}
103+
104+
connection.Write(memory);
98105
}
99106

100-
internal void Transmit(int channelNumber, Connection connection)
107+
private int GetMaxSize(int maxPayloadBytes)
101108
{
102-
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
103-
if (Method.HasContent)
109+
if (!Method.HasContent)
104110
{
105-
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
106-
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
107-
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
108-
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
109-
{
110-
int remaining = Body.Length - offset;
111-
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
112-
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
113-
}
111+
return Framing.Method.FrameSize + Method.GetRequiredBufferSize();
112+
}
113+
114+
return Framing.Method.FrameSize + Method.GetRequiredBufferSize() +
115+
Framing.Header.FrameSize + Header.GetRequiredPayloadBufferSize() +
116+
Framing.BodySegment.FrameSize * GetBodyFrameCount(maxPayloadBytes) + Body.Length;
117+
}
118+
119+
private int GetBodyFrameCount(int maxPayloadBytes)
120+
{
121+
if (maxPayloadBytes == int.MaxValue)
122+
{
123+
return 1;
114124
}
125+
126+
return (Body.Length + maxPayloadBytes - 1) / maxPayloadBytes;
115127
}
116128

117129
public void Dispose()

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,27 @@ public Command HandleFrame(in InboundFrame f)
8181
{
8282
throw new UnexpectedFrameException(f.Type);
8383
}
84-
m_method = m_protocol.DecodeMethodFrom(f.Payload);
84+
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
8585
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
8686
return CompletedCommand();
8787
case AssemblyState.ExpectingContentHeader:
8888
if (!f.IsHeader())
8989
{
9090
throw new UnexpectedFrameException(f.Type);
9191
}
92-
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
93-
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));
92+
93+
ReadOnlySpan<byte> span = f.Payload.Span;
94+
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
95+
m_header.ReadFrom(span.Slice(12));
96+
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
9497
if (totalBodyBytes > MaxArrayOfBytesSize)
9598
{
9699
throw new UnexpectedFrameException(f.Type);
97100
}
98101

99102
m_remainingBodyBytes = (int)totalBodyBytes;
103+
104+
// Is returned by Command.Dispose in Session.HandleFrame
100105
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
101106
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
102107
UpdateContentBodyState();

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Buffers;
4243
using System.Collections.Generic;
4344
using System.IO;
4445
using System.Net;
@@ -62,7 +63,6 @@ internal sealed class Connection : IConnection
6263
private readonly object _eventLock = new object();
6364

6465
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
65-
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();
6666

6767
private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);
6868

@@ -903,7 +903,7 @@ public void HeartbeatWriteTimerCallback(object state)
903903
{
904904
if (!_closed)
905905
{
906-
WriteFrame(_heartbeatFrame);
906+
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
907907
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
908908
}
909909
}
@@ -940,9 +940,9 @@ public override string ToString()
940940
return string.Format("Connection({0},{1})", _id, Endpoint);
941941
}
942942

943-
public void WriteFrame(OutboundFrame f)
943+
public void Write(Memory<byte> memory)
944944
{
945-
_frameHandler.WriteFrame(f);
945+
_frameHandler.Write(memory);
946946
}
947947

948948
public void UpdateSecret(string newSecret, string reason)

0 commit comments

Comments
 (0)