Skip to content

Address some more TODOs #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions projects/RabbitMQ.Client.OAuth2/OAuth2Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ public async Task<IToken> RequestTokenAsync(CancellationToken cancellationToken

if (token is null)
{
// TODO specific exception?
throw new InvalidOperationException("token is null");
}

Expand Down Expand Up @@ -274,7 +273,6 @@ public async Task<IToken> RefreshTokenAsync(IToken token,

if (refreshedToken is null)
{
// TODO specific exception?
throw new InvalidOperationException("refreshed token is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs
private const string ContextString = "context";
private const string ConsumerString = "consumer";

// TODO Why is this public when there is a build method?
public CallbackExceptionEventArgs(IDictionary<string, object> detail, Exception exception, CancellationToken cancellationToken = default)
: base(detail, exception, cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
namespace RabbitMQ.Client.Exceptions
{
/// <summary>
/// TODO WHY IS THIS UNREFERENCED?
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
/// </summary>
[Serializable]
Expand Down
29 changes: 8 additions & 21 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
Expand All @@ -43,18 +42,17 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
{
private readonly ChannelOptions _channelOptions;
private readonly List<string> _recordedConsumerTags = new List<string>();

private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private readonly List<string> _recordedConsumerTags = new List<string>();

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null;

private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;

internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;

Expand All @@ -73,20 +71,13 @@ public TimeSpan ContinuationTimeout
set => InnerChannel.ContinuationTimeout = value;
}

// TODO just pass create channel options
public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency,
bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter)
ChannelOptions channelOptions)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
_channelOptions = channelOptions;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -171,13 +162,9 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con

_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
_publisherConfirmationsEnabled,
_publisherConfirmationTrackingEnabled,
_outstandingPublisherConfirmationsRateLimiter,
_consumerDispatchConcurrency,
cancellationToken)
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
.ConfigureAwait(false);

newChannel.TakeOver(_innerChannel);

if (_prefetchCountConsumer != 0)
Expand Down
36 changes: 9 additions & 27 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -185,22 +184,12 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs

public IProtocol Protocol => Endpoint.Protocol;

// TODO pass channel creation options?
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = null,
public ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
ChannelOptions channelOptions,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
return (RecoveryAwareChannel)await result.OpenAsync(
publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
outstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
}

public override string ToString()
Expand Down Expand Up @@ -271,23 +260,16 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d

ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.OutstandingPublisherConfirmationsRateLimiter,
cdc,
cancellationToken)
var channelOptions = ChannelOptions.From(options, _config);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
.ConfigureAwait(false);

// TODO just pass create channel options
var autorecoveringChannel = new AutorecoveringChannel(this,
recoveryAwareChannel,
cdc,
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.OutstandingPublisherConfirmationsRateLimiter);
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);

await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);

return autorecoveringChannel;
}

Expand Down
27 changes: 16 additions & 11 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
Expand All @@ -62,11 +61,10 @@ internal partial class Channel : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

public Channel(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
public Channel(ISession session, ChannelOptions channelOptions)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
ContinuationTimeout = channelOptions.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
Expand Down Expand Up @@ -361,14 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
RateLimiter? outstandingPublisherConfirmationsRateLimiter,
internal async Task<IChannel> OpenAsync(ChannelOptions channelOptions,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
outstandingPublisherConfirmationsRateLimiter);
ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
channelOptions.PublisherConfirmationTrackingEnabled,
channelOptions.OutstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -1497,6 +1493,15 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
ConnectionConfig connectionConfig, ISession session,
CancellationToken cancellationToken)
{
ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
var channel = new Channel(session, channelOptions);
return channel.OpenAsync(channelOptions, cancellationToken);
}

/// <summary>
/// Returning <c>true</c> from this method means that the command was server-originated,
/// and handled already.
Expand Down
90 changes: 90 additions & 0 deletions projects/RabbitMQ.Client/Impl/ChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading.RateLimiting;

namespace RabbitMQ.Client.Impl
{
internal sealed class ChannelOptions
{
private readonly bool _publisherConfirmationEnabled;
private readonly bool _publisherConfirmationTrackingEnabled;
private readonly ushort _consumerDispatchConcurrency;
private readonly RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
private readonly TimeSpan _continuationTimeout;

public ChannelOptions(bool publisherConfirmationEnabled,
bool publisherConfirmationTrackingEnabled,
ushort consumerDispatchConcurrency,
RateLimiter? outstandingPublisherConfirmationsRateLimiter,
TimeSpan continuationTimeout)
{
_publisherConfirmationEnabled = publisherConfirmationEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
_continuationTimeout = continuationTimeout;
}

public bool PublisherConfirmationsEnabled => _publisherConfirmationEnabled;

public bool PublisherConfirmationTrackingEnabled => _publisherConfirmationTrackingEnabled;

public ushort ConsumerDispatchConcurrency => _consumerDispatchConcurrency;

public RateLimiter? OutstandingPublisherConfirmationsRateLimiter => _outstandingPublisherConfirmationsRateLimiter;

public TimeSpan ContinuationTimeout => _continuationTimeout;

public static ChannelOptions From(CreateChannelOptions createChannelOptions,
ConnectionConfig connectionConfig)
{
ushort cdc = createChannelOptions.ConsumerDispatchConcurrency.GetValueOrDefault(
connectionConfig.ConsumerDispatchConcurrency);

return new ChannelOptions(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
cdc,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter,
connectionConfig.ContinuationTimeout);
}

public static ChannelOptions From(ConnectionConfig connectionConfig)
{
return new ChannelOptions(publisherConfirmationEnabled: false,
publisherConfirmationTrackingEnabled: false,
consumerDispatchConcurrency: Constants.DefaultConsumerDispatchConcurrency,
outstandingPublisherConfirmationsRateLimiter: null,
continuationTimeout: connectionConfig.ContinuationTimeout);
}
}
}
16 changes: 4 additions & 12 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)

_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
_channel0 = new Channel(_config, _session0);
_channel0 = new Channel(_session0, ChannelOptions.From(config));

ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
{
Expand Down Expand Up @@ -263,23 +263,15 @@ await CloseAsync(ea, true,
}
}

public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
public Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();

options ??= CreateChannelOptions.Default;
createChannelOptions ??= CreateChannelOptions.Default;
ISession session = CreateSession();

// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
IChannel ch = await channel.OpenAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.OutstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
return ch;
return Channel.CreateAndOpenAsync(createChannelOptions, _config, session, cancellationToken);
}

internal ISession CreateSession()
Expand Down
13 changes: 10 additions & 3 deletions projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;

namespace RabbitMQ.Client.Impl
{
internal sealed class RecoveryAwareChannel : Channel
{
public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
public RecoveryAwareChannel(ISession session, ChannelOptions channelOptions)
: base(session, channelOptions)
{
ActiveDeliveryTagOffset = 0;
MaxSeenDeliveryTag = 0;
Expand Down Expand Up @@ -104,5 +103,13 @@ public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
return default;
}
}

internal static async ValueTask<RecoveryAwareChannel> CreateAndOpenAsync(ISession session, ChannelOptions channelOptions,
CancellationToken cancellationToken)
{
var result = new RecoveryAwareChannel(session, channelOptions);
return (RecoveryAwareChannel)await result.OpenAsync(channelOptions, cancellationToken)
.ConfigureAwait(false);
}
}
}
Loading