Skip to content

[ETCM-689] Update state sync and pivot block selector to use new blacklist #935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators with AsyncFlatSpecLike with Matchers with BeforeAndAfterAll {
class BlockImporterItSpec
extends MockFactory
with TestSetupWithVmAndValidators
with AsyncFlatSpecLike
with Matchers
with BeforeAndAfterAll {

implicit val testScheduler = Scheduler.fixedPool("test", 32)

Expand Down Expand Up @@ -57,11 +62,15 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
ethCompatibleStorage = true
)

override lazy val ledger = new TestLedgerImpl(successValidators) {
override private[ledger] lazy val blockExecution = new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
override def executeAndValidateBlock(block: Block, alreadyValidated: Boolean = false): Either[BlockExecutionError, Seq[Receipt]] =
Right(BlockResult(emptyWorld).receipts)
}
override lazy val ledger = new TestLedgerImpl(successValidators) {
override private[ledger] lazy val blockExecution =
new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
override def executeAndValidateBlock(
block: Block,
alreadyValidated: Boolean = false
): Either[BlockExecutionError, Seq[Receipt]] =
Right(BlockResult(emptyWorld).receipts)
}
}

val blockImporter = system.actorOf(
Expand All @@ -75,7 +84,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
pendingTransactionsManagerProbe.ref,
checkpointBlockGenerator,
supervisor.ref
))
)
)

val genesisBlock = blockchain.genesisBlock
val block1: Block = getBlock(genesisBlock.number + 1, parent = genesisBlock.header.hash)
Expand Down Expand Up @@ -119,7 +129,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
pendingTransactionsManagerProbe.ref,
checkpointBlockGenerator,
supervisor.ref
))
)
)

blockImporter ! BlockImporter.Start
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
Expand All @@ -142,7 +153,6 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
blockchain.getBestBlock().get shouldEqual newBlock3
}


it should "switch to a branch with a checkpoint" in {

val checkpoint = ObjectGenerators.fakeCheckpointGen(3, 3).sample.get
Expand Down
24 changes: 24 additions & 0 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ object Blacklist {
val code: Int = 9
val name: String = "PeerActorTerminated"
}
case object InvalidStateResponseType extends BlacklistReasonType {
val code: Int = 10
val name: String = "InvalidStateResponse"
}
case object InvalidPivotBlockElectionResponseType extends BlacklistReasonType {
val code: Int = 11
val name: String = "InvalidPivotElectionResponse"
}
case object PivotBlockElectionTimeoutType extends BlacklistReasonType {
val code: Int = 12
val name: String = "PivotBlockElectionTimeout"
}
}

