Skip to content

Commit 052b938

Browse files
committed
✨ Conditionally runnable controllers
Implement conditionally runnable controllers that wait and watch for their respective resource to exist in the discovery doc before running the controller. This enables the controller manager to: * Start without error when a CRD the controller watches does not exist. * Begin watching the CRD and start its controller once it isinstalled. * Stop the controller (and the respective informer) upon CRD uninstall. * Restart the controller upon reinstallling the CRD.
1 parent 1eff9a0 commit 052b938

File tree

15 files changed

+452
-28
lines changed

15 files changed

+452
-28
lines changed

go.mod

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,38 @@ module sigs.k8s.io/controller-runtime
33
go 1.15
44

55
require (
6+
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
7+
github.com/cpuguy83/go-md2man v1.0.10 // indirect
8+
github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0 // indirect
69
github.com/evanphx/json-patch v4.9.0+incompatible
710
github.com/fsnotify/fsnotify v1.4.9
811
github.com/go-logr/logr v0.2.1
912
github.com/go-logr/zapr v0.2.0
1013
github.com/google/go-cmp v0.5.2 // indirect
1114
github.com/googleapis/gnostic v0.5.1 // indirect
15+
github.com/gophercloud/gophercloud v0.1.0 // indirect
1216
github.com/hashicorp/golang-lru v0.5.4 // indirect
1317
github.com/imdario/mergo v0.3.10 // indirect
1418
github.com/onsi/ginkgo v1.14.1
1519
github.com/onsi/gomega v1.10.2
1620
github.com/prometheus/client_golang v1.7.1
1721
github.com/prometheus/client_model v0.2.0
22+
github.com/robfig/cron v1.2.0
23+
github.com/spf13/pflag v1.0.5
24+
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 // indirect
1825
go.uber.org/goleak v1.1.10
1926
go.uber.org/zap v1.15.0
27+
golang.org/x/text v0.3.3 // indirect
2028
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
2129
gomodules.xyz/jsonpatch/v2 v2.1.0
2230
google.golang.org/appengine v1.6.6 // indirect
31+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
2332
k8s.io/api v0.19.2
2433
k8s.io/apiextensions-apiserver v0.19.2
2534
k8s.io/apimachinery v0.19.2
2635
k8s.io/client-go v0.19.2
36+
k8s.io/klog v1.0.0 // indirect
2737
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
38+
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 // indirect
2839
sigs.k8s.io/yaml v1.2.0
2940
)

go.sum

Lines changed: 49 additions & 14 deletions
Large diffs are not rendered by default.

