Skip to content

Commit af21ee4

Browse files
authored
feat: persist block with processing status (#1127)
1 parent 0b163a1 commit af21ee4

File tree

15 files changed

+175
-120
lines changed

15 files changed

+175
-120
lines changed

bench/block_processing.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ alias LambdaEthereumConsensus.ForkChoice.Handlers
33
alias LambdaEthereumConsensus.StateTransition.Cache
44
alias LambdaEthereumConsensus.Store
55
alias LambdaEthereumConsensus.Store.BlockDb
6+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
67
alias LambdaEthereumConsensus.Store.StateDb
78
alias Types.BeaconState
89
alias Types.SignedBeaconBlock
@@ -19,8 +20,8 @@ slot = 4_213_280
1920

2021
IO.puts("fetching blocks...")
2122
{:ok, %BeaconState{} = state} = StateDb.get_state_by_slot(slot)
22-
{:ok, %SignedBeaconBlock{} = block} = BlockDb.get_block_by_slot(slot)
23-
{:ok, %SignedBeaconBlock{} = new_block} = BlockDb.get_block_by_slot(slot + 1)
23+
{:ok, %BlockInfo{signed_block: block}} = BlockDb.get_block_info_by_slot(slot)
24+
{:ok, %BlockInfo{signed_block: new_block}} = BlockDb.get_block_info_by_slot(slot + 1)
2425

2526
IO.puts("initializing store...")
2627
{:ok, store} = Types.Store.get_forkchoice_store(state, block)

bench/multiple_blocks_processing.exs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ alias LambdaEthereumConsensus.ForkChoice.Handlers
33
alias LambdaEthereumConsensus.StateTransition.Cache
44
alias LambdaEthereumConsensus.Store
55
alias LambdaEthereumConsensus.Store.BlockDb
6+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
67
alias LambdaEthereumConsensus.Store.StateDb
78
alias Types.BeaconState
89
alias Types.SignedBeaconBlock
@@ -21,14 +22,14 @@ end_slot = start_slot + count
2122

2223
IO.puts("fetching blocks...")
2324
{:ok, %BeaconState{} = state} = StateDb.get_state_by_slot(start_slot)
24-
{:ok, %SignedBeaconBlock{} = signed_block} = BlockDb.get_block_by_slot(state.slot)
25+
{:ok, %BlockInfo{signed_block: signed_block}} = BlockDb.get_block_info_by_slot(state.slot)
2526

2627
blocks =
2728
(start_slot + 1)..end_slot
2829
# NOTE: we have to consider empty slots
2930
|> Enum.flat_map(fn slot ->
30-
case BlockDb.get_block_by_slot(slot) do
31-
{:ok, block} -> [block]
31+
case BlockDb.get_block_info_by_slot(slot) do
32+
{:ok, %BlockInfo{signed_block: block}} -> [block]
3233
:not_found -> []
3334
end
3435
end)

lib/beacon_api/controllers/v2/beacon_controller.ex

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule BeaconApi.V2.BeaconController do
55
alias BeaconApi.ErrorController
66
alias BeaconApi.Utils
77
alias LambdaEthereumConsensus.Store.BlockDb
8+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
89
alias LambdaEthereumConsensus.Store.Blocks
910
alias Types
1011

@@ -36,8 +37,8 @@ defmodule BeaconApi.V2.BeaconController do
3637

3738
def get_block(conn, %{block_id: "0x" <> hex_block_id}) do
3839
with {:ok, block_root} <- Base.decode16(hex_block_id, case: :mixed),
39-
%{} = block <- Blocks.get_signed_block(block_root) do
40-
conn |> block_response(block)
40+
%{} = block_info <- Blocks.get_block_info(block_root) do
41+
conn |> block_response(block_info.signed_block)
4142
else
4243
nil -> conn |> block_not_found()
4344
_ -> conn |> ErrorController.bad_request("Invalid block ID: 0x#{hex_block_id}")
@@ -46,8 +47,8 @@ defmodule BeaconApi.V2.BeaconController do
4647

4748
def get_block(conn, %{block_id: block_id}) do
4849
with {slot, ""} when slot >= 0 <- Integer.parse(block_id),
49-
{:ok, block} <- BlockDb.get_block_by_slot(slot) do
50-
conn |> block_response(block)
50+
{:ok, %BlockInfo{signed_block: signed_block}} <- BlockDb.get_block_info_by_slot(slot) do
51+
conn |> block_response(signed_block)
5152
else
5253
:not_found ->
5354
conn |> block_not_found()

lib/beacon_api/helpers.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ defmodule BeaconApi.Helpers do
100100

101101
def state_root_by_state_id(id) do
102102
with {:ok, {block_root, optimistic, finalized}} <- block_root_by_block_id(id),
103-
{:ok, block} <- BlockDb.get_block(block_root) do
104-
%{message: %{state_root: state_root}} = block
103+
{:ok, block_info} <- BlockDb.get_block_info(block_root) do
104+
state_root = block_info.signed_block.message.state_root
105105
{:ok, {state_root, optimistic, finalized}}
106106
end
107107
end
@@ -114,8 +114,8 @@ defmodule BeaconApi.Helpers do
114114
| :invalid_id
115115
def block_by_block_id(block_id) do
116116
with {:ok, {root, optimistic, finalized}} <- block_root_by_block_id(block_id),
117-
{:ok, block} <- BlockDb.get_block(root) do
118-
{:ok, {block, optimistic, finalized}}
117+
{:ok, block_info} <- BlockDb.get_block_info(root) do
118+
{:ok, {block_info.signed_block, optimistic, finalized}}
119119
end
120120
end
121121

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
1212
alias LambdaEthereumConsensus.P2P.BlobDownloader
1313
alias LambdaEthereumConsensus.P2P.BlockDownloader
1414
alias LambdaEthereumConsensus.Store.BlobDb
15+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
1516
alias LambdaEthereumConsensus.Store.Blocks
1617
alias Types.SignedBeaconBlock
1718

@@ -167,6 +168,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
167168
|> Enum.map(fn {root, {block, _}} -> {root, block} end)
168169
|> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end)
169170
|> Enum.reduce(state, fn {block_root, signed_block}, state ->
171+
block_info = BlockInfo.from_block(signed_block, block_root, :pending)
172+
170173
parent_root = signed_block.message.parent_root
171174
parent_status = get_block_status(state, parent_root)
172175

@@ -185,7 +188,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
185188

186189
# If all the other conditions are false, add block to fork choice
187190
true ->
188-
ForkChoice.on_block(signed_block, block_root)
191+
ForkChoice.on_block(block_info)
189192
state |> Map.put(block_root, {signed_block, :processing})
190193
end
191194
end)

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
1212
alias LambdaEthereumConsensus.ForkChoice.Head
1313
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
1414
alias LambdaEthereumConsensus.Store.BlockDb
15+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
1516
alias LambdaEthereumConsensus.Store.Blocks
1617
alias LambdaEthereumConsensus.Store.StateDb
1718
alias LambdaEthereumConsensus.Store.StoreDb
@@ -36,9 +37,9 @@ defmodule LambdaEthereumConsensus.ForkChoice do
3637
GenServer.cast(__MODULE__, {:on_tick, time})
3738
end
3839

