Skip to content

Commit 90d110e

Browse files
committed
Return consumer in send_chunks
Not a tuple. The passed-in consumer is updated with the returned tuple, so we're better off returning the updated consumer record. References #4368
1 parent aa88fd5 commit 90d110e

File tree

1 file changed

+35
-54
lines changed

1 file changed

+35
-54
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
properties :: map()}).
4343
-record(consumer,
4444
{configuration :: #consumer_configuration{},
45-
credit :: integer(),
45+
credit :: non_neg_integer(),
4646
log :: osiris_log:state(),
4747
last_listener_offset = undefined :: undefined | osiris:offset()}).
4848
-record(stream_connection_state,
@@ -1018,18 +1018,7 @@ open(cast,
10181018
[Reason]),
10191019
%% likely a connection problem
10201020
Consumer;
1021-
{{segment, Log1},
1022-
{credit, Credit1},
1023-
{last_listener_offset,
1024-
LLO1}} ->
1025-
Consumer#consumer{log =
1026-
Log1,
1027-
credit
1028-
=
1029-
Credit1,
1030-
last_listener_offset
1031-
=
1032-
LLO1}
1021+
{ok, Csmr} -> Csmr
10331022
end
10341023
end,
10351024
ConsumersAcc#{CorrelationId => Consumer1}
@@ -1857,13 +1846,9 @@ handle_frame_post_auth(Transport,
18571846
"peer",
18581847
[]),
18591848
throw({stop, normal});
1860-
{{segment, Log1}, {credit, Credit1},
1861-
{last_listener_offset, LLO1}} ->
1862-
ConsumerState1 =
1863-
ConsumerState#consumer{log = Log1,
1864-
credit = Credit1,
1865-
last_listener_offset
1866-
= LLO1},
1849+
{ok,
1850+
#consumer{log = Log1, credit = Credit1} =
1851+
ConsumerState1} ->
18671852
Consumers1 =
18681853
Consumers#{SubscriptionId =>
18691854
ConsumerState1},
@@ -1945,12 +1930,7 @@ handle_frame_post_auth(Transport,
19451930
"peer",
19461931
[]),
19471932
throw({stop, normal});
1948-
{{segment, Log1}, {credit, Credit1},
1949-
{last_listener_offset, LLO1}} ->
1950-
Consumer1 =
1951-
Consumer#consumer{log = Log1,
1952-
credit = Credit1,
1953-
last_listener_offset = LLO1},
1933+
{ok, Consumer1} ->
19541934
{Connection,
19551935
State#stream_connection_state{consumers =
19561936
Consumers#{SubscriptionId
@@ -2721,53 +2701,52 @@ send_file_callback(Transport,
27212701

27222702
send_chunks(Transport,
27232703
#consumer{credit = Credit, last_listener_offset = LastLstOffset} =
2724-
State,
2704+
Consumer,
27252705
Counter) ->
2726-
send_chunks(Transport, State, Credit, LastLstOffset, Counter).
2706+
send_chunks(Transport, Consumer, Credit, LastLstOffset, Counter).
27272707

2728-
send_chunks(_Transport,
2729-
#consumer{log = Log},
2730-
0,
2731-
LastLstOffset,
2732-
_Counter) ->
2733-
{{segment, Log}, {credit, 0}, {last_listener_offset, LastLstOffset}};
2708+
send_chunks(_Transport, Consumer, 0, LastLstOffset, _Counter) ->
2709+
{ok,
2710+
Consumer#consumer{credit = 0, last_listener_offset = LastLstOffset}};
27342711
send_chunks(Transport,
2735-
#consumer{log = Log} = State,
2712+
#consumer{log = Log} = Consumer,
27362713
Credit,
27372714
LastLstOffset,
27382715
Counter) ->
27392716
send_chunks(Transport,
2740-
State,
2717+
Consumer,
27412718
Log,
27422719
Credit,
27432720
LastLstOffset,
27442721
true,
27452722
Counter).
27462723

27472724
send_chunks(_Transport,
2748-
_State,
2749-
Segment,
2725+
Consumer,
2726+
Log,
27502727
0 = _Credit,
27512728
LastLstOffset,
27522729
_Retry,
27532730
_Counter) ->
2754-
{{segment, Segment}, {credit, 0},
2755-
{last_listener_offset, LastLstOffset}};
2731+
{ok,
2732+
Consumer#consumer{log = Log,
2733+
credit = 0,
2734+
last_listener_offset = LastLstOffset}};
27562735
send_chunks(Transport,
27572736
#consumer{configuration = #consumer_configuration{socket = S}} =
2758-
State,
2759-
Segment,
2737+
Consumer,
2738+
Log,
27602739
Credit,
27612740
LastLstOffset,
27622741
Retry,
27632742
Counter) ->
2764-
case osiris_log:send_file(S, Segment,
2765-
send_file_callback(Transport, State, Counter))
2743+
case osiris_log:send_file(S, Log,
2744+
send_file_callback(Transport, Consumer, Counter))
27662745
of
2767-
{ok, Segment1} ->
2746+
{ok, Log1} ->
27682747
send_chunks(Transport,
2769-
State,
2770-
Segment1,
2748+
Consumer,
2749+
Log1,
27712750
Credit - 1,
27722751
LastLstOffset,
27732752
true,
@@ -2778,13 +2757,13 @@ send_chunks(Transport,
27782757
{error, closed};
27792758
{error, Reason} ->
27802759
{error, Reason};
2781-
{end_of_stream, Segment1} ->
2760+
{end_of_stream, Log1} ->
27822761
case Retry of
27832762
true ->
27842763
timer:sleep(1),
27852764
send_chunks(Transport,
2786-
State,
2787-
Segment1,
2765+
Consumer,
2766+
Log1,
27882767
Credit,
27892768
LastLstOffset,
27902769
false,
@@ -2793,8 +2772,8 @@ send_chunks(Transport,
27932772
#consumer{configuration =
27942773
#consumer_configuration{member_pid =
27952774
LocalMember}} =
2796-
State,
2797-
NextOffset = osiris_log:next_offset(Segment1),
2775+
Consumer,
2776+
NextOffset = osiris_log:next_offset(Log1),
27982777
LLO = case {LastLstOffset, NextOffset > LastLstOffset} of
27992778
{undefined, _} ->
28002779
osiris:register_offset_listener(LocalMember,
@@ -2807,8 +2786,10 @@ send_chunks(Transport,
28072786
_ ->
28082787
LastLstOffset
28092788
end,
2810-
{{segment, Segment1}, {credit, Credit},
2811-
{last_listener_offset, LLO}}
2789+
{ok,
2790+
Consumer#consumer{log = Log1,
2791+
credit = Credit,
2792+
last_listener_offset = LLO}}
28122793
end
28132794
end.
28142795

0 commit comments

Comments
 (0)