Skip to content

Refactor packages to a more flattened structure #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 11, 2018
37 changes: 16 additions & 21 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,50 @@ import (
"log"

"github.com/kubernetes-sigs/controller-runtime/pkg/client"
"github.com/kubernetes-sigs/controller-runtime/pkg/client/config"
"github.com/kubernetes-sigs/controller-runtime/pkg/controller"
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/eventhandler"
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/reconcile"
"github.com/kubernetes-sigs/controller-runtime/pkg/controller/source"
"github.com/kubernetes-sigs/controller-runtime/pkg/handler"
"github.com/kubernetes-sigs/controller-runtime/pkg/manager"
"github.com/kubernetes-sigs/controller-runtime/pkg/reconcile"
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/signals"
"github.com/kubernetes-sigs/controller-runtime/pkg/source"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

func main() {
flag.Parse()
logf.SetLogger(logf.ZapLogger(false))

// Setup a Manager
manager, err := controller.NewManager(controller.ManagerArgs{})
mrg, err := manager.New(config.GetConfigOrDie(), manager.Options{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mrg --> mgr

if err != nil {
log.Fatal(err)
}

// Setup a new controller to Reconcile ReplicaSets
c, err := manager.NewController(
controller.Options{Name: "foo-controller", MaxConcurrentReconciles: 1},
&reconcileReplicaSet{client: manager.GetClient()},
)
c, err := controller.New("foo-controller", mrg, controller.Options{
Reconcile: &reconcileReplicaSet{client: mrg.GetClient()},
})
if err != nil {
log.Fatal(err)
}

err = c.Watch(
// Watch ReplicaSets
&source.KindSource{Type: &appsv1.ReplicaSet{}},
// Enqueue ReplicaSet object key
&eventhandler.EnqueueHandler{})
if err != nil {
// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.Enqueue{}); err != nil {
log.Fatal(err)
}

err = c.Watch(
// Watch Pods
&source.KindSource{Type: &corev1.Pod{}},
// Enqueue Owning ReplicaSet object key
&eventhandler.EnqueueOwnerHandler{OwnerType: &appsv1.ReplicaSet{}, IsController: true})
if err != nil {
// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil {
log.Fatal(err)
}

log.Fatal(manager.Start(signals.SetupSignalHandler()))
log.Fatal(mrg.Start(signals.SetupSignalHandler()))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good from end-user's perspective.

// reconcileReplicaSet reconciles ReplicaSets
Expand Down
62 changes: 56 additions & 6 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/kubernetes-sigs/controller-runtime/pkg/client"
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
toolscache "k8s.io/client-go/tools/cache"
)

var log = logf.KBLog.WithName("object-cache")
Expand All @@ -43,6 +44,7 @@ var _ client.ReadInterface = &objectCache{}
type objectCache struct {
cachesByType map[reflect.Type]*singleObjectCache
scheme *runtime.Scheme
informers *informers
}

var _ client.ReadInterface = &objectCache{}
Expand All @@ -54,7 +56,7 @@ func (o *objectCache) addInformer(gvk schema.GroupVersionKind, c cache.SharedInd
log.Error(err, "could not register informer in objectCache for GVK", "GroupVersionKind", gvk)
return
}
if _, found := o.cacheFor(obj); found {
if o.has(obj) {
return
}
o.registerCache(obj, gvk, c.GetIndexer())
Expand All @@ -68,18 +70,57 @@ func (o *objectCache) registerCache(obj runtime.Object, gvk schema.GroupVersionK
}
}

func (o *objectCache) cacheFor(obj runtime.Object) (*singleObjectCache, bool) {
func (o *objectCache) has(obj runtime.Object) bool {
objType := reflect.TypeOf(obj)
_, found := o.cachesByType[objType]
return found
}

func (o *objectCache) init(obj runtime.Object) error {
i, err := o.informers.GetInformer(obj)
if err != nil {
return err
}
if o.informers.started {
log.Info("Waiting to sync cache for type.", "Type", fmt.Sprintf("%T", obj))
toolscache.WaitForCacheSync(o.informers.stop, i.HasSynced)
log.Info("Finished to syncing cache for type.", "Type", fmt.Sprintf("%T", obj))
} else {
return fmt.Errorf("must start Cache before calling Get or List %s %s",
"Object", fmt.Sprintf("%T", obj))
}
return nil
}

func (o *objectCache) cacheFor(obj runtime.Object) (*singleObjectCache, error) {
if !o.informers.started {
return nil, fmt.Errorf("must start Cache before calling Get or List %s %s",
"Object", fmt.Sprintf("%T", obj))
}
objType := reflect.TypeOf(obj)

cache, isKnown := o.cachesByType[objType]
return cache, isKnown
if !isKnown {
return nil, fmt.Errorf("no Cache found for %T - must call GetInformer", obj)
}
return cache, nil
}

// Get implements populatingClient.ReadInterface
func (o *objectCache) Get(ctx context.Context, key client.ObjectKey, out runtime.Object) error {
cache, isKnown := o.cacheFor(out)
if !isKnown {
return fmt.Errorf("no cache for objects of type %T, must have asked for an watch/informer first", out)
// Make sure there is a Cache for this type
if !o.has(out) {
err := o.init(out)
if err != nil {
return err
}
}

cache, err := o.cacheFor(out)
if err != nil {
return err
}

return cache.Get(ctx, key, out)
}

Expand All @@ -89,6 +130,15 @@ func (o *objectCache) List(ctx context.Context, opts *client.ListOptions, out ru
if err != nil {
return nil
}

ro, ok := itemsPtr.(runtime.Object)
if ok && !o.has(ro) {
err = o.init(ro)
if err != nil {
return err
}
}

// http://knowyourmeme.com/memes/this-is-fine
outType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem()
cache, isKnown := o.cachesByType[outType]
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ var _ = Describe("Indexers", func() {
var singleCache client.ReadInterface

BeforeEach(func() {
var ok bool
singleCache, ok = multiCache.cacheFor(&kapi.Pod{})
Expect(ok).To(BeTrue())
var err error
singleCache, err = multiCache.cacheFor(&kapi.Pod{})
Expect(err).NotTo(HaveOccurred())
})

It("should be able to fetch a particular object by key", func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
informersByGVK: make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
resync: *opts.Resync,
}
i.objectCache.informers = i

return i, nil
}
Expand Down
Loading