Skip to content

Commit 5f1af13

Browse files
committed
Ensure that webhook server is thread/start-safe
This ensures that the webhook server is both threadsafe & "start-safe" -- i.e. you can register webhooks after starting the server. While this is generally not a common pattern, be allow runnables to be added to the manager after start, so it makes sense to do the same with hooks & the server.
1 parent 22a2c58 commit 5f1af13

File tree

6 files changed

+270
-44
lines changed

6 files changed

+270
-44
lines changed

pkg/builder/webhook_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
3535
)
3636

37-
var _ = Describe("application", func() {
37+
var _ = Describe("webhook", func() {
3838
var stop chan struct{}
3939

4040
BeforeEach(func() {

pkg/envtest/webhook.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ type WebhookInstallOptions struct {
6464
// it will be automatically populated by the local temp dir
6565
LocalServingCertDir string
6666

67+
// CAData is the CA that can be used to trust the serving certificates in LocalServingCertDir.
68+
LocalServingCAData []byte
69+
6770
// MaxTime is the max time to wait
6871
MaxTime time.Duration
6972

@@ -143,8 +146,12 @@ func (o *WebhookInstallOptions) generateHostPort() (string, error) {
143146
return net.JoinHostPort(host, fmt.Sprintf("%d", port)), nil
144147
}
145148

146-
// Install installs specified webhooks to the API server
147-
func (o *WebhookInstallOptions) Install(config *rest.Config) error {
149+
// PrepWithoutInstalling does the setup parts of Install (populating host-port,
150+
// setting up CAs, etc), without actually truing to do anything with webhook
151+
// definitions. This is largely useful for internal testing of
152+
// controller-runtime, where we need a random host-port & caData for webhook
153+
// tests, but may be useful in similar scenarios.
154+
func (o *WebhookInstallOptions) PrepWithoutInstalling() error {
148155
hookCA, err := o.setupCA()
149156
if err != nil {
150157
return err
@@ -158,6 +165,15 @@ func (o *WebhookInstallOptions) Install(config *rest.Config) error {
158165
return err
159166
}
160167

168+
return nil
169+
}
170+
171+
// Install installs specified webhooks to the API server
172+
func (o *WebhookInstallOptions) Install(config *rest.Config) error {
173+
if err := o.PrepWithoutInstalling(); err != nil {
174+
return err
175+
}
176+
161177
if err := createWebhooks(config, o.MutatingWebhooks, o.ValidatingWebhooks); err != nil {
162178
return err
163179
}
@@ -273,6 +289,7 @@ func (o *WebhookInstallOptions) setupCA() ([]byte, error) {
273289
return nil, fmt.Errorf("unable to write webhook serving key to disk: %v", err)
274290
}
275291

292+
o.LocalServingCAData = certData
276293
return certData, nil
277294
}
278295

pkg/manager/internal.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -353,17 +353,32 @@ func (cm *controllerManager) GetAPIReader() client.Reader {
353353
}
354354

355355
func (cm *controllerManager) GetWebhookServer() *webhook.Server {
356-
if cm.webhookServer == nil {
356+
server, wasNew := func() (*webhook.Server, bool) {
357+
cm.mu.Lock()
358+
defer cm.mu.Unlock()
359+
360+
if cm.webhookServer != nil {
361+
return cm.webhookServer, false
362+
}
363+
357364
cm.webhookServer = &webhook.Server{
358365
Port: cm.port,
359366
Host: cm.host,
360367
CertDir: cm.certDir,
361368
}
362-
if err := cm.Add(cm.webhookServer); err != nil {
363-
panic("unable to add webhookServer to the controller manager")
369+
return cm.webhookServer, true
370+
}()
371+
372+
// only add the server if *we ourselves* just registered it.
373+
// Add has its own lock, so just do this separately -- there shouldn't
374+
// be a "race" in this lock gap because the condition is the population
375+
// of cm.webhookServer, not anything to do with Add.
376+
if wasNew {
377+
if err := cm.Add(server); err != nil {
378+
panic("unable to add webhook server to the controller manager")
364379
}
365380
}
366-
return cm.webhookServer
381+
return server
367382
}
368383

369384
func (cm *controllerManager) GetLogger() logr.Logger {

pkg/webhook/server.go

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ type Server struct {
7676

7777
// defaultingOnce ensures that the default fields are only ever set once.
7878
defaultingOnce sync.Once
79+
80+
// mu protects access to the webhook map & setFields for Start, Register, etc
81+
mu sync.Mutex
7982
}
8083

8184
// setDefaults does defaulting for the Server.
@@ -111,6 +114,9 @@ func (*Server) NeedLeaderElection() bool {
111114
// Register marks the given webhook as being served at the given path.
112115
// It panics if two hooks are registered on the same path.
113116
func (s *Server) Register(path string, hook http.Handler) {
117+
s.mu.Lock()
118+
defer s.mu.Unlock()
119+
114120
s.defaultingOnce.Do(s.setDefaults)
115121
_, found := s.webhooks[path]
116122
if found {
@@ -119,7 +125,28 @@ func (s *Server) Register(path string, hook http.Handler) {
119125
// TODO(directxman12): call setfields if we've already started the server
120126
s.webhooks[path] = hook
121127
s.WebhookMux.Handle(path, instrumentedHook(path, hook))
122-
log.Info("registering webhook", "path", path)
128+
129+
regLog := log.WithValues("path", path)
130+
regLog.Info("registering webhook")
131+
132+
// we've already been "started", inject dependencies here.
133+
// Otherwise, InjectFunc will do this for us later.
134+
if s.setFields != nil {
135+
if err := s.setFields(hook); err != nil {
136+
// TODO(directxman12): swallowing this error isn't great, but we'd have to
137+
// change the signature to fix that
138+
regLog.Error(err, "unable to inject fields into webhook during registration")
139+
}
140+
141+
baseHookLog := log.WithName("webhooks")
142+
143+
// NB(directxman12): we don't propagate this further by wrapping setFields because it's
144+
// unclear if this is how we want to deal with log propagation. In this specific instance,
145+
// we want to be able to pass a logger to webhooks because they don't know their own path.
146+
if _, err := inject.LoggerInto(baseHookLog.WithValues("webhook", path), hook); err != nil {
147+
regLog.Error(err, "unable to logger into webhook during registration")
148+
}
149+
}
123150
}
124151

125152
// instrumentedHook adds some instrumentation on top of the given webhook.
@@ -151,21 +178,6 @@ func (s *Server) Start(stop <-chan struct{}) error {
151178
baseHookLog := log.WithName("webhooks")
152179
baseHookLog.Info("starting webhook server")
153180

154-
// inject fields here as opposed to in Register so that we're certain to have our setFields
155-
// function available.
156-
for hookPath, webhook := range s.webhooks {
157-
if err := s.setFields(webhook); err != nil {
158-
return err
159-
}
160-
161-
// NB(directxman12): we don't propagate this further by wrapping setFields because it's
162-
// unclear if this is how we want to deal with log propagation. In this specific instance,
163-
// we want to be able to pass a logger to webhooks because they don't know their own path.
164-
if _, err := inject.LoggerInto(baseHookLog.WithValues("webhook", hookPath), webhook); err != nil {
165-
return err
166-
}
167-
}
168-
169181
certPath := filepath.Join(s.CertDir, s.CertName)
170182
keyPath := filepath.Join(s.CertDir, s.KeyName)
171183

@@ -238,5 +250,20 @@ func (s *Server) Start(stop <-chan struct{}) error {
238250
// InjectFunc injects the field setter into the server.
239251
func (s *Server) InjectFunc(f inject.Func) error {
240252
s.setFields = f
253+
254+
// inject fields here that weren't injected in Register because we didn't have setFields yet.
255+
baseHookLog := log.WithName("webhooks")
256+
for hookPath, webhook := range s.webhooks {
257+
if err := s.setFields(webhook); err != nil {
258+
return err
259+
}
260+
261+
// NB(directxman12): we don't propagate this further by wrapping setFields because it's
262+
// unclear if this is how we want to deal with log propagation. In this specific instance,
263+
// we want to be able to pass a logger to webhooks because they don't know their own path.
264+
if _, err := inject.LoggerInto(baseHookLog.WithValues("webhook", hookPath), webhook); err != nil {
265+
return err
266+
}
267+
}
241268
return nil
242269
}

pkg/webhook/server_test.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package webhook_test
18+
19+
import (
20+
"fmt"
21+
"io/ioutil"
22+
"net"
23+
"net/http"
24+
25+
. "github.com/onsi/ginkgo"
26+
. "github.com/onsi/gomega"
27+
"k8s.io/client-go/rest"
28+
"sigs.k8s.io/controller-runtime/pkg/envtest"
29+
"sigs.k8s.io/controller-runtime/pkg/webhook"
30+
)
31+
32+
var _ = Describe("Webhook Server", func() {
33+
var (
34+
stop chan struct{}
35+
testHostPort string
36+
client *http.Client
37+
server *webhook.Server
38+
)
39+
40+
BeforeEach(func() {
41+
stop = make(chan struct{})
42+
// closed in indivual tests differently
43+
44+
servingOpts := envtest.WebhookInstallOptions{}
45+
Expect(servingOpts.PrepWithoutInstalling()).To(Succeed())
46+
47+
testHostPort = net.JoinHostPort(servingOpts.LocalServingHost, fmt.Sprintf("%d", servingOpts.LocalServingPort))
48+
49+
// bypass needing to set up the x509 cert pool, etc ourselves
50+
clientTransport, err := rest.TransportFor(&rest.Config{
51+
TLSClientConfig: rest.TLSClientConfig{CAData: servingOpts.LocalServingCAData},
52+
})
53+
Expect(err).NotTo(HaveOccurred())
54+
client = &http.Client{
55+
Transport: clientTransport,
56+
}
57+
58+
server = &webhook.Server{
59+
Host: servingOpts.LocalServingHost,
60+
Port: servingOpts.LocalServingPort,
61+
CertDir: servingOpts.LocalServingCertDir,
62+
}
63+
64+
// TODO(directxman12): cleanup generated certificate dir, etc
65+
})
66+
67+
startServer := func() (done <-chan struct{}) {
68+
doneCh := make(chan struct{})
69+
go func() {
70+
defer GinkgoRecover()
71+
defer close(doneCh)
72+
Expect(server.Start(stop)).To(Succeed())
73+
}()
74+
// wait till we can ping the server to start the test
75+
Eventually(func() error {
76+
_, err := client.Get(fmt.Sprintf("https://%s/unservedpath", testHostPort))
77+
return err
78+
}).Should(Succeed())
79+
80+
// this is normally called before Start by the manager
81+
Expect(server.InjectFunc(func(i interface{}) error {
82+
boolInj, canInj := i.(interface{ InjectBool(bool) error })
83+
if !canInj {
84+
return nil
85+
}
86+
return boolInj.InjectBool(true)
87+
})).To(Succeed())
88+
89+
return doneCh
90+
}
91+
92+
// TODO(directxman12): figure out a good way to test all the serving setup
93+
// with httptest.Server to get all the niceness from that.
94+
95+
Context("when serving", func() {
96+
PIt("should verify the client CA name when asked to", func() {
97+
98+
})
99+
PIt("should support HTTP/2", func() {
100+
101+
})
102+
103+
// TODO(directxman12): figure out a good way to test the port default, etc
104+
})
105+
106+
It("should panic if a duplicate path is registered", func() {
107+
server.Register("/somepath", &testHandler{})
108+
doneCh := startServer()
109+
110+
Expect(func() { server.Register("/somepath", &testHandler{}) }).To(Panic())
111+
112+
close(stop)
113+
Eventually(doneCh, "4s").Should(BeClosed())
114+
})
115+
116+
Context("when registering new webhooks before starting", func() {
117+
It("should serve a webhook on the requested path", func() {
118+
server.Register("/somepath", &testHandler{})
119+
120+
doneCh := startServer()
121+
122+
Eventually(func() ([]byte, error) {
123+
resp, err := client.Get(fmt.Sprintf("https://%s/somepath", testHostPort))
124+
Expect(err).NotTo(HaveOccurred())
125+
defer resp.Body.Close()
126+
return ioutil.ReadAll(resp.Body)
127+
}).Should(Equal([]byte("gadzooks!")))
128+
129+
close(stop)
130+
Eventually(doneCh, "4s").Should(BeClosed())
131+
})
132+
133+
It("should inject dependencies eventually, given an inject func is eventually provided", func() {
134+
handler := &testHandler{}
135+
server.Register("/somepath", handler)
136+
doneCh := startServer()
137+
138+
Eventually(func() bool { return handler.injectedField }).Should(BeTrue())
139+
140+
close(stop)
141+
Eventually(doneCh, "4s").Should(BeClosed())
142+
})
143+
})
144+
145+
Context("when registering webhooks after starting", func() {
146+
var (
147+
doneCh <-chan struct{}
148+
)
149+
BeforeEach(func() {
150+
doneCh = startServer()
151+
})
152+
AfterEach(func() {
153+
// wait for cleanup to happen
154+
close(stop)
155+
Eventually(doneCh, "4s").Should(BeClosed())
156+
})
157+
158+
It("should serve a webhook on the requested path", func() {
159+
server.Register("/somepath", &testHandler{})
160+
resp, err := client.Get(fmt.Sprintf("https://%s/somepath", testHostPort))
161+
Expect(err).NotTo(HaveOccurred())
162+
defer resp.Body.Close()
163+
164+
Expect(ioutil.ReadAll(resp.Body)).To(Equal([]byte("gadzooks!")))
165+
})
166+
167+
It("should inject dependencies, if an inject func has been provided already", func() {
168+
handler := &testHandler{}
169+
server.Register("/somepath", handler)
170+
Expect(handler.injectedField).To(BeTrue())
171+
})
172+
})
173+
})
174+
175+
type testHandler struct {
176+
injectedField bool
177+
}
178+
179+
func (t *testHandler) InjectBool(val bool) error {
180+
t.injectedField = val
181+
return nil
182+
}
183+
func (t *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
184+
if _, err := resp.Write([]byte("gadzooks!")); err != nil {
185+
panic("unable to write http response!")
186+
}
187+
}

0 commit comments

Comments
 (0)