Skip to content

🌱 [WIP] Conditional Controllers #1527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -72,6 +73,7 @@ type ForInput struct {
predicates []predicate.Predicate
objectProjection objectProjection
err error
conditional bool
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -97,6 +99,7 @@ type OwnsInput struct {
object client.Object
predicates []predicate.Predicate
objectProjection objectProjection
conditional bool
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand All @@ -118,6 +121,7 @@ type WatchesInput struct {
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
conditional bool
}

// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
Expand Down Expand Up @@ -216,13 +220,48 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
}
}

func (blder *Builder) generateConditionalSource(typeForSrc client.Object) (source.Source, error) {
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
if err != nil {
return nil, err
}
dc, err := discovery.NewDiscoveryClientForConfig(blder.mgr.GetConfig())
if err != nil {
return nil, err
}
existsInDiscovery := func() bool {
resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return false
}
for _, res := range resources.APIResources {
if res.Kind == gvk.Kind {
return true
}
}
return false
}
return &source.ConditionalKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery}, nil

}

func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}

var src source.Source
if blder.forInput.conditional {
var err error
src, err = blder.generateConditionalSource(typeForSrc)
if err != nil {
return err
}
} else {
src = &source.Kind{Type: typeForSrc}
}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand All @@ -235,7 +274,17 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}

var src source.Source
if own.conditional {
var err error
src, err = blder.generateConditionalSource(typeForSrc)
if err != nil {
return err
}
} else {
src = &source.Kind{Type: typeForSrc}
}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
Expand Down
13 changes: 13 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ func (p projectAs) ApplyToWatches(opts *WatchesInput) {
opts.objectProjection = objectProjection(p)
}

// Conditional confirms that it should configure a ConditionalSource
// that can be started/stopped/restarted based on the existence of
// the input's type in discovery.
type Conditional struct{}

func (s Conditional) ApplyToFor(opts *ForInput) {
opts.conditional = true
}

func (s Conditional) ApplyToOwns(opts *OwnsInput) {
opts.conditional = true
}

var (
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
Expand Down
4 changes: 4 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Informers interface {
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)

// GetInformerStop fetches the stop channel of the informer for the given object (constructing
// the informer if necessary). This stop channel fires when the controller has stopped.
GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
Expand Down
22 changes: 18 additions & 4 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false)
if err != nil {
return err
}
Expand All @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false)
if err != nil {
return nil, err
}
Expand All @@ -152,13 +152,27 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
return nil, err
}

_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformerStop returns the stopChannel of the informer for the obj
func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(ctx, gvk, obj, true)
if err != nil {
return nil, err
}
return i.StopCh, err

}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock
func (ip *informerCache) NeedLeaderElection() bool {
Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
return c.informerFor(gvk, obj)
}

// GetStoppableInformer implements informers
func (c *FakeInformers) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) {
// TODO: not implemented
return nil, nil
}

// WaitForCacheSync implements Informers
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) {
switch obj.(type) {
case *unstructured.Unstructured:
return m.unstructured.Get(ctx, gvk, obj)
return m.unstructured.Get(ctx, gvk, obj, stopOnError)
case *unstructured.UnstructuredList:
return m.unstructured.Get(ctx, gvk, obj)
return m.unstructured.Get(ctx, gvk, obj, stopOnError)
case *metav1.PartialObjectMetadata:
return m.metadata.Get(ctx, gvk, obj)
return m.metadata.Get(ctx, gvk, obj, stopOnError)
case *metav1.PartialObjectMetadataList:
return m.metadata.Get(ctx, gvk, obj)
return m.metadata.Get(ctx, gvk, obj, stopOnError)
default:
return m.structured.Get(ctx, gvk, obj)
return m.structured.Get(ctx, gvk, obj, stopOnError)
}
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type MapEntry struct {

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader

// StopCh is a channel that is closed after
// the informer stops
StopCh <-chan struct{}
}

// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
Expand Down Expand Up @@ -171,7 +175,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -182,7 +186,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion

if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil {
return started, nil, err
}
}
Expand All @@ -197,7 +201,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion
return started, i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()

Expand Down Expand Up @@ -228,17 +232,34 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
default:
}

informerStop := make(chan struct{})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
StopCh: informerStop,
}
ip.informersByGVK[gvk] = i

go func() {
<-ip.stop
close(informerStop)
}()

i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
// TODO: ensure the error is a kind not found error before stopping
if stopOnError {
close(informerStop)
}
})

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
go func() {
i.Informer.Run(informerStop)
delete(ip.informersByGVK, gvk)
}()
}
return i, ip.started, nil
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -90,6 +91,29 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) {
multiStopCh := make(chan struct{})
var wg sync.WaitGroup
for _, cache := range c.namespaceToCache {
stopCh, err := cache.GetInformerStop(ctx, obj)
if err != nil {
return nil, err
}
wg.Add(1)
go func(stopCh <-chan struct{}) {
defer wg.Done()
<-stopCh

}(stopCh)
}

go func() {
defer close(multiStopCh)
wg.Done()
}()
return multiStopCh, nil
}

func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
informers := map[string]Informer{}
for ns, cache := range c.namespaceToCache {
Expand Down
Loading