Skip to content

Commit 0424f89

Browse files
committed
Refactor cache package.
1 parent 5a1df6f commit 0424f89

File tree

11 files changed

+942
-725
lines changed

11 files changed

+942
-725
lines changed

pkg/cache/cache.go

Lines changed: 64 additions & 296 deletions
Original file line numberDiff line numberDiff line change
@@ -17,333 +17,101 @@ limitations under the License.
1717
package cache
1818

1919
import (
20-
"context"
21-
"fmt"
22-
"reflect"
23-
24-
"k8s.io/apimachinery/pkg/api/errors"
25-
apimeta "k8s.io/apimachinery/pkg/api/meta"
26-
"k8s.io/apimachinery/pkg/fields"
27-
"k8s.io/apimachinery/pkg/labels"
28-
"k8s.io/apimachinery/pkg/runtime"
29-
"k8s.io/apimachinery/pkg/runtime/schema"
30-
"k8s.io/apimachinery/pkg/selection"
31-
"k8s.io/client-go/tools/cache"
20+
"time"
3221

22+
"github.com/kubernetes-sigs/controller-runtime/pkg/cache/internal"
3323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"
3425
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
26+
"k8s.io/apimachinery/pkg/api/meta"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/client-go/kubernetes/scheme"
30+
"k8s.io/client-go/rest"
3531
toolscache "k8s.io/client-go/tools/cache"
3632
)
3733

3834
var log = logf.KBLog.WithName("object-cache")
3935

40-
// objectCache is a ReadInterface
41-
var _ client.ReadInterface = &objectCache{}
36+
// Cache implements ReadInterface by reading objects from a cache populated by Informers
37+
type Cache interface {
38+
// Cache implements the client ReadInterface
39+
client.ReadInterface
4240

43-
// objectCache is a Kubernetes Object cache populated from Informers
44-
type objectCache struct {
45-
cachesByType map[reflect.Type]*singleObjectCache
46-
scheme *runtime.Scheme
47-
informers *informers
41+
// Cache implements Informers
42+
Informers
4843
}
4944

50-
var _ client.ReadInterface = &objectCache{}
45+
// Informers knows how to create or fetch informers for different group-version-kinds.
46+
// It's safe to call GetInformer from multiple threads.
47+
type Informers interface {
48+
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
49+
// API kind and resource.
50+
GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error)
5151

52-
// addInformer adds an informer to the objectCache
53-
func (o *objectCache) addInformer(gvk schema.GroupVersionKind, c cache.SharedIndexInformer) {
54-
obj, err := o.scheme.New(gvk)
55-
if err != nil {
56-
log.Error(err, "could not register informer in objectCache for GVK", "GroupVersionKind", gvk)
57-
return
58-
}
59-
if o.has(obj) {
60-
return
61-
}
62-
o.registerCache(obj, gvk, c.GetIndexer())
63-
}
64-
65-
func (o *objectCache) registerCache(obj runtime.Object, gvk schema.GroupVersionKind, store cache.Indexer) {
66-
objType := reflect.TypeOf(obj)
67-
o.cachesByType[objType] = &singleObjectCache{
68-
Indexer: store,
69-
GroupVersionKind: gvk,
70-
}
71-
}
72-
73-
func (o *objectCache) has(obj runtime.Object) bool {
74-
objType := reflect.TypeOf(obj)
75-
_, found := o.cachesByType[objType]
76-
return found
77-
}
78-
79-
func (o *objectCache) init(obj runtime.Object) error {
80-
i, err := o.informers.GetInformer(obj)
81-
if err != nil {
82-
return err
83-
}
84-
if o.informers.started {
85-
log.Info("Waiting to sync cache for type.", "Type", fmt.Sprintf("%T", obj))
86-
toolscache.WaitForCacheSync(o.informers.stop, i.HasSynced)
87-
log.Info("Finished to syncing cache for type.", "Type", fmt.Sprintf("%T", obj))
88-
} else {
89-
return fmt.Errorf("must start Cache before calling Get or List %s %s",
90-
"Object", fmt.Sprintf("%T", obj))
91-
}
92-
return nil
93-
}
94-
95-
func (o *objectCache) cacheFor(obj runtime.Object) (*singleObjectCache, error) {
96-
if !o.informers.started {
97-
return nil, fmt.Errorf("must start Cache before calling Get or List %s %s",
98-
"Object", fmt.Sprintf("%T", obj))
99-
}
100-
objType := reflect.TypeOf(obj)
52+
// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
53+
// of the underlying object.
54+
GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error)
10155

102-
cache, isKnown := o.cachesByType[objType]
103-
if !isKnown {
104-
return nil, fmt.Errorf("no Cache found for %T - must call GetInformer", obj)
105-
}
106-
return cache, nil
107-
}
108-
109-
// Get implements populatingClient.ReadInterface
110-
func (o *objectCache) Get(ctx context.Context, key client.ObjectKey, out runtime.Object) error {
111-
// Make sure there is a Cache for this type
112-
if !o.has(out) {
113-
err := o.init(out)
114-
if err != nil {
115-
return err
116-
}
117-
}
118-
119-
cache, err := o.cacheFor(out)
120-
if err != nil {
121-
return err
122-
}
123-
124-
return cache.Get(ctx, key, out)
125-
}
126-
127-
// List implements populatingClient.ReadInterface
128-
func (o *objectCache) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
129-
itemsPtr, err := apimeta.GetItemsPtr(out)
130-
if err != nil {
131-
return nil
132-
}
133-
134-
ro, ok := itemsPtr.(runtime.Object)
135-
if ok && !o.has(ro) {
136-
err = o.init(ro)
137-
if err != nil {
138-
return err
139-
}
140-
}
141-
142-
// http://knowyourmeme.com/memes/this-is-fine
143-
outType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem()
144-
cache, isKnown := o.cachesByType[outType]
145-
if !isKnown {
146-
return fmt.Errorf("no cache for objects of type %T", out)
147-
}
148-
return cache.List(ctx, opts, out)
149-
}
56+
// Start runs all the informers known to this cache until the given channel is closed.
57+
// It does not block.
58+
Start(stopCh <-chan struct{}) error
15059

