Skip to content

ETCM-370: Refine responses validation on block fetcher side #993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import io.iohk.ethereum.consensus.{ConsensusConfig, FullConsensusConfig, pow}
import io.iohk.ethereum.crypto
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
import io.iohk.ethereum.nodebuilder.VmSetup
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,16 @@ object Blacklist {
val code: Int = 27
val name: String = "UnrequestedBodies"
}
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
case object UnrequestedHeadersType extends BlacklistReasonType with RegularSyncBlacklistGroup {
val code: Int = 28
val name: String = "UnrequestedHeaders"
}
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
val code: Int = 29
val name: String = "RegularSyncRequestFailed"
}
case object BlockImportErrorType extends BlacklistReasonType with RegularSyncBlacklistGroup {
val code: Int = 29
val code: Int = 30
val name: String = "BlockImportError"
}
}
Expand Down Expand Up @@ -281,6 +285,10 @@ object Blacklist {
val reasonType: BlacklistReasonType = UnrequestedBodiesType
val description: String = "Received unrequested bodies"
}
case object UnrequestedHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = UnrequestedHeadersType
val description: String = "Received unrequested headers"
}
final case class RegularSyncRequestFailed(error: String) extends BlacklistReason {
val reasonType: BlacklistReasonType = RegularSyncRequestFailedType
val description: String = s"Request failed with error: $error"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.blockchain.sync.PeersClient._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
AwaitingHeadersToBeIgnored
AwaitingHeadersToBeIgnored,
HeadersNotFormingSeq,
HeadersNotMatchingReadyBlocks,
HeadersNotMatchingWaitingHeaders
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
Expand Down Expand Up @@ -130,7 +133,7 @@ class BlockFetcher(
blockProvider.foreach(peersClient ! BlacklistPeer(_, BlacklistReason.BlockImportError(reason)))
fetchBlocks(newState)

case ReceivedHeaders(headers) if state.isFetchingHeaders =>
case ReceivedHeaders(peer, headers) if state.isFetchingHeaders =>
//First successful fetch
if (state.waitingHeaders.isEmpty) {
supervisor ! ProgressProtocol.StartedFetching
Expand All @@ -146,6 +149,14 @@ class BlockFetcher(
} else {
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
state.appendHeaders(headers) match {
case Left(HeadersNotFormingSeq) =>
log.info("Dismissed received headers due to: {}", HeadersNotFormingSeq.description)
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
state.withHeaderFetchReceived
case Left(HeadersNotMatchingReadyBlocks) =>
log.info("Dismissed received headers due to: {}", HeadersNotMatchingReadyBlocks.description)
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
state.withHeaderFetchReceived
case Left(err) =>
log.info("Dismissed received headers due to: {}", err)
state.withHeaderFetchReceived
Expand All @@ -154,6 +165,11 @@ class BlockFetcher(
}
}
fetchBlocks(newState)

case ReceivedHeaders(peer, _) if !state.isFetchingHeaders =>
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
Behaviors.same

case RetryHeadersRequest if state.isFetchingHeaders =>
log.debug("Something failed on a headers request, cancelling the request and re-fetching")
fetchBlocks(state.withHeaderFetchReceived)
Expand All @@ -177,6 +193,10 @@ class BlockFetcher(
fetchBlocks(newState)
}

case ReceivedBodies(peer, _) if !state.isFetchingBodies =>
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedBodies)
Behaviors.same

case RetryBodiesRequest if state.isFetchingBodies =>
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")
fetchBlocks(state.withBodiesFetchReceived)
Expand Down Expand Up @@ -332,7 +352,7 @@ object BlockFetcher {
final case object RetryBodiesRequest extends FetchCommand
final case object RetryHeadersRequest extends FetchCommand
final case class AdaptedMessageFromEventBus(message: Message, peerId: PeerId) extends FetchCommand
final case class ReceivedHeaders(headers: Seq[BlockHeader]) extends FetchCommand
final case class ReceivedHeaders(peer: Peer, headers: Seq[BlockHeader]) extends FetchCommand
final case class ReceivedBodies(peer: Peer, bodies: Seq[BlockBody]) extends FetchCommand

sealed trait FetchResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
import io.iohk.ethereum.utils.ByteStringUtils

import scala.annotation.tailrec
import scala.collection.immutable.Queue
Expand Down Expand Up @@ -75,7 +74,7 @@ case class BlockFetcherState(

def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

def appendHeaders(headers: Seq[BlockHeader]): Either[String, BlockFetcherState] =
def appendHeaders(headers: Seq[BlockHeader]): Either[ValidationErrors, BlockFetcherState] =
validatedHeaders(headers.sortBy(_.number)).map(validHeaders => {
val lastNumber = HeadersSeq.lastNumber(validHeaders)
withPossibleNewTopAt(lastNumber)
Expand All @@ -86,17 +85,16 @@ case class BlockFetcherState(

/**
* Validates received headers consistency and their compatibility with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
*/
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
private def validatedHeaders(headers: Seq[BlockHeader]): Either[ValidationErrors, Seq[BlockHeader]] =
if (headers.isEmpty) {
Right(headers)
} else {
headers
.asRight[String]
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
.ensure("Given headers should form a sequence with ready blocks")(checkConsistencyWithReadyBlocks)
.ensure("Given headers do not form a chain with already stored ones")(headers =>
.asRight[ValidationErrors]
.ensure(HeadersNotFormingSeq)(HeadersSeq.areChain)
.ensure(HeadersNotMatchingReadyBlocks)(checkConsistencyWithReadyBlocks)
.ensure(HeadersNotMatchingWaitingHeaders)(headers =>
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
)
}
Expand Down Expand Up @@ -319,4 +317,17 @@ object BlockFetcherState {
* State used to keep track of pending request to prevent multiple requests in parallel
*/
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState

sealed trait ValidationErrors {
def description: String
}
case object HeadersNotFormingSeq extends ValidationErrors {
val description = "Given headers should form a sequence without gaps"
}
case object HeadersNotMatchingReadyBlocks extends ValidationErrors {
val description = "Given headers should form a sequence with ready blocks"
}
case object HeadersNotMatchingWaitingHeaders extends ValidationErrors {
val description = "Given headers should form a chain with waiting headers"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class HeadersFetcher(
log.debug("Start fetching headers from block {}", blockNumber)
requestHeaders(blockNumber, amount)
Behaviors.same
case AdaptedMessage(_, BlockHeaders(headers)) =>
case AdaptedMessage(peer, BlockHeaders(headers)) =>
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
supervisor ! BlockFetcher.ReceivedHeaders(headers)
supervisor ! BlockFetcher.ReceivedHeaders(peer, headers)
Behaviors.same
case HeadersFetcher.RetryHeadersRequest =>
supervisor ! BlockFetcher.RetryHeadersRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
import akka.actor.typed.{ActorRef => TypedActorRef}
import io.iohk.ethereum.blockchain.sync.SyncProtocol
import io.iohk.ethereum.blockchain.sync.{Blacklist, SyncProtocol}
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestProbe}
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.HeadersNotMatchingReadyBlocks
import io.iohk.ethereum.{BlockHelpers, WithActorSystemShutDown}
import io.iohk.ethereum.network.PeerId
import io.iohk.ethereum.utils.ByteStringUtils
Expand Down Expand Up @@ -69,7 +70,7 @@ class BlockFetcherStateSpec
.appendHeaders(blocks.map(_.header))
.map(_.handleRequestedBlocks(blocks, peer))

assert(result.map(_.waitingHeaders) === Left("Given headers should form a sequence with ready blocks"))
assert(result.map(_.waitingHeaders) === Left(HeadersNotMatchingReadyBlocks))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, typed}
import akka.actor.typed.{ActorRef => TypedActorRef}
import akka.testkit.TestActor.AutoPilot
import akka.testkit.TestKit
import akka.testkit.{TestKit, TestProbe}
import akka.util.ByteString
import cats.data.NonEmptyList
import cats.effect.Resource
import cats.syntax.traverse._
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.Start
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.crypto.kec256
Expand Down Expand Up @@ -37,6 +39,7 @@ import org.scalatest.{Assertion, BeforeAndAfterEach}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.math.BigInt

class RegularSyncSpec
extends WordSpecBase
Expand Down Expand Up @@ -112,29 +115,101 @@ class RegularSyncSpec
peersClient.expectMsg(PeersClient.BlacklistPeer(defaultPeer.id, BlacklistReason.RegularSyncRequestFailed("a random reason")))
})

//TODO: To be re-enabled with ETCM-370
"blacklist peer which returns headers starting from one with higher number than expected" ignore sync(
new Fixture(
testSystem
) {
"blacklist peer which returns headers starting from one with higher number than expected" in sync(
new Fixture(testSystem) {
var blockFetcher: ActorRef = _

regularSync ! SyncProtocol.Start
peerEventBus.expectMsgClass(classOf[Subscribe])
blockFetcher = peerEventBus.sender()

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(1).headers)))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))

val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
PeersClient.BestPeer
)
peersClient.expectMsgEq(getBodies)
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))

blockFetcher ! MessageFromPeer(NewBlock(testBlocks.last, ChainWeight.totalDifficultyOnly(testBlocks.last.number)), defaultPeer.id)
peersClient.expectMsgEq(blockHeadersChunkRequest(1))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(5).headers)))
peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
}
)

