Skip to content

Commit e69b456

Browse files
authored
Merge pull request kubernetes-sigs#1214 from DataDog/missed-readiness-gates-events
React on pod events for readiness gates
2 parents 31b2ce4 + cb5e850 commit e69b456

File tree

3 files changed

+144
-8
lines changed

3 files changed

+144
-8
lines changed

internal/ingress/backend/endpoint.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,10 @@ func (resolver *endpointResolver) resolveIP(ingress *extensions.Ingress, backend
182182
continue
183183
}
184184

185-
// check if all containers are ready
186-
for _, condition := range pod.Status.Conditions {
187-
if condition.Type == api.ContainersReady {
188-
if condition.Status == api.ConditionTrue {
189-
addresses = append(addresses, epAddr)
190-
}
191-
break
192-
}
185+
if IsPodSuitableAsIPTarget(pod) {
186+
addresses = append(addresses, epAddr)
193187
}
188+
194189
}
195190
for _, epAddr := range addresses {
196191
result = append(result, &elbv2.TargetDescription{
@@ -226,6 +221,17 @@ func IsNodeSuitableAsTrafficProxy(node *corev1.Node) bool {
226221
return false
227222
}
228223

224+
// IsPodSuitableAsIPTarget check whether pod is suitable as a TargetGroup's target
225+
// (currently tested: are all pod's containers ready?).
226+
func IsPodSuitableAsIPTarget(pod *corev1.Pod) bool {
227+
for _, condition := range pod.Status.Conditions {
228+
if condition.Type == api.ContainersReady {
229+
return condition.Status == api.ConditionTrue
230+
}
231+
}
232+
return false
233+
}
234+
229235
// findServiceAndPort returns the service & servicePort by name
230236
func findServiceAndPort(store store.Storer, namespace string, serviceName string, servicePort intstr.IntOrString) (*corev1.Service, *corev1.ServicePort, error) {
231237
serviceKey := namespace + "/" + serviceName

internal/ingress/controller/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func watchClusterEvents(c controller.Controller, cache cache.Cache, ingressChan
115115
}); err != nil {
116116
return err
117117
}
118+
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handlers.EnqueueRequestsForPodsEvent{
119+
IngressClass: ingressClass,
120+
Cache: cache,
121+
}); err != nil {
122+
return err
123+
}
118124

119125
return nil
120126
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
6+
"github.com/golang/glog"
7+
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/alb/tg"
8+
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/annotations/class"
9+
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/backend"
10+
corev1 "k8s.io/api/core/v1"
11+
extensions "k8s.io/api/extensions/v1beta1"
12+
"k8s.io/apimachinery/pkg/types"
13+
"k8s.io/client-go/util/workqueue"
14+
"sigs.k8s.io/controller-runtime/pkg/cache"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
"sigs.k8s.io/controller-runtime/pkg/event"
17+
"sigs.k8s.io/controller-runtime/pkg/handler"
18+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
19+
)
20+
21+
var _ handler.EventHandler = (*EnqueueRequestsForPodsEvent)(nil)
22+
23+
type EnqueueRequestsForPodsEvent struct {
24+
IngressClass string
25+
Cache cache.Cache
26+
}
27+
28+
// Create is called in response to an create event - e.g. Pod Creation.
29+
func (h *EnqueueRequestsForPodsEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
30+
}
31+
32+
// Update is called in response to an update event - e.g. Pod Updated.
33+
func (h *EnqueueRequestsForPodsEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
34+
podOld := e.ObjectOld.(*corev1.Pod)
35+
podNew := e.ObjectNew.(*corev1.Pod)
36+
37+
// we only enqueue reconcile events for pods whose containers changed state
38+
// (ContainersReady vs not ContainersReady).
39+
if backend.IsPodSuitableAsIPTarget(podNew) != backend.IsPodSuitableAsIPTarget(podOld) {
40+
// ... and only for pods referenced by an endpoint backing an ingress:
41+
h.enqueueImpactedIngresses(podNew, queue)
42+
}
43+
}
44+
45+
// Delete is called in response to a delete event - e.g. Pod Deleted.
46+
func (h *EnqueueRequestsForPodsEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
47+
}
48+
49+
// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
50+
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
51+
func (h *EnqueueRequestsForPodsEvent) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {
52+
}
53+
54+
func (h *EnqueueRequestsForPodsEvent) enqueueImpactedIngresses(pod *corev1.Pod, queue workqueue.RateLimitingInterface) {
55+
ingressList := &extensions.IngressList{}
56+
if err := h.Cache.List(context.Background(), client.InNamespace(pod.Namespace), ingressList); err != nil {
57+
glog.Errorf("failed to fetch ingresses impacted by pod %s due to %v", pod.GetName(), err)
58+
return
59+
}
60+
61+
if pod.Status.PodIP == "" {
62+
return
63+
}
64+
65+
for _, ingress := range ingressList.Items {
66+
if !class.IsValidIngress(h.IngressClass, &ingress) {
67+
continue
68+
}
69+
70+
backends, _, err := tg.ExtractTargetGroupBackends(&ingress)
71+
if err != nil {
72+
glog.Errorf("failed to extract backend services from ingress %s/%s, reconciling the ingress. Error: %e",
73+
ingress.Namespace, ingress.Name, err)
74+
queue.Add(reconcile.Request{
75+
NamespacedName: types.NamespacedName{
76+
Namespace: ingress.Namespace,
77+
Name: ingress.Name,
78+
},
79+
})
80+
break
81+
}
82+
83+
for _, backend := range backends {
84+
endpoint := &corev1.Endpoints{}
85+
nspname := types.NamespacedName{
86+
Namespace: ingress.Namespace,
87+
Name: backend.ServiceName,
88+
}
89+
if err = h.Cache.Get(context.Background(), nspname, endpoint); err != nil {
90+
glog.Errorf("failed to fetch enpoint %s backing ingress %s/%s, ignoring",
91+
backend.ServiceName, ingress.Namespace, ingress.Name)
92+
continue
93+
}
94+
95+
if h.isPodInEndpoint(pod, endpoint) {
96+
queue.Add(reconcile.Request{
97+
NamespacedName: types.NamespacedName{
98+
Namespace: ingress.Namespace,
99+
Name: ingress.Name,
100+
},
101+
})
102+
break
103+
}
104+
}
105+
}
106+
}
107+
108+
func (h *EnqueueRequestsForPodsEvent) isPodInEndpoint(pod *corev1.Pod, endpoint *corev1.Endpoints) bool {
109+
for _, sub := range endpoint.Subsets {
110+
for _, addr := range sub.Addresses {
111+
if addr.IP == "" || addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
112+
continue
113+
}
114+
return true
115+
}
116+
for _, addr := range sub.NotReadyAddresses {
117+
if addr.IP == "" || addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
118+
continue
119+
}
120+
return true
121+
}
122+
}
123+
return false
124+
}

0 commit comments

Comments
 (0)