Skip to content

[CHORE] Update akka version #662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ val commonSettings = Seq(
resolvers += "rocksDb" at "https://dl.bintray.com/ethereum/maven/"

val dep = {
val akkaVersion = "2.5.12"
val akkaHttpVersion = "10.1.1"
val akkaVersion = "2.6.9"
val akkaHttpVersion = "10.2.0"
val circeVersion = "0.9.3"
val rocksDb = "5.9.2"

Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"ch.megard" %% "akka-http-cors" % "0.3.0",
"ch.megard" %% "akka-http-cors" % "1.1.0",
"org.json4s" %% "json4s-native" % "3.5.4",
"de.heikoseeberger" %% "akka-http-json4s" % "1.21.0",
"de.heikoseeberger" %% "akka-http-json4s" % "1.34.0",
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % "it,test",
"io.suzaku" %% "boopickle" % "1.3.0",
"org.ethereum" % "rocksdbjni" % rocksDb,
Expand All @@ -37,9 +38,8 @@ val dep = {
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"io.circe" %% "circe-generic-extras" % circeVersion,
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.1" % "it,test",
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test",
"commons-io" % "commons-io" % "2.6",
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"org.scala-sbt.ipcsocket" % "ipcsocket" % "1.0.0",
"org.bouncycastle" % "bcprov-jdk15on" % "1.59",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
Expand All @@ -50,7 +50,6 @@ val dep = {
// mallet deps
"org.jline" % "jline" % "3.1.2",
"net.java.dev.jna" % "jna" % "4.5.1",
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.5",
"com.github.scopt" %% "scopt" % "3.7.0",

// Metrics (https://github.com/DataDog/java-dogstatsd-client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ class DumpChainActor(peerManager: ActorRef, peerMessageBus: ActorRef, startBlock
}

//Periodically try to connect to bootstrap peer in case the connection failed before dump termination
val connectToBootstrapTimeout: Cancellable = context.system.scheduler.schedule(0 seconds, 4 seconds, () =>
peerManager ! PeerManagerActor.ConnectToPeer(new URI(bootstrapNode)))
val connectToBootstrapTimeout: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0 seconds, 4 seconds, peerManager, PeerManagerActor.ConnectToPeer(new URI(bootstrapNode)))

val assignWorkTimeout: Cancellable = context.system.scheduler.schedule(0 seconds, 2 seconds, () => assignWork())
val assignWorkTimeout: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0 seconds, 2 seconds)(() => assignWork())

