Skip to content

Commit 785da3c

Browse files
authored
Merge branch 'develop' into feature/ETCM-126-rpc-cleanup
2 parents 591aa68 + 7abdf78 commit 785da3c

File tree

4 files changed

+113
-44
lines changed

4 files changed

+113
-44
lines changed

src/main/resources/application.conf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ mantis {
291291

292292
# Retry interval for resuming fast sync after all connections to peers were lost
293293
# Also retry interval in regular sync: for picking blocks batch and retrying requests
294-
sync-retry-interval = 15.seconds
294+
sync-retry-interval = 0.5 seconds
295295

296296
# Response time-out from peer during sync. If a peer fails to respond within this limit, it will be blacklisted
297297
peer-response-timeout = 3.minutes
@@ -325,7 +325,7 @@ mantis {
325325

326326
# During fast-sync when most up to date block is determined from peers, the actual target block number
327327
# will be decreased by this value
328-
target-block-offset = 500
328+
target-block-offset = 128
329329

330330
# How often to query peers for new blocks after the top of the chain has been reached
331331
check-for-new-block-interval = 10.seconds
@@ -335,7 +335,7 @@ mantis {
335335
fastsync-block-chain-only-peers-pool = 100
336336

337337
# time between 2 consecutive requests to peer when doing fast sync, this is to prevent flagging us as spammer
338-
fastsync-throttle = 0.3 seconds
338+
fastsync-throttle = 0.1 seconds
339339

340340
# When we receive a branch that is not rooted in our chain (we don't have a parent for the first header), it means
341341
# we found a fork. To resolve it, we need to query the same peer for previous headers, to find a common ancestor.

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import io.iohk.ethereum.network.Peer
1616
import io.iohk.ethereum.network.p2p.messages.PV62._
1717
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
1818
import io.iohk.ethereum.network.p2p.messages.PV63._
19+
import io.iohk.ethereum.utils.ByteStringUtils
1920
import io.iohk.ethereum.utils.Config.SyncConfig
2021
import org.bouncycastle.util.encoders.Hex
2122

@@ -360,35 +361,36 @@ class FastSync(
360361
}
361362

362363
private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]) = {
363-
validateReceipts(requestedHashes, receipts) match {
364-
case ReceiptsValidationResult.Valid(blockHashesWithReceipts) =>
365-
blockHashesWithReceipts.map { case (hash, receiptsForBlock) =>
366-
blockchain.storeReceipts(hash, receiptsForBlock)
367-
}.reduce(_.and(_))
368-
.commit()
369-
370-
val receivedHashes = blockHashesWithReceipts.unzip._1
371-
updateBestBlockIfNeeded(receivedHashes)
372-
373-
if (receipts.isEmpty) {
374-
val reason = s"got empty receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}"
375-
blacklist(peer.id, blacklistDuration, reason)
376-
}
377-
378-
val remainingReceipts = requestedHashes.drop(receipts.size)
379-
if (remainingReceipts.nonEmpty) {
380-
syncState = syncState.enqueueReceipts(remainingReceipts)
381-
}
364+
if (receipts.isEmpty) {
365+
val reason = s"got empty receipts for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}"
366+
blacklist(peer.id, blacklistDuration, reason)
367+
syncState = syncState.enqueueReceipts(requestedHashes)
368+
} else {
369+
validateReceipts(requestedHashes, receipts) match {
370+
case ReceiptsValidationResult.Valid(blockHashesWithReceipts) =>
371+
blockHashesWithReceipts.map { case (hash, receiptsForBlock) =>
372+
blockchain.storeReceipts(hash, receiptsForBlock)
373+
}.reduce(_.and(_))
374+
.commit()
375+
376+
val receivedHashes = blockHashesWithReceipts.unzip._1
377+
updateBestBlockIfNeeded(receivedHashes)
378+
379+
val remainingReceipts = requestedHashes.drop(receipts.size)
380+
if (remainingReceipts.nonEmpty) {
381+
syncState = syncState.enqueueReceipts(remainingReceipts)
382+
}
382383

383-
case ReceiptsValidationResult.Invalid(error) =>
384-
val reason =
385-
s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" +
386-
s" due to: $error"
387-
blacklist(peer.id, blacklistDuration, reason)
388-
syncState = syncState.enqueueReceipts(requestedHashes)
384+
case ReceiptsValidationResult.Invalid(error) =>
385+
val reason =
386+
s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" +
387+
s" due to: $error"
388+
blacklist(peer.id, blacklistDuration, reason)
389+
syncState = syncState.enqueueReceipts(requestedHashes)
389390

390-
case ReceiptsValidationResult.DbError =>
391-
redownloadBlockchain()
391+
case ReceiptsValidationResult.DbError =>
392+
redownloadBlockchain()
393+
}
392394
}
393395

394396
processSyncing()
@@ -604,7 +606,6 @@ class FastSync(
604606
if (assignedHandlers.nonEmpty) {
605607
log.debug("There are no available peers, waiting for responses")
606608
} else {
607-
log.debug("There are no peers to download from, scheduling a retry in {}", syncRetryInterval)
608609
scheduler.scheduleOnce(syncRetryInterval, self, ProcessSyncing)
609610
}
610611
} else {

src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,68 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
145145
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))))
146146
}
147147

