Skip to content

feat: add support for multiple validators. #1080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 22, 2024
Merged
23 changes: 9 additions & 14 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ switches = [
listen_address: [:string, :keep],
discovery_port: :integer,
boot_nodes: :string,
keystore_file: :string,
keystore_password_file: :string
keystore_dir: :string,
keystore_pass_dir: :string
]

is_testing = Config.config_env() == :test
Expand Down Expand Up @@ -50,8 +50,8 @@ enable_beacon_api = Keyword.get(args, :beacon_api, not is_nil(beacon_api_port))
listen_addresses = Keyword.get_values(args, :listen_address)
discovery_port = Keyword.get(args, :discovery_port, 9000)
cli_bootnodes = Keyword.get(args, :boot_nodes, "")
keystore = Keyword.get(args, :keystore_file)
keystore_pass = Keyword.get(args, :keystore_password_file)
keystore_dir = Keyword.get(args, :keystore_dir)
keystore_pass_dir = Keyword.get(args, :keystore_pass_dir)

if not is_nil(testnet_dir) and not is_nil(checkpoint_sync_url) do
IO.puts("Both checkpoint sync and testnet url specified (only one should be specified).")
Expand Down Expand Up @@ -153,17 +153,10 @@ config :lambda_ethereum_consensus, BeaconApi.Endpoint,
layout: false
]

if is_binary(keystore) and is_binary(keystore_pass) do
{pubkey, privkey} = Keystore.decode_from_files!(keystore, keystore_pass)
config :lambda_ethereum_consensus, LambdaEthereumConsensus.Validator.Supervisor,
keystore_dir: keystore_dir,
keystore_pass_dir: keystore_pass_dir

config :lambda_ethereum_consensus, LambdaEthereumConsensus.Validator,
pubkey: pubkey,
privkey: privkey
end

# Metrics

# Configures metrics
# TODO: we should set this dynamically
block_time_ms =
case network do
Expand All @@ -175,6 +168,8 @@ block_time_ms =
_ -> 12_000
end

# Metrics

config :lambda_ethereum_consensus, LambdaEthereumConsensus.Telemetry,
block_processing_buckets: [0.5, 1.0, 1.5, 2, 4, 6, 8] |> Enum.map(&(&1 * block_time_ms)),
port: metrics_port
Expand Down
4 changes: 3 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,11 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
defp notify_subscribers(logical_time) do
log_new_slot(logical_time)

