[ETCM-211] Tracking of headers/bodies requests that will be ignored due to invalidation #749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 4 commits · Oct 27, 2020
@@ -9,6 +9,10 @@ import cats.instances.future._
import cats.instances.option._
import cats.syntax.either._
import io.iohk.ethereum.blockchain.sync.PeersClient._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
AwaitingBodiesToBeIgnored,
AwaitingHeadersToBeIgnored
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain._
@@ -91,27 +95,51 @@ class BlockFetcher(

private def handleHeadersMessages(state: BlockFetcherState): Receive = {
case Response(peer, BlockHeaders(headers)) if state.isFetchingHeaders =>
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))

val newState = state.validatedHeaders(headers) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
state.fetchingHeaders(false)
case Right(validHeaders) =>
state.appendHeaders(validHeaders)
}
val newState =
Contributor:

I'd move this code into a method on BlockFetcherState and leave only the logging here

Author:

You mean as a handleHeaderResponse function? Isn't the current structure more oriented towards handling responses in the actor? Shouldn't we move the bodies/nodes response handling as well?

Contributor:

After a second look, let's keep it as is. It's just that the more I work with actors, the more I want to extract all the logic out of them so it can be tested without creating the actor.

Contributor:

That matches my current sentiment about actors exactly: they should have as little logic as possible and be only a thin communication layer over the main logic.

Author:

Yeah, I have the same concerns about them.

However, for this case in particular, I think we would end up with a BlockFetcherState class that is too big and hard to understand if we start moving everything there; we should probably start splitting this logic up into other classes instead.

if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) {
Contributor:

Maybe I am missing something, but what will happen in case of a request timeout? I see the following path:

  1. request headers
  2. the fetcher receives an invalidation
  3. the request times out, but we are still in the AwaitingHeadersToBeIgnored state
  4. the fetcher creates a request for new headers, starting from the newly invalidated blocks
  5. the fetcher receives a response with the new block headers, but it is ignored as the fetcher is still in AwaitingHeadersToBeIgnored

Is that correct, or is there some detail I am missing?

Author:

You are right! I thought I had analyzed that, but I missed it.

I'll update this PR with something even closer to what we do when receiving the response.
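The race described in this exchange can be sketched as a toy state machine (a hypothetical, simplified model for illustration only; apart from mirroring the AwaitingHeadersToBeIgnored idea, every name here is invented and none of this is actual Mantis code):

```scala
sealed trait HeadersFetchState
case object NotFetching extends HeadersFetchState
case object Awaiting extends HeadersFetchState
case object AwaitingToBeIgnored extends HeadersFetchState

sealed trait Event
case object RequestHeaders extends Event
case object Invalidate extends Event
case object Timeout extends Event
case object ResponseArrives extends Event

// Returns the next state plus whether a response was actually processed.
def step(state: HeadersFetchState, event: Event): (HeadersFetchState, Boolean) =
  (state, event) match {
    case (NotFetching, RequestHeaders)          => (Awaiting, false)
    case (Awaiting, Invalidate)                 => (AwaitingToBeIgnored, false)
    case (Awaiting, ResponseArrives)            => (NotFetching, true) // processed
    // The gap spotted above: a timeout leaves the "ignore" flag set, so the
    // follow-up request issued afterwards has its response dropped as well.
    case (AwaitingToBeIgnored, Timeout)         => (AwaitingToBeIgnored, false)
    case (AwaitingToBeIgnored, RequestHeaders)  => (AwaitingToBeIgnored, false)
    case (AwaitingToBeIgnored, ResponseArrives) => (NotFetching, false) // dropped
    case (s, _)                                 => (s, false)
  }

// Replaying the five steps from the comment: the final, fresh response is dropped.
val trace = List(RequestHeaders, Invalidate, Timeout, RequestHeaders, ResponseArrives)
val (finalState, lastHandled) =
  trace.foldLeft((NotFetching: HeadersFetchState, false)) { case ((s, _), e) => step(s, e) }
```

The merged diff addresses this by having the timeout handler (RetryHeadersRequest) also call withHeaderFetchReceived, so a timeout always closes the pending request.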

log.debug(
"Received {} headers starting from block {} that will be ignored",
headers.size,
headers.headOption.map(_.number)
)
state.withHeaderFetchReceived
} else {
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))

state.validatedHeaders(headers) match {
case Left(err) =>
peersClient ! BlacklistPeer(peer.id, err)
state.withHeaderFetchReceived
case Right(validHeaders) =>
state.withHeaderFetchReceived.appendHeaders(validHeaders)
}
}

fetchBlocks(newState)
case RetryHeadersRequest if state.isFetchingHeaders =>
log.debug("Retrying request for headers")
fetchHeaders(state)
log.debug("Time-out occurred while waiting for headers")
Contributor:

This is more of a discussion-type comment. Do you think it would cause problems if we received this timed-out response later? I.e. something like:

  1. We request headers, let's say starting from block number 100
  2. we receive an invalidation from 100 down to 80
  3. we receive a timeout for our request, so we blacklist the peer and clear the state
  4. after some time (200 s by default) the peer is un-blacklisted, and we request headers starting from block number 90 from it
  5. we receive the old response, due to some network traffic problem or the like

Do you think it could result in anything more than blacklisting the peer? (for example, triggering this bug with unbounded resource usage)

Author:

Definitely, we currently do no validation that the received headers match the request.

With that sort of validation we should be safe, I think, but for now I was assuming that this case will never happen; the 30-second timeout should be more than enough to prevent it, right?

Contributor:

That makes me wonder whether this whole solution shouldn't be implemented as an actor/class sitting between the fetcher and the peers client. It could track the requests made and invalidate previous ones based on the chosen strategy (at-most-n, allow-all, etc.).
This could simplify things significantly and let the code in the fetcher focus on just orchestrating the fetching of blocks.

Author:

What would this actor do? Are you maybe thinking of something like the BlockChainDataFetcher of our other project? (only in how it requests headers and bodies)

> It could track requests

That at least would focus on ☝️ and allow validating that the responses received match the requests we sent.

Either way, I'd do that effort in parallel with this kind of patching, which only touches the minimum required (and is needed for the testnet).

I'm a bit worried about how risky any design changes to our sync could be... though that BlockChainDataFetcher refactoring would be low-risk, I think.

Contributor:

Well, BlockFetcher there is already more or less the same as BlockchainDataFetcher, IMO. I'm thinking more of a structure like this:

                          block fetcher
         |                      |                        |
headers fetcher (Fetcher)  bodies fetcher (Fetcher)  state nodes fetcher (Fetcher)
         |                      |                        |
                          peers client

Where Fetcher is a specialized class/actor that manages requests of a given type and could have an interface like:

abstract class Fetcher(multipleReqStrategy: AtMostN | AllowAll) {
  def makeRequest(msg: MessageSerializable): IO[msg.Response] // issues a request and adds it to the tracked pool
  def cancelPendingRequests: IO[Unit] // records the cancellation, so it knows which responses must not be passed on if received
}

Contributor:

IMO the details are up for discussion, but I am all for doing some kind of detailed request-response tracking. Unfortunately, as eth does not have any request id for its request-response messages, such a thing is ultimately necessary if we want the ability to cancel in-flight requests.

Author:

> we receive the old response, due to some network traffic problem or the like

I just realized this might not be a problem: our PeerRequestHandler awaiting the response should have been killed by then, so any response arriving too long into the future should be discarded.

Either way, I added this task to add validations that the request/response match, and then detect malicious peer behaviour: https://jira.iohk.io/browse/ETCM-283

> Unfortunately, as eth does not have any request id for its request-response messages, such a thing is ultimately necessary if we want the ability to cancel in-flight requests.

Due to our PeerRequestHandler design, this might be easier to do 🤔 we can identify the request by the actor that is in charge of getting the response. If we use a single peer per request type, that might work (for now we don't even have more than one request type in parallel)
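As a hypothetical sketch of that kind of tracking (none of this is from the PR; RequestToken and RequestTracker are invented names): since eth messages carry no request id, the requesting side can attach a locally generated token to each outgoing request and drop any response whose token is no longer pending, e.g. because the requests were cancelled after an invalidation:

```scala
final case class RequestToken(value: Long)

// Tracks locally issued request tokens. Not thread-safe, which is fine when
// it lives inside a single actor.
final class RequestTracker {
  private var nextId: Long = 0L
  private var pending: Set[RequestToken] = Set.empty

  // Issues a fresh token for an outgoing request and marks it as pending.
  def issue(): RequestToken = {
    nextId += 1
    val token = RequestToken(nextId)
    pending += token
    token
  }

  // Forgets every in-flight request, e.g. after blocks were invalidated.
  def cancelAll(): Unit = pending = Set.empty

  // True only the first time a response for a still-pending token arrives;
  // cancelled, duplicate, or late responses return false and can be dropped.
  def accept(token: RequestToken): Boolean =
    if (pending(token)) { pending -= token; true } else false
}
```

With this, a token issued before cancelAll() is rejected on arrival, while a token issued afterwards is accepted exactly once, which is the at-most-one-pending-request behaviour the PR approximates with its AwaitingHeadersToBeIgnored/AwaitingBodiesToBeIgnored states.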


val newState = state.withHeaderFetchReceived
fetchBlocks(newState)
}

private def handleBodiesMessages(state: BlockFetcherState): Receive = {
case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
log.debug("Fetched {} block bodies", bodies.size)
state.addBodies(peer, bodies) |> fetchBlocks
case RetryBodiesRequest if state.isFetchingBodies => fetchBodies(state)
val newState =
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
log.debug("Received {} block bodies that will be ignored", bodies.size)
state.withBodiesFetchReceived
} else {
log.debug("Fetched {} block bodies", bodies.size)
state.withBodiesFetchReceived.addBodies(peer, bodies)
}

fetchBlocks(newState)
case RetryBodiesRequest if state.isFetchingBodies =>
log.debug("Time-out occurred while waiting for bodies")

val newState = state.withBodiesFetchReceived
fetchBlocks(newState)
}

private def handleStateNodeMessages(state: BlockFetcherState): Receive = {
@@ -207,7 +235,7 @@ class BlockFetcher(
.filter(!_.hasFetchedTopHeader)
.filter(!_.hasReachedSize(syncConfig.maxFetcherQueueSize))
.tap(fetchHeaders)
.map(_.fetchingHeaders(true))
.map(_.withNewHeadersFetch)
.getOrElse(fetcherState)

private def fetchHeaders(state: BlockFetcherState): Unit = {
@@ -229,10 +257,12 @@
.filter(!_.isFetchingBodies)
.filter(_.waitingHeaders.nonEmpty)
.tap(fetchBodies)
.map(state => state.fetchingBodies(true))
.map(state => state.withNewBodiesFetch)
.getOrElse(fetcherState)

private def fetchBodies(state: BlockFetcherState): Unit = {
log.debug("Fetching bodies")

val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest)
requestBlockBodies(hashes) pipeTo self
}
@@ -12,12 +12,32 @@ import cats.syntax.option._

import scala.collection.immutable.Queue

// scalastyle:off number.of.methods
/**
* State used by the BlockFetcher
*
* @param importer the BlockImporter actor reference
* @param readyBlocks
* @param waitingHeaders
* @param fetchingHeadersState the current state of the headers fetching, whether we
* - haven't fetched any yet
* - are awaiting a response
* - are awaiting a response but it should be ignored due to blocks being invalidated
* @param fetchingBodiesState the current state of the bodies fetching, whether we
* - haven't fetched any yet
* - are awaiting a response
* - are awaiting a response but it should be ignored due to blocks being invalidated
* @param stateNodeFetcher
* @param lastBlock
* @param knownTop
* @param blockProviders
*/
case class BlockFetcherState(
importer: ActorRef,
readyBlocks: Queue[Block],
waitingHeaders: Queue[BlockHeader],
isFetchingHeaders: Boolean,
isFetchingBodies: Boolean,
fetchingHeadersState: FetchingHeadersState,
fetchingBodiesState: FetchingBodiesState,
stateNodeFetcher: Option[StateNodeFetcher],
lastBlock: BigInt,
knownTop: BigInt,
@@ -28,7 +48,7 @@ case class BlockFetcherState(

def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined

def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty

def hasFetchedTopHeader: Boolean = lastBlock == knownTop

@@ -53,8 +73,7 @@ case class BlockFetcherState(
def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
fetchingHeaders(false)
.withPossibleNewTopAt(headers.lastOption.map(_.number))
withPossibleNewTopAt(headers.lastOption.map(_.number))
.copy(
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
@@ -86,9 +105,11 @@ case class BlockFetcherState(
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
val blocks = matching.zip(bodies).map((Block.apply _).tupled)

fetchingBodies(false)
.withPeerForBlocks(peer.id, blocks.map(_.header.number))
.copy(readyBlocks = readyBlocks.enqueue(blocks), waitingHeaders = waiting)
withPeerForBlocks(peer.id, blocks.map(_.header.number))
.copy(
readyBlocks = readyBlocks.enqueue(blocks),
waitingHeaders = waiting
)
}

def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
@@ -126,18 +147,25 @@ case class BlockFetcherState(

def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))

def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) =
def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
// We can't start completely from scratch as requests could be in progress, we have to keep special track of them
val newFetchingHeadersState =
if (fetchingHeadersState == AwaitingHeaders) AwaitingHeadersToBeIgnored else fetchingHeadersState
val newFetchingBodiesState =
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState

(
toBlacklist.flatMap(blockProviders.get),
copy(
readyBlocks = Queue(),
waitingHeaders = Queue(),
lastBlock = (nr - 2).max(0),
isFetchingHeaders = false,
isFetchingBodies = false,
fetchingHeadersState = newFetchingHeadersState,
fetchingBodiesState = newFetchingBodiesState,
blockProviders = blockProviders - nr
)
)
}

def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)

@@ -154,9 +182,13 @@ case class BlockFetcherState(
def withPeerForBlocks(peerId: PeerId, blocks: Seq[BigInt]): BlockFetcherState =
copy(blockProviders = blockProviders ++ blocks.map(block => block -> peerId))

def fetchingHeaders(isFetching: Boolean): BlockFetcherState = copy(isFetchingHeaders = isFetching)
def isFetchingHeaders: Boolean = fetchingHeadersState != NotFetchingHeaders
def withNewHeadersFetch: BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders)
def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NotFetchingHeaders)

def fetchingBodies(isFetching: Boolean): BlockFetcherState = copy(isFetchingBodies = isFetching)
def isFetchingBodies: Boolean = fetchingBodiesState != NotFetchingBodies
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)

def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))
@@ -188,11 +220,31 @@ object BlockFetcherState {
importer = importer,
readyBlocks = Queue(),
waitingHeaders = Queue(),
isFetchingHeaders = false,
isFetchingBodies = false,
fetchingHeadersState = NotFetchingHeaders,
fetchingBodiesState = NotFetchingBodies,
stateNodeFetcher = None,
lastBlock = lastBlock,
knownTop = lastBlock + 1,
blockProviders = Map()
)

trait FetchingHeadersState
case object NotFetchingHeaders extends FetchingHeadersState
case object AwaitingHeaders extends FetchingHeadersState
Contributor:

What about tracking the parameters of each request made? It would allow us to track (and, at least for now, log) the case Konrad mentioned: receiving a response at a moment when we're waiting for a different one.

Author (@ntallar, Oct 27, 2020):

I'd do this in a separate task, one that adds the BlockChainDataFetcher class and does all the request validations from there, wdyt?

(maybe not adding it as-is, but just a class that could be used by the fetcher and contains all the request/response validations)


/**
* Headers request in progress whose response will be ignored due to invalidation
* State used to keep track of the pending request, to prevent multiple requests in parallel
*/
case object AwaitingHeadersToBeIgnored extends FetchingHeadersState

trait FetchingBodiesState
case object NotFetchingBodies extends FetchingBodiesState
case object AwaitingBodies extends FetchingBodiesState

/**
* Bodies request in progress whose response will be ignored due to invalidation
* State used to keep track of the pending request, to prevent multiple requests in parallel
*/
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
}
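The lifecycle these two new ADTs implement can be restated in isolation (the constructors below mirror the diff above, while onInvalidation and onRequestClosed are illustrative helper names, not methods from the PR):

```scala
sealed trait FetchingHeadersState
case object NotFetchingHeaders extends FetchingHeadersState
case object AwaitingHeaders extends FetchingHeadersState
case object AwaitingHeadersToBeIgnored extends FetchingHeadersState

// Mirrors invalidateBlocksFrom: a pending request is not forgotten on
// invalidation, it is marked so that its eventual response is ignored.
def onInvalidation(s: FetchingHeadersState): FetchingHeadersState =
  if (s == AwaitingHeaders) AwaitingHeadersToBeIgnored else s

// Mirrors withHeaderFetchReceived: any response (or timeout) closes the
// pending request, whichever flavour of awaiting we were in.
def onRequestClosed(s: FetchingHeadersState): FetchingHeadersState =
  NotFetchingHeaders
```

The bodies side (FetchingBodiesState) follows exactly the same shape, so requests that were in flight when blocks got invalidated are received, counted as closed, and then discarded instead of being appended.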
25 changes: 25 additions & 0 deletions src/test/scala/io/iohk/ethereum/BlockHelpers.scala
@@ -0,0 +1,25 @@
package io.iohk.ethereum

import akka.util.ByteString
import io.iohk.ethereum.domain.Block

object BlockHelpers {

  def generateChain(amount: Int, parent: Block): Seq[Block] =
    (1 to amount).foldLeft[Seq[Block]](Nil) { case (acc, _) =>
      val baseBlock = Fixtures.Blocks.ValidBlock.block

      val parentBlock = acc.lastOption.getOrElse(parent)
      val blockHeader = baseBlock.header.copy(
        number = parentBlock.number + 1,
        parentHash = parentBlock.hash,
        // Random nonce so that the generated blocks differ from each other
        nonce = ByteString(Math.random().toString)
      )
      val block = baseBlock.copy(header = blockHeader)

      acc :+ block
    }

}