1
1
/*
2
- Copyright (c) 2003, 2016 , Oracle and/or its affiliates. All rights reserved.
2
+ Copyright (c) 2003, 2017 , Oracle and/or its affiliates. All rights reserved.
3
3
4
4
This program is free software; you can redistribute it and/or modify
5
5
it under the terms of the GNU General Public License as published by
@@ -18266,6 +18266,13 @@ Dbdih::master_lcp_fragmentMutex_locked(Signal* signal,
18266
18266
startLcpRoundLoopLab(signal, 0, 0);
18267
18267
}
18268
18268
18269
+
18270
+ //#define DIH_DEBUG_REPLICA_SEARCH
18271
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18272
+ static Uint32 totalScheduled;
18273
+ static Uint32 totalExamined;
18274
+ #endif
18275
+
18269
18276
void Dbdih::startLcpRoundLoopLab(Signal* signal,
18270
18277
Uint32 startTableId, Uint32 startFragId)
18271
18278
{
@@ -18279,19 +18286,53 @@ void Dbdih::startLcpRoundLoopLab(Signal* signal,
18279
18286
}//if
18280
18287
c_lcpState.currentFragment.tableId = startTableId;
18281
18288
c_lcpState.currentFragment.fragmentId = startFragId;
18289
+ c_lcpState.m_allReplicasQueuedLQH.clear();
18290
+
18291
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18292
+ totalScheduled = totalExamined = 0;
18293
+ #endif
18294
+
18282
18295
startNextChkpt(signal);
18283
18296
}//Dbdih::startLcpRoundLoopLab()
18284
18297
18285
18298
void Dbdih::startNextChkpt(Signal* signal)
18286
18299
{
18300
+ jam();
18301
+ const bool allReplicaCheckpointsQueued =
18302
+ c_lcpState.m_allReplicasQueuedLQH.
18303
+ contains(c_lcpState.m_participatingLQH);
18304
+
18305
+ if (allReplicaCheckpointsQueued)
18306
+ {
18307
+ jam();
18308
+
18309
+ /**
18310
+ * No need to find new checkpoints to start,
18311
+ * just waiting for completion
18312
+ */
18313
+
18314
+ sendLastLCP_FRAG_ORD(signal);
18315
+ return;
18316
+ }
18317
+
18287
18318
Uint32 lcpId = SYSFILE->latestLCP_ID;
18288
18319
18289
- NdbNodeBitmask busyNodes;
18290
- busyNodes.clear();
18320
+ /* Initialise handledNodes with those already fully queued */
18321
+ NdbNodeBitmask handledNodes = c_lcpState.m_allReplicasQueuedLQH;
18322
+
18323
+ /* Remove any that have failed in the interim */
18324
+ handledNodes.bitAND(c_lcpState.m_participatingLQH);
18325
+
18291
18326
const Uint32 lcpNodes = c_lcpState.m_participatingLQH.count();
18292
18327
18293
18328
bool save = true;
18294
18329
LcpState::CurrentFragment curr = c_lcpState.currentFragment;
18330
+
18331
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18332
+ Uint32 examined = 0;
18333
+ Uint32 started = 0;
18334
+ Uint32 queued = 0;
18335
+ #endif
18295
18336
18296
18337
while (curr.tableId < ctabFileSize) {
18297
18338
TabRecordPtr tabPtr;
@@ -18314,6 +18355,10 @@ void Dbdih::startNextChkpt(Signal* signal)
18314
18355
18315
18356
jam();
18316
18357
c_replicaRecordPool.getPtr(replicaPtr);
18358
+
18359
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18360
+ examined++;
18361
+ #endif
18317
18362
18318
18363
NodeRecordPtr nodePtr;
18319
18364
nodePtr.i = replicaPtr.p->procNode;
@@ -18350,6 +18395,10 @@ void Dbdih::startNextChkpt(Signal* signal)
18350
18395
nodePtr.p->noOfStartedChkpt = i + 1;
18351
18396
18352
18397
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
18398
+
18399
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18400
+ started++;
18401
+ #endif
18353
18402
}
18354
18403
else if (nodePtr.p->noOfQueuedChkpt <
18355
18404
MAX_QUEUED_FRAG_CHECKPOINTS_PER_NODE)
@@ -18369,30 +18418,42 @@ void Dbdih::startNextChkpt(Signal* signal)
18369
18418
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
18370
18419
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
18371
18420
nodePtr.p->noOfQueuedChkpt = i + 1;
18421
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18422
+ queued++;
18423
+ #endif
18372
18424
}
18373
18425
else
18374
18426
{
18375
18427
jam();
18376
18428
18377
18429
if(save)
18378
18430
{
18379
- /**
18380
- * Stop increasing value on first that was "full"
18381
- */
18382
- c_lcpState.currentFragment = curr;
18383
- save = false;
18384
- }
18431
+ /**
18432
+ * Stop increasing value on first replica that
18433
+ * we could not enqueue, so we don't miss it
18434
+ * next time
18435
+ */
18436
+ c_lcpState.currentFragment = curr;
18437
+ save = false;
18438
+ }
18385
18439
18386
- busyNodes .set(nodePtr.i);
18387
- if(busyNodes .count() == lcpNodes)
18440
+ handledNodes .set(nodePtr.i);
18441
+ if (handledNodes .count() == lcpNodes)
18388
18442
{
18389
- /**
18390
- * There were no possibility to start the local checkpoint
18391
- * and it was not possible to queue it up. In this case we
18392
- * stop the start of local checkpoints until the nodes with a
18393
- * backlog have performed more checkpoints. We will return and
18394
- * will not continue the process of starting any more checkpoints.
18395
- */
18443
+ /**
18444
+ * All participating nodes have either
18445
+ * - Full queues
18446
+ * - All available replica checkpoints queued
18447
+ * (m_allReplicasQueuedLQH)
18448
+ *
18449
+ * Therefore, exit the search here.
18450
+ */
18451
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18452
+ ndbout_c("Search : All nodes busy. Examined %u Started %u Queued %u",
18453
+ examined, started, queued);
18454
+ totalExamined+= examined;
18455
+ totalScheduled += (started + queued);
18456
+ #endif
18396
18457
return;
18397
18458
}//if
18398
18459
}//if
@@ -18406,6 +18467,31 @@ void Dbdih::startNextChkpt(Signal* signal)
18406
18467
curr.tableId++;
18407
18468
}//if
18408
18469
}//while
18470
+
18471
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18472
+ ndbout_c("Search : At least one node not busy. Examined %u Started %u Queued %u",
18473
+ examined, started, queued);
18474
+ totalExamined+= examined;
18475
+ totalScheduled += (started + queued);
18476
+ #endif
18477
+
18478
+ /**
18479
+ * Have examined all replicas and attempted to
18480
+ * enqueue as many replica LCPs as possible,
18481
+ * without filling all queues.
18482
+ * This means that some node(s) have no more
18483
+ * replica LCPs to be enqueued.
18484
+ * These are the node(s) which are *not* in
18485
+ * the handled bitmap on this round.
18486
+ * We keep track of these to allow the search
18487
+ * to exit early on future invocations.
18488
+ */
18489
+
18490
+ /* Invert handled nodes to reveal newly finished nodes */
18491
+ handledNodes.bitXOR(c_lcpState.m_participatingLQH);
18492
+
18493
+ /* Add newly finished nodes to the global state */
18494
+ c_lcpState.m_allReplicasQueuedLQH.bitOR(handledNodes);
18409
18495
18410
18496
sendLastLCP_FRAG_ORD(signal);
18411
18497
}//Dbdih::startNextChkpt()
@@ -18874,6 +18960,21 @@ Dbdih::checkLcpAllTablesDoneInLqh(Uint32 line){
18874
18960
ndbout_c("CLEARING 7194");
18875
18961
CLEAR_ERROR_INSERT_VALUE;
18876
18962
}
18963
+
18964
+ #ifdef DIH_DEBUG_REPLICA_SEARCH
18965
+ if (totalScheduled == 0)
18966
+ {
18967
+ totalScheduled = 1;
18968
+ }
18969
+ ndbout_c("LCP complete. Examined %u replicas, scheduled %u. Ratio : %u.%u",
18970
+ totalExamined,
18971
+ totalScheduled,
18972
+ totalExamined/totalScheduled,
18973
+ (10 * (totalExamined -
18974
+ ((totalExamined/totalScheduled) *
18975
+ totalScheduled)))/
18976
+ totalScheduled);
18977
+ #endif
18877
18978
18878
18979
return true;
18879
18980
}
@@ -19059,13 +19160,14 @@ void Dbdih::checkStartMoreLcp(Signal* signal, Uint32 nodeId)
19059
19160
//-------------------------------------------------------------------
19060
19161
19061
19162
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[startIndex]);
19163
+ return;
19062
19164
}
19063
19165
19064
19166
/* ----------------------------------------------------------------------- */
19065
- // When there are no more outstanding LCP reports and there are no one queued
19066
- // in at least one node, then we are ready to make sure all nodes have at
19067
- // least two outstanding LCP requests per node and at least two queued for
19068
- // sending .
19167
+ // If this node has no checkpoints queued up, then attempt to re-fill the
19168
+ // queues across all nodes.
19169
+ // The search for next replicas can be expensive, so we only do it when
19170
+ // the queues are empty .
19069
19171
/* ----------------------------------------------------------------------- */
19070
19172
startNextChkpt(signal);
19071
19173
}//Dbdih::checkStartMoreLcp()
0 commit comments