// scalastyle:off
override def receive: Receive = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ class FastSync(

//Delay before starting to persist snapshot. It should be 0, as the presence of it marks that fast sync was started
private val persistStateSnapshotDelay: FiniteDuration = 0.seconds
private val syncStatePersistCancellable = scheduler.schedule(persistStateSnapshotDelay, persistStateSnapshotInterval, self, PersistSyncState)
private val printStatusCancellable = scheduler.schedule(printStatusInterval, printStatusInterval, self, PrintStatus)
private val heartBeat = scheduler.schedule(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing)
private val syncStatePersistCancellable = scheduler.scheduleWithFixedDelay(persistStateSnapshotDelay, persistStateSnapshotInterval, self, PersistSyncState)
private val printStatusCancellable = scheduler.scheduleWithFixedDelay(printStatusInterval, printStatusInterval, self, PrintStatus)
private val heartBeat = scheduler.scheduleWithFixedDelay(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing)

def receive: Receive = handleCommonMessages orElse {
case UpdateTargetBlock(state) => updateTargetBlock(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ trait PeerListSupport {

var handshakedPeers: PeersMap = Map.empty

scheduler.schedule(0.seconds, syncConfig.peersScanInterval, etcPeerManager, EtcPeerManagerActor.GetHandshakedPeers)(global, context.self)
scheduler.scheduleWithFixedDelay(0.seconds, syncConfig.peersScanInterval, etcPeerManager, EtcPeerManagerActor.GetHandshakedPeers)(global, context.self)

def removePeer(peerId: PeerId): Unit = {
peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PeersClient(
implicit val ec: ExecutionContext = context.dispatcher

val statusSchedule: Cancellable =
scheduler.schedule(syncConfig.printStatusInterval, syncConfig.printStatusInterval, self, PrintStatus)
scheduler.scheduleWithFixedDelay(syncConfig.printStatusInterval, syncConfig.printStatusInterval, self, PrintStatus)

def receive: Receive = running(Map())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OldRegularSync(
import OldRegularSync._
import syncConfig._

scheduler.schedule(printStatusInterval, printStatusInterval, self, PrintStatus)(global)
scheduler.scheduleWithFixedDelay(printStatusInterval, printStatusInterval, self, PrintStatus)(global)

peerEventBus ! Subscribe(MessageClassifier(Set(NewBlock.code, NewBlockHashes.code), PeerSelector.AllPeers))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class RegularSync(
"block-importer")

val printFetcherSchedule: Cancellable =
scheduler.schedule(syncConfig.printStatusInterval, syncConfig.printStatusInterval, fetcher, BlockFetcher.PrintStatus)(context.dispatcher)
scheduler.scheduleWithFixedDelay(syncConfig.printStatusInterval, syncConfig.printStatusInterval, fetcher, BlockFetcher.PrintStatus)(context.dispatcher)
val printImporterSchedule: Cancellable =
scheduler.schedule(syncConfig.printStatusInterval, syncConfig.printStatusInterval, importer, BlockImporter.PrintStatus)(context.dispatcher)
scheduler.scheduleWithFixedDelay(syncConfig.printStatusInterval, syncConfig.printStatusInterval, importer, BlockImporter.PrintStatus)(context.dispatcher)

override def receive: Receive = {
case Start =>
Expand Down
8 changes: 2 additions & 6 deletions src/main/scala/io/iohk/ethereum/extvm/ExtVMInterface.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
package io.iohk.ethereum.extvm

import java.nio.ByteOrder

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Framing, Keep, Sink, SinkQueueWithCancel, Source, SourceQueueWithComplete, Tcp}
import akka.util.ByteString
import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxyStorage}
import io.iohk.ethereum.utils.{BlockchainConfig, VmConfig}
import io.iohk.ethereum.vm._

import java.nio.ByteOrder
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

class ExtVMInterface(externaVmConfig: VmConfig.ExternalConfig, blockchainConfig: BlockchainConfig, testMode: Boolean)(implicit system: ActorSystem)
extends VM[InMemoryWorldStateProxy, InMemoryWorldStateProxyStorage]{

private implicit val materializer = ActorMaterializer()

private var out: Option[SourceQueueWithComplete[ByteString]] = None

private var in: Option[SinkQueueWithCancel[ByteString]] = None
Expand Down
11 changes: 3 additions & 8 deletions src/main/scala/io/iohk/ethereum/extvm/VMServer.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
package io.iohk.ethereum.extvm

import java.nio.ByteOrder

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source, Tcp}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.util.ByteString
import com.google.protobuf.{ByteString => GByteString}
import com.typesafe.config.ConfigFactory
import io.iohk.ethereum.domain.{Address, BlockHeader}
import io.iohk.ethereum.extvm.Implicits._
import io.iohk.ethereum.utils._
import io.iohk.ethereum.vm._
import io.iohk.ethereum.vm.BlockchainConfigForEvm
import io.iohk.ethereum.vm.ProgramResult

import io.iohk.ethereum.vm.{BlockchainConfigForEvm, ProgramResult, _}
import java.nio.ByteOrder
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object VmServerApp extends Logger {

implicit val system = ActorSystem("EVM_System")
implicit val materializer = ActorMaterializer()

def main(args: Array[String]): Unit = {
val config = ConfigFactory.load()
Expand Down
4 changes: 1 addition & 3 deletions src/main/scala/io/iohk/ethereum/faucet/Faucet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import java.security.SecureRandom

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import io.iohk.ethereum.keystore.KeyStoreImpl
import io.iohk.ethereum.mallet.service.RpcClient
Expand All @@ -19,13 +18,12 @@ object Faucet extends Logger {
val config = FaucetConfig(ConfigFactory.load())

implicit val system = ActorSystem("Faucet-system")
implicit val materializer = ActorMaterializer()

val keyStore = new KeyStoreImpl(KeyStoreConfig.customKeyStoreConfig(config.keyStoreDir), new SecureRandom())
val rpcClient = new RpcClient(config.rpcAddress)
val api = new FaucetApi(rpcClient, keyStore, config)

val bindingResultF = Http().bindAndHandle(api.route, config.listenInterface, config.listenPort)
val bindingResultF = Http().newServerAt(config.listenInterface, config.listenPort).bind(api.route)

bindingResultF onComplete {
case Success(serverBinding) => log.info(s"Faucet HTTP server listening on ${serverBinding.localAddress}")
Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/io/iohk/ethereum/faucet/FaucetConfig.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.iohk.ethereum.faucet

import akka.http.scaladsl.model.headers.HttpOriginRange
import ch.megard.akka.http.cors.scaladsl.model.HttpOriginMatcher
import com.typesafe.config.{Config => TypesafeConfig}
import io.iohk.ethereum.domain.Address
import io.iohk.ethereum.utils.ConfigUtils

import scala.concurrent.duration.{FiniteDuration, _}

case class FaucetConfig(
Expand All @@ -13,7 +12,7 @@ case class FaucetConfig(
txGasPrice: BigInt,
txGasLimit: BigInt,
txValue: BigInt,
corsAllowedOrigins: HttpOriginRange,
corsAllowedOrigins: HttpOriginMatcher,
rpcAddress: String,
keyStoreDir: String,
listenInterface: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package io.iohk.ethereum.jsonrpc.server.http

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.HttpOriginRange
import akka.stream.ActorMaterializer
import ch.megard.akka.http.cors.scaladsl.model.HttpOriginMatcher
import io.iohk.ethereum.jsonrpc._
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
import io.iohk.ethereum.utils.Logger

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

Expand All @@ -16,15 +14,13 @@ class BasicJsonRpcHttpServer(val jsonRpcController: JsonRpcController, config: J
extends JsonRpcHttpServer with Logger {

def run(): Unit = {
implicit val materializer = ActorMaterializer()

val bindingResultF = Http(actorSystem).bindAndHandle(route, config.interface, config.port)
val bindingResultF = Http(actorSystem).newServerAt(config.interface, config.port).bind(route)

bindingResultF onComplete {
case Success(serverBinding) => log.info(s"JSON RPC HTTP server listening on ${serverBinding.localAddress}")
case Failure(ex) => log.error("Cannot start JSON HTTP RPC server", ex)
}
}

override def corsAllowedOrigins: HttpOriginRange = config.corsAllowedOrigins
override def corsAllowedOrigins: HttpOriginMatcher = config.corsAllowedOrigins
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package io.iohk.ethereum.jsonrpc.server.http

import java.security.SecureRandom

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.HttpOriginRange
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{MalformedRequestContentRejection, RejectionHandler, Route}
import ch.megard.akka.http.cors.javadsl.CorsRejection
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import ch.megard.akka.http.cors.scaladsl.model.HttpOriginMatcher
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import io.iohk.ethereum.jsonrpc.{JsonRpcController, JsonRpcErrors, JsonRpcRequest, JsonRpcResponse}
import io.iohk.ethereum.utils.{ConfigUtils, Logger}
import java.security.SecureRandom
import org.json4s.JsonAST.JInt
import org.json4s.{DefaultFormats, native}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try
Expand All @@ -27,7 +25,7 @@ trait JsonRpcHttpServer extends Json4sSupport {

implicit val formats = DefaultFormats

def corsAllowedOrigins: HttpOriginRange
def corsAllowedOrigins: HttpOriginMatcher

val corsSettings = CorsSettings.defaultSettings
.withAllowGenericHttpRequests(true)
Expand Down Expand Up @@ -84,7 +82,7 @@ object JsonRpcHttpServer extends Logger {
val certificateKeyStorePath: Option[String]
val certificateKeyStoreType: Option[String]
val certificatePasswordFile: Option[String]
val corsAllowedOrigins: HttpOriginRange
val corsAllowedOrigins: HttpOriginMatcher
}

object JsonRpcHttpServerConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package io.iohk.ethereum.jsonrpc.server.http

import java.io.{File, FileInputStream}
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.HttpOriginRange
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.stream.ActorMaterializer
import ch.megard.akka.http.cors.scaladsl.model.HttpOriginMatcher
import io.iohk.ethereum.jsonrpc.JsonRpcController
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpsServer.HttpsSetupResult
import io.iohk.ethereum.utils.Logger

import java.io.{File, FileInputStream}
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.{Failure, Success, Try}
Expand All @@ -22,8 +19,6 @@ class JsonRpcHttpsServer(val jsonRpcController: JsonRpcController, config: JsonR
extends JsonRpcHttpServer with Logger {

def run(): Unit = {
implicit val materializer = ActorMaterializer()

val maybeSslContext = validateCertificateFiles(config.certificateKeyStorePath, config.certificateKeyStoreType, config.certificatePasswordFile).flatMap{
case (keystorePath, keystoreType, passwordFile) =>
val passwordReader = Source.fromFile(passwordFile)
Expand All @@ -35,12 +30,11 @@ class JsonRpcHttpsServer(val jsonRpcController: JsonRpcController, config: JsonR
}
}

val maybeHttpsContext = maybeSslContext.map(sslContext => ConnectionContext.https(sslContext))
val maybeHttpsContext = maybeSslContext.map(sslContext => ConnectionContext.httpsServer(sslContext))

maybeHttpsContext match {
case Right(httpsContext) =>
Http().setDefaultServerHttpContext(httpsContext)
val bindingResultF = Http().bindAndHandle(route, config.interface, config.port, connectionContext = httpsContext)
val bindingResultF = Http().newServerAt(config.interface, config.port).enableHttps(httpsContext).bind(route)

bindingResultF onComplete {
case Success(serverBinding) => log.info(s"JSON RPC HTTPS server listening on ${serverBinding.localAddress}")
Expand Down Expand Up @@ -113,7 +107,7 @@ class JsonRpcHttpsServer(val jsonRpcController: JsonRpcController, config: JsonR
Left("HTTPS requires: certificate-keystore-path, certificate-keystore-type and certificate-password-file to be configured")
}

override def corsAllowedOrigins: HttpOriginRange = config.corsAllowedOrigins
override def corsAllowedOrigins: HttpOriginMatcher = config.corsAllowedOrigins
}

object JsonRpcHttpsServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import java.util.UUID
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import io.circe.generic.auto._
Expand All @@ -32,7 +31,6 @@ object RpcClient {
val akkaConfig = ConfigFactory.load("mallet")

implicit val system = ActorSystem("mallet_rpc", akkaConfig)
implicit val mat = ActorMaterializer()
implicit val ec = scala.concurrent.ExecutionContext.Implicits.global

new RpcClient(node)
Expand All @@ -43,7 +41,7 @@ object RpcClient {
* Talks to a node over HTTP(S) JSON-RPC
* Note: the URI schema determines whether HTTP or HTTPS is used
*/
class RpcClient(node: Uri)(implicit system: ActorSystem, mat: ActorMaterializer, ec: ExecutionContext) {
class RpcClient(node: Uri)(implicit system: ActorSystem, ec: ExecutionContext) {
import CommonJsonCodecs._

//TODO: CL option
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class KnownNodesManager(

var toRemove: Set[URI] = Set.empty

scheduler.schedule(config.persistInterval, config.persistInterval, self, PersistChanges)
scheduler.scheduleWithFixedDelay(config.persistInterval, config.persistInterval, self, PersistChanges)

override def receive: Receive = {
case AddKnownNode(uri) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ class PeerManagerActor(
}

private def scheduleNodesUpdate(): Unit = {
scheduler.schedule(peerConfiguration.updateNodesInitialDelay, peerConfiguration.updateNodesInterval){
peerDiscoveryManager ! PeerDiscoveryManager.GetDiscoveredNodesInfo
}
scheduler.scheduleWithFixedDelay(
peerConfiguration.updateNodesInitialDelay,
peerConfiguration.updateNodesInterval,
peerDiscoveryManager,
PeerDiscoveryManager.GetDiscoveredNodesInfo
)
}

def listen(pendingPeers: PeerMap, peers: PeerMap): Receive = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class PeerDiscoveryManager(

if (discoveryConfig.discoveryEnabled) {
discoveryListener ! DiscoveryListener.Subscribe
context.system.scheduler.schedule(discoveryConfig.scanInitialDelay, discoveryConfig.scanInterval, self, Scan)
context.system.scheduler.scheduleWithFixedDelay(discoveryConfig.scanInitialDelay, discoveryConfig.scanInterval, self, Scan)
}

def scan(): Unit = {
Expand Down
Loading