Skip to content

Commit 2c4f4d2

Browse files
authored
refactor: remove GenServer behavior from gossip blobsidecar (#1115)
1 parent 2930d96 commit 2c4f4d2

File tree

5 files changed

+50
-67
lines changed

5 files changed

+50
-67
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
5353
LambdaEthereumConsensus.Beacon.PendingBlocks,
5454
LambdaEthereumConsensus.Beacon.SyncBlocks,
5555
LambdaEthereumConsensus.P2P.Gossip.Attestation,
56-
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
5756
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
5857
{Task.Supervisor, name: PruneStatesSupervisor},
5958
{Task.Supervisor, name: PruneBlocksSupervisor},

lib/lambda_ethereum_consensus/beacon/sync_blocks.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
8686
# TODO: handle subscription failures.
8787
defp start_subscriptions() do
8888
Gossip.BeaconBlock.subscribe_to_topic()
89-
Gossip.BlobSideCar.start()
89+
Gossip.BlobSideCar.subscribe_to_topics()
9090
Gossip.OperationsCollector.start()
9191
end
9292

lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
3838
:ok
3939
end
4040

41-
@spec join_topic() :: :ok
42-
def join_topic() do
43-
# TODO: this doesn't take into account fork digest changes
44-
topic_name = topic()
45-
Libp2pPort.join_topic(self(), topic_name)
46-
end
47-
4841
@spec subscribe_to_topic() :: :ok | :error
4942
def subscribe_to_topic() do
5043
topic()
@@ -59,15 +52,15 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
5952
end
6053
end
6154

62-
##########################
63-
### Private functions
64-
##########################
65-
66-
defp topic() do
55+
def topic() do
6756
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
6857
"/eth2/#{fork_context}/beacon_block/ssz_snappy"
6958
end
7059

60+
##########################
61+
### Private functions
62+
##########################
63+
7164
@spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any}
7265
defp validate(%SignedBeaconBlock{message: block}, current_slot) do
7366
cond do

lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex

Lines changed: 25 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,60 +7,12 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
77
alias LambdaEthereumConsensus.P2P.Gossip.Handler
88
alias LambdaEthereumConsensus.Store.BlobDb
99

10-
use GenServer
11-
1210
require Logger
1311

1412
@behaviour Handler
1513

16-
@type topics :: [String.t()]
17-
18-
##########################
19-
### Public API
20-
##########################
21-
22-
def start_link(init_arg) do
23-
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
24-
end
25-
26-
def start() do
27-
GenServer.call(__MODULE__, :start)
28-
end
29-
30-
@impl true
31-
def handle_gossip_message(topic, msg_id, message) do
32-
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
33-
end
34-
35-
##########################
36-
### GenServer Callbacks
37-
##########################
38-
3914
@impl true
40-
@spec init(any()) :: {:ok, topics()} | {:stop, any()}
41-
def init(_init_arg) do
42-
# TODO: this doesn't take into account fork digest changes
43-
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
44-
45-
# Generate blob sidecar topics
46-
# NOTE: there's one per blob index in Deneb (6 blobs per block)
47-
topics =
48-
Enum.map(0..(ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT") - 1), fn i ->
49-
"/eth2/#{fork_context}/blob_sidecar_#{i}/ssz_snappy"
50-
end)
51-
52-
Enum.each(topics, &Libp2pPort.join_topic/1)
53-
{:ok, topics}
54-
end
55-
56-
@impl true
57-
def handle_call(:start, _from, topics) do
58-
Enum.each(topics, fn topic -> Libp2pPort.subscribe_to_topic(topic, __MODULE__) end)
59-
{:reply, :ok, topics}
60-
end
61-
62-
@impl true
63-
def handle_cast({:gossipsub, {_topic, msg_id, message}}, topics) do
15+
def handle_gossip_message(_topic, msg_id, message) do
6416
with {:ok, uncompressed} <- :snappyer.decompress(message),
6517
{:ok, %Types.BlobSidecar{index: blob_index} = blob} <-
6618
Ssz.from_ssz(uncompressed, Types.BlobSidecar) do
@@ -72,7 +24,30 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
7224
Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}")
7325
Libp2pPort.validate_message(msg_id, :reject)
7426
end
27+
end
7528

76-
{:noreply, topics}
29+
@spec subscribe_to_topics() :: :ok | {:error, String.t()}
30+
def subscribe_to_topics() do
31+
Enum.each(topics(), fn topic ->
32+
case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
33+
:ok ->
34+
:ok
35+
36+
{:error, reason} ->
37+
Logger.error("[Gossip] Subscription failed: '#{reason}'")
38+
{:error, reason}
39+
end
40+
end)
41+
end
42+
43+
def topics() do
44+
# TODO: this doesn't take into account fork digest changes
45+
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
46+
47+
# Generate blob sidecar topics
48+
# NOTE: there's one per blob index in Deneb (6 blobs per block)
49+
Enum.map(0..(ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT") - 1), fn i ->
50+
"/eth2/#{fork_context}/blob_sidecar_#{i}/ssz_snappy"
51+
end)
7752
end
7853
end

lib/libp2p_port.ex

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
1111

1212
alias LambdaEthereumConsensus.Beacon.BeaconChain
1313
alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
14+
alias LambdaEthereumConsensus.P2P.Gossip.BlobSideCar
1415
alias LambdaEthereumConsensus.StateTransition.Misc
1516
alias LambdaEthereumConsensus.Utils.BitVector
1617
alias Types.EnrForkId
@@ -266,8 +267,22 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
266267
cast_command(pid, {:update_enr, enr})
267268
end
268269

269-
defp join_init_topics() do
270-
BeaconBlock.join_topic()
270+
@spec join_init_topics(port()) :: :ok | {:error, String.t()}
271+
defp join_init_topics(port) do
272+
topics = [BeaconBlock.topic()] ++ BlobSideCar.topics()
273+
274+
topics
275+
|> Enum.each(fn topic_name ->
276+
c = {:join, %JoinTopic{name: topic_name}}
277+
data = Command.encode(%Command{c: c})
278+
279+
send_data(port, data)
280+
281+
:telemetry.execute([:port, :message], %{}, %{
282+
function: "join_topic",
283+
direction: "elixir->"
284+
})
285+
end)
271286
end
272287

273288
########################
@@ -288,7 +303,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
288303
|> InitArgs.encode()
289304
|> then(&send_data(port, &1))
290305

291-
if join_init_topics, do: join_init_topics()
306+
if join_init_topics, do: join_init_topics(port)
292307

293308
{:ok, %{port: port, new_peer_handler: new_peer_handler, subscriptors: %{}}}
294309
end
@@ -443,6 +458,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
443458
|> then(&struct!(InitArgs, &1))
444459
end
445460

461+
@spec send_data(port(), iodata()) :: boolean()
446462
defp send_data(port, data), do: Port.command(port, data)
447463

448464
defp send_protobuf(pid, %mod{} = protobuf) do

0 commit comments

Comments
 (0)