Skip to content

Commit b6d106a

Browse files
committed
Improve watch event handling
Exit if we get an error notification. Check for duplicate messages and suppress them, as they are all going into the same channel. This avoids renotification when the watch connection is dropped.
1 parent 6b14c87 commit b6d106a

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

pkg/patterns/declarative/pkg/watch/dynamic.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import (
2323

2424
"k8s.io/apimachinery/pkg/api/meta"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/apimachinery/pkg/types"
2830
"k8s.io/apimachinery/pkg/watch"
2931
"k8s.io/client-go/dynamic"
3032
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -51,6 +53,10 @@ type dynamicWatch struct {
5153
client dynamic.Interface
5254
restMapper meta.RESTMapper
5355
events chan event.GenericEvent
56+
57+
// lastRV caches the last reported resource version.
58+
// This helps us avoid sending duplicate events (e.g. on a rewatch)
59+
lastRV map[types.NamespacedName]string
5460
}
5561

5662
func (dw *dynamicWatch) newDynamicClient(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) {
@@ -73,6 +79,8 @@ func (dw *dynamicWatch) Add(trigger schema.GroupVersionKind, options metav1.List
7379
return fmt.Errorf("creating client for (%s): %v", trigger.String(), err)
7480
}
7581

82+
dw.lastRV = make(map[types.NamespacedName]string)
83+
7684
go func() {
7785
for {
7886
dw.watchUntilClosed(client, trigger, options, filterNamespace, target)
@@ -116,11 +124,33 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
116124
defer events.Stop()
117125

118126
for clientEvent := range events.ResultChan() {
119-
if clientEvent.Type == watch.Bookmark {
120-
// not an invalidation, we ignore it
127+
switch clientEvent.Type {
128+
case watch.Bookmark:
129+
// not an object change, we ignore it
121130
continue
131+
case watch.Error:
132+
log.Error(fmt.Errorf("unexpected error from watch: %v", clientEvent.Object), "error during watch")
133+
return
122134
}
123-
log.WithValues("type", clientEvent.Type).WithValues("kind", trigger.String()).Info("broadcasting event")
135+
136+
u := clientEvent.Object.(*unstructured.Unstructured)
137+
key := types.NamespacedName{Namespace: u.GetNamespace(), Name: u.GetName()}
138+
rv := u.GetResourceVersion()
139+
140+
switch clientEvent.Type {
141+
case watch.Deleted:
142+
// stop lastRV growing indefinitely
143+
delete(dw.lastRV, key)
144+
// We always send the delete notification
145+
case watch.Added, watch.Modified:
146+
if previousRV, found := dw.lastRV[key]; found && previousRV == rv {
147+
// Don't send spurious invalidations
148+
continue
149+
}
150+
dw.lastRV[key] = rv
151+
}
152+
153+
log.WithValues("type", clientEvent.Type).WithValues("kind", trigger.String(), "name", key.Name, "namespace", namespace).Info("broadcasting event")
124154
dw.events <- event.GenericEvent{Object: clientObject{Object: clientEvent.Object, ObjectMeta: &target}}
125155
}
126156

0 commit comments

Comments
 (0)