@@ -16,6 +16,7 @@ package controller
16
16
17
17
import (
18
18
"os"
19
+ "strconv"
19
20
"time"
20
21
21
22
"k8s.io/client-go/informers"
@@ -31,20 +32,13 @@ func (c *HAProxyController) monitorChanges() {
31
32
stop := make (chan struct {})
32
33
crManager := NewCRManager (& c .Store , c .k8s .RestConfig , c .OSArgs .CacheResyncPeriod , c .eventChan , stop )
33
34
c .crManager = crManager
35
+ epMirror := c .endpointsMirroring ()
34
36
35
37
c .k8s .EventPods (c .PodNamespace , c .PodPrefix , c .OSArgs .CacheResyncPeriod , c .eventChan )
36
38
37
39
for _ , namespace := range c .getWhitelistedNamespaces () {
38
40
factory := informers .NewSharedInformerFactoryWithOptions (c .k8s .API , c .OSArgs .CacheResyncPeriod , informers .WithNamespace (namespace ))
39
41
40
- pi := c .getEndpointSlicesSharedInformer (factory )
41
- if pi != nil {
42
- c .k8s .EventsEndpointSlices (c .eventChan , stop , pi )
43
- } else {
44
- pi = factory .Core ().V1 ().Endpoints ().Informer ()
45
- c .k8s .EventsEndpoints (c .eventChan , stop , pi )
46
- }
47
-
48
42
svci := factory .Core ().V1 ().Services ().Informer ()
49
43
c .k8s .EventsServices (c .eventChan , c .ingressChan , stop , svci , c .PublishService )
50
44
@@ -59,18 +53,30 @@ func (c *HAProxyController) monitorChanges() {
59
53
60
54
var ii , ici cache.SharedIndexInformer
61
55
ii , ici = c .getIngressSharedInformers (factory )
56
+
62
57
if ii == nil {
63
58
logger .Panic ("ingress resources not supported in this cluster" )
64
59
}
65
60
c .k8s .EventsIngresses (c .eventChan , stop , ii )
66
61
67
- informersSynced = []cache.InformerSynced {pi . HasSynced , svci .HasSynced , nsi .HasSynced , ii .HasSynced , si .HasSynced , ci .HasSynced }
62
+ informersSynced = []cache.InformerSynced {svci .HasSynced , nsi .HasSynced , ii .HasSynced , si .HasSynced , ci .HasSynced }
68
63
informersSynced = append (informersSynced , crManager .RunInformers (namespace )... )
69
64
70
65
if ici != nil {
71
66
c .k8s .EventsIngressClass (c .eventChan , stop , ici )
72
67
informersSynced = append (informersSynced , ici .HasSynced )
73
68
}
69
+
70
+ epsi := c .getEndpointSlicesSharedInformer (factory )
71
+ if epsi != nil {
72
+ c .k8s .EventsEndpointSlices (c .eventChan , stop , epsi )
73
+ informersSynced = append (informersSynced , epsi .HasSynced )
74
+ }
75
+ if epsi == nil || ! epMirror {
76
+ epi := factory .Core ().V1 ().Endpoints ().Informer ()
77
+ c .k8s .EventsEndpoints (c .eventChan , stop , epi )
78
+ informersSynced = append (informersSynced , epi .HasSynced )
79
+ }
74
80
}
75
81
76
82
if ! cache .WaitForCacheSync (stop , informersSynced ... ) {
@@ -199,6 +205,29 @@ func (c *HAProxyController) getWhitelistedNamespaces() []string {
199
205
return namespaces
200
206
}
201
207
208
+ // if EndpointSliceMirroring is supported we can just watch endpointSlices
209
+ // Ref: https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/0752-endpointslices#endpointslicemirroring-controller
210
+ func (c * HAProxyController ) endpointsMirroring () bool {
211
+ var major , minor int
212
+ var err error
213
+ version , _ := c .k8s .API .ServerVersion ()
214
+ if version == nil {
215
+ return false
216
+ }
217
+ major , err = strconv .Atoi (version .Major )
218
+ if err != nil {
219
+ return false
220
+ }
221
+ minor , err = strconv .Atoi (version .Minor )
222
+ if err != nil {
223
+ return false
224
+ }
225
+ if major == 1 && minor < 19 {
226
+ return false
227
+ }
228
+ return true
229
+ }
230
+
202
231
// auxCfgManager returns restart or reload requirement based on state and transition of auxiliary configuration file.
203
232
func (c * HAProxyController ) auxCfgManager () (restart , reload bool ) {
204
233
info , errStat := os .Stat (c .Cfg .Env .AuxCFGFile )
0 commit comments