Skip to content

Commit 2a99901

Browse files
committed
Unify on IModel
1 parent aa5e8cb commit 2a99901

File tree

7 files changed

+47
-22
lines changed

7 files changed

+47
-22
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1111
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
1212
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
1313

14-
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
14+
public void Schedule<TWork>(IModel model, TWork work) where TWork : Work
1515
{
1616
/*
1717
* rabbitmq/rabbitmq-dotnet-client#841
@@ -24,7 +24,7 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
2424

2525
private static WorkPool StartNewWorkPool(IModel model)
2626
{
27-
var newWorkPool = new WorkPool(model as ModelBase);
27+
var newWorkPool = new WorkPool(model);
2828
newWorkPool.Start();
2929
return newWorkPool;
3030
}
@@ -42,10 +42,10 @@ public Task Stop(IModel model)
4242
class WorkPool
4343
{
4444
readonly Channel<Work> _channel;
45-
readonly ModelBase _model;
45+
readonly IModel _model;
4646
private Task _worker;
4747

48-
public WorkPool(ModelBase model)
48+
public WorkPool(IModel model)
4949
{
5050
_model = model;
5151
_channel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });

projects/RabbitMQ.Client/client/impl/BasicCancel.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
1515
_consumerTag = consumerTag;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>
2732
{
2833
{"consumer", consumer},
2934
{"context", "HandleBasicCancel"}
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consume
1515
_consumerTag = consumerTag;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>()
2732
{
2833
{"consumer", consumer},
2934
{"context", "HandleBasicCancelOk"}
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consum
1515
_consumerTag = consumerTag;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>()
2732
{
2833
{"consumer", consumer},
2934
{"context", "HandleBasicConsumeOk"}
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ sealed class BasicDeliver : Work
1818
readonly IBasicProperties _basicProperties;
1919
readonly ReadOnlyMemory<byte> _body;
2020

21-
public BasicDeliver(IBasicConsumer consumer,
22-
string consumerTag,
23-
ulong deliveryTag,
24-
bool redelivered,
25-
string exchange,
26-
string routingKey,
21+
public BasicDeliver(IBasicConsumer consumer,
22+
string consumerTag,
23+
ulong deliveryTag,
24+
bool redelivered,
25+
string exchange,
26+
string routingKey,
2727
IBasicProperties basicProperties,
2828
ReadOnlyMemory<byte> body) : base(consumer)
2929
{
@@ -36,7 +36,7 @@ public BasicDeliver(IBasicConsumer consumer,
3636
_body = body;
3737
}
3838

39-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
39+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
4040
{
4141
try
4242
{
@@ -50,12 +50,17 @@ await consumer.HandleBasicDeliver(_consumerTag,
5050
}
5151
catch (Exception e)
5252
{
53+
if (!(model is ModelBase modelBase))
54+
{
55+
return;
56+
}
57+
5358
var details = new Dictionary<string, object>()
5459
{
5560
{"consumer", consumer},
5661
{"context", "HandleBasicDeliver"}
5762
};
58-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
63+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
5964
}
6065
finally
6166
{

projects/RabbitMQ.Client/client/impl/ModelShutdown.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(c
1515
_reason = reason;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>()
2732
{
2833
{ "consumer", consumer },
2934
{ "context", "HandleModelShutdown" }
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/Work.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ protected Work(IBasicConsumer consumer)
1212
_asyncConsumer = (IAsyncBasicConsumer)consumer;
1313
}
1414

15-
public async Task Execute(ModelBase model)
15+
public async Task Execute(IModel model)
1616
{
1717
try
1818
{
@@ -25,6 +25,6 @@ public async Task Execute(ModelBase model)
2525
}
2626
}
2727

28-
protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer);
28+
protected abstract Task Execute(IModel model, IAsyncBasicConsumer consumer);
2929
}
3030
}

0 commit comments

Comments
 (0)