Skip to content

Commit 8381e45

Browse files
committed
Add leader election to controller manager
1 parent d6eb543 commit 8381e45

File tree

5 files changed

+195
-4
lines changed

5 files changed

+195
-4
lines changed

pkg/leaderelection/doc.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
Package leaderelection contains a constructors for a leader election resource lock
19+
*/
20+
21+
package leaderelection

pkg/leaderelection/leader_election.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"fmt"
21+
"os"
22+
23+
"github.com/pborman/uuid"
24+
"k8s.io/client-go/kubernetes"
25+
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection/resourcelock"
27+
"sigs.k8s.io/controller-runtime/pkg/recorder"
28+
)
29+
30+
// Options provides the required configuration to create a new resource lock
31+
type Options struct {
32+
// LeaderElection determines whether or not to use leader election when
33+
// starting the manager.
34+
LeaderElection bool
35+
36+
// LeaderElectionNamespace determines the namespace in which the leader
37+
// election configmap will be created.
38+
LeaderElectionNamespace string
39+
40+
// LeaderElectionID determines the name of the configmap that leader election
41+
// will use for holding the leader lock.
42+
LeaderElectionID string
43+
}
44+
45+
// NewResourceLock creates a new config map resource lock for use in a leader
46+
// election loop
47+
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error) {
48+
if !options.LeaderElection {
49+
return nil, nil
50+
}
51+
52+
if options.LeaderElectionID == "" || options.LeaderElectionNamespace == "" {
53+
return nil, fmt.Errorf("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set")
54+
}
55+
56+
// Leader id, needs to be unique
57+
id, err := os.Hostname()
58+
if err != nil {
59+
return nil, err
60+
}
61+
id = id + "_" + string(uuid.NewUUID())
62+
63+
// Construct client for leader election
64+
client, err := kubernetes.NewForConfig(config)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
return resourcelock.New(resourcelock.ConfigMapsResourceLock,
70+
options.LeaderElectionNamespace,
71+
options.LeaderElectionID,
72+
client.CoreV1(),
73+
resourcelock.ResourceLockConfig{
74+
Identity: id,
75+
EventRecorder: recorderProvider.GetEventRecorderFor(id),
76+
})
77+
}

pkg/manager/internal.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ limitations under the License.
1717
package manager
1818

