Skip to content

Commit e13258f

Browse files
committed
add OperatorGroup reconciliation
Signed-off-by: akihikokuroda <[email protected]>
1 parent 094ae7b commit e13258f

File tree

3 files changed

+140
-2
lines changed

3 files changed

+140
-2
lines changed

pkg/controller/operators/catalog/installplan_sync.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,48 @@ func (o *Operator) triggerInstallPlanRetry(obj interface{}) (syncError error) {
7474
syncError = utilerrors.NewAggregate(errs)
7575
return
7676
}
77+
78+
// When the operatorgroup is created after the installplan creation, the installplan must be resync
79+
func (o *Operator) triggerInstallPlanUpdateForOperatorGroup(obj interface{}) (syncError error) {
80+
metaObj, ok := obj.(metav1.Object)
81+
if !ok {
82+
syncError = errors.New("casting to metav1 object failed")
83+
o.logger.Warn(syncError.Error())
84+
return
85+
}
86+
87+
ips, err := o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(metaObj.GetNamespace()).List(labels.Everything())
88+
if err != nil {
89+
syncError = err
90+
return
91+
}
92+
93+
update := func(ip *v1alpha1.InstallPlan) error {
94+
out := ip.DeepCopy()
95+
now := o.now()
96+
out.Status.SetCondition(v1alpha1.ConditionMet(v1alpha1.InstallPlanResolved, &now))
97+
_, err := o.client.OperatorsV1alpha1().InstallPlans(ip.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})
98+
99+
return err
100+
}
101+
102+
var errs []error
103+
for _, ip := range ips {
104+
logger := o.logger.WithFields(logrus.Fields{
105+
"ip": ip.GetName(),
106+
"namespace": ip.GetNamespace(),
107+
"phase": ip.Status.Phase,
108+
})
109+
110+
if updateErr := update(ip); updateErr != nil {
111+
errs = append(errs, updateErr)
112+
logger.WithError(updateErr).Warn("failed to kick off InstallPlan retry")
113+
continue
114+
}
115+
116+
logger.Info("InstallPlan condition message set to 'OperatorGroup updated' for retry")
117+
}
118+
119+
syncError = utilerrors.NewAggregate(errs)
120+
return
121+
}

pkg/controller/operators/catalog/operator.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type Operator struct {
102102
catsrcQueueSet *queueinformer.ResourceQueueSet
103103
subQueueSet *queueinformer.ResourceQueueSet
104104
ipQueueSet *queueinformer.ResourceQueueSet
105+
ogQueueSet *queueinformer.ResourceQueueSet
105106
nsResolveQueue workqueue.RateLimitingInterface
106107
namespace string
107108
recorder record.EventRecorder
@@ -181,6 +182,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
181182
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
182183
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
183184
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
185+
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
184186
catalogSubscriberIndexer: map[string]cache.Indexer{},
185187
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
186188
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient),
@@ -275,6 +277,25 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
275277
return nil, err
276278
}
277279

280+
// Wire OperatorGroup reconciliation
281+
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
282+
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
283+
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "og")
284+
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
285+
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
286+
ctx,
287+
queueinformer.WithLogger(op.logger),
288+
queueinformer.WithQueue(ogQueue),
289+
queueinformer.WithInformer(operatorGroupInformer.Informer()),
290+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroup).ToSyncer()),
291+
)
292+
if err != nil {
293+
return nil, err
294+
}
295+
if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil {
296+
return nil, err
297+
}
298+
278299
// Wire Subscriptions
279300
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
280301
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(metav1.NamespaceAll, subInformer.Lister())
@@ -529,10 +550,19 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
529550
}
530551

531552
o.requeueOwners(metaObj)
532-
533553
return o.triggerInstallPlanRetry(obj)
534554
}
535555