151-
// singleObjectCache is a ReadInterface
152-
var _ client.ReadInterface = &singleObjectCache{}
60+
// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
61+
WaitForCacheSync(stop <-chan struct{}) bool
15362

154-
// singleObjectCache is a ReadInterface that retrieves objects
155-
// from a single local cache populated by a watch.
156-
type singleObjectCache struct {
157-
// Indexer is the underlying indexer wrapped by this cache.
158-
Indexer cache.Indexer
159-
// GroupVersionKind is the group-version-kind of the resource.
160-
GroupVersionKind schema.GroupVersionKind
63+
// IndexField adds an index to a field.
64+
IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error
16165
}
16266

163-
// Get implements populatingClient.Client
164-
func (c *singleObjectCache) Get(_ context.Context, key client.ObjectKey, out runtime.Object) error {
165-
storeKey := objectKeyToStoreKey(key)
166-
obj, exists, err := c.Indexer.GetByKey(storeKey)
167-
if err != nil {
168-
return err
169-
}
170-
if !exists {
171-
// Resource gets transformed into Kind in the error anyway, so this is fine
172-
return errors.NewNotFound(schema.GroupResource{
173-
Group: c.GroupVersionKind.Group,
174-
Resource: c.GroupVersionKind.Kind,
175-
}, key.Name)
176-
}
177-
if _, isObj := obj.(runtime.Object); !isObj {
178-
return fmt.Errorf("cache contained %T, which is not an Object", obj)
179-
}
67+
// Options are the optional arguments for creating a new Informers object
68+
type Options struct {
69+
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
70+
Scheme *runtime.Scheme
18071

181-
// deep copy to avoid mutating cache
182-
// TODO(directxman12): revisit the decision to always deepcopy
183-
obj = obj.(runtime.Object).DeepCopyObject()
72+
// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources
73+
Mapper meta.RESTMapper
18474

185-
// TODO(directxman12): this is a terrible hack, pls fix
186-
// (we should have deepcopyinto)
187-
outVal := reflect.ValueOf(out)
188-
objVal := reflect.ValueOf(obj)
189-
if !objVal.Type().AssignableTo(outVal.Type()) {
190-
return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
191-
}
192-
reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
193-
return nil
75+
// Resync is the resync period
76+
Resync *time.Duration
19477
}
19578

196-
// List implements populatingClient.Client
197-
func (c *singleObjectCache) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
198-
var objs []interface{}
199-
var err error
200-
201-
if opts != nil && opts.FieldSelector != nil {
202-
// TODO(directxman12): support more complicated field selectors by
203-
// combining multiple indicies, GetIndexers, etc
204-
field, val, requiresExact := requiresExactMatch(opts.FieldSelector)
205-
if !requiresExact {
206-
return fmt.Errorf("non-exact field matches are not supported by the cache")
207-
}
208-
// list all objects by the field selector. If this is namespaced and we have one, ask for the
209-
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
210-
// namespace.
211-
objs, err = c.Indexer.ByIndex(fieldIndexName(field), keyToNamespacedKey(opts.Namespace, val))
212-
} else if opts != nil && opts.Namespace != "" {
213-
objs, err = c.Indexer.ByIndex(cache.NamespaceIndex, opts.Namespace)
214-
} else {
215-
objs = c.Indexer.List()
216-
}
217-
if err != nil {
218-
return err
219-
}
220-
var labelSel labels.Selector
221-
if opts != nil && opts.LabelSelector != nil {
222-
labelSel = opts.LabelSelector
223-
}
79+
var _ Informers = &cache{}
80+
var _ client.ReadInterface = &cache{}
81+
var _ Cache = &cache{}
22482

225-
outItems, err := c.getListItems(objs, labelSel)
226-
if err != nil {
227-
return err
228-
}
229-
return apimeta.SetList(out, outItems)
83+
// cache is a Kubernetes Object cache populated from Informers. cache wraps a CacheProvider and InformerProvider.
84+
type cache struct {
85+
*internal.CacheProvider
86+
*internal.InformerProvider
23087
}
23188

232-
func (c *singleObjectCache) getListItems(objs []interface{}, labelSel labels.Selector) ([]runtime.Object, error) {
233-
outItems := make([]runtime.Object, 0, len(objs))
234-
for _, item := range objs {
235-
obj, isObj := item.(runtime.Object)
236-
if !isObj {
237-
return nil, fmt.Errorf("cache contained %T, which is not an Object", obj)
238-
}
239-
meta, err := apimeta.Accessor(obj)
240-
if err != nil {
241-
return nil, err
242-
}
243-
if labelSel != nil {
244-
lbls := labels.Set(meta.GetLabels())
245-
if !labelSel.Matches(lbls) {
246-
continue
247-
}
248-
}
249-
outItems = append(outItems, obj.DeepCopyObject())
89+
// New initializes and returns a new Cache
90+
func New(config *rest.Config, opts Options) (Cache, error) {
91+
// Use the default Kubernetes Scheme if unset
92+
if opts.Scheme == nil {
93+
opts.Scheme = scheme.Scheme
25094
}
251-
return outItems, nil
252-
}
253-
254-
// TODO: Make an interface with this function that has an Informers as an object on the struct
255-
// that automatically calls GetInformer and passes in the Indexer into indexByField
256-
257-
// noNamespaceNamespace is used as the "namespace" when we want to list across all namespaces
258-
const allNamespacesNamespace = "__all_namespaces"
25995

260-
// IndexField adds an indexer to the underlying cache, using extraction function to get
261-
// value(s) from the given field. This index can then be used by passing a field selector
262-
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.
263-
// The values may be anything. They will automatically be prefixed with the namespace of the
264-
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
265-
func (i *informers) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
266-
informer, err := i.GetInformer(obj)
267-
if err != nil {
268-
return err
269-
}
270-
return indexByField(informer.GetIndexer(), field, extractValue)
271-
}
272-
273-
func indexByField(indexer cache.Indexer, field string, extractor client.IndexerFunc) error {
274-
indexFunc := func(objRaw interface{}) ([]string, error) {
275-
// TODO(directxman12): check if this is the correct type?
276-
obj, isObj := objRaw.(runtime.Object)
277-
if !isObj {
278-
return nil, fmt.Errorf("object of type %T is not an Object", objRaw)
279-
}
280-
meta, err := apimeta.Accessor(obj)
96+
// Construct a new Mapper if unset
97+
if opts.Mapper == nil {
98+
var err error
99+
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
281100
if err != nil {
101+
log.WithName("setup").Error(err, "Failed to get API Group-Resources")
282102
return nil, err
283103
}
284-
ns := meta.GetNamespace()
285-
286-
rawVals := extractor(obj)
287-
var vals []string
288-
if ns == "" {
289-
// if we're not doubling the keys for the namespaced case, just re-use what was returned to us
290-
vals = rawVals
291-
} else {
292-
// if we need to add non-namespaced versions too, double the length
293-
vals = make([]string, len(rawVals)*2)
294-
}
295-
for i, rawVal := range rawVals {
296-
// save a namespaced variant, so that we can ask
297-
// "what are all the object matching a given index *in a given namespace*"
298-
vals[i] = keyToNamespacedKey(ns, rawVal)
299-
if ns != "" {
300-
// if we have a namespace, also inject a special index key for listing
301-
// regardless of the object namespace
302-
vals[i+len(rawVals)] = keyToNamespacedKey("", rawVal)
303-
}
304-
}
305-
306-
return vals, nil
307-
}
308-
309-
return indexer.AddIndexers(cache.Indexers{fieldIndexName(field): indexFunc})
310-
}
311-
312-
// fieldIndexName constructs the name of the index over the given field,
313-
// for use with an Indexer.
314-
func fieldIndexName(field string) string {
315-
return "field:" + field
316-
}
317-
318-
// keyToNamespacedKey prefixes the given index key with a namespace
319-
// for use in field selector indexes.
320-
func keyToNamespacedKey(ns string, baseKey string) string {
321-
if ns != "" {
322-
return ns + "/" + baseKey
323104
}
324-
return allNamespacesNamespace + "/" + baseKey
325-
}
326105

327-
// objectKeyToStorageKey converts an object key to store key.
328-
// It's akin to MetaNamespaceKeyFunc. It's separate from
329-
// String to allow keeping the key format easily in sync with
330-
// MetaNamespaceKeyFunc.
331-
func objectKeyToStoreKey(k client.ObjectKey) string {
332-
if k.Namespace == "" {
333-
return k.Name
106+
// Default the resync period to 10 hours if unset
107+
if opts.Resync == nil {
108+
r := 10 * time.Hour
109+
opts.Resync = &r
334110
}
335-
return k.Namespace + "/" + k.Name
336-
}
337111

338-
// requiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`.
339-
func requiresExactMatch(sel fields.Selector) (field, val string, required bool) {
340-
reqs := sel.Requirements()
341-
if len(reqs) != 1 {
342-
return "", "", false
343-
}
344-
req := reqs[0]
345-
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
346-
return "", "", false
347-
}
348-
return req.Field, req.Value, true
112+
cp := internal.NewCacheProvider()
113+
ip := internal.NewInformerProvider(config, opts.Scheme, opts.Mapper, *opts.Resync, cp)
114+
cp.SetInformerProvider(ip)
115+
c := &cache{InformerProvider: ip, CacheProvider: cp}
116+
return c, nil
349117
}

0 commit comments

Comments
 (0)