Skip to content

Commit 4ce920c

Browse files
authored
Merge pull request #3550 from rabbitmq/stream-queue-consumer-fix
Stream queue: use local pid for offset listeners
2 parents 49a4758 + db3944c commit 4ce920c

File tree

1 file changed

+17
-20
lines changed

1 file changed

+17
-20
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
274274
Actions = [],
275275
%% TODO: we need to monitor the local pid in case the stream is
276276
%% restarted
277-
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
277+
{ok, State#stream_client{local_pid = LocalPid,
278+
readers = Readers0#{Tag => Str0}}, Actions}
278279
end.
279280

280281
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
@@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
290291

291292
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
292293
name = Name,
293-
leader = Leader} = State) ->
294+
local_pid = LocalPid} = State) ->
294295
{Readers1, Msgs} = case Readers0 of
295296
#{CTag := #stream{credit = Credit0} = Str0} ->
296297
Str1 = Str0#stream{credit = Credit0 + Credit},
297-
{Str, Msgs0} = stream_entries(Name, Leader, Str1),
298+
{Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
298299
{Readers0#{CTag => Str}, Msgs0};
299300
_ ->
300301
{Readers0, []}
@@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs},
379380
{ok, State#stream_client{correlation = Correlation,
380381
slow = Slow}, [{settled, From, MsgIds}]};
381382
handle_event({osiris_offset, _From, _Offs},
382-
State = #stream_client{leader = Leader,
383+
State = #stream_client{local_pid = LocalPid,
383384
readers = Readers0,
384385
name = Name}) ->
385386
%% offset isn't actually needed as we use the atomic to read the
386387
%% current committed
387388
{Readers, TagMsgs} = maps:fold(
388389
fun (Tag, Str0, {Acc, TM}) ->
389-
{Str, Msgs} = stream_entries(Name, Leader, Str0),
390-
%% HACK for now, better to just return but
391-
%% tricky with acks credits
392-
%% that also evaluate the stream
393-
% gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
394-
{Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
390+
{Str, Msgs} = stream_entries(Name, LocalPid, Str0),
391+
{Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
395392
end, {#{}, []}, Readers0),
396393
Ack = true,
397394
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
@@ -414,13 +411,13 @@ recover(_VHost, Queues) ->
414411
end, {[], []}, Queues).
415412

416413
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
417-
name = Name,
418-
leader = Leader} = State) ->
414+
local_pid = LocalPid,
415+
name = Name} = State) ->
419416
Credit = length(MsgIds),
420417
{Readers, Msgs} = case Readers0 of
421418
#{CTag := #stream{credit = Credit0} = Str0} ->
422419
Str1 = Str0#stream{credit = Credit0 + Credit},
423-
{Str, Msgs0} = stream_entries(Name, Leader, Str1),
420+
{Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
424421
{Readers0#{CTag => Str}, Msgs0};
425422
_ ->
426423
{Readers0, []}
@@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) ->
848845
maybe_send_reply(_ChPid, undefined) -> ok;
849846
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
850847

851-
stream_entries(Name, Id, Str) ->
852-
stream_entries(Name, Id, Str, []).
848+
stream_entries(Name, LocalPid, Str) ->
849+
stream_entries(Name, LocalPid, Str, []).
853850

854-
stream_entries(Name, LeaderPid,
851+
stream_entries(Name, LocalPid,
855852
#stream{name = QName,
856853
credit = Credit,
857854
start_offset = StartOffs,
@@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid,
863860
NextOffset = osiris_log:next_offset(Seg),
864861
case NextOffset > LOffs of
865862
true ->
866-
osiris:register_offset_listener(LeaderPid, NextOffset),
863+
osiris:register_offset_listener(LocalPid, NextOffset),
867864
{Str0#stream{log = Seg,
868865
listening_offset = NextOffset}, MsgIn};
869866
false ->
@@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid,
877874
Msg0 = binary_to_msg(QName, B),
878875
Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
879876
long, O, Msg0),
880-
{Name, LeaderPid, O, false, Msg}
877+
{Name, LocalPid, O, false, Msg}
881878
end || {O, B} <- Records,
882879
O >= StartOffs],
883880

@@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid,
892889
false ->
893890
%% if there are fewer Msgs than Entries0 it means there were non-events
894891
%% in the log and we should recurse and try again
895-
stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
892+
stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs)
896893
end
897894
end;
898-
stream_entries(_Name, _Id, Str, Msgs) ->
895+
stream_entries(_Name, _LocalPid, Str, Msgs) ->
899896
{Str, Msgs}.
900897

901898
binary_to_msg(#resource{virtual_host = VHost,

0 commit comments

Comments
 (0)