Skip to content

Commit 7a1323e

Browse files
authored
Etcm 290 invalid branches built by fetcher (#787)
- Fix for invalid chains passed to importer - Add chain validation on headers added to queue, remove pending requests check when checking for top on fetch side - Temporarily remove peers blacklisting after receiving useless response, improve peer message logging - Fix problem with fetcher being stuck with headers from rejected fork
1 parent 9ba2205 commit 7a1323e

File tree

8 files changed

+170
-64
lines changed

8 files changed

+170
-64
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ class BlockFetcher(
114114
} else {
115115
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
116116

117-
state.validatedHeaders(headers) match {
117+
state.appendHeaders(headers) match {
118118
case Left(err) =>
119-
peersClient ! BlacklistPeer(peer.id, err)
119+
log.info("Dismissed received headers due to: {}", err)
120120
state.withHeaderFetchReceived
121-
case Right(validHeaders) =>
122-
state.withHeaderFetchReceived.appendHeaders(validHeaders)
121+
case Right(updatedState) =>
122+
updatedState.withHeaderFetchReceived
123123
}
124124
}
125125

@@ -144,7 +144,7 @@ class BlockFetcher(
144144
state.withBodiesFetchReceived
145145
} else {
146146
log.debug("Fetched {} block bodies", bodies.size)
147-
state.withBodiesFetchReceived.addBodies(peer, bodies)
147+
state.withBodiesFetchReceived.addBodies(peer.id, bodies)
148148
}
149149

150150
fetchBlocks(newState)
@@ -188,40 +188,24 @@ class BlockFetcher(
188188
case Left(_) => state
189189
case Right(validHashes) => state.withPossibleNewTopAt(validHashes.lastOption.map(_.number))
190190
}
191-
192191
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
193-
194192
fetchBlocks(newState)
195193
case MessageFromPeer(NewBlock(_, block, _), peerId) =>
194+
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
195+
log.debug("Received NewBlock {}", block.idTag)
196196
val newBlockNr = block.number
197197
val nextExpectedBlock = state.lastFullBlockNumber + 1
198198

199-
log.debug("Received NewBlock nr {}", newBlockNr)
200-
201-
// we're on top, so we can pass block directly to importer
202-
if (newBlockNr == nextExpectedBlock && state.isOnTop) {
203-
log.debug("Pass block directly to importer")
199+
if (state.isOnTop && newBlockNr == nextExpectedBlock) {
200+
log.debug("Passing block directly to importer")
204201
val newState = state.withPeerForBlocks(peerId, Seq(newBlockNr)).withKnownTopAt(newBlockNr)
205202
state.importer ! OnTop
206203
state.importer ! ImportNewBlock(block, peerId)
207204
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
208205
context become started(newState)
209-
// there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import
210-
} else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) {
211-
log.debug("Enqueue new block for import")
212-
val newState = state.appendNewBlock(block, peerId)
213-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
214-
context become started(newState)
215-
// waiting for some bodies but we don't have this header yet - at least we can use new block header
216-
} else if (newBlockNr == state.nextToLastBlock && !state.isFetchingHeaders) {
217-
log.debug("Waiting for bodies. Add only headers")
218-
val newState = state.appendHeaders(List(block.header))
219-
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
220-
fetchBlocks(newState)
221-
// we're far from top
222-
} else if (newBlockNr > nextExpectedBlock) {
223-
log.debug("Far from top")
224-
val newState = state.withKnownTopAt(newBlockNr)
206+
} else {
207+
log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top")
208+
val newState = state.withPossibleNewTopAt(block.number)
225209
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
226210
fetchBlocks(newState)
227211
}
@@ -236,17 +220,17 @@ class BlockFetcher(
236220
//ex. After a successful handshake, fetcher will receive the info about the header of the peer best block
237221
case MessageFromPeer(BlockHeaders(headers), _) =>
238222
headers.lastOption.map { bh =>
239-
log.debug(s"Candidate for new top at block ${bh.number}, current know top ${state.knownTop}")
223+
log.debug(s"Candidate for new top at block ${bh.number}, current known top ${state.knownTop}")
240224
val newState = state.withPossibleNewTopAt(bh.number)
241225
fetchBlocks(newState)
242226
}
243227
//keep fetcher state updated in case new checkpoint block or mined block was imported
244-
case InternalLastBlockImport(blockNr) => {
228+
case InternalLastBlockImport(blockNr) =>
245229
log.debug(s"New last block $blockNr imported from the inside")
246230
val newLastBlock = blockNr.max(state.lastBlock)
247231
val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr)
232+
248233
fetchBlocks(newState)
249-
}
250234
}
251235

252236
private def handlePickedBlocks(
@@ -277,7 +261,7 @@ class BlockFetcher(
277261
.getOrElse(fetcherState)
278262

279263
private def fetchHeaders(state: BlockFetcherState): Unit = {
280-
val blockNr = state.nextToLastBlock
264+
val blockNr = state.nextBlockToFetch
281265
val amount = syncConfig.blockHeadersPerRequest
282266

283267
fetchHeadersFrom(blockNr, amount) pipeTo self

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ package io.iohk.ethereum.blockchain.sync.regular
33
import akka.actor.ActorRef
44
import akka.util.ByteString
55
import cats.data.NonEmptyList
6-
import io.iohk.ethereum.domain.{Block, BlockHeader, BlockBody, HeadersSeq}
7-
import io.iohk.ethereum.network.{Peer, PeerId}
6+
import cats.implicits._
7+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
8+
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
9+
import io.iohk.ethereum.network.PeerId
810
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
9-
import BlockFetcherState._
10-
import cats.syntax.either._
11-
import cats.syntax.option._
1211

1312
import scala.collection.immutable.Queue
1413

@@ -52,7 +51,7 @@ case class BlockFetcherState(
5251

5352
def hasFetchedTopHeader: Boolean = lastBlock == knownTop
5453

55-
def isOnTop: Boolean = !isFetching && hasFetchedTopHeader && hasEmptyBuffer
54+
def isOnTop: Boolean = hasFetchedTopHeader && hasEmptyBuffer
5655

5756
def hasReachedSize(size: Int): Boolean = (readyBlocks.size + waitingHeaders.size) >= size
5857

@@ -68,26 +67,43 @@ case class BlockFetcherState(
6867
.orElse(waitingHeaders.headOption.map(_.number))
6968
.getOrElse(lastBlock)
7069

71-
def nextToLastBlock: BigInt = lastBlock + 1
70+
/**
71+
* Next block number to be fetched, calculated in a way to maintain local queues consistency,
72+
* even if `lastBlock` property is much higher - it's more important to have this consistency
73+
* here and allow standard rollback/reorganization mechanisms to kick in if we get too far with mining,
74+
* therefore `lastBlock` is used here only if blocks and headers queues are empty
75+
*/
76+
def nextBlockToFetch: BigInt = waitingHeaders.lastOption
77+
.map(_.number)
78+
.orElse(readyBlocks.lastOption.map(_.number))
79+
.getOrElse(lastBlock) + 1
7280

7381
def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)
7482

75-
def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
76-
withPossibleNewTopAt(headers.lastOption.map(_.number))
77-
.copy(
78-
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
79-
lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
80-
)
83+
def appendHeaders(headers: Seq[BlockHeader]): Either[String, BlockFetcherState] =
84+
validatedHeaders(headers.sortBy(_.number)).map(validHeaders => {
85+
val lastNumber = HeadersSeq.lastNumber(validHeaders)
86+
withPossibleNewTopAt(lastNumber)
87+
.copy(
88+
waitingHeaders = waitingHeaders ++ validHeaders,
89+
lastBlock = lastNumber.getOrElse(lastBlock)
90+
)
91+
})
8192

82-
def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
93+
/**
94+
* Validates received headers consistency and their compatibilty with the state
95+
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
96+
*/
97+
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
8398
if (headers.isEmpty) {
8499
Right(headers)
85100
} else {
86101
headers
87102
.asRight[String]
88-
.ensure("Given headers are not sequence with already fetched ones")(_.head.number <= nextToLastBlock)
89-
.ensure("Given headers aren't better than already fetched ones")(_.last.number > lastBlock)
90103
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
104+
.ensure("Given headers do not form a chain with already stored ones")(headers =>
105+
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
106+
)
91107
}
92108

93109
def validateNewBlockHashes(hashes: Seq[BlockHash]): Either[String, Seq[BlockHash]] =
@@ -100,15 +116,26 @@ case class BlockFetcherState(
100116
}
101117
)
102118

103-
def addBodies(peer: Peer, bodies: Seq[BlockBody]): BlockFetcherState = {
104-
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
105-
val blocks = matching.zip(bodies).map((Block.apply _).tupled)
106-
107-
withPeerForBlocks(peer.id, blocks.map(_.header.number))
108-
.copy(
109-
readyBlocks = readyBlocks.enqueue(blocks),
110-
waitingHeaders = waiting
111-
)
119+
/**
120+
* Matches bodies with headers in queue and adding matched bodies to the blocks.
121+
* If bodies is empty collection - headers in queue are removed as the cause is:
122+
* - the headers are from rejected fork and therefore it won't be possible to resolve bodies for them
123+
* - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
124+
* when making a request)
125+
*/
126+
def addBodies(peerId: PeerId, bodies: Seq[BlockBody]): BlockFetcherState = {
127+
if (bodies.isEmpty) {
128+
copy(waitingHeaders = Queue.empty)
129+
} else {
130+
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
131+
val blocks = matching.zip(bodies).map((Block.apply _).tupled)
132+
133+
withPeerForBlocks(peerId, blocks.map(_.header.number))
134+
.copy(
135+
readyBlocks = readyBlocks.enqueue(blocks),
136+
waitingHeaders = waiting
137+
)
138+
}
112139
}
113140

114141
def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,13 @@ class BlockImporter(
7070
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
7171
importBlocks(blocks)(state)
7272

73+
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
7374
case MinedBlock(block) =>
7475
if (!state.importing) {
7576
importMinedBlock(block, state)
7677
}
7778

79+
//TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly
7880
case nc @ NewCheckpoint(parentHash, signatures) =>
7981
if (state.importing) {
8082
//We don't want to lose a checkpoint

src/main/scala/io/iohk/ethereum/domain/Block.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ case class Block(header: BlockHeader, body: BlockBody) {
2323

2424
val hasCheckpoint: Boolean = header.hasCheckpoint
2525

26-
def isParentOf(child: Block): Boolean = number + 1 == child.number && child.header.parentHash == hash
26+
def isParentOf(child: Block): Boolean = header.isParentOf(child.header)
2727
}
2828

2929
object Block {

src/main/scala/io/iohk/ethereum/domain/BlockHeader.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ case class BlockHeader(
4747

4848
val hasCheckpoint: Boolean = checkpoint.isDefined
4949

50+
def isParentOf(child: BlockHeader): Boolean = number + 1 == child.number && child.parentHash == hash
51+
5052
override def toString: String = {
5153
val (treasuryOptOutString: String, checkpointString: String) = extraFields match {
5254
case HefPostEcip1097(definedOptOut, maybeCheckpoint) =>

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package io.iohk.ethereum.blockchain.sync.regular
22

33
import java.net.InetSocketAddress
4-
54
import akka.actor.ActorSystem
65
import akka.testkit.{TestKit, TestProbe}
76
import com.miguno.akka.testing.VirtualTime
87
import io.iohk.ethereum.BlockHelpers
98
import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks}
109
import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer
11-
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InvalidateBlocksFrom
10+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{InternalLastBlockImport, InvalidateBlocksFrom, PickBlocks}
1211
import io.iohk.ethereum.blockchain.sync.{PeersClient, TestSyncConfig}
13-
import io.iohk.ethereum.domain.{Block, ChainWeight}
12+
import io.iohk.ethereum.domain.{Block, ChainWeight, HeadersSeq}
1413
import io.iohk.ethereum.network.Peer
1514
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
1615
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe}
@@ -124,6 +123,78 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
124123
peersClient.expectMsgClass(classOf[BlacklistPeer])
125124
peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
126125
}
126+
127+
"should ensure blocks passed to importer are always forming chain" in new TestSetup {
128+
startFetcher()
129+
130+
triggerFetching()
131+
132+
val firstBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block)
133+
val secondBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, firstBlocksBatch.last)
134+
val alternativeSecondBlocksBatch =
135+
BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, firstBlocksBatch.last)
136+
137+
// Fetcher requests for headers
138+
val firstGetBlockHeadersRequest =
139+
GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false)
140+
peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
141+
142+
// Respond first headers request
143+
val firstGetBlockHeadersResponse = BlockHeaders(firstBlocksBatch.map(_.header))
144+
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockHeadersResponse))
145+
146+
// Second headers request with response pending
147+
val secondGetBlockHeadersRequest = GetBlockHeaders(
148+
Left(secondBlocksBatch.head.number),
149+
syncConfig.blockHeadersPerRequest,
150+
skip = 0,
151+
reverse = false
152+
)
153+
// Save the reference to respond to the ask pattern on fetcher
154+
val refForAnswerSecondHeaderReq = peersClient.expectMsgPF() {
155+
case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => peersClient.lastSender
156+
}
157+
158+
// First bodies request
159+
val firstGetBlockBodiesRequest = GetBlockBodies(firstBlocksBatch.map(_.hash))
160+
val refForAnswerFirstBodiesReq = peersClient.expectMsgPF() {
161+
case PeersClient.Request(msg, _, _) if msg == firstGetBlockBodiesRequest => peersClient.lastSender
162+
}
163+
164+
// Block 16 is mined (we could have reached this stage due to invalidation messages sent to the fetcher)
165+
val minedBlock = alternativeSecondBlocksBatch.drop(5).head
166+
val minedBlockNumber = minedBlock.number
167+
blockFetcher ! InternalLastBlockImport(minedBlockNumber)
168+
169+
// Answer pending requests: first block bodies request + second block headers request
170+
val secondGetBlockHeadersResponse = BlockHeaders(secondBlocksBatch.map(_.header))
171+
peersClient.send(refForAnswerSecondHeaderReq, PeersClient.Response(fakePeer, secondGetBlockHeadersResponse))
172+
173+
val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body))
174+
peersClient.send(refForAnswerFirstBodiesReq, PeersClient.Response(fakePeer, firstGetBlockBodiesResponse))
175+
176+
// Third headers request with response pending
177+
peersClient.expectMsgPF() { case PeersClient.Request(GetBlockHeaders(_, _, _, _), _, _) =>
178+
peersClient.lastSender
179+
}
180+
181+
// Second bodies request
182+
val refForAnswerSecondBodiesReq = peersClient.expectMsgPF() { case PeersClient.Request(GetBlockBodies(_), _, _) =>
183+
peersClient.lastSender
184+
}
185+
peersClient.send(
186+
refForAnswerSecondBodiesReq,
187+
PeersClient.Response(fakePeer, BlockBodies(alternativeSecondBlocksBatch.drop(6).map(_.body)))
188+
)
189+
190+
importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize))
191+
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
192+
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
193+
val headers = blocks.map(_.header).toList
194+
195+
assert(HeadersSeq.areChain(headers))
196+
}
197+
}
127198
}
128199

