Skip to content

Replace hardcoded resolver cache mutation with CSV cache.Source. #2632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
res := resolver.NewOperatorStepResolver(lister, crClient, opClient.KubernetesInterface(), operatorNamespace, op.sources, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.sources, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down
36 changes: 15 additions & 21 deletions pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func New(sp SourceProvider, options ...Option) *Cache {
}

type NamespacedOperatorCache struct {
existing *SourceKey
snapshots map[SourceKey]*snapshotHeader
}

Expand Down Expand Up @@ -236,6 +235,12 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
priority: c.sourcePriorityProvider.Priority(miss),
}

if miss.Virtual() {
// hack! always refresh virtual catalogs.
// todo: Sources should be responsible for determining when the Snapshots they produce become invalid
hdr.expiry = time.Time{}
}

hdr.m.Lock()
c.snapshots[miss] = &hdr
result.snapshots[miss] = &hdr
Expand Down Expand Up @@ -267,31 +272,14 @@ func (c *NamespacedOperatorCache) FindPreferred(preferred *SourceKey, preferredN
if preferred != nil && preferred.Empty() {
preferred = nil
}
sorted := newSortableSnapshots(c.existing, preferred, preferredNamespace, c.snapshots)
sorted := newSortableSnapshots(preferred, preferredNamespace, c.snapshots)
sort.Sort(sorted)
for _, snapshot := range sorted.snapshots {
result = append(result, snapshot.Find(p...)...)
}
return result
}

func (c *NamespacedOperatorCache) WithExistingOperators(snapshot *Snapshot, namespace string) MultiCatalogOperatorFinder {
key := NewVirtualSourceKey(namespace)
o := &NamespacedOperatorCache{
existing: &key,
snapshots: map[SourceKey]*snapshotHeader{
key: {
key: key,
snapshot: snapshot,
},
},
}
for k, v := range c.snapshots {
o.snapshots[k] = v
}
return o
}

func (c *NamespacedOperatorCache) Find(p ...Predicate) []*Entry {
return c.FindPreferred(nil, "", p...)
}
Expand Down Expand Up @@ -334,7 +322,14 @@ type sortableSnapshots struct {
existing *SourceKey
}

