Skip to content

Commit 9f126de

Browse files
Merge pull request #1458 from benluddy/resolver-cache
Add an operator cache for use by the resolver.
2 parents c64f52f + 6fb2dac commit 9f126de

File tree

3 files changed

+603
-5
lines changed

3 files changed

+603
-5
lines changed

pkg/controller/registry/grpc/source.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,19 @@ func (s *SourceStore) Remove(key resolver.CatalogKey) error {
190190
return source.Conn.Close()
191191
}
192192

193-
func (s *SourceStore) AsClients(globalNamespace, localNamespace string) map[resolver.CatalogKey]client.Interface {
193+
func (s *SourceStore) AsClients(namespaces ...string) map[resolver.CatalogKey]client.Interface {
194194
refs := map[resolver.CatalogKey]client.Interface{}
195195
s.sourcesLock.RLock()
196196
defer s.sourcesLock.RUnlock()
197197
for key, source := range s.sources {
198-
if !(key.Namespace == globalNamespace || key.Namespace == localNamespace) {
199-
continue
200-
}
201198
if source.LastConnect.IsZero() {
202199
continue
203200
}
204-
refs[key] = client.NewClientFromConn(source.Conn)
201+
for _, namespace := range namespaces {
202+
if key.Namespace == namespace {
203+
refs[key] = client.NewClientFromConn(source.Conn)
204+
}
205+
}
205206
}
206207

207208
// TODO : remove unhealthy
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
package resolver
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/blang/semver"
10+
"github.com/sirupsen/logrus"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
13+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
14+
"github.com/operator-framework/operator-registry/pkg/client"
15+
)
16+
17+
type ClientProvider interface {
18+
Get() (client.Interface, error)
19+
}
20+
21+
type LazyClient struct {
22+
address string
23+
c client.Interface
24+
m sync.Mutex
25+
}
26+
27+
func (lc *LazyClient) Get() (client.Interface, error) {
28+
lc.m.Lock()
29+
defer lc.m.Unlock()
30+
if lc.c != nil {
31+
return lc.c, nil
32+
}
33+
client, err := client.NewClient(lc.address)
34+
if err != nil {
35+
return nil, err
36+
}
37+
lc.c = client
38+
return lc.c, nil
39+
}
40+
41+
type RegistryClientProvider interface {
42+
ClientsForNamespaces(namespaces ...string) map[CatalogKey]ClientProvider
43+
}
44+
45+
type DefaultRegistryClientProvider struct {
46+
logger logrus.FieldLogger
47+
c versioned.Interface
48+
}
49+
50+
func NewDefaultRegistryClientProvider(c versioned.Interface) *DefaultRegistryClientProvider {
51+
return &DefaultRegistryClientProvider{
52+
logger: logrus.New(),
53+
c: c,
54+
}
55+
}
56+
57+
func (rcp *DefaultRegistryClientProvider) ClientsForNamespaces(namespaces ...string) map[CatalogKey]ClientProvider {
58+
result := make(map[CatalogKey]ClientProvider)
59+
for _, namespace := range namespaces {
60+
list, err := rcp.c.OperatorsV1alpha1().CatalogSources(namespace).List(context.TODO(), metav1.ListOptions{})
61+
if err != nil {
62+
rcp.logger.Errorf("failed to list catalogsources in %s: %s", namespace, err.Error())
63+
continue
64+
}
65+
for _, source := range list.Items {
66+
if source.Status.RegistryServiceStatus == nil {
67+
continue
68+
}
69+
key := CatalogKey{
70+
Namespace: source.Namespace,
71+
Name: source.Name,
72+
}
73+
result[key] = &LazyClient{address: source.Address()}
74+
}
75+
}
76+
return result
77+
}
78+
79+
type CatalogDependencyCache interface {
80+
GetCSVNameFromCatalog(csvName string, catalog CatalogKey) (Operator, error)
81+
GetCSVNameFromAllCatalogs(csvName string) ([]Operator, error)
82+
GetPackageFromAllCatalogs(pkg string) ([]Operator, error)
83+
GetPackageVersionFromAllCatalogs(pkg string, version semver.Version) ([]Operator, error)
84+
}
85+
86+
type OperatorCache struct {
87+
logger logrus.FieldLogger
88+
rcp RegistryClientProvider
89+
snapshots map[CatalogKey]*CatalogSnapshot
90+
ttl time.Duration
91+
sem chan struct{}
92+
m sync.RWMutex
93+
}
94+
95+
func NewOperatorCache(rcp RegistryClientProvider) *OperatorCache {
96+
const (
97+
MaxConcurrentSnapshotUpdates = 4
98+
)
99+
100+
return &OperatorCache{
101+
logger: logrus.New(),
102+
rcp: rcp,
103+
snapshots: make(map[CatalogKey]*CatalogSnapshot),
104+
ttl: 5 * time.Minute,
105+
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
106+
}
107+
}
108+
109+
type NamespacedOperatorCache struct {
110+
snapshots map[CatalogKey]*CatalogSnapshot
111+
}
112+
113+
var _ CatalogDependencyCache = &NamespacedOperatorCache{}
114+
115+
func (c *OperatorCache) Namespaced(namespaces ...string) *NamespacedOperatorCache {
116+
const (
117+
CachePopulateTimeout = time.Minute
118+
)
119+
120+
now := time.Now()
121+
clients := c.rcp.ClientsForNamespaces(namespaces...)
122+
123+
result := NamespacedOperatorCache{
124+
snapshots: make(map[CatalogKey]*CatalogSnapshot),
125+
}
126+
127+
var misses []CatalogKey
128+
func() {
129+
c.m.RLock()
130+
defer c.m.RUnlock()
131+
for key := range clients {
132+
if snapshot, ok := c.snapshots[key]; ok && !snapshot.Expired(now) {
133+
result.snapshots[key] = snapshot
134+
} else {
135+
misses = append(misses, key)
136+
}
137+
}
138+
}()
139+
140+
if len(misses) == 0 {
141+
return &result
142+
}
143+
144+
c.m.Lock()
145+
defer c.m.Unlock()
146+
147+
// Take the opportunity to clear expired snapshots while holding the lock.
148+
var expired []CatalogKey
149+
for key, snapshot := range c.snapshots {
150+
if snapshot.Expired(now) {
151+
snapshot.Cancel()
152+
expired = append(expired, key)
153+
}
154+
}
155+
for _, key := range expired {
156+
delete(c.snapshots, key)
157+
158+
}
159+
160+
// Check for any snapshots that were populated while waiting to acquire the lock.
161+
var found int
162+
for i := range misses {
163+
if snapshot, ok := c.snapshots[misses[i]]; ok && !snapshot.Expired(now) {
164+
result.snapshots[misses[i]] = snapshot
165+
misses[found], misses[i] = misses[i], misses[found]
166+
found++
167+
}
168+
}
169+
misses = misses[found:]
170+
171+
for _, miss := range misses {
172+
ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout)
173+
s := CatalogSnapshot{
174+
logger: c.logger.WithField("catalog", miss),
175+
key: miss,
176+
expiry: now.Add(c.ttl),
177+
pop: cancel,
178+
}
179+
s.m.Lock()
180+
c.snapshots[miss] = &s
181+
result.snapshots[miss] = &s
182+
go c.populate(ctx, &s, clients[miss])
183+
}
184+
185+
return &result
186+
}
187+
188+
func (c *OperatorCache) populate(ctx context.Context, snapshot *CatalogSnapshot, provider ClientProvider) {
189+
defer snapshot.m.Unlock()
190+
191+
c.sem <- struct{}{}
192+
defer func() { <-c.sem }()
193+
194+
registry, err := provider.Get()
195+
if err != nil {
196+
snapshot.logger.Errorf("failed to connect to registry: %s", err.Error())
197+
return
198+
}
199+
200+
it, err := registry.ListBundles(ctx)
201+
if err != nil {
202+
snapshot.logger.Errorf("failed to list bundles: %s", err.Error())
203+
return
204+
}
205+
206+
var operators []Operator
207+
for b := it.Next(); b != nil; b = it.Next() {
208+
o, err := NewOperatorFromBundle(b, "", snapshot.key)
209+
if err != nil {
210+
snapshot.logger.Warnf("failed to construct operator from bundle, continuing: %s", err.Error())
211+
continue
212+
}
213+
operators = append(operators, *o)
214+
}
215+
if err := it.Error(); err != nil {
216+
snapshot.logger.Warnf("error encountered while listing bundles: %s", err.Error())
217+
}
218+
219+
snapshot.operators = operators
220+
}
221+
222+
type CatalogSnapshot struct {
223+
logger logrus.FieldLogger
224+
key CatalogKey
225+
expiry time.Time
226+
operators []Operator
227+
m sync.RWMutex
228+
pop context.CancelFunc
229+
}
230+
231+
func (s *CatalogSnapshot) Cancel() {
232+
s.pop()
233+
}
234+
235+
func (s *CatalogSnapshot) Expired(at time.Time) bool {
236+
return !at.Before(s.expiry)
237+
}
238+
239+
type OperatorPredicate func(*Operator) bool
240+
241+
func (s *CatalogSnapshot) Find(p OperatorPredicate) []Operator {
242+
s.m.RLock()
243+
defer s.m.RUnlock()
244+
245+
var result []Operator
246+
for _, o := range s.operators {
247+
if p(&o) {
248+
result = append(result, o)
249+
}
250+
}
251+
return result
252+
}
253+
254+
func (n *NamespacedOperatorCache) GetCSVNameFromCatalog(csvName string, catalog CatalogKey) (Operator, error) {
255+
s, ok := n.snapshots[catalog]
256+
if !ok {
257+
return Operator{}, fmt.Errorf("catalog %s not found", catalog)
258+
}
259+
operators := s.Find(func(o *Operator) bool {
260+
return o.name == csvName
261+
})
262+
if len(operators) == 0 {
263+
return Operator{}, fmt.Errorf("operator %s not found in catalog %s", csvName, catalog)
264+
}
265+
if len(operators) > 1 {
266+
return Operator{}, fmt.Errorf("multiple operators named %s found in catalog %s", csvName, catalog)
267+
}
268+
return operators[0], nil
269+
}
270+
271+
func (n *NamespacedOperatorCache) GetCSVNameFromAllCatalogs(csvName string) ([]Operator, error) {
272+
var result []Operator
273+
for _, s := range n.snapshots {
274+
result = append(result, s.Find(func(o *Operator) bool {
275+
return o.name == csvName
276+
})...)
277+
}
278+
if len(result) == 0 {
279+
return nil, fmt.Errorf("operator %s not found in any catalog", csvName)
280+
}
281+
return result, nil
282+
}
283+
284+
func (n *NamespacedOperatorCache) GetPackageFromAllCatalogs(pkg string) ([]Operator, error) {
285+
var result []Operator
286+
for _, s := range n.snapshots {
287+
result = append(result, s.Find(func(o *Operator) bool {
288+
return o.Package() == pkg
289+
})...)
290+
}
291+
if len(result) == 0 {
292+
return nil, fmt.Errorf("operator with package %s not found in any catalog", pkg)
293+
}
294+
return result, nil
295+
}
296+
297+
func (n *NamespacedOperatorCache) GetPackageVersionFromAllCatalogs(pkg string, version semver.Version) ([]Operator, error) {
298+
var result []Operator
299+
for _, s := range n.snapshots {
300+
result = append(result, s.Find(func(o *Operator) bool {
301+
return o.Package() == pkg && o.version.Equals(version)
302+
})...)
303+
}
304+
if len(result) == 0 {
305+
return nil, fmt.Errorf("operator with package %s and version %s not found in any catalog", pkg, version)
306+
}
307+
return result, nil
308+
}

0 commit comments

Comments
 (0)