Skip to content

refactor: remove forkchoice casts #1126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/lambda_ethereum_consensus/beacon/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do

use GenServer

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.P2P.Gossip
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Validator.ValidatorManager
Expand Down Expand Up @@ -173,7 +173,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
new_state = %BeaconChainState{state | time: time}

if time >= state.genesis_time do
ForkChoice.on_tick(time)
PendingBlocks.on_tick(time)
# TODO: reduce time between ticks to account for gnosis' 5s slot time.
old_logical_time = compute_logical_time(state)
new_logical_time = compute_logical_time(new_state)
Expand Down
8 changes: 6 additions & 2 deletions lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
require Logger

alias LambdaEthereumConsensus.Beacon.StoreSetup
alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.ForkChoice.Head
alias LambdaEthereumConsensus.StateTransition.Cache
alias LambdaEthereumConsensus.Store.Blocks
Expand Down Expand Up @@ -37,14 +38,15 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
finalized: store.finalized_checkpoint
}

ForkChoice.init_store(store, head_slot, time)

validator_children =
get_validator_children(deposit_tree_snapshot, head_slot, head_root, store.genesis_time)

children =
[
{LambdaEthereumConsensus.Beacon.BeaconChain,
{store.genesis_time, genesis_validators_root, fork_choice_data, time}},
{LambdaEthereumConsensus.ForkChoice, {store, head_slot, time}},
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
LambdaEthereumConsensus.P2P.Peerbook,
LambdaEthereumConsensus.P2P.IncomingRequests,
Expand All @@ -53,7 +55,9 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
LambdaEthereumConsensus.P2P.Gossip.Attestation,
LambdaEthereumConsensus.P2P.Gossip.BeaconBlock,
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor}
] ++ validator_children

Supervisor.init(children, strategy: :one_for_all)
Expand Down
42 changes: 27 additions & 15 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
alias LambdaEthereumConsensus.Store.Blocks
alias Types.SignedBeaconBlock

@type block_status :: :pending | :invalid | :processing | :download | :download_blobs | :unknown
@type block_status :: :pending | :invalid | :download | :download_blobs | :unknown
@type block_info ::
{SignedBeaconBlock.t(), :pending | :download_blobs}
| {nil, :invalid | :processing | :download}
| {nil, :invalid | :download}
@type state :: %{Types.root() => block_info()}

##########################
Expand All @@ -35,6 +35,11 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
GenServer.cast(__MODULE__, {:add_block, signed_block})
end

@spec on_tick(Types.uint64()) :: :ok
def on_tick(time) do
GenServer.cast(__MODULE__, {:on_tick, time})
end

##########################
### GenServer Callbacks
##########################
Expand Down Expand Up @@ -70,16 +75,9 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end

@impl true
def handle_cast({:block_processed, block_root, true}, state) do
# Block is valid. We immediately check if we can process another block.
new_state = state |> Map.delete(block_root) |> process_blocks()
{:noreply, new_state}
end

@impl true
def handle_cast({:block_processed, block_root, false}, state) do
# Block is invalid
{:noreply, state |> Map.put(block_root, {nil, :invalid})}
def handle_cast({:on_tick, time}, state) do
ForkChoice.on_tick(time)
{:noreply, state}
end

@doc """
Expand Down Expand Up @@ -179,7 +177,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
state |> Map.put(block_root, {nil, :invalid})

# If parent isn't processed, block is pending
parent_status in [:processing, :pending, :download, :download_blobs] ->
parent_status in [:pending, :download, :download_blobs] ->
state

# If parent is not in fork choice, download parent
Expand All @@ -188,12 +186,26 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

# If all the other conditions are false, add block to fork choice
true ->
ForkChoice.on_block(block_info)
state |> Map.put(block_root, {signed_block, :processing})
process_block(state, block_info)
end
end)
end

defp process_block(state, block_info) do
case ForkChoice.on_block(block_info) do
:ok ->
state |> Map.delete(block_info.root)

{:error, reason} ->
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
slot: block_info.signed_block.message.slot,
root: block_info.root
)

state |> Map.put(block_info.root, {nil, :invalid})
end
end

@spec get_block_status(state(), Types.root()) :: block_status()
defp get_block_status(state, block_root) do
state |> Map.get(block_root, {nil, :unknown}) |> elem(1)
Expand Down
103 changes: 31 additions & 72 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do
The Store is responsible for tracking information required for the fork choice algorithm.
"""

use GenServer
require Logger

alias LambdaEthereumConsensus.Beacon.BeaconChain
Expand All @@ -18,47 +17,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do
alias LambdaEthereumConsensus.Store.StoreDb
alias LambdaEthereumConsensus.Validator.ValidatorManager
alias Types.Attestation
alias Types.BeaconState
alias Types.SignedBeaconBlock
alias Types.Store

##########################
### Public API
##########################

@spec start_link({BeaconState.t(), SignedBeaconBlock.t(), Types.uint64()}) ::
:ignore | {:error, any} | {:ok, pid}
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@spec on_tick(Types.uint64()) :: :ok
def on_tick(time) do
GenServer.cast(__MODULE__, {:on_tick, time})
end

@spec on_block(BlockInfo.t()) :: :ok | :error
def on_block(block_info) do
GenServer.cast(__MODULE__, {:on_block, block_info, self()})
end

@spec on_attestation(Types.Attestation.t()) :: :ok
def on_attestation(%Attestation{} = attestation) do
GenServer.cast(__MODULE__, {:on_attestation, attestation})
end

@spec notify_attester_slashing(Types.AttesterSlashing.t()) :: :ok
def notify_attester_slashing(attester_slashing) do
GenServer.cast(__MODULE__, {:attester_slashing, attester_slashing})
end

