Skip to content

Allow namespace-scoped dynamic watching #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions pkg/patterns/declarative/pkg/watch/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,29 @@ type dynamicWatch struct {
events chan event.GenericEvent
}

func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind) (dynamic.ResourceInterface, error) {
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) {
mapping, err := dw.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
return dw.client.Resource(mapping.Resource), nil
resource := dw.client.Resource(mapping.Resource)
if namespace == "" {
return resource, nil
} else {
return resource.Namespace(namespace), nil
}
}

// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'
func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) error {
client, err := dw.newDynamicClient(trigger)
func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.ListOptions, filterNamespace string, target metav1.ObjectMeta) error {
client, err := dw.newDynamicClient(trigger, filterNamespace)
if err != nil {
return fmt.Errorf("creating client for (%s): %v", trigger.String(), err)
}

go func() {
for {
dw.watchUntilClosed(client, trigger, options, target)
dw.watchUntilClosed(client, trigger, options, filterNamespace, target)

time.Sleep(WatchDelay)
}
Expand All @@ -103,19 +108,19 @@ type clientObject struct {
// from this Watch but it will ensure we always Reconcile when needed`.
//
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) {
func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigger schema.GroupVersionKind, options metav1.ListOptions, filterNamespace string, target metav1.ObjectMeta) {
log := log.Log

// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
options.AllowWatchBookmarks = true

events, err := client.Watch(context.TODO(), options)
if err != nil {
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Error(err, "adding watch to dynamic client")
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Error(err, "failed to add watch to dynamic client")
return
}

log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Info("watch began")
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Info("watch began")

// Always clean up watchers
defer events.Stop()
Expand All @@ -129,7 +134,7 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
dw.events <- event.GenericEvent{Object: clientObject{Object: clientEvent.Object, ObjectMeta: &target}}
}

log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Info("watch closed")
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Info("watch closed")

return
}
110 changes: 76 additions & 34 deletions pkg/patterns/declarative/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package declarative
import (
"context"
"fmt"
"sort"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -33,61 +33,119 @@ import (
"sigs.k8s.io/kubebuilder-declarative-pattern/pkg/patterns/declarative/pkg/watch"
)

type Source interface {
type eventsSource interface {
SetSink(sink Sink)
}

type DynamicWatch interface {
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'
Add(trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) error
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'.
// If namespace is specified, the watch will be restricted to that namespace.
Add(trigger schema.GroupVersionKind, options metav1.ListOptions, namespace string, target metav1.ObjectMeta) error
}

// WatchAll creates a Watch on ctrl for all objects reconciled by recnl
func WatchAll(config *rest.Config, ctrl controller.Controller, recnl Source, labelMaker LabelMaker) (chan struct{}, error) {
if labelMaker == nil {
// WatchChildrenOptions configures how we want to watch children.
type WatchChildrenOptions struct {
// RESTConfig is the configuration for connecting to the cluster
RESTConfig *rest.Config

// LabelMaker is used to build the labels we should watch on.
LabelMaker LabelMaker

// Controller contains the controller itself
Controller controller.Controller

// Reconciler lets us hook into the post-apply lifecycle event.
Reconciler eventsSource

// ScopeWatchesToNamespace controls whether watches are per-namespace.
// This allows for more narrowly scoped RBAC permissions, at the cost of more watches.
ScopeWatchesToNamespace bool
}

// WatchAll creates a Watch on ctrl for all objects reconciled by recnl.
// Deprecated: prefer WatchChildren (and consider setting ScopeWatchesToNamespace)
func WatchAll(config *rest.Config, ctrl controller.Controller, reconciler eventsSource, labelMaker LabelMaker) (chan struct{}, error) {
options := WatchChildrenOptions{
RESTConfig: config,
Controller: ctrl,
Reconciler: reconciler,
LabelMaker: labelMaker,
ScopeWatchesToNamespace: false,
}
return WatchChildren(options)
}

// WatchChildren sets up watching of the objects applied by a controller.
func WatchChildren(options WatchChildrenOptions) (chan struct{}, error) {
if options.LabelMaker == nil {
return nil, fmt.Errorf("labelMaker is required to scope watches")
}

dw, events, err := watch.NewDynamicWatch(*config)
dw, events, err := watch.NewDynamicWatch(*options.RESTConfig)
if err != nil {
return nil, fmt.Errorf("creating dynamic watch: %v", err)
}

src := &source.Channel{Source: events}
// Inject a stop channel that will never close. The controller does not have a concept of
// shutdown, so there is no oppritunity to stop the watch.
stopCh := make(chan struct{})
src.InjectStopChannel(stopCh)
if err := ctrl.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
return nil, fmt.Errorf("setting up dynamic watch on the controller: %v", err)
if err := options.Controller.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
return nil, fmt.Errorf("setting up dynamic watch on the controller: %w", err)
}
recnl.SetSink(&watchAll{dw, labelMaker, make(map[string]struct{})})

options.Reconciler.SetSink(&watchAll{
dw: dw,
options: options,
registered: make(map[string]struct{})})

return stopCh, nil
}

type watchAll struct {
dw DynamicWatch
labelMaker LabelMaker
dw DynamicWatch

options WatchChildrenOptions

mutex sync.Mutex
// registered tracks what we are currently watching, avoid duplicate watches.
registered map[string]struct{}
}

// Notify is called by the controller when the object changes. We establish any new watches.
func (w *watchAll) Notify(ctx context.Context, dest DeclarativeObject, objs *manifest.Objects) error {
log := log.Log

labelSelector, err := labels.ValidatedSelectorFromSet(w.labelMaker(ctx, dest))
labelSelector, err := labels.ValidatedSelectorFromSet(w.options.LabelMaker(ctx, dest))
if err != nil {
return fmt.Errorf("failed to build label selector: %w", err)
}

notify := metav1.ObjectMeta{Name: dest.GetName(), Namespace: dest.GetNamespace()}
filter := metav1.ListOptions{LabelSelector: labelSelector.String()}

for _, gvk := range uniqueGroupVersionKind(objs) {
key := fmt.Sprintf("%s,%s,%s", gvk.String(), labelSelector.String(), dest.GetNamespace())
if _, ok := w.registered[key]; ok {
// Protect against concurrent invocation
w.mutex.Lock()
defer w.mutex.Unlock()

for _, obj := range objs.Items {
gvk := obj.GroupVersionKind()

key := fmt.Sprintf("gvk=%s:%s:%s;labels=%s", gvk.Group, gvk.Version, gvk.Kind, filter.LabelSelector)

filterNamespace := ""
if w.options.ScopeWatchesToNamespace && obj.Namespace != "" {
filterNamespace = obj.Namespace
key += ";namespace=" + filterNamespace
}

if _, found := w.registered[key]; found {
continue
}

err := w.dw.Add(gvk, filter, notify)
log.Info("adding watch", "key", key)
err := w.dw.Add(gvk, filter, filterNamespace, notify)
if err != nil {
log.WithValues("GroupVersionKind", gvk.String()).Error(err, "adding watch")
continue
Expand All @@ -97,19 +155,3 @@ func (w *watchAll) Notify(ctx context.Context, dest DeclarativeObject, objs *man
}
return nil
}

// uniqueGroupVersionKind returns all unique GroupVersionKind defined in objects
func uniqueGroupVersionKind(objects *manifest.Objects) []schema.GroupVersionKind {
kinds := map[schema.GroupVersionKind]struct{}{}
for _, o := range objects.Items {
kinds[o.GroupVersionKind()] = struct{}{}
}
var unique []schema.GroupVersionKind
for gvk := range kinds {
unique = append(unique, gvk)
}
sort.Slice(unique, func(i, j int) bool {
return unique[i].String() < unique[j].String()
})
return unique
}
139 changes: 0 additions & 139 deletions pkg/patterns/declarative/watch_test.go

This file was deleted.