Skip to content

Commit 79be12a

Browse files
committed
[ETCM-283] Check that blocks correspond to the requested headers that we want to process
1 parent 85cbcf5 commit 79be12a

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -140,24 +140,25 @@ class BlockFetcher(
140140

141141
private def handleBodiesMessages(state: BlockFetcherState): Receive = {
142142
case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
143-
val newState =
144-
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
145-
log.debug("Received {} block bodies that will be ignored", bodies.size)
146-
state.withBodiesFetchReceived
147-
} else {
143+
log.debug(s"Received ${bodies.size} block bodies")
144+
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
145+
log.debug("Block bodies will be ignored due to an invalidation was requested for them")
146+
fetchBlocks(state.withBodiesFetchReceived)
147+
} else {
148+
val newState =
148149
state.validateBodies(bodies) match {
149150
case Left(err) =>
150151
peersClient ! BlacklistPeer(peer.id, err)
151152
state.withBodiesFetchReceived
152153
case Right(newBlocks) =>
153-
log.debug("Fetched {} block bodies", newBlocks.size)
154-
state.withBodiesFetchReceived.appendNewBlocks(newBlocks, peer.id)
154+
state.withBodiesFetchReceived.receiveBlocks(newBlocks, peer.id)
155155
}
156-
}
157-
fetchBlocks(newState)
156+
val waitingHeadersDequeued = newState.waitingHeaders.size - state.waitingHeaders.size
157+
log.debug(s"Processed ${waitingHeadersDequeued} new blocks from received block bodies")
158+
fetchBlocks(newState)
159+
}
158160
case RetryBodiesRequest if state.isFetchingBodies =>
159161
log.debug("Time-out occurred while waiting for bodies")
160-
161162
val newState = state.withBodiesFetchReceived
162163
fetchBlocks(newState)
163164
}
@@ -214,7 +215,7 @@ class BlockFetcher(
214215
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
215216
context become started(newState)
216217
// there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import
217-
} else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) {
218+
} else if (newBlockNr == nextExpectedBlock && !state.isFetching) {
218219
log.debug("Enqueue new block for import")
219220
val newState = state.appendNewBlock(block, peerId)
220221
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ case class BlockFetcherState(
105105
}
106106
)
107107

108+
/**
109+
* When bodies are requested, the response don't need to be a complete sub chain,
110+
* even more, we could receive an empty chain and that will be considered valid. Here we just
111+
* validate that the received bodies corresponds to an ordered subset of the requested headers.
112+
*/
108113
def validateBodies(receivedBodies: Seq[BlockBody]): Either[String, Seq[Block]] =
109114
bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies)
110115
.toRight(
@@ -129,22 +134,44 @@ case class BlockFetcherState(
129134
bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
130135
}
131136

132-
def appendNewBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState = {
133-
val receivedHeaders = blocks.map(_.header)
134-
withPeerForBlocks(fromPeer, blocks.map(_.header.number))
135-
.copy(
136-
readyBlocks = readyBlocks.enqueue(blocks.toList),
137-
waitingHeaders = waitingHeaders.diff(receivedHeaders)
138-
)
137+
// We could optimize this method by stopping as soon as a block is not appended.
138+
def receiveBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState = {
139+
blocks.foldLeft(this) { case (state, block) =>
140+
state.receiveBlock(block, fromPeer)
141+
}
139142
}
140143

144+
/**
145+
* Currently to fill in headers we use a queue, so we if we try to process
146+
* a block that has its header in the queue but is not the next in the line,
147+
* we opt for not appending it.
148+
*/
149+
def receiveBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
150+
waitingHeaders.dequeueOption
151+
.map { case (waitingHeader, waitingHeadersTail) =>
152+
if (waitingHeader.hash == block.hash)
153+
unsafeAppendNewBlock(block, fromPeer).copy(
154+
waitingHeaders = waitingHeadersTail
155+
)
156+
else
157+
this
158+
}
159+
.getOrElse(this)
160+
161+
// only succeed if there is no waiting headers.
141162
def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
163+
if (waitingHeaders.isEmpty)
164+
unsafeAppendNewBlock(block, fromPeer)
165+
else
166+
this
167+
168+
// unsafe in terms of not checking waiting headers queue
169+
private def unsafeAppendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
142170
withPeerForBlocks(fromPeer, Seq(block.header.number))
143171
.withPossibleNewTopAt(block.number)
144172
.withLastBlock(block.number)
145173
.copy(
146-
readyBlocks = readyBlocks.enqueue(block),
147-
waitingHeaders = waitingHeaders.filter(block.number != _.number)
174+
readyBlocks = readyBlocks.enqueue(block)
148175
)
149176

150177
def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =

0 commit comments

Comments
 (0)