@@ -6198,16 +6198,14 @@ class injector_transaction : public injector::transaction {
6198
6198
/* *
6199
6199
@brief Handle one data event received from NDB
6200
6200
6201
- @param pOp The NdbEventOperation that received data
6202
- @param trans The injector transaction
6203
- @param[out] trans_row_count Counter for rows in event
6204
- @param[out] replicated_row_count Counter for replicated rows in event
6201
+ @param pOp The NdbEventOperation that received data
6202
+ @param trans The injector transaction
6203
+ @param[out] epoch_ctx Epoch context for accumulated counters
6205
6204
@return 0 for success, other values (normally -1) for error
6206
6205
*/
6207
6206
int Ndb_binlog_thread::handle_data_event (const NdbEventOperation *pOp,
6208
6207
injector_transaction &trans,
6209
- unsigned &trans_row_count,
6210
- unsigned &replicated_row_count) {
6208
+ EpochContext &epoch_ctx) {
6211
6209
bool reflected_op = false ;
6212
6210
bool refresh_op = false ;
6213
6211
bool read_op = false ;
@@ -6329,7 +6327,7 @@ int Ndb_binlog_thread::handle_data_event(const NdbEventOperation *pOp,
6329
6327
else {
6330
6328
assert (!reflected_op && !refresh_op);
6331
6329
/* Track that we received a replicated row event */
6332
- if (likely (count_this_event)) replicated_row_count++;
6330
+ if (likely (count_this_event)) epoch_ctx. replicated_row_count ++;
6333
6331
6334
6332
if (!log_this_slave_update) {
6335
6333
/*
@@ -6419,7 +6417,7 @@ int Ndb_binlog_thread::handle_data_event(const NdbEventOperation *pOp,
6419
6417
case NDBEVENT::TE_INSERT:
6420
6418
if (likely (count_this_event)) {
6421
6419
row.n_inserts ++;
6422
- trans_row_count++;
6420
+ epoch_ctx. trans_row_count ++;
6423
6421
}
6424
6422
DBUG_PRINT (" info" , (" INSERT INTO %s.%s" , table->s ->db .str ,
6425
6423
table->s ->table_name .str ));
@@ -6449,7 +6447,7 @@ int Ndb_binlog_thread::handle_data_event(const NdbEventOperation *pOp,
6449
6447
case NDBEVENT::TE_DELETE:
6450
6448
if (likely (count_this_event)) {
6451
6449
row.n_deletes ++;
6452
- trans_row_count++;
6450
+ epoch_ctx. trans_row_count ++;
6453
6451
}
6454
6452
DBUG_PRINT (" info" , (" DELETE FROM %s.%s" , table->s ->db .str ,
6455
6453
table->s ->table_name .str ));
@@ -6494,7 +6492,7 @@ int Ndb_binlog_thread::handle_data_event(const NdbEventOperation *pOp,
6494
6492
case NDBEVENT::TE_UPDATE:
6495
6493
if (likely (count_this_event)) {
6496
6494
row.n_updates ++;
6497
- trans_row_count++;
6495
+ epoch_ctx. trans_row_count ++;
6498
6496
}
6499
6497
DBUG_PRINT (" info" ,
6500
6498
(" UPDATE %s.%s" , table->s ->db .str , table->s ->table_name .str ));
@@ -6703,7 +6701,7 @@ bool Ndb_binlog_thread::handle_events_for_epoch(THD *thd, injector *inj,
6703
6701
(uint)(ndb_latest_handled_binlog_epoch >> 32 ),
6704
6702
(uint)(ndb_latest_handled_binlog_epoch)));
6705
6703
6706
- commit_trans (trans, thd, current_epoch, 0 , 0 );
6704
+ commit_trans (trans, thd, current_epoch, EpochContext{} );
6707
6705
}
6708
6706
6709
6707
i_pOp = i_ndb->nextEvent2 ();
@@ -6743,15 +6741,13 @@ bool Ndb_binlog_thread::handle_events_for_epoch(THD *thd, injector *inj,
6743
6741
return false ; // Error, failed to inject ndb_apply_status
6744
6742
}
6745
6743
6746
- unsigned trans_row_count = 0 ;
6747
- unsigned replicated_row_count = 0 ;
6744
+ EpochContext epoch_ctx;
6748
6745
do {
6749
6746
assert (check_event_list_consistency (i_ndb, i_pOp));
6750
6747
6751
6748
const NdbDictionary::Event::TableEvent event_type = i_pOp->getEventType ();
6752
6749
if (event_type < NDBEVENT::TE_FIRST_NON_DATA_EVENT) {
6753
- if (handle_data_event (i_pOp, trans, trans_row_count,
6754
- replicated_row_count) != 0 ) {
6750
+ if (handle_data_event (i_pOp, trans, epoch_ctx) != 0 ) {
6755
6751
log_error (" Failed to handle data event" );
6756
6752
return false ; // Error, failed to handle data event
6757
6753
}
@@ -6772,8 +6768,7 @@ bool Ndb_binlog_thread::handle_events_for_epoch(THD *thd, injector *inj,
6772
6768
or is == NULL
6773
6769
*/
6774
6770
6775
- commit_trans (trans, thd, current_epoch, trans_row_count,
6776
- replicated_row_count);
6771
+ commit_trans (trans, thd, current_epoch, epoch_ctx);
6777
6772
6778
6773
return true ; // OK
6779
6774
}
@@ -7066,30 +7061,32 @@ static Uint64 find_epoch_to_handle(const NdbEventOperation *s_pOp,
7066
7061
return ndb_latest_received_binlog_epoch;
7067
7062
}
7068
7063
7064
+ bool Ndb_binlog_thread::EpochContext::is_empty_epoch () const {
7065
+ DBUG_TRACE;
7066
+ DBUG_PRINT (" enter" , (" trans_row_count: %d" , trans_row_count));
7067
+ DBUG_PRINT (" enter" , (" replicated_row_count: %d" , replicated_row_count));
7068
+ if (trans_row_count) {
7069
+ DBUG_PRINT (" exit" , (" binlog has recorded rows -> not empty" ));
7070
+ return false ;
7071
+ }
7072
+ if (opt_ndb_log_apply_status && replicated_row_count) {
7073
+ DBUG_PRINT (" info" , (" logging updates to ndb_apply_status" ));
7074
+ DBUG_PRINT (" exit" , (" received rows applied by a replica "
7075
+ " -> not empty" ));
7076
+ return false ;
7077
+ }
7078
+ DBUG_PRINT (" exit" , (" empty epoch" ));
7079
+ return true ;
7080
+ }
7081
+
7069
7082
void Ndb_binlog_thread::commit_trans (injector_transaction &trans, THD *thd,
7070
7083
Uint64 current_epoch,
7071
- unsigned trans_row_count,
7072
- unsigned replicated_row_count) {
7073
- if (!opt_ndb_log_empty_epochs) {
7074
- /*
7075
- If
7076
- - We did not add any 'real' rows to the Binlog
7077
- AND
7078
- - We did not apply any slave row updates, only
7079
- ndb_apply_status updates
7080
- THEN
7081
- Don't write the Binlog transaction which just
7082
- contains ndb_apply_status updates.
7083
- (For circular rep with log_apply_status, ndb_apply_status
7084
- updates will propagate while some related, real update
7085
- is propagating)
7086
- */
7087
- if ((trans_row_count == 0 ) &&
7088
- (!(opt_ndb_log_apply_status && replicated_row_count))) {
7089
- /* nothing to commit, rollback instead */
7090
- (void )trans.rollback (); // Rollback never fails (by design)
7091
- return ;
7092
- }
7084
+ EpochContext epoch_ctx) {
7085
+ DBUG_TRACE;
7086
+ if (!opt_ndb_log_empty_epochs && epoch_ctx.is_empty_epoch ()) {
7087
+ /* nothing to commit, rollback instead */
7088
+ (void )trans.rollback (); // Rollback never fails (by design)
7089
+ return ;
7093
7090
}
7094
7091
7095
7092
thd->set_proc_info (" Committing events to binlog" );
0 commit comments