Skip to content

Commit b359bba

Browse files
the-mikedavisdcorbacho
authored andcommitted
Use a graph-like projection for topic bindings in Khepri
Performance is better than mnesia: 72k msg/s vs. 55k msg/s on my machine with this PerfTest. perf-test -p -e amq.topic -t topic -qp q.%d -qpf 1 -qpt 2 -c 3000 This is for two reasons: * In general, mnesia calls have some overhead for looking up the table and calling internal routines. Eventually the ETS calls end up being roughly the same as pure ETS equivalents. This is already noted in some places in the rabbit codebase. * The mnesia version of this `trie_match/2` function occurs inside an `mnesia:async_dirty/2` call - inside a transaction. This has a lot more overhead from locking the table which is eliminated when switching to pure-ETS calls. I don't think the transaction has any practical importance on the results of `trie_match/2` here. Some races between updates and queries are possible but I think highly unlikely. This approach should grow fairly well and be tolerant to binding churn: the projection of edges is a set so it should handle insertion/ deletions/updates of bindings well. This replaces a version of topic routing that uses regular expressions to perform the matching. Regular expressions are much faster at confirming/denying that a routing key matches a pattern but I suspect that it won't grow very well as the number of topic bindings increases because you need to do a full (or at least partial) table scan to find all of the bindings for the exchange first. If there are many bindings, I suspect that the lookup of bindings for an exchange will dominate the running time. TODO: * Figure out exactly where this code should belong. It's awkwardly split between the projection definition and `rabbit_db_topic_exchange` for now. * Check the correctness of the graph approach. * Compare the performance of the graph approach to the regex approach.
1 parent b300057 commit b359bba

File tree

2 files changed

+132
-70
lines changed

2 files changed

+132
-70
lines changed

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 54 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,6 @@
3737
-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
3838
-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
3939

40-
-define(HASH, <<"#">>).
41-
-define(STAR, <<"*">>).
42-
-define(DOT, <<"\\.">>).
43-
-define(ONE_WORD, <<"[^.]+">>).
44-
-define(ANYTHING, <<".*">>).
45-
-define(ZERO_OR_MORE, <<"(\\..+)?">>).
46-
4740
%% -------------------------------------------------------------------
4841
%% set().
4942
%% -------------------------------------------------------------------
@@ -197,23 +190,8 @@ match_in_mnesia(XName, RoutingKey) ->
197190
mnesia:async_dirty(fun trie_match/2, [XName, Words]).
198191

