Skip to content

[release-4.16] OCPBUGS-48696: Fix excessive catalog source snapshots cause severe performance regression #957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ type Operator struct {
bundleUnpackTimeout time.Duration
clientFactory clients.Factory
muInstallPlan sync.Mutex
sourceInvalidator *resolver.RegistrySourceProvider
resolverSourceProvider *resolver.RegistrySourceProvider
operatorCacheProvider resolvercache.OperatorCacheProvider
}

// CatalogSourceSyncFunc is the signature of a single CatalogSource sync step.
// It receives a logger and the input CatalogSource and returns an output
// CatalogSource, a continueSync flag, and an error.
// NOTE(review): judging by the result names, continueSync presumably tells the
// caller whether to keep running later sync steps — confirm against the caller.
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -216,10 +217,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -361,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
subscription.WithRegistryReconcilerFactory(op.reconciler),
subscription.WithGlobalCatalogNamespace(op.namespace),
subscription.WithSourceProvider(op.sourceInvalidator),
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -782,10 +783,11 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)
metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace)

switch state.State {
case connectivity.Ready:
o.sourceInvalidator.Invalidate(resolvercache.SourceKey(state.Key))
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand Down Expand Up @@ -897,6 +899,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")

metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace())
metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace())
}

func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type syncerConfig struct {
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
sourceProvider resolverCache.SourceProvider
operatorCacheProvider resolverCache.OperatorCacheProvider
}

// SyncerOption is a configuration option for a subscription syncer.
Expand Down Expand Up @@ -130,9 +130,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption {
}
}