Enum.each([Validator, Gossip.BeaconBlock], fn subscriber ->
Enum.each([Gossip.BeaconBlock], fn subscriber ->
GenServer.cast(subscriber, {:on_tick, logical_time})
end)

Validator.Supervisor.notify_tick(logical_time)
end

defp log_new_slot({slot, :first_third}) do
Expand Down
18 changes: 7 additions & 11 deletions lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,13 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
[]
end

defp get_validator_children(snapshot, head_slot, head_root, genesis_time) do
if is_nil(Application.get_env(:lambda_ethereum_consensus, Validator)) do
[]
else
%BeaconState{eth1_data_votes: votes} = BlockStates.get_state!(head_root)
# TODO: move checkpoint sync outside and move this to application.ex
[
{Validator, {head_slot, head_root}},
{LambdaEthereumConsensus.Execution.ExecutionChain, {genesis_time, snapshot, votes}}
]
end
defp get_validator_children(snapshot, slot, head_root, genesis_time) do
%BeaconState{eth1_data_votes: votes} = BlockStates.get_state!(head_root)
# TODO: move checkpoint sync outside and move this to application.ex
[
{Validator.Supervisor, {slot, head_root}},
{LambdaEthereumConsensus.Execution.ExecutionChain, {genesis_time, snapshot, votes}}
]
end

defp get_libp2p_args() do
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
%{slot: slot, body: body} = head_block

OperationsCollector.notify_new_block(head_block)
Validator.notify_new_block(slot, head_root)
Validator.Supervisor.notify_new_block(slot, head_root)
ExecutionChain.notify_new_block(slot, body.eth1_data, body.execution_payload)

BeaconChain.update_fork_choice_cache(
Expand Down
12 changes: 10 additions & 2 deletions lib/lambda_ethereum_consensus/validator/block_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,16 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
end

defp get_finalized_block_hash(state) do
finalized_block = Blocks.get_block!(state.finalized_checkpoint.root)
finalized_hash = finalized_block.body.execution_payload.block_hash
finalized_root = state.finalized_checkpoint.root

finalized_hash =
if finalized_root == <<0::256>> do
<<0::256>>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dealing with the case where there is not finalized block yet

https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#forkchoicestatev1

Note: safeBlockHash and finalizedBlockHash fields are allowed to have 0x0000000000000000000000000000000000000000000000000000000000000000 value unless transition block is finalized.

else
finalized_block = Blocks.get_block!(state.finalized_checkpoint.root)
finalized_block.body.execution_payload.block_hash
end

{:ok, finalized_hash}
end

Expand Down
80 changes: 80 additions & 0 deletions lib/lambda_ethereum_consensus/validator/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule LambdaEthereumConsensus.Validator.Supervisor do
@moduledoc false

use Supervisor

require Logger
alias LambdaEthereumConsensus.Validator

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init({slot, head_root}) do
config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, [])

keystore_dir = Keyword.get(config, :keystore_dir)
keystore_pass_dir = Keyword.get(config, :keystore_pass_dir)

if keystore_dir == nil or keystore_pass_dir == nil do
Logger.warning(
"[Validator] No keystore_dir or keystore_pass_dir provided. Validator will not start."
)

:ignore
else
validator_keys = get_validator_keys(keystore_dir, keystore_pass_dir)

children =
validator_keys
|> Enum.map(fn {pubkey, privkey} ->
Supervisor.child_spec({Validator, {slot, head_root, {pubkey, privkey}}},
id: pubkey |> Base.encode16(case: :lower) |> String.to_atom()
)
end)

Supervisor.init(children, strategy: :one_for_one)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One validator dying should not affect others, so I applied :one_for_one.

end
end

def notify_new_block(slot, head_root) do
cast_to_children({:new_block, slot, head_root})
end

def notify_tick(logical_time) do
cast_to_children({:on_tick, logical_time})
end

defp cast_to_children(msg) do
__MODULE__
|> Supervisor.which_children()
|> Enum.each(fn {_, pid, _, _} -> GenServer.cast(pid, msg) end)
end

@spec get_validator_keys(binary(), binary()) :: list({Bls.pubkey(), Bls.privkey()})
defp get_validator_keys(keystore_dir, keystore_pass_dir) do
keystore_files = File.ls!(keystore_dir) |> Enum.sort()
keystore_pass_files = File.ls!(keystore_pass_dir) |> Enum.sort()

Enum.zip(keystore_files, keystore_pass_files)
|> Enum.map(fn {keystore_file, keystore_pass_file} ->
keystore_file = Path.join(keystore_dir, keystore_file)
keystore_pass_file = Path.join(keystore_pass_dir, keystore_pass_file)

# TODO: remove `try` and handle errors properly
# TODO: match keystore file and pass file based on name
try do
Keystore.decode_from_files!(keystore_file, keystore_pass_file)
rescue
error ->
Logger.error(
"[Validator] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}"
)

nil
end
end)
|> Enum.filter(&is_tuple/1)
end
end
87 changes: 64 additions & 23 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule LambdaEthereumConsensus.Validator do
require Logger

alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.P2P.Gossip
Expand All @@ -26,7 +27,11 @@ defmodule LambdaEthereumConsensus.Validator do
### Public API
##########################

def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def start_link({_, _, {pubkey, _}} = opts) do
# TODO: if possible, use validator index instead of pubkey
name = Atom.to_string(__MODULE__) <> "_" <> Base.encode16(pubkey, case: :lower)
GenServer.start_link(__MODULE__, opts, name: String.to_atom(name))
end

def notify_new_block(slot, head_root),
do: GenServer.cast(__MODULE__, {:new_block, slot, head_root})
Expand All @@ -35,29 +40,39 @@ defmodule LambdaEthereumConsensus.Validator do
### GenServer Callbacks
##########################

@impl true
def init({slot, head_root}) do
config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, [])

