|
| 1 | +package kcp |
| 2 | + |
| 3 | +import ( |
| 4 | + "reflect" |
| 5 | + "time" |
| 6 | + "unsafe" |
| 7 | + |
| 8 | + k8sruntime "k8s.io/apimachinery/pkg/runtime" |
| 9 | + "k8s.io/client-go/tools/cache" |
| 10 | + "sigs.k8s.io/controller-runtime/pkg/client" |
| 11 | +) |
| 12 | + |
| 13 | +type clusterAwareSharedIndexInformer struct { |
| 14 | + delegate cache.SharedIndexInformer |
| 15 | + keyFunc cache.KeyFunc |
| 16 | +} |
| 17 | + |
| 18 | +func NewClusterAwareSharedIndexInformerFunc(keyFunc cache.KeyFunc) client.NewInformerFunc { |
| 19 | + //TODO(kcp) could hardcode kcp indexers here or leave them plumbed through so they are passed when this function is called |
| 20 | + return func(lw cache.ListerWatcher, exampleObject k8sruntime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { |
| 21 | + return &clusterAwareSharedIndexInformer{ |
| 22 | + delegate: cache.NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, indexers), |
| 23 | + keyFunc: keyFunc, |
| 24 | + } |
| 25 | + } |
| 26 | +} |
| 27 | + |
| 28 | +func (i *clusterAwareSharedIndexInformer) AddEventHandler(handler cache.ResourceEventHandler) { |
| 29 | + i.delegate.AddEventHandler(handler) |
| 30 | +} |
| 31 | + |
| 32 | +func (i *clusterAwareSharedIndexInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { |
| 33 | + i.delegate.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) |
| 34 | +} |
| 35 | + |
| 36 | +func (i *clusterAwareSharedIndexInformer) GetStore() cache.Store { |
| 37 | + return i.delegate.GetStore() |
| 38 | +} |
| 39 | + |
| 40 | +func (i *clusterAwareSharedIndexInformer) GetController() cache.Controller { |
| 41 | + return i.delegate.GetController() |
| 42 | +} |
| 43 | + |
| 44 | +func (i *clusterAwareSharedIndexInformer) Run(stopCh <-chan struct{}) { |
| 45 | + go i.delegate.Run(stopCh) |
| 46 | + // TODO(kcp) this is fragile |
| 47 | + time.Sleep(10000) |
| 48 | + // need to update keyfunc in: |
| 49 | + // [x] i.delegate.indexer.keyFunc |
| 50 | + // [x] i.delegate.controller.config.Queue.keyFunc |
| 51 | + // [x] i.delegate.controller.reflector.store.keyFunc |
| 52 | + |
| 53 | + indexer := i.delegate.GetIndexer() |
| 54 | + i.setKeyFunc(indexer) |
| 55 | + |
| 56 | + pointerVal := reflect.ValueOf(i.delegate) |
| 57 | + val := reflect.Indirect(pointerVal) |
| 58 | + controllerField := val.FieldByName("controller") |
| 59 | + ptrToController := unsafe.Pointer(controllerField.UnsafeAddr()) |
| 60 | + controller := reflect.NewAt(controllerField.Type(), ptrToController).Elem().Interface() |
| 61 | + |
| 62 | + configval := reflect.Indirect(reflect.ValueOf(controller)).FieldByName("config") |
| 63 | + ptrToConfig := unsafe.Pointer(configval.UnsafeAddr()) |
| 64 | + config := (*cache.Config)(ptrToConfig) |
| 65 | + deltaFifo := config.Queue.(*cache.DeltaFIFO) |
| 66 | + i.setKeyFunc(deltaFifo) |
| 67 | + |
| 68 | + reflectorval := reflect.Indirect(reflect.ValueOf(controller)).FieldByName("reflector") |
| 69 | + ptrToReflector := unsafe.Pointer(reflectorval.UnsafeAddr()) |
| 70 | + reflector := (*cache.Reflector)(ptrToReflector) |
| 71 | + |
| 72 | + storeval := reflect.Indirect(reflect.ValueOf(reflector)).FieldByName("store") |
| 73 | + ptrToStore := unsafe.Pointer(storeval.UnsafeAddr()) |
| 74 | + deltaFifo = (*cache.DeltaFIFO)(ptrToStore) |
| 75 | + i.setKeyFunc(deltaFifo) |
| 76 | +} |
| 77 | + |
| 78 | +func (i *clusterAwareSharedIndexInformer) setKeyFunc(x interface{}) { |
| 79 | + pointerVal := reflect.ValueOf(x) |
| 80 | + val := reflect.Indirect(pointerVal) |
| 81 | + member := val.FieldByName("keyFunc") |
| 82 | + ptrToY := unsafe.Pointer(member.UnsafeAddr()) |
| 83 | + realPtrToY := (*cache.KeyFunc)(ptrToY) |
| 84 | + *realPtrToY = i.keyFunc |
| 85 | +} |
| 86 | + |
| 87 | +func (i *clusterAwareSharedIndexInformer) HasSynced() bool { |
| 88 | + return i.delegate.HasSynced() |
| 89 | +} |
| 90 | + |
| 91 | +func (i *clusterAwareSharedIndexInformer) LastSyncResourceVersion() string { |
| 92 | + return i.delegate.LastSyncResourceVersion() |
| 93 | +} |
| 94 | + |
| 95 | +func (i *clusterAwareSharedIndexInformer) SetWatchErrorHandler(handler cache.WatchErrorHandler) error { |
| 96 | + return i.delegate.SetWatchErrorHandler(handler) |
| 97 | +} |
| 98 | + |
| 99 | +func (i *clusterAwareSharedIndexInformer) AddIndexers(indexers cache.Indexers) error { |
| 100 | + return i.delegate.AddIndexers(indexers) |
| 101 | +} |
| 102 | + |
| 103 | +func (i *clusterAwareSharedIndexInformer) GetIndexer() cache.Indexer { |
| 104 | + return i.delegate.GetIndexer() |
| 105 | +} |
0 commit comments