Skip to content

Commit db3944c

Browse files
committed
Stream queue: use local pid for offset listeners
When a consumer reaches the end of a stream it need to register an offset listener with the local stream member so that it can be notified when new stream messages are committed. The stream queue implementation for some reason registered offset listeners with the leader, not the local member.
1 parent 49a4758 commit db3944c

File tree

1 file changed

+17
-20
lines changed

1 file changed

+17
-20
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
274274
Actions = [],
275275
%% TODO: we need to monitor the local pid in case the stream is
276276
%% restarted
277-
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
277+
{ok, State#stream_client{local_pid = LocalPid,
278+
readers = Readers0#{Tag => Str0}}, Actions}
278279
end.
279280

280281
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
@@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
290291

291292
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
292293
name = Name,
293-
leader = Leader} = State) ->
294+
local_pid = LocalPid} = State) ->
294295
{Readers1, Msgs} = case Readers0 of
295296
#{CTag := #stream{credit = Credit0} = Str0} ->
296297
Str1 = Str0#stream{credit = Credit0 + Credit},
297-
{Str, Msgs0} = stream_entries(Name, Leader, Str1),
298+
{Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
298299
{Readers0#{CTag => Str}, Msgs0};
299300
_ ->
300301
{Readers0, []}
@@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs},
379380
{ok, State#stream_client{correlation = Correlation,
380381
slow = Slow}, [{settled, From, MsgIds}]};
381382
handle_event({osiris_offset, _From, _Offs},
382-
State = #stream_client{leader = Leader,
383+
State = #stream_client{local_pid = LocalPid,
383384
readers = Readers0,
384385
name = Name}) ->
385386
%% offset isn't actually needed as we use the atomic to read the
386387
%% current committed
387388
{Readers, TagMsgs} = maps:fold(
388389
fun (Tag, Str0, {Acc, TM}) ->
389-
{Str, Msgs} = stream_entries(Name, Leader, Str0),
390-
%% HACK for now, better to just return but
391-
%% tricky with acks credits
392-
%% that also evaluate the stream
393-
% gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
394-
{Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
390+
{Str, Msgs} = stream_entries(Name, LocalPid, Str0),
391+
{Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
395392
end, {#{}, []}, Readers0),
396393
Ack = true,
397394
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
@@ -414,13 +411,13 @@ recover(_VHost, Queues) ->
414411
end, {[], []}, Queues).
415412

416413
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
417-
name = Name,
418-
leader = Leader} = State) ->
414+
local_pid = LocalPid,
415+
name = Name} = State) ->
419416
Credit = length(MsgIds),
420417
{Readers, Msgs} = case Readers0 of
421418
#{CTag := #stream{credit = Credit0} = Str0} ->
422419
Str1 = Str0#stream{credit = Credit0 + Credit},
423-
{Str, Msgs0} = stream_entries(Name, Leader, Str1),
420+
{Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
424421
{Readers0#{CTag => Str}, Msgs0};
425422
_ ->
426423
{Readers0, []}
@@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) ->
848845
maybe_send_reply(_ChPid, undefined) -> ok;
849846
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
850847

851-
stream_entries(Name, Id, Str) ->
852-
stream_entries(Name, Id, Str, []).
848+
stream_entries(Name, LocalPid, Str) ->
849+
stream_entries(Name, LocalPid, Str, []).
853850

854-
stream_entries(Name, LeaderPid,
851+
stream_entries(Name, LocalPid,
855852
#stream{name = QName,
856853
credit = Credit,
857854
start_offset = StartOffs,
@@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid,
863860
NextOffset = osiris_log:next_offset(Seg),
864861
case NextOffset > LOffs of
865862
true ->
866-
osiris:register_offset_listener(LeaderPid, NextOffset),
863+
osiris:register_offset_listener(LocalPid, NextOffset),
867864
{Str0#stream{log = Seg,
868865
listening_offset = NextOffset}, MsgIn};
869866
false ->
@@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid,
877874
Msg0 = binary_to_msg(QName, B),
878875
Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
879876
long, O, Msg0),
880-
{Name, LeaderPid, O, false, Msg}
877+
{Name, LocalPid, O, false, Msg}
881878
end || {O, B} <- Records,
882879
O >= StartOffs],
883880

@@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid,
892889
false ->
893890
%% if there are fewer Msgs than Entries0 it means there were non-events
894891
%% in the log and we should recurse and try again
895-
stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
892+
stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs)
896893
end
897894
end;
898-
stream_entries(_Name, _Id, Str, Msgs) ->
895+
stream_entries(_Name, _LocalPid, Str, Msgs) ->
899896
{Str, Msgs}.
900897

901898
binary_to_msg(#resource{virtual_host = VHost,

0 commit comments

Comments
 (0)