Skip to content

Commit 239e136

Browse files
authored
Merge pull request #3397 from rabbitmq/handle-connection-closures-in-stream-reader
Handle closed connections in stream reader
2 parents 0dafe70 + 3b1714c commit 239e136

File tree

1 file changed

+64
-61
lines changed

1 file changed

+64
-61
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,10 @@ open(cast,
10061006
Consumer,
10071007
SendFileOct)
10081008
of
1009+
{error, closed} ->
1010+
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1011+
[]),
1012+
throw({stop, normal});
10091013
{error, Reason} ->
10101014
rabbit_log_connection:info("Error while sending chunks: ~p",
10111015
[Reason]),
@@ -1819,52 +1823,64 @@ handle_frame_post_auth(Transport,
18191823

18201824
rabbit_log:debug("Distributing existing messages to subscription ~p",
18211825
[SubscriptionId]),
1822-
{{segment, Segment1}, {credit, Credit1}} =
1823-
send_chunks(Transport, ConsumerState,
1824-
SendFileOct),
1825-
ConsumerState1 =
1826-
ConsumerState#consumer{segment = Segment1,
1827-
credit = Credit1},
1828-
Consumers1 =
1829-
Consumers#{SubscriptionId => ConsumerState1},
1830-
1831-
StreamSubscriptions1 =
1832-
case StreamSubscriptions of
1833-
#{Stream := SubscriptionIds} ->
1834-
StreamSubscriptions#{Stream =>
1835-
[SubscriptionId]
1836-
++ SubscriptionIds};
1837-
_ ->
1838-
StreamSubscriptions#{Stream =>
1839-
[SubscriptionId]}
1840-
end,
18411826

1842-
#consumer{counters = ConsumerCounters1} =
1843-
ConsumerState1,
1844-
1845-
ConsumerOffset = osiris_log:next_offset(Segment1),
1846-
ConsumerOffsetLag =
1847-
consumer_i(offset_lag, ConsumerState1),
1848-
1849-
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
1850-
"distributed after subscription",
1851-
[SubscriptionId, ConsumerOffset,
1852-
messages_consumed(ConsumerCounters1)]),
1853-
1854-
rabbit_stream_metrics:consumer_created(self(),
1855-
stream_r(Stream,
1856-
Connection1),
1857-
SubscriptionId,
1858-
Credit1,
1859-
messages_consumed(ConsumerCounters1),
1860-
ConsumerOffset,
1861-
ConsumerOffsetLag,
1862-
Properties),
1863-
{Connection1#stream_connection{stream_subscriptions
1864-
=
1865-
StreamSubscriptions1},
1866-
State#stream_connection_state{consumers =
1867-
Consumers1}}
1827+
case send_chunks(Transport, ConsumerState,
1828+
SendFileOct)
1829+
of
1830+
{error, closed} ->
1831+
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1832+
[]),
1833+
throw({stop, normal});
1834+
{{segment, Segment1}, {credit, Credit1}} ->
1835+
ConsumerState1 =
1836+
ConsumerState#consumer{segment =
1837+
Segment1,
1838+
credit =
1839+
Credit1},
1840+
Consumers1 =
1841+
Consumers#{SubscriptionId =>
1842+
ConsumerState1},
1843+
1844+
StreamSubscriptions1 =
1845+
case StreamSubscriptions of
1846+
#{Stream := SubscriptionIds} ->
1847+
StreamSubscriptions#{Stream =>
1848+
[SubscriptionId]
1849+
++ SubscriptionIds};
1850+
_ ->
1851+
StreamSubscriptions#{Stream =>
1852+
[SubscriptionId]}
1853+
end,
1854+
1855+
#consumer{counters = ConsumerCounters1} =
1856+
ConsumerState1,
1857+
1858+
ConsumerOffset =
1859+
osiris_log:next_offset(Segment1),
1860+
ConsumerOffsetLag =
1861+
consumer_i(offset_lag, ConsumerState1),
1862+
1863+
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
1864+
"distributed after subscription",
1865+
[SubscriptionId,
1866+
ConsumerOffset,
1867+
messages_consumed(ConsumerCounters1)]),
1868+
1869+
rabbit_stream_metrics:consumer_created(self(),
1870+
stream_r(Stream,
1871+
Connection1),
1872+
SubscriptionId,
1873+
Credit1,
1874+
messages_consumed(ConsumerCounters1),
1875+
ConsumerOffset,
1876+
ConsumerOffsetLag,
1877+
Properties),
1878+
{Connection1#stream_connection{stream_subscriptions
1879+
=
1880+
StreamSubscriptions1},
1881+
State#stream_connection_state{consumers =
1882+
Consumers1}}
1883+
end
18681884
end
18691885
end;
18701886
error ->
@@ -1893,22 +1909,9 @@ handle_frame_post_auth(Transport,
18931909
SendFileOct)
18941910
of
18951911
{error, closed} ->
1896-
rabbit_log:warning("Stream protocol connection for subscription ~p has been closed, removing "
1897-
"subscription",
1898-
[SubscriptionId]),
1899-
{Connection1, State1} =
1900-
remove_subscription(SubscriptionId, Connection, State),
1901-
1902-
Code = ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST,
1903-
Frame =
1904-
rabbit_stream_core:frame({response, 1,
1905-
{credit, Code,
1906-
SubscriptionId}}),
1907-
send(Transport, S, Frame),
1908-
rabbit_global_counters:increase_protocol_counter(stream,
1909-
?SUBSCRIPTION_ID_DOES_NOT_EXIST,
1910-
1),
1911-
{Connection1, State1};
1912+
rabbit_log_connection:info("Stream protocol connection has been closed by peer",
1913+
[]),
1914+
throw({stop, normal});
19121915
{{segment, Segment1}, {credit, Credit1}} ->
19131916
Consumer1 =
19141917
Consumer#consumer{segment = Segment1, credit = Credit1},

0 commit comments

Comments
 (0)