Skip to content

[ETCM-316] Fast-sync branch resolver #887

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
10da132
init new actor, FastSyncBranchResolver
biandratti Jan 8, 2021
4e7ec34
added searching mode in fast sync branch resolver
biandratti Jan 11, 2021
a05f98a
fix style
biandratti Jan 11, 2021
bb39178
add schedule when don't get peers
biandratti Jan 12, 2021
0020d1c
Added ut in class FastSyncBranchResolver
biandratti Jan 15, 2021
36a7754
change messages
biandratti Jan 15, 2021
14a15d8
fix case object...
biandratti Jan 15, 2021
4233cd4
add new unit test
biandratti Jan 18, 2021
2ce03dc
add new unit test
biandratti Jan 18, 2021
f35d963
change name getFirstCommonBlock
biandratti Jan 19, 2021
1dfb0c6
change name to batch
biandratti Jan 19, 2021
b529bee
init new actor, FastSyncBranchResolver
biandratti Jan 8, 2021
6ea0f84
added searching mode in fast sync branch resolver
biandratti Jan 11, 2021
6c2f5cd
fix style
biandratti Jan 11, 2021
77ac352
add schedule when don't get peers
biandratti Jan 12, 2021
1044f86
Added ut in class FastSyncBranchResolver
biandratti Jan 15, 2021
6a067a4
change messages
biandratti Jan 15, 2021
d9f3b90
fix case object...
biandratti Jan 15, 2021
b758030
add new unit test
biandratti Jan 18, 2021
a6045bd
add new unit test
biandratti Jan 18, 2021
1dc68d8
create actor FastSyncBranchResolver
biandratti Jan 18, 2021
5f15af4
change name getFirstCommonBlock
biandratti Jan 19, 2021
5da37c3
change name to batch
biandratti Jan 19, 2021
230065f
Reformat triggered by sbt pp
1015bit Feb 17, 2021
e434d97
Cleanup and simplify
1015bit Feb 18, 2021
107dc18
Handle error cases
1015bit Feb 24, 2021
3c9e4d6
Fix tests
1015bit Feb 24, 2021
8834451
[ETCM-316] Add more tests and fix binary search logic
1015bit Feb 26, 2021
2f7be23
[ETCM-316] Finish tests for branch resolving
1015bit Feb 28, 2021
241d34b
[ETCM-316] Cleanup
1015bit Feb 28, 2021
f3d0c6b
[ETCM-316] Small test improvements
1015bit Feb 28, 2021
aff2728
[ETCM-316] Log binary search state
1015bit Feb 28, 2021
1f0ae9b
[ETCM-316] Move some logging to improve readability
1015bit Mar 1, 2021
921ccab
Merge develop into branch
1015bit Mar 1, 2021
7672baa
Merge ETCM-313 into branch
1015bit Mar 2, 2021
122dd41
[ETCM-316] Remove unneeded errors and reformat
1015bit Mar 2, 2021
e5eac10
[ETCM-316] Handle branch resolution failure
1015bit Mar 2, 2021
da89353
[ETCM-316] Address PR comments
1015bit Mar 4, 2021
d7c6f84
[ETCM-316] Remove unnecessary string interpolation
1015bit Mar 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators with AsyncFlatSpecLike with Matchers with BeforeAndAfterAll {
class BlockImporterItSpec
extends MockFactory
with TestSetupWithVmAndValidators
with AsyncFlatSpecLike
with Matchers
with BeforeAndAfterAll {

implicit val testScheduler = Scheduler.fixedPool("test", 32)

Expand Down Expand Up @@ -57,11 +62,15 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
ethCompatibleStorage = true
)

override lazy val ledger = new TestLedgerImpl(successValidators) {
override private[ledger] lazy val blockExecution = new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
override def executeAndValidateBlock(block: Block, alreadyValidated: Boolean = false): Either[BlockExecutionError, Seq[Receipt]] =
Right(BlockResult(emptyWorld).receipts)
}
override lazy val ledger = new TestLedgerImpl(successValidators) {
override private[ledger] lazy val blockExecution =
new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
override def executeAndValidateBlock(
block: Block,
alreadyValidated: Boolean = false
): Either[BlockExecutionError, Seq[Receipt]] =
Right(BlockResult(emptyWorld).receipts)
}
}

val blockImporter = system.actorOf(
Expand All @@ -75,7 +84,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
pendingTransactionsManagerProbe.ref,
checkpointBlockGenerator,
supervisor.ref
))
)
)

val genesisBlock = blockchain.genesisBlock
val block1: Block = getBlock(genesisBlock.number + 1, parent = genesisBlock.header.hash)
Expand Down Expand Up @@ -119,7 +129,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
pendingTransactionsManagerProbe.ref,
checkpointBlockGenerator,
supervisor.ref
))
)
)

