Skip to content

Commit 7dcbbb6

Browse files
authored
Merge pull request #186 from justinsb/avoid_spurious_messages
Improve watch event handling
2 parents 0867fae + a57cc03 commit 7dcbbb6

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

pkg/patterns/declarative/pkg/watch/dynamic.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/apimachinery/pkg/types"
2830
"k8s.io/apimachinery/pkg/watch"
2931
"k8s.io/client-go/dynamic"
3032
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -51,6 +53,10 @@ type dynamicWatch struct {
5153
client dynamic.Interface
5254
restMapper meta.RESTMapper
5355
events chan event.GenericEvent
56+
57+
// lastRV caches the last reported resource version.
58+
// This helps us avoid sending duplicate events (e.g. on a rewatch)
59+
lastRV map[types.NamespacedName]string
5460
}
5561

5662
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) {
@@ -73,6 +79,8 @@ func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.List
7379
return fmt.Errorf("creating client for (%s): %v", trigger.String(), err)
7480
}
7581

82+
dw.lastRV = make(map[types.NamespacedName]string)
83+
7684
go func() {
7785
for {
7886
dw.watchUntilClosed(client, trigger, options, filterNamespace, target)
@@ -116,11 +124,33 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
116124
defer events.Stop()
117125

118126
for clientEvent := range events.ResultChan() {
119-
if clientEvent.Type == watch.Bookmark {
120-
// not an invalidation, we ignore it
127+
switch clientEvent.Type {
128+
case watch.Bookmark:
129+
// not an object change, we ignore it
121130
continue
131+
case watch.Error:
132+
log.Error(fmt.Errorf("unexpected error from watch: %v", clientEvent.Object), "error during watch")
133+
return
122134
}
123-
log.WithValues("type", clientEvent.Type).WithValues("kind", trigger.String()).Info("broadcasting event")
135+
136+
u := clientEvent.Object.(*unstructured.Unstructured)
137+
key := types.NamespacedName{Namespace: u.GetNamespace(), Name: u.GetName()}
138+
rv := u.GetResourceVersion()
139+
140+
switch clientEvent.Type {
141+
case watch.Deleted:
142+
// stop lastRV growing indefinitely
143+
delete(dw.lastRV, key)
144+
// We always send the delete notification
145+
case watch.Added, watch.Modified:
146+
if previousRV, found := dw.lastRV[key]; found && previousRV == rv {
147+
// Don't send spurious invalidations
148+
continue
149+
}
150+
dw.lastRV[key] = rv
151+
}
152+
153+
log.WithValues("type", clientEvent.Type).WithValues("kind", trigger.String()).WithValues("name", key.Name, "namespace", key.Namespace).Info("broadcasting event")
124154
dw.events <- event.GenericEvent{Object: clientObject{Object: clientEvent.Object, ObjectMeta: &target}}
125155
}
126156

0 commit comments

Comments
 (0)