Skip to content

Commit 0b07c12

Browse files
committed
GH-9276: Mitigate ConcurrentModificationException in the Mqttv5PahoMessageDrivenChannelAdapter
Fixes: #9276 The current Eclipse Paho client has wrong removal from map logic which leads to the `ConcurrentModificationException` when we unsubscribe from topic. * Catch a `ConcurrentModificationException` `this.mqttClient.unsubscribe()` and just log it as an `error` (cherry picked from commit 12fc353) # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
1 parent e99acd8 commit 0b07c12

File tree

1 file changed

+14
-3
lines changed

1 file changed

+14
-3
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.mqtt.inbound;
1818

1919
import java.util.Arrays;
20+
import java.util.ConcurrentModificationException;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.locks.Lock;
@@ -240,7 +241,7 @@ protected void doStop() {
240241
try {
241242
if (this.mqttClient != null && this.mqttClient.isConnected()) {
242243
if (this.connectionOptions.isCleanStart()) {
243-
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
244+
unsubscribe(topics);
244245
// Have to re-subscribe on next start if connection is not lost.
245246
this.readyToSubscribeOnStart = true;
246247

@@ -261,12 +262,22 @@ protected void doStop() {
261262
}
262263
}
263264

265+
private void unsubscribe(String... topics) throws MqttException {
266+
try {
267+
// Catch ConcurrentModificationException: https://github.com/eclipse/paho.mqtt.java/issues/986
268+
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
269+
}
270+
catch (ConcurrentModificationException ex) {
271+
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
272+
}
273+
}
274+
264275
@Override
265276
public void destroy() {
266277
super.destroy();
267278
try {
268279
if (getClientManager() == null && this.mqttClient != null) {
269-
this.mqttClient.close(true);
280+
this.mqttClient.close();
270281
}
271282
}
272283
catch (MqttException ex) {
@@ -301,7 +312,7 @@ public void removeTopic(String... topic) {
301312
this.topicLock.lock();
302313
try {
303314
if (this.mqttClient != null && this.mqttClient.isConnected()) {
304-
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
315+
unsubscribe(topic);
305316
}
306317
super.removeTopic(topic);
307318
}

0 commit comments

Comments
 (0)