|
41 | 41 | counters :: atomics:atomics_ref(),
|
42 | 42 | properties :: map()}).
|
43 | 43 | -record(consumer,
|
44 |
| - {configuration :: #consumer_configuration{}, credit :: integer(), |
45 |
| - log :: osiris_log:state()}). |
| 44 | + {configuration :: #consumer_configuration{}, |
| 45 | + credit :: non_neg_integer(), |
| 46 | + log :: osiris_log:state(), |
| 47 | + last_listener_offset = undefined :: undefined | osiris:offset()}). |
46 | 48 | -record(stream_connection_state,
|
47 | 49 | {data :: rabbit_stream_core:state(), blocked :: boolean(),
|
48 | 50 | consumers :: #{subscription_id() => #consumer{}}}).
|
@@ -1016,13 +1018,7 @@ open(cast,
|
1016 | 1018 | [Reason]),
|
1017 | 1019 | %% likely a connection problem
|
1018 | 1020 | Consumer;
|
1019 |
| - {{segment, Log1}, |
1020 |
| - {credit, Credit1}} -> |
1021 |
| - Consumer#consumer{log = |
1022 |
| - Log1, |
1023 |
| - credit |
1024 |
| - = |
1025 |
| - Credit1} |
| 1021 | + {ok, Csmr} -> Csmr |
1026 | 1022 | end
|
1027 | 1023 | end,
|
1028 | 1024 | ConsumersAcc#{CorrelationId => Consumer1}
|
@@ -1850,11 +1846,9 @@ handle_frame_post_auth(Transport,
|
1850 | 1846 | "peer",
|
1851 | 1847 | []),
|
1852 | 1848 | throw({stop, normal});
|
1853 |
| - {{segment, Log1}, {credit, Credit1}} -> |
1854 |
| - ConsumerState1 = |
1855 |
| - ConsumerState#consumer{log = Log1, |
1856 |
| - credit = |
1857 |
| - Credit1}, |
| 1849 | + {ok, |
| 1850 | + #consumer{log = Log1, credit = Credit1} = |
| 1851 | + ConsumerState1} -> |
1858 | 1852 | Consumers1 =
|
1859 | 1853 | Consumers#{SubscriptionId =>
|
1860 | 1854 | ConsumerState1},
|
@@ -1923,19 +1917,20 @@ handle_frame_post_auth(Transport,
|
1923 | 1917 | {credit, SubscriptionId, Credit}) ->
|
1924 | 1918 | case Consumers of
|
1925 | 1919 | #{SubscriptionId := Consumer} ->
|
1926 |
| - #consumer{credit = AvailableCredit} = Consumer, |
| 1920 | + #consumer{credit = AvailableCredit, last_listener_offset = LLO} = |
| 1921 | + Consumer, |
1927 | 1922 | case send_chunks(Transport,
|
1928 | 1923 | Consumer,
|
1929 | 1924 | AvailableCredit + Credit,
|
| 1925 | + LLO, |
1930 | 1926 | SendFileOct)
|
1931 | 1927 | of
|
1932 | 1928 | {error, closed} ->
|
1933 | 1929 | rabbit_log_connection:info("Stream protocol connection has been closed by "
|
1934 | 1930 | "peer",
|
1935 | 1931 | []),
|
1936 | 1932 | throw({stop, normal});
|
1937 |
| - {{segment, Log1}, {credit, Credit1}} -> |
1938 |
| - Consumer1 = Consumer#consumer{log = Log1, credit = Credit1}, |
| 1933 | + {ok, Consumer1} -> |
1939 | 1934 | {Connection,
|
1940 | 1935 | State#stream_connection_state{consumers =
|
1941 | 1936 | Consumers#{SubscriptionId
|
@@ -2704,60 +2699,97 @@ send_file_callback(Transport,
|
2704 | 2699 | set_consumer_offset(Counters, FirstOffsetInChunk)
|
2705 | 2700 | end.
|
2706 | 2701 |
|
2707 |
| -send_chunks(Transport, #consumer{credit = Credit} = State, Counter) -> |
2708 |
| - send_chunks(Transport, State, Credit, Counter). |
| 2702 | +send_chunks(Transport, |
| 2703 | + #consumer{credit = Credit, last_listener_offset = LastLstOffset} = |
| 2704 | + Consumer, |
| 2705 | + Counter) -> |
| 2706 | + send_chunks(Transport, Consumer, Credit, LastLstOffset, Counter). |
2709 | 2707 |
|
2710 |
| -send_chunks(_Transport, #consumer{log = Log}, 0, _Counter) -> |
2711 |
| - {{segment, Log}, {credit, 0}}; |
| 2708 | +send_chunks(_Transport, Consumer, 0, LastLstOffset, _Counter) -> |
| 2709 | + {ok, |
| 2710 | + Consumer#consumer{credit = 0, last_listener_offset = LastLstOffset}}; |
2712 | 2711 | send_chunks(Transport,
|
2713 |
| - #consumer{log = Log} = State, |
| 2712 | + #consumer{log = Log} = Consumer, |
2714 | 2713 | Credit,
|
| 2714 | + LastLstOffset, |
2715 | 2715 | Counter) ->
|
2716 |
| - send_chunks(Transport, State, Log, Credit, true, Counter). |
| 2716 | + send_chunks(Transport, |
| 2717 | + Consumer, |
| 2718 | + Log, |
| 2719 | + Credit, |
| 2720 | + LastLstOffset, |
| 2721 | + true, |
| 2722 | + Counter). |
2717 | 2723 |
|
2718 | 2724 | send_chunks(_Transport,
|
2719 |
| - _State, |
2720 |
| - Segment, |
| 2725 | + Consumer, |
| 2726 | + Log, |
2721 | 2727 | 0 = _Credit,
|
| 2728 | + LastLstOffset, |
2722 | 2729 | _Retry,
|
2723 | 2730 | _Counter) ->
|
2724 |
| - {{segment, Segment}, {credit, 0}}; |
| 2731 | + {ok, |
| 2732 | + Consumer#consumer{log = Log, |
| 2733 | + credit = 0, |
| 2734 | + last_listener_offset = LastLstOffset}}; |
2725 | 2735 | send_chunks(Transport,
|
2726 | 2736 | #consumer{configuration = #consumer_configuration{socket = S}} =
|
2727 |
| - State, |
2728 |
| - Segment, |
| 2737 | + Consumer, |
| 2738 | + Log, |
2729 | 2739 | Credit,
|
| 2740 | + LastLstOffset, |
2730 | 2741 | Retry,
|
2731 | 2742 | Counter) ->
|
2732 |
| - case osiris_log:send_file(S, Segment, |
2733 |
| - send_file_callback(Transport, State, Counter)) |
| 2743 | + case osiris_log:send_file(S, Log, |
| 2744 | + send_file_callback(Transport, Consumer, Counter)) |
2734 | 2745 | of
|
2735 |
| - {ok, Segment1} -> |
2736 |
| - send_chunks(Transport, State, Segment1, Credit - 1, true, Counter); |
| 2746 | + {ok, Log1} -> |
| 2747 | + send_chunks(Transport, |
| 2748 | + Consumer, |
| 2749 | + Log1, |
| 2750 | + Credit - 1, |
| 2751 | + LastLstOffset, |
| 2752 | + true, |
| 2753 | + Counter); |
2737 | 2754 | {error, closed} ->
|
2738 | 2755 | {error, closed};
|
2739 | 2756 | {error, enotconn} ->
|
2740 | 2757 | {error, closed};
|
2741 | 2758 | {error, Reason} ->
|
2742 | 2759 | {error, Reason};
|
2743 |
| - {end_of_stream, Segment1} -> |
| 2760 | + {end_of_stream, Log1} -> |
2744 | 2761 | case Retry of
|
2745 | 2762 | true ->
|
2746 | 2763 | timer:sleep(1),
|
2747 | 2764 | send_chunks(Transport,
|
2748 |
| - State, |
2749 |
| - Segment1, |
| 2765 | + Consumer, |
| 2766 | + Log1, |
2750 | 2767 | Credit,
|
| 2768 | + LastLstOffset, |
2751 | 2769 | false,
|
2752 | 2770 | Counter);
|
2753 | 2771 | false ->
|
2754 | 2772 | #consumer{configuration =
|
2755 | 2773 | #consumer_configuration{member_pid =
|
2756 | 2774 | LocalMember}} =
|
2757 |
| - State, |
2758 |
| - osiris:register_offset_listener(LocalMember, |
2759 |
| - osiris_log:next_offset(Segment1)), |
2760 |
| - {{segment, Segment1}, {credit, Credit}} |
| 2775 | + Consumer, |
| 2776 | + NextOffset = osiris_log:next_offset(Log1), |
| 2777 | + LLO = case {LastLstOffset, NextOffset > LastLstOffset} of |
| 2778 | + {undefined, _} -> |
| 2779 | + osiris:register_offset_listener(LocalMember, |
| 2780 | + NextOffset), |
| 2781 | + NextOffset; |
| 2782 | + {_, true} -> |
| 2783 | + osiris:register_offset_listener(LocalMember, |
| 2784 | + NextOffset), |
| 2785 | + NextOffset; |
| 2786 | + _ -> |
| 2787 | + LastLstOffset |
| 2788 | + end, |
| 2789 | + {ok, |
| 2790 | + Consumer#consumer{log = Log1, |
| 2791 | + credit = Credit, |
| 2792 | + last_listener_offset = LLO}} |
2761 | 2793 | end
|
2762 | 2794 | end.
|
2763 | 2795 |
|
|
0 commit comments