Skip to content

refactor: remove GenServer behavior from gossip blobsidecar #1115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 7, 2024
1 change: 0 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
LambdaEthereumConsensus.P2P.Gossip.Attestation,
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor}
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
# TODO: handle subscription failures.
defp start_subscriptions() do
Gossip.BeaconBlock.subscribe_to_topic()
Gossip.BlobSideCar.start()
Gossip.BlobSideCar.subscribe_to_topics()
Gossip.OperationsCollector.start()
end

Expand Down
83 changes: 33 additions & 50 deletions lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,12 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
alias LambdaEthereumConsensus.P2P.Gossip.Handler
alias LambdaEthereumConsensus.Store.BlobDb

use GenServer

require Logger

@behaviour Handler

@type topics :: [String.t()]

##########################
### Public API
##########################

def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start() do
GenServer.call(__MODULE__, :start)
end

@impl true
def handle_gossip_message(topic, msg_id, message) do
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
end

##########################
### GenServer Callbacks
##########################

@impl true
@spec init(any()) :: {:ok, topics()} | {:stop, any()}
def init(_init_arg) do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)

# Generate blob sidecar topics
# NOTE: there's one per blob index in Deneb (6 blobs per block)
topics =
Enum.map(0..(ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT") - 1), fn i ->
"/eth2/#{fork_context}/blob_sidecar_#{i}/ssz_snappy"
end)

Enum.each(topics, &Libp2pPort.join_topic/1)
{:ok, topics}
end

@impl true
def handle_call(:start, _from, topics) do
Enum.each(topics, fn topic -> Libp2pPort.subscribe_to_topic(topic, __MODULE__) end)
{:reply, :ok, topics}
end

@impl true
def handle_cast({:gossipsub, {_topic, msg_id, message}}, topics) do
def handle_gossip_message(_topic, msg_id, message) do
with {:ok, uncompressed} <- :snappyer.decompress(message),
{:ok, %Types.BlobSidecar{index: blob_index} = blob} <-
Ssz.from_ssz(uncompressed, Types.BlobSidecar) do
Expand All @@ -72,7 +24,38 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}")
Libp2pPort.validate_message(msg_id, :reject)
end
end

@spec join_topics() :: :ok
def join_topics() do
topics()
|> Enum.each(fn topic_name -> Libp2pPort.join_topic(self(), topic_name) end)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use each if it can return an error and leave the node in an unusable state. We should be able to return an error here, for example, with a reduce, or with a map and then counting the errors.

end

@spec subscribe_to_topics() :: :ok | {:error, String.t()}
def subscribe_to_topics() do
topics()
|> Enum.each(fn topic ->
Libp2pPort.subscribe_to_topic(topic, __MODULE__)
|> case do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid the pipe for a single operation.

case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
 :ok -> ...
 {:error, reason} -> ...
end

:ok ->
:ok

{:error, reason} ->
Logger.error("[Gossip] Subscription failed: '#{reason}'")
{:error, reason}
end
end)
end

defp topics() do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)

{:noreply, topics}
# Generate blob sidecar topics
# NOTE: there's one per blob index in Deneb (6 blobs per block)
Enum.map(0..(ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT") - 1), fn i ->
"/eth2/#{fork_context}/blob_sidecar_#{i}/ssz_snappy"
end)
end
end
2 changes: 2 additions & 0 deletions lib/libp2p_port.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do

alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
alias LambdaEthereumConsensus.P2P.Gossip.BlobSideCar
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Utils.BitVector
alias Types.EnrForkId
Expand Down Expand Up @@ -268,6 +269,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do

defp join_init_topics() do
BeaconBlock.join_topic()
BlobSideCar.join_topics()
end

########################
Expand Down
Loading