42
42
properties :: map ()}).
43
43
-record (consumer ,
44
44
{configuration :: # consumer_configuration {},
45
- credit :: integer (),
45
+ credit :: non_neg_integer (),
46
46
log :: osiris_log :state (),
47
47
last_listener_offset = undefined :: undefined | osiris :offset ()}).
48
48
-record (stream_connection_state ,
@@ -1018,18 +1018,7 @@ open(cast,
1018
1018
[Reason ]),
1019
1019
% % likely a connection problem
1020
1020
Consumer ;
1021
- {{segment , Log1 },
1022
- {credit , Credit1 },
1023
- {last_listener_offset ,
1024
- LLO1 }} ->
1025
- Consumer # consumer {log =
1026
- Log1 ,
1027
- credit
1028
- =
1029
- Credit1 ,
1030
- last_listener_offset
1031
- =
1032
- LLO1 }
1021
+ {ok , Csmr } -> Csmr
1033
1022
end
1034
1023
end ,
1035
1024
ConsumersAcc #{CorrelationId => Consumer1 }
@@ -1857,13 +1846,9 @@ handle_frame_post_auth(Transport,
1857
1846
" peer" ,
1858
1847
[]),
1859
1848
throw ({stop , normal });
1860
- {{segment , Log1 }, {credit , Credit1 },
1861
- {last_listener_offset , LLO1 }} ->
1862
- ConsumerState1 =
1863
- ConsumerState # consumer {log = Log1 ,
1864
- credit = Credit1 ,
1865
- last_listener_offset
1866
- = LLO1 },
1849
+ {ok ,
1850
+ # consumer {log = Log1 , credit = Credit1 } =
1851
+ ConsumerState1 } ->
1867
1852
Consumers1 =
1868
1853
Consumers #{SubscriptionId =>
1869
1854
ConsumerState1 },
@@ -1945,12 +1930,7 @@ handle_frame_post_auth(Transport,
1945
1930
" peer" ,
1946
1931
[]),
1947
1932
throw ({stop , normal });
1948
- {{segment , Log1 }, {credit , Credit1 },
1949
- {last_listener_offset , LLO1 }} ->
1950
- Consumer1 =
1951
- Consumer # consumer {log = Log1 ,
1952
- credit = Credit1 ,
1953
- last_listener_offset = LLO1 },
1933
+ {ok , Consumer1 } ->
1954
1934
{Connection ,
1955
1935
State # stream_connection_state {consumers =
1956
1936
Consumers #{SubscriptionId
@@ -2721,53 +2701,52 @@ send_file_callback(Transport,
2721
2701
2722
2702
send_chunks (Transport ,
2723
2703
# consumer {credit = Credit , last_listener_offset = LastLstOffset } =
2724
- State ,
2704
+ Consumer ,
2725
2705
Counter ) ->
2726
- send_chunks (Transport , State , Credit , LastLstOffset , Counter ).
2706
+ send_chunks (Transport , Consumer , Credit , LastLstOffset , Counter ).
2727
2707
2728
- send_chunks (_Transport ,
2729
- # consumer {log = Log },
2730
- 0 ,
2731
- LastLstOffset ,
2732
- _Counter ) ->
2733
- {{segment , Log }, {credit , 0 }, {last_listener_offset , LastLstOffset }};
2708
+ send_chunks (_Transport , Consumer , 0 , LastLstOffset , _Counter ) ->
2709
+ {ok ,
2710
+ Consumer # consumer {credit = 0 , last_listener_offset = LastLstOffset }};
2734
2711
send_chunks (Transport ,
2735
- # consumer {log = Log } = State ,
2712
+ # consumer {log = Log } = Consumer ,
2736
2713
Credit ,
2737
2714
LastLstOffset ,
2738
2715
Counter ) ->
2739
2716
send_chunks (Transport ,
2740
- State ,
2717
+ Consumer ,
2741
2718
Log ,
2742
2719
Credit ,
2743
2720
LastLstOffset ,
2744
2721
true ,
2745
2722
Counter ).
2746
2723
2747
2724
send_chunks (_Transport ,
2748
- _State ,
2749
- Segment ,
2725
+ Consumer ,
2726
+ Log ,
2750
2727
0 = _Credit ,
2751
2728
LastLstOffset ,
2752
2729
_Retry ,
2753
2730
_Counter ) ->
2754
- {{segment , Segment }, {credit , 0 },
2755
- {last_listener_offset , LastLstOffset }};
2731
+ {ok ,
2732
+ Consumer # consumer {log = Log ,
2733
+ credit = 0 ,
2734
+ last_listener_offset = LastLstOffset }};
2756
2735
send_chunks (Transport ,
2757
2736
# consumer {configuration = # consumer_configuration {socket = S }} =
2758
- State ,
2759
- Segment ,
2737
+ Consumer ,
2738
+ Log ,
2760
2739
Credit ,
2761
2740
LastLstOffset ,
2762
2741
Retry ,
2763
2742
Counter ) ->
2764
- case osiris_log :send_file (S , Segment ,
2765
- send_file_callback (Transport , State , Counter ))
2743
+ case osiris_log :send_file (S , Log ,
2744
+ send_file_callback (Transport , Consumer , Counter ))
2766
2745
of
2767
- {ok , Segment1 } ->
2746
+ {ok , Log1 } ->
2768
2747
send_chunks (Transport ,
2769
- State ,
2770
- Segment1 ,
2748
+ Consumer ,
2749
+ Log1 ,
2771
2750
Credit - 1 ,
2772
2751
LastLstOffset ,
2773
2752
true ,
@@ -2778,13 +2757,13 @@ send_chunks(Transport,
2778
2757
{error , closed };
2779
2758
{error , Reason } ->
2780
2759
{error , Reason };
2781
- {end_of_stream , Segment1 } ->
2760
+ {end_of_stream , Log1 } ->
2782
2761
case Retry of
2783
2762
true ->
2784
2763
timer :sleep (1 ),
2785
2764
send_chunks (Transport ,
2786
- State ,
2787
- Segment1 ,
2765
+ Consumer ,
2766
+ Log1 ,
2788
2767
Credit ,
2789
2768
LastLstOffset ,
2790
2769
false ,
@@ -2793,8 +2772,8 @@ send_chunks(Transport,
2793
2772
# consumer {configuration =
2794
2773
# consumer_configuration {member_pid =
2795
2774
LocalMember }} =
2796
- State ,
2797
- NextOffset = osiris_log :next_offset (Segment1 ),
2775
+ Consumer ,
2776
+ NextOffset = osiris_log :next_offset (Log1 ),
2798
2777
LLO = case {LastLstOffset , NextOffset > LastLstOffset } of
2799
2778
{undefined , _ } ->
2800
2779
osiris :register_offset_listener (LocalMember ,
@@ -2807,8 +2786,10 @@ send_chunks(Transport,
2807
2786
_ ->
2808
2787
LastLstOffset
2809
2788
end ,
2810
- {{segment , Segment1 }, {credit , Credit },
2811
- {last_listener_offset , LLO }}
2789
+ {ok ,
2790
+ Consumer # consumer {log = Log1 ,
2791
+ credit = Credit ,
2792
+ last_listener_offset = LLO }}
2812
2793
end
2813
2794
end .
2814
2795
0 commit comments