Skip to content

Commit 45e9a71

Browse files
committed
Allow namespace-scoped dynamic watching
In namespace-scoped mode, namespaced objects will be watched only in that namespace. To start using, use declarative.WatchChildren and specify ScopeWatchesToNamespace. We also deprecate WatchAll to allow for future extensibility (but retain it as a forwarding shim)
1 parent c5c6df5 commit 45e9a71

File tree

3 files changed

+90
-182
lines changed

3 files changed

+90
-182
lines changed

pkg/patterns/declarative/pkg/watch/dynamic.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,24 +63,29 @@ type dynamicWatch struct {
6363
events chan event.GenericEvent
6464
}
6565

66-
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind) (dynamic.ResourceInterface, error) {
66+
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) {
6767
mapping, err := dw.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
6868
if err != nil {
6969
return nil, err
7070
}
71-
return dw.client.Resource(mapping.Resource), nil
71+
resource := dw.client.Resource(mapping.Resource)
72+
if namespace == "" {
73+
return resource, nil
74+
} else {
75+
return resource.Namespace(namespace), nil
76+
}
7277
}
7378

7479
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'
75-
func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) error {
76-
client, err := dw.newDynamicClient(trigger)
80+
func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.ListOptions, filterNamespace string, target metav1.ObjectMeta) error {
81+
client, err := dw.newDynamicClient(trigger, filterNamespace)
7782
if err != nil {
7883
return fmt.Errorf("creating client for (%s): %v", trigger.String(), err)
7984
}
8085

8186
go func() {
8287
for {
83-
dw.watchUntilClosed(client, trigger, options, target)
88+
dw.watchUntilClosed(client, trigger, options, filterNamespace, target)
8489

8590
time.Sleep(WatchDelay)
8691
}
@@ -103,19 +108,19 @@ type clientObject struct {
103108
// from this Watch but it will ensure we always Reconcile when needed`.
104109
//
105110
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
106-
func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) {
111+
func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigger schema.GroupVersionKind, options metav1.ListOptions, filterNamespace string, target metav1.ObjectMeta) {
107112
log := log.Log
108113

109114
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
110115
options.AllowWatchBookmarks = true
111116

112117
events, err := client.Watch(context.TODO(), options)
113118
if err != nil {
114-
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Error(err, "adding watch to dynamic client")
119+
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Error(err, "failed to add watch to dynamic client")
115120
return
116121
}
117122

118-
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Info("watch began")
123+
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Info("watch began")
119124

120125
// Always clean up watchers
121126
defer events.Stop()
@@ -129,7 +134,7 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
129134
dw.events <- event.GenericEvent{Object: clientObject{Object: clientEvent.Object, ObjectMeta: &target}}
130135
}
131136

132-
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Info("watch closed")
137+
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Info("watch closed")
133138

134139
return
135140
}

pkg/patterns/declarative/watch.go

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package declarative
1919
import (
2020
"context"
2121
"fmt"
22-
"sort"
22+
"sync"
2323

2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/apimachinery/pkg/labels"
@@ -33,61 +33,119 @@ import (
3333
"sigs.k8s.io/kubebuilder-declarative-pattern/pkg/patterns/declarative/pkg/watch"
3434
)
3535

36-
type Source interface {
36+
type eventsSource interface {
3737
SetSink(sink Sink)
3838
}
3939

4040
type DynamicWatch interface {
41-
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'
42-
Add(trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) error
41+
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'.
42+
// If namespace is specified, the watch will be restricted to that namespace.
43+
Add(trigger schema.GroupVersionKind, options metav1.ListOptions, namespace string, target metav1.ObjectMeta) error
4344
}
4445

45-
// WatchAll creates a Watch on ctrl for all objects reconciled by recnl
46-
func WatchAll(config *rest.Config, ctrl controller.Controller, recnl Source, labelMaker LabelMaker) (chan struct{}, error) {
47-
if labelMaker == nil {
46+
// WatchChildrenOptions configures how we want to watch children.
47+
type WatchChildrenOptions struct {
48+
// RESTConfig is the configuration for connecting to the cluster
49+
RESTConfig *rest.Config
50+
51+
// LabelMaker is used to build the labels we should watch on.
52+
LabelMaker LabelMaker
53+
54+
// Controller contains the controller itself
55+
Controller controller.Controller
56+
57+
// Reconciler lets us hook into the post-apply lifecycle event.
58+
Reconciler eventsSource
59+
60+
// ScopeWatchesToNamespace controls whether watches are per-namespace.
61+
// This allows for more narrowly scoped RBAC permissions, at the cost of more watches.
62+
ScopeWatchesToNamespace bool
63+
}
64+
65+
// WatchAll creates a Watch on ctrl for all objects reconciled by recnl.
66+
// Deprecated: prefer WatchChildren (and consider setting ScopeWatchesToNamespace)
67+
func WatchAll(config *rest.Config, ctrl controller.Controller, reconciler eventsSource, labelMaker LabelMaker) (chan struct{}, error) {
68+
options := WatchChildrenOptions{
69+
RESTConfig: config,
70+
Controller: ctrl,
71+
Reconciler: reconciler,
72+
LabelMaker: labelMaker,
73+
ScopeWatchesToNamespace: false,
74+
}
75+
return WatchChildren(options)
76+
}
77+
78+
// WatchChildren sets up watching of the objects applied by a controller.
79+
func WatchChildren(options WatchChildrenOptions) (chan struct{}, error) {
80+
if options.LabelMaker == nil {
4881
return nil, fmt.Errorf("labelMaker is required to scope watches")
4982
}
5083

51-
dw, events, err := watch.NewDynamicWatch(*config)
84+
dw, events, err := watch.NewDynamicWatch(*options.RESTConfig)
5285
if err != nil {
5386
return nil, fmt.Errorf("creating dynamic watch: %v", err)
5487
}
88+
5589
src := &source.Channel{Source: events}
5690
// Inject a stop channel that will never close. The controller does not have a concept of
5791
// shutdown, so there is no oppritunity to stop the watch.
5892
stopCh := make(chan struct{})
5993
src.InjectStopChannel(stopCh)
60-
if err := ctrl.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
61-
return nil, fmt.Errorf("setting up dynamic watch on the controller: %v", err)
94+
if err := options.Controller.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
95+
return nil, fmt.Errorf("setting up dynamic watch on the controller: %w", err)
6296
}
63-
recnl.SetSink(&watchAll{dw, labelMaker, make(map[string]struct{})})
97+
98+
options.Reconciler.SetSink(&watchAll{
99+
dw: dw,
100+
options: options,
101+
registered: make(map[string]struct{})})
102+
64103
return stopCh, nil
65104
}
66105

67106
type watchAll struct {
68-
dw DynamicWatch
69-
labelMaker LabelMaker
107+
dw DynamicWatch
108+
109+
options WatchChildrenOptions
110+
111+
mutex sync.Mutex
112+
// registered tracks what we are currently watching, avoid duplicate watches.
70113
registered map[string]struct{}
71114
}
72115

116+
// Notify is called by the controller when the object changes. We establish any new watches.
73117
func (w *watchAll) Notify(ctx context.Context, dest DeclarativeObject, objs *manifest.Objects) error {
74118
log := log.Log
75119

76-
labelSelector, err := labels.ValidatedSelectorFromSet(w.labelMaker(ctx, dest))
120+
labelSelector, err := labels.ValidatedSelectorFromSet(w.options.LabelMaker(ctx, dest))
77121
if err != nil {
78122
return fmt.Errorf("failed to build label selector: %w", err)
79123
}
80124

81125
notify := metav1.ObjectMeta{Name: dest.GetName(), Namespace: dest.GetNamespace()}
82126
filter := metav1.ListOptions{LabelSelector: labelSelector.String()}
83127

84-
for _, gvk := range uniqueGroupVersionKind(objs) {
85-
key := fmt.Sprintf("%s,%s,%s", gvk.String(), labelSelector.String(), dest.GetNamespace())
86-
if _, ok := w.registered[key]; ok {
128+
// Protect against concurrent invocation
129+
w.mutex.Lock()
130+
defer w.mutex.Unlock()
131+
132+
for _, obj := range objs.Items {
133+
gvk := obj.GroupVersionKind()
134+
135+
key := fmt.Sprintf("gvk=%s:%s:%s;labels=%s", gvk.Group, gvk.Version, gvk.Kind, filter.LabelSelector)
136+
137+
filterNamespace := ""
138+
if w.options.ScopeWatchesToNamespace && obj.Namespace != "" {
139+
filterNamespace = obj.Namespace
140+
key += ";namespace=" + filterNamespace
141+
}
142+
143+
if _, found := w.registered[key]; found {
87144
continue
88145
}
89146

90-
err := w.dw.Add(gvk, filter, notify)
147+
log.Info("adding watch", "key", key)
148+
err := w.dw.Add(gvk, filter, filterNamespace, notify)
91149
if err != nil {
92150
log.WithValues("GroupVersionKind", gvk.String()).Error(err, "adding watch")
93151
continue
@@ -97,19 +155,3 @@ func (w *watchAll) Notify(ctx context.Context, dest DeclarativeObject, objs *man
97155
}
98156
return nil
99157
}
100-
101-
// uniqueGroupVersionKind returns all unique GroupVersionKind defined in objects
102-
func uniqueGroupVersionKind(objects *manifest.Objects) []schema.GroupVersionKind {
103-
kinds := map[schema.GroupVersionKind]struct{}{}
104-
for _, o := range objects.Items {
105-
kinds[o.GroupVersionKind()] = struct{}{}
106-
}
107-
var unique []schema.GroupVersionKind
108-
for gvk := range kinds {
109-
unique = append(unique, gvk)
110-
}
111-
sort.Slice(unique, func(i, j int) bool {
112-
return unique[i].String() < unique[j].String()
113-
})
114-
return unique
115-
}

pkg/patterns/declarative/watch_test.go

Lines changed: 0 additions & 139 deletions
This file was deleted.

0 commit comments

Comments
 (0)