Skip to content

(fix) Resolver: list CatSrc using client, instead of referring to registry-server cache #3349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/operators/catalog/subscription/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St

var healthUpdated, deprecationUpdated bool
next, healthUpdated = s.UpdateHealth(c.now(), catalogHealth...)
if healthUpdated {
if _, err := c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{}); err != nil {
return nil, err
}
}
deprecationUpdated, err = c.updateDeprecatedStatus(ctx, s.Subscription())
if err != nil {
return next, err
}
if healthUpdated || deprecationUpdated {
if deprecationUpdated {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this case we don't check for error... but the health case we do now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Daniel and I had a conversation about this here #3349 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean to say that this error is unchecked, I noticed it too, but left it alone since it isn't part of the PR. But since I'm in this area I could totally just add that in if that's what you were asking for.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably not clear because the diff cuts off the func definition, but the error is unchecked because this func uses named returns, and this is the last potential error-causing statement, so that error will be returned if there is one. I'm not defending it, I really don't like named returns, just explaining why it looks like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah no that's sounds fine to me. It's one of those "there's 2 ways of doing this and both are right", but it's not related to this PR.

_, err = c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{})
}
case SubscriptionExistsState:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error {
err := snapshot.err
snapshot.m.RUnlock()
if err != nil {
errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err))
errs = append(errs, fmt.Errorf("error using catalogsource %s/%s: %w", key.Namespace, key.Name, err))
}
}
return errors.NewAggregate(errs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) {
key: ErrorSource{Error: errors.New("testing")},
})

require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing")
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalogsource dummynamespace/dummyname: testing")
}
61 changes: 49 additions & 12 deletions pkg/controller/registry/resolver/source_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"time"

"github.com/blang/semver/v4"
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)

// todo: move to pkg/controller/operators/catalog
Expand Down Expand Up @@ -65,31 +68,65 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
}

type RegistrySourceProvider struct {
rcp RegistryClientProvider
logger logrus.StdLogger
invalidator *sourceInvalidator
rcp RegistryClientProvider
catsrcLister v1alpha1listers.CatalogSourceLister
logger logrus.StdLogger
invalidator *sourceInvalidator
}

func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
return &RegistrySourceProvider{
rcp: rcp,
logger: logger,
rcp: rcp,
logger: logger,
catsrcLister: catsrcLister,
invalidator: &sourceInvalidator{
validChans: make(map[cache.SourceKey]chan struct{}),
ttl: 5 * time.Minute,
},
}
}

type errorSource struct {
error
}

func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
return nil, s.error
}

func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
result := make(map[cache.SourceKey]cache.Source)
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
result[cache.SourceKey(key)] = &registrySource{
key: cache.SourceKey(key),
client: client,
logger: a.logger,
invalidator: a.invalidator,

cats := []*operatorsv1alpha1.CatalogSource{}
for _, ns := range namespaces {
catsInNamespace, err := a.catsrcLister.CatalogSources(ns).List(labels.Everything())
if err != nil {
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
}
return result
}
cats = append(cats, catsInNamespace...)
}

clients := a.rcp.ClientsForNamespaces(namespaces...)
for _, cat := range cats {
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
result[key] = &registrySource{
key: key,
client: client,
logger: a.logger,
invalidator: a.invalidator,
}
} else {
result[key] = errorSource{
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
}
}
}
if len(result) == 0 {
return nil
}
return result
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/registry/resolver/step_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
controllerbundle "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/bundle"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
Expand All @@ -16,10 +17,12 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const (
BundleLookupConditionPacked v1alpha1.BundleLookupConditionType = "BundleLookupNotPersisted"
exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is now part of the "API"? Is this documented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's coming from this PR that added the feature (see ref).

So it's an existing feature, I'm just moving the usage to a different area in this PR. However, now that you bring it up, fyi I also have an experiment going on here to essentially redo the PR that added this feature, using only the change I made in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoided removing the og-source-provider in this PR because it needs to be back ported and wanted to keep the scope small. But I think that makes a lot of sense as a follow up immediately.

)

// init hooks provides the downstream a way to modify the upstream behavior
Expand All @@ -32,6 +35,7 @@ type StepResolver interface {
type OperatorStepResolver struct {
subLister v1alpha1listers.SubscriptionLister
csvLister v1alpha1listers.ClusterServiceVersionLister
ogLister v1listers.OperatorGroupLister
client versioned.Interface
globalCatalogNamespace string
resolver *Resolver
Expand Down Expand Up @@ -69,6 +73,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
stepResolver := &OperatorStepResolver{
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
ogLister: lister.OperatorsV1().OperatorGroupLister(),
client: client,
globalCatalogNamespace: globalCatalogNamespace,
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
Expand All @@ -91,7 +96,22 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step,
return nil, nil, nil, err
}

namespaces := []string{namespace, r.globalCatalogNamespace}
namespaces := []string{namespace}
ogs, err := r.ogLister.OperatorGroups(namespace).List(labels.Everything())
if err != nil {
return nil, nil, nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", namespace, err)
}
if len(ogs) != 1 {
return nil, nil, nil, fmt.Errorf("expected 1 OperatorGroup in the namespace, found %d", len(ogs))
}
og := ogs[0]
if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" {
// Exclusion specified
// Ignore the globalNamespace for the purposes of resolution in this namespace
r.log.Printf("excluding global catalogs from resolution in namespace %s", namespace)
} else {
namespaces = append(namespaces, r.globalCatalogNamespace)
}
operators, err := r.resolver.Resolve(namespaces, subs)
if err != nil {
return nil, nil, nil, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/registry/resolver/step_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ func TestResolver(t *testing.T) {
steps: [][]*v1alpha1.Step{},
subs: []*v1alpha1.Subscription{},
errAssert: func(t *testing.T, err error) {
assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv")
assert.Contains(t, err.Error(), "error using catalogsource catsrc-namespace/@existing: csv")
assert.Contains(t, err.Error(), "in phase Failed instead of Replacing")
},
},
Expand Down Expand Up @@ -1377,6 +1377,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
name: "NewSubscription/Permissions/ClusterPermissions",
clusterState: []runtime.Object{
newSub(namespace, "a", "alpha", catalog),
newOperatorGroup("test-og", namespace),
},
bundlesInCatalog: []*api.Bundle{bundle},
out: out{
Expand All @@ -1392,6 +1393,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
name: "don't create default service accounts",
clusterState: []runtime.Object{
newSub(namespace, "a", "alpha", catalog),
newOperatorGroup("test-og", namespace),
},
bundlesInCatalog: []*api.Bundle{bundleWithDefaultServiceAccount},
out: out{
Expand All @@ -1418,6 +1420,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
lister := operatorlister.NewLister()
lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister())
lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister())

stubSnapshot := &resolvercache.Snapshot{}
for _, bundle := range tt.bundlesInCatalog {
Expand Down
Loading