Skip to content

⚠️ Add predicates as variadic args for Owns, For, and Watches func #799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 63 additions & 27 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ var getGvk = apiutil.GVKForObject

// Builder builds a Controller.
type Builder struct {
apiType runtime.Object
mgr manager.Manager
predicates []predicate.Predicate
managedObjects []runtime.Object
watchRequest []watchRequest
config *rest.Config
ctrl controller.Controller
ctrlOptions controller.Options
name string
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
config *rest.Config
ctrl controller.Controller
ctrlOptions controller.Options
name string
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager
Expand All @@ -63,32 +63,62 @@ func (blder *Builder) ForType(apiType runtime.Object) *Builder {
return blder.For(apiType)
}

// ForInput represents the information set by For method.
type ForInput struct {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get away without exporting this type?
Likewise for OwnsInput and WatchesInput?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to allow custom implementations of options (which is nice), this has to be public

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so that others can implement ForOption.ApplyToFor and company. I see.

object runtime.Object
predicates []predicate.Predicate
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
// update events by *reconciling the object*.
// This is the equivalent of calling
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
func (blder *Builder) For(apiType runtime.Object) *Builder {
blder.apiType = apiType
func (blder *Builder) For(object runtime.Object, opts ...ForOption) *Builder {
input := ForInput{object: object}
for _, opt := range opts {
opt.ApplyToFor(&input)
}

blder.forInput = input
return blder
}

// OwnsInput represents the information set by Owns method.
type OwnsInput struct {
object runtime.Object
predicates []predicate.Predicate
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
// create / delete / update events by *reconciling the owner object*. This is the equivalent of calling
// Watches(&source.Kind{Type: <ForType-apiType>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})
func (blder *Builder) Owns(apiType runtime.Object) *Builder {
blder.managedObjects = append(blder.managedObjects, apiType)
// Watches(&source.Kind{Type: <ForType-forInput>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})
func (blder *Builder) Owns(object runtime.Object, opts ...OwnsOption) *Builder {
input := OwnsInput{object: object}
for _, opt := range opts {
opt.ApplyToOwns(&input)
}

blder.ownsInput = append(blder.ownsInput, input)
return blder
}

type watchRequest struct {
// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventhandler handler.EventHandler
predicates []predicate.Predicate
}

// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
// Owns or For instead of Watches directly.
func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler) *Builder {
blder.watchRequest = append(blder.watchRequest, watchRequest{src: src, eventhandler: eventhandler})
// Specified predicates are registered only for given source.
func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventhandler: eventhandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}

blder.watchesInput = append(blder.watchesInput, input)
return blder
}

Expand All @@ -102,9 +132,10 @@ func (blder *Builder) WithConfig(config *rest.Config) *Builder {

// WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
// trigger reconciliations. For example, filtering on whether the resource version has changed.
// Given predicate is added for all watched objects.
// Defaults to the empty list.
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
blder.predicates = append(blder.predicates, p)
blder.globalPredicates = append(blder.globalPredicates, p)
return blder
}

Expand Down Expand Up @@ -157,28 +188,33 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro

func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.apiType}
src := &source.Kind{Type: blder.forInput.object}
hdler := &handler.EnqueueRequestForObject{}
err := blder.ctrl.Watch(src, hdler, blder.predicates...)
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
err := blder.ctrl.Watch(src, hdler, allPredicates...)
if err != nil {
return err
}

// Watches the managed types
for _, obj := range blder.managedObjects {
src := &source.Kind{Type: obj}
for _, own := range blder.ownsInput {
src := &source.Kind{Type: own.object}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.apiType,
OwnerType: blder.forInput.object,
IsController: true,
}
if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}

// Do the watch requests
for _, w := range blder.watchRequest {
if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}

Expand All @@ -196,7 +232,7 @@ func (blder *Builder) getControllerName() (string, error) {
if blder.name != "" {
return blder.name, nil
}
gvk, err := getGvk(blder.apiType, blder.mgr.GetScheme())
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
if err != nil {
return "", err
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/scheme"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -180,6 +182,67 @@ var _ = Describe("application", func() {
close(done)
}, 10)
})

Describe("Set custom predicates", func() {
It("should execute registered predicates only for assigned kind", func(done Done) {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

var (
deployPrctExecuted = false
replicaSetPrctExecuted = false
allPrctExecuted = 0
)

deployPrct := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
defer GinkgoRecover()
// check that it was called only for deployment
Expect(e.Meta).To(BeAssignableToTypeOf(&appsv1.Deployment{}))
deployPrctExecuted = true
return true
},
}

replicaSetPrct := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
defer GinkgoRecover()
// check that it was called only for replicaset
Expect(e.Meta).To(BeAssignableToTypeOf(&appsv1.ReplicaSet{}))
replicaSetPrctExecuted = true
return true
},
}

allPrct := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
defer GinkgoRecover()
//check that it was called for all registered kinds
Expect(e.Meta).Should(Or(
BeAssignableToTypeOf(&appsv1.Deployment{}),
BeAssignableToTypeOf(&appsv1.ReplicaSet{}),
))

allPrctExecuted++
return true
},
}

bldr := ControllerManagedBy(m).
For(&appsv1.Deployment{}, WithPredicates(deployPrct)).
Owns(&appsv1.ReplicaSet{}, WithPredicates(replicaSetPrct)).
WithEventFilter(allPrct)

doReconcileTest("5", stop, bldr, m, true)

Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
Expect(allPrctExecuted).To(BeNumerically(">=", 2), "Global Predicated should be called at least twice")

close(done)
})
})

})

func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
Expand Down
72 changes: 72 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package builder

import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// {{{ "Functional" Option Interfaces

// ForOption is some configuration that modifies options for a For request.
type ForOption interface {
// ApplyToFor applies this configuration to the given for input.
ApplyToFor(*ForInput)
}

// OwnsOption is some configuration that modifies options for a owns request.
type OwnsOption interface {
// ApplyToOwns applies this configuration to the given owns input.
ApplyToOwns(*OwnsInput)
}

// WatchesOption is some configuration that modifies options for a watches request.
type WatchesOption interface {
// ApplyToWatches applies this configuration to the given watches options.
ApplyToWatches(*WatchesInput)
}

// }}}

// {{{ Multi-Type Options

// WithPredicates sets the given predicates list.
func WithPredicates(predicates ...predicate.Predicate) Predicates {
return Predicates{
predicates: predicates,
}
}

type Predicates struct {
predicates []predicate.Predicate
}

func (w Predicates) ApplyToFor(opts *ForInput) {
opts.predicates = w.predicates
}
func (w Predicates) ApplyToOwns(opts *OwnsInput) {
opts.predicates = w.predicates
}
func (w Predicates) ApplyToWatches(opts *WatchesInput) {
opts.predicates = w.predicates
}

var _ ForOption = &Predicates{}
var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}

// }}}