Skip to content

Commit 7a2c454

Browse files
committed
Make CatalogSource the source of truth for available catalogs.
Internally, the catalog operator has always maintained a set of registry clients for each CatalogSource. Although this set is reconciled toward containing a client per CatalogSource object, there is some latency before changes made to CatalogSources are reflected in the client set, and differences between CatalogSources and client set membership are a potential source of error. Instead, the catalog operator should list CatalogSources from its informer cache to determine which catalogs are reachable from a given namespace. Signed-off-by: Ben Luddy <[email protected]>
1 parent 08a95ad commit 7a2c454

File tree

5 files changed

+125
-37
lines changed

5 files changed

+125
-37
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
192192
clientFactory: clients.NewFactory(config),
193193
}
194194
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
195-
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
195+
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
196196
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
197197
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID)
198198
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)

pkg/controller/registry/grpc/source.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -256,40 +256,16 @@ func (s *SourceStore) Remove(key registry.CatalogKey) error {
256256
return source.Conn.Close()
257257
}
258258

259-
func (s *SourceStore) AsClients(namespaces ...string) map[registry.CatalogKey]registry.ClientInterface {
260-
refs := map[registry.CatalogKey]registry.ClientInterface{}
261-
s.sourcesLock.RLock()
262-
defer s.sourcesLock.RUnlock()
263-
for key, source := range s.sources {
264-
if source.LastConnect.IsZero() {
265-
continue
266-
}
267-
for _, namespace := range namespaces {
268-
if key.Namespace == namespace {
269-
refs[key] = registry.NewClientFromConn(source.Conn)
270-
}
271-
}
272-
}
273-
274-
// TODO : remove unhealthy
275-
return refs
276-
}
277-
278259
func (s *SourceStore) ClientsForNamespaces(namespaces ...string) map[registry.CatalogKey]client.Interface {
279260
refs := map[registry.CatalogKey]client.Interface{}
280261
s.sourcesLock.RLock()
281262
defer s.sourcesLock.RUnlock()
282263
for key, source := range s.sources {
283-
if source.LastConnect.IsZero() {
284-
continue
285-
}
286264
for _, namespace := range namespaces {
287265
if key.Namespace == namespace {
288266
refs[key] = client.NewClientFromConn(source.Conn)
289267
}
290268
}
291269
}
292-
293-
// TODO : remove unhealthy
294270
return refs
295271
}

pkg/controller/registry/resolver/source_registry.go

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"time"
99

1010
"github.com/blang/semver/v4"
11+
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1112
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1213
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
1314
"github.com/operator-framework/operator-registry/pkg/api"
1415
"github.com/operator-framework/operator-registry/pkg/client"
1516
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1617
"github.com/sirupsen/logrus"
18+
"k8s.io/apimachinery/pkg/labels"
1719
)
1820

1921
// todo: move to pkg/controller/operators/catalog
@@ -66,31 +68,66 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
6668

6769
type RegistrySourceProvider struct {
6870
rcp RegistryClientProvider
71+
catsrcs v1alpha1listers.CatalogSourceLister
6972
logger logrus.StdLogger
7073
invalidator *sourceInvalidator
7174
}
7275

73-
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
76+
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcs v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
7477
return &RegistrySourceProvider{
75-
rcp: rcp,
76-
logger: logger,
78+
rcp: rcp,
79+
catsrcs: catsrcs,
80+
logger: logger,
7781
invalidator: &sourceInvalidator{
7882
validChans: make(map[cache.SourceKey]chan struct{}),
7983
ttl: 5 * time.Minute,
8084
},
8185
}
8286
}
8387

84-
func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
85-
result := make(map[cache.SourceKey]cache.Source)
86-
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
87-
result[cache.SourceKey(key)] = &registrySource{
88-
key: cache.SourceKey(key),
89-
client: client,
90-
logger: a.logger,
91-
invalidator: a.invalidator,
88+
type errorSource struct {
89+
error
90+
}
91+
92+
func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
93+
return nil, s.error
94+
}
95+
96+
func (a *RegistrySourceProvider) Sources(namespaces ...string) (result map[cache.SourceKey]cache.Source) {
97+
result = make(map[cache.SourceKey]cache.Source)
98+
defer func() {
99+
if len(result) == 0 {
100+
result = nil
101+
}
102+
}()
103+
104+
cats, err := a.catsrcs.List(labels.Everything())
105+
if err != nil {
106+
for _, ns := range namespaces {
107+
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
108+
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
109+
}
110+
}
111+
return result
112+
}
113+
114+
clients := a.rcp.ClientsForNamespaces(namespaces...)
115+
for _, cat := range cats {
116+
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
117+
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
118+
result[key] = &registrySource{
119+
key: cache.SourceKey(key),
120+
client: client,
121+
logger: a.logger,
122+
invalidator: a.invalidator,
123+
}
124+
continue
125+
}
126+
result[key] = errorSource{
127+
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
92128
}
93129
}
130+
94131
return result
95132
}
96133

