Skip to content

Remove batch publishing as its optimizations are now performed for "regular" publishes/outgoing frames #1028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions projects/RabbitMQ.Client/client/api/IBasicPublishBatch.cs

This file was deleted.

10 changes: 0 additions & 10 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,6 @@ string BasicConsume(
/// </summary>
void ConfirmSelect();

/// <summary>
/// Creates a BasicPublishBatch instance
/// </summary>
IBasicPublishBatch CreateBasicPublishBatch();

/// <summary>
/// Creates a BasicPublishBatch instance
/// </summary>
IBasicPublishBatch CreateBasicPublishBatch(int sizeHint);

/// <summary>
/// Construct a completely empty content header for use with the Basic content class.
/// </summary>
Expand Down
6 changes: 0 additions & 6 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,6 @@ private void RecoverState()
}
}

public IBasicPublishBatch CreateBasicPublishBatch()
=> InnerChannel.CreateBasicPublishBatch();

public IBasicPublishBatch CreateBasicPublishBatch(int sizeHint)
=> InnerChannel.CreateBasicPublishBatch(sizeHint);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
Expand Down
78 changes: 0 additions & 78 deletions projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs

This file was deleted.

3 changes: 0 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -75,6 +73,5 @@ internal interface ISession
bool HandleFrame(in InboundFrame frame);
void Notify();
void Transmit<T>(in T cmd) where T : struct, IOutgoingCommand;
void Transmit<T>(List<T> cmds) where T : struct, IOutgoingCommand;
}
}
33 changes: 0 additions & 33 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -851,17 +851,6 @@ public BasicGetResult BasicGet(string queue, bool autoAck)

public abstract void BasicNack(ulong deliveryTag, bool multiple, bool requeue);

private void AllocatePublishSeqNos(int count)
{
lock (_confirmLock)
{
for (int i = 0; i < count; i++)
{
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}
}

public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
if (routingKey is null)
Expand Down Expand Up @@ -941,19 +930,7 @@ public void ConfirmSelect()
_Private_ConfirmSelect(false);
}

///////////////////////////////////////////////////////////////////////////

public abstract IBasicProperties CreateBasicProperties();
public IBasicPublishBatch CreateBasicPublishBatch()
{
return new BasicPublishBatch(this);
}

public IBasicPublishBatch CreateBasicPublishBatch(int sizeHint)
{
return new BasicPublishBatch(this, sizeHint);
}


public void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments)
{
Expand Down Expand Up @@ -1143,16 +1120,6 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application,
}
}

internal void SendCommands(List<OutgoingContentCommand> commands)
{
_flowControlBlock.Wait();
if (NextPublishSeqNo > 0)
{
AllocatePublishSeqNos(commands.Count);
}
Session.Transmit(commands);
}

private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
var k = new QueueDeclareRpcContinuation();
Expand Down
10 changes: 0 additions & 10 deletions projects/RabbitMQ.Client/client/impl/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;

using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -136,14 +135,5 @@ public virtual void Transmit<T>(in T cmd) where T : struct, IOutgoingCommand
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
Connection.Write(cmd.SerializeToFrames(ChannelNumber, Connection.FrameMax));
}

public virtual void Transmit<T>(List<T> cmds) where T : struct, IOutgoingCommand
{
uint frameMax = Connection.FrameMax;
for (int i = 0; i < cmds.Count; i++)
{
Connection.Write(cmds[i].SerializeToFrames(ChannelNumber, frameMax));
}
}
}
}
4 changes: 1 addition & 3 deletions projects/TestApplications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ public static void Main()
{
while (messagesSent < BatchesToSend * ItemsPerBatch)
{
var batch = publisher.CreateBasicPublishBatch();
for (int i = 0; i < ItemsPerBatch; i++)
{
var properties = publisher.CreateBasicProperties();
properties.AppId = "testapp";
properties.CorrelationId = Guid.NewGuid().ToString();
batch.Add("test", "myawesome.routing.key", false, properties, payload);
publisher.BasicPublish("test", "myawesome.routing.key", false, properties, payload);
}
batch.Publish();
messagesSent += ItemsPerBatch;
await publisher.WaitForConfirmsOrDieAsync().ConfigureAwait(false);
}
Expand Down
8 changes: 0 additions & 8 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,6 @@ namespace RabbitMQ.Client
bool IsTypePresent();
bool IsUserIdPresent();
}
public interface IBasicPublishBatch
{
void Add(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, bool mandatory, RabbitMQ.Client.IBasicProperties basicProperties, System.ReadOnlyMemory<byte> body);
void Add(string exchange, string routingKey, bool mandatory, RabbitMQ.Client.IBasicProperties properties, System.ReadOnlyMemory<byte> body);
void Publish();
}
public interface IConnection : RabbitMQ.Client.INetworkConnection, System.IDisposable
{
ushort ChannelMax { get; }
Expand Down Expand Up @@ -403,8 +397,6 @@ namespace RabbitMQ.Client
void ConfirmSelect();
uint ConsumerCount(string queue);
RabbitMQ.Client.IBasicProperties CreateBasicProperties();
RabbitMQ.Client.IBasicPublishBatch CreateBasicPublishBatch();
RabbitMQ.Client.IBasicPublishBatch CreateBasicPublishBatch(int sizeHint);
void ExchangeBind(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments);
void ExchangeBindNoWait(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments);
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments);
Expand Down
98 changes: 0 additions & 98 deletions projects/Unit/TestBasicPublishBatch.cs

This file was deleted.