Skip to content

add OperatorGroup reconciliation #2517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions pkg/controller/operators/catalog/installplan_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,48 @@ func (o *Operator) triggerInstallPlanRetry(obj interface{}) (syncError error) {
syncError = utilerrors.NewAggregate(errs)
return
}

// When the operatorgroup is created after the installplan creation, the installplan must be resync
func (o *Operator) triggerInstallPlanUpdateForOperatorGroup(obj interface{}) (syncError error) {
metaObj, ok := obj.(metav1.Object)
if !ok {
syncError = errors.New("casting to metav1 object failed")
o.logger.Warn(syncError.Error())
return
}

ips, err := o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(metaObj.GetNamespace()).List(labels.Everything())
if err != nil {
syncError = err
return
}

update := func(ip *v1alpha1.InstallPlan) error {
out := ip.DeepCopy()
now := o.now()
out.Status.SetCondition(v1alpha1.ConditionMet(v1alpha1.InstallPlanResolved, &now))
_, err := o.client.OperatorsV1alpha1().InstallPlans(ip.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{})

return err
}

var errs []error
for _, ip := range ips {
logger := o.logger.WithFields(logrus.Fields{
"ip": ip.GetName(),
"namespace": ip.GetNamespace(),
"phase": ip.Status.Phase,
})

if updateErr := update(ip); updateErr != nil {
errs = append(errs, updateErr)
logger.WithError(updateErr).Warn("failed to kick off InstallPlan retry")
continue
}

logger.Info("InstallPlan condition message set to 'OperatorGroup updated' for retry")
}

syncError = utilerrors.NewAggregate(errs)
return
}
34 changes: 32 additions & 2 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Operator struct {
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
ogQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
recorder record.EventRecorder
Expand Down Expand Up @@ -181,6 +182,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
catalogSubscriberIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient),
Expand Down Expand Up @@ -275,6 +277,25 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

// Wire OperatorGroup reconciliation
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "og")
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(ogQueue),
queueinformer.WithInformer(operatorGroupInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroup).ToSyncer()),
)
if err != nil {
return nil, err
}
if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil {
return nil, err
}

// Wire Subscriptions
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(metav1.NamespaceAll, subInformer.Lister())
Expand Down Expand Up @@ -529,10 +550,19 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
}

o.requeueOwners(metaObj)

return o.triggerInstallPlanRetry(obj)
}

func (o *Operator) syncOperatorGroup(obj interface{}) (syncError error) {
_, ok := obj.(metav1.Object)
if !ok {
syncError = errors.New("casting to metav1 object failed")
o.logger.Warn(syncError.Error())
return
}
return o.triggerInstallPlanUpdateForOperatorGroup(obj)
}

func (o *Operator) handleDeletion(obj interface{}) {
metaObj, ok := obj.(metav1.Object)
if !ok {
Expand Down Expand Up @@ -1583,7 +1613,7 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {

// Attempt to unpack bundles before installing
// Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
if len(plan.Status.BundleLookups) > 0 {
if plan.Status.BundleLookups != nil && len(plan.Status.BundleLookups) > 0 {
unpacked, out, err := o.unpackBundles(plan)
if err != nil {
// Retry sync if non-fatal error
Expand Down
63 changes: 63 additions & 0 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,69 @@ func TestValidateExistingCRs(t *testing.T) {
}
}

func TestSyncOperatorGroup(t *testing.T) {
namespace := "ns"
inOperatorGroup := operatorGroup("og", "sa", namespace,
&corev1.ObjectReference{
Kind: "ServiceAccount",
Namespace: namespace,
Name: "sa",
})
tests := []struct {
testName string
err error
in *operatorsv1.OperatorGroup
expectedPhase v1alpha1.InstallPlanPhase
expectedCondition *v1alpha1.InstallPlanCondition
clientObjs []runtime.Object
}{
{
testName: "OnComplete",
in: inOperatorGroup,
clientObjs: []runtime.Object{
installPlan("p", namespace, v1alpha1.InstallPlanPhaseComplete, "csv"),
},
err: nil,
expectedPhase: v1alpha1.InstallPlanPhaseComplete,
expectedCondition: &v1alpha1.InstallPlanCondition{Type: v1alpha1.InstallPlanResolved, Status: corev1.ConditionTrue},
},
{
testName: "OnOnstalling",
in: inOperatorGroup,
clientObjs: []runtime.Object{
installPlan("p", namespace, v1alpha1.InstallPlanPhaseNone, "csv"),
},
err: nil,
expectedPhase: v1alpha1.InstallPlanPhaseNone,
expectedCondition: &v1alpha1.InstallPlanCondition{Type: v1alpha1.InstallPlanResolved, Status: corev1.ConditionTrue},
},
}

for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

tt.clientObjs = append(tt.clientObjs, tt.in)

op, err := NewFakeOperator(ctx, namespace, []string{namespace}, withClientObjs(tt.clientObjs...))
require.NoError(t, err)

err = op.syncOperatorGroup(tt.in)
require.Equal(t, tt.err, err)

ip, err := op.client.OperatorsV1alpha1().InstallPlans(namespace).Get(ctx, "p", metav1.GetOptions{})
require.NoError(t, err)

require.Equal(t, tt.expectedPhase, ip.Status.Phase)

if tt.expectedCondition != nil {
require.True(t, hasExpectedCondition(ip, *tt.expectedCondition))
}
})
}
}

func fakeConfigMapData() map[string]string {
data := make(map[string]string)
yaml, err := yaml.Marshal([]apiextensionsv1beta1.CustomResourceDefinition{crd("fake-crd")})
Expand Down