Skip to content

✨ Add support for creating namespaced watches #694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Cache interface {
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(obj runtime.Object) (Informer, error)
GetInformer(obj runtime.Object, opts ...client.ListOption) (Informer, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error)
GetInformerForKind(gvk schema.GroupVersionKind, opts ...client.ListOption) (Informer, error)

// Start runs all the informers known to this cache until the given channel is closed.
// It blocks.
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
return err
}

started, cache, err := ip.InformersMap.Get(gvk, out)
started, cache, err := ip.InformersMap.Get(gvk, out, client.InNamespace(key.Namespace))
if err != nil {
return err
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
}
}

started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj, opts...)
if err != nil {
return err
}
Expand All @@ -114,26 +114,26 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
}

// GetInformerForKind returns the informer for the GroupVersionKind
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind, opts ...client.ListOption) (Informer, error) {
// Map the gvk to an object
obj, err := ip.Scheme.New(gvk)
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)
_, i, err := ip.InformersMap.Get(gvk, obj, opts...)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
func (ip *informerCache) GetInformer(obj runtime.Object, opts ...client.ListOption) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)
_, i, err := ip.InformersMap.Get(gvk, obj, opts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type FakeInformers struct {
}

// GetInformerForKind implements Informers
func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind, opts ...client.ListOption) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand Down Expand Up @@ -67,7 +67,7 @@ func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*contr
}

// GetInformer implements Informers
func (c *FakeInformers) GetInformer(obj runtime.Object) (cache.Informer, error) {
func (c *FakeInformers) GetInformer(obj runtime.Object, opts ...client.ListOption) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
Expand Down Expand Up @@ -73,16 +75,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object, opts ...client.ListOption) (bool, *MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

if isUnstructured {
return m.unstructured.Get(gvk, obj)
return m.unstructured.Get(gvk, obj, opts...)
}

return m.structured.Get(gvk, obj)
return m.structured.Get(gvk, obj, opts...)
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
Expand Down
74 changes: 48 additions & 26 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// clientListWatcherFunc knows how to create a ListWatcher
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
type createListWatcherFunc func(gvk schema.GroupVersionKind, namespace string, ip *specificInformersMap) (*cache.ListWatch, error)

// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
Expand All @@ -49,7 +50,7 @@ func newSpecificInformersMap(config *rest.Config,
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
informers: make(map[MapKey]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
Expand All @@ -59,6 +60,12 @@ func newSpecificInformersMap(config *rest.Config,
return ip
}

// MapKey is the key for looking up informers
type MapKey struct {
gvk schema.GroupVersionKind
namespace string
}

// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Expand All @@ -80,8 +87,8 @@ type specificInformersMap struct {
// mapper maps GroupVersionKinds to Resources
mapper meta.RESTMapper

// informersByGVK is the cache of informers keyed by groupVersionKind
informersByGVK map[schema.GroupVersionKind]*MapEntry
// informers is the cache of informers keyed by groupVersionKind and namespace
informers map[MapKey]*MapEntry

// codecs is used to create a new REST client
codecs serializer.CodecFactory
Expand All @@ -106,7 +113,7 @@ type specificInformersMap struct {
// unstructured objects.
createListWatcher createListWatcherFunc

// namespace is the namespace that all ListWatches are restricted to
// namespace is the namespace that all ListWatches are restricted to unless overridden
// default or empty string means all namespaces
namespace string
}
Expand All @@ -122,7 +129,7 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
ip.stop = stop

// Start each informer
for _, informer := range ip.informersByGVK {
for _, informer := range ip.informers {
go informer.Informer.Run(stop)
}

Expand All @@ -136,27 +143,42 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
ip.mu.RLock()
defer ip.mu.RUnlock()
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
for _, informer := range ip.informersByGVK {
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informers))
for _, informer := range ip.informers {
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
}
return syncedFuncs
}

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object, opts ...client.ListOption) (bool, *MapEntry, error) {
// Set default list options and override
listOpts := client.ListOptions{
Namespace: ip.namespace,
}
listOpts.ApplyOptions(opts)

// If configured using a global namespace, use its cache to avoid duplication
if ip.namespace == "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So generally this looks like it will work and I like the overall idea of "use specific cache unless configured to use global one" :)

There are however some things to consider:

  • If I read this correctly, using a source.Kind with a Namespace setting and a non-namespaced specificInformersMap will result in getting events from all namespaces, which is not expected
  • The current approach means that as soon as any namespace is configured, caches will get established for all namespaces an object was ever requested from which is also unexpected
  • When thinking about this approach in the context of label selectors I think just a binary "cache for everything if global cache, else distinct cache for every label selector ever used" is not enough, as it may lead to quite a bit of duplication again. Maybe a list of label selectors for which we want a distinct cache and then have the client internally either use the specific cache or do the label-selecting after receiving the objects from the cache?
  • More of a nit, but I think it will be better to not re-use client.ListOptions for the Get on the cache map, as that forces us to keep their feature set on par

At the end of the day thought, this is the point where I can not give you any further feedback if the approach is feasible or has more issues I didn't consider, we will have to wait for @DirectXMan12 to chime in.

But thanks a lot for pushing this forward, its really great to see some movement here :)

listOpts.Namespace = ip.namespace
}

key := MapKey{
gvk: gvk,
namespace: listOpts.Namespace,
}
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
i, ok := ip.informers[key]
return i, ip.started, ok
}()

if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
if i, started, err = ip.addInformerToMap(key, obj); err != nil {
return started, nil, err
}
}
Expand All @@ -171,20 +193,20 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
return started, i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
func (ip *specificInformersMap) addInformerToMap(key MapKey, obj runtime.Object) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
if i, ok := ip.informersByGVK[gvk]; ok {
if i, ok := ip.informers[key]; ok {
return i, ip.started, nil
}

// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
lw, err := ip.createListWatcher(key.gvk, key.namespace, ip)
if err != nil {
return nil, false, err
}
Expand All @@ -193,9 +215,9 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: key.gvk},
}
ip.informersByGVK[gvk] = i
ip.informers[key] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
Expand All @@ -207,7 +229,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
}

// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
func createStructuredListWatch(gvk schema.GroupVersionKind, namespace string, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand All @@ -229,21 +251,21 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}

func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
func createUnstructuredListWatch(gvk schema.GroupVersionKind, namespace string, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand All @@ -258,17 +280,17 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(namespace).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ type multiNamespaceCache struct {
var _ Cache = &multiNamespaceCache{}

// Methods for multiNamespaceCache to conform to the Informers interface
func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error) {
func (c *multiNamespaceCache) GetInformer(obj runtime.Object, opts ...client.ListOption) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformer(obj)
informer, err := cache.GetInformer(obj, opts...)
if err != nil {
return nil, err
}
Expand All @@ -80,10 +80,10 @@ func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error)
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind, opts ...client.ListOption) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformerForKind(gvk)
informer, err := cache.GetInformerForKind(gvk, opts...)
if err != nil {
return nil, err
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source/internal"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

Expand Down Expand Up @@ -60,12 +61,23 @@ type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type runtime.Object

// Options are the list options for watching the objects.
// TODO: Only namespace is used by the implementation at present
options []client.ListOption

// cache used to watch APIs
cache cache.Cache
}

var _ Source = &Kind{}

func NewKind(obj runtime.Object, opts ...client.ListOption) *Kind {
return &Kind{
Type: obj,
options: opts,
}
}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
Expand All @@ -82,7 +94,7 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting
}

// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ks.Type)
i, err := ks.cache.GetInformer(ks.Type, ks.options...)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
Expand Down