Skip to content

Commit 2930d96

Browse files
authored
feat: add blobs pruning (#1102)
1 parent d6d845b commit 2930d96

File tree

3 files changed

+88
-1
lines changed

3 files changed

+88
-1
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
5656
LambdaEthereumConsensus.P2P.Gossip.BlobSideCar,
5757
LambdaEthereumConsensus.P2P.Gossip.OperationsCollector,
5858
{Task.Supervisor, name: PruneStatesSupervisor},
59-
{Task.Supervisor, name: PruneBlocksSupervisor}
59+
{Task.Supervisor, name: PruneBlocksSupervisor},
60+
{Task.Supervisor, name: PruneBlobsSupervisor}
6061
] ++ validator_children
6162

6263
Supervisor.init(children, strategy: :one_for_all)

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
1010
alias LambdaEthereumConsensus.ForkChoice.Handlers
1111
alias LambdaEthereumConsensus.ForkChoice.Head
1212
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
13+
alias LambdaEthereumConsensus.Store.BlobDb
1314
alias LambdaEthereumConsensus.Store.BlockDb
1415
alias LambdaEthereumConsensus.Store.BlockDb.BlockInfo
1516
alias LambdaEthereumConsensus.Store.Blocks
@@ -129,6 +130,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do
129130
PruneBlocksSupervisor,
130131
fn -> BlockDb.prune_blocks_older_than(new_finalized_slot) end
131132
)
133+
134+
Task.Supervisor.start_child(
135+
PruneBlobsSupervisor,
136+
fn -> BlobDb.prune_old_blobs(new_finalized_slot) end
137+
)
132138
end
133139
end
134140

lib/lambda_ethereum_consensus/store/blob_db.ex

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ defmodule LambdaEthereumConsensus.Store.BlobDb do
22
@moduledoc """
33
Storage and retrieval of blobs.
44
"""
5+
require Logger
6+
57
alias LambdaEthereumConsensus.Store.Db
8+
alias LambdaEthereumConsensus.Store.Utils
69
alias Types.Blobdata
710
alias Types.BlobSidecar
811

912
@blob_sidecar_prefix "blob_sidecar"
1013
@blobdata_prefix "blobdata"
14+
@block_root_prefix "block_root"
1115

1216
@spec store_blob(BlobSidecar.t()) :: :ok
1317
def store_blob(%BlobSidecar{signed_block_header: %{message: block_header}} = blob) do
@@ -22,6 +26,9 @@ defmodule LambdaEthereumConsensus.Store.BlobDb do
2226

2327
key = blobdata_key(block_root, blob.index)
2428
Db.put(key, encoded_blobdata)
29+
30+
block_root_key = block_root_key(block_header.slot, blob.index)
31+
Db.put(block_root_key, block_root)
2532
end
2633

2734
# TODO: this is only used for tests
@@ -43,6 +50,16 @@ defmodule LambdaEthereumConsensus.Store.BlobDb do
4350
end
4451
end
4552

53+
@spec get_blob_by_slot_index(non_neg_integer(), non_neg_integer()) ::
54+
{:ok, BlobSidecar.t()} | {:error, String.t()} | :not_found
55+
def get_blob_by_slot_index(slot, index) do
56+
block_root_key = block_root_key(slot, index)
57+
58+
with {:ok, block_root} <- Db.get(block_root_key) do
59+
get_blob_sidecar(block_root, index)
60+
end
61+
end
62+
4663
@spec get_blob_with_proof(Types.root(), Types.blob_index()) ::
4764
{:ok, {Types.blob(), Types.kzg_proof()}} | {:error, String.t()} | :not_found
4865
def get_blob_with_proof(block_root, blob_index) do
@@ -55,8 +72,71 @@ defmodule LambdaEthereumConsensus.Store.BlobDb do
5572
end
5673
end
5774

75+
@spec prune_old_blobs(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found
76+
def prune_old_blobs(current_finalized_slot) do
77+
slot =
78+
current_finalized_slot -
79+
ChainSpec.get("MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS") * ChainSpec.get("SLOTS_PER_EPOCH")
80+
81+
Logger.info("[BlobDb] Pruning started.", slot: slot)
82+
last_finalized_key = slot |> block_root_key(0)
83+
84+
with {:ok, it} <- Db.iterate(),
85+
{:ok, @block_root_prefix <> _, _value} <-
86+
Exleveldb.iterator_move(it, last_finalized_key),
87+
{:ok, keys_to_remove} <- get_block_root_keys_to_remove(it),
88+
:ok <- Exleveldb.iterator_close(it) do
89+
total_removed =
90+
keys_to_remove
91+
|> Enum.reduce_while(0, fn
92+
key, acc_removed_count ->
93+
case remove_blob_by_block_root_key(key) do
94+
:ok ->
95+
{:cont, acc_removed_count + 1}
96+
97+
_ ->
98+
Logger.error("[BlobDb] Error while removing key.", key: key)
99+
{:halt, acc_removed_count}
100+
end
101+
end)
102+
103+
Logger.info("[BlobDb] Pruning finished. #{total_removed} blobs removed.")
104+
end
105+
end
106+
107+
@spec get_block_root_keys_to_remove(list(binary()), :eleveldb.itr_ref()) ::
108+
{:ok, list(binary())}
109+
defp get_block_root_keys_to_remove(keys_to_remove \\ [], iterator) do
110+
case Exleveldb.iterator_move(iterator, :prev) do
111+
{:ok, <<@block_root_prefix, _rest::binary>> = block_root_key, _root} ->
112+
[block_root_key | keys_to_remove] |> get_block_root_keys_to_remove(iterator)
113+
114+
_ ->
115+
{:ok, keys_to_remove}
116+
end
117+
end
118+
119+
@spec remove_blob_by_block_root_key(binary()) :: :ok | :not_found
120+
defp remove_blob_by_block_root_key(block_root_key) do
121+
<<@block_root_prefix, _slot::unsigned-size(64), index>> =
122+
block_root_key
123+
124+
with {:ok, block_root} <- Db.get(block_root_key) do
125+
key_blob = blob_sidecar_key(block_root, index)
126+
key_data = blobdata_key(block_root, index)
127+
128+
Db.delete(block_root_key)
129+
Db.delete(key_blob)
130+
Db.delete(key_data)
131+
end
132+
end
133+
58134
defp blob_sidecar_key(block_root, blob_index),
59135
do: @blob_sidecar_prefix <> block_root <> <<blob_index>>
60136

61137
defp blobdata_key(block_root, blob_index), do: @blobdata_prefix <> block_root <> <<blob_index>>
138+
139+
defp block_root_key(blob_slot, blob_index) do
140+
Utils.get_key(@block_root_prefix, blob_slot) <> <<blob_index>>
141+
end
62142
end

0 commit comments

Comments
 (0)