Skip to content

[ETCM-283] Add block bodies validation in block fetcher #771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ object RegularSyncItSpecUtils {
"pending-transactions-manager"
)

lazy val validators = buildEthashConsensus.validators

lazy val regularSync = system.actorOf(
RegularSync.props(
peersClient,
Expand All @@ -71,6 +73,7 @@ object RegularSyncItSpecUtils {
ledger,
bl,
blockchainConfig, // FIXME: remove in ETCM-280
validators.blockValidator,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: should this validator be used here instead of more close to production one?

testSyncConfig,
ommersPool,
pendingTransactionsManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class SyncController(
ledger,
blockchain,
blockchainConfig,
validators.blockValidator,
syncConfig,
ommersPool,
pendingTransactionsManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import cats.data.NonEmptyList
import cats.instances.future._
import cats.instances.option._
import cats.syntax.either._
import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.blockchain.sync.PeersClient._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
Expand Down Expand Up @@ -37,6 +38,7 @@ class BlockFetcher(
val peerEventBus: ActorRef,
val supervisor: ActorRef,
val syncConfig: SyncConfig,
val blockValidator: BlockValidator,
implicit val scheduler: Scheduler
) extends Actor
with ActorLogging {
Expand All @@ -54,7 +56,7 @@ class BlockFetcher(
}

private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
BlockFetcherState.initial(importer, blockValidator, blockNr) |> fetchBlocks
peerEventBus ! Subscribe(
MessageClassifier(
Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code),
Expand Down Expand Up @@ -138,19 +140,25 @@ class BlockFetcher(

private def handleBodiesMessages(state: BlockFetcherState): Receive = {
case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
val newState =
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
log.debug("Received {} block bodies that will be ignored", bodies.size)
state.withBodiesFetchReceived
} else {
log.debug("Fetched {} block bodies", bodies.size)
state.withBodiesFetchReceived.addBodies(peer.id, bodies)
}

fetchBlocks(newState)
log.debug(s"Received ${bodies.size} block bodies")
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
log.debug("Block bodies will be ignored due to an invalidation was requested for them")
fetchBlocks(state.withBodiesFetchReceived)
} else {
val newState =
state.validateBodies(bodies) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
state.withBodiesFetchReceived
case Right(newBlocks) =>
state.withBodiesFetchReceived.handleRequestedBlocks(newBlocks, peer.id)
}
val waitingHeadersDequeued = state.waitingHeaders.size - newState.waitingHeaders.size
log.debug(s"Processed ${waitingHeadersDequeued} new blocks from received block bodies")
fetchBlocks(newState)
}
case RetryBodiesRequest if state.isFetchingBodies =>
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")

val newState = state.withBodiesFetchReceived
fetchBlocks(newState)
}
Expand Down Expand Up @@ -194,7 +202,7 @@ class BlockFetcher(
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
log.debug("Received NewBlock {}", block.idTag)
val newBlockNr = block.number
val nextExpectedBlock = state.lastFullBlockNumber + 1
val nextExpectedBlock = state.lastBlock + 1

if (state.isOnTop && newBlockNr == nextExpectedBlock) {
log.debug("Passing block directly to importer")
Expand Down Expand Up @@ -346,9 +354,10 @@ object BlockFetcher {
peerEventBus: ActorRef,
supervisor: ActorRef,
syncConfig: SyncConfig,
blockValidator: BlockValidator,
scheduler: Scheduler
): Props =
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, scheduler))
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, scheduler))

sealed trait FetchMsg
case class Start(importer: ActorRef, fromBlock: BigInt) extends FetchMsg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.ActorRef
import akka.util.ByteString
import cats.data.NonEmptyList
import io.iohk.ethereum.consensus.validators.BlockValidator
import cats.implicits._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash

import scala.collection.immutable.Queue
import scala.annotation.tailrec
import io.iohk.ethereum.consensus.validators.BlockValidator

// scalastyle:off number.of.methods
/**
Expand All @@ -33,6 +36,7 @@ import scala.collection.immutable.Queue
*/
case class BlockFetcherState(
importer: ActorRef,
blockValidator: BlockValidator,
readyBlocks: Queue[Block],
waitingHeaders: Queue[BlockHeader],
fetchingHeadersState: FetchingHeadersState,
Expand All @@ -55,12 +59,6 @@ case class BlockFetcherState(

def hasReachedSize(size: Int): Boolean = (readyBlocks.size + waitingHeaders.size) >= size

def lastFullBlockNumber: BigInt =
readyBlocks.lastOption
.map(_.number)
.orElse(waitingHeaders.headOption.map(_.number - 1))
.getOrElse(lastBlock)

def lowestBlock: BigInt =
readyBlocks.headOption
.map(_.number)
Expand Down Expand Up @@ -117,35 +115,69 @@ case class BlockFetcherState(
)

/**
* Matches bodies with headers in queue and adding matched bodies to the blocks.
* If bodies is empty collection - headers in queue are removed as the cause is:
* - the headers are from rejected fork and therefore it won't be possible to resolve bodies for them
* - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
* when making a request)
* When bodies are requested, the response don't need to be a complete sub chain,
* even more, we could receive an empty chain and that will be considered valid. Here we just
* validate that the received bodies corresponds to an ordered subset of the requested headers.
*/
def addBodies(peerId: PeerId, bodies: Seq[BlockBody]): BlockFetcherState = {
if (bodies.isEmpty) {
copy(waitingHeaders = Queue.empty)
} else {
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)
def validateBodies(receivedBodies: Seq[BlockBody]): Either[String, Seq[Block]] =
bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies)
.toRight(
"Received unrequested bodies"
)

withPeerForBlocks(peerId, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
// Checks that the received block bodies are an ordered subset of the ones requested
@tailrec
private def bodiesAreOrderedSubsetOfRequested(
requestedHeaders: Seq[BlockHeader],
respondedBodies: Seq[BlockBody],
matchedBlocks: Seq[Block] = Nil
): Option[Seq[Block]] =
(requestedHeaders, respondedBodies) match {
case (Seq(), _ +: _) => None
case (_, Seq()) => Some(matchedBlocks)
case (header +: remainingHeaders, body +: remainingBodies) =>
val doMatch = blockValidator.validateHeaderAndBody(header, body).isRight
if (doMatch)
bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body))
else
bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
}
}

def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
withPeerForBlocks(fromPeer, Seq(block.header.number))
.withPossibleNewTopAt(block.number)
.withLastBlock(block.number)
.copy(
readyBlocks = readyBlocks.enqueue(block),
waitingHeaders = waitingHeaders.filter(block.number != _.number)
/**
* If blocks is empty collection - headers in queue are removed as the cause is:
* - the headers are from rejected fork and therefore it won't be possible to resolve blocks for them
* - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
* when making a request)
*/
def handleRequestedBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState =
if (blocks.isEmpty)
copy(
waitingHeaders = Queue.empty
)
else
blocks.foldLeft(this) { case (state, block) =>
state.enqueueRequestedBlock(block, fromPeer)
}

/**
* If the requested block is not the next in the line in the waiting headers queue,
* we opt for not adding it in the ready blocks queue.
*/
def enqueueRequestedBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
waitingHeaders.dequeueOption
.map { case (waitingHeader, waitingHeadersTail) =>
if (waitingHeader.hash == block.hash)
withPeerForBlocks(fromPeer, Seq(block.number))
.withPossibleNewTopAt(block.number)
.withLastBlock(block.number)
.copy(
readyBlocks = readyBlocks.enqueue(block),
waitingHeaders = waitingHeadersTail
)
else
this
}
.getOrElse(this)

def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =
if (readyBlocks.nonEmpty) {
Expand Down Expand Up @@ -242,17 +274,19 @@ case class BlockFetcherState(
object BlockFetcherState {
case class StateNodeFetcher(hash: ByteString, replyTo: ActorRef)

def initial(importer: ActorRef, lastBlock: BigInt): BlockFetcherState = BlockFetcherState(
importer = importer,
readyBlocks = Queue(),
waitingHeaders = Queue(),
fetchingHeadersState = NotFetchingHeaders,
fetchingBodiesState = NotFetchingBodies,
stateNodeFetcher = None,
lastBlock = lastBlock,
knownTop = lastBlock + 1,
blockProviders = Map()
)
def initial(importer: ActorRef, blockValidator: BlockValidator, lastBlock: BigInt): BlockFetcherState =
BlockFetcherState(
importer = importer,
blockValidator = blockValidator,
readyBlocks = Queue(),
waitingHeaders = Queue(),
fetchingHeadersState = NotFetchingHeaders,
fetchingBodiesState = NotFetchingBodies,
stateNodeFetcher = None,
lastBlock = lastBlock,
knownTop = lastBlock + 1,
blockProviders = Map()
)

trait FetchingHeadersState
case object NotFetchingHeaders extends FetchingHeadersState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, ProgressProtocol, ProgressState}
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockImport
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.crypto.ECDSASignature
import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.ledger.Ledger
Expand All @@ -21,6 +22,7 @@ class RegularSync(
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
blockValidator: BlockValidator,
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
Expand All @@ -30,7 +32,10 @@ class RegularSync(
with ActorLogging {

val fetcher: ActorRef =
context.actorOf(BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, scheduler), "block-fetcher")
context.actorOf(
BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, blockValidator, scheduler),
"block-fetcher"
)
val broadcaster: ActorRef = context.actorOf(
BlockBroadcasterActor
.props(new BlockBroadcast(etcPeerManager, syncConfig), peerEventBus, etcPeerManager, syncConfig, scheduler),
Expand Down Expand Up @@ -121,6 +126,7 @@ object RegularSync {
ledger: Ledger,
blockchain: Blockchain,
blockchainConfig: BlockchainConfig,
blockValidator: BlockValidator,
syncConfig: SyncConfig,
ommersPool: ActorRef,
pendingTransactionsManager: ActorRef,
Expand All @@ -135,6 +141,7 @@ object RegularSync {
ledger,
blockchain,
blockchainConfig,
blockValidator,
syncConfig,
ommersPool,
pendingTransactionsManager,
Expand Down
Loading