Skip to content

Commit 19cf5db

Browse files
fix errors - clusterCaching works
1 parent 91bb3df commit 19cf5db

File tree

5 files changed

+30
-51
lines changed

5 files changed

+30
-51
lines changed

examples/kcp/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
2626
ctrl "sigs.k8s.io/controller-runtime"
2727
api "sigs.k8s.io/controller-runtime/examples/crd/pkg"
28+
"sigs.k8s.io/controller-runtime/pkg/cache"
2829
"sigs.k8s.io/controller-runtime/pkg/client"
2930
"sigs.k8s.io/controller-runtime/pkg/log"
3031
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -57,7 +58,10 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
5758
func main() {
5859
ctrl.SetLogger(zap.New())
5960

60-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
61+
cfg := ctrl.GetConfigOrDie()
62+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
63+
NewCache: cache.MultiClusterCacheBuilder([]string{"*"}),
64+
})
6165
if err != nil {
6266
setupLog.Error(err, "unable to start manager")
6367
os.Exit(1)

pkg/builder/controller.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,11 @@ func (blder *Builder) doWatch() error {
229229
if err != nil {
230230
return err
231231
}
232-
src := &source.Kind{Type: typeForSrc, ClusterName: blder.cluster}
232+
233+
// watch the cluster which is being passed to the controller - initial watch
234+
typeForSrc.SetClusterName(blder.cluster)
235+
236+
src := &source.Kind{Type: typeForSrc}
233237
hdler := &handler.EnqueueRequestForObject{}
234238
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
235239
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
@@ -242,7 +246,7 @@ func (blder *Builder) doWatch() error {
242246
if err != nil {
243247
return err
244248
}
245-
src := &source.Kind{Type: typeForSrc, ClusterName: blder.cluster}
249+
src := &source.Kind{Type: typeForSrc}
246250
hdler := &handler.EnqueueRequestForOwner{
247251
OwnerType: blder.forInput.object,
248252
IsController: true,

pkg/cache/internal/clusterscoped_cache.go renamed to pkg/cache/clusterscoped_cache.go

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,14 @@ import (
2121
"fmt"
2222
"time"
2323

24-
"github.com/prometheus/common/log"
2524
apimeta "k8s.io/apimachinery/pkg/api/meta"
2625
"k8s.io/apimachinery/pkg/runtime"
2726
"k8s.io/apimachinery/pkg/runtime/schema"
2827
"k8s.io/client-go/rest"
28+
toolscache "k8s.io/client-go/tools/cache"
2929
"sigs.k8s.io/controller-runtime/pkg/client"
3030
)
3131

32-
// NewCacheFunc - Function for creating a new cache from the options and a rest config.
33-
type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error)
34-
3532
// a new global namespaced cache to handle cluster scoped resources.
3633
const globalClusterCache = "_cluster"
3734

@@ -66,7 +63,7 @@ func MultiClusterCacheBuilder(clusterNames []string) NewCacheFunc {
6663
}
6764
caches[cs] = c
6865
}
69-
return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, clusterCache: gCache}, nil
66+
return &multiClusterCache{clusterToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, gClusterCache: gCache}, nil
7067
}
7168
}
7269

@@ -93,10 +90,10 @@ func (c *multiClusterCache) GetInformer(ctx context.Context, obj client.Object)
9390
return nil, fmt.Errorf("error getting clustername %q", err)
9491
}
9592

96-
if len(clusterName) == "*" {
93+
if (clusterName) == "*" {
9794
globalInformer, err := c.gClusterCache.GetInformer(ctx, obj)
9895
if err != nil {
99-
return err
96+
return nil, err
10097
}
10198
informers[globalClusterCache] = globalInformer
10299
}
@@ -109,13 +106,13 @@ func (c *multiClusterCache) GetInformer(ctx context.Context, obj client.Object)
109106
informers[cs] = informer
110107
}
111108

112-
return &multiClusterCache{clusterToCache: informers}, nil
109+
return &multiClusterInformer{clusterNameToInformer: informers}, nil
113110

114111
}
115112

