41
41
counters :: atomics :atomics_ref (),
42
42
properties :: map ()}).
43
43
-record (consumer ,
44
- {configuration :: # consumer_configuration {}, credit :: integer (),
45
- log :: osiris_log :state ()}).
44
+ {configuration :: # consumer_configuration {},
45
+ credit :: integer (),
46
+ log :: osiris_log :state (),
47
+ last_listener_offset = undefined :: undefined | osiris :offset ()}).
46
48
-record (stream_connection_state ,
47
49
{data :: rabbit_stream_core :state (), blocked :: boolean (),
48
50
consumers :: #{subscription_id () => # consumer {}}}).
@@ -1017,12 +1019,17 @@ open(cast,
1017
1019
% % likely a connection problem
1018
1020
Consumer ;
1019
1021
{{segment , Log1 },
1020
- {credit , Credit1 }} ->
1022
+ {credit , Credit1 },
1023
+ {last_listener_offset ,
1024
+ LLO1 }} ->
1021
1025
Consumer # consumer {log =
1022
1026
Log1 ,
1023
1027
credit
1024
1028
=
1025
- Credit1 }
1029
+ Credit1 ,
1030
+ last_listener_offset
1031
+ =
1032
+ LLO1 }
1026
1033
end
1027
1034
end ,
1028
1035
ConsumersAcc #{CorrelationId => Consumer1 }
@@ -1850,11 +1857,13 @@ handle_frame_post_auth(Transport,
1850
1857
" peer" ,
1851
1858
[]),
1852
1859
throw ({stop , normal });
1853
- {{segment , Log1 }, {credit , Credit1 }} ->
1860
+ {{segment , Log1 }, {credit , Credit1 },
1861
+ {last_listener_offset , LLO1 }} ->
1854
1862
ConsumerState1 =
1855
1863
ConsumerState # consumer {log = Log1 ,
1856
- credit =
1857
- Credit1 },
1864
+ credit = Credit1 ,
1865
+ last_listener_offset
1866
+ = LLO1 },
1858
1867
Consumers1 =
1859
1868
Consumers #{SubscriptionId =>
1860
1869
ConsumerState1 },
@@ -1923,19 +1932,25 @@ handle_frame_post_auth(Transport,
1923
1932
{credit , SubscriptionId , Credit }) ->
1924
1933
case Consumers of
1925
1934
#{SubscriptionId := Consumer } ->
1926
- # consumer {credit = AvailableCredit } = Consumer ,
1935
+ # consumer {credit = AvailableCredit , last_listener_offset = LLO } =
1936
+ Consumer ,
1927
1937
case send_chunks (Transport ,
1928
1938
Consumer ,
1929
1939
AvailableCredit + Credit ,
1940
+ LLO ,
1930
1941
SendFileOct )
1931
1942
of
1932
1943
{error , closed } ->
1933
1944
rabbit_log_connection :info (" Stream protocol connection has been closed by "
1934
1945
" peer" ,
1935
1946
[]),
1936
1947
throw ({stop , normal });
1937
- {{segment , Log1 }, {credit , Credit1 }} ->
1938
- Consumer1 = Consumer # consumer {log = Log1 , credit = Credit1 },
1948
+ {{segment , Log1 }, {credit , Credit1 },
1949
+ {last_listener_offset , LLO1 }} ->
1950
+ Consumer1 =
1951
+ Consumer # consumer {log = Log1 ,
1952
+ credit = Credit1 ,
1953
+ last_listener_offset = LLO1 },
1939
1954
{Connection ,
1940
1955
State # stream_connection_state {consumers =
1941
1956
Consumers #{SubscriptionId
@@ -2704,36 +2719,59 @@ send_file_callback(Transport,
2704
2719
set_consumer_offset (Counters , FirstOffsetInChunk )
2705
2720
end .
2706
2721
2707
- send_chunks (Transport , # consumer {credit = Credit } = State , Counter ) ->
2708
- send_chunks (Transport , State , Credit , Counter ).
2722
+ send_chunks (Transport ,
2723
+ # consumer {credit = Credit , last_listener_offset = LastLstOffset } =
2724
+ State ,
2725
+ Counter ) ->
2726
+ send_chunks (Transport , State , Credit , LastLstOffset , Counter ).
2709
2727
2710
- send_chunks (_Transport , # consumer {log = Log }, 0 , _Counter ) ->
2711
- {{segment , Log }, {credit , 0 }};
2728
+ send_chunks (_Transport ,
2729
+ # consumer {log = Log },
2730
+ 0 ,
2731
+ LastLstOffset ,
2732
+ _Counter ) ->
2733
+ {{segment , Log }, {credit , 0 }, {last_listener_offset , LastLstOffset }};
2712
2734
send_chunks (Transport ,
2713
2735
# consumer {log = Log } = State ,
2714
2736
Credit ,
2737
+ LastLstOffset ,
2715
2738
Counter ) ->
2716
- send_chunks (Transport , State , Log , Credit , true , Counter ).
2739
+ send_chunks (Transport ,
2740
+ State ,
2741
+ Log ,
2742
+ Credit ,
2743
+ LastLstOffset ,
2744
+ true ,
2745
+ Counter ).
2717
2746
2718
2747
send_chunks (_Transport ,
2719
2748
_State ,
2720
2749
Segment ,
2721
2750
0 = _Credit ,
2751
+ LastLstOffset ,
2722
2752
_Retry ,
2723
2753
_Counter ) ->
2724
- {{segment , Segment }, {credit , 0 }};
2754
+ {{segment , Segment }, {credit , 0 },
2755
+ {last_listener_offset , LastLstOffset }};
2725
2756
send_chunks (Transport ,
2726
2757
# consumer {configuration = # consumer_configuration {socket = S }} =
2727
2758
State ,
2728
2759
Segment ,
2729
2760
Credit ,
2761
+ LastLstOffset ,
2730
2762
Retry ,
2731
2763
Counter ) ->
2732
2764
case osiris_log :send_file (S , Segment ,
2733
2765
send_file_callback (Transport , State , Counter ))
2734
2766
of
2735
2767
{ok , Segment1 } ->
2736
- send_chunks (Transport , State , Segment1 , Credit - 1 , true , Counter );
2768
+ send_chunks (Transport ,
2769
+ State ,
2770
+ Segment1 ,
2771
+ Credit - 1 ,
2772
+ LastLstOffset ,
2773
+ true ,
2774
+ Counter );
2737
2775
{error , closed } ->
2738
2776
{error , closed };
2739
2777
{error , enotconn } ->
@@ -2748,16 +2786,29 @@ send_chunks(Transport,
2748
2786
State ,
2749
2787
Segment1 ,
2750
2788
Credit ,
2789
+ LastLstOffset ,
2751
2790
false ,
2752
2791
Counter );
2753
2792
false ->
2754
2793
# consumer {configuration =
2755
2794
# consumer_configuration {member_pid =
2756
2795
LocalMember }} =
2757
2796
State ,
2758
- osiris :register_offset_listener (LocalMember ,
2759
- osiris_log :next_offset (Segment1 )),
2760
- {{segment , Segment1 }, {credit , Credit }}
2797
+ NextOffset = osiris_log :next_offset (Segment1 ),
2798
+ LLO = case {LastLstOffset , NextOffset > LastLstOffset } of
2799
+ {undefined , _ } ->
2800
+ osiris :register_offset_listener (LocalMember ,
2801
+ NextOffset ),
2802
+ NextOffset ;
2803
+ {_ , true } ->
2804
+ osiris :register_offset_listener (LocalMember ,
2805
+ NextOffset ),
2806
+ NextOffset ;
2807
+ _ ->
2808
+ LastLstOffset
2809
+ end ,
2810
+ {{segment , Segment1 }, {credit , Credit },
2811
+ {last_listener_offset , LLO }}
2761
2812
end
2762
2813
end .
2763
2814
0 commit comments