Skip to content

Commit 84a2f9b

Browse files
author
Dmitry Voronov
authored
[ETCM-266]-replaced-rate-limiter-built-on-twitter (#873)
[ETCM-266]-replaced-rate-limiter-built-on-twitter
1 parent c069d08 commit 84a2f9b

File tree

6 files changed

+97
-137
lines changed

6 files changed

+97
-137
lines changed

build.sbt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ lazy val node = {
122122
Dependencies.cats,
123123
Dependencies.monix,
124124
Dependencies.network,
125-
Dependencies.twitterUtilCollection,
126125
Dependencies.crypto,
127126
Dependencies.scopt,
128127
Dependencies.logging,

project/Dependencies.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ object Dependencies {
9494
"org.codehaus.janino" % "janino" % "3.1.2"
9595
)
9696

97-
val twitterUtilCollection = Seq("com.twitter" %% "util-collection" % "19.1.0")
98-
9997
val crypto = Seq("org.bouncycastle" % "bcprov-jdk15on" % "1.66")
10098

10199
val scopt = Seq("com.github.scopt" % "scopt_2.12" % "3.7.1")

repo.nix

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,54 +1065,6 @@
10651065
url = "https://repo1.maven.org/maven2/com/trueaccord/scalapb/scalapb-runtime_2.12/0.6.6/scalapb-runtime_2.12-0.6.6.pom";
10661066
sha256 = "1D151CC97E7A94E1795A66538692E047468C5D7E5491EC709A59F8D7B7ABB039";
10671067
};
1068-
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-javadoc.jar" = {
1069-
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-javadoc.jar";
1070-
sha256 = "3400584E84FE765101EE84DDE208CBB4AECAE33A2093ED989AEBD802916F9BEB";
1071-
};
1072-
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-sources.jar" = {
1073-
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0-sources.jar";
1074-
sha256 = "1FAAC80D74C5E99B30A32C81D5F8A675F5DA1889584893822A720E7C044BF2DF";
1075-
};
1076-
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.jar" = {
1077-
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.jar";
1078-
sha256 = "5DF5983BB58E545CACD771504DC32A83CE27D4B2ABC45B3041ACD4F74ECF4418";
1079-
};
1080-
"nix-public/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.pom" = {
1081-
url = "https://repo1.maven.org/maven2/com/twitter/util-collection_2.12/19.1.0/util-collection_2.12-19.1.0.pom";
1082-
sha256 = "D2155349209C1EC451F08FE3A4ECE7A916B118E46F3492A81FC24B7B90E9EA7E";
1083-
};
1084-
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-javadoc.jar" = {
1085-
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-javadoc.jar";
1086-
sha256 = "0B1ACC67A5C6C8281FF0322899E0E1F3C28516094D166412E2DBF0C960E94C8E";
1087-
};
1088-
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-sources.jar" = {
1089-
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0-sources.jar";
1090-
sha256 = "43F4E01682C3517DCA2CDB15678419CEA6C904886231C5507552AA996B48439A";
1091-
};
1092-
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.jar" = {
1093-
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.jar";
1094-
sha256 = "E91061D0EC00F3573AA1311079E32A1F4A56D09BDE0D52715E4C035B2156AF52";
1095-
};
1096-
"nix-public/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.pom" = {
1097-
url = "https://repo1.maven.org/maven2/com/twitter/util-core_2.12/19.1.0/util-core_2.12-19.1.0.pom";
1098-
sha256 = "0D0ECBD7D803EB1CC5337805A07242E87D7CD3C371FC67C243B3D68615E0F4CB";
1099-
};
1100-
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-javadoc.jar" = {
1101-
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-javadoc.jar";
1102-
sha256 = "87695034BFFD441D11ED0B3672F412795CE1B9F5770CF2E882281623B781F708";
1103-
};
1104-
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-sources.jar" = {
1105-
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0-sources.jar";
1106-
sha256 = "FD71104B195EDE70D78FD38D49B0F8B4419C33EF59D7E23592272CF68538CEF9";
1107-
};
1108-
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.jar" = {
1109-
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.jar";
1110-
sha256 = "B1516320F07264AE2BE860F4C74FA2DF5EF8BD349C68D082D8EAECDFCF77F766";
1111-
};
1112-
"nix-public/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.pom" = {
1113-
url = "https://repo1.maven.org/maven2/com/twitter/util-function_2.12/19.1.0/util-function_2.12-19.1.0.pom";
1114-
sha256 = "17DB0C4F7A9C79E6279A8A4D6CAAEAC2DF889FDD2D3BA002E648F436C34FC88B";
1115-
};
11161068
"nix-public/com/typesafe/akka/akka-actor_2.12/2.5.23/akka-actor_2.12-2.5.23-javadoc.jar" = {
11171069
url = "https://repo1.maven.org/maven2/com/typesafe/akka/akka-actor_2.12/2.5.23/akka-actor_2.12-2.5.23-javadoc.jar";
11181070
sha256 = "5C129DD97237DAB58EC6FBA946978916057BB6BBC414113BE8083ECD68A889E6";
@@ -2609,22 +2561,6 @@
26092561
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-java8-compat_2.12/0.8.0/scala-java8-compat_2.12-0.8.0.pom";
26102562
sha256 = "49E2711154CE9BA76962E043F4F8A6D0B890CD9E2FC6D0E99C3A0E6AD6C1950E";
26112563
};
2612-
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-javadoc.jar" = {
2613-
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-javadoc.jar";
2614-
sha256 = "228DA077BAEB60BB35E612780FB805126B0F88F9319AFDEA60E3BABEBC798BC2";
2615-
};
2616-
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-sources.jar" = {
2617-
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4-sources.jar";
2618-
sha256 = "CB4BA7B7E598530FAEC863E5069864A28268EE4C636B0C46443884DCC4E07AC6";
2619-
};
2620-
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.jar" = {
2621-
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.jar";
2622-
sha256 = "282C78D064D3E8F09B3663190D9494B85E0BB7D96B0DA05994FE994384D96111";
2623-
};
2624-
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.pom" = {
2625-
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.4/scala-parser-combinators_2.12-1.0.4.pom";
2626-
sha256 = "B512704E22EEF743D6288DED94943DFE78F9BB0636CCB5AC1919364BF0B4EF2A";
2627-
};
26282564
"nix-public/org/scala-lang/modules/scala-parser-combinators_2.12/1.1.2/scala-parser-combinators_2.12-1.1.2-javadoc.jar" = {
26292565
url = "https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.1.2/scala-parser-combinators_2.12-1.1.2-javadoc.jar";
26302566
sha256 = "7B7C26DC48B443E1268F8235AE009409C26AC80D976E313B4F3F7740C227AE59";

src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.json4s.native.Serialization
2525
import org.json4s.{DefaultFormats, JInt, native}
2626
import scala.concurrent.duration.{FiniteDuration, _}
2727

28-
trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
28+
trait JsonRpcHttpServer extends Json4sSupport with Logger {
2929
val jsonRpcController: JsonRpcBaseController
3030
val jsonRpcHealthChecker: JsonRpcHealthChecker
3131
val config: JsonRpcHttpServerConfig
@@ -54,37 +54,39 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
5454
}
5555
.result()
5656

57+
protected val rateLimit = new RateLimit(config.rateLimit)
58+
5759
val route: Route = cors(corsSettings) {
5860
(path("healthcheck") & pathEndOrSingleSlash & get) {
5961
handleHealthcheck()
6062
} ~ (path("buildinfo") & pathEndOrSingleSlash & get) {
6163
handleBuildInfo()
6264
} ~ (pathEndOrSingleSlash & post) {
63-
(extractClientIP & entity(as[JsonRpcRequest])) { (clientAddress, request) =>
64-
handleRequest(clientAddress, request)
65-
} ~ entity(as[Seq[JsonRpcRequest]]) { request =>
66-
handleBatchRequest(request)
65+
// TODO: maybe rate-limit this one too?
66+
entity(as[JsonRpcRequest]) {
67+
case statusReq if statusReq.method == FaucetJsonRpcController.Status =>
68+
handleRequest(statusReq)
69+
case jsonReq =>
70+
rateLimit {
71+
handleRequest(jsonReq)
72+
}
73+
// TODO: separate paths for single and multiple requests
74+
// TODO: to prevent repeated body and json parsing
75+
} ~ entity(as[Seq[JsonRpcRequest]]) {
76+
case _ if config.rateLimit.enabled =>
77+
complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
78+
case reqSeq =>
79+
complete {
80+
Task
81+
.traverse(reqSeq)(request => jsonRpcController.handleRequest(request))
82+
.runToFuture
83+
}
6784
}
6885
}
6986
}
7087

71-
def handleRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
72-
//FIXME: FaucetJsonRpcController.Status should be part of a Healthcheck request or alike.
73-
// As a temporary solution, it is being excluded from the Rate Limit.
74-
if (config.rateLimit.enabled && request.method != FaucetJsonRpcController.Status) {
75-
handleRateLimitedRequest(clientAddress, request)
76-
} else complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
77-
}
78-
79-
def handleRateLimitedRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
80-
if (isBelowRateLimit(clientAddress))
81-
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
82-
else {
83-
log.warn(s"Request limit exceeded for ip ${clientAddress.toIP.getOrElse("unknown")}")
84-
complete(
85-
(StatusCodes.TooManyRequests, JsonRpcError.RateLimitError(config.rateLimit.minRequestInterval.toSeconds))
86-
)
87-
}
88+
def handleRequest(request: JsonRpcRequest): StandardRoute = {
89+
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
8890
}
8991

9092
private def handleResponse(f: Task[JsonRpcResponse]): Task[(StatusCode, JsonRpcResponse)] = f map { jsonRpcResponse =>
@@ -128,15 +130,6 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
128130
)
129131
}
130132

