Skip to content

Commit aaf3c85

Browse files
authored
[ETCM-252] faucet - handle account (#783)
* add faucet actors * add supervisor config * change test name * change status from faucet * add wallet service spec * add test from actor * add more test * add new properties * fix properties * add changes from faucet rpc service * faucet shutdown when I can't load the wallet * remove todo of name * add faucet handler builder * fix config
1 parent 7a1323e commit aaf3c85

File tree

13 files changed

+593
-77
lines changed

13 files changed

+593
-77
lines changed

src/main/scala/io/iohk/ethereum/faucet/FaucetConfig.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package io.iohk.ethereum.faucet
22

3-
import com.typesafe.config.{ConfigFactory, Config => TypesafeConfig}
3+
import com.typesafe.config.{ConfigFactory, Config}
44
import io.iohk.ethereum.domain.Address
55

66
import scala.concurrent.duration.{FiniteDuration, _}
77

88
trait FaucetConfigBuilder {
9-
lazy val rawConfig: TypesafeConfig = ConfigFactory.load()
10-
lazy val rawMantisConfig: TypesafeConfig = rawConfig.getConfig("mantis")
9+
lazy val rawConfig: Config = ConfigFactory.load()
10+
lazy val rawMantisConfig: Config = rawConfig.getConfig("mantis")
1111
lazy val faucetConfig: FaucetConfig = FaucetConfig(rawConfig)
1212
}
1313

@@ -19,11 +19,15 @@ case class FaucetConfig(
1919
txValue: BigInt,
2020
rpcAddress: String,
2121
keyStoreDir: String,
22-
minRequestInterval: FiniteDuration
22+
minRequestInterval: FiniteDuration,
23+
handlerTimeout: FiniteDuration,
24+
responseTimeout: FiniteDuration,
25+
supervisor: SupervisorConfig,
26+
shutdownTimeout: FiniteDuration
2327
)
2428

2529
object FaucetConfig {
26-
def apply(typesafeConfig: TypesafeConfig): FaucetConfig = {
30+
def apply(typesafeConfig: Config): FaucetConfig = {
2731
val faucetConfig = typesafeConfig.getConfig("faucet")
2832

2933
FaucetConfig(
@@ -34,7 +38,11 @@ object FaucetConfig {
3438
txValue = faucetConfig.getLong("tx-value"),
3539
rpcAddress = faucetConfig.getString("rpc-address"),
3640
keyStoreDir = faucetConfig.getString("keystore-dir"),
37-
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis
41+
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis,
42+
handlerTimeout = faucetConfig.getDuration("handler-timeout").toMillis.millis,
43+
responseTimeout = faucetConfig.getDuration("response-timeout").toMillis.millis,
44+
supervisor = SupervisorConfig(faucetConfig),
45+
shutdownTimeout = faucetConfig.getDuration("shutdown-timeout").toMillis.millis
3846
)
3947
}
4048
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package io.iohk.ethereum.faucet
2+
3+
import akka.actor.{Actor, ActorLogging, Props}
4+
import akka.util.ByteString
5+
import io.iohk.ethereum.domain.Address
6+
import io.iohk.ethereum.faucet.FaucetHandler.WalletException
7+
import io.iohk.ethereum.faucet.FaucetStatus.WalletAvailable
8+
import io.iohk.ethereum.faucet.jsonrpc.WalletService
9+
import io.iohk.ethereum.keystore.KeyStore.KeyStoreError
10+
import io.iohk.ethereum.keystore.Wallet
11+
import monix.execution.Scheduler.Implicits.global
12+
13+
class FaucetHandler(walletService: WalletService, config: FaucetConfig) extends Actor with ActorLogging {
14+
15+
import FaucetHandler.FaucetHandlerMsg._
16+
import FaucetHandler.FaucetHandlerResponse._
17+
18+
override def preStart(): Unit = {
19+
self ! Initialization
20+
}
21+
22+
override def receive: Receive = unavailable()
23+
24+
private def unavailable(): Receive = {
25+
case Status =>
26+
sender() ! StatusResponse(FaucetStatus.FaucetUnavailable)
27+
28+
case Initialization => {
29+
log.info("Initialization called (faucet unavailable)")
30+
walletService.getWallet.runSyncUnsafe() match {
31+
case Left(error) =>
32+
log.error(s"Couldn't initialize wallet - error: $error")
33+
throw new WalletException(error)
34+
case Right(wallet) =>
35+
log.info("Faucet initialization succeeded")
36+
context become available(wallet)
37+
}
38+
}
39+
case SendFunds(addressTo: Address) =>
40+
log.info(
41+
s"SendFunds called, to: $addressTo, value: ${config.txValue}, gas price: ${config.txGasPrice}," +
42+
s" gas limit: ${config.txGasLimit} (faucet unavailable)"
43+
)
44+
sender() ! FaucetIsUnavailable
45+
}
46+
47+
private def available(wallet: Wallet): Receive = {
48+
case Status =>
49+
val respondTo = sender()
50+
respondTo ! StatusResponse(WalletAvailable)
51+
52+
case Initialization =>
53+
log.debug("Initialization called (faucet available)")
54+
sender() ! FaucetIsAlreadyAvailable
55+
56+
case SendFunds(addressTo: Address) =>
57+
log.info(
58+
s"SendFunds called, to: $addressTo, value: ${config.txValue}, gas price: ${config.txGasPrice}, gas limit: ${config.txGasLimit} (faucet available)"
59+
)
60+
val respondTo = sender()
61+
// We Only consider the request fail if we found out
62+
// wallet is not properly initialized
63+
walletService
64+
.sendFunds(wallet, addressTo)
65+
.map {
66+
case Right(txHash) =>
67+
respondTo ! TransactionSent(txHash)
68+
case Left(error) =>
69+
respondTo ! WalletRpcClientError(error.msg)
70+
}
71+
.runAsyncAndForget
72+
}
73+
}
74+
75+
object FaucetHandler {
76+
77+
sealed abstract class FaucetHandlerMsg
78+
object FaucetHandlerMsg {
79+
case object Status extends FaucetHandlerMsg
80+
case object Initialization extends FaucetHandlerMsg
81+
case class SendFunds(address: Address) extends FaucetHandlerMsg
82+
}
83+
sealed trait FaucetHandlerResponse
84+
object FaucetHandlerResponse {
85+
case class StatusResponse(status: FaucetStatus) extends FaucetHandlerResponse
86+
case object FaucetIsUnavailable extends FaucetHandlerResponse
87+
case object FaucetIsAlreadyAvailable extends FaucetHandlerResponse
88+
case class WalletRpcClientError(error: String) extends FaucetHandlerResponse
89+
case class TransactionSent(txHash: ByteString) extends FaucetHandlerResponse
90+
}
91+
92+
class WalletException(keyStoreError: KeyStoreError) extends RuntimeException(keyStoreError.toString)
93+
94+
def props(walletRpcClient: WalletService, config: FaucetConfig): Props = Props(
95+
new FaucetHandler(walletRpcClient, config)
96+
)
97+
98+
val name: String = "FaucetHandler"
99+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.iohk.ethereum.faucet
2+
3+
import akka.actor.{ActorRef, ActorSystem, OneForOneStrategy, SupervisorStrategy}
4+
import akka.pattern.{BackoffOpts, BackoffSupervisor}
5+
import io.iohk.ethereum.faucet.FaucetHandler.WalletException
6+
import io.iohk.ethereum.faucet.jsonrpc.WalletService
7+
import io.iohk.ethereum.utils.Logger
8+
9+
import scala.concurrent.duration._
10+
11+
object FaucetSupervisor {
12+
val name = "FaucetSupervisor"
13+
}
14+
15+
class FaucetSupervisor(walletRpcClient: WalletService, config: FaucetConfig, shutdown: () => Unit)(implicit
16+
system: ActorSystem
17+
) extends Logger {
18+
19+
val childProps = FaucetHandler.props(walletRpcClient, config)
20+
21+
val minBackoff: FiniteDuration = config.supervisor.minBackoff
22+
val maxBackoff: FiniteDuration = config.supervisor.maxBackoff
23+
val randomFactor: Double = config.supervisor.randomFactor
24+
val autoReset: FiniteDuration = config.supervisor.autoReset
25+
26+
val supervisorProps = BackoffSupervisor.props(
27+
BackoffOpts
28+
.onFailure(
29+
childProps,
30+
childName = FaucetHandler.name,
31+
minBackoff = minBackoff,
32+
maxBackoff = maxBackoff,
33+
randomFactor = randomFactor
34+
)
35+
.withAutoReset(autoReset)
36+
.withSupervisorStrategy(OneForOneStrategy() {
37+
case error: WalletException
38+
log.error(s"Stop ${FaucetHandler.name}", error)
39+
shutdown()
40+
SupervisorStrategy.Stop
41+
case error
42+
log.error(s"Restart ${FaucetHandler.name}", error)
43+
SupervisorStrategy.Restart
44+
})
45+
)
46+
val supervisor: ActorRef = system.actorOf(supervisorProps, FaucetSupervisor.name)
47+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.iohk.ethereum.faucet
2+
3+
import com.typesafe.config.Config
4+
import scala.concurrent.duration.{FiniteDuration, _}
5+
6+
case class SupervisorConfig(
7+
minBackoff: FiniteDuration,
8+
maxBackoff: FiniteDuration,
9+
randomFactor: Double,
10+
autoReset: FiniteDuration,
11+
attempts: Int,
12+
delay: FiniteDuration
13+
)
14+
object SupervisorConfig {
15+
def apply(typesafeConfig: Config): SupervisorConfig = {
16+
val supervisorConfig = typesafeConfig.getConfig("supervisor")
17+
18+
SupervisorConfig(
19+
supervisorConfig.getDuration("min-backoff").toMillis.millis,
20+
supervisorConfig.getDuration("man-backoff").toMillis.millis,
21+
supervisorConfig.getDouble("random-factor"),
22+
supervisorConfig.getDuration("auto-reset").toMillis.millis,
23+
supervisorConfig.getInt("attempts"),
24+
supervisorConfig.getDuration("delay").toMillis.millis
25+
)
26+
27+
}
28+
}

src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetBuilder.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package io.iohk.ethereum.faucet.jsonrpc
33
import java.security.SecureRandom
44

55
import akka.actor.ActorSystem
6-
import io.iohk.ethereum.faucet.FaucetConfigBuilder
6+
import io.iohk.ethereum.faucet.{FaucetConfigBuilder, FaucetSupervisor}
77
import io.iohk.ethereum.jsonrpc.server.controllers.ApisBase
88
import io.iohk.ethereum.jsonrpc.server.controllers.JsonRpcBaseController.JsonRpcConfig
99
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer
1010
import io.iohk.ethereum.keystore.KeyStoreImpl
1111
import io.iohk.ethereum.mallet.service.RpcClient
1212
import io.iohk.ethereum.utils.{ConfigUtils, KeyStoreConfig, Logger}
1313

14+
import scala.concurrent.Await
1415
import scala.util.Try
1516

1617
trait ActorSystemBuilder {
@@ -25,11 +26,13 @@ trait FaucetControllerBuilder {
2526
}
2627

2728
trait FaucetRpcServiceBuilder {
28-
self: FaucetConfigBuilder with FaucetControllerBuilder with ActorSystemBuilder =>
29+
self: FaucetConfigBuilder with FaucetControllerBuilder with ActorSystemBuilder with ShutdownHookBuilder =>
2930

3031
val keyStore = new KeyStoreImpl(KeyStoreConfig.customKeyStoreConfig(faucetConfig.keyStoreDir), new SecureRandom())
3132
val rpcClient = new RpcClient(faucetConfig.rpcAddress)
32-
val faucetRpcService = new FaucetRpcService(rpcClient, keyStore, faucetConfig)
33+
val walletService = new WalletService(rpcClient, keyStore, faucetConfig)
34+
val faucetSupervisor: FaucetSupervisor = new FaucetSupervisor(walletService, faucetConfig, shutdown)(system)
35+
val faucetRpcService = new FaucetRpcService(faucetConfig)(system)
3336
}
3437

3538
trait FaucetJsonRpcHealthCheckBuilder {
@@ -84,6 +87,20 @@ trait FaucetJsonRpcHttpServerBuilder {
8487
)
8588
}
8689

90+
trait ShutdownHookBuilder {
91+
self: ActorSystemBuilder with FaucetConfigBuilder with Logger =>
92+
93+
def shutdown(): Unit = {
94+
Await.ready(
95+
system.terminate.map(
96+
_ ->
97+
log.info("actor system finished")
98+
)(system.dispatcher),
99+
faucetConfig.shutdownTimeout
100+
)
101+
}
102+
}
103+
87104
class FaucetServer
88105
extends ActorSystemBuilder
89106
with FaucetConfigBuilder
@@ -95,6 +112,7 @@ class FaucetServer
95112
with FaucetJsonRpcHealthCheckBuilder
96113
with FaucetJsonRpcControllerBuilder
97114
with FaucetJsonRpcHttpServerBuilder
115+
with ShutdownHookBuilder
98116
with Logger {
99117

100118
override def systemName: String = "Faucet-system"
@@ -109,4 +127,5 @@ class FaucetServer
109127
case Right(jsonRpcServer) => jsonRpcServer.run()
110128
case Left(error) => throw new RuntimeException(s"$error")
111129
}
130+
112131
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.iohk.ethereum.faucet.jsonrpc
2+
3+
import akka.actor.{ActorRef, ActorSystem}
4+
import akka.pattern.RetrySupport
5+
import akka.util.Timeout
6+
import io.iohk.ethereum.faucet.{FaucetConfigBuilder, FaucetHandler, FaucetSupervisor}
7+
import monix.eval.Task
8+
9+
trait FaucetHandlerBuilder {
10+
self: FaucetConfigBuilder with RetrySupport =>
11+
12+
val handlerPath = s"user/${FaucetSupervisor.name}/${FaucetHandler.name}"
13+
lazy val attempts = faucetConfig.supervisor.attempts
14+
lazy val delay = faucetConfig.supervisor.delay
15+
16+
lazy val handlerTimeout: Timeout = Timeout(faucetConfig.handlerTimeout)
17+
18+
def faucetHandler()(implicit system: ActorSystem): Task[ActorRef] = {
19+
Task.deferFuture(
20+
retry(() => system.actorSelection(handlerPath).resolveOne(handlerTimeout.duration), attempts, delay)(
21+
system.dispatcher,
22+
system.scheduler
23+
)
24+
)
25+
}
26+
27+
}

0 commit comments

Comments
 (0)