556+
func (o *Operator) syncOperatorGroup(obj interface{}) (syncError error) {
557+
_, ok := obj.(metav1.Object)
558+
if !ok {
559+
syncError = errors.New("casting to metav1 object failed")
560+
o.logger.Warn(syncError.Error())
561+
return
562+
}
563+
return o.triggerInstallPlanUpdateForOperatorGroup(obj)
564+
}
565+
536566
func (o *Operator) handleDeletion(obj interface{}) {
537567
metaObj, ok := obj.(metav1.Object)
538568
if !ok {
@@ -1583,7 +1613,7 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
15831613

15841614
// Attempt to unpack bundles before installing
15851615
// Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
1586-
if len(plan.Status.BundleLookups) > 0 {
1616+
if plan.Status.BundleLookups != nil && len(plan.Status.BundleLookups) > 0 {
15871617
unpacked, out, err := o.unpackBundles(plan)
15881618
if err != nil {
15891619
// Retry sync if non-fatal error

pkg/controller/operators/catalog/operator_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,6 +1413,69 @@ func TestValidateExistingCRs(t *testing.T) {
14131413
}
14141414
}
14151415

1416+
func TestSyncOperatorGroup(t *testing.T) {
1417+
namespace := "ns"
1418+
inOperatorGroup := operatorGroup("og", "sa", namespace,
1419+
&corev1.ObjectReference{
1420+
Kind: "ServiceAccount",
1421+
Namespace: namespace,
1422+
Name: "sa",
1423+
})
1424+
tests := []struct {
1425+
testName string
1426+
err error
1427+
in *operatorsv1.OperatorGroup
1428+
expectedPhase v1alpha1.InstallPlanPhase
1429+
expectedCondition *v1alpha1.InstallPlanCondition
1430+
clientObjs []runtime.Object
1431+
}{
1432+
{
1433+
testName: "OnComplete",
1434+
in: inOperatorGroup,
1435+
clientObjs: []runtime.Object{
1436+
installPlan("p", namespace, v1alpha1.InstallPlanPhaseComplete, "csv"),
1437+
},
1438+
err: nil,
1439+
expectedPhase: v1alpha1.InstallPlanPhaseInstalling,
1440+
expectedCondition: &v1alpha1.InstallPlanCondition{Type: v1alpha1.InstallPlanResolved, Status: corev1.ConditionTrue},
1441+
},
1442+
{
1443+
testName: "OnOnstalling",
1444+
in: inOperatorGroup,
1445+
clientObjs: []runtime.Object{
1446+
installPlan("p", namespace, v1alpha1.InstallPlanPhaseNone, "csv"),
1447+
},
1448+
err: nil,
1449+
expectedPhase: v1alpha1.InstallPlanPhaseInstalling,
1450+
expectedCondition: &v1alpha1.InstallPlanCondition{Type: v1alpha1.InstallPlanResolved, Status: corev1.ConditionTrue},
1451+
},
1452+
}
1453+
1454+
for _, tt := range tests {
1455+
t.Run(tt.testName, func(t *testing.T) {
1456+
ctx, cancel := context.WithCancel(context.TODO())
1457+
defer cancel()
1458+
1459+
tt.clientObjs = append(tt.clientObjs, tt.in)
1460+
1461+
op, err := NewFakeOperator(ctx, namespace, []string{namespace}, withClientObjs(tt.clientObjs...))
1462+
require.NoError(t, err)
1463+
1464+
err = op.syncOperatorGroup(tt.in)
1465+
require.Equal(t, tt.err, err)
1466+
1467+
ip, err := op.client.OperatorsV1alpha1().InstallPlans(namespace).Get(ctx, "p", metav1.GetOptions{})
1468+
require.NoError(t, err)
1469+
1470+
require.Equal(t, tt.expectedPhase, ip.Status.Phase)
1471+
1472+
if tt.expectedCondition != nil {
1473+
require.True(t, hasExpectedCondition(ip, *tt.expectedCondition))
1474+
}
1475+
})
1476+
}
1477+
}
1478+
14161479
func fakeConfigMapData() map[string]string {
14171480
data := make(map[string]string)
14181481
yaml, err := yaml.Marshal([]apiextensionsv1beta1.CustomResourceDefinition{crd("fake-crd")})

0 commit comments

Comments
 (0)