Skip to content

Commit 83a70cc

Browse files
committed
ETCM-760: Move networking and regular sync to new blacklist
1 parent 65037a8 commit 83a70cc

File tree

10 files changed

+66
-48
lines changed

10 files changed

+66
-48
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.iohk.ethereum.blockchain.sync
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Scheduler}
4+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
5+
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
46
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
57
import io.iohk.ethereum.network.p2p.messages.Codes
68
import io.iohk.ethereum.network.{Peer, PeerId}
@@ -15,12 +17,12 @@ import scala.reflect.ClassTag
1517
class PeersClient(
1618
val etcPeerManager: ActorRef,
1719
val peerEventBus: ActorRef,
20+
val blacklist: Blacklist,
1821
val syncConfig: SyncConfig,
1922
implicit val scheduler: Scheduler
2023
) extends Actor
2124
with ActorLogging
22-
with PeerListSupport
23-
with BlacklistSupport {
25+
with PeerListSupportNg {
2426
import PeersClient._
2527

2628
implicit val ec: ExecutionContext = context.dispatcher
@@ -36,9 +38,9 @@ class PeersClient(
3638
}
3739

3840
def running(requesters: Requesters): Receive =
39-
handleBlacklistMessages orElse handlePeerListMessages orElse {
41+
handlePeerListMessages orElse {
4042
case PrintStatus => printStatus(requesters: Requesters)
41-
case BlacklistPeer(peerId, reason) => peerById(peerId).foreach(blacklistIfHandshaked(_, reason))
43+
case BlacklistPeer(peerId, reason) => blacklistIfHandshaked(peerId, syncConfig.blacklistDuration, reason)
4244
case Request(message, peerSelector, toSerializable) =>
4345
val requester = sender()
4446
selectPeer(peerSelector) match {
@@ -56,7 +58,7 @@ class PeersClient(
5658
handleResponse(requesters, Response(peer, message.asInstanceOf[Message]))
5759
case PeerRequestHandler.RequestFailed(peer, reason) =>
5860
log.warning(s"Request to peer ${peer.remoteAddress} failed - reason: $reason")
59-
handleResponse(requesters, RequestFailed(peer, reason))
61+
handleResponse(requesters, RequestFailed(peer, BlacklistReason.RegularSyncRequestFailed(reason)))
6062
}
6163

6264
private def makeRequest[RequestMsg <: Message, ResponseMsg <: Message](
@@ -108,12 +110,12 @@ class PeersClient(
108110
peersToDownloadFrom.size
109111
)
110112

111-
lazy val handshakedPeersStatus = handshakedPeers.map { case (peer, info) =>
113+
lazy val handshakedPeersStatus = handshakedPeers.map { case (peerId, peerWithInfo) =>
112114
val peerNetworkStatus = PeerNetworkStatus(
113-
peer,
114-
isBlacklisted = isBlacklisted(peer.id)
115+
peerWithInfo.peer,
116+
isBlacklisted = blacklist.isBlacklisted(peerId)
115117
)
116-
(peerNetworkStatus, info)
118+
(peerNetworkStatus, peerWithInfo.peerInfo)
117119
}
118120

119121
log.debug(s"Handshaked peers status (number of peers: ${handshakedPeersStatus.size}): $handshakedPeersStatus")
@@ -122,13 +124,19 @@ class PeersClient(
122124

123125
object PeersClient {
124126

125-
def props(etcPeerManager: ActorRef, peerEventBus: ActorRef, syncConfig: SyncConfig, scheduler: Scheduler): Props =
126-
Props(new PeersClient(etcPeerManager, peerEventBus, syncConfig, scheduler))
127+
def props(
128+
etcPeerManager: ActorRef,
129+
peerEventBus: ActorRef,
130+
blacklist: Blacklist,
131+
syncConfig: SyncConfig,
132+
scheduler: Scheduler
133+
): Props =
134+
Props(new PeersClient(etcPeerManager, peerEventBus, blacklist, syncConfig, scheduler))
127135

128136
type Requesters = Map[ActorRef, ActorRef]
129137

130138
sealed trait PeersClientMessage
131-
case class BlacklistPeer(peerId: PeerId, reason: String) extends PeersClientMessage
139+
case class BlacklistPeer(peerId: PeerId, reason: BlacklistReason) extends PeersClientMessage
132140
case class Request[RequestMsg <: Message](
133141
message: RequestMsg,
134142
peerSelector: PeerSelector,
@@ -155,16 +163,16 @@ object PeersClient {
155163

156164
sealed trait ResponseMessage
157165
case object NoSuitablePeer extends ResponseMessage
158-
case class RequestFailed(peer: Peer, reason: String) extends ResponseMessage
166+
case class RequestFailed(peer: Peer, reason: BlacklistReason) extends ResponseMessage
159167
case class Response[T <: Message](peer: Peer, message: T) extends ResponseMessage
160168

161169
sealed trait PeerSelector
162170
case object BestPeer extends PeerSelector
163171

164-
def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
165-
val peersToUse = peersToDownloadFrom
166-
.collect { case (ref, PeerInfo(_, chainWeight, true, _, _)) =>
167-
(ref, chainWeight)
172+
def bestPeer(peersToDownloadFrom: Map[PeerId, PeerWithInfo]): Option[Peer] = {
173+
val peersToUse = peersToDownloadFrom.values
174+
.collect { case PeerWithInfo(peer, PeerInfo(_, chainWeight, true, _, _)) =>
175+
(peer, chainWeight)
168176
}
169177

170178
if (peersToUse.nonEmpty) {

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class FastSyncBranchResolverActor(
192192
blacklistIfHandshaked(
193193
peer.id,
194194
syncConfig.blacklistDuration,
195-
BlacklistReason.RequestFailed(reason)
195+
BlacklistReason.FastSyncRequestFailed(reason)
196196
)
197197
restart()
198198
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockBroadcasterActor.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@ package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Scheduler}
44
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
5-
import io.iohk.ethereum.blockchain.sync.{BlacklistSupport, PeerListSupport}
5+
import io.iohk.ethereum.blockchain.sync.{Blacklist, PeerListSupportNg}
66
import io.iohk.ethereum.utils.Config.SyncConfig
77

88
class BlockBroadcasterActor(
99
broadcast: BlockBroadcast,
1010
val peerEventBus: ActorRef,
1111
val etcPeerManager: ActorRef,
12+
val blacklist: Blacklist,
1213
val syncConfig: SyncConfig,
1314
val scheduler: Scheduler
1415
) extends Actor
1516
with ActorLogging
16-
with PeerListSupport
17-
with BlacklistSupport {
17+
with PeerListSupportNg {
1818
import BlockBroadcasterActor._
1919

20-
override def receive: Receive = handlePeerListMessages orElse handleBlacklistMessages orElse handleBroadcastMessages
20+
override def receive: Receive = handlePeerListMessages orElse handleBroadcastMessages
2121

2222
private def handleBroadcastMessages: Receive = {
2323
case BroadcastBlock(newBlock) => broadcast.broadcastBlock(newBlock, handshakedPeers)
@@ -33,6 +33,7 @@ object BlockBroadcasterActor {
3333
broadcast: BlockBroadcast,
3434
peerEventBus: ActorRef,
3535
etcPeerManager: ActorRef,
36+
blacklist: Blacklist,
3637
syncConfig: SyncConfig,
3738
scheduler: Scheduler
3839
): Props =
@@ -41,6 +42,7 @@ object BlockBroadcasterActor {
4142
broadcast = broadcast,
4243
peerEventBus = peerEventBus,
4344
etcPeerManager = etcPeerManager,
45+
blacklist = blacklist,
4446
syncConfig = syncConfig,
4547
scheduler = scheduler
4648
)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import akka.actor.{ActorRef => ClassicActorRef}
66
import akka.util.{ByteString, Timeout}
77
import cats.data.NonEmptyList
88
import cats.instances.option._
9+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
910
import io.iohk.ethereum.consensus.validators.BlockValidator
1011
import io.iohk.ethereum.blockchain.sync.PeersClient._
1112
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
@@ -126,7 +127,7 @@ class BlockFetcher(
126127
case InvalidateBlocksFrom(blockNr, reason, withBlacklist) =>
127128
val (blockProvider, newState) = state.invalidateBlocksFrom(blockNr, withBlacklist)
128129
log.debug("Invalidate blocks from {}", blockNr)
129-
blockProvider.foreach(peersClient ! BlacklistPeer(_, reason))
130+
blockProvider.foreach(peersClient ! BlacklistPeer(_, BlacklistReason.BlockImportError(reason)))
130131
fetchBlocks(newState)
131132

132133
case ReceivedHeaders(headers) if state.isFetchingHeaders =>
@@ -326,7 +327,7 @@ object BlockFetcher {
326327
def apply(from: BigInt, reason: String, toBlacklist: Option[BigInt]): InvalidateBlocksFrom =
327328
new InvalidateBlocksFrom(from, reason, toBlacklist)
328329
}
329-
final case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchCommand
330+
final case class BlockImportFailed(blockNr: BigInt, reason: BlacklistReason) extends FetchCommand
330331
final case class InternalLastBlockImport(blockNr: BigInt) extends FetchCommand
331332
final case object RetryBodiesRequest extends FetchCommand
332333
final case object RetryHeadersRequest extends FetchCommand

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import akka.actor.ActorRef
44
import akka.util.ByteString
55
import cats.data.NonEmptyList
66
import cats.implicits._
7+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
78
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState._
89
import io.iohk.ethereum.consensus.validators.BlockValidator
910
import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, HeadersSeq}
@@ -122,10 +123,10 @@ case class BlockFetcherState(
122123
* even more, we could receive an empty chain and that will be considered valid. Here we just
123124
* validate that the received bodies corresponds to an ordered subset of the requested headers.
124125
*/
125-
def validateBodies(receivedBodies: Seq[BlockBody]): Either[String, Seq[Block]] =
126+
def validateBodies(receivedBodies: Seq[BlockBody]): Either[BlacklistReason, Seq[Block]] =
126127
bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies)
127128
.toRight(
128-
"Received unrequested bodies"
129+
BlacklistReason.UnrequestedBodies
129130
)
130131

131132
// Checks that the received block bodies are an ordered subset of the ones requested

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import akka.actor.Actor.Receive
44
import akka.actor.{Actor, ActorLogging, ActorRef, NotInfluenceReceiveTimeout, Props, ReceiveTimeout}
55
import cats.data.NonEmptyList
66
import cats.implicits._
7+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
78
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
89
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlocks
910
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
@@ -20,7 +21,6 @@ import io.iohk.ethereum.utils.Config.SyncConfig
2021
import io.iohk.ethereum.utils.FunctorOps._
2122
import monix.eval.Task
2223
import monix.execution.Scheduler
23-
import org.bouncycastle.util.encoders.Hex
2424

2525
import scala.concurrent.duration._
2626

@@ -263,7 +263,7 @@ class BlockImporter(
263263
Task.raiseError(missingNodeException)
264264
case BlockImportFailed(error) =>
265265
if (informFetcherOnFail) {
266-
fetcher ! BlockFetcher.BlockImportFailed(block.number, error)
266+
fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error))
267267
}
268268
}
269269
.map(_ => Running)

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.iohk.ethereum.blockchain.sync.regular
33
import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
44
import akka.actor.typed.{ActorRef => TypedActorRef}
55
import io.iohk.ethereum.blockchain.sync.SyncProtocol
6+
import io.iohk.ethereum.blockchain.sync.{Blacklist, SyncProtocol}
67
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
78
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
89
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, ProgressProtocol, ProgressState}
@@ -21,6 +22,7 @@ class RegularSync(
2122
ledger: Ledger,
2223
blockchain: Blockchain,
2324
blockValidator: BlockValidator,
25+
blacklist: Blacklist,
2426
syncConfig: SyncConfig,
2527
ommersPool: ActorRef,
2628
pendingTransactionsManager: ActorRef,
@@ -38,7 +40,7 @@ class RegularSync(
3840

3941
val broadcaster: ActorRef = context.actorOf(
4042
BlockBroadcasterActor
41-
.props(new BlockBroadcast(etcPeerManager), peerEventBus, etcPeerManager, syncConfig, scheduler),
43+
.props(new BlockBroadcast(etcPeerManager), peerEventBus, etcPeerManager, blacklist, syncConfig, scheduler),
4244
"block-broadcaster"
4345
)
4446
val importer: ActorRef =
@@ -126,6 +128,7 @@ object RegularSync {
126128
ledger: Ledger,
127129
blockchain: Blockchain,
128130
blockValidator: BlockValidator,
131+
blacklist: Blacklist,
129132
syncConfig: SyncConfig,
130133
ommersPool: ActorRef,
131134
pendingTransactionsManager: ActorRef,
@@ -139,6 +142,7 @@ object RegularSync {
139142
ledger,
140143
blockchain,
141144
blockValidator,
145+
blacklist,
142146
syncConfig,
143147
ommersPool,
144148
pendingTransactionsManager,

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/StateNodeFetcher.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.iohk.ethereum.network.p2p.Message
1212
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
1313
import io.iohk.ethereum.utils.Config.SyncConfig
1414
import cats.syntax.either._
15+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
1516
import monix.execution.Scheduler
1617

1718
import scala.util.{Failure, Success}
@@ -46,15 +47,13 @@ class StateNodeFetcher(
4647
requester
4748
.collect(stateNodeRequester => {
4849
val validatedNode = values
49-
.asRight[String]
50-
.ensure(s"Empty response from peer $peer, blacklisting")(_.nonEmpty)
51-
.ensure("Fetched node state hash doesn't match requested one, blacklisting peer")(nodes =>
52-
stateNodeRequester.hash == kec256(nodes.head)
53-
)
50+
.asRight[BlacklistReason]
51+
.ensure(BlacklistReason.EmptyStateNodeResponse)(_.nonEmpty)
52+
.ensure(BlacklistReason.WrongStateNodeResponse)(nodes => stateNodeRequester.hash == kec256(nodes.head))
5453

5554
validatedNode match {
5655
case Left(err) =>
57-
log.debug(err)
56+
log.debug(err.description)
5857
peersClient ! BlacklistPeer(peer.id, err)
5958
context.self ! StateNodeFetcher.FetchStateNode(stateNodeRequester.hash, stateNodeRequester.replyTo)
6059
Behaviors.same[StateNodeFetcherCommand]

0 commit comments

Comments
 (0)