Skip to content

Commit 21a5c07

Browse files
committed
unify rpc calls
1 parent 6f8e660 commit 21a5c07

File tree

2 files changed

+28
-86
lines changed

2 files changed

+28
-86
lines changed

projects/RabbitMQ.Client/client/framing/Model.cs

Lines changed: 16 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34-
using RabbitMQ.Client.Exceptions;
3534
using RabbitMQ.Client.Impl;
3635

3736
namespace RabbitMQ.Client.Framing.Impl
@@ -95,11 +94,7 @@ public override void _Private_ChannelFlowOk(bool active)
9594

9695
public override void _Private_ChannelOpen(string outOfBand)
9796
{
98-
MethodBase __repBase = ModelRpc(new ChannelOpen(outOfBand));
99-
if (!(__repBase is ChannelOpenOk))
100-
{
101-
throw new UnexpectedMethodException(__repBase);
102-
}
97+
ModelRpc<ChannelOpenOk>(new ChannelOpen(outOfBand));
10398
}
10499

105100
public override void _Private_ConfirmSelect(bool nowait)
@@ -111,21 +106,13 @@ public override void _Private_ConfirmSelect(bool nowait)
111106
}
112107
else
113108
{
114-
MethodBase __repBase = ModelRpc(method);
115-
if (!(__repBase is ConfirmSelectOk))
116-
{
117-
throw new UnexpectedMethodException(__repBase);
118-
}
109+
ModelRpc<ConfirmSelectOk>(method);
119110
}
120111
}
121112

122113
public override void _Private_ConnectionClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
123114
{
124-
MethodBase __repBase = ModelRpc(new ConnectionClose(replyCode, replyText, classId, methodId));
125-
if (!(__repBase is ConnectionCloseOk))
126-
{
127-
throw new UnexpectedMethodException(__repBase);
128-
}
115+
ModelRpc<ConnectionCloseOk>(new ConnectionClose(replyCode, replyText, classId, methodId));
129116
}
130117

131118
public override void _Private_ConnectionCloseOk()
@@ -150,11 +137,7 @@ public override void _Private_ConnectionStartOk(IDictionary<string, object> clie
150137

151138
public override void _Private_UpdateSecret(byte[] newSecret, string reason)
152139
{
153-
MethodBase __repBase = ModelRpc(new ConnectionUpdateSecret(newSecret, reason));
154-
if (!(__repBase is ConnectionUpdateSecretOk))
155-
{
156-
throw new UnexpectedMethodException(__repBase);
157-
}
140+
ModelRpc<ConnectionUpdateSecretOk>(new ConnectionUpdateSecret(newSecret, reason));
158141
}
159142

160143
public override void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
@@ -166,11 +149,7 @@ public override void _Private_ExchangeBind(string destination, string source, st
166149
}
167150
else
168151
{
169-
MethodBase __repBase = ModelRpc(method);
170-
if (!(__repBase is ExchangeBindOk))
171-
{
172-
throw new UnexpectedMethodException(__repBase);
173-
}
152+
ModelRpc<ExchangeBindOk>(method);
174153
}
175154
}
176155

@@ -183,11 +162,7 @@ public override void _Private_ExchangeDeclare(string exchange, string type, bool
183162
}
184163
else
185164
{
186-
MethodBase __repBase = ModelRpc(method);
187-
if (!(__repBase is ExchangeDeclareOk))
188-
{
189-
throw new UnexpectedMethodException(__repBase);
190-
}
165+
ModelRpc<ExchangeDeclareOk>(method);
191166
}
192167
}
193168

@@ -200,11 +175,7 @@ public override void _Private_ExchangeDelete(string exchange, bool ifUnused, boo
200175
}
201176
else
202177
{
203-
MethodBase __repBase = ModelRpc(method);
204-
if (!(__repBase is ExchangeDeleteOk))
205-
{
206-
throw new UnexpectedMethodException(__repBase);
207-
}
178+
ModelRpc<ExchangeDeleteOk>(method);
208179
}
209180
}
210181

@@ -217,11 +188,7 @@ public override void _Private_ExchangeUnbind(string destination, string source,
217188
}
218189
else
219190
{
220-
MethodBase __repBase = ModelRpc(method);
221-
if (!(__repBase is ExchangeUnbindOk))
222-
{
223-
throw new UnexpectedMethodException(__repBase);
224-
}
191+
ModelRpc<ExchangeUnbindOk>(method);
225192
}
226193
}
227194

