Skip to content

[ETCM-396] faucet timeout #829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 4, 2020
26 changes: 20 additions & 6 deletions src/main/scala/io/iohk/ethereum/faucet/FaucetConfig.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iohk.ethereum.faucet

import com.typesafe.config.{ConfigFactory, Config}
import com.typesafe.config.{Config, ConfigFactory}
import io.iohk.ethereum.domain.Address

import scala.concurrent.duration.{FiniteDuration, _}
Expand All @@ -11,36 +11,50 @@ trait FaucetConfigBuilder {
lazy val faucetConfig: FaucetConfig = FaucetConfig(rawConfig)
}

case class RpcClientConfig(
address: String,
timeout: FiniteDuration
)

object RpcClientConfig {
def apply(rpcClientConfig: Config): RpcClientConfig = {

RpcClientConfig(
address = rpcClientConfig.getString("rpc-address"),
timeout = rpcClientConfig.getDuration("timeout").toMillis.millis
)
}
}

case class FaucetConfig(
walletAddress: Address,
walletPassword: String,
txGasPrice: BigInt,
txGasLimit: BigInt,
txValue: BigInt,
rpcAddress: String,
rpcClient: RpcClientConfig,
keyStoreDir: String,
minRequestInterval: FiniteDuration,
handlerTimeout: FiniteDuration,
responseTimeout: FiniteDuration,
actorCommunicationMargin: FiniteDuration,
supervisor: SupervisorConfig,
shutdownTimeout: FiniteDuration
)

