Skip to content

Commit 141ec18

Browse files
biandratti1015bit
andauthored
[ETCM-316] Fast-sync branch resolver (#887)
* init new actor, FastSyncBranchResolver * added searching mode in fast sync branch resolver * fix style * add schedule when don't get peers * Added ut in class FastSyncBranchResolver * change messages * fix case object... * add new unit test * add new unit test * change name getFirstCommonBlock * change name to batch * init new actor, FastSyncBranchResolver * added searching mode in fast sync branch resolver * fix style * add schedule when don't get peers * Added ut in class FastSyncBranchResolver * change messages * fix case object... * add new unit test * add new unit test * create actor FastSyncBranchResolver * change name getFirstCommonBlock * change name to batch * Reformat triggered by sbt pp * Cleanup and simplify * Handle error cases * Fix tests * [ETCM-316] Add more tests and fix binary search logic * [ETCM-316] Finish tests for branch resolving * [ETCM-316] Cleanup * [ETCM-316] Small test improvements * [ETCM-316] Log binary search state * [ETCM-316] Move some logging to improve readability * [ETCM-316] Remove unneeded errors and reformat * [ETCM-316] Handle branch resolution failure * [ETCM-316] Address PR comments * [ETCM-316] Remove unnecessary string interpolation Co-authored-by: Petra Bierleutgeb <[email protected]>
1 parent 6d918a9 commit 141ec18

File tree

13 files changed

+1101
-29
lines changed

13 files changed

+1101
-29
lines changed

src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ import org.scalatest.matchers.should.Matchers
2121

2222
import scala.concurrent.duration._
2323

24-
class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators with AsyncFlatSpecLike with Matchers with BeforeAndAfterAll {
24+
class BlockImporterItSpec
25+
extends MockFactory
26+
with TestSetupWithVmAndValidators
27+
with AsyncFlatSpecLike
28+
with Matchers
29+
with BeforeAndAfterAll {
2530

2631
implicit val testScheduler = Scheduler.fixedPool("test", 32)
2732

@@ -57,11 +62,15 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
5762
ethCompatibleStorage = true
5863
)
5964

60-
override lazy val ledger = new TestLedgerImpl(successValidators) {
61-
override private[ledger] lazy val blockExecution = new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
62-
override def executeAndValidateBlock(block: Block, alreadyValidated: Boolean = false): Either[BlockExecutionError, Seq[Receipt]] =
63-
Right(BlockResult(emptyWorld).receipts)
64-
}
65+
override lazy val ledger = new TestLedgerImpl(successValidators) {
66+
override private[ledger] lazy val blockExecution =
67+
new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
68+
override def executeAndValidateBlock(
69+
block: Block,
70+
alreadyValidated: Boolean = false
71+
): Either[BlockExecutionError, Seq[Receipt]] =
72+
Right(BlockResult(emptyWorld).receipts)
73+
}
6574
}
6675

6776
val blockImporter = system.actorOf(
@@ -75,7 +84,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
7584
pendingTransactionsManagerProbe.ref,
7685
checkpointBlockGenerator,
7786
supervisor.ref
78-
))
87+
)
88+
)
7989

8090
val genesisBlock = blockchain.genesisBlock
8191
val block1: Block = getBlock(genesisBlock.number + 1, parent = genesisBlock.header.hash)
@@ -119,7 +129,8 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
119129
pendingTransactionsManagerProbe.ref,
120130
checkpointBlockGenerator,
121131
supervisor.ref
122-
))
132+
)
133+
)
123134

124135
blockImporter ! BlockImporter.Start
125136
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))
@@ -142,7 +153,6 @@ class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators
142153
blockchain.getBestBlock().get shouldEqual newBlock3
143154
}
144155

