@@ -46,8 +46,9 @@ import (
 
 const (
 	// We have set a buffer in order to reduce times of context switches.
-	incomingBufSize = 100
-	outgoingBufSize = 100
+	incomingBufSize          = 100
+	outgoingBufSize          = 100
+	processEventConcurrency = 10
 )
 
 // defaultWatcherMaxLimit is used to facilitate construction tests
@@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
 	go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
 
 	var resultChanWG sync.WaitGroup
-	resultChanWG.Add(1)
-	go wc.processEvent(&resultChanWG)
+	wc.processEvents(&resultChanWG)
 
 	select {
 	case err := <-wc.errChan:
@@ -424,18 +424,25 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
 	close(watchClosedCh)
 }
 
-// processEvent processes events from etcd watcher and sends results to resultChan.
-func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
+// processEvents processes events from etcd watcher and sends results to resultChan.
+func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
+	if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
+		wc.concurrentProcessEvents(wg)
+	} else {
+		wg.Add(1)
+		go wc.serialProcessEvents(wg)
+	}
+}
+func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
 	defer wg.Done()
-
 	for {
 		select {
 		case e := <-wc.incomingEventChan:
 			res := wc.transform(e)
 			if res == nil {
 				continue
 			}
-			if len(wc.resultChan) == outgoingBufSize {
+			if len(wc.resultChan) == cap(wc.resultChan) {
 				klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
 			}
 			// If user couldn't receive results fast enough, we also block incoming events from watcher.
@@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
 	}
 }
 
+func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
+	p := concurrentOrderedEventProcessing{
+		input:           wc.incomingEventChan,
+		processFunc:     wc.transform,
+		output:          wc.resultChan,
+		processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
+
+		objectType:    wc.watcher.objectType,
+		groupResource: wc.watcher.groupResource,
+	}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		p.scheduleEventProcessing(wc.ctx, wg)
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		p.collectEventProcessing(wc.ctx)
+	}()
+}
+
+type concurrentOrderedEventProcessing struct {
+	input       chan *event
+	processFunc func(*event) *watch.Event
+	output      chan watch.Event
+
+	processingQueue chan chan *watch.Event
+	// Metadata for logging
+	objectType    string
+	groupResource schema.GroupResource
+}
+
+func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
+	var e *event
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case e = <-p.input:
+		}
+		processingResponse := make(chan *watch.Event, 1)
+		select {
+		case <-ctx.Done():
+			return
+		case p.processingQueue <- processingResponse:
+		}
+		wg.Add(1)
+		go func(e *event, response chan<- *watch.Event) {
+			defer wg.Done()
+			select {
+			case <-ctx.Done():
+			case response <- p.processFunc(e):
+			}
+		}(e, processingResponse)
+	}
+}
+
+func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
+	var processingResponse chan *watch.Event
+	var e *watch.Event
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case processingResponse = <-p.processingQueue:
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case e = <-processingResponse:
+		}
+		if e == nil {
+			continue
+		}
+		if len(p.output) == cap(p.output) {
+			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
+		}
+		// If user couldn't receive results fast enough, we also block incoming events from watcher.
+		// Because storing events in local will cause more memory usage.
+		// The worst case would be closing the fast watcher.
+		select {
+		case <-ctx.Done():
+			return
+		case p.output <- *e:
+		}
+	}
+}
+
 func (wc *watchChan) filter(obj runtime.Object) bool {
 	if wc.internalPred.Empty() {
 		return true
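
For readers who want to see the ordering trick in isolation: the sketch below is a self-contained, simplified illustration of the channel-of-channels pattern that scheduleEventProcessing/collectEventProcessing rely on. It is not part of the patch; the names orderedConcurrentMap, concurrency, and the string transform are invented for the example, which assumes only the Go standard library. Work items fan out to one goroutine each, but results are drained in the order the work was scheduled, so output order matches input order while the bounded queue caps in-flight work.

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// orderedConcurrentMap is a hypothetical stand-in for the patch's
// scheduleEventProcessing/collectEventProcessing pair: up to `concurrency`
// items are transformed in parallel, yet results are emitted in input order,
// because the scheduler enqueues one result channel per item and the
// collector drains those channels strictly in FIFO order.
func orderedConcurrentMap(ctx context.Context, in <-chan string, out chan<- string, concurrency int) {
	var wg sync.WaitGroup
	// One result channel per item; the queue of result channels preserves
	// input order and, together with the item currently being scheduled,
	// bounds the number of in-flight transformations to `concurrency`.
	queue := make(chan chan string, concurrency-1)

	// Scheduler: fan work out, one goroutine per item.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(queue)
		for item := range in {
			result := make(chan string, 1) // buffered so the worker never blocks
			select {
			case <-ctx.Done():
				return
			case queue <- result:
			}
			wg.Add(1)
			go func(item string, result chan<- string) {
				defer wg.Done()
				result <- strings.ToUpper(item) // stand-in for the expensive transform step
			}(item, result)
		}
	}()

	// Collector: fan results back in, in scheduling order.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for result := range queue {
			var r string
			select {
			case <-ctx.Done():
				return
			case r = <-result:
			}
			select {
			case <-ctx.Done():
				return
			case out <- r:
			}
		}
	}()

	// Close the output once the scheduler, collector, and all workers finish.
	go func() {
		wg.Wait()
		close(out)
	}()
}

func main() {
	in, out := make(chan string), make(chan string)
	go func() {
		for _, s := range []string{"a", "b", "c", "d"} {
			in <- s
		}
		close(in)
	}()
	orderedConcurrentMap(context.Background(), in, out, 3)
	for s := range out {
		fmt.Println(s) // always prints A B C D, regardless of which worker finishes first
	}
}

The 1-buffered per-item result channel lets a worker complete even if the consumer has already given up, which appears to be the same reason the patch buffers processingResponse; the queue capacity of concurrency-1 plus the item currently being scheduled mirrors the patch's processEventConcurrency-1 sizing.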