case object WrongBlockHeaders extends BlacklistReason {
Expand Down Expand Up @@ -115,6 +127,18 @@ object Blacklist {
val reasonType: BlacklistReasonType = PeerActorTerminatedType
val description: String = "Peer actor terminated"
}
final case class InvalidStateResponse(details: String) extends BlacklistReason {
val reasonType: BlacklistReasonType = InvalidStateResponseType
val description: String = s"Invalid response while syncing state trie: $details"
}
case object InvalidPivotBlockElectionResponse extends BlacklistReason {
val reasonType: BlacklistReasonType = InvalidStateResponseType
val description: String = "Invalid response while selecting pivot block"
}
case object PivotBlockElectionTimeout extends BlacklistReason {
val reasonType: BlacklistReasonType = InvalidStateResponseType
val description: String = "Peer didn't respond with requested pivot block candidate in a timely manner"
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ class SyncController(
checkpointBlockGenerator: CheckpointBlockGenerator,
ommersPool: ActorRef,
etcPeerManager: ActorRef,
blacklist: Blacklist,
syncConfig: SyncConfig,
externalSchedulerOpt: Option[Scheduler] = None
) extends Actor
with ActorLogging {

private val blacklistSize: Int = 1000 // TODO ETCM-642 move to config
private val blacklist: Blacklist = CacheBasedBlacklist.empty(blacklistSize)

def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler

override def receive: Receive = idle
Expand Down Expand Up @@ -132,6 +130,7 @@ object SyncController {
checkpointBlockGenerator: CheckpointBlockGenerator,
ommersPool: ActorRef,
etcPeerManager: ActorRef,
blacklist: Blacklist,
syncConfig: SyncConfig
): Props =
Props(
Expand All @@ -146,6 +145,7 @@ object SyncController {
checkpointBlockGenerator,
ommersPool,
etcPeerManager,
blacklist,
syncConfig
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class FastSync(
def startFromScratch(): Unit = {
log.info("Starting fast sync from scratch")
val pivotBlockSelector = context.actorOf(
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self),
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist),
"pivot-block-selector"
)
pivotBlockSelector ! PivotBlockSelector.SelectPivotBlock
Expand Down Expand Up @@ -138,6 +138,7 @@ class FastSync(
syncConfig,
etcPeerManager,
peerEventBus,
blacklist,
scheduler
),
"state-scheduler"
Expand Down Expand Up @@ -218,7 +219,7 @@ class FastSync(
log.info("Asking for new pivot block")
val pivotBlockSelector = {
context.actorOf(
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self)
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist)
)
}
pivotBlockSelector ! PivotBlockSelector.SelectPivotBlock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package io.iohk.ethereum.blockchain.sync.fast

import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.{BlacklistSupport, PeerListSupport}
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.{
InvalidPivotBlockElectionResponse,
PivotBlockElectionTimeout
}
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
import io.iohk.ethereum.blockchain.sync.{Blacklist, PeerListSupportNg}
import io.iohk.ethereum.domain.BlockHeader
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
Expand All @@ -21,28 +26,26 @@ class PivotBlockSelector(
val peerEventBus: ActorRef,
val syncConfig: SyncConfig,
val scheduler: Scheduler,
fastSync: ActorRef
fastSync: ActorRef,
val blacklist: Blacklist
) extends Actor
with ActorLogging
with PeerListSupport
with BlacklistSupport {
with PeerListSupportNg {

import PivotBlockSelector._
import syncConfig._

def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages

override def receive: Receive = idle

def idle: Receive = handleCommonMessages orElse { case SelectPivotBlock =>
private def idle: Receive = handlePeerListMessages orElse { case SelectPivotBlock =>
val election @ ElectionDetails(correctPeers, expectedPivotBlock) = collectVoters

if (election.isEnoughVoters(minPeersToChoosePivotBlock)) {

val (peersToAsk, waitingPeers) = correctPeers.splitAt(minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin)

log.info(
"Trying to choose fast sync pivot block using {} peers ({} correct ones). Ask {} peers for block nr {}",
"Trying to choose fast sync pivot block using {} peers ({} ones with high enough block). Ask {} peers for block nr {}",
peersToDownloadFrom.size,
correctPeers.size,
peersToAsk.size,
Expand Down Expand Up @@ -78,7 +81,7 @@ class PivotBlockSelector(
timeout: Cancellable,
headers: Map[ByteString, BlockHeaderWithVotes]
): Receive =
handleCommonMessages orElse {
handlePeerListMessages orElse {
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
peerEventBus ! Unsubscribe(MessageClassifier(Set(Codes.BlockHeadersCode), PeerSelector.WithId(peerId)))
val updatedPeersToAsk = peersToAsk - peerId
Expand All @@ -93,12 +96,12 @@ class PivotBlockSelector(
val updatedHeaders = headers.updated(targetBlockHeader.hash, newValue)
votingProcess(updatedPeersToAsk, waitingPeers, pivotBlockNumber, timeout, updatedHeaders)
case None =>
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header, blacklisting")
blacklist.add(peerId, blacklistDuration, InvalidPivotBlockElectionResponse)
votingProcess(updatedPeersToAsk, waitingPeers, pivotBlockNumber, timeout, headers)
}
case ElectionPivotBlockTimeout =>
peersToAsk.foreach { peerId =>
blacklist(peerId, blacklistDuration, "Did not respond with pivot block header (timeout), blacklisting")
blacklist.add(peerId, blacklistDuration, PivotBlockElectionTimeout)
}
peerEventBus ! Unsubscribe()
log.info("Pivot block header receive timeout. Retrying in {}", startRetryInterval)
Expand Down Expand Up @@ -155,12 +158,12 @@ class PivotBlockSelector(
private def isPossibleToReachConsensus(peersLeft: Int, bestHeaderVotes: Int): Boolean =
peersLeft + bestHeaderVotes >= minPeersToChoosePivotBlock

def scheduleRetry(interval: FiniteDuration): Unit = {
private def scheduleRetry(interval: FiniteDuration): Unit = {
scheduler.scheduleOnce(interval, self, SelectPivotBlock)
context become idle
}

def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
private def sendResponseAndCleanup(pivotBlockHeader: BlockHeader): Unit = {
log.info("Found pivot block: {} hash: {}", pivotBlockHeader.number, pivotBlockHeader.hashAsHexString)
fastSync ! Result(pivotBlockHeader)
peerEventBus ! Unsubscribe()
Expand All @@ -176,8 +179,9 @@ class PivotBlockSelector(
}

private def collectVoters: ElectionDetails = {
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
case (_, PeerWithInfo(peer, PeerInfo(_, _, true, maxBlockNumber, _))) =>
(peer, maxBlockNumber)
}

val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }
Expand All @@ -199,9 +203,10 @@ object PivotBlockSelector {
peerEventBus: ActorRef,
syncConfig: SyncConfig,
scheduler: Scheduler,
fastSync: ActorRef
fastSync: ActorRef,
blacklist: Blacklist
): Props =
Props(new PivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler, fastSync))
Props(new PivotBlockSelector(etcPeerManager: ActorRef, peerEventBus, syncConfig, scheduler, fastSync, blacklist))

case object SelectPivotBlock
case class Result(targetBlockHeader: BlockHeader)
Expand Down
Loading