Skip to content

Commit 778577e

Browse files
committed
✨ Conditionally runnable controllers
Implement conditionally runnable controllers that wait and watch for their respective resource to exist in the discovery doc before running the controller. This enables the controller manager to: * Start without error when a CRD the controller watches does not exist. * Begin watching the CRD and start its controller once it isinstalled. * Stop the controller (and the respective informer) upon CRD uninstall. * Restart the controller upon reinstallling the CRD.
1 parent 1eff9a0 commit 778577e

File tree

15 files changed

+462
-28
lines changed

15 files changed

+462
-28
lines changed

go.mod

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,38 @@ module sigs.k8s.io/controller-runtime
33
go 1.15
44

55
require (
6+
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
7+
github.com/cpuguy83/go-md2man v1.0.10 // indirect
8+
github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0 // indirect
69
github.com/evanphx/json-patch v4.9.0+incompatible
710
github.com/fsnotify/fsnotify v1.4.9
811
github.com/go-logr/logr v0.2.1
912
github.com/go-logr/zapr v0.2.0
1013
github.com/google/go-cmp v0.5.2 // indirect
1114
github.com/googleapis/gnostic v0.5.1 // indirect
15+
github.com/gophercloud/gophercloud v0.1.0 // indirect
1216
github.com/hashicorp/golang-lru v0.5.4 // indirect
1317
github.com/imdario/mergo v0.3.10 // indirect
1418
github.com/onsi/ginkgo v1.14.1
1519
github.com/onsi/gomega v1.10.2
1620
github.com/prometheus/client_golang v1.7.1
1721
github.com/prometheus/client_model v0.2.0
22+
github.com/robfig/cron v1.2.0
23+
github.com/spf13/pflag v1.0.5
24+
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 // indirect
1825
go.uber.org/goleak v1.1.10
1926
go.uber.org/zap v1.15.0
27+
golang.org/x/text v0.3.3 // indirect
2028
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
2129
gomodules.xyz/jsonpatch/v2 v2.1.0
2230
google.golang.org/appengine v1.6.6 // indirect
31+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
2332
k8s.io/api v0.19.2
2433
k8s.io/apiextensions-apiserver v0.19.2
2534
k8s.io/apimachinery v0.19.2
2635
k8s.io/client-go v0.19.2
36+
k8s.io/klog v1.0.0 // indirect
2737
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
38+
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 // indirect
2839
sigs.k8s.io/yaml v1.2.0
2940
)

go.sum

Lines changed: 49 additions & 14 deletions
Large diffs are not rendered by default.