148+
it should "gracefully handle receiving empty receipts while syncing" in new TestSetup() {
149+
150+
val newSafeTarget = defaultExpectedTargetBlock + syncConfig.fastSyncBlockValidationX
151+
val bestBlockNumber = defaultExpectedTargetBlock
152+
val firstNewBlock = bestBlockNumber + 1
153+
154+
startWithState(defaultState.copy(
155+
bestBlockHeaderNumber = bestBlockNumber,
156+
safeDownloadTarget = newSafeTarget)
157+
)
158+
159+
Thread.sleep(1.seconds.toMillis)
160+
161+
syncController ! SyncController.Start
162+
163+
val handshakedPeers = HandshakedPeers(singlePeer)
164+
updateHandshakedPeers(handshakedPeers)
165+
etcPeerManager.setAutoPilot(new AutoPilot {
166+
def run(sender: ActorRef, msg: Any): AutoPilot = {
167+
if (msg == EtcPeerManagerActor.GetHandshakedPeers) {
168+
sender ! handshakedPeers
169+
}
170+
171+
this
172+
}
173+
})
174+
175+
val watcher = TestProbe()
176+
watcher.watch(syncController)
177+
178+
val newBlocks = getHeaders(firstNewBlock, syncConfig.blockHeadersPerRequest)
179+
val newReceipts = newBlocks.map(_.hash).map(_ => Seq.empty[Receipt])
180+
val newBodies = newBlocks.map(_ => BlockBody.empty)
181+
182+
//wait for peers throttle
183+
Thread.sleep(syncConfig.fastSyncThrottle.toMillis)
184+
sendBlockHeaders(firstNewBlock, newBlocks, peer1, newBlocks.size)
185+
186+
Thread.sleep(syncConfig.fastSyncThrottle.toMillis)
187+
sendNewTargetBlock(defaultTargetBlockHeader.copy(number = defaultTargetBlockHeader.number + 1), peer1, peer1Status, handshakedPeers)
188+
189+
Thread.sleep(1.second.toMillis)
190+
sendReceipts(newBlocks.map(_.hash), Seq(), peer1)
191+
192+
// Peer will be blacklisted for empty response, so wait he is blacklisted
193+
Thread.sleep(2.second.toMillis)
194+
sendReceipts(newBlocks.map(_.hash), newReceipts, peer1)
195+
196+
Thread.sleep(syncConfig.fastSyncThrottle.toMillis)
197+
sendBlockBodies(newBlocks.map(_.hash), newBodies, peer1)
198+
199+
Thread.sleep(syncConfig.fastSyncThrottle.toMillis)
200+
sendNodes(Seq(defaultTargetBlockHeader.stateRoot), Seq(defaultStateMptLeafWithAccount), peer1)
201+
202+
//switch to regular download
203+
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(
204+
GetBlockHeaders(Left(defaultTargetBlockHeader.number + 1), syncConfig.blockHeadersPerRequest, 0, reverse = false),
205+
peer1.id))
206+
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))))
207+
}
208+
209+
148210
it should "handle blocks that fail validation" in new TestSetup(_validators = new Mocks.MockValidatorsAlwaysSucceed {
149211
override val blockHeaderValidator: BlockHeaderValidator = { (blockHeader, getBlockHeaderByHash) => Left(HeaderPoWError) }
150212
}) {
@@ -444,11 +506,12 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
444506
peerMessageBus.expectMsg(Unsubscribe())
445507

446508
// response timeout
447-
Thread.sleep(2.seconds.toMillis)
448-
etcPeerManager.expectNoMessage()
509+
Thread.sleep(1.seconds.toMillis)
510+
511+
etcPeerManager.expectNoMessage(1.second)
449512

450513
// wait for blacklist timeout
451-
Thread.sleep(6.seconds.toMillis)
514+
Thread.sleep(2.seconds.toMillis)
452515

453516
// peer should not be blacklisted anymore
454517
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(GetNodeData(Seq(defaultTargetBlockHeader.stateRoot)), peer1.id))
@@ -577,7 +640,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
577640
minPeersToChooseTargetBlock = 1,
578641
peersScanInterval = 500.milliseconds,
579642
redownloadMissingStateNodes = false,
580-
fastSyncBlockValidationX = 10
643+
fastSyncBlockValidationX = 10,
644+
blacklistDuration = 1.second
581645
)
582646

