Skip to content

✨ Add support for creating namespaced watches #692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,18 @@ type Informers interface {
// API kind and resource.
GetInformer(obj runtime.Object) (Informer, error)

// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformerInNamespace(obj runtime.Object, namespace *string) (Informer, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error)

// GetInformerForKindInNamespace is similar to GetInformerInNamespace, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (Informer, error)

// Start runs all the informers known to this cache until the given channel is closed.
// It blocks.
Start(stopCh <-chan struct{}) error
Expand Down
16 changes: 12 additions & 4 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
return err
}

started, cache, err := ip.InformersMap.Get(gvk, out)
started, cache, err := ip.InformersMap.Get(gvk, out, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
}
}

started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj, nil)
if err != nil {
return err
}
Expand All @@ -115,12 +115,16 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c

// GetInformerForKind returns the informer for the GroupVersionKind
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
return ip.GetInformerForKindInNamespace(gvk, nil)
}

func (ip *informerCache) GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (Informer, error) {
// Map the gvk to an object
obj, err := ip.Scheme.New(gvk)
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)
_, i, err := ip.InformersMap.Get(gvk, obj, namespace)
if err != nil {
return nil, err
}
Expand All @@ -129,11 +133,15 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform

// GetInformer returns the informer for the obj
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
return ip.GetInformerInNamespace(obj, nil)
}

func (ip *informerCache) GetInformerInNamespace(obj runtime.Object, namespace *string) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)
_, i, err := ip.InformersMap.Get(gvk, obj, namespace)
if err != nil {
return nil, err
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.I
return c.informerFor(gvk, obj)
}

// GetInformerForKindInNamespace implements Informers
func (c *FakeInformers) GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
obj, err := c.Scheme.New(gvk)
if err != nil {
return nil, err
}
return c.informerFor(gvk, obj)
}

// FakeInformerForKind implements Informers
func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) {
if c.Scheme == nil {
Expand Down Expand Up @@ -79,6 +91,19 @@ func (c *FakeInformers) GetInformer(obj runtime.Object) (cache.Informer, error)
return c.informerFor(gvk, obj)
}

// GetInformerInNamespace implements Informers
func (c *FakeInformers) GetInformerInNamespace(obj runtime.Object, namespace *string) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
gvks, _, err := c.Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
gvk := gvks[0]
return c.informerFor(gvk, obj)
}

// WaitForCacheSync implements Informers
func (c *FakeInformers) WaitForCacheSync(stop <-chan struct{}) bool {
if c.Synced == nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object, namespace *string) (bool, *MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

if isUnstructured {
return m.unstructured.Get(gvk, obj)
return m.unstructured.Get(gvk, obj, namespace)
}

return m.structured.Get(gvk, obj)
return m.structured.Get(gvk, obj, namespace)
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
Expand Down
69 changes: 44 additions & 25 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

// clientListWatcherFunc knows how to create a ListWatcher
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
type createListWatcherFunc func(gvk schema.GroupVersionKind, namespace *string, ip *specificInformersMap) (*cache.ListWatch, error)

// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
Expand All @@ -49,7 +49,7 @@ func newSpecificInformersMap(config *rest.Config,
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
informers: make(map[MapKey]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
Expand All @@ -59,6 +59,11 @@ func newSpecificInformersMap(config *rest.Config,
return ip
}

type MapKey struct {
gvk schema.GroupVersionKind
namespace *string
}

// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Expand All @@ -80,8 +85,8 @@ type specificInformersMap struct {
// mapper maps GroupVersionKinds to Resources
mapper meta.RESTMapper

// informersByGVK is the cache of informers keyed by groupVersionKind
informersByGVK map[schema.GroupVersionKind]*MapEntry
// informers is the cache of informers keyed by groupVersionKind and namespace
informers map[MapKey]*MapEntry

// codecs is used to create a new REST client
codecs serializer.CodecFactory
Expand Down Expand Up @@ -122,7 +127,7 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
ip.stop = stop

// Start each informer
for _, informer := range ip.informersByGVK {
for _, informer := range ip.informers {
go informer.Informer.Run(stop)
}

Expand All @@ -136,27 +141,31 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
ip.mu.RLock()
defer ip.mu.RUnlock()
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
for _, informer := range ip.informersByGVK {
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informers))
for _, informer := range ip.informers {
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
}
return syncedFuncs
}

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object, namespace *string) (bool, *MapEntry, error) {
key := MapKey{
gvk: gvk,
namespace: namespace,
}
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
i, ok := ip.informers[key]
return i, ip.started, ok
}()

if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
if i, started, err = ip.addInformerToMap(key, obj); err != nil {
return started, nil, err
}
}
Expand All @@ -171,20 +180,20 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
return started, i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
func (ip *specificInformersMap) addInformerToMap(key MapKey, obj runtime.Object) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
if i, ok := ip.informersByGVK[gvk]; ok {
if i, ok := ip.informers[key]; ok {
return i, ip.started, nil
}

// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
lw, err := ip.createListWatcher(key.gvk, key.namespace, ip)
if err != nil {
return nil, false, err
}
Expand All @@ -193,9 +202,9 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: key.gvk},
}
ip.informersByGVK[gvk] = i
ip.informers[key] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
Expand All @@ -207,7 +216,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
}

// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
func createStructuredListWatch(gvk schema.GroupVersionKind, namespace *string, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand All @@ -225,25 +234,30 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return nil, err
}

ns := ip.namespace
if namespace != nil {
ns = *namespace
}

// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
isNamespaceScoped := ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ns, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
isNamespaceScoped := ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ns, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}

func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
func createUnstructuredListWatch(gvk schema.GroupVersionKind, namespace *string, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand All @@ -255,20 +269,25 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
return nil, err
}

ns := ip.namespace
if namespace != nil {
ns = *namespace
}

// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
if ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ns).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
if ns != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ns).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
Expand Down
11 changes: 9 additions & 2 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ var _ Cache = &multiNamespaceCache{}

// Methods for multiNamespaceCache to conform to the Informers interface
func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error) {
return c.GetInformerInNamespace(obj, nil)
}
func (c *multiNamespaceCache) GetInformerInNamespace(obj runtime.Object, namespace *string) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformer(obj)
informer, err := cache.GetInformerInNamespace(obj, namespace)
if err != nil {
return nil, err
}
Expand All @@ -81,9 +84,13 @@ func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error)
}

func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
return c.GetInformerForKindInNamespace(gvk, nil)
}

func (c *multiNamespaceCache) GetInformerForKindInNamespace(gvk schema.GroupVersionKind, namespace *string) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformerForKind(gvk)
informer, err := cache.GetInformerForKindInNamespace(gvk, namespace)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type runtime.Object

// Namespace is the namespace for watching the objects
Namespace *string

// cache used to watch APIs
cache cache.Cache
}
Expand All @@ -82,7 +85,7 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting
}

// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ks.Type)
i, err := ks.cache.GetInformerInNamespace(ks.Type, ks.Namespace)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
Expand Down