
Monitor stream local member in stream queue #4217


Merged

Conversation


@acogoluegnes acogoluegnes commented Mar 2, 2022

This PR restarts the Osiris reader of a stream queue consumer if the local member of the stream restarts. This situation can happen during cluster upgrades, when stream leaders move between nodes, which also implies restarting the processes of stream replicas.

To react to local member changes, this PR introduces a new "local member" listener type to the stream coordinator. This requires changing the structure of the state and a machine version bump. The new version of the stream coordinator is now 2.

The PR takes care of migrating the v1 state to the v2 state in the appropriate state machine callback.
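
In v1 the keys of the listeners map are plain listener PIDs, while in v2 they are {Pid, leader | member} tuples (see the type spec in the review below), so the migration essentially rewrites map keys. Here is a minimal sketch of such a conversion; {machine_version, From, To} is the command Ra really delivers to apply/3 on a version change, but the helper below is an illustration, not the PR's exact code:

%% Sketch: rewrite a v1 listeners map (Pid => LeaderPid) into the v2
%% shape ({Pid, leader} => LeaderPid), as part of handling the
%% {machine_version, 1, 2} command in apply/3.
convert_listeners_v1_to_v2(Listeners) ->
    maps:fold(fun(Pid, LeaderPid, Acc) ->
                      Acc#{{Pid, leader} => LeaderPid}
              end, #{}, Listeners).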

It also removes the stream listeners data properly when the listener processes go away, which was not done previously.

Fixes #4133


acogoluegnes commented Mar 8, 2022

Acceptance Steps

Consumer Keeps Consuming When Leader Is Restarted

We will run an AMQP 0.9.1 consumer app on a stream queue, kill the stream leader process, and make sure the application has not stopped consuming.

Start Broker

cd /tmp
git clone git@github.com:rabbitmq/rabbitmq-server.git rabbitmq-server-4133
cd rabbitmq-server-4133
git checkout rabbitmq-server-4133-monitor-stream-local-member
make run-broker PLUGINS=""

Start Client Application

This step requires JBang to be installed.

cd /tmp
git clone git@github.com:acogoluegnes/sandbox.git
jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

The application publishes a message every second; it also has a consumer that outputs a line for each message:

Received message (offset 1)...
Received message (offset 2)...
Received message (offset 3)...
Received message (offset 4)...
Received message (offset 5)...
...

Restart the Stream Leader Process

We need to locate the PID of the stream leader process. Dump the state of the stream coordinator:

/tmp/rabbitmq-server-4133/sbin/rabbitmqctl eval 'rabbit_stream_coordinator:state().'

Output:

{rabbit_stream_coordinator,
    #{"__sq_1646746859112959129" =>
          {stream,"__sq_1646746859112959129",2,
              {resource,<<"/">>,queue,<<"sq">>},
              #{epoch => 1,
                event_formatter =>
                    {rabbit_stream_queue,format_osiris_event,
                        [{resource,<<"/">>,queue,<<"sq">>}]},
                leader_locator_strategy => <<"client-local">>,
                leader_node => 'rabbit@acogoluegnes-nuc',
                name => "__sq_1646746859112959129",
                nodes => ['rabbit@acogoluegnes-nuc'],
                reference => {resource,<<"/">>,queue,<<"sq">>},
                replica_nodes => [],retention => []},
              ['rabbit@acogoluegnes-nuc'],
              #{'rabbit@acogoluegnes-nuc' =>
                    {member,
                        {running,2,<11591.1008.0>},
                        {writer,2},
                        'rabbit@acogoluegnes-nuc',undefined,
                        #{epoch => 2,
                          event_formatter =>
                              {rabbit_stream_queue,format_osiris_event,
                                  [{resource,<<"/">>,queue,<<"sq">>}]},
                          leader_locator_strategy => <<"client-local">>,
                          leader_node => 'rabbit@acogoluegnes-nuc',
                          name => "__sq_1646746859112959129",
                          nodes => ['rabbit@acogoluegnes-nuc'],
                          reference => {resource,<<"/">>,queue,<<"sq">>},
                          replica_nodes => [],retention => []},
                        running}},
              #{{<11591.1878.0>,leader} => <11591.1008.0>,
                {<11591.1883.0>,leader} => <11591.1008.0>,
                {<11591.1883.0>,member} =>
                    {'rabbit@acogoluegnes-nuc',<11591.1008.0>}},
              undefined,
              {updated,2},
              running}},
    #{<11591.1008.0> => {"__sq_1646746859112959129",member},
      <11591.1878.0> => {#{"__sq_1646746859112959129" => ok},listener},
      <11591.1883.0> => {#{"__sq_1646746859112959129" => ok},listener}},
    #{},undefined,undefined}

Locate the stream leader process; in our case it is <11591.1008.0>:

{running,2,<11591.1008.0>}

Make the PID a local one by replacing the first component with 0; in our case <11591.1008.0> becomes <0.1008.0>. Then run the following command with the local PID to kill the leader process:

/tmp/rabbitmq-server-4133/sbin/rabbitmqctl eval 'exit(list_to_pid("<0.1008.0>"), kill).'
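
Alternatively, the lookup and the kill can be combined in a single eval (a convenience sketch; it assumes the queue lives in the default / virtual host and that the queue record's pid field points at the stream leader, as the coordinator state above suggests):

/tmp/rabbitmq-server-4133/sbin/rabbitmqctl eval '{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, <<"sq">>)), exit(amqqueue:get_pid(Q), kill).'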

If you dump the stream coordinator state again, the stream leader PID should have changed:

/tmp/rabbitmq-server-4133/sbin/rabbitmqctl eval 'rabbit_stream_coordinator:state().'

If you go back to the client application tab, it should still be publishing and consuming; it should not have stopped.

Stop the client application (Ctrl+C) and the broker.

Cluster Upgrade

The PR changes the structure of the stream coordinator state, which implies a machine version upgrade. The following steps check that a 3-node cluster can be upgraded.
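
For context, a Ra state machine advertises its version through the optional version/0 callback; bumping the returned value is what triggers the upgrade once the cluster can run the new version, which the steps below demonstrate. The callback names are Ra's; the value mirrors the PR description:

%% ra_machine optional callbacks (sketch):
version() -> 2.                    %% bumped from 1 by this PR
which_module(_Version) -> ?MODULE.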

Create a generic Unix package from the branch and install it:

make -C /tmp/rabbitmq-server-4133 package-generic-unix
mkdir /tmp/rabbitmq_server_4133
tar -xf /tmp/rabbitmq-server-4133/PACKAGES/rabbitmq-server-generic-unix-*.tar.xz --directory /tmp/rabbitmq_server_4133 --strip-components 1
echo "log.file.level = debug" > /tmp/rabbitmq_server_4133/etc/rabbitmq/rabbitmq.conf

Get the generic Unix package of the latest 3.10 alpha and extract it (change [ALPHA] to the appropriate value):

mkdir /tmp/rabbitmq_server_310
tar -xf rabbitmq-server-generic-unix-3.10.0-alpha.[ALPHA].tar.xz --directory /tmp/rabbitmq_server_310 --strip-components 1
echo "log.file.level = debug" > /tmp/rabbitmq_server_310/etc/rabbitmq/rabbitmq.conf
mkdir /tmp/rabbitmq-4133-mnesia-base

Open 3 different tabs (one for each node) and launch the following commands.

TAB 1:

export RABBITMQ_NODENAME="rabbit1"
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_DIST_PORT=25672
export RABBITMQ_MNESIA_BASE=/tmp/rabbitmq-4133-mnesia-base
alias rctl="/tmp/rabbitmq_server_310/sbin/rabbitmqctl"

TAB 2:

export RABBITMQ_NODENAME="rabbit2"
export RABBITMQ_NODE_PORT=5673
export RABBITMQ_DIST_PORT=25673
export RABBITMQ_MNESIA_BASE=/tmp/rabbitmq-4133-mnesia-base
alias rctl="/tmp/rabbitmq_server_310/sbin/rabbitmqctl"

TAB 3:

export RABBITMQ_NODENAME="rabbit3"
export RABBITMQ_NODE_PORT=5674
export RABBITMQ_DIST_PORT=25674
export RABBITMQ_MNESIA_BASE=/tmp/rabbitmq-4133-mnesia-base
alias rctl="/tmp/rabbitmq_server_310/sbin/rabbitmqctl"

Now start each node.

On each tab:

/tmp/rabbitmq_server_310/sbin/rabbitmq-server -detached

Form the cluster:

TAB 2 & 3:

rctl stop_app
rctl join_cluster rabbit1@$(hostname -s)
rctl start_app

Make sure the 3 nodes joined the cluster.

ANY TAB:

rctl cluster_status

Start the client application (requires the sandbox repository and JBang; see above for details).

ANY TAB:

jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

Stop the client after a few messages (Ctrl+C). Check the members of the stream coordinator.

ANY TAB:

rctl eval 'ra:members(rabbit_stream_coordinator).'

It should output something like the following (the 3 members and the leader):

{ok,[{rabbit_stream_coordinator,'rabbit1@acogoluegnes-nuc'},
     {rabbit_stream_coordinator,'rabbit2@acogoluegnes-nuc'},
     {rabbit_stream_coordinator,'rabbit3@acogoluegnes-nuc'}],
    {rabbit_stream_coordinator,'rabbit1@acogoluegnes-nuc'}}

We are now going to upgrade the nodes.

The first node.

TAB 1

rctl stop
alias rctl="/tmp/rabbitmq_server_4133/sbin/rabbitmqctl"
/tmp/rabbitmq_server_4133/sbin/rabbitmq-server -detached

Make sure the node joined the cluster again.

ANY TAB:

rctl cluster_status

The second node.

TAB 2

rctl stop
alias rctl="/tmp/rabbitmq_server_4133/sbin/rabbitmqctl"
/tmp/rabbitmq_server_4133/sbin/rabbitmq-server -detached

Make sure the node joined the cluster again.

ANY TAB:

rctl cluster_status

Start the client application again.

TAB 1:

jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

Check the state of the stream coordinator.

TAB 2:

rctl eval 'rabbit_stream_coordinator:state().'

The output should look like the following:

{rabbit_stream_coordinator,
    #{"__sq_1646920629979595000" =>
          {stream,"__sq_1646920629979595000",3,
              {resource,<<"/">>,queue,<<"sq">>},
              #{epoch => 1,
                event_formatter =>
                    {rabbit_stream_queue,format_osiris_event,
                        [{resource,<<"/">>,queue,<<"sq">>}]},
                leader_locator_strategy => <<"client-local">>,
                leader_node => 'rabbit1@acogoluegne-a01',
                name => "__sq_1646920629979595000",
                nodes =>
                    ['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
                     'rabbit2@acogoluegne-a01'],
                reference => {resource,<<"/">>,queue,<<"sq">>},
                replica_nodes =>
                    ['rabbit3@acogoluegne-a01','rabbit2@acogoluegne-a01'],
                retention => []},
              ['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
               'rabbit2@acogoluegne-a01'],
              ...
              #{<12133.734.0> => <12133.624.0>,
                 <12133.739.0> => <12133.624.0>,
                 <12133.1196.0> => <12133.624.0>,
                 <12133.1221.0> => <12133.624.0>},
              undefined,
              {updated,3},
              running}},
    #{<11582.585.0> => {"__sq_1646920629979595000",member},
      <12133.624.0> => {"__sq_1646920629979595000",member},
      <12133.734.0> => {"__sq_1646920629979595000",listener},
      <12133.739.0> => {"__sq_1646920629979595000",listener},
      <12134.1614.0> => {"__sq_1646920629979595000",member}},
    #{},undefined,undefined}

In our case, the map of listeners contains terminated PIDs that should have been removed (<12133.1196.0>, <12133.1221.0>). They are the PIDs of the channels from the previous run of the client application; they should go away once all the nodes are upgraded.
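
For reference, the cleanup the new machine version performs when a listener process terminates could look roughly like this (a sketch with assumed names, not the PR's actual code; the #stream{} record and the v2 key shape come from the state dumps and the type spec in the review below):

%% Sketch: when the coordinator sees a listener process go down, drop
%% every listener entry that PID registered, whatever its type.
remove_listeners(Pid, #stream{listeners = Listeners0} = Stream) ->
    Listeners = maps:filter(fun({LPid, _Type}, _) -> LPid =/= Pid end,
                            Listeners0),
    Stream#stream{listeners = Listeners}.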

Upgrade the third node.

TAB 3

rctl stop
alias rctl="/tmp/rabbitmq_server_4133/sbin/rabbitmqctl"
/tmp/rabbitmq_server_4133/sbin/rabbitmq-server -detached

Make sure the node joined the cluster again.

ANY TAB:

rctl cluster_status

Dump the coordinator state.

ANY TAB:

rctl eval 'rabbit_stream_coordinator:state().'

The output should look something like this:

{rabbit_stream_coordinator,
    #{"__sq_1646920629979595000" =>
          {stream,"__sq_1646920629979595000",3,
              {resource,<<"/">>,queue,<<"sq">>},
              #{epoch => 1,
                event_formatter =>
                    {rabbit_stream_queue,format_osiris_event,
                        [{resource,<<"/">>,queue,<<"sq">>}]},
                leader_locator_strategy => <<"client-local">>,
                leader_node => 'rabbit1@acogoluegne-a01',
                name => "__sq_1646920629979595000",
                nodes =>
                    ['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
                     'rabbit2@acogoluegne-a01'],
                reference => {resource,<<"/">>,queue,<<"sq">>},
                replica_nodes =>
                    ['rabbit3@acogoluegne-a01','rabbit2@acogoluegne-a01'],
                retention => []},
              ['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
               'rabbit2@acogoluegne-a01'],
             ...
              #{{<12133.734.0>,leader} => <12133.624.0>,
                 {<12133.739.0>,leader} => <12133.624.0>},
              undefined,
              {updated,3},
              running}},
    #{<12134.585.0> => {"__sq_1646920629979595000",member},
      <11582.592.0> => {"__sq_1646920629979595000",member},
      <12133.624.0> => {"__sq_1646920629979595000",member},
      <12133.734.0> => {#{"__sq_1646920629979595000" => ok},listener},
      <12133.739.0> => {#{"__sq_1646920629979595000" => ok},listener}},
    undefined,undefined,undefined}

The listeners should have been converted to the new representation:

#{{<12133.734.0>,leader} => <12133.624.0>,
   {<12133.739.0>,leader} => <12133.624.0>},

The stale PIDs should have been collected.

Even though the client application was launched when 2 nodes were on the new version, the stream coordinator was still running the previous version of the state machine. Let's stop and restart the client application and check the coordinator state.

Ctrl-C the client application and start it again.

ANY TAB:

jbang /tmp/sandbox/src/main/java/StreamQueueMonitorStreamLocalMember4133.java

Dump the coordinator state.

ANY TAB:

rctl eval 'rabbit_stream_coordinator:state().'

Output:
{rabbit_stream_coordinator,
    #{"__sq_1646920629979595000" =>
          {stream,"__sq_1646920629979595000",3,
              {resource,<<"/">>,queue,<<"sq">>},
              #{epoch => 1,
                event_formatter =>
                    {rabbit_stream_queue,format_osiris_event,
                        [{resource,<<"/">>,queue,<<"sq">>}]},
                leader_locator_strategy => <<"client-local">>,
                leader_node => 'rabbit1@acogoluegne-a01',
                name => "__sq_1646920629979595000",
                nodes =>
                    ['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
                     'rabbit2@acogoluegne-a01'],
                reference => {resource,<<"/">>,queue,<<"sq">>},
                replica_nodes =>
                    ['rabbit3@acogoluegne-a01','rabbit2@acogoluegne-a01'],
                retention => []},
              ['rabbit1@acogoluegne-a01','rabbit3@acogoluegne-a01',
               'rabbit2@acogoluegne-a01'],
              ...
              #{{<12133.1586.0>,leader} => <12133.624.0>,
                 {<12133.1591.0>,leader} => <12133.624.0>,
                 {<12133.1591.0>,member} =>
                    {'rabbit1@acogoluegne-a01',<12133.624.0>}},
              undefined,
              {updated,3},
              running}},
    #{<11582.585.0> => {"__sq_1646920629979595000",member},
      <12134.592.0> => {"__sq_1646920629979595000",member},
      <12133.624.0> => {"__sq_1646920629979595000",member},
      <12133.1586.0> => {#{"__sq_1646920629979595000" => ok},listener},
      <12133.1591.0> => {#{"__sq_1646920629979595000" => ok},listener}},
    undefined,undefined,undefined}

Note the map of listeners: it now has a member listener, which will keep a consumer consuming when its local member changes:

#{{<12133.1586.0>,leader} => <12133.624.0>,
   {<12133.1591.0>,leader} => <12133.624.0>,
   {<12133.1591.0>,member} => {'rabbit1@acogoluegne-a01',<12133.624.0>}}
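
A member listener entry maps {ChannelPid, member} to {Node, LocalMemberPid}; when the member on that node restarts under a new PID, the coordinator can notify the channel so it re-creates its Osiris reader against the new local member. A sketch of that notification follows; {send_msg, Pid, Msg} is a real Ra effect, but the helper and the message shape are assumptions:

%% Sketch: tell every member listener registered on Node about the
%% new local member PID, as a list of Ra send_msg effects.
notify_member_listeners(Node, NewPid, Listeners) ->
    maps:fold(fun({LPid, member}, {N, _OldPid}, Effects) when N =:= Node ->
                      [{send_msg, LPid, {local_member_change, NewPid}} | Effects];
                 (_, _, Effects) ->
                      Effects
              end, [], Listeners).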

Stop the client application. Shut down the cluster.

On each tab:

rctl stop

Delete the directories:

cd /tmp
rm -rf /tmp/rabbitmq-4133-mnesia-base /tmp/rabbitmq-server-4133 /tmp/rabbitmq_server_310 /tmp/rabbitmq_server_4133 /tmp/sandbox

@acogoluegnes acogoluegnes force-pushed the rabbitmq-server-4133-monitor-stream-local-member branch from ae7bcdc to 4f7b7c8 on March 8, 2022
This commit refactors the stream coordinator listener mechanism
to support listeners that get notified when a stream member
on the same node as the listening process gets restarted.

Local member listeners are useful for AMQP consumers.

The change requires a state machine version bump as it changes
the state of the stream coordinator. This will be done
in a subsequent commit.

References #4133

(cherry picked from commit 3db593f8b02e17c23fe6e188ec775961b6f7cf94)
Adapt it to the new structure of the stream listeners.

References #4133
This commit re-creates an Osiris reader when the local member
of the stream cluster changes. Consumers can then resume
dispatching messages where they left off instead of hanging.

References #4133
The stream coordinator has the most up-to-date information.

References #4133
The version bump is necessary because of the state changes to
handle local member listeners.

References #4133
These 2 tests can fail in mixed-version, 2-node cluster testing. If
the stream coordinator leader ends up on the lower version, it
does not contain the fixes and the tests fail.

With this commit, the 2 tests are skipped under the appropriate
conditions.

References #4133
These tests require a level of availability that mixed-version
clusters cannot provide, so they are skipped under these
conditions.

References #4133
@acogoluegnes acogoluegnes force-pushed the rabbitmq-server-4133-monitor-stream-local-member branch from 4f7b7c8 to e6a2670 on March 8, 2022
Unlikely to pass as it randomly stops a node, which
can be the stream coordinator leader. The 2 remaining nodes
then cannot elect a new leader because they don't have
the same version.

References #4133
In the stream coordinator, a node running v2 code, even while the
machine is still at v1, can call the function that registers a local
member when an AMQP listener is registered, so this commit makes such
a call a no-op.

References #4133
Monitors were not converted.

References #4133
@acogoluegnes acogoluegnes marked this pull request as ready for review March 10, 2022 14:14

@kjnilsson kjnilsson left a comment


Looks generally good. A few formatting issues and a couple of suggestions.

%% before:
listeners = #{} :: #{pid() := LeaderPid :: pid()},
%% after:
listeners = #{} :: #{pid() |                     %% v0 & v1
                     {pid(), leader | member}    %% v2
                     := LeaderPid :: pid() | {node(), LocalPid :: pid()}},

Why include a node() here? Won't the node always be that of the listener pid?


@michaelklishin michaelklishin left a comment


All acceptance steps work as expected, although I had to tweak mkdir to use mkdir -p here and there.

@michaelklishin
Collaborator

@acogoluegnes I assumed that most of the feedback above (which seems to be cosmetic and minor code style tweaks) is going to be addressed, and focused on the functional part in my review.

@michaelklishin
Collaborator

@kjnilsson ready for another round

@michaelklishin michaelklishin merged commit 361f9de into master Mar 17, 2022
@michaelklishin michaelklishin deleted the rabbitmq-server-4133-monitor-stream-local-member branch March 17, 2022 11:59
mergify bot pushed a commit that referenced this pull request Mar 17, 2022
(cherry picked from commit 5692272)
mergify bot pushed a commit that referenced this pull request Mar 17, 2022
(cherry picked from commit d484e04)
mergify bot pushed a commit that referenced this pull request Mar 17, 2022
(cherry picked from commit 46662ec)
michaelklishin added a commit that referenced this pull request Mar 17, 2022
Monitor stream local member in stream queue (backport #4217)
mergify bot pushed a commit that referenced this pull request Mar 17, 2022
(cherry picked from commit 5692272)
(cherry picked from commit 7188374)
mergify bot pushed a commit that referenced this pull request Mar 17, 2022
(cherry picked from commit d484e04)
(cherry picked from commit f6c0d5a)
mergify bot pushed a commit that referenced this pull request Mar 17, 2022
(cherry picked from commit 46662ec)
(cherry picked from commit f1a0bd7)
michaelklishin added a commit that referenced this pull request Mar 17, 2022
Monitor stream local member in stream queue (backport #4217) (backport #4282)
michaelklishin added commits that referenced this pull request Mar 18, 2022