pkg/builder/controller.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package builder
1919
import (
2020
"fmt"
2121
"strings"
22+
"time"
2223

2324
"github.com/go-logr/logr"
2425
"k8s.io/apimachinery/pkg/runtime"
@@ -57,9 +58,11 @@ func ControllerManagedBy(m manager.Manager) *Builder {
5758

5859
// ForInput represents the information set by For method.
5960
type ForInput struct {
60-
object runtime.Object
61-
predicates []predicate.Predicate
62-
err error
61+
object runtime.Object
62+
predicates []predicate.Predicate
63+
err error
64+
conditionallyRun bool
65+
waitTime time.Duration
6366
}
6467

6568
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
@@ -252,6 +255,11 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
252255
}
253256
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconcilerGroup", gvk.Group, "reconcilerKind", gvk.Kind)
254257

258+
if blder.forInput.conditionallyRun {
259+
ctrlOptions.ConditionalOn = &blder.forInput.object
260+
ctrlOptions.ConditionalWaitTime = blder.forInput.waitTime
261+
}
262+
255263
// Build the controller and return.
256264
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
257265
return err

pkg/builder/options.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@ limitations under the License.
1717
package builder
1818

1919
import (
20+
"time"
21+
2022
"sigs.k8s.io/controller-runtime/pkg/predicate"
2123
)
2224

25+
const defaultWaitTime = time.Minute
26+
2327
// {{{ "Functional" Option Interfaces
2428

2529
// ForOption is some configuration that modifies options for a For request.
@@ -75,4 +79,21 @@ var _ ForOption = &Predicates{}
7579
var _ OwnsOption = &Predicates{}
7680
var _ WatchesOption = &Predicates{}
7781

82+
// ConditionallyRun runs the controller
83+
// condtionally on the existence of the forInput object
84+
// in the cluster's discovery doc, letting you start a
85+
// controller manager for a CRD not yet installed on the cluster.
86+
type ConditionallyRun struct {
87+
WaitTime time.Duration
88+
}
89+
90+
func (w ConditionallyRun) ApplyToFor(opts *ForInput) {
91+
opts.conditionallyRun = true
92+
if w.WaitTime == time.Duration(0) {
93+
opts.waitTime = defaultWaitTime
94+
} else {
95+
opts.waitTime = w.WaitTime
96+
}
97+
}
98+
7899
// }}}

pkg/cache/cache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ type Informers interface {
5858
// of the underlying object.
5959
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
6060

61+
// Remove the informer for the given object.
62+
Remove(obj runtime.Object) error
63+
6164
// Start runs all the informers known to this cache until the given channel is closed.
6265
// It blocks.
6366
Start(stopCh <-chan struct{}) error

pkg/cache/informertest/fake_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ func (c *FakeInformers) Start(stopCh <-chan struct{}) error {
125125
return c.Error
126126
}
127127

128+
// Remove implements Cache
129+
func (c *FakeInformers) Remove(obj runtime.Object) error {
130+
return c.Error
131+
}
132+
128133
// IndexField implements Cache
129134
func (c *FakeInformers) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
130135
return nil

pkg/cache/multi_namespace_cache.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
9494
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
9595
}
9696

97+
func (c *multiNamespaceCache) Remove(obj runtime.Object) error {
98+
for _, cache := range c.namespaceToCache {
99+
if err := cache.Remove(obj); err != nil {
100+
return err
101+
}
102+
}
103+
return nil
104+
}
105+
97106
func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error {
98107
for ns, cache := range c.namespaceToCache {
99108
go func(ns string, cache Cache) {

pkg/controller/controller.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package controller
1818

1919
import (
2020
"fmt"
21+
"time"
2122

2223
"github.com/go-logr/logr"
24+
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/client-go/util/workqueue"
2426
"sigs.k8s.io/controller-runtime/pkg/handler"
2527
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -46,6 +48,15 @@ type Options struct {
4648
// Log is the logger used for this controller and passed to each reconciliation
4749
// request via the context field.
4850
Log logr.Logger
51+
52+
// ConditionalObject is the Object that the manager should wait on to appear
53+
// in the discovery document to indicate to begin running the controller
54+
// (and should stop the informer for this object if it disappears from the discovery doc).
55+
ConditionalOn *runtime.Object
56+
57+
// ConditionalWaitTime is the frequency at which the controller manager should check
58+
// the discovery doc for the existence of the CondtionalOn object.
59+
ConditionalWaitTime time.Duration
4960
}
5061

5162
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -123,5 +134,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
123134
SetFields: mgr.SetFields,
124135
Name: name,
125136
Log: options.Log.WithName("controller").WithName(name),
137+
ConditionalOn: options.ConditionalOn,
138+
ConditionalWaitTime: options.ConditionalWaitTime,
126139
}, nil
127140
}

pkg/internal/controller/controller.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"github.com/go-logr/logr"
27+
"k8s.io/apimachinery/pkg/runtime"
2728
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2829
"k8s.io/apimachinery/pkg/util/wait"
2930
"k8s.io/client-go/util/workqueue"
@@ -46,6 +47,15 @@ type Controller struct {
4647
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
4748
MaxConcurrentReconciles int
4849

50+
// ConditionalOn is the Object that the manager should wait on to appear
51+
// in the discovery document to indicate to begin running the controller
52+
// (and should stop the informer for this object if it disappears from the discovery doc).
53+
ConditionalOn *runtime.Object
54+
55+
// ConditionalWaitTime is the frequency at which the controller manager should check
56+
// the discovery doc for the existence of the CondtionalOn object.
57+
ConditionalWaitTime time.Duration
58+
4959
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
5060
// ensures that the state of the system matches the state specified in the object.
5161
// Defaults to the DefaultReconcileFunc.
@@ -172,9 +182,14 @@ func (c *Controller) Start(stop <-chan struct{}) error {
172182

173183
// All the watches have been started, we can reset the local slice.
174184
//
175-
// We should never hold watches more than necessary, each watch source can hold a backing cache,
185+
// We should usually hold watches more than necessary, each watch source can hold a backing cache,
176186
// which won't be garbage collected if we hold a reference to it.
177-
c.startWatches = nil
187+
188+
// The exception to this is when the controller is configured as a conditional runnable, in which
189+
// case it needs to knowledge of the watches in the event that the controller is restarted.
190+
if c.ConditionalOn == nil {
191+
c.startWatches = nil
192+
}
178193

179194
if c.JitterPeriod == 0 {
180195
c.JitterPeriod = 1 * time.Second
@@ -200,6 +215,20 @@ func (c *Controller) Start(stop <-chan struct{}) error {
200215
return nil
201216
}
202217

218+
// GetConditionalOn returns the object that startCondtionalRunnables used
219+
// to determine whether a condtional runnable should be started/stopped (based
220+
// on the object's existence in the cluster's discovery doc).
221+
func (c *Controller) GetConditionalOn() *runtime.Object {
222+
return c.ConditionalOn
223+
}
224+
225+
// GetConditionalWaitTime returns the duration for which the
226+
// controller manager should wait on each iteration when checking the
227+
// discovery doc for the ConditionalOn object's existence.
228+
func (c *Controller) GetConditionalWaitTime() time.Duration {
229+
return c.ConditionalWaitTime
230+
}
231+
203232
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
204233
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
205234
func (c *Controller) worker() {

0 commit comments

Comments
 (0)