@@ -234,11 +201,7 @@ public override void _Private_QueueBind(string queue, string exchange, string ro
234201
}
235202
else
236203
{
237-
MethodBase __repBase = ModelRpc(method);
238-
if (!(__repBase is QueueBindOk))
239-
{
240-
throw new UnexpectedMethodException(__repBase);
241-
}
204+
ModelRpc<QueueBindOk>(method);
242205
}
243206
}
244207

@@ -264,12 +227,7 @@ public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEm
264227
return 0xFFFFFFFF;
265228
}
266229

267-
MethodBase __repBase = ModelRpc(method);
268-
if (!(__repBase is QueueDeleteOk __rep))
269-
{
270-
throw new UnexpectedMethodException(__repBase);
271-
}
272-
return __rep._messageCount;
230+
return ModelRpc<QueueDeleteOk>(method)._messageCount;
273231
}
274232

275233
public override uint _Private_QueuePurge(string queue, bool nowait)
@@ -281,12 +239,7 @@ public override uint _Private_QueuePurge(string queue, bool nowait)
281239
return 0xFFFFFFFF;
282240
}
283241

284-
MethodBase __repBase = ModelRpc(method);
285-
if (!(__repBase is QueuePurgeOk __rep))
286-
{
287-
throw new UnexpectedMethodException(__repBase);
288-
}
289-
return __rep._messageCount;
242+
return ModelRpc<QueuePurgeOk>(method)._messageCount;
290243
}
291244

292245
public override void BasicAck(ulong deliveryTag, bool multiple)
@@ -301,11 +254,7 @@ public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
301254

302255
public override void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
303256
{
304-
MethodBase __repBase = ModelRpc(new BasicQos(prefetchSize, prefetchCount, global));
305-
if (!(__repBase is BasicQosOk))
306-
{
307-
throw new UnexpectedMethodException(__repBase);
308-
}
257+
ModelRpc<BasicQosOk>(new BasicQos(prefetchSize, prefetchCount, global));
309258
}
310259

311260
public override void BasicRecoverAsync(bool requeue)
@@ -325,38 +274,22 @@ public override IBasicProperties CreateBasicProperties()
325274

326275
public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
327276
{
328-
MethodBase __repBase = ModelRpc(new QueueUnbind(default, queue, exchange, routingKey, arguments));
329-
if (!(__repBase is QueueUnbindOk))
330-
{
331-
throw new UnexpectedMethodException(__repBase);
332-
}
277+
ModelRpc<QueueUnbindOk>(new QueueUnbind(default, queue, exchange, routingKey, arguments));
333278
}
334279

335280
public override void TxCommit()
336281
{
337-
MethodBase __repBase = ModelRpc(new TxCommit());
338-
if (!(__repBase is TxCommitOk))
339-
{
340-
throw new UnexpectedMethodException(__repBase);
341-
}
282+
ModelRpc<TxCommitOk>(new TxCommit());
342283
}
343284

344285
public override void TxRollback()
345286
{
346-
MethodBase __repBase = ModelRpc(new TxRollback());
347-
if (!(__repBase is TxRollbackOk))
348-
{
349-
throw new UnexpectedMethodException(__repBase);
350-
}
287+
ModelRpc<TxRollbackOk>(new TxRollback());
351288
}
352289

353290
public override void TxSelect()
354291
{
355-
MethodBase __repBase = ModelRpc(new TxSelect());
356-
if (!(__repBase is TxSelectOk))
357-
{
358-
throw new UnexpectedMethodException(__repBase);
359-
}
292+
ModelRpc<TxSelectOk>(new TxSelect());
360293
}
361294

362295
public override bool DispatchAsynchronous(in IncomingCommand cmd)

projects/RabbitMQ.Client/client/impl/ModelBase.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,14 +330,23 @@ public void HandleCommand(in IncomingCommand cmd)
330330
}
331331
}
332332

333-
public MethodBase ModelRpc(MethodBase method)
333+
public T ModelRpc<T>(MethodBase method) where T : MethodBase
334334
{
335335
var k = new SimpleBlockingRpcContinuation();
336+
var outgoingCommand = new OutgoingCommand(method);
337+
MethodBase baseResult;
336338
lock (_rpcLock)
337339
{
338-
TransmitAndEnqueue(new OutgoingCommand(method), k);
339-
return k.GetReply(ContinuationTimeout).Method;
340+
TransmitAndEnqueue(outgoingCommand, k);
341+
baseResult = k.GetReply(ContinuationTimeout).Method;
340342
}
343+
344+
if (baseResult is T result)
345+
{
346+
return result;
347+
}
348+
349+
throw new UnexpectedMethodException(baseResult);
341350
}
342351

343352
public void ModelSend(MethodBase method)

0 commit comments

Comments
 (0)