Skip to content

Commit d6301a3

Browse files
committed
Handle closed connections in stream reader
and throw and stop gracefully.
1 parent 3513fa0 commit d6301a3

File tree

1 file changed

+52
-59
lines changed

1 file changed

+52
-59
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 52 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,9 @@ open(cast,
10061006
Consumer,
10071007
SendFileOct)
10081008
of
1009+
{error, closed} ->
1010+
rabbit_log:info("Stream protocol connection has been closed by peer", []),
1011+
throw({stop, connection_closed});
10091012
{error, Reason} ->
10101013
rabbit_log_connection:info("Error while sending chunks: ~p",
10111014
[Reason]),
@@ -1819,52 +1822,56 @@ handle_frame_post_auth(Transport,
18191822

18201823
rabbit_log:debug("Distributing existing messages to subscription ~p",
18211824
[SubscriptionId]),
1822-
{{segment, Segment1}, {credit, Credit1}} =
1823-
send_chunks(Transport, ConsumerState,
1824-
SendFileOct),
1825-
ConsumerState1 =
1826-
ConsumerState#consumer{segment = Segment1,
1827-
credit = Credit1},
1828-
Consumers1 =
1829-
Consumers#{SubscriptionId => ConsumerState1},
1830-
1831-
StreamSubscriptions1 =
1832-
case StreamSubscriptions of
1833-
#{Stream := SubscriptionIds} ->
1834-
StreamSubscriptions#{Stream =>
1825+
1826+
case send_chunks(Transport, ConsumerState, SendFileOct) of
1827+
{error, closed} ->
1828+
rabbit_log:info("Stream protocol connection has been closed by peer", []),
1829+
throw({stop, connection_closed});
1830+
{{segment, Segment1}, {credit, Credit1}} ->
1831+
ConsumerState1 =
1832+
ConsumerState#consumer{segment = Segment1,
1833+
credit = Credit1},
1834+
Consumers1 =
1835+
Consumers#{SubscriptionId => ConsumerState1},
1836+
1837+
StreamSubscriptions1 =
1838+
case StreamSubscriptions of
1839+
#{Stream := SubscriptionIds} ->
1840+
StreamSubscriptions#{Stream =>
18351841
[SubscriptionId]
18361842
++ SubscriptionIds};
1837-
_ ->
1838-
StreamSubscriptions#{Stream =>
1843+
_ ->
1844+
StreamSubscriptions#{Stream =>
18391845
[SubscriptionId]}
1840-
end,
1841-
1842-
#consumer{counters = ConsumerCounters1} =
1843-
ConsumerState1,
1844-
1845-
ConsumerOffset = osiris_log:next_offset(Segment1),
1846-
ConsumerOffsetLag =
1847-
consumer_i(offset_lag, ConsumerState1),
1848-
1849-
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
1850-
"distributed after subscription",
1851-
[SubscriptionId, ConsumerOffset,
1852-
messages_consumed(ConsumerCounters1)]),
1853-
1854-
rabbit_stream_metrics:consumer_created(self(),
1855-
stream_r(Stream,
1856-
Connection1),
1857-
SubscriptionId,
1858-
Credit1,
1859-
messages_consumed(ConsumerCounters1),
1860-
ConsumerOffset,
1861-
ConsumerOffsetLag,
1862-
Properties),
1863-
{Connection1#stream_connection{stream_subscriptions
1864-
=
1865-
StreamSubscriptions1},
1866-
State#stream_connection_state{consumers =
1867-
Consumers1}}
1846+
end,
1847+
1848+
#consumer{counters = ConsumerCounters1} =
1849+
ConsumerState1,
1850+
1851+
ConsumerOffset = osiris_log:next_offset(Segment1),
1852+
ConsumerOffsetLag =
1853+
consumer_i(offset_lag, ConsumerState1),
1854+
1855+
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
1856+
"distributed after subscription",
1857+
[SubscriptionId, ConsumerOffset,
1858+
messages_consumed(ConsumerCounters1)]),
1859+
1860+
rabbit_stream_metrics:consumer_created(self(),
1861+
stream_r(Stream,
1862+
Connection1),
1863+
SubscriptionId,
1864+
Credit1,
1865+
messages_consumed(ConsumerCounters1),
1866+
ConsumerOffset,
1867+
ConsumerOffsetLag,
1868+
Properties),
1869+
{Connection1#stream_connection{stream_subscriptions
1870+
=
1871+
StreamSubscriptions1},
1872+
State#stream_connection_state{consumers =
1873+
Consumers1}}
1874+
end
18681875
end
18691876
end;
18701877
error ->
@@ -1893,22 +1900,8 @@ handle_frame_post_auth(Transport,
18931900
SendFileOct)
18941901
of
18951902
{error, closed} ->
1896-
rabbit_log:warning("Stream protocol connection for subscription ~p has been closed, removing "
1897-
"subscription",
1898-
[SubscriptionId]),
1899-
{Connection1, State1} =
1900-
remove_subscription(SubscriptionId, Connection, State),
1901-
1902-
Code = ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST,
1903-
Frame =
1904-
rabbit_stream_core:frame({response, 1,
1905-
{credit, Code,
1906-
SubscriptionId}}),
1907-
send(Transport, S, Frame),
1908-
rabbit_global_counters:increase_protocol_counter(stream,
1909-
?SUBSCRIPTION_ID_DOES_NOT_EXIST,
1910-
1),
1911-
{Connection1, State1};
1903+
rabbit_log:warning("Stream protocol connection has been closed by peer", []),
1904+
throw({stop, connection_closed});
19121905
{{segment, Segment1}, {credit, Credit1}} ->
19131906
Consumer1 =
19141907
Consumer#consumer{segment = Segment1, credit = Credit1},

0 commit comments

Comments
 (0)