//TODO: To be re-enabled with ETCM-370
"blacklist peer which returns headers not forming a chain" ignore sync(new Fixture(testSystem) {
"blacklist peer which returns headers not forming a chain" in sync(new Fixture(testSystem) {
regularSync ! SyncProtocol.Start

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(
PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers.filter(_.number % 2 == 0)))
PeersClient.Response(defaultPeer, BlockHeaders(testBlocks.headers.filter(_.number % 2 == 0)))
)
peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
})

"blacklist peer which sends headers that were not requested" in sync(new Fixture(testSystem) {
import akka.actor.typed.scaladsl.adapter._

val blockImporter = TestProbe()
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
system.spawn(
BlockFetcher(peersClient.ref, peerEventBus.ref, regularSync, syncConfig, validators.blockValidator),
"block-fetcher"
)

fetcher ! Start(blockImporter.ref, 0)

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))

val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
PeersClient.BestPeer
)

peersClient.expectMsgEq(getBodies)
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))

fetcher ! BlockFetcher.ReceivedHeaders(defaultPeer, testBlocksChunked(3).headers)

peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
})

"blacklist peer which sends bodies that were not requested" in sync(new Fixture(testSystem) {
import akka.actor.typed.scaladsl.adapter._

var blockFetcherAdapter: TypedActorRef[MessageFromPeer] = _
val blockImporter = TestProbe()
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
system.spawn(
BlockFetcher(peersClient.ref, peerEventBus.ref, regularSync, syncConfig, validators.blockValidator),
"block-fetcher"
)

fetcher ! Start(blockImporter.ref, 0)

peersClient.expectMsgEq(blockHeadersChunkRequest(0))
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))

val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
PeersClient.BestPeer
)

peersClient.expectMsgEq(getBodies)
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))

fetcher ! BlockFetcher.ReceivedBodies(defaultPeer, testBlocksChunked(3).bodies)

peersClient.expectMsgPF() {
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
}
Expand Down