|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "crypto/tls" |
| 6 | + "crypto/x509" |
| 7 | + "flag" |
| 8 | + "fmt" |
| 9 | + "net/http" |
| 10 | + "os" |
| 11 | + "time" |
| 12 | + |
| 13 | + "github.com/gorilla/mux" |
| 14 | + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" |
| 15 | + "k8s.io/apimachinery/pkg/api/meta" |
| 16 | + "k8s.io/client-go/discovery" |
| 17 | + "k8s.io/client-go/discovery/cached/memory" |
| 18 | + "k8s.io/client-go/dynamic" |
| 19 | + "k8s.io/client-go/dynamic/dynamicinformer" |
| 20 | + "k8s.io/client-go/informers" |
| 21 | + "k8s.io/client-go/kubernetes" |
| 22 | + corecache "k8s.io/client-go/listers/core/v1" |
| 23 | + "k8s.io/client-go/rest" |
| 24 | + "k8s.io/client-go/restmapper" |
| 25 | + "k8s.io/client-go/tools/cache" |
| 26 | + "k8s.io/client-go/tools/clientcmd" |
| 27 | + apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" |
| 28 | + "path/filepath" |
| 29 | + "sigs.k8s.io/controller-runtime/pkg/log/zap" |
| 30 | + api "sigs.k8s.io/hierarchical-namespaces/api/v1alpha2" |
| 31 | + "sigs.k8s.io/hierarchical-namespaces/internal/apiextension/apiresources" |
| 32 | + "sigs.k8s.io/hierarchical-namespaces/internal/apiextension/clients" |
| 33 | + "sigs.k8s.io/hierarchical-namespaces/internal/apiextension/handlers" |
| 34 | +) |
| 35 | + |
| 36 | +const ( |
| 37 | + kubeSystemNamespace = "kube-system" |
| 38 | + extensionConfigMap = "extension-apiserver-authentication" |
| 39 | + clientCAKey = "requestheader-client-ca-file" |
| 40 | +) |
| 41 | + |
| 42 | +var ( |
| 43 | + setupLog = zap.New().WithName("setup-apiext") |
| 44 | + listenAddress string |
| 45 | + certPath string |
| 46 | + keyPath string |
| 47 | + debug bool |
| 48 | +) |
| 49 | + |
| 50 | +func main() { |
| 51 | + parseFlags() |
| 52 | + cfg, err := getConfig() |
| 53 | + if err != nil { |
| 54 | + setupLog.Error(err, "unable to get cluster config") |
| 55 | + os.Exit(1) |
| 56 | + } |
| 57 | + server(context.Background(), cfg) |
| 58 | +} |
| 59 | + |
| 60 | +func parseFlags() { |
| 61 | + setupLog.Info("Parsing flags") |
| 62 | + flag.StringVar(&listenAddress, "address", ":7443", "The address to listen on.") |
| 63 | + flag.StringVar(&certPath, "cert", "", "Path to the server cert.") |
| 64 | + flag.StringVar(&keyPath, "key", "", "Path to the server key.") |
| 65 | + flag.BoolVar(&debug, "debug", false, "Enable debug logging.") |
| 66 | + flag.Parse() |
| 67 | +} |
| 68 | + |
| 69 | +func getConfig() (*rest.Config, error) { |
| 70 | + cfg, err := rest.InClusterConfig() |
| 71 | + if err == nil { |
| 72 | + return cfg, nil |
| 73 | + } |
| 74 | + kubeconfig := os.Getenv("KUBECONFIG") |
| 75 | + if kubeconfig == "" { |
| 76 | + home, err := os.UserHomeDir() |
| 77 | + if err != nil { |
| 78 | + return nil, fmt.Errorf("could not get kubeconfig: %w", err) |
| 79 | + } |
| 80 | + kubeconfig = filepath.Join(home, ".kube", "config") |
| 81 | + } |
| 82 | + cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) |
| 83 | + if err != nil { |
| 84 | + return nil, err |
| 85 | + } |
| 86 | + return cfg, nil |
| 87 | +} |
| 88 | + |
| 89 | +func getMapper(cfg *rest.Config) (meta.RESTMapper, error) { |
| 90 | + k8sClient, err := kubernetes.NewForConfig(cfg) |
| 91 | + if err != nil { |
| 92 | + return nil, err |
| 93 | + } |
| 94 | + cache := memory.NewMemCacheClient(k8sClient.Discovery()) |
| 95 | + return restmapper.NewDeferredDiscoveryRESTMapper(cache), nil |
| 96 | +} |
| 97 | + |
| 98 | +func server(ctx context.Context, cfg *rest.Config) { |
| 99 | + discovery, err := discovery.NewDiscoveryClientForConfig(cfg) |
| 100 | + if err != nil { |
| 101 | + setupLog.Error(err, "could not start watcher") |
| 102 | + os.Exit(1) |
| 103 | + } |
| 104 | + clientGetter := clients.MediaTypeClientGetter(cfg) |
| 105 | + dynamicFactory, err := getDynamicInformerFactory(cfg) |
| 106 | + if err != nil { |
| 107 | + setupLog.Error(err, "could not get dynammic client for config") |
| 108 | + os.Exit(1) |
| 109 | + } |
| 110 | + crdInformer, apiServiceInformer := setUpAPIInformers(dynamicFactory, ctx.Done()) |
| 111 | + mapper, err := getMapper(cfg) |
| 112 | + if err != nil { |
| 113 | + setupLog.Error(err, "could not get REST mapper for config") |
| 114 | + os.Exit(1) |
| 115 | + } |
| 116 | + apis := apiresources.WatchAPIResources(ctx, discovery, crdInformer, apiServiceInformer, mapper) |
| 117 | + factory, err := getInformerFactory(cfg) |
| 118 | + if err != nil { |
| 119 | + setupLog.Error(err, "could not get informer factory") |
| 120 | + os.Exit(1) |
| 121 | + } |
| 122 | + namespaceCache, configMapCache, err := setUpInformers(factory, ctx.Done()) |
| 123 | + if err != nil { |
| 124 | + setupLog.Error(err, "failed to set up informers") |
| 125 | + os.Exit(1) |
| 126 | + } |
| 127 | + mux := mux.NewRouter() |
| 128 | + pathPrefix := fmt.Sprintf("/apis/%s", api.ResourcesGroupVersion.String()) |
| 129 | + mux.HandleFunc(pathPrefix, handlers.DiscoveryHandler(apis)) |
| 130 | + mux.HandleFunc(fmt.Sprintf("%s/{resource}", pathPrefix), handlers.Forwarder(clientGetter, apis)) |
| 131 | + mux.HandleFunc(fmt.Sprintf("%s/namespaces/{namespace}/{resource}", pathPrefix), handlers.NamespaceHandler(clientGetter, apis, namespaceCache)) |
| 132 | + mux.Use(handlers.AuthenticateMiddleware(configMapCache, extensionConfigMap)) |
| 133 | + |
| 134 | + clientCA, err := getClientCA(configMapCache) |
| 135 | + if err != nil { |
| 136 | + setupLog.Error(err, "could not get client CA from configmap") |
| 137 | + os.Exit(1) |
| 138 | + } |
| 139 | + caCertPool := x509.NewCertPool() |
| 140 | + caCertPool.AppendCertsFromPEM([]byte(clientCA)) |
| 141 | + tlsConfig := &tls.Config{ |
| 142 | + ClientCAs: caCertPool, |
| 143 | + ClientAuth: tls.RequireAndVerifyClientCert, |
| 144 | + } |
| 145 | + server := http.Server{ |
| 146 | + Addr: listenAddress, |
| 147 | + Handler: mux, |
| 148 | + TLSConfig: tlsConfig, |
| 149 | + } |
| 150 | + setupLog.Info(fmt.Sprintf("starting server on %s", listenAddress)) |
| 151 | + err = server.ListenAndServeTLS(certPath, keyPath) |
| 152 | + if err != nil { |
| 153 | + setupLog.Error(err, "could not start server") |
| 154 | + os.Exit(1) |
| 155 | + } |
| 156 | +} |
| 157 | + |
| 158 | +func getClientCA(configMapCache corecache.ConfigMapNamespaceLister) (string, error) { |
| 159 | + config, err := configMapCache.Get(extensionConfigMap) |
| 160 | + if err != nil { |
| 161 | + return "", err |
| 162 | + } |
| 163 | + clientCA, ok := config.Data[clientCAKey] |
| 164 | + if !ok { |
| 165 | + return "", fmt.Errorf("invalid extension config") |
| 166 | + } |
| 167 | + return string(clientCA), nil |
| 168 | +} |
| 169 | + |
| 170 | +func getInformerFactory(cfg *rest.Config) (informers.SharedInformerFactory, error) { |
| 171 | + clientset, err := kubernetes.NewForConfig(cfg) |
| 172 | + if err != nil { |
| 173 | + return nil, err |
| 174 | + } |
| 175 | + return informers.NewSharedInformerFactory(clientset, 0), nil |
| 176 | +} |
| 177 | + |
| 178 | +func getDynamicInformerFactory(cfg *rest.Config) (dynamicinformer.DynamicSharedInformerFactory, error) { |
| 179 | + dynamicClient, err := dynamic.NewForConfig(cfg) |
| 180 | + if err != nil { |
| 181 | + return nil, err |
| 182 | + } |
| 183 | + return dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, time.Minute), nil |
| 184 | +} |
| 185 | + |
| 186 | +func setUpInformers(factory informers.SharedInformerFactory, stop <-chan struct{}) (corecache.NamespaceLister, corecache.ConfigMapNamespaceLister, error) { |
| 187 | + namespaceInformer := factory.Core().V1().Namespaces() |
| 188 | + configMapInformer := factory.Core().V1().ConfigMaps() |
| 189 | + go factory.Start(stop) |
| 190 | + if !cache.WaitForCacheSync(stop, namespaceInformer.Informer().HasSynced, configMapInformer.Informer().HasSynced) { |
| 191 | + return nil, nil, fmt.Errorf("cached failed to sync") |
| 192 | + } |
| 193 | + return namespaceInformer.Lister(), configMapInformer.Lister().ConfigMaps(kubeSystemNamespace), nil |
| 194 | +} |
| 195 | + |
| 196 | +func setUpAPIInformers(factory dynamicinformer.DynamicSharedInformerFactory, stop <-chan struct{}) (cache.SharedIndexInformer, cache.SharedIndexInformer) { |
| 197 | + crdGVR := apiextv1.SchemeGroupVersion.WithResource("customresourcedefinitions") |
| 198 | + crdInformer := factory.ForResource(crdGVR).Informer() |
| 199 | + apiServiceGVR := apiregv1.SchemeGroupVersion.WithResource("apiservices") |
| 200 | + apiServiceInformer := factory.ForResource(apiServiceGVR).Informer() |
| 201 | + go factory.Start(stop) |
| 202 | + return crdInformer, apiServiceInformer |
| 203 | +} |
0 commit comments