@@ -7,6 +7,7 @@ import cats.data.NonEmptyList
7
7
import io .iohk .ethereum .blockchain .sync .FastSyncReceiptsValidator .ReceiptsValidationResult
8
8
import io .iohk .ethereum .blockchain .sync .PeerRequestHandler .ResponseReceived
9
9
import io .iohk .ethereum .blockchain .sync .SyncBlocksValidator .BlockBodyValidationResult
10
+ import io .iohk .ethereum .blockchain .sync .SyncProtocol .Status .Progress
10
11
import io .iohk .ethereum .blockchain .sync .SyncStateSchedulerActor .{
11
12
RestartRequested ,
12
13
StartSyncingTo ,
@@ -26,7 +27,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig
26
27
import org .bouncycastle .util .encoders .Hex
27
28
import scala .annotation .tailrec
28
29
import scala .concurrent .ExecutionContext .Implicits .global
29
- import scala .concurrent .duration .{ FiniteDuration , _ }
30
+ import scala .concurrent .duration ._
30
31
import scala .util .Random
31
32
32
33
// scalastyle:off file.size.limit
@@ -55,8 +56,10 @@ class FastSync(
55
56
56
57
def handleCommonMessages : Receive = handlePeerListMessages orElse handleBlacklistMessages
57
58
58
- def idle : Receive = handleCommonMessages orElse { case Start =>
59
- start()
59
+ def idle : Receive = handleCommonMessages orElse {
60
+ case SyncProtocol .Start =>
61
+ start()
62
+ case SyncProtocol .GetStatus => sender() ! SyncProtocol .Status .NotSyncing
60
63
}
61
64
62
65
def start (): Unit = {
@@ -82,22 +85,24 @@ class FastSync(
82
85
context become waitingForPivotBlock
83
86
}
84
87
85
- def waitingForPivotBlock : Receive = handleCommonMessages orElse { case PivotBlockSelector .Result (pivotBlockHeader) =>
86
- if (pivotBlockHeader.number < 1 ) {
87
- log.info(" Unable to start block synchronization in fast mode: pivot block is less than 1" )
88
- appStateStorage.fastSyncDone().commit()
89
- context become idle
90
- syncController ! Done
91
- } else {
92
- val initialSyncState =
93
- SyncState (
94
- pivotBlockHeader,
95
- safeDownloadTarget = pivotBlockHeader.number + syncConfig.fastSyncBlockValidationX
96
- )
97
- val syncingHandler = new SyncingHandler (initialSyncState)
98
- context.become(syncingHandler.receive)
99
- syncingHandler.processSyncing()
100
- }
88
+ def waitingForPivotBlock : Receive = handleCommonMessages orElse {
89
+ case SyncProtocol .GetStatus => sender() ! SyncProtocol .Status .NotSyncing
90
+ case PivotBlockSelector .Result (pivotBlockHeader) =>
91
+ if (pivotBlockHeader.number < 1 ) {
92
+ log.info(" Unable to start block synchronization in fast mode: pivot block is less than 1" )
93
+ appStateStorage.fastSyncDone().commit()
94
+ context become idle
95
+ syncController ! Done
96
+ } else {
97
+ val initialSyncState =
98
+ SyncState (
99
+ pivotBlockHeader,
100
+ safeDownloadTarget = pivotBlockHeader.number + syncConfig.fastSyncBlockValidationX
101
+ )
102
+ val syncingHandler = new SyncingHandler (initialSyncState)
103
+ context.become(syncingHandler.receive)
104
+ syncingHandler.processSyncing()
105
+ }
101
106
}
102
107
103
108
// scalastyle:off number.of.methods
@@ -140,8 +145,14 @@ class FastSync(
140
145
private val heartBeat =
141
146
scheduler.scheduleWithFixedDelay(syncRetryInterval, syncRetryInterval * 2 , self, ProcessSyncing )
142
147
143
- def receive : Receive = handleCommonMessages orElse {
144
- case UpdatePivotBlock (state) => updatePivotBlock(state)
148
+ def handleStatus : Receive = {
149
+ case SyncProtocol .GetStatus => sender() ! currentSyncingStatus
150
+ case SyncStateSchedulerActor .StateSyncStats (saved, missing) =>
151
+ syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = (saved + missing))
152
+ }
153
+
154
+ def receive : Receive = handleCommonMessages orElse handleStatus orElse {
155
+ case UpdatePivotBlock (reason) => updatePivotBlock(reason)
145
156
case WaitingForNewTargetBlock =>
146
157
log.info(" State sync stopped until receiving new pivot block" )
147
158
updatePivotBlock(ImportedLastBlock )
@@ -200,7 +211,8 @@ class FastSync(
200
211
201
212
def reScheduleAskForNewPivot (updateReason : PivotBlockUpdateReason ): Unit = {
202
213
syncState = syncState.copy(pivotBlockUpdateFailures = syncState.pivotBlockUpdateFailures + 1 )
203
- scheduler.scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock (updateReason))
214
+ scheduler
215
+ .scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock (updateReason))
204
216
}
205
217
206
218
private def stalePivotAfterRestart (
@@ -219,23 +231,33 @@ class FastSync(
219
231
newPivot.number >= currentState.pivotBlock.number && ! stalePivotAfterRestart(newPivot, currentState, updateReason)
220
232
}
221
233
222
- def waitingForPivotBlockUpdate (updateReason : PivotBlockUpdateReason ): Receive = handleCommonMessages orElse {
223
- case PivotBlockSelector .Result (pivotBlockHeader)
224
- if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
225
- log.info(s " New pivot block with number ${pivotBlockHeader.number} received " )
226
- updatePivotSyncState(updateReason, pivotBlockHeader)
227
- context become this .receive
228
- processSyncing()
234
+ def waitingForPivotBlockUpdate (updateReason : PivotBlockUpdateReason ): Receive =
235
+ handleCommonMessages orElse handleStatus orElse {
236
+ case PivotBlockSelector .Result (pivotBlockHeader)
237
+ if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
238
+ log.info(s " New pivot block with number ${pivotBlockHeader.number} received " )
239
+ updatePivotSyncState(updateReason, pivotBlockHeader)
240
+ context become this .receive
241
+ processSyncing()
229
242
230
- case PivotBlockSelector .Result (pivotBlockHeader)
231
- if ! newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
232
- log.info(" Received pivot block is older than old one, re-scheduling asking for new one" )
233
- reScheduleAskForNewPivot(updateReason)
243
+ case PivotBlockSelector .Result (pivotBlockHeader)
244
+ if ! newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
245
+ log.info(" Received pivot block is older than old one, re-scheduling asking for new one" )
246
+ reScheduleAskForNewPivot(updateReason)
234
247
235
- case PersistSyncState => persistSyncState()
248
+ case PersistSyncState => persistSyncState()
236
249
237
- case UpdatePivotBlock (state) => updatePivotBlock(state)
238
- }
250
+ case UpdatePivotBlock (state) => updatePivotBlock(state)
251
+ }
252
+
253
+ def currentSyncingStatus : SyncProtocol .Status =
254
+ SyncProtocol .Status .Syncing (
255
+ initialSyncState.lastFullBlockNumber,
256
+ Progress (syncState.lastFullBlockNumber, syncState.pivotBlock.number),
257
+ Some (
258
+ Progress (syncState.downloadedNodesCount, syncState.totalNodesCount.max(1 ))
259
+ ) // There's always at least one state root to fetch
260
+ )
239
261
240
262
private def updatePivotBlock (updateReason : PivotBlockUpdateReason ): Unit = {
241
263
if (syncState.pivotBlockUpdateFailures <= syncConfig.maximumTargetUpdateFailures) {
@@ -745,24 +767,26 @@ class FastSync(
745
767
def fullySynced : Boolean = {
746
768
syncState.isBlockchainWorkFinished && assignedHandlers.isEmpty && syncState.stateSyncFinished
747
769
}
748
- }
749
770
750
- private def updateBestBlockIfNeeded (receivedHashes : Seq [ByteString ]): Unit = {
751
- val fullBlocks = receivedHashes.flatMap { hash =>
752
- for {
753
- header <- blockchain.getBlockHeaderByHash(hash)
754
- _ <- blockchain.getBlockBodyByHash(hash)
755
- _ <- blockchain.getReceiptsByHash(hash)
756
- } yield header
757
- }
771
+ private def updateBestBlockIfNeeded (receivedHashes : Seq [ByteString ]): Unit = {
772
+ val fullBlocks = receivedHashes.flatMap { hash =>
773
+ for {
774
+ header <- blockchain.getBlockHeaderByHash(hash)
775
+ _ <- blockchain.getBlockBodyByHash(hash)
776
+ _ <- blockchain.getReceiptsByHash(hash)
777
+ } yield header
778
+ }
758
779
759
- if (fullBlocks.nonEmpty) {
760
- val bestReceivedBlock = fullBlocks.maxBy(_.number)
761
- if (appStateStorage.getBestBlockNumber() < bestReceivedBlock.number) {
762
- appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
780
+ if (fullBlocks.nonEmpty) {
781
+ val bestReceivedBlock = fullBlocks.maxBy(_.number)
782
+ val lastStoredBestBlockNumber = appStateStorage.getBestBlockNumber()
783
+ if (lastStoredBestBlockNumber < bestReceivedBlock.number) {
784
+ appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
785
+ }
786
+ syncState = syncState.copy(lastFullBlockNumber = bestReceivedBlock.number.max(lastStoredBestBlockNumber))
763
787
}
764
- }
765
788
789
+ }
766
790
}
767
791
}
768
792
@@ -794,7 +818,7 @@ object FastSync {
794
818
)
795
819
)
796
820
797
- private case class UpdatePivotBlock (state : PivotBlockUpdateReason )
821
+ private case class UpdatePivotBlock (reason : PivotBlockUpdateReason )
798
822
private case object ProcessSyncing
799
823
800
824
private [sync] case object PersistSyncState
@@ -803,11 +827,12 @@ object FastSync {
803
827
804
828
case class SyncState (
805
829
pivotBlock : BlockHeader ,
830
+ lastFullBlockNumber : BigInt = 0 ,
806
831
safeDownloadTarget : BigInt = 0 ,
807
832
blockBodiesQueue : Seq [ByteString ] = Nil ,
808
833
receiptsQueue : Seq [ByteString ] = Nil ,
809
- downloadedNodesCount : Int = 0 ,
810
- totalNodesCount : Int = 0 ,
834
+ downloadedNodesCount : Long = 0 ,
835
+ totalNodesCount : Long = 0 ,
811
836
bestBlockHeaderNumber : BigInt = 0 ,
812
837
nextBlockToFullyValidate : BigInt = 1 ,
813
838
pivotBlockUpdateFailures : Int = 0 ,
@@ -846,9 +871,8 @@ object FastSync {
846
871
updatingPivotBlock = false
847
872
)
848
873
849
- def isBlockchainWorkFinished : Boolean = {
874
+ def isBlockchainWorkFinished : Boolean =
850
875
bestBlockHeaderNumber >= safeDownloadTarget && ! blockChainWorkQueued
851
- }
852
876
}
853
877
854
878
sealed trait HashType {
@@ -860,17 +884,12 @@ object FastSync {
860
884
case class EvmCodeHash (v : ByteString ) extends HashType
861
885
case class StorageRootHash (v : ByteString ) extends HashType
862
886
863
- case object Start
864
887
case object Done
865
888
866
889
sealed abstract class HeaderProcessingResult
867
-
868
890
case object HeadersProcessingFinished extends HeaderProcessingResult
869
-
870
891
case class ParentDifficultyNotFound (header : BlockHeader ) extends HeaderProcessingResult
871
-
872
892
case class ValidationFailed (header : BlockHeader , peer : Peer ) extends HeaderProcessingResult
873
-
874
893
case object ImportedPivotBlock extends HeaderProcessingResult
875
894
876
895
sealed abstract class PivotBlockUpdateReason {
0 commit comments