Skip to content

Commit c06a06e

Browse files
author
petrov-e
committed
add ReadOnlyMemory overload to BasicPublish
1 parent 8617745 commit c06a06e

File tree

8 files changed

+149
-4
lines changed

8 files changed

+149
-4
lines changed

projects/Apigen/apigen/Apigen.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,10 @@ public string SanitisedFullName(Type t)
11121112
{
11131113
return "IDictionary<string, object>";
11141114
}
1115+
if (t.FullName.StartsWith("System.ReadOnlyMemory`1[[System.Byte"))
1116+
{
1117+
return "ReadOnlyMemory<byte>";
1118+
}
11151119

11161120
switch (t.FullName)
11171121
{

projects/RabbitMQ.Client/client/api/IModel.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,18 @@ string BasicConsume(
233233
void BasicPublish(string exchange, string routingKey, bool mandatory,
234234
IBasicProperties basicProperties, byte[] body);
235235

236+
/// <summary>
237+
/// Publishes a message.
238+
/// </summary>
239+
/// <remarks>
240+
/// <para>
241+
/// Routing key must be shorter than 255 bytes.
242+
/// </para>
243+
/// </remarks>
244+
[AmqpMethodDoNotImplement(null)]
245+
void BasicPublish(string exchange, string routingKey, bool mandatory,
246+
IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
247+
236248
/// <summary>
237249
/// Configures QoS parameters of the Basic content-class.
238250
/// </summary>

projects/RabbitMQ.Client/client/api/IModelExtensions.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
4142
using System.Collections.Generic;
4243

4344
namespace RabbitMQ.Client
@@ -93,6 +94,17 @@ public static void BasicPublish(this IModel model, PublicationAddress addr, IBas
9394
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, basicProperties: basicProperties, body: body);
9495
}
9596

97+
/// <summary>
98+
/// (Extension method) Convenience overload of BasicPublish.
99+
/// </summary>
100+
/// <remarks>
101+
/// The publication occurs with mandatory=false and immediate=false.
102+
/// </remarks>
103+
public static void BasicPublish(this IModel model, PublicationAddress addr, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
104+
{
105+
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, basicProperties: basicProperties, body: body);
106+
}
107+
96108
/// <summary>
97109
/// (Extension method) Convenience overload of BasicPublish.
98110
/// </summary>
@@ -104,6 +116,17 @@ public static void BasicPublish(this IModel model, string exchange, string routi
104116
model.BasicPublish(exchange, routingKey, false, basicProperties, body);
105117
}
106118

119+
/// <summary>
120+
/// (Extension method) Convenience overload of BasicPublish.
121+
/// </summary>
122+
/// <remarks>
123+
/// The publication occurs with mandatory=false
124+
/// </remarks>
125+
public static void BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
126+
{
127+
model.BasicPublish(exchange, routingKey, false, basicProperties, body);
128+
}
129+
107130
/// <summary>
108131
/// (Spec method) Convenience overload of BasicPublish.
109132
/// </summary>
@@ -112,6 +135,14 @@ public static void BasicPublish(this IModel model, string exchange, string routi
112135
model.BasicPublish(exchange, routingKey, mandatory, basicProperties, body);
113136
}
114137

138+
/// <summary>
139+
/// (Spec method) Convenience overload of BasicPublish.
140+
/// </summary>
141+
public static void BasicPublish(this IModel model, string exchange, string routingKey, bool mandatory = false, IBasicProperties basicProperties = null, ReadOnlyMemory<byte> body = default)
142+
{
143+
model.BasicPublish(exchange, routingKey, mandatory, basicProperties, body);
144+
}
145+
115146
/// <summary>
116147
/// (Spec method) Declare a queue.
117148
/// </summary>

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ public void _Private_BasicPublish(string exchange,
556556
string routingKey,
557557
bool mandatory,
558558
IBasicProperties basicProperties,
559-
byte[] body)
559+
ReadOnlyMemory<byte> body)
560560
{
561561
if (routingKey == null)
562562
{
@@ -807,6 +807,15 @@ public void BasicPublish(string exchange,
807807
bool mandatory,
808808
IBasicProperties basicProperties,
809809
byte[] body)
810+
{
811+
BasicPublish(exchange, routingKey, mandatory, basicProperties, new ReadOnlyMemory<byte>(body));
812+
}
813+
814+
public void BasicPublish(string exchange,
815+
string routingKey,
816+
bool mandatory,
817+
IBasicProperties basicProperties,
818+
ReadOnlyMemory<byte> body)
810819
{
811820
if (routingKey == null)
812821
{

projects/RabbitMQ.Client/client/impl/IFullModel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ void _Private_BasicPublish(string exchange,
227227
string routingKey,
228228
bool mandatory,
229229
[AmqpContentHeaderMapping] IBasicProperties basicProperties,
230-
[AmqpContentBodyMapping] byte[] body);
230+
[AmqpContentBodyMapping] ReadOnlyMemory<byte> body);
231231

232232
[AmqpForceOneWay]
233233
[AmqpMethodMapping(null, "basic", "recover")]

projects/RabbitMQ.Client/client/impl/ModelBase.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b
349349
}
350350
}
351351

352-
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
352+
public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte> body)
353353
{
354354
if (method.HasContent)
355355
{
@@ -905,7 +905,7 @@ public abstract void _Private_BasicPublish(string exchange,
905905
string routingKey,
906906
bool mandatory,
907907
IBasicProperties basicProperties,
908-
byte[] body);
908+
ReadOnlyMemory<byte> body);
909909

910910
public abstract void _Private_BasicRecover(bool requeue);
911911

@@ -1101,6 +1101,15 @@ public void BasicPublish(string exchange,
11011101
bool mandatory,
11021102
IBasicProperties basicProperties,
11031103
byte[] body)
1104+
{
1105+
BasicPublish(exchange, routingKey, mandatory, basicProperties, new ReadOnlyMemory<byte>(body));
1106+
}
1107+
1108+
public void BasicPublish(string exchange,
1109+
string routingKey,
1110+
bool mandatory,
1111+
IBasicProperties basicProperties,
1112+
ReadOnlyMemory<byte> body)
11041113
{
11051114
if (routingKey == null)
11061115
{

projects/Unit/APIApproval.Approve.approved.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ namespace RabbitMQ.Client
354354
RabbitMQ.Client.BasicGetResult BasicGet(string queue, bool autoAck);
355355
void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
356356
void BasicPublish(string exchange, string routingKey, bool mandatory, RabbitMQ.Client.IBasicProperties basicProperties, byte[] body);
357+
void BasicPublish(string exchange, string routingKey, bool mandatory, RabbitMQ.Client.IBasicProperties basicProperties, System.ReadOnlyMemory<byte> body);
357358
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
358359
void BasicRecover(bool requeue);
359360
void BasicRecoverAsync(bool requeue);
@@ -399,8 +400,11 @@ namespace RabbitMQ.Client
399400
public static string BasicConsume(this RabbitMQ.Client.IModel model, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) { }
400401
public static string BasicConsume(this RabbitMQ.Client.IModel model, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
401402
public static void BasicPublish(this RabbitMQ.Client.IModel model, RabbitMQ.Client.PublicationAddress addr, RabbitMQ.Client.IBasicProperties basicProperties, byte[] body) { }
403+
public static void BasicPublish(this RabbitMQ.Client.IModel model, RabbitMQ.Client.PublicationAddress addr, RabbitMQ.Client.IBasicProperties basicProperties, System.ReadOnlyMemory<byte> body) { }
402404
public static void BasicPublish(this RabbitMQ.Client.IModel model, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties basicProperties, byte[] body) { }
405+
public static void BasicPublish(this RabbitMQ.Client.IModel model, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties basicProperties, System.ReadOnlyMemory<byte> body) { }
403406
public static void BasicPublish(this RabbitMQ.Client.IModel model, string exchange, string routingKey, bool mandatory = false, RabbitMQ.Client.IBasicProperties basicProperties = null, byte[] body = null) { }
407+
public static void BasicPublish(this RabbitMQ.Client.IModel model, string exchange, string routingKey, bool mandatory = false, RabbitMQ.Client.IBasicProperties basicProperties = null, System.ReadOnlyMemory<byte> body = default) { }
404408
public static void ExchangeBind(this RabbitMQ.Client.IModel model, string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
405409
public static void ExchangeBindNoWait(this RabbitMQ.Client.IModel model, string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
406410
public static void ExchangeDeclare(this RabbitMQ.Client.IModel model, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null) { }

projects/Unit/TestBasicPublish.cs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using NUnit.Framework;
5+
using RabbitMQ.Client.Events;
6+
7+
namespace RabbitMQ.Client.Unit
8+
{
9+
[TestFixture]
10+
public class TestBasicPublish
11+
{
12+
[Test]
13+
public void TestBasicRoundtripArray()
14+
{
15+
var cf = new ConnectionFactory();
16+
using(IConnection c = cf.CreateConnection())
17+
using(IModel m = c.CreateModel())
18+
{
19+
QueueDeclareOk q = m.QueueDeclare();
20+
IBasicProperties bp = m.CreateBasicProperties();
21+
byte[] sendBody = System.Text.Encoding.UTF8.GetBytes("hi");
22+
byte[] consumeBody = null;
23+
var consumer = new EventingBasicConsumer(m);
24+
var are = new AutoResetEvent(false);
25+
consumer.Received += async (o, a) =>
26+
{
27+
consumeBody = a.Body.ToArray();
28+
are.Set();
29+
await Task.Yield();
30+
};
31+
string tag = m.BasicConsume(q.QueueName, true, consumer);
32+
33+
34+
m.BasicPublish("", q.QueueName, bp, sendBody);
35+
bool waitResFalse = are.WaitOne(2000);
36+
m.BasicCancel(tag);
37+
38+
39+
Assert.IsTrue(waitResFalse);
40+
Assert.AreEqual(sendBody, consumeBody);
41+
}
42+
}
43+
44+
[Test]
45+
public void TestBasicRoundtripReadOnlyMemory()
46+
{
47+
var cf = new ConnectionFactory();
48+
using(IConnection c = cf.CreateConnection())
49+
using(IModel m = c.CreateModel())
50+
{
51+
QueueDeclareOk q = m.QueueDeclare();
52+
IBasicProperties bp = m.CreateBasicProperties();
53+
byte[] sendBody = System.Text.Encoding.UTF8.GetBytes("hi");
54+
byte[] consumeBody = null;
55+
var consumer = new EventingBasicConsumer(m);
56+
var are = new AutoResetEvent(false);
57+
consumer.Received += async (o, a) =>
58+
{
59+
consumeBody = a.Body.ToArray();
60+
are.Set();
61+
await Task.Yield();
62+
};
63+
string tag = m.BasicConsume(q.QueueName, true, consumer);
64+
65+
66+
m.BasicPublish("", q.QueueName, bp, new ReadOnlyMemory<byte>(sendBody));
67+
bool waitResFalse = are.WaitOne(2000);
68+
m.BasicCancel(tag);
69+
70+
71+
Assert.IsTrue(waitResFalse);
72+
Assert.AreEqual(sendBody, consumeBody);
73+
}
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)