Skip to content

Commit 0423490

Browse files
author
Ulf Lilleengen
committed
Add support for creating namespaced watches
1 parent 198197f commit 0423490

File tree

6 files changed

+80
-35
lines changed

6 files changed

+80
-35
lines changed

pkg/cache/cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,18 @@ type Informers interface {
5353
// API kind and resource.
5454
GetInformer(obj runtime.Object) (Informer, error)
5555

56+
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
57+
// API kind and resource.
58+
GetInformerInNamespace(obj runtime.Object, namespace *string) (Informer, error)
59+
5660
// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
5761
// of the underlying object.
5862
GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error)
5963

64+
// GetInformerForKindInNamespace is similar to GetInformerInNamespace, except that it takes a group-version-kind, instead
65+
// of the underlying object.
66+
GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (Informer, error)
67+
6068
// Start runs all the informers known to this cache until the given channel is closed.
6169
// It blocks.
6270
Start(stopCh <-chan struct{}) error

pkg/cache/informer_cache.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
5757
return err
5858
}
5959

60-
started, cache, err := ip.InformersMap.Get(gvk, out)
60+
started, cache, err := ip.InformersMap.Get(gvk, out, &key.Namespace)
6161
if err != nil {
6262
return err
6363
}
@@ -101,7 +101,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
101101
}
102102
}
103103

104-
started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
104+
started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj, nil)
105105
if err != nil {
106106
return err
107107
}
@@ -115,12 +115,16 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
115115

116116
// GetInformerForKind returns the informer for the GroupVersionKind
117117
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
118+
return ip.GetInformerForKindInNamespace(gvk, nil)
119+
}
120+
121+
func (ip *informerCache) GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (Informer, error) {
118122
// Map the gvk to an object
119123
obj, err := ip.Scheme.New(gvk)
120124
if err != nil {
121125
return nil, err
122126
}
123-
_, i, err := ip.InformersMap.Get(gvk, obj)
127+
_, i, err := ip.InformersMap.Get(gvk, obj, namespace)
124128
if err != nil {
125129
return nil, err
126130
}
@@ -129,11 +133,15 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform
129133

130134
// GetInformer returns the informer for the obj
131135
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
136+
return ip.GetInformerInNamespace(obj, nil)
137+
}
138+
139+
func (ip *informerCache) GetInformerInNamespace(obj runtime.Object, namespace *string) (Informer, error) {
132140
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
133141
if err != nil {
134142
return nil, err
135143
}
136-
_, i, err := ip.InformersMap.Get(gvk, obj)
144+
_, i, err := ip.InformersMap.Get(gvk, obj, namespace)
137145
if err != nil {
138146
return nil, err
139147
}

pkg/cache/internal/deleg_map.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
7373

7474
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
7575
// the Informer from the map.
76-
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
76+
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object, namespace *string) (bool, *MapEntry, error) {
7777
_, isUnstructured := obj.(*unstructured.Unstructured)
7878
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
7979
isUnstructured = isUnstructured || isUnstructuredList
8080

8181
if isUnstructured {
82-
return m.unstructured.Get(gvk, obj)
82+
return m.unstructured.Get(gvk, obj, namespace)
8383
}
8484

85-
return m.structured.Get(gvk, obj)
85+
return m.structured.Get(gvk, obj, namespace)
8686
}
8787

8888
// newStructuredInformersMap creates a new InformersMap for structured objects.

pkg/cache/internal/informers_map.go

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
)
3636

3737
// clientListWatcherFunc knows how to create a ListWatcher
38-
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
38+
type createListWatcherFunc func(gvk schema.GroupVersionKind, namespace *string, ip *specificInformersMap) (*cache.ListWatch, error)
3939

4040
// newSpecificInformersMap returns a new specificInformersMap (like
4141
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
@@ -49,7 +49,7 @@ func newSpecificInformersMap(config *rest.Config,
4949
config: config,
5050
Scheme: scheme,
5151
mapper: mapper,
52-
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
52+
informers: make(map[MapKey]*MapEntry),
5353
codecs: serializer.NewCodecFactory(scheme),
5454
paramCodec: runtime.NewParameterCodec(scheme),
5555
resync: resync,
@@ -59,6 +59,11 @@ func newSpecificInformersMap(config *rest.Config,
5959
return ip
6060
}
6161

62+
type MapKey struct {
63+
gvk schema.GroupVersionKind
64+
namespace *string
65+
}
66+
6267
// MapEntry contains the cached data for an Informer
6368
type MapEntry struct {
6469
// Informer is the cached informer
@@ -80,8 +85,8 @@ type specificInformersMap struct {
8085
// mapper maps GroupVersionKinds to Resources
8186
mapper meta.RESTMapper
8287

83-
// informersByGVK is the cache of informers keyed by groupVersionKind
84-
informersByGVK map[schema.GroupVersionKind]*MapEntry
88+
// informers is the cache of informers keyed by groupVersionKind and namespace
89+
informers map[MapKey]*MapEntry
8590

8691
// codecs is used to create a new REST client
8792
codecs serializer.CodecFactory
@@ -122,7 +127,7 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
122127
ip.stop = stop
123128

124129
// Start each informer
125-
for _, informer := range ip.informersByGVK {
130+
for _, informer := range ip.informers {
126131
go informer.Informer.Run(stop)
127132
}
128133

@@ -136,27 +141,31 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
136141
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
137142
ip.mu.RLock()
138143
defer ip.mu.RUnlock()
139-
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
140-
for _, informer := range ip.informersByGVK {
144+
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informers))
145+
for _, informer := range ip.informers {
141146
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
142147
}
143148
return syncedFuncs
144149
}
145150

146151
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
147152
// the Informer from the map.
148-
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
153+
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object, namespace *string) (bool, *MapEntry, error) {
154+
key := MapKey{
155+
gvk: gvk,
156+
namespace: namespace,
157+
}
149158
// Return the informer if it is found
150159
i, started, ok := func() (*MapEntry, bool, bool) {
151160
ip.mu.RLock()
152161
defer ip.mu.RUnlock()
153-
i, ok := ip.informersByGVK[gvk]
162+
i, ok := ip.informers[key]
154163
return i, ip.started, ok
155164
}()
156165

157166
if !ok {
158167
var err error
159-
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
168+
if i, started, err = ip.addInformerToMap(key, obj); err != nil {
160169
return started, nil, err
161170
}
162171
}
@@ -171,20 +180,20 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
171180
return started, i, nil
172181
}
173182

174-
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
183+
func (ip *specificInformersMap) addInformerToMap(key MapKey, obj runtime.Object) (*MapEntry, bool, error) {
175184
ip.mu.Lock()
176185
defer ip.mu.Unlock()
177186

178187
// Check the cache to see if we already have an Informer. If we do, return the Informer.
179188
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
180189
// so neither returned early, but the first one created it.
181-
if i, ok := ip.informersByGVK[gvk]; ok {
190+
if i, ok := ip.informers[key]; ok {
182191
return i, ip.started, nil
183192
}
184193

185194
// Create a NewSharedIndexInformer and add it to the map.
186195
var lw *cache.ListWatch
187-
lw, err := ip.createListWatcher(gvk, ip)
196+
lw, err := ip.createListWatcher(key.gvk, key.namespace, ip)
188197
if err != nil {
189198
return nil, false, err
190199
}
@@ -193,9 +202,9 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
193202
})
194203
i := &MapEntry{
195204
Informer: ni,
196-
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
205+
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: key.gvk},
197206
}
198-
ip.informersByGVK[gvk] = i
207+
ip.informers[key] = i
199208

200209
// Start the Informer if need by
201210
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
@@ -207,7 +216,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
207216
}
208217

209218
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
210-
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
219+
func createStructuredListWatch(gvk schema.GroupVersionKind, namespace *string, ip *specificInformersMap) (*cache.ListWatch, error) {
211220
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
212221
// groupVersionKind to the Resource API we will use.
213222
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -225,25 +234,30 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
225234
return nil, err
226235
}
227236