func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption {
func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption {
return func(config *syncerConfig) {
config.sourceProvider = provider
config.operatorCacheProvider = provider
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type catalogHealthReconciler struct {
catalogLister listers.CatalogSourceLister
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
sourceProvider cache.SourceProvider
operatorCacheProvider cache.OperatorCacheProvider
logger logrus.StdLogger
}

// Reconcile reconciles subscription catalog health conditions.
Expand Down Expand Up @@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
// updateDeprecatedStatus adds deprecation status conditions to the subscription when they are present in the cache
// entry, then returns true if any changes to the existing subscription have occurred.
func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) {
if c.sourceProvider == nil {
if c.operatorCacheProvider == nil {
return false, nil
}
source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{

entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{
Name: sub.Spec.CatalogSource,
Namespace: sub.Spec.CatalogSourceNamespace,
}]
if !ok {
return false, nil
}
snapshot, err := source.Snapshot(ctx)
if err != nil {
return false, err
}
if len(snapshot.Entries) == 0 {
}).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel))

if len(entries) == 0 {
return false, nil
}

Expand All @@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su
var deprecations *cache.Deprecations

found := false
for _, entry := range snapshot.Entries {
for _, entry := range entries {
// Find the cache entry that matches this subscription
if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package {
continue
}
if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel {
if entry.SourceInfo == nil {
continue
}
if sub.Status.InstalledCSV != entry.Name {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/operator-framework/api/pkg/operators/install"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
Expand All @@ -38,7 +37,6 @@ type subscriptionSyncer struct {
installPlanLister listers.InstallPlanLister
globalCatalogNamespace string
notify kubestate.NotifyFunc
sourceProvider resolverCache.SourceProvider
}

// now returns the Syncer's current time.
Expand Down Expand Up @@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
reconcilers: config.reconcilers,
subscriptionCache: config.subscriptionInformer.GetIndexer(),
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
sourceProvider: config.sourceProvider,
notify: func(event types.NamespacedName) {
// Notify Subscriptions by enqueuing to the Subscription queue.
config.subscriptionQueue.Add(event)
Expand Down Expand Up @@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(),
registryReconcilerFactory: config.registryReconcilerFactory,
globalCatalogNamespace: config.globalCatalogNamespace,
sourceProvider: config.sourceProvider,
operatorCacheProvider: config.operatorCacheProvider,
logger: config.logger,
},
}
s.reconcilers = append(defaultReconcilers, s.reconcilers...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"slices"
"sort"
"strings"

Expand All @@ -29,15 +30,15 @@ type constraintProvider interface {
}

type Resolver struct {
cache *cache.Cache
cache cache.OperatorCacheProvider
log logrus.FieldLogger
pc *predicateConverter
systemConstraintsProvider constraintProvider
}

func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver {
func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver {
return &Resolver{
cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)),
cache: cacheProvider,
log: logger,
pc: &predicateConverter{
celEnv: constraints.NewCelEnvironment(),
Expand Down Expand Up @@ -513,11 +514,13 @@ func (r *Resolver) addInvariants(namespacedCache cache.MultiCatalogOperatorFinde
}

for gvk, is := range gvkConflictToVariable {
slices.Sort(is)
s := NewSingleAPIProviderVariable(gvk.Group, gvk.Version, gvk.Kind, is)
variables[s.Identifier()] = s
}

for pkg, is := range packageConflictToVariable {
slices.Sort(is)
s := NewSinglePackageInstanceVariable(pkg, is)
variables[s.Identifier()] = s
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package solver

import (
"cmp"
"fmt"
"slices"
"strings"

"github.com/go-air/gini/inter"
Expand Down Expand Up @@ -203,5 +205,8 @@ func (d *litMapping) Conflicts(g inter.Assumable) []AppliedConstraint {
as = append(as, a)
}
}
slices.SortFunc(as, func(a, b AppliedConstraint) int {
return cmp.Compare(a.String(), b.String())
})
return as
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
Expand Down Expand Up @@ -143,6 +144,9 @@ type registrySource struct {
}

func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name)
metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace)

// Fetching default channels this way makes many round trips
// -- may need to either add a new API to fetch all at once,
// or embed the information into Bundle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int {
return catsrc.Spec.Priority
}

func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver {
func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider {
cacheSourceProvider := &mergedSourceProvider{
sps: []cache.SourceProvider{
sourceProvider,
Expand All @@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
},
},
}
catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}

return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider))
}

func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver {
stepResolver := &OperatorStepResolver{
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
ogLister: lister.OperatorsV1().OperatorGroupLister(),
client: client,
globalCatalogNamespace: globalCatalogNamespace,
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
resolver: NewDefaultResolver(opCacheProvider, log),
log: log,
}

Expand Down
21 changes: 21 additions & 0 deletions staging/operator-lifecycle-manager/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ var (
[]string{NamespaceLabel, NameLabel},
)

// catalogSourceSnapshotsTotal is a counter vector, labelled by catalog source
// namespace and name, tracking how many times the catalog operator has
// requested a snapshot of data from each catalog source.
catalogSourceSnapshotsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "catalog_source_snapshots_total",
Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source",
},
[]string{NamespaceLabel, NameLabel},
)

// exported since it's not handled by HandleMetrics
CSVUpgradeCount = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -250,6 +258,7 @@ func RegisterCatalog() {
prometheus.MustRegister(subscriptionCount)
prometheus.MustRegister(catalogSourceCount)
prometheus.MustRegister(catalogSourceReady)
prometheus.MustRegister(catalogSourceSnapshotsTotal)
prometheus.MustRegister(SubscriptionSyncCount)
prometheus.MustRegister(dependencyResolutionSummary)
prometheus.MustRegister(installPlanWarningCount)
Expand All @@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) {
catalogSourceReady.DeleteLabelValues(namespace, name)
}

// RegisterCatalogSourceSnapshotsTotal looks up the snapshot counter for the
// catalog source identified by namespace and name and adds zero to it, so the
// series is present without changing its current value.
func RegisterCatalogSourceSnapshotsTotal(name, namespace string) {
	// Label order must match the vector's declaration: namespace first, then name.
	counter := catalogSourceSnapshotsTotal.WithLabelValues(namespace, name)
	counter.Add(0)
}

// IncrementCatalogSourceSnapshotsTotal adds one to the snapshot counter for
// the catalog source identified by namespace and name.
func IncrementCatalogSourceSnapshotsTotal(name, namespace string) {
	// Label order must match the vector's declaration: namespace first, then name.
	counter := catalogSourceSnapshotsTotal.WithLabelValues(namespace, name)
	counter.Inc()
}

// DeleteCatalogSourceSnapshotsTotal removes the snapshot counter series for
// the catalog source identified by namespace and name.
func DeleteCatalogSourceSnapshotsTotal(name, namespace string) {
	// The boolean result (whether a series was actually removed) is not needed.
	_ = catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name)
}

func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) {
// Delete the old CSV metrics
csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason))
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading