Skip to content

Commit 6b3dc24

Browse files
authored
ETCM-370: Refine responses validation on block fetcher side and base blacklisting on that (#993)
Fix tests
1 parent f144be7 commit 6b3dc24

File tree

8 files changed

+142
-29
lines changed

8 files changed

+142
-29
lines changed

src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import io.iohk.ethereum.consensus.{ConsensusConfig, FullConsensusConfig, pow}
2424
import io.iohk.ethereum.crypto
2525
import io.iohk.ethereum.domain._
2626
import io.iohk.ethereum.ledger._
27-
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
2827
import io.iohk.ethereum.network.PeerId
2928
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
3029
import io.iohk.ethereum.nodebuilder.VmSetup

src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,16 @@ object Blacklist {
163163
val code: Int = 27
164164
val name: String = "UnrequestedBodies"
165165
}
166-
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
166+
case object UnrequestedHeadersType extends BlacklistReasonType with RegularSyncBlacklistGroup {
167167
val code: Int = 28
168+
val name: String = "UnrequestedHeaders"
169+
}
170+
case object RegularSyncRequestFailedType extends BlacklistReasonType with RegularSyncBlacklistGroup {
171+
val code: Int = 29
168172
val name: String = "RegularSyncRequestFailed"
169173
}
170174
case object BlockImportErrorType extends BlacklistReasonType with RegularSyncBlacklistGroup {
171-
val code: Int = 29
175+
val code: Int = 30
172176
val name: String = "BlockImportError"
173177
}
174178
}
@@ -281,6 +285,10 @@ object Blacklist {
281285
val reasonType: BlacklistReasonType = UnrequestedBodiesType
282286
val description: String = "Received unrequested bodies"
283287
}
288+
case object UnrequestedHeaders extends BlacklistReason {
289+
val reasonType: BlacklistReasonType = UnrequestedHeadersType
290+
val description: String = "Received unrequested headers"
291+
}
284292
final case class RegularSyncRequestFailed(error: String) extends BlacklistReason {
285293
val reasonType: BlacklistReasonType = RegularSyncRequestFailedType
286294
val description: String = s"Request failed with error: $error"

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
1111
import io.iohk.ethereum.blockchain.sync.PeersClient._
1212
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
1313
AwaitingBodiesToBeIgnored,
14-
AwaitingHeadersToBeIgnored
14+
AwaitingHeadersToBeIgnored,
15+
HeadersNotFormingSeq,
16+
HeadersNotMatchingReadyBlocks,
17+
HeadersNotMatchingWaitingHeaders
1518
}
1619
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
1720
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
@@ -130,7 +133,7 @@ class BlockFetcher(
130133
blockProvider.foreach(peersClient ! BlacklistPeer(_, BlacklistReason.BlockImportError(reason)))
131134
fetchBlocks(newState)
132135

133-
case ReceivedHeaders(headers) if state.isFetchingHeaders =>
136+
case ReceivedHeaders(peer, headers) if state.isFetchingHeaders =>
134137
//First successful fetch
135138
if (state.waitingHeaders.isEmpty) {
136139
supervisor ! ProgressProtocol.StartedFetching
@@ -146,6 +149,14 @@ class BlockFetcher(
146149
} else {
147150
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
148151
state.appendHeaders(headers) match {
152+
case Left(HeadersNotFormingSeq) =>
153+
log.info("Dismissed received headers due to: {}", HeadersNotFormingSeq.description)
154+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
155+
state.withHeaderFetchReceived
156+
case Left(HeadersNotMatchingReadyBlocks) =>
157+
log.info("Dismissed received headers due to: {}", HeadersNotMatchingReadyBlocks.description)
158+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
159+
state.withHeaderFetchReceived
149160
case Left(err) =>
150161
log.info("Dismissed received headers due to: {}", err)
151162
state.withHeaderFetchReceived
@@ -154,6 +165,11 @@ class BlockFetcher(
154165
}
155166
}
156167
fetchBlocks(newState)
168+
169+
case ReceivedHeaders(peer, _) if !state.isFetchingHeaders =>
170+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedHeaders)
171+
Behaviors.same
172+
157173
case RetryHeadersRequest if state.isFetchingHeaders =>
158174
log.debug("Something failed on a headers request, cancelling the request and re-fetching")
159175
fetchBlocks(state.withHeaderFetchReceived)
@@ -177,6 +193,10 @@ class BlockFetcher(
177193
fetchBlocks(newState)
178194
}
179195

196+
case ReceivedBodies(peer, _) if !state.isFetchingBodies =>
197+
peersClient ! BlacklistPeer(peer.id, BlacklistReason.UnrequestedBodies)
198+
Behaviors.same
199+
180200
case RetryBodiesRequest if state.isFetchingBodies =>
181201
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")
182202
fetchBlocks(state.withBodiesFetchReceived)
@@ -332,7 +352,7 @@ object BlockFetcher {
332352
final case object RetryBodiesRequest extends FetchCommand
333353
final case object RetryHeadersRequest extends FetchCommand
334354
final case class AdaptedMessageFromEventBus(message: Message, peerId: PeerId) extends FetchCommand
335-
final case class ReceivedHeaders(headers: Seq[BlockHeader]) extends FetchCommand
355+
final case class ReceivedHeaders(peer: Peer, headers: Seq[BlockHeader]) extends FetchCommand
336356
final case class ReceivedBodies(peer: Peer, bodies: Seq[BlockBody]) extends FetchCommand
337357

338358
sealed trait FetchResponse

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import io.iohk.ethereum.consensus.validators.BlockValidator
1010
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
1111
import io.iohk.ethereum.network.PeerId
1212
import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash
13-
import io.iohk.ethereum.utils.ByteStringUtils
1413

1514
import scala.annotation.tailrec
1615
import scala.collection.immutable.Queue
@@ -75,7 +74,7 @@ case class BlockFetcherState(
7574

7675
def takeHashes(amount: Int): Seq[ByteString] = waitingHeaders.take(amount).map(_.hash)
7776

78-
def appendHeaders(headers: Seq[BlockHeader]): Either[String, BlockFetcherState] =
77+
def appendHeaders(headers: Seq[BlockHeader]): Either[ValidationErrors, BlockFetcherState] =
7978
validatedHeaders(headers.sortBy(_.number)).map(validHeaders => {
8079
val lastNumber = HeadersSeq.lastNumber(validHeaders)
8180
withPossibleNewTopAt(lastNumber)
@@ -86,17 +85,16 @@ case class BlockFetcherState(
8685

8786
/**
8887
* Validates received headers consistency and their compatibility with the state
89-
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
9088
*/
91-
private def validatedHeaders(headers: Seq[BlockHeader]): Either[String, Seq[BlockHeader]] =
89+
private def validatedHeaders(headers: Seq[BlockHeader]): Either[ValidationErrors, Seq[BlockHeader]] =
9290
if (headers.isEmpty) {
9391
Right(headers)
9492
} else {
9593
headers
96-
.asRight[String]
97-
.ensure("Given headers should form a sequence without gaps")(HeadersSeq.areChain)
98-
.ensure("Given headers should form a sequence with ready blocks")(checkConsistencyWithReadyBlocks)
99-
.ensure("Given headers do not form a chain with already stored ones")(headers =>
94+
.asRight[ValidationErrors]
95+
.ensure(HeadersNotFormingSeq)(HeadersSeq.areChain)
96+
.ensure(HeadersNotMatchingReadyBlocks)(checkConsistencyWithReadyBlocks)
97+
.ensure(HeadersNotMatchingWaitingHeaders)(headers =>
10098
(waitingHeaders.lastOption, headers.headOption).mapN(_ isParentOf _).getOrElse(true)
10199
)
102100
}
@@ -319,4 +317,17 @@ object BlockFetcherState {
319317
* State used to keep track of pending request to prevent multiple requests in parallel
320318
*/
321319
case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
320+
321+
sealed trait ValidationErrors {
322+
def description: String
323+
}
324+
case object HeadersNotFormingSeq extends ValidationErrors {
325+
val description = "Given headers should form a sequence without gaps"
326+
}
327+
case object HeadersNotMatchingReadyBlocks extends ValidationErrors {
328+
val description = "Given headers should form a sequence with ready blocks"
329+
}
330+
case object HeadersNotMatchingWaitingHeaders extends ValidationErrors {
331+
val description = "Given headers should form a chain with waiting headers"
332+
}
322333
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/HeadersFetcher.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ class HeadersFetcher(
3636
log.debug("Start fetching headers from block {}", blockNumber)
3737
requestHeaders(blockNumber, amount)
3838
Behaviors.same
39-
case AdaptedMessage(_, BlockHeaders(headers)) =>
39+
case AdaptedMessage(peer, BlockHeaders(headers)) =>
4040
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
41-
supervisor ! BlockFetcher.ReceivedHeaders(headers)
41+
supervisor ! BlockFetcher.ReceivedHeaders(peer, headers)
4242
Behaviors.same
4343
case HeadersFetcher.RetryHeadersRequest =>
4444
supervisor ! BlockFetcher.RetryHeadersRequest

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
44
import akka.actor.typed.{ActorRef => TypedActorRef}
5-
import io.iohk.ethereum.blockchain.sync.SyncProtocol
65
import io.iohk.ethereum.blockchain.sync.{Blacklist, SyncProtocol}
76
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
87
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.iohk.ethereum.blockchain.sync.regular
33
import akka.actor.ActorSystem
44
import akka.testkit.{TestKit, TestProbe}
55
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
6+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.HeadersNotMatchingReadyBlocks
67
import io.iohk.ethereum.{BlockHelpers, WithActorSystemShutDown}
78
import io.iohk.ethereum.network.PeerId
89
import io.iohk.ethereum.utils.ByteStringUtils
@@ -69,7 +70,7 @@ class BlockFetcherStateSpec
6970
.appendHeaders(blocks.map(_.header))
7071
.map(_.handleRequestedBlocks(blocks, peer))
7172

72-
assert(result.map(_.waitingHeaders) === Left("Given headers should form a sequence with ready blocks"))
73+
assert(result.map(_.waitingHeaders) === Left(HeadersNotMatchingReadyBlocks))
7374
}
7475
}
7576
}

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala

Lines changed: 86 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package io.iohk.ethereum.blockchain.sync.regular
22

3-
import akka.actor.{ActorRef, ActorSystem}
3+
import akka.actor.{ActorRef, ActorSystem, typed}
4+
import akka.actor.typed.{ActorRef => TypedActorRef}
45
import akka.testkit.TestActor.AutoPilot
5-
import akka.testkit.TestKit
6+
import akka.testkit.{TestKit, TestProbe}
67
import akka.util.ByteString
78
import cats.data.NonEmptyList
89
import cats.effect.Resource
910
import cats.syntax.traverse._
1011
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
1112
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
1213
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
14+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.Start
1315
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
1416
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
1517
import io.iohk.ethereum.crypto.kec256
@@ -37,6 +39,7 @@ import org.scalatest.{Assertion, BeforeAndAfterEach}
3739

3840
import scala.concurrent.duration._
3941
import scala.concurrent.{Await, Future, Promise}
42+
import scala.math.BigInt
4043

4144
class RegularSyncSpec
4245
extends WordSpecBase
@@ -112,29 +115,101 @@ class RegularSyncSpec
112115
peersClient.expectMsg(PeersClient.BlacklistPeer(defaultPeer.id, BlacklistReason.RegularSyncRequestFailed("a random reason")))
113116
})
114117

115-
//TODO: To be re-enabled with ETCM-370
116-
"blacklist peer which returns headers starting from one with higher number than expected" ignore sync(
117-
new Fixture(
118-
testSystem
119-
) {
118+
"blacklist peer which returns headers starting from one with higher number than expected" in sync(
119+
new Fixture(testSystem) {
120+
var blockFetcher: ActorRef = _
121+
120122
regularSync ! SyncProtocol.Start
123+
peerEventBus.expectMsgClass(classOf[Subscribe])
124+
blockFetcher = peerEventBus.sender()
121125

122126
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
123-
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(1).headers)))
127+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))
128+
129+
val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
130+
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
131+
PeersClient.BestPeer
132+
)
133+
peersClient.expectMsgEq(getBodies)
134+
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))
135+
136+
blockFetcher ! MessageFromPeer(NewBlock(testBlocks.last, ChainWeight.totalDifficultyOnly(testBlocks.last.number)), defaultPeer.id)
137+
peersClient.expectMsgEq(blockHeadersChunkRequest(1))
138+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked(5).headers)))
124139
peersClient.expectMsgPF() {
125140
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
126141
}
127142
}
128143
)
129144

