Skip to content

Use OTP 26 features #8553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check-build-system-equivalence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
- cron: '0 2 * * *'
workflow_dispatch:
env:
erlang_version: 25.3
erlang_version: 26.0
elixir_version: 1.14
VERSION: 3.13.0
PLUGINS: amqp10_common amqp10_client rabbitmq_amqp1_0 rabbitmq_auth_backend_cache rabbitmq_auth_backend_http rabbitmq_auth_backend_ldap rabbitmq_auth_backend_oauth2 rabbitmq_auth_mechanism_ssl rabbitmq_consistent_hash_exchange rabbitmq_event_exchange rabbitmq_federation rabbitmq_jms_topic_exchange rabbitmq_mqtt rabbitmq_random_exchange rabbitmq_recent_history_exchange rabbitmq_sharding rabbitmq_shovel rabbitmq_stomp rabbitmq_stream rabbitmq_trust_store rabbitmq_web_dispatch rabbitmq_management_agent rabbitmq_management rabbitmq_prometheus rabbitmq_federation_management rabbitmq_shovel_management rabbitmq_stream_management rabbitmq_top rabbitmq_tracing rabbitmq_web_mqtt rabbitmq_web_mqtt_examples rabbitmq_web_stomp rabbitmq_web_stomp_examples rabbitmq_aws rabbitmq_peer_discovery_common rabbitmq_peer_discovery_aws rabbitmq_peer_discovery_k8s rabbitmq_peer_discovery_consul rabbitmq_peer_discovery_etcd
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/oci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ jobs:
# For example, for Git commit SHA '111aaa' and branch name 'main' and maximum supported Erlang major version '26',
# the following tags will be pushed to Dockerhub:
#
# * 111aaa-otp-min (image OTP 25)
# * main-otp-min (image OTP 25)
# * 111aaa-otp-max (image OTP 26)
# * main-otp-max (image OTP 26)

Expand All @@ -33,8 +31,6 @@ jobs:
fail-fast: false
matrix:
include:
- image_tag_suffix: otp-min-bazel
otp_version_id: 25_3
- image_tag_suffix: otp-max-bazel
otp_version_id: 26
steps:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test-mixed-versions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ jobs:
fail-fast: false
matrix:
otp_version_id:
- "25_3"
- "26"
timeout-minutes: 120
steps:
- name: CHECKOUT REPOSITORY
Expand Down Expand Up @@ -171,7 +171,7 @@ jobs:
strategy:
matrix:
include:
- erlang_version: "25.3"
- erlang_version: "26.0"
elixir_version: 1.14.5
timeout-minutes: 60
steps:
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ jobs:
fail-fast: false
matrix:
otp_version_id:
- 25_3
- 26
timeout-minutes: 120
steps:
Expand Down Expand Up @@ -82,8 +81,6 @@ jobs:
fail-fast: false
matrix:
include:
- erlang_version: "25.3"
elixir_version: 1.14.5
- erlang_version: "26.0"
elixir_version: 1.14.5
timeout-minutes: 60
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

-export([check/1]).

-define(OTP_MINIMUM, "25.0").
-define(ERTS_MINIMUM, "13.0").
-define(OTP_MINIMUM, "26.0").
-define(ERTS_MINIMUM, "14.0").

