Skip to content

Integrate Channel into ChannelBase #1700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
{
internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase
{
internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency)
internal AsyncConsumerDispatcher(Channel channel, ushort concurrency)
: base(channel, concurrency)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,35 @@

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.ConsumerDispatching
{
internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, IConsumerDispatcher
{
protected readonly ChannelBase _channel;
protected readonly ChannelReader<WorkStruct> _reader;
private readonly ChannelWriter<WorkStruct> _writer;
protected readonly Impl.Channel _channel;
protected readonly System.Threading.Channels.ChannelReader<WorkStruct> _reader;
private readonly System.Threading.Channels.ChannelWriter<WorkStruct> _writer;
private readonly Task _worker;
private readonly ushort _concurrency;
private bool _quiesce = false;
private bool _disposed;

internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
{
_channel = channel;
_concurrency = concurrency;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions

var channelOpts = new System.Threading.Channels.UnboundedChannelOptions
{
SingleReader = _concurrency == 1,
SingleWriter = false,
AllowSynchronousContinuations = false
});
};

var workChannel = System.Threading.Channels.Channel.CreateUnbounded<WorkStruct>(channelOpts);
_reader = workChannel.Reader;
_writer = workChannel.Writer;

Expand Down
155 changes: 0 additions & 155 deletions projects/RabbitMQ.Client/Framing/Channel.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

namespace RabbitMQ.Client.Impl
{
internal abstract class ChannelBase : IChannel, IRecoverable
internal class Channel : IChannel, IRecoverable
{
///<summary>Only used to kick-start a connection open
///sequence. See <see cref="Connection.OpenAsync"/> </summary>
Expand All @@ -71,7 +71,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
public Channel(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
Expand All @@ -92,6 +92,7 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChan
}

internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public TimeSpan ContinuationTimeout { get; set; }

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -192,7 +193,7 @@ public void MaybeSetConnectionStartException(Exception ex)
}
}

protected void TakeOver(ChannelBase other)
protected void TakeOver(Channel other)
{
_basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper);
_basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper);
Expand Down Expand Up @@ -355,8 +356,6 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

protected abstract Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken);

protected bool Enqueue(IRpcContinuation k)
{
if (IsOpen)
Expand Down Expand Up @@ -873,14 +872,26 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
}
}

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
CancellationToken cancellationToken);
public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
CancellationToken cancellationToken)
{
var method = new BasicAck(deliveryTag, multiple);
return ModelSendAsync(in method, cancellationToken);
}

public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue,
CancellationToken cancellationToken);
public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue,
CancellationToken cancellationToken)
{
var method = new BasicNack(deliveryTag, multiple, requeue);
return ModelSendAsync(in method, cancellationToken);
}

public abstract ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
CancellationToken cancellationToken);
public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
CancellationToken cancellationToken)
{
var method = new BasicReject(deliveryTag, requeue);
return ModelSendAsync(in method, cancellationToken);
}

public async Task BasicCancelAsync(string consumerTag, bool noWait,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -1881,5 +1892,93 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
}
}
}

/// <summary>
/// Returning <c>true</c> from this method means that the command was server-originated,
/// and handled already.
/// Returning <c>false</c> (the default) means that the incoming command is the response to
/// a client-initiated RPC call, and must be handled.
/// </summary>
/// <param name="cmd">The incoming command from the AMQP server</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns></returns>
private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
switch (cmd.CommandId)
{
case ProtocolCommandId.BasicCancel:
{
// Note: always returns true
return HandleBasicCancelAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicDeliver:
{
// Note: always returns true
return HandleBasicDeliverAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicAck:
{
return HandleBasicAck(cmd, cancellationToken);
}
case ProtocolCommandId.BasicNack:
{
return HandleBasicNack(cmd, cancellationToken);
}
case ProtocolCommandId.BasicReturn:
{
// Note: always returns true
return HandleBasicReturn(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelClose:
{
// Note: always returns true
return HandleChannelCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelCloseOk:
{
// Note: always returns true
return HandleChannelCloseOkAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelFlow:
{
// Note: always returns true
return HandleChannelFlowAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionBlocked:
{
// Note: always returns true
return HandleConnectionBlockedAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionClose:
{
// Note: always returns true
return HandleConnectionCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionSecure:
{
// Note: always returns true
return HandleConnectionSecureAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionStart:
{
// Note: always returns true
return HandleConnectionStartAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionTune:
{
// Note: always returns true
return HandleConnectionTuneAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionUnblocked:
{
// Note: always returns true
return HandleConnectionUnblockedAsync(cancellationToken);
}
default:
{
return Task.FromResult(false);
}
}
}
}
}
Loading