Skip to content

Commit 203aace

Browse files
authored
refactor: remove forkchoice casts (#1126)
1 parent 2297ad1 commit 203aace

File tree

7 files changed

+174
-93
lines changed

7 files changed

+174
-93
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_chain.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
33

44
use GenServer
55

6-
alias LambdaEthereumConsensus.ForkChoice
6+
alias LambdaEthereumConsensus.Beacon.PendingBlocks
77
alias LambdaEthereumConsensus.P2P.Gossip
88
alias LambdaEthereumConsensus.StateTransition.Misc
99
alias LambdaEthereumConsensus.Validator.ValidatorManager
@@ -173,7 +173,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
173173
new_state = %BeaconChainState{state | time: time}
174174

175175
if time >= state.genesis_time do
176-
ForkChoice.on_tick(time)
176+
PendingBlocks.on_tick(time)
177177
# TODO: reduce time between ticks to account for gnosis' 5s slot time.
178178
old_logical_time = compute_logical_time(state)
179179
new_logical_time = compute_logical_time(new_state)

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
55
require Logger
66

77
alias LambdaEthereumConsensus.Beacon.StoreSetup
8+
alias LambdaEthereumConsensus.ForkChoice
89
alias LambdaEthereumConsensus.ForkChoice.Head
910
alias LambdaEthereumConsensus.StateTransition.Cache
1011
alias LambdaEthereumConsensus.Store.Blocks
@@ -37,14 +38,15 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
3738
finalized: store.finalized_checkpoint
3839
}
3940

41+
ForkChoice.init_store(store, head_slot, time)
42+
4043
validator_children =
4144
get_validator_children(deposit_tree_snapshot, head_slot, head_root, store.genesis_time)
4245

4346
children =
4447
[
4548
{LambdaEthereumConsensus.Beacon.BeaconChain,
4649
{store.genesis_time, genesis_validators_root, fork_choice_data, time}},
47-
{LambdaEthereumConsensus.ForkChoice, {store, head_slot, time}},
4850
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
4951
LambdaEthereumConsensus.P2P.Peerbook,
5052
LambdaEthereumConsensus.P2P.IncomingRequests,
@@ -53,7 +55,9 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
5355
LambdaEthereumConsensus.P2P.Gossip.Attestation,
5456
LambdaEthereumConsensus.P2P.Gossip.BeaconBlock,
5557
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
56-
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
58+
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
59+
{Task.Supervisor, name: PruneStatesSupervisor},
60+
{Task.Supervisor, name: PruneBlocksSupervisor}
5761
] ++ validator_children
5862

5963
Supervisor.init(children, strategy: :one_for_all)

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
1616
alias LambdaEthereumConsensus.Store.Blocks
1717
alias Types.SignedBeaconBlock
1818

19-
@type block_status :: :pending | :invalid | :processing | :download | :download_blobs | :unknown
19+
@type block_status :: :pending | :invalid | :download | :download_blobs | :unknown
2020
@type block_info ::
2121
{SignedBeaconBlock.t(), :pending | :download_blobs}
22-
| {nil, :invalid | :processing | :download}
22+
| {nil, :invalid | :download}
2323
@type state :: %{Types.root() => block_info()}
2424

2525
##########################
@@ -35,6 +35,11 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
3535
GenServer.cast(__MODULE__, {:add_block, signed_block})
3636
end
3737

38+
@spec on_tick(Types.uint64()) :: :ok
39+
def on_tick(time) do
40+
GenServer.cast(__MODULE__, {:on_tick, time})
41+
end
42+
3843
##########################
3944
### GenServer Callbacks
4045
##########################
@@ -70,16 +75,9 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
7075
end
7176

7277
@impl true
73-
def handle_cast({:block_processed, block_root, true}, state) do
74-
# Block is valid. We immediately check if we can process another block.
75-
new_state = state |> Map.delete(block_root) |> process_blocks()
76-
{:noreply, new_state}
77-
end
78-
79-
@impl true
80-
def handle_cast({:block_processed, block_root, false}, state) do
81-
# Block is invalid
82-
{:noreply, state |> Map.put(block_root, {nil, :invalid})}
78+
def handle_cast({:on_tick, time}, state) do
79+
ForkChoice.on_tick(time)
80+
{:noreply, state}
8381
end
8482

