Skip to content

Commit b151624

Browse files
authored
[ETCM-689] Update state sync and pivot block selector to use new blacklist (#935)
* [ETCM-689] Update state sync actor to new blacklist * [ETCM-689] Update pivot block seletor actor to new blacklist * [ETCM-689] Fix pivot block selection test * [ETCM-689] Reformats triggered by sbt pp * [ETCM-689] Remove unused imports * [ETCM-689] Fix tests for real
1 parent fd69635 commit b151624

File tree

11 files changed

+147
-97
lines changed

11 files changed

+147
-97
lines changed

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ import org.scalatest.matchers.should.Matchers
2121

2222
import scala.concurrent.duration._
2323

24-
class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators with AsyncFlatSpecLike with Matchers with BeforeAndAfterAll {
24+
class BlockImporterItSpec
25+
extends MockFactory
26+
with TestSetupWithVmAndValidators
27+
with AsyncFlatSpecLike
28+
with Matchers
29+
with BeforeAndAfterAll {
2530

2631
implicit val testScheduler = Scheduler.fixedPool("test", 32)
2732

@@ -57,11 +62,15 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
5762
ethCompatibleStorage = true
5863
)
5964

60-
override lazy val ledger = new TestLedgerImpl(successValidators) {
61-
override private[ledger] lazy val blockExecution = new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
62-
override def executeAndValidateBlock(block: Block, alreadyValidated: Boolean = false): Either[BlockExecutionError, Seq[Receipt]] =
63-
Right(BlockResult(emptyWorld).receipts)
64-
}
65+
override lazy val ledger = new TestLedgerImpl(successValidators) {
66+
override private[ledger] lazy val blockExecution =
67+
new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
68+
override def executeAndValidateBlock(
69+
block: Block,
70+
alreadyValidated: Boolean = false
71+
): Either[BlockExecutionError, Seq[Receipt]] =
72+
Right(BlockResult(emptyWorld).receipts)
73+
}
6574
}
6675

6776
val blockImporter = system.actorOf(
@@ -75,7 +84,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
7584
pendingTransactionsManagerProbe.ref,
7685
checkpointBlockGenerator,
7786
supervisor.ref
78-
))
87+
)
88+
)
7989

8090
val genesisBlock = blockchain.genesisBlock
8191
val block1: Block = getBlock(genesisBlock.number + 1, parent = genesisBlock.header.hash)
@@ -119,7 +129,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
119129
pendingTransactionsManagerProbe.ref,
120130
checkpointBlockGenerator,
121131
supervisor.ref
122-
))
132+
)
133+
)
123134

124135
blockImporter ! BlockImporter.Start
125136
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
@@ -142,7 +153,6 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
142153
blockchain.getBestBlock().get shouldEqual newBlock3
143154
}
144155

145-
146156
it should "switch to a branch with a checkpoint" in {
147157

148158
val checkpoint = ObjectGenerators.fakeCheckpointGen(3, 3).sample.get

src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ object Blacklist {
7777
val code: Int = 9
7878
val name: String = "PeerActorTerminated"
7979
}
80+
case object InvalidStateResponseType extends BlacklistReasonType {
81+
val code: Int = 10
82+
val name: String = "InvalidStateResponse"
83+
}
84+
case object InvalidPivotBlockElectionResponseType extends BlacklistReasonType {
85+
val code: Int = 11
86+
val name: String = "InvalidPivotElectionResponse"
87+
}
88+
case object PivotBlockElectionTimeoutType extends BlacklistReasonType {
89+
val code: Int = 12
90+
val name: String = "PivotBlockElectionTimeout"
91+
}
8092
}
8193

