Skip to content

Commit 7bd6af6

Browse files
committed
Merge branch 'phase/beta1' into fix/windowsIniSupport
2 parents 1b37bff + bc24163 commit 7bd6af6

File tree

4 files changed

+121
-24
lines changed

4 files changed

+121
-24
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package io.iohk.ethereum.blockchain.sync
22

3-
import scala.concurrent.duration.FiniteDuration
4-
import scala.concurrent.ExecutionContext.Implicits.global
53
import akka.actor._
64
import akka.util.ByteString
75
import io.iohk.ethereum.domain.BlockHeader
8-
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
6+
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
97
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
10-
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
118
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
12-
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
9+
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
1310
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBody, BlockHeaders, GetBlockHeaders}
11+
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
1412
import io.iohk.ethereum.utils.Config.Sync._
1513

14+
import scala.concurrent.ExecutionContext.Implicits.global
15+
import scala.concurrent.duration.FiniteDuration
16+
1617
trait FastSync {
1718
selfSyncController: SyncController =>
1819

@@ -195,8 +196,13 @@ trait FastSync {
195196
case UpdateDownloadedNodesCount(num) =>
196197
downloadedNodesCount += num
197198

198-
case BlockBodiesReceived(_, requestedHashes, blockBodies) =>
199-
insertBlocks(requestedHashes, blockBodies)
199+
case BlockBodiesReceived(peer, requestedHashes, blockBodies) =>
200+
if (validateBlocks(requestedHashes, blockBodies)) {
201+
insertBlocks(requestedHashes, blockBodies)
202+
} else {
203+
blacklist(peer.id, blacklistDuration, s"responded with block bodies not matching block headers, blacklisting for $blacklistDuration")
204+
self ! FastSync.EnqueueBlockBodies(requestedHashes)
205+
}
200206

201207
case BlockHeadersReceived(_, headers) =>
202208
insertHeaders(headers)
@@ -211,13 +217,7 @@ trait FastSync {
211217
processSyncing()
212218

213219
case Terminated(ref) if assignedHandlers.contains(ref) =>
214-
context unwatch ref
215-
assignedHandlers -= ref
216-
mptNodesQueue ++= requestedMptNodes.getOrElse(ref, Nil)
217-
nonMptNodesQueue ++= requestedNonMptNodes.getOrElse(ref, Nil)
218-
blockBodiesQueue ++= requestedBlockBodies.getOrElse(ref, Nil)
219-
receiptsQueue ++= requestedReceipts.getOrElse(ref, Nil)
220-
cleanupRequestedMaps(ref)
220+
handleActorTerminate(ref)
221221

222222
case PrintStatus =>
223223
printStatus()
@@ -226,6 +226,16 @@ trait FastSync {
226226
persistSyncState()
227227
}
228228

229+
private def handleActorTerminate(ref: ActorRef) = {
230+
context unwatch ref
231+
assignedHandlers -= ref
232+
mptNodesQueue ++= requestedMptNodes.getOrElse(ref, Nil)
233+
nonMptNodesQueue ++= requestedNonMptNodes.getOrElse(ref, Nil)
234+
blockBodiesQueue ++= requestedBlockBodies.getOrElse(ref, Nil)
235+
receiptsQueue ++= requestedReceipts.getOrElse(ref, Nil)
236+
cleanupRequestedMaps(ref)
237+
}
238+
229239
private def persistSyncState(): Unit = {
230240
syncStateStorageActor ! SyncState(
231241
initialSyncState.targetBlock,
@@ -266,10 +276,16 @@ trait FastSync {
266276
requestedReceipts = requestedReceipts - handler
267277
}
268278

279+
private def validateBlocks(requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]): Boolean = (requestedHashes zip blockBodies)
280+
.map { case (hash, body) => (blockchain.getBlockHeaderByHash(hash), body) }
281+
.forall {
282+
case (Some(header), body) =>
283+
val result = validators.blockValidator.validateHeaderAndBody(header, body)
284+
result.isRight
285+
case _ => false
286+
}
287+
269288
private def insertBlocks(requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]): Unit = {
270-
//todo this is moved from FastSyncBlockBodiesRequestHandler.scala we should add block validation here [EC-249]
271-
//load header from chain by hash and check consistency with BlockValidator.validateHeaderAndBody
272-
//if invalid blacklist peer
273289
(requestedHashes zip blockBodies).foreach { case (hash, body) =>
274290
blockchain.save(hash, body)
275291
}

src/main/scala/io/iohk/ethereum/validators/BlockValidator.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package io.iohk.ethereum.validators
22

3-
import akka.serialization.ByteArraySerializer
43
import akka.util.ByteString
54
import io.iohk.ethereum.crypto._
65
import io.iohk.ethereum.domain.{Block, BlockHeader, Receipt, SignedTransaction}
76
import io.iohk.ethereum.ledger.BloomFilter
8-
import io.iohk.ethereum.mpt.ByteArraySerializable
97
import io.iohk.ethereum.network.p2p.messages.PV62.BlockBody
10-
import io.iohk.ethereum.rlp.RLPImplicitConversions.toRlpList
11-
import io.iohk.ethereum.rlp._
128
import io.iohk.ethereum.utils.ByteUtils.or
139
import io.iohk.ethereum.validators.BlockValidator.BlockError
1410

src/test/scala/io/iohk/ethereum/Mocks.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ object Mocks {
5252
runFn(context.asInstanceOf[Ledger.PC]).asInstanceOf[ProgramResult[W, S]]
5353
}
5454

55+
class MockValidatorsFailingOnBlockBodies extends MockValidatorsAlwaysSucceed {
56+
57+
override val blockValidator: BlockValidator = new BlockValidator {
58+
override def validateBlockAndReceipts(block: Block, receipts: Seq[Receipt]) = Right(block)
59+
override def validateHeaderAndBody(blockHeader: BlockHeader, blockBody: BlockBody) = Left(BlockTransactionsHashError)
60+
}
61+
}
62+
5563
class MockValidatorsAlwaysSucceed extends Validators {
5664

5765
override val blockValidator: BlockValidator = new BlockValidator {

src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@ package io.iohk.ethereum.blockchain.sync
22

33
import java.net.InetSocketAddress
44

5-
import akka.actor.{ActorRef, ActorSystem, Props}
5+
import akka.actor.{ActorSystem, Props}
66
import akka.testkit.{TestActorRef, TestProbe}
77
import akka.util.ByteString
88
import com.miguno.akka.testing.VirtualTime
99
import io.iohk.ethereum.{Mocks, Timeouts}
1010
import io.iohk.ethereum.blockchain.sync.FastSync.{StateMptNodeHash, SyncState}
1111
import io.iohk.ethereum.blockchain.sync.SyncController.MinedBlock
12-
import io.iohk.ethereum.domain.{Account, Block, BlockHeader}
12+
import io.iohk.ethereum.domain.{Account, Block, BlockHeader, SignedTransaction}
1313
import io.iohk.ethereum.ledger.{BloomFilter, Ledger}
14-
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.{MessageFromPeer, PeerDisconnected}
14+
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
1515
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.{MessageClassifier, PeerDisconnectedClassifier}
1616
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
1717
import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo}
@@ -147,6 +147,83 @@ class SyncControllerSpec extends FlatSpec with Matchers {
147147
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
148148
}
149149

150+
it should "request for block bodies again if block bodies validation fails" in new TestSetup() {
151+
override val syncController = TestActorRef(Props(new SyncController(
152+
storagesInstance.storages.appStateStorage,
153+
blockchain,
154+
storagesInstance.storages,
155+
storagesInstance.storages.fastSyncStateStorage,
156+
ledger,
157+
new Mocks.MockValidatorsFailingOnBlockBodies,
158+
peerMessageBus.ref, pendingTransactionsManager.ref, ommersPool.ref, etcPeerManager.ref,
159+
externalSchedulerOpt = Some(time.scheduler))))
160+
161+
162+
val peer1TestProbe: TestProbe = TestProbe()(system)
163+
val peer1 = Peer(new InetSocketAddress("127.0.0.1", 0), peer1TestProbe.ref, incomingConnection = false)
164+
val peer2TestProbe: TestProbe = TestProbe()(system)
165+
val peer2 = Peer(new InetSocketAddress("127.0.0.1", 0), peer2TestProbe.ref, incomingConnection = false)
166+
167+
val expectedTargetBlock = 399500
168+
val targetBlockHeader: BlockHeader = baseBlockHeader.copy(
169+
number = expectedTargetBlock,
170+
stateRoot = ByteString(Hex.decode("deae1dfad5ec8dcef15915811e1f044d2543674fd648f94345231da9fc2646cc")))
171+
val bestBlockHeaderNumber: BigInt = targetBlockHeader.number - 1
172+
storagesInstance.storages.fastSyncStateStorage.putSyncState(SyncState(targetBlockHeader)
173+
.copy(bestBlockHeaderNumber = bestBlockHeaderNumber,
174+
mptNodesQueue = Seq(StateMptNodeHash(targetBlockHeader.stateRoot))))
175+
176+
time.advance(1.seconds)
177+
178+
val peerStatus = Status(1, 1, 20, ByteString("peer2_bestHash"), ByteString("unused"))
179+
180+
etcPeerManager.send(syncController, HandshakedPeers(Map(
181+
peer2 -> PeerInfo(peerStatus, forkAccepted = true, totalDifficulty = peerStatus.totalDifficulty, maxBlockNumber = 0))))
182+
183+
syncController ! SyncController.StartSync
184+
185+
val stateMptLeafWithAccount =
186+
ByteString(Hex.decode("f86d9e328415c225a782bb339b22acad1c739e42277bc7ef34de3623114997ce78b84cf84a0186cb7d8738d800a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a0c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))
187+
188+
val watcher = TestProbe()
189+
watcher.watch(syncController)
190+
191+
etcPeerManager.expectMsg(
192+
EtcPeerManagerActor.SendMessage(GetNodeData(Seq(targetBlockHeader.stateRoot)), peer2.id))
193+
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer2.id))))
194+
peerMessageBus.reply(MessageFromPeer(NodeData(Seq(stateMptLeafWithAccount)), peer2.id))
195+
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer2.id))))
196+
197+
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(
198+
GetBlockHeaders(Left(targetBlockHeader.number), expectedTargetBlock - bestBlockHeaderNumber, 0, reverse = false),
199+
peer2.id))
200+
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
201+
peerMessageBus.reply(MessageFromPeer(BlockHeaders(Seq(targetBlockHeader)), peer2.id))
202+
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
203+
204+
etcPeerManager.expectMsg(
205+
EtcPeerManagerActor.SendMessage(GetReceipts(Seq(targetBlockHeader.hash)), peer2.id))
206+
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(Receipts.code), PeerSelector.WithId(peer2.id))))
207+
peerMessageBus.reply(MessageFromPeer(Receipts(Seq(Nil)), peer2.id))
208+
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(Receipts.code), PeerSelector.WithId(peer2.id))))
209+
210+
etcPeerManager.expectMsg(
211+
EtcPeerManagerActor.SendMessage(GetBlockBodies(Seq(targetBlockHeader.hash)), peer2.id))
212+
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockBodies.code), PeerSelector.WithId(peer2.id))))
213+
peerMessageBus.reply(MessageFromPeer(BlockBodies(Seq(BlockBody(Nil, Nil))), peer2.id))
214+
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(BlockBodies.code), PeerSelector.WithId(peer2.id))))
215+
216+
//peer was blacklisted for bad block bodies. connecting second peer
217+
etcPeerManager.send(syncController, HandshakedPeers(Map(
218+
peer1 -> PeerInfo(peerStatus, forkAccepted = true, totalDifficulty = peerStatus.totalDifficulty, maxBlockNumber = 0))))
219+
220+
time.advance(1.seconds)
221+
222+
//ask different peer for block bodies again
223+
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(GetBlockBodies(Seq(targetBlockHeader.hash)), peer1.id))
224+
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockBodies.code), PeerSelector.WithId(peer1.id))))
225+
}
226+
150227
it should "not use (blacklist) a peer that fails to respond within time limit" in new TestSetup() {
151228
val peer2TestProbe: TestProbe = TestProbe()(system)
152229

0 commit comments

Comments
 (0)