@@ -4,6 +4,7 @@ import java.time.Instant
4
4
5
5
import akka .actor ._
6
6
import akka .util .ByteString
7
+ import cats .data .NonEmptyList
7
8
import io .iohk .ethereum .blockchain .sync .FastSyncReceiptsValidator .ReceiptsValidationResult
8
9
import io .iohk .ethereum .blockchain .sync .PeerRequestHandler .ResponseReceived
9
10
import io .iohk .ethereum .blockchain .sync .SyncBlocksValidator .BlockBodyValidationResult
@@ -69,29 +70,9 @@ class FastSync(
69
70
}
70
71
71
72
def startWithState (syncState : SyncState ): Unit = {
72
- if (syncState.updatingPivotBlock) {
73
- log.info(s " FastSync interrupted during pivot block update, choosing new pivot block " )
74
- val syncingHandler = new SyncingHandler (syncState)
75
- val pivotBlockSelector = context.actorOf(
76
- PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self),
77
- " pivot-block-selector"
78
- )
79
- pivotBlockSelector ! PivotBlockSelector .SelectPivotBlock
80
- context become syncingHandler.waitingForPivotBlockUpdate(ImportedLastBlock )
81
- } else {
82
- log.info(
83
- s " Starting block synchronization (fast mode), pivot block ${syncState.pivotBlock.number}, " +
84
- s " block to download to ${syncState.safeDownloadTarget}"
85
- )
86
- val syncingHandler = new SyncingHandler (syncState)
87
- context become syncingHandler.receive
88
- if (syncState.isBlockchainWorkFinished && ! syncState.stateSyncFinished) {
89
- log.info(s " Blockchain sync was completed, starting state sync to block ${syncState.pivotBlock.idTag}" )
90
- // chain has already been downloaded we can start state sync
91
- syncingHandler.startStateSync(syncState.pivotBlock)
92
- }
93
- syncingHandler.processSyncing()
94
- }
73
+ log.info(s " Starting with existing state and asking for new pivot block " )
74
+ val syncingHandler = new SyncingHandler (syncState)
75
+ syncingHandler.askForPivotBlockUpdate(NodeRestart )
95
76
}
96
77
97
78
def startFromScratch (): Unit = {
@@ -115,14 +96,17 @@ class FastSync(
115
96
pivotBlockHeader,
116
97
safeDownloadTarget = pivotBlockHeader.number + syncConfig.fastSyncBlockValidationX
117
98
)
118
- startWithState(initialSyncState)
99
+ val syncingHandler = new SyncingHandler (initialSyncState)
100
+ context.become(syncingHandler.receive)
101
+ syncingHandler.processSyncing()
119
102
}
120
103
}
121
104
122
105
// scalastyle:off number.of.methods
123
106
private class SyncingHandler (initialSyncState : SyncState ) {
124
107
125
108
private val BlockHeadersHandlerName = " block-headers-request-handler"
109
+ // not part of syncstate as we do not want to persist is.
126
110
private var stateSyncRestartRequested = false
127
111
128
112
private var requestedHeaders : Map [Peer , BigInt ] = Map .empty
@@ -158,13 +142,11 @@ class FastSync(
158
142
private val heartBeat =
159
143
scheduler.scheduleWithFixedDelay(syncRetryInterval, syncRetryInterval * 2 , self, ProcessSyncing )
160
144
161
- def startStateSync (targetBlockHeader : BlockHeader ): Unit = {
162
- syncStateScheduler ! StartSyncingTo (targetBlockHeader.stateRoot, targetBlockHeader.number)
163
- }
164
-
165
145
def receive : Receive = handleCommonMessages orElse {
166
146
case UpdatePivotBlock (state) => updatePivotBlock(state)
167
- case WaitingForNewTargetBlock => updatePivotBlock(ImportedLastBlock )
147
+ case WaitingForNewTargetBlock =>
148
+ log.info(" State sync stopped until receiving new pivot block" )
149
+ updatePivotBlock(ImportedLastBlock )
168
150
case ProcessSyncing => processSyncing()
169
151
case PrintStatus => printStatus()
170
152
case PersistSyncState => persistSyncState()
@@ -206,49 +188,65 @@ class FastSync(
206
188
handleRequestFailure(assignedHandlers(ref), ref, " Unexpected error" )
207
189
}
208
190
209
- def waitingForPivotBlockUpdate (processState : FinalBlockProcessingResult ): Receive = handleCommonMessages orElse {
191
+ def askForPivotBlockUpdate (updateReason : PivotBlockUpdateReason ): Unit = {
192
+ syncState = syncState.copy(updatingPivotBlock = true )
193
+ log.info(" Asking for new pivot block" )
194
+ val pivotBlockSelector = {
195
+ context.actorOf(
196
+ PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self)
197
+ )
198
+ }
199
+ pivotBlockSelector ! PivotBlockSelector .SelectPivotBlock
200
+ context become waitingForPivotBlockUpdate(updateReason)
201
+ }
202
+
203
+ def reScheduleAskForNewPivot (updateReason : PivotBlockUpdateReason ): Unit = {
204
+ syncState = syncState.copy(pivotBlockUpdateFailures = syncState.pivotBlockUpdateFailures + 1 )
205
+ scheduler.scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock (updateReason))
206
+ }
207
+
208
+ def waitingForPivotBlockUpdate (updateReason : PivotBlockUpdateReason ): Receive = handleCommonMessages orElse {
210
209
case PivotBlockSelector .Result (pivotBlockHeader) =>
211
210
log.info(s " New pivot block with number ${pivotBlockHeader.number} received " )
212
211
if (pivotBlockHeader.number >= syncState.pivotBlock.number) {
213
- updatePivotSyncState(processState, pivotBlockHeader)
214
- syncState = syncState.copy(updatingPivotBlock = false )
215
- context become this .receive
216
- processSyncing()
212
+ if (pivotBlockHeader.number == syncState.pivotBlock.number && updateReason.nodeRestart) {
213
+ // it can happen after quick node restart than pivot block has not changed in the network. To keep whole
214
+ // fast sync machinery running as expected we need to make sure that we will receive better pivot than current
215
+ log.info(" Received stale pivot after restart, asking for new pivot" )
216
+ reScheduleAskForNewPivot(updateReason)
217
+ } else {
218
+ updatePivotSyncState(updateReason, pivotBlockHeader)
219
+ syncState = syncState.copy(updatingPivotBlock = false )
220
+ context become this .receive
221
+ processSyncing()
222
+ }
217
223
} else {
218
- syncState = syncState.copy(pivotBlockUpdateFailures = syncState.pivotBlockUpdateFailures + 1 )
219
- scheduler.scheduleOnce(syncRetryInterval, self, UpdatePivotBlock (processState) )
224
+ log.info( " Received target block is older than old one, re-scheduling asking for new one " )
225
+ reScheduleAskForNewPivot(updateReason )
220
226
}
221
227
222
228
case PersistSyncState => persistSyncState()
223
229
224
230
case UpdatePivotBlock (state) => updatePivotBlock(state)
225
231
}
226
232
227
- private def updatePivotBlock (state : FinalBlockProcessingResult ): Unit = {
233
+ private def updatePivotBlock (state : PivotBlockUpdateReason ): Unit = {
228
234
if (syncState.pivotBlockUpdateFailures <= syncConfig.maximumTargetUpdateFailures) {
229
235
if (assignedHandlers.nonEmpty || syncState.blockChainWorkQueued) {
230
236
log.info(s " Still waiting for some responses, rescheduling pivot block update " )
231
237
scheduler.scheduleOnce(1 .second, self, UpdatePivotBlock (state))
232
238
processSyncing()
233
239
} else {
234
- syncState = syncState.copy(updatingPivotBlock = true )
235
- log.info(" Asking for new pivot block" )
236
- val pivotBlockSelector = {
237
- context.actorOf(
238
- PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self)
239
- )
240
- }
241
- pivotBlockSelector ! PivotBlockSelector .SelectPivotBlock
242
- context become waitingForPivotBlockUpdate(state)
240
+ askForPivotBlockUpdate(state)
243
241
}
244
242
} else {
245
243
log.warning(s " Sync failure! Number of pivot block update failures reached maximum. " )
246
244
sys.exit(1 )
247
245
}
248
246
}
249
247
250
- private def updatePivotSyncState (state : FinalBlockProcessingResult , pivotBlockHeader : BlockHeader ): Unit =
251
- state match {
248
+ private def updatePivotSyncState (updateReason : PivotBlockUpdateReason , pivotBlockHeader : BlockHeader ): Unit =
249
+ updateReason match {
252
250
case ImportedLastBlock =>
253
251
if (pivotBlockHeader.number - syncState.pivotBlock.number <= syncConfig.maxTargetDifference) {
254
252
log.info(s " Current pivot block is fresh enough, starting state download " )
@@ -277,6 +275,17 @@ class FastSync(
277
275
)
278
276
syncState =
279
277
syncState.updatePivotBlock(pivotBlockHeader, syncConfig.fastSyncBlockValidationX, updateFailures = true )
278
+
279
+ case NodeRestart =>
280
+ // in case of node restart we are sure that new pivotBlockHeader > current pivotBlockHeader
281
+ syncState = syncState.updatePivotBlock(
282
+ pivotBlockHeader,
283
+ syncConfig.fastSyncBlockValidationX,
284
+ updateFailures = false
285
+ )
286
+ log.info(
287
+ s " Changing pivot block to ${pivotBlockHeader.number}, new safe target is ${syncState.safeDownloadTarget}"
288
+ )
280
289
}
281
290
282
291
private def removeRequestHandler (handler : ActorRef ): Unit = {
@@ -527,31 +536,48 @@ class FastSync(
527
536
}
528
537
}
529
538
539
+ def hasBestBlockFreshEnoughToUpdatePivotBlock (info : PeerInfo , state : SyncState , syncConfig : SyncConfig ): Boolean = {
540
+ (info.maxBlockNumber - syncConfig.pivotBlockOffset) - state.pivotBlock.number > syncConfig.maxPivotBlockAge
541
+ }
542
+
530
543
private def getPeerWithTooFreshNewBlock (
531
- peers : Map [ Peer , PeerInfo ],
544
+ peers : NonEmptyList [( Peer , PeerInfo ) ],
532
545
state : SyncState ,
533
546
syncConfig : SyncConfig
534
- ): List [Peer ] = {
547
+ ): List [( Peer , BigInt ) ] = {
535
548
peers.collect {
536
- case (peer, info)
537
- if (info.maxBlockNumber - syncConfig.pivotBlockOffset) - state.pivotBlock.number > FastSync .maxTargetBlockAge =>
538
- peer
539
- }.toList
549
+ case (peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) =>
550
+ (peer, info.maxBlockNumber)
551
+ }
540
552
}
541
553
542
- private def shouldUpdateStateTargetBlock (): Boolean = {
543
- val availablePeers = peersToDownloadFrom
544
- if (availablePeers.isEmpty) {
554
+ def noBlockchainWorkRemaining : Boolean =
555
+ syncState.isBlockchainWorkFinished && assignedHandlers.isEmpty
556
+
557
+ def notInTheMiddleOfUpdate : Boolean =
558
+ ! (syncState.updatingPivotBlock || stateSyncRestartRequested)
559
+
560
+ def pivotBlockIsStale (): Boolean = {
561
+ val currentPeers = peersToDownloadFrom.toList
562
+ if (currentPeers.isEmpty) {
545
563
false
546
564
} else {
547
- if (availablePeers.size < syncConfig.minPeersToChoosePivotBlock) {
548
- getPeerWithTooFreshNewBlock(availablePeers, syncState, syncConfig).size == availablePeers.size
565
+ val peerWithBestBlockInNetwork = currentPeers.maxBy(peerWithNum => peerWithNum._2.maxBlockNumber)
566
+
567
+ val peersWithTooFreshPossiblePivotBlock =
568
+ getPeerWithTooFreshNewBlock(NonEmptyList .fromListUnsafe(currentPeers), syncState, syncConfig)
569
+
570
+ if (peersWithTooFreshPossiblePivotBlock.isEmpty) {
571
+ log.info(s " There are not peers with to fresh possible pivot block, " +
572
+ s " best peer has block with number: ${peerWithBestBlockInNetwork._2.maxBlockNumber}" )
573
+ false
549
574
} else {
550
- getPeerWithTooFreshNewBlock(
551
- availablePeers,
552
- syncState,
553
- syncConfig
554
- ).size >= syncConfig.minPeersToChoosePivotBlock
575
+ val pivotBlockIsStale = peersWithTooFreshPossiblePivotBlock.size >= minPeersToChoosePivotBlock
576
+
577
+ log.info(s " There are ${peersWithTooFreshPossiblePivotBlock.size} peers with possible new pivot block, " +
578
+ s " best known pivot in current peer list has number ${peerWithBestBlockInNetwork._2.maxBlockNumber}" )
579
+
580
+ pivotBlockIsStale
555
581
}
556
582
}
557
583
}
@@ -562,23 +588,9 @@ class FastSync(
562
588
} else {
563
589
if (blockchainDataToDownload) {
564
590
processDownloads()
565
- } else if (syncState.isBlockchainWorkFinished && assignedHandlers.isEmpty && ! syncState.stateSyncFinished) {
566
- if (peersToDownloadFrom.nonEmpty) {
567
- val bestBLock = peersToDownloadFrom.map { case (p, info) => info.maxBlockNumber }.max
568
- log.info(
569
- s " BestKnownBlock in network is ${bestBLock}. Target block is ${syncState.pivotBlock.number}. " +
570
- s " Difference is ${bestBLock - syncState.pivotBlock.number}"
571
- )
572
- }
573
-
574
- if (shouldUpdateStateTargetBlock() && ! stateSyncRestartRequested) {
575
- val bestBLock = peersToDownloadFrom.map { case (p, info) => info.maxBlockNumber }.max
576
- log.info(
577
- s " Updating state sync target block. " +
578
- s " BestKnownBlock in network is ${bestBLock}. Target block is ${syncState.pivotBlock.number}. " +
579
- s " Difference is ${bestBLock - syncState.pivotBlock.number}"
580
- )
581
-
591
+ } else if (noBlockchainWorkRemaining && ! syncState.stateSyncFinished && notInTheMiddleOfUpdate) {
592
+ if (pivotBlockIsStale()){
593
+ log.info(" Restarting state sync to new pivot block" )
582
594
syncStateScheduler ! RestartRequested
583
595
stateSyncRestartRequested = true
584
596
}
@@ -763,7 +775,7 @@ object FastSync {
763
775
)
764
776
)
765
777
766
- private case class UpdatePivotBlock (state : FinalBlockProcessingResult )
778
+ private case class UpdatePivotBlock (state : PivotBlockUpdateReason )
767
779
private case object ProcessSyncing
768
780
769
781
private [sync] case object PersistSyncState
@@ -841,8 +853,15 @@ object FastSync {
841
853
842
854
case object ImportedPivotBlock extends HeaderProcessingResult
843
855
844
- sealed abstract class FinalBlockProcessingResult
856
+ sealed abstract class PivotBlockUpdateReason {
857
+ def nodeRestart : Boolean = this match {
858
+ case ImportedLastBlock => false
859
+ case LastBlockValidationFailed => false
860
+ case NodeRestart => true
861
+ }
862
+ }
845
863
846
- case object ImportedLastBlock extends FinalBlockProcessingResult
847
- case object LastBlockValidationFailed extends FinalBlockProcessingResult
864
+ case object ImportedLastBlock extends PivotBlockUpdateReason
865
+ case object LastBlockValidationFailed extends PivotBlockUpdateReason
866
+ case object NodeRestart extends PivotBlockUpdateReason
848
867
}
0 commit comments