🐛 fix issue when webhook server refreshing cert #260

Merged
merged 1 commit into from
Jan 16, 2019
1 change: 1 addition & 0 deletions Gopkg.lock

40 changes: 28 additions & 12 deletions pkg/webhook/server.go
@@ -27,6 +27,7 @@ import (

"k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -36,6 +37,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/types"
)

// default interval for checking cert is 90 days (~3 months)
var defaultCertRefreshInterval = 3 * 30 * 24 * time.Hour

// ServerOptions are options for configuring an admission webhook server.
type ServerOptions struct {
// Port is the port number that the server will serve.
@@ -128,6 +132,9 @@ type Server struct {
// manager is the manager that this webhook server will be registered.
manager manager.Manager

// httpServer is the actual server that serves the traffic.
httpServer *http.Server

once sync.Once
}

@@ -209,21 +216,21 @@ func (s *Server) Start(stop <-chan struct{}) error {
return s.run(stop)
}

func (s *Server) run(stop <-chan struct{}) error {
srv := &http.Server{
Addr: fmt.Sprintf(":%v", s.Port),
Handler: s.sMux,
}
func (s *Server) run(stop <-chan struct{}) error { // nolint: gocyclo
errCh := make(chan error)
serveFn := func() {
errCh <- srv.ListenAndServeTLS(path.Join(s.CertDir, writer.ServerCertName), path.Join(s.CertDir, writer.ServerKeyName))
s.httpServer = &http.Server{
Addr: fmt.Sprintf(":%v", s.Port),
Handler: s.sMux,
}
log.Info("starting the webhook server.")
errCh <- s.httpServer.ListenAndServeTLS(path.Join(s.CertDir, writer.ServerCertName), path.Join(s.CertDir, writer.ServerKeyName))
}

shutdownHappend := false
timer := time.Tick(wait.Jitter(defaultCertRefreshInterval, 0.1))
go serveFn()
for {
// TODO(mengqiy): add jitter to the timer
// Could use https://godoc.org/k8s.io/apimachinery/pkg/util/wait#Jitter
timer := time.Tick(6 * 30 * 24 * time.Hour)
select {
case <-timer:
changed, err := s.RefreshCert()
@@ -232,19 +239,28 @@ func (s *Server) run(stop <-chan struct{}) error {
return err
}
if !changed {
log.Info("no need to reload the certificates.")
continue
}
log.Info("server is shutting down to reload the certificates.")
err = srv.Shutdown(context.Background())
shutdownHappend = true
err = s.httpServer.Shutdown(context.Background())
Member

My understanding is that Shutdown is asynchronous, meaning that the return of Shutdown does not necessarily mean the server is already done, whereas the return of ListenAndServeTLS guarantees it.

Member Author

"Shutdown gracefully shuts down the server without interrupting any active connections. Shutdown works by first closing all open listeners, then closing all idle connections, and then waiting indefinitely for connections to return to idle and then shut down."

Per https://golang.org/pkg/net/http/#Server.Shutdown, it is a graceful shutdown and a synchronous call. Did I miss anything?

Member

I confused those two methods; you are right. Please ignore it.

if err != nil {
log.Error(err, "encountering error when shutting down")
return err
}
timer = time.Tick(wait.Jitter(defaultCertRefreshInterval, 0.1))
go serveFn()
Member

This might cause a port conflict.

Member Author

I think the question here is whether shutting down the server unbinds the port in time, before the server tries to bind the same port again.
I have been searching online for an answer to this for quite a while, but I'm still not sure what the 100% correct thing to do here is :/
It seems that when the Listener is closed, the port should be unbound.
Some tests I added in this PR rotate the cert and reload the server multiple times. I haven't seen any port-conflict issues.

I'm open to suggestions if you have a better solution. If not, I will probably merge it as is :)

case <-stop:
return nil
return s.httpServer.Shutdown(context.Background())
case e := <-errCh:
return e
Member

Ideally, if you start the server here, it should just work.

Member Author

IMHO, if there is an unexpected error, we should surface the error instead of continuing to retry.

// Don't exit when getting an http.ErrServerClosed error due to restarting the server.
if shutdownHappend && e == http.ErrServerClosed {
shutdownHappend = false
} else if e != nil {
log.Error(e, "server returns an unexpected error")
return e
}
}
}
}
163 changes: 163 additions & 0 deletions pkg/webhook/server_test.go
@@ -0,0 +1,163 @@
/*
Member

Can you add a test case for when the server exceeds the cert refresh interval? You can change defaultCertRefreshInterval to a smaller interval. I am worried that if you start a server on the same port immediately after shutdown, you will have a port conflict.

Member Author

IIUC, https://github.com/kubernetes-sigs/controller-runtime/pull/260/files#diff-8b0412ec4fd52af1419bd0fcd0f2e101R121 is doing what you ask. The server restarts multiple times and rotates the certs.

Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"context"
"io/ioutil"
"net/http"
"os"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/internal/cert"
"sigs.k8s.io/controller-runtime/pkg/webhook/internal/cert/generator"
"sigs.k8s.io/controller-runtime/pkg/webhook/internal/cert/writer"
"sigs.k8s.io/testing_frameworks/integration/addr"
)

type fakeCertWriter struct {
changed bool
}

func (cw *fakeCertWriter) EnsureCert(dnsName string) (*generator.Artifacts, bool, error) {
return &generator.Artifacts{}, cw.changed, nil
}

func (cw *fakeCertWriter) Inject(objs ...runtime.Object) error {
return nil
}

var _ = Describe("webhook server", func() {
Describe("run", func() {
var stop chan struct{}
var s *Server
var cn = "example.com"

BeforeEach(func() {
port, _, err := addr.Suggest()
Expect(err).NotTo(HaveOccurred())
s = &Server{
sMux: http.NewServeMux(),
ServerOptions: ServerOptions{
Port: int32(port),
BootstrapOptions: &BootstrapOptions{
Host: &cn,
},
},
}

cg := &generator.SelfSignedCertGenerator{}
s.CertDir, err = ioutil.TempDir("/tmp", "controller-runtime-")
Contributor

Are we cleaning up this temp dir?

Expect(err).NotTo(HaveOccurred())
certWriter, err := writer.NewFSCertWriter(writer.FSCertWriterOptions{CertGenerator: cg, Path: s.CertDir})
Expect(err).NotTo(HaveOccurred())
_, _, err = certWriter.EnsureCert(cn)
Expect(err).NotTo(HaveOccurred())

stop = make(chan struct{})
})

It("should stop if the stop channel is closed", func() {
var e error
go func() {
defer GinkgoRecover()
e = s.run(stop)
}()

Eventually(func() *http.Server {
return s.httpServer
}).ShouldNot(BeNil())

close(stop)
Expect(e).NotTo(HaveOccurred())
})

It("should exit if the server encounter an unexpected error", func() {
var e error
go func() {
defer GinkgoRecover()
e = s.run(stop)
}()

Eventually(func() *http.Server {
return s.httpServer
}).ShouldNot(BeNil())

err := s.httpServer.Shutdown(context.Background())
Expect(err).NotTo(HaveOccurred())

Eventually(func() error {
return e
}).Should(Equal(http.ErrServerClosed))

close(stop)
})

It("should be able to keep existing valid cert when timer fires", func() {
var e error
defaultCertRefreshInterval = 500 * time.Millisecond

s.certProvisioner = &cert.Provisioner{
CertWriter: &fakeCertWriter{changed: false},
}

go func() {
defer GinkgoRecover()
e = s.run(stop)
}()

// Wait for multiple cycles of timer firing
time.Sleep(2 * time.Second)
Expect(e).NotTo(HaveOccurred())

close(stop)
})

It("should be able to rotate the cert when timer fires", func() {
var e error
defaultCertRefreshInterval = 500 * time.Millisecond
s.certProvisioner = &cert.Provisioner{
CertWriter: &fakeCertWriter{changed: true},
}

go func() {
defer GinkgoRecover()
e = s.run(stop)
}()

Eventually(func() *http.Server {
return s.httpServer
}).ShouldNot(BeNil())

// Wait for multiple cycles of timer firing
time.Sleep(2 * time.Second)
Expect(e).NotTo(HaveOccurred())

close(stop)
})

AfterEach(func() {
defaultCertRefreshInterval = 3 * 30 * 24 * time.Hour
err := os.RemoveAll(s.CertDir)
Expect(err).NotTo(HaveOccurred())
}, 60)
})
})
57 changes: 57 additions & 0 deletions pkg/webhook/webhook_suite_test.go
@@ -0,0 +1,57 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)

func TestSource(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t, "Webhook Integration Suite", []Reporter{envtest.NewlineReporter{}})
}

var testenv *envtest.Environment
var cfg *rest.Config
var clientset *kubernetes.Clientset

var _ = BeforeSuite(func(done Done) {
logf.SetLogger(logf.ZapLoggerTo(GinkgoWriter, true))

testenv = &envtest.Environment{}

var err error
cfg, err = testenv.Start()
Expect(err).NotTo(HaveOccurred())

clientset, err = kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())

close(done)
}, 60)

var _ = AfterSuite(func() {
testenv.Stop()
})