Skip to content

Commit 64c22b1

Browse files
added leader election per controller
1 parent 13ee2bc commit 64c22b1

File tree

5 files changed

+211
-17
lines changed

5 files changed

+211
-17
lines changed

pkg/controller/controller.go

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,65 @@ package controller
1818

1919
import (
2020
"fmt"
21+
"time"
2122

23+
"github.com/go-logr/logr"
24+
"k8s.io/client-go/kubernetes/scheme"
25+
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2227
"k8s.io/client-go/util/workqueue"
2328
"sigs.k8s.io/controller-runtime/pkg/handler"
2429
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
30+
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
31+
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
32+
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
2533
"sigs.k8s.io/controller-runtime/pkg/manager"
2634
"sigs.k8s.io/controller-runtime/pkg/predicate"
2735
"sigs.k8s.io/controller-runtime/pkg/reconcile"
36+
"sigs.k8s.io/controller-runtime/pkg/recorder"
2837
"sigs.k8s.io/controller-runtime/pkg/source"
2938
)
3039

40+
// Default leader election timings. setOptionsDefaults applies each one when
// the matching Options field is left nil.
const (
41+
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
42+
// defaultLeaseDuration is the fallback for Options.LeaseDuration.
defaultLeaseDuration = 15 * time.Second
43+
// defaultRenewDeadline is the fallback for Options.RenewDeadline.
defaultRenewDeadline = 10 * time.Second
44+
// defaultRetryPeriod is the fallback for Options.RetryPeriod.
defaultRetryPeriod = 2 * time.Second
45+
)
46+
3147
// Options are the arguments for creating a new Controller.
3248
type Options struct {
3349
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
3450
MaxConcurrentReconciles int
3551

3652
// Reconciler reconciles an object.
3753
Reconciler reconcile.Reconciler
54+
55+
// LeaderElection determines whether or not to use leader election when
56+
// starting the controller.
57+
LeaderElection bool
58+
59+
// LeaderElectionNamespace determines the namespace in which the leader
60+
// election configmap will be created.
61+
LeaderElectionNamespace string
62+
63+
// LeaderElectionID determines the name of the configmap that leader election
64+
// will use for holding the leader lock. When LeaderElection is true and this
// field is empty, it defaults to the controller name followed by
// "-controller".
65+
LeaderElectionID string
66+
67+
// LeaseDuration is the duration that non-leader candidates will
68+
// wait to force acquire leadership. This is measured against time of
69+
// last observed ack. Default is 15 seconds (used when the pointer is nil).
70+
LeaseDuration *time.Duration
71+
// RenewDeadline is the duration that the acting leader will retry
72+
// refreshing leadership before giving up. Default is 10 seconds (used when
// the pointer is nil).
73+
RenewDeadline *time.Duration
74+
// RetryPeriod is the duration the LeaderElector clients should wait
75+
// between tries of actions. Default is 2 seconds (used when the pointer is
// nil).
76+
RetryPeriod *time.Duration
77+
78+
// newResourceLock constructs the leader election lock. It is unexported and
// exists as a dependency-injection seam for tests; when nil it defaults to
// leaderelection.NewResourceLock.
79+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
3880
}
3981

4082
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
@@ -69,15 +111,22 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
69111
return nil, fmt.Errorf("must specify Name for Controller")
70112
}
71113

72-
if options.MaxConcurrentReconciles <= 0 {
73-
options.MaxConcurrentReconciles = 1
74-
}
114+
log := logf.RuntimeLog.WithValues("controller", name)
115+
116+
// Set default values for options fields
117+
options = setOptionsDefaults(options, name)
75118

76119
// Inject dependencies into Reconciler
77120
if err := mgr.SetFields(options.Reconciler); err != nil {
78121
return nil, err
79122
}
80123

124+
// Create the resource lock
125+
resourceLock, err := createResourceLock(mgr, options, log)
126+
if err != nil {
127+
return nil, err
128+
}
129+
81130
// Create controller with dependencies set
82131
c := &controller.Controller{
83132
Do: options.Reconciler,
@@ -89,8 +138,59 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
89138
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
90139
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
91140
Name: name,
141+
ResourceLock: resourceLock,
142+
LeaseDuration: *options.LeaseDuration,
143+
RenewDeadline: *options.RenewDeadline,
144+
RetryPeriod: *options.RetryPeriod,
92145
}
93146

