Skip to content

Commit 16560f2

Browse files
Merge pull request #858 from danielmarbach/unify-onmodel
Unify on IModel
2 parents e84d339 + 7816766 commit 16560f2

File tree

7 files changed

+48
-23
lines changed

7 files changed

+48
-23
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public AsyncConsumerWorkService(int concurrency) : base(concurrency)
1616
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
1717
}
1818

19-
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
19+
public void Schedule<TWork>(IModel model, TWork work) where TWork : Work
2020
{
2121
/*
2222
* rabbitmq/rabbitmq-dotnet-client#841
@@ -29,7 +29,7 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
2929

3030
private WorkPool StartNewWorkPool(IModel model)
3131
{
32-
var newWorkPool = new WorkPool(model as ModelBase, _concurrency);
32+
var newWorkPool = new WorkPool(model, _concurrency);
3333
newWorkPool.Start();
3434
return newWorkPool;
3535
}
@@ -47,13 +47,13 @@ public Task Stop(IModel model)
4747
class WorkPool
4848
{
4949
readonly Channel<Work> _channel;
50-
readonly ModelBase _model;
50+
readonly IModel _model;
5151
private Task _worker;
5252
private readonly int _concurrency;
5353
private SemaphoreSlim _limiter;
5454
private CancellationTokenSource _tokenSource;
5555

56-
public WorkPool(ModelBase model, int concurrency)
56+
public WorkPool(IModel model, int concurrency)
5757
{
5858
_concurrency = concurrency;
5959
_model = model;
@@ -125,7 +125,7 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken)
125125
}
126126
}
127127

128-
static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim limiter)
128+
static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
129129
{
130130
try
131131
{

projects/RabbitMQ.Client/client/impl/BasicCancel.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
1515
_consumerTag = consumerTag;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>
2732
{
2833
{"consumer", consumer},
2934
{"context", "HandleBasicCancel"}
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consume
1515
_consumerTag = consumerTag;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>()
2732
{
2833
{"consumer", consumer},
2934
{"context", "HandleBasicCancelOk"}
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consum
1515
_consumerTag = consumerTag;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>()
2732
{
2833
{"consumer", consumer},
2934
{"context", "HandleBasicConsumeOk"}
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ sealed class BasicDeliver : Work
1818
readonly IBasicProperties _basicProperties;
1919
readonly ReadOnlyMemory<byte> _body;
2020

21-
public BasicDeliver(IBasicConsumer consumer,
22-
string consumerTag,
23-
ulong deliveryTag,
24-
bool redelivered,
25-
string exchange,
26-
string routingKey,
21+
public BasicDeliver(IBasicConsumer consumer,
22+
string consumerTag,
23+
ulong deliveryTag,
24+
bool redelivered,
25+
string exchange,
26+
string routingKey,
2727
IBasicProperties basicProperties,
2828
ReadOnlyMemory<byte> body) : base(consumer)
2929
{
@@ -36,7 +36,7 @@ public BasicDeliver(IBasicConsumer consumer,
3636
_body = body;
3737
}
3838

39-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
39+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
4040
{
4141
try
4242
{
@@ -50,12 +50,17 @@ await consumer.HandleBasicDeliver(_consumerTag,
5050
}
5151
catch (Exception e)
5252
{
53+
if (!(model is ModelBase modelBase))
54+
{
55+
return;
56+
}
57+
5358
var details = new Dictionary<string, object>()
5459
{
5560
{"consumer", consumer},
5661
{"context", "HandleBasicDeliver"}
5762
};
58-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
63+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
5964
}
6065
finally
6166
{

projects/RabbitMQ.Client/client/impl/ModelShutdown.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(c
1515
_reason = reason;
1616
}
1717

18-
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
1919
{
2020
try
2121
{
2222
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
2323
}
2424
catch (Exception e)
2525
{
26+
if (!(model is ModelBase modelBase))
27+
{
28+
return;
29+
}
30+
2631
var details = new Dictionary<string, object>()
2732
{
2833
{ "consumer", consumer },
2934
{ "context", "HandleModelShutdown" }
3035
};
31-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
36+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
3237
}
3338
}
3439
}

projects/RabbitMQ.Client/client/impl/Work.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ protected Work(IBasicConsumer consumer)
1111
_asyncConsumer = (IAsyncBasicConsumer)consumer;
1212
}
1313

14-
public Task Execute(ModelBase model)
14+
public Task Execute(IModel model)
1515
{
1616
return Execute(model, _asyncConsumer);
1717
}
1818

19-
protected abstract Task Execute(ModelBase model, IAsyncBasicConsumer consumer);
19+
protected abstract Task Execute(IModel model, IAsyncBasicConsumer consumer);
2020
}
2121
}

0 commit comments

Comments
 (0)