Monitor stream local member in stream queue #4217
Conversation
Acceptance Steps

Consumer Keeps Consuming When Leader Is Restarted

We will run an AMQP 0.9.1 consumer application against a stream queue, kill the stream leader process, and make sure the application does not stop consuming.

Start Broker

cd /tmp
git clone [email protected]:rabbitmq/rabbitmq-server.git rabbitmq-server-4133
cd rabbitmq-server-4133
git checkout rabbitmq-server-4133-monitor-stream-local-member
make run-broker PLUGINS=""

Start Client Application

This step requires JBang to be installed.

cd /tmp
git clone [email protected]:acogoluegnes/sandbox.git
jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

The application publishes a message every second; it also has a consumer that outputs a line for each message it receives.
Restart the Stream Leader Process

We need to locate the PID of the stream leader process. Dump the state of the stream coordinator:

/tmp/rabbitmq-server-4133/sbin/rabbitmqctl eval 'rabbit_stream_coordinator:state().'
Output:

{rabbit_stream_coordinator,
#{"__sq_1646746859112959129" =>
{stream,"__sq_1646746859112959129",2,
{resource,<<"/">>,queue,<<"sq">>},
#{epoch => 1,
event_formatter =>
{rabbit_stream_queue,format_osiris_event,
[{resource,<<"/">>,queue,<<"sq">>}]},
leader_locator_strategy => <<"client-local">>,
leader_node => 'rabbit@acogoluegnes-nuc',
name => "__sq_1646746859112959129",
nodes => ['rabbit@acogoluegnes-nuc'],
reference => {resource,<<"/">>,queue,<<"sq">>},
replica_nodes => [],retention => []},
['rabbit@acogoluegnes-nuc'],
#{'rabbit@acogoluegnes-nuc' =>
{member,
{running,2,<11591.1008.0>},
{writer,2},
'rabbit@acogoluegnes-nuc',undefined,
#{epoch => 2,
event_formatter =>
{rabbit_stream_queue,format_osiris_event,
[{resource,<<"/">>,queue,<<"sq">>}]},
leader_locator_strategy => <<"client-local">>,
leader_node => 'rabbit@acogoluegnes-nuc',
name => "__sq_1646746859112959129",
nodes => ['rabbit@acogoluegnes-nuc'],
reference => {resource,<<"/">>,queue,<<"sq">>},
replica_nodes => [],retention => []},
running}},
#{{<11591.1878.0>,leader} => <11591.1008.0>,
{<11591.1883.0>,leader} => <11591.1008.0>,
{<11591.1883.0>,member} =>
{'rabbit@acogoluegnes-nuc',<11591.1008.0>}},
undefined,
{updated,2},
running}},
#{<11591.1008.0> => {"__sq_1646746859112959129",member},
<11591.1878.0> => {#{"__sq_1646746859112959129" => ok},listener},
<11591.1883.0> => {#{"__sq_1646746859112959129" => ok},listener}},
#{},undefined,undefined}

Locate the stream leader process; in our case it is {running,2,<11591.1008.0>}. Make the PID a local one by replacing its first component with 0 (the first component of a printed PID identifies the node, and 0 means the local node), then kill the process:

/tmp/rabbitmq-server-4133/sbin/rabbitmqctl eval 'exit(list_to_pid("<0.1008.0>"), kill).'

If you dump the stream coordinator state again, the stream leader PID should have changed.
If you go back to the client application tab, it should still be publishing and consuming; it should not have stopped. Stop the client application (Ctrl+C) and the broker.

Cluster Upgrade

The PR changes the structure of the stream coordinator state, which implies a state machine version upgrade. The following steps check that a 3-node cluster can be upgraded. Create a generic Unix package from the branch and install it:

make -C /tmp/rabbitmq-server-4133 package-generic-unix
mkdir /tmp/rabbitmq_server_4133
tar -xf /tmp/rabbitmq-server-4133/PACKAGES/rabbitmq-server-generic-unix-*.tar.xz --directory /tmp/rabbitmq_server_4133 --strip-components 1
echo "log.file.level = debug" > /tmp/rabbitmq_server_4133/etc/rabbitmq/rabbitmq.conf Get the generic Unix package of the latest 3.10 alpha and extract it (change mkdir /tmp/rabbitmq_server_310
tar -xf rabbitmq-server-generic-unix-3.10.0-alpha.[ALPHA].tar.xz --directory rabbitmq_server_310 --strip-components 1
echo "log.file.level = debug" > /tmp/rabbitmq_server_310/etc/rabbitmq/rabbitmq.conf
mkdir /tmp/rabbitmq-4133-mnesia-base

Open 3 different tabs (one for each node) and run the following commands.

TAB 1:

export RABBITMQ_NODENAME="rabbit1"
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_DIST_PORT=25672
export RABBITMQ_MNESIA_BASE=/tmp/rabbitmq-4133-mnesia-base
alias rctl="/tmp/rabbitmq_server_310/sbin/rabbitmqctl"

TAB 2:

export RABBITMQ_NODENAME="rabbit2"
export RABBITMQ_NODE_PORT=5673
export RABBITMQ_DIST_PORT=25673
export RABBITMQ_MNESIA_BASE=/tmp/rabbitmq-4133-mnesia-base
alias rctl="/tmp/rabbitmq_server_310/sbin/rabbitmqctl"

TAB 3:

export RABBITMQ_NODENAME="rabbit3"
export RABBITMQ_NODE_PORT=5674
export RABBITMQ_DIST_PORT=25674
export RABBITMQ_MNESIA_BASE=/tmp/rabbitmq-4133-mnesia-base
alias rctl="/tmp/rabbitmq_server_310/sbin/rabbitmqctl"

Now start each node. On each tab:

/tmp/rabbitmq_server_310/sbin/rabbitmq-server -detached

Form the cluster. TAB 2 & 3:

rctl stop_app
rctl join_cluster rabbit1@$(hostname -s)
rctl start_app

Make sure the 3 nodes joined the cluster. ANY TAB:

rctl cluster_status

Start the client application (requires the sandbox repository cloned earlier). ANY TAB:

jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

Stop the client after a few messages (Ctrl+C). Check the members of the stream coordinator. ANY TAB:

rctl eval 'ra:members(rabbit_stream_coordinator).'

It should output something like the following (the 3 members and the leader):

{ok,[{rabbit_stream_coordinator,'rabbit1@acogoluegnes-nuc'},
{rabbit_stream_coordinator,'rabbit2@acogoluegnes-nuc'},
{rabbit_stream_coordinator,'rabbit3@acogoluegnes-nuc'}],
{rabbit_stream_coordinator,'rabbit1@acogoluegnes-nuc'}}

We're now going to upgrade the nodes, one at a time. The first node. TAB 1: stop the node and start it again from the branch package (/tmp/rabbitmq_server_4133).
Make sure the node joined the cluster again. ANY TAB:

rctl cluster_status

The second node. TAB 2: stop the node and start it again from the branch package.
Make sure the node joined the cluster again. ANY TAB:

rctl cluster_status

Start the client application again. TAB 1:

jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

Check the state of the stream coordinator. TAB 2:

rctl eval 'rabbit_stream_coordinator:state().'

The output should be like the following:

{rabbit_stream_coordinator,
#{"__sq_1646920629979595000" =>
{stream,"__sq_1646920629979595000",3,
{resource,<<"/">>,queue,<<"sq">>},
#{epoch => 1,
event_formatter =>
{rabbit_stream_queue,format_osiris_event,
[{resource,<<"/">>,queue,<<"sq">>}]},
leader_locator_strategy => <<"client-local">>,
leader_node => 'rabbit1@acogoluegne-a01',
name => "__sq_1646920629979595000",
nodes =>
['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
'rabbit2@acogoluegne-a01'],
reference => {resource,<<"/">>,queue,<<"sq">>},
replica_nodes =>
['rabbit3@acogoluegne-a01','rabbit2@acogoluegne-a01'],
retention => []},
['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
'rabbit2@acogoluegne-a01'],
...
#{<12133.734.0> => <12133.624.0>,
<12133.739.0> => <12133.624.0>,
<12133.1196.0> => <12133.624.0>,
<12133.1221.0> => <12133.624.0>},
undefined,
{updated,3},
running}},
#{<11582.585.0> => {"__sq_1646920629979595000",member},
<12133.624.0> => {"__sq_1646920629979595000",member},
<12133.734.0> => {"__sq_1646920629979595000",listener},
<12133.739.0> => {"__sq_1646920629979595000",listener},
<12134.1614.0> => {"__sq_1646920629979595000",member}},
#{},undefined,undefined}

In our case, the map of listeners contains terminated PIDs that should have been removed. Upgrade the third node. TAB 3: stop the node and start it again from the branch package.
Make sure the node joined the cluster again. ANY TAB:

rctl cluster_status

Dump the coordinator state. ANY TAB:

rctl eval 'rabbit_stream_coordinator:state().'

The output should look something like this:

{rabbit_stream_coordinator,
#{"__sq_1646920629979595000" =>
{stream,"__sq_1646920629979595000",3,
{resource,<<"/">>,queue,<<"sq">>},
#{epoch => 1,
event_formatter =>
{rabbit_stream_queue,format_osiris_event,
[{resource,<<"/">>,queue,<<"sq">>}]},
leader_locator_strategy => <<"client-local">>,
leader_node => 'rabbit1@acogoluegne-a01',
name => "__sq_1646920629979595000",
nodes =>
['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
'rabbit2@acogoluegne-a01'],
reference => {resource,<<"/">>,queue,<<"sq">>},
replica_nodes =>
['rabbit3@acogoluegne-a01','rabbit2@acogoluegne-a01'],
retention => []},
['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
'rabbit2@acogoluegne-a01'],
...
#{{<12133.734.0>,leader} => <12133.624.0>,
{<12133.739.0>,leader} => <12133.624.0>},
undefined,
{updated,3},
running}},
#{<12134.585.0> => {"__sq_1646920629979595000",member},
<11582.592.0> => {"__sq_1646920629979595000",member},
<12133.624.0> => {"__sq_1646920629979595000",member},
<12133.734.0> => {#{"__sq_1646920629979595000" => ok},listener},
<12133.739.0> => {#{"__sq_1646920629979595000" => ok},listener}},
undefined,undefined,undefined}
The listeners should have been converted to the new representation:

#{{<12133.734.0>,leader} => <12133.624.0>,
  {<12133.739.0>,leader} => <12133.624.0>},

The stale PIDs should have been collected. Even though the client application was launched when 2 nodes were already on the new version, the stream coordinator was still running the previous version of the state machine at that point. Let's stop and restart the client application and check the coordinator state.
ANY TAB:

jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

Dump the coordinator state. ANY TAB:

rctl eval 'rabbit_stream_coordinator:state().'

{rabbit_stream_coordinator,
#{"__sq_1646920629979595000" =>
{stream,"__sq_1646920629979595000",3,
{resource,<<"/">>,queue,<<"sq">>},
#{epoch => 1,
event_formatter =>
{rabbit_stream_queue,format_osiris_event,
[{resource,<<"/">>,queue,<<"sq">>}]},
leader_locator_strategy => <<"client-local">>,
leader_node => 'rabbit1@acogoluegne-a01',
name => "__sq_1646920629979595000",
nodes =>
['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
'rabbit2@acogoluegne-a01'],
reference => {resource,<<"/">>,queue,<<"sq">>},
replica_nodes =>
['rabbit3@acogoluegne-a01','rabbit2@acogoluegne-a01'],
retention => []},
['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
'rabbit2@acogoluegne-a01'],
...
#{{<12133.1586.0>,leader} => <12133.624.0>,
{<12133.1591.0>,leader} => <12133.624.0>,
{<12133.1591.0>,member} =>
{'rabbit1@acogoluegne-a01',<12133.624.0>}},
undefined,
{updated,3},
running}},
#{<11582.585.0> => {"__sq_1646920629979595000",member},
<12134.592.0> => {"__sq_1646920629979595000",member},
<12133.624.0> => {"__sq_1646920629979595000",member},
<12133.1586.0> => {#{"__sq_1646920629979595000" => ok},listener},
<12133.1591.0> => {#{"__sq_1646920629979595000" => ok},listener}},
undefined,undefined,undefined}

Note the map of listeners: it now contains a member entry:

#{{<12133.1586.0>,leader} => <12133.624.0>,
  {<12133.1591.0>,leader} => <12133.624.0>,
  {<12133.1591.0>,member} => {'rabbit1@acogoluegne-a01',<12133.624.0>}}

Stop the client application. Shut down the cluster. On each tab:

rctl stop

Delete the directories:

cd /tmp
rm -rf /tmp/rabbitmq-4133-mnesia-base /tmp/rabbitmq-server-4133 /tmp/rabbitmq_server_310 /tmp/rabbitmq_server_4133 /tmp/sandbox
Force-pushed from ae7bcdc to 4f7b7c8
This commit refactors the stream coordinator listener mechanism to support listeners that get notified when a stream member on the same node as the listening process gets restarted. Local member listeners are useful for AMQP consumers. The change requires a state machine version bump as it changes the state of the stream coordinator. This will be done in a subsequent commit. References #4133 (cherry picked from commit 3db593f8b02e17c23fe6e188ec775961b6f7cf94)
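For illustration, here is a minimal Erlang sketch of how such a notification could be produced as Ra side effects. The state shape, the {ListenerPid, member} key format and the local_member_change message tag follow the state dumps above, but the function itself is an assumption, not the actual coordinator code:

-module(local_member_listener_sketch).
-export([member_restarted_effects/3]).

%% Sketch only: build Ra side effects that notify "local member" listeners
%% registered for the node where a stream member was just restarted.
%% Listeners is assumed to be the v2 map shown in the state dumps:
%% {ListenerPid, member} => {Node, OldLocalPid}.
member_restarted_effects(Node, NewMemberPid, Listeners) ->
    maps:fold(
      fun({ListenerPid, member}, {N, _OldPid}, Effects) when N =:= Node ->
              %% {send_msg, ...} is a standard Ra effect; the message tag
              %% local_member_change is an illustrative assumption
              [{send_msg, ListenerPid,
                {local_member_change, NewMemberPid}, [cast]} | Effects];
         (_Key, _Value, Effects) ->
              Effects
      end, [], Listeners).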
Adapt it to the new structure of stream listeners. References #4133
This commit re-creates an Osiris reader when the local member of the stream cluster changes. Consumers can then restart dispatching messages where they left off instead of hanging. References #4133
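A hypothetical sketch of the consumer-side reaction, assuming an osiris:init_reader/2-style call and a map-based consumer state (the actual rabbitmq-server code differs):

-module(reader_recreate_sketch).
-export([handle_local_member_change/2]).

%% Sketch only: when notified that the local stream member has a new pid,
%% open a new Osiris reader at the last dispatched offset so the consumer
%% resumes where it left off instead of hanging on a dead process.
handle_local_member_change(NewMemberPid,
                           #{last_offset := Offset} = Consumer) ->
    %% osiris:init_reader/2 and the {abs, Offset} offset spec are assumed
    %% here; the exact osiris API may differ
    {ok, Reader} = osiris:init_reader(NewMemberPid, {abs, Offset}),
    Consumer#{reader => Reader, member_pid => NewMemberPid}.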
The stream coordinator has the most up-to-date information. References #4133
Version bump necessary because of the state changes to handle local member listeners. References #4133
These 2 tests can fail in mixed-version, 2-node cluster testing. If the stream coordinator leader ends up on the lower version, it does not contain the fixes and the tests fail. With this commit, the 2 tests are skipped under the appropriate conditions. References #4133
These tests require a level of availability that mixed-version clusters cannot provide, so they are skipped under these conditions. References #4133
Force-pushed from 4f7b7c8 to e6a2670
Unlikely to pass as it randomly stops a node, which can be the stream coordinator leader. The 2 remaining nodes then cannot elect a new leader because they don't have the same version. References #4133
In the stream coordinator. A v2 node, even while the machine is still running v1, can call the function to register a local member listener when an AMQP listener is registered, so this commit makes such a call a no-op under v1. References #4133
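A minimal sketch of such a guard, assuming the machine version is read from the command metadata (the command name, state shape and return values are illustrative, not the actual coordinator code):

-module(version_guard_sketch).
-export([apply/3]).

%% Sketch only: ignore a v2-only command while the Raft machine still runs
%% v1, e.g. during a rolling upgrade where a v2 node submits the command.
apply(#{machine_version := MacVer},
      {register_local_member_listener, _LocalMemberPid, _Pid},
      State) when MacVer < 2 ->
    {State, ok, []};
apply(_Meta, {register_local_member_listener, LocalMemberPid, Pid}, State) ->
    %% v2 behaviour: key the entry by {ListenerPid, member}; the value is
    %% the node and pid of the member local to the listener (as in the
    %% state dumps). Monitor the listener so it can be cleaned up later.
    Listeners0 = maps:get(listeners, State, #{}),
    Listeners = Listeners0#{{Pid, member} => {node(Pid), LocalMemberPid}},
    {State#{listeners => Listeners}, ok, [{monitor, process, Pid}]}.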
Monitors were not converted. References #4133
Looks generally good. A few formatting issues and a couple of suggestions.
-    listeners = #{} :: #{pid() := LeaderPid :: pid()},
+    listeners = #{} :: #{pid() |                  %% v0 & v1
+                         {pid(), leader | member} %% v2
+                         := LeaderPid :: pid() | {node(), LocalPid :: pid()}},
Why include a node() here? Won't the node always be the one of the listener pid?
All acceptance steps work as expected, although I had to tweak mkdir to use mkdir -p here and there.
@acogoluegnes I assumed that most of the feedback above (which seems to be cosmetic and minor code style tweaks) is going to be addressed, and focused on the functional part in my review.
@kjnilsson ready for another round
(cherry picked from commit 5692272)
(cherry picked from commit d484e04)
(cherry picked from commit 46662ec)
Monitor stream local member in stream queue (backport #4217)
This PR restarts an Osiris reader if the local member of a stream queue consumer restarts. This situation can happen during cluster upgrades, when stream leaders move between nodes, which also implies restarting the stream replica processes.
To react to local member changes, this PR introduces a new "local member" listener type to the stream coordinator. This requires changing the structure of the coordinator state and bumping the state machine version, which is now 2.
The PR takes care of migrating the v1 state to the v2 state in the appropriate state machine callback.
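A simplified sketch of what that callback can look like, assuming a map-based state (the real coordinator uses records) and the {machine_version, From, To} command that Ra delivers to the state machine on upgrade:

-module(machine_upgrade_sketch).
-export([apply/3]).

%% Sketch only: convert the v1 listener entries to the v2 representation
%% when Ra bumps the effective machine version from 1 to 2.
apply(_Meta, {machine_version, 1, 2}, #{streams := Streams0} = State) ->
    Streams =
        maps:map(
          fun(_StreamId, #{listeners := Listeners0} = Stream) ->
                  %% v1 entries (ListenerPid => LeaderPid) become v2 entries
                  %% keyed by {ListenerPid, leader}; member listeners only
                  %% appear once v2 commands start flowing
                  Listeners =
                      maps:fold(fun(Pid, LeaderPid, Acc) ->
                                        Acc#{{Pid, leader} => LeaderPid}
                                end, #{}, Listeners0),
                  Stream#{listeners => Listeners}
          end, Streams0),
    {State#{streams => Streams}, ok, []}.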
It also removes stream listener data properly when the listener processes go away, which was not done previously.
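A sketch of that cleanup under the same assumed state shape; maps:filter drops every entry, leader or member, keyed by the dead pid:

-module(listener_cleanup_sketch).
-export([handle_down/2]).

%% Sketch only: when a monitored listener process goes down, drop all of
%% its entries from the v2 listeners map ({Pid, leader | member} keys).
handle_down(DownPid, #{listeners := Listeners0} = Stream) ->
    Listeners = maps:filter(
                  fun({ListenerPid, _Kind}, _Target) ->
                          ListenerPid =/= DownPid
                  end, Listeners0),
    Stream#{listeners => Listeners}.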
Fixes #4133