Skip to content

Commit a479832

Browse files
committed
Alternative implementation that forks bits from kubernetes/client-go
1 parent f653d09 commit a479832

File tree

4 files changed

+852
-69
lines changed

4 files changed

+852
-69
lines changed

pkg/kcp/controller.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package kcp
2+
3+
import (
4+
"reflect"
5+
"sync"
6+
"time"
7+
"unsafe"
8+
9+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
10+
"k8s.io/apimachinery/pkg/util/wait"
11+
"k8s.io/client-go/tools/cache"
12+
"k8s.io/utils/clock"
13+
)
14+
15+
// `*controller` implements cache.Controller
16+
type controller struct {
17+
config cache.Config
18+
reflector *cache.Reflector
19+
reflectorMutex sync.RWMutex
20+
clock clock.Clock
21+
}
22+
23+
// New makes a new cache.Controller from the given cache.Config.
24+
func New(c *cache.Config) cache.Controller {
25+
ctlr := &controller{
26+
config: *c,
27+
clock: &clock.RealClock{},
28+
}
29+
return ctlr
30+
}
31+
32+
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
33+
// It's an error to call Run more than once.
34+
// Run blocks; call via go.
35+
func (c *controller) Run(stopCh <-chan struct{}) {
36+
defer utilruntime.HandleCrash()
37+
go func() {
38+
<-stopCh
39+
c.config.Queue.Close()
40+
}()
41+
r := cache.NewReflector(
42+
c.config.ListerWatcher,
43+
c.config.ObjectType,
44+
c.config.Queue,
45+
c.config.FullResyncPeriod,
46+
)
47+
r.ShouldResync = c.config.ShouldResync
48+
r.WatchListPageSize = c.config.WatchListPageSize
49+
50+
reflector := reflect.Indirect(reflect.ValueOf(r))
51+
clockField := reflector.FieldByName("clock")
52+
ptrToClock := unsafe.Pointer(clockField.UnsafeAddr())
53+
realPtrToClock := (*clock.Clock)(ptrToClock)
54+
*realPtrToClock = c.clock
55+
if c.config.WatchErrorHandler != nil {
56+
handlerField := reflector.FieldByName("watchErrorHandler")
57+
ptrToHandler := unsafe.Pointer(handlerField.UnsafeAddr())
58+
realPtrToHandler := (*cache.WatchErrorHandler)(ptrToHandler)
59+
*realPtrToHandler = c.config.WatchErrorHandler
60+
}
61+
62+
c.reflectorMutex.Lock()
63+
c.reflector = r
64+
c.reflectorMutex.Unlock()
65+
66+
var wg wait.Group
67+
68+
wg.StartWithChannel(stopCh, r.Run)
69+
70+
wait.Until(c.processLoop, time.Second, stopCh)
71+
wg.Wait()
72+
}
73+
74+
// Returns true once this controller has completed an initial resource listing
75+
func (c *controller) HasSynced() bool {
76+
return c.config.Queue.HasSynced()
77+
}
78+
79+
func (c *controller) LastSyncResourceVersion() string {
80+
c.reflectorMutex.RLock()
81+
defer c.reflectorMutex.RUnlock()
82+
if c.reflector == nil {
83+
return ""
84+
}
85+
return c.reflector.LastSyncResourceVersion()
86+
}
87+
88+
// processLoop drains the work queue.
89+
// TODO: Consider doing the processing in parallel. This will require a little thought
90+
// to make sure that we don't end up processing the same object multiple times
91+
// concurrently.
92+
//
93+
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
94+
// actually exit when the controller is stopped. Or just give up on this stuff
95+
// ever being stoppable. Converting this whole package to use Context would
96+
// also be helpful.
97+
func (c *controller) processLoop() {
98+
for {
99+
obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
100+
if err != nil {
101+
if err == cache.ErrFIFOClosed {
102+
return
103+
}
104+
if c.config.RetryOnError {
105+
// This is the safe way to re-enqueue.
106+
c.config.Queue.AddIfNotPresent(obj)
107+
}
108+
}
109+
}
110+
}
111+
112+
// Multiplexes updates in the form of a list of cache.Deltas into a cache.Store, and informs
113+
// a given handler of events OnUpdate, OnAdd, OnDelete
114+
func processDeltas(
115+
// Object which receives event notifications from the given deltas
116+
handler cache.ResourceEventHandler,
117+
clientState cache.Store,
118+
transformer cache.TransformFunc,
119+
deltas cache.Deltas,
120+
) error {
121+
// from oldest to newest
122+
for _, d := range deltas {
123+
obj := d.Object
124+
if transformer != nil {
125+
var err error
126+
obj, err = transformer(obj)
127+
if err != nil {
128+
return err
129+
}
130+
}
131+
132+
switch d.Type {
133+
case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
134+
if old, exists, err := clientState.Get(obj); err == nil && exists {
135+
if err := clientState.Update(obj); err != nil {
136+
return err
137+
}
138+
handler.OnUpdate(old, obj)
139+
} else {
140+
if err := clientState.Add(obj); err != nil {
141+
return err
142+
}
143+
handler.OnAdd(obj)
144+
}
145+
case cache.Deleted:
146+
if err := clientState.Delete(obj); err != nil {
147+
return err
148+
}
149+
handler.OnDelete(obj)
150+
}
151+
}
152+
return nil
153+
}

0 commit comments

Comments
 (0)