-
Notifications
You must be signed in to change notification settings - Fork 40
feat: add support for multiple validators. #1080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
8dec6f0
42af559
4305729
257be5b
2d9ee4c
0a7a84f
426c1e8
daed033
fca122a
7627b5c
2d88d6d
a3adbc5
52dee65
5896f93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -202,8 +202,16 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do | |
end | ||
|
||
defp get_finalized_block_hash(state) do | ||
finalized_block = Blocks.get_block!(state.finalized_checkpoint.root) | ||
finalized_hash = finalized_block.body.execution_payload.block_hash | ||
finalized_root = state.finalized_checkpoint.root | ||
|
||
finalized_hash = | ||
if finalized_root == <<0::256>> do | ||
<<0::256>> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dealing with the case where there is not finalized block yet https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#forkchoicestatev1
|
||
else | ||
finalized_block = Blocks.get_block!(state.finalized_checkpoint.root) | ||
finalized_block.body.execution_payload.block_hash | ||
end | ||
|
||
{:ok, finalized_hash} | ||
end | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
defmodule LambdaEthereumConsensus.Validator.Supervisor do | ||
@moduledoc false | ||
|
||
use Supervisor | ||
|
||
require Logger | ||
alias LambdaEthereumConsensus.Validator | ||
|
||
def start_link(opts) do | ||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__) | ||
end | ||
|
||
@impl true | ||
def init({slot, head_root}) do | ||
config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, []) | ||
|
||
keystore_dir = Keyword.get(config, :keystore_dir) | ||
keystore_pass_dir = Keyword.get(config, :keystore_pass_dir) | ||
|
||
if keystore_dir == nil or keystore_pass_dir == nil do | ||
Logger.warning( | ||
"[Validator] No keystore_dir or keystore_pass_dir provided. Validator will not start." | ||
) | ||
|
||
:ignore | ||
else | ||
validator_keys = get_validator_keys(keystore_dir, keystore_pass_dir) | ||
|
||
children = | ||
validator_keys | ||
|> Enum.map(fn {pubkey, privkey} -> | ||
Supervisor.child_spec({Validator, {slot, head_root, {pubkey, privkey}}}, | ||
id: pubkey |> Base.encode16(case: :lower) |> String.to_atom() | ||
) | ||
end) | ||
|
||
Supervisor.init(children, strategy: :one_for_one) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One validator dying should not affect others, so I applied |
||
end | ||
end | ||
|
||
def notify_new_block(slot, head_root) do | ||
cast_to_children({:new_block, slot, head_root}) | ||
end | ||
|
||
def notify_tick(logical_time) do | ||
cast_to_children({:on_tick, logical_time}) | ||
end | ||
|
||
defp cast_to_children(msg) do | ||
__MODULE__ | ||
|> Supervisor.which_children() | ||
|> Enum.each(fn {_, pid, _, _} -> GenServer.cast(pid, msg) end) | ||
end | ||
|
||
@spec get_validator_keys(binary(), binary()) :: list({Bls.pubkey(), Bls.privkey()}) | ||
defp get_validator_keys(keystore_dir, keystore_pass_dir) do | ||
keystore_files = File.ls!(keystore_dir) |> Enum.sort() | ||
keystore_pass_files = File.ls!(keystore_pass_dir) |> Enum.sort() | ||
|
||
Enum.zip(keystore_files, keystore_pass_files) | ||
|> Enum.map(fn {keystore_file, keystore_pass_file} -> | ||
keystore_file = Path.join(keystore_dir, keystore_file) | ||
keystore_pass_file = Path.join(keystore_pass_dir, keystore_pass_file) | ||
|
||
# TODO: remove `try` and handle errors properly | ||
# TODO: match keystore file and pass file based on name | ||
try do | ||
Keystore.decode_from_files!(keystore_file, keystore_pass_file) | ||
rescue | ||
error -> | ||
Logger.error( | ||
"[Validator] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}" | ||
) | ||
|
||
nil | ||
end | ||
end) | ||
|> Enum.filter(&is_tuple/1) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ defmodule LambdaEthereumConsensus.Validator do | |
require Logger | ||
|
||
alias LambdaEthereumConsensus.Beacon.BeaconChain | ||
alias LambdaEthereumConsensus.Beacon.PendingBlocks | ||
alias LambdaEthereumConsensus.ForkChoice.Handlers | ||
alias LambdaEthereumConsensus.Libp2pPort | ||
alias LambdaEthereumConsensus.P2P.Gossip | ||
|
@@ -26,7 +27,11 @@ defmodule LambdaEthereumConsensus.Validator do | |
### Public API | ||
########################## | ||
|
||
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) | ||
def start_link({_, _, {pubkey, _}} = opts) do | ||
# TODO: if possible, use validator index instead of pubkey | ||
name = Atom.to_string(__MODULE__) <> "_" <> Base.encode16(pubkey, case: :lower) | ||
GenServer.start_link(__MODULE__, opts, name: String.to_atom(name)) | ||
end | ||
|
||
def notify_new_block(slot, head_root), | ||
do: GenServer.cast(__MODULE__, {:new_block, slot, head_root}) | ||
|
@@ -35,29 +40,39 @@ defmodule LambdaEthereumConsensus.Validator do | |
### GenServer Callbacks | ||
########################## | ||
|
||
@impl true | ||
def init({slot, head_root}) do | ||
config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, []) | ||
|
||
validator = | ||
case {Keyword.get(config, :pubkey), Keyword.get(config, :privkey)} do | ||
{nil, nil} -> nil | ||
{pubkey, privkey} -> %{index: nil, privkey: privkey, pubkey: pubkey} | ||
end | ||
@type state :: %{ | ||
slot: Types.slot(), | ||
root: Types.root(), | ||
duties: %{ | ||
attester: list(:not_computed), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is |
||
proposer: :not_computed | list(Types.slot()) | ||
}, | ||
validator: any(), | ||
payload_builder: {Types.slot(), Types.root(), BlockBuilder.payload_id()} | nil | ||
} | ||
|
||
@impl true | ||
@spec init({Types.slot(), Types.root(), {Bls.pubkey(), Bls.privkey()}}) :: | ||
{:ok, state, {:continue, any}} | ||
def init({head_slot, head_root, {pubkey, privkey}}) do | ||
state = %{ | ||
slot: slot, | ||
slot: head_slot, | ||
mpaulucci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
root: head_root, | ||
duties: empty_duties(), | ||
validator: validator | ||
validator: %{ | ||
pubkey: pubkey, | ||
privkey: privkey, | ||
index: nil | ||
}, | ||
payload_builder: nil | ||
} | ||
|
||
{:ok, state, {:continue, nil}} | ||
end | ||
|
||
@impl true | ||
def handle_continue(nil, %{validator: nil} = state), do: {:noreply, state} | ||
@spec handle_continue(nil, state) :: {:noreply, state} | ||
|
||
@impl true | ||
def handle_continue(nil, %{slot: slot, root: root} = state) do | ||
case try_setup_validator(state, slot, root) do | ||
nil -> | ||
|
@@ -69,6 +84,7 @@ defmodule LambdaEthereumConsensus.Validator do | |
end | ||
end | ||
|
||
@spec try_setup_validator(state, Types.slot(), Types.root()) :: state | nil | ||
defp try_setup_validator(state, slot, root) do | ||
epoch = Misc.compute_epoch_at_slot(slot) | ||
beacon = fetch_target_state(epoch, root) | ||
|
@@ -87,6 +103,8 @@ defmodule LambdaEthereumConsensus.Validator do | |
end | ||
end | ||
|
||
@spec handle_cast(any, state) :: {:noreply, state} | ||
|
||
@impl true | ||
def handle_cast(_, %{validator: nil} = state), do: {:noreply, state} | ||
|
||
|
@@ -151,7 +169,8 @@ defmodule LambdaEthereumConsensus.Validator do | |
defp update_state(%{slot: slot, root: root} = state, slot, root), do: state | ||
|
||
defp update_state(%{slot: slot, root: _other_root} = state, slot, head_root) do | ||
Logger.warning("[Validator] Block came late", slot: slot, root: head_root) | ||
# TODO: this log is appearing for every block | ||
# Logger.warning("[Validator] Block came late", slot: slot, root: head_root) | ||
|
||
# TODO: rollback stale data instead of the whole cache | ||
epoch = Misc.compute_epoch_at_slot(slot + 1) | ||
|
@@ -186,6 +205,7 @@ defmodule LambdaEthereumConsensus.Validator do | |
%{state | slot: slot, root: head_root, duties: new_duties} | ||
end | ||
|
||
@spec fetch_target_state(Types.epoch(), Types.root()) :: Types.BeaconState.t() | ||
defp fetch_target_state(epoch, root) do | ||
{:ok, state} = Handlers.compute_target_checkpoint_state(epoch, root) | ||
state | ||
|
@@ -211,7 +231,7 @@ defmodule LambdaEthereumConsensus.Validator do | |
old -> old |> Enum.reverse() |> Enum.take(1) | ||
end | ||
|
||
%{duties | attester: attester_duties, proposer: old_duty ++ proposer_duties} | ||
%{duties | attester: attester_duties, proposer: Enum.dedup(old_duty ++ proposer_duties)} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reintroduces a bug related to last-slot-in-epoch proposals. We should instead discard "future" duties before computing slot = Misc.compute_start_slot_for_epoch(epoch)
case duties.proposer do
:not_computed -> []
old -> old |> Enum.filter(&(&1 < slot)) |> Enum.reverse() |> Enum.take(1)
end |
||
end | ||
|
||
defp maybe_update_attester_duties([epp, ep0, ep1], beacon_state, epoch, validator) do | ||
|
@@ -261,14 +281,14 @@ defmodule LambdaEthereumConsensus.Validator do | |
|
||
defp join(subnets) do | ||
if not Enum.empty?(subnets) do | ||
Logger.info("Joining subnets: #{Enum.join(subnets, ", ")}") | ||
Logger.debug("Joining subnets: #{Enum.join(subnets, ", ")}") | ||
Enum.each(subnets, &Gossip.Attestation.join/1) | ||
end | ||
end | ||
|
||
defp leave(subnets) do | ||
if not Enum.empty?(subnets) do | ||
Logger.info("Leaving subnets: #{Enum.join(subnets, ", ")}") | ||
Logger.debug("Leaving subnets: #{Enum.join(subnets, ", ")}") | ||
Enum.each(subnets, &Gossip.Attestation.leave/1) | ||
end | ||
end | ||
|
@@ -278,7 +298,7 @@ defmodule LambdaEthereumConsensus.Validator do | |
# Drop the first element, which is the previous epoch's duty | ||
|> Stream.drop(1) | ||
|> Enum.each(fn %{index_in_committee: i, committee_index: ci, slot: slot} -> | ||
Logger.info( | ||
Logger.debug( | ||
"[Validator] #{validator_index} has to attest in committee #{ci} of slot #{slot} with index #{i}" | ||
) | ||
end) | ||
|
@@ -458,6 +478,8 @@ defmodule LambdaEthereumConsensus.Validator do | |
Map.put(duty, :subnet_id, subnet_id) | ||
end | ||
|
||
@spec fetch_validator_index(Types.BeaconState.t(), %{index: nil, pubkey: Bls.pubkey()}) :: | ||
non_neg_integer() | nil | ||
defp fetch_validator_index(beacon, %{index: nil, pubkey: pk}) do | ||
Enum.find_index(beacon.validators, &(&1.pubkey == pk)) | ||
end | ||
|
@@ -488,8 +510,18 @@ defmodule LambdaEthereumConsensus.Validator do | |
defp start_payload_builder(state, proposed_slot, head_root) do | ||
# TODO: handle reorgs and late blocks | ||
Logger.info("[Validator] Starting to build payload for slot #{proposed_slot}") | ||
{:ok, payload_id} = BlockBuilder.start_building_payload(proposed_slot, head_root) | ||
%{state | payload_builder: {proposed_slot, head_root, payload_id}} | ||
|
||
case BlockBuilder.start_building_payload(proposed_slot, head_root) do | ||
{:ok, payload_id} -> | ||
%{state | payload_builder: {proposed_slot, head_root, payload_id}} | ||
|
||
{:error, reason} -> | ||
Logger.error( | ||
"[Validator] Failed to start building payload for slot #{proposed_slot}. Reason: #{reason}" | ||
) | ||
|
||
state | ||
end | ||
end | ||
|
||
defp maybe_propose(state, slot) do | ||
|
@@ -500,8 +532,16 @@ defmodule LambdaEthereumConsensus.Validator do | |
end | ||
end | ||
|
||
defp propose(%{root: head_root, validator: validator} = state, proposed_slot) do | ||
{^proposed_slot, ^head_root, payload_id} = state.block_builder | ||
defp propose(%{payload_builder: nil} = state, _proposed_slot) do | ||
Logger.error("[Validator] Tried to propose a block without a payload builder") | ||
state | ||
end | ||
|
||
defp propose( | ||
%{root: head_root, validator: validator, payload_builder: payload_builder} = state, | ||
proposed_slot | ||
) do | ||
{^proposed_slot, ^head_root, payload_id} = payload_builder | ||
|
||
build_result = | ||
BlockBuilder.build_block( | ||
|
@@ -517,6 +557,7 @@ defmodule LambdaEthereumConsensus.Validator do | |
|
||
case build_result do | ||
{:ok, {signed_block, blob_sidecars}} -> | ||
PendingBlocks.add_block(signed_block) | ||
publish_block(signed_block) | ||
Enum.each(blob_sidecars, &publish_sidecar/1) | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.