39-
@spec on_block(SignedBeaconBlock.t(), Types.root()) :: :ok | :error
40-
def on_block(signed_block, block_root) do
41-
GenServer.cast(__MODULE__, {:on_block, block_root, signed_block, self()})
40+
@spec on_block(BlockInfo.t()) :: :ok | :error
41+
def on_block(block_info) do
42+
GenServer.cast(__MODULE__, {:on_block, block_info, self()})
4243
end
4344

4445
@spec on_attestation(Types.Attestation.t()) :: :ok
@@ -69,16 +70,17 @@ defmodule LambdaEthereumConsensus.ForkChoice do
6970
end
7071

7172
@impl GenServer
72-
def handle_cast({:on_block, block_root, %SignedBeaconBlock{} = signed_block, from}, store) do
73-
slot = signed_block.message.slot
73+
def handle_cast({:on_block, %BlockInfo{} = block_info, from}, store) do
74+
slot = block_info.signed_block.message.slot
75+
block_root = block_info.root
7476

75-
Logger.info("[Fork choice] Adding new block", root: block_root, slot: slot)
77+
Logger.info("[Fork choice] Adding new block", root: block_info.root, slot: slot)
7678

7779
%Store{finalized_checkpoint: last_finalized_checkpoint} = store
7880

7981
result =
8082
:telemetry.span([:sync, :on_block], %{}, fn ->
81-
{process_block(block_root, signed_block, store), %{}}
83+
{process_block(block_info, store), %{}}
8284
end)
8385

