@@ -102,6 +102,7 @@ type Operator struct {
102
102
catsrcQueueSet * queueinformer.ResourceQueueSet
103
103
subQueueSet * queueinformer.ResourceQueueSet
104
104
ipQueueSet * queueinformer.ResourceQueueSet
105
+ ogQueueSet * queueinformer.ResourceQueueSet
105
106
nsResolveQueue workqueue.RateLimitingInterface
106
107
namespace string
107
108
recorder record.EventRecorder
@@ -179,6 +180,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
179
180
catsrcQueueSet : queueinformer .NewEmptyResourceQueueSet (),
180
181
subQueueSet : queueinformer .NewEmptyResourceQueueSet (),
181
182
ipQueueSet : queueinformer .NewEmptyResourceQueueSet (),
183
+ ogQueueSet : queueinformer .NewEmptyResourceQueueSet (),
182
184
catalogSubscriberIndexer : map [string ]cache.Indexer {},
183
185
serviceAccountQuerier : scoped .NewUserDefinedServiceAccountQuerier (logger , crClient ),
184
186
clientAttenuator : scoped .NewClientAttenuator (logger , config , opClient ),
@@ -272,6 +274,25 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
272
274
return nil , err
273
275
}
274
276
277
+ // Wire OperatorGroup reconciliation
278
+ operatorGroupInformer := crInformerFactory .Operators ().V1 ().OperatorGroups ()
279
+ op .lister .OperatorsV1 ().RegisterOperatorGroupLister (metav1 .NamespaceAll , operatorGroupInformer .Lister ())
280
+ ogQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "og" )
281
+ op .ogQueueSet .Set (metav1 .NamespaceAll , ogQueue )
282
+ operatorGroupQueueInformer , err := queueinformer .NewQueueInformer (
283
+ ctx ,
284
+ queueinformer .WithLogger (op .logger ),
285
+ queueinformer .WithQueue (ogQueue ),
286
+ queueinformer .WithInformer (operatorGroupInformer .Informer ()),
287
+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncOperatorGroup ).ToSyncer ()),
288
+ )
289
+ if err != nil {
290
+ return nil , err
291
+ }
292
+ if err := op .RegisterQueueInformer (operatorGroupQueueInformer ); err != nil {
293
+ return nil , err
294
+ }
295
+
275
296
// Wire Subscriptions
276
297
subInformer := crInformerFactory .Operators ().V1alpha1 ().Subscriptions ()
277
298
op .lister .OperatorsV1alpha1 ().RegisterSubscriptionLister (metav1 .NamespaceAll , subInformer .Lister ())
@@ -526,10 +547,20 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
526
547
}
527
548
528
549
o .requeueOwners (metaObj )
529
-
530
550
return o .triggerInstallPlanRetry (obj )
531
551
}
532
552
553
+ func (o * Operator ) syncOperatorGroup (obj interface {}) (syncError error ) {
554
+ // Assert as metav1.Object
555
+ _ , ok := obj .(metav1.Object )
556
+ if ! ok {
557
+ syncError = errors .New ("casting to metav1 object failed" )
558
+ o .logger .Warn (syncError .Error ())
559
+ return
560
+ }
561
+ return o .triggerInstallPlanUpdateForOperatorGroup (obj )
562
+ }
563
+
533
564
func (o * Operator ) handleDeletion (obj interface {}) {
534
565
metaObj , ok := obj .(metav1.Object )
535
566
if ! ok {
0 commit comments