Skip to content

Commit cb7a1b5

Browse files
committed
Merge remote-tracking branch 'origin/phase/beta1' into fix/downloadMptFirst
2 parents dd5275c + c0db1a5 commit cb7a1b5

File tree

7 files changed

+52
-28
lines changed

7 files changed

+52
-28
lines changed

src/main/resources/application.conf

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ grothendieck {
152152
# Enabled JSON-RPC APIs over the HTTP endpoint
153153
# Available choices are: eth, web3, net, personal
154154
apis = "eth,web3,net"
155+
156+
net {
157+
peer-manager-timeout = 5.seconds
158+
}
155159
}
156160
}
157161

@@ -343,7 +347,7 @@ grothendieck {
343347
# Time at which a filter remains valid
344348
filter-timeout = 10.minutes
345349

346-
filter-manager-query-timeout = 3.seconds
350+
filter-manager-query-timeout = 3.minutes
347351
}
348352

349353
}

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -173,20 +173,10 @@ trait FastSync {
173173

174174
private var blockChainOnlyPeers = Set.empty[Peer]
175175

176-
private val syncStatePersistCancellable =
177-
scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval) {
178-
syncStateStorageActor ! SyncState(
179-
initialSyncState.targetBlock,
180-
requestedMptNodes.values.flatten.toSeq.distinct ++ mptNodesQueue,
181-
requestedNonMptNodes.values.flatten.toSeq.distinct ++ nonMptNodesQueue,
182-
requestedBlockBodies.values.flatten.toSeq.distinct ++ blockBodiesQueue,
183-
requestedReceipts.values.flatten.toSeq.distinct ++ receiptsQueue,
184-
downloadedNodesCount,
185-
bestBlockHeaderNumber)
186-
}
187-
176+
private val syncStatePersistCancellable = scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval, self, PersistSyncState)
188177
private val heartBeat = scheduler.schedule(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing)
189178

