[ETCM-211] Tracking of headers/bodies requests that will be ignored due to invalidation #749
Conversation
state.appendHeaders(validHeaders)
}
val newState =
  if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) {
Maybe I am missing something, but what will happen in case of a request timeout? I see the following path:
- request headers
- fetcher receives an invalidate
- the request times out, but we are still in the AwaitingHeadersToBeIgnored state
- fetcher creates a request for new headers, from the newly invalidated blocks
- fetcher receives the response with the new block headers, but it is ignored as the fetcher is still in AwaitingHeadersToBeIgnored

Is that correct, or is there some detail I am missing?
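To make the suspected race concrete, here is a minimal, self-contained sketch; all names are simplified stand-ins for the BlockFetcher internals in this PR, and the sequencing is just my reading of the path above, not the actual actor code:

```scala
// Minimal sketch of the suspected race: a timeout retry issues a fresh
// request, but the state still says "ignore the next response", so the
// fresh headers get dropped too.
object TimeoutRaceSketch extends App {
  sealed trait FetchingHeadersState
  case object AwaitingHeaders extends FetchingHeadersState
  case object AwaitingHeadersToBeIgnored extends FetchingHeadersState

  var state: FetchingHeadersState = AwaitingHeaders // a request is in flight

  def onInvalidate(): Unit = state = AwaitingHeadersToBeIgnored // drop next response
  def onTimeoutRetry(): Unit = () // a new request is sent, but state is NOT reset
  def onHeadersResponse(): Unit =
    if (state == AwaitingHeadersToBeIgnored) println("fresh headers wrongly ignored")
    else println("headers processed")

  onInvalidate()      // fetcher receives an invalidate
  onTimeoutRetry()    // original request times out, a new one is issued
  onHeadersResponse() // prints "fresh headers wrongly ignored"
}
```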
You are right! I thought I had analyzed that, but I missed it.
I'll update this PR with something even closer to what we do when receiving the response.
"BlockFetcher" - {

  "should not request headers upon invalidation if a request is already in progress" in new TestSetup {
Maybe we could add a separate test case for the timeout case?
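Sketching what such a test could look like in the FreeSpec style above; the fixture helpers used here (triggerHeadersFetch, sendInvalidation, sendHeadersResponse, expectHeadersAccepted) are hypothetical names, not the real TestSetup API:

```scala
"should process the retried request's response after a timeout during invalidation" in new TestSetup {
  triggerHeadersFetch()                           // a headers request goes out
  sendInvalidation()                              // its response must now be ignored
  blockFetcher ! BlockFetcher.RetryHeadersRequest // simulate the request timing out
  sendHeadersResponse()                           // response to the *retried* request
  expectHeadersAccepted()                         // must be processed, not ignored
}
```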
fetchBlocks(newState)
case RetryHeadersRequest if state.isFetchingHeaders =>
  log.debug("Retrying request for headers")
  fetchHeaders(state)
  log.debug("Time-out occurred while waiting for headers")
More of a discussion-type comment. Do you think it would cause problems if we received this timed-out response later? I.e. something like:
- we request headers, let's say from the block with number 100
- we receive an invalidate from 100 to 80
- we receive a timeout for our request, we blacklist the peer and clear the state
- after some time (200 s by default) the peer is un-blacklisted, and we request headers from the block with number 90 from it
- we receive the old response due to some network traffic problems or something like that

Do you think it could result in something more than blacklisting the peer? (for example, triggering this bug with unbounded resource usage)
Definitely, we are currently doing no validation that the headers received match the request.
With that sort of validation we should be safe, I think, but for now I was assuming that case will never happen; the 30-second timeout should be more than enough to prevent that, right?
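For reference, a minimal sketch of the kind of validation being discussed, assuming the fetcher keeps the parameters of the last request around (the requestedFrom/requestedAmount parameters are hypothetical):

```scala
import io.iohk.ethereum.domain.BlockHeader

// Hypothetical check: does the received batch actually answer our request?
def matchesRequest(
    requestedFrom: BigInt, // block number we asked to start from
    requestedAmount: Int,  // number of headers we asked for
    received: Seq[BlockHeader]
): Boolean =
  received.nonEmpty &&
    received.head.number == requestedFrom &&
    received.size <= requestedAmount &&
    // headers must form a contiguous, ascending chain of numbers
    received.zip(received.tail).forall { case (prev, next) => next.number == prev.number + 1 }
```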
That makes me wonder whether this whole solution shouldn't be implemented as an actor/class in between the fetcher and the peers client. It could track the requests made and invalidate previous ones based on the chosen strategy (at-most-n, allow-all, etc.).
This could simplify things significantly and let the code in the fetcher focus on just orchestrating the fetching of blocks.
What would this actor do? Are you maybe thinking of something like the BlockChainDataFetcher of our other project? (only in how it requests headers and bodies)

> It could track requests

That at least would focus on the ☝️ part and allow validating that the responses received match the requests we sent.
Either way, I'd do that effort in parallel with this kind of patching that only touches the minimum required (and is needed for the testnet).
I'm a bit worried about how risky any design changes to our sync could be... though that BlockChainDataFetcher refactoring would be low risk, I think.
Well, BlockFetcher is more or less the same as BlockchainDataFetcher there already, IMO. I'm thinking more of a structure like this:

                     block fetcher
          |                |                 |
  headers fetcher   bodies fetcher   state nodes fetcher
     (Fetcher)         (Fetcher)         (Fetcher)
          |                |                 |
                     peers client

Where Fetcher is that specialized class/actor that manages requests of a given type and could have an interface like:

abstract class Fetcher(multipleReqStrategy: MultipleRequestsStrategy) { // e.g. AtMostN or AllowAll
  def makeRequest(msg: MessageSerializable): IO[msg.Response] // issues a request and adds it to the tracked pool
  def cancelPendingRequests: IO[Unit] // records the cancellation so it knows which responses cannot be passed on if later received
}
So IMO the details are up for discussion, but I am all for doing some kind of detailed request-response tracking. Unfortunately, as eth does not have any request-id for its request-response messages, such a thing is ultimately necessary if we want to have the possibility to cancel in-flight requests.
> we receive the old response due to some network traffic problems or something like that

I just realized this might not be a problem: our PeerRequestHandler awaiting the response should have been killed by then, so any response arriving too far in the future should be discarded.
Either way, I added this task to add validations that the request/response match and then detect malicious peer behaviour: https://jira.iohk.io/browse/ETCM-283

> Unfortunately, as eth does not have any request-id for its request-response messages, such a thing is ultimately necessary if we want to have the possibility to cancel in-flight requests.

Due to our PeerRequestHandler design this might be easier to do 🤔 we can identify the request by the actor that's in charge of getting the response. If we use a single peer per request type, that might work (for now we don't even have more than one request type in parallel).
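A rough sketch of that idea, keying in-flight requests by the PeerRequestHandler ActorRef awaiting each response; the RequestInfo shape and the method names are assumptions for illustration:

```scala
import akka.actor.ActorRef

final case class RequestInfo(fromBlock: BigInt, amount: Int) // assumed request shape

// Since eth messages carry no request-id, use the handler actor that awaits
// each response as the identity of the request it belongs to.
final case class InFlightRequests(pending: Map[ActorRef, RequestInfo] = Map.empty) {
  def track(handler: ActorRef, info: RequestInfo): InFlightRequests =
    copy(pending = pending + (handler -> info))
  // Invalidation just forgets the handlers, so their responses are dropped
  def invalidateAll: InFlightRequests = copy(pending = Map.empty)
  // A response is only accepted if it comes from a handler we still track
  def accepts(handler: ActorRef): Boolean = pending.contains(handler)
}
```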
case Right(validHeaders) =>
  state.appendHeaders(validHeaders)
}
val newState =
I'd move this code into a method on BlockFetcherState and leave only logging here.
As a handleHeaderResponse function, you mean? Isn't the current structure more oriented towards handling responses in the actor? Shouldn't we move the bodies/nodes response handling as well?
After a second look, let's keep it as is. It's just that the more I work with actors, the more I want to extract all the logic from them so it can be tested without creating the actor.
That totally matches my current sentiment about actors: they should have as little logic as possible and should only be a thin communication layer over the main logic.
Yeah, I'm having the same concerns with them.
However, for this case in particular, I think we would end up with a too-big, hard-to-understand BlockFetcherState class if we start moving everything there; we should probably start splitting this logic up into other classes.
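For what it's worth, that separation already pays off with the state introduced in this diff: the pure transitions can be asserted without an ActorSystem (here initialState stands for whatever fixture builds a BlockFetcherState):

```scala
// Pure state transitions from this diff, testable without spawning the actor
val fetching = initialState.withNewHeadersFetch
assert(fetching.isFetchingHeaders)                          // AwaitingHeaders
assert(!fetching.withHeaderFetchReceived.isFetchingHeaders) // back to NoHeadersFetched
```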
def fetchingHeaders(isFetching: Boolean): BlockFetcherState = copy(isFetchingHeaders = isFetching)
def isFetchingHeaders: Boolean = fetchingHeadersState != NoHeadersFetched
def withNewHeadersFetch: BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders)
def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NoHeadersFetched)
no fetched or not fetching?
def fetchingBodies(isFetching: Boolean): BlockFetcherState = copy(isFetchingBodies = isFetching)
def isFetchingBodies: Boolean = fetchingBodiesState != NoBodiesFetched
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NoBodiesFetched)
ditto
trait FetchingHeadersState
case object NoHeadersFetched extends FetchingHeadersState
case object AwaitingHeaders extends FetchingHeadersState
What about tracking the parameters of the request that was made? It would allow us to track (and at least log, for now) the case Konrad mentioned: receiving a response at a moment when we're waiting for a different one.
I'd do this in a separate task, one that adds the BlockChainDataFetcher class and all the request validations from there, wdyt?
(maybe not adding it as is, but just a class that could be used by the fetcher and that contains all the request/response validations)
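A minimal sketch of what carrying the request parameters in the state could look like; the fields are illustrative, not from this PR:

```scala
sealed trait FetchingHeadersState
case object NoHeadersFetched extends FetchingHeadersState
// Instead of bare markers, the awaiting states could carry what was asked for,
// so a mismatching response can be detected and at least logged.
final case class AwaitingHeaders(fromBlock: BigInt, amount: Int) extends FetchingHeadersState
final case class AwaitingHeadersToBeIgnored(fromBlock: BigInt, amount: Int) extends FetchingHeadersState
```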
import akka.util.ByteString
import io.iohk.ethereum.domain.Block

trait BlockHelpers {
Could you make it an object? I'd move the helpers I extracted here: https://github.com/input-output-hk/mantis/pull/735/files#diff-840c293a5c8d271766b7ad38d72f23fdc2a5d0406db32ee60fd727cc6615ca75R1 into the same package, so fewer conflicts and/or code duplicates are created.
LGTM!
Description
Address the cause of CPU and network spikes on the testnet
Proposed Solution
Scenario addressed
Eventually, the invalidation of local blocks due to an unknown branch would happen while a request was in progress, causing a new request to be sent and leaving 2 requests in parallel at that moment. As the sync doesn't handle 2 parallel requests for the same type of data, this would eventually cause node 1 to have a lot of requests in parallel, using up a lot of network and CPU resources.
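In code terms, the fix boils down to a guard like the following when an invalidation arrives; this is a simplified sketch of the behaviour this PR introduces, not the literal diff:

```scala
// Simplified sketch: on invalidation, never fire a second parallel request;
// if one is already in flight, just mark its response as to-be-ignored.
def onInvalidation(state: BlockFetcherState): BlockFetcherState =
  if (state.isFetchingHeaders)
    state.copy(fetchingHeadersState = AwaitingHeadersToBeIgnored)
  else {
    fetchHeaders(state)       // nothing in flight, safe to request immediately
    state.withNewHeadersFetch // record that a request is now in progress
  }
```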
Note that step 1 and the related checks are needed to prevent falling into this issue: https://jira.iohk.io/browse/ETCM-246
Note that step 8 is needed due to: https://jira.iohk.io/browse/ETCM-248
Indicators of the issue occurring:
Testing