validator =
case {Keyword.get(config, :pubkey), Keyword.get(config, :privkey)} do
{nil, nil} -> nil
{pubkey, privkey} -> %{index: nil, privkey: privkey, pubkey: pubkey}
end
@type state :: %{
slot: Types.slot(),
root: Types.root(),
duties: %{
attester: list(:not_computed),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is list(:not_computed | %{...})

proposer: :not_computed | list(Types.slot())
},
validator: any(),
payload_builder: {Types.slot(), Types.root(), BlockBuilder.payload_id()} | nil
}

@impl true
@spec init({Types.slot(), Types.root(), {Bls.pubkey(), Bls.privkey()}}) ::
{:ok, state, {:continue, any}}
def init({head_slot, head_root, {pubkey, privkey}}) do
state = %{
slot: slot,
slot: head_slot,
root: head_root,
duties: empty_duties(),
validator: validator
validator: %{
pubkey: pubkey,
privkey: privkey,
index: nil
},
payload_builder: nil
}

{:ok, state, {:continue, nil}}
end

@impl true
def handle_continue(nil, %{validator: nil} = state), do: {:noreply, state}
@spec handle_continue(nil, state) :: {:noreply, state}

@impl true
def handle_continue(nil, %{slot: slot, root: root} = state) do
case try_setup_validator(state, slot, root) do
nil ->
Expand All @@ -69,6 +84,7 @@ defmodule LambdaEthereumConsensus.Validator do
end
end

@spec try_setup_validator(state, Types.slot(), Types.root()) :: state | nil
defp try_setup_validator(state, slot, root) do
epoch = Misc.compute_epoch_at_slot(slot)
beacon = fetch_target_state(epoch, root)
Expand All @@ -87,6 +103,8 @@ defmodule LambdaEthereumConsensus.Validator do
end
end

@spec handle_cast(any, state) :: {:noreply, state}

@impl true
def handle_cast(_, %{validator: nil} = state), do: {:noreply, state}

Expand Down Expand Up @@ -151,7 +169,8 @@ defmodule LambdaEthereumConsensus.Validator do
defp update_state(%{slot: slot, root: root} = state, slot, root), do: state

defp update_state(%{slot: slot, root: _other_root} = state, slot, head_root) do
Logger.warning("[Validator] Block came late", slot: slot, root: head_root)
# TODO: this log is appearing for every block
# Logger.warning("[Validator] Block came late", slot: slot, root: head_root)

# TODO: rollback stale data instead of the whole cache
epoch = Misc.compute_epoch_at_slot(slot + 1)
Expand Down Expand Up @@ -186,6 +205,7 @@ defmodule LambdaEthereumConsensus.Validator do
%{state | slot: slot, root: head_root, duties: new_duties}
end

@spec fetch_target_state(Types.epoch(), Types.root()) :: Types.BeaconState.t()
defp fetch_target_state(epoch, root) do
{:ok, state} = Handlers.compute_target_checkpoint_state(epoch, root)
state
Expand All @@ -211,7 +231,7 @@ defmodule LambdaEthereumConsensus.Validator do
old -> old |> Enum.reverse() |> Enum.take(1)
end

%{duties | attester: attester_duties, proposer: old_duty ++ proposer_duties}
%{duties | attester: attester_duties, proposer: Enum.dedup(old_duty ++ proposer_duties)}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reintroduces a bug related to last-slot-in-epoch proposals. We should instead discard "future" duties before computing old_duty. Something like:

slot = Misc.compute_start_slot_for_epoch(epoch)
case duties.proposer do
  :not_computed -> []
  old -> old |> Enum.filter(&(&1 < slot)) |> Enum.reverse() |> Enum.take(1)
end

end

defp maybe_update_attester_duties([epp, ep0, ep1], beacon_state, epoch, validator) do
Expand Down Expand Up @@ -261,14 +281,14 @@ defmodule LambdaEthereumConsensus.Validator do

defp join(subnets) do
if not Enum.empty?(subnets) do
Logger.info("Joining subnets: #{Enum.join(subnets, ", ")}")
Logger.debug("Joining subnets: #{Enum.join(subnets, ", ")}")
Enum.each(subnets, &Gossip.Attestation.join/1)
end
end

defp leave(subnets) do
if not Enum.empty?(subnets) do
Logger.info("Leaving subnets: #{Enum.join(subnets, ", ")}")
Logger.debug("Leaving subnets: #{Enum.join(subnets, ", ")}")
Enum.each(subnets, &Gossip.Attestation.leave/1)
end
end
Expand All @@ -278,7 +298,7 @@ defmodule LambdaEthereumConsensus.Validator do
# Drop the first element, which is the previous epoch's duty
|> Stream.drop(1)
|> Enum.each(fn %{index_in_committee: i, committee_index: ci, slot: slot} ->
Logger.info(
Logger.debug(
"[Validator] #{validator_index} has to attest in committee #{ci} of slot #{slot} with index #{i}"
)
end)
Expand Down Expand Up @@ -458,6 +478,8 @@ defmodule LambdaEthereumConsensus.Validator do
Map.put(duty, :subnet_id, subnet_id)
end

@spec fetch_validator_index(Types.BeaconState.t(), %{index: nil, pubkey: Bls.pubkey()}) ::
non_neg_integer() | nil
defp fetch_validator_index(beacon, %{index: nil, pubkey: pk}) do
Enum.find_index(beacon.validators, &(&1.pubkey == pk))
end
Expand Down Expand Up @@ -488,8 +510,18 @@ defmodule LambdaEthereumConsensus.Validator do
defp start_payload_builder(state, proposed_slot, head_root) do
# TODO: handle reorgs and late blocks
Logger.info("[Validator] Starting to build payload for slot #{proposed_slot}")
{:ok, payload_id} = BlockBuilder.start_building_payload(proposed_slot, head_root)
%{state | payload_builder: {proposed_slot, head_root, payload_id}}

case BlockBuilder.start_building_payload(proposed_slot, head_root) do
{:ok, payload_id} ->
%{state | payload_builder: {proposed_slot, head_root, payload_id}}

{:error, reason} ->
Logger.error(
"[Validator] Failed to start building payload for slot #{proposed_slot}. Reason: #{reason}"
)

state
end
end

defp maybe_propose(state, slot) do
Expand All @@ -500,8 +532,16 @@ defmodule LambdaEthereumConsensus.Validator do
end
end

defp propose(%{root: head_root, validator: validator} = state, proposed_slot) do
{^proposed_slot, ^head_root, payload_id} = state.block_builder
defp propose(%{payload_builder: nil} = state, _proposed_slot) do
Logger.error("[Validator] Tried to propose a block without a payload builder")
state
end

defp propose(
%{root: head_root, validator: validator, payload_builder: payload_builder} = state,
proposed_slot
) do
{^proposed_slot, ^head_root, payload_id} = payload_builder

build_result =
BlockBuilder.build_block(
Expand All @@ -517,6 +557,7 @@ defmodule LambdaEthereumConsensus.Validator do

case build_result do
{:ok, {signed_block, blob_sidecars}} ->
PendingBlocks.add_block(signed_block)
publish_block(signed_block)
Enum.each(blob_sidecars, &publish_sidecar/1)

Expand Down
Loading