199192
match_in_khepri(XName, RoutingKey) ->
200-
Root = khepri_exchange_type_topic_path(XName) ++ [rabbit_khepri:if_has_data_wildcard()],
201-
case rabbit_khepri:fold(
202-
Root,
203-
fun(Path0, #{data := Set}, Acc) ->
204-
Path = lists:nthtail(4, Path0),
205-
case is_re_topic_match(Path, RoutingKey) of
206-
true ->
207-
Bindings = sets:to_list(Set),
208-
[maps:get(destination, B) || B <- Bindings] ++ Acc;
209-
false ->
210-
Acc
211-
end
212-
end,
213-
[]) of
214-
{ok, List} -> List;
215-
_ -> []
216-
end.
193+
Words = split_topic_key_binary(RoutingKey),
194+
trie_match_in_khepri(XName, Words).
217195

218196
%% --------------------------------------------------------------
219197
%% Migration
@@ -590,51 +568,6 @@ split_topic_key_binary(Key) ->
590568
Words = split_topic_key(Key, [], []),
591569
[list_to_binary(W) || W <- Words].
592570

593-
is_re_topic_match([?HASH], _) ->
594-
true;
595-
is_re_topic_match([A], A) ->
596-
true;
597-
is_re_topic_match([], <<>>) ->
598-
true;
599-
is_re_topic_match([], _) ->
600-
false;
601-
is_re_topic_match(Path00, RoutingKey) ->
602-
Path0 = path_to_re(Path00),
603-
Path = << <<B/binary >> || B <- Path0 >>,
604-
case Path of
605-
?ANYTHING -> true;
606-
_ ->
607-
RE = <<$^,Path/binary,$$>>,
608-
case re:run(RoutingKey, RE, [{capture, none}]) of
609-
nomatch -> false;
610-
_ -> true
611-
end
612-
end.
613-
614-
path_to_re([?STAR | Rest]) ->
615-
path_to_re(Rest, [?ONE_WORD]);
616-
path_to_re([?HASH | Rest]) ->
617-
path_to_re(Rest, [?ANYTHING]);
618-
path_to_re([Bin | Rest]) ->
619-
path_to_re(Rest, [Bin]).
620-
621-
path_to_re([], Acc) ->
622-
lists:reverse(Acc);
623-
path_to_re([?STAR | Rest], [?ANYTHING | _] = Acc) ->
624-
path_to_re(Rest, [?ONE_WORD | Acc]);
625-
path_to_re([?STAR | Rest], Acc) ->
626-
path_to_re(Rest, [?ONE_WORD, ?DOT | Acc]);
627-
path_to_re([?HASH | Rest], [?HASH | _] = Acc) ->
628-
path_to_re(Rest, Acc);
629-
path_to_re([?HASH | Rest], [?ANYTHING | _] = Acc) ->
630-
path_to_re(Rest, Acc);
631-
path_to_re([?HASH | Rest], Acc) ->
632-
path_to_re(Rest, [?ZERO_OR_MORE | Acc]);
633-
path_to_re([Bin | Rest], [?ANYTHING | _] = Acc) ->
634-
path_to_re(Rest, [Bin | Acc]);
635-
path_to_re([Bin | Rest], Acc) ->
636-
path_to_re(Rest, [Bin, ?DOT | Acc]).
637-
638571
ensure_topic_deletion_ets() ->
639572
Tab = rabbit_db_topic_exchange_delete_table,
640573
case ets:whereis(Tab) of
@@ -652,3 +585,55 @@ ensure_topic_migration_ets() ->
652585
Tid ->
653586
Tid
654587
end.
588+
589+
%% Khepri topic graph
590+
591+
trie_match_in_khepri(X, Words) ->
592+
trie_match_in_khepri(X, root, Words, []).
593+
594+
trie_match_in_khepri(X, Node, [], ResAcc) ->
595+
trie_match_part_in_khepri(
596+
X, Node, <<"#">>, fun trie_match_skip_any_in_khepri/4, [],
597+
trie_bindings_in_khepri(X, Node) ++ ResAcc);
598+
trie_match_in_khepri(X, Node, [W | RestW] = Words, ResAcc) ->
599+
lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
600+
trie_match_part_in_khepri(
601+
X, Node, WArg, MatchFun, RestWArg, Acc)
602+
end, ResAcc, [{W, fun trie_match_in_khepri/4, RestW},
603+
{<<"*">>, fun trie_match_in_khepri/4, RestW},
604+
{<<"#">>,
605+
fun trie_match_skip_any_in_khepri/4, Words}]).
606+
607+
trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, ResAcc) ->
608+
case trie_child_in_khepri(X, Node, Search) of
609+
{ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
610+
error -> ResAcc
611+
end.
612+
613+
trie_match_skip_any_in_khepri(X, Node, [], ResAcc) ->
614+
trie_match_in_khepri(X, Node, [], ResAcc);
615+
trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, ResAcc) ->
616+
trie_match_skip_any_in_khepri(
617+
X, Node, RestW,
618+
trie_match_in_khepri(X, Node, Words, ResAcc)).
619+
620+
trie_child_in_khepri(X, Node, Word) ->
621+
case ets:lookup(rabbit_khepri_topic_trie,
622+
#trie_edge{exchange_name = X,
623+
node_id = Node,
624+
word = Word}) of
625+
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
626+
[] -> error
627+
end.
628+
629+
trie_bindings_in_khepri(X, Node) ->
630+
case ets:lookup(rabbit_khepri_topic_trie,
631+
#trie_edge{exchange_name = X,
632+
node_id = Node,
633+
word = bindings}) of
634+
[#topic_trie_edge{node_id = {bindings, Bindings}}] ->
635+
[Dest || #{destination := Dest} <- sets:to_list(Bindings)];
636+
[] ->
637+
[]
638+
end.
639+

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,8 @@ register_projections() ->
596596
fun register_rabbit_users_projection/0,
597597
fun register_rabbit_user_permissions_projection/0,
598598
fun register_rabbit_bindings_projection/0,
599-
fun register_rabbit_index_route_projection/0],
599+
fun register_rabbit_index_route_projection/0,
600+
fun register_rabbit_topic_graph_projection/0],
600601
[case RegisterFun() of
601602
ok -> ok;
602603
{error, exists} -> ok;
@@ -719,3 +720,79 @@ projection_fun_for_sets(MapFun) ->
719720
(_Table, _Path, _OldProps, _NewProps) ->
720721
ok
721722
end.
723+
724+
register_rabbit_topic_graph_projection() ->
725+
Name = rabbit_khepri_topic_trie,
726+
Options = #{keypos => #topic_trie_edge.trie_edge},
727+
ProjectionFun =
728+
fun (Table, Path, #{data := _OldBindings}, #{data := NewBindings}) ->
729+
[BindingEdge | _RestEdges] = edges_for_path(Path, NewBindings),
730+
ets:insert(Table, BindingEdge);
731+
(Table, Path, _OldProps, #{data := NewBindings}) ->
732+
Edges = edges_for_path(Path, NewBindings),
733+
ets:insert(Table, Edges);
734+
(Table, Path, #{data := OldBindings}, _NewProps) ->
735+
[BindingEdge | RestEdges] = edges_for_path(Path, OldBindings),
736+
ets:delete_object(Table, BindingEdge),
737+
trim_while_out_degree_is_zero(RestEdges);
738+
(_Table, _Path, _OldProps, _NewProps) ->
739+
ok
740+
end,
741+
Projection = khepri_projection:new(Name, ProjectionFun, Options),
742+
PathPattern = [rabbit_db_topic_exchange,
743+
topic_trie_binding,
744+
_VHost = ?KHEPRI_WILDCARD_STAR,
745+
_ExchangeName = ?KHEPRI_WILDCARD_STAR,
746+
_Routes = ?KHEPRI_WILDCARD_STAR_STAR],
747+
khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection).
748+
749+
edges_for_path(
750+
[rabbit_db_topic_exchange, topic_trie_binding,
751+
VHost, ExchangeName | Components],
752+
Bindings) ->
753+
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
754+
edges_for_path([root | Components], Bindings, Exchange, []).
755+
756+
edges_for_path([FromNodeId, To | Rest], Bindings, Exchange, Edges) ->
757+
ToNodeId = [To | FromNodeId],
758+
Edge = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange,
759+
node_id = FromNodeId,
760+
word = To},
761+
node_id = ToNodeId},
762+
edges_for_path([ToNodeId | Rest], Bindings, Exchange, [Edge | Edges]);
763+
edges_for_path([LeafId], Bindings, Exchange, Edges) ->
764+
ToNodeId = {bindings, Bindings},
765+
Edge = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange,
766+
node_id = LeafId,
767+
word = bindings},
768+
node_id = ToNodeId},
769+
[Edge | Edges].
770+
771+
-spec trim_while_out_degree_is_zero(Edges) -> ok
772+
when
773+
Edges :: [Edge],
774+
Edge :: #topic_trie_edge{}.
775+
776+
trim_while_out_degree_is_zero([]) ->
777+
ok;
778+
trim_while_out_degree_is_zero([Edge | Rest]) ->
779+
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange,
780+
node_id = _FromNodeId},
781+
node_id = ToNodeId} = Edge,
782+
OutEdgePattern = #topic_trie_edge{trie_edge =
783+
#trie_edge{exchange_name = Exchange,
784+
node_id = ToNodeId,
785+
word = '_'},
786+
node_id = '_'},
787+
case ets:match(rabbit_khepri_topic_trie, OutEdgePattern, 1) of
788+
'$end_of_table' ->
789+
%% If the ToNode has an out degree of zero, trim the edge to
790+
%% the node, effectively erasing ToNode.
791+
ets:delete_object(rabbit_khepri_topic_trie, Edge),
792+
trim_while_out_degree_is_zero(Rest);
793+
{_Match, _Continuation} ->
794+
%% Return after finding the first node with a non-zero out-degree.
795+
%% If a node has a non-zero out-degree then all of its ancestors
796+
%% must as well.
797+
ok
798+
end.

0 commit comments

Comments
 (0)