237+
ns := ip.namespace
238+
if namespace != nil {
239+
ns = *namespace
240+
}
241+
228242
// Create a new ListWatch for the obj
229243
return &cache.ListWatch{
230244
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
231245
res := listObj.DeepCopyObject()
232-
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
233-
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
246+
isNamespaceScoped := ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
247+
err := client.Get().NamespaceIfScoped(ns, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
234248
return res, err
235249
},
236250
// Setup the watch function
237251
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
238252
// Watch needs to be set to true separately
239253
opts.Watch = true
240-
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
241-
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
254+
isNamespaceScoped := ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
255+
return client.Get().NamespaceIfScoped(ns, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
242256
},
243257
}, nil
244258
}
245259

246-
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
260+
func createUnstructuredListWatch(gvk schema.GroupVersionKind, namespace *string, ip *specificInformersMap) (*cache.ListWatch, error) {
247261
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
248262
// groupVersionKind to the Resource API we will use.
249263
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -255,20 +269,25 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
255269
return nil, err
256270
}
257271

272+
ns := ip.namespace
273+
if namespace != nil {
274+
ns = *namespace
275+
}
276+
258277
// Create a new ListWatch for the obj
259278
return &cache.ListWatch{
260279
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
261-
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
262-
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
280+
if ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
281+
return dynamicClient.Resource(mapping.Resource).Namespace(ns).List(opts)
263282
}
264283
return dynamicClient.Resource(mapping.Resource).List(opts)
265284
},
266285
// Setup the watch function
267286
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
268287
// Watch needs to be set to true separately
269288
opts.Watch = true
270-
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
271-
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
289+
if ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
290+
return dynamicClient.Resource(mapping.Resource).Namespace(ns).Watch(opts)
272291
}
273292
return dynamicClient.Resource(mapping.Resource).Watch(opts)
274293
},

pkg/cache/multi_namespace_cache.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,12 @@ var _ Cache = &multiNamespaceCache{}
6969

7070
// Methods for multiNamespaceCache to conform to the Informers interface
7171
func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error) {
72+
return c.GetInformerInNamespace(obj, nil)
73+
}
74+
func (c *multiNamespaceCache) GetInformerInNamespace(obj runtime.Object, namespace *string) (Informer, error) {
7275
informers := map[string]Informer{}
7376
for ns, cache := range c.namespaceToCache {
74-
informer, err := cache.GetInformer(obj)
77+
informer, err := cache.GetInformerInNamespace(obj, namespace)
7578
if err != nil {
7679
return nil, err
7780
}
@@ -81,9 +84,13 @@ func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error)
8184
}
8285

8386
func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
87+
return c.GetInformerForKindInNamespace(gvk, nil)
88+
}
89+
90+
func (c *multiNamespaceCache) GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (Informer, error) {
8491
informers := map[string]Informer{}
8592
for ns, cache := range c.namespaceToCache {
86-
informer, err := cache.GetInformerForKind(gvk)
93+
informer, err := cache.GetInformerForKindInNamespace(gvk, namespace)
8794
if err != nil {
8895
return nil, err
8996
}

pkg/source/source.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ type Kind struct {
6060
// Type is the type of object to watch. e.g. &v1.Pod{}
6161
Type runtime.Object
6262

63+
// Namespace is the namespace for watching the objects
64+
Namespace *string
65+
6366
// cache used to watch APIs
6467
cache cache.Cache
6568
}
@@ -82,7 +85,7 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting
8285
}
8386

8487
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
85-
i, err := ks.cache.GetInformer(ks.Type)
88+
i, err := ks.cache.GetInformerInNamespace(ks.Type, ks.Namespace)
8689
if err != nil {
8790
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
8891
log.Error(err, "if kind is a CRD, it should be installed before calling Start",

0 commit comments

Comments
 (0)