Skip to content

Commit b5cb34b

Browse files
authored
[ETCM-283] Add block bodies validation in block fetcher (#771)
* Add block bodies validation in block fetcher * Pass block validator as param * Add unit test for the failure scenario * Check that blocks correspond to the requested headers that we want to process * Remove unnecesary lastFullBlockNumber
1 parent 482340d commit b5cb34b

File tree

8 files changed

+263
-82
lines changed

8 files changed

+263
-82
lines changed

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ object RegularSyncItSpecUtils {
6363
"pending-transactions-manager"
6464
)
6565

66+
lazy val validators = buildEthashConsensus.validators
67+
6668
lazy val regularSync = system.actorOf(
6769
RegularSync.props(
6870
peersClient,
@@ -71,6 +73,7 @@ object RegularSyncItSpecUtils {
7173
ledger,
7274
bl,
7375
blockchainConfig, // FIXME: remove in ETCM-280
76+
validators.blockValidator,
7477
testSyncConfig,
7578
ommersPool,
7679
pendingTransactionsManager,

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class SyncController(
103103
ledger,
104104
blockchain,
105105
blockchainConfig,
106+
validators.blockValidator,
106107
syncConfig,
107108
ommersPool,
108109
pendingTransactionsManager,

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import cats.data.NonEmptyList
88
import cats.instances.future._
99
import cats.instances.option._
1010
import cats.syntax.either._
11+
import io.iohk.ethereum.consensus.validators.BlockValidator
1112
import io.iohk.ethereum.blockchain.sync.PeersClient._
1213
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
1314
AwaitingBodiesToBeIgnored,
@@ -37,6 +38,7 @@ class BlockFetcher(
3738
val peerEventBus: ActorRef,
3839
val supervisor: ActorRef,
3940
val syncConfig: SyncConfig,
41+
val blockValidator: BlockValidator,
4042
implicit val scheduler: Scheduler
4143
) extends Actor
4244
with ActorLogging {
@@ -54,7 +56,7 @@ class BlockFetcher(
5456
}
5557

5658
private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
57-
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
59+
BlockFetcherState.initial(importer, blockValidator, blockNr) |> fetchBlocks
5860
peerEventBus ! Subscribe(
5961
MessageClassifier(
6062
Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code),
@@ -138,19 +140,25 @@ class BlockFetcher(
138140

139141
private def handleBodiesMessages(state: BlockFetcherState): Receive = {
140142
case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
141-
val newState =
142-
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
143-
log.debug("Received {} block bodies that will be ignored", bodies.size)
144-
state.withBodiesFetchReceived
145-
} else {
146-
log.debug("Fetched {} block bodies", bodies.size)
147-
state.withBodiesFetchReceived.addBodies(peer.id, bodies)
148-
}
149-
150-
fetchBlocks(newState)
143+
log.debug(s"Received ${bodies.size} block bodies")
144+
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
145+
log.debug("Block bodies will be ignored due to an invalidation was requested for them")
146+
fetchBlocks(state.withBodiesFetchReceived)
147+
} else {
148+
val newState =
149+
state.validateBodies(bodies) match {
150+
case Left(err) =>
151+
peersClient ! BlacklistPeer(peer.id, err)
152+
state.withBodiesFetchReceived
153+
case Right(newBlocks) =>
154+
state.withBodiesFetchReceived.handleRequestedBlocks(newBlocks, peer.id)
155+
}
156+
val waitingHeadersDequeued = state.waitingHeaders.size - newState.waitingHeaders.size
157+
log.debug(s"Processed ${waitingHeadersDequeued} new blocks from received block bodies")
158+
fetchBlocks(newState)
159+
}
151160
case RetryBodiesRequest if state.isFetchingBodies =>
152161
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")
153-
154162
val newState = state.withBodiesFetchReceived
155163
fetchBlocks(newState)
156164
}
@@ -194,7 +202,7 @@ class BlockFetcher(
194202
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
195203
log.debug("Received NewBlock {}", block.idTag)
196204
val newBlockNr = block.number
197-
val nextExpectedBlock = state.lastFullBlockNumber + 1
205+
val nextExpectedBlock = state.lastBlock + 1
198206

199207
if (state.isOnTop && newBlockNr == nextExpectedBlock) {
200208
log.debug("Passing block directly to importer")
@@ -346,9 +354,10 @@ object BlockFetcher {
346354
peerEventBus: ActorRef,
347355
supervisor: ActorRef,
348356
syncConfig: SyncConfig,
357+
blockValidator: BlockValidator,
349358
scheduler: Scheduler
350359
): Props =
351-
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, scheduler))
360+
Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, scheduler))
352361

353362
sealed trait FetchMsg
354363
case class Start(importer: ActorRef, fromBlock: BigInt) extends FetchMsg

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 75 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ package io.iohk.ethereum.blockchain.sync.regular
33
import akka.actor.ActorRef
44
import akka.util.ByteString
55
import cats.data.NonEmptyList
6+
import io.iohk.ethereum.consensus.validators.BlockValidator
67
import cats.implicits._
78
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
89
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
910
import io.iohk.ethereum.network.PeerId
1011
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
1112

1213
import scala.collection.immutable.Queue
14+
import scala.annotation.tailrec
15+
import io.iohk.ethereum.consensus.validators.BlockValidator
1316

1417
// scalastyle:off number.of.methods
1518
/**
@@ -33,6 +36,7 @@ import scala.collection.immutable.Queue
3336
*/
3437
case class BlockFetcherState(
3538
importer: ActorRef,
39+
blockValidator: BlockValidator,
3640
readyBlocks: Queue[Block],
3741
waitingHeaders: Queue[BlockHeader],
3842
fetchingHeadersState: FetchingHeadersState,
@@ -55,12 +59,6 @@ case class BlockFetcherState(
5559

5660
def hasReachedSize(size: Int): Boolean = (readyBlocks.size + waitingHeaders.size) >= size
5761

58-
def lastFullBlockNumber: BigInt =
59-
readyBlocks.lastOption
60-
.map(_.number)
61-
.orElse(waitingHeaders.headOption.map(_.number - 1))
62-
.getOrElse(lastBlock)
63-
6462
def lowestBlock: BigInt =
6563
readyBlocks.headOption
6664
.map(_.number)
@@ -117,35 +115,69 @@ case class BlockFetcherState(
117115
)
118116

119117
/**
120-
* Matches bodies with headers in queue and adding matched bodies to the blocks.
121-
* If bodies is empty collection - headers in queue are removed as the cause is:
122-
* - the headers are from rejected fork and therefore it won't be possible to resolve bodies for them
123-
* - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
124-
* when making a request)
118+
* When bodies are requested, the response don't need to be a complete sub chain,
119+
* even more, we could receive an empty chain and that will be considered valid. Here we just
120+
* validate that the received bodies corresponds to an ordered subset of the requested headers.
125121
*/
126-
def addBodies(peerId: PeerId, bodies: Seq[BlockBody]): BlockFetcherState = {
127-
if (bodies.isEmpty) {
128-
copy(waitingHeaders = Queue.empty)
129-
} else {
130-
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
131-
val blocks = matching.zip(bodies).map((Block.apply _).tupled)
122+
def validateBodies(receivedBodies: Seq[BlockBody]): Either[String, Seq[Block]] =
123+
bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies)
124+
.toRight(
125+
"Received unrequested bodies"
126+
)
132127

133-
withPeerForBlocks(peerId, blocks.map(_.header.number))
134-
.copy(
135-
readyBlocks = readyBlocks.enqueue(blocks),
136-
waitingHeaders = waiting
137-
)
128+
// Checks that the received block bodies are an ordered subset of the ones requested
129+
@tailrec
130+
private def bodiesAreOrderedSubsetOfRequested(
131+
requestedHeaders: Seq[BlockHeader],
132+
respondedBodies: Seq[BlockBody],
133+
matchedBlocks: Seq[Block] = Nil
134+
): Option[Seq[Block]] =
135+
(requestedHeaders, respondedBodies) match {
136+
case (Seq(), _ +: _) => None
137+
case (_, Seq()) => Some(matchedBlocks)
138+
case (header +: remainingHeaders, body +: remainingBodies) =>
139+
val doMatch = blockValidator.validateHeaderAndBody(header, body).isRight
140+
if (doMatch)
141+
bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body))
142+
else
143+
bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
138144
}
139-
}
140145

141-
def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
142-
withPeerForBlocks(fromPeer, Seq(block.header.number))
143-
.withPossibleNewTopAt(block.number)
144-
.withLastBlock(block.number)
145-
.copy(
146-
readyBlocks = readyBlocks.enqueue(block),
147-
waitingHeaders = waitingHeaders.filter(block.number != _.number)
146+
/**
147+
* If blocks is empty collection - headers in queue are removed as the cause is:
148+
* - the headers are from rejected fork and therefore it won't be possible to resolve blocks for them
149+
* - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
150+
* when making a request)
151+
*/
152+
def handleRequestedBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState =
153+
if (blocks.isEmpty)
154+
copy(
155+
waitingHeaders = Queue.empty
148156
)
157+
else
158+
blocks.foldLeft(this) { case (state, block) =>
159+
state.enqueueRequestedBlock(block, fromPeer)
160+
}
161+
162+
/**
163+
* If the requested block is not the next in the line in the waiting headers queue,
164+
* we opt for not adding it in the ready blocks queue.
165+
*/
166+
def enqueueRequestedBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
167+
waitingHeaders.dequeueOption
168+
.map { case (waitingHeader, waitingHeadersTail) =>
169+
if (waitingHeader.hash == block.hash)
170+
withPeerForBlocks(fromPeer, Seq(block.number))
171+
.withPossibleNewTopAt(block.number)
172+
.withLastBlock(block.number)
173+
.copy(
174+
readyBlocks = readyBlocks.enqueue(block),
175+
waitingHeaders = waitingHeadersTail
176+
)
177+
else
178+
this
179+
}
180+
.getOrElse(this)
149181

150182
def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] =
151183
if (readyBlocks.nonEmpty) {
@@ -242,17 +274,19 @@ case class BlockFetcherState(
242274
object BlockFetcherState {
243275
case class StateNodeFetcher(hash: ByteString, replyTo: ActorRef)
244276

245-
def initial(importer: ActorRef, lastBlock: BigInt): BlockFetcherState = BlockFetcherState(
246-
importer = importer,
247-
readyBlocks = Queue(),
248-
waitingHeaders = Queue(),
249-
fetchingHeadersState = NotFetchingHeaders,
250-
fetchingBodiesState = NotFetchingBodies,
251-
stateNodeFetcher = None,
252-
lastBlock = lastBlock,
253-
knownTop = lastBlock + 1,
254-
blockProviders = Map()
255-
)
277+
def initial(importer: ActorRef, blockValidator: BlockValidator, lastBlock: BigInt): BlockFetcherState =
278+
BlockFetcherState(
279+
importer = importer,
280+
blockValidator = blockValidator,
281+
readyBlocks = Queue(),
282+
waitingHeaders = Queue(),
283+
fetchingHeadersState = NotFetchingHeaders,
284+
fetchingBodiesState = NotFetchingBodies,
285+
stateNodeFetcher = None,
286+
lastBlock = lastBlock,
287+
knownTop = lastBlock + 1,
288+
blockProviders = Map()
289+
)
256290

257291
trait FetchingHeadersState
258292
case object NotFetchingHeaders extends FetchingHeadersState

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
88
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, ProgressProtocol, ProgressState}
99
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockImport
1010
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
11+
import io.iohk.ethereum.consensus.validators.BlockValidator
1112
import io.iohk.ethereum.crypto.ECDSASignature
1213
import io.iohk.ethereum.domain.Blockchain
1314
import io.iohk.ethereum.ledger.Ledger
@@ -21,6 +22,7 @@ class RegularSync(
2122
ledger: Ledger,
2223
blockchain: Blockchain,
2324
blockchainConfig: BlockchainConfig,
25+
blockValidator: BlockValidator,
2426
syncConfig: SyncConfig,
2527
ommersPool: ActorRef,
2628
pendingTransactionsManager: ActorRef,
@@ -30,7 +32,10 @@ class RegularSync(
3032
with ActorLogging {
3133

3234
val fetcher: ActorRef =
33-
context.actorOf(BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, scheduler), "block-fetcher")
35+
context.actorOf(
36+
BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, blockValidator, scheduler),
37+
"block-fetcher"
38+
)
3439
val broadcaster: ActorRef = context.actorOf(
3540
BlockBroadcasterActor
3641
.props(new BlockBroadcast(etcPeerManager, syncConfig), peerEventBus, etcPeerManager, syncConfig, scheduler),
@@ -121,6 +126,7 @@ object RegularSync {
121126
ledger: Ledger,
122127
blockchain: Blockchain,
123128
blockchainConfig: BlockchainConfig,
129+
blockValidator: BlockValidator,
124130
syncConfig: SyncConfig,
125131
ommersPool: ActorRef,
126132
pendingTransactionsManager: ActorRef,
@@ -135,6 +141,7 @@ object RegularSync {
135141
ledger,
136142
blockchain,
137143
blockchainConfig,
144+
blockValidator,
138145
syncConfig,
139146
ommersPool,
140147
pendingTransactionsManager,

0 commit comments

Comments
 (0)