Skip to content

Commit 87f8e9e

Browse files
stttsk8s-publishing-bot
authored andcommitted
aggregator: split availability controller into local and remote part
Signed-off-by: Dr. Stefan Schimanski <[email protected]> Kubernetes-commit: 834cd7ca4a1c08b5d32d5e2da377310764f2c11c
1 parent 006e6b9 commit 87f8e9e

File tree

5 files changed

+451
-35
lines changed

5 files changed

+451
-35
lines changed

pkg/apiserver/apiserver.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ import (
5151
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
5252
openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
5353
openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
54+
localavailability "k8s.io/kube-aggregator/pkg/controllers/status/local"
5455
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
55-
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status/remote"
56+
remoteavailability "k8s.io/kube-aggregator/pkg/controllers/status/remote"
5657
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
5758
openapicommon "k8s.io/kube-openapi/pkg/common"
5859
)
@@ -102,12 +103,11 @@ type ExtraConfig struct {
102103

103104
RejectForwardingRedirects bool
104105

105-
// DisableAvailableConditionController disables the controller that updates the Available conditions for
106-
// APIServices, Endpoints and Services. This controller runs in kube-aggregator and can interfere with
107-
// Generic Control Plane components when certain apis are not available.
108-
// TODO: We should find a better way to handle this. For now it will be for Generic Control Plane authors to
109-
// disable this controller if they see issues.
110-
DisableAvailableConditionController bool
106+
// DisableRemoteAvailableConditionController disables the controller that updates the Available conditions for
107+
// remote APIServices via querying endpoints of the referenced services. In generic controlplane use-cases,
108+
// the concept of services and endpoints might differ, and might require another implementation of this
109+
// controller. Local APIService are reconciled nevertheless.
110+
DisableRemoteAvailableConditionController bool
111111
}
112112

113113
// Config represents the configuration needed to create an APIAggregator.
@@ -320,6 +320,12 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
320320
})
321321
}
322322

323+
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
324+
informerFactory.Start(context.Done())
325+
c.GenericConfig.SharedInformerFactory.Start(context.Done())
326+
return nil
327+
})
328+
323329
// create shared (remote and local) availability metrics
324330
// TODO: decouple from legacyregistry
325331
metrics := availabilitymetrics.New()
@@ -328,10 +334,25 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
328334
return nil, err
329335
}
330336

