Skip to content

Commit ae398e1

Browse files
author
Nicolás Tallar
authored
[ETCM-211] Tracking of headers/bodies requests that will be ignored due to invalidation (#749)
1 parent 1667f33 commit ae398e1

File tree

4 files changed

+291
-31
lines changed

4 files changed

+291
-31
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import cats.instances.future._
99
import cats.instances.option._
1010
import cats.syntax.either._
1111
import io.iohk.ethereum.blockchain.sync.PeersClient._
12+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
13+
AwaitingBodiesToBeIgnored,
14+
AwaitingHeadersToBeIgnored
15+
}
1216
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
1317
import io.iohk.ethereum.crypto.kec256
1418
import io.iohk.ethereum.domain._
@@ -91,27 +95,51 @@ class BlockFetcher(
9195

9296
private def handleHeadersMessages(state: BlockFetcherState): Receive = {
9397
case Response(peer, BlockHeaders(headers)) if state.isFetchingHeaders =>
94-
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
95-
96-
val newState = state.validatedHeaders(headers) match {
97-
case Left(err) =>
98-
peersClient ! BlacklistPeer(peer.id, err)
99-
state.fetchingHeaders(false)
100-
case Right(validHeaders) =>
101-
state.appendHeaders(validHeaders)
102-
}
98+
val newState =
99+
if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) {
100+
log.debug(
101+
"Received {} headers starting from block {} that will be ignored",
102+
headers.size,
103+
headers.headOption.map(_.number)
104+
)
105+
state.withHeaderFetchReceived
106+
} else {
107+
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
108+
109+
state.validatedHeaders(headers) match {
110+
case Left(err) =>
111+
peersClient ! BlacklistPeer(peer.id, err)
112+
state.withHeaderFetchReceived
113+
case Right(validHeaders) =>
114+
state.withHeaderFetchReceived.appendHeaders(validHeaders)
115+
}
116+
}
103117

104118
fetchBlocks(newState)
105119
case RetryHeadersRequest if state.isFetchingHeaders =>
106-
log.debug("Retrying request for headers")
107-
fetchHeaders(state)
120+
log.debug("Time-out occurred while waiting for headers")
121+
122+
val newState = state.withHeaderFetchReceived
123+
fetchBlocks(newState)
108124
}
109125

110126
private def handleBodiesMessages(state: BlockFetcherState): Receive = {
111127
case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies =>
112-
log.debug("Fetched {} block bodies", bodies.size)
113-
state.addBodies(peer, bodies) |> fetchBlocks
114-
case RetryBodiesRequest if state.isFetchingBodies => fetchBodies(state)
128+
val newState =
129+
if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) {
130+
log.debug("Received {} block bodies that will be ignored", bodies.size)
131+
state.withBodiesFetchReceived
132+
} else {
133+
log.debug("Fetched {} block bodies", bodies.size)
134+
state.withBodiesFetchReceived.addBodies(peer, bodies)
135+
}
136+
137+
fetchBlocks(newState)
138+
case RetryBodiesRequest if state.isFetchingBodies =>
139+
log.debug("Time-out occurred while waiting for bodies")
140+
141+
val newState = state.withBodiesFetchReceived
142+
fetchBlocks(newState)
115143
}
116144

