Skip to content

Commit 3986790

Browse files
author
Eric Stroczynski
authored
handler,predicate: pause object reconciliation by annotation (#60)
* handler,predicate: add NewPause(key) to pause a controller by event handler or predicate via an annotation with key string key, respectively internal/annotation: add generic library for creating predicates and event handlers for arbitrary annotation keys Signed-off-by: Eric Stroczynski <[email protected]> * handler: create test registry for instrumented handler Signed-off-by: Eric Stroczynski <[email protected]>
1 parent ec654a6 commit 3986790

File tree

9 files changed

+1057
-7
lines changed

9 files changed

+1057
-7
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/operator-framework/operator-lib
33
go 1.15
44

55
require (
6+
github.com/go-logr/logr v0.4.0
67
github.com/onsi/ginkgo v1.16.4
78
github.com/onsi/gomega v1.13.0
89
github.com/operator-framework/api v0.10.0

handler/example_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2021 The Operator-SDK Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package handler_test
16+
17+
import (
18+
"context"
19+
"os"
20+
21+
"github.com/operator-framework/operator-lib/handler"
22+
v1 "k8s.io/api/core/v1"
23+
"sigs.k8s.io/controller-runtime/pkg/client/config"
24+
"sigs.k8s.io/controller-runtime/pkg/controller"
25+
"sigs.k8s.io/controller-runtime/pkg/manager"
26+
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
27+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
28+
"sigs.k8s.io/controller-runtime/pkg/source"
29+
)
30+
31+
// This example applies the Pause handler to all incoming Pod events on a Pod controller.
32+
func ExampleNewPause() {
33+
cfg, err := config.GetConfig()
34+
if err != nil {
35+
os.Exit(1)
36+
}
37+
38+
mgr, err := manager.New(cfg, manager.Options{})
39+
if err != nil {
40+
os.Exit(1)
41+
}
42+
43+
c, err := controller.NewUnmanaged("pod", mgr, controller.Options{
44+
Reconciler: reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
45+
return reconcile.Result{}, nil
46+
}),
47+
})
48+
if err != nil {
49+
os.Exit(1)
50+
}
51+
52+
// Filter out Pods with the "my.app/paused: true" annotation.
53+
pause, err := handler.NewPause("my.app/paused")
54+
if err != nil {
55+
os.Exit(1)
56+
}
57+
if err := c.Watch(&source.Kind{Type: &v1.Pod{}}, pause); err != nil {
58+
os.Exit(1)
59+
}
60+
61+
<-mgr.Elected()
62+
63+
if err := c.Start(signals.SetupSignalHandler()); err != nil {
64+
os.Exit(1)
65+
}
66+
}

handler/instrumented_enqueue_object_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,29 @@
1515
package handler
1616

1717
import (
18+
"github.com/operator-framework/operator-lib/handler/internal/metrics"
19+
1820
. "github.com/onsi/ginkgo"
1921
. "github.com/onsi/gomega"
22+
"github.com/prometheus/client_golang/prometheus"
2023
dto "github.com/prometheus/client_model/go"
2124
corev1 "k8s.io/api/core/v1"
2225
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2326
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/client-go/util/workqueue"
2428
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
2529
"sigs.k8s.io/controller-runtime/pkg/event"
26-
"sigs.k8s.io/controller-runtime/pkg/metrics"
2730
"sigs.k8s.io/controller-runtime/pkg/reconcile"
28-
29-
"k8s.io/client-go/util/workqueue"
3031
)
3132

3233
var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
3334
var q workqueue.RateLimitingInterface
3435
var instance InstrumentedEnqueueRequestForObject
3536
var pod *corev1.Pod
3637

38+
registry := prometheus.NewRegistry()
39+
registry.MustRegister(metrics.ResourceCreatedAt)
40+
3741
BeforeEach(func() {
3842
q = controllertest.Queue{Interface: workqueue.New()}
3943
instance = InstrumentedEnqueueRequestForObject{}
@@ -69,7 +73,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
6973
}))
7074

7175
// verify metrics
72-
gauges, err := metrics.Registry.Gather()
76+
gauges, err := registry.Gather()
7377
Expect(err).NotTo(HaveOccurred())
7478
Expect(len(gauges)).To(Equal(1))
7579
assertMetrics(gauges[0], 1, []*corev1.Pod{pod})
@@ -104,7 +108,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
104108
}))
105109

106110
// verify metrics
107-
gauges, err := metrics.Registry.Gather()
111+
gauges, err := registry.Gather()
108112
Expect(err).NotTo(HaveOccurred())
109113
Expect(len(gauges)).To(Equal(0))
110114
})
@@ -129,7 +133,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
129133
}))
130134

