Skip to content

Commit c5fbc60

Browse files
authored
[ETCM-685] Improve pivot block selection (#949)
* [ETCM-685] Add retry logic to PivotBlockSelector * [ETCM-685] Add config entries * [ETCM-685] Add more unit tests * [ETCM-685] Fix best block number being pushed back to negative numbers
1 parent 595c95f commit c5fbc60

File tree

5 files changed

+136
-17
lines changed

5 files changed

+136
-17
lines changed

src/main/resources/conf/base.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,13 @@ mantis {
473473
# Maximum number of retries performed by fast sync when the master peer sends invalid block headers.
474474
# On reaching this limit, it will perform branch resolving.
475475
fast-sync-max-batch-retries = 5
476+
477+
# If the expected pivot block cannot be confirmed by at least `min-peers-to-choose-pivot-block` peers,
478+
# the pivot block number is pushed back by the following number of blocks and the confirmation process repeats.
479+
pivot-block-number-reset-delta = 50
480+
481+
# Max number of times a pivot block is checked against available best peers before the whole process is restarted.
482+
max-pivot-block-failures-count = 5
476483
}
477484

478485
pruning {

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/PivotBlockSelector.scala

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,19 @@ class PivotBlockSelector(
3535
import PivotBlockSelector._
3636
import syncConfig._
3737

38+
private var pivotBlockRetryCount = 0
39+
3840
override def receive: Receive = idle
3941

4042
private def idle: Receive = handlePeerListMessages orElse { case SelectPivotBlock =>
41-
val election @ ElectionDetails(correctPeers, expectedPivotBlock) = collectVoters
43+
val electionDetails = collectVoters()
44+
startPivotBlockSelection(electionDetails)
45+
}
4246

43-
if (election.isEnoughVoters(minPeersToChoosePivotBlock)) {
47+
private def startPivotBlockSelection(election: ElectionDetails): Unit = {
48+
val ElectionDetails(correctPeers, currentBestBlockNumber, expectedPivotBlock) = election
4449

50+
if (election.hasEnoughVoters(minPeersToChoosePivotBlock)) {
4551
val (peersToAsk, waitingPeers) = correctPeers.splitAt(minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin)
4652

4753
log.info(
@@ -64,17 +70,35 @@ class PivotBlockSelector(
6470
)
6571
} else {
6672
log.info(
67-
"Cannot pick pivot block. Need at least {} peers, but there are only {} which meet the criteria ({} all available at the moment). Retrying in {}",
73+
"Cannot pick pivot block. Need at least {} peers, but there are only {} which meet the criteria " +
74+
"({} all available at the moment).",
6875
minPeersToChoosePivotBlock,
6976
correctPeers.size,
7077
peersToDownloadFrom.size,
78+
currentBestBlockNumber
79+
)
80+
retryPivotBlockSelection(currentBestBlockNumber)
81+
}
82+
}
83+
84+
// Voters are collected until the minimum number of peers required to choose a pivot block is obtained.
85+
private def retryPivotBlockSelection(pivotBlockNumber: BigInt): Unit = {
86+
pivotBlockRetryCount += 1
87+
if (pivotBlockRetryCount <= maxPivotBlockFailuresCount && pivotBlockNumber > 0) {
88+
val electionDetails = collectVoters(Some(pivotBlockNumber))
89+
startPivotBlockSelection(electionDetails)
90+
} else {
91+
log.debug(
92+
"Cannot pick pivot block. Current best block number [{}]. Retrying in [{}]",
93+
pivotBlockNumber,
7194
startRetryInterval
7295
)
96+
// Restart the whole process.
7397
scheduleRetry(startRetryInterval)
7498
}
7599
}
76100

77-
def runningPivotBlockElection(
101+
private def runningPivotBlockElection(
78102
peersToAsk: Set[PeerId],
79103
waitingPeers: List[PeerId],
80104
pivotBlockNumber: BigInt,
@@ -85,10 +109,7 @@ class PivotBlockSelector(
85109
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
86110
peerEventBus ! Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peerId)))
87111
val updatedPeersToAsk = peersToAsk - peerId
88-
val targetBlockHeaderOpt =
89-
if (blockHeaders.headers.size != 1) None
90-
else
91-
blockHeaders.headers.find(header => header.number == pivotBlockNumber)
112+
val targetBlockHeaderOpt = blockHeaders.headers.find(header => header.number == pivotBlockNumber)
92113
targetBlockHeaderOpt match {
93114
case Some(targetBlockHeader) =>
94115
val newValue =
@@ -159,6 +180,7 @@ class PivotBlockSelector(
159180
peersLeft + bestHeaderVotes >= minPeersToChoosePivotBlock
160181

161182
private def scheduleRetry(interval: FiniteDuration): Unit = {
183+
pivotBlockRetryCount = 0
162184
scheduler.scheduleOnce(interval, self, SelectPivotBlock)
163185
context become idle
164186
}
@@ -178,7 +200,7 @@ class PivotBlockSelector(
178200
)
179201
}
180202

181-
private def collectVoters: ElectionDetails = {
203+
private def collectVoters(previousBestBlockNumber: Option[BigInt] = None): ElectionDetails = {
182204
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
183205
case (_, PeerWithInfo(peer, PeerInfo(_, _, true, maxBlockNumber, _))) =>
184206
(peer, maxBlockNumber)
@@ -188,12 +210,20 @@ class PivotBlockSelector(
188210
val bestPeerBestBlockNumber = peersSortedByBestNumber.headOption
189211
.map { case (_, bestPeerBestBlockNumber) => bestPeerBestBlockNumber }
190212
.getOrElse(BigInt(0))
191-
val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
213+
214+
// The current best block number is pushed back by `pivotBlockNumberResetDelta`
215+
// if this request is issued by the retry logic.
216+
val currentBestBlockNumber: BigInt =
217+
previousBestBlockNumber
218+
.map(_ - pivotBlockNumberResetDelta.max(0))
219+
.getOrElse(bestPeerBestBlockNumber)
220+
221+
val expectedPivotBlock = (currentBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
192222
val correctPeers = peersSortedByBestNumber
193223
.takeWhile { case (_, number) => number >= expectedPivotBlock }
194224
.map { case (peer, _) => peer }
195225

196-
ElectionDetails(correctPeers, expectedPivotBlock)
226+
ElectionDetails(correctPeers, currentBestBlockNumber, expectedPivotBlock)
197227
}
198228
}
199229

@@ -209,7 +239,7 @@ object PivotBlockSelector {
209239
Props(new PivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler, fastSync, blacklist))
210240

211241
case object SelectPivotBlock
212-
case class Result(targetBlockHeader: BlockHeader)
242+
final case class Result(targetBlockHeader: BlockHeader)
213243

214244
case object ElectionPivotBlockTimeout
215245

@@ -223,7 +253,11 @@ object PivotBlockSelector {
223253
}
224254
}
225255

226-
case class ElectionDetails(participants: List[Peer], expectedPivotBlock: BigInt) {
227-
def isEnoughVoters(minNumberOfVoters: Int): Boolean = participants.size >= minNumberOfVoters
256+
final case class ElectionDetails(
257+
participants: List[Peer],
258+
currentBestBlockNumber: BigInt,
259+
expectedPivotBlock: BigInt
260+
) {
261+
def hasEnoughVoters(minNumberOfVoters: Int): Boolean = participants.size >= minNumberOfVoters
228262
}
229263
}

src/main/scala/io/iohk/ethereum/utils/Config.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ object Config {
135135
stateSyncPersistBatchSize: Int,
136136
pivotBlockReScheduleInterval: FiniteDuration,
137137
maxPivotBlockAge: Int,
138-
fastSyncMaxBatchRetries: Int
138+
fastSyncMaxBatchRetries: Int,
139+
pivotBlockNumberResetDelta: Int,
140+
maxPivotBlockFailuresCount: Int
139141
)
140142

141143
object SyncConfig {
@@ -179,7 +181,9 @@ object Config {
179181
stateSyncPersistBatchSize = syncConfig.getInt("state-sync-persist-batch-size"),
180182
pivotBlockReScheduleInterval = syncConfig.getDuration("pivot-block-reschedule-interval").toMillis.millis,
181183
maxPivotBlockAge = syncConfig.getInt("max-pivot-block-age"),
182-
fastSyncMaxBatchRetries = syncConfig.getInt("fast-sync-max-batch-retries")
184+
fastSyncMaxBatchRetries = syncConfig.getInt("fast-sync-max-batch-retries"),
185+
pivotBlockNumberResetDelta = syncConfig.getInt("pivot-block-number-reset-delta"),
186+
maxPivotBlockFailuresCount = syncConfig.getInt("max-pivot-block-failures-count")
183187
)
184188
}
185189
}

src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,78 @@ class PivotBlockSelectorSpec
418418
peerMessageBus.expectMsg(Unsubscribe())
419419
}
420420

421+
it should "retry pivot block election with fallback to lower peer numbers" in new TestSetup {
422+
423+
override val minPeersToChoosePivotBlock = 2
424+
override val peersToChoosePivotBlockMargin = 1
425+
426+
updateHandshakedPeers(
427+
HandshakedPeers(
428+
allPeers
429+
.updated(peer1, allPeers(peer1).copy(maxBlockNumber = 2000))
430+
.updated(peer2, allPeers(peer2).copy(maxBlockNumber = 800))
431+
.updated(peer3, allPeers(peer3).copy(maxBlockNumber = 900))
432+
.updated(peer4, allPeers(peer4).copy(maxBlockNumber = 1400))
433+
)
434+
)
435+
436+
pivotBlockSelector ! SelectPivotBlock
437+
438+
peerMessageBus.expectMsgAllOf(
439+
Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer1.id))),
440+
Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer4.id)))
441+
)
442+
443+
etcPeerManager.expectMsgAllOf(
444+
EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(1400), 1, 0, reverse = false), peer1.id),
445+
EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(1400), 1, 0, reverse = false), peer4.id)
446+
)
447+
etcPeerManager.expectNoMessage()
448+
449+
// Collecting pivot block (for voting)
450+
pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 1400))), peer1.id)
451+
pivotBlockSelector ! MessageFromPeer(BlockHeaders(Seq(baseBlockHeader.copy(number = 1400))), peer4.id)
452+
453+
peerMessageBus.expectMsgAllOf(
454+
Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer1.id))),
455+
Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer4.id))),
456+
Unsubscribe()
457+
)
458+
peerMessageBus.expectNoMessage()
459+
460+
fastSync.expectMsg(Result(baseBlockHeader.copy(number = 1400)))
461+
}
462+
463+
it should "restart pivot block selection after `maxPivotBlockFailuresCount` is reached" in new TestSetup {
464+
465+
override val minPeersToChoosePivotBlock = 2
466+
override val peersToChoosePivotBlockMargin = 1
467+
468+
updateHandshakedPeers(
469+
HandshakedPeers(
470+
allPeers
471+
.updated(peer1, allPeers(peer1).copy(maxBlockNumber = 2000))
472+
.updated(peer2, allPeers(peer2).copy(maxBlockNumber = 800))
473+
.updated(peer3, allPeers(peer3).copy(maxBlockNumber = 900))
474+
.updated(peer4, allPeers(peer4).copy(maxBlockNumber = 1000))
475+
)
476+
)
477+
478+
pivotBlockSelector ! SelectPivotBlock
479+
480+
peerMessageBus.expectNoMessage()
481+
482+
updateHandshakedPeers(HandshakedPeers(threeAcceptedPeers))
483+
484+
time.advance(syncConfig.startRetryInterval)
485+
486+
peerMessageBus.expectMsgAllOf(
487+
Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer1.id))),
488+
Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer2.id))),
489+
Subscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peer3.id)))
490+
)
491+
}
492+
421493
class TestSetup extends TestSyncConfig {
422494

423495
val blacklist: Blacklist = CacheBasedBlacklist.empty(100)

src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ trait TestSyncConfig extends SyncConfigBuilder {
4444
stateSyncPersistBatchSize = 1000,
4545
pivotBlockReScheduleInterval = 1.second,
4646
maxPivotBlockAge = 96,
47-
fastSyncMaxBatchRetries = 3
47+
fastSyncMaxBatchRetries = 3,
48+
pivotBlockNumberResetDelta = 50,
49+
maxPivotBlockFailuresCount = 3
4850
)
4951

5052
override lazy val syncConfig: SyncConfig = defaultSyncConfig

0 commit comments

Comments
 (0)