Skip to content

resolver: Add support for excluding global catalogs from resolution #2788

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions pkg/controller/operators/catalog/og_source_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package catalog

import (
"context"
"fmt"

v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)

type OperatorGroupToggleSourceProvider struct {
sp cache.SourceProvider
logger *logrus.Logger
ogLister v1listers.OperatorGroupLister
}

func NewOperatorGroupToggleSourceProvider(sp cache.SourceProvider, logger *logrus.Logger,
ogLister v1listers.OperatorGroupLister) *OperatorGroupToggleSourceProvider {
return &OperatorGroupToggleSourceProvider{
sp: sp,
logger: logger,
ogLister: ogLister,
}
}

const exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution"

func (e *OperatorGroupToggleSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
// Check if annotation is set first
resolutionNamespaces, err := e.CheckForExclusion(namespaces...)
if err != nil {
e.logger.Errorf("error checking namespaces %#v for global resolution exlusion: %s", namespaces, err)
// Fail early with a dummy Source that returns an error
// TODO: Update the Sources interface to return an error
m := make(map[cache.SourceKey]cache.Source)
key := cache.SourceKey{Name: "operatorgroup-unavailable", Namespace: namespaces[0]}
source := errorSource{err}
m[key] = source
return m
}
return e.sp.Sources(resolutionNamespaces...)
}

type errorSource struct {
error
}

func (e errorSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
return nil, e.error
}

func (e *OperatorGroupToggleSourceProvider) CheckForExclusion(namespaces ...string) ([]string, error) {
var defaultResult = namespaces
// The first namespace provided is always the current namespace being synced
var ownNamespace = namespaces[0]
var toggledResult = []string{ownNamespace}

// Check the OG on the NS provided for the exclusion annotation
ogs, err := e.ogLister.OperatorGroups(ownNamespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", ownNamespace, err)
}

if len(ogs) != 1 {
// Problem with the operatorgroup configuration in the namespace, or the operatorgroup has not yet been persisted
// Note: a resync will be triggered if/when the operatorgroup becomes available
return nil, fmt.Errorf("found %d operatorgroups in namespace %s: expected 1", len(ogs), ownNamespace)
}

var og = ogs[0]
if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" {
// Exclusion specified
// Ignore the globalNamespace for the purposes of resolution in this namespace
e.logger.Printf("excluding global catalogs from resolution in namespace %s", ownNamespace)
return toggledResult, nil
}

return defaultResult, nil
}
38 changes: 33 additions & 5 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/operator-framework/api/pkg/operators/reference"
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
Expand Down Expand Up @@ -118,7 +119,7 @@ type Operator struct {
bundleUnpackTimeout time.Duration
clientFactory clients.Factory
muInstallPlan sync.Mutex
resolverSourceProvider *resolver.RegistrySourceProvider
sourceInvalidator *resolver.RegistrySourceProvider
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -191,9 +192,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(config),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -259,7 +261,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
if err := op.RegisterInformer(operatorGroupInformer.Informer()); err != nil {
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ogs")
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(ogQueue),
queueinformer.WithInformer(operatorGroupInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroups).ToSyncer()),
)
if err != nil {
return nil, err
}
if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil {
return nil, err
}

Expand Down Expand Up @@ -475,7 +489,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
o.sourceInvalidator.Invalidate(resolvercache.SourceKey(state.Key))
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand Down Expand Up @@ -1085,6 +1099,20 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return nil
}

// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup
// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation
func (o *Operator) syncOperatorGroups(obj interface{}) error {
og, ok := obj.(*operatorsv1.OperatorGroup)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())

return nil
}

func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool {
if sub.Status.InstallPlanRef != nil && sub.Status.State == v1alpha1.SubscriptionStateUpgradePending {
logger.Debugf("skipping update: installplan already created")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error {
err := snapshot.err
snapshot.m.RUnlock()
if err != nil {
errs = append(errs, fmt.Errorf("error using catalog %s (in namespace %s): %w", key.Name, key.Namespace, err))
errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err))
}
}
return errors.NewAggregate(errs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) {
key: ErrorSource{Error: errors.New("testing")},
})

require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalog dummyname (in namespace dummynamespace): testing")
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing")
}
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/step_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ func TestResolver(t *testing.T) {
steps: [][]*v1alpha1.Step{},
subs: []*v1alpha1.Subscription{},
errAssert: func(t *testing.T, err error) {
assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv")
assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv catsrc-namespace/a.v1")
assert.Contains(t, err.Error(), "in phase Failed instead of Replacing")
},
},
Expand Down
133 changes: 133 additions & 0 deletions test/e2e/catalog_exclusion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package e2e