131135
// verify metrics
132-
gauges, err := metrics.Registry.Gather()
136+
gauges, err := registry.Gather()
133137
Expect(err).NotTo(HaveOccurred())
134138
Expect(len(gauges)).To(Equal(0))
135139
})
@@ -164,7 +168,7 @@ var _ = Describe("InstrumentedEnqueueRequestForObject", func() {
164168
}))
165169

166170
// verify metrics
167-
gauges, err := metrics.Registry.Gather()
171+
gauges, err := registry.Gather()
168172
Expect(err).NotTo(HaveOccurred())
169173
Expect(len(gauges)).To(Equal(1))
170174
assertMetrics(gauges[0], 2, []*corev1.Pod{newpod, pod})

handler/pause.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2021 The Operator-SDK Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package handler
16+
17+
import (
18+
"github.com/operator-framework/operator-lib/internal/annotation"
19+
20+
"sigs.k8s.io/controller-runtime/pkg/handler"
21+
)
22+
23+
// NewPause returns an event handler that filters out objects with a truthy "paused" annotation.
24+
// When an annotation with key string key is present on an object and has a truthy value, ex. "true",
25+
// the watch constructed with this event handler will not add events for that object to the queue.
26+
// Key string key must be a valid annotation key.
27+
//
28+
// A note on security: since users that can CRUD a particular API can apply or remove annotations with
29+
// default cluster admission controllers, this same set of users can therefore start or stop reconciliation
30+
// of objects via this pause mechanism. If this is a concern, configure an admission webhook to enforce
31+
// a stricter annotation modification policy. See AdmissionReview configuration for user info available
32+
// to a webhook:
33+
// https://kubernetes.io/docs/reference/access-authn-authz/extensible-admission-controllers/#request
34+
func NewPause(key string) (handler.EventHandler, error) {
35+
return annotation.NewFalsyEventHandler(key, annotation.Options{Log: log})
36+
}

