Skip to content

Commit 1cc6939

Browse files
committed
Add EventHandler reference counting
1 parent 579fb28 commit 1cc6939

File tree

8 files changed

+167
-13
lines changed

8 files changed

+167
-13
lines changed

pkg/cache/cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ type Informer interface {
8787
AddIndexers(indexers toolscache.Indexers) error
8888
//HasSynced return true if the informers underlying store has synced
8989
HasSynced() bool
90+
91+
// RemoveEventHandler currently just decrements a the count of event handlers
92+
// The goals it to have SharedInformer support RemoveEventHandler (and actually remove
93+
// the handler instead of just decrementing a count).
94+
RemoveEventHandler(id int) error
95+
96+
// CountEventHandlers returns the number of event handlers added to an informer.
97+
CountEventHandlers() int
9098
}
9199

92100
// Options are the optional arguments for creating a new InformersMap object

pkg/cache/informer_cache.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,5 @@ func (ip *informerCache) Remove(ctx context.Context, obj runtime.Object) error {
243243
return err
244244
}
245245

246-
ip.InformersMap.Remove(gvk, obj)
247-
return nil
246+
return ip.InformersMap.Remove(gvk, obj)
248247
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package internal
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"k8s.io/client-go/tools/cache"
8+
)
9+
10+
// CountingInformer exposes a way to track the number of EventHandlers
11+
// registered on an Informer.
12+
type CountingInformer interface {
13+
cache.SharedIndexInformer
14+
CountEventHandlers() int
15+
RemoveEventHandler(id int) error
16+
}
17+
18+
// HandlerCountingInformer implements the CountingInformer.
19+
// It increments the count every time AddEventHandler is called,
20+
// and decrements the count every time RemoveEventHandler is called.
21+
//
22+
// It doesn't actually RemoveEventHandlers because that feature is not implemented
23+
// in client-go, but we're are naming it this way to suggest what the interface would look
24+
// like if/when it does get added to client-go.
25+
//
26+
// We can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer
27+
// but until then, we have to track it ourselves
28+
type HandlerCountingInformer struct {
29+
// Informer is the cached informer
30+
informer cache.SharedIndexInformer
31+
32+
// count indicates the number of EventHandlers registered on the informer
33+
count int
34+
}
35+
36+
func (i *HandlerCountingInformer) RemoveEventHandler(id int) error {
37+
i.count--
38+
fmt.Printf("decrement, count is %+v\n", i.count)
39+
return nil
40+
}
41+
42+
func (i *HandlerCountingInformer) AddEventHandler(handler cache.ResourceEventHandler) {
43+
i.count++
44+
fmt.Printf("increment, count is %+v\n", i.count)
45+
i.informer.AddEventHandler(handler)
46+
}
47+
48+
func (i *HandlerCountingInformer) CountEventHandlers() int {
49+
return i.count
50+
}
51+
52+
func (i *HandlerCountingInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
53+
i.count++
54+
i.informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
55+
}
56+
func (i *HandlerCountingInformer) AddIndexers(indexers cache.Indexers) error {
57+
return i.informer.AddIndexers(indexers)
58+
}
59+
60+
func (i *HandlerCountingInformer) HasSynced() bool {
61+
return i.informer.HasSynced()
62+
}
63+
64+
func (i *HandlerCountingInformer) GetStore() cache.Store {
65+
return i.informer.GetStore()
66+
}
67+
68+
func (i *HandlerCountingInformer) GetController() cache.Controller {
69+
return i.informer.GetController()
70+
}
71+
72+
func (i *HandlerCountingInformer) LastSyncResourceVersion() string {
73+
return i.informer.LastSyncResourceVersion()
74+
}
75+
76+
func (i *HandlerCountingInformer) SetWatchErrorHandler(handler cache.WatchErrorHandler) error {
77+
return i.informer.SetWatchErrorHandler(handler)
78+
}
79+
80+
func (i *HandlerCountingInformer) GetIndexer() cache.Indexer {
81+
return i.informer.GetIndexer()
82+
}
83+
84+
func (i *HandlerCountingInformer) Run(stopCh <-chan struct{}) {
85+
i.informer.Run(stopCh)
86+
}

pkg/cache/internal/deleg_map.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,16 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
9393
}
9494

9595
// Remove will remove an new Informer from the InformersMap and stop it if it exists.
96-
func (m *InformersMap) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
96+
func (m *InformersMap) Remove(gvk schema.GroupVersionKind, obj runtime.Object) error {
9797
_, isUnstructured := obj.(*unstructured.Unstructured)
9898
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
9999
isUnstructured = isUnstructured || isUnstructuredList
100100

101101
switch {
102102
case isUnstructured:
103-
m.unstructured.Remove(gvk)
103+
return m.unstructured.Remove(gvk)
104104
default:
105-
m.structured.Remove(gvk)
105+
return m.structured.Remove(gvk)
106106
}
107107
}
108108

