Skip to content

Commit 6df7011

Browse files
committed
Bug#21464787 THE CHANNEL SERVICE INTERFACE LACKS A METHOD TO CHECK THE RECEIVER THREAD ID.
Currently the channel service interface lacks a method to check the receiver thread id while having this method for the applier thread. In fact the interface should be generic and always allow to execute the same methods on both threads whenever it makes sense to. The method is now generic and allows to check either the receiver or the applier thread. It shall be useful to identify if the thread that executed some hook is the thread from channel A or B.
1 parent 64f5eb5 commit 6df7011

8 files changed

+200
-45
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
include/have_replication_observers_example_plugin.inc
2+
include/rpl_init.inc [topology=1->2]
3+
CHANGE MASTER TO MASTER_HOST="127.0.0.1", MASTER_USER="root", MASTER_PASSWORD="", MASTER_PORT=SERVER_1_PORT, MASTER_AUTO_POSITION=1 FOR CHANNEL "example_channel";
4+
Warnings:
5+
Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure.
6+
Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information.
7+
include/start_slave.inc
8+
include/assert.inc ['The example_channel is ON']
9+
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
10+
include/sync_slave_sql_with_master.inc
11+
include/install_replication_observers_example.inc
12+
SET @debug_saved= @@GLOBAL.DEBUG;
13+
SET @@GLOBAL.DEBUG= '+d,validate_replication_observers_plugin_server_channel_io_thread';
14+
INSERT INTO t1 VALUES(1);
15+
include/assert.inc ['The example_channel is OFF']
16+
SET @@GLOBAL.DEBUG= @debug_saved;
17+
RESET SLAVE ALL;
18+
DROP TABLE t1;
19+
include/uninstall_replication_observers_example.inc
20+
DROP TABLE t1;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
$RPL_OBS_EXAMPLE_OPT
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
$RPL_OBS_EXAMPLE_OPT
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
!include ../my.cnf
2+
3+
[mysqld.2]
4+
master-info-repository=TABLE
5+
relay-log-info-repository=TABLE
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# This test verifies the basic functionalities of the channel service interface.
2+
# Using the example plugin, we execute several of the interface methods on a
3+
# channel's receiver thread.
4+
5+
--source include/have_replication_observers_example_plugin.inc
6+
--source include/not_group_replication_plugin.inc
7+
--source include/not_embedded.inc
8+
--source include/have_debug.inc
9+
--source include/have_binlog_format_mixed.inc
10+
--source include/have_gtid.inc
11+
--let $rpl_topology= 1->2
12+
--let $rpl_skip_change_master= 1
13+
--let $rpl_skip_start_slave= 1
14+
--source include/rpl_init.inc
15+
16+
# Establish a connection to a slave with an example channel
17+
18+
--connection server_2
19+
--replace_result $SERVER_MYPORT_1 SERVER_1_PORT
20+
--eval CHANGE MASTER TO MASTER_HOST="127.0.0.1", MASTER_USER="root", MASTER_PASSWORD="", MASTER_PORT=$SERVER_MYPORT_1, MASTER_AUTO_POSITION=1 FOR CHANNEL "example_channel"
21+
--source include/start_slave.inc
22+
23+
--let $assert_text= 'The example_channel is ON'
24+
--let $assert_cond= [SELECT COUNT(*) AS count FROM performance_schema.replication_connection_status where CHANNEL_NAME="example_channel" and SERVICE_STATE="ON", count, 1] = 1
25+
--source include/assert.inc
26+
27+
--connection server_1
28+
29+
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
30+
31+
--let $wait_for_executed_gtid_set= 1
32+
--let $rpl_channel_name= example_channel
33+
--let $sync_slave_connection= server_2
34+
--source include/sync_slave_sql_with_master.inc
35+
36+
# Install the replication observers example plugin on the slave
37+
38+
--source include/install_replication_observers_example.inc
39+
40+
# Execute a query that on a server hook will execute a test against the several
41+
# method of the channel interface on the receiver thread of the created channel
42+
43+
SET @debug_saved= @@GLOBAL.DEBUG;
44+
SET @@GLOBAL.DEBUG= '+d,validate_replication_observers_plugin_server_channel_io_thread';
45+
--eval INSERT INTO t1 VALUES(1)
46+
47+
# The tests will stop the channel.
48+
49+
--let $assert_text= 'The example_channel is OFF'
50+
--let $assert_cond= [SELECT COUNT(*) AS count FROM performance_schema.replication_connection_status where CHANNEL_NAME="example_channel" and SERVICE_STATE="OFF", count, 1] = 1
51+
--source include/assert.inc
52+
53+
# Clean
54+
55+
SET @@GLOBAL.DEBUG= @debug_saved;
56+
57+
RESET SLAVE ALL;
58+
DROP TABLE t1;
59+
60+
--source include/uninstall_replication_observers_example.inc
61+
62+
--connection server_1
63+
DROP TABLE t1;

plugin/replication_observers_example/replication_observers_example.cc

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static MYSQL_PLUGIN plugin_info_ptr;
3030
int validate_plugin_server_requirements(Trans_param *param);
3131
int test_channel_service_interface_initialization();
3232
int test_channel_service_interface();
33+
int test_channel_service_interface_io_thread();
3334

3435
/*
3536
Will register the number of calls to each method of Server state
@@ -198,6 +199,8 @@ int trans_before_dml(Trans_param *param, int& out_val)
198199
out_val= 1;);
199200
DBUG_EXECUTE_IF("validate_replication_observers_plugin_server_channels",
200201
test_channel_service_interface(););
202+
DBUG_EXECUTE_IF("validate_replication_observers_plugin_server_channel_io_thread",
203+
test_channel_service_interface_io_thread(););
201204
DBUG_EXECUTE_IF("validate_replication_observers_plugin_server_channels_init",
202205
test_channel_service_interface_initialization(););
203206
return 0;
@@ -677,8 +680,9 @@ int test_channel_service_interface()
677680

678681
//Extract the applier id
679682
long unsigned int * applier_id= NULL;
680-
channel_get_appliers_thread_id(interface_channel,
681-
&applier_id);
683+
channel_get_thread_id(interface_channel,
684+
CHANNEL_APPLIER_THREAD,
685+
&applier_id);
682686
DBUG_ASSERT(*applier_id > 0);
683687
my_free(applier_id);
684688

@@ -708,6 +712,45 @@ int test_channel_service_interface()
708712
return (error && exists && running && gno);
709713
}
710714

715+
int test_channel_service_interface_io_thread()
716+
{
717+
//The initialization method should return OK
718+
int error= initialize_channel_service_interface();
719+
DBUG_ASSERT(!error);
720+
721+
char interface_channel[]= "example_channel";
722+
723+
//Assert the channel exists
724+
bool exists= channel_is_active(interface_channel, CHANNEL_NO_THD);
725+
DBUG_ASSERT(exists);
726+
727+
//Assert that the applier receiver is running
728+
bool running= channel_is_active(interface_channel, CHANNEL_RECEIVER_THREAD);
729+
DBUG_ASSERT(running);
730+
731+
//Extract the receiver id
732+
long unsigned int * thread_id= NULL;
733+
int num_threads= channel_get_thread_id(interface_channel,
734+
CHANNEL_APPLIER_THREAD,
735+
&thread_id);
736+
DBUG_ASSERT(num_threads == 1);
737+
DBUG_ASSERT(*thread_id > 0);
738+
my_free(thread_id);
739+
740+
741+
//Stop the channel
742+
error= channel_stop(interface_channel,
743+
3,
744+
10000);
745+
DBUG_ASSERT(!error);
746+
747+
//Assert that the receiver thread is not running
748+
running= channel_is_active(interface_channel, CHANNEL_RECEIVER_THREAD);
749+
DBUG_ASSERT(!running);
750+
751+
return (error && exists && running && num_threads);
752+
}
753+
711754
/*
712755
Initialize the Replication Observer example at server start or plugin
713756
installation.

sql/rpl_channel_service_interface.cc

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -519,12 +519,13 @@ bool channel_is_active(const char* channel, enum_channel_thread_types thd_type)
519519
DBUG_RETURN(false);
520520
}
521521

522-
int channel_get_appliers_thread_id(const char* channel,
523-
unsigned long** appliers_id)
522+
int channel_get_thread_id(const char* channel,
523+
enum_channel_thread_types thd_type,
524+
unsigned long** thread_id)
524525
{
525-
DBUG_ENTER("channel_get_appliers_thread_id(channel, *appliers_id");
526+
DBUG_ENTER("channel_get_thread_id(channel, thread_type ,*thread_id");
526527

527-
int number_appliers= -1;
528+
int number_threads= -1;
528529

529530
Master_info *mi= msr_map.get_mi(channel);
530531

@@ -533,45 +534,64 @@ int channel_get_appliers_thread_id(const char* channel,
533534
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
534535
}
535536

536-
if (mi->rli != NULL)
537+
switch(thd_type)
537538
{
538-
mysql_mutex_lock(&mi->rli->run_lock);
539-
540-
int num_workers= mi->rli->slave_parallel_workers;
541-
if (num_workers > 1)
542-
{
543-
*appliers_id=
544-
(unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
545-
num_workers * sizeof(unsigned long),
546-
MYF(MY_WME));
547-
unsigned long *appliers_id_pointer= *appliers_id;
548-
549-
for (int i= 0; i < num_workers; i++, appliers_id_pointer++)
539+
case CHANNEL_RECEIVER_THREAD:
540+
mysql_mutex_lock(&mi->info_thd_lock);
541+
if (mi->info_thd != NULL)
550542
{
551-
mysql_mutex_lock(&mi->rli->workers.at(i)->info_thd_lock);
552-
*appliers_id_pointer= mi->rli->workers.at(i)->info_thd->thread_id();
553-
mysql_mutex_unlock(&mi->rli->workers.at(i)->info_thd_lock);
543+
*thread_id= (unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
544+
sizeof(unsigned long),
545+
MYF(MY_WME));
546+
**thread_id= mi->info_thd->thread_id();
547+
number_threads= 1;
554548
}
555-
556-
number_appliers= num_workers;
557-
}
558-
else
559-
{
560-
if (mi->rli->info_thd != NULL)
549+
mysql_mutex_unlock(&mi->info_thd_lock);
550+
break;
551+
case CHANNEL_APPLIER_THREAD:
552+
if (mi->rli != NULL)
561553
{
562-
*appliers_id= (unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
563-
sizeof(unsigned long),
564-
MYF(MY_WME));
565-
mysql_mutex_lock(&mi->rli->info_thd_lock);
566-
**appliers_id= mi->rli->info_thd->thread_id();
567-
mysql_mutex_unlock(&mi->rli->info_thd_lock);
568-
number_appliers= 1;
554+
mysql_mutex_lock(&mi->rli->run_lock);
555+
556+
int num_workers= mi->rli->slave_parallel_workers;
557+
if (num_workers > 1)
558+
{
559+
*thread_id=
560+
(unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
561+
num_workers * sizeof(unsigned long),
562+
MYF(MY_WME));
563+
unsigned long *appliers_id_pointer= *thread_id;
564+
565+
for (int i= 0; i < num_workers; i++, appliers_id_pointer++)
566+
{
567+
mysql_mutex_lock(&mi->rli->workers.at(i)->info_thd_lock);
568+
*appliers_id_pointer= mi->rli->workers.at(i)->info_thd->thread_id();
569+
mysql_mutex_unlock(&mi->rli->workers.at(i)->info_thd_lock);
570+
}
571+
572+
number_threads= num_workers;
573+
}
574+
else
575+
{
576+
if (mi->rli->info_thd != NULL)
577+
{
578+
*thread_id= (unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
579+
sizeof(unsigned long),
580+
MYF(MY_WME));
581+
mysql_mutex_lock(&mi->rli->info_thd_lock);
582+
**thread_id= mi->rli->info_thd->thread_id();
583+
mysql_mutex_unlock(&mi->rli->info_thd_lock);
584+
number_threads= 1;
585+
}
586+
}
587+
mysql_mutex_unlock(&mi->rli->run_lock);
569588
}
570-
}
571-
mysql_mutex_unlock(&mi->rli->run_lock);
589+
break;
590+
default:
591+
DBUG_RETURN(number_threads);
572592
}
573593

574-
DBUG_RETURN(number_appliers);
594+
DBUG_RETURN(number_threads);
575595
}
576596

577597
long long channel_get_last_delivered_gno(const char* channel, int sidno)

sql/rpl_channel_service_interface.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,18 +216,20 @@ int channel_purge_queue(const char* channel, bool reset_all);
216216
bool channel_is_active(const char* channel, enum_channel_thread_types type);
217217

218218
/**
219-
Returns the ids of the channel appliers.
220-
If more than one applier exists, a channel is returned
219+
Returns the id(s) of the channel threads: receiver or applier.
220+
If more than one applier exists, an array is returned
221221
222222
@param[in] channel The channel name
223-
@param[out] appliers_id The array of id(s)
223+
@param[in] thread_type The thread type (receiver or applier)
224+
@param[out] thread_id The array of id(s)
224225
225226
@return the number of returned ids
226-
@retval <=0 the channel does no exists, or the applier is not present
227-
@retval >0 the number of applier ids returned.
227+
@retval -1 the channel does no exists, or the thread is not present
228+
@retval >0 the number of thread ids returned.
228229
*/
229-
int channel_get_appliers_thread_id(const char* channel,
230-
unsigned long** appliers_id);
230+
int channel_get_thread_id(const char* channel,
231+
enum_channel_thread_types thread_type,
232+
unsigned long** thread_id);
231233

232234
/**
233235
Returns last GNO from applier from a given UUID.

0 commit comments

Comments
 (0)