@@ -3,14 +3,17 @@ package io.iohk.ethereum.blockchain.sync.regular
3
3
import akka .actor .ActorRef
4
4
import akka .util .ByteString
5
5
import cats .data .NonEmptyList
6
+ // FIXME: By using this class, we are coupling sync process with a specific consensus (the standard one).
7
+ import io .iohk .ethereum .consensus .validators .std .StdBlockValidator
6
8
import io .iohk .ethereum .domain .{Block , BlockHeader , BlockBody , HeadersSeq }
7
- import io .iohk .ethereum .network .{ Peer , PeerId }
9
+ import io .iohk .ethereum .network .PeerId
8
10
import io .iohk .ethereum .network .p2p .messages .PV62 .BlockHash
9
11
import BlockFetcherState ._
10
12
import cats .syntax .either ._
11
13
import cats .syntax .option ._
12
14
13
15
import scala .collection .immutable .Queue
16
+ import scala .annotation .tailrec
14
17
15
18
// scalastyle:off number.of.methods
16
19
/**
@@ -101,14 +104,36 @@ case class BlockFetcherState(
101
104
}
102
105
)
103
106
104
- def addBodies (peer : Peer , bodies : Seq [BlockBody ]): BlockFetcherState = {
105
- val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
106
- val blocks = matching.zip(bodies).map((Block .apply _).tupled)
107
+ def validateBodies (receivedBodies : Seq [BlockBody ]): Either [String , Seq [Block ]] =
108
+ bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies)
109
+ .toRight(
110
+ " Received unrequested bodies"
111
+ )
112
+
113
+ // Checks that the received block bodies are an ordered subset of the ones requested
114
+ @ tailrec
115
+ private def bodiesAreOrderedSubsetOfRequested (
116
+ requestedHeaders : Seq [BlockHeader ],
117
+ respondedBodies : Seq [BlockBody ],
118
+ matchedBlocks : Seq [Block ] = Nil
119
+ ): Option [Seq [Block ]] =
120
+ (requestedHeaders, respondedBodies) match {
121
+ case (Seq (), _ +: _) => None
122
+ case (_, Seq ()) => Some (matchedBlocks)
123
+ case (header +: remainingHeaders, body +: remainingBodies) =>
124
+ val doMatch = StdBlockValidator .validateHeaderAndBody(header, body).isRight
125
+ if (doMatch)
126
+ bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block (header, body))
127
+ else
128
+ bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
129
+ }
107
130
108
- withPeerForBlocks(peer.id, blocks.map(_.header.number))
131
+ def appendNewBlocks (blocks : Seq [Block ], fromPeer : PeerId ): BlockFetcherState = {
132
+ val receivedHeaders = blocks.map(_.header)
133
+ withPeerForBlocks(fromPeer, blocks.map(_.header.number))
109
134
.copy(
110
- readyBlocks = readyBlocks.enqueue(blocks),
111
- waitingHeaders = waiting
135
+ readyBlocks = readyBlocks.enqueue(blocks.toList ),
136
+ waitingHeaders = waitingHeaders.diff(receivedHeaders)
112
137
)
113
138
}
114
139
0 commit comments