##########################
### GenServer Callbacks
##########################

@impl GenServer
@spec init({Store.t(), Types.slot(), Types.uint64()}) :: {:ok, Store.t()} | {:stop, any}
def init({%Store{} = store, head_slot, time}) do
@spec init_store(Store.t(), Types.slot(), Types.uint64()) :: :ok | :error
def init_store(%Store{} = store, head_slot, time) do
Logger.info("[Fork choice] Initialized store.", slot: head_slot)

store = Handlers.on_tick(store, time)
Expand All @@ -67,11 +33,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do
:telemetry.execute([:sync, :on_block], %{slot: head_slot})

persist_store(store)
{:ok, store}
end

@impl GenServer
def handle_cast({:on_block, %BlockInfo{} = block_info, from}, _store) do
@spec on_block(BlockInfo.t()) :: :ok | {:error, String.t()}
def on_block(%BlockInfo{} = block_info) do
store = fetch_store!()
slot = block_info.signed_block.message.slot
block_root = block_info.root
Expand All @@ -90,25 +55,24 @@ defmodule LambdaEthereumConsensus.ForkChoice do
:telemetry.execute([:sync, :on_block], %{slot: slot})
Logger.info("[Fork choice] Added new block", slot: slot, root: block_root)

Task.async(__MODULE__, :recompute_head, [new_store])
:telemetry.span([:fork_choice, :recompute_head], %{}, fn ->
{recompute_head(new_store), %{}}
end)

%Store{finalized_checkpoint: new_finalized_checkpoint} = new_store

prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch)

persist_store(new_store)
GenServer.cast(from, {:block_processed, block_root, true})
{:noreply, new_store}

{:error, reason} ->
Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root)
GenServer.cast(from, {:block_processed, block_root, false})
{:noreply, store}
{:error, reason}
end
end

@impl GenServer
def handle_cast({:on_attestation, %Attestation{} = attestation}, %Store{} = _state) do
@spec on_attestation(Types.Attestation.t()) :: :ok
def on_attestation(%Attestation{} = attestation) do
state = fetch_store!()
id = attestation.signature |> Base.encode16() |> String.slice(0, 8)
Logger.debug("[Fork choice] Adding attestation #{id} to the store")
Expand All @@ -120,61 +84,56 @@ defmodule LambdaEthereumConsensus.ForkChoice do
end

persist_store(state)
{:noreply, state}
end

@impl GenServer
def handle_cast({:attester_slashing, attester_slashing}, _state) do
@spec on_attester_slashing(Types.AttesterSlashing.t()) :: :ok
def on_attester_slashing(attester_slashing) do
Logger.info("[Fork choice] Adding attester slashing to the store")
state = fetch_store!()

state =
case Handlers.on_attester_slashing(state, attester_slashing) do
{:ok, new_state} ->
new_state

_ ->
Logger.error("[Fork choice] Failed to add attester slashing to the store")
state
end
case Handlers.on_attester_slashing(state, attester_slashing) do
{:ok, new_state} ->
persist_store(new_state)

persist_store(state)
{:noreply, state}
_ ->
Logger.error("[Fork choice] Failed to add attester slashing to the store")
end
end

@impl GenServer
def handle_cast({:on_tick, time}, _store) do
@spec on_tick(Types.uint64()) :: :ok
def on_tick(time) do
store = fetch_store!()
%Store{finalized_checkpoint: last_finalized_checkpoint} = store

new_store = Handlers.on_tick(store, time)
%Store{finalized_checkpoint: new_finalized_checkpoint} = new_store
prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch)
persist_store(new_store)
{:noreply, new_store}
end

@impl GenServer
def handle_info(_msg, state) do
{:noreply, state}
end

##########################
### Private Functions
##########################

def prune_old_states(last_finalized_epoch, new_finalized_epoch) do
defp prune_old_states(last_finalized_epoch, new_finalized_epoch) do
if last_finalized_epoch < new_finalized_epoch do
new_finalized_slot =
new_finalized_epoch * ChainSpec.get("SLOTS_PER_EPOCH")

Task.async(StateDb, :prune_states_older_than, [new_finalized_slot])
Task.async(BlockDb, :prune_blocks_older_than, [new_finalized_slot])
Task.Supervisor.start_child(
PruneStatesSupervisor,
fn -> StateDb.prune_states_older_than(new_finalized_slot) end
)

Task.Supervisor.start_child(
PruneBlocksSupervisor,
fn -> BlockDb.prune_blocks_older_than(new_finalized_slot) end
)
end
end

@spec apply_handler(any(), any(), any()) :: any()
def apply_handler(iter, state, handler) do
defp apply_handler(iter, state, handler) do
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
Expand Down
2 changes: 0 additions & 2 deletions lib/lambda_ethereum_consensus/store/block_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
@type block_status ::
:pending
| :invalid
| :processing
| :download
| :download_blobs
| :unknown
Expand Down Expand Up @@ -47,7 +46,6 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
when atom in [
:pending,
:invalid,
:processing,
:download,
:download_blobs,
:unknown,
Expand Down
6 changes: 6 additions & 0 deletions lib/lambda_ethereum_consensus/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ defmodule LambdaEthereumConsensus.Telemetry do
last_value("db.size.total", unit: :byte),

# ForkChoice Metrics
last_value("fork_choice.recompute_head.stop.duration",
unit: {:native, :millisecond}
),
last_value("fork_choice.recompute_head.exception.duration",
unit: {:native, :millisecond}
),
last_value("fork_choice.persist.stop.duration",
unit: {:native, :millisecond}
),
Expand Down
Loading
Loading