Skip to content

Commit 0f8a275

Browse files
author
Craig L Russell
committed
WL#9545 Clusterj automatic reconnect on cluster failure
post-push fix for pb2 failures
1 parent c41faff commit 0f8a275

File tree

3 files changed

+95
-47
lines changed

3 files changed

+95
-47
lines changed

storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,12 +663,16 @@ public void reconnect(int timeout) {
663663
synchronized(this) {
664664
// if already reconnecting, just log a warning and return
665665
if (State.Reconnecting.equals(state)) {
666+
logger.warn(local.message("WARN_Reconnect_already"));
666667
return;
667668
}
668669
CLUSTER_RECONNECT_TIMEOUT = timeout;
669670
if (timeout == 0) {
671+
logger.warn(local.message("WARN_Reconnect_timeout0"));
670672
return;
671673
}
674+
// set the reconnect timeout to the current value
675+
CLUSTER_RECONNECT_TIMEOUT = timeout;
672676
// set the state of this session factory to reconnecting
673677
state = State.Reconnecting;
674678
// create a thread to manage the reconnect operation
@@ -677,6 +681,7 @@ public void reconnect(int timeout) {
677681
// create reconnect thread
678682
reconnectThread = new Thread(threadGroup, new ReconnectThread(this));
679683
reconnectThread.start();
684+
logger.warn(local.message("WARN_Reconnect_started"));
680685
}
681686
}
682687

@@ -725,10 +730,10 @@ public void run() {
725730
clusterConnection.close();
726731
}
727732
factory.pooledConnections.clear();
728-
logger.info(local.message("INFO_Reconnect_creating"));
733+
logger.warn(local.message("WARN_Reconnect_creating"));
729734
factory.createClusterConnectionPool();
730735
factory.verifyConnectionPool();
731-
logger.info(local.message("INFO_Reconnect_reopening"));
736+
logger.warn(local.message("WARN_Reconnect_reopening"));
732737
synchronized(factory) {
733738
factory.state = State.Open;
734739
}

storage/ndb/clusterj/clusterj-core/src/main/resources/com/mysql/clusterj/core/Bundle.properties

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,11 @@ ERR_Session_Counts_Wrong_Creating_Factory:Session counts are wrong creating fact
153153
ERR_Multiple_Column_Name:Column name for interface {0}, field {1} can only be specified once: choose {2} or {3}.
154154
ERR_SessionFactory_not_open:SessionFactory is not open.
155155
WARN_Reconnect:Reconnecting SessionFactory with sessions: {0}
156+
WARN_Reconnect_already:Reconnect called while already reconnecting; ignoring.
157+
WARN_Reconnect_timeout0:Reconnect called with timeout == 0; ignoring.
158+
WARN_Reconnect_started:Reconnect started reconnect thread.
156159
INFO_Reconnect_wait:Reconnect waiting for sessions to close: {0}
157160
WARN_Reconnect_timeout:Reconnect timeout waiting for sessions to close: {0}
158161
WARN_Reconnect_closing:Reconnect closing all sessions and connections.
159-
INFO_Reconnect_creating:Reconnect creating new connection pool.
160-
INFO_Reconnect_reopening:Reconnect reopening SessionFactory.
162+
WARN_Reconnect_creating:Reconnect creating new connection pool.
163+
WARN_Reconnect_reopening:Reconnect reopening SessionFactory.

storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/ReconnectTest.java

Lines changed: 83 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@
3636
import testsuite.clusterj.model.Order;
3737
import testsuite.clusterj.model.OrderLine;
3838

39-
@org.junit.Ignore("disable test until diagnosis of failure")
39+
//@org.junit.Ignore("disable test until diagnosis of failure")
4040
public class ReconnectTest extends AbstractClusterJModelTest {
4141

4242
@Override
4343
protected boolean getDebug() {
4444
return false;
4545
}
4646

47-
private long sleepBeforeReconnectMillis = 20;
47+
private long sleepBeforeReconnectMillis = 1;
4848
private int numberOfThreads = 30;
4949
private int numberOfNewCustomersPerThread = 5;
5050
private int numberOfNewOrdersPerNewCustomer = 5;
@@ -136,19 +136,21 @@ public void test() {
136136
// create uncaught exception handler
137137
MyUncaughtExceptionHandler uncaughtExceptionHandler = new MyUncaughtExceptionHandler();
138138
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
139-
// create all threads
139+
// create the thread that misbehaves
140+
Thread misbehaving = new Thread(threadGroup, new Misbehaving());
141+
threads.add(misbehaving);
142+
// create all normal threads
140143
for (int i = 0; i < numberOfThreads ; ++i) {
141144
Thread thread = new Thread(threadGroup, new StuffToDo());
142145
threads.add(thread);
146+
}
147+
// start all threads
148+
for (Thread thread: threads) {
143149
thread.start();
144150
}
145-
// create the thread that misbehaves
146-
Thread misbehaving = new Thread(threadGroup, new Misbehaving());
147-
threads.add(misbehaving);
148-
misbehaving.start();
149151
// tell the SessionFactory to reconnect
150152
sleep(sleepBeforeReconnectMillis);
151-
sessionFactory.reconnect();
153+
sessionFactory.reconnect(5);
152154
// wait until all threads have finished
153155
for (Thread t: threads) {
154156
try {
@@ -209,12 +211,13 @@ public void test() {
209211
actualTotal += orderLine.getTotalValue();
210212
}
211213
errorIfNotEqual("For order " + orderId + ", order value does not equal sum of order line values."
212-
+ " orderLines: " + messages.toString(),
214+
+ " orderLines: \n" + messages.toString(),
213215
expectedTotal, actualTotal);
214216
}
215217
done = true;
216218
} catch (Throwable t) {
217219
if (getDebug()) { System.out.println("summarize for the record caught " + t.getMessage()); }
220+
sleep(1000);
218221
}
219222
}
220223
failOnError();
@@ -276,27 +279,39 @@ public void run() {
276279
session.close();
277280
done = true;
278281
} catch (ClusterJUserException cjue) {
282+
if (getDebug()) { System.out.println("StuffToDo: query orderId caught " + cjue.getMessage()); }
279283
if (cjue.getMessage().contains("SessionFactory is not open")) {
280-
sleep(1000);
284+
sleep(300);
281285
}
282286
}
283287
}
284288
int i = 0;
285289
while (i < numberOfNewCustomersPerThread) {
286290
// create a new customer
287291
try (Session localSession = sessionFactory.getSession()) {
292+
Customer customer = null;
293+
List<Customer> newCustomers = new ArrayList<Customer>(numberOfNewCustomersPerThread);
294+
Order order = null;
295+
List<Order> newOrders = new ArrayList<Order>(
296+
numberOfNewCustomersPerThread * numberOfNewOrdersPerNewCustomer);
288297
localSession.currentTransaction().begin();
289-
createCustomer(localSession, String.valueOf(Thread.currentThread().getId()));
290-
for (int j = 0; j < numberOfNewOrdersPerNewCustomer ; ++j) {
291-
// create a new order
292-
createOrder(localSession, myRandom);
293-
}
294-
++i;
298+
customer = createCustomer(localSession, String.valueOf(Thread.currentThread().getId()));
299+
newCustomers.add(customer);
300+
int customerId = customer.getId();
301+
for (int j = 0; j < numberOfNewOrdersPerNewCustomer ; ++j) {
302+
// create a new order for the customer
303+
newOrders.add(createOrder(localSession, customerId, myRandom));
304+
}
305+
++i;
295306
localSession.currentTransaction().commit();
307+
// add new customers and orders only if successful
308+
addCustomers(newCustomers);
309+
addOrders(newOrders);
296310
} catch (ClusterJUserException cjue) {
311+
if (getDebug()) { System.out.println("StuffToDo: create customer caught " + cjue.getMessage()); }
297312
if (cjue.getMessage().contains("SessionFactory is not open")) {
298313
incrementRetryCount();
299-
sleep(1000);
314+
sleep(300);
300315
}
301316
}
302317
}
@@ -306,13 +321,16 @@ public void run() {
306321
try (Session localSession = sessionFactory.getSession()) {
307322
// update an order
308323
localSession.currentTransaction().begin();
309-
updateOrder(localSession, myRandom, queryOrderType);
324+
Order order = updateOrder(localSession, myRandom, queryOrderType);
310325
localSession.currentTransaction().commit();
326+
// put the updated order back
327+
addOrder(order);
311328
++i;
312329
} catch (ClusterJUserException cjue) {
330+
if (getDebug()) { System.out.println("StuffToDo: update orders caught " + cjue.getMessage()); }
313331
if (cjue.getMessage().contains("SessionFactory is not open")) {
314332
incrementRetryCount();
315-
sleep(1000);
333+
sleep(300);
316334
}
317335
}
318336
}
@@ -327,8 +345,9 @@ public void run() {
327345
done = true;
328346
} catch (ClusterJUserException cjue) {
329347
if (cjue.getMessage().contains("SessionFactory is not open")) {
348+
if (getDebug()) { System.out.println("StuffToDo: delete order caught " + cjue.getMessage()); }
330349
incrementRetryCount();
331-
sleep(1000);
350+
sleep(300);
332351
} else {
333352
System.out.println("deleteOrder threw " + cjue.getMessage());
334353
}
@@ -340,44 +359,45 @@ public void run() {
340359
/** Create a new customer.
341360
* @param session the session
342361
* @param threadId the thread id of the creating thread
362+
* @return the new customer
343363
*/
344-
private void createCustomer(Session session, String threadId) {
364+
private Customer createCustomer(Session session, String threadId) {
345365
Customer customer = session.newInstance(Customer.class);
346366
int id = getNextCustomerId();
347367
customer.setId(id);
348368
customer.setName("Customer number " + id + " thread " + threadId);
349369
customer.setMagic(id * 10000);
350-
session.makePersistent(customer); // autocommit for this
351-
addCustomer(customer);
370+
session.makePersistent(customer);
371+
return customer;
352372
}
353373

354-
/** Create a new order. Add a new order with a random number of order lines
374+
/** Create a new order for a specific customer with a random number of order lines
355375
* and a random unit price and quantity.
356376
* @param session the session
357-
* @param random a random number generator
377+
* @param customerId the id of the customer the order is created for
378+
* @param random the random number generator
379+
* @return the new order
358380
*/
359-
public void createOrder(Session session, Random random) {
381+
public Order createOrder(Session session, int customerId, Random random) {
360382
// get an order number
361383
int orderid = getNextOrderId();
362384
Order order = session.newInstance(Order.class);
363385
order.setId(orderid);
364-
// get a random customer number
365-
int customerId = random .nextInt(nextCustomerId);
366386
order.setCustomerId(customerId);
367387
order.setDescription("Order " + orderid + " for Customer " + customerId);
368388
Double orderValue = 0.0d;
369389
// now create some order lines
370-
int numberOfOrderLines = random.nextInt(maximumOrderLinesPerOrder);
390+
int numberOfOrderLines = random.nextInt(maximumOrderLinesPerOrder) + 1;
371391
if (getDebug()) System.out.println("Create Order " + orderid
372392
+ " with numberOfOrderLines: " + numberOfOrderLines);
373393
for (int i = 0; i < numberOfOrderLines; ++i) {
374394
int orderLineNumber = getNextOrderLineId();
375395
OrderLine orderLine = session.newInstance(OrderLine.class);
376396
orderLine.setId(orderLineNumber);
377397
orderLine.setOrderId(orderid);
378-
long quantity = random.nextInt(maximumQuantityPerOrderLine);
398+
long quantity = random.nextInt(maximumQuantityPerOrderLine) + 1;
379399
orderLine.setQuantity(quantity);
380-
float unitPrice = ((float)random.nextInt(maximumUnitPrice)) / 4;
400+
float unitPrice = (1.0f + (float)random.nextInt(maximumUnitPrice)) / 4;
381401
orderLine.setUnitPrice(unitPrice);
382402
double orderLineValue = unitPrice * quantity;
383403
orderValue += orderLineValue;
@@ -390,39 +410,41 @@ public void createOrder(Session session, Random random) {
390410
}
391411
order.setValue(orderValue);
392412
session.persist(order);
393-
addOrder(order);
413+
return order;
394414
}
395415

396416
/** Update an order by changing one randomly chosen order line (if the order has any)
397417
* @param session the session
398418
* @param random a random number generator
399419
* @param queryOrderType the query type used to look up the order lines of the order
400420
*/
401-
public void updateOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType) {
421+
public Order updateOrder(Session session, Random random, QueryDomainType<OrderLine> queryOrderType) {
402422
Order order = null;
403423
// pick an order to update; prevent anyone else from updating the same order
404424
order = removeOrderFromOrdersCollection(random);
405-
if (order == null) { return; }
425+
if (order == null) { return null; }
406426
int orderId = order.getId();
407427
// replace order with its persistent representation
408428
order = session.find(Order.class, orderId);
409-
if (order == null) { return; }
429+
if (order == null) { return null; }
410430
List<OrderLine> orderLines = getOrderLines(session, queryOrderType, orderId);
411431
int numberOfOrderLines = orderLines.size();
412432
OrderLine orderLine = null;
413433
double orderValue = order.getValue();
434+
if (getDebug()) { System.out.println("updateOrder previous orderValue: " + orderValue); }
414435
if (numberOfOrderLines > 0) {
415436
int index = random.nextInt(numberOfOrderLines);
416437
orderLine = orderLines.get(index);
417438
orderValue -= orderLine.getTotalValue();
418439
updateOrderLine(orderLine, random);
419440
orderValue += orderLine.getTotalValue();
420441
}
442+
if (getDebug()) { System.out.println("updateOrder updated orderValue: " + orderValue); }
421443
order.setValue(orderValue);
422444
session.updatePersistent(orderLine);
423445
session.updatePersistent(order);
424-
// put order back now that we're done updating it
425-
addOrder(order);
446+
// return order so it can be put back after committing the transaction
447+
return order;
426448
}
427449

428450
/** Update an order line by randomly changing unit price and quantity.
@@ -492,17 +514,26 @@ private void removeOrderLinesFromOrderLinesCollection(Collection<OrderLine> orde
492514
}
493515
}
494516

495-
/** Add a new customer to the list of customers
496-
* @param customer
517+
/** Add a new customer to the list of customers (multithread safe)
518+
* @param customer the customer to add
497519
*/
498520
private void addCustomer(Customer customer) {
499521
synchronized(customers) {
500522
customers.add(customer);
501523
}
502524
}
503525

526+
/** Add new customers to the list of customers (multithread safe)
527+
* @param newCustomers the customers to add
528+
*/
529+
private void addCustomers(Collection<Customer> newCustomers) {
530+
synchronized(customers) {
531+
customers.addAll(newCustomers);
532+
}
533+
}
534+
504535
/** Get the next customer number (multithread safe)
505-
* @return
536+
* @return the next customer id
506537
*/
507538
private int getNextCustomerId() {
508539
synchronized(customers) {
@@ -512,7 +543,7 @@ private int getNextCustomerId() {
512543
}
513544

514545
/** Get the next order number (multithread safe)
515-
* @return
546+
* @return the next order number
516547
*/
517548
private int getNextOrderId() {
518549
synchronized(orders) {
@@ -531,7 +562,7 @@ private int getNextOrderLineId() {
531562
}
532563
}
533564

534-
/** Add an order to the list of orders.
565+
/** Add an order to the list of orders. (multithread safe)
535566
* @param order the order
536567
*/
537568
private void addOrder(Order order) {
@@ -540,7 +571,16 @@ private void addOrder(Order order) {
540571
}
541572
}
542573

543-
/** Add an order line to the list of order lines.
574+
/** Add a collection of orders to the list of orders. (multithread safe)
575+
* @param newOrders the collection of orders
576+
*/
577+
private void addOrders(Collection<Order> newOrders) {
578+
synchronized(orders) {
579+
orders.addAll(newOrders);
580+
}
581+
}
582+
583+
/** Add an order line to the list of order lines. (multithread safe)
544584
* @param orderLine the order line
545585
*/
546586
private void addOrderLine(OrderLine orderLine) {

0 commit comments

Comments
 (0)