Skip to content

Commit 0c4676b

Browse files
committed
split manager into its own package
1 parent dee0a9c commit 0c4676b

16 files changed

+1252
-856
lines changed

example/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/kubernetes-sigs/controller-runtime/pkg/client/config"
2626
"github.com/kubernetes-sigs/controller-runtime/pkg/controller"
2727
"github.com/kubernetes-sigs/controller-runtime/pkg/eventhandler"
28+
"github.com/kubernetes-sigs/controller-runtime/pkg/manager"
2829
"github.com/kubernetes-sigs/controller-runtime/pkg/reconcile"
2930
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
3031
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/signals"
@@ -40,16 +41,16 @@ func main() {
4041
logf.SetLogger(logf.ZapLogger(false))
4142

4243
// Setup a Manager
43-
manager, err := controller.NewManager(config.GetConfigOrDie(), controller.ManagerArgs{})
44+
mrg, err := manager.New(config.GetConfigOrDie(), manager.Options{})
4445
if err != nil {
4546
log.Fatal(err)
4647
}
4748

4849
// Setup a new controller to Reconcile ReplicaSets
49-
c, err := manager.NewController(
50+
c, err := controller.New(
5051
"foo-controller",
51-
&reconcileReplicaSet{client: manager.GetClient()},
52-
controller.Options{})
52+
mrg,
53+
controller.Options{Reconcile: &reconcileReplicaSet{client: mrg.GetClient()}})
5354
if err != nil {
5455
log.Fatal(err)
5556
}
@@ -72,7 +73,7 @@ func main() {
7273
log.Fatal(err)
7374
}
7475

75-
log.Fatal(manager.Start(signals.SetupSignalHandler()))
76+
log.Fatal(mrg.Start(signals.SetupSignalHandler()))
7677
}
7778

7879
// reconcileReplicaSet reconciles ReplicaSets

pkg/controller/controller.go

Lines changed: 31 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,26 @@ package controller
1818

1919
import (
2020
"fmt"
21-
"sync"
22-
"time"
2321

24-
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
25-
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
2622
"github.com/kubernetes-sigs/controller-runtime/pkg/eventhandler"
23+
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/controller"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/manager"
2725
"github.com/kubernetes-sigs/controller-runtime/pkg/predicate"
2826
"github.com/kubernetes-sigs/controller-runtime/pkg/reconcile"
27+
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
2928
"github.com/kubernetes-sigs/controller-runtime/pkg/source"
30-
"k8s.io/apimachinery/pkg/runtime"
31-
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32-
"k8s.io/apimachinery/pkg/util/wait"
33-
"k8s.io/client-go/rest"
34-
toolscache "k8s.io/client-go/tools/cache"
3529
"k8s.io/client-go/util/workqueue"
36-
37-
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
3830
)
3931

40-
var log = logf.KBLog.WithName("controller").WithName("controller")
32+
var log = logf.KBLog.WithName("controller")
4133

4234
// Options are the arguments for creating a new Controller
4335
type Options struct {
44-
// maxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
36+
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
4537
MaxConcurrentReconciles int
38+
39+
// Reconcile reconciles an object
40+
Reconcile reconcile.Reconcile
4641
}
4742

4843
// Controller is a work queue that watches for changes to objects (i.e. Create / Update / Delete events) and
@@ -59,184 +54,37 @@ type Controller interface {
5954
Start(stop <-chan struct{}) error
6055
}
6156

