Skip to content

Commit cb92681

Browse files
authored
Merge pull request #83 from rabbitmq/fix-stream-consumer-recovery
Recover connection if consumer recovery closes it
2 parents 3ec6df8 + cff095d commit cb92681

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

.github/workflows/test-pr.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27-
env:
28-
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:main'
2927
- name: Start toxiproxy
3028
run: ci/start-toxiproxy.sh
3129
- name: Display Java version

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ private void recoverAfterConnectionFailure(
388388
this.state(OPEN);
389389
} catch (Exception ex) {
390390
// likely InterruptedException or IO exception
391-
LOGGER.info(
391+
LOGGER.warn(
392392
"Error while trying to recover topology for connection '{}': {}",
393393
this.name(),
394394
ex.getMessage());
@@ -481,6 +481,13 @@ private void recoverConsumers() throws InterruptedException {
481481
consumer.recoverAfterConnectionFailure();
482482
consumer.state(OPEN);
483483
LOGGER.debug("Recovered consumer {} (queue '{}')", consumer.id(), consumer.queue());
484+
} catch (AmqpException.AmqpConnectionException ex) {
485+
LOGGER.warn(
486+
"Connection error while trying to recover consumer {} (queue '{}'), restarting recovery",
487+
consumer.id(),
488+
consumer.queue(),
489+
ex);
490+
throw ex;
484491
} catch (Exception ex) {
485492
LOGGER.warn(
486493
"Error while trying to recover consumer {} (queue '{}')",

0 commit comments

Comments
 (0)