1919
import (
20+
"fmt"
2021
"sync"
22+
"time"
2123

2224
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection"
27+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2428
"k8s.io/client-go/tools/record"
2529
"sigs.k8s.io/controller-runtime/pkg/cache"
2630
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -56,6 +60,9 @@ type controllerManager struct {
5660
// (and EventHandlers, Sources and Predicates).
5761
recorderProvider recorder.Provider
5862

63+
// resourceLock
64+
resourceLock resourcelock.Interface
65+
5966
mu sync.Mutex
6067
started bool
6168
errChan chan error
@@ -133,6 +140,49 @@ func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
133140
}
134141

135142
func (cm *controllerManager) Start(stop <-chan struct{}) error {
143+
if cm.resourceLock == nil {
144+
go cm.start(stop)
145+
select {
146+
case <-stop:
147+
// we are done
148+
return nil
149+
case err := <-cm.errChan:
150+
// Error starting a controller
151+
return err
152+
}
153+
}
154+
155+
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
156+
Lock: cm.resourceLock,
157+
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
158+
// TODO(joelspeed): These timings should be configurable
159+
LeaseDuration: 15 * time.Second,
160+
RenewDeadline: 10 * time.Second,
161+
RetryPeriod: 2 * time.Second,
162+
Callbacks: leaderelection.LeaderCallbacks{
163+
OnStartedLeading: cm.start,
164+
OnStoppedLeading: func() {
165+
cm.errChan <- fmt.Errorf("leader election lost")
166+
},
167+
},
168+
})
169+
if err != nil {
170+
return err
171+
}
172+
173+
go l.Run()
174+
175+
select {
176+
case <-stop:
177+
// We are done
178+
return nil
179+
case err := <-cm.errChan:
180+
// Error starting a controller
181+
return err
182+
}
183+
}
184+
185+
func (cm *controllerManager) start(stop <-chan struct{}) {
136186
func() {
137187
cm.mu.Lock()
138188
defer cm.mu.Unlock()
@@ -169,9 +219,6 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
169219
select {
170220
case <-stop:
171221
// We are done
172-
return nil
173-
case err := <-cm.errChan:
174-
// Error starting a controller
175-
return err
222+
return
176223
}
177224
}

pkg/manager/manager.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ import (
2424
"k8s.io/apimachinery/pkg/runtime"
2525
"k8s.io/client-go/kubernetes/scheme"
2626
"k8s.io/client-go/rest"
27+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2728
"k8s.io/client-go/tools/record"
2829
"sigs.k8s.io/controller-runtime/pkg/cache"
2930
"sigs.k8s.io/controller-runtime/pkg/client"
3031
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3132
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
33+
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
3234
"sigs.k8s.io/controller-runtime/pkg/recorder"
3335
)
3436

@@ -82,10 +84,23 @@ type Options struct {
8284
// value only if you know what you are doing. Defaults to 10 hours if unset.
8385
SyncPeriod *time.Duration
8486

87+
// LeaderElection determines whether or not to use leader election when
88+
// starting the manager.
89+
LeaderElection bool
90+
91+
// LeaderElectionNamespace determines the namespace in which the leader
92+
// election configmap will be created.
93+
LeaderElectionNamespace string
94+
95+
// LeaderElectionID determines the name of the configmap that leader election
96+
// will use for holding the leader lock.
97+
LeaderElectionID string
98+
8599
// Dependency injection for testing
86100
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
87101
newClient func(config *rest.Config, options client.Options) (client.Client, error)
88102
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme) (recorder.Provider, error)
103+
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
89104
}
90105

91106
// Runnable allows a component to be started.
@@ -137,6 +152,16 @@ func New(config *rest.Config, options Options) (Manager, error) {
137152
return nil, err
138153
}
139154

155+
// Create the resource lock to enable leader election)
156+
resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
157+
LeaderElection: options.LeaderElection,
158+
LeaderElectionID: options.LeaderElectionID,
159+
LeaderElectionNamespace: options.LeaderElectionNamespace,
160+
})
161+
if err != nil {
162+
return nil, err
163+
}
164+
140165
return &controllerManager{
141166
config: config,
142167
scheme: options.Scheme,
@@ -145,6 +170,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
145170
fieldIndexes: cache,
146171
client: client.DelegatingClient{Reader: cache, Writer: writeObj, StatusClient: writeObj},
147172
recorderProvider: recorderProvider,
173+
resourceLock: resourceLock,
148174
}, nil
149175
}
150176

@@ -174,5 +200,10 @@ func setOptionsDefaults(options Options) Options {
174200
options.newRecorderProvider = internalrecorder.NewProvider
175201
}
176202

203+
// Allow newResourceLock to be mocked
204+
if options.newResourceLock == nil {
205+
options.newResourceLock = leaderelection.NewResourceLock
206+
}
207+
177208
return options
178209
}

pkg/manager/manager_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,21 @@ var _ = Describe("manger.Manager", func() {
9595

9696
close(done)
9797
})
98+
Context("with leader election enabled", func() {
99+
It("should return an error if ID not set", func() {
100+
m, err := New(cfg, Options{LeaderElection: true, LeaderElectionNamespace: "default"})
101+
Expect(m).To(BeNil())
102+
Expect(err).To(HaveOccurred())
103+
Expect(err.Error()).To(ContainSubstring("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set"))
104+
})
105+
106+
It("should return an error if namespace not set", func() {
107+
m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"})
108+
Expect(m).To(BeNil())
109+
Expect(err).To(HaveOccurred())
110+
Expect(err.Error()).To(ContainSubstring("if leader election is enabled, both LeaderElectionID and LeaderElectionNamespace must be set"))
111+
})
112+
})
98113
})
99114

100115
Describe("Start", func() {

0 commit comments

Comments
 (0)