React on pod events for readiness gates #1214

Merged
22 changes: 14 additions & 8 deletions internal/ingress/backend/endpoint.go
@@ -182,15 +182,10 @@ func (resolver *endpointResolver) resolveIP(ingress *extensions.Ingress, backend
 			continue
 		}
 
-		// check if all containers are ready
-		for _, condition := range pod.Status.Conditions {
-			if condition.Type == api.ContainersReady {
-				if condition.Status == api.ConditionTrue {
-					addresses = append(addresses, epAddr)
-				}
-				break
-			}
+		if IsPodSuitableAsIPTarget(pod) {
+			addresses = append(addresses, epAddr)
 		}
+
 	}
 	for _, epAddr := range addresses {
 		result = append(result, &elbv2.TargetDescription{
@@ -226,6 +221,17 @@ func IsNodeSuitableAsTrafficProxy(node *corev1.Node) bool {
return false
}

// IsPodSuitableAsIPTarget checks whether a pod is suitable as a TargetGroup's target
// (currently tested: are all of the pod's containers ready?).
func IsPodSuitableAsIPTarget(pod *corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == api.ContainersReady {
return condition.Status == api.ConditionTrue
}
}
return false
}

// findServiceAndPort returns the service & servicePort by name
func findServiceAndPort(store store.Storer, namespace string, serviceName string, servicePort intstr.IntOrString) (*corev1.Service, *corev1.ServicePort, error) {
serviceKey := namespace + "/" + serviceName
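As a side note, a minimal table-driven test sketch for IsPodSuitableAsIPTarget (not part of this PR; it assumes a test file in the same backend package and uses the corev1 constants directly instead of the file's api alias):

package backend

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
)

func TestIsPodSuitableAsIPTarget(t *testing.T) {
	tests := []struct {
		name string
		pod  *corev1.Pod
		want bool
	}{
		{
			name: "all containers ready",
			pod: &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{
				{Type: corev1.ContainersReady, Status: corev1.ConditionTrue},
			}}},
			want: true,
		},
		{
			name: "containers not ready",
			pod: &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{
				{Type: corev1.ContainersReady, Status: corev1.ConditionFalse},
			}}},
			want: false,
		},
		{
			name: "no ContainersReady condition reported",
			pod:  &corev1.Pod{},
			want: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := IsPodSuitableAsIPTarget(tt.pod); got != tt.want {
				t.Errorf("IsPodSuitableAsIPTarget() = %v, want %v", got, tt.want)
			}
		})
	}
}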
6 changes: 6 additions & 0 deletions internal/ingress/controller/controller.go
@@ -115,6 +115,12 @@ func watchClusterEvents(c controller.Controller, cache cache.Cache, ingressChan
}); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handlers.EnqueueRequestsForPodsEvent{
IngressClass: ingressClass,
Cache: cache,
}); err != nil {
return err
}

return nil
}
124 changes: 124 additions & 0 deletions internal/ingress/controller/handlers/pod.go
@@ -0,0 +1,124 @@
package handlers

import (
"context"

"github.com/golang/glog"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/alb/tg"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/annotations/class"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/backend"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var _ handler.EventHandler = (*EnqueueRequestsForPodsEvent)(nil)

type EnqueueRequestsForPodsEvent struct {
IngressClass string
Cache cache.Cache
}

// Create is called in response to a create event - e.g. Pod Creation.
func (h *EnqueueRequestsForPodsEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
}

// Update is called in response to an update event - e.g. Pod Updated.
func (h *EnqueueRequestsForPodsEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
podOld := e.ObjectOld.(*corev1.Pod)
podNew := e.ObjectNew.(*corev1.Pod)

// we only enqueue reconcile events for pods whose containers changed state
// (ContainersReady vs not ContainersReady).
if backend.IsPodSuitableAsIPTarget(podNew) != backend.IsPodSuitableAsIPTarget(podOld) {
// ... and only for pods referenced by an endpoint backing an ingress:
h.enqueueImpactedIngresses(podNew, queue)
}
}

// Delete is called in response to a delete event - e.g. Pod Deleted.
func (h *EnqueueRequestsForPodsEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
}

// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
func (h *EnqueueRequestsForPodsEvent) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {
}

// enqueueImpactedIngresses enqueues reconcile requests for every ingress of our class
// whose backing service endpoints reference the given pod.
func (h *EnqueueRequestsForPodsEvent) enqueueImpactedIngresses(pod *corev1.Pod, queue workqueue.RateLimitingInterface) {
ingressList := &extensions.IngressList{}
if err := h.Cache.List(context.Background(), client.InNamespace(pod.Namespace), ingressList); err != nil {
glog.Errorf("failed to fetch ingresses impacted by pod %s due to %v", pod.GetName(), err)
return
}

if pod.Status.PodIP == "" {
return
}

for _, ingress := range ingressList.Items {
if !class.IsValidIngress(h.IngressClass, &ingress) {
continue
}

backends, _, err := tg.ExtractTargetGroupBackends(&ingress)
if err != nil {
glog.Errorf("failed to extract backend services from ingress %s/%s, reconciling the ingress. Error: %e",
ingress.Namespace, ingress.Name, err)
queue.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: ingress.Namespace,
Name: ingress.Name,
},
})
break
}

for _, backend := range backends {
endpoint := &corev1.Endpoints{}
nspname := types.NamespacedName{
Namespace: ingress.Namespace,
Name: backend.ServiceName,
}
if err = h.Cache.Get(context.Background(), nspname, endpoint); err != nil {
glog.Errorf("failed to fetch enpoint %s backing ingress %s/%s, ignoring",
backend.ServiceName, ingress.Namespace, ingress.Name)
continue
}

if h.isPodInEndpoint(pod, endpoint) {
Contributor
Would it make sense to only reconcile the ingress if the ContainersReady condition is true to avoid unnecessary reconciliation iterations (and thus many AWS API calls) on frequent pod status updates during pod startup?


@devkid the point of reconciliations IMO is that you don't have to rely on state comparison to make a decision.

In our tests we haven't had any issues with AWS API limits yet.

Also, we shouldn't rely on the fact that right now ContainersReady is the only important and driving factor for registering pods in the endpoints table. Upstream k8s could change or extend that logic at any time and it would break the implementation.

Author

I agree with @nirnanaaa: that would specialise the watch by embedding expectations about implementation details of an unrelated part of the code base (which might change), e.g. setting a trap for future maintainers.

Collaborator

@M00nF1sh Apr 9, 2020

@nirnanaaa
I think we need to have the ContainersReady check; we have to make that check to decide whether to register the Pod with the ALB anyway. Under the current architecture, the reconcile will call AWS APIs to check every ALB setting before reconciling targetGroups, which is inefficient and causes trouble if there are many ingresses.

I think it's better to make the changes below:

  1. have a common function defined in the backend package, like "IsPodSuitableAsIPTarget", which checks ContainersReady
  2. change https://github.com/kubernetes-sigs/aws-alb-ingress-controller/blob/master/internal/ingress/backend/endpoint.go#L186 to call the function above to decide whether a pod in NotReadyAddresses should be registered.
  3. only enqueue the Ingress for a pod Update event when (IsPodSuitableAsIPTarget(podNew) != IsPodSuitableAsIPTarget(podOld)) && PodInEndpoint(podNew) (see the sketch below)
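A minimal sketch of the Update handler shape that item 3 suggests (function names are taken from this PR; isPodInSomeEndpoint is a hypothetical helper, and the merged code performs the endpoint membership check inside enqueueImpactedIngresses instead):

func (h *EnqueueRequestsForPodsEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
	podOld := e.ObjectOld.(*corev1.Pod)
	podNew := e.ObjectNew.(*corev1.Pod)

	// only react when the pod's suitability as an IP target actually flipped ...
	if backend.IsPodSuitableAsIPTarget(podNew) == backend.IsPodSuitableAsIPTarget(podOld) {
		return
	}
	// ... and only when the pod is already referenced by a backing Endpoints object
	// (isPodInSomeEndpoint is a hypothetical helper, not part of this PR).
	if h.isPodInSomeEndpoint(podNew) {
		h.enqueueImpactedIngresses(podNew, queue)
	}
}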

@M00nF1sh does that really justify adding all the complex logic to have the oldRef stored, the handling if no old ref was found in the store, etc.? I don't think the comparison would be a huge deal, other than duplicating logic.

Author

@M00nF1sh pushed a commit doing exactly that

Collaborator

Thanks, ready to go :D Just one nit: we need a "break" after queue.Add :D

Author

Thanks, added the break (it optimizes the loop but precludes pods being part of several readiness-gate-backed ingresses, which I guess was a shady corner case wrt readiness gate semantics anyway?).

Contributor

@bpineau The break would only quit the loop that iterates over the backends, i.e. all other ingresses would still be processed. Enqueuing the ingress for reconciliation will reconcile all target groups anyhow.

Author

@devkid you're right

queue.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: ingress.Namespace,
Name: ingress.Name,
},
})
break
}
}
}
}

// isPodInEndpoint returns true when one of the endpoint's addresses, ready or not,
// has a TargetRef pointing at the given pod.
func (h *EnqueueRequestsForPodsEvent) isPodInEndpoint(pod *corev1.Pod, endpoint *corev1.Endpoints) bool {
for _, sub := range endpoint.Subsets {
for _, addr := range sub.Addresses {
if addr.IP == "" || addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
continue
}
return true
}
for _, addr := range sub.NotReadyAddresses {
if addr.IP == "" || addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
continue
}
return true
}
}
return false
}
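For reference, a sketch (with illustrative, made-up names and IPs) of the Endpoints shape isPodInEndpoint matches against; an address counts whether it appears under Addresses or NotReadyAddresses, as long as its TargetRef names the pod:

endpoint := &corev1.Endpoints{
	Subsets: []corev1.EndpointSubset{{
		Addresses: []corev1.EndpointAddress{{
			IP: "10.0.1.23", // illustrative
			TargetRef: &corev1.ObjectReference{
				Kind:      "Pod",
				Name:      "web-7d4b9c6f5-abcde", // compared against pod.Name
				Namespace: "default",
			},
		}},
		NotReadyAddresses: []corev1.EndpointAddress{{
			IP: "10.0.1.24", // illustrative
			TargetRef: &corev1.ObjectReference{
				Kind: "Pod",
				Name: "web-7d4b9c6f5-fghij",
			},
		}},
	}},
}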