Skip to content

Commit a4b17f6

Browse files
committed
Add hacky kcp informer func
1 parent 7fb59e4 commit a4b17f6

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

pkg/client/interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package client
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
"github.com/kcp-dev/logicalcluster"
2324

2425
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/client-go/tools/cache"
2527

2628
"k8s.io/apimachinery/pkg/api/meta"
2729
"k8s.io/apimachinery/pkg/runtime"
@@ -48,6 +50,8 @@ type Patch interface {
4850
Data(obj Object) ([]byte, error)
4951
}
5052

53+
type NewInformerFunc func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
54+
5155
// TODO(directxman12): is there a sane way to deal with get/delete options?
5256

5357
// Reader knows how to read and list Kubernetes objects.

pkg/kcp/informers.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package kcp
2+
3+
import (
4+
"reflect"
5+
"time"
6+
"unsafe"
7+
8+
k8sruntime "k8s.io/apimachinery/pkg/runtime"
9+
"k8s.io/client-go/tools/cache"
10+
"sigs.k8s.io/controller-runtime/pkg/client"
11+
)
12+
13+
type clusterAwareSharedIndexInformer struct {
14+
delegate cache.SharedIndexInformer
15+
keyFunc cache.KeyFunc
16+
}
17+
18+
func NewClusterAwareSharedIndexInformerFunc(keyFunc cache.KeyFunc) client.NewInformerFunc {
19+
//TODO(kcp) could hardcode kcp indexers here or leave them plumbed through so they are passed when this function is called
20+
return func(lw cache.ListerWatcher, exampleObject k8sruntime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
21+
return &clusterAwareSharedIndexInformer{
22+
delegate: cache.NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, indexers),
23+
keyFunc: keyFunc,
24+
}
25+
}
26+
}
27+
28+
func (i *clusterAwareSharedIndexInformer) AddEventHandler(handler cache.ResourceEventHandler) {
29+
i.delegate.AddEventHandler(handler)
30+
}
31+
32+
func (i *clusterAwareSharedIndexInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
33+
i.delegate.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
34+
}
35+
36+
func (i *clusterAwareSharedIndexInformer) GetStore() cache.Store {
37+
return i.delegate.GetStore()
38+
}
39+
40+
func (i *clusterAwareSharedIndexInformer) GetController() cache.Controller {
41+
return i.delegate.GetController()
42+
}
43+
44+
func (i *clusterAwareSharedIndexInformer) Run(stopCh <-chan struct{}) {
45+
go i.delegate.Run(stopCh)
46+
// TODO(kcp) this is fragile
47+
time.Sleep(10000)
48+
// need to update keyfunc in:
49+
// [x] i.delegate.indexer.keyFunc
50+
// [x] i.delegate.controller.config.Queue.keyFunc
51+
// [x] i.delegate.controller.reflector.store.keyFunc
52+
53+
indexer := i.delegate.GetIndexer()
54+
i.setKeyFunc(indexer)
55+
56+
pointerVal := reflect.ValueOf(i.delegate)
57+
val := reflect.Indirect(pointerVal)
58+
controllerField := val.FieldByName("controller")
59+
ptrToController := unsafe.Pointer(controllerField.UnsafeAddr())
60+
controller := reflect.NewAt(controllerField.Type(), ptrToController).Elem().Interface()
61+
62+
configval := reflect.Indirect(reflect.ValueOf(controller)).FieldByName("config")
63+
ptrToConfig := unsafe.Pointer(configval.UnsafeAddr())
64+
config := (*cache.Config)(ptrToConfig)
65+
deltaFifo := config.Queue.(*cache.DeltaFIFO)
66+
i.setKeyFunc(deltaFifo)
67+
68+
reflectorval := reflect.Indirect(reflect.ValueOf(controller)).FieldByName("reflector")
69+
ptrToReflector := unsafe.Pointer(reflectorval.UnsafeAddr())
70+
reflector := (*cache.Reflector)(ptrToReflector)
71+
72+
storeval := reflect.Indirect(reflect.ValueOf(reflector)).FieldByName("store")
73+
ptrToStore := unsafe.Pointer(storeval.UnsafeAddr())
74+
deltaFifo = (*cache.DeltaFIFO)(ptrToStore)
75+
i.setKeyFunc(deltaFifo)
76+
77+
// deltaFifo := config.Queue.(*cache.DeltaFIFO)
78+
// i.setKeyFunc(deltaFifo)
79+
}
80+
81+
func (i *clusterAwareSharedIndexInformer) setKeyFunc(x interface{}) {
82+
pointerVal := reflect.ValueOf(x)
83+
val := reflect.Indirect(pointerVal)
84+
member := val.FieldByName("keyFunc")
85+
ptrToY := unsafe.Pointer(member.UnsafeAddr())
86+
realPtrToY := (*cache.KeyFunc)(ptrToY)
87+
*realPtrToY = i.keyFunc
88+
}
89+
90+
func (i *clusterAwareSharedIndexInformer) HasSynced() bool {
91+
return i.delegate.HasSynced()
92+
}
93+
94+
func (i *clusterAwareSharedIndexInformer) LastSyncResourceVersion() string {
95+
return i.delegate.LastSyncResourceVersion()
96+
}
97+
98+
func (i *clusterAwareSharedIndexInformer) SetWatchErrorHandler(handler cache.WatchErrorHandler) error {
99+
return i.delegate.SetWatchErrorHandler(handler)
100+
}
101+
102+
func (i *clusterAwareSharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
103+
return i.delegate.AddIndexers(indexers)
104+
}
105+
106+
func (i *clusterAwareSharedIndexInformer) GetIndexer() cache.Indexer {
107+
return i.delegate.GetIndexer()
108+
}

0 commit comments

Comments
 (0)