pkg/cache/internal/informers_map.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ func newSpecificInformersMap(config *rest.Config,
6565

6666
// MapEntry contains the cached data for an Informer
6767
type MapEntry struct {
68-
// Informer is the cached informer
69-
Informer cache.SharedIndexInformer
68+
// Informer is a SharedIndexInformer with addition count and remove event handler functionality.
69+
Informer CountingInformer
7070

7171
// CacheReader wraps Informer and implements the CacheReader interface for a single type
7272
Reader CacheReader
@@ -230,7 +230,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
230230
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
231231
})
232232
i := &MapEntry{
233-
Informer: ni,
233+
Informer: &HandlerCountingInformer{ni, 0},
234234
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
235235
stop: make(chan struct{}),
236236
}
@@ -241,21 +241,34 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
241241
// can you add eventhandlers?
242242
if ip.started {
243243
go i.Start(ip.stop)
244+
//go i.Start(StopOptions{
245+
// StopChannel: ip.stop,
246+
//})
244247
}
245248
return i, ip.started, nil
246249
}
247250

248251
// Remove removes an informer entry and stops it if it was running.
249-
func (ip *specificInformersMap) Remove(gvk schema.GroupVersionKind) {
252+
func (ip *specificInformersMap) Remove(gvk schema.GroupVersionKind) error {
250253
ip.mu.Lock()
251254
defer ip.mu.Unlock()
252255

253256
entry, ok := ip.informersByGVK[gvk]
254257
if !ok {
255-
return
258+
return nil
256259
}
260+
261+
chInformer, ok := entry.Informer.(*HandlerCountingInformer)
262+
if !ok {
263+
return fmt.Errorf("entry informer is not a HandlerCountingInformer")
264+
}
265+
if chInformer.CountEventHandlers() != 0 {
266+
return fmt.Errorf("attempting to remove informer with %d references", chInformer.CountEventHandlers())
267+
}
268+
257269
close(entry.stop)
258270
delete(ip.informersByGVK, gvk)
271+
return nil
259272
}
260273

261274
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.

pkg/cache/multi_namespace_cache.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,23 @@ type multiNamespaceInformer struct {
195195

196196
var _ Informer = &multiNamespaceInformer{}
197197

198+
func (i *multiNamespaceInformer) CountEventHandlers() int {
199+
total := 0
200+
for _, informer := range i.namespaceToInformer {
201+
total += informer.CountEventHandlers()
202+
}
203+
return total
204+
}
205+
206+
func (i *multiNamespaceInformer) RemoveEventHandler(id int) error {
207+
for _, informer := range i.namespaceToInformer {
208+
if err := informer.RemoveEventHandler(id); err != nil {
209+
return err
210+
}
211+
}
212+
return nil
213+
}
214+
198215
// AddEventHandler adds the handler to each namespaced informer
199216
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
200217
for _, informer := range i.namespaceToInformer {

pkg/internal/controller/controller.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,17 @@ func (c *Controller) Start(ctx context.Context) error {
158158
// caches to sync so that they have a chance to register their intendeded
159159
// caches.
160160
for _, watch := range c.startWatches {
161-
c.Log.Info("Starting EventSource", "source", watch.src)
162-
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
163-
return err
161+
stoppableSource, ok := watch.src.(source.StoppableSource)
162+
if ok {
163+
// TODO: use errgroup or waitgroup to not return until all goros have exited
164+
// (or something else to prevent leaks)
165+
go stoppableSource.StartStoppable(ctx, watch.handler, c.Queue, watch.predicates...)
166+
c.Log.Info("Starting STOPPABLE EventSource", "source", watch.src)
167+
} else {
168+
c.Log.Info("Starting EventSource", "source", watch.src)
169+
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
170+
return err
171+
}
164172
}
165173
}
166174

pkg/source/source.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ type SyncingSource interface {
6464
WaitForSync(ctx context.Context) error
6565
}
6666

67+
// StoppableSource expands the Source interface to add a start method that
68+
// blocks on the context's Done channel, so that we know when the controller has
69+
// been stopped and can remove/decremnt the EventHandler count on the informer appropriately.
70+
type StoppableSource interface {
71+
Source
72+
StartStoppable(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
73+
}
74+
6775
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
6876
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
6977
// from that other cluster
@@ -123,6 +131,21 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
123131
return nil
124132
}
125133

134+
// StartStoppable blocks for start to finish and then calls RemoveEventHandler on the kind's informer.
135+
func (ks *Kind) StartStoppable(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
136+
prct ...predicate.Predicate) error {
137+
i, err := ks.cache.GetInformer(ctx, ks.Type)
138+
if err != nil {
139+
return err
140+
}
141+
if err := ks.Start(ctx, handler, queue, prct...); err != nil {
142+
return err
143+
}
144+
<-ctx.Done()
145+
i.RemoveEventHandler(-1)
146+
return nil
147+
}
148+
126149
func (ks *Kind) String() string {
127150
if ks.Type != nil && ks.Type.GetObjectKind() != nil {
128151
return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String())

0 commit comments

Comments
 (0)