blockImporter ! BlockImporter.Start
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
Expand All @@ -142,7 +153,6 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
blockchain.getBestBlock().get shouldEqual newBlock3
}


it should "switch to a branch with a checkpoint" in {

val checkpoint = ObjectGenerators.fakeCheckpointGen(3, 3).sample.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ object Blacklist {

case object WrongBlockHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = WrongBlockHeadersType
val description: String = "Wrong blockheaders response (empty or not chain forming)"
val description: String = "Wrong blockheaders response: Peer didn't respond with requested block headers."
}
case object BlockHeaderValidationFailed extends BlacklistReason {
val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>

private implicit val ec: ExecutionContext = context.dispatcher

private val bigIntReverseOrdering: Ordering[BigInt] = Ordering[BigInt].reverse

def etcPeerManager: ActorRef
def peerEventBus: ActorRef
def blacklist: Blacklist
Expand Down Expand Up @@ -44,6 +46,11 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>

def getPeerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer)

def getPeerWithHighestBlock: Option[PeerWithInfo] =
peersToDownloadFrom.values.toList.sortBy { case PeerWithInfo(_, peerInfo) =>
peerInfo.maxBlockNumber
}(bigIntReverseOrdering).headOption

def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: BlacklistReason): Unit =
handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ class PeerRequestHandler[RequestMsg <: Message, ResponseMsg <: Message: ClassTag

import PeerRequestHandler._

val initiator: ActorRef = context.parent
private val initiator: ActorRef = context.parent

val timeout: Cancellable = scheduler.scheduleOnce(responseTimeout, self, Timeout)
private val timeout: Cancellable = scheduler.scheduleOnce(responseTimeout, self, Timeout)

val startTime: Long = System.currentTimeMillis()
private val startTime: Long = System.currentTimeMillis()

private def subscribeMessageClassifier = MessageClassifier(Set(responseMsgCode), PeerSelector.WithId(peer.id))

def timeTakenSoFar(): Long = System.currentTimeMillis() - startTime
private def timeTakenSoFar(): Long = System.currentTimeMillis() - startTime

override def preStart(): Unit = {
etcPeerManager ! EtcPeerManagerActor.SendMessage(toSerializable(requestMsg), peer.id)
Expand Down Expand Up @@ -79,8 +79,8 @@ object PeerRequestHandler {
)(implicit scheduler: Scheduler, toSerializable: RequestMsg => MessageSerializable): Props =
Props(new PeerRequestHandler(peer, responseTimeout, etcPeerManager, peerEventBus, requestMsg, responseMsgCode))

case class RequestFailed(peer: Peer, reason: String)
case class ResponseReceived[T](peer: Peer, response: T, timeTaken: Long)
final case class RequestFailed(peer: Peer, reason: String)
final case class ResponseReceived[T](peer: Peer, response: T, timeTaken: Long)

private case object Timeout
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ class FastSync(
private val syncStateStorageActor = context.actorOf(Props[StateStorageActor](), "state-storage")
syncStateStorageActor ! fastSyncStateStorage

private val branchResolver = context.actorOf(Props[FastSyncBranchResolver](), "fast-sync-branch-resolver")
private val branchResolver = context.actorOf(
FastSyncBranchResolverActor
.props(self, peerEventBus, etcPeerManager, blockchain, blacklist, syncConfig, appStateStorage, scheduler),
"fast-sync-branch-resolver"
)

private val syncStateScheduler = context.actorOf(
SyncStateSchedulerActor
Expand Down Expand Up @@ -288,13 +292,13 @@ class FastSync(

// Start branch resolution and wait for response from the FastSyncBranchResolver actor.
context become waitingForBranchResolution
branchResolver ! FastSyncBranchResolver.StartBranchResolver
branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
currentSkeleton = None
}
}

private def waitingForBranchResolution: Receive = handleStatus orElse {
case FastSyncBranchResolver.BranchResolvedSuccessful(firstCommonBlockNumber, resolvedPeer) =>
case FastSyncBranchResolverActor.BranchResolvedSuccessful(firstCommonBlockNumber, resolvedPeer) =>
// Reset the batch failures count
batchFailuresCount = 0

Expand All @@ -303,6 +307,9 @@ class FastSync(
masterPeer = Some(resolvedPeer)
context become receive
processSyncing()
case _: FastSyncBranchResolverActor.BranchResolutionFailed =>
// there isn't much we can do if we don't find a branch/peer to continue syncing, so let's try again
branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
}

private def blockHeadersError(peer: Peer): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,116 @@
package io.iohk.ethereum.blockchain.sync.fast

import akka.actor.Actor
import cats.data.NonEmptyList
import io.iohk.ethereum.domain.{BlockHeader, Blockchain}
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.utils.Logger

class FastSyncBranchResolver extends Actor {
trait FastSyncBranchResolver {

override def receive: Receive = ???
import FastSyncBranchResolver._

protected def blockchain: Blockchain

// TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
def discardBlocksAfter(lastValidBlock: BigInt): Unit =
discardBlocks(lastValidBlock, blockchain.getBestBlockNumber())

// TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
private def discardBlocks(fromBlock: BigInt, toBlock: BigInt): Unit = {
val blocksToBeRemoved = childOf(fromBlock).to(toBlock).reverse.toList
blocksToBeRemoved.foreach { toBeRemoved =>
blockchain
.getBlockHeaderByNumber(toBeRemoved)
.foreach(header => blockchain.removeBlock(header.hash, withState = false))
}
}

}

object FastSyncBranchResolver {
sealed trait BranchResolverRequest
case object StartBranchResolver extends BranchResolverRequest

sealed trait BranchResolverResponse
case class BranchResolvedSuccessful(firstCommonBlockNumber: BigInt, masterPeer: Peer) extends BranchResolverResponse
/**
* Stores the current search state for binary search.
* Meaning we know the first common block lies between minBlockNumber and maxBlockNumber.
*/
final case class SearchState(minBlockNumber: BigInt, maxBlockNumber: BigInt, masterPeer: Peer)

def parentOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber - 1
def childOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber + 1
}

/**
* Attempt to find last common block within recent blocks by looking for a parent/child
* relationship between our block headers and remote peer's block headers.
*/
class RecentBlocksSearch(blockchain: Blockchain) {

/**
* Find the highest common block by trying to find a block so that our block n is the parent of remote candidate block n + 1
*/
def getHighestCommonBlock(
candidateHeaders: Seq[BlockHeader],
bestBlockNumber: BigInt
): Option[BigInt] = {
def isParent(potentialParent: BigInt, childCandidate: BlockHeader): Boolean =
blockchain.getBlockHeaderByNumber(potentialParent).exists { _.isParentOf(childCandidate) }
NonEmptyList.fromList(candidateHeaders.reverse.toList).flatMap { remoteHeaders =>
val blocksToBeCompared = bestBlockNumber.until(bestBlockNumber - remoteHeaders.size).by(-1).toList
remoteHeaders.toList
.zip(blocksToBeCompared)
.collectFirst {
case (childCandidate, parent) if isParent(parent, childCandidate) => parent
}
}
}

}

object BinarySearchSupport extends Logger {
import FastSyncBranchResolver._

sealed trait BinarySearchResult
final case class BinarySearchCompleted(highestCommonBlockNumber: BigInt) extends BinarySearchResult
final case class ContinueBinarySearch(searchState: SearchState) extends BinarySearchResult
case object NoCommonBlock extends BinarySearchResult

/**
* Returns the block number in the middle between min and max.
* If there is no middle, it will return the lower value.
*
* E.g. calling this method with min = 3 and max = 6 will return 4
*/
def middleBlockNumber(min: BigInt, max: BigInt): BigInt = (min + max) / 2

def blockHeaderNumberToRequest(min: BigInt, max: BigInt): BigInt =
childOf(middleBlockNumber(min, max))

def validateBlockHeaders(
parentBlockHeader: BlockHeader,
childBlockHeader: BlockHeader,
searchState: SearchState
): BinarySearchResult = {
val childNum = childBlockHeader.number
val parentNum = parentBlockHeader.number
val min = searchState.minBlockNumber
val max = searchState.maxBlockNumber

log.debug(
"Validating block headers (binary search) for parentBlockHeader {}, remote childBlockHeader {} and search state {}",
parentBlockHeader.number,
childBlockHeader.number,
searchState
)

if (parentBlockHeader.isParentOf(childBlockHeader)) { // chains are still aligned but there might be an even better block
if (parentNum == max) BinarySearchCompleted(parentNum)
else if (parentNum == min && childNum == max) ContinueBinarySearch(searchState.copy(minBlockNumber = childNum))
else ContinueBinarySearch(searchState.copy(minBlockNumber = parentNum))
} else { // no parent/child -> chains have diverged before parent block
if (min == 1 && max <= 2) NoCommonBlock
else if (min == max) BinarySearchCompleted(parentOf(parentNum))
else ContinueBinarySearch(searchState.copy(maxBlockNumber = parentOf(parentNum).max(1)))
}
}

}
Loading