
[ECTM-104] Pivot block selection algorithm #711


Merged
merged 4 commits into develop from ectm-104-target-block-selection on Oct 8, 2020

Conversation

Contributor

@mmrozek mmrozek commented Sep 30, 2020

Description

Add a voting mechanism (based on different client implementations) to choose the pivot block (renamed from target block for consistency).

Proposed Solution

Algorithm:

  1. Find the highest block number (the best block from all peers)
  2. Define the expected pivot block as bestBlock - pivotBlockOffset
  3. Ask minPeersToChoosePivotBlock + peersToChoosePivotBlockMargin peers whose best block is >= the expected pivot block for the expected pivot block header
  4. Wait for all responses and check whether minPeersToChoosePivotBlock peers returned the same header
  5. If not, ask the next peer (if available)
  6. If there are no more peers, restart the whole process after a timeout (a sketch of this loop follows)
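For illustration, a minimal self-contained sketch of the election loop described above (plain Scala, outside the actor model; Peer, BlockHeader, SyncConfig and fetchHeader are simplified stand-ins for the real Mantis types, and the real actor asks a batch of voters in parallel rather than one by one):

    import scala.annotation.tailrec

    // Simplified stand-ins for the real Mantis types (hypothetical names).
    final case class Peer(id: String)
    final case class BlockHeader(number: BigInt, hash: String)
    final case class SyncConfig(
        pivotBlockOffset: BigInt,
        minPeersToChoosePivotBlock: Int,
        peersToChoosePivotBlockMargin: Int
    )

    object PivotBlockElection {

      /** One round of the election described above. Returns None when no header
        * gathers enough identical votes, in which case the caller restarts the
        * whole process after a timeout (step 6).
        */
      def choosePivotBlock(
          peersWithBestBlock: Map[Peer, BigInt],
          fetchHeader: (Peer, BigInt) => Option[BlockHeader],
          config: SyncConfig
      ): Option[BlockHeader] =
        if (peersWithBestBlock.size < config.minPeersToChoosePivotBlock) None
        else {
          // 1. Highest block number among all peers.
          val bestBlock = peersWithBestBlock.values.max
          // 2. Expected pivot block, guarded against going below genesis.
          val expectedPivotBlock = (bestBlock - config.pivotBlockOffset).max(0)
          // 3. Only peers whose best block reaches the expected pivot may vote.
          val voters = peersWithBestBlock.collect {
            case (peer, best) if best >= expectedPivotBlock => peer
          }.toList

          @tailrec
          def loop(toAsk: List[Peer], votes: Map[String, (BlockHeader, Int)]): Option[BlockHeader] =
            toAsk match {
              case Nil => None // 6. No peers left: give up and let the caller retry.
              case peer :: rest =>
                val updated = fetchHeader(peer, expectedPivotBlock) match {
                  case Some(header) =>
                    val (_, count) = votes.getOrElse(header.hash, (header, 0))
                    votes.updated(header.hash, (header, count + 1))
                  case None => votes // no or invalid response: move on (step 5)
                }
                // 4. Has any header collected enough identical votes?
                updated.values.collectFirst {
                  case (header, count) if count >= config.minPeersToChoosePivotBlock => header
                } match {
                  case found @ Some(_) => found
                  case None            => loop(rest, updated)
                }
            }

          loop(voters, Map.empty)
        }
    }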

@mmrozek mmrozek force-pushed the ectm-104-target-block-selection branch 4 times, most recently from cc65b2c to 955765e on October 2, 2020 12:25
@mmrozek mmrozek requested review from ntallar and mirkoAlic October 2, 2020 12:38
@mmrozek mmrozek force-pushed the ectm-104-target-block-selection branch from 955765e to 8a0d99f on October 2, 2020 12:40
@mmrozek mmrozek marked this pull request as ready for review October 2, 2020 12:40
case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
  peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
  val updatedPeersToAsk = peersToAsk - peerId
  val targetBlockHeaderOpt = blockHeaders.headers.find(header => header.number == targetBlockNumber)
Contributor

maybe we should validate that we received exactly one header with the required number? (a possible shape is sketched below)
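A minimal sketch of such a validation against the snippet above, assuming the same blockHeaders and targetBlockNumber names:

    // Hypothetical validation: accept the response only if it contains exactly
    // one header and that header has the requested number.
    val targetBlockHeaderOpt: Option[BlockHeader] = blockHeaders.headers match {
      case Seq(header) if header.number == targetBlockNumber => Some(header)
      case _ => None // wrong header count or wrong block number
    }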

val peersSortedByBestNumber = peersWithBestBlockNumbers.toList.sortBy(-_._2)
val bestPeerBestBlockNumber = peersSortedByBestNumber.head._2
val expectedPivotBlock = (bestPeerBestBlockNumber - syncConfig.pivotBlockOffset).max(0)
val peersToAsk = peersSortedByBestNumber.takeWhile(_._2 >= expectedPivotBlock).map(_._1)
Contributor

maybe we should reschedule the request for the pivot header if there are not enough peers to ask?

}
}

def tryChooseTargetBlock(peersWithBestBlockNumbers: Map[Peer, BigInt]): Unit = {
Contributor

general note: there are a lot of _._1 and similar tuple accessors, which hurt the readability of this function

peersToAsk: Set[PeerId],
targetBlockNumber: BigInt,
timeout: Cancellable,
headers: Map[BlockHeader, Int]
Contributor

maybe it would be cleaner to keep this data in a Map[ByteString, BlockHeaderWithVotes], where BlockHeaderWithVotes is case class BlockHeaderWithVotes(header: BlockHeader, votes: Int). That way we would be able to avoid these _._1 tuple accessors and could use headers.get(..) instead of find. (sketched below)
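A hedged sketch of that suggestion (countVote is an illustrative name; header.hash is assumed to return the ByteString key, as elsewhere in Mantis):

    import akka.util.ByteString

    // Illustrative sketch of the reviewer's suggestion: count votes per header
    // hash instead of juggling (BlockHeader, Int) tuples.
    final case class BlockHeaderWithVotes(header: BlockHeader, votes: Int = 1) {
      def vote: BlockHeaderWithVotes = copy(votes = votes + 1)
    }

    def countVote(
        headers: Map[ByteString, BlockHeaderWithVotes],
        header: BlockHeader
    ): Map[ByteString, BlockHeaderWithVotes] =
      headers.updated(
        header.hash,
        headers.get(header.hash).map(_.vote).getOrElse(BlockHeaderWithVotes(header))
      )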

} else {
  context become waitingForPivotBlock(updatedPeersToAsk, targetBlockNumber, timeout, updatedHeaders)
}
case None =>
Contributor

Just wondering: if one of the peers did not respond or responded badly, maybe we should restart the whole process if we don't end up with enough votes?

@mmrozek mmrozek force-pushed the ectm-104-target-block-selection branch from 6a31724 to cd35418 on October 5, 2020 12:21
): Receive =
  handleCommonMessages orElse {
    case MessageFromPeer(blockHeaders: BlockHeaders, peerId) =>
      peerEventBus ! Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peerId)))
Contributor

minor: this whole handler is a little bit too long for my taste, maybe we could split it up into smaller functions?

Contributor Author

I tried, but this way it is the most readable.

val newValue = headers.get(targetBlockHeader.hash).map(_.vote).getOrElse(BlockHeaderWithVotes(targetBlockHeader))
val updatedHeaders = headers.updated(targetBlockHeader.hash, newValue)
val BlockHeaderWithVotes(mostPopularBlockHeader, updatedVotes) = updatedHeaders.mostVotedHeader
if (updatedVotes >= minPeersToChoosePivotBlock) {
Contributor

If I understand correctly, the way this works now is that even if we have, let's say, 7 peers to whom we send the request, we finish selection once we receive minPeersToChoosePivotBlock identical blocks, and we do not wait for the rest of the responses?
I wonder if this does not make the following scenario possible:

  1. We do our selection and receive enough votes for a block, but we do not receive the header from, let's say, peer1
  2. We are back in fast sync and start making requests, and we request x block headers from peer1; that means we create a PeerRequestHandler which subscribes to the PeerEventBus for BlockHeaders messages from peer1.
  3. peer1 responds with the pivot block. PivotBlockSelector does not receive the response as it has already finished, but the PeerRequestHandler from the fast sync headers request is still alive and receives this pivot block
  4. we blacklist peer1 for a response which does not match our fast sync request.

Contributor Author

I think it is possible. But it is the same case when we hit the timeout: the response could still arrive later. I could add collecting all messages if you prefer, but in the case when we have a lot of peers it won't perform very well.

@mmrozek mmrozek force-pushed the ectm-104-target-block-selection branch from cd35418 to 797b12b on October 6, 2020 10:38
@mmrozek mmrozek changed the title [ECTM-104] Target block selection algorithm [ECTM-104] Pivot block selection algorithm Oct 6, 2020
Contributor
@KonradStaniec KonradStaniec left a comment

code LGTM! Waiting for testing results on mainnet and Mordor

Contributor Author

@mmrozek mmrozek commented Oct 7, 2020

@KonradStaniec I successfully synced with Mordor. Today I will try to run it against the mainnet

log.info(s"Current target block is fresh enough, starting state download")
if (syncState.targetBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
syncState = syncState.copy(pendingMptNodes = Seq())
private def updatePivotSyncState(state: FinalBlockProcessingResult, targetBlockHeader: BlockHeader): Unit =
Contributor

(minor) maybe just to save some space:

    private def updatePivotSyncState(state: FinalBlockProcessingResult, targetBlockHeader: BlockHeader): Unit =
      state match {
        case ImportedLastBlock
          if (targetBlockHeader.number - syncState.pivotBlock.number <= syncConfig.maxTargetDifference) =>
            log.info(s"Current target block is fresh enough, starting state download")
            if (syncState.pivotBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
              syncState = syncState.copy(pendingMptNodes = Seq())
            } else {
              syncState = syncState.copy(pendingMptNodes = Seq(StateMptNodeHash(syncState.pivotBlock.stateRoot)))
            }
        case _ =>
          log.info(
            s"FinalBlockProcessingResult: $state, changing target to ${targetBlockHeader.number}, new safe target is ${syncState.safeDownloadTarget}"
          )
          syncState =
            syncState.updateTargetBlock(targetBlockHeader, syncConfig.fastSyncBlockValidationX, updateFailures = true)
      }

safeDownloadTarget = newTarget.number + numberOfSafeBlocks,
targetBlockUpdateFailures = if (updateFailures) targetBlockUpdateFailures + 1 else targetBlockUpdateFailures
)
def updateTargetBlock(newTarget: BlockHeader, numberOfSafeBlocks: BigInt, updateFailures: Boolean): SyncState =
Contributor

updatePivotBlock(newPivot: ..?

@@ -328,8 +360,8 @@ class FastSync(
handleRewind(header, peer, syncConfig.fastSyncBlockValidationN)
case HeadersProcessingFinished =>
processSyncing()
case ImportedTargetBlock =>
updateTargetBlock(ImportedLastBlock)
case ImportedTargetBlock =>
Contributor

ImportedPivotBlock ?

val targetBlockSelector = context.actorOf(FastSyncTargetBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler), "target-block-selector")
targetBlockSelector ! FastSyncTargetBlockSelector.ChooseTargetBlock
context become syncingHandler.waitingForTargetBlockUpdate(ImportedLastBlock)
val pivotBlockSelector = context.actorOf(
Contributor

Does it maybe make sense to create the PivotBlockSelector actor once and ask it for the pivot block whenever there's a need for it?

Contributor Author

I am not sure. I don't want to change that in this PR; we will see after the fast sync refactoring which solution is best.

val scheduler: Scheduler
) extends Actor
with ActorLogging
with PeerListSupport
Contributor

PeersClient instead?

Contributor Author

Why? We don't need PeersClient functionality here

import FastSyncPivotBlockSelector._
import syncConfig._

val fastSync: ActorRef = context.parent
Contributor

minor: could/should the fastSync actor be passed through the constructor? Or should sender() be used?

import scala.concurrent.Await
import scala.concurrent.duration._

class FastSyncPivotBlockSelectorSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {
Contributor

WithActorSystemShutdown?

val peer3TestProbe: TestProbe = TestProbe("peer3")(system)
val peer4TestProbe: TestProbe = TestProbe("peer4")(system)

val peer1 = Peer(new InetSocketAddress("127.0.0.1", 0), peer1TestProbe.ref, false)
Contributor

shouldn't these peers be extracted into a separate file?

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

class FastSyncPivotBlockSelector(
Contributor

The code of this actor seems OK, but IMHO we could reflect the actor's behaviour in a more expressive way; I will leave some comments regarding that.


override def receive: Receive = idle

def idle: Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
Contributor

why not ChoosePivotBlock -> StartPivotBlockElection ?

Contributor Author

Because from the parent's (fast sync) perspective we want to choose a pivot block. It doesn't matter whether we use an election for that or something else.

Contributor

Fair enough, then just a minor one: ChoosePivotBlock to SelectPivotBlock? I mean, the actor is called FastSyncPivotBlockSelector... Also, does it matter that it is fastSync related? Or could it be PivotBlockSelector?

Contributor

Btw, regarding using the name Start...Election: take into consideration that programming with actors is, behind the scenes, defining a protocol, so I am not sure that hiding the way the protocol behaves is better than mentioning it explicitly. I mean, if this was a trait instead of an Actor, I would agree with you about not mentioning the actual implementation.

}
}

def waitingForPivotBlock(
Contributor
@mirkoAlic mirkoAlic Oct 7, 2020

Why not runningPivotBlockElection ?

Possible scenarios:

  • VoteReceived aka MessageFromPeer ...
  • ElectionExpired aka PivotBlockTimeout

scheduleRetry(startRetryInterval)
}

private def votingProcess(
Contributor
@mirkoAlic mirkoAlic Oct 7, 2020

why not countingVotes?

From my perspective, these are the states the code should try to reflect better at this stage (a sketch follows the list):

  • ConsensusReached: send the response and clean up
  • ElectionInProgress: continue runningPivotBlockElection
  • ConsensusNotReached.NoMoreVoters: schedule a new election via startPivotBlockElection
  • ConsensusNotReached.AddNewVoter: continue runningPivotBlockElection (adds extra time)
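A hedged sketch of those outcomes as an ADT (the state names come from this comment, not from the PR itself; BlockHeader and PeerId are the surrounding Mantis types):

    // Illustrative ADT reflecting the suggested states.
    sealed trait ElectionResult
    object ElectionResult {
      // Enough identical votes: reply to fast sync and clean up.
      final case class ConsensusReached(pivot: BlockHeader) extends ElectionResult
      // Still waiting for outstanding voters.
      case object ElectionInProgress extends ElectionResult
      sealed trait ConsensusNotReached extends ElectionResult
      object ConsensusNotReached {
        // No peers left to ask: schedule a fresh election.
        case object NoMoreVoters extends ConsensusNotReached
        // Ask one more peer and keep the election running (adds extra time).
        final case class AddNewVoter(peer: PeerId) extends ConsensusNotReached
      }
    }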

Contributor Author

I don't see the value of changing votingProcess to countingVotes. For me, counting votes is a process that happens after the election, but that is not our case.

Contributor

I think it's a thin line; I mean, you are checking or counting votes and deciding what to do. But yeah, using the word process is valid too.

override def receive: Receive = idle

def idle: Receive = handleCommonMessages orElse { case ChoosePivotBlock =>
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
Contributor

the code up to the if could be an auxiliary function like getVoters, and given the data it returns we would express which path to take (re-schedule or run the election); a sketch follows.
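A minimal sketch of that extraction (getVoters is the reviewer's suggested name; peersToDownloadFrom and PeerInfo come from the PeerListSupport code above):

    // Hypothetical helper: collect handshaked peers with their best block
    // numbers, sorted best-first, so the caller only has to decide between
    // re-scheduling and running the election.
    private def getVoters: List[(Peer, BigInt)] =
      peersToDownloadFrom.collect {
        case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) => (peer, maxBlockNumber)
      }.toList.sortBy { case (_, bestNumber) => -bestNumber }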

@mmrozek mmrozek force-pushed the ectm-104-target-block-selection branch from 7dd239f to 8932dc1 on October 7, 2020 13:56
with BeforeAndAfter
with WithActorSystemShutDown {

"FastSyncPivotBlockSelector" should "download pivot block from peers" in new TestSetup {
Contributor

(minor) it has the old name

Contributor
@mirkoAlic mirkoAlic left a comment

LGTM!

@mmrozek mmrozek merged commit 6e30ab6 into develop Oct 8, 2020
@mmrozek mmrozek deleted the ectm-104-target-block-selection branch October 8, 2020 11:48