8486
case result do
@@ -171,8 +173,9 @@ defmodule LambdaEthereumConsensus.ForkChoice do
171173
end)
172174
end
173175

174-
defp process_block(_block_root, %SignedBeaconBlock{} = signed_block, store) do
175-
with {:ok, new_store} <- Handlers.on_block(store, signed_block),
176+
@spec process_block(BlockInfo.t(), Store.t()) :: Store.t()
177+
defp process_block(%BlockInfo{signed_block: signed_block} = block_info, store) do
178+
with {:ok, new_store} <- Handlers.on_block(store, block_info),
176179
# process block attestations
177180
{:ok, new_store} <-
178181
signed_block.message.body.attestations

lib/lambda_ethereum_consensus/fork_choice/handlers.ex

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
1111
alias LambdaEthereumConsensus.StateTransition.Misc
1212
alias LambdaEthereumConsensus.StateTransition.Predicates
1313
alias LambdaEthereumConsensus.Store.BlobDb
14+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
1415
alias LambdaEthereumConsensus.Store.Blocks
1516
alias LambdaEthereumConsensus.Store.BlockStates
1617

@@ -22,7 +23,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
2223
alias Types.Checkpoint
2324
alias Types.IndexedAttestation
2425
alias Types.NewPayloadRequest
25-
alias Types.SignedBeaconBlock
2626
alias Types.Store
2727

