Skip to content

Osiris 1.6.7 #9418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bazel_dep(

bazel_dep(
name = "rabbitmq_osiris",
version = "1.6.4",
version = "1.6.7",
repo_name = "osiris",
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.4
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.7
dep_systemd = hex 0.6.1
dep_seshat = git https://github.com/rabbitmq/seshat v0.6.1

Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ writer_pid(StreamId) when is_list(StreamId) ->
MFA = {?MODULE, query_writer_pid, [StreamId]},
query_pid(StreamId, MFA).

-spec local_pid(string()) ->
{ok, pid()} | {error, not_found | term()}.
local_pid(StreamId) when is_list(StreamId) ->
MFA = {?MODULE, query_local_pid, [StreamId, node()]},
query_pid(StreamId, MFA).
Expand Down
114 changes: 52 additions & 62 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,11 @@ consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_stream(Q) ->
{error, global_qos_not_supported_for_queue_type};
consume(Q, Spec,
#stream_client{filtering_supported = FilteringSupported} = QState0) when ?amqqueue_is_stream(Q) ->
#stream_client{filtering_supported = FilteringSupported} = QState0)
when ?amqqueue_is_stream(Q) ->
%% Messages should include the offset as a custom header.
case check_queue_exists_in_local_node(Q) of
ok ->
case get_local_pid(QState0) of
{LocalPid, QState} when is_pid(LocalPid) ->
#{no_ack := NoAck,
channel_pid := ChPid,
prefetch_count := ConsumerPrefetchCount,
Expand All @@ -249,30 +250,34 @@ consume(Q, Spec,
args := Args,
ok_msg := OkMsg} = Spec,
QName = amqqueue:get_name(Q),
case parse_offset_arg(rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
case parse_offset_arg(
rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of
{error, _} = Err ->
Err;
{ok, OffsetSpec} ->
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount, false,
up, Args),
%% FIXME: reply needs to be sent before the stream begins sending
%% really it should be sent by the stream queue process like classic queues
%% do
maybe_send_reply(ChPid, OkMsg),
FilterSpec = filter_spec(Args),
case {FilterSpec, FilteringSupported} of
{#{filter_spec := _}, false} ->
{protocol_error, precondition_failed, "Filtering is not supported", []};
{protocol_error, precondition_failed,
"Filtering is not supported", []};
_ ->
begin_stream(QState0, ConsumerTag, OffsetSpec,
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag,
ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount,
false, up, Args),
%% reply needs to be sent before the stream
%% begins sending
maybe_send_reply(ChPid, OkMsg),
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
begin_stream(QState, ConsumerTag, OffsetSpec,
ConsumerPrefetchCount, FilterSpec)
end
end;
Err ->
Err
{undefined, _} ->
{protocol_error, precondition_failed,
"queue '~ts' does not have a running replica on the local node",
[rabbit_misc:rs(amqqueue:get_name(Q))]}
end.

-spec parse_offset_arg(undefined |
Expand Down Expand Up @@ -338,8 +343,7 @@ get_local_pid(#stream_client{local_pid = Pid} = State)
get_local_pid(#stream_client{leader = Pid} = State)
when is_pid(Pid) andalso node(Pid) == node() ->
{Pid, State#stream_client{local_pid = Pid}};
get_local_pid(#stream_client{stream_id = StreamId,
local_pid = undefined} = State) ->
get_local_pid(#stream_client{stream_id = StreamId} = State) ->
%% query local coordinator to get pid
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} ->
Expand All @@ -348,34 +352,30 @@ get_local_pid(#stream_client{stream_id = StreamId,
{undefined, State}
end.

begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
Tag, Offset, Max, Options) ->
{LocalPid, State} = get_local_pid(State0),
case LocalPid of
undefined ->
{error, no_local_stream_replica_available};
_ ->
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
%% TODO: avoid double calls to the same process
StartOffset = case Offset of
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
{timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{credit = Max,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
max = Max,
reader_options = Options},
{ok, State#stream_client{local_pid = LocalPid,
readers = Readers0#{Tag => Str0}}}
end.
begin_stream(#stream_client{name = QName,
readers = Readers0,
local_pid = LocalPid} = State,
Tag, Offset, Max, Options)
when is_pid(LocalPid) ->
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
StartOffset = case Offset of
first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
{timestamp, _} -> NextOffset;
_ -> Offset
end,
Str0 = #stream{credit = Max,
start_offset = StartOffset,
listening_offset = NextOffset,
log = Seg0,
max = Max,
reader_options = Options},
{ok, State#stream_client{local_pid = LocalPid,
readers = Readers0#{Tag => Str0}}}.

cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
name = QName} = State) ->
Expand All @@ -395,8 +395,8 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
end.

credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
local_pid = LocalPid} = State) ->
name = Name,
local_pid = LocalPid} = State) ->
{Readers1, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
Expand All @@ -410,7 +410,8 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
true ->
case Readers1 of
#{CTag := #stream{credit = Credit1} = Str2} ->
{Readers0#{CTag => Str2#stream{credit = 0}}, [{send_drained, {CTag, Credit1}}]};
{Readers0#{CTag => Str2#stream{credit = 0}},
[{send_drained, {CTag, Credit1}}]};
_ ->
{Readers1, []}
end;
Expand Down Expand Up @@ -443,7 +444,7 @@ deliver0(MsgId, Msg,
soft_limit = SftLmt,
slow = Slow0,
filtering_supported = FilteringSupported} = State,
Actions0) ->
Actions0) ->
ok = osiris:write(LeaderPid, WriterId, Seq,
stream_message(Msg, FilteringSupported)),
Correlation = case MsgId of
Expand Down Expand Up @@ -993,17 +994,6 @@ stream_name(#resource{virtual_host = VHost, name = Name}) ->
recover(Q) ->
{ok, Q}.

check_queue_exists_in_local_node(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} when is_pid(Pid) ->
ok;
_ ->
{protocol_error, precondition_failed,
"queue '~ts' does not have a replica on the local node",
[rabbit_misc:rs(amqqueue:get_name(Q))]}
end.

maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).

Expand Down