internal/annotation/filter.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
// Copyright 2021 The Operator-SDK Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package annotation contains event handler and predicate builders for annotations.
16+
// There are two types of builders:
17+
//
18+
// - Falsy builders result in objects being queued if the annotation is not present OR contains a falsy value.
19+
// - Truthy builders are the falsy complement: objects will be enqueued if the annotation is present AND contains a truthy value.
20+
//
21+
// Truthiness/falsiness is determined by Go's strconv.ParseBool().
22+
package annotation
23+
24+
import (
25+
"fmt"
26+
"strconv"
27+
28+
"github.com/go-logr/logr"
29+
"k8s.io/apimachinery/pkg/api/validation"
30+
"k8s.io/apimachinery/pkg/util/validation/field"
31+
"k8s.io/client-go/util/workqueue"
32+
"sigs.k8s.io/controller-runtime/pkg/client"
33+
"sigs.k8s.io/controller-runtime/pkg/event"
34+
"sigs.k8s.io/controller-runtime/pkg/handler"
35+
logf "sigs.k8s.io/controller-runtime/pkg/log"
36+
"sigs.k8s.io/controller-runtime/pkg/predicate"
37+
)
38+
39+
// Options configures a filter.
40+
type Options struct {
41+
Log logr.Logger
42+
43+
// Internally set.
44+
truthy bool
45+
}
46+
47+
// NewFalsyPredicate returns a predicate that passes objects
48+
// that do not have annotation with key string key or whose value is falsy.
49+
func NewFalsyPredicate(key string, opts Options) (predicate.Predicate, error) {
50+
opts.truthy = false
51+
return newFilter(key, opts)
52+
}
53+
54+
// NewFalsyEventHandler returns an event handler that enqueues objects
55+
// that do not have annotation with key string key or whose value is falsy.
56+
func NewFalsyEventHandler(key string, opts Options) (handler.EventHandler, error) {
57+
opts.truthy = false
58+
return newEventHandler(key, opts)
59+
}
60+
61+
// NewTruthyPredicate returns a predicate that passes objects
62+
// that do have annotation with key string key and whose value is truthy.
63+
func NewTruthyPredicate(key string, opts Options) (predicate.Predicate, error) {
64+
opts.truthy = true
65+
return newFilter(key, opts)
66+
}
67+
68+
// NewTruthyEventHandler returns an event handler that enqueues objects
69+
// that do have annotation with key string key and whose value is truthy.
70+
func NewTruthyEventHandler(key string, opts Options) (handler.EventHandler, error) {
71+
opts.truthy = true
72+
return newEventHandler(key, opts)
73+
}
74+
75+
func defaultOptions(opts *Options) {
76+
if opts.Log == nil {
77+
opts.Log = logf.Log
78+
}
79+
}
80+
81+
// newEventHandler returns a filter for use as an event handler.
82+
func newEventHandler(key string, opts Options) (handler.EventHandler, error) {
83+
f, err := newFilter(key, opts)
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
f.hdlr = &handler.EnqueueRequestForObject{}
89+
return handler.Funcs{
90+
CreateFunc: func(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
91+
if f.Create(evt) {
92+
f.hdlr.Create(evt, q)
93+
}
94+
},
95+
UpdateFunc: func(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
96+
if f.Update(evt) {
97+
f.hdlr.Update(evt, q)
98+
}
99+
},
100+
DeleteFunc: func(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
101+
if f.Delete(evt) {
102+
f.hdlr.Delete(evt, q)
103+
}
104+
},
105+
GenericFunc: func(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
106+
if f.Generic(evt) {
107+
f.hdlr.Generic(evt, q)
108+
}
109+
},
110+
}, nil
111+
}
112+
113+
// newFilter returns a filter for use as a predicate.
114+
func newFilter(key string, opts Options) (*filter, error) {
115+
defaultOptions(&opts)
116+
117+
// Make sure the annotation key and eventual value are valid together.
118+
if err := validateAnnotation(key, opts.truthy); err != nil {
119+
return nil, err
120+
}
121+
122+
f := filter{}
123+
f.key = key
124+
// Falsy filters return true in all cases except when the annotation is present and true.
125+
// Truthy filters only return true when the annotation is present and true.
126+
f.ret = !opts.truthy
127+
f.log = opts.Log.WithName("pause")
128+
return &f, nil
129+
}
130+
131+
func validateAnnotation(key string, truthy bool) error {
132+
fldPath := field.NewPath("metadata", "annotations")
133+
annotation := map[string]string{key: fmt.Sprintf("%v", truthy)}
134+
return validation.ValidateAnnotations(annotation, fldPath).ToAggregate()
135+
}
136+
137+
// filter implements a filter for objects with a truthy "paused" annotation (see Key).
138+
// When this annotation is removed or value does not evaluate to "true",
139+
// the controller will see events from these objects again.
140+
type filter struct {
141+
key string
142+
ret bool
143+
log logr.Logger
144+
hdlr *handler.EnqueueRequestForObject
145+
}
146+
147+
// Create implements predicate.Predicate.Create().
148+
func (f *filter) Create(evt event.CreateEvent) bool {
149+
if evt.Object == nil {
150+
if f.hdlr == nil {
151+
f.log.Error(nil, "CreateEvent received with no metadata", "event", evt)
152+
}
153+
return f.ret
154+
}
155+
return f.run(evt.Object)
156+
}
157+
158+
// Update implements predicate.Predicate.Update().
159+
func (f *filter) Update(evt event.UpdateEvent) bool {
160+
if evt.ObjectNew != nil {
161+
return f.run(evt.ObjectNew)
162+
} else if evt.ObjectOld != nil {
163+
return f.run(evt.ObjectOld)
164+
}
165+
if f.hdlr == nil {
166+
f.log.Error(nil, "UpdateEvent received with no metadata", "event", evt)
167+
}
168+
return f.ret
169+
}
170+
171+
// Delete implements predicate.Predicate.Delete().
172+
func (f *filter) Delete(evt event.DeleteEvent) bool {
173+
if evt.Object == nil {
174+
if f.hdlr == nil {
175+
f.log.Error(nil, "DeleteEvent received with no metadata", "event", evt)
176+
}
177+
return f.ret
178+
}
179+
return f.run(evt.Object)
180+
}
181+
182+
// Generic implements predicate.Predicate.Generic().
183+
func (f *filter) Generic(evt event.GenericEvent) bool {
184+
if evt.Object == nil {
185+
if f.hdlr == nil {
186+
f.log.Error(nil, "GenericEvent received with no metadata", "event", evt)
187+
}
188+
return f.ret
189+
}
190+
return f.run(evt.Object)
191+
}
192+
193+
func (f *filter) run(obj client.Object) bool {
194+
annotations := obj.GetAnnotations()
195+
if len(annotations) == 0 {
196+
return f.ret
197+
}
198+
annoStr, hasAnno := annotations[f.key]
199+
if !hasAnno {
200+
return f.ret
201+
}
202+
annoBool, err := strconv.ParseBool(annoStr)
203+
if err != nil {
204+
f.log.Error(err, "Bad annotation value", "key", f.key, "value", annoStr)
205+
return f.ret
206+
}
207+
// If the filter is falsy (f.ret == true) and value is false, then the object passes the filter.
208+
// If the filter is truthy (f.ret == false) and value is true, then the object passes the filter.
209+
return !annoBool == f.ret
210+
}

0 commit comments

Comments
 (0)