8294
case object WrongBlockHeaders extends BlacklistReason {
@@ -115,6 +127,18 @@ object Blacklist {
115127
val reasonType: BlacklistReasonType = PeerActorTerminatedType
116128
val description: String = "Peer actor terminated"
117129
}
130+
final case class InvalidStateResponse(details: String) extends BlacklistReason {
131+
val reasonType: BlacklistReasonType = InvalidStateResponseType
132+
val description: String = s"Invalid response while syncing state trie: $details"
133+
}
134+
case object InvalidPivotBlockElectionResponse extends BlacklistReason {
135+
val reasonType: BlacklistReasonType = InvalidStateResponseType
136+
val description: String = "Invalid response while selecting pivot block"
137+
}
138+
case object PivotBlockElectionTimeout extends BlacklistReason {
139+
val reasonType: BlacklistReasonType = InvalidStateResponseType
140+
val description: String = "Peer didn't respond with requested pivot block candidate in a timely manner"
141+
}
118142
}
119143
}
120144

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@ class SyncController(
2121
checkpointBlockGenerator: CheckpointBlockGenerator,
2222
ommersPool: ActorRef,
2323
etcPeerManager: ActorRef,
24+
blacklist: Blacklist,
2425
syncConfig: SyncConfig,
2526
externalSchedulerOpt: Option[Scheduler] = None
2627
) extends Actor
2728
with ActorLogging {
2829

29-
private val blacklistSize: Int = 1000 // TODO ETCM-642 move to config
30-
private val blacklist: Blacklist = CacheBasedBlacklist.empty(blacklistSize)
31-
3230
def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler
3331

3432
override def receive: Receive = idle
@@ -132,6 +130,7 @@ object SyncController {
132130
checkpointBlockGenerator: CheckpointBlockGenerator,
133131
ommersPool: ActorRef,
134132
etcPeerManager: ActorRef,
133+
blacklist: Blacklist,
135134
syncConfig: SyncConfig
136135
): Props =
137136
Props(
@@ -146,6 +145,7 @@ object SyncController {
146145
checkpointBlockGenerator,
147146
ommersPool,
148147
etcPeerManager,
148+
blacklist,
149149
syncConfig
150150
)
151151
)

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class FastSync(
8383
def startFromScratch(): Unit = {
8484
log.info("Starting fast sync from scratch")
8585
val pivotBlockSelector = context.actorOf(
86-
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self),
86+
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist),
8787
"pivot-block-selector"
8888
)
8989
pivotBlockSelector ! PivotBlockSelector.SelectPivotBlock
@@ -138,6 +138,7 @@ class FastSync(
138138
syncConfig,
139139
etcPeerManager,
140140
peerEventBus,
141+
blacklist,
141142
scheduler
142143
),
143144
"state-scheduler"
@@ -218,7 +219,7 @@ class FastSync(
218219
log.info("Asking for new pivot block")
219220
val pivotBlockSelector = {
220221
context.actorOf(
221-
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self)
222+
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist)
222223
)
223224
}
224225
pivotBlockSelector ! PivotBlockSelector.SelectPivotBlock

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/PivotBlockSelector.scala

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ package io.iohk.ethereum.blockchain.sync.fast
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
44
import akka.util.ByteString
5-
import io.iohk.ethereum.blockchain.sync.{BlacklistSupport, PeerListSupport}
5+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.{
6+
InvalidPivotBlockElectionResponse,
7+
PivotBlockElectionTimeout
8+
}
9+
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
10+
import io.iohk.ethereum.blockchain.sync.{Blacklist, PeerListSupportNg}
611
import io.iohk.ethereum.domain.BlockHeader
712
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
813
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
@@ -21,28 +26,26 @@ class PivotBlockSelector(
2126
val peerEventBus: ActorRef,
2227
val syncConfig: SyncConfig,
2328
val scheduler: Scheduler,
24-
fastSync: ActorRef
29+
fastSync: ActorRef,
30+
val blacklist: Blacklist
2531
) extends Actor
2632
with ActorLogging
27-
with PeerListSupport
28-
with BlacklistSupport {
33+
with PeerListSupportNg {
2934

3035
import PivotBlockSelector._
3136
import syncConfig._
3237

33-
def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages
34-
3538
override def receive: Receive = idle
3639

37-
def idle: Receive = handleCommonMessages orElse { case SelectPivotBlock =>
40+
private def idle: Receive = handlePeerListMessages orElse { case SelectPivotBlock =>
3841
val election @ ElectionDetails(correctPeers, expectedPivotBlock) = collectVoters
3942

4043
if (election.isEnoughVoters(minPeersToChoosePivotBlock)) {
4144

4245
val (peersToAsk, waitingPeers) = correctPeers.splitAt(minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin)
4346

4447
log.info(
45-
"Trying to choose fast sync pivot block using {} peers ({} correct ones). Ask {} peers for block nr {}",
48+
"Trying to choose fast sync pivot block using {} peers ({} ones with high enough block). Ask {} peers for block nr {}",
4649
peersToDownloadFrom.size,
4750
correctPeers.size,
4851
peersToAsk.size,
@@ -78,7 +81,7 @@ class PivotBlockSelector(
7881
timeout: Cancellable,
7982
headers: Map[ByteString, BlockHeaderWithVotes]
8083
): Receive =
81-
handleCommonMessages orElse {
84+
handlePeerListMessages orElse {
8285
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
8386
peerEventBus ! Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peerId)))
8487
val updatedPeersToAsk = peersToAsk - peerId
@@ -93,12 +96,12 @@ class PivotBlockSelector(
9396
val updatedHeaders = headers.updated(targetBlockHeader.hash, newValue)
9497
votingProcess(updatedPeersToAsk, waitingPeers, pivotBlockNumber, timeout, updatedHeaders)
9598
case None =>
96-
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header, blacklisting")
99+
blacklist.add(peerId, blacklistDuration, InvalidPivotBlockElectionResponse)
97100
votingProcess(updatedPeersToAsk, waitingPeers, pivotBlockNumber, timeout, headers)
98101
}
99102
case ElectionPivotBlockTimeout =>
100103
peersToAsk.foreach { peerId =>
101-
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header (timeout), blacklisting")
104+
blacklist.add(peerId, blacklistDuration, PivotBlockElectionTimeout)
102105
}
103106
peerEventBus ! Unsubscribe()
104107
log.info("Pivot block header receive timeout. Retrying in {}", startRetryInterval)
@@ -155,12 +158,12 @@ class PivotBlockSelector(
155158
private def isPossibleToReachConsensus(peersLeft: Int, bestHeaderVotes: Int): Boolean =
156159
peersLeft + bestHeaderVotes >= minPeersToChoosePivotBlock
157160

158-
def scheduleRetry(interval: FiniteDuration): Unit = {
161+
private def scheduleRetry(interval: FiniteDuration): Unit = {
159162
scheduler.scheduleOnce(interval, self, SelectPivotBlock)
160163
context become idle
161164
}
162165

163-
def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
166+
private def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
164167
log.info("Found pivot block: {} hash: {}", pivotBlockHeader.number, pivotBlockHeader.hashAsHexString)
165168
fastSync ! Result(pivotBlockHeader)
166169
peerEventBus ! Unsubscribe()
@@ -176,8 +179,9 @@ class PivotBlockSelector(
176179
}
177180

178181
private def collectVoters: ElectionDetails = {
179-
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
180-
(peer, maxBlockNumber)
182+
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
183+
case (_, PeerWithInfo(peer, PeerInfo(_, _, true, maxBlockNumber, _))) =>
184+
(peer, maxBlockNumber)
181185
}
182186

183187
val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }
@@ -199,9 +203,10 @@ object PivotBlockSelector {
199203
peerEventBus: ActorRef,
200204
syncConfig: SyncConfig,
201205
scheduler: Scheduler,
202-
fastSync: ActorRef
206+
fastSync: ActorRef,
207+
blacklist: Blacklist
203208
): Props =
204-
Props(new PivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler, fastSync))
209+
Props(new PivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler, fastSync, blacklist))
205210

206211
case object SelectPivotBlock
207212
case class Result(targetBlockHeader: BlockHeader)

0 commit comments

Comments
 (0)