@@ -15,21 +15,45 @@ namespace Tests.Rpc
15
15
{
16
16
public class RpcServerTests ( ITestOutputHelper testOutputHelper ) : IntegrationTest ( testOutputHelper )
17
17
{
18
+ private string _requestQueueName = string . Empty ;
19
+ private string _replyToName = $ "queueReplyTo-{ Now } -{ Guid . NewGuid ( ) } ";
20
+ private string _correlationId = $ "my-correlation-id-{ Guid . NewGuid ( ) } ";
21
+
22
+ public override async Task InitializeAsync ( )
23
+ {
24
+ await base . InitializeAsync ( ) ;
25
+
26
+ Assert . NotNull ( _management ) ;
27
+
28
+ IQueueInfo requestQueueInfo = await _management . Queue ( )
29
+ . Exclusive ( true )
30
+ . AutoDelete ( true )
31
+ . DeclareAsync ( ) ;
32
+
33
+ _requestQueueName = requestQueueInfo . Name ( ) ;
34
+ }
35
+
18
36
[ Fact ]
19
37
public async Task MockRpcServerPingPong ( )
20
38
{
21
39
Assert . NotNull ( _connection ) ;
22
- Assert . NotNull ( _management ) ;
23
- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
24
40
TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
25
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
41
+
42
+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
26
43
{
27
- var reply = context . Message ( "pong" ) ;
44
+ IMessage reply = context . Message ( "pong" ) ;
28
45
tcs . SetResult ( reply ) ;
29
46
return Task . FromResult ( reply ) ;
30
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
31
- Assert . NotNull ( rpcServer ) ;
32
- IPublisher p = await _connection . PublisherBuilder ( ) . Queue ( _queueName ) . BuildAsync ( ) ;
47
+ }
48
+
49
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
50
+ . Handler ( RpcHandler )
51
+ . RequestQueue ( _requestQueueName )
52
+ . BuildAsync ( ) ;
53
+
54
+ IPublisher p = await _connection . PublisherBuilder ( )
55
+ . Queue ( _requestQueueName )
56
+ . BuildAsync ( ) ;
33
57
34
58
await p . PublishAsync ( new AmqpMessage ( "test" ) ) ;
35
59
IMessage m = await WhenTcsCompletes ( tcs ) ;
@@ -41,15 +65,21 @@ public async Task MockRpcServerPingPong()
41
65
public async Task RpcServerValidateStateChange ( )
42
66
{
43
67
Assert . NotNull ( _connection ) ;
44
- Assert . NotNull ( _management ) ;
68
+
45
69
List < ( State , State ) > states = [ ] ;
46
- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
47
70
TaskCompletionSource < int > tcs = CreateTaskCompletionSource < int > ( ) ;
48
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
71
+
72
+ static Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
49
73
{
50
- var m = context . Message ( request . Body ( ) ) ;
74
+ IMessage m = context . Message ( request . Body ( ) ) ;
51
75
return Task . FromResult ( m ) ;
52
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
76
+ }
77
+
78
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
79
+ . Handler ( RpcHandler )
80
+ . RequestQueue ( _requestQueueName )
81
+ . BuildAsync ( ) ;
82
+
53
83
rpcServer . ChangeState += ( sender , fromState , toState , e ) =>
54
84
{
55
85
states . Add ( ( fromState , toState ) ) ;
@@ -58,8 +88,9 @@ public async Task RpcServerValidateStateChange()
58
88
tcs . SetResult ( states . Count ) ;
59
89
}
60
90
} ;
61
- Assert . NotNull ( rpcServer ) ;
91
+
62
92
await rpcServer . CloseAsync ( ) ;
93
+
63
94
int count = await WhenTcsCompletes ( tcs ) ;
64
95
Assert . Equal ( 2 , count ) ;
65
96
Assert . Equal ( State . Open , states [ 0 ] . Item1 ) ;
@@ -76,33 +107,38 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
76
107
{
77
108
Assert . NotNull ( _connection ) ;
78
109
Assert . NotNull ( _management ) ;
79
- string requestQueue = _queueName ;
80
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
81
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
82
- {
83
- var reply = context . Message ( "pong" ) ;
84
- return Task . FromResult ( reply ) ;
85
- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
86
110
87
- Assert . NotNull ( rpcServer ) ;
88
- string queueReplyTo = $ "queueReplyTo-{ Now } ";
89
- IQueueSpecification spec = _management . Queue ( queueReplyTo ) . Exclusive ( true ) . AutoDelete ( true ) ;
90
- await spec . DeclareAsync ( ) ;
111
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
112
+ . Handler ( PongRpcHandler )
113
+ . RequestQueue ( _requestQueueName )
114
+ . BuildAsync ( ) ;
115
+
116
+ IQueueSpecification replyQueueSpec = _management . Queue ( _replyToName )
117
+ . Exclusive ( true )
118
+ . AutoDelete ( true ) ;
119
+ await replyQueueSpec . DeclareAsync ( ) ;
120
+
91
121
TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
92
122
93
- IConsumer consumer = await _connection . ConsumerBuilder ( ) . Queue ( queueReplyTo ) . MessageHandler (
94
- ( context , message ) =>
95
- {
96
- context . Accept ( ) ;
97
- tcs . SetResult ( message ) ;
98
- return Task . CompletedTask ;
99
- } ) . BuildAndStartAsync ( ) ;
123
+ Task MessageHandler ( IContext context , IMessage message )
124
+ {
125
+ context . Accept ( ) ;
126
+ tcs . SetResult ( message ) ;
127
+ return Task . CompletedTask ;
128
+ }
100
129
101
- IPublisher publisher = await _connection . PublisherBuilder ( ) . Queue ( requestQueue ) . BuildAsync ( ) ;
102
- Assert . NotNull ( publisher ) ;
103
- AddressBuilder addressBuilder = new ( ) ;
130
+ IConsumer consumer = await _connection . ConsumerBuilder ( )
131
+ . Queue ( replyQueueSpec )
132
+ . MessageHandler ( MessageHandler )
133
+ . BuildAndStartAsync ( ) ;
134
+
135
+ IPublisher publisher = await _connection . PublisherBuilder ( )
136
+ . Queue ( _requestQueueName )
137
+ . BuildAsync ( ) ;
104
138
105
- IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( addressBuilder . Queue ( queueReplyTo ) . Address ( ) ) ;
139
+ AddressBuilder addressBuilder = new ( ) ;
140
+ string replyToAddress = addressBuilder . Queue ( replyQueueSpec ) . Address ( ) ;
141
+ IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( replyToAddress ) ;
106
142
PublishResult pr = await publisher . PublishAsync ( message ) ;
107
143
Assert . Equal ( OutcomeState . Accepted , pr . Outcome . State ) ;
108
144
@@ -123,17 +159,15 @@ public async Task RpcServerClientPingPongWithDefault()
123
159
{
124
160
Assert . NotNull ( _connection ) ;
125
161
Assert . NotNull ( _management ) ;
126
- string requestQueue = _queueName ;
127
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
128
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
129
- {
130
- var reply = context . Message ( "pong" ) ;
131
- return Task . FromResult ( reply ) ;
132
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
133
- Assert . NotNull ( rpcServer ) ;
134
162
135
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
136
- . Queue ( requestQueue )
163
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
164
+ . Handler ( PongRpcHandler )
165
+ . RequestQueue ( _requestQueueName )
166
+ . BuildAsync ( ) ;
167
+
168
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
169
+ . RequestAddress ( )
170
+ . Queue ( _requestQueueName )
137
171
. RpcClient ( )
138
172
. BuildAsync ( ) ;
139
173
@@ -153,27 +187,22 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
153
187
{
154
188
Assert . NotNull ( _connection ) ;
155
189
Assert . NotNull ( _management ) ;
156
- string requestQueue = _queueName ;
157
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
158
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
159
- {
160
- var reply = context . Message ( "pong" ) ;
161
- return Task . FromResult ( reply ) ;
162
- } ) . RequestQueue ( _queueName )
163
- . BuildAsync ( ) ;
164
- Assert . NotNull ( rpcServer ) ;
165
190
166
- // custom replyTo queue
167
- IQueueInfo replyTo =
168
- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
191
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
192
+ . Handler ( PongRpcHandler )
193
+ . RequestQueue ( _requestQueueName )
194
+ . BuildAsync ( ) ;
169
195
170
- // custom correlationId supplier
171
- const string correlationId = "my-correlation-id" ;
196
+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
197
+ . Exclusive ( true )
198
+ . AutoDelete ( true )
199
+ . DeclareAsync ( ) ;
172
200
173
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
174
- . Queue ( requestQueue )
201
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
202
+ . RequestAddress ( )
203
+ . Queue ( _requestQueueName )
175
204
. RpcClient ( )
176
- . CorrelationIdSupplier ( ( ) => correlationId )
205
+ . CorrelationIdSupplier ( ( ) => _correlationId )
177
206
. CorrelationIdExtractor ( message => message . CorrelationId ( ) )
178
207
. ReplyToQueue ( replyTo . Name ( ) )
179
208
. BuildAsync ( ) ;
@@ -182,7 +211,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
182
211
183
212
IMessage response = await rpcClient . PublishAsync ( message ) ;
184
213
Assert . Equal ( "pong" , response . Body ( ) ) ;
185
- Assert . Equal ( correlationId , response . CorrelationId ( ) ) ;
214
+ Assert . Equal ( _correlationId , response . CorrelationId ( ) ) ;
186
215
await rpcClient . CloseAsync ( ) ;
187
216
await rpcServer . CloseAsync ( ) ;
188
217
}
@@ -199,34 +228,29 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
199
228
{
200
229
Assert . NotNull ( _connection ) ;
201
230
Assert . NotNull ( _management ) ;
202
- string requestQueue = _queueName ;
203
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
204
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
205
- {
206
- var reply = context . Message ( "pong" ) ;
207
- return Task . FromResult ( reply ) ;
208
- } ) . RequestQueue ( _queueName )
209
- //come from the client
231
+
232
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
233
+ . Handler ( PongRpcHandler )
234
+ . RequestQueue ( _requestQueueName )
210
235
. CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
211
- // replace the correlation id location with Application properties
212
236
. ReplyPostProcessor ( ( reply , replyCorrelationId ) => reply . Property ( "correlationId" ,
213
237
replyCorrelationId . ToString ( ) ?? throw new InvalidOperationException ( ) ) )
214
238
. BuildAsync ( ) ;
215
- Assert . NotNull ( rpcServer ) ;
216
239
217
- IQueueInfo replyTo =
218
- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
240
+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
241
+ . Exclusive ( true )
242
+ . AutoDelete ( true )
243
+ . DeclareAsync ( ) ;
219
244
220
- // custom correlationId supplier
221
- const string correlationId = "my-correlation-id" ;
222
245
int correlationIdCounter = 0 ;
223
246
224
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
225
- . Queue ( requestQueue )
247
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
248
+ . RequestAddress ( )
249
+ . Queue ( _requestQueueName )
226
250
. RpcClient ( )
227
251
. ReplyToQueue ( replyTo . Name ( ) )
228
252
// replace the correlation id creation with a custom function
229
- . CorrelationIdSupplier ( ( ) => $ "{ correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
253
+ . CorrelationIdSupplier ( ( ) => $ "{ _correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
230
254
// The server will reply with the correlation id in application properties
231
255
. CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
232
256
// The client will use application properties to set the correlation id
@@ -244,8 +268,8 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
244
268
IMessage response = await rpcClient . PublishAsync ( message ) ;
245
269
Assert . Equal ( "pong" , response . Body ( ) ) ;
246
270
// the server replies with the correlation id in the application properties
247
- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
248
- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
271
+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
272
+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
249
273
Assert . Single ( response . Properties ( ) ) ;
250
274
i ++ ;
251
275
}
@@ -259,18 +283,16 @@ public async Task RpcClientMultiThreadShouldBeSafe()
259
283
{
260
284
Assert . NotNull ( _connection ) ;
261
285
Assert . NotNull ( _management ) ;
262
-
263
- string requestQueue = _queueName ;
264
-
265
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
266
286
const int messagesToSend = 99 ;
287
+
267
288
TaskCompletionSource < bool > tcs = CreateTaskCompletionSource ( ) ;
268
289
List < IMessage > messagesReceived = [ ] ;
269
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
290
+
291
+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
270
292
{
271
293
try
272
294
{
273
- var reply = context . Message ( "pong" ) ;
295
+ IMessage reply = context . Message ( "pong" ) ;
274
296
messagesReceived . Add ( request ) ;
275
297
return Task . FromResult ( reply ) ;
276
298
}
@@ -281,17 +303,19 @@ public async Task RpcClientMultiThreadShouldBeSafe()
281
303
tcs . SetResult ( true ) ;
282
304
}
283
305
}
284
- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
306
+ }
285
307
286
- Assert . NotNull ( rpcServer ) ;
308
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
309
+ . Handler ( RpcHandler )
310
+ . RequestQueue ( _requestQueueName )
311
+ . BuildAsync ( ) ;
287
312
288
313
IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
289
- . Queue ( requestQueue )
314
+ . Queue ( _requestQueueName )
290
315
. RpcClient ( )
291
316
. BuildAsync ( ) ;
292
317
293
318
List < Task > tasks = [ ] ;
294
-
295
319
// we simulate a multi-thread environment
296
320
// where multiple threads send messages to the server
297
321
// and the server replies to each message in a consistent way
@@ -332,25 +356,29 @@ public async Task RpcClientShouldRaiseTimeoutError()
332
356
{
333
357
Assert . NotNull ( _connection ) ;
334
358
Assert . NotNull ( _management ) ;
335
- string requestQueue = _queueName ;
336
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
337
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( async ( context , request ) =>
359
+
360
+ static async Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
338
361
{
339
362
IMessage reply = context . Message ( "pong" ) ;
340
363
object millisecondsToWait = request . Property ( "wait" ) ;
341
364
await Task . Delay ( TimeSpan . FromMilliseconds ( ( int ) millisecondsToWait ) ) ;
342
365
return reply ;
343
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
344
- Assert . NotNull ( rpcServer ) ;
366
+ }
345
367
346
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
347
- . Queue ( requestQueue )
368
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
369
+ . Handler ( RpcHandler )
370
+ . RequestQueue ( _requestQueueName )
371
+ . BuildAsync ( ) ;
372
+
373
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
374
+ . RequestAddress ( )
375
+ . Queue ( _requestQueueName )
348
376
. RpcClient ( )
349
377
. Timeout ( TimeSpan . FromMilliseconds ( 300 ) )
350
378
. BuildAsync ( ) ;
351
379
352
- IMessage reply = await rpcClient . PublishAsync (
353
- new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ) ;
380
+ IMessage msg = new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ;
381
+ IMessage reply = await rpcClient . PublishAsync ( msg ) ;
354
382
Assert . Equal ( "pong" , reply . Body ( ) ) ;
355
383
356
384
await Assert . ThrowsAsync < TimeoutException > ( ( ) => rpcClient . PublishAsync (
@@ -359,5 +387,11 @@ await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
359
387
await rpcClient . CloseAsync ( ) ;
360
388
await rpcServer . CloseAsync ( ) ;
361
389
}
390
+
391
+ private static Task < IMessage > PongRpcHandler ( IRpcServer . IContext context , IMessage request )
392
+ {
393
+ IMessage reply = context . Message ( "pong" ) ;
394
+ return Task . FromResult ( reply ) ;
395
+ }
362
396
}
363
397
}
0 commit comments