check(_Context) ->
?LOG_DEBUG(
Expand Down
1 change: 0 additions & 1 deletion deps/rabbit/scripts/rabbitmq-server
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ start_rabbitmq_server() {
-syslog logger '[]' \
-syslog syslog_error_logger false \
-kernel prevent_overlapping_partitions false \
-enable-feature maybe_expr \
"$@"
}

Expand Down
1 change: 0 additions & 1 deletion deps/rabbit/scripts/rabbitmq-server.bat
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ if "!RABBITMQ_ALLOW_INPUT!"=="" (
-syslog logger [] ^
-syslog syslog_error_logger false ^
-kernel prevent_overlapping_partitions false ^
-enable-feature maybe_expr ^
!STAR!

if ERRORLEVEL 1 (
Expand Down
1 change: 0 additions & 1 deletion deps/rabbit/scripts/rabbitmq-service.bat
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-syslog logger [] ^
-syslog syslog_error_logger false ^
-kernel prevent_overlapping_partitions false ^
-enable-feature maybe_expr ^
!STARVAR!

set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:\=\\!
Expand Down
56 changes: 7 additions & 49 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
fold/2]).

%% Routing. These functions are in the hot code path
-export([match/2, match_routing_key/3]).
-export([match/2, match_routing_key/2]).

%% Exported to be used by various rabbit_db_* modules
-export([
Expand Down Expand Up @@ -398,31 +398,22 @@ match_in_mnesia(SrcName, Match) ->
%% match_routing_key().
%% -------------------------------------------------------------------

-spec match_routing_key(Src, RoutingKeys, UseIndex) -> [Dst] when
-spec match_routing_key(Src, RoutingKeys) -> [Dst] when
Src :: rabbit_types:binding_source(),
Dst :: rabbit_types:binding_destination(),
RoutingKeys :: [binary() | '_'],
UseIndex :: boolean().
RoutingKeys :: [binary() | '_'].
%% @doc Matches all binding records that have `Src' as source of the binding
%% and that match any routing key in `RoutingKeys'.
%%
%% @returns the list of destinations
%%
%% @private

match_routing_key(SrcName, RoutingKeys, UseIndex) ->
match_routing_key(SrcName, RoutingKeys) ->
rabbit_db:run(
#{mnesia => fun() -> match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) end
#{mnesia => fun() -> route_in_mnesia(SrcName, RoutingKeys) end
}).

match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
case UseIndex of
true ->
route_v2(?MNESIA_INDEX_TABLE, SrcName, RoutingKeys);
_ ->
route_in_mnesia_v1(SrcName, RoutingKeys)
end.

%% -------------------------------------------------------------------
%% recover().
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -728,13 +719,13 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).

%% Routing. Hot code path
%% -------------------------------------------------------------------------
route_in_mnesia_v1(SrcName, [RoutingKey]) ->
route_in_mnesia(SrcName, [RoutingKey]) ->
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = RoutingKey,
_ = '_'}},
ets:select(?MNESIA_TABLE, [{MatchHead, [], ['$1']}]);
route_in_mnesia_v1(SrcName, [_|_] = RoutingKeys) ->
route_in_mnesia(SrcName, [_|_] = RoutingKeys) ->
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
%% expensive for the same reasons as above, and, additionally, due to
%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
Expand All @@ -753,36 +744,3 @@ route_in_mnesia_v1(SrcName, [_|_] = RoutingKeys) ->
Conditions = [list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
RKey <- RoutingKeys]])],
ets:select(?MNESIA_TABLE, [{MatchHead, Conditions, ['$1']}]).

%% rabbit_router:match_routing_key/2 uses ets:select/2 to get destinations.
%% ets:select/2 is expensive because it needs to compile the match spec every
%% time and lookup does not happen by a hash key.
%%
%% In contrast, route_v2/2 increases end-to-end message sending throughput
%% (i.e. from RabbitMQ client to the queue process) by up to 35% by using ets:lookup_element/3.
%% Only the direct exchange type uses the rabbit_index_route table to store its
%% bindings by table key tuple {SourceExchange, RoutingKey}.
-spec route_v2(ets:table(), rabbit_types:binding_source(), [rabbit_router:routing_key(), ...]) ->
rabbit_router:match_result().
route_v2(Table, SrcName, [RoutingKey]) ->
%% optimization
destinations(Table, SrcName, RoutingKey);
route_v2(Table, SrcName, [_|_] = RoutingKeys) ->
lists:flatmap(fun(Key) ->
destinations(Table, SrcName, Key)
end, RoutingKeys).

