Skip to content

Commit 32e2082

Browse files
author
Craig L Russell
committed
WL#9545 Clusterj automatic reconnect on cluster failure
post-push fix for pb2 failures

clusterj-core/src/main/java/com/mysql/clusterj/core/store/ClusterConnection.java
  add method closing() to separate marking dbs as closing from force close

clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
  implement closing() to mark dbs as closing

clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java
  use closing() method to mark all cluster connections as closing, then close them

clusterj-test/src/main/java/testsuite/clusterj/ReconnectTest.java
  change misbehaving thread to catch any ClusterJException and terminate instead of looping
1 parent 400d666 commit 32e2082

File tree

5 files changed

+49
-24
lines changed

5 files changed

+49
-24
lines changed

storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,19 @@ public void reconnect(int timeout) {
679679
reconnectThread.start();
680680
}
681681
}
682+
683+
protected static int countSessions(SessionFactoryImpl factory) {
684+
return countSessions(factory.getConnectionPoolSessionCounts());
685+
}
686+
687+
protected static int countSessions(List<Integer> sessionCounts) {
688+
int result = 0;
689+
for (int i: sessionCounts) {
690+
result += i;
691+
}
692+
return result;
693+
}
694+
682695
protected static class ReconnectThread implements Runnable {
683696
SessionFactoryImpl factory;
684697
ReconnectThread(SessionFactoryImpl factory) {
@@ -689,23 +702,25 @@ public void run() {
689702
boolean done = false;
690703
int iterations = factory.CLUSTER_RECONNECT_TIMEOUT;
691704
while (!done && iterations-- > 0) {
692-
done = true;
693-
sessionCounts = factory.getConnectionPoolSessionCounts();
694-
for (int i: sessionCounts) {
695-
if (i != 0) {
696-
done = false;
697-
}
698-
}
705+
done = countSessions(sessionCounts) == 0;
699706
if (!done) {
700707
logger.info(local.message("INFO_Reconnect_wait", sessionCounts.toString()));
701708
sleep(1000);
709+
sessionCounts = factory.getConnectionPoolSessionCounts();
702710
}
703711
}
704712
if (!done) {
705713
// timed out waiting for sessions to close
706714
logger.warn(local.message("WARN_Reconnect_timeout", sessionCounts.toString()));
707715
}
708716
logger.warn(local.message("WARN_Reconnect_closing"));
717+
// mark all cluster connections as closing
718+
for (ClusterConnection clusterConnection: factory.pooledConnections) {
719+
clusterConnection.closing();
720+
}
721+
// wait for connections to close on their own
722+
sleep(1000);
723+
// hard close connections that didn't close on their own
709724
for (ClusterConnection clusterConnection: factory.pooledConnections) {
710725
clusterConnection.close();
711726
}

storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/ClusterConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010, 2015, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2010, 2017, Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program is free software; you can redistribute it and/or modify
55
* it under the terms of the GNU General Public License as published by
@@ -30,6 +30,8 @@ public interface ClusterConnection {
3030

3131
public void waitUntilReady(int connectTimeoutBefore, int connectTimeoutAfter);
3232

33+
public void closing();
34+
3335
public void close();
3436

3537
public int dbCount();

storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/ReconnectTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.TreeSet;
2727

2828
import com.mysql.clusterj.ClusterJDatastoreException;
29+
import com.mysql.clusterj.ClusterJException;
2930
import com.mysql.clusterj.ClusterJUserException;
3031
import com.mysql.clusterj.Query;
3132
import com.mysql.clusterj.Session;
@@ -35,7 +36,6 @@
3536
import testsuite.clusterj.model.Order;
3637
import testsuite.clusterj.model.OrderLine;
3738

38-
@org.junit.Ignore("disable test until diagnosis of failure")
3939
public class ReconnectTest extends AbstractClusterJModelTest {
4040

4141
@Override
@@ -240,11 +240,10 @@ public void run() {
240240
Query<OrderLine> queryOrder = session.createQuery(queryOrderType);
241241
queryOrder.setParameter("orderId", 0);
242242
queryOrder.getResultList();
243-
} catch (ClusterJUserException cjue) {
243+
sleep(100);
244+
} catch (ClusterJException cje) {
245+
// the exception might be any of several exceptions when disconnecting/reconnecting
244246
done = true;
245-
if (!cjue.getMessage().contains("this Db is closing")) {
246-
if (getDebug()) { System.out.print(" Misbehaving caught exception: " + cjue.getMessage()); }
247-
}
248247
}
249248
}
250249
}

storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ public class ClusterConnectionImpl
103103
/** The dictionary used to create NdbRecords */
104104
Dictionary dictionaryForNdbRecord = null;
105105

106+
private boolean isClosing = false;
107+
106108
private long[] autoIncrement;
107109

108110
private static final String USE_SMART_VALUE_HANDLER_NAME = "com.mysql.clusterj.UseSmartValueHandler";
@@ -223,23 +225,31 @@ protected static void throwError(Object returnCode, Ndb_cluster_connection clust
223225
throw new ClusterJDatastoreException(msg);
224226
}
225227

226-
public void close() {
228+
public void closing() {
229+
this.isClosing = true;
227230
if (clusterConnection != null) {
228231
logger.info(local.message("INFO_Close_Cluster_Connection", connectString, nodeId));
229232
if (dbs.size() > 0) {
230233
for (DbImpl db: dbs.keySet()) {
231234
// mark all dbs as closing so no more operations will start
232235
db.closing();
233236
}
234-
dbForNdbRecord.closing();
235-
236-
if (dbs.size() != 0) {
237-
// sleep for 1000 milliseconds to allow operations in other threads to terminate
238-
sleep(1000);
239-
Map<Db, Object> dbsToClose = new IdentityHashMap<Db, Object>(dbs);
240-
for (Db db: dbsToClose.keySet()) {
241-
db.close();
242-
}
237+
}
238+
dbForNdbRecord.closing();
239+
}
240+
}
241+
242+
public void close() {
243+
if (clusterConnection != null) {
244+
if (!this.isClosing) {
245+
this.closing();
246+
// sleep for 1000 milliseconds to allow operations in other threads to terminate
247+
sleep(1000);
248+
}
249+
if (dbs.size() != 0) {
250+
Map<Db, Object> dbsToClose = new IdentityHashMap<Db, Object>(dbs);
251+
for (Db db: dbsToClose.keySet()) {
252+
db.close();
243253
}
244254
}
245255
for (NdbRecordImpl ndbRecord: ndbRecordImplMap.values()) {

storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/ReconnectTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package testsuite.clusterj.tie;
1919

20-
@org.junit.Ignore("disable test until diagnosis of failure")
2120
public class ReconnectTest extends testsuite.clusterj.ReconnectTest {
2221

2322
}

0 commit comments

Comments
 (0)