Skip to content

Commit 0c71597

Browse files
committed
ETCM-370: Refine responses validation on block fetcher side and base blacklisting on that
1 parent aafb637 commit 0c71597

File tree

6 files changed

+122
-24
lines changed

6 files changed

+122
-24
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,16 @@ object Blacklist {
163163
val code: Int = 27
164164
val name: String = "UnrequestedBodies"
165165
}
166-
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
166+
case object UnrequestedHeadersType extends BlacklistReasonType with RegularSyncBlacklistGroup {
167167
val code: Int = 28
168+
val name: String = "UnrequestedHeaders"
169+
}
170+
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
171+
val code: Int = 29
168172
val name: String = "RegularSyncRequestFailed"
169173
}
170174
case object BlockImportErrorType extends BlacklistReasonType with RegularSyncBlacklistGroup {
171-
val code: Int = 29
175+
val code: Int = 30
172176
val name: String = "BlockImportError"
173177
}
174178
}
@@ -281,6 +285,10 @@ object Blacklist {
281285
val reasonType: BlacklistReasonType = UnrequestedBodiesType
282286
val description: String = "Received unrequested bodies"
283287
}
288+
case object UnrequestedHeaders extends BlacklistReason {
289+
val reasonType: BlacklistReasonType = UnrequestedHeadersType
290+
val description: String = "Received unrequested headers"
291+
}
284292
final case class RegularSyncRequestFailed(error: String) extends BlacklistReason {
285293
val reasonType: BlacklistReasonType = RegularSyncRequestFailedType
286294
val description: String = s"Request failed with error: $error"

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ import cats.instances.option._
99
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
1010
import io.iohk.ethereum.consensus.validators.BlockValidator
1111
import io.iohk.ethereum.blockchain.sync.PeersClient._
12-
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
13-
AwaitingBodiesToBeIgnored,
14-
AwaitingHeadersToBeIgnored
15-
}
12+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{AwaitingBodiesToBeIgnored, AwaitingHeadersToBeIgnored, HeadersNotFormingSeq, HeadersNotMatchingReadyBlocks, HeadersNotMatchingWaitingHeaders}
1613
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
1714
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
1815
import io.iohk.ethereum.domain._
@@ -130,7 +127,7 @@ class BlockFetcher(
130127
blockProvider.foreach(peersClient ! BlacklistPeer(_, BlacklistReason.BlockImportError(reason)))
131128
fetchBlocks(newState)
132129

133-
case ReceivedHeaders(headers) if state.isFetchingHeaders =>
130+
case ReceivedHeaders(peer, headers) if state.isFetchingHeaders =>
134131
//First successful fetch
135132
if (state.waitingHeaders.isEmpty) {
136133
supervisor ! ProgressProtocol.StartedFetching
@@ -146,6 +143,14 @@ class BlockFetcher(
146143
} else {
147144
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
148145
state.appendHeaders(headers) match {
146+
case Left(HeadersNotFormingSeq.description) =>
147+
log.info("Dismissed received headers due to: {}", HeadersNotFormingSeq.description)
148+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
149+
state.withHeaderFetchReceived
150+
case Left(HeadersNotMatchingReadyBlocks.description) =>
151+
log.info("Dismissed received headers due to: {}", HeadersNotMatchingReadyBlocks.description)
152+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
153+
state.withHeaderFetchReceived
149154
case Left(err) =>
150155
log.info("Dismissed received headers due to: {}", err)
151156
state.withHeaderFetchReceived
@@ -154,6 +159,10 @@ class BlockFetcher(
154159
}
155160
}
156161
fetchBlocks(newState)
162+
case ReceivedHeaders(peer, _) if !state.isFetchingHeaders =>
163+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
164+
Behaviors.same
165+
157166
case RetryHeadersRequest if state.isFetchingHeaders =>
158167
log.debug("Something failed on a headers request, cancelling the request and re-fetching")
159168
fetchBlocks(state.withHeaderFetchReceived)
@@ -177,6 +186,10 @@ class BlockFetcher(
177186
fetchBlocks(newState)
178187
}
179188

189+
case ReceivedBodies(peer, _) if !state.isFetchingBodies =>
190+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedBodies)
191+
Behaviors.same
192+
180193
case RetryBodiesRequest if state.isFetchingBodies =>
181194
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")
182195
fetchBlocks(state.withBodiesFetchReceived)
@@ -332,7 +345,7 @@ object BlockFetcher {
332345
final case object RetryBodiesRequest extends FetchCommand
333346
final case object RetryHeadersRequest extends FetchCommand
334347
final case class AdaptedMessageFromEventBus(message: Message, peerId: PeerId) extends FetchCommand
335-
final case class ReceivedHeaders(headers: Seq[BlockHeader]) extends FetchCommand
348+
final case class ReceivedHeaders(peer: Peer, headers: Seq[BlockHeader]) extends FetchCommand
336349
final case class ReceivedBodies(peer: Peer, bodies: Seq[BlockBody]) extends FetchCommand
337350

338351
sealed trait FetchResponse

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
1010
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
1111
import io.iohk.ethereum.network.PeerId
1212
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
13-
import io.iohk.ethereum.utils.ByteStringUtils
1413

1514
import scala.annotation.tailrec
1615
import scala.collection.immutable.Queue
@@ -86,7 +85,6 @@ case class BlockFetcherState(
8685

8786
/**
8887
* Validates received headers consistency and their compatibility with the state
89-
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
9088
*/
9189
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
9290
if (headers.isEmpty) {
@@ -96,7 +94,7 @@ case class BlockFetcherState(
9694
.asRight[String]
9795
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
9896
.ensure("Given headers should form a sequence with ready blocks")(checkConsistencyWithReadyBlocks)
99-
.ensure("Given headers do not form a chain with already stored ones")(headers =>
97+
.ensure("Given headers should form a chain with waiting headers")(headers =>
10098
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
10199
)
102100
}
@@ -319,4 +317,17 @@ object BlockFetcherState {
319317
* State used to keep track of pending request to prevent multiple requests in parallel
320318
*/
321319
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
320+
321+
trait ValidationErrors {
322+
def description: String
323+
}
324+
case object HeadersNotFormingSeq extends ValidationErrors {
325+
val description = "Given headers should form a sequence without gaps"
326+
}
327+
case object HeadersNotMatchingReadyBlocks extends ValidationErrors {
328+
val description = "Given headers should form a sequence with ready blocks"
329+
}
330+
case object HeadersNotMatchingWaitingHeaders extends ValidationErrors {
331+
val description = "Given headers should form a chain with waiting headers"
332+
}
322333
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/HeadersFetcher.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ class HeadersFetcher(
3636
log.debug("Start fetching headers from block {}", blockNumber)
3737
requestHeaders(blockNumber, amount)
3838
Behaviors.same
39-
case AdaptedMessage(_, BlockHeaders(headers)) =>
39+
case AdaptedMessage(peer, BlockHeaders(headers)) =>
4040
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
41-
supervisor ! BlockFetcher.ReceivedHeaders(headers)
41+
supervisor ! BlockFetcher.ReceivedHeaders(peer, headers)
4242
Behaviors.same
4343
case HeadersFetcher.RetryHeadersRequest =>
4444
supervisor ! BlockFetcher.RetryHeadersRequest

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
44
import akka.actor.typed.{ActorRef => TypedActorRef}
5-
import io.iohk.ethereum.blockchain.sync.SyncProtocol
65
import io.iohk.ethereum.blockchain.sync.{Blacklist, SyncProtocol}
76
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
87
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.iohk.ethereum.blockchain.sync.regular
22

3-
import akka.actor.{ActorRef, ActorSystem}
3+
import akka.actor.{ActorRef, ActorSystem, PoisonPill}
4+
import akka.actor.typed.{ActorRef => TypedActorRef}
45
import akka.testkit.TestActor.AutoPilot
56
import akka.testkit.TestKit
67
import akka.util.ByteString
@@ -10,6 +11,8 @@ import cats.syntax.traverse._
1011
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
1112
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
1213
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
14+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{FetchCommand, PrintStatus, ReceivedHeaders}
15+
import io.iohk.ethereum.blockchain.sync.regular.HeadersFetcher.HeadersFetcherCommand
1316
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
1417
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
1518
import io.iohk.ethereum.crypto.kec256
@@ -37,6 +40,7 @@ import org.scalatest.{Assertion, BeforeAndAfterEach}
3740

3841
import scala.concurrent.duration._
3942
import scala.concurrent.{Await, Future, Promise}
43+
import scala.math.BigInt
4044

4145
class RegularSyncSpec
4246
extends WordSpecBase
@@ -112,34 +116,97 @@ class RegularSyncSpec
112116
peersClient.expectMsg(PeersClient.BlacklistPeer(defaultPeer.id, BlacklistReason.RegularSyncRequestFailed("a random reason")))
113117
})
114118

115-
//TODO: To be re-enabled with ETCM-370
116-
"blacklist peer which returns headers starting from one with higher number than expected" ignore sync(
117-
new Fixture(
118-
testSystem
119-
) {
119+
"blacklist peer which returns headers starting from one with higher number than expected" in sync(
120+
new Fixture(testSystem) {
121+
var blockFetcher: ActorRef = _
122+
120123
regularSync ! SyncProtocol.Start
124+
peerEventBus.expectMsgClass(classOf[Subscribe])
125+
blockFetcher = peerEventBus.sender()
121126

122127
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
123-
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(1).headers)))
128+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))
129+
130+
val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
131+
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
132+
PeersClient.BestPeer
133+
)
134+
peersClient.expectMsgEq(getBodies)
135+
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))
136+
137+
blockFetcher ! MessageFromPeer(NewBlock(testBlocks.last, ChainWeight.totalDifficultyOnly(testBlocks.last.number)), defaultPeer.id)
138+
peersClient.expectMsgEq(blockHeadersChunkRequest(1))
139+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(5).headers)))
124140
peersClient.expectMsgPF() {
125141
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
126142
}
127143
}
128144
)
129145

