Skip to content

refactor: remove GenServer behavior from gossip blobsidecar #1115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 7, 2024
1 change: 0 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
LambdaEthereumConsensus.P2P.Gossip.Attestation,
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor}
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
# TODO: handle subscription failures.
defp start_subscriptions() do
Gossip.BeaconBlock.subscribe_to_topic()
Gossip.BlobSideCar.start()
Gossip.BlobSideCar.subscribe_to_topics()
Gossip.OperationsCollector.start()
end

Expand Down
17 changes: 5 additions & 12 deletions lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
:ok
end

@spec join_topic() :: :ok
def join_topic() do
# TODO: this doesn't take into account fork digest changes
topic_name = topic()
Libp2pPort.join_topic(self(), topic_name)
end

@spec subscribe_to_topic() :: :ok | :error
def subscribe_to_topic() do
topic()
Expand All @@ -59,15 +52,15 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
end
end

##########################
### Private functions
##########################

defp topic() do
def topic() do
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
"/eth2/#{fork_context}/beacon_block/ssz_snappy"
end

##########################
### Private functions
##########################

@spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any}
defp validate(%SignedBeaconBlock{message: block}, current_slot) do
cond do
Expand Down
77 changes: 27 additions & 50 deletions lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,12 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
alias LambdaEthereumConsensus.P2P.Gossip.Handler
alias LambdaEthereumConsensus.Store.BlobDb

use GenServer

require Logger

@behaviour Handler

@type topics :: [String.t()]

##########################
### Public API
##########################

def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start() do
GenServer.call(__MODULE__, :start)
end

@impl true
def handle_gossip_message(topic, msg_id, message) do
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
end

##########################
### GenServer Callbacks
##########################

@impl true
@spec init(any()) :: {:ok, topics()} | {:stop, any()}
def init(_init_arg) do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)

# Generate blob sidecar topics
# NOTE: there's one per blob index in Deneb (6 blobs per block)
topics =
Enum.map(0..(ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT") - 1), fn i ->
"/eth2/#{fork_context}/blob_sidecar_#{i}/ssz_snappy"
end)

Enum.each(topics, &Libp2pPort.join_topic/1)
{:ok, topics}
end

@impl true
def handle_call(:start, _from, topics) do
Enum.each(topics, fn topic -> Libp2pPort.subscribe_to_topic(topic, __MODULE__) end)
{:reply, :ok, topics}
end

@impl true
def handle_cast({:gossipsub, {_topic, msg_id, message}}, topics) do
def handle_gossip_message(_topic, msg_id, message) do
with {:ok, uncompressed} <- :snappyer.decompress(message),
{:ok, %Types.BlobSidecar{index: blob_index} = blob} <-
Ssz.from_ssz(uncompressed, Types.BlobSidecar) do
Expand All @@ -72,7 +24,32 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}")
Libp2pPort.validate_message(msg_id, :reject)
end
end

{:noreply, topics}
@spec subscribe_to_topics() :: :ok | {:error, String.t()}
def subscribe_to_topics() do
topics()
|> Enum.each(fn topic ->
Libp2pPort.subscribe_to_topic(topic, __MODULE__)
|> case do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid the pipe for a single operation.

case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
 :ok -> ...
 {:error, reason} -> ...
end

:ok ->
:ok

{:error, reason} ->
Logger.error("[Gossip] Subscription failed: '#{reason}'")
{:error, reason}
end
end)
end

def topics() do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)

# Generate blob sidecar topics
# NOTE: there's one per blob index in Deneb (6 blobs per block)
Enum.map(0..(ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT") - 1), fn i ->
"/eth2/#{fork_context}/blob_sidecar_#{i}/ssz_snappy"
end)
end
end
22 changes: 19 additions & 3 deletions lib/libp2p_port.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do

alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
alias LambdaEthereumConsensus.P2P.Gossip.BlobSideCar
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Utils.BitVector
alias Types.EnrForkId
Expand Down Expand Up @@ -266,8 +267,22 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
cast_command(pid, {:update_enr, enr})
end

defp join_init_topics() do
BeaconBlock.join_topic()
@spec join_init_topics(port()) :: :ok | {:error, String.t()}
defp join_init_topics(port) do
topics = [BeaconBlock.topic()] ++ BlobSideCar.topics()

topics
|> Enum.each(fn topic_name ->
c = {:join, %JoinTopic{name: topic_name}}
data = Command.encode(%Command{c: c})

send_data(port, data)

:telemetry.execute([:port, :message], %{}, %{
function: "join_topic",
direction: "elixir->"
})
end)
end

########################
Expand All @@ -288,7 +303,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
|> InitArgs.encode()
|> then(&send_data(port, &1))

if join_init_topics, do: join_init_topics()
if join_init_topics, do: join_init_topics(port)

{:ok, %{port: port, new_peer_handler: new_peer_handler, subscriptors: %{}}}
end
Expand Down Expand Up @@ -443,6 +458,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
|> then(&struct!(InitArgs, &1))
end

@spec send_data(port(), iodata()) :: boolean()
defp send_data(port, data), do: Port.command(port, data)

defp send_protobuf(pid, %mod{} = protobuf) do
Expand Down
Loading