Skip to content

Commit df427eb

Browse files
committed
Make CatalogSource the source of truth for available catalogs.
Internally, the catalog operator has always maintained a set of registry clients for each CatalogSource. Although this set is reconciled toward containing a client per CatalogSource object, there is some latency before changes made to CatalogSources are reflected in the client set, and differences between CatalogSources and client set membership are a potential source of error. Instead, the catalog operator should list CatalogSources from its informer cache to determine which catalogs are reachable from a given namespace. Signed-off-by: Ben Luddy <[email protected]>
1 parent 9ced412 commit df427eb

File tree

5 files changed

+112
-29
lines changed

5 files changed

+112
-29
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
191191
clientFactory: clients.NewFactory(config),
192192
}
193193
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
194-
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
194+
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
195195
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
196196
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
197197
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

pkg/controller/registry/grpc/source.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -256,40 +256,16 @@ func (s *SourceStore) Remove(key registry.CatalogKey) error {
256256
return source.Conn.Close()
257257
}
258258

259-
func (s *SourceStore) AsClients(namespaces ...string) map[registry.CatalogKey]registry.ClientInterface {
260-
refs := map[registry.CatalogKey]registry.ClientInterface{}
261-
s.sourcesLock.RLock()
262-
defer s.sourcesLock.RUnlock()
263-
for key, source := range s.sources {
264-
if source.LastConnect.IsZero() {
265-
continue
266-
}
267-
for _, namespace := range namespaces {
268-
if key.Namespace == namespace {
269-
refs[key] = registry.NewClientFromConn(source.Conn)
270-
}
271-
}
272-
}
273-
274-
// TODO : remove unhealthy
275-
return refs
276-
}
277-
278259
func (s *SourceStore) ClientsForNamespaces(namespaces ...string) map[registry.CatalogKey]client.Interface {
279260
refs := map[registry.CatalogKey]client.Interface{}
280261
s.sourcesLock.RLock()
281262
defer s.sourcesLock.RUnlock()
282263
for key, source := range s.sources {
283-
if source.LastConnect.IsZero() {
284-
continue
285-
}
286264
for _, namespace := range namespaces {
287265
if key.Namespace == namespace {
288266
refs[key] = client.NewClientFromConn(source.Conn)
289267
}
290268
}
291269
}
292-
293-
// TODO : remove unhealthy
294270
return refs
295271
}

pkg/controller/registry/resolver/source_registry.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@ import (
88
"time"
99

1010
"github.com/blang/semver/v4"
11+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
12+
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1113
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1214
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
1315
"github.com/operator-framework/operator-registry/pkg/api"
1416
"github.com/operator-framework/operator-registry/pkg/client"
1517
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1618
"github.com/sirupsen/logrus"
19+
"k8s.io/apimachinery/pkg/labels"
1720
)
1821

1922
// todo: move to pkg/controller/operators/catalog
@@ -66,22 +69,41 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
6669

6770
type RegistrySourceProvider struct {
6871
rcp RegistryClientProvider
72+
catsrcs v1alpha1listers.CatalogSourceLister
6973
logger logrus.StdLogger
7074
invalidator *sourceInvalidator
7175
}
7276

73-
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
77+
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcs v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
7478
return &RegistrySourceProvider{
75-
rcp: rcp,
76-
logger: logger,
79+
rcp: rcp,
80+
catsrcs: catsrcs,
81+
logger: logger,
7782
invalidator: &sourceInvalidator{
7883
validChans: make(map[cache.SourceKey]chan struct{}),
7984
ttl: 5 * time.Minute,
8085
},
8186
}
8287
}
8388

89+
type errorSource struct {
90+
error
91+
}
92+
93+
func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
94+
return nil, s.error
95+
}
96+
8497
func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
98+
var catalogs []*v1alpha1.CatalogSource
99+
for _, namespace := range namespaces {
100+
cats, err := a.catsrcs.CatalogSources(namespace).List(labels.Everything())
101+
if err != nil {
102+
// todo: this must be properly exposed
103+
panic(err)
104+
}
105+
catalogs = append(catalogs, cats...)
106+
}
85107
result := make(map[cache.SourceKey]cache.Source)
86108
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
87109
result[cache.SourceKey(key)] = &registrySource{
@@ -91,6 +113,14 @@ func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceK
91113
invalidator: a.invalidator,
92114
}
93115
}
116+
for _, cat := range catalogs {
117+
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
118+
if _, ok := result[key]; !ok {
119+
result[key] = errorSource{
120+
error: fmt.Errorf("registry server not reachable for catalogsource %s/%s", cat.Namespace, cat.Name),
121+
}
122+
}
123+
}
94124
return result
95125
}
96126