func newSortableSnapshots(existing, preferred *SourceKey, preferredNamespace string, snapshots map[SourceKey]*snapshotHeader) sortableSnapshots {
func newSortableSnapshots(preferred *SourceKey, preferredNamespace string, snapshots map[SourceKey]*snapshotHeader) sortableSnapshots {
var existing *SourceKey
for key := range snapshots {
if key.Virtual() && key.Namespace == preferredNamespace {
existing = &key
break
}
}
sorted := sortableSnapshots{
existing: existing,
preferred: preferred,
Expand Down Expand Up @@ -419,7 +414,6 @@ type OperatorFinder interface {
type MultiCatalogOperatorFinder interface {
Catalog(SourceKey) OperatorFinder
FindPreferred(preferred *SourceKey, preferredNamespace string, predicates ...Predicate) []*Entry
WithExistingOperators(snapshot *Snapshot, namespace string) MultiCatalogOperatorFinder
Error() error
OperatorFinder
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/registry/resolver/cache/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,6 @@ func (i *OperatorSourceInfo) String() string {
return fmt.Sprintf("%s/%s in %s/%s", i.Package, i.Channel, i.Catalog.Name, i.Catalog.Namespace)
}

var NoCatalog = SourceKey{Name: "", Namespace: ""}
var ExistingOperator = OperatorSourceInfo{Package: "", Channel: "", StartingCSV: "", Catalog: NoCatalog, DefaultChannel: false}

type Entry struct {
Name string
Replaces string
Expand Down
190 changes: 18 additions & 172 deletions pkg/controller/registry/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ import (
"github.com/operator-framework/api/pkg/constraints"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
"github.com/operator-framework/operator-registry/pkg/api"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
)

type OperatorResolver interface {
SolveOperators(csvs []*v1alpha1.ClusterServiceVersion, subs []*v1alpha1.Subscription, add map[cache.OperatorSourceInfo]struct{}) (cache.OperatorSet, error)
SolveOperators(csvs []*v1alpha1.ClusterServiceVersion, add map[cache.OperatorSourceInfo]struct{}) (cache.OperatorSet, error)
}

type SatResolver struct {
Expand Down Expand Up @@ -51,7 +50,7 @@ func (w *debugWriter) Write(b []byte) (int, error) {
return n, nil
}

func (r *SatResolver) SolveOperators(namespaces []string, csvs []*v1alpha1.ClusterServiceVersion, subs []*v1alpha1.Subscription) (cache.OperatorSet, error) {
func (r *SatResolver) SolveOperators(namespaces []string, subs []*v1alpha1.Subscription) (cache.OperatorSet, error) {
var errs []error

variables := make(map[solver.Identifier]solver.Variable)
Expand All @@ -60,14 +59,15 @@ func (r *SatResolver) SolveOperators(namespaces []string, csvs []*v1alpha1.Clust
// TODO: better abstraction
startingCSVs := make(map[string]struct{})

// build a virtual catalog of all currently installed CSVs
existingSnapshot, err := r.newSnapshotForNamespace(namespaces[0], subs, csvs)
if err != nil {
return nil, err
namespacedCache := r.cache.Namespaced(namespaces...)

if len(namespaces) < 1 {
// the first namespace is treated as the preferred namespace today
return nil, fmt.Errorf("at least one namespace must be provided to resolution")
}
namespacedCache := r.cache.Namespaced(namespaces...).WithExistingOperators(existingSnapshot, namespaces[0])

_, existingVariables, err := r.getBundleVariables(namespaces[0], cache.Filter(existingSnapshot.Entries, cache.True()), namespacedCache, visited)
preferredNamespace := namespaces[0]
_, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited)
if err != nil {
return nil, err
}
Expand All @@ -79,15 +79,16 @@ func (r *SatResolver) SolveOperators(namespaces []string, csvs []*v1alpha1.Clust
for _, sub := range subs {
// find the currently installed operator (if it exists)
var current *cache.Entry
for _, csv := range csvs {
if csv.Name == sub.Status.InstalledCSV {
op, err := newOperatorFromV1Alpha1CSV(csv)
if err != nil {
return nil, err
}
current = op
break

matches := namespacedCache.Catalog(cache.NewVirtualSourceKey(sub.Namespace)).Find(cache.CSVNamePredicate(sub.Status.InstalledCSV))
if len(matches) > 1 {
var names []string
for _, each := range matches {
names = append(names, each.Name)
}
return nil, fmt.Errorf("multiple name matches for status.installedCSV of subscription %s/%s: %s", sub.Namespace, sub.Name, strings.Join(names, ", "))
} else if len(matches) == 1 {
current = matches[0]
}

if current == nil && sub.Spec.StartingCSV != "" {
Expand Down Expand Up @@ -460,116 +461,6 @@ func (r *SatResolver) getBundleVariables(preferredNamespace string, bundleStack
return ids, variables, nil
}

func (r *SatResolver) inferProperties(csv *v1alpha1.ClusterServiceVersion, subs []*v1alpha1.Subscription) ([]*api.Property, error) {
var properties []*api.Property

packages := make(map[string]struct{})
for _, sub := range subs {
if sub.Status.InstalledCSV != csv.Name {
continue
}
// Without sanity checking the Subscription spec's
// package against catalog contents, updates to the
// Subscription spec could result in a bad package
// inference.
for _, entry := range r.cache.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{Namespace: sub.Spec.CatalogSourceNamespace, Name: sub.Spec.CatalogSource}).Find(cache.And(cache.CSVNamePredicate(csv.Name), cache.PkgPredicate(sub.Spec.Package))) {
if pkg := entry.Package(); pkg != "" {
packages[pkg] = struct{}{}
}
}
}
if l := len(packages); l != 1 {
r.log.Warnf("could not unambiguously infer package name for %q (found %d distinct package names)", csv.Name, l)
return properties, nil
}
var pkg string
for pkg = range packages {
// Assign the single key to pkg.
}
var version string // Emit empty string rather than "0.0.0" if .spec.version is zero-valued.
if !csv.Spec.Version.Version.Equals(semver.Version{}) {
version = csv.Spec.Version.String()
}
pp, err := json.Marshal(opregistry.PackageProperty{
PackageName: pkg,
Version: version,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal inferred package property: %w", err)
}
properties = append(properties, &api.Property{
Type: opregistry.PackageType,
Value: string(pp),
})

return properties, nil
}

func (r *SatResolver) newSnapshotForNamespace(namespace string, subs []*v1alpha1.Subscription, csvs []*v1alpha1.ClusterServiceVersion) (*cache.Snapshot, error) {
existingOperatorCatalog := cache.NewVirtualSourceKey(namespace)
// build a catalog snapshot of CSVs without subscriptions
csvSubscriptions := make(map[*v1alpha1.ClusterServiceVersion]*v1alpha1.Subscription)
for _, sub := range subs {
for _, csv := range csvs {
if csv.Name == sub.Status.InstalledCSV {
csvSubscriptions[csv] = sub
break
}
}
}
var csvsMissingProperties []*v1alpha1.ClusterServiceVersion
standaloneOperators := make([]*cache.Entry, 0)
for _, csv := range csvs {
op, err := newOperatorFromV1Alpha1CSV(csv)
if err != nil {
return nil, err
}

if anno, ok := csv.GetAnnotations()[projection.PropertiesAnnotationKey]; !ok {
csvsMissingProperties = append(csvsMissingProperties, csv)
if inferred, err := r.inferProperties(csv, subs); err != nil {
r.log.Warnf("unable to infer properties for csv %q: %w", csv.Name, err)
} else {
op.Properties = append(op.Properties, inferred...)
}
} else if props, err := projection.PropertyListFromPropertiesAnnotation(anno); err != nil {
return nil, fmt.Errorf("failed to retrieve properties of csv %q: %w", csv.GetName(), err)
} else {
op.Properties = props
}

op.SourceInfo = &cache.OperatorSourceInfo{
Catalog: existingOperatorCatalog,
Subscription: csvSubscriptions[csv],
}
// Try to determine source package name from properties and add to SourceInfo.
for _, p := range op.Properties {
if p.Type != opregistry.PackageType {
continue
}
var pp opregistry.PackageProperty
err := json.Unmarshal([]byte(p.Value), &pp)
if err != nil {
r.log.Warnf("failed to unmarshal package property of csv %q: %w", csv.Name, err)
continue
}
op.SourceInfo.Package = pp.PackageName
}

standaloneOperators = append(standaloneOperators, op)
}

if len(csvsMissingProperties) > 0 {
names := make([]string, len(csvsMissingProperties))
for i, csv := range csvsMissingProperties {
names[i] = csv.GetName()
}
r.log.Infof("considered csvs without properties annotation during resolution: %v", names)
}

return &cache.Snapshot{Entries: standaloneOperators}, nil
}

func (r *SatResolver) addInvariants(namespacedCache cache.MultiCatalogOperatorFinder, variables map[solver.Identifier]solver.Variable) {
// no two operators may provide the same GVK or Package in a namespace
gvkConflictToVariable := make(map[opregistry.GVKProperty][]solver.Identifier)
Expand Down Expand Up @@ -923,51 +814,6 @@ func predicateForRequiredLabelProperty(value string) (cache.Predicate, error) {
return cache.LabelPredicate(label.Label), nil
}

func newOperatorFromV1Alpha1CSV(csv *v1alpha1.ClusterServiceVersion) (*cache.Entry, error) {
providedAPIs := cache.EmptyAPISet()
for _, crdDef := range csv.Spec.CustomResourceDefinitions.Owned {
parts := strings.SplitN(crdDef.Name, ".", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("error parsing crd name: %s", crdDef.Name)
}
providedAPIs[opregistry.APIKey{Plural: parts[0], Group: parts[1], Version: crdDef.Version, Kind: crdDef.Kind}] = struct{}{}
}
for _, api := range csv.Spec.APIServiceDefinitions.Owned {
providedAPIs[opregistry.APIKey{Group: api.Group, Version: api.Version, Kind: api.Kind, Plural: api.Name}] = struct{}{}
}

requiredAPIs := cache.EmptyAPISet()
for _, crdDef := range csv.Spec.CustomResourceDefinitions.Required {
parts := strings.SplitN(crdDef.Name, ".", 2)
if len(parts) < 2 {
return nil, fmt.Errorf("error parsing crd name: %s", crdDef.Name)
}
requiredAPIs[opregistry.APIKey{Plural: parts[0], Group: parts[1], Version: crdDef.Version, Kind: crdDef.Kind}] = struct{}{}
}
for _, api := range csv.Spec.APIServiceDefinitions.Required {
requiredAPIs[opregistry.APIKey{Group: api.Group, Version: api.Version, Kind: api.Kind, Plural: api.Name}] = struct{}{}
}

properties, err := providedAPIsToProperties(providedAPIs)
if err != nil {
return nil, err
}
dependencies, err := requiredAPIsToProperties(requiredAPIs)
if err != nil {
return nil, err
}
properties = append(properties, dependencies...)

return &cache.Entry{
Name: csv.GetName(),
Version: &csv.Spec.Version.Version,
ProvidedAPIs: providedAPIs,
RequiredAPIs: requiredAPIs,
SourceInfo: &cache.ExistingOperator,
Properties: properties,
}, nil
}

func providedAPIsToProperties(apis cache.APISet) (out []*api.Property, err error) {
out = make([]*api.Property, 0)
for a := range apis {
Expand Down
Loading