116113
func getClusterName(obj client.Object) (string, error) {
117114
if obj == nil {
118-
return nil, fmt.Errorf("object cannot be empty %v", obj)
115+
return "", fmt.Errorf("object cannot be empty %v", obj)
119116
}
120117
if obj.GetClusterName() != "" {
121118
return "*", nil
@@ -124,39 +121,14 @@ func getClusterName(obj client.Object) (string, error) {
124121
return obj.GetClusterName(), nil
125122
}
126123

127-
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, clusterName string) (Informer, error) {
128-
informers := map[string]Informer{} // clusterName -> informer
129-
130-
clusterName, err := getCLusterName(obj)
131-
if err != nil {
132-
return err
133-
}
134-
// globalCLusterCache
135-
if len(clusterName) == "*" {
136-
clusterCacheInf, err := c.gClusterCache.GetInformerForKind(ctx, gvk)
137-
if err != nil {
138-
return err
139-
}
140-
141-
informers[globalClusterCache] = clusterCacheInf
142-
return multiClusterCache{clusterToCache: informers}, nil
143-
}
144-
145-
for cs, cache := range c.clusterToCache {
146-
informer, err := cache.GetInformerForKind(ctx, gvk)
147-
if err != nil {
148-
return err
149-
}
150-
informer[cs] = informer
151-
}
152-
153-
return multiClusterCache{clusterToCache: informer}, nil
124+
func (c *multiClusterCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
125+
return nil, fmt.Errorf("not supported in multiClustercache")
154126
}
155127

156128
func (c *multiClusterCache) Start(ctx context.Context) error {
157129
// start global cache
158130
go func() {
159-
err := c.glusterCache.Start(ctx)
131+
err := c.gClusterCache.Start(ctx)
160132
if err != nil {
161133
log.Error(err, "cluster scoped cache failed to start")
162134
}
@@ -191,9 +163,9 @@ func (c *multiClusterCache) WaitForCacheSync(ctx context.Context) bool {
191163
return synced
192164
}
193165

194-
func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
166+
func (c *multiClusterCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
195167

196-
clusterName, err := getCLusterName(obj)
168+
clusterName, err := getClusterName(obj)
197169
if err != nil {
198170
return err
199171
}
@@ -233,16 +205,17 @@ func (c *multiClusterCache) Get(ctx context.Context, key client.ObjectKey, obj c
233205
// ClusterName is passed => getCache
234206
// ListAll clusters => clusterName is "*"
235207

236-
func (c *multiClusterCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOptions) {
208+
func (c *multiClusterCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
237209
listOpts := client.ListOptions{}
238-
listOpts.ApplyOptions(opts)
239210

240-
clusterName := opts.ClusterName
211+
clusterName := listOpts.ClusterName
241212
if clusterName == "" {
242213
// initial stab - error out
243214
fmt.Errorf("cluster Name is empty in listOpts %v", listOpts)
244215
}
245216

217+
listOpts.ApplyOptions(opts)
218+
246219
if clusterName == "*" {
247220
// Look at gloabal cluster cache
248221
return c.gClusterCache.List(ctx, list, opts...)
@@ -261,7 +234,7 @@ type multiClusterInformer struct {
261234
clusterNameToInformer map[string]Informer
262235
}
263236

264-
var _Informer = &multiClusterInformer
237+
var _Informer = &multiClusterInformer{}
265238

266239
// AddEventHandler adds the handler to each namespaced informer.
267240
func (i *multiClusterInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {

pkg/cache/internal/deleg_map.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ func NewInformersMap(config *rest.Config,
5656
disableDeepCopy DisableDeepCopyByGVK,
5757
) *InformersMap {
5858
return &InformersMap{
59-
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, clusterName, selectors, disableDeepCopy),
60-
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, clusterName, selectors, disableDeepCopy),
61-
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, clusterName, selectors, disableDeepCopy),
59+
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
60+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
61+
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
6262

6363
Scheme: scheme,
6464
}

pkg/source/source.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ type Kind struct {
9494
// cache used to watch APIs
9595
cache cache.Cache
9696

97-
ClusterName string
98-
9997
// started may contain an error if one was encountered during startup. If its closed and does not
10098
// contain an error, startup and syncing finished.
10199
started chan error

0 commit comments

Comments
 (0)