Skip to content

Commit 91bb3df

Browse files
modify cache to accept clusterName
1 parent 94146df commit 91bb3df

File tree

8 files changed

+334
-38
lines changed

8 files changed

+334
-38
lines changed

pkg/cache/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type Informers interface {
5656

5757
// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
5858
// of the underlying object.
59-
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, clusterName string) (Informer, error)
59+
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
6060

6161
// Start runs all the informers known to this cache until the context is closed.
6262
// It blocks.

pkg/cache/informer_cache.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"reflect"
2323
"strings"
2424

25-
"k8s.io/apimachinery/pkg/api/meta"
2625
apimeta "k8s.io/apimachinery/pkg/api/meta"
2726
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2827
"k8s.io/apimachinery/pkg/runtime"
@@ -59,7 +58,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
5958
return err
6059
}
6160

62-
started, cache, err := ip.InformersMap.Get(ctx, gvk, out, out.GetClusterName())
61+
started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
6362
if err != nil {
6463
return err
6564
}
@@ -76,11 +75,11 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts .
7675
if err != nil {
7776
return err
7877
}
79-
metadata, err := meta.Accessor(cacheTypeObj)
80-
if err != nil {
81-
return err
82-
}
83-
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, metadata.GetClusterName())
78+
// metadata, err := meta.Accessor(cacheTypeObj)
79+
// if err != nil {
80+
// return err
81+
// }
82+
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
8483
if err != nil {
8584
return err
8685
}
@@ -135,14 +134,14 @@ func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
135134
}
136135

137136
// GetInformerForKind returns the informer for the GroupVersionKind.
138-
func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, clusterName string) (Informer, error) {
137+
func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
139138
// Map the gvk to an object
140139
obj, err := ip.Scheme.New(gvk)
141140
if err != nil {
142141
return nil, err
143142
}
144143

145-
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, clusterName)
144+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
146145
if err != nil {
147146
return nil, err
148147
}
@@ -157,7 +156,7 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
157156
}
158157

159158
fmt.Println("getting informer from indexer")
160-
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, obj.GetClusterName())
159+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
161160
if err != nil {
162161
return nil, err
163162
}

pkg/cache/informertest/fake_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type FakeInformers struct {
3939
}
4040

