Skip to content

Commit d6d845b

Browse files
authored
refactor: remove GenServer behavior from gossip BeaconBlock (#1113)
1 parent 203aace commit d6d845b

File tree

5 files changed

+47
-59
lines changed

5 files changed

+47
-59
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_chain.ex

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
44
use GenServer
55

66
alias LambdaEthereumConsensus.Beacon.PendingBlocks
7-
alias LambdaEthereumConsensus.P2P.Gossip
87
alias LambdaEthereumConsensus.StateTransition.Misc
98
alias LambdaEthereumConsensus.Validator.ValidatorManager
109
alias Types.BeaconState
@@ -243,11 +242,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
243242

244243
defp notify_subscribers(logical_time) do
245244
log_new_slot(logical_time)
246-
247-
Enum.each([Gossip.BeaconBlock], fn subscriber ->
248-
GenServer.cast(subscriber, {:on_tick, logical_time})
249-
end)
250-
251245
ValidatorManager.notify_tick(logical_time)
252246
end
253247

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
5353
LambdaEthereumConsensus.Beacon.PendingBlocks,
5454
LambdaEthereumConsensus.Beacon.SyncBlocks,
5555
LambdaEthereumConsensus.P2P.Gossip.Attestation,
56-
LambdaEthereumConsensus.P2P.Gossip.BeaconBlock,
5756
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
5857
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
5958
{Task.Supervisor, name: PruneStatesSupervisor},
@@ -93,7 +92,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
9392
listen_addr: listen_addr,
9493
enable_discovery: true,
9594
discovery_addr: "0.0.0.0:#{port}",
96-
bootnodes: bootnodes
95+
bootnodes: bootnodes,
96+
join_init_topics: true
9797
]
9898
end
9999

lib/lambda_ethereum_consensus/beacon/sync_blocks.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
8383
end
8484
end
8585

86+
# TODO: handle subscription failures.
8687
defp start_subscriptions() do
87-
Gossip.BeaconBlock.start()
88+
Gossip.BeaconBlock.subscribe_to_topic()
8889
Gossip.BlobSideCar.start()
8990
Gossip.OperationsCollector.start()
9091
end

lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -8,63 +8,17 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
88
alias LambdaEthereumConsensus.P2P.Gossip.Handler
99
alias Types.SignedBeaconBlock
1010

11-
use GenServer
12-
1311
require Logger
1412
@behaviour Handler
1513

16-
@type state :: %{topic: String.t(), slot: Types.slot()}
17-
1814
##########################
1915
### Public API
2016
##########################
2117

22-
def start_link(init_arg) do
23-
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
24-
end
25-
26-
def start() do
27-
GenServer.call(__MODULE__, :start)
28-
end
29-
30-
@spec notify_slot(Types.slot()) :: :ok
31-
def notify_slot(slot) do
32-
GenServer.cast(__MODULE__, {:slot_transition, slot})
33-
end
34-
3518
@impl true
36-
def handle_gossip_message(topic, msg_id, message) do
37-
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
38-
end
39-
40-
##########################
41-
### GenServer Callbacks
42-
##########################
43-
44-
@impl true
45-
@spec init(any()) :: {:ok, state()} | {:stop, any()}
46-
def init(_init_arg) do
47-
# TODO: this doesn't take into account fork digest changes
48-
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
19+
def handle_gossip_message(_topic, msg_id, message) do
4920
slot = BeaconChain.get_current_slot()
50-
topic_name = "/eth2/#{fork_context}/beacon_block/ssz_snappy"
51-
Libp2pPort.join_topic(topic_name)
52-
{:ok, %{topic: topic_name, slot: slot}}
53-
end
54-
55-
@impl true
56-
def handle_call(:start, _from, %{topic: topic_name} = state) do
57-
Libp2pPort.subscribe_to_topic(topic_name, __MODULE__)
58-
{:reply, :ok, state}
59-
end
6021

61-
@impl true
62-
def handle_cast({:on_tick, {slot, _}}, state) do
63-
{:noreply, state |> Map.put(:slot, slot)}
64-
end
65-
66-
@impl true
67-
def handle_cast({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do
6822
with {:ok, uncompressed} <- :snappyer.decompress(message),
6923
{:ok, signed_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock),
7024
:ok <- validate(signed_block, slot) do
@@ -81,7 +35,37 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
8135
Libp2pPort.validate_message(msg_id, :reject)
8236
end
8337

84-
{:noreply, state}
38+
:ok
39+
end
40+
41+
@spec join_topic() :: :ok
42+
def join_topic() do
43+
# TODO: this doesn't take into account fork digest changes
44+
topic_name = topic()
45+
Libp2pPort.join_topic(self(), topic_name)
46+
end
47+
48+
@spec subscribe_to_topic() :: :ok | :error
49+
def subscribe_to_topic() do
50+
topic()
51+
|> Libp2pPort.subscribe_to_topic(__MODULE__)
52+
|> case do
53+
:ok ->
54+
:ok
55+
56+
{:error, reason} ->
57+
Logger.error("[Gossip] Subscription failed: '#{reason}'")
58+
:error
59+
end
60+
end
61+
62+
##########################
63+
### Private functions
64+
##########################
65+
66+
defp topic() do
67+
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
68+
"/eth2/#{fork_context}/beacon_block/ssz_snappy"
8569
end
8670

8771
@spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any}

lib/libp2p_port.ex

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
1010
use GenServer
1111

1212
alias LambdaEthereumConsensus.Beacon.BeaconChain
13+
alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
1314
alias LambdaEthereumConsensus.StateTransition.Misc
1415
alias LambdaEthereumConsensus.Utils.BitVector
1516
alias Types.EnrForkId
@@ -53,6 +54,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
5354
| {:discovery_addr, String.t()}
5455
| {:bootnodes, [String.t()]}
5556
| {:new_peer_handler, pid()}
57+
| {:join_init_topics, boolean()}
5658

5759
######################
5860
### API
@@ -164,14 +166,14 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
164166
Joins the given topic.
165167
This does not subscribe to the topic, use `subscribe_to_topic/2` for that.
166168
"""
167-
@spec join_topic(GenServer.server(), String.t()) :: :ok | {:error, String.t()}
169+
@spec join_topic(GenServer.server(), String.t()) :: :ok
168170
def join_topic(pid \\ __MODULE__, topic_name) do
169171
:telemetry.execute([:port, :message], %{}, %{
170172
function: "join_topic",
171173
direction: "elixir->"
172174
})
173175

174-
call_command(pid, {:join, %JoinTopic{name: topic_name}})
176+
cast_command(pid, {:join, %JoinTopic{name: topic_name}})
175177
end
176178

177179
@doc """
@@ -264,13 +266,18 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
264266
cast_command(pid, {:update_enr, enr})
265267
end
266268

269+
defp join_init_topics() do
270+
BeaconBlock.join_topic()
271+
end
272+
267273
########################
268274
### GenServer Callbacks
269275
########################
270276

271277
@impl GenServer
272278
def init(args) do
273279
{new_peer_handler, args} = Keyword.pop(args, :new_peer_handler, nil)
280+
{join_init_topics, args} = Keyword.pop(args, :join_init_topics, false)
274281

275282
port = Port.open({:spawn, @port_name}, [:binary, {:packet, 4}, :exit_status])
276283

@@ -281,6 +288,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
281288
|> InitArgs.encode()
282289
|> then(&send_data(port, &1))
283290

291+
if join_init_topics, do: join_init_topics()
292+
284293
{:ok, %{port: port, new_peer_handler: new_peer_handler, subscriptors: %{}}}
285294
end
286295

0 commit comments

Comments
 (0)