@@ -1007,7 +1007,8 @@ open(cast,
1007
1007
SendFileOct )
1008
1008
of
1009
1009
{error , closed } ->
1010
- rabbit_log_connection :info (" Stream protocol connection has been closed by peer" , []),
1010
+ rabbit_log_connection :info (" Stream protocol connection has been closed by peer" ,
1011
+ []),
1011
1012
throw ({stop , normal });
1012
1013
{error , Reason } ->
1013
1014
rabbit_log_connection :info (" Error while sending chunks: ~p " ,
@@ -1823,38 +1824,46 @@ handle_frame_post_auth(Transport,
1823
1824
rabbit_log :debug (" Distributing existing messages to subscription ~p " ,
1824
1825
[SubscriptionId ]),
1825
1826
1826
- case send_chunks (Transport , ConsumerState , SendFileOct ) of
1827
+ case send_chunks (Transport , ConsumerState ,
1828
+ SendFileOct )
1829
+ of
1827
1830
{error , closed } ->
1828
- rabbit_log_connection :info (" Stream protocol connection has been closed by peer" , []),
1831
+ rabbit_log_connection :info (" Stream protocol connection has been closed by peer" ,
1832
+ []),
1829
1833
throw ({stop , normal });
1830
1834
{{segment , Segment1 }, {credit , Credit1 }} ->
1831
1835
ConsumerState1 =
1832
- ConsumerState # consumer {segment = Segment1 ,
1833
- credit = Credit1 },
1836
+ ConsumerState # consumer {segment =
1837
+ Segment1 ,
1838
+ credit =
1839
+ Credit1 },
1834
1840
Consumers1 =
1835
- Consumers #{SubscriptionId => ConsumerState1 },
1841
+ Consumers #{SubscriptionId =>
1842
+ ConsumerState1 },
1836
1843
1837
1844
StreamSubscriptions1 =
1838
- case StreamSubscriptions of
1839
- #{Stream := SubscriptionIds } ->
1840
- StreamSubscriptions #{Stream =>
1841
- [SubscriptionId ]
1842
- ++ SubscriptionIds };
1843
- _ ->
1844
- StreamSubscriptions #{Stream =>
1845
- [SubscriptionId ]}
1846
- end ,
1845
+ case StreamSubscriptions of
1846
+ #{Stream := SubscriptionIds } ->
1847
+ StreamSubscriptions #{Stream =>
1848
+ [SubscriptionId ]
1849
+ ++ SubscriptionIds };
1850
+ _ ->
1851
+ StreamSubscriptions #{Stream =>
1852
+ [SubscriptionId ]}
1853
+ end ,
1847
1854
1848
1855
# consumer {counters = ConsumerCounters1 } =
1849
- ConsumerState1 ,
1856
+ ConsumerState1 ,
1850
1857
1851
- ConsumerOffset = osiris_log :next_offset (Segment1 ),
1858
+ ConsumerOffset =
1859
+ osiris_log :next_offset (Segment1 ),
1852
1860
ConsumerOffsetLag =
1853
- consumer_i (offset_lag , ConsumerState1 ),
1861
+ consumer_i (offset_lag , ConsumerState1 ),
1854
1862
1855
1863
rabbit_log :debug (" Subscription ~p is now at offset ~p with ~p message(s) "
1856
1864
" distributed after subscription" ,
1857
- [SubscriptionId , ConsumerOffset ,
1865
+ [SubscriptionId ,
1866
+ ConsumerOffset ,
1858
1867
messages_consumed (ConsumerCounters1 )]),
1859
1868
1860
1869
rabbit_stream_metrics :consumer_created (self (),
@@ -1867,10 +1876,10 @@ handle_frame_post_auth(Transport,
1867
1876
ConsumerOffsetLag ,
1868
1877
Properties ),
1869
1878
{Connection1 # stream_connection {stream_subscriptions
1870
- =
1871
- StreamSubscriptions1 },
1879
+ =
1880
+ StreamSubscriptions1 },
1872
1881
State # stream_connection_state {consumers =
1873
- Consumers1 }}
1882
+ Consumers1 }}
1874
1883
end
1875
1884
end
1876
1885
end ;
@@ -1900,7 +1909,8 @@ handle_frame_post_auth(Transport,
1900
1909
SendFileOct )
1901
1910
of
1902
1911
{error , closed } ->
1903
- rabbit_log_connection :info (" Stream protocol connection has been closed by peer" , []),
1912
+ rabbit_log_connection :info (" Stream protocol connection has been closed by peer" ,
1913
+ []),
1904
1914
throw ({stop , normal });
1905
1915
{{segment , Segment1 }, {credit , Credit1 }} ->
1906
1916
Consumer1 =
0 commit comments