130-
//TODO: To be re-enabled with ETCM-370
131-
"blacklist peer which returns headers not forming a chain" ignore sync(new Fixture(testSystem) {
146+
"blacklist peer which returns headers not forming a chain" in sync(new Fixture(testSystem) {
132147
regularSync ! SyncProtocol.Start
133148

134149
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
135150
peersClient.reply(
136-
PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers.filter(_.number % 2 == 0)))
151+
PeersClient.Response(defaultPeer, BlockHeaders(testBlocks.headers.filter(_.number % 2 == 0)))
137152
)
138153
peersClient.expectMsgPF() {
139154
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
140155
}
141156
})
142157

158+
"blacklist peer which sends headers that were not requested" ignore sync(new Fixture(testSystem) {
159+
import akka.actor.typed.scaladsl.adapter._
160+
var blockFetcherAdapter: TypedActorRef[MessageFromPeer] = _
161+
var blockFetcher: TypedActorRef[FetchCommand] = _
162+
// var blockFetcher: ActorRef = _
163+
164+
regularSync ! SyncProtocol.Start
165+
peerEventBus.expectMsgClass(classOf[Subscribe])
166+
blockFetcherAdapter = peerEventBus.sender()
167+
168+
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
169+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))
170+
171+
val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
172+
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
173+
PeersClient.BestPeer
174+
)
175+
val getBodies2: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
176+
GetBlockBodies(testBlocksChunked(1).headers.map(_.hash)),
177+
PeersClient.BestPeer
178+
)
179+
peersClient.expectMsgEq(getBodies)
180+
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))
181+
182+
blockFetcherAdapter ! MessageFromPeer(NewBlock(testBlocks(3), ChainWeight.totalDifficultyOnly(testBlocks(3).number)), defaultPeer.id)
183+
peersClient.expectMsgEq(blockHeadersChunkRequest(1))
184+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(2).headers)))
185+
186+
peersClient.expectMsgClass(classOf[PeersClient.BlacklistPeer])
187+
188+
blockFetcher = peersClient.sender()
189+
190+
println("============ "+ blockFetcher)
191+
println("------------ "+ blockFetcherAdapter)
192+
blockFetcher ! PrintStatus
193+
194+
val notBlacklistedPeer = peerByNumber(1)
195+
peersClient.expectMsgEq(blockHeadersChunkRequest(1))
196+
peersClient.reply(PeersClient.Response(notBlacklistedPeer, BlockHeaders(testBlocksChunked(1).headers)))
197+
peersClient.expectMsgEq(getBodies2)
198+
peersClient.reply(PeersClient.Response(notBlacklistedPeer, BlockBodies(testBlocksChunked(1).bodies)))
199+
200+
peersClient.expectNoMessage()
201+
202+
blockFetcher ! BlockFetcher.ReceivedHeaders(notBlacklistedPeer, testBlocksChunked(3).headers)
203+
blockFetcher ! BlockFetcher.ReceivedBodies(notBlacklistedPeer, testBlocksChunked(3).bodies)
204+
205+
peersClient.expectMsgPF() {
206+
case PeersClient.BlacklistPeer(id, _) if id == notBlacklistedPeer.id => true
207+
}
208+
})
209+
143210
"wait for time defined in config until issuing a retry request due to no suitable peer" in sync(
144211
new Fixture(
145212
testSystem

0 commit comments

Comments
 (0)