@@ -25,6 +25,7 @@ import (
25
25
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
26
"k8s.io/apimachinery/pkg/runtime"
27
27
"k8s.io/apimachinery/pkg/runtime/schema"
28
+ "k8s.io/apimachinery/pkg/watch"
28
29
"k8s.io/client-go/dynamic"
29
30
"k8s.io/client-go/rest"
30
31
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -105,8 +106,10 @@ type clientObject struct {
105
106
func (dw * dynamicWatch ) watchUntilClosed (client dynamic.ResourceInterface , trigger schema.GroupVersionKind , options metav1.ListOptions , target metav1.ObjectMeta ) {
106
107
log := log .Log
107
108
108
- events , err := client .Watch (context .TODO (), options )
109
+ // Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
110
+ options .AllowWatchBookmarks = true
109
111
112
+ events , err := client .Watch (context .TODO (), options )
110
113
if err != nil {
111
114
log .WithValues ("kind" , trigger .String ()).WithValues ("namespace" , target .Namespace ).WithValues ("labels" , options .LabelSelector ).Error (err , "adding watch to dynamic client" )
112
115
return
@@ -118,6 +121,10 @@ func (dw *dynamicWatch) watchUntilClosed(client dynamic.ResourceInterface, trigg
118
121
defer events .Stop ()
119
122
120
123
for clientEvent := range events .ResultChan () {
124
+ if clientEvent .Type == watch .Bookmark {
125
+ // not an invalidation, we ignore it
126
+ continue
127
+ }
121
128
log .WithValues ("type" , clientEvent .Type ).WithValues ("kind" , trigger .String ()).Info ("broadcasting event" )
122
129
dw .events <- event.GenericEvent {Object : clientObject {Object : clientEvent .Object , ObjectMeta : & target }}
123
130
}
0 commit comments