pkg/controller/registry/resolver/step_resolver.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
5757
cacheSourceProvider := &mergedSourceProvider{
5858
sps: []cache.SourceProvider{
5959
sourceProvider,
60-
//SourceProviderFromRegistryClientProvider(provider, log),
6160
&csvSourceProvider{
6261
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
6362
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),

test/e2e/subscription_e2e_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
appsv1 "k8s.io/api/apps/v1"
2020
corev1 "k8s.io/api/core/v1"
2121
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
22+
"k8s.io/apimachinery/pkg/api/errors"
2223
apierrors "k8s.io/apimachinery/pkg/api/errors"
2324
"k8s.io/apimachinery/pkg/api/resource"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -27,6 +28,7 @@ import (
2728
"sigs.k8s.io/controller-runtime/pkg/client"
2829

2930
"github.com/operator-framework/api/pkg/lib/version"
31+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
3032
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
3133
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
3234
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
@@ -62,6 +64,82 @@ var _ = Describe("Subscription", func() {
6264
TeardownNamespace(generatedNamespace.GetName())
6365
})
6466

67+
When("a registry server is not started for a grpc-type CatalogSource", func() {
68+
var (
69+
catsrc *v1alpha1.CatalogSource
70+
)
71+
72+
BeforeEach(func() {
73+
catsrc = &v1alpha1.CatalogSource{
74+
ObjectMeta: metav1.ObjectMeta{
75+
Namespace: generatedNamespace.GetName(),
76+
GenerateName: "without-registry-server-",
77+
},
78+
Spec: v1alpha1.CatalogSourceSpec{
79+
SourceType: v1alpha1.SourceTypeGrpc,
80+
Image: "@", // bad image ref, pod creation will fail
81+
},
82+
}
83+
Expect(ctx.Ctx().Client().Create(context.Background(), catsrc)).To(Succeed())
84+
})
85+
86+
AfterEach(func() {
87+
Eventually(func() error {
88+
if catsrc == nil {
89+
return nil
90+
}
91+
return ctx.Ctx().Client().Delete(context.Background(), catsrc)
92+
}).Should(Or(
93+
Succeed(),
94+
WithTransform(errors.IsNotFound, BeTrue()),
95+
))
96+
})
97+
98+
It("should indicate ErrorPreventedResolution on a dependent Subscription status", func() {
99+
sub := v1alpha1.Subscription{
100+
ObjectMeta: metav1.ObjectMeta{
101+
Namespace: generatedNamespace.GetName(),
102+
GenerateName: "test-subscription",
103+
},
104+
Spec: &v1alpha1.SubscriptionSpec{
105+
Package: "whatever",
106+
},
107+
}
108+
Expect(ctx.Ctx().Client().Create(context.Background(), &sub)).To(Succeed())
109+
110+
getCondition := func() (v1alpha1.SubscriptionCondition, error) {
111+
if err := ctx.Ctx().Client().Get(context.Background(), client.ObjectKeyFromObject(&sub), &sub); err != nil {
112+
return v1alpha1.SubscriptionCondition{}, err
113+
}
114+
cond := sub.Status.GetCondition(v1alpha1.SubscriptionResolutionFailed)
115+
ctx.Ctx().Logf("%#v\n", cond)
116+
return v1alpha1.SubscriptionCondition{
117+
Type: cond.Type,
118+
Reason: cond.Reason,
119+
Status: cond.Status,
120+
}, nil
121+
}
122+
123+
// this doesn't seem very robust. basically, this subscription condition should arrive directly at True/ErrorPreventedResolution without passing through any other non-Unknown states
124+
125+
Consistently(getCondition).Should(And(
126+
Not(Equal(v1alpha1.SubscriptionCondition{
127+
Type: v1alpha1.SubscriptionResolutionFailed,
128+
Reason: "ConstraintsNotSatisfiable",
129+
Status: corev1.ConditionTrue,
130+
})),
131+
))
132+
133+
Eventually(getCondition).Should(
134+
Equal(v1alpha1.SubscriptionCondition{
135+
Type: v1alpha1.SubscriptionResolutionFailed,
136+
Reason: "ErrorPreventedResolution",
137+
Status: corev1.ConditionTrue,
138+
}),
139+
)
140+
})
141+
})
142+
65143
When("an entry in the middle of a channel does not provide a required GVK", func() {
66144
var (
67145
teardown func()

0 commit comments

Comments
 (0)