Skip to content

Commit 4d04689

Browse files
authored
[ETCM-531] Cache-based and thread-safe blacklist implementation (#921)
* First draft of cache-based blacklist * Fix logging output * Fix FastSyncSpec * Update nix-sbt sha * Update nix-sbt sha * Polish and add tests * Rename BlackListId to BlacklistId * Rework PeerListSupport a little bit * Small cleanup * [ETCM-531] Turn blacklist reasons into proper types, small improvements based on PR comments * Make BlacklistReasonType a sealed trait * Only log description for blacklist reason * ETCM-531 renamed minute -> minutes * ETCM-531 Reformat triggered by running sbt pp * ETCM-531 Fix expiration after read
1 parent e0e46ce commit 4d04689

File tree

15 files changed

+449
-93
lines changed

15 files changed

+449
-93
lines changed

build.sbt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,24 +111,26 @@ lazy val node = {
111111
Seq(
112112
Dependencies.akka,
113113
Dependencies.akkaHttp,
114-
Dependencies.json4s,
115-
Dependencies.circe,
114+
Dependencies.apacheCommons,
116115
Dependencies.boopickle,
117-
Dependencies.rocksDb,
118-
Dependencies.enumeratum,
119-
Dependencies.testing,
120116
Dependencies.cats,
121-
Dependencies.monix,
122-
Dependencies.network,
117+
Dependencies.circe,
118+
Dependencies.cli,
123119
Dependencies.crypto,
124-
Dependencies.scopt,
120+
Dependencies.dependencies,
121+
Dependencies.enumeratum,
122+
Dependencies.guava,
123+
Dependencies.json4s,
124+
Dependencies.kamon,
125125
Dependencies.logging,
126-
Dependencies.apacheCommons,
127126
Dependencies.micrometer,
128-
Dependencies.kamon,
127+
Dependencies.monix,
128+
Dependencies.network,
129129
Dependencies.prometheus,
130-
Dependencies.cli,
131-
Dependencies.dependencies
130+
Dependencies.rocksDb,
131+
Dependencies.scaffeine,
132+
Dependencies.scopt,
133+
Dependencies.testing
132134
).flatten ++ malletDeps
133135
}
134136

nix/pkgs/mantis.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ in sbt.mkDerivation {
5050

5151
# This sha represents the change dependencies of mantis.
5252
# Update this sha whenever you change the dependencies
53-
depsSha256 = "0gppwz6dvligrrgjmramyrm9723pwhg89cqfpxj22z2d86brwas2";
53+
depsSha256 = "07iixw8va4zwpiln2zy2gr245z1ir4jd6pqgmkzhwnhw3mf5j28k";
5454

5555
# this is the command used to to create the fixed-output-derivation
5656
depsWarmupCommand = "sbt compile --debug -Dnix=true";

project/Dependencies.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,19 @@ object Dependencies {
112112
jline,
113113
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2",
114114
"org.scala-sbt.ipcsocket" % "ipcsocket" % "1.1.0",
115-
"com.google.guava" % "guava" % "29.0-jre",
116115
"org.xerial.snappy" % "snappy-java" % "1.1.7.7",
117116
"org.web3j" % "core" % "5.0.0" % Test,
118117
"io.vavr" % "vavr" % "1.0.0-alpha-3"
119118
)
120119

120+
val guava: Seq[ModuleID] = {
121+
val version = "30.1-jre"
122+
Seq(
123+
"com.google.guava" % "guava" % version,
124+
"com.google.guava" % "guava-testlib" % version % "test"
125+
)
126+
}
127+
121128
val prometheus: Seq[ModuleID] = {
122129
val provider = "io.prometheus"
123130
val version = "0.9.0"
@@ -137,7 +144,7 @@ object Dependencies {
137144
"com.google.code.findbugs" % "jsr305" % "3.0.2" % Optional,
138145
provider % "micrometer-core" % version,
139146
provider % "micrometer-registry-jmx" % version,
140-
provider % "micrometer-registry-prometheus" % version,
147+
provider % "micrometer-registry-prometheus" % version
141148
)
142149
}
143150

@@ -153,4 +160,9 @@ object Dependencies {
153160
val shapeless: Seq[ModuleID] = Seq(
154161
"com.chuusai" %% "shapeless" % "2.3.3"
155162
)
163+
164+
val scaffeine: Seq[ModuleID] = Seq(
165+
"com.github.blemale" %% "scaffeine" % "4.0.2" % "compile"
166+
)
167+
156168
}

src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@ import monix.eval.Task
1717
import scala.annotation.tailrec
1818
import scala.concurrent.duration._
1919
import scala.util.Try
20+
import io.iohk.ethereum.blockchain.sync.CacheBasedBlacklist
2021
object FastSyncItSpecUtils {
2122

2223
class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig)
2324
extends CommonFakePeer(peerName, fakePeerCustomConfig) {
2425

2526
lazy val validators = new MockValidatorsAlwaysSucceed
2627

28+
val maxSize = 1000
29+
val blacklist = CacheBasedBlacklist.empty(maxSize)
30+
2731
lazy val fastSync = system.actorOf(
2832
FastSync.props(
2933
storagesInstance.storages.fastSyncStateStorage,
@@ -32,6 +36,7 @@ object FastSyncItSpecUtils {
3236
validators,
3337
peerEventBus,
3438
etcPeerManager,
39+
blacklist,
3540
testSyncConfig,
3641
system.scheduler
3742
)
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package io.iohk.ethereum.blockchain.sync
2+
3+
import com.github.benmanes.caffeine.cache.Caffeine
4+
import com.github.blemale.scaffeine.{Cache, Scaffeine}
5+
import io.iohk.ethereum.utils.Logger
6+
7+
import scala.concurrent.duration.FiniteDuration
8+
import scala.concurrent.duration._
9+
import scala.jdk.CollectionConverters._
10+
import scala.jdk.OptionConverters._
11+
import scala.jdk.DurationConverters._
12+
13+
import Blacklist._
14+
import io.iohk.ethereum.network.PeerId
15+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType.WrongBlockHeadersType
16+
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator.BlockError
17+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType
18+
19+
trait Blacklist {
20+
def isBlacklisted(id: BlacklistId): Boolean
21+
def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit
22+
def remove(id: BlacklistId): Unit
23+
def keys: Set[BlacklistId]
24+
}
25+
26+
object Blacklist {
27+
import BlacklistReason._
28+
import BlacklistReasonType._
29+
30+
trait BlacklistId {
31+
def value: String
32+
}
33+
34+
sealed trait BlacklistReason {
35+
def reasonType: BlacklistReasonType
36+
def description: String
37+
}
38+
object BlacklistReason {
39+
sealed trait BlacklistReasonType {
40+
def code: Int
41+
def name: String
42+
}
43+
object BlacklistReasonType {
44+
case object WrongBlockHeadersType extends BlacklistReasonType {
45+
val code: Int = 1
46+
val name: String = "WrongBlockHeadersType"
47+
}
48+
case object BlockHeaderValidationFailedType extends BlacklistReasonType {
49+
val code: Int = 2
50+
val name: String = "BlockHeaderValidationFailed"
51+
}
52+
case object ErrorInBlockHeadersType extends BlacklistReasonType {
53+
val code: Int = 3
54+
val name: String = "ErrorInBlockHeaders"
55+
}
56+
case object EmptyBlockBodiesType extends BlacklistReasonType {
57+
val code: Int = 4
58+
val name: String = "EmptyBlockBodies"
59+
}
60+
case object BlockBodiesNotMatchingHeadersType extends BlacklistReasonType {
61+
val code: Int = 5
62+
val name: String = "BlockBodiesNotMatchingHeaders"
63+
}
64+
case object EmptyReceiptsType extends BlacklistReasonType {
65+
val code: Int = 6
66+
val name: String = "EmptyReceipts"
67+
}
68+
case object InvalidReceiptsType extends BlacklistReasonType {
69+
val code: Int = 7
70+
val name: String = "InvalidReceipts"
71+
}
72+
case object RequestFailedType extends BlacklistReasonType {
73+
val code: Int = 8
74+
val name: String = "RequestFailed"
75+
}
76+
case object PeerActorTerminatedType extends BlacklistReasonType {
77+
val code: Int = 9
78+
val name: String = "PeerActorTerminated"
79+
}
80+
}
81+
82+
case object WrongBlockHeaders extends BlacklistReason {
83+
val reasonType: BlacklistReasonType = WrongBlockHeadersType
84+
val description: String = "Wrong blockheaders response (empty or not chain forming)"
85+
}
86+
case object BlockHeaderValidationFailed extends BlacklistReason {
87+
val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType
88+
val description: String = "Block header validation failed"
89+
}
90+
case object ErrorInBlockHeaders extends BlacklistReason {
91+
val reasonType: BlacklistReasonType = ErrorInBlockHeadersType
92+
val description: String = "Error in block headers response"
93+
}
94+
final case class EmptyBlockBodies(knownHashes: Seq[String]) extends BlacklistReason {
95+
val reasonType: BlacklistReasonType = EmptyBlockBodiesType
96+
val description: String = s"Got empty block bodies response for known hashes: $knownHashes"
97+
}
98+
case object BlockBodiesNotMatchingHeaders extends BlacklistReason {
99+
val reasonType: BlacklistReasonType = BlockBodiesNotMatchingHeadersType
100+
val description = "Block bodies not matching block headers"
101+
}
102+
final case class EmptyReceipts(knownHashes: Seq[String]) extends BlacklistReason {
103+
val reasonType: BlacklistReasonType = EmptyReceiptsType
104+
val description: String = s"Got empty receipts for known hashes: $knownHashes"
105+
}
106+
final case class InvalidReceipts(knownHashes: Seq[String], error: BlockError) extends BlacklistReason {
107+
val reasonType: BlacklistReasonType = InvalidReceiptsType
108+
val description: String = s"Got invalid receipts for known hashes: $knownHashes due to: $error"
109+
}
110+
final case class RequestFailed(error: String) extends BlacklistReason {
111+
val reasonType: BlacklistReasonType = RequestFailedType
112+
val description: String = s"Request failed with error: $error"
113+
}
114+
case object PeerActorTerminated extends BlacklistReason {
115+
val reasonType: BlacklistReasonType = PeerActorTerminatedType
116+
val description: String = "Peer actor terminated"
117+
}
118+
}
119+
}
120+
121+
final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonType]) extends Blacklist with Logger {
122+
123+
import CacheBasedBlacklist._
124+
125+
override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined
126+
127+
override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = {
128+
log.info("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason.description)
129+
cache.policy().expireVariably().toScala match {
130+
case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava)
131+
case None =>
132+
log.warn(customExpirationError(id))
133+
cache.put(id, reason.reasonType)
134+
}
135+
}
136+
override def remove(id: BlacklistId): Unit = cache.invalidate(id)
137+
138+
override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet
139+
}
140+
141+
object CacheBasedBlacklist {
142+
143+
def customExpirationError(id: BlacklistId): String =
144+
s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration."
145+
146+
def empty(maxSize: Int): CacheBasedBlacklist = {
147+
val cache =
148+
Scaffeine()
149+
.expireAfter[BlacklistId, BlacklistReasonType](
150+
create = (_, _) => 60.minutes,
151+
update = (_, _, _) => 60.minutes,
152+
read = (_, _, duration) => duration // read access should not change the expiration time
153+
) // required to enable VarExpiration policy (i.e. set custom expiration time per element)
154+
.maximumSize(
155+
maxSize
156+
) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency
157+
.build[BlacklistId, BlacklistReasonType]()
158+
CacheBasedBlacklist(cache)
159+
}
160+
161+
}

src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package io.iohk.ethereum.blockchain.sync
22

3-
import scala.concurrent.duration.{Duration, FiniteDuration}
43
import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler}
4+
import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId
5+
56
import scala.collection.mutable
67
import scala.concurrent.ExecutionContext.Implicits.global
8+
import scala.concurrent.duration.{Duration, FiniteDuration}
79

10+
// will be removed once regular sync is switched to new blacklist implementation
811
trait BlacklistSupport {
912
selfActor: Actor with ActorLogging =>
1013

@@ -14,9 +17,9 @@ trait BlacklistSupport {
1417

1518
protected val maxBlacklistedNodes = 1000
1619

17-
val blacklistedPeers = mutable.LinkedHashMap.empty[BlackListId, Cancellable]
20+
val blacklistedPeers = mutable.LinkedHashMap.empty[BlacklistId, Cancellable]
1821

19-
def blacklist(blacklistId: BlackListId, duration: FiniteDuration, reason: String): Unit = {
22+
def blacklist(blacklistId: BlacklistId, duration: FiniteDuration, reason: String): Unit = {
2023
if (duration > Duration.Zero) {
2124
if (blacklistedPeers.size >= maxBlacklistedNodes) {
2225
removeOldestPeer()
@@ -30,13 +33,13 @@ trait BlacklistSupport {
3033
}
3134
}
3235

33-
def undoBlacklist(blacklistId: BlackListId): Unit = {
36+
def undoBlacklist(blacklistId: BlacklistId): Unit = {
3437
val peer = blacklistedPeers.get(blacklistId)
3538
peer.foreach(_.cancel())
3639
blacklistedPeers.remove(blacklistId)
3740
}
3841

39-
def isBlacklisted(blacklistId: BlackListId): Boolean =
42+
def isBlacklisted(blacklistId: BlacklistId): Boolean =
4043
blacklistedPeers.exists(_._1 == blacklistId)
4144

4245
def handleBlacklistMessages: Receive = { case UnblacklistPeer(ref) =>
@@ -52,9 +55,6 @@ trait BlacklistSupport {
5255

5356
object BlacklistSupport {
5457

55-
abstract class BlackListId {
56-
def value: String
57-
}
58+
private case class UnblacklistPeer(blacklistId: BlacklistId)
5859

59-
private case class UnblacklistPeer(blacklistId: BlackListId)
6060
}

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig
1111
import scala.concurrent.duration._
1212
import scala.concurrent.ExecutionContext.Implicits.global
1313

14+
// will be removed once regular sync is switched to new blacklist/peerlist implementation
1415
trait PeerListSupport {
1516
self: Actor with ActorLogging with BlacklistSupport =>
1617
import PeerListSupport._

0 commit comments

Comments
 (0)