object FaucetConfig {
def apply(typesafeConfig: Config): FaucetConfig = {
val faucetConfig = typesafeConfig.getConfig("faucet")

FaucetConfig(
walletAddress = Address(faucetConfig.getString("wallet-address")),
walletPassword = faucetConfig.getString("wallet-password"),
txGasPrice = faucetConfig.getLong("tx-gas-price"),
txGasLimit = faucetConfig.getLong("tx-gas-limit"),
txValue = faucetConfig.getLong("tx-value"),
rpcAddress = faucetConfig.getString("rpc-client.rpc-address"),
rpcClient = RpcClientConfig(faucetConfig.getConfig("rpc-client")),
keyStoreDir = faucetConfig.getString("keystore-dir"),
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis,
handlerTimeout = faucetConfig.getDuration("handler-timeout").toMillis.millis,
responseTimeout = faucetConfig.getDuration("response-timeout").toMillis.millis,
actorCommunicationMargin = faucetConfig.getDuration("actor-communication-margin").toMillis.millis,
supervisor = SupervisorConfig(faucetConfig),
shutdownTimeout = faucetConfig.getDuration("shutdown-timeout").toMillis.millis
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object SupervisorConfig {

SupervisorConfig(
supervisorConfig.getDuration("min-backoff").toMillis.millis,
supervisorConfig.getDuration("man-backoff").toMillis.millis,
supervisorConfig.getDuration("max-backoff").toMillis.millis,
supervisorConfig.getDouble("random-factor"),
supervisorConfig.getDuration("auto-reset").toMillis.millis,
supervisorConfig.getInt("attempts"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ trait FaucetRpcServiceBuilder {
)

val walletRpcClient: WalletRpcClient =
new WalletRpcClient(faucetConfig.rpcAddress, () => sslContext("faucet.rpc-client"))
new WalletRpcClient(
faucetConfig.rpcClient.address,
faucetConfig.rpcClient.timeout,
() => sslContext("faucet.rpc-client")
)
val walletService = new WalletService(walletRpcClient, keyStore, faucetConfig)
val faucetSupervisor: FaucetSupervisor = new FaucetSupervisor(walletService, faucetConfig, shutdown)(system)
val faucetRpcService = new FaucetRpcService(faucetConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class FaucetRpcService(config: FaucetConfig)(implicit system: ActorSystem)
with FaucetHandlerSelector
with Logger {

implicit lazy val actorTimeout: Timeout = Timeout(config.responseTimeout)
implicit lazy val actorTimeout: Timeout = Timeout(config.actorCommunicationMargin + config.rpcClient.timeout)

def sendFunds(sendFundsRequest: SendFundsRequest): ServiceResponse[SendFundsResponse] =
selectFaucetHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import javax.net.ssl.SSLContext
import monix.eval.Task

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

class WalletRpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLContext])(implicit
class WalletRpcClient(node: Uri, timeout: Duration, getSSLContext: () => Either[SSLError, SSLContext])(implicit
system: ActorSystem,
ec: ExecutionContext
) extends RpcClient(node, getSSLContext)
) extends RpcClient(node, timeout, getSSLContext)
with Logger {
import io.iohk.ethereum.jsonrpc.client.CommonJsonCodecs._

Expand Down
30 changes: 26 additions & 4 deletions src/main/scala/io/iohk/ethereum/jsonrpc/client/RpcClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import java.util.UUID

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
import akka.stream.StreamTcpException
import akka.stream.scaladsl.TcpIdleTimeoutException
import io.circe.generic.auto._
import io.circe.parser.parse
import io.circe.syntax._
Expand All @@ -18,8 +21,9 @@ import javax.net.ssl.SSLContext
import monix.eval.Task

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLContext])(implicit
abstract class RpcClient(node: Uri, timeout: Duration, getSSLContext: () => Either[SSLError, SSLContext])(implicit
system: ActorSystem,
ec: ExecutionContext
) extends Logger {
Expand All @@ -32,6 +36,12 @@ abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLCon
Http().defaultClientHttpsContext
}

lazy val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(system)
.withConnectionSettings(
ClientConnectionSettings(system)
.withIdleTimeout(timeout)
)

protected def doRequest[T: Decoder](method: String, args: Seq[Json]): RpcResponse[T] = {
doJsonRequest(method, args).map(_.flatMap(getResult[T]))
}
Expand Down Expand Up @@ -60,11 +70,21 @@ abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLCon

Task
.deferFuture(for {
response <- Http().singleRequest(request, connectionContext)
response <- Http().singleRequest(request, connectionContext, connectionPoolSettings)
data <- Unmarshal(response.entity).to[String]
} yield parse(data).left.map(e => RpcClientError(e.message)))
} yield parse(data).left.map(e => ParserError(e.message)))
.onErrorHandle { ex: Throwable =>
Left(RpcClientError(s"RPC request failed: ${exceptionToString(ex)}"))
ex match {
case _: TcpIdleTimeoutException =>
log.error("RPC request", ex)
Left(ConnectionError(s"RPC request timeout"))
case _: StreamTcpException =>
log.error("Connection not established", ex)
Left(ConnectionError(s"Connection not established"))
case _ =>
log.error("RPC request failed", ex)
Left(RpcClientError("RPC request failed"))
}
}
}

Expand Down Expand Up @@ -97,5 +117,7 @@ object RpcClient {

case class ParserError(msg: String) extends RpcError

case class ConnectionError(msg: String) extends RpcError

case class RpcClientError(msg: String) extends RpcError
}
5 changes: 3 additions & 2 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,18 @@ faucet {
rpc-client {
rpc-address = "http://127.0.0.1:8546/"
certificate = null
timeout = 2.seconds
}

min-request-interval = 1.minute

handler-timeout = 2.seconds

response-timeout = 2.seconds
actor-communication-margin = 1.seconds

supervisor {
min-backoff = 3.seconds
man-backoff = 30.seconds
max-backoff = 30.seconds
random-factor = 0.2
auto-reset = 10.seconds
attempts = 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.iohk.ethereum.faucet.FaucetHandler.FaucetHandlerResponse.{
}
import io.iohk.ethereum.faucet.FaucetStatus.WalletAvailable
import io.iohk.ethereum.faucet.jsonrpc.FaucetDomain.{SendFundsRequest, StatusRequest}
import io.iohk.ethereum.faucet.{FaucetConfig, SupervisorConfig}
import io.iohk.ethereum.faucet.{FaucetConfig, RpcClientConfig, SupervisorConfig}
import io.iohk.ethereum.jsonrpc.JsonRpcError
import io.iohk.ethereum.testing.ActorsTesting.simpleAutoPilot
import io.iohk.ethereum.{NormalPatience, WithActorSystemShutDown}
Expand Down Expand Up @@ -133,11 +133,11 @@ class FaucetRpcServiceSpec
txGasPrice = 10,
txGasLimit = 20,
txValue = 1,
rpcAddress = "",
rpcClient = RpcClientConfig(address = "", timeout = 10.seconds),
keyStoreDir = "",
minRequestInterval = 10.seconds,
handlerTimeout = 10.seconds,
responseTimeout = 10.seconds,
actorCommunicationMargin = 10.seconds,
supervisor = mock[SupervisorConfig],
shutdownTimeout = 15.seconds
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import java.security.SecureRandom
import akka.util.ByteString
import io.iohk.ethereum.crypto._
import io.iohk.ethereum.domain.{Address, Transaction}
import io.iohk.ethereum.faucet.{FaucetConfig, SupervisorConfig}
import io.iohk.ethereum.faucet.{FaucetConfig, RpcClientConfig, SupervisorConfig}
import io.iohk.ethereum.jsonrpc.client.RpcClient.ConnectionError
import io.iohk.ethereum.keystore.KeyStore.DecryptionFailed
import io.iohk.ethereum.keystore.{KeyStore, Wallet}
import io.iohk.ethereum.network.p2p.messages.CommonMessages.SignedTransactions.SignedTransactionEnc
Expand All @@ -21,7 +22,7 @@ import scala.concurrent.duration._

class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {

"Wallet Service" should "send a transaction" in new TestSetup {
"Wallet Service" should "send a transaction successfully when getNonce and sendTransaction successfully" in new TestSetup {

val receivingAddress = Address("0x99")
val currentNonce = 2
Expand All @@ -44,6 +45,17 @@ class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {

}

it should "failure the transaction when get timeout of getNonce" in new TestSetup {

val timeout = ConnectionError("timeout")
(walletRpcClient.getNonce _).expects(config.walletAddress).returning(Task.pure(Left(timeout)))

val res = walletService.sendFunds(wallet, Address("0x99")).runSyncUnsafe()

res shouldEqual Left(timeout)

}

it should "get wallet successful" in new TestSetup {
(mockKeyStore.unlockAccount _).expects(config.walletAddress, config.walletPassword).returning(Right(wallet))

Expand Down Expand Up @@ -76,11 +88,11 @@ class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {
txGasPrice = 10,
txGasLimit = 20,
txValue = 1,
rpcAddress = "",
rpcClient = RpcClientConfig("", timeout = 10.seconds),
keyStoreDir = "",
minRequestInterval = 10.seconds,
handlerTimeout = 10.seconds,
responseTimeout = 10.seconds,
actorCommunicationMargin = 10.seconds,
supervisor = mock[SupervisorConfig],
shutdownTimeout = 15.seconds
)
Expand Down
7 changes: 5 additions & 2 deletions src/universal/conf/faucet.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ faucet {
# null value indicates HTTPS is not being used
# password-file = "tls/password"
#}

# Response time-out from rpc client resolve
timeout = 3.seconds
}

# How often can a single IP address send a request
Expand All @@ -49,12 +52,12 @@ faucet {
handler-timeout = 1.seconds

# Response time-out from actor resolve
response-timeout = 3.seconds
actor-communication-margin = 1.seconds

# Supervisor with BackoffSupervisor pattern
supervisor {
min-backoff = 3.seconds
man-backoff = 30.seconds
max-backoff = 30.seconds
random-factor = 0.2
auto-reset = 10.seconds
attempts = 4
Expand Down