Skip to content

Commit 2872d85

Browse files
committed
React on pod events for readiness gates
We rely on endpoints events to re-trigger reconciliations during rollouts, and we consider the status of the pod's containers (e.g. are all of the pod's containers ContainerReady?) to either act upon or swallow those events. The two (pod changes and endpoints changes) can be out of sync. For instance, a starting pod whose containers aren't all ContainerReady might have its address registered in an endpoints subset's NotReadyAddresses, kicking off a reconcile event that won't propagate to the TargetGroups (and condition updates) since the pod is evaluated as not ready. Further pod changes won't trigger an endpoints change (due to readiness gates, the pod's address stays in NotReadyAddresses until we do something), which is probably what was seen in #1205. In order to react reliably to pod changes, we have to hook in a pod watch. Doing so is slightly expensive, as we have to map pod -> [service ->] endpoints -> ingress on pod events, though we limit the search to the pod's namespace (services can only reference pods from their own namespace, and ingresses can only reference services from their own namespace).
1 parent 95ee2ac commit 2872d85

File tree

2 files changed

+126
-0
lines changed

2 files changed

+126
-0
lines changed

internal/ingress/controller/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func watchClusterEvents(c controller.Controller, cache cache.Cache, ingressChan
115115
}); err != nil {
116116
return err
117117
}
118+
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handlers.EnqueueRequestsForPodsEvent{
119+
IngressClass: ingressClass,
120+
Cache: cache,
121+
}); err != nil {
122+
return err
123+
}
118124

119125
return nil
120126
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
"reflect"
6+
7+
"github.com/golang/glog"
8+
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/alb/tg"
9+
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/annotations/class"
10+
corev1 "k8s.io/api/core/v1"
11+
extensions "k8s.io/api/extensions/v1beta1"
12+
"k8s.io/apimachinery/pkg/types"
13+
"k8s.io/client-go/util/workqueue"
14+
"sigs.k8s.io/controller-runtime/pkg/cache"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
"sigs.k8s.io/controller-runtime/pkg/event"
17+
"sigs.k8s.io/controller-runtime/pkg/handler"
18+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
19+
)
20+
21+
var _ handler.EventHandler = (*EnqueueRequestsForPodsEvent)(nil)
22+
23+
type EnqueueRequestsForPodsEvent struct {
24+
IngressClass string
25+
Cache cache.Cache
26+
}
27+
28+
// Create is called in response to an create event - e.g. Pod Creation.
29+
func (h *EnqueueRequestsForPodsEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
30+
h.enqueueImpactedIngresses(e.Object.(*corev1.Pod), queue)
31+
}
32+
33+
// Update is called in response to an update event - e.g. Pod Updated.
34+
func (h *EnqueueRequestsForPodsEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
35+
podOld := e.ObjectOld.(*corev1.Pod)
36+
podNew := e.ObjectNew.(*corev1.Pod)
37+
if !reflect.DeepEqual(podOld, podNew) {
38+
h.enqueueImpactedIngresses(podNew, queue)
39+
}
40+
}
41+
42+
// Delete is called in response to a delete event - e.g. Pod Deleted.
43+
func (h *EnqueueRequestsForPodsEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
44+
}
45+
46+
// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
47+
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
48+
func (h *EnqueueRequestsForPodsEvent) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {
49+
}
50+
51+
func (h *EnqueueRequestsForPodsEvent) enqueueImpactedIngresses(pod *corev1.Pod, queue workqueue.RateLimitingInterface) {
52+
ingressList := &extensions.IngressList{}
53+
if err := h.Cache.List(context.Background(), client.InNamespace(pod.Namespace), ingressList); err != nil {
54+
glog.Errorf("failed to fetch ingresses impacted by pod %s due to %v", pod.GetName(), err)
55+
return
56+
}
57+
58+
if pod.Status.PodIP == "" {
59+
return
60+
}
61+
62+
for _, ingress := range ingressList.Items {
63+
if !class.IsValidIngress(h.IngressClass, &ingress) {
64+
continue
65+
}
66+
67+
backends, _, err := tg.ExtractTargetGroupBackends(&ingress)
68+
if err != nil {
69+
glog.Errorf("failed to extract backend services from ingress %s/%s, reconciling the ingress. Error: %e",
70+
ingress.Namespace, ingress.Name, err)
71+
queue.Add(reconcile.Request{
72+
NamespacedName: types.NamespacedName{
73+
Namespace: ingress.Namespace,
74+
Name: ingress.Name,
75+
},
76+
})
77+
break
78+
}
79+
80+
for _, backend := range backends {
81+
endpoint := &corev1.Endpoints{}
82+
nspname := types.NamespacedName{
83+
Namespace: ingress.Namespace,
84+
Name: backend.ServiceName,
85+
}
86+
if err = h.Cache.Get(context.Background(), nspname, endpoint); err != nil {
87+
glog.Errorf("failed to fetch enpoint %s backing ingress %s/%s, ignoring",
88+
backend.ServiceName, ingress.Namespace, ingress.Name)
89+
continue
90+
}
91+
92+
if h.isPodInEndpoint(pod, endpoint) {
93+
queue.Add(reconcile.Request{
94+
NamespacedName: types.NamespacedName{
95+
Namespace: ingress.Namespace,
96+
Name: ingress.Name,
97+
},
98+
})
99+
}
100+
}
101+
}
102+
}
103+
104+
func (h *EnqueueRequestsForPodsEvent) isPodInEndpoint(pod *corev1.Pod, endpoint *corev1.Endpoints) bool {
105+
for _, sub := range endpoint.Subsets {
106+
for _, addr := range sub.Addresses {
107+
if addr.IP == "" || addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
108+
continue
109+
}
110+
return true
111+
}
112+
for _, addr := range sub.NotReadyAddresses {
113+
if addr.IP == "" || addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
114+
continue
115+
}
116+
return true
117+
}
118+
}
119+
return false
120+
}

0 commit comments

Comments
 (0)