130-
//TODO: To be re-enabled with ETCM-370
131-
"blacklist peer which returns headers not forming a chain" ignore sync(new Fixture(testSystem) {
145+
"blacklist peer which returns headers not forming a chain" in sync(new Fixture(testSystem) {
132146
regularSync ! SyncProtocol.Start
133147

134148
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
135149
peersClient.reply(
136-
PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers.filter(_.number % 2 == 0)))
150+
PeersClient.Response(defaultPeer, BlockHeaders(testBlocks.headers.filter(_.number % 2 == 0)))
151+
)
152+
peersClient.expectMsgPF() {
153+
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
154+
}
155+
})
156+
157+
"blacklist peer which sends headers that were not requested" in sync(new Fixture(testSystem) {
158+
import akka.actor.typed.scaladsl.adapter._
159+
160+
val blockImporter = TestProbe()
161+
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
162+
system.spawn(
163+
BlockFetcher(peersClient.ref, peerEventBus.ref, regularSync, syncConfig, validators.blockValidator),
164+
"block-fetcher"
165+
)
166+
167+
fetcher ! Start(blockImporter.ref, 0)
168+
169+
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
170+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))
171+
172+
val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
173+
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
174+
PeersClient.BestPeer
137175
)
176+
177+
peersClient.expectMsgEq(getBodies)
178+
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))
179+
180+
fetcher ! BlockFetcher.ReceivedHeaders(defaultPeer, testBlocksChunked(3).headers)
181+
182+
peersClient.expectMsgPF() {
183+
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
184+
}
185+
})
186+
187+
"blacklist peer which sends bodies that were not requested" in sync(new Fixture(testSystem) {
188+
import akka.actor.typed.scaladsl.adapter._
189+
190+
var blockFetcherAdapter: TypedActorRef[MessageFromPeer] = _
191+
val blockImporter = TestProbe()
192+
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
193+
system.spawn(
194+
BlockFetcher(peersClient.ref, peerEventBus.ref, regularSync, syncConfig, validators.blockValidator),
195+
"block-fetcher"
196+
)
197+
198+
fetcher ! Start(blockImporter.ref, 0)
199+
200+
peersClient.expectMsgEq(blockHeadersChunkRequest(0))
201+
peersClient.reply(PeersClient.Response(defaultPeer, BlockHeaders(testBlocksChunked.head.headers)))
202+
203+
val getBodies: PeersClient.Request[GetBlockBodies] = PeersClient.Request.create(
204+
GetBlockBodies(testBlocksChunked.head.headers.map(_.hash)),
205+
PeersClient.BestPeer
206+
)
207+
208+
peersClient.expectMsgEq(getBodies)
209+
peersClient.reply(PeersClient.Response(defaultPeer, BlockBodies(testBlocksChunked.head.bodies)))
210+
211+
fetcher ! BlockFetcher.ReceivedBodies(defaultPeer, testBlocksChunked(3).bodies)
212+
138213
peersClient.expectMsgPF() {
139214
case PeersClient.BlacklistPeer(id, _) if id == defaultPeer.id => true
140215
}

0 commit comments

Comments
 (0)