145-
146156
it should "switch to a branch with a checkpoint" in {
147157

148158
val checkpoint = ObjectGenerators.fakeCheckpointGen(3, 3).sample.get

src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ object Blacklist {
8181

8282
case object WrongBlockHeaders extends BlacklistReason {
8383
val reasonType: BlacklistReasonType = WrongBlockHeadersType
84-
val description: String = "Wrong blockheaders response (empty or not chain forming)"
84+
val description: String = "Wrong blockheaders response: Peer didn't respond with requested block headers."
8585
}
8686
case object BlockHeaderValidationFailed extends BlacklistReason {
8787
val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>
1717

1818
private implicit val ec: ExecutionContext = context.dispatcher
1919

20+
private val bigIntReverseOrdering: Ordering[BigInt] = Ordering[BigInt].reverse
21+
2022
def etcPeerManager: ActorRef
2123
def peerEventBus: ActorRef
2224
def blacklist: Blacklist
@@ -44,6 +46,11 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>
4446

4547
def getPeerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer)
4648

49+
def getPeerWithHighestBlock: Option[PeerWithInfo] =
50+
peersToDownloadFrom.values.toList.sortBy { case PeerWithInfo(_, peerInfo) =>
51+
peerInfo.maxBlockNumber
52+
}(bigIntReverseOrdering).headOption
53+
4754
def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: BlacklistReason): Unit =
4855
handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason))
4956

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerRequestHandler.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ class PeerRequestHandler[RequestMsg <: Message, ResponseMsg <: Message: ClassTag
2424

2525
import PeerRequestHandler._
2626

27-
val initiator: ActorRef = context.parent
27+
private val initiator: ActorRef = context.parent
2828

29-
val timeout: Cancellable = scheduler.scheduleOnce(responseTimeout, self, Timeout)
29+
private val timeout: Cancellable = scheduler.scheduleOnce(responseTimeout, self, Timeout)
3030

31-
val startTime: Long = System.currentTimeMillis()
31+
private val startTime: Long = System.currentTimeMillis()
3232

3333
private def subscribeMessageClassifier = MessageClassifier(Set(responseMsgCode), PeerSelector.WithId(peer.id))
3434

35-
def timeTakenSoFar(): Long = System.currentTimeMillis() - startTime
35+
private def timeTakenSoFar(): Long = System.currentTimeMillis() - startTime
3636

3737
override def preStart(): Unit = {
3838
etcPeerManager ! EtcPeerManagerActor.SendMessage(toSerializable(requestMsg), peer.id)
@@ -79,8 +79,8 @@ object PeerRequestHandler {
7979
)(implicit scheduler: Scheduler, toSerializable: RequestMsg => MessageSerializable): Props =
8080
Props(new PeerRequestHandler(peer, responseTimeout, etcPeerManager, peerEventBus, requestMsg, responseMsgCode))
8181

82-
case class RequestFailed(peer: Peer, reason: String)
83-
case class ResponseReceived[T](peer: Peer, response: T, timeTaken: Long)
82+
final case class RequestFailed(peer: Peer, reason: String)
83+
final case class ResponseReceived[T](peer: Peer, response: T, timeTaken: Long)
8484

8585
private case object Timeout
8686
}

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ class FastSync(
136136
private val syncStateStorageActor = context.actorOf(Props[StateStorageActor](), "state-storage")
137137
syncStateStorageActor ! fastSyncStateStorage
138138

139-
private val branchResolver = context.actorOf(Props[FastSyncBranchResolver](), "fast-sync-branch-resolver")
139+
private val branchResolver = context.actorOf(
140+
FastSyncBranchResolverActor
141+
.props(self, peerEventBus, etcPeerManager, blockchain, blacklist, syncConfig, appStateStorage, scheduler),
142+
"fast-sync-branch-resolver"
143+
)
140144

141145
private val syncStateScheduler = context.actorOf(
142146
SyncStateSchedulerActor
@@ -288,13 +292,13 @@ class FastSync(
288292

289293
// Start branch resolution and wait for response from the FastSyncBranchResolver actor.
290294
context become waitingForBranchResolution
291-
branchResolver ! FastSyncBranchResolver.StartBranchResolver
295+
branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
292296
currentSkeleton = None
293297
}
294298
}
295299

296300
private def waitingForBranchResolution: Receive = handleStatus orElse {
297-
case FastSyncBranchResolver.BranchResolvedSuccessful(firstCommonBlockNumber, resolvedPeer) =>
301+
case FastSyncBranchResolverActor.BranchResolvedSuccessful(firstCommonBlockNumber, resolvedPeer) =>
298302
// Reset the batch failures count
299303
batchFailuresCount = 0
300304

@@ -303,6 +307,9 @@ class FastSync(
303307
masterPeer = Some(resolvedPeer)
304308
context become receive
305309
processSyncing()
310+
case _: FastSyncBranchResolverActor.BranchResolutionFailed =>
311+
// there isn't much we can do if we don't find a branch/peer to continue syncing, so let's try again
312+
branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
306313
}
307314

308315
private def blockHeadersError(peer: Peer): Unit = {
Lines changed: 105 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,116 @@
11
package io.iohk.ethereum.blockchain.sync.fast
22

3-
import akka.actor.Actor
3+
import cats.data.NonEmptyList
4+
import io.iohk.ethereum.domain.{BlockHeader, Blockchain}
45
import io.iohk.ethereum.network.Peer
6+
import io.iohk.ethereum.utils.Logger
57

6-
class FastSyncBranchResolver extends Actor {
8+
trait FastSyncBranchResolver {
79

8-
override def receive: Receive = ???
10+
import FastSyncBranchResolver._
11+
12+
protected def blockchain: Blockchain
13+
14+
// TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
15+
def discardBlocksAfter(lastValidBlock: BigInt): Unit =
16+
discardBlocks(lastValidBlock, blockchain.getBestBlockNumber())
17+
18+
// TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
19+
private def discardBlocks(fromBlock: BigInt, toBlock: BigInt): Unit = {
20+
val blocksToBeRemoved = childOf(fromBlock).to(toBlock).reverse.toList
21+
blocksToBeRemoved.foreach { toBeRemoved =>
22+
blockchain
23+
.getBlockHeaderByNumber(toBeRemoved)
24+
.foreach(header => blockchain.removeBlock(header.hash, withState = false))
25+
}
26+
}
927

1028
}
1129

1230
object FastSyncBranchResolver {
13-
sealed trait BranchResolverRequest
14-
case object StartBranchResolver extends BranchResolverRequest
1531

16-
sealed trait BranchResolverResponse
17-
case class BranchResolvedSuccessful(firstCommonBlockNumber: BigInt, masterPeer: Peer) extends BranchResolverResponse
32+
/**
33+
* Stores the current search state for binary search.
34+
* Meaning we know the first common block lies between minBlockNumber and maxBlockNumber.
35+
*/
36+
final case class SearchState(minBlockNumber: BigInt, maxBlockNumber: BigInt, masterPeer: Peer)
37+
38+
def parentOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber - 1
39+
def childOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber + 1
40+
}
41+
42+
/**
43+
* Attempt to find last common block within recent blocks by looking for a parent/child
44+
* relationship between our block headers and remote peer's block headers.
45+
*/
46+
class RecentBlocksSearch(blockchain: Blockchain) {
47+
48+
/**
49+
* Find the highest common block by trying to find a block so that our block n is the parent of remote candidate block n + 1
50+
*/
51+
def getHighestCommonBlock(
52+
candidateHeaders: Seq[BlockHeader],
53+
bestBlockNumber: BigInt
54+
): Option[BigInt] = {
55+
def isParent(potentialParent: BigInt, childCandidate: BlockHeader): Boolean =
56+
blockchain.getBlockHeaderByNumber(potentialParent).exists { _.isParentOf(childCandidate) }
57+
NonEmptyList.fromList(candidateHeaders.reverse.toList).flatMap { remoteHeaders =>
58+
val blocksToBeCompared = bestBlockNumber.until(bestBlockNumber - remoteHeaders.size).by(-1).toList
59+
remoteHeaders.toList
60+
.zip(blocksToBeCompared)
61+
.collectFirst {
62+
case (childCandidate, parent) if isParent(parent, childCandidate) => parent
63+
}
64+
}
65+
}
66+
67+
}
68+
69+
object BinarySearchSupport extends Logger {
70+
import FastSyncBranchResolver._
71+
72+
sealed trait BinarySearchResult
73+
final case class BinarySearchCompleted(highestCommonBlockNumber: BigInt) extends BinarySearchResult
74+
final case class ContinueBinarySearch(searchState: SearchState) extends BinarySearchResult
75+
case object NoCommonBlock extends BinarySearchResult
76+
77+
/**
78+
* Returns the block number in the middle between min and max.
79+
* If there is no middle, it will return the lower value.
80+
*
81+
* E.g. calling this method with min = 3 and max = 6 will return 4
82+
*/
83+
def middleBlockNumber(min: BigInt, max: BigInt): BigInt = (min + max) / 2
84+
85+
def blockHeaderNumberToRequest(min: BigInt, max: BigInt): BigInt =
86+
childOf(middleBlockNumber(min, max))
87+
88+
def validateBlockHeaders(
89+
parentBlockHeader: BlockHeader,
90+
childBlockHeader: BlockHeader,
91+
searchState: SearchState
92+
): BinarySearchResult = {
93+
val childNum = childBlockHeader.number
94+
val parentNum = parentBlockHeader.number
95+
val min = searchState.minBlockNumber
96+
val max = searchState.maxBlockNumber
97+
98+
log.debug(
99+
"Validating block headers (binary search) for parentBlockHeader {}, remote childBlockHeader {} and search state {}",
100+
parentBlockHeader.number,
101+
childBlockHeader.number,
102+
searchState
103+
)
104+
105+
if (parentBlockHeader.isParentOf(childBlockHeader)) { // chains are still aligned but there might be an even better block
106+
if (parentNum == max) BinarySearchCompleted(parentNum)
107+
else if (parentNum == min && childNum == max) ContinueBinarySearch(searchState.copy(minBlockNumber = childNum))
108+
else ContinueBinarySearch(searchState.copy(minBlockNumber = parentNum))
109+
} else { // no parent/child -> chains have diverged before parent block
110+
if (min == 1 && max <= 2) NoCommonBlock
111+
else if (min == max) BinarySearchCompleted(parentOf(parentNum))
112+
else ContinueBinarySearch(searchState.copy(maxBlockNumber = parentOf(parentNum).max(1)))
113+
}
114+
}
115+
18116
}

0 commit comments

Comments
 (0)