131-
private def handleBatchRequest(requests: Seq[JsonRpcRequest]) = {
132-
if (!config.rateLimit.enabled) {
133-
complete {
134-
Task
135-
.traverse(requests)(request => jsonRpcController.handleRequest(request))
136-
.runToFuture
137-
}
138-
} else complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
139-
}
140133
}
141134

142135
object JsonRpcHttpServer extends Logger {
@@ -160,12 +153,15 @@ object JsonRpcHttpServer extends Logger {
160153
}
161154

162155
trait RateLimitConfig {
156+
// TODO: Move the rateLimit.enabled setting upwards:
157+
// TODO: If we don't need to limit the request rate at all - we don't have to define the other settings
163158
val enabled: Boolean
164159
val minRequestInterval: FiniteDuration
165160
val latestTimestampCacheSize: Int
166161
}
167162

168163
object RateLimitConfig {
164+
// TODO: Use pureconfig
169165
def apply(rateLimitConfig: TypesafeConfig): RateLimitConfig =
170166
new RateLimitConfig {
171167
override val enabled: Boolean = rateLimitConfig.getBoolean("enabled")
Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,71 @@
11
package io.iohk.ethereum.jsonrpc.server.http
22

3-
import java.time.Clock
3+
import java.time.Duration
44

5-
import akka.http.scaladsl.model.RemoteAddress
6-
import com.twitter.util.LruMap
7-
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
5+
import akka.NotUsed
6+
import akka.http.scaladsl.model.{RemoteAddress, StatusCodes}
7+
import akka.http.scaladsl.server.{Directive0, Route}
8+
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.RateLimitConfig
9+
import akka.http.scaladsl.server.Directives._
10+
import com.google.common.base.Ticker
11+
import com.google.common.cache.CacheBuilder
12+
import io.iohk.ethereum.jsonrpc.JsonRpcError
13+
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
14+
import io.iohk.ethereum.jsonrpc.serialization.JsonSerializers
15+
import org.json4s.{DefaultFormats, Formats, Serialization, native}
816

9-
trait RateLimit {
17+
class RateLimit(config: RateLimitConfig) extends Directive0 with Json4sSupport {
1018

11-
val config: JsonRpcHttpServerConfig
19+
private implicit val serialization: Serialization = native.Serialization
20+
private implicit val formats: Formats = DefaultFormats + JsonSerializers.RpcErrorJsonSerializer
1221

13-
val latestRequestTimestamps = new LruMap[RemoteAddress, Long](config.rateLimit.latestTimestampCacheSize)
22+
private[this] lazy val minInterval = config.minRequestInterval.toSeconds
1423

15-
val clock: Clock = Clock.systemUTC()
24+
private[this] lazy val lru = {
25+
val nanoDuration = config.minRequestInterval.toNanos
26+
val javaDuration = Duration.ofNanos(nanoDuration)
27+
val ticker: Ticker = new Ticker {
28+
override def read(): Long = getCurrentTimeNanos
29+
}
30+
CacheBuilder
31+
.newBuilder()
32+
.weakKeys()
33+
.expireAfterAccess(javaDuration)
34+
.ticker(ticker)
35+
.build[RemoteAddress, NotUsed]()
36+
}
37+
38+
private[this] def isBelowRateLimit(ip: RemoteAddress): Boolean = {
39+
var exists = true
40+
lru.get(
41+
ip,
42+
() => {
43+
exists = false
44+
NotUsed
45+
}
46+
)
47+
exists
48+
}
1649

17-
def isBelowRateLimit(clientAddress: RemoteAddress): Boolean = {
18-
val timeMillis = clock.instant().toEpochMilli
19-
val latestRequestTimestamp = latestRequestTimestamps.getOrElse(clientAddress, 0L)
50+
// Override this to test
51+
protected def getCurrentTimeNanos: Long = System.nanoTime()
2052

21-
val response = latestRequestTimestamp + config.rateLimit.minRequestInterval.toMillis < timeMillis
22-
if (response) latestRequestTimestamps.put(clientAddress, timeMillis)
23-
response
53+
// Such algebras prevent if-elseif-else boilerplate in the JsonRPCServer code
54+
// It is also guaranteed that:
55+
// 1) no IP address is extracted unless config.enabled is true
56+
// 2) no LRU is created unless config.enabled is true
57+
// 3) cache is accessed only once (using get)
58+
override def tapply(f: Unit => Route): Route = {
59+
if (config.enabled) {
60+
extractClientIP { ip =>
61+
if (isBelowRateLimit(ip)) {
62+
val err = JsonRpcError.RateLimitError(minInterval)
63+
complete((StatusCodes.TooManyRequests, err))
64+
} else {
65+
f.apply(())
66+
}
67+
}
68+
} else f.apply(())
2469
}
70+
2571
}

src/test/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServerSpec.scala

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.iohk.ethereum.jsonrpc.server.http
22

33
import java.net.InetAddress
4-
import java.time.{Clock, Instant, ZoneId}
54
import java.util.concurrent.TimeUnit
65

76
import akka.actor.ActorSystem
@@ -215,7 +214,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
215214
status shouldEqual StatusCodes.TooManyRequests
216215
}
217216

218-
fakeClock.advanceTime(2 * serverConfigWithRateLimit.rateLimit.minRequestInterval.toMillis)
217+
mockJsonRpcHttpServerWithRateLimit.mockedTime = 50000000L
219218

220219
postRequest ~> Route.seal(mockJsonRpcHttpServerWithRateLimit.route) ~> check {
221220
status shouldEqual StatusCodes.OK
@@ -382,7 +381,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
382381

383382
val rateLimitConfig = new RateLimitConfig {
384383
override val enabled: Boolean = false
385-
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
384+
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
386385
override val latestTimestampCacheSize: Int = 1024
387386
}
388387

@@ -397,7 +396,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
397396

398397
val rateLimitEnabledConfig = new RateLimitConfig {
399398
override val enabled: Boolean = true
400-
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
399+
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
401400
override val latestTimestampCacheSize: Int = 1024
402401
}
403402

@@ -412,31 +411,27 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
412411

413412
val mockJsonRpcController = mock[JsonRpcController]
414413
val mockJsonRpcHealthChecker = mock[JsonRpcHealthChecker]
415-
val fakeClock = new FakeClock
416414

417415
val mockJsonRpcHttpServer = new FakeJsonRpcHttpServer(
418416
jsonRpcController = mockJsonRpcController,
419417
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
420418
config = serverConfig,
421-
cors = serverConfig.corsAllowedOrigins,
422-
testClock = fakeClock
419+
cors = serverConfig.corsAllowedOrigins
423420
)
424421

425422
val corsAllowedOrigin = HttpOrigin("http://localhost:3333")
426423
val mockJsonRpcHttpServerWithCors = new FakeJsonRpcHttpServer(
427424
jsonRpcController = mockJsonRpcController,
428425
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
429426
config = serverConfig,
430-
cors = HttpOriginMatcher(corsAllowedOrigin),
431-
testClock = fakeClock
427+
cors = HttpOriginMatcher(corsAllowedOrigin)
432428
)
433429

434430
val mockJsonRpcHttpServerWithRateLimit = new FakeJsonRpcHttpServer(
435431
jsonRpcController = mockJsonRpcController,
436432
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
437433
config = serverConfigWithRateLimit,
438-
cors = serverConfigWithRateLimit.corsAllowedOrigins,
439-
testClock = fakeClock
434+
cors = serverConfigWithRateLimit.corsAllowedOrigins
440435
)
441436
}
442437
}
@@ -467,27 +462,17 @@ class FakeJsonRpcHttpServer(
467462
val jsonRpcController: JsonRpcBaseController,
468463
val jsonRpcHealthChecker: JsonRpcHealthChecker,
469464
val config: JsonRpcHttpServerConfig,
470-
val cors: HttpOriginMatcher,
471-
val testClock: Clock
465+
val cors: HttpOriginMatcher
472466
)(implicit val actorSystem: ActorSystem)
473467
extends JsonRpcHttpServer
474468
with Logger {
475469
def run(): Unit = ()
476470
override def corsAllowedOrigins: HttpOriginMatcher = cors
477-
override val clock = testClock
478-
}
479-
480-
class FakeClock extends Clock {
481471

482-
var time: Instant = Instant.now()
472+
var mockedTime:Long = 0L
483473

484-
def advanceTime(seconds: Long): Unit = {
485-
time = time.plusSeconds(seconds)
474+
override protected val rateLimit: RateLimit = new RateLimit(config.rateLimit) {
475+
override protected def getCurrentTimeNanos: Long = FakeJsonRpcHttpServer.this.mockedTime
486476
}
487477

488-
override def getZone: ZoneId = ZoneId.systemDefault()
489-
490-
override def withZone(zone: ZoneId): Clock = Clock.fixed(time, getZone)
491-
492-
override def instant(): Instant = time
493478
}

0 commit comments

Comments
 (0)