129200
trait TestSetup extends TestSyncConfig {

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.ActorSystem
44
import akka.testkit.{TestKit, TestProbe}
5-
import io.iohk.ethereum.domain.Block
5+
import io.iohk.ethereum.BlockHelpers
66
import io.iohk.ethereum.Fixtures.Blocks.ValidBlock
7+
import io.iohk.ethereum.domain.Block
78
import io.iohk.ethereum.network.PeerId
89
import org.scalatest.matchers.should.Matchers
910
import org.scalatest.wordspec.AnyWordSpecLike
1011

12+
import scala.collection.immutable.Queue
13+
1114
class BlockFetcherStateSpec extends TestKit(ActorSystem()) with AnyWordSpecLike with Matchers {
1215
"BlockFetcherState" when {
1316
"invalidating blocks" should {
@@ -33,5 +36,20 @@ class BlockFetcherStateSpec extends TestKit(ActorSystem()) with AnyWordSpecLike
3336
newState.knownTop shouldEqual newBestBlock.number
3437
}
3538
}
39+
40+
"handling new block bodies" should {
41+
"clear headers queue if got empty list of bodies" in {
42+
val headers = BlockHelpers.generateChain(5, BlockHelpers.genesis).map(_.header)
43+
val peer = PeerId("foo")
44+
45+
val result = BlockFetcherState
46+
.initial(TestProbe().ref, 0)
47+
.appendHeaders(headers)
48+
.map(_.addBodies(peer, List()))
49+
.map(_.waitingHeaders)
50+
51+
assert(result === Right(Queue.empty))
52+
}
53+
}
3654
}
3755
}

0 commit comments

Comments
 (0)