Skip to content

Commit 6234ff2

Browse files
committed
[ETCM-103] Add more tests
1 parent b30f5e6 commit 6234ff2

File tree

8 files changed

+137
-42
lines changed

8 files changed

+137
-42
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ class FastSync(
537537
}
538538

539539
def hasBestBlockFreshEnoughToUpdatePivotBlock(info: PeerInfo, state: SyncState, syncConfig: SyncConfig): Boolean = {
540-
(info.maxBlockNumber - syncConfig.pivotBlockOffset) - state.pivotBlock.number > syncConfig.maxPivotBlockAge
540+
(info.maxBlockNumber - syncConfig.pivotBlockOffset) - state.pivotBlock.number >= syncConfig.maxPivotBlockAge
541541
}
542542

543543
private def getPeerWithTooFreshNewBlock(
@@ -564,18 +564,25 @@ class FastSync(
564564
} else {
565565
val peerWithBestBlockInNetwork = currentPeers.maxBy(peerWithNum => peerWithNum._2.maxBlockNumber)
566566

567+
val bestPossibleTargetDifferenceInNetwork =
568+
(peerWithBestBlockInNetwork._2.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number
569+
567570
val peersWithTooFreshPossiblePivotBlock =
568571
getPeerWithTooFreshNewBlock(NonEmptyList.fromListUnsafe(currentPeers), syncState, syncConfig)
569572

570573
if (peersWithTooFreshPossiblePivotBlock.isEmpty) {
571-
log.info(s"There are not peers with to fresh possible pivot block, " +
572-
s"best peer has block with number: ${peerWithBestBlockInNetwork._2.maxBlockNumber}")
574+
log.info(
575+
s"There are no peers with too fresh possible pivot block. " +
576+
s"Current pivot block is $bestPossibleTargetDifferenceInNetwork blocks behind best possible target"
577+
)
573578
false
574579
} else {
575580
val pivotBlockIsStale = peersWithTooFreshPossiblePivotBlock.size >= minPeersToChoosePivotBlock
576581

577-
log.info(s"There are ${peersWithTooFreshPossiblePivotBlock.size} peers with possible new pivot block, " +
578-
s"best known pivot in current peer list has number ${peerWithBestBlockInNetwork._2.maxBlockNumber}")
582+
log.info(
583+
s"There are ${peersWithTooFreshPossiblePivotBlock.size} peers with possible new pivot block, " +
584+
s"best known pivot in current peer list has number ${peerWithBestBlockInNetwork._2.maxBlockNumber}"
585+
)
579586

580587
pivotBlockIsStale
581588
}
@@ -589,7 +596,7 @@ class FastSync(
589596
if (blockchainDataToDownload) {
590597
processDownloads()
591598
} else if (noBlockchainWorkRemaining && !syncState.stateSyncFinished && notInTheMiddleOfUpdate) {
592-
if (pivotBlockIsStale()){
599+
if (pivotBlockIsStale()) {
593600
log.info("Restarting state sync to new pivot block")
594601
syncStateScheduler ! RestartRequested
595602
stateSyncRestartRequested = true
@@ -749,7 +756,6 @@ class FastSync(
749756
}
750757

751758
object FastSync {
752-
val maxTargetBlockAge = 96
753759

754760
// scalastyle:off parameter.number
755761
def props(

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderActor.scala

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class SyncStateDownloaderActor(
9797
context.become(downloading(scheduler, currentState.handleRequestFailure(peer)))
9898
}
9999

100-
def idle: Receive = { case RegisterScheduler =>
100+
def idle: Receive = handleCommonMessages orElse { case RegisterScheduler =>
101101
log.debug("Scheduler registered, starting sync download")
102102
context.become(downloading(sender(), DownloaderState(Map.empty, Map.empty)))
103103
}
@@ -106,32 +106,35 @@ class SyncStateDownloaderActor(
106106

107107
def downloading(scheduler: ActorRef, currentState: DownloaderState): Receive =
108108
handleRequestResults(scheduler, currentState) orElse
109-
handleCommonMessages orElse { case GetMissingNodes(newNodesToGet) =>
110-
val freePeers = getFreePeers(currentState)
111-
if (freePeers.isEmpty) {
112-
log.info("No available peer, rescheduling request for retrieval")
113-
timers.startSingleTimer(CheckForPeersKey, GetMissingNodes(List.empty), syncConfig.syncRetryInterval)
114-
context.become(downloading(scheduler, currentState.scheduleNewNodesForRetrieval(newNodesToGet)))
115-
} else if (newNodesToGet.isEmpty && currentState.nodesToGet.isEmpty) {
116-
log.info("No available work, waiting for additional requests")
117-
} else {
118-
val nodesToGet = if (newNodesToGet.isEmpty) None else Some(newNodesToGet)
119-
val (newRequests, newState) =
120-
currentState.assignTasksToPeers(
121-
NonEmptyList.fromListUnsafe(freePeers.toList),
122-
nodesToGet,
123-
syncConfig.nodesPerRequest
109+
handleCommonMessages orElse {
110+
case GetMissingNodes(newNodesToGet) =>
111+
val freePeers = getFreePeers(currentState)
112+
if (freePeers.isEmpty) {
113+
log.info("No available peer, rescheduling request for retrieval")
114+
timers.startSingleTimer(CheckForPeersKey, GetMissingNodes(List.empty), syncConfig.syncRetryInterval)
115+
context.become(downloading(scheduler, currentState.scheduleNewNodesForRetrieval(newNodesToGet)))
116+
} else if (newNodesToGet.isEmpty && currentState.nodesToGet.isEmpty) {
117+
log.info("No available work, waiting for additional requests")
118+
} else {
119+
val nodesToGet = if (newNodesToGet.isEmpty) None else Some(newNodesToGet)
120+
val (newRequests, newState) =
121+
currentState.assignTasksToPeers(
122+
NonEmptyList.fromListUnsafe(freePeers.toList),
123+
nodesToGet,
124+
syncConfig.nodesPerRequest
125+
)
126+
log.info(
127+
"Creating {} new state node requests. Current request queue size is {}",
128+
newRequests.size,
129+
newState.nodesToGet.size
124130
)
125-
log.info(
126-
"Creating {} new state node requests. Current request queue size is {}",
127-
newRequests.size,
128-
newState.nodesToGet.size
129-
)
130-
newRequests.foreach { request =>
131-
requestNodes(request)
131+
newRequests.foreach { request =>
132+
requestNodes(request)
133+
}
134+
context.become(downloading(scheduler, newState))
132135
}
133-
context.become(downloading(scheduler, newState))
134-
}
136+
case CancelDownload =>
137+
context.become(idle)
135138
}
136139

137140
override def receive: Receive = idle
@@ -141,6 +144,7 @@ object SyncStateDownloaderActor {
141144
def props(etcPeerManager: ActorRef, peerEventBus: ActorRef, syncConfig: SyncConfig, scheduler: Scheduler): Props = {
142145
Props(new SyncStateDownloaderActor(etcPeerManager, peerEventBus, syncConfig, scheduler))
143146
}
147+
final case object CancelDownload
144148

145149
final case object CheckForPeersKey
146150

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ class SyncStateScheduler(blockchain: Blockchain, bloomFilter: BloomFilter[ByteSt
105105
}
106106

107107
def persistBatch(state: SchedulerState, targetBlockNumber: BigInt): SchedulerState = {
108+
// Potential optimisation would be to expose some kind batch api from db to make only 1 write instead od 100k
109+
// for we could do this over code as it exposes DataSourceBatchUpdate, but not for mpt node as it write path is more
110+
// complex due to pruning.
108111
val (nodes, newState) = state.getNodesToPersist
109112
nodes.foreach { case (hash, (data, reqType)) =>
110113
reqType match {
@@ -282,7 +285,7 @@ object SyncStateScheduler {
282285
def getEmptyFilter(expectedFilterSize: Int): BloomFilter[ByteString] = {
283286
BloomFilter.create[ByteString](ByteStringFunnel, expectedFilterSize)
284287
}
285-
// TODO add method to load bloom filter after node restart. Perfect way to do it would be to expose Observable
288+
// TODO [ETCM-213] add method to load bloom filter after node restart. Perfect way to do it would be to expose Observable
286289
// in RocksDBDataSource which underneath would use RockDbIterator which would traverse whole namespace.
287290
def apply(blockchain: Blockchain, expectedBloomFilterSize: Int): SyncStateScheduler = {
288291
new SyncStateScheduler(blockchain, getEmptyFilter(expectedBloomFilterSize))

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package io.iohk.ethereum.blockchain.sync
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
44
import akka.util.ByteString
5-
import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor.RegisterScheduler
5+
import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor.{CancelDownload, RegisterScheduler}
66
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{ProcessingStatistics, SchedulerState, SyncResponse}
77
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
88
GetMissingNodes,
@@ -116,6 +116,7 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy
116116
log.info(syncStats)
117117

118118
case RestartRequested =>
119+
downloader ! CancelDownload
119120
sync.persistBatch(currentState, targetBlock)
120121
sender() ! WaitingForNewTargetBlock
121122
context.become(idle(currentStats))

src/main/scala/io/iohk/ethereum/utils/Config.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ object Config {
170170
stateSyncBloomFilterSize = syncConfig.getInt("state-sync-bloomFilter-size"),
171171
stateSyncPersistBatchSize = syncConfig.getInt("state-sync-persistBatch-size"),
172172
pivotBlockReScheduleInterval = syncConfig.getDuration("pivot-block-reSchedule-interval").toMillis.millis,
173-
maxPivotBlockAge = syncConfig.getInt("max-pivot-block-age"),
173+
maxPivotBlockAge = syncConfig.getInt("max-pivot-block-age")
174174
)
175175
}
176176
}

src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ import akka.actor.{ActorRef, ActorSystem}
66
import akka.testkit.TestActor.AutoPilot
77
import akka.testkit.{TestKit, TestProbe}
88
import io.iohk.ethereum.blockchain.sync.StateSyncUtils.TrieProvider
9-
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{StartSyncingTo, StateSyncFinished}
9+
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
10+
RestartRequested,
11+
StartSyncingTo,
12+
StateSyncFinished,
13+
WaitingForNewTargetBlock
14+
}
1015
import io.iohk.ethereum.domain.BlockchainImpl
1116
import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo, SendMessage}
1217
import io.iohk.ethereum.network.{Peer, PeerId}
@@ -17,6 +22,7 @@ import io.iohk.ethereum.network.p2p.messages.PV63.NodeData
1722
import io.iohk.ethereum.network.p2p.messages.Versions
1823
import io.iohk.ethereum.utils.Config
1924
import io.iohk.ethereum.{Fixtures, ObjectGenerators, WithActorSystemShutDown}
25+
import org.scalactic.anyvals.PosInt
2026
import org.scalatest.BeforeAndAfterAll
2127
import org.scalatest.flatspec.AnyFlatSpecLike
2228
import org.scalatest.matchers.should.Matchers
@@ -35,6 +41,9 @@ class StateSyncSpec
3541

3642
val actorSystem = system
3743

44+
// those tests are somewhat long running 3 successful evaluation should be fine
45+
implicit override val generatorDrivenConfig = PropertyCheckConfiguration(minSuccessful = PosInt(3))
46+
3847
"StateSync" should "sync state to different tries" in new TestSetup() {
3948
forAll(ObjectGenerators.genMultipleNodeData(3000)) { nodeData =>
4049
val initiator = TestProbe()
@@ -68,6 +77,18 @@ class StateSyncSpec
6877
}
6978
}
7079

80+
it should "stop state sync when requested" in new TestSetup() {
81+
forAll(ObjectGenerators.genMultipleNodeData(1000)) { nodeData =>
82+
val initiator = TestProbe()
83+
val trieProvider1 = TrieProvider()
84+
val target = trieProvider1.buildWorld(nodeData)
85+
setAutoPilotWithProvider(trieProvider1)
86+
initiator.send(scheduler, StartSyncingTo(target, 1))
87+
initiator.send(scheduler, RestartRequested)
88+
initiator.expectMsg(WaitingForNewTargetBlock)
89+
}
90+
}
91+
7192
class TestSetup extends EphemBlockchainTestSetup with TestSyncConfig {
7293
override implicit lazy val system = actorSystem
7394
type PeerConfig = Map[PeerId, PeerAction]
@@ -176,7 +197,10 @@ class StateSyncSpec
176197
lazy val scheduler = system.actorOf(
177198
SyncStateSchedulerActor.props(
178199
downloader,
179-
new SyncStateScheduler(buildBlockChain(), SyncStateScheduler.getEmptyFilter(syncConfig.stateSyncBloomFilterSize)),
200+
new SyncStateScheduler(
201+
buildBlockChain(),
202+
SyncStateScheduler.getEmptyFilter(syncConfig.stateSyncBloomFilterSize)
203+
),
180204
syncConfig
181205
)
182206
)

src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,64 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
369369
}
370370
}
371371

372+
it should "update pivot block during state sync if it goes stale" in new TestSetup() {
373+
startWithState(defaultStateBeforeNodeRestart)
374+
375+
syncController ! SyncController.Start
376+
377+
val handshakedPeers = HandshakedPeers(singlePeer)
378+
379+
val newBlocks =
380+
getHeaders(defaultStateBeforeNodeRestart.bestBlockHeaderNumber + 1, 50)
381+
382+
setupAutoPilot(
383+
etcPeerManager,
384+
handshakedPeers,
385+
defaultpivotBlockHeader,
386+
BlockchainData(newBlocks),
387+
failedNodeRequest = true
388+
)
389+
390+
// choose first pivot and as it is fresh enough start state sync
391+
eventually(timeout = eventuallyTimeOut) {
392+
val syncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get
393+
syncState.isBlockchainWorkFinished shouldBe true
394+
syncState.updatingPivotBlock shouldBe false
395+
stateDownloadStarted shouldBe true
396+
}
397+
val peerWithBetterBlock = defaultPeer1Info.copy(maxBlockNumber = bestBlock + syncConfig.maxPivotBlockAge)
398+
val newHandshakedPeers = HandshakedPeers(Map(peer1 -> peerWithBetterBlock))
399+
val newPivot = defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number + syncConfig.maxPivotBlockAge)
400+
401+
setupAutoPilot(etcPeerManager, newHandshakedPeers, newPivot, BlockchainData(newBlocks), failedNodeRequest = true)
402+
403+
// sync to new pivot
404+
eventually(timeout = eventuallyTimeOut) {
405+
val syncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get
406+
syncState.pivotBlock shouldBe newPivot
407+
}
408+
409+
// enable peer to respond with mpt nodes
410+
setupAutoPilot(etcPeerManager, newHandshakedPeers, newPivot, BlockchainData(newBlocks))
411+
412+
val watcher = TestProbe()
413+
watcher.watch(syncController)
414+
415+
eventually(timeout = longeventuallyTimeOut) {
416+
//switch to regular download
417+
val children = syncController.children
418+
assert(storagesInstance.storages.appStateStorage.isFastSyncDone())
419+
assert(children.exists(ref => ref.path.name == "regular-sync"))
420+
assert(blockchain.getBestBlockNumber() == newPivot.number)
421+
}
422+
}
423+
372424
class TestSetup(
373425
blocksForWhichLedgerFails: Seq[BigInt] = Nil,
374426
_validators: Validators = new Mocks.MockValidatorsAlwaysSucceed
375427
) extends EphemBlockchainTestSetup
376428
with TestSyncConfig {
429+
var stateDownloadStarted = false
377430

378431
val eventuallyTimeOut: Timeout = Timeout(Span(10, Seconds))
379432
val longeventuallyTimeOut = Timeout(Span(30, Seconds))
@@ -407,7 +460,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
407460
blacklistDuration = 1.second,
408461
peerResponseTimeout = 2.seconds,
409462
persistStateSnapshotInterval = 0.1.seconds,
410-
fastSyncThrottle = 10.milliseconds
463+
fastSyncThrottle = 10.milliseconds,
464+
maxPivotBlockAge = 30
411465
)
412466

413467
lazy val syncController = TestActorRef(
@@ -510,7 +564,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
510564
blockchainData: BlockchainData,
511565
failedReceiptsTries: Int = 0,
512566
failedBodiesTries: Int = 0,
513-
onlyPivot: Boolean = false
567+
onlyPivot: Boolean = false,
568+
failedNodeRequest: Boolean = false
514569
): Unit = {
515570
testProbe.setAutoPilot(new AutoPilot {
516571
var failedReceipts = 0
@@ -555,8 +610,11 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
555610
}
556611

557612
case SendMessage(msg: GetNodeDataEnc, peer) if !onlyPivot =>
613+
stateDownloadStarted = true
558614
val underlyingMessage = msg.underlyingMsg
559-
sender ! MessageFromPeer(NodeData(Seq(defaultStateMptLeafWithAccount)), peer)
615+
if (!failedNodeRequest) {
616+
sender ! MessageFromPeer(NodeData(Seq(defaultStateMptLeafWithAccount)), peer)
617+
}
560618
}
561619

562620
this

src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ trait TestSyncConfig extends SyncConfigBuilder {
4343
stateSyncBloomFilterSize = 1000,
4444
stateSyncPersistBatchSize = 1000,
4545
pivotBlockReScheduleInterval = 1.second,
46-
maxPivotBlockAge = 96,
47-
46+
maxPivotBlockAge = 96
4847
)
4948

5049
override lazy val syncConfig: SyncConfig = defaultSyncConfig

0 commit comments

Comments
 (0)