4141
// GetInformerForKind implements Informers.
42-
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, clusterName string) (cache.Informer, error) {
42+
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
4343
if c.Scheme == nil {
4444
c.Scheme = scheme.Scheme
4545
}
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/prometheus/common/log"
25+
apimeta "k8s.io/apimachinery/pkg/api/meta"
26+
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
28+
"k8s.io/client-go/rest"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
30+
)
31+
32+
// NewCacheFunc - Function for creating a new cache from the options and a rest config.
33+
type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error)
34+
35+
// a new global namespaced cache to handle cluster scoped resources.
36+
const globalClusterCache = "_cluster"
37+
38+
// [_cluster -> cache ("*"), clusterNames -> caches ]
39+
40+
// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache.
41+
// This will scope the cache to a list of namespaces. Listing for all namespaces
42+
// will list for all the namespaces that this knows about. By default this will create
43+
// a global cache for cluster scoped resource. Note that this is not intended
44+
// to be used for excluding namespaces, this is better done via a Predicate. Also note that
45+
// you may face performance issues when using this with a high number of namespaces.
46+
func MultiClusterCacheBuilder(clusterNames []string) NewCacheFunc {
47+
return func(config *rest.Config, opts Options) (Cache, error) {
48+
opts, err := defaultOpts(config, opts)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
caches := map[string]Cache{}
54+
55+
// create aglobal cache for * scope
56+
gCache, err := New(config, opts)
57+
if err != nil {
58+
return nil, fmt.Errorf("error creating global cache %v", err)
59+
}
60+
61+
for _, cs := range clusterNames {
62+
opts.ClusterName = cs
63+
c, err := New(config, opts)
64+
if err != nil {
65+
return nil, err
66+
}
67+
caches[cs] = c
68+
}
69+
return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, clusterCache: gCache}, nil
70+
}
71+
}
72+
73+
// multiNamespaceCache knows how to handle multiple namespaced caches
74+
// Use this feature when scoping permissions for your
75+
// operator to a list of namespaces instead of watching every namespace
76+
// in the cluster.
77+
type multiClusterCache struct {
78+
clusterToCache map[string]Cache
79+
Scheme *runtime.Scheme
80+
RESTMapper apimeta.RESTMapper
81+
gClusterCache Cache // Point to "*"
82+
}
83+
84+
var _ Cache = &multiClusterCache{}
85+
86+
// Methods for multiNamespaceCache to conform to the Informers interface.
87+
func (c *multiClusterCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
88+
informers := map[string]Informer{}
89+
90+
//get CLusterName
91+
clusterName, err := getClusterName(obj)
92+
if err != nil {
93+
return nil, fmt.Errorf("error getting clustername %q", err)
94+
}
95+
96+
if len(clusterName) == "*" {
97+
globalInformer, err := c.gClusterCache.GetInformer(ctx, obj)
98+
if err != nil {
99+
return err
100+
}
101+
informers[globalClusterCache] = globalInformer
102+
}
103+
104+
for cs, cache := range c.clusterToCache {
105+
informer, err := cache.GetInformer(ctx, obj)
106+
if err != nil {
107+
return nil, err
108+
}
109+
informers[cs] = informer
110+
}
111+
112+
return &multiClusterCache{clusterToCache: informers}, nil
113+
114+
}
115+
116+
func getClusterName(obj client.Object) (string, error) {
117+
if obj == nil {
118+
return nil, fmt.Errorf("object cannot be empty %v", obj)
119+
}
120+
if obj.GetClusterName() != "" {
121+
return "*", nil
122+
}
123+
124+
return obj.GetClusterName(), nil
125+
}
126+
127+
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, clusterName string) (Informer, error) {
128+
informers := map[string]Informer{} // clusterName -> informer
129+
130+
clusterName, err := getCLusterName(obj)
131+
if err != nil {
132+
return err
133+
}
134+
// globalCLusterCache
135+
if len(clusterName) == "*" {
136+
clusterCacheInf, err := c.gClusterCache.GetInformerForKind(ctx, gvk)
137+
if err != nil {
138+
return err
139+
}
140+
141+
informers[globalClusterCache] = clusterCacheInf
142+
return multiClusterCache{clusterToCache: informers}, nil
143+
}
144+
145+
for cs, cache := range c.clusterToCache {
146+
informer, err := cache.GetInformerForKind(ctx, gvk)
147+
if err != nil {
148+
return err
149+
}
150+
informer[cs] = informer
151+
}
152+
153+
return multiClusterCache{clusterToCache: informer}, nil
154+
}
155+
156+
func (c *multiClusterCache) Start(ctx context.Context) error {
157+
// start global cache
158+
go func() {
159+
err := c.glusterCache.Start(ctx)
160+
if err != nil {
161+
log.Error(err, "cluster scoped cache failed to start")
162+
}
163+
}()
164+
165+
// start namespaced caches
166+
for cs, cache := range c.clusterToCache {
167+
go func(cs string, cache Cache) {
168+
err := cache.Start(ctx)
169+
if err != nil {
170+
log.Error(err, "multiClusterCache cache failed to start cluster informer", "cluster", cs)
171+
}
172+
}(cs, cache)
173+
}
174+
175+
<-ctx.Done()
176+
return nil
177+
}
178+
179+
func (c *multiClusterCache) WaitForCacheSync(ctx context.Context) bool {
180+
synced := true
181+
for _, cache := range c.clusterToCache {
182+
if s := cache.WaitForCacheSync(ctx); !s {
183+
synced = s
184+
}
185+
}
186+
187+
// check if global cluster cache has synced
188+
if !c.gClusterCache.WaitForCacheSync(ctx) {
189+
synced = false
190+
}
191+
return synced
192+
}
193+
194+
func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
195+
196+
clusterName, err := getCLusterName(obj)
197+
if err != nil {
198+
return err
199+
}
200+
201+
if clusterName == "*" {
202+
return c.gClusterCache.IndexField(ctx, obj, field, extractValue)
203+
}
204+
205+
for _, cache := range c.clusterToCache {
206+
if err := cache.IndexField(ctx, obj, field, extractValue); err != nil {
207+
return err
208+
}
209+
}
210+
return nil
211+
}
212+
213+
func (c *multiClusterCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
214+
clusterName, err := getClusterName(obj)
215+
if err != nil {
216+
return err
217+
}
218+
219+
if clusterName == "*" {
220+
// Look into the global cache to fetch the object
221+
return c.gClusterCache.Get(ctx, key, obj)
222+
}
223+
224+
cache, ok := c.clusterToCache[clusterName]
225+
if !ok {
226+
return fmt.Errorf("unable to get: %v because of unknown clusterName for the cache", key)
227+
}
228+
return cache.Get(ctx, key, obj)
229+
}
230+
231+
// List
232+
// ClusterName is not passed => error
233+
// ClusterName is passed => getCache
234+
// ListAll clusters => clusterName is "*"
235+
236+
func (c *multiClusterCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOptions) {
237+
listOpts := client.ListOptions{}
238+
listOpts.ApplyOptions(opts)
239+
240+
clusterName := opts.ClusterName
241+
if clusterName == "" {
242+
// initial stab - error out
243+
fmt.Errorf("cluster Name is empty in listOpts %v", listOpts)
244+
}
245+
246+
if clusterName == "*" {
247+
// Look at gloabal cluster cache
248+
return c.gClusterCache.List(ctx, list, opts...)
249+
}
250+
251+
// look at individual caches
252+
cache, ok := c.clusterToCache[clusterName]
253+
if !ok { // cache is not found to the particular cluster
254+
return fmt.Errorf("unable to get cache because clusterName %v is not known", clusterName)
255+
}
256+
return cache.List(ctx, list, opts...)
257+
}
258+
259+
// informer maps
260+
type multiClusterInformer struct {
261+
clusterNameToInformer map[string]Informer
262+
}
263+
264+
var _Informer = &multiClusterInformer
265+
266+
// AddEventHandler adds the handler to each namespaced informer.
267+
func (i *multiClusterInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
268+
for _, informer := range i.clusterNameToInformer {
269+
informer.AddEventHandler(handler)
270+
}
271+
}
272+
273+
// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer.
274+
func (i *multiClusterInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) {
275+
for _, informer := range i.clusterNameToInformer {
276+
informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
277+
}
278+
}
279+
280+
// AddIndexers adds the indexer for each namespaced informer.
281+
func (i *multiClusterInformer) AddIndexers(indexers toolscache.Indexers) error {
282+
for _, informer := range i.clusterNameToInformer {
283+
err := informer.AddIndexers(indexers)
284+
if err != nil {
285+
return err
286+
}
287+
}
288+
return nil
289+
}
290+
291+
// HasSynced checks if each namespaced informer has synced.
292+
func (i *multiClusterInformer) HasSynced() bool {
293+
for _, informer := range i.clusterNameToInformer {
294+
if ok := informer.HasSynced(); !ok {
295+
return ok
296+
}
297+
}
298+
return true
299+
}

0 commit comments

Comments
 (0)