import (
"context"
"path/filepath"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/test/e2e/ctx"
"github.com/operator-framework/operator-lifecycle-manager/test/e2e/util"
. "github.com/operator-framework/operator-lifecycle-manager/test/e2e/util/gomega"
"google.golang.org/grpc/connectivity"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const magicCatalogDir = "magiccatalog"

var _ = Describe("Global Catalog Exclusion", func() {
var (
testNamespace corev1.Namespace
determinedE2eClient *util.DeterminedE2EClient
operatorGroup operatorsv1.OperatorGroup
localCatalog *MagicCatalog
)

BeforeEach(func() {
determinedE2eClient = util.NewDeterminedClient(ctx.Ctx().E2EClient())

By("creating a namespace with an own namespace operator group without annotations")
e2eTestNamespace := genName("global-catalog-exclusion-e2e-")
operatorGroup = operatorsv1.OperatorGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: e2eTestNamespace,
Name: genName("og-"),
Annotations: nil,
},
Spec: operatorsv1.OperatorGroupSpec{
TargetNamespaces: []string{e2eTestNamespace},
},
}
testNamespace = SetupGeneratedTestNamespaceWithOperatorGroup(e2eTestNamespace, operatorGroup)

By("creating a broken catalog in the global namespace")
globalCatalog := &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Name: genName("bad-global-catalog-"),
Namespace: operatorNamespace,
},
Spec: v1alpha1.CatalogSourceSpec{
DisplayName: "Broken Global Catalog Source",
SourceType: v1alpha1.SourceTypeGrpc,
Address: "1.1.1.1:1337", // points to non-existing service
},
}
_ = determinedE2eClient.Create(context.Background(), globalCatalog)

By("creating a healthy catalog in the test namespace")
localCatalogName := genName("good-catsrc-")
var err error = nil

fbcPath := filepath.Join(testdataDir, magicCatalogDir, "fbc_initial.yaml")
localCatalog, err = NewMagicCatalogFromFile(determinedE2eClient, testNamespace.GetName(), localCatalogName, fbcPath)
Expect(err).To(Succeed())

// deploy catalog blocks until the catalog has reached a ready state or fails
Expect(localCatalog.DeployCatalog(context.Background())).To(Succeed())

By("checking that the global catalog is broken")
// Adding this check here to speed up the test
// the global catalog can fail while we wait for the local catalog to get to a ready state
EventuallyResource(globalCatalog).Should(HaveGrpcConnectionWithLastConnectionState(connectivity.TransientFailure))
})

AfterEach(func() {
TeardownNamespace(testNamespace.GetName())
})

When("a subscription referring to the local catalog is created", func() {
var subscription *v1alpha1.Subscription

BeforeEach(func() {
subscription = &v1alpha1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace.GetName(),
Name: genName("local-subscription-"),
},
Spec: &v1alpha1.SubscriptionSpec{
CatalogSource: localCatalog.GetName(),
CatalogSourceNamespace: localCatalog.GetNamespace(),
Package: "packageA",
Channel: "stable",
InstallPlanApproval: v1alpha1.ApprovalAutomatic,
},
}

By("creating a subscription")
_ = determinedE2eClient.Create(context.Background(), subscription)
})

When("the operator group is annotated with olm.operatorframework.io/exclude-global-namespace-resolution=true", func() {

It("the broken subscription should resolve and have state AtLatest", func() {
By("checking that the subscription is not resolving and has a condition with type ResolutionFailed")
EventuallyResource(subscription).Should(ContainSubscriptionConditionOfType(v1alpha1.SubscriptionResolutionFailed))

By("annotating the operator group with olm.operatorframework.io/exclude-global-namespace-resolution=true")
Eventually(func() error {
annotatedOperatorGroup := operatorGroup.DeepCopy()
if err := determinedE2eClient.Get(context.Background(), k8scontrollerclient.ObjectKeyFromObject(annotatedOperatorGroup), annotatedOperatorGroup); err != nil {
return err
}

if annotatedOperatorGroup.Annotations == nil {
annotatedOperatorGroup.Annotations = map[string]string{}
}

annotatedOperatorGroup.Annotations["olm.operatorframework.io/exclude-global-namespace-resolution"] = "true"
if err := determinedE2eClient.Update(context.Background(), annotatedOperatorGroup); err != nil {
return err
}
return nil
}).Should(Succeed())

By("checking that the subscription resolves and has state AtLatest")
EventuallyResource(subscription).Should(HaveSubscriptionState(v1alpha1.SubscriptionStateAtLatest))
})
})
})
})