pkg/builder/controller.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package builder
1919
import (
2020
"fmt"
2121
"strings"
22+
"time"
2223

2324
"github.com/go-logr/logr"
2425
"k8s.io/apimachinery/pkg/runtime"
@@ -57,9 +58,11 @@ func ControllerManagedBy(m manager.Manager) *Builder {
5758

5859
// ForInput represents the information set by For method.
5960
type ForInput struct {
60-
object runtime.Object
61-
predicates []predicate.Predicate
62-
err error
61+
err error
62+
object runtime.Object
63+
predicates []predicate.Predicate
64+
conditionallyRun bool
65+
waitTime time.Duration
6366
}
6467

6568
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
@@ -252,6 +255,11 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
252255
}
253256
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconcilerGroup", gvk.Group, "reconcilerKind", gvk.Kind)
254257

258+
if blder.forInput.conditionallyRun {
259+
ctrlOptions.ConditionalOn = &blder.forInput.object
260+
ctrlOptions.ConditionalWaitTime = blder.forInput.waitTime
261+
}
262+
255263
// Build the controller and return.
256264
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
257265
return err

pkg/builder/options.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@ limitations under the License.
1717
package builder
1818

1919
import (
20+
"time"
21+
2022
"sigs.k8s.io/controller-runtime/pkg/predicate"
2123
)
2224

25+
const defaultWaitTime = time.Minute
26+
2327
// {{{ "Functional" Option Interfaces
2428

2529
// ForOption is some configuration that modifies options for a For request.
@@ -75,4 +79,21 @@ var _ ForOption = &Predicates{}
7579
var _ OwnsOption = &Predicates{}
7680
var _ WatchesOption = &Predicates{}
7781

82+
// ConditionallyRun runs the controller
83+
// condtionally on the existence of the forInput object
84+
// in the cluster's discovery doc, letting you start a
85+
// controller manager for a CRD not yet installed on the cluster.
86+
type ConditionallyRun struct {
87+
WaitTime time.Duration
88+
}
89+
90+
func (w ConditionallyRun) ApplyToFor(opts *ForInput) {
91+
opts.conditionallyRun = true
92+
if w.WaitTime == time.Duration(0) {
93+
opts.waitTime = defaultWaitTime
94+
} else {
95+
opts.waitTime = w.WaitTime
96+
}
97+
}
98+
7899
// }}}

pkg/cache/cache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ type Informers interface {
5858
// of the underlying object.
5959
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
6060

61+
// Remove the informer for the given object.
62+
Remove(obj runtime.Object) error
63+
6164
// Start runs all the informers known to this cache until the given channel is closed.
6265
// It blocks.
6366
Start(stopCh <-chan struct{}) error

pkg/cache/informertest/fake_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ func (c *FakeInformers) Start(stopCh <-chan struct{}) error {
125125
return c.Error
126126
}
127127

128+
// Remove implements Cache
129+
func (c *FakeInformers) Remove(obj runtime.Object) error {
130+
return c.Error
131+
}
132+
128133
// IndexField implements Cache
129134
func (c *FakeInformers) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
130135
return nil

pkg/cache/multi_namespace_cache.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
9494
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
9595
}
9696

97+
func (c *multiNamespaceCache) Remove(obj runtime.Object) error {
98+
for _, cache := range c.namespaceToCache {
99+
if err := cache.Remove(obj); err != nil {
100+
return err
101+
}
102+
}
103+
return nil
104+
}
105+
97106
func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error {
98107
for ns, cache := range c.namespaceToCache {
99108
go func(ns string, cache Cache) {

pkg/controller/controller.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package controller
1818

1919
import (
2020
"fmt"
21+
"time"
2122

2223
"github.com/go-logr/logr"
24+
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/client-go/util/workqueue"
2426
"sigs.k8s.io/controller-runtime/pkg/handler"
2527
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -46,6 +48,20 @@ type Options struct {
4648
// Log is the logger used for this controller and passed to each reconciliation
4749
// request via the context field.
4850
Log logr.Logger
51+
52+
// Conditionally is a flag set during controller setup that indicates it should conditionally wait on the
53+
// ForObject to appear in the discovery doc before starting the controller. If set,
54+
// it will set ConditionalObject to the ForObject being reconciled for in the controller builder.
55+
//Conditionally bool
56+
57+
// ConditionalObject is the Object that the manager should wait on to appear
58+
// in the discovery document to indicate to begin running the controller
59+
// (and should stop the informer for this object if it disappears from the discovery doc).
60+
ConditionalOn *runtime.Object
61+
62+
// ConditionalWaitTime is the frequency at which the controller manager should check
63+
// the discovery doc for the existence of the CondtionalOn object.
64+
ConditionalWaitTime time.Duration
4965
}
5066

5167
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -123,5 +139,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
123139
SetFields: mgr.SetFields,
124140
Name: name,
125141
Log: options.Log.WithName("controller").WithName(name),
142+
ConditionalOn: options.ConditionalOn,
143+
ConditionalWaitTime: options.ConditionalWaitTime,
126144
}, nil
127145
}

pkg/internal/controller/controller.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"github.com/go-logr/logr"
27+
"k8s.io/apimachinery/pkg/runtime"
2728
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2829
"k8s.io/apimachinery/pkg/util/wait"
2930
"k8s.io/client-go/util/workqueue"
@@ -46,6 +47,20 @@ type Controller struct {
4647
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
4748
MaxConcurrentReconciles int
4849

50+
// Conditionally is a flag set during controller setup that indicates it should conditionally wait on the
51+
// ForObject to appear in the discovery doc before starting the controller. If set,
52+
// it will set ConditionalObject to the ForObject being reconciled for in the controller builder.
53+
// Conditionally bool
54+
55+
// ConditionalOn is the Object that the manager should wait on to appear
56+
// in the discovery document to indicate to begin running the controller
57+
// (and should stop the informer for this object if it disappears from the discovery doc).
58+
ConditionalOn *runtime.Object
59+
60+
// ConditionalWaitTime is the frequency at which the controller manager should check
61+
// the discovery doc for the existence of the CondtionalOn object.
62+
ConditionalWaitTime time.Duration
63+
4964
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
5065
// ensures that the state of the system matches the state specified in the object.
5166
// Defaults to the DefaultReconcileFunc.
@@ -172,9 +187,14 @@ func (c *Controller) Start(stop <-chan struct{}) error {
172187

173188
// All the watches have been started, we can reset the local slice.
174189
//
175-
// We should never hold watches more than necessary, each watch source can hold a backing cache,
190+
// We should usually hold watches more than necessary, each watch source can hold a backing cache,
176191
// which won't be garbage collected if we hold a reference to it.
177-
c.startWatches = nil
192+
193+
// The exception to this is when the controller is configured as a conditional runnable, in which
194+
// case it needs to knowledge of the watches in the event that the controller is restarted.
195+
if c.ConditionalOn == nil {
196+
c.startWatches = nil
197+
}
178198

179199
if c.JitterPeriod == 0 {
180200
c.JitterPeriod = 1 * time.Second
@@ -200,6 +220,20 @@ func (c *Controller) Start(stop <-chan struct{}) error {
200220
return nil
201221
}
202222

223+
// GetConditionalOn returns the object that startCondtionalRunnables used
224+
// to determine whether a condtional runnable should be started/stopped (based
225+
// on the object's existence in the cluster's discovery doc).
226+
func (c *Controller) GetConditionalOn() *runtime.Object {
227+
return c.ConditionalOn
228+
}
229+
230+
// GetConditionalWaitTime returns the duration for which the
231+
// controller manager should wait on each iteration when checking the
232+
// discovery doc for the ConditionalOn object's existence.
233+
func (c *Controller) GetConditionalWaitTime() time.Duration {
234+
return c.ConditionalWaitTime
235+
}
236+
203237
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
204238
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
205239
func (c *Controller) worker() {

0 commit comments

Comments
 (0)