[ECTM-104] Pivot block selection algorithm #711
Conversation
Force-pushed from cc65b2c to 955765e, then to 8a0d99f.
```scala
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
  peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
  val updatedPeersToAsk = peersToAsk - peerId
  val targetBlockHeaderOpt = blockHeaders.headers.find(header => header.number == targetBlockNumber)
```
maybe we should validate that we received exactly one header with the required number?
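A minimal sketch of that validation, reusing the names from the snippet above; rejecting multi-header responses outright is the reviewer's suggestion, not necessarily the merged behaviour:

```scala
// Accept the response only when it contains exactly one header and that
// header has the requested number; anything else counts as a bad response.
val targetBlockHeaderOpt: Option[BlockHeader] = blockHeaders.headers match {
  case Seq(header) if header.number == targetBlockNumber => Some(header)
  case _ => None // empty, multi-header, or wrong-numbered responses are rejected
}
```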
```scala
val peersSortedByBestNumber = peersWithBestBlockNumbers.toList.sortBy(-_._2)
val bestPeerBestBlockNumber = peersSortedByBestNumber.head._2
val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
val peersToAsk = peersSortedByBestNumber.takeWhile(_._2 >= expectedPivotBlock).map(_._1)
```
maybe we should reschedule the request for the pivot header if there are not enough peers to ask?
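A hedged sketch of that reschedule, based on the `scheduleRetry(startRetryInterval)` call that appears later in this actor; the exact threshold check and retry message are assumptions:

```scala
// If too few peers can serve the expected pivot block, retry the whole
// selection after a delay instead of running an election that cannot
// possibly collect minPeersToChoosePivotBlock votes.
if (peersToAsk.size < minPeersToChoosePivotBlock) {
  log.info("Not enough peers to choose the pivot block, rescheduling selection")
  scheduler.scheduleOnce(startRetryInterval, self, ChoosePivotBlock)
} else {
  // ... proceed with asking peersToAsk for the expected pivot block header
}
```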
```scala
def tryChooseTargetBlock(peersWithBestBlockNumbers: Map[Peer, BigInt]): Unit = {
```
general note: there are a lot of `_._1` and similar operators, which hurts the readability of this function
```scala
peersToAsk: Set[PeerId],
targetBlockNumber: BigInt,
timeout: Cancellable,
headers: Map[BlockHeader, Int]
```
maybe it would be cleaner to keep this data in a `Map[ByteString, BlockHeaderWithVotes]`, where `BlockHeaderWithVotes` is `case class BlockHeaderWithVotes(header: BlockHeader, votes: Int)`. That way we would be able to avoid the `_._1` tuple operators and could use `headers.get(..)` instead of `find`.
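A sketch of that suggestion; the case class name comes from the comment, while the `vote` helper is an assumption (a variant of this shape was later adopted, as the updated snippets further down show):

```scala
final case class BlockHeaderWithVotes(header: BlockHeader, votes: Int = 1) {
  def vote: BlockHeaderWithVotes = copy(votes = votes + 1)
}

// Keyed by header hash, counting a vote becomes a plain lookup plus update
// instead of a find over (BlockHeader, Int) tuples.
def countVote(headers: Map[ByteString, BlockHeaderWithVotes], header: BlockHeader): Map[ByteString, BlockHeaderWithVotes] =
  headers.updated(
    header.hash,
    headers.get(header.hash).map(_.vote).getOrElse(BlockHeaderWithVotes(header))
  )
```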
```scala
} else {
  context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, updatedHeaders)
}
case None =>
```
Just wondering: if one of the peers did not respond or responded badly, maybe we should restart the whole process if we don't end up with enough votes?
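A hedged sketch of that restart in the `case None =>` branch above; `scheduleRetry` and `startRetryInterval` appear elsewhere in this actor, but the exact condition is an assumption:

```scala
case None =>
  if (updatedPeersToAsk.isEmpty) {
    // Every asked peer has answered (or failed) without reaching consensus:
    // cancel the pending timeout and start a fresh election later.
    timeout.cancel()
    scheduleRetry(startRetryInterval)
  } else {
    context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, updatedHeaders)
  }
```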
Force-pushed from 6a31724 to cd35418.
```scala
): Receive =
  handleCommonMessages orElse {
    case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
      peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
```
minor: this whole handler is a little bit too long for my taste, maybe we could split it up into smaller functions?
I tried, but written this way it is the most readable
```scala
headers.get(targetBlockHeader.hash).map(_.vote).getOrElse(BlockHeaderWithVotes(targetBlockHeader))
val updatedHeaders = headers.updated(targetBlockHeader.hash, newValue)
val BlockHeaderWithVotes(mostPopularBlockHeader, updatedVotes) = updatedHeaders.mostVotedHeader
if (updatedVotes >= minPeersToChoosePivotBlock) {
```
If I understand correctly how this works now: even if we have, let's say, 7 peers to whom we send the request, we finish selection once we receive `minPeersToChoosePivotBlock` identical blocks, and we do not wait for the rest of the responses?
I wonder if this makes the following scenario possible:
- We do our selection and receive enough votes for a block, but we do not receive a header from, let's say, `peer1`.
- We are back in fast sync and start making requests, and we request x block headers from `peer1`; this means we create a `PeerRequestHandler` which subscribes to the `PeerEventBus` for `BlockHeaders` messages from `peer1`.
- `peer1` responds with the pivot block. `PivotBlockSelector` does not receive the response, as it has already finished, but the `PeerRequestHandler` from the fast sync headers request is still alive and receives this pivot block.
- We blacklist `peer1` for a response which does not match our fast sync request.
I think it is possible. But it is the same case as when we assume the timeout: this response could be received later. I could add collecting all messages if you prefer, but in a case where we have a lot of peers it won't perform very well.
Force-pushed from cd35418 to 797b12b.
code LGTM! Waiting for testing results on mainnet and Mordor
@KonradStaniec I successfully synced with Mordor. Today I will try to run it against the mainnet
log.info(s"Current target block is fresh enough, starting state download") | ||
if (syncState.targetBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) { | ||
syncState = syncState.copy(pendingMptNodes = Seq()) | ||
private def updatePivotSyncState(state: FinalBlockProcessingResult, targetBlockHeader: BlockHeader): Unit = |
(minor) maybe just to save some space:

```scala
private def updatePivotSyncState(state: FinalBlockProcessingResult, targetBlockHeader: BlockHeader): Unit =
  state match {
    case ImportedLastBlock
        if (targetBlockHeader.number - syncState.pivotBlock.number <= syncConfig.maxTargetDifference) =>
      log.info(s"Current target block is fresh enough, starting state download")
      if (syncState.pivotBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
        syncState = syncState.copy(pendingMptNodes = Seq())
      } else {
        syncState = syncState.copy(pendingMptNodes = Seq(StateMptNodeHash(syncState.pivotBlock.stateRoot)))
      }
    case _ =>
      log.info(
        s"FinalBlockProcessingResult: $state, changing target to ${targetBlockHeader.number}, new safe target is ${syncState.safeDownloadTarget}"
      )
      syncState =
        syncState.updateTargetBlock(targetBlockHeader, syncConfig.fastSyncBlockValidationX, updateFailures = true)
  }
```
```scala
safeDownloadTarget = newTarget.number + numberOfSafeBlocks,
targetBlockUpdateFailures = if (updateFailures) targetBlockUpdateFailures + 1 else targetBlockUpdateFailures
)

def updateTargetBlock(newTarget: BlockHeader, numberOfSafeBlocks: BigInt, updateFailures: Boolean): SyncState =
```
`updatePivotBlock(newPivot: ...`?
```diff
@@ -328,8 +360,8 @@ class FastSync(
         handleRewind(header, peer, syncConfig.fastSyncBlockValidationN)
       case HeadersProcessingFinished =>
         processSyncing()
-      case ImportedTargetBlock =>
-        updateTargetBlock(ImportedLastBlock)
+      case ImportedTargetBlock =>
```
`ImportedPivotBlock`?
```scala
val targetBlockSelector = context.actorOf(FastSyncTargetBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler), "target-block-selector")
targetBlockSelector ! FastSyncTargetBlockSelector.ChooseTargetBlock
context become syncingHandler.waitingForTargetBlockUpdate(ImportedLastBlock)

val pivotBlockSelector = context.actorOf(
```
Does it maybe make sense to create the `PivotBlockSelector` actor once and ask it for a pivot block whenever there's a need for that?
I am not sure. I don't want to change that in this PR. We will see after the fast sync refactoring which solution is best.
```scala
val scheduler: Scheduler
) extends Actor
    with ActorLogging
    with PeerListSupport
```
`PeersClient` instead?
Why? We don't need `PeersClient` functionality here
```scala
import FastSyncPivotBlockSelector._
import syncConfig._

val fastSync: ActorRef = context.parent
```
minor: could/should the `fastSync` actor be passed through the constructor? Or `sender()` used?
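A hedged sketch of the constructor-injection alternative; the extra `fastSync` parameter is an assumption, while the other parameters mirror the snippets in this PR:

```scala
class FastSyncPivotBlockSelector(
    val etcPeerManager: ActorRef,
    val peerEventBus: ActorRef,
    val syncConfig: SyncConfig,
    val scheduler: Scheduler,
    val fastSync: ActorRef // injected instead of reading context.parent
) extends Actor
    with ActorLogging
    with PeerListSupport {
  // behaviour otherwise unchanged: replies go to fastSync
  override def receive: Receive = ???
}
```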
```scala
import scala.concurrent.Await
import scala.concurrent.duration._

class FastSyncPivotBlockSelectorSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {
```
`WithActorSystemShutdown`?
```scala
val peer3TestProbe: TestProbe = TestProbe("peer3")(system)
val peer4TestProbe: TestProbe = TestProbe("peer4")(system)

val peer1 = Peer(new InetSocketAddress("127.0.0.1", 0), peer1TestProbe.ref, false)
```
shouldn't these peers be extracted into a separate file?
```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

class FastSyncPivotBlockSelector(
```
The code of this actor seems OK, but IMHO we could reflect the actor's behaviour in a more expressive way; I will leave some comments regarding that.
```scala
override def receive: Receive = idle

def idle: Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
```
why not `ChoosePivotBlock` -> `StartPivotBlockElection`?
Because from the parent's (fast sync) perspective we want to choose a pivot block. It doesn't matter whether we use an election for that or something else.
Fair enough, then just a minor one: `ChoosePivotBlock` to `SelectPivotBlock`? I mean, the actor is called `FastSyncPivotBlockSelector`... Also, does it matter that it is fast-sync related? Or could it be `PivotBlockSelector`?
Btw, regarding using the name `Start...Election`: take into consideration that programming with actors is, behind the scenes, defining a protocol, so I am not sure that hiding how the protocol behaves is better than explicitly expressing it. I mean, if this were a trait instead of an Actor, I would agree with you about not mentioning the actual implementation.
```scala
def waitingForPivotBlock(
```
Why not `runningPivotBlockElection`?
Possible scenarios:
- `VoteReceived` aka `MessageFromPeer ...`
- `ElectionExpired` aka `PivotBlockTimeout`
```scala
scheduleRetry(startRetryInterval)
}

private def votingProcess(
```
why not `countingVotes`?
From my perspective, at this stage these are the states the code should try to reflect better (sketched below):
- `ConsensusReached`: send the response and clean up
- `ElectionInProgress`: continue `runningPivotBlockElection`
- `ConsensusNotReached.NoMoreVoters`: schedule a new election, `startPivotBlockElection`
- `ConsensusNotReached.AddNewVoter`: continue `runningPivotBlockElection` (adds extra time)
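A hypothetical sketch of those states as a sealed hierarchy; the names come from the comment above, while the encoding itself is an assumption:

```scala
sealed trait ElectionState
case object ConsensusReached extends ElectionState     // send the response and clean up
case object ElectionInProgress extends ElectionState   // keep runningPivotBlockElection
sealed trait ConsensusNotReached extends ElectionState
object ConsensusNotReached {
  case object NoMoreVoters extends ConsensusNotReached // schedule a new election
  case object AddNewVoter extends ConsensusNotReached  // continue, granting extra time
}
```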
I don't see the value of changing `votingProcess` to `countingVotes`. For me, counting votes is the process after the election, and that is not our case.
I think it's a thin line; I mean, you are checking or counting votes and deciding what to do. But yeah, using the word `process` is valid too.
```scala
override def receive: Receive = idle

def idle: Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
  val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
```
the code up to the `if` could be an auxiliary function like `getVoters`, and given the data it returns we express which path to take (re-schedule, or run the election)
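A sketch of the suggested helper; the name `getVoters` comes from the comment, and the returned shape mirrors the `peersWithBestBlockNumbers: Map[Peer, BigInt]` used elsewhere in this actor:

```scala
// Keep only peers whose PeerInfo matches the pattern used in the snippet
// above, paired with their best known block number; the caller then decides
// whether there are enough voters to run the election.
private def getVoters: Map[Peer, BigInt] =
  peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
    peer -> maxBlockNumber
  }
```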
Force-pushed from 7dd239f to 8932dc1.
```scala
with BeforeAndAfter
with WithActorSystemShutDown {

"FastSyncPivotBlockSelector" should "download pivot block from peers" in new TestSetup {
```
(minor) it has the old name
LGTM!
Description
Add a mechanism of voting (based on different clients' implementations) to choose the pivot block (renamed from `target block` for consistency).

Proposed Solution
Algorithm:
1. Compute the expected pivot block as `bestBlock - pivotBlockOffset`.
2. Ask `minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin` peers whose best block is >= the expected pivot block for the expected pivot block header.
3. The pivot block is chosen once `minPeersToChoosePivotBlock` peers have returned the same header.
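A minimal sketch of the selection math described above, using the configuration names from this PR; the actor plumbing (requests, voting, timeouts, retries) is omitted:

```scala
// Given each peer's best known block number, pick the expected pivot block
// and the peers eligible to vote on its header. Assumes at least one peer.
def selectPeersToAsk(
    peersWithBestBlockNumbers: Map[Peer, BigInt],
    pivotBlockOffset: Int,
    minPeersToChoosePivotBlock: Int,
    peersToChoosePivotBlockMargin: Int
): (BigInt, List[Peer]) = {
  val sorted = peersWithBestBlockNumbers.toList.sortBy { case (_, best) => -best }
  val bestBlock = sorted.head._2
  val expectedPivotBlock = (bestBlock - pivotBlockOffset).max(0)
  val peersToAsk = sorted
    .takeWhile { case (_, best) => best >= expectedPivotBlock }
    .take(minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin)
    .map { case (peer, _) => peer }
  (expectedPivotBlock, peersToAsk)
}
```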