179+
// scalastyle:off cyclomatic.complexity
190180
def receive: Receive = handlePeerUpdates orElse handleFailingMptPeers orElse {
191181
case EnqueueNodes(hashes) =>
192182
hashes.foreach {
@@ -231,6 +221,20 @@ trait FastSync {
231221

232222
case PrintStatus =>
233223
printStatus()
224+
225+
case PersistSyncState =>
226+
persistSyncState()
227+
}
228+
229+
private def persistSyncState(): Unit = {
230+
syncStateStorageActor ! SyncState(
231+
initialSyncState.targetBlock,
232+
requestedMptNodes.values.flatten.toSeq.distinct ++ mptNodesQueue,
233+
requestedNonMptNodes.values.flatten.toSeq.distinct ++ nonMptNodesQueue,
234+
requestedBlockBodies.values.flatten.toSeq.distinct ++ blockBodiesQueue,
235+
requestedReceipts.values.flatten.toSeq.distinct ++ receiptsQueue,
236+
downloadedNodesCount,
237+
bestBlockHeaderNumber)
234238
}
235239

236240
private def handleFailingMptPeers: Receive ={
@@ -448,6 +452,7 @@ object FastSync {
448452
private case object TargetBlockTimeout
449453

450454
private case object ProcessSyncing
455+
private case object PersistSyncState
451456
case class BlockChainOnlyDownload(peer: Peer)
452457

453458
case class SyncState(

src/main/scala/io/iohk/ethereum/jsonrpc/NetService.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.iohk.ethereum.jsonrpc
33
import akka.actor.ActorRef
44
import akka.agent.Agent
55
import akka.util.Timeout
6+
import io.iohk.ethereum.jsonrpc.NetService.NetServiceConfig
67
import io.iohk.ethereum.network.PeerManagerActor
78
import io.iohk.ethereum.utils.ServerStatus.{Listening, NotListening}
89
import io.iohk.ethereum.utils.{Config, NodeStatus}
@@ -20,9 +21,19 @@ object NetService {
2021

2122
case class PeerCountRequest()
2223
case class PeerCountResponse(value: Int)
24+
25+
case class NetServiceConfig(peerManagerTimeout: FiniteDuration)
26+
27+
object NetServiceConfig {
28+
def apply(etcClientConfig: com.typesafe.config.Config): NetServiceConfig = {
29+
val netServiceConfig = etcClientConfig.getConfig("network.rpc.net")
30+
NetServiceConfig(
31+
peerManagerTimeout = netServiceConfig.getDuration("peer-manager-timeout").toMillis.millis)
32+
}
33+
}
2334
}
2435

25-
class NetService(nodeStatusHolder: Agent[NodeStatus], peerManager: ActorRef) {
36+
class NetService(nodeStatusHolder: Agent[NodeStatus], peerManager: ActorRef, config: NetServiceConfig) {
2637
import NetService._
2738

2839
def version(req: VersionRequest): ServiceResponse[VersionResponse] = {
@@ -42,7 +53,7 @@ class NetService(nodeStatusHolder: Agent[NodeStatus], peerManager: ActorRef) {
4253

4354
def peerCount(req: PeerCountRequest): ServiceResponse[PeerCountResponse] = {
4455
import akka.pattern.ask
45-
implicit val timeout = Timeout(2.seconds)
56+
implicit val timeout = Timeout(config.peerManagerTimeout)
4657

4758
(peerManager ? PeerManagerActor.GetPeers)
4859
.mapTo[PeerManagerActor.Peers]

src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import scala.concurrent.Future
1212
import scala.concurrent.duration._
1313
import akka.actor.SupervisorStrategy.Stop
1414
import akka.actor._
15-
import akka.agent.Agent
1615
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
1716
import io.iohk.ethereum.network.PeerEventBusActor.Publish
1817
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager
@@ -21,7 +20,8 @@ import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
2120
import io.iohk.ethereum.network.p2p.{MessageDecoder, MessageSerializable}
2221
import io.iohk.ethereum.network.rlpx.AuthHandshaker
2322
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
24-
import io.iohk.ethereum.utils.NodeStatus
23+
24+
import scala.util.{Failure, Success}
2525

2626
class PeerManagerActor(
2727
peerEventBus: ActorRef,
@@ -161,8 +161,9 @@ class PeerManagerActor(
161161
Future.traverse(peers.values) { peer =>
162162
(peer.ref ? PeerActor.GetStatus)
163163
.mapTo[PeerActor.StatusResponse]
164-
.map(sr => (peer, sr.status))
165-
}.map(r => Peers.apply(r.toMap))
164+
.map { sr => Success((peer, sr.status)) }
165+
.recover { case ex => Failure(ex) }
166+
}.map(r => Peers.apply(r.collect { case Success(v) => v }.toMap))
166167
}
167168

168169
}

src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,7 @@ class PeerDiscoveryManager(
3434

3535
if (discoveryConfig.discoveryEnabled) {
3636
discoveryListener ! DiscoveryListener.Subscribe
37-
38-
context.system.scheduler.schedule(discoveryConfig.scanInitialDelay, discoveryConfig.scanInterval) {
39-
scan()
40-
}
37+
context.system.scheduler.schedule(discoveryConfig.scanInitialDelay, discoveryConfig.scanInterval, self, Scan)
4138
}
4239

4340
def scan(): Unit = {
@@ -78,6 +75,8 @@ class PeerDiscoveryManager(
7875

7976
case GetDiscoveredNodes =>
8077
sender() ! DiscoveredNodes(nodes.values.toSet)
78+
79+
case Scan => scan()
8180
}
8281

8382
private def sendPing(toNodeId: ByteString, toAddr: InetSocketAddress): Unit = {
@@ -130,4 +129,6 @@ object PeerDiscoveryManager {
130129

131130
case object GetDiscoveredNodes
132131
case class DiscoveredNodes(nodes: Set[Node])
132+
133+
private case object Scan
133134
}

src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.iohk.ethereum.db.components.{SharedLevelDBDataSources, Storages}
1111
import io.iohk.ethereum.db.storage.AppStateStorage
1212
import io.iohk.ethereum.db.storage.pruning.PruningMode
1313
import io.iohk.ethereum.domain.{Blockchain, BlockchainImpl}
14+
import io.iohk.ethereum.jsonrpc.NetService.NetServiceConfig
1415
import io.iohk.ethereum.ledger.{Ledger, LedgerImpl}
1516
import io.iohk.ethereum.network.{PeerManagerActor, ServerActor}
1617
import io.iohk.ethereum.jsonrpc._
@@ -226,7 +227,9 @@ trait Web3ServiceBuilder {
226227
trait NetServiceBuilder {
227228
this: PeerManagerActorBuilder with NodeStatusBuilder =>
228229

229-
lazy val netService = new NetService(nodeStatusHolder, peerManager)
230+
lazy val netServiceConfig = NetServiceConfig(Config.config)
231+
232+
lazy val netService = new NetService(nodeStatusHolder, peerManager, netServiceConfig)
230233
}
231234

232235
trait PendingTransactionsManagerBuilder {
@@ -259,9 +262,7 @@ trait FilterManagerBuilder {
259262
keyStore,
260263
pendingTransactionsManager,
261264
filterConfig,
262-
txPoolConfig
263-
)
264-
)
265+
txPoolConfig), "filter-manager")
265266
}
266267

267268
trait BlockGeneratorBuilder {

src/test/scala/io/iohk/ethereum/jsonrpc/NetServiceSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import io.iohk.ethereum.utils.{NodeStatus, ServerStatus}
1313
import org.scalatest.concurrent.ScalaFutures
1414
import org.scalatest.{FlatSpec, Matchers}
1515

16+
import scala.concurrent.duration._
1617
import scala.concurrent.ExecutionContext.Implicits.global
1718

1819
class NetServiceSpec extends FlatSpec with Matchers with ScalaFutures with NormalPatience with SecureRandomBuilder {
@@ -46,7 +47,7 @@ class NetServiceSpec extends FlatSpec with Matchers with ScalaFutures with Norma
4647

4748
val nodeStatus = NodeStatus(crypto.generateKeyPair(secureRandom), ServerStatus.Listening(new InetSocketAddress(9000)),
4849
discoveryStatus = ServerStatus.NotListening)
49-
val netService = new NetService(Agent(nodeStatus), peerManager.ref)
50+
val netService = new NetService(Agent(nodeStatus), peerManager.ref, NetServiceConfig(5.seconds))
5051
}
5152

5253
}

0 commit comments

Comments
 (0)