Skip to content

Commit 4b47282

Browse files
ivanmatmatioktalz
authored andcommitted
BUG: this commit fixes issues: ip addresses of ingresses updating, creation of ingress delayed
- ip addresses of ingresses updated too slowly with publish service ip address. - creation of ingress delayed by unnecessary repeating operations or synchronous operations on previous point.
1 parent 88a9cb4 commit 4b47282

File tree

15 files changed

+487
-333
lines changed

15 files changed

+487
-333
lines changed

main.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,26 +98,24 @@ func main() {
9898
publishService := getNamespaceValue(osArgs.PublishService)
9999

100100
s := store.NewK8sStore(osArgs)
101+
k := k8s.New(
102+
osArgs,
103+
s.NamespacesAccess.Whitelist,
104+
publishService,
105+
)
101106

102107
c := controller.NewBuilder().
103108
WithHaproxyCfgFile(haproxyConf).
104109
WithEventChan(eventChan).
105110
WithIngressChan(ingressChan).
106111
WithStore(s).
107112
WithPublishService(publishService).
113+
WithUpdatePublishServiceFunc(k.UpdatePublishService).
114+
WithClientSet(k.GetClientset()).
108115
WithArgs(osArgs).Build()
109116

110-
k := k8s.New(
111-
osArgs,
112-
s.NamespacesAccess.Whitelist,
113-
publishService,
114-
)
115-
116117
go k.MonitorChanges(eventChan, ingressChan, stop)
117118
go c.Start()
118-
if publishService != nil {
119-
go ingress.UpdateStatus(k.GetClientset(), s, osArgs.IngressClass, osArgs.EmptyIngressClass, ingressChan, annotations.New())
120-
}
121119

122120
// Catch QUIT signals
123121
signalC := make(chan os.Signal, 1)

pkg/controller/builder.go

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/valyala/fasthttp"
1111
"github.com/valyala/fasthttp/fasthttpadaptor"
1212
"github.com/valyala/fasthttp/pprofhandler"
13+
"k8s.io/client-go/kubernetes"
1314

1415
"github.com/haproxytech/kubernetes-ingress/pkg/annotations"
1516
"github.com/haproxytech/kubernetes-ingress/pkg/haproxy"
@@ -24,17 +25,19 @@ import (
2425
)
2526

2627
type Builder struct {
27-
osArgs utils.OSArgs
28-
haproxyClient api.HAProxyClient
29-
haproxyEnv env.Env
30-
haproxyProcess process.Process
31-
haproxyRules rules.Rules
32-
haproxyCfgFile []byte
33-
annotations annotations.Annotations
34-
store store.K8s
35-
publishService *utils.NamespaceValue
36-
eventChan chan k8s.SyncDataEvent
37-
ingressChan chan ingress.Sync
28+
osArgs utils.OSArgs
29+
haproxyClient api.HAProxyClient
30+
haproxyEnv env.Env
31+
haproxyProcess process.Process
32+
haproxyRules rules.Rules
33+
haproxyCfgFile []byte
34+
annotations annotations.Annotations
35+
store store.K8s
36+
publishService *utils.NamespaceValue
37+
eventChan chan k8s.SyncDataEvent
38+
ingressChan chan ingress.Sync
39+
updatePublishServiceFunc func(ingresses []*ingress.Ingress, publishServiceAddresses []string)
40+
clientSet *kubernetes.Clientset
3841
}
3942

4043
var defaultEnv = env.Env{
@@ -114,6 +117,16 @@ func (builder *Builder) WithPublishService(publishService *utils.NamespaceValue)
114117
return builder
115118
}
116119

120+
func (builder *Builder) WithUpdatePublishServiceFunc(updatePublishServiceFunc func(ingresses []*ingress.Ingress, publishServiceAddresses []string)) *Builder {
121+
builder.updatePublishServiceFunc = updatePublishServiceFunc
122+
return builder
123+
}
124+
125+
func (builder *Builder) WithClientSet(clientSet *kubernetes.Clientset) *Builder {
126+
builder.clientSet = clientSet
127+
return builder
128+
}
129+
117130
func (builder *Builder) Build() *HAProxyController {
118131
if builder.haproxyCfgFile == nil {
119132
logger.Panic(errors.New("no HAProxy Config file provided"))
@@ -134,17 +147,20 @@ func (builder *Builder) Build() *HAProxyController {
134147

135148
prefix, errPrefix := utils.GetPodPrefix(os.Getenv("POD_NAME"))
136149
logger.Error(errPrefix)
150+
151+
builder.store.UpdateStatusFunc = ingress.NewStatusIngressUpdater(builder.clientSet, builder.store, builder.osArgs.IngressClass, builder.osArgs.EmptyIngressClass, builder.annotations)
137152
return &HAProxyController{
138-
osArgs: builder.osArgs,
139-
haproxy: haproxy,
140-
podNamespace: os.Getenv("POD_NAMESPACE"),
141-
podPrefix: prefix,
142-
store: builder.store,
143-
eventChan: builder.eventChan,
144-
ingressChan: builder.ingressChan,
145-
publishService: builder.publishService,
146-
annotations: builder.annotations,
147-
chShutdown: chShutdown,
153+
osArgs: builder.osArgs,
154+
haproxy: haproxy,
155+
podNamespace: os.Getenv("POD_NAMESPACE"),
156+
podPrefix: prefix,
157+
store: builder.store,
158+
eventChan: builder.eventChan,
159+
ingressChan: builder.ingressChan,
160+
publishService: builder.publishService,
161+
annotations: builder.annotations,
162+
chShutdown: chShutdown,
163+
updatePublishServiceFunc: builder.updatePublishServiceFunc,
148164
}
149165
}
150166

pkg/controller/controller.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,22 @@ var logger = utils.GetLogger()
3434

3535
// HAProxyController is ingress controller
3636
type HAProxyController struct {
37-
haproxy haproxy.HAProxy
38-
osArgs utils.OSArgs
39-
store store.K8s
40-
annotations annotations.Annotations
41-
publishService *utils.NamespaceValue
42-
auxCfgModTime int64
43-
eventChan chan k8s.SyncDataEvent
44-
ingressChan chan ingress.Sync
45-
ready bool
46-
reload bool
47-
restart bool
48-
updateHandlers []UpdateHandler
49-
podNamespace string
50-
podPrefix string
51-
chShutdown chan struct{}
37+
haproxy haproxy.HAProxy
38+
osArgs utils.OSArgs
39+
store store.K8s
40+
annotations annotations.Annotations
41+
publishService *utils.NamespaceValue
42+
auxCfgModTime int64
43+
eventChan chan k8s.SyncDataEvent
44+
ingressChan chan ingress.Sync
45+
ready bool
46+
reload bool
47+
restart bool
48+
updateHandlers []UpdateHandler
49+
podNamespace string
50+
podPrefix string
51+
chShutdown chan struct{}
52+
updatePublishServiceFunc func(ingresses []*ingress.Ingress, publishServiceAddresses []string)
5253
}
5354

5455
// Wrapping a Native-Client transaction and commit it.
@@ -110,27 +111,29 @@ func (c *HAProxyController) updateHAProxy() {
110111
logger.Error(route.CustomRoutesReset(c.haproxy))
111112
}
112113

114+
ingresses := []*ingress.Ingress{}
113115
for _, namespace := range c.store.Namespaces {
114116
if !namespace.Relevant {
115117
continue
116118
}
119+
c.store.SecretsProcessed = map[string]struct{}{}
117120
for _, ingResource := range namespace.Ingresses {
118121
i := ingress.New(c.store, ingResource, c.osArgs.IngressClass, c.osArgs.EmptyIngressClass, c.annotations)
119122
if i == nil {
120123
logger.Debugf("ingress '%s/%s' ignored: no matching IngressClass", ingResource.Namespace, ingResource.Name)
121124
continue
122125
}
123-
if c.publishService != nil && ingResource.Status == store.ADDED {
124-
select {
125-
case c.ingressChan <- ingress.Sync{Ingress: ingResource}:
126-
default:
127-
logger.Errorf("Ingress %s/%s: unable to sync status: sync channel full", ingResource.Namespace, ingResource.Name)
128-
}
126+
if ingResource.Status == store.ADDED {
127+
ingresses = append(ingresses, i)
129128
}
130129
c.reload = i.Update(c.store, c.haproxy, c.annotations) || c.reload
131130
}
132131
}
133132

133+
if len(ingresses) > 0 {
134+
go c.updatePublishServiceFunc(ingresses, c.store.PublishServiceAddresses)
135+
}
136+
134137
for _, handler := range c.updateHandlers {
135138
reload, err = handler.Update(c.store, c.haproxy, c.annotations)
136139
logger.Error(err)

pkg/controller/monitor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ func (c *HAProxyController) SyncData() {
7171
change = c.store.EventSecret(ns, job.Data.(*store.Secret))
7272
case k8s.POD:
7373
change = c.store.EventPod(job.Data.(store.PodEvent))
74+
case k8s.PUBLISH_SERVICE:
75+
change = c.store.EventPublishService(ns, job.Data.(*store.Service))
7476
}
7577
hadChanges = hadChanges || change
7678
}

pkg/ingress/ingress.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,12 @@ func (i *Ingress) handlePath(k store.K8s, h haproxy.HAProxy, host string, path *
129129
return
130130
}
131131
// Endpoints
132-
endpointsReload := svc.HandleHAProxySrvs(k, h)
132+
service := svc.GetResource()
133+
var endpointsReload bool
134+
if _, ok := k.ServiceProcessed[service.Namespace+"/"+service.Name]; !ok {
135+
endpointsReload = svc.HandleHAProxySrvs(k, h)
136+
k.ServiceProcessed[service.Namespace+"/"+service.Name] = struct{}{}
137+
}
133138
return backendReload || endpointsReload || routeReload, err
134139
}
135140

@@ -207,11 +212,15 @@ func (i *Ingress) Update(k store.K8s, h haproxy.HAProxy, a annotations.Annotatio
207212
// Ingress secrets
208213
logger.Tracef("Ingress '%s/%s': processing secrets...", i.resource.Namespace, i.resource.Name)
209214
for _, tls := range i.resource.TLS {
215+
if _, ok := k.SecretsProcessed[i.resource.Namespace+"/"+tls.SecretName]; ok {
216+
continue
217+
}
210218
secret, secErr := k.GetSecret(i.resource.Namespace, tls.SecretName)
211219
if secErr != nil {
212220
logger.Warningf("Ingress '%s/%s': %s", i.resource.Namespace, i.resource.Name, secErr)
213221
continue
214222
}
223+
k.SecretsProcessed[i.resource.Namespace+"/"+tls.SecretName] = struct{}{}
215224
_, err := h.AddSecret(secret, certs.FT_CERT)
216225
logger.Error(err)
217226
}

pkg/ingress/status.go

Lines changed: 21 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,89 +5,37 @@ import (
55
"fmt"
66
"net"
77

8+
"github.com/haproxytech/kubernetes-ingress/pkg/annotations"
9+
"github.com/haproxytech/kubernetes-ingress/pkg/store"
10+
"github.com/haproxytech/kubernetes-ingress/pkg/utils"
811
corev1 "k8s.io/api/core/v1"
912
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
1013
networkingv1 "k8s.io/api/networking/v1"
1114
networkingv1beta "k8s.io/api/networking/v1beta1"
1215
k8serror "k8s.io/apimachinery/pkg/api/errors"
1316
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1417
"k8s.io/client-go/kubernetes"
15-
16-
"github.com/haproxytech/kubernetes-ingress/pkg/annotations"
17-
"github.com/haproxytech/kubernetes-ingress/pkg/store"
1818
)
1919

20-
func UpdateStatus(client *kubernetes.Clientset, k store.K8s, class string, emptyClass bool, channel chan Sync, a annotations.Annotations) {
21-
var i *Ingress
22-
addresses := []string{}
23-
for sync := range channel {
24-
// Published Service updated: Update all Ingresses
25-
if sync.Service != nil && getServiceAddresses(sync.Service, &addresses) {
26-
logger.Debug("Addresses of Ingress Controller service changed, status of all ingress resources are going to be updated")
27-
for _, ns := range k.Namespaces {
28-
if !ns.Relevant {
29-
continue
30-
}
31-
for _, ingress := range k.Namespaces[ns.Name].Ingresses {
32-
if i = New(k, ingress, class, emptyClass, a); i != nil {
33-
logger.Error(i.updateStatus(client, addresses))
34-
}
35-
}
20+
type UpdateStatus func(ingresses []*store.Ingress, publishServiceAddresses []string)
21+
22+
func NewStatusIngressUpdater(client *kubernetes.Clientset, k store.K8s, class string, emptyClass bool, a annotations.Annotations) UpdateStatus {
23+
return func(ingresses []*store.Ingress, publishServiceAddresses []string) {
24+
for _, ingress := range ingresses {
25+
if ing := New(k, ingress, class, emptyClass, a); ing != nil {
26+
logger.Error(ing.UpdateStatus(client, publishServiceAddresses))
3627
}
37-
} else if i = New(k, sync.Ingress, class, emptyClass, a); i != nil {
38-
// Update single Ingress
39-
logger.Error(i.updateStatus(client, addresses))
4028
}
4129
}
4230
}
4331

44-
func getServiceAddresses(service *corev1.Service, curAddr *[]string) (updated bool) {
45-
addresses := []string{}
46-
switch service.Spec.Type {
47-
case corev1.ServiceTypeExternalName:
48-
addresses = []string{service.Spec.ExternalName}
49-
case corev1.ServiceTypeClusterIP:
50-
addresses = []string{service.Spec.ClusterIP}
51-
case corev1.ServiceTypeNodePort:
52-
if service.Spec.ExternalIPs != nil {
53-
addresses = append(addresses, service.Spec.ExternalIPs...)
54-
} else {
55-
addresses = append(addresses, service.Spec.ClusterIP)
56-
}
57-
case corev1.ServiceTypeLoadBalancer:
58-
for _, ip := range service.Status.LoadBalancer.Ingress {
59-
if ip.IP == "" {
60-
addresses = append(addresses, ip.Hostname)
61-
} else {
62-
addresses = append(addresses, ip.IP)
63-
}
64-
}
65-
addresses = append(addresses, service.Spec.ExternalIPs...)
66-
default:
67-
logger.Errorf("Unable to extract IP address/es from service %s/%s", service.Namespace, service.Name)
68-
return
69-
}
32+
func (i *Ingress) UpdateStatus(client *kubernetes.Clientset, addresses []string) (err error) {
33+
var lbi []corev1.LoadBalancerIngress
7034

71-
if len(*curAddr) != len(addresses) {
72-
updated = true
73-
*curAddr = addresses
35+
if utils.EqualSliceStringsWithoutOrder(i.resource.Addresses, addresses) {
7436
return
7537
}
76-
for i, address := range addresses {
77-
if address != (*curAddr)[i] {
78-
updated = true
79-
break
80-
}
81-
}
82-
if updated {
83-
*curAddr = addresses
84-
}
85-
return
86-
}
8738

88-
func (i *Ingress) updateStatus(client *kubernetes.Clientset, addresses []string) (err error) {
89-
logger.Tracef("Updating status of Ingress %s/%s", i.resource.Namespace, i.resource.Name)
90-
var lbi []corev1.LoadBalancerIngress
9139
for _, addr := range addresses {
9240
if net.ParseIP(addr) == nil {
9341
lbi = append(lbi, corev1.LoadBalancerIngress{Hostname: addr})
@@ -135,6 +83,13 @@ func (i *Ingress) updateStatus(client *kubernetes.Clientset, addresses []string)
13583
return fmt.Errorf("failed to update LoadBalancer status of ingress %s/%s: %w", i.resource.Namespace, i.resource.Name, err)
13684
}
13785
logger.Tracef("Successful update of LoadBalancer status in ingress %s/%s", i.resource.Namespace, i.resource.Name)
138-
86+
// Allow to store the publish service addresses affected to the ingress for future comparison in update test.
87+
i.resource.Addresses = addresses
13988
return nil
14089
}
90+
91+
func UpdatePublishService(ingresses []*Ingress, api *kubernetes.Clientset, publishServiceAddresses []string) {
92+
for _, i := range ingresses {
93+
logger.Error(i.UpdateStatus(api, publishServiceAddresses))
94+
}
95+
}

0 commit comments

Comments
 (0)