8583
@doc """
@@ -179,7 +177,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
179177
state |> Map.put(block_root, {nil, :invalid})
180178

181179
# If parent isn't processed, block is pending
182-
parent_status in [:processing, :pending, :download, :download_blobs] ->
180+
parent_status in [:pending, :download, :download_blobs] ->
183181
state
184182

185183
# If parent is not in fork choice, download parent
@@ -188,12 +186,26 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
188186

189187
# If all the other conditions are false, add block to fork choice
190188
true ->
191-
ForkChoice.on_block(block_info)
192-
state |> Map.put(block_root, {signed_block, :processing})
189+
process_block(state, block_info)
193190
end
194191
end)
195192
end
196193

194+
defp process_block(state, block_info) do
195+
case ForkChoice.on_block(block_info) do
196+
:ok ->
197+
state |> Map.delete(block_info.root)
198+
199+
{:error, reason} ->
200+
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
201+
slot: block_info.signed_block.message.slot,
202+
root: block_info.root
203+
)
204+
205+
state |> Map.put(block_info.root, {nil, :invalid})
206+
end
207+
end
208+
197209
@spec get_block_status(state(), Types.root()) :: block_status()
198210
defp get_block_status(state, block_root) do
199211
state |> Map.get(block_root, {nil, :unknown}) |> elem(1)

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 31 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do
33
The Store is responsible for tracking information required for the fork choice algorithm.
44
"""
55

6-
use GenServer
76
require Logger
87

98
alias LambdaEthereumConsensus.Beacon.BeaconChain
@@ -18,47 +17,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do
1817
alias LambdaEthereumConsensus.Store.StoreDb
1918
alias LambdaEthereumConsensus.Validator.ValidatorManager
2019
alias Types.Attestation
21-
alias Types.BeaconState
22-
alias Types.SignedBeaconBlock
2320
alias Types.Store
2421

2522
##########################
2623
### Public API
2724
##########################
2825

29-
@spec start_link({BeaconState.t(), SignedBeaconBlock.t(), Types.uint64()}) ::
30-
:ignore | {:error, any} | {:ok, pid}
31-
def start_link(opts) do
32-
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
33-
end
34-
35-
@spec on_tick(Types.uint64()) :: :ok
36-
def on_tick(time) do
37-
GenServer.cast(__MODULE__, {:on_tick, time})
38-
end
39-
40-
@spec on_block(BlockInfo.t()) :: :ok | :error
41-
def on_block(block_info) do
42-
GenServer.cast(__MODULE__, {:on_block, block_info, self()})
43-
end
44-
45-
@spec on_attestation(Types.Attestation.t()) :: :ok
46-
def on_attestation(%Attestation{} = attestation) do
47-
GenServer.cast(__MODULE__, {:on_attestation, attestation})
48-
end
49-
50-
@spec notify_attester_slashing(Types.AttesterSlashing.t()) :: :ok
51-
def notify_attester_slashing(attester_slashing) do
52-
GenServer.cast(__MODULE__, {:attester_slashing, attester_slashing})
53-
end
54-
55-
##########################
56-
### GenServer Callbacks
57-
##########################
58-
59-
@impl GenServer
60-
@spec init({Store.t(), Types.slot(), Types.uint64()}) :: {:ok, Store.t()} | {:stop, any}
61-
def init({%Store{} = store, head_slot, time}) do
26+
@spec init_store(Store.t(), Types.slot(), Types.uint64()) :: :ok | :error
27+
def init_store(%Store{} = store, head_slot, time) do
6228
Logger.info("[Fork choice] Initialized store.", slot: head_slot)
6329

6430
store = Handlers.on_tick(store, time)
@@ -67,11 +33,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do
6733
:telemetry.execute([:sync, :on_block], %{slot: head_slot})
6834

6935
persist_store(store)
70-
{:ok, store}
7136
end
7237

73-
@impl GenServer
74-
def handle_cast({:on_block, %BlockInfo{} = block_info, from}, _store) do
38+
@spec on_block(BlockInfo.t()) :: :ok | {:error, String.t()}
39+
def on_block(%BlockInfo{} = block_info) do
7540
store = fetch_store!()
7641
slot = block_info.signed_block.message.slot
7742
block_root = block_info.root
@@ -90,25 +55,24 @@ defmodule LambdaEthereumConsensus.ForkChoice do
9055
:telemetry.execute([:sync, :on_block], %{slot: slot})
9156
Logger.info("[Fork choice] Added new block", slot: slot, root: block_root)
9257

93-
Task.async(__MODULE__, :recompute_head, [new_store])
58+
:telemetry.span([:fork_choice, :recompute_head], %{}, fn ->
59+
{recompute_head(new_store), %{}}
60+
end)
9461

9562
%Store{finalized_checkpoint: new_finalized_checkpoint} = new_store
9663

9764
prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch)
9865

9966
persist_store(new_store)
100-
GenServer.cast(from, {:block_processed, block_root, true})
101-
{:noreply, new_store}
10267

10368
{:error, reason} ->
10469
Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root)
105-
GenServer.cast(from, {:block_processed, block_root, false})
106-
{:noreply, store}
70+
{:error, reason}
10771
end
10872
end
10973

