@@ -12,12 +12,32 @@ import cats.syntax.option._
12
12
13
13
import scala .collection .immutable .Queue
14
14
15
+ // scalastyle:off number.of.methods
16
+ /**
17
+ * State used by the BlockFetcher
18
+ *
19
+ * @param importer the BlockImporter actor reference
20
+ * @param readyBlocks
21
+ * @param waitingHeaders
22
+ * @param fetchingHeadersState the current state of the headers fetching, whether we
23
+ * - haven't fetched any yet
24
+ * - are awaiting a response
25
+ * - are awaiting a response but it should be ignored due to blocks being invalidated
26
+ * @param fetchingBodiesState the current state of the bodies fetching, whether we
27
+ * - haven't fetched any yet
28
+ * - are awaiting a response
29
+ * - are awaiting a response but it should be ignored due to blocks being invalidated
30
+ * @param stateNodeFetcher
31
+ * @param lastBlock
32
+ * @param knownTop
33
+ * @param blockProviders
34
+ */
15
35
case class BlockFetcherState (
16
36
importer : ActorRef ,
17
37
readyBlocks : Queue [Block ],
18
38
waitingHeaders : Queue [BlockHeader ],
19
- isFetchingHeaders : Boolean ,
20
- isFetchingBodies : Boolean ,
39
+ fetchingHeadersState : FetchingHeadersState ,
40
+ fetchingBodiesState : FetchingBodiesState ,
21
41
stateNodeFetcher : Option [StateNodeFetcher ],
22
42
lastBlock : BigInt ,
23
43
knownTop : BigInt ,
@@ -28,7 +48,7 @@ case class BlockFetcherState(
28
48
29
49
def isFetchingStateNode : Boolean = stateNodeFetcher.isDefined
30
50
31
- def hasEmptyBuffer : Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
51
+ private def hasEmptyBuffer : Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty
32
52
33
53
def hasFetchedTopHeader : Boolean = lastBlock == knownTop
34
54
@@ -53,8 +73,7 @@ case class BlockFetcherState(
53
73
def takeHashes (amount : Int ): Seq [ByteString ] = waitingHeaders.take(amount).map(_.hash)
54
74
55
75
def appendHeaders (headers : Seq [BlockHeader ]): BlockFetcherState =
56
- fetchingHeaders(false )
57
- .withPossibleNewTopAt(headers.lastOption.map(_.number))
76
+ withPossibleNewTopAt(headers.lastOption.map(_.number))
58
77
.copy(
59
78
waitingHeaders = waitingHeaders ++ headers.filter(_.number > lastBlock).sortBy(_.number),
60
79
lastBlock = HeadersSeq .lastNumber(headers).getOrElse(lastBlock)
@@ -86,9 +105,11 @@ case class BlockFetcherState(
86
105
val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
87
106
val blocks = matching.zip(bodies).map((Block .apply _).tupled)
88
107
89
- fetchingBodies(false )
90
- .withPeerForBlocks(peer.id, blocks.map(_.header.number))
91
- .copy(readyBlocks = readyBlocks.enqueue(blocks), waitingHeaders = waiting)
108
+ withPeerForBlocks(peer.id, blocks.map(_.header.number))
109
+ .copy(
110
+ readyBlocks = readyBlocks.enqueue(blocks),
111
+ waitingHeaders = waiting
112
+ )
92
113
}
93
114
94
115
def appendNewBlock (block : Block , fromPeer : PeerId ): BlockFetcherState =
@@ -126,18 +147,25 @@ case class BlockFetcherState(
126
147
127
148
def invalidateBlocksFrom (nr : BigInt ): (Option [PeerId ], BlockFetcherState ) = invalidateBlocksFrom(nr, Some (nr))
128
149
129
- def invalidateBlocksFrom (nr : BigInt , toBlacklist : Option [BigInt ]): (Option [PeerId ], BlockFetcherState ) =
150
+ def invalidateBlocksFrom (nr : BigInt , toBlacklist : Option [BigInt ]): (Option [PeerId ], BlockFetcherState ) = {
151
+ // We can't start completely from scratch as requests could be in progress, we have to keep special track of them
152
+ val newFetchingHeadersState =
153
+ if (fetchingHeadersState == AwaitingHeaders ) AwaitingHeadersToBeIgnored else fetchingHeadersState
154
+ val newFetchingBodiesState =
155
+ if (fetchingBodiesState == AwaitingBodies ) AwaitingBodiesToBeIgnored else fetchingBodiesState
156
+
130
157
(
131
158
toBlacklist.flatMap(blockProviders.get),
132
159
copy(
133
160
readyBlocks = Queue (),
134
161
waitingHeaders = Queue (),
135
162
lastBlock = (nr - 2 ).max(0 ),
136
- isFetchingHeaders = false ,
137
- isFetchingBodies = false ,
163
+ fetchingHeadersState = newFetchingHeadersState ,
164
+ fetchingBodiesState = newFetchingBodiesState ,
138
165
blockProviders = blockProviders - nr
139
166
)
140
167
)
168
+ }
141
169
142
170
def withLastBlock (nr : BigInt ): BlockFetcherState = copy(lastBlock = nr)
143
171
@@ -154,9 +182,13 @@ case class BlockFetcherState(
154
182
def withPeerForBlocks (peerId : PeerId , blocks : Seq [BigInt ]): BlockFetcherState =
155
183
copy(blockProviders = blockProviders ++ blocks.map(block => block -> peerId))
156
184
157
- def fetchingHeaders (isFetching : Boolean ): BlockFetcherState = copy(isFetchingHeaders = isFetching)
185
+ def isFetchingHeaders : Boolean = fetchingHeadersState != NoHeadersFetched
186
+ def withNewHeadersFetch : BlockFetcherState = copy(fetchingHeadersState = AwaitingHeaders )
187
+ def withHeaderFetchReceived : BlockFetcherState = copy(fetchingHeadersState = NoHeadersFetched )
158
188
159
- def fetchingBodies (isFetching : Boolean ): BlockFetcherState = copy(isFetchingBodies = isFetching)
189
+ def isFetchingBodies : Boolean = fetchingBodiesState != NoBodiesFetched
190
+ def withNewBodiesFetch : BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies )
191
+ def withBodiesFetchReceived : BlockFetcherState = copy(fetchingBodiesState = NoBodiesFetched )
160
192
161
193
def fetchingStateNode (hash : ByteString , requestor : ActorRef ): BlockFetcherState =
162
194
copy(stateNodeFetcher = Some (StateNodeFetcher (hash, requestor)))
@@ -188,11 +220,31 @@ object BlockFetcherState {
188
220
importer = importer,
189
221
readyBlocks = Queue (),
190
222
waitingHeaders = Queue (),
191
- isFetchingHeaders = false ,
192
- isFetchingBodies = false ,
223
+ fetchingHeadersState = NoHeadersFetched ,
224
+ fetchingBodiesState = NoBodiesFetched ,
193
225
stateNodeFetcher = None ,
194
226
lastBlock = lastBlock,
195
227
knownTop = lastBlock + 1 ,
196
228
blockProviders = Map ()
197
229
)
230
+
231
+ trait FetchingHeadersState
232
+ case object NoHeadersFetched extends FetchingHeadersState
233
+ case object AwaitingHeaders extends FetchingHeadersState
234
+
235
+ /**
236
+ * Headers request in progress but will be ignored due to invalidation
237
+ * State used to keep track of pending request to prevent multiple requests in parallel
238
+ */
239
+ case object AwaitingHeadersToBeIgnored extends FetchingHeadersState
240
+
241
+ trait FetchingBodiesState
242
+ case object NoBodiesFetched extends FetchingBodiesState
243
+ case object AwaitingBodies extends FetchingBodiesState
244
+
245
+ /**
246
+ * Bodies request in progress but will be ignored due to invalidation
247
+ * State used to keep track of pending request to prevent multiple requests in parallel
248
+ */
249
+ case object AwaitingBodiesToBeIgnored extends FetchingBodiesState
198
250
}
0 commit comments