331-
// If the AvailableConditionController is disabled, we don't need to start the informers
332-
// and the controller.
333-
if !c.ExtraConfig.DisableAvailableConditionController {
334-
availableController, err := statuscontrollers.NewAvailableConditionController(
337+
// always run local availability controller
338+
local, err := localavailability.New(
339+
informerFactory.Apiregistration().V1().APIServices(),
340+
apiregistrationClient.ApiregistrationV1(),
341+
metrics,
342+
)
343+
if err != nil {
344+
return nil, err
345+
}
346+
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-local-available-controller", func(context genericapiserver.PostStartHookContext) error {
347+
// if we end up blocking for long periods of time, we may need to increase workers.
348+
go local.Run(5, context.Done())
349+
return nil
350+
})
351+
352+
// conditionally run remote availability controller. This could be replaced in certain
353+
// generic controlplane use-cases where there is another concept of services and/or endpoints.
354+
if !c.ExtraConfig.DisableRemoteAvailableConditionController {
355+
remote, err := remoteavailability.New(
335356
informerFactory.Apiregistration().V1().APIServices(),
336357
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
337358
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
@@ -344,16 +365,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
344365
if err != nil {
345366
return nil, err
346367
}
347-
348-
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
349-
informerFactory.Start(context.Done())
350-
c.GenericConfig.SharedInformerFactory.Start(context.Done())
351-
return nil
352-
})
353-
354-
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
368+
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-remote-available-controller", func(context genericapiserver.PostStartHookContext) error {
355369
// if we end up blocking for long periods of time, we may need to increase workers.
356-
go availableController.Run(5, context.Done())
370+
go remote.Run(5, context.Done())
357371
return nil
358372
})
359373
}
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package external
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"k8s.io/apimachinery/pkg/api/equality"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/client-go/tools/cache"
30+
"k8s.io/client-go/util/workqueue"
31+
"k8s.io/klog/v2"
32+
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
33+
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
34+
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
35+
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
36+
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
37+
"k8s.io/kube-aggregator/pkg/controllers"
38+
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
39+
)
40+
41+
// AvailableConditionController handles checking the availability of registered local API services.
42+
type AvailableConditionController struct {
43+
apiServiceClient apiregistrationclient.APIServicesGetter
44+
45+
apiServiceLister listers.APIServiceLister
46+
apiServiceSynced cache.InformerSynced
47+
48+
// To allow injection for testing.
49+
syncFn func(key string) error
50+
51+
queue workqueue.TypedRateLimitingInterface[string]
52+
53+
// metrics registered into legacy registry
54+
metrics *availabilitymetrics.Metrics
55+
}
56+
57+
// New returns a new local availability AvailableConditionController.
58+
func New(
59+
apiServiceInformer informers.APIServiceInformer,
60+
apiServiceClient apiregistrationclient.APIServicesGetter,
61+
metrics *availabilitymetrics.Metrics,
62+
) (*AvailableConditionController, error) {
63+
c := &AvailableConditionController{
64+
apiServiceClient: apiServiceClient,
65+
apiServiceLister: apiServiceInformer.Lister(),
66+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
67+
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
68+
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
69+
// the maximum disruption time to a minimum, but it does prevent hot loops.
70+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
71+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "LocalAvailabilityController"},
72+
),
73+
metrics: metrics,
74+
}
75+
76+
// resync on this one because it is low cardinality and rechecking the actual discovery
77+
// allows us to detect health in a more timely fashion when network connectivity to
78+
// nodes is snipped, but the network still attempts to route there. See
79+
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
80+
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
81+
cache.ResourceEventHandlerFuncs{
82+
AddFunc: c.addAPIService,
83+
UpdateFunc: c.updateAPIService,
84+
DeleteFunc: c.deleteAPIService,
85+
},
86+
30*time.Second)
87+
c.apiServiceSynced = apiServiceHandler.HasSynced
88+
89+
c.syncFn = c.sync
90+
91+
return c, nil
92+
}
93+
94+
func (c *AvailableConditionController) sync(key string) error {
95+
originalAPIService, err := c.apiServiceLister.Get(key)
96+
if apierrors.IsNotFound(err) {
97+
c.metrics.ForgetAPIService(key)
98+
return nil
99+
}
100+
if err != nil {
101+
return err
102+
}
103+
104+
if originalAPIService.Spec.Service != nil {
105+
// this controller only handles local APIServices
106+
return nil
107+
}
108+
109+
// local API services are always considered available
110+
apiService := originalAPIService.DeepCopy()
111+
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
112+
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
113+
return err
114+
}
115+
116+
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
117+
// apiservices. Doing that means we don't want to quickly issue no-op updates.
118+
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
119+
// update this metric on every sync operation to reflect the actual state
120+
c.metrics.SetUnavailableGauge(newAPIService)
121+
122+
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
123+
return newAPIService, nil
124+
}
125+
126+
orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
127+
now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
128+
unknown := apiregistrationv1.APIServiceCondition{
129+
Type: apiregistrationv1.Available,
130+
Status: apiregistrationv1.ConditionUnknown,
131+
}
132+
if orig == nil {
133+
orig = &unknown
134+
}
135+
if now == nil {
136+
now = &unknown
137+
}
138+
if *orig != *now {
139+
klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
140+
}
141+
142+
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
143+
if err != nil {
144+
return nil, err
145+
}
146+
147+
c.metrics.SetUnavailableCounter(originalAPIService, newAPIService)
148+
return newAPIService, nil
149+
}
150+
151+
// Run starts the AvailableConditionController loop which manages the availability condition of API services.
152+
func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
153+
defer utilruntime.HandleCrash()
154+
defer c.queue.ShutDown()
155+
156+
klog.Info("Starting LocalAvailability controller")
157+
defer klog.Info("Shutting down LocalAvailability controller")
158+
159+
// This waits not just for the informers to sync, but for our handlers
160+
// to be called; since the handlers are three different ways of
161+
// enqueueing the same thing, waiting for this permits the queue to
162+
// maximally de-duplicate the entries.
163+
if !controllers.WaitForCacheSync("LocalAvailability", stopCh, c.apiServiceSynced) {
164+
return
165+
}
166+
167+
for i := 0; i < workers; i++ {
168+
go wait.Until(c.runWorker, time.Second, stopCh)
169+
}
170+
171+
<-stopCh
172+
}
173+
174+
func (c *AvailableConditionController) runWorker() {
175+
for c.processNextWorkItem() {
176+
}
177+
}
178+
179+
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
180+
func (c *AvailableConditionController) processNextWorkItem() bool {
181+
key, quit := c.queue.Get()
182+
if quit {
183+
return false
184+
}
185+
defer c.queue.Done(key)
186+
187+
err := c.syncFn(key)
188+
if err == nil {
189+
c.queue.Forget(key)
190+
return true
191+
}
192+
193+
utilruntime.HandleError(fmt.Errorf("%v failed with: %w", key, err))
194+
c.queue.AddRateLimited(key)
195+
196+
return true
197+
}
198+
199+
func (c *AvailableConditionController) addAPIService(obj interface{}) {
200+
castObj := obj.(*apiregistrationv1.APIService)
201+
klog.V(4).Infof("Adding %s", castObj.Name)
202+
c.queue.Add(castObj.Name)
203+
}
204+
205+
func (c *AvailableConditionController) updateAPIService(oldObj, _ interface{}) {
206+
oldCastObj := oldObj.(*apiregistrationv1.APIService)
207+
klog.V(4).Infof("Updating %s", oldCastObj.Name)
208+
c.queue.Add(oldCastObj.Name)
209+
}
210+
211+
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
212+
castObj, ok := obj.(*apiregistrationv1.APIService)
213+
if !ok {
214+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
215+
if !ok {
216+
klog.Errorf("Couldn't get object from tombstone %#v", obj)
217+
return
218+
}
219+
castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
220+
if !ok {
221+
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
222+
return
223+
}
224+
}
225+
klog.V(4).Infof("Deleting %q", castObj.Name)
226+
c.queue.Add(castObj.Name)
227+
}

0 commit comments

Comments
 (0)