110-
@impl GenServer
111-
def handle_cast({:on_attestation, %Attestation{} = attestation}, %Store{} = _state) do
74+
@spec on_attestation(Types.Attestation.t()) :: :ok
75+
def on_attestation(%Attestation{} = attestation) do
11276
state = fetch_store!()
11377
id = attestation.signature |> Base.encode16() |> String.slice(0, 8)
11478
Logger.debug("[Fork choice] Adding attestation #{id} to the store")
@@ -120,61 +84,56 @@ defmodule LambdaEthereumConsensus.ForkChoice do
12084
end
12185

12286
persist_store(state)
123-
{:noreply, state}
12487
end
12588

126-
@impl GenServer
127-
def handle_cast({:attester_slashing, attester_slashing}, _state) do
89+
@spec on_attester_slashing(Types.AttesterSlashing.t()) :: :ok
90+
def on_attester_slashing(attester_slashing) do
12891
Logger.info("[Fork choice] Adding attester slashing to the store")
12992
state = fetch_store!()
13093

131-
state =
132-
case Handlers.on_attester_slashing(state, attester_slashing) do
133-
{:ok, new_state} ->
134-
new_state
135-
136-
_ ->
137-
Logger.error("[Fork choice] Failed to add attester slashing to the store")
138-
state
139-
end
94+
case Handlers.on_attester_slashing(state, attester_slashing) do
95+
{:ok, new_state} ->
96+
persist_store(new_state)
14097

141-
persist_store(state)
142-
{:noreply, state}
98+
_ ->
99+
Logger.error("[Fork choice] Failed to add attester slashing to the store")
100+
end
143101
end
144102

145-
@impl GenServer
146-
def handle_cast({:on_tick, time}, _store) do
103+
@spec on_tick(Types.uint64()) :: :ok
104+
def on_tick(time) do
147105
store = fetch_store!()
148106
%Store{finalized_checkpoint: last_finalized_checkpoint} = store
149107

150108
new_store = Handlers.on_tick(store, time)
151109
%Store{finalized_checkpoint: new_finalized_checkpoint} = new_store
152110
prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch)
153111
persist_store(new_store)
154-
{:noreply, new_store}
155-
end
156-
157-
@impl GenServer
158-
def handle_info(_msg, state) do
159-
{:noreply, state}
160112
end
161113

162114
##########################
163115
### Private Functions
164116
##########################
165117

166-
def prune_old_states(last_finalized_epoch, new_finalized_epoch) do
118+
defp prune_old_states(last_finalized_epoch, new_finalized_epoch) do
167119
if last_finalized_epoch < new_finalized_epoch do
168120
new_finalized_slot =
169121
new_finalized_epoch * ChainSpec.get("SLOTS_PER_EPOCH")
170122

171-
Task.async(StateDb, :prune_states_older_than, [new_finalized_slot])
172-
Task.async(BlockDb, :prune_blocks_older_than, [new_finalized_slot])
123+
Task.Supervisor.start_child(
124+
PruneStatesSupervisor,
125+
fn -> StateDb.prune_states_older_than(new_finalized_slot) end
126+
)
127+
128+
Task.Supervisor.start_child(
129+
PruneBlocksSupervisor,
130+
fn -> BlockDb.prune_blocks_older_than(new_finalized_slot) end
131+
)
173132
end
174133
end
175134

176135
@spec apply_handler(any(), any(), any()) :: any()
177-
def apply_handler(iter, state, handler) do
136+
defp apply_handler(iter, state, handler) do
178137
iter
179138
|> Enum.reduce_while({:ok, state}, fn
180139
x, {:ok, st} -> {:cont, handler.(st, x)}

lib/lambda_ethereum_consensus/store/block_db.ex

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
1818
@type block_status ::
1919
:pending
2020
| :invalid
21-
| :processing
2221
| :download
2322
| :download_blobs
2423
| :unknown
@@ -47,7 +46,6 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
4746
when atom in [
4847
:pending,
4948
:invalid,
50-
:processing,
5149
:download,
5250
:download_blobs,
5351
:unknown,

lib/lambda_ethereum_consensus/telemetry.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ defmodule LambdaEthereumConsensus.Telemetry do
125125
last_value("db.size.total", unit: :byte),
126126

127127
# ForkChoice Metrics
128+
last_value("fork_choice.recompute_head.stop.duration",
129+
unit: {:native, :millisecond}
130+
),
131+
last_value("fork_choice.recompute_head.exception.duration",
132+
unit: {:native, :millisecond}
133+
),
128134
last_value("fork_choice.persist.stop.duration",
129135
unit: {:native, :millisecond}
130136
),

0 commit comments

Comments
 (0)