2828
import LambdaEthereumConsensus.Utils, only: [if_then_update: 3, map_ok: 2]
@@ -54,8 +54,9 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
5454
A block that is asserted as invalid due to unavailable PoW block may be valid at a later time,
5555
consider scheduling it for later processing in such case.
5656
"""
57-
@spec on_block(Store.t(), SignedBeaconBlock.t()) :: {:ok, Store.t()} | {:error, String.t()}
58-
def on_block(%Store{} = store, %SignedBeaconBlock{message: block} = signed_block) do
57+
@spec on_block(Store.t(), BlockInfo.t()) :: {:ok, Store.t()} | {:error, String.t()}
58+
def on_block(%Store{} = store, %BlockInfo{} = block_info) do
59+
block = block_info.signed_block.message
5960
%{epoch: finalized_epoch, root: finalized_root} = store.finalized_checkpoint
6061
finalized_slot = Misc.compute_start_slot_at_epoch(finalized_epoch)
6162

@@ -80,11 +81,11 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
8081
finalized_root != Store.get_checkpoint_block(store, block.parent_root, finalized_epoch) ->
8182
{:error, "block isn't descendant of latest finalized block"}
8283

83-
not (Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)) ->
84+
not (block_info.root |> data_available?(block.body.blob_kzg_commitments)) ->
8485
{:error, "blob data not available"}
8586

8687
true ->
87-
compute_post_state(store, signed_block, base_state)
88+
compute_post_state(store, block_info, base_state)
8889
end
8990
end
9091

@@ -188,8 +189,8 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
188189
end
189190

190191
# Check the block is valid and compute the post-state.
191-
def compute_post_state(%Store{} = store, %SignedBeaconBlock{} = signed_block, state) do
192-
%{message: block} = signed_block
192+
def compute_post_state(%Store{} = store, %BlockInfo{} = block_info, state) do
193+
block = block_info.signed_block.message
193194

194195
payload = block.body.execution_payload
195196
parent_beacon_block_root = block.parent_root
@@ -210,33 +211,31 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
210211
|> handle_verify_payload_result()
211212
end)
212213

213-
with {:ok, state} <- StateTransition.state_transition(state, signed_block, true),
214+
with {:ok, state} <- StateTransition.state_transition(state, block_info.signed_block, true),
214215
{:ok, _execution_status} <- Task.await(payload_verification_task) do
215216
seconds_per_slot = ChainSpec.get("SECONDS_PER_SLOT")
216217
intervals_per_slot = Constants.intervals_per_slot()
217218
# Add proposer score boost if the block is timely
218219
time_into_slot = rem(store.time - store.genesis_time, seconds_per_slot)
219220
is_before_attesting_interval = time_into_slot < div(seconds_per_slot, intervals_per_slot)
220221

221-
block_root = Ssz.hash_tree_root!(block)
222-
223222
# Add new block and state to the store
224-
BlockStates.store_state(block_root, state)
223+
BlockStates.store_state(block_info.root, state)
225224

226225
is_first_block = store.proposer_boost_root == <<0::256>>
227226
# TODO: store block timeliness data?
228227
is_timely = Store.get_current_slot(store) == block.slot and is_before_attesting_interval
229228

230229
store
231-
|> Store.store_block(block_root, signed_block)
230+
|> Store.store_block_info(block_info)
232231
|> if_then_update(
233232
is_timely and is_first_block,
234-
&%Store{&1 | proposer_boost_root: block_root}
233+
&%Store{&1 | proposer_boost_root: block_info.root}
235234
)
236235
# Update checkpoints in store if necessary
237236
|> update_checkpoints(state.current_justified_checkpoint, state.finalized_checkpoint)
238237
# Eagerly compute unrealized justification and finality
239-
|> compute_pulled_up_tip(block_root, signed_block.message, state)
238+
|> compute_pulled_up_tip(block_info.root, block_info.signed_block.message, state)
240239
end
241240
end
242241

lib/lambda_ethereum_consensus/p2p/incoming_requests/handler.ex

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
8484
# TODO: extend cache to support slots as keys
8585
response_chunk =
8686
start_slot..end_slot
87-
|> Enum.map(&BlockDb.get_block_by_slot/1)
87+
|> Enum.map(&BlockDb.get_block_info_by_slot/1)
8888
|> Enum.map(&map_block_result/1)
8989
|> Enum.reject(&(&1 == :skip))
9090
|> ReqResp.encode_response()
@@ -103,7 +103,7 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
103103
response_chunk =
104104
roots
105105
|> Enum.take(truncated_count)
106-
|> Enum.map(&Blocks.get_signed_block/1)
106+
|> Enum.map(&Blocks.get_block_info/1)
107107
|> Enum.map(&map_block_result/1)
108108
|> Enum.reject(&(&1 == :skip))
109109
|> ReqResp.encode_response()
@@ -124,6 +124,11 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequests.Handler do
124124
defp map_block_result({:ok, block}), do: map_block_result(block)
125125
defp map_block_result({:error, _}), do: {:error, {2, "Server Error"}}
126126

127-
defp map_block_result(block),
128-
do: {:ok, {block, BeaconChain.get_fork_digest_for_slot(block.message.slot)}}
127+
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
128+
129+
defp map_block_result(%BlockInfo{} = block_info),
130+
do:
131+
{:ok,
132+
{block_info.signed_block,
133+
BeaconChain.get_fork_digest_for_slot(block_info.signed_block.message.slot)}}
129134
end

0 commit comments

Comments
 (0)