583647
lazy val syncController = TestActorRef(Props(new SyncController(

src/test/scala/io/iohk/ethereum/consensus/ethash/EthashUtilsSpec.scala

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,20 @@ package io.iohk.ethereum.consensus.ethash
22

33
import akka.util.ByteString
44
import io.iohk.ethereum.crypto.kec256
5-
import org.scalacheck.Arbitrary
5+
import org.scalacheck.Gen
66
import org.bouncycastle.util.encoders.Hex
77
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
88
import org.scalatest.flatspec.AnyFlatSpec
99
import org.scalatest.matchers.should.Matchers
1010

11+
import scala.annotation.tailrec
12+
1113
class EthashUtilsSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyChecks {
1214

1315
import io.iohk.ethereum.consensus.ethash.EthashUtils._
1416

1517
"Ethash" should "generate correct hash" in {
16-
forAll(Arbitrary.arbitrary[Long].filter(_ < 15000000)) { blockNumber =>
18+
forAll(Gen.choose[Long](0, 15000000L)) { blockNumber =>
1719
seed(epoch(blockNumber)) shouldBe seedForBlockReference(blockNumber)
1820
}
1921
}
@@ -115,13 +117,15 @@ class EthashUtilsSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyC
115117
}
116118

117119
def seedForBlockReference(blockNumber: BigInt): ByteString = {
118-
if (blockNumber < EPOCH_LENGTH) {
119-
//wrong version from YP:
120-
//ByteString(kec256(Hex.decode("00" * 32)))
121-
//working version:
122-
ByteString(Hex.decode("00" * 32))
123-
} else {
124-
kec256(seedForBlockReference(blockNumber - EPOCH_LENGTH))
120+
@tailrec
121+
def go(current: BigInt, currentHash: ByteString): ByteString = {
122+
if (current < EPOCH_LENGTH) {
123+
currentHash
124+
} else {
125+
go(current - EPOCH_LENGTH, kec256(currentHash))
126+
}
125127
}
128+
129+
go(blockNumber, ByteString(Hex.decode("00" * 32)))
126130
}
127131
}

0 commit comments

Comments
 (0)