Skip to content

Commit c764d26

Browse files
committed
concurrency issue session/connection consumer list
1 parent 6f28ccf commit c764d26

File tree

1 file changed

+15
-36
lines changed

1 file changed

+15
-36
lines changed

src/main/java/com/swiftmq/jms/v750/ConnectionImpl.java

Lines changed: 15 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -149,25 +149,22 @@ public boolean isReconnectEnabled() {
149149
}
150150

151151
private void resetSessions(boolean reset) {
152-
for (int i = 0; i < sessionList.size(); i++) {
153-
SessionImpl session = (SessionImpl) sessionList.get(i);
152+
sessionList.forEach(s -> {
153+
SessionImpl session = (SessionImpl) s;
154154
session.setResetInProgress(reset);
155155
if (!reset && connectionState == CONNECTED_STARTED)
156156
session.startSession();
157-
}
157+
});
158158
if (reconnector.isDebug())
159159
System.out.println(new Date() + " " + toString() + ": resetSessions, ccList=" + connectionConsumerList);
160-
for (int i = 0; i < connectionConsumerList.size(); i++) {
161-
ConnectionConsumerImpl cc = (ConnectionConsumerImpl) connectionConsumerList.get(i);
160+
connectionConsumerList.forEach(c -> {
161+
ConnectionConsumerImpl cc = (ConnectionConsumerImpl) c;
162162
cc.setResetInProgress(reset);
163-
}
163+
});
164164
}
165165

166166
private void setSessionBlockState(boolean blocked) {
167-
for (int i = 0; i < sessionList.size(); i++) {
168-
SessionImpl session = (SessionImpl) sessionList.get(i);
169-
session.setBlocked(blocked);
170-
}
167+
sessionList.forEach(session -> ((SessionImpl) session).setBlocked(blocked));
171168
}
172169

173170
private void reconnect() {
@@ -300,12 +297,8 @@ public List getRecreatables() {
300297
for (Iterator iter = tmpQueues.entrySet().iterator(); iter.hasNext(); ) {
301298
list.add(new TemporaryQueueRecreator(this, (QueueImpl) ((Map.Entry) iter.next()).getValue()));
302299
}
303-
for (int i = 0; i < sessionList.size(); i++) {
304-
list.add(sessionList.get(i));
305-
}
306-
for (int i = 0; i < connectionConsumerList.size(); i++) {
307-
list.add(connectionConsumerList.get(i));
308-
}
300+
sessionList.forEach(s -> list.add(s));
301+
connectionConsumerList.forEach(c -> list.add(c));
309302
return list;
310303
}
311304

@@ -892,12 +885,8 @@ public void start() throws JMSException {
892885
clientIdAllowed = false;
893886

894887
if (connectionState == CONNECTED_STOPPED) {
895-
for (int i = 0; i < sessionList.size(); i++) {
896-
((SessionImpl) sessionList.get(i)).startSession();
897-
}
898-
for (int i = 0; i < connectionConsumerList.size(); i++) {
899-
((ConnectionConsumerImpl) connectionConsumerList.get(i)).startConsumer();
900-
}
888+
sessionList.forEach(s -> ((SessionImpl) s).startSession());
889+
connectionConsumerList.forEach(c -> ((ConnectionConsumerImpl) c).startConsumer());
901890
connectionState = CONNECTED_STARTED;
902891
} else if (connectionState == DISCONNECTED) {
903892
throw new IllegalStateException("could not start - connection is disconnected!");
@@ -915,12 +904,8 @@ public void stop() throws JMSException {
915904
clientIdAllowed = false;
916905

917906
if (connectionState == CONNECTED_STARTED) {
918-
for (int i = 0; i < sessionList.size(); i++) {
919-
((SessionImpl) sessionList.get(i)).stopSession();
920-
}
921-
for (int i = 0; i < connectionConsumerList.size(); i++) {
922-
((ConnectionConsumerImpl) connectionConsumerList.get(i)).stopConsumer();
923-
}
907+
sessionList.forEach(s -> ((SessionImpl) s).stopSession());
908+
connectionConsumerList.forEach(c -> ((ConnectionConsumerImpl) c).stopConsumer());
924909
connectionState = CONNECTED_STOPPED;
925910
} else if (connectionState == DISCONNECTED) {
926911
throw new IllegalStateException("could not stop - connection is disconnected!");
@@ -989,15 +974,9 @@ public void cancel(boolean closeReconnector) {
989974
connectionQueue.stopQueue();
990975
cancelled = true;
991976
closed = true;
992-
for (int i = 0; i < sessionList.size(); i++) {
993-
SessionImpl session = (SessionImpl) sessionList.get(i);
994-
session.cancel();
995-
}
977+
sessionList.forEach(s -> ((SessionImpl) s).cancel());
978+
connectionConsumerList.forEach(c -> ((ConnectionConsumerImpl) c).cancel());
996979
sessionList.clear();
997-
for (int i = 0; i < connectionConsumerList.size(); i++) {
998-
ConnectionConsumerImpl cc = (ConnectionConsumerImpl) connectionConsumerList.get(i);
999-
cc.cancel();
1000-
}
1001980
connectionConsumerList.clear();
1002981
TimerRegistry.Singleton().removeTimerListener(keepaliveInterval, this);
1003982
reconnector.invalidateConnection();

0 commit comments

Comments
 (0)