Skip to content

Commit b029c95

Browse files
harishsurfbowenislandsong
authored andcommitted
Add Catalog Source priority for dependency resolution
This commit adds catsrc priority value to sorting consideration when resolving operator dependency. Catsrcs are ranked by their priority from high to low to be considered for supplying dependent operators. The default priorities for custom catsrcs are 0 and default catsrcs have negative priorities.
1 parent 5e5b11f commit b029c95

File tree

10 files changed

+655
-77
lines changed

10 files changed

+655
-77
lines changed

go.sum

Lines changed: 0 additions & 28 deletions
Large diffs are not rendered by default.

pkg/controller/operators/catalog/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
861861
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
862862
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, querier)
863863
if err != nil {
864-
go o.recorder.Event(ns, corev1.EventTypeWarning,"ResolutionFailed", err.Error())
864+
go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error())
865865
return err
866866
}
867867

pkg/controller/registry/resolver/cache.go

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ import (
99
"time"
1010

1111
"github.com/blang/semver"
12-
"github.com/operator-framework/operator-registry/pkg/api"
13-
"github.com/operator-framework/operator-registry/pkg/client"
14-
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1512
"github.com/sirupsen/logrus"
1613

14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1715
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
16+
"github.com/operator-framework/operator-registry/pkg/api"
17+
"github.com/operator-framework/operator-registry/pkg/client"
18+
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1819
)
1920

2021
type RegistryClientProvider interface {
@@ -43,27 +44,33 @@ type OperatorCacheProvider interface {
4344
}
4445

4546
type OperatorCache struct {
46-
logger logrus.FieldLogger
47-
rcp RegistryClientProvider
48-
snapshots map[registry.CatalogKey]*CatalogSnapshot
49-
ttl time.Duration
50-
sem chan struct{}
51-
m sync.RWMutex
47+
logger logrus.FieldLogger
48+
rcp RegistryClientProvider
49+
catsrcLister v1alpha1.CatalogSourceLister
50+
snapshots map[registry.CatalogKey]*CatalogSnapshot
51+
ttl time.Duration
52+
sem chan struct{}
53+
m sync.RWMutex
5254
}
5355

56+
const defaultCatalogSourcePriority int = 0
57+
58+
type catalogSourcePriority int
59+
5460
var _ OperatorCacheProvider = &OperatorCache{}
5561

56-
func NewOperatorCache(rcp RegistryClientProvider, log logrus.FieldLogger) *OperatorCache {
62+
func NewOperatorCache(rcp RegistryClientProvider, log logrus.FieldLogger, catsrcLister v1alpha1.CatalogSourceLister) *OperatorCache {
5763
const (
5864
MaxConcurrentSnapshotUpdates = 4
5965
)
6066

6167
return &OperatorCache{
62-
logger: log,
63-
rcp: rcp,
64-
snapshots: make(map[registry.CatalogKey]*CatalogSnapshot),
65-
ttl: 5 * time.Minute,
66-
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
68+
logger: log,
69+
rcp: rcp,
70+
catsrcLister: catsrcLister,
71+
snapshots: make(map[registry.CatalogKey]*CatalogSnapshot),
72+
ttl: 5 * time.Minute,
73+
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
6774
}
6875
}
6976

@@ -151,11 +158,20 @@ func (c *OperatorCache) Namespaced(namespaces ...string) MultiCatalogOperatorFin
151158

152159
for _, miss := range misses {
153160
ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout)
161+
162+
catsrcPriority := defaultCatalogSourcePriority
163+
// Ignoring error and treat catsrc priority as 0 if not found.
164+
catsrc, err := c.catsrcLister.CatalogSources(miss.Namespace).Get(miss.Name)
165+
if err == nil {
166+
catsrcPriority = catsrc.Spec.Priority
167+
}
168+
154169
s := CatalogSnapshot{
155-
logger: c.logger.WithField("catalog", miss),
156-
key: miss,
157-
expiry: now.Add(c.ttl),
158-
pop: cancel,
170+
logger: c.logger.WithField("catalog", miss),
171+
key: miss,
172+
expiry: now.Add(c.ttl),
173+
pop: cancel,
174+
priority: catalogSourcePriority(catsrcPriority),
159175
}
160176
s.m.Lock()
161177
c.snapshots[miss] = &s
@@ -222,13 +238,13 @@ func ensurePackageProperty(o *Operator, name, version string) {
222238
PackageName: name,
223239
Version: version,
224240
}
225-
byte, err := json.Marshal(prop)
241+
bytes, err := json.Marshal(prop)
226242
if err != nil {
227243
return
228244
}
229245
o.properties = append(o.properties, &api.Property{
230246
Type: opregistry.PackageType,
231-
Value: string(byte),
247+
Value: string(bytes),
232248
})
233249
}
234250

@@ -277,6 +293,7 @@ type CatalogSnapshot struct {
277293
operators []*Operator
278294
m sync.RWMutex
279295
pop context.CancelFunc
296+
priority catalogSourcePriority
280297
}
281298

282299
func (s *CatalogSnapshot) Cancel() {
@@ -354,10 +371,14 @@ func (s SortableSnapshots) Less(i, j int) bool {
354371
return false
355372
}
356373

357-
// the rest are sorted first in namespace preference order, then by name
374+
// the rest are sorted first on priority, namespace and then by name
375+
if s.snapshots[i].priority != s.snapshots[j].priority {
376+
return s.snapshots[i].priority > s.snapshots[j].priority
377+
}
358378
if s.snapshots[i].key.Namespace != s.snapshots[j].key.Namespace {
359379
return s.namespaces[s.snapshots[i].key.Namespace] < s.namespaces[s.snapshots[j].key.Namespace]
360380
}
381+
361382
return s.snapshots[i].key.Name < s.snapshots[j].key.Name
362383
}
363384

pkg/controller/registry/resolver/cache_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ import (
1313
"github.com/stretchr/testify/assert"
1414
"github.com/stretchr/testify/require"
1515

16+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
17+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
1618
"github.com/operator-framework/operator-registry/pkg/api"
1719
"github.com/operator-framework/operator-registry/pkg/client"
1820
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
19-
20-
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
2121
)
2222

2323
type BundleStreamStub struct {
@@ -83,8 +83,8 @@ func TestOperatorCacheConcurrency(t *testing.T) {
8383
const (
8484
NWorkers = 64
8585
)
86-
8786
rcp := RegistryClientProviderStub{}
87+
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
8888
var keys []registry.CatalogKey
8989
for i := 0; i < 128; i++ {
9090
for j := 0; j < 8; j++ {
@@ -106,7 +106,7 @@ func TestOperatorCacheConcurrency(t *testing.T) {
106106
}
107107
}
108108

109-
c := NewOperatorCache(rcp, logrus.New())
109+
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)
110110

111111
errs := make(chan error)
112112
for w := 0; w < NWorkers; w++ {
@@ -140,6 +140,7 @@ func TestOperatorCacheConcurrency(t *testing.T) {
140140

141141
func TestOperatorCacheExpiration(t *testing.T) {
142142
rcp := RegistryClientProviderStub{}
143+
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
143144
key := registry.CatalogKey{Namespace: "dummynamespace", Name: "dummyname"}
144145
rcp[key] = &RegistryClientStub{
145146
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
@@ -155,14 +156,15 @@ func TestOperatorCacheExpiration(t *testing.T) {
155156
}),
156157
}
157158

158-
c := NewOperatorCache(rcp, logrus.New())
159+
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)
159160
c.ttl = 0 // instantly stale
160161

161162
require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(WithCSVName("csvname")), 1)
162163
}
163164

164165
func TestOperatorCacheReuse(t *testing.T) {
165166
rcp := RegistryClientProviderStub{}
167+
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
166168
key := registry.CatalogKey{Namespace: "dummynamespace", Name: "dummyname"}
167169
rcp[key] = &RegistryClientStub{
168170
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
@@ -178,7 +180,7 @@ func TestOperatorCacheReuse(t *testing.T) {
178180
}),
179181
}
180182

181-
c := NewOperatorCache(rcp, logrus.New())
183+
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)
182184

183185
require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(WithCSVName("csvname")), 1)
184186
}
@@ -290,6 +292,7 @@ func TestCatalogSnapshotFind(t *testing.T) {
290292

291293
func TestStripPluralRequiredAndProvidedAPIKeys(t *testing.T) {
292294
rcp := RegistryClientProviderStub{}
295+
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
293296
key := registry.CatalogKey{Namespace: "testnamespace", Name: "testname"}
294297
rcp[key] = &RegistryClientStub{
295298
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
@@ -327,7 +330,7 @@ func TestStripPluralRequiredAndProvidedAPIKeys(t *testing.T) {
327330
}),
328331
}
329332

330-
c := NewOperatorCache(rcp, logrus.New())
333+
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)
331334

