Skip to content

Commit 20db4b7

Browse files
authored
Merge pull request #15 from pwittrock/client
Refactor packages to a more flattened structure
2 parents 210edd2 + 183f5bc commit 20db4b7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1546
-1096
lines changed

example/main.go

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,55 +22,50 @@ import (
2222
"log"
2323

2424
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
25+
"github.com/kubernetes-sigs/controller-runtime/pkg/client/config"
2526
"github.com/kubernetes-sigs/controller-runtime/pkg/controller"
26-
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
27-
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/reconcile"
28-
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/source"
27+
"github.com/kubernetes-sigs/controller-runtime/pkg/handler"
28+
"github.com/kubernetes-sigs/controller-runtime/pkg/manager"
29+
"github.com/kubernetes-sigs/controller-runtime/pkg/reconcile"
2930
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
3031
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/signals"
32+
"github.com/kubernetes-sigs/controller-runtime/pkg/source"
3133
appsv1 "k8s.io/api/apps/v1"
3234
corev1 "k8s.io/api/core/v1"
3335
"k8s.io/apimachinery/pkg/api/errors"
36+
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
3437
)
3538

3639
func main() {
3740
flag.Parse()
3841
logf.SetLogger(logf.ZapLogger(false))
3942

4043
// Setup a Manager
41-
manager, err := controller.NewManager(controller.ManagerArgs{})
44+
mrg, err := manager.New(config.GetConfigOrDie(), manager.Options{})
4245
if err != nil {
4346
log.Fatal(err)
4447
}
4548

4649
// Setup a new controller to Reconcile ReplicaSets
47-
c, err := manager.NewController(
48-
controller.Options{Name: "foo-controller", MaxConcurrentReconciles: 1},
49-
&reconcileReplicaSet{client: manager.GetClient()},
50-
)
50+
c, err := controller.New("foo-controller", mrg, controller.Options{
51+
Reconcile: &reconcileReplicaSet{client: mrg.GetClient()},
52+
})
5153
if err != nil {
5254
log.Fatal(err)
5355
}
5456

55-
err = c.Watch(
56-
// Watch ReplicaSets
57-
&source.KindSource{Type: &appsv1.ReplicaSet{}},
58-
// Enqueue ReplicaSet object key
59-
&eventhandler.EnqueueHandler{})
60-
if err != nil {
57+
// Watch ReplicaSets and enqueue ReplicaSet object key
58+
if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.Enqueue{}); err != nil {
6159
log.Fatal(err)
6260
}
6361

64-
err = c.Watch(
65-
// Watch Pods
66-
&source.KindSource{Type: &corev1.Pod{}},
67-
// Enqueue Owning ReplicaSet object key
68-
&eventhandler.EnqueueOwnerHandler{OwnerType: &appsv1.ReplicaSet{}, IsController: true})
69-
if err != nil {
62+
// Watch Pods and enqueue owning ReplicaSet key
63+
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}},
64+
&handler.EnqueueOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil {
7065
log.Fatal(err)
7166
}
7267

73-
log.Fatal(manager.Start(signals.SetupSignalHandler()))
68+
log.Fatal(mrg.Start(signals.SetupSignalHandler()))
7469
}
7570

7671
// reconcileReplicaSet reconciles ReplicaSets

pkg/cache/cache.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
3434
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
35+
toolscache "k8s.io/client-go/tools/cache"
3536
)
3637

3738
var log = logf.KBLog.WithName("object-cache")
@@ -43,6 +44,7 @@ var _ client.ReadInterface = &objectCache{}
4344
type objectCache struct {
4445
cachesByType map[reflect.Type]*singleObjectCache
4546
scheme *runtime.Scheme
47+
informers *informers
4648
}
4749