94147
// Add the controller as a Manager component
95148
return c, mgr.Add(c)
96149
}
150+
151+
// setOptionsDefaults sets default values for unset Options fields and returns
// the updated copy. options is received by value, so the caller's struct is
// not modified. name is the controller name, used to derive the default
// LeaderElectionID.
152+
func setOptionsDefaults(options Options, name string) Options {
153+
// Zero or negative concurrency falls back to a single worker.
if options.MaxConcurrentReconciles <= 0 {
154+
options.MaxConcurrentReconciles = 1
155+
}
156+
157+
// Tests may inject their own constructor; otherwise use the real one.
if options.newResourceLock == nil {
158+
options.newResourceLock = leaderelection.NewResourceLock
159+
}
160+
161+
// The ID is only defaulted when leader election is actually requested.
if options.LeaderElection && options.LeaderElectionID == "" {
162+
// Both operands are strings, so fmt.Sprint inserts no space between them:
// the result is name immediately followed by "-controller".
options.LeaderElectionID = fmt.Sprint(name, "-controller")
163+
}
164+
165+
// Constants are not addressable, so copy the defaults into locals whose
// addresses can be stored in the pointer-typed fields.
leaseDuration, renewDeadline, retryPeriod := defaultLeaseDuration, defaultRenewDeadline, defaultRetryPeriod
166+
if options.LeaseDuration == nil {
167+
options.LeaseDuration = &leaseDuration
168+
}
169+
170+
if options.RenewDeadline == nil {
171+
options.RenewDeadline = &renewDeadline
172+
}
173+
174+
if options.RetryPeriod == nil {
175+
options.RetryPeriod = &retryPeriod
176+
}
177+
178+
return options
179+
}
180+
181+
// createResourceLock builds the leader election lock for a controller.
//
// It first creates a recorder provider from the manager's REST config, the
// client-go scheme and the given logger, then passes that provider together
// with the leader election settings from options to options.newResourceLock.
// Any error from either step is returned unchanged with a nil lock.
//
// NOTE(review): presumably newResourceLock returns a nil lock when
// LeaderElection is false, which Controller.Start treats as "run without
// election" — confirm against leaderelection.NewResourceLock.
func createResourceLock(mgr manager.Manager, options Options, log logr.Logger) (resourcelock.Interface, error) {
182+
recorderProvider, err := internalrecorder.NewProvider(mgr.GetConfig(), scheme.Scheme, log)
183+
if err != nil {
184+
return nil, err
185+
}
186+
187+
// options.newResourceLock is expected to be non-nil here; setOptionsDefaults
// fills it in before New calls this function.
resourceLock, err := options.newResourceLock(mgr.GetConfig(), recorderProvider, leaderelection.Options{
188+
LeaderElection: options.LeaderElection,
189+
LeaderElectionID: options.LeaderElectionID,
190+
LeaderElectionNamespace: options.LeaderElectionNamespace,
191+
})
192+
if err != nil {
193+
return nil, err
194+
}
195+
return resourceLock, nil
196+
}

pkg/controller/controller_integration_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package controller_test
17+
package controller
1818

1919
import (
2020
appsv1 "k8s.io/api/apps/v1"
2121
corev1 "k8s.io/api/core/v1"
2222
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2323
"k8s.io/apimachinery/pkg/runtime/schema"
2424
"k8s.io/apimachinery/pkg/types"
25-
"sigs.k8s.io/controller-runtime/pkg/controller"
2625
"sigs.k8s.io/controller-runtime/pkg/handler"
2726
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2827
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -55,7 +54,7 @@ var _ = Describe("controller", func() {
5554
Expect(err).NotTo(HaveOccurred())
5655

5756
By("Creating the Controller")
58-
instance, err := controller.New("foo-controller", cm, controller.Options{
57+
instance, err := New("foo-controller", cm, Options{
5958
Reconciler: reconcile.Func(
6059
func(request reconcile.Request) (reconcile.Result, error) {
6160
reconciled <- request

pkg/controller/controller_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package controller_test
17+
package controller
1818

1919
import (
2020
"testing"

pkg/controller/controller_test.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,20 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package controller_test
17+
package controller
1818

1919
import (
2020
"fmt"
2121

2222
. "github.com/onsi/ginkgo"
2323
. "github.com/onsi/gomega"
24+
"k8s.io/client-go/rest"
25+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2426
"sigs.k8s.io/controller-runtime/pkg/client"
25-
"sigs.k8s.io/controller-runtime/pkg/controller"
27+
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
2628
"sigs.k8s.io/controller-runtime/pkg/manager"
2729
"sigs.k8s.io/controller-runtime/pkg/reconcile"
30+
"sigs.k8s.io/controller-runtime/pkg/recorder"
2831
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
2932
)
3033

@@ -46,7 +49,7 @@ var _ = Describe("controller.Controller", func() {
4649
It("should return an error if Name is not Specified", func(done Done) {
4750
m, err := manager.New(cfg, manager.Options{})
4851
Expect(err).NotTo(HaveOccurred())
49-
c, err := controller.New("", m, controller.Options{Reconciler: rec})
52+
c, err := New("", m, Options{Reconciler: rec})
5053
Expect(c).To(BeNil())
5154
Expect(err.Error()).To(ContainSubstring("must specify Name for Controller"))
5255

@@ -57,7 +60,7 @@ var _ = Describe("controller.Controller", func() {
5760
m, err := manager.New(cfg, manager.Options{})
5861
Expect(err).NotTo(HaveOccurred())
5962

60-
c, err := controller.New("foo", m, controller.Options{})
63+
c, err := New("foo", m, Options{})
6164
Expect(c).To(BeNil())
6265
Expect(err.Error()).To(ContainSubstring("must specify Reconciler"))
6366

@@ -68,7 +71,7 @@ var _ = Describe("controller.Controller", func() {
6871
m, err := manager.New(cfg, manager.Options{})
6972
Expect(err).NotTo(HaveOccurred())
7073

71-
c, err := controller.New("foo", m, controller.Options{Reconciler: &failRec{}})
74+
c, err := New("foo", m, Options{Reconciler: &failRec{}})
7275
Expect(c).To(BeNil())
7376
Expect(err).To(HaveOccurred())
7477
Expect(err.Error()).To(ContainSubstring("expected error"))
@@ -80,16 +83,49 @@ var _ = Describe("controller.Controller", func() {
8083
m, err := manager.New(cfg, manager.Options{})
8184
Expect(err).NotTo(HaveOccurred())
8285

83-
c1, err := controller.New("c1", m, controller.Options{Reconciler: rec})
86+
c1, err := New("c1", m, Options{Reconciler: rec})
8487
Expect(err).NotTo(HaveOccurred())
8588
Expect(c1).ToNot(BeNil())
8689

87-
c2, err := controller.New("c2", m, controller.Options{Reconciler: rec})
90+
c2, err := New("c2", m, Options{Reconciler: rec})
8891
Expect(err).NotTo(HaveOccurred())
8992
Expect(c2).ToNot(BeNil())
9093

9194
close(done)
9295
})
96+
97+
It("with leader election enabled should default ID to controller name if ID is not set", func(done Done) {
98+
var rl resourcelock.Interface
99+
m, err := manager.New(cfg, manager.Options{})
100+
Expect(err).NotTo(HaveOccurred())
101+
c, err := New("test", m, Options{
102+
Reconciler: rec,
103+
LeaderElection: true,
104+
LeaderElectionNamespace: "default",
105+
newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
106+
var err error
107+
rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
108+
return rl, err
109+
},
110+
})
111+
Expect(c).ToNot(BeNil())
112+
Expect(err).ToNot(HaveOccurred())
113+
Expect(rl.Describe()).To(Equal("default/test-controller"))
114+
close(done)
115+
})
116+
117+
It("should return an error if namespace not set and not running in cluster", func() {
118+
m, err := manager.New(cfg, manager.Options{})
119+
Expect(err).NotTo(HaveOccurred())
120+
c, err := New("test", m, Options{
121+
Reconciler: rec,
122+
LeaderElection: true,
123+
LeaderElectionID: "test-controller",
124+
})
125+
Expect(c).To(BeNil())
126+
Expect(err).To(HaveOccurred())
127+
Expect(err.Error()).To(ContainSubstring("unable to find leader election namespace: not running in-cluster, please specify LeaderElectionNamespace"))
128+
})
93129
})
94130
})
95131

pkg/internal/controller/controller.go

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package controller
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sync"
2223
"time"
@@ -25,6 +26,8 @@ import (
2526
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2627
"k8s.io/apimachinery/pkg/util/wait"
2728
"k8s.io/client-go/rest"
29+
"k8s.io/client-go/tools/leaderelection"
30+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2831
"k8s.io/client-go/tools/record"
2932
"k8s.io/client-go/util/workqueue"
3033
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -92,6 +95,21 @@ type Controller struct {
9295
// Kubernetes API.
9396
Recorder record.EventRecorder
9497

98+
errChan chan error
99+
100+
// ResourceLock forms the basis for leader election
101+
ResourceLock resourcelock.Interface
102+
103+
// LeaseDuration is the duration that non-leader candidates will
104+
// wait to force acquire leadership.
105+
LeaseDuration time.Duration
106+
// RenewDeadline is the duration that the acting master will retry
107+
// refreshing leadership before giving up.
108+
RenewDeadline time.Duration
109+
// RetryPeriod is the duration the LeaderElector clients should wait
110+
// between tries of actions.
111+
RetryPeriod time.Duration
112+
95113
// TODO(community): Consider initializing a logger with the Controller Name as the tag
96114
}
97115

@@ -130,6 +148,26 @@ func (c *Controller) Start(stop <-chan struct{}) error {
130148
defer utilruntime.HandleCrash()
131149
defer c.Queue.ShutDown()
132150

151+
c.errChan = make(chan error)
152+
if c.ResourceLock != nil {
153+
err := c.startLeaderElection(stop)
154+
if err != nil {
155+
return err
156+
}
157+
} else {
158+
go c.runController(stop)
159+
}
160+
161+
select {
162+
case <-stop:
163+
log.Info("Stopping workers", "controller", c.Name)
164+
return nil
165+
case err := <-c.errChan:
166+
return err
167+
}
168+
}
169+
170+
func (c *Controller) runController(stop <-chan struct{}) {
133171
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
134172
log.Info("Starting Controller", "controller", c.Name)
135173

@@ -143,7 +181,8 @@ func (c *Controller) Start(stop <-chan struct{}) error {
143181
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
144182
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
145183
c.mu.Unlock()
146-
return err
184+
c.errChan <- err
185+
return
147186
}
148187

149188
if c.JitterPeriod == 0 {
@@ -162,9 +201,29 @@ func (c *Controller) Start(stop <-chan struct{}) error {
162201

163202
c.Started = true
164203
c.mu.Unlock()
204+
}
205+
206+
func (c *Controller) startLeaderElection(stop <-chan struct{}) (err error) {
207+
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
208+
Lock: c.ResourceLock,
209+
LeaseDuration: c.LeaseDuration,
210+
RenewDeadline: c.RenewDeadline,
211+
RetryPeriod: c.RetryPeriod,
212+
Callbacks: leaderelection.LeaderCallbacks{
213+
OnStartedLeading: func(_ context.Context) {
214+
c.runController(stop)
215+
},
216+
OnStoppedLeading: func() {
217+
c.errChan <- fmt.Errorf("leader election lost")
218+
},
219+
},
220+
})
221+
if err != nil {
222+
return err
223+
}
165224

166-
<-stop
167-
log.Info("Stopping workers", "controller", c.Name)
225+
// Start the leader elector process
226+
go l.Run(context.Background())
168227
return nil
169228
}
170229

0 commit comments

Comments
 (0)