Skip to content

Commit 5cce51b

Browse files
committed
add OperatorGroup reconciliation
Signed-off-by: akihikokuroda <[email protected]>
1 parent 4f5c16c commit 5cce51b

File tree

2 files changed

+79
-1
lines changed

2 files changed

+79
-1
lines changed

pkg/controller/operators/catalog/installplan_sync.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,50 @@ func (o *Operator) triggerInstallPlanRetry(obj interface{}) (syncError error) {
7474
syncError = utilerrors.NewAggregate(errs)
7575
return
7676
}
77+
78+
// When the operatorgroup is created after the installplan, the installplan must be resync
79+
func (o *Operator) triggerInstallPlanUpdateForOperatorGroup(obj interface{}) (syncError error) {
80+
metaObj, ok := obj.(metav1.Object)
81+
if !ok {
82+
syncError = errors.New("casting to metav1 object failed")
83+
o.logger.Warn(syncError.Error())
84+
return
85+
}
86+
87+
ips, err := o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(metaObj.GetNamespace()).List(labels.Everything())
88+
if err != nil {
89+
syncError = err
90+
return
91+
}
92+
93+
update := func(ip *v1alpha1.InstallPlan) error {
94+
out := ip.DeepCopy()
95+
out.Status.Phase = v1alpha1.InstallPlanPhaseInstalling
96+
now := o.now()
97+
out.Status.SetCondition(v1alpha1.ConditionFailed(v1alpha1.InstallPlanInstalled, v1alpha1.InstallPlanReasonComponentFailed, "OperatorGroup updated", &now))
98+
_, err := o.client.OperatorsV1alpha1().InstallPlans(ip.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
99+
100+
return err
101+
}
102+
103+
var errs []error
104+
for _, ip := range ips {
105+
106+
logger := o.logger.WithFields(logrus.Fields{
107+
"ip": ip.GetName(),
108+
"namespace": ip.GetNamespace(),
109+
"phase": ip.Status.Phase,
110+
})
111+
112+
if updateErr := update(ip); updateErr != nil {
113+
errs = append(errs, updateErr)
114+
logger.WithError(updateErr).Warn("failed to kick off InstallPlan retry")
115+
continue
116+
}
117+
118+
logger.Info("InstallPlan condition message set to 'OperatorGroup updated' for retry")
119+
}
120+
121+
syncError = utilerrors.NewAggregate(errs)
122+
return
123+
}

pkg/controller/operators/catalog/operator.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type Operator struct {
102102
catsrcQueueSet *queueinformer.ResourceQueueSet
103103
subQueueSet *queueinformer.ResourceQueueSet
104104
ipQueueSet *queueinformer.ResourceQueueSet
105+
ogQueueSet *queueinformer.ResourceQueueSet
105106
nsResolveQueue workqueue.RateLimitingInterface
106107
namespace string
107108
recorder record.EventRecorder
@@ -179,6 +180,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
179180
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
180181
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
181182
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
183+
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
182184
catalogSubscriberIndexer: map[string]cache.Indexer{},
183185
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
184186
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient),
@@ -272,6 +274,25 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
272274
return nil, err
273275
}
274276

277+
// Wire OperatorGroup reconciliation
278+
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
279+
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
280+
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "og")
281+
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
282+
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
283+
ctx,
284+
queueinformer.WithLogger(op.logger),
285+
queueinformer.WithQueue(ogQueue),
286+
queueinformer.WithInformer(operatorGroupInformer.Informer()),
287+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroup).ToSyncer()),
288+
)
289+
if err != nil {
290+
return nil, err
291+
}
292+
if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil {
293+
return nil, err
294+
}
295+
275296
// Wire Subscriptions
276297
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
277298
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(metav1.NamespaceAll, subInformer.Lister())
@@ -526,10 +547,20 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
526547
}
527548

528549
o.requeueOwners(metaObj)
529-
530550
return o.triggerInstallPlanRetry(obj)
531551
}
532552

553+
func (o *Operator) syncOperatorGroup(obj interface{}) (syncError error) {
554+
// Assert as metav1.Object
555+
_, ok := obj.(metav1.Object)
556+
if !ok {
557+
syncError = errors.New("casting to metav1 object failed")
558+
o.logger.Warn(syncError.Error())
559+
return
560+
}
561+
return o.triggerInstallPlanUpdateForOperatorGroup(obj)
562+
}
563+
533564
func (o *Operator) handleDeletion(obj interface{}) {
534565
metaObj, ok := obj.(metav1.Object)
535566
if !ok {

0 commit comments

Comments
 (0)