Skip to content

Commit 95c1e6c

Browse files
Merge pull request #4372 from rabbitmq/mergify/bp/v3.9.x/pr-4371
Make Osiris listener registration idempotent (backport #4368) (backport #4371)
2 parents f3dc6bb + 486202c commit 95c1e6c

File tree

1 file changed

+71
-39
lines changed

1 file changed

+71
-39
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
counters :: atomics:atomics_ref(),
4242
properties :: map()}).
4343
-record(consumer,
44-
{configuration :: #consumer_configuration{}, credit :: integer(),
45-
log :: osiris_log:state()}).
44+
{configuration :: #consumer_configuration{},
45+
credit :: non_neg_integer(),
46+
log :: osiris_log:state(),
47+
last_listener_offset = undefined :: undefined | osiris:offset()}).
4648
-record(stream_connection_state,
4749
{data :: rabbit_stream_core:state(), blocked :: boolean(),
4850
consumers :: #{subscription_id() => #consumer{}}}).
@@ -1016,13 +1018,7 @@ open(cast,
10161018
[Reason]),
10171019
%% likely a connection problem
10181020
Consumer;
1019-
{{segment, Log1},
1020-
{credit, Credit1}} ->
1021-
Consumer#consumer{log =
1022-
Log1,
1023-
credit
1024-
=
1025-
Credit1}
1021+
{ok, Csmr} -> Csmr
10261022
end
10271023
end,
10281024
ConsumersAcc#{CorrelationId => Consumer1}
@@ -1850,11 +1846,9 @@ handle_frame_post_auth(Transport,
18501846
"peer",
18511847
[]),
18521848
throw({stop, normal});
1853-
{{segment, Log1}, {credit, Credit1}} ->
1854-
ConsumerState1 =
1855-
ConsumerState#consumer{log = Log1,
1856-
credit =
1857-
Credit1},
1849+
{ok,
1850+
#consumer{log = Log1, credit = Credit1} =
1851+
ConsumerState1} ->
18581852
Consumers1 =
18591853
Consumers#{SubscriptionId =>
18601854
ConsumerState1},
@@ -1923,19 +1917,20 @@ handle_frame_post_auth(Transport,
19231917
{credit, SubscriptionId, Credit}) ->
19241918
case Consumers of
19251919
#{SubscriptionId := Consumer} ->
1926-
#consumer{credit = AvailableCredit} = Consumer,
1920+
#consumer{credit = AvailableCredit, last_listener_offset = LLO} =
1921+
Consumer,
19271922
case send_chunks(Transport,
19281923
Consumer,
19291924
AvailableCredit + Credit,
1925+
LLO,
19301926
SendFileOct)
19311927
of
19321928
{error, closed} ->
19331929
rabbit_log_connection:info("Stream protocol connection has been closed by "
19341930
"peer",
19351931
[]),
19361932
throw({stop, normal});
1937-
{{segment, Log1}, {credit, Credit1}} ->
1938-
Consumer1 = Consumer#consumer{log = Log1, credit = Credit1},
1933+
{ok, Consumer1} ->
19391934
{Connection,
19401935
State#stream_connection_state{consumers =
19411936
Consumers#{SubscriptionId
@@ -2704,60 +2699,97 @@ send_file_callback(Transport,
27042699
set_consumer_offset(Counters, FirstOffsetInChunk)
27052700
end.
27062701

2707-
send_chunks(Transport, #consumer{credit = Credit} = State, Counter) ->
2708-
send_chunks(Transport, State, Credit, Counter).
2702+
send_chunks(Transport,
2703+
#consumer{credit = Credit, last_listener_offset = LastLstOffset} =
2704+
Consumer,
2705+
Counter) ->
2706+
send_chunks(Transport, Consumer, Credit, LastLstOffset, Counter).
27092707

2710-
send_chunks(_Transport, #consumer{log = Log}, 0, _Counter) ->
2711-
{{segment, Log}, {credit, 0}};
2708+
send_chunks(_Transport, Consumer, 0, LastLstOffset, _Counter) ->
2709+
{ok,
2710+
Consumer#consumer{credit = 0, last_listener_offset = LastLstOffset}};
27122711
send_chunks(Transport,
2713-
#consumer{log = Log} = State,
2712+
#consumer{log = Log} = Consumer,
27142713
Credit,
2714+
LastLstOffset,
27152715
Counter) ->
2716-
send_chunks(Transport, State, Log, Credit, true, Counter).
2716+
send_chunks(Transport,
2717+
Consumer,
2718+
Log,
2719+
Credit,
2720+
LastLstOffset,
2721+
true,
2722+
Counter).
27172723

27182724
send_chunks(_Transport,
2719-
_State,
2720-
Segment,
2725+
Consumer,
2726+
Log,
27212727
0 = _Credit,
2728+
LastLstOffset,
27222729
_Retry,
27232730
_Counter) ->
2724-
{{segment, Segment}, {credit, 0}};
2731+
{ok,
2732+
Consumer#consumer{log = Log,
2733+
credit = 0,
2734+
last_listener_offset = LastLstOffset}};
27252735
send_chunks(Transport,
27262736
#consumer{configuration = #consumer_configuration{socket = S}} =
2727-
State,
2728-
Segment,
2737+
Consumer,
2738+
Log,
27292739
Credit,
2740+
LastLstOffset,
27302741
Retry,
27312742
Counter) ->
2732-
case osiris_log:send_file(S, Segment,
2733-
send_file_callback(Transport, State, Counter))
2743+
case osiris_log:send_file(S, Log,
2744+
send_file_callback(Transport, Consumer, Counter))
27342745
of
2735-
{ok, Segment1} ->
2736-
send_chunks(Transport, State, Segment1, Credit - 1, true, Counter);
2746+
{ok, Log1} ->
2747+
send_chunks(Transport,
2748+
Consumer,
2749+
Log1,
2750+
Credit - 1,
2751+
LastLstOffset,
2752+
true,
2753+
Counter);
27372754
{error, closed} ->
27382755
{error, closed};
27392756
{error, enotconn} ->
27402757
{error, closed};
27412758
{error, Reason} ->
27422759
{error, Reason};
2743-
{end_of_stream, Segment1} ->
2760+
{end_of_stream, Log1} ->
27442761
case Retry of
27452762
true ->
27462763
timer:sleep(1),
27472764
send_chunks(Transport,
2748-
State,
2749-
Segment1,
2765+
Consumer,
2766+
Log1,
27502767
Credit,
2768+
LastLstOffset,
27512769
false,
27522770
Counter);
27532771
false ->
27542772
#consumer{configuration =
27552773
#consumer_configuration{member_pid =
27562774
LocalMember}} =
2757-
State,
2758-
osiris:register_offset_listener(LocalMember,
2759-
osiris_log:next_offset(Segment1)),
2760-
{{segment, Segment1}, {credit, Credit}}
2775+
Consumer,
2776+
NextOffset = osiris_log:next_offset(Log1),
2777+
LLO = case {LastLstOffset, NextOffset > LastLstOffset} of
2778+
{undefined, _} ->
2779+
osiris:register_offset_listener(LocalMember,
2780+
NextOffset),
2781+
NextOffset;
2782+
{_, true} ->
2783+
osiris:register_offset_listener(LocalMember,
2784+
NextOffset),
2785+
NextOffset;
2786+
_ ->
2787+
LastLstOffset
2788+
end,
2789+
{ok,
2790+
Consumer#consumer{log = Log1,
2791+
credit = Credit,
2792+
last_listener_offset = LLO}}
27612793
end
27622794
end.
27632795

0 commit comments

Comments
 (0)