Skip to content

Commit b27195f

Browse files
authored
Merge pull request #170 from justinsb/namespace_scoped_dynamic_watching
Allow namespace-scoped dynamic watching
2 parents c5c6df5 + 45e9a71 commit b27195f

File tree

3 files changed

+90
-182
lines changed

3 files changed

+90
-182
lines changed

pkg/patterns/declarative/pkg/watch/dynamic.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,24 +63,29 @@ type dynamicWatch struct {
6363
events chan event.GenericEvent
6464
}
6565

66-
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind) (dynamic.ResourceInterface, error) {
66+
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) {
6767
mapping, err := dw.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
6868
if err != nil {
6969
return nil, err
7070
}
71-
return dw.client.Resource(mapping.Resource), nil
71+
resource := dw.client.Resource(mapping.Resource)
72+
if namespace == "" {
73+
return resource, nil
74+
} else {
75+
return resource.Namespace(namespace), nil
76+
}
7277
}
7378

7479
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'
75-
func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) error {
76-
client, err := dw.newDynamicClient(trigger)
80+
func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.ListOptions, filterNamespace string, target metav1.ObjectMeta) error {
81+
client, err := dw.newDynamicClient(trigger, filterNamespace)
7782
if err != nil {
7883
return fmt.Errorf("creating client for (%s): %v", trigger.String(), err)
7984
}
8085

8186
go func() {
8287
for {
83-
dw.watchUntilClosed(client, trigger, options, target)
88+
dw.watchUntilClosed(client, trigger, options, filterNamespace, target)
8489

8590
time.Sleep(WatchDelay)
8691
}
@@ -103,19 +108,19 @@ type clientObject struct {
103108
// from this Watch but it will ensure we always Reconcile when needed`.
104109
//
105110
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
106-
func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) {
111+
func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigger schema.GroupVersionKind, options metav1.ListOptions, filterNamespace string, target metav1.ObjectMeta) {
107112
log := log.Log
108113

109114
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
110115
options.AllowWatchBookmarks = true
111116

112117
events, err := client.Watch(context.TODO(), options)
113118
if err != nil {
114-
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Error(err, "adding watch to dynamic client")
119+
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Error(err, "failed to add watch to dynamic client")
115120
return
116121
}
117122

118-
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Info("watch began")
123+
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Info("watch began")
119124

120125
// Always clean up watchers
121126
defer events.Stop()
@@ -129,7 +134,7 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
129134
dw.events <- event.GenericEvent{Object: clientObject{Object: clientEvent.Object, ObjectMeta: &target}}
130135
}
131136

132-
log.WithValues("kind", trigger.String()).WithValues("namespace", target.Namespace).WithValues("labels", options.LabelSelector).Info("watch closed")
137+
log.WithValues("kind", trigger.String()).WithValues("namespace", filterNamespace).WithValues("labels", options.LabelSelector).Info("watch closed")
133138

134139
return
135140
}

pkg/patterns/declarative/watch.go

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package declarative
1919
import (
2020
"context"
2121
"fmt"
22-
"sort"
22+
"sync"
2323

2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/apimachinery/pkg/labels"
@@ -33,61 +33,119 @@ import (
3333
"sigs.k8s.io/kubebuilder-declarative-pattern/pkg/patterns/declarative/pkg/watch"
3434
)
3535

36-
type Source interface {
36+
type eventsSource interface {
3737
SetSink(sink Sink)
3838
}
3939

4040
type DynamicWatch interface {
41-
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'
42-
Add(trigger schema.GroupVersionKind, options metav1.ListOptions, target metav1.ObjectMeta) error
41+
// Add registers a watch for changes to 'trigger' filtered by 'options' to raise an event on 'target'.
42+
// If namespace is specified, the watch will be restricted to that namespace.
43+
Add(trigger schema.GroupVersionKind, options metav1.ListOptions, namespace string, target metav1.ObjectMeta) error
4344
}
4445

45-
// WatchAll creates a Watch on ctrl for all objects reconciled by recnl
46-
func WatchAll(config *rest.Config, ctrl controller.Controller, recnl Source, labelMaker LabelMaker) (chan struct{}, error) {
47-
if labelMaker == nil {
46+
// WatchChildrenOptions configures how we want to watch children.
47+
type WatchChildrenOptions struct {
48+
// RESTConfig is the configuration for connecting to the cluster
49+
RESTConfig *rest.Config
50+
51+
// LabelMaker is used to build the labels we should watch on.
52+
LabelMaker LabelMaker
53+
54+
// Controller contains the controller itself
55+
Controller controller.Controller
56+
57+
// Reconciler lets us hook into the post-apply lifecycle event.
58+
Reconciler eventsSource
59+
60+
// ScopeWatchesToNamespace controls whether watches are per-namespace.
61+
// This allows for more narrowly scoped RBAC permissions, at the cost of more watches.
62+
ScopeWatchesToNamespace bool
63+
}
64+
65+
// WatchAll creates a Watch on ctrl for all objects reconciled by recnl.
66+
// Deprecated: prefer WatchChildren (and consider setting ScopeWatchesToNamespace)
67+
func WatchAll(config *rest.Config, ctrl controller.Controller, reconciler eventsSource, labelMaker LabelMaker) (chan struct{}, error) {
68+
options := WatchChildrenOptions{
69+
RESTConfig: config,
70+
Controller: ctrl,
71+
Reconciler: reconciler,
72+
LabelMaker: labelMaker,
73+
ScopeWatchesToNamespace: false,
74+
}
75+
return WatchChildren(options)
76+
}
77+
78+
// WatchChildren sets up watching of the objects applied by a controller.
79+
func WatchChildren(options WatchChildrenOptions) (chan struct{}, error) {
80+
if options.LabelMaker == nil {
4881
return nil, fmt.Errorf("labelMaker is required to scope watches")
4982
}
5083

51-
dw, events, err := watch.NewDynamicWatch(*config)
84+
dw, events, err := watch.NewDynamicWatch(*options.RESTConfig)
5285
if err != nil {
5386
return nil, fmt.Errorf("creating dynamic watch: %v", err)
5487
}
88+
5589
src := &source.Channel{Source: events}
5690
// Inject a stop channel that will never close. The controller does not have a concept of
5791
// shutdown, so there is no oppritunity to stop the watch.
5892
stopCh := make(chan struct{})
5993
src.InjectStopChannel(stopCh)
60-
if err := ctrl.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
61-
return nil, fmt.Errorf("setting up dynamic watch on the controller: %v", err)
94+
if err := options.Controller.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
95+
return nil, fmt.Errorf("setting up dynamic watch on the controller: %w", err)
6296
}
63-
recnl.SetSink(&watchAll{dw, labelMaker, make(map[string]struct{})})
97+
98+
options.Reconciler.SetSink(&watchAll{
99+
dw: dw,
100+
options: options,
101+
registered: make(map[string]struct{})})
102+
64103
return stopCh, nil
65104
}
66105

67106
type watchAll struct {
68-
dw DynamicWatch
69-
labelMaker LabelMaker
107+
dw DynamicWatch
108+
109+
options WatchChildrenOptions
110+
111+
mutex sync.Mutex
112+
// registered tracks what we are currently watching, avoid duplicate watches.
70113
registered map[string]struct{}
71114
}
72115

116+
// Notify is called by the controller when the object changes. We establish any new watches.
73117
func (w *watchAll) Notify(ctx context.Context, dest DeclarativeObject, objs *manifest.Objects) error {
74118
log := log.Log
75119

76-
labelSelector, err := labels.ValidatedSelectorFromSet(w.labelMaker(ctx, dest))
120+
labelSelector, err := labels.ValidatedSelectorFromSet(w.options.LabelMaker(ctx, dest))
77121
if err != nil {
78122
return fmt.Errorf("failed to build label selector: %w", err)
79123
}
80124

81125
notify := metav1.ObjectMeta{Name: dest.GetName(), Namespace: dest.GetNamespace()}
82126
filter := metav1.ListOptions{LabelSelector: labelSelector.String()}
83127

84-
for _, gvk := range uniqueGroupVersionKind(objs) {
85-
key := fmt.Sprintf("%s,%s,%s", gvk.String(), labelSelector.String(), dest.GetNamespace())
86-
if _, ok := w.registered[key]; ok {
128+
// Protect against concurrent invocation
129+
w.mutex.Lock()
130+
defer w.mutex.Unlock()
131+
132+
for _, obj := range objs.Items {
133+
gvk := obj.GroupVersionKind()
134+
135+
key := fmt.Sprintf("gvk=%s:%s:%s;labels=%s", gvk.Group, gvk.Version, gvk.Kind, filter.LabelSelector)
136+
137+
filterNamespace := ""
138+
if w.options.ScopeWatchesToNamespace && obj.Namespace != "" {
139+
filterNamespace = obj.Namespace
140+
key += ";namespace=" + filterNamespace
141+
}
142+
143+
if _, found := w.registered[key]; found {
87144
continue
88145
}
89146

90-
err := w.dw.Add(gvk, filter, notify)
147+
log.Info("adding watch", "key", key)
148+
err := w.dw.Add(gvk, filter, filterNamespace, notify)
91149
if err != nil {
92150
log.WithValues("GroupVersionKind", gvk.String()).Error(err, "adding watch")
93151
continue
@@ -97,19 +155,3 @@ func (w *watchAll) Notify(ctx context.Context, dest DeclarativeObject, objs *man
97155
}
98156
return nil
99157
}
100-
101-
// uniqueGroupVersionKind returns all unique GroupVersionKind defined in objects
102-
func uniqueGroupVersionKind(objects *manifest.Objects) []schema.GroupVersionKind {
103-
kinds := map[schema.GroupVersionKind]struct{}{}
104-
for _, o := range objects.Items {
105-
kinds[o.GroupVersionKind()] = struct{}{}
106-
}
107-
var unique []schema.GroupVersionKind
108-
for gvk := range kinds {
109-
unique = append(unique, gvk)
110-
}
111-
sort.Slice(unique, func(i, j int) bool {
112-
return unique[i].String() < unique[j].String()
113-
})
114-
return unique
115-
}

pkg/patterns/declarative/watch_test.go

Lines changed: 0 additions & 139 deletions
This file was deleted.

0 commit comments

Comments
 (0)