destinations(Table, SrcName, RoutingKey) ->
%% Prefer try-catch block over checking Key existence with ets:member/2.
%% The latter reduces throughput by a few thousand messages per second because
%% of function db_member_hash in file erl_db_hash.c.
%% We optimise for the happy path, that is the binding / table key is present.
try
ets:lookup_element(Table,
{SrcName, RoutingKey},
#index_route.destination)
catch
error:badarg ->
[]
end.

18 changes: 5 additions & 13 deletions deps/rabbit/src/rabbit_exchange_type_direct.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ assert_args_equivalence(X, Args) ->
%% time and lookup does not happen by a hash key.
%%
%% In contrast, route_v2/2 increases end-to-end message sending throughput
%% (i.e. from RabbitMQ client to the queue process) by up to 35% by using ets:lookup_element/3.
%% (i.e. from RabbitMQ client to the queue process) by up to 35% by using ets:lookup_element/4.
%% Only the direct exchange type uses the rabbit_index_route table to store its
%% bindings by table key tuple {SourceExchange, RoutingKey}.
-spec route_v2(rabbit_types:binding_source(), [rabbit_router:routing_key(), ...]) ->
Expand All @@ -67,15 +67,7 @@ route_v2(SrcName, [_|_] = RoutingKeys) ->
end, RoutingKeys).

destinations(SrcName, RoutingKey) ->
%% Prefer try-catch block over checking Key existence with ets:member/2.
%% The latter reduces throughput by a few thousand messages per second because
%% of function db_member_hash in file erl_db_hash.c.
%% We optimise for the happy path, that is the binding / table key is present.
try
ets:lookup_element(rabbit_index_route,
{SrcName, RoutingKey},
#index_route.destination)
catch
error:badarg ->
[]
end.
ets:lookup_element(rabbit_index_route,
{SrcName, RoutingKey},
#index_route.destination,
[]).
22 changes: 7 additions & 15 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
_ ->
case ets_lookup_element(ra_state, Name, 2, undefined) of
case ets:lookup_element(ra_state, Name, 2, undefined) of
recover ->
recovering;
_ ->
Expand Down Expand Up @@ -1397,10 +1397,10 @@ i(messages, Q) when ?is_amqqueue(Q) ->
quorum_messages(QName);
i(messages_ready, Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
ets_lookup_element(queue_coarse_metrics, QName, 2, 0);
ets:lookup_element(queue_coarse_metrics, QName, 2, 0);
i(messages_unacknowledged, Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
ets_lookup_element(queue_coarse_metrics, QName, 3, 0);
ets:lookup_element(queue_coarse_metrics, QName, 3, 0);
i(policy, Q) ->
case rabbit_policy:name(Q) of
none -> '';
Expand All @@ -1418,7 +1418,7 @@ i(effective_policy_definition, Q) ->
end;
i(consumers, Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
Consumers = ets_lookup_element(queue_metrics, QName, 2, []),
Consumers = ets:lookup_element(queue_metrics, QName, 2, []),
proplists:get_value(consumers, Consumers, 0);
i(memory, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
Expand All @@ -1440,7 +1440,7 @@ i(state, Q) when ?is_amqqueue(Q) ->
end;
i(local_state, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
ets_lookup_element(ra_state, Name, 2, not_member);
ets:lookup_element(ra_state, Name, 2, not_member);
i(garbage_collection, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
try
Expand Down Expand Up @@ -1517,7 +1517,7 @@ open_files(Name) ->
undefined ->
{node(), 0};
Pid ->
{node(), ets_lookup_element(ra_open_file_metrics, Pid, 2, 0)}
{node(), ets:lookup_element(ra_open_file_metrics, Pid, 2, 0)}
end.

leader(Q) when ?is_amqqueue(Q) ->
Expand Down Expand Up @@ -1576,7 +1576,7 @@ is_process_alive(Name, Node) ->
-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().

quorum_messages(QName) ->
ets_lookup_element(queue_coarse_metrics, QName, 4, 0).
ets:lookup_element(queue_coarse_metrics, QName, 4, 0).

quorum_ctag(Int) when is_integer(Int) ->
integer_to_binary(Int);
Expand Down Expand Up @@ -1695,14 +1695,6 @@ prepare_content(Content) ->
%% rabbit_fifo can directly parse it without having to decode again.
Content.

ets_lookup_element(Tbl, Key, Pos, Default) ->
try ets:lookup_element(Tbl, Key, Pos) of
V -> V
catch
_:badarg ->
Default
end.

erpc_call(Node, M, F, A, _Timeout)
when Node =:= node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_router.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ match_bindings(SrcName, Match) ->
match_result().

match_routing_key(SrcName, RoutingKeys) ->
rabbit_db_binding:match_routing_key(SrcName, RoutingKeys, false).
rabbit_db_binding:match_routing_key(SrcName, RoutingKeys).
6 changes: 3 additions & 3 deletions deps/rabbit/test/rabbit_db_binding_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ match_routing_key1(_Config) ->
Exchange2 = #exchange{name = XName2, durable = true},
Binding = #binding{source = XName1, key = <<"*.*">>, destination = XName2,
args = #{foo => bar}},
?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>], false)),
?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>])),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>], false)),
?assertEqual([XName2], rabbit_db_binding:match_routing_key(XName1, [<<"a.b">>], false)),
?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>])),
?assertEqual([XName2], rabbit_db_binding:match_routing_key(XName1, [<<"a.b">>])),
passed.
39 changes: 2 additions & 37 deletions deps/rabbit_common/src/rabbit_net.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-include_lib("kernel/include/net_address.hrl").