332335
nc := c.Namespaced("testnamespace")
333336
result, err := AtLeast(1, nc.Find(ProvidingAPI(opregistry.APIKey{Group: "g", Version: "v1", Kind: "K"})))

pkg/controller/registry/resolver/operators.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func NewOperatorFromBundle(bundle *api.Bundle, startingCSV string, sourceKey reg
258258

259259
// legacy support - if the api doesn't contain properties/dependencies, build them from required/provided apis
260260
properties := bundle.Properties
261-
if properties == nil || len(properties) == 0{
261+
if properties == nil || len(properties) == 0 {
262262
properties, err = apisToProperties(provided)
263263
if err != nil {
264264
return nil, err
@@ -297,8 +297,6 @@ func NewOperatorFromBundle(bundle *api.Bundle, startingCSV string, sourceKey reg
297297
return op, nil
298298
}
299299

300-
301-
302300
return &Operator{
303301
name: bundle.CsvName,
304302
replaces: bundle.Replaces,

pkg/controller/registry/resolver/resolver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import (
66
"fmt"
77
"sort"
88

9-
"github.com/operator-framework/api/pkg/operators/v1alpha1"
10-
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
119
"github.com/sirupsen/logrus"
1210
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1311

12+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
13+
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1414
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1515
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
16+
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1617
)
1718

1819
type OperatorResolver interface {
@@ -24,9 +25,9 @@ type SatResolver struct {
2425
log logrus.FieldLogger
2526
}
2627

27-
func NewDefaultSatResolver(rcp RegistryClientProvider, log logrus.FieldLogger) *SatResolver {
28+
func NewDefaultSatResolver(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, log logrus.FieldLogger) *SatResolver {
2829
return &SatResolver{
29-
cache: NewOperatorCache(rcp, log),
30+
cache: NewOperatorCache(rcp, log, catsrcLister),
3031
log: log,
3132
}
3233
}

0 commit comments

Comments
 (0)