@@ -330,10 +330,6 @@ export class RunsReplicationService {
330
330
return ;
331
331
}
332
332
333
- this . logger . debug ( "Handling transaction" , {
334
- transaction,
335
- } ) ;
336
-
337
333
const lsnToUInt64Start = process . hrtime . bigint ( ) ;
338
334
339
335
// If there are events, we need to handle them
@@ -349,20 +345,32 @@ export class RunsReplicationService {
349
345
} ) )
350
346
) ;
351
347
352
- const currentSpan = this . _tracer . startSpan ( "handle_transaction" , {
353
- attributes : {
354
- "transaction.xid" : transaction . xid ,
355
- "transaction.replication_lag_ms" : transaction . replicationLagMs ,
356
- "transaction.events" : transaction . events . length ,
357
- "transaction.commit_end_lsn" : transaction . commitEndLsn ,
358
- "transaction.parse_duration_ms" : this . _currentParseDurationMs ?? undefined ,
359
- "transaction.lsn_to_uint64_ms" : lsnToUInt64DurationMs ,
360
- "transaction.version" : _version . toString ( ) ,
348
+ this . _tracer
349
+ . startSpan ( "handle_transaction" , {
350
+ attributes : {
351
+ "transaction.xid" : transaction . xid ,
352
+ "transaction.replication_lag_ms" : transaction . replicationLagMs ,
353
+ "transaction.events" : transaction . events . length ,
354
+ "transaction.commit_end_lsn" : transaction . commitEndLsn ,
355
+ "transaction.parse_duration_ms" : this . _currentParseDurationMs ?? undefined ,
356
+ "transaction.lsn_to_uint64_ms" : lsnToUInt64DurationMs ,
357
+ "transaction.version" : _version . toString ( ) ,
358
+ } ,
359
+ startTime : transaction . beginStartTimestamp ,
360
+ } )
361
+ . end ( ) ;
362
+
363
+ this . logger . debug ( "handle_transaction" , {
364
+ transaction : {
365
+ xid : transaction . xid ,
366
+ commitLsn : transaction . commitLsn ,
367
+ commitEndLsn : transaction . commitEndLsn ,
368
+ events : transaction . events . length ,
369
+ parseDurationMs : this . _currentParseDurationMs ,
370
+ lsnToUInt64DurationMs,
371
+ version : _version . toString ( ) ,
361
372
} ,
362
- startTime : transaction . beginStartTimestamp ,
363
373
} ) ;
364
-
365
- currentSpan . end ( ) ;
366
374
}
367
375
368
376
async #acknowledgeLatestTransaction( ) {
@@ -387,7 +395,7 @@ export class RunsReplicationService {
387
395
this . _lastAcknowledgedAt = now ;
388
396
this . _lastAcknowledgedLsn = this . _latestCommitEndLsn ;
389
397
390
- this . logger . debug ( "Acknowledging transaction " , {
398
+ this . logger . debug ( "acknowledge_latest_transaction " , {
391
399
commitEndLsn : this . _latestCommitEndLsn ,
392
400
lastAcknowledgedAt : this . _lastAcknowledgedAt ,
393
401
} ) ;
@@ -747,7 +755,7 @@ export class ConcurrentFlushScheduler<T> {
747
755
const callback = this . config . callback ;
748
756
749
757
const promise = this . concurrencyLimiter ( async ( ) => {
750
- await startSpan ( this . _tracer , "flushNextBatch" , async ( span ) => {
758
+ return await startSpan ( this . _tracer , "flushNextBatch" , async ( span ) => {
751
759
const batchId = nanoid ( ) ;
752
760
753
761
span . setAttribute ( "batch_id" , batchId ) ;
@@ -756,26 +764,47 @@ export class ConcurrentFlushScheduler<T> {
756
764
span . setAttribute ( "concurrency_pending_count" , this . concurrencyLimiter . pendingCount ) ;
757
765
span . setAttribute ( "concurrency_concurrency" , this . concurrencyLimiter . concurrency ) ;
758
766
767
+ this . logger . debug ( "flush_next_batch" , {
768
+ batchId,
769
+ batchSize : batch . length ,
770
+ concurrencyActiveCount : this . concurrencyLimiter . activeCount ,
771
+ concurrencyPendingCount : this . concurrencyLimiter . pendingCount ,
772
+ concurrencyConcurrency : this . concurrencyLimiter . concurrency ,
773
+ } ) ;
774
+
775
+ const start = performance . now ( ) ;
776
+
759
777
await callback ( batchId , batch ) ;
778
+
779
+ const end = performance . now ( ) ;
780
+
781
+ const duration = end - start ;
782
+
783
+ return {
784
+ batchId,
785
+ duration,
786
+ } ;
760
787
} ) ;
761
788
} ) ;
762
789
763
- const [ error ] = await tryCatch ( promise ) ;
790
+ const [ error , result ] = await tryCatch ( promise ) ;
764
791
765
792
if ( error ) {
766
- this . logger . error ( "Error flushing batch " , {
793
+ this . logger . error ( "flush_batch_error " , {
767
794
error,
768
795
} ) ;
769
796
770
797
this . failedBatchCount ++ ;
798
+ } else {
799
+ this . logger . debug ( "flush_batch_complete" , {
800
+ totalBatches : 1 ,
801
+ successfulBatches : 1 ,
802
+ failedBatches : 0 ,
803
+ totalFailedBatches : this . failedBatchCount ,
804
+ duration : result ?. duration ,
805
+ batchId : result ?. batchId ,
806
+ } ) ;
771
807
}
772
-
773
- this . logger . debug ( "Batch flush complete" , {
774
- totalBatches : 1 ,
775
- successfulBatches : 1 ,
776
- failedBatches : 0 ,
777
- totalFailedBatches : this . failedBatchCount ,
778
- } ) ;
779
808
}
780
809
}
781
810
0 commit comments