Skip to content

Commit 38e8739

Browse files
committed
Test connection recovery for RPC classes
1 parent 56c149e commit 38e8739

File tree

1 file changed

+93
-0
lines changed

1 file changed

+93
-0
lines changed

src/test/java/com/rabbitmq/model/amqp/RpcTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
1818
package com.rabbitmq.model.amqp;
1919

20+
import static com.rabbitmq.model.BackOffDelayPolicy.fixed;
21+
import static com.rabbitmq.model.amqp.Cli.closeConnection;
2022
import static com.rabbitmq.model.amqp.TestUtils.environmentBuilder;
2123
import static java.nio.charset.StandardCharsets.UTF_8;
24+
import static java.time.Duration.ofMillis;
2225
import static org.assertj.core.api.Assertions.assertThat;
26+
import static org.assertj.core.api.Assertions.fail;
2327

2428
import com.rabbitmq.model.*;
2529
import java.util.UUID;
@@ -162,7 +166,96 @@ void rpcWithCustomSettings() {
162166
}
163167
}
164168

169+
@Test
170+
void rpcShouldRecoverAfterConnectionIsClosed()
171+
throws ExecutionException, InterruptedException, TimeoutException {
172+
String clientConnectionName = UUID.randomUUID().toString();
173+
CountDownLatch clientConnectionLatch = new CountDownLatch(1);
174+
String serverConnectionName = UUID.randomUUID().toString();
175+
CountDownLatch serverConnectionLatch = new CountDownLatch(1);
176+
177+
BackOffDelayPolicy backOffDelayPolicy = fixed(ofMillis(100));
178+
Connection serverConnection =
179+
connectionBuilder()
180+
.name(serverConnectionName)
181+
.listeners(recoveredListener(serverConnectionLatch))
182+
.recovery()
183+
.backOffDelayPolicy(backOffDelayPolicy)
184+
.connectionBuilder()
185+
.build();
186+
String requestQueue = serverConnection.management().queue().declare().name();
187+
try (Connection clientConnection =
188+
connectionBuilder()
189+
.name(clientConnectionName)
190+
.listeners(recoveredListener(clientConnectionLatch))
191+
.recovery()
192+
.backOffDelayPolicy(backOffDelayPolicy)
193+
.connectionBuilder()
194+
.build()) {
195+
RpcClient rpcClient =
196+
clientConnection
197+
.rpcClientBuilder()
198+
.requestAddress()
199+
.queue(requestQueue)
200+
.rpcClient()
201+
.build();
202+
203+
serverConnection.rpcServerBuilder().requestQueue(requestQueue).handler(HANDLER).build();
204+
205+
byte[] requestBody = request(UUID.randomUUID().toString());
206+
CompletableFuture<Message> response =
207+
rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
208+
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
209+
210+
closeConnection(clientConnectionName);
211+
requestBody = request(UUID.randomUUID().toString());
212+
try {
213+
rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
214+
fail("Client connection is recovering, the call should have failed");
215+
} catch (ModelException e) {
216+
// OK
217+
}
218+
TestUtils.assertThat(clientConnectionLatch).completes();
219+
requestBody = request(UUID.randomUUID().toString());
220+
response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
221+
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
222+
223+
closeConnection(serverConnectionName);
224+
requestBody = request(UUID.randomUUID().toString());
225+
response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
226+
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
227+
TestUtils.assertThat(serverConnectionLatch).completes();
228+
requestBody = request(UUID.randomUUID().toString());
229+
response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
230+
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
231+
} finally {
232+
serverConnection.management().queueDeletion().delete(requestQueue);
233+
serverConnection.close();
234+
}
235+
}
236+
237+
private static AmqpConnectionBuilder connectionBuilder() {
238+
return (AmqpConnectionBuilder) environment.connectionBuilder();
239+
}
240+
165241
private static String process(String in) {
166242
return "*** " + in + " ***";
167243
}
244+
245+
private static byte[] request(String request) {
246+
return request.getBytes(UTF_8);
247+
}
248+
249+
private static byte[] process(byte[] in) {
250+
return process(new String(in, UTF_8)).getBytes(UTF_8);
251+
}
252+
253+
private static Resource.StateListener recoveredListener(CountDownLatch latch) {
254+
return context -> {
255+
if (context.previousState() == Resource.State.RECOVERING
256+
&& context.currentState() == Resource.State.OPEN) {
257+
latch.countDown();
258+
}
259+
};
260+
}
168261
}

0 commit comments

Comments
 (0)