117145
private def handleStateNodeMessages(state: BlockFetcherState): Receive = {
@@ -207,7 +235,7 @@ class BlockFetcher(
207235
.filter(!_.hasFetchedTopHeader)
208236
.filter(!_.hasReachedSize(syncConfig.maxFetcherQueueSize))
209237
.tap(fetchHeaders)
210-
.map(_.fetchingHeaders(true))
238+
.map(_.withNewHeadersFetch)
211239
.getOrElse(fetcherState)
212240

213241
private def fetchHeaders(state: BlockFetcherState): Unit = {
@@ -229,10 +257,12 @@ class BlockFetcher(
229257
.filter(!_.isFetchingBodies)
230258
.filter(_.waitingHeaders.nonEmpty)
231259
.tap(fetchBodies)
232-
.map(state => state.fetchingBodies(true))
260+
.map(state => state.withNewBodiesFetch)
233261
.getOrElse(fetcherState)
234262

235263
private def fetchBodies(state: BlockFetcherState): Unit = {
264+
log.debug("Fetching bodies")
265+
236266
val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest)
237267
requestBlockBodies(hashes) pipeTo self
238268
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,32 @@ import cats.syntax.option._
1212

1313
import scala.collection.immutable.Queue
1414

15+
// scalastyle:off number.of.methods
16+
/**
17+
* State used by the BlockFetcher
18+
*
19+
* @param importer the BlockImporter actor reference
20+
* @param readyBlocks
21+
* @param waitingHeaders
22+
* @param fetchingHeadersState the current state of the headers fetching, whether we
23+
* - haven't fetched any yet
24+
* - are awaiting a response
25+
* - are awaiting a response but it should be ignored due to blocks being invalidated
26+
* @param fetchingBodiesState the current state of the bodies fetching, whether we
27+
* - haven't fetched any yet
28+
* - are awaiting a response
29+
* - are awaiting a response but it should be ignored due to blocks being invalidated
30+
* @param stateNodeFetcher
31+
* @param lastBlock
32+
* @param knownTop
33+
* @param blockProviders
34+
*/
1535
case class BlockFetcherState(
1636
importer: ActorRef,
1737
readyBlocks: Queue[Block],
1838
waitingHeaders: Queue[BlockHeader],
19-
isFetchingHeaders: Boolean,
20-
isFetchingBodies: Boolean,
39+
fetchingHeadersState: FetchingHeadersState,
40+
fetchingBodiesState: FetchingBodiesState,
2141
stateNodeFetcher: Option[StateNodeFetcher],
2242
lastBlock: BigInt,
2343
knownTop: BigInt,
@@ -28,7 +48,7 @@ case class BlockFetcherState(
2848

2949
def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined
3050

31-
def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
51+
private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
3252

3353
def hasFetchedTopHeader: Boolean = lastBlock == knownTop
3454

@@ -53,8 +73,7 @@ case class BlockFetcherState(
5373
def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)
5474

5575
def appendHeaders(headers: Seq[BlockHeader]): BlockFetcherState =
56-
fetchingHeaders(false)
57-
.withPossibleNewTopAt(headers.lastOption.map(_.number))
76+
withPossibleNewTopAt(headers.lastOption.map(_.number))
5877
.copy(
5978
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
6079
lastBlock = HeadersSeq.lastNumber(headers).getOrElse(lastBlock)
@@ -86,9 +105,11 @@ case class BlockFetcherState(
86105
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
87106
val blocks = matching.zip(bodies).map((Block.apply _).tupled)
88107

89-
fetchingBodies(false)
90-
.withPeerForBlocks(peer.id, blocks.map(_.header.number))
91-
.copy(readyBlocks = readyBlocks.enqueue(blocks), waitingHeaders = waiting)
108+
withPeerForBlocks(peer.id, blocks.map(_.header.number))
109+
.copy(
110+
readyBlocks = readyBlocks.enqueue(blocks),
111+
waitingHeaders = waiting
112+
)
92113
}
93114

94115
def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState =
@@ -126,18 +147,25 @@ case class BlockFetcherState(
126147

127148
def invalidateBlocksFrom(nr: BigInt): (Option[PeerId], BlockFetcherState) = invalidateBlocksFrom(nr, Some(nr))
128149

129-
def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) =
150+
def invalidateBlocksFrom(nr: BigInt, toBlacklist: Option[BigInt]): (Option[PeerId], BlockFetcherState) = {
151+
// We can't start completely from scratch as requests could be in progress, we have to keep special track of them
152+
val newFetchingHeadersState =
153+
if (fetchingHeadersState == AwaitingHeaders) AwaitingHeadersToBeIgnored else fetchingHeadersState
154+
val newFetchingBodiesState =
155+
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState
156+
130157
(
131158
toBlacklist.flatMap(blockProviders.get),
132159
copy(
133160
readyBlocks = Queue(),
134161
waitingHeaders = Queue(),
135162
lastBlock = (nr - 2).max(0),
136-
isFetchingHeaders = false,
137-
isFetchingBodies = false,
163+
fetchingHeadersState = newFetchingHeadersState,
164+
fetchingBodiesState = newFetchingBodiesState,
138165
blockProviders = blockProviders - nr
139166
)
140167
)
168+
}
141169

142170
def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)
143171

@@ -154,9 +182,13 @@ case class BlockFetcherState(
154182
def withPeerForBlocks(peerId: PeerId, blocks: Seq[BigInt]): BlockFetcherState =
155183
copy(blockProviders = blockProviders ++ blocks.map(block => block -> peerId))
156184

157-
def fetchingHeaders(isFetching: Boolean): BlockFetcherState = copy(isFetchingHeaders = isFetching)
185+
def isFetchingHeaders: Boolean = fetchingHeadersState != NotFetchingHeaders
186+
def withNewHeadersFetch: BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders)
187+
def withHeaderFetchReceived: BlockFetcherState = copy(fetchingHeadersState = NotFetchingHeaders)
158188

159-
def fetchingBodies(isFetching: Boolean): BlockFetcherState = copy(isFetchingBodies = isFetching)
189+
def isFetchingBodies: Boolean = fetchingBodiesState != NotFetchingBodies
190+
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
191+
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)
160192

161193
def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
162194
copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))
@@ -188,11 +220,31 @@ object BlockFetcherState {
188220
importer = importer,
189221
readyBlocks = Queue(),
190222
waitingHeaders = Queue(),
191-
isFetchingHeaders = false,
192-
isFetchingBodies = false,
223+
fetchingHeadersState = NotFetchingHeaders,
224+
fetchingBodiesState = NotFetchingBodies,
193225
stateNodeFetcher = None,
194226
lastBlock = lastBlock,
195227
knownTop = lastBlock + 1,
196228
blockProviders = Map()
197229
)
230+
231+
trait FetchingHeadersState
232+
case object NotFetchingHeaders extends FetchingHeadersState
233+
case object AwaitingHeaders extends FetchingHeadersState
234+
235+
/**
236+
* Headers request in progress but will be ignored due to invalidation
237+
* State used to keep track of pending request to prevent multiple requests in parallel
238+
*/
239+
case object AwaitingHeadersToBeIgnored extends FetchingHeadersState
240+
241+
trait FetchingBodiesState
242+
case object NotFetchingBodies extends FetchingBodiesState
243+
case object AwaitingBodies extends FetchingBodiesState
244+
245+
/**
246+
* Bodies request in progress but will be ignored due to invalidation
247+
* State used to keep track of pending request to prevent multiple requests in parallel
248+
*/
249+
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
198250
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.iohk.ethereum
2+
3+
import akka.util.ByteString
4+
import io.iohk.ethereum.domain.Block
5+
6+
object BlockHelpers {
7+
8+
def generateChain(amount: Int, parent: Block): Seq[Block] = {
9+
(1 to amount).foldLeft[Seq[Block]](Nil){ case (acc, _) =>
10+
val baseBlock = Fixtures.Blocks.ValidBlock.block
11+
12+
val parentHeader = acc.lastOption.getOrElse(parent)
13+
val blockHeader = baseBlock.header.copy(
14+
number = parentHeader.number + 1,
15+
parentHash = parentHeader.hash,
16+
// Random nonce used for having our blocks be different
17+
nonce = ByteString(Math.random().toString)
18+
)
19+
val block = baseBlock.copy(header = blockHeader)
20+
21+
acc :+ block
22+
}
23+
}
24+
25+
}

0 commit comments

Comments
 (0)