[ETCM-211] Tracking of headers/bodies requests that will be ignored due to invalidation #749
Changes from all commits: 523e90a, 5f49352, 6fbad03, 822a21f
```diff
@@ -9,6 +9,10 @@ import cats.instances.future._
 import cats.instances.option._
 import cats.syntax.either._
 import io.iohk.ethereum.blockchain.sync.PeersClient._
+import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
+  AwaitingBodiesToBeIgnored,
+  AwaitingHeadersToBeIgnored
+}
 import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
 import io.iohk.ethereum.crypto.kec256
 import io.iohk.ethereum.domain._
```
```diff
@@ -91,27 +95,51 @@ class BlockFetcher(
   private def handleHeadersMessages(state: BlockFetcherState): Receive = {
     case Response(peer, BlockHeaders(headers)) if state.isFetchingHeaders =>
-      log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
-
-      val newState = state.validatedHeaders(headers) match {
-        case Left(err) =>
-          peersClient ! BlacklistPeer(peer.id, err)
-          state.fetchingHeaders(false)
-        case Right(validHeaders) =>
-          state.appendHeaders(validHeaders)
-      }
+      val newState =
+        if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) {
+          log.debug(
+            "Received {} headers starting from block {} that will be ignored",
+            headers.size,
+            headers.headOption.map(_.number)
+          )
+          state.withHeaderFetchReceived
+        } else {
+          log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
+
+          state.validatedHeaders(headers) match {
+            case Left(err) =>
+              peersClient ! BlacklistPeer(peer.id, err)
+              state.withHeaderFetchReceived
+            case Right(validHeaders) =>
+              state.withHeaderFetchReceived.appendHeaders(validHeaders)
+          }
+        }

       fetchBlocks(newState)
     case RetryHeadersRequest if state.isFetchingHeaders =>
-      log.debug("Retrying request for headers")
-      fetchHeaders(state)
+      log.debug("Time-out occurred while waiting for headers")
+
+      val newState = state.withHeaderFetchReceived
+      fetchBlocks(newState)
   }

   private def handleBodiesMessages(state: BlockFetcherState): Receive = {
     case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
-      log.debug("Fetched {} block bodies", bodies.size)
-      state.addBodies(peer, bodies) |> fetchBlocks
-    case RetryBodiesRequest if state.isFetchingBodies => fetchBodies(state)
+      val newState =
+        if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
+          log.debug("Received {} block bodies that will be ignored", bodies.size)
+          state.withBodiesFetchReceived
+        } else {
+          log.debug("Fetched {} block bodies", bodies.size)
+          state.withBodiesFetchReceived.addBodies(peer, bodies)
+        }
+
+      fetchBlocks(newState)
+    case RetryBodiesRequest if state.isFetchingBodies =>
+      log.debug("Time-out occurred while waiting for bodies")
+
+      val newState = state.withBodiesFetchReceived
+      fetchBlocks(newState)
   }

   private def handleStateNodeMessages(state: BlockFetcherState): Receive = {
```

Review thread on the headers retry handling:

Reviewer: Maybe I am missing something, but what will happen in case of a request timeout? Is it correct, or is there some detail I am missing?

Author: You are right! I thought I had analyzed that but I missed it. I'll update this PR with something even closer to what we do when receiving the response.

Review thread on the timeout branch:

Reviewer: More of a discussion-type comment. Do you think it would bring problems if we received this timed-out response later? i.e. something like: […] Do you think it could result in something more than blacklisting the peer? (for example, triggering this bug with unbounded resource usage)

Author: Definitely, we are currently doing no validations that the headers received match the request. With that sort of validations we should be safe, I think, but for now I was assuming that case will never happen; the 30 seconds till timeout should be more than enough to prevent that, right?

Reviewer: That makes me wonder whether this whole solution shouldn't be implemented as an actor/class in between the fetcher and the peers client. It could track the requests made and invalidate previous ones based on the strategy chosen (at-most-n, allow-all, etc.).

Author: What would this actor do? Are you maybe thinking of something like the BlockChainDataFetcher of our other project? (only in how it requests headers and bodies) That at least would focus on that part and allow validating that the responses received match the requests we sent. Either way, I'd do that effort in parallel with this kind of patching, which only touches the minimum required (and is needed for the testnet). I'm a bit worried about how risky any design changes on our sync could be... though that BlockChainDataFetcher refactoring would be low risk, I think.

Reviewer: Well, […] Where […]

Reviewer: So imo the details are up for discussion, but I am all for doing some kind of detailed request-response tracking. Unfortunately, as eth does not have any request id for its request-response messages, such a thing is ultimately necessary if we want the possibility to cancel in-flight requests.

Author: I just realized this might not be a problem: our PeerRequestHandler awaiting the response should have been killed by then, so any response arriving too far into the future should be discarded. Either way, I added a task to add validations that the request/response match and then detect malicious peer behaviour: https://jira.iohk.io/browse/ETCM-283

Author: Due to our PeerRequestHandler design this might be easier to do 🤔 we can identify the request by the actor that's in charge of getting the response. If we use a single peer per request type, that might work (for now we don't even have more than one request type in parallel).
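The thread above discusses correlating responses with requests even though eth messages carry no request id. A purely illustrative sketch of the idea (none of these names exist in the PR; a real version would key on the PeerRequestHandler actor instead of a counter):

```scala
// Hypothetical sketch only: tag each outstanding request with a locally
// generated id and accept a response only if it matches the latest,
// still-valid request. Invalidation rejects the in-flight response.
final case class PendingRequest(id: Long)

final class RequestTracker {
  private var nextId: Long = 0L
  private var pending: Option[PendingRequest] = None

  def startRequest(): PendingRequest = {
    nextId += 1
    val req = PendingRequest(nextId)
    pending = Some(req)
    req
  }

  // Called on invalidation or timeout: any response for the in-flight
  // request will now be rejected.
  def invalidate(): Unit = pending = None

  def acceptResponse(id: Long): Boolean = pending.exists(_.id == id)
}
```

This is one possible shape for the "actor/class in between fetcher and peers client" idea; the PR itself deliberately takes the smaller route of marking in-flight requests as to-be-ignored.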
```diff
@@ -207,7 +235,7 @@ class BlockFetcher(
       .filter(!_.hasFetchedTopHeader)
       .filter(!_.hasReachedSize(syncConfig.maxFetcherQueueSize))
       .tap(fetchHeaders)
-      .map(_.fetchingHeaders(true))
+      .map(_.withNewHeadersFetch)
       .getOrElse(fetcherState)

   private def fetchHeaders(state: BlockFetcherState): Unit = {
```
```diff
@@ -229,10 +257,12 @@ class BlockFetcher(
       .filter(!_.isFetchingBodies)
       .filter(_.waitingHeaders.nonEmpty)
       .tap(fetchBodies)
-      .map(state => state.fetchingBodies(true))
+      .map(state => state.withNewBodiesFetch)
       .getOrElse(fetcherState)

   private def fetchBodies(state: BlockFetcherState): Unit = {
+    log.debug("Fetching bodies")
+
     val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest)
     requestBlockBodies(hashes) pipeTo self
   }
```
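The fetch pipelines above use an Option guard chain: filters decide whether a fetch may start, `tap` fires the side effect, and `getOrElse` falls back to the unchanged state. A self-contained sketch of the same idiom with stand-in types (`State`, `tryFetchBodies`, and `sentRequests` are illustrative names, not the project's; `tap` here is `scala.util.chaining`'s):

```scala
import scala.util.chaining._

// Minimal stand-ins for BlockFetcherState and the request side effect.
final case class State(isFetchingBodies: Boolean, waitingHeaders: List[String])

var sentRequests: List[List[String]] = Nil
def fetchBodies(state: State): Unit = sentRequests ::= state.waitingHeaders

// Fires the request and flips the in-flight flag only when every guard
// passes; otherwise the state is returned unchanged and nothing is sent.
def tryFetchBodies(fetcherState: State): State =
  Option(fetcherState)
    .filter(!_.isFetchingBodies)
    .filter(_.waitingHeaders.nonEmpty)
    .tap(_.foreach(fetchBodies))
    .map(_.copy(isFetchingBodies = true))
    .getOrElse(fetcherState)
```

The appeal of the idiom is that the guards, the side effect, and the state transition read top to bottom in one expression, and a failed guard short-circuits everything after it.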
```diff
@@ -12,12 +12,32 @@ import cats.syntax.option._

 import scala.collection.immutable.Queue

+// scalastyle:off number.of.methods
+/**
+  * State used by the BlockFetcher
+  *
+  * @param importer the BlockImporter actor reference
+  * @param readyBlocks
+  * @param waitingHeaders
+  * @param fetchingHeadersState the current state of the headers fetching, whether we
+  *                             - haven't fetched any yet
+  *                             - are awaiting a response
+  *                             - are awaiting a response but it should be ignored due to blocks being invalidated
+  * @param fetchingBodiesState the current state of the bodies fetching, whether we
+  *                            - haven't fetched any yet
+  *                            - are awaiting a response
+  *                            - are awaiting a response but it should be ignored due to blocks being invalidated
+  * @param stateNodeFetcher
+  * @param lastBlock
+  * @param knownTop
+  * @param blockProviders
+  */
 case class BlockFetcherState(
     importer: ActorRef,
     readyBlocks: Queue[Block],
     waitingHeaders: Queue[BlockHeader],
-    isFetchingHeaders: Boolean,
-    isFetchingBodies: Boolean,
+    fetchingHeadersState: FetchingHeadersState,
+    fetchingBodiesState: FetchingBodiesState,
     stateNodeFetcher: Option[StateNodeFetcher],
     lastBlock: BigInt,
     knownTop: BigInt,
```
```diff
@@ -28,7 +48,7 @@ case class BlockFetcherState(

   def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined

-  def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
+  private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty

   def hasFetchedTopHeader: Boolean = lastBlock == knownTop
```
```diff
@@ -53,8 +73,7 @@ case class BlockFetcherState(
   def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)

   def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
-    fetchingHeaders(false)
-      .withPossibleNewTopAt(headers.lastOption.map(_.number))
+    withPossibleNewTopAt(headers.lastOption.map(_.number))
       .copy(
         waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
         lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
```
```diff
@@ -86,9 +105,11 @@ case class BlockFetcherState(
     val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
     val blocks = matching.zip(bodies).map((Block.apply _).tupled)

-    fetchingBodies(false)
-      .withPeerForBlocks(peer.id, blocks.map(_.header.number))
-      .copy(readyBlocks = readyBlocks.enqueue(blocks), waitingHeaders = waiting)
+    withPeerForBlocks(peer.id, blocks.map(_.header.number))
+      .copy(
+        readyBlocks = readyBlocks.enqueue(blocks),
+        waitingHeaders = waiting
+      )
   }

   def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
```
```diff
@@ -126,18 +147,25 @@ case class BlockFetcherState(

   def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))

-  def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) =
+  def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
+    // We can't start completely from scratch as requests could be in progress, we have to keep special track of them
+    val newFetchingHeadersState =
+      if (fetchingHeadersState == AwaitingHeaders) AwaitingHeadersToBeIgnored else fetchingHeadersState
+    val newFetchingBodiesState =
+      if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState
+
     (
       toBlacklist.flatMap(blockProviders.get),
       copy(
         readyBlocks = Queue(),
         waitingHeaders = Queue(),
         lastBlock = (nr - 2).max(0),
-        isFetchingHeaders = false,
-        isFetchingBodies = false,
+        fetchingHeadersState = newFetchingHeadersState,
+        fetchingBodiesState = newFetchingBodiesState,
         blockProviders = blockProviders - nr
       )
     )
+  }

   def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)
```
```diff
@@ -154,9 +182,13 @@ case class BlockFetcherState(
   def withPeerForBlocks(peerId: PeerId, blocks: Seq[BigInt]): BlockFetcherState =
     copy(blockProviders = blockProviders ++ blocks.map(block => block -> peerId))

-  def fetchingHeaders(isFetching: Boolean): BlockFetcherState = copy(isFetchingHeaders = isFetching)
+  def isFetchingHeaders: Boolean = fetchingHeadersState != NotFetchingHeaders
+  def withNewHeadersFetch: BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders)
+  def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NotFetchingHeaders)

-  def fetchingBodies(isFetching: Boolean): BlockFetcherState = copy(isFetchingBodies = isFetching)
+  def isFetchingBodies: Boolean = fetchingBodiesState != NotFetchingBodies
+  def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
+  def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)

   def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
     copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))
```
```diff
@@ -188,11 +220,31 @@ object BlockFetcherState {
       importer = importer,
       readyBlocks = Queue(),
       waitingHeaders = Queue(),
-      isFetchingHeaders = false,
-      isFetchingBodies = false,
+      fetchingHeadersState = NotFetchingHeaders,
+      fetchingBodiesState = NotFetchingBodies,
       stateNodeFetcher = None,
       lastBlock = lastBlock,
       knownTop = lastBlock + 1,
       blockProviders = Map()
     )

+  trait FetchingHeadersState
+  case object NotFetchingHeaders extends FetchingHeadersState
+  case object AwaitingHeaders extends FetchingHeadersState
+
+  /**
+    * Headers request in progress but will be ignored due to invalidation
+    * State used to keep track of pending request to prevent multiple requests in parallel
+    */
+  case object AwaitingHeadersToBeIgnored extends FetchingHeadersState
+
+  trait FetchingBodiesState
+  case object NotFetchingBodies extends FetchingBodiesState
+  case object AwaitingBodies extends FetchingBodiesState
+
+  /**
+    * Bodies request in progress but will be ignored due to invalidation
+    * State used to keep track of pending request to prevent multiple requests in parallel
+    */
+  case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
 }
```

Review thread on the new state traits:

Reviewer: What about tracking the parameters of the request made? It would allow tracking (and at least logging, for now) the case Konrad mentioned: receiving a response at a moment when we're waiting for a different one.

Author: I'd do this in a separate task, one that adds the BlockChainDataFetcher class and all the request validations from there, wdyt? (maybe not adding it as is, but just a class that could be used by the fetcher and that contains all the request/response validations)
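The two fetching state machines introduced above can be exercised in isolation. Below is a hedged, dependency-free sketch (the names mirror the PR, but `Fetcher` and `invalidated` are stand-ins, not the project's actual `BlockFetcherState` API) of the headers transitions, including the invalidation path that motivates this PR:

```scala
// Standalone model of the headers fetching state machine from this PR.
sealed trait FetchingHeadersState
case object NotFetchingHeaders extends FetchingHeadersState
case object AwaitingHeaders extends FetchingHeadersState
case object AwaitingHeadersToBeIgnored extends FetchingHeadersState

final case class Fetcher(fetchingHeadersState: FetchingHeadersState) {
  def isFetchingHeaders: Boolean = fetchingHeadersState != NotFetchingHeaders
  def withNewHeadersFetch: Fetcher = copy(fetchingHeadersState = AwaitingHeaders)
  def withHeaderFetchReceived: Fetcher = copy(fetchingHeadersState = NotFetchingHeaders)

  // On invalidateBlocksFrom, an in-flight request is not cancelled: it is
  // marked so that its eventual response (or timeout) is received but ignored,
  // while still preventing a second request from being issued in parallel.
  def invalidated: Fetcher =
    if (fetchingHeadersState == AwaitingHeaders) copy(fetchingHeadersState = AwaitingHeadersToBeIgnored)
    else this
}
```

The key property is that `AwaitingHeadersToBeIgnored` still counts as "fetching", which is exactly what prevents the duplicate parallel requests the PR is fixing.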
```diff
@@ -0,0 +1,25 @@
+package io.iohk.ethereum
+
+import akka.util.ByteString
+import io.iohk.ethereum.domain.Block
+
+object BlockHelpers {
+
+  def generateChain(amount: Int, parent: Block): Seq[Block] = {
+    (1 to amount).foldLeft[Seq[Block]](Nil) { case (acc, _) =>
+      val baseBlock = Fixtures.Blocks.ValidBlock.block
+
+      val parentHeader = acc.lastOption.getOrElse(parent)
+      val blockHeader = baseBlock.header.copy(
+        number = parentHeader.number + 1,
+        parentHash = parentHeader.hash,
+        // Random nonce used for having our blocks be different
+        nonce = ByteString(Math.random().toString)
+      )
+      val block = baseBlock.copy(header = blockHeader)
+
+      acc :+ block
+    }
+  }
+
+}
```
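`generateChain` links each new block to the last element of the accumulator (or the given parent) via a `foldLeft`. A self-contained sketch of the same pattern with a toy `Block` type (the real helper copies a fixture header; the type and `hash` below are stand-ins):

```scala
import scala.util.Random

// Toy stand-in for the domain Block: just enough to show the chaining.
final case class Block(number: BigInt, parentHash: String, nonce: String) {
  def hash: String = s"$number:$parentHash:$nonce".hashCode.toString
}

def generateChain(amount: Int, parent: Block): Seq[Block] =
  (1 to amount).foldLeft[Seq[Block]](Nil) { case (acc, _) =>
    val parentBlock = acc.lastOption.getOrElse(parent)
    // Random nonce keeps generated blocks distinct, as in the PR's helper
    acc :+ Block(parentBlock.number + 1, parentBlock.hash, Random.nextLong().toString)
  }
```

The fold makes the parent-child invariant explicit: every block's `parentHash` is the hash of the previous element, and numbers increase by one from the given parent.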
Review thread on the response handling:

Reviewer: I'd move this code into a method on BlockFetcherState and leave here only the logging.

Author: As a handleHeaderResponse function, you mean? Isn't the current structure more oriented towards doing the handling of responses in the actor? Shouldn't we move the bodies/nodes response handling as well?

Reviewer: After a second look, let's keep it as is. It's just that the more I work with actors, the more I want to extract all the logic from them so it can be tested without creating the actor.

Reviewer: Those are totally my current sentiments about actors as well: they should have as little logic as possible and should be only a thin communication layer over the main logic.

Author: Yeah, I'm having the same concerns with them. However, for this case in particular, I think we would end up with a too big and hard to understand BlockFetcherState class if we start moving everything there; we should probably start splitting this logic up into other classes.