Skip to content

Commit b8da86f

Browse files
committed
(fix) List catalogsource using client, instead of referring to cache
Using the information in the resolver cache to list the available CatalogSources leads to the very common and widely known problem of using caches: invalid data due to a stale cache. This has shown up multiple times in production environments over the years, manifesting itself as all the Subscriptions in a namespace being transitioned into an error state when a CatalogSource that the cache claims to exist has actually been deleted from the cluster, but the cache was not updated. The Subscriptions are transitioned to an error state because of the deleted CatalogSource, with the following error message: "message": "failed to populate resolver cache from source <deleted-catalogsource>: failed to list bundles: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing dial tcp: lookup <deleted-catalogsource>.<ns>.svc on 172.....: no such host\"", "reason": "ErrorPreventedResolution", "status": "True", "type": "ResolutionFailed" This PR switches the information lookup from the cache to using a client to list the CatalogSources present in the cluster.
1 parent 8089266 commit b8da86f

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
214214
clientFactory: clients.NewFactory(validatingConfig),
215215
}
216216
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
217-
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
217+
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
218218
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
219219
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
220220
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)

pkg/controller/registry/resolver/source_registry.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"time"
99

1010
"github.com/blang/semver/v4"
11+
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1112
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1213
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
1314
"github.com/operator-framework/operator-registry/pkg/api"
1415
"github.com/operator-framework/operator-registry/pkg/client"
1516
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1617
"github.com/sirupsen/logrus"
18+
"k8s.io/apimachinery/pkg/labels"
1719
)
1820

1921
// todo: move to pkg/controller/operators/catalog
@@ -65,31 +67,63 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
6567
}
6668

6769
type RegistrySourceProvider struct {
68-
rcp RegistryClientProvider
69-
logger logrus.StdLogger
70-
invalidator *sourceInvalidator
70+
rcp RegistryClientProvider
71+
catsrcLister v1alpha1listers.CatalogSourceLister
72+
logger logrus.StdLogger
73+
invalidator *sourceInvalidator
7174
}
7275

73-
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
76+
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
7477
return &RegistrySourceProvider{
75-
rcp: rcp,
76-
logger: logger,
78+
rcp: rcp,
79+
logger: logger,
80+
catsrcLister: catsrcLister,
7781
invalidator: &sourceInvalidator{
7882
validChans: make(map[cache.SourceKey]chan struct{}),
7983
ttl: 5 * time.Minute,
8084
},
8185
}
8286
}
8387

88+
type errorSource struct {
89+
error
90+
}
91+
92+
func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
93+
return nil, s.error
94+
}
95+
8496
func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
8597
result := make(map[cache.SourceKey]cache.Source)
86-
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
87-
result[cache.SourceKey(key)] = &registrySource{
88-
key: cache.SourceKey(key),
89-
client: client,
90-
logger: a.logger,
91-
invalidator: a.invalidator,
98+
99+
cats, err := a.catsrcLister.List(labels.Everything())
100+
if err != nil {
101+
for _, ns := range namespaces {
102+
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
103+
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
104+
}
92105
}
106+
return result
107+
}
108+
109+
clients := a.rcp.ClientsForNamespaces(namespaces...)
110+
for _, cat := range cats {
111+
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
112+
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
113+
result[key] = &registrySource{
114+
key: key,
115+
client: client,
116+
logger: a.logger,
117+
invalidator: a.invalidator,
118+
}
119+
} else {
120+
result[key] = errorSource{
121+
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
122+
}
123+
}
124+
}
125+
if len(result) == 0 {
126+
return nil
93127
}
94128
return result
95129
}

0 commit comments

Comments
 (0)