Skip to content

Commit 5081254

Browse files
committed
add OperatorGroup reconciliation
Signed-off-by: akihikokuroda <[email protected]>
1 parent dd0d7fd commit 5081254

File tree

3 files changed

+140
-1
lines changed

3 files changed

+140
-1
lines changed

pkg/controller/operators/catalog/installplan_sync.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,49 @@ func (o *Operator) triggerInstallPlanRetry(obj interface{}) (syncError error) {
7474
syncError = utilerrors.NewAggregate(errs)
7575
return
7676
}
77+
78+
// When the operatorgroup is created after the installplan creation, the installplan must be resync
79+
func (o *Operator) triggerInstallPlanUpdateForOperatorGroup(obj interface{}) (syncError error) {
80+
metaObj, ok := obj.(metav1.Object)
81+
if !ok {
82+
syncError = errors.New("casting to metav1 object failed")
83+
o.logger.Warn(syncError.Error())
84+
return
85+
}
86+
87+
ips, err := o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(metaObj.GetNamespace()).List(labels.Everything())
88+
if err != nil {
89+
syncError = err
90+
return
91+
}
92+
93+
update := func(ip *v1alpha1.InstallPlan) error {
94+
out := ip.DeepCopy()
95+
out.Status.Phase = v1alpha1.InstallPlanPhaseInstalling
96+
now := o.now()
97+
out.Status.SetCondition(v1alpha1.ConditionMet(v1alpha1.InstallPlanResolved, &now))
98+
_, err := o.client.OperatorsV1alpha1().InstallPlans(ip.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
99+
100+
return err
101+
}
102+
103+
var errs []error
104+
for _, ip := range ips {
105+
logger := o.logger.WithFields(logrus.Fields{
106+
"ip": ip.GetName(),
107+
"namespace": ip.GetNamespace(),
108+
"phase": ip.Status.Phase,
109+
})
110+
111+
if updateErr := update(ip); updateErr != nil {
112+
errs = append(errs, updateErr)
113+
logger.WithError(updateErr).Warn("failed to kick off InstallPlan retry")
114+
continue
115+
}
116+
117+
logger.Info("InstallPlan condition message set to 'OperatorGroup updated' for retry")
118+
}
119+
120+
syncError = utilerrors.NewAggregate(errs)
121+
return
122+
}

pkg/controller/operators/catalog/operator.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type Operator struct {
102102
catsrcQueueSet *queueinformer.ResourceQueueSet
103103
subQueueSet *queueinformer.ResourceQueueSet
104104
ipQueueSet *queueinformer.ResourceQueueSet
105+
ogQueueSet *queueinformer.ResourceQueueSet
105106
nsResolveQueue workqueue.RateLimitingInterface
106107
namespace string
107108
recorder record.EventRecorder
@@ -179,6 +180,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
179180
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
180181
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
181182
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
183+
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
182184
catalogSubscriberIndexer: map[string]cache.Indexer{},
183185
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
184186
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient),
@@ -272,6 +274,25 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
272274
return nil, err
273275
}
274276

277+
// Wire OperatorGroup reconciliation
278+
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
279+
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
280+
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "og")
281+
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
282+
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
283+
ctx,
284+
queueinformer.WithLogger(op.logger),
285+
queueinformer.WithQueue(ogQueue),
286+
queueinformer.WithInformer(operatorGroupInformer.Informer()),
287+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroup).ToSyncer()),
288+
)
289+
if err != nil {
290+
return nil, err
291+
}
292+
if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil {
293+
return nil, err
294+
}
295+
275296
// Wire Subscriptions
276297
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
277298
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(metav1.NamespaceAll, subInformer.Lister())
@@ -526,10 +547,19 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
526547
}
527548

528549
o.requeueOwners(metaObj)
529-
530550
return o.triggerInstallPlanRetry(obj)
531551
}
532552

553+
func (o *Operator) syncOperatorGroup(obj interface{}) (syncError error) {
554+
_, ok := obj.(metav1.Object)
555+
if !ok {
556+
syncError = errors.New("casting to metav1 object failed")
557+
o.logger.Warn(syncError.Error())
558+
return
559+
}
560+
return o.triggerInstallPlanUpdateForOperatorGroup(obj)
561+
}
562+
533563
func (o *Operator) handleDeletion(obj interface{}) {
534564
metaObj, ok := obj.(metav1.Object)
535565
if !ok {

pkg/controller/operators/catalog/operator_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,6 +1413,69 @@ func TestValidateExistingCRs(t *testing.T) {
14131413
}
14141414
}
14151415

1416+
func TestSyncOperatorGroup(t *testing.T) {
1417+
namespace := "ns"
1418+
inOperatorGroup := operatorGroup("og", "sa", namespace,
1419+
&corev1.ObjectReference{
1420+
Kind: "ServiceAccount",
1421+
Namespace: namespace,
1422+
Name: "sa",
1423+
})
1424+
tests := []struct {
1425+
testName string
1426+
err error
1427+
in *operatorsv1.OperatorGroup
1428+
expectedPhase v1alpha1.InstallPlanPhase
1429+
expectedCondition *v1alpha1.InstallPlanCondition
1430+
clientObjs []runtime.Object
1431+
}{
1432+
{
1433+
testName: "OnComplete",
1434+
in: inOperatorGroup,
1435+
clientObjs: []runtime.Object{
1436+
installPlan("p", namespace, v1alpha1.InstallPlanPhaseComplete, "csv"),
1437+
},
1438+
err: nil,
1439+
expectedPhase: v1alpha1.InstallPlanPhaseInstalling,
1440+
expectedCondition: &v1alpha1.InstallPlanCondition{Type: v1alpha1.InstallPlanResolved, Status: corev1.ConditionTrue},
1441+
},
1442+
{
1443+
testName: "OnOnstalling",
1444+
in: inOperatorGroup,
1445+
clientObjs: []runtime.Object{
1446+
installPlan("p", namespace, v1alpha1.InstallPlanPhaseNone, "csv"),
1447+
},
1448+
err: nil,
1449+
expectedPhase: v1alpha1.InstallPlanPhaseInstalling,
1450+
expectedCondition: &v1alpha1.InstallPlanCondition{Type: v1alpha1.InstallPlanResolved, Status: corev1.ConditionTrue},
1451+
},
1452+
}
1453+
1454+
for _, tt := range tests {
1455+
t.Run(tt.testName, func(t *testing.T) {
1456+
ctx, cancel := context.WithCancel(context.TODO())
1457+
defer cancel()
1458+
1459+
tt.clientObjs = append(tt.clientObjs, tt.in)
1460+
1461+
op, err := NewFakeOperator(ctx, namespace, []string{namespace}, withClientObjs(tt.clientObjs...))
1462+
require.NoError(t, err)
1463+
1464+
err = op.syncOperatorGroup(tt.in)
1465+
require.Equal(t, tt.err, err)
1466+
1467+
ip, err := op.client.OperatorsV1alpha1().InstallPlans(namespace).Get(ctx, "p", metav1.GetOptions{})
1468+
require.NoError(t, err)
1469+
1470+
require.Equal(t, tt.expectedPhase, ip.Status.Phase)
1471+
1472+
if tt.expectedCondition != nil {
1473+
require.True(t, hasExpectedCondition(ip, *tt.expectedCondition))
1474+
}
1475+
})
1476+
}
1477+
}
1478+
14161479
func fakeConfigMapData() map[string]string {
14171480
data := make(map[string]string)
14181481
yaml, err := yaml.Marshal([]apiextensionsv1beta1.CustomResourceDefinition{crd("fake-crd")})

0 commit comments

Comments
 (0)