Skip to content

Commit 1ed8f76

Browse files
committed
Check if member processes are alive in metadata command
This covers the case where the Mnesia record is stale.
1 parent d0d901f commit 1ed8f76

File tree

2 files changed

+93
-73
lines changed

2 files changed

+93
-73
lines changed

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSiz
9595
error;
9696
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
9797
case lists:member(Locator, [<<"client-local">>,
98-
<<"random">>,
99-
<<"least-leaders">>]) of
98+
<<"random">>,
99+
<<"least-leaders">>]) of
100100
true ->
101101
validate_stream_queue_arguments(T);
102102
false ->
@@ -210,7 +210,25 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
210210
{ok, Q} ->
211211
case is_stream_queue(Q) of
212212
true ->
213-
{ok, maps:with([leader_node, replica_nodes], amqqueue:get_type_state(Q))};
213+
QState = amqqueue:get_type_state(Q),
214+
ProcessAliveFun = fun(Pid) ->
215+
rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000)
216+
end,
217+
LeaderNode = case ProcessAliveFun(maps:get(leader_pid, QState)) of
218+
true ->
219+
maps:get(leader_node, QState);
220+
_ ->
221+
undefined
222+
end,
223+
ReplicaNodes = lists:foldl(fun(Pid, Acc) ->
224+
case ProcessAliveFun(Pid) of
225+
true ->
226+
Acc ++ [node(Pid)];
227+
_ ->
228+
Acc
229+
end
230+
end, [], maps:get(replica_pids, QState)),
231+
{ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}};
214232
_ ->
215233
{error, stream_not_found}
216234
end;

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 72 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -553,8 +553,8 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St
553553
{Connection, State, Rest};
554554
handle_frame_pre_auth(Transport,
555555
#stream_connection{socket = S,
556-
authentication_state = AuthState0,
557-
host = Host} = Connection0, State,
556+
authentication_state = AuthState0,
557+
host = Host} = Connection0, State,
558558
<<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32,
559559
MechanismLength:16, Mechanism:MechanismLength/binary,
560560
SaslFragment/binary>>, Rest) ->
@@ -578,39 +578,39 @@ handle_frame_pre_auth(Transport,
578578
C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
579579
{C2, FrameFragment} =
580580
case AuthMechanism:handle_response(SaslBin, AuthState) of
581-
{refused, Username, Msg, Args} ->
582-
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
583-
auth_fail(Username, Msg, Args, C1, State),
584-
rabbit_log:warning(Msg, Args),
585-
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
586-
{protocol_error, Msg, Args} ->
587-
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
588-
notify_auth_result(none, user_authentication_failure,
589-
[{error, rabbit_misc:format(Msg, Args)}],
590-
C1, State),
591-
rabbit_log:warning(Msg, Args),
592-
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
593-
{challenge, Challenge, AuthState1} ->
594-
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
595-
ChallengeSize = byte_size(Challenge),
596-
{C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
597-
<<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
598-
};
599-
{ok, User = #user{username = Username}} ->
600-
case rabbit_access_control:check_user_loopback(Username, S) of
601-
ok ->
602-
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
603-
notify_auth_result(Username, user_authentication_success,
604-
[], C1, State),
605-
{C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
606-
<<?RESPONSE_CODE_OK:16>>
607-
};
608-
not_allowed ->
609-
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
610-
rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
611-
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
612-
end
613-
end,
581+
{refused, Username, Msg, Args} ->
582+
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
583+
auth_fail(Username, Msg, Args, C1, State),
584+
rabbit_log:warning(Msg, Args),
585+
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
586+
{protocol_error, Msg, Args} ->
587+
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
588+
notify_auth_result(none, user_authentication_failure,
589+
[{error, rabbit_misc:format(Msg, Args)}],
590+
C1, State),
591+
rabbit_log:warning(Msg, Args),
592+
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
593+
{challenge, Challenge, AuthState1} ->
594+
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
595+
ChallengeSize = byte_size(Challenge),
596+
{C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
597+
<<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
598+
};
599+
{ok, User = #user{username = Username}} ->
600+
case rabbit_access_control:check_user_loopback(Username, S) of
601+
ok ->
602+
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
603+
notify_auth_result(Username, user_authentication_success,
604+
[], C1, State),
605+
{C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
606+
<<?RESPONSE_CODE_OK:16>>
607+
};
608+
not_allowed ->
609+
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
610+
rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
611+
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
612+
end
613+
end,
614614
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
615615
frame(Transport, C1, Frame),
616616
{C2, Rest};
@@ -689,9 +689,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
689689
PublisherId:8/unsigned,
690690
MessageCount:32, Messages/binary>>, Rest) ->
691691
case rabbit_stream_utils:check_write_permitted(
692-
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
693-
User,
694-
#{}) of
692+
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
693+
User,
694+
#{}) of
695695
ok ->
696696
case lookup_leader(Stream, Connection) of
697697
cluster_not_found ->
@@ -721,9 +721,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
721721
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
722722
OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
723723
case rabbit_stream_utils:check_read_permitted(
724-
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
725-
User,
726-
#{}) of
724+
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
725+
User,
726+
#{}) of
727727
ok ->
728728
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
729729
{error, not_available} ->
@@ -851,13 +851,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct =
851851
handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
852852
State,
853853
<<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, _CorrelationId:32,
854-
ReferenceSize:16, Reference:ReferenceSize/binary,
855-
StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
854+
ReferenceSize:16, Reference:ReferenceSize/binary,
855+
StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
856856

857857
case rabbit_stream_utils:check_write_permitted(
858-
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
859-
User,
860-
#{}) of
858+
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
859+
User,
860+
#{}) of
861861
ok ->
862862
case lookup_leader(Stream, Connection) of
863863
cluster_not_found ->
@@ -880,24 +880,24 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
880880
StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
881881
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
882882
{ResponseCode, Offset} = case rabbit_stream_utils:check_read_permitted(
883-
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
884-
User,
885-
#{}) of
886-
ok ->
887-
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
888-
{error, not_found} ->
889-
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
890-
{ok, LocalMemberPid} ->
891-
{?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
892-
undefined ->
893-
0;
894-
Offt ->
895-
Offt
896-
end}
897-
end;
898-
error ->
899-
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
900-
end,
883+
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
884+
User,
885+
#{}) of
886+
ok ->
887+
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
888+
{error, not_found} ->
889+
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
890+
{ok, LocalMemberPid} ->
891+
{?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
892+
undefined ->
893+
0;
894+
Offt ->
895+
Offt
896+
end}
897+
end;
898+
error ->
899+
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
900+
end,
901901
Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
902902
<<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
903903
{Connection, State, Rest};
@@ -909,9 +909,9 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
909909
{ok, StreamName} ->
910910
{Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
911911
case rabbit_stream_utils:check_configure_permitted(
912-
#resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
913-
User,
914-
#{}) of
912+
#resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
913+
User,
914+
#{}) of
915915
ok ->
916916
case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of
917917
{ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
@@ -940,9 +940,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
940940
user = #user{username = Username} = User} = Connection, State,
941941
<<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
942942
case rabbit_stream_utils:check_configure_permitted(
943-
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
944-
User,
945-
#{}) of
943+
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
944+
User,
945+
#{}) of
946946
ok ->
947947
case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
948948
{ok, deleted} ->
@@ -973,6 +973,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
973973
%% get the nodes involved in the streams
974974
NodesMap = lists:foldl(fun(Stream, Acc) ->
975975
case rabbit_stream_manager:topology(VirtualHost, Stream) of
976+
{ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} ->
977+
lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc, ReplicaNodes);
976978
{ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} ->
977979
Acc1 = maps:put(LeaderNode, ok, Acc),
978980
lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc1, ReplicaNodes);

0 commit comments

Comments
 (0)