Skip to content

Commit 8d9e683

Browse files
petuominMo3m3n
authored andcommitted
MEDIUM: Add support for endpoint slices
* Use endpointSlices to discover endpoints if the cluster supports them * Add permissions to the ClusterRole for querying the endpointslices Relocate HAPRoxySrvs from PortEndpoints to Namespace With EndpointSlices one object does not include all endpoints to a service anymore. In order to track changes over all slices the metadata used for syncing the new addresses and haproxy backend servers are moved to Namespace level.
1 parent ef3fe90 commit 8d9e683

File tree

12 files changed

+339
-175
lines changed

12 files changed

+339
-175
lines changed

controller/haproxy/api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type HAProxyClient interface {
5656
SetServerState(backendName string, serverName string, state string) error
5757
ServerGet(serverName, backendNa string) (models.Server, error)
5858
SetAuxCfgFile(auxCfgFile string)
59-
SyncBackendSrvs(oldEndpoints, newEndpoints *store.PortEndpoints) error
59+
SyncBackendSrvs(backend *store.RuntimeBackend, portUpdated bool) error
6060
UserListDeleteAll() error
6161
UserListExistsByGroup(group string) (bool, error)
6262
UserListCreateByGroup(group string, userPasswordMap map[string][]byte) error

controller/haproxy/api/runtime.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,20 @@ func (c *clientNative) GetMap(mapFile string) (*models.Map, error) {
3232
}
3333

3434
// SyncBackendSrvs syncs states and addresses of a backend servers with corresponding endpoints.
35-
func (c *clientNative) SyncBackendSrvs(oldEndpoints, newEndpoints *store.PortEndpoints) error {
36-
if oldEndpoints.BackendName == "" {
35+
func (c *clientNative) SyncBackendSrvs(backend *store.RuntimeBackend, portUpdated bool) error {
36+
if backend.Name == "" {
3737
return nil
3838
}
39-
newEndpoints.HAProxySrvs = oldEndpoints.HAProxySrvs
40-
newEndpoints.BackendName = oldEndpoints.BackendName
41-
haproxySrvs := newEndpoints.HAProxySrvs
42-
newAddresses := newEndpoints.AddrNew
43-
portChanged := newEndpoints.Port != oldEndpoints.Port
39+
haproxySrvs := backend.HAProxySrvs
40+
addresses := backend.Endpoints.Addresses
4441
// Disable stale entries from HAProxySrvs
4542
// and provide list of Disabled Srvs
4643
var disabled []*store.HAProxySrv
4744
var errors utils.Errors
4845
for i, srv := range haproxySrvs {
49-
srv.Modified = portChanged || srv.Modified
50-
if _, ok := newAddresses[srv.Address]; ok {
51-
delete(newAddresses, srv.Address)
46+
srv.Modified = srv.Modified || portUpdated
47+
if _, ok := addresses[srv.Address]; ok {
48+
delete(addresses, srv.Address)
5249
} else {
5350
haproxySrvs[i].Address = ""
5451
haproxySrvs[i].Modified = true
@@ -57,14 +54,14 @@ func (c *clientNative) SyncBackendSrvs(oldEndpoints, newEndpoints *store.PortEnd
5754
}
5855

5956
// Configure new Addresses in available HAProxySrvs
60-
for newAddr := range newAddresses {
57+
for newAddr := range addresses {
6158
if len(disabled) == 0 {
6259
break
6360
}
6461
disabled[0].Address = newAddr
6562
disabled[0].Modified = true
6663
disabled = disabled[1:]
67-
delete(newAddresses, newAddr)
64+
delete(addresses, newAddr)
6865
}
6966
// Dynamically updates HAProxy backend servers with HAProxySrvs content
7067
var addrErr, stateErr error
@@ -74,15 +71,15 @@ func (c *clientNative) SyncBackendSrvs(oldEndpoints, newEndpoints *store.PortEnd
7471
}
7572
if srv.Address == "" {
7673
// logger.Tracef("server '%s/%s' changed status to %v", newEndpoints.BackendName, srv.Name, "maint")
77-
addrErr = c.SetServerAddr(newEndpoints.BackendName, srv.Name, "127.0.0.1", 0)
78-
stateErr = c.SetServerState(newEndpoints.BackendName, srv.Name, "maint")
74+
addrErr = c.SetServerAddr(backend.Name, srv.Name, "127.0.0.1", 0)
75+
stateErr = c.SetServerState(backend.Name, srv.Name, "maint")
7976
} else {
8077
// logger.Tracef("server '%s/%s' changed status to %v", newEndpoints.BackendName, srv.Name, "ready")
81-
addrErr = c.SetServerAddr(newEndpoints.BackendName, srv.Name, srv.Address, int(newEndpoints.Port))
82-
stateErr = c.SetServerState(newEndpoints.BackendName, srv.Name, "ready")
78+
addrErr = c.SetServerAddr(backend.Name, srv.Name, srv.Address, int(backend.Endpoints.Port))
79+
stateErr = c.SetServerState(backend.Name, srv.Name, "ready")
8380
}
8481
if addrErr != nil || stateErr != nil {
85-
newEndpoints.DynUpdateFailed = true
82+
backend.DynUpdateFailed = true
8683
errors.Add(addrErr)
8784
errors.Add(stateErr)
8885
}

controller/ingress.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (c *HAProxyController) handleIngressPath(ingress *store.Ingress, host strin
100100
}
101101
c.Cfg.ActiveBackends[backendName] = struct{}{}
102102
// Endpoints
103-
endpointsReload := svc.HandleEndpoints(c.Client, c.Store)
103+
endpointsReload := svc.HandleHAProxySrvs(c.Client, c.Store)
104104
return backendReload || endpointsReload || routeReload, err
105105
}
106106

@@ -143,7 +143,7 @@ func (c *HAProxyController) setDefaultService(ingress *store.Ingress, frontends
143143
}
144144
}
145145
c.Cfg.ActiveBackends[backendName] = struct{}{}
146-
endpointsReload := svc.HandleEndpoints(c.Client, c.Store)
146+
endpointsReload := svc.HandleHAProxySrvs(c.Client, c.Store)
147147
reload = bdReload || ftReload || endpointsReload
148148
return reload, err
149149
}

controller/kubernetes.go

Lines changed: 133 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"k8s.io/apimachinery/pkg/fields"
2525
"k8s.io/apimachinery/pkg/util/wait"
2626

27+
discoveryv1 "k8s.io/api/discovery/v1"
28+
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
2729
"k8s.io/client-go/kubernetes"
2830
"k8s.io/client-go/rest"
2931
"k8s.io/client-go/tools/cache"
@@ -120,11 +122,12 @@ func (k *K8s) EventsNamespaces(channel chan SyncDataEvent, stop chan struct{}, i
120122
status = DELETED
121123
}
122124
item := &store.Namespace{
123-
Name: data.GetName(),
124-
Endpoints: make(map[string]*store.Endpoints),
125-
Services: make(map[string]*store.Service),
126-
Ingresses: make(map[string]*store.Ingress),
127-
Secret: make(map[string]*store.Secret),
125+
Name: data.GetName(),
126+
Endpoints: make(map[string]map[string]*store.Endpoints),
127+
Services: make(map[string]*store.Service),
128+
Ingresses: make(map[string]*store.Ingress),
129+
Secret: make(map[string]*store.Secret),
130+
HAProxyRuntime: make(map[string]map[string]*store.RuntimeBackend),
128131
CRs: &store.CustomResources{
129132
Global: make(map[string]*models.Global),
130133
Defaults: make(map[string]*models.Defaults),
@@ -144,11 +147,12 @@ func (k *K8s) EventsNamespaces(channel chan SyncDataEvent, stop chan struct{}, i
144147
}
145148
status := DELETED
146149
item := &store.Namespace{
147-
Name: data.GetName(),
148-
Endpoints: make(map[string]*store.Endpoints),
149-
Services: make(map[string]*store.Service),
150-
Ingresses: make(map[string]*store.Ingress),
151-
Secret: make(map[string]*store.Secret),
150+
Name: data.GetName(),
151+
Endpoints: make(map[string]map[string]*store.Endpoints),
152+
Services: make(map[string]*store.Service),
153+
Ingresses: make(map[string]*store.Ingress),
154+
Secret: make(map[string]*store.Secret),
155+
HAProxyRuntime: make(map[string]map[string]*store.RuntimeBackend),
152156
CRs: &store.CustomResources{
153157
Global: make(map[string]*models.Global),
154158
Defaults: make(map[string]*models.Defaults),
@@ -191,7 +195,7 @@ func (k *K8s) EventsNamespaces(channel chan SyncDataEvent, stop chan struct{}, i
191195
go informer.Run(stop)
192196
}
193197

194-
func (k *K8s) EventsEndpoints(channel chan SyncDataEvent, stop chan struct{}, informer cache.SharedIndexInformer) {
198+
func (k *K8s) EventsEndpointSlices(channel chan SyncDataEvent, stop chan struct{}, informer cache.SharedIndexInformer) {
195199
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
196200
AddFunc: func(obj interface{}) {
197201
item, err := k.convertToEndpoints(obj, ADDED)
@@ -227,44 +231,130 @@ func (k *K8s) EventsEndpoints(channel chan SyncDataEvent, stop chan struct{}, in
227231
}
228232

229233
func (k *K8s) convertToEndpoints(obj interface{}, status store.Status) (*store.Endpoints, error) {
230-
data, ok := obj.(*corev1.Endpoints)
231-
if !ok {
232-
k.Logger.Errorf("%s: Invalid data from k8s api, %s", ENDPOINTS, obj)
233-
return nil, ErrIgnored
234+
getServiceName := func(labels map[string]string) string {
235+
return labels["kubernetes.io/service-name"]
234236
}
235-
if data.GetNamespace() == "kube-system" {
236-
if data.ObjectMeta.Name == "kube-controller-manager" ||
237-
data.ObjectMeta.Name == "kube-scheduler" ||
238-
data.ObjectMeta.Name == "kubernetes-dashboard" ||
239-
data.ObjectMeta.Name == "kube-dns" {
240-
return nil, ErrIgnored
237+
238+
shouldIgnoreObject := func(namespace string, labels map[string]string) bool {
239+
serviceName := getServiceName(labels)
240+
if namespace == "kube-system" {
241+
if serviceName == "kube-controller-manager" ||
242+
serviceName == "kube-scheduler" ||
243+
serviceName == "kubernetes-dashboard" ||
244+
serviceName == "kube-dns" {
245+
return true
246+
}
241247
}
248+
return false
242249
}
243-
if data.ObjectMeta.GetDeletionTimestamp() != nil {
244-
// detect endpoints that are in terminating state
245-
status = DELETED
246-
}
247-
item := &store.Endpoints{
248-
Namespace: data.GetNamespace(),
249-
Service: data.GetName(),
250-
Ports: make(map[string]*store.PortEndpoints),
251-
Status: status,
252-
}
253-
for _, subset := range data.Subsets {
254-
for _, port := range subset.Ports {
255-
addresses := make(map[string]struct{})
256-
for _, address := range subset.Addresses {
257-
addresses[address.IP] = struct{}{}
258-
}
259-
item.Ports[port.Name] = &store.PortEndpoints{
260-
Port: int64(port.Port),
261-
AddrCount: len(addresses),
262-
AddrNew: addresses,
263-
HAProxySrvs: make([]*store.HAProxySrv, 0, len(addresses)),
250+
switch data := obj.(type) {
251+
case *discoveryv1beta1.EndpointSlice:
252+
if shouldIgnoreObject(data.GetNamespace(), data.GetLabels()) {
253+
return nil, ErrIgnored
254+
}
255+
item := &store.Endpoints{
256+
SliceName: data.Name,
257+
Namespace: data.GetNamespace(),
258+
Service: getServiceName(data.GetLabels()),
259+
Ports: make(map[string]*store.PortEndpoints),
260+
Status: status,
261+
}
262+
addresses := make(map[string]struct{})
263+
for _, endpoints := range data.Endpoints {
264+
for _, address := range endpoints.Addresses {
265+
addresses[address] = struct{}{}
266+
}
267+
}
268+
for _, port := range data.Ports {
269+
item.Ports[*port.Name] = &store.PortEndpoints{
270+
Port: int64(*port.Port),
271+
Addresses: addresses,
272+
}
273+
}
274+
return item, nil
275+
case *discoveryv1.EndpointSlice:
276+
if shouldIgnoreObject(data.GetNamespace(), data.GetLabels()) {
277+
return nil, ErrIgnored
278+
}
279+
item := &store.Endpoints{
280+
SliceName: data.Name,
281+
Namespace: data.GetNamespace(),
282+
Service: getServiceName(data.GetLabels()),
283+
Ports: make(map[string]*store.PortEndpoints),
284+
Status: status,
285+
}
286+
addresses := make(map[string]struct{})
287+
for _, endpoints := range data.Endpoints {
288+
for _, address := range endpoints.Addresses {
289+
addresses[address] = struct{}{}
290+
}
291+
}
292+
for _, port := range data.Ports {
293+
item.Ports[*port.Name] = &store.PortEndpoints{
294+
Port: int64(*port.Port),
295+
Addresses: addresses,
296+
}
297+
}
298+
return item, nil
299+
case *corev1.Endpoints:
300+
item := &store.Endpoints{
301+
Namespace: data.GetNamespace(),
302+
Service: data.GetName(),
303+
Ports: make(map[string]*store.PortEndpoints),
304+
Status: status,
305+
}
306+
for _, subset := range data.Subsets {
307+
for _, port := range subset.Ports {
308+
addresses := make(map[string]struct{})
309+
for _, address := range subset.Addresses {
310+
addresses[address.IP] = struct{}{}
311+
}
312+
item.Ports[port.Name] = &store.PortEndpoints{
313+
Port: int64(port.Port),
314+
Addresses: addresses,
315+
}
264316
}
265317
}
318+
return item, nil
319+
default:
320+
k.Logger.Errorf("%s: Invalid data from k8s api, %s", ENDPOINTS, obj)
321+
return nil, ErrIgnored
266322
}
267-
return item, nil
323+
}
324+
325+
func (k *K8s) EventsEndpoints(channel chan SyncDataEvent, stop chan struct{}, informer cache.SharedIndexInformer) {
326+
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
327+
AddFunc: func(obj interface{}) {
328+
item, err := k.convertToEndpoints(obj, ADDED)
329+
if errors.Is(err, ErrIgnored) {
330+
return
331+
}
332+
k.Logger.Tracef("%s %s: %s", ENDPOINTS, item.Status, item.Service)
333+
channel <- SyncDataEvent{SyncType: ENDPOINTS, Namespace: item.Namespace, Data: item}
334+
},
335+
DeleteFunc: func(obj interface{}) {
336+
item, err := k.convertToEndpoints(obj, DELETED)
337+
if errors.Is(err, ErrIgnored) {
338+
return
339+
}
340+
k.Logger.Tracef("%s %s: %s", ENDPOINTS, item.Status, item.Service)
341+
channel <- SyncDataEvent{SyncType: ENDPOINTS, Namespace: item.Namespace, Data: item}
342+
},
343+
UpdateFunc: func(oldObj, newObj interface{}) {
344+
item1, err := k.convertToEndpoints(oldObj, EMPTY)
345+
if errors.Is(err, ErrIgnored) {
346+
return
347+
}
348+
item2, _ := k.convertToEndpoints(newObj, MODIFIED)
349+
if item2.Equal(item1) {
350+
return
351+
}
352+
// fix modified state for ones that are deleted,new,same
353+
k.Logger.Tracef("%s %s: %s", ENDPOINTS, item2.Status, item2.Service)
354+
channel <- SyncDataEvent{SyncType: ENDPOINTS, Namespace: item2.Namespace, Data: item2}
355+
},
356+
})
357+
go informer.Run(stop)
268358
}
269359

270360
func (k *K8s) EventsIngressClass(channel chan SyncDataEvent, stop chan struct{}, informer cache.SharedIndexInformer) {

controller/monitor.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,13 @@ func (c *HAProxyController) monitorChanges() {
3737
for _, namespace := range c.getWhitelistedNamespaces() {
3838
factory := informers.NewSharedInformerFactoryWithOptions(c.k8s.API, c.OSArgs.CacheResyncPeriod, informers.WithNamespace(namespace))
3939

40-
pi := factory.Core().V1().Endpoints().Informer()
41-
c.k8s.EventsEndpoints(c.eventChan, stop, pi)
40+
pi := c.getEndpointSlicesSharedInformer(factory)
41+
if pi != nil {
42+
c.k8s.EventsEndpointSlices(c.eventChan, stop, pi)
43+
} else {
44+
pi = factory.Core().V1().Endpoints().Informer()
45+
c.k8s.EventsEndpoints(c.eventChan, stop, pi)
46+
}
4247

4348
svci := factory.Core().V1().Services().Informer()
4449
c.k8s.EventsServices(c.eventChan, c.statusChan, stop, svci, c.PublishService)
@@ -152,6 +157,30 @@ func (c *HAProxyController) getIngressSharedInformers(factory informers.SharedIn
152157
return ii, ici
153158
}
154159

160+
func (c *HAProxyController) getEndpointSlicesSharedInformer(factory informers.SharedInformerFactory) cache.SharedIndexInformer {
161+
for i, apiGroup := range []string{"discovery.k8s.io/v1", "discovery.k8s.io/v1beta1"} {
162+
resources, err := c.k8s.API.ServerResourcesForGroupVersion(apiGroup)
163+
if err != nil {
164+
continue
165+
}
166+
167+
for _, rs := range resources.APIResources {
168+
if rs.Name == "endpointslices" {
169+
switch i {
170+
case 0:
171+
logger.Debug("Using discovery.k8s.io/v1 endpointslices")
172+
return factory.Discovery().V1().EndpointSlices().Informer()
173+
174+
case 1:
175+
logger.Debug("Using discovery.k8s.io/v1beta1 endpointslices")
176+
return factory.Discovery().V1beta1().EndpointSlices().Informer()
177+
}
178+
}
179+
}
180+
}
181+
return nil
182+
}
183+
155184
func (c *HAProxyController) getWhitelistedNamespaces() []string {
156185
if len(c.Store.NamespacesAccess.Whitelist) == 0 {
157186
return []string{""}

0 commit comments

Comments
 (0)