4850
var _ client.ReadInterface = &objectCache{}
@@ -54,7 +56,7 @@ func (o *objectCache) addInformer(gvk schema.GroupVersionKind, c cache.SharedInd
5456
log.Error(err, "could not register informer in objectCache for GVK", "GroupVersionKind", gvk)
5557
return
5658
}
57-
if _, found := o.cacheFor(obj); found {
59+
if o.has(obj) {
5860
return
5961
}
6062
o.registerCache(obj, gvk, c.GetIndexer())
@@ -68,18 +70,57 @@ func (o *objectCache) registerCache(obj runtime.Object, gvk schema.GroupVersionK
6870
}
6971
}
7072

71-
func (o *objectCache) cacheFor(obj runtime.Object) (*singleObjectCache, bool) {
73+
func (o *objectCache) has(obj runtime.Object) bool {
7274
objType := reflect.TypeOf(obj)
75+
_, found := o.cachesByType[objType]
76+
return found
77+
}
78+
79+
func (o *objectCache) init(obj runtime.Object) error {
80+
i, err := o.informers.GetInformer(obj)
81+
if err != nil {
82+
return err
83+
}
84+
if o.informers.started {
85+
log.Info("Waiting to sync cache for type.", "Type", fmt.Sprintf("%T", obj))
86+
toolscache.WaitForCacheSync(o.informers.stop, i.HasSynced)
87+
log.Info("Finished to syncing cache for type.", "Type", fmt.Sprintf("%T", obj))
88+
} else {
89+
return fmt.Errorf("must start Cache before calling Get or List %s %s",
90+
"Object", fmt.Sprintf("%T", obj))
91+
}
92+
return nil
93+
}
94+
95+
func (o *objectCache) cacheFor(obj runtime.Object) (*singleObjectCache, error) {
96+
if !o.informers.started {
97+
return nil, fmt.Errorf("must start Cache before calling Get or List %s %s",
98+
"Object", fmt.Sprintf("%T", obj))
99+
}
100+
objType := reflect.TypeOf(obj)
101+
73102
cache, isKnown := o.cachesByType[objType]
74-
return cache, isKnown
103+
if !isKnown {
104+
return nil, fmt.Errorf("no Cache found for %T - must call GetInformer", obj)
105+
}
106+
return cache, nil
75107
}
76108

77109
// Get implements populatingClient.ReadInterface
78110
func (o *objectCache) Get(ctx context.Context, key client.ObjectKey, out runtime.Object) error {
79-
cache, isKnown := o.cacheFor(out)
80-
if !isKnown {
81-
return fmt.Errorf("no cache for objects of type %T, must have asked for an watch/informer first", out)
111+
// Make sure there is a Cache for this type
112+
if !o.has(out) {
113+
err := o.init(out)
114+
if err != nil {
115+
return err
116+
}
117+
}
118+
119+
cache, err := o.cacheFor(out)
120+
if err != nil {
121+
return err
82122
}
123+
83124
return cache.Get(ctx, key, out)
84125
}
85126

@@ -89,6 +130,15 @@ func (o *objectCache) List(ctx context.Context, opts *client.ListOptions, out ru
89130
if err != nil {
90131
return nil
91132
}
133+
134+
ro, ok := itemsPtr.(runtime.Object)
135+
if ok && !o.has(ro) {
136+
err = o.init(ro)
137+
if err != nil {
138+
return err
139+
}
140+
}
141+
92142
// http://knowyourmeme.com/memes/this-is-fine
93143
outType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem()
94144
cache, isKnown := o.cachesByType[outType]

pkg/cache/cache_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ var _ = Describe("Indexers", func() {
9090
var singleCache client.ReadInterface
9191

9292
BeforeEach(func() {
93-
var ok bool
94-
singleCache, ok = multiCache.cacheFor(&kapi.Pod{})
95-
Expect(ok).To(BeTrue())
93+
var err error
94+
singleCache, err = multiCache.cacheFor(&kapi.Pod{})
95+
Expect(err).NotTo(HaveOccurred())
9696
})
9797

9898
It("should be able to fetch a particular object by key", func() {

pkg/cache/informer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
9292
informersByGVK: make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
9393
resync: *opts.Resync,
9494
}
95+
i.objectCache.informers = i
9596

9697
return i, nil
9798
}

0 commit comments

Comments
 (0)