pkg/controller/registry/resolver/step_resolver.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
5757
cacheSourceProvider := &mergedSourceProvider{
5858
sps: []cache.SourceProvider{
5959
sourceProvider,
60-
//SourceProviderFromRegistryClientProvider(provider, log),
6160
&csvSourceProvider{
6261
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
6362
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),

test/e2e/subscription_e2e_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
appsv1 "k8s.io/api/apps/v1"
2121
corev1 "k8s.io/api/core/v1"
2222
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
23+
"k8s.io/apimachinery/pkg/api/errors"
2324
apierrors "k8s.io/apimachinery/pkg/api/errors"
2425
"k8s.io/apimachinery/pkg/api/resource"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -77,6 +78,81 @@ var _ = Describe("Subscription", func() {
7778
TeardownNamespace(generatedNamespace.GetName())
7879
})
7980

81+
When("a registry server for a grpc-type CatalogSource is not running", func() {
82+
var (
83+
catsrc *v1alpha1.CatalogSource
84+
)
85+
86+
BeforeEach(func() {
87+
catsrc = &v1alpha1.CatalogSource{
88+
ObjectMeta: metav1.ObjectMeta{
89+
Namespace: generatedNamespace.GetName(),
90+
GenerateName: "without-registry-server-",
91+
},
92+
Spec: v1alpha1.CatalogSourceSpec{
93+
SourceType: v1alpha1.SourceTypeGrpc,
94+
Image: "@", // bad image ref, pod creation will fail
95+
},
96+
}
97+
Expect(ctx.Ctx().Client().Create(context.Background(), catsrc)).To(Succeed())
98+
})
99+
100+
AfterEach(func() {
101+
Eventually(func() error {
102+
if catsrc == nil {
103+
return nil
104+
}
105+
return ctx.Ctx().Client().Delete(context.Background(), catsrc)
106+
}).Should(Or(
107+
Succeed(),
108+
WithTransform(errors.IsNotFound, BeTrue()),
109+
))
110+
})
111+
112+
It("should indicate ErrorPreventedResolution on a dependent Subscription status", func() {
113+
sub := v1alpha1.Subscription{
114+
ObjectMeta: metav1.ObjectMeta{
115+
Namespace: generatedNamespace.GetName(),
116+
GenerateName: "test-subscription",
117+
},
118+
Spec: &v1alpha1.SubscriptionSpec{
119+
Package: "whatever",
120+
},
121+
}
122+
Expect(ctx.Ctx().Client().Create(context.Background(), &sub)).To(Succeed())
123+
124+
getCondition := func() (v1alpha1.SubscriptionCondition, error) {
125+
if err := ctx.Ctx().Client().Get(context.Background(), client.ObjectKeyFromObject(&sub), &sub); err != nil {
126+
return v1alpha1.SubscriptionCondition{}, err
127+
}
128+
cond := sub.Status.GetCondition(v1alpha1.SubscriptionResolutionFailed)
129+
return v1alpha1.SubscriptionCondition{
130+
Type: cond.Type,
131+
Reason: cond.Reason,
132+
Status: cond.Status,
133+
}, nil
134+
}
135+
136+
// this doesn't seem very robust. basically, this subscription condition should arrive directly at True/ErrorPreventedResolution without passing through any other non-Unknown states
137+
138+
Consistently(getCondition).Should(And(
139+
Not(Equal(v1alpha1.SubscriptionCondition{
140+
Type: v1alpha1.SubscriptionResolutionFailed,
141+
Reason: "ConstraintsNotSatisfiable",
142+
Status: corev1.ConditionTrue,
143+
})),
144+
))
145+
146+
Eventually(getCondition).Should(
147+
Equal(v1alpha1.SubscriptionCondition{
148+
Type: v1alpha1.SubscriptionResolutionFailed,
149+
Reason: "ErrorPreventedResolution",
150+
Status: corev1.ConditionTrue,
151+
}),
152+
)
153+
})
154+
})
155+
80156
When("an entry in the middle of a channel does not provide a required GVK", func() {
81157
var (
82158
teardown func()

0 commit comments

Comments
 (0)