Skip to content

OCPBUGS-38290: [release-4.16] (fix) Resolver: list CatSrc using client, instead of referring to registry-server cache #838

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St

var healthUpdated, deprecationUpdated bool
next, healthUpdated = s.UpdateHealth(c.now(), catalogHealth...)
if healthUpdated {
if _, err := c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{}); err != nil {
return nil, err
}
}
deprecationUpdated, err = c.updateDeprecatedStatus(ctx, s.Subscription())
if err != nil {
return next, err
}
if healthUpdated || deprecationUpdated {
if deprecationUpdated {
_, err = c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{})
}
case SubscriptionExistsState:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error {
err := snapshot.err
snapshot.m.RUnlock()
if err != nil {
errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err))
errs = append(errs, fmt.Errorf("error using catalogsource %s/%s: %w", key.Namespace, key.Name, err))
}
}
return errors.NewAggregate(errs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) {
key: ErrorSource{Error: errors.New("testing")},
})

require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing")
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalogsource dummynamespace/dummyname: testing")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"time"

"github.com/blang/semver/v4"
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)

// todo: move to pkg/controller/operators/catalog
Expand Down Expand Up @@ -65,31 +68,65 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
}

type RegistrySourceProvider struct {
rcp RegistryClientProvider
logger logrus.StdLogger
invalidator *sourceInvalidator
rcp RegistryClientProvider
catsrcLister v1alpha1listers.CatalogSourceLister
logger logrus.StdLogger
invalidator *sourceInvalidator
}

func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
return &RegistrySourceProvider{
rcp: rcp,
logger: logger,
rcp: rcp,
logger: logger,
catsrcLister: catsrcLister,
invalidator: &sourceInvalidator{
validChans: make(map[cache.SourceKey]chan struct{}),
ttl: 5 * time.Minute,
},
}
}

type errorSource struct {
error
}

func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
return nil, s.error
}

func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
result := make(map[cache.SourceKey]cache.Source)
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
result[cache.SourceKey(key)] = &registrySource{
key: cache.SourceKey(key),
client: client,
logger: a.logger,
invalidator: a.invalidator,

cats := []*operatorsv1alpha1.CatalogSource{}
for _, ns := range namespaces {
catsInNamespace, err := a.catsrcLister.CatalogSources(ns).List(labels.Everything())
if err != nil {
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
}
return result
}
cats = append(cats, catsInNamespace...)
}

clients := a.rcp.ClientsForNamespaces(namespaces...)
for _, cat := range cats {
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
result[key] = &registrySource{
key: key,
client: client,
logger: a.logger,
invalidator: a.invalidator,
}
} else {
result[key] = errorSource{
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
}
}
}
if len(result) == 0 {
return nil
}
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
controllerbundle "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/bundle"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
Expand All @@ -16,10 +17,12 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const (
BundleLookupConditionPacked v1alpha1.BundleLookupConditionType = "BundleLookupNotPersisted"
exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution"
)

// init hooks provides the downstream a way to modify the upstream behavior
Expand All @@ -32,6 +35,7 @@ type StepResolver interface {
type OperatorStepResolver struct {
subLister v1alpha1listers.SubscriptionLister
csvLister v1alpha1listers.ClusterServiceVersionLister
ogLister v1listers.OperatorGroupLister
client versioned.Interface
globalCatalogNamespace string
resolver *Resolver
Expand Down Expand Up @@ -69,6 +73,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
stepResolver := &OperatorStepResolver{
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
ogLister: lister.OperatorsV1().OperatorGroupLister(),
client: client,
globalCatalogNamespace: globalCatalogNamespace,
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
Expand All @@ -91,7 +96,22 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step,
return nil, nil, nil, err
}

namespaces := []string{namespace, r.globalCatalogNamespace}
namespaces := []string{namespace}
ogs, err := r.ogLister.OperatorGroups(namespace).List(labels.Everything())
if err != nil {
return nil, nil, nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", namespace, err)
}
if len(ogs) != 1 {
return nil, nil, nil, fmt.Errorf("expected 1 OperatorGroup in the namespace, found %d", len(ogs))
}
og := ogs[0]
if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" {
// Exclusion specified
// Ignore the globalNamespace for the purposes of resolution in this namespace
r.log.Printf("excluding global catalogs from resolution in namespace %s", namespace)
} else {
namespaces = append(namespaces, r.globalCatalogNamespace)
}
operators, err := r.resolver.Resolve(namespaces, subs)
if err != nil {
return nil, nil, nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ func TestResolver(t *testing.T) {
steps: [][]*v1alpha1.Step{},
subs: []*v1alpha1.Subscription{},
errAssert: func(t *testing.T, err error) {
assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv")
assert.Contains(t, err.Error(), "error using catalogsource catsrc-namespace/@existing: csv")
assert.Contains(t, err.Error(), "in phase Failed instead of Replacing")
},
},
Expand Down Expand Up @@ -1377,6 +1377,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
name: "NewSubscription/Permissions/ClusterPermissions",
clusterState: []runtime.Object{
newSub(namespace, "a", "alpha", catalog),
newOperatorGroup("test-og", namespace),
},
bundlesInCatalog: []*api.Bundle{bundle},
out: out{
Expand All @@ -1392,6 +1393,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
name: "don't create default service accounts",
clusterState: []runtime.Object{
newSub(namespace, "a", "alpha", catalog),
newOperatorGroup("test-og", namespace),
},
bundlesInCatalog: []*api.Bundle{bundleWithDefaultServiceAccount},
out: out{
Expand All @@ -1418,6 +1420,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
lister := operatorlister.NewLister()
lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister())
lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister())

stubSnapshot := &resolvercache.Snapshot{}
for _, bundle := range tt.bundlesInCatalog {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading