Skip to content

Commit 6a03836

Browse files
committed
Add leader election to controller manager
1 parent 2d4d049 commit 6a03836

File tree

5 files changed

+198
-4
lines changed

5 files changed

+198
-4
lines changed

pkg/leaderelection/doc.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
Package leaderelection contains a constructors for a leader election resource lock
19+
*/
20+
package leaderelection

pkg/leaderelection/leader_election.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"fmt"
21+
"os"
22+
23+
"k8s.io/apimachinery/pkg/util/uuid"
24+
"k8s.io/client-go/kubernetes"
25+
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection/resourcelock"
27+
"sigs.k8s.io/controller-runtime/pkg/recorder"
28+
)
29+
30+
// Options provides the required configuration to create a new resource lock
31+
type Options struct {
32+
// LeaderElection determines whether or not to use leader election when
33+
// starting the manager.
34+
LeaderElection bool
35+
36+
// LeaderElectionNamespace determines the namespace in which the leader
37+
// election configmap will be created.
38+
LeaderElectionNamespace string
39+
40+
// LeaderElectionID determines the name of the configmap that leader election
41+
// will use for holding the leader lock.
42+
LeaderElectionID string
43+
}
44+
45+
// NewResourceLock creates a new config map resource lock for use in a leader
46+
// election loop
47+
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error) {
48+
if !options.LeaderElection {
49+
return nil, nil
50+
}
51+
52+
if options.LeaderElectionID == "" || options.LeaderElectionNamespace == "" {
53+
return nil, fmt.Errorf("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set")
54+
}
55+
56+
// Leader id, needs to be unique
57+
id, err := os.Hostname()
58+
if err != nil {
59+
return nil, err
60+
}
61+
id = id + "_" + string(uuid.NewUUID())
62+
63+
// Construct client for leader election
64+
client, err := kubernetes.NewForConfig(config)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
// TODO(JoelSpeed): switch to leaderelection object in 1.12
70+
return resourcelock.New(resourcelock.ConfigMapsResourceLock,
71+
options.LeaderElectionNamespace,
72+
options.LeaderElectionID,
73+
client.CoreV1(),
74+
resourcelock.ResourceLockConfig{
75+
Identity: id,
76+
EventRecorder: recorderProvider.GetEventRecorderFor(id),
77+
})
78+
}

pkg/manager/internal.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ limitations under the License.
1717
package manager
1818

1919
import (
20+
"fmt"
2021
"sync"
22+
"time"
2123

2224
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection"
27+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2428
"k8s.io/client-go/tools/record"
2529
"sigs.k8s.io/controller-runtime/pkg/cache"
2630
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -56,6 +60,9 @@ type controllerManager struct {
5660
// (and EventHandlers, Sources and Predicates).
5761
recorderProvider recorder.Provider
5862

63+
// resourceLock
64+
resourceLock resourcelock.Interface
65+
5966
mu sync.Mutex
6067
started bool
6168
errChan chan error
@@ -133,6 +140,52 @@ func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
133140
}
134141

135142
func (cm *controllerManager) Start(stop <-chan struct{}) error {
143+
if cm.resourceLock == nil {
144+
go cm.start(stop)
145+
select {
146+
case <-stop:
147+
// we are done
148+
return nil
149+
case err := <-cm.errChan:
150+
// Error starting a controller
151+
return err
152+
}
153+
}
154+
155+
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
156+
Lock: cm.resourceLock,
157+
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
158+
// TODO(joelspeed): These timings should be configurable
159+
LeaseDuration: 15 * time.Second,
160+
RenewDeadline: 10 * time.Second,
161+
RetryPeriod: 2 * time.Second,
162+
Callbacks: leaderelection.LeaderCallbacks{
163+
OnStartedLeading: cm.start,
164+
OnStoppedLeading: func() {
165+
// Most implementations of leader election log.Fatal() here.
166+
// Since Start is wrapped in log.Fatal when called, we can just return
167+
// an error here which will cause the program to exit.
168+
cm.errChan <- fmt.Errorf("leader election lost")
169+
},
170+
},
171+
})
172+
if err != nil {
173+
return err
174+
}
175+
176+
go l.Run()
177+
178+
select {
179+
case <-stop:
180+
// We are done
181+
return nil
182+
case err := <-cm.errChan:
183+
// Error starting a controller
184+
return err
185+
}
186+
}
187+
188+
func (cm *controllerManager) start(stop <-chan struct{}) {
136189
func() {
137190
cm.mu.Lock()
138191
defer cm.mu.Unlock()
@@ -169,9 +222,6 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
169222
select {
170223
case <-stop:
171224
// We are done
172-
return nil
173-
case err := <-cm.errChan:
174-
// Error starting a controller
175-
return err
225+
return
176226
}
177227
}

pkg/manager/manager.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/client-go/kubernetes/scheme"
2727
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2829
"k8s.io/client-go/tools/record"
2930
"sigs.k8s.io/controller-runtime/pkg/cache"
3031
"sigs.k8s.io/controller-runtime/pkg/client"
3132
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3233
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
34+
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
3335
"sigs.k8s.io/controller-runtime/pkg/recorder"
3436
)
3537

@@ -83,10 +85,23 @@ type Options struct {
8385
// value only if you know what you are doing. Defaults to 10 hours if unset.
8486
SyncPeriod *time.Duration
8587

88+
// LeaderElection determines whether or not to use leader election when
89+
// starting the manager.
90+
LeaderElection bool
91+
92+
// LeaderElectionNamespace determines the namespace in which the leader
93+
// election configmap will be created.
94+
LeaderElectionNamespace string
95+
96+
// LeaderElectionID determines the name of the configmap that leader election
97+
// will use for holding the leader lock.
98+
LeaderElectionID string
99+
86100
// Dependency injection for testing
87101
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
88102
newClient func(config *rest.Config, options client.Options) (client.Client, error)
89103
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
104+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
90105
}
91106

92107
// Runnable allows a component to be started.
@@ -140,6 +155,16 @@ func New(config *rest.Config, options Options) (Manager, error) {
140155
return nil, err
141156
}
142157

158+
// Create the resource lock to enable leader election)
159+
resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
160+
LeaderElection: options.LeaderElection,
161+
LeaderElectionID: options.LeaderElectionID,
162+
LeaderElectionNamespace: options.LeaderElectionNamespace,
163+
})
164+
if err != nil {
165+
return nil, err
166+
}
167+
143168
return &controllerManager{
144169
config: config,
145170
scheme: options.Scheme,
@@ -148,6 +173,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
148173
fieldIndexes: cache,
149174
client: client.DelegatingClient{Reader: cache, Writer: writeObj, StatusClient: writeObj},
150175
recorderProvider: recorderProvider,
176+
resourceLock: resourceLock,
151177
}, nil
152178
}
153179

@@ -177,5 +203,10 @@ func setOptionsDefaults(options Options) Options {
177203
options.newRecorderProvider = internalrecorder.NewProvider
178204
}
179205

206+
// Allow newResourceLock to be mocked
207+
if options.newResourceLock == nil {
208+
options.newResourceLock = leaderelection.NewResourceLock
209+
}
210+
180211
return options
181212
}

pkg/manager/manager_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,21 @@ var _ = Describe("manger.Manager", func() {
9696

9797
close(done)
9898
})
99+
Context("with leader election enabled", func() {
100+
It("should return an error if ID not set", func() {
101+
m, err := New(cfg, Options{LeaderElection: true, LeaderElectionNamespace: "default"})
102+
Expect(m).To(BeNil())
103+
Expect(err).To(HaveOccurred())
104+
Expect(err.Error()).To(ContainSubstring("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set"))
105+
})
106+
107+
It("should return an error if namespace not set", func() {
108+
m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"})
109+
Expect(m).To(BeNil())
110+
Expect(err).To(HaveOccurred())
111+
Expect(err.Error()).To(ContainSubstring("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set"))
112+
})
113+
})
99114
})
100115

101116
Describe("Start", func() {

0 commit comments

Comments
 (0)