62-
var _ Controller = &controller{}
63-
64-
type controller struct {
65-
// name is used to uniquely identify a controller in tracing, logging and monitoring. name is required.
66-
name string
67-
68-
// maxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
69-
maxConcurrentReconciles int
70-
71-
// reconcile is a function that can be called at any time with the name / Namespace of an object and
72-
// ensures that the state of the system matches the state specified in the object.
73-
// Defaults to the DefaultReconcileFunc.
74-
reconcile reconcile.Reconcile
75-
76-
// client is a lazily initialized client. The controllerManager will initialize this when Start is called.
77-
client client.Client
78-
79-
// scheme is injected by the controllerManager when controllerManager.Start is called
80-
scheme *runtime.Scheme
81-
82-
// informers are injected by the controllerManager when controllerManager.Start is called
83-
cache cache.Cache
84-
85-
// config is the rest.config used to talk to the apiserver. Defaults to one of in-cluster, environment variable
86-
// specified, or the ~/.kube/config.
87-
config *rest.Config
88-
89-
// queue is an listeningQueue that listens for events from Informers and adds object keys to
90-
// the queue for processing
91-
queue workqueue.RateLimitingInterface
92-
93-
// inject is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
94-
inject func(i interface{}) error
95-
96-
// mu is used to synchronize controller setup
97-
mu sync.Mutex
98-
99-
// jitterPeriod allows tests to reduce the jitterPeriod so they complete faster
100-
jitterPeriod time.Duration
101-
102-
// waitForCache allows tests to mock out the waitForCache function to return an error
103-
// defaults to cache.WaitForCacheSync
104-
waitForCache func(stopCh <-chan struct{}, cacheSyncs ...toolscache.InformerSynced) bool
105-
106-
// started is true if the Controller has been started
107-
started bool
108-
109-
// TODO(pwittrock): Consider initializing a logger with the controller name as the tag
110-
}
111-
112-
func (c *controller) Watch(src source.Source, evthdler eventhandler.EventHandler, prct ...predicate.Predicate) error {
113-
c.mu.Lock()
114-
defer c.mu.Unlock()
115-
116-
// Inject cache into arguments
117-
if err := c.inject(src); err != nil {
118-
return err
119-
}
120-
if err := c.inject(evthdler); err != nil {
121-
return err
122-
}
123-
for _, pr := range prct {
124-
if err := c.inject(pr); err != nil {
125-
return err
126-
}
57+
func New(name string, mrg manager.Manager, options Options) (Controller, error) {
58+
if options.Reconcile == nil {
59+
return nil, fmt.Errorf("must specify Reconcile")
12760
}
12861

129-
// TODO(pwittrock): wire in predicates
130-
131-
log.Info("Starting EventSource", "controller", c.name, "Source", src)
132-
return src.Start(evthdler, c.queue, prct...)
133-
}
134-
135-
func (c *controller) Start(stop <-chan struct{}) error {
136-
c.mu.Lock()
137-
defer c.mu.Unlock()
138-
139-
// TODO)(pwittrock): Reconsider HandleCrash
140-
defer utilruntime.HandleCrash()
141-
defer c.queue.ShutDown()
142-
143-
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
144-
log.Info("Starting controller", "controller", c.name)
145-
146-
// Wait for the caches to be synced before starting workers
147-
allInformers := c.cache.KnownInformersByType()
148-
syncedFuncs := make([]toolscache.InformerSynced, 0, len(allInformers))
149-
for _, informer := range allInformers {
150-
syncedFuncs = append(syncedFuncs, informer.HasSynced)
151-
}
152-
153-
if c.waitForCache == nil {
154-
c.waitForCache = toolscache.WaitForCacheSync
155-
}
156-
if ok := c.waitForCache(stop, syncedFuncs...); !ok {
157-
// This code is unreachable right now since WaitForCacheSync will never return an error
158-
// Leaving it here because that could happen in the future
159-
err := fmt.Errorf("failed to wait for %s caches to sync", c.name)
160-
log.Error(err, "Could not wait for Cache to sync", "controller", c.name)
161-
return err
162-
}
163-
164-
if c.jitterPeriod == 0 {
165-
c.jitterPeriod = time.Second
62+
if len(name) == 0 {
63+
return nil, fmt.Errorf("must specify Name for Controller")
16664
}
16765

168-
// Launch two workers to process resources
169-
log.Info("Starting workers", "controller", c.name, "WorkerCount", c.maxConcurrentReconciles)
170-
for i := 0; i < c.maxConcurrentReconciles; i++ {
171-
// Process work items
172-
go wait.Until(func() {
173-
for c.processNextWorkItem() {
174-
}
175-
}, c.jitterPeriod, stop)
66+
if options.MaxConcurrentReconciles <= 0 {
67+
options.MaxConcurrentReconciles = 1
17668
}
17769

178-
c.started = true
179-
180-
<-stop
181-
log.Info("Stopping workers", "controller", c.name)
182-
return nil
183-
}
184-
185-
// processNextWorkItem will read a single work item off the workqueue and
186-
// attempt to process it, by calling the syncHandler.
187-
func (c *controller) processNextWorkItem() bool {
188-
// This code copy-pasted from the sample-controller.
189-
190-
obj, shutdown := c.queue.Get()
191-
if obj == nil {
192-
// Sometimes the queue gives us nil items when it starts up
193-
c.queue.Forget(obj)
194-
}
195-
196-
if shutdown {
197-
// Stop working
198-
return false
70+
// Inject dependencies into Reconcile
71+
if err := mrg.SetFields(options.Reconcile); err != nil {
72+
return nil, err
19973
}
20074

201-
// We call Done here so the workqueue knows we have finished
202-
// processing this item. We also must remember to call Forget if we
203-
// do not want this work item being re-queued. For example, we do
204-
// not call Forget if a transient error occurs, instead the item is
205-
// put back on the workqueue and attempted again after a back-off
206-
// period.
207-
defer c.queue.Done(obj)
208-
var req reconcile.Request
209-
var ok bool
210-
if req, ok = obj.(reconcile.Request); !ok {
211-
// As the item in the workqueue is actually invalid, we call
212-
// Forget here else we'd go into a loop of attempting to
213-
// process a work item that is invalid.
214-
c.queue.Forget(obj)
215-
log.Error(nil, "Queue item was not a Request",
216-
"controller", c.name, "Type", fmt.Sprintf("%T", obj), "Value", obj)
217-
// Return true, don't take a break
218-
return true
75+
// Create controller with dependencies set
76+
c := &controller.Controller{
77+
Reconcile: options.Reconcile,
78+
Cache: mrg.GetCache(),
79+
Config: mrg.GetConfig(),
80+
Scheme: mrg.GetScheme(),
81+
Client: mrg.GetClient(),
82+
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
83+
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
84+
Name: name,
21985
}
22086

221-
// RunInformersAndControllers the syncHandler, passing it the namespace/name string of the
222-
// resource to be synced.
223-
if result, err := c.reconcile.Reconcile(req); err != nil {
224-
c.queue.AddRateLimited(req)
225-
log.Error(nil, "reconcile error", "controller", c.name, "Request", req)
226-
227-
return false
228-
} else if result.Requeue {
229-
c.queue.AddRateLimited(req)
230-
return true
231-
}
232-
233-
// Finally, if no error occurs we Forget this item so it does not
234-
// get queued again until another change happens.
235-
c.queue.Forget(obj)
236-
237-
// TODO(directxman12): What does 1 mean? Do we want level constants? Do we want levels at all?
238-
log.V(1).Info("Successfully Reconciled", "controller", c.name, "Request", req)
239-
240-
// Return true, don't take a break
241-
return true
87+
// Add the controller as a Manager componentsw
88+
mrg.Add(c)
89+
return c, nil
24290
}

pkg/controller/controller_integration_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apimachinery/pkg/runtime/schema"
2828
"k8s.io/apimachinery/pkg/types"
2929

30+
"github.com/kubernetes-sigs/controller-runtime/pkg/manager"
3031
. "github.com/onsi/ginkgo"
3132
. "github.com/onsi/gomega"
3233
)
@@ -50,15 +51,17 @@ var _ = Describe("controller", func() {
5051

5152
It("should reconcile", func(done Done) {
5253
By("Creating the Manager")
53-
cm, err := controller.NewManager(cfg, controller.ManagerArgs{})
54+
cm, err := manager.New(cfg, manager.Options{})
5455
Expect(err).NotTo(HaveOccurred())
5556

5657
By("Creating the Controller")
57-
instance, err := cm.NewController("foo-controller", reconcile.Func(
58-
func(request reconcile.Request) (reconcile.Result, error) {
59-
reconciled <- request
60-
return reconcile.Result{}, nil
61-
}), controller.Options{})
58+
instance, err := controller.New("foo-controller", cm, controller.Options{
59+
Reconcile: reconcile.Func(
60+
func(request reconcile.Request) (reconcile.Result, error) {
61+
reconciled <- request
62+
return reconcile.Result{}, nil
63+
}),
64+
})
6265
Expect(err).NotTo(HaveOccurred())
6366

6467
By("Watching Resources")

pkg/controller/controller_suite_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"testing"
2121
"time"
2222

23-
"github.com/kubernetes-sigs/controller-runtime/pkg/controller"
2423
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
2524
"github.com/kubernetes-sigs/controller-runtime/pkg/test"
2625
. "github.com/onsi/ginkgo"
@@ -45,7 +44,6 @@ var _ = BeforeSuite(func(done Done) {
4544

4645
var err error
4746
cfg, err = testenv.Start()
48-
controller.TestConfig = cfg
4947
Expect(err).NotTo(HaveOccurred())
5048

5149
time.Sleep(1 * time.Second)

0 commit comments

Comments
 (0)