Skip to content

Commit 4823d44

Browse files
committed
Merge remote-tracking branch 'origin/develop' into etcm-275/async-processing
2 parents 0f644d5 + b8e013d commit 4823d44

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+780
-705
lines changed

.buildkite/pipeline.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ steps:
55
nix eval --json '(import ./.buildkite { pipeline = ./.buildkite/pipeline.nix; })' \
66
| buildkite-agent pipeline upload --no-interpolation
77
agents:
8-
queue: project42
8+
queue: project42
9+
timeout_in_minutes: 60
Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.iohk.ethereum.healthcheck
22

3-
import scala.concurrent.{ExecutionContext, Future}
4-
import scala.util.{Failure, Success}
3+
import monix.eval.Task
54

65
/**
76
* Represents a health check, runs it and interprets the outcome.
@@ -18,20 +17,18 @@ import scala.util.{Failure, Success}
1817
*/
1918
case class Healthcheck[Error, Result](
2019
description: String,
21-
f: () => Future[Either[Error, Result]],
20+
f: Task[Either[Error, Result]],
2221
mapResultToError: Result => Option[String] = (_: Result) => None,
2322
mapErrorToError: Error => Option[String] = (error: Error) => Some(String.valueOf(error)),
2423
mapExceptionToError: Throwable => Option[String] = (t: Throwable) => Some(String.valueOf(t))
2524
) {
2625

27-
def apply()(implicit ec: ExecutionContext): Future[HealthcheckResult] = {
28-
f().transform {
29-
case Success(Left(error)) =>
30-
Success(HealthcheckResult(description, mapErrorToError(error)))
31-
case Success(Right(result)) =>
32-
Success(HealthcheckResult(description, mapResultToError(result)))
33-
case Failure(t) =>
34-
Success(HealthcheckResult(description, mapExceptionToError(t)))
35-
}
26+
def apply(): Task[HealthcheckResult] = {
27+
f.map {
28+
case Left(error) =>
29+
HealthcheckResult(description, mapErrorToError(error))
30+
case Right(result) =>
31+
HealthcheckResult(description, mapResultToError(result))
32+
}.onErrorHandle(t => HealthcheckResult(description, mapExceptionToError(t)))
3633
}
3734
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.iohk.ethereum.jsonrpc
2+
3+
import akka.actor.{Actor, ActorRef}
4+
import akka.pattern.ask
5+
import akka.util.Timeout
6+
import monix.eval.Task
7+
8+
import scala.reflect.ClassTag
9+
10+
object AkkaTaskOps {
11+
implicit class TaskActorOps(val to: ActorRef) extends AnyVal {
12+
13+
def askFor[A](
14+
message: Any
15+
)(implicit timeout: Timeout, classTag: ClassTag[A], sender: ActorRef = Actor.noSender): Task[A] =
16+
Task
17+
.deferFuture((to ? message).mapTo[A])
18+
.timeout(timeout.duration)
19+
}
20+
}

src/main/scala/io/iohk/ethereum/jsonrpc/CheckpointingService.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
66
import io.iohk.ethereum.crypto.ECDSASignature
77
import io.iohk.ethereum.domain.Blockchain
88
import io.iohk.ethereum.utils.Logger
9-
import monix.execution.Scheduler.Implicits.global
10-
11-
import scala.concurrent.Future
9+
import monix.eval.Task
1210

1311
class CheckpointingService(
1412
blockchain: Blockchain,
@@ -21,12 +19,12 @@ class CheckpointingService(
2119
lazy val bestBlockNum = blockchain.getBestBlockNumber()
2220
lazy val blockToReturnNum = bestBlockNum - bestBlockNum % req.checkpointingInterval
2321

24-
Future {
22+
Task {
2523
blockchain.getBlockByNumber(blockToReturnNum)
2624
}.flatMap {
2725
case Some(b) =>
2826
val resp = GetLatestBlockResponse(b.hash, b.number)
29-
Future.successful(Right(resp))
27+
Task.now(Right(resp))
3028

3129
case None =>
3230
log.error(
@@ -37,7 +35,7 @@ class CheckpointingService(
3735
}
3836
}
3937

40-
def pushCheckpoint(req: PushCheckpointRequest): ServiceResponse[PushCheckpointResponse] = Future {
38+
def pushCheckpoint(req: PushCheckpointRequest): ServiceResponse[PushCheckpointResponse] = Task {
4139
syncController ! NewCheckpoint(req.hash, req.signatures)
4240
Right(PushCheckpointResponse())
4341
}
Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,44 @@
11
package io.iohk.ethereum.jsonrpc
22

33
import akka.actor.ActorRef
4-
import akka.pattern._
54
import akka.util.Timeout
5+
import io.iohk.ethereum.jsonrpc.AkkaTaskOps._
66
import io.iohk.ethereum.jsonrpc.DebugService.{ListPeersInfoRequest, ListPeersInfoResponse}
77
import io.iohk.ethereum.network.EtcPeerManagerActor.{PeerInfo, PeerInfoResponse}
88
import io.iohk.ethereum.network.PeerManagerActor.Peers
99
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerActor, PeerId, PeerManagerActor}
10+
import monix.eval.Task
1011

11-
import scala.concurrent.ExecutionContext.Implicits.global
12-
import scala.concurrent.Future
1312
import scala.concurrent.duration._
1413

1514
object DebugService {
16-
1715
case class ListPeersInfoRequest()
1816
case class ListPeersInfoResponse(peers: List[PeerInfo])
19-
2017
}
2118

2219
class DebugService(peerManager: ActorRef, etcPeerManager: ActorRef) {
2320

2421
def listPeersInfo(getPeersInfoRequest: ListPeersInfoRequest): ServiceResponse[ListPeersInfoResponse] = {
25-
val result = for {
22+
for {
2623
ids <- getPeerIds
27-
peers <- Future.traverse(ids)(getPeerInfo)
28-
} yield ListPeersInfoResponse(peers.flatten)
29-
30-
result.map(Right(_))
24+
peers <- Task.traverse(ids)(getPeerInfo)
25+
} yield Right(ListPeersInfoResponse(peers.flatten))
3126
}
3227

33-
private def getPeerIds: Future[List[PeerId]] = {
28+
private def getPeerIds: Task[List[PeerId]] = {
3429
implicit val timeout: Timeout = Timeout(5.seconds)
3530

36-
(peerManager ? PeerManagerActor.GetPeers)
37-
.mapTo[Peers]
38-
.recover { case _ => Peers(Map.empty[Peer, PeerActor.Status]) }
31+
peerManager
32+
.askFor[Peers](PeerManagerActor.GetPeers)
33+
.onErrorRecover { case _ => Peers(Map.empty[Peer, PeerActor.Status]) }
3934
.map(_.peers.keySet.map(_.id).toList)
4035
}
4136

42-
private def getPeerInfo(peer: PeerId): Future[Option[PeerInfo]] = {
37+
private def getPeerInfo(peer: PeerId): Task[Option[PeerInfo]] = {
4338
implicit val timeout: Timeout = Timeout(5.seconds)
4439

45-
(etcPeerManager ? EtcPeerManagerActor.PeerInfoRequest(peer))
46-
.mapTo[PeerInfoResponse]
47-
.collect { case PeerInfoResponse(info) => info }
40+
etcPeerManager
41+
.askFor[PeerInfoResponse](EtcPeerManagerActor.PeerInfoRequest(peer))
42+
.map(resp => resp.peerInfo)
4843
}
4944
}

0 commit comments

Comments
 (0)