Skip to content

Commit 0b163a1

Browse files
refactor: libp2pPort gossip as callbacks instead of pids (#1111)
Co-authored-by: Tomás Grüner <[email protected]>
1 parent 605bfe0 commit 0b163a1

File tree

7 files changed

+79
-36
lines changed

7 files changed

+79
-36
lines changed

lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
77
alias LambdaEthereumConsensus.Beacon.BeaconChain
88
alias LambdaEthereumConsensus.Libp2pPort
99
alias LambdaEthereumConsensus.P2P
10+
alias LambdaEthereumConsensus.P2P.Gossip.Handler
1011
alias LambdaEthereumConsensus.StateTransition.Misc
1112

13+
@behaviour Handler
14+
1215
def start_link(init_arg) do
1316
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
1417
end
@@ -22,6 +25,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
2225
update_enr()
2326
end
2427

28+
@impl true
29+
def handle_gossip_message(topic, msg_id, message) do
30+
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
31+
end
32+
2533
@spec leave(non_neg_integer()) :: :ok
2634
def leave(subnet_id) do
2735
topic = get_topic_name(subnet_id)
@@ -98,7 +106,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
98106
attestations = Map.put(state.attestations, subnet_id, [attestation])
99107
attnets = Map.put(state.attnets, subnet_id, attestation.data)
100108
new_state = %{state | attnets: attnets, attestations: attestations}
101-
get_topic_name(subnet_id) |> Libp2pPort.subscribe_to_topic()
109+
get_topic_name(subnet_id) |> Libp2pPort.subscribe_to_topic(__MODULE__)
102110
{:reply, :ok, new_state}
103111
end
104112

@@ -113,7 +121,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
113121
end
114122

115123
@impl true
116-
def handle_info({:gossipsub, {topic, msg_id, message}}, state) do
124+
def handle_cast({:gossipsub, {topic, msg_id, message}}, state) do
117125
subnet_id = extract_subnet_id(topic)
118126

119127
with {:ok, uncompressed} <- :snappyer.decompress(message),

lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
55
alias LambdaEthereumConsensus.Beacon.BeaconChain
66
alias LambdaEthereumConsensus.Beacon.PendingBlocks
77
alias LambdaEthereumConsensus.Libp2pPort
8+
alias LambdaEthereumConsensus.P2P.Gossip.Handler
89
alias Types.SignedBeaconBlock
910

1011
use GenServer
1112

1213
require Logger
14+
@behaviour Handler
1315

1416
@type state :: %{topic: String.t(), slot: Types.slot()}
1517

@@ -30,6 +32,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
3032
GenServer.cast(__MODULE__, {:slot_transition, slot})
3133
end
3234

35+
@impl true
36+
def handle_gossip_message(topic, msg_id, message) do
37+
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
38+
end
39+
3340
##########################
3441
### GenServer Callbacks
3542
##########################
@@ -47,7 +54,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
4754

4855
@impl true
4956
def handle_call(:start, _from, %{topic: topic_name} = state) do
50-
Libp2pPort.subscribe_to_topic(topic_name)
57+
Libp2pPort.subscribe_to_topic(topic_name, __MODULE__)
5158
{:reply, :ok, state}
5259
end
5360

@@ -57,7 +64,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
5764
end
5865

5966
@impl true
60-
def handle_info({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do
67+
def handle_cast({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do
6168
with {:ok, uncompressed} <- :snappyer.decompress(message),
6269
{:ok, signed_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock),
6370
:ok <- validate(signed_block, slot) do

lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
44
"""
55
alias LambdaEthereumConsensus.Beacon.BeaconChain
66
alias LambdaEthereumConsensus.Libp2pPort
7+
alias LambdaEthereumConsensus.P2P.Gossip.Handler
78
alias LambdaEthereumConsensus.Store.BlobDb
89

910
use GenServer
1011

1112
require Logger
1213

14+
@behaviour Handler
15+
1316
@type topics :: [String.t()]
1417

1518
##########################
@@ -24,6 +27,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
2427
GenServer.call(__MODULE__, :start)
2528
end
2629

30+
@impl true
31+
def handle_gossip_message(topic, msg_id, message) do
32+
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
33+
end
34+
2735
##########################
2836
### GenServer Callbacks
2937
##########################
@@ -47,12 +55,12 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
4755

4856
@impl true
4957
def handle_call(:start, _from, topics) do
50-
Enum.each(topics, &Libp2pPort.subscribe_to_topic/1)
58+
Enum.each(topics, fn topic -> Libp2pPort.subscribe_to_topic(topic, __MODULE__) end)
5159
{:reply, :ok, topics}
5260
end
5361

5462
@impl true
55-
def handle_info({:gossipsub, {_topic, msg_id, message}}, topics) do
63+
def handle_cast({:gossipsub, {_topic, msg_id, message}}, topics) do
5664
with {:ok, uncompressed} <- :snappyer.decompress(message),
5765
{:ok, %Types.BlobSidecar{index: blob_index} = blob} <-
5866
Ssz.from_ssz(uncompressed, Types.BlobSidecar) do
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do
2+
@moduledoc """
3+
Gossip handler behaviour
4+
"""
5+
6+
@callback handle_gossip_message(binary(), binary(), iodata()) :: :ok | {:error, any}
7+
end

lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
66

77
alias LambdaEthereumConsensus.Beacon.BeaconChain
88
alias LambdaEthereumConsensus.Libp2pPort
9+
alias LambdaEthereumConsensus.P2P.Gossip.Handler
910
alias LambdaEthereumConsensus.StateTransition.Misc
1011
alias LambdaEthereumConsensus.Utils.BitField
1112
alias Types.Attestation
@@ -17,6 +18,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
1718

1819
require Logger
1920

21+
@behaviour Handler
22+
2023
@operations [
2124
:bls_to_execution_change,
2225
:attester_slashing,
@@ -77,6 +80,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
7780
GenServer.cast(__MODULE__, {:new_block, block.slot, operations})
7881
end
7982

83+
@impl true
84+
def handle_gossip_message(topic, msg_id, message) do
85+
GenServer.cast(__MODULE__, {:gossipsub, {topic, msg_id, message}})
86+
end
87+
8088
@impl GenServer
8189
def init(_init_arg) do
8290
topics = get_topic_names()
@@ -88,7 +96,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
8896

8997
@impl true
9098
def handle_call(:start, _from, %{topics: topics} = state) do
91-
Enum.each(topics, &Libp2pPort.subscribe_to_topic/1)
99+
Enum.each(topics, fn topic -> Libp2pPort.subscribe_to_topic(topic, __MODULE__) end)
92100
{:reply, :ok, state}
93101
end
94102

@@ -108,7 +116,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
108116
end
109117

110118
@impl true
111-
def handle_info(
119+
def handle_cast(
112120
{:gossipsub,
113121
{<<_::binary-size(15)>> <> "beacon_aggregate_and_proof" <> _, _msg_id, message}},
114122
state

lib/libp2p_port.ex

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,15 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
187187
Subscribes to the given topic. After this, messages published to the topic
188188
will be received by `self()`.
189189
"""
190-
@spec subscribe_to_topic(GenServer.server(), String.t()) :: :ok | {:error, String.t()}
191-
def subscribe_to_topic(pid \\ __MODULE__, topic_name) do
190+
@spec subscribe_to_topic(GenServer.server(), String.t(), module()) :: :ok | {:error, String.t()}
191+
def subscribe_to_topic(pid \\ __MODULE__, topic_name, module) do
192192
:telemetry.execute([:port, :message], %{}, %{
193193
function: "subscribe_to_topic",
194194
direction: "elixir->"
195195
})
196196

197+
GenServer.cast(pid, {:new_subscriptor, topic_name, module})
198+
197199
call_command(pid, {:subscribe, %SubscribeToTopic{name: topic_name}})
198200
end
199201

@@ -279,7 +281,13 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
279281
|> InitArgs.encode()
280282
|> then(&send_data(port, &1))
281283

282-
{:ok, %{port: port, new_peer_handler: new_peer_handler}}
284+
{:ok, %{port: port, new_peer_handler: new_peer_handler, subscriptors: %{}}}
285+
end
286+
287+
@impl GenServer
288+
def handle_cast({:new_subscriptor, topic, module}, %{subscriptors: subscriptors} = state) do
289+
new_subscriptors = Map.put(subscriptors, topic, module)
290+
{:noreply, %{state | subscriptors: new_subscriptors}}
283291
end
284292

285293
@impl GenServer
@@ -316,10 +324,13 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
316324
### PRIVATE FUNCTIONS
317325
######################
318326

319-
defp handle_notification(%GossipSub{} = gs, _state) do
327+
defp handle_notification(%GossipSub{} = gs, %{subscriptors: subscriptors}) do
320328
:telemetry.execute([:port, :message], %{}, %{function: "gossipsub", direction: "->elixir"})
321-
handler_pid = :erlang.binary_to_term(gs.handler)
322-
send(handler_pid, {:gossipsub, {gs.topic, gs.msg_id, gs.message}})
329+
330+
case Map.fetch(subscriptors, gs.topic) do
331+
{:ok, module} -> module.handle_gossip_message(gs.topic, gs.msg_id, gs.message)
332+
:error -> Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.")
333+
end
323334
end
324335

325336
defp handle_notification(

test/unit/libp2p_port_test.exs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule Unit.Libp2pPortTest do
44

55
alias LambdaEthereumConsensus.Beacon.BeaconChain
66
alias LambdaEthereumConsensus.Libp2pPort
7+
alias LambdaEthereumConsensus.P2P.Gossip.Handler
78

89
doctest Libp2pPort
910

@@ -86,37 +87,30 @@ defmodule Unit.Libp2pPortTest do
8687
start_port(:publisher)
8788
start_port(:gossiper, listen_addr: gossiper_addr)
8889

90+
# Send the PID in the message, so that we can receive a notification later.
91+
message = self() |> :erlang.term_to_binary()
8992
topic = "/test/gossipping"
90-
message = "hello world!"
9193

9294
# Connect the two peers
9395
%{peer_id: id} = Libp2pPort.get_node_identity(:gossiper)
9496
:ok = Libp2pPort.add_peer(:publisher, id, gossiper_addr, 999_999_999_999)
9597

96-
pid = self()
97-
98-
spawn_link(fn ->
99-
# Subscribe to the topic
100-
:ok = Libp2pPort.subscribe_to_topic(:gossiper, topic)
101-
send(pid, :subscribed)
102-
103-
# Receive the message
104-
assert {^topic, message_id, ^message} = Libp2pPort.receive_gossip()
105-
106-
Libp2pPort.validate_message(message_id, :accept)
107-
108-
# Send acknowledgement
109-
send(pid, :received)
110-
end)
111-
112-
# Give a head start to the other process
113-
assert_receive :subscribed, 100
98+
# Subscribe to the topic
99+
:ok = Libp2pPort.subscribe_to_topic(:gossiper, topic, __MODULE__)
114100

115101
# Publish message
116102
:ok = Libp2pPort.publish(:publisher, topic, message)
117103

118-
# Receive acknowledgement
119-
assert_receive :received, 100
104+
# Receive the message
105+
assert {^topic, message_id, ^message} = Libp2pPort.receive_gossip()
106+
107+
Libp2pPort.validate_message(message_id, :accept)
108+
end
109+
110+
@behaviour Handler
111+
def handle_gossip_message(topic, msg_id, message) do
112+
# Decode the PID from the message and send a notification.
113+
send(:erlang.binary_to_term(message), {:gossipsub, {topic, msg_id, message}})
120114
end
121115

122116
defp retry_test(f, retries) do
@@ -137,7 +131,7 @@ defmodule Unit.Libp2pPortTest do
137131
port = start_port(:some, listen_addr: ["/ip4/127.0.0.1/tcp/48790"])
138132
topic = "test"
139133

140-
Libp2pPort.subscribe_to_topic(port, topic)
134+
Libp2pPort.subscribe_to_topic(port, topic, __MODULE__)
141135
Libp2pPort.leave_topic(port, topic)
142136
Libp2pPort.join_topic(port, topic)
143137
end

0 commit comments

Comments
 (0)