-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
recv/1, sync_recv/2, async_recv/3, getopts/2,
setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
peercert/1, connection_string/2, socket_ends/2, is_loopback/1,
tcp_host/1, unwrap_socket/1, maybe_get_proxy_socket/1,
Expand Down Expand Up @@ -50,15 +50,14 @@
rabbit_types:error(any()).
-spec async_recv(socket(), integer(), timeout()) ->
rabbit_types:ok(any()).
-spec port_command(socket(), iolist()) -> 'true'.
-spec getopts
(socket(),
[atom() |
{raw, non_neg_integer(), non_neg_integer(),
non_neg_integer() | binary()}]) ->
ok_val_or_error(opts()).
-spec setopts(socket(), opts()) -> ok_or_any_error().
-spec send(socket(), binary() | iolist()) -> ok_or_any_error().
-spec send(socket(), iodata()) -> ok_or_any_error().
-spec close(socket()) -> ok_or_any_error().
-spec fast_close(socket()) -> ok_or_any_error().
-spec sockname(socket()) ->
Expand Down Expand Up @@ -161,40 +160,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).

port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock, Data) of
ok -> self() ! {inet_reply, Sock, ok},
true;
{error, Reason} -> erlang:error(Reason)
end;
port_command(Sock, Data) when is_port(Sock) ->
Fun = case persistent_term:get(rabbit_net_tcp_send, undefined) of
undefined ->
Rel = list_to_integer(erlang:system_info(otp_release)),
%% gen_tcp:send/2 does a selective receive of
%% {inet_reply, Sock, Status[, CallerTag]}
F = if Rel >= 26 ->
%% Selective receive is optimised:
%% https://github.com/erlang/otp/issues/6455
fun gen_tcp_send/2;
Rel < 26 ->
%% Avoid costly selective receive.
fun erlang:port_command/2
end,
ok = persistent_term:put(rabbit_net_tcp_send, F),
F;
F ->
F
end,
Fun(Sock, Data).

gen_tcp_send(Sock, Data) ->
case gen_tcp:send(Sock, Data) of
ok -> self() ! {inet_reply, Sock, ok},
true;
{error, Reason} -> erlang:error(Reason)
end.

getopts(Sock, Options) when ?IS_SSL(Sock) ->
ssl:getopts(Sock, Options);
getopts(Sock, Options) when is_port(Sock) ->
Expand Down
Loading