Skip to content

Commit 22d9893

Browse files
committed
Scope controllers for a logical cluster
1 parent 23cd6a1 commit 22d9893

File tree

6 files changed

+39
-10
lines changed

6 files changed

+39
-10
lines changed

pkg/builder/controller.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"sigs.k8s.io/controller-runtime/pkg/predicate"
3737
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3838
"sigs.k8s.io/controller-runtime/pkg/source"
39+
"sigs.k8s.io/logical-cluster"
3940
)
4041

4142
// Supporting mocking out functions for testing.
@@ -60,6 +61,7 @@ type Builder struct {
6061
watchesInput []WatchesInput
6162
mgr manager.Manager
6263
cluster cluster.Cluster
64+
logicalName logical.Name
6365
globalPredicates []predicate.Predicate
6466
ctrl controller.Controller
6567
ctrlOptions controller.Options
@@ -69,6 +71,7 @@ type Builder struct {
6971
func (blder *Builder) clone() *Builder {
7072
clone := *blder
7173
clone.cluster = nil
74+
clone.logicalName = ""
7275
clone.ctrl = nil
7376
return &clone
7477
}
@@ -244,9 +247,10 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
244247
return nil, blder.forInput.err
245248
}
246249

247-
if err := blder.mgr.AddRunnableBuilder(func(cl cluster.Cluster) (manager.Runnable, error) {
250+
if err := blder.mgr.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (manager.Runnable, error) {
248251
cloned := blder.clone()
249252
cloned.cluster = cl
253+
cloned.logicalName = name
250254
if err := cloned.do(r); err != nil {
251255
return nil, err
252256
}
@@ -351,6 +355,14 @@ func (blder *Builder) doWatch() error {
351355
return err
352356
}
353357
srckind.Type = typeForSrc
358+
} else if !ok {
359+
// If we're building a logical controller, raw watches are not allowed
360+
// given that the cache cannot be validated to be coming from the same cluter.
361+
// In the future, we could consider allowing this by satisfying a new interface
362+
// that sets and uses the cluster.
363+
if blder.logicalName != "" {
364+
return fmt.Errorf("when using a logical adapter, custom raw watches %T are not allowed", w.src)
365+
}
354366
}
355367

356368
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
@@ -378,6 +390,10 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
378390
ctrlOptions.Reconciler = r
379391
}
380392

393+
if blder.logicalName != "" {
394+
ctrlOptions.LogicalCluster = blder.logicalName
395+
}
396+
381397
// Retrieve the GVK from the object we're reconciling
382398
// to prepopulate logger information, and to optionally generate a default name.
383399
var gvk schema.GroupVersionKind

pkg/controller/controller.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3434
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3535
"sigs.k8s.io/controller-runtime/pkg/source"
36+
"sigs.k8s.io/logical-cluster"
3637
)
3738

3839
// Options are the arguments for creating a new Controller.
@@ -50,6 +51,10 @@ type Options struct {
5051
// LogConstructor is used to construct a logger used for this controller and passed
5152
// to each reconciliation via the context field.
5253
LogConstructor func(request *reconcile.Request) logr.Logger
54+
55+
// LogicalCluster is populated when the controller was created for a logical cluster.
56+
// This is used to determine if watch events without a logical.Name should be ignored.
57+
LogicalCluster logical.Name
5358
}
5459

5560
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -121,7 +126,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
121126

122127
// Create controller with dependencies set
123128
return &controller.Controller{
124-
Do: options.Reconciler,
129+
Cluster: options.LogicalCluster,
130+
Do: options.Reconciler,
125131
MakeQueue: func() workqueue.RateLimitingInterface {
126132
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
127133
},

pkg/internal/controller/controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type Controller struct {
4242
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
4343
Name string
4444

45+
// Cluster is the logical cluster that this controller is running against.
46+
// +optional
47+
Cluster logical.Name
48+
4549
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
4650
MaxConcurrentReconciles int
4751

@@ -317,7 +321,10 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
317321
ctx = addReconcileID(ctx, reconcileID)
318322

319323
// Set the Cluster on the request if it is set on the context.
320-
req.Cluster = logical.FromContext(ctx)
324+
if req.Cluster != c.Cluster {
325+
panic(fmt.Sprintf("controller was setup for logical cluster %q, got a request for a cluster %q, not allowed!", c.Cluster, req.Cluster))
326+
}
327+
req.Cluster = c.Cluster
321328

322329
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
323330
// resource to be synced.

pkg/manager/internal.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type controllerManager struct {
8181
errChan chan error
8282
runnables *runnables
8383

84-
runnableBuilders []func(cl cluster.Cluster) (Runnable, error)
84+
runnableBuilders []func(logical.Name, cluster.Cluster) (Runnable, error)
8585

8686
// defaultCluster holds a variety of methods to interact with a defaultCluster. Required.
8787
defaultCluster cluster.Cluster
@@ -319,7 +319,7 @@ func (cm *controllerManager) syncClusterAwareRunnables() {
319319
}
320320

321321
// Build the runnable.
322-
runnable, err := build(cluster)
322+
runnable, err := build(name, cluster)
323323
if err != nil {
324324
cluster.runnableBuilds = append(cluster.runnableBuilds, err)
325325
cm.logger.Error(err, "failed to build cluster aware runnable, won't retry", "clusterName", name)
@@ -410,7 +410,7 @@ func (cm *controllerManager) removeLogicalCluster(name logical.Name) error {
410410
return nil
411411
}
412412

413-
func (cm *controllerManager) AddRunnableBuilder(fn func(cl cluster.Cluster) (Runnable, error)) error {
413+
func (cm *controllerManager) AddLogicalRunnableBuilder(fn func(name logical.Name, cl cluster.Cluster) (Runnable, error)) error {
414414
cm.Lock()
415415
defer cm.Unlock()
416416
cm.runnableBuilders = append(cm.runnableBuilders, fn)

pkg/manager/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ type Manager interface {
6767
// election was configured.
6868
Elected() <-chan struct{}
6969

70-
// AddRunnableBuilder adds a controller builder to the manager, which is used to build
70+
// AddLogicalRunnableBuilder adds a controller builder to the manager, which is used to build
7171
// controllers for a given cluster. This is useful when the Manager is running against many logical clusters.
72-
AddRunnableBuilder(func(cluster.Cluster) (Runnable, error)) error
72+
AddLogicalRunnableBuilder(func(logical.Name, cluster.Cluster) (Runnable, error)) error
7373

7474
// AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
7575
// Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be

pkg/manager/manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1691,7 +1691,7 @@ var _ = Describe("manger.Manager", func() {
16911691

16921692
var built atomic.Bool
16931693
var started atomic.Bool
1694-
err = m.AddRunnableBuilder(func(cl cluster.Cluster) (Runnable, error) {
1694+
err = m.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (Runnable, error) {
16951695
built.Store(true)
16961696
return RunnableFunc(func(ctx context.Context) error {
16971697
Expect(logical.FromContext(ctx)).To(Equal(logical.Name("test-cluster")))
@@ -1735,7 +1735,7 @@ var _ = Describe("manger.Manager", func() {
17351735
var started atomic.Int64
17361736
var completed atomic.Int64
17371737
removedCh := make(chan struct{})
1738-
err = m.AddRunnableBuilder(func(cl cluster.Cluster) (Runnable, error) {
1738+
err = m.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (Runnable, error) {
17391739
built.Add(1)
17401740
return RunnableFunc(func(ctx context.Context) error {
17411741
defer completed.Add(1)

0 commit comments

Comments
 (0)