Skip to content

add event handler for ingressClass&ingressClassParams events #1991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions controllers/ingress/eventhandlers/ingress_class_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package eventhandlers

import (
"context"
"github.com/go-logr/logr"
networking "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/ingress"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
)

// NewEnqueueRequestsForIngressClassEvent constructs new enqueueRequestsForIngressClassEvent.
func NewEnqueueRequestsForIngressClassEvent(ingEventChan chan<- event.GenericEvent,
k8sClient client.Client, eventRecorder record.EventRecorder, logger logr.Logger) *enqueueRequestsForIngressClassEvent {
return &enqueueRequestsForIngressClassEvent{
ingEventChan: ingEventChan,
k8sClient: k8sClient,
eventRecorder: eventRecorder,
logger: logger,
}
}

var _ handler.EventHandler = (*enqueueRequestsForIngressClassEvent)(nil)

type enqueueRequestsForIngressClassEvent struct {
ingEventChan chan<- event.GenericEvent
k8sClient client.Client
eventRecorder record.EventRecorder
logger logr.Logger
}

func (h *enqueueRequestsForIngressClassEvent) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
h.enqueueImpactedIngresses(e.Meta)
}

func (h *enqueueRequestsForIngressClassEvent) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
ingClassOld := e.ObjectOld.(*networking.IngressClass)
ingClassNew := e.ObjectNew.(*networking.IngressClass)

// we only care below update event:
// 2. IngressClass spec updates
// 3. IngressClass deletions
if equality.Semantic.DeepEqual(ingClassOld.Spec, ingClassNew.Spec) &&
equality.Semantic.DeepEqual(ingClassOld.DeletionTimestamp.IsZero(), ingClassNew.DeletionTimestamp.IsZero()) {
return
}

h.enqueueImpactedIngresses(e.MetaNew)
}

func (h *enqueueRequestsForIngressClassEvent) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
h.enqueueImpactedIngresses(e.Meta)
}

func (h *enqueueRequestsForIngressClassEvent) Generic(e event.GenericEvent, _ workqueue.RateLimitingInterface) {
h.enqueueImpactedIngresses(e.Meta)
}

func (h *enqueueRequestsForIngressClassEvent) enqueueImpactedIngresses(ingClass metav1.Object) {
ingList := &networking.IngressList{}
if err := h.k8sClient.List(context.Background(), ingList,
client.MatchingFields{ingress.IndexKeyIngressClassRefName: ingClass.GetName()}); err != nil {
h.logger.Error(err, "failed to fetch ingresses")
return
}

for index := range ingList.Items {
ing := &ingList.Items[index]
meta, _ := meta.Accessor(ing)

h.logger.V(1).Info("enqueue ingress for ingressClass event",
"ingressClass", ingClass.GetName(),
"ingress", k8s.NamespacedName(ing))
h.ingEventChan <- event.GenericEvent{
Meta: meta,
Object: ing,
}
}
}
86 changes: 86 additions & 0 deletions controllers/ingress/eventhandlers/ingress_class_params_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package eventhandlers

import (
"context"
"github.com/go-logr/logr"
networking "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/ingress"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
)

// NewEnqueueRequestsForIngressClassParamsEvent constructs new enqueueRequestsForIngressClassParamsEvent.
func NewEnqueueRequestsForIngressClassParamsEvent(ingClassEventChan chan<- event.GenericEvent,
k8sClient client.Client, eventRecorder record.EventRecorder, logger logr.Logger) *enqueueRequestsForIngressClassParamsEvent {
return &enqueueRequestsForIngressClassParamsEvent{
ingClassEventChan: ingClassEventChan,
k8sClient: k8sClient,
eventRecorder: eventRecorder,
logger: logger,
}
}

var _ handler.EventHandler = (*enqueueRequestsForIngressClassParamsEvent)(nil)

type enqueueRequestsForIngressClassParamsEvent struct {
ingClassEventChan chan<- event.GenericEvent
k8sClient client.Client
eventRecorder record.EventRecorder
logger logr.Logger
}

func (h *enqueueRequestsForIngressClassParamsEvent) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
h.enqueueImpactedIngressClasses(e.Meta)
}

func (h *enqueueRequestsForIngressClassParamsEvent) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
ingClassParamsOld := e.ObjectOld.(*elbv2api.IngressClassParams)
ingClassParamsNew := e.ObjectNew.(*elbv2api.IngressClassParams)

// we only care below update event:
// 2. IngressClassParams spec updates
// 3. IngressClassParams deletion
if equality.Semantic.DeepEqual(ingClassParamsOld.Spec, ingClassParamsNew.Spec) &&
equality.Semantic.DeepEqual(ingClassParamsOld.DeletionTimestamp.IsZero(), ingClassParamsNew.DeletionTimestamp.IsZero()) {
return
}

h.enqueueImpactedIngressClasses(e.MetaNew)
}

func (h *enqueueRequestsForIngressClassParamsEvent) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
h.enqueueImpactedIngressClasses(e.Meta)
}

func (h *enqueueRequestsForIngressClassParamsEvent) Generic(e event.GenericEvent, _ workqueue.RateLimitingInterface) {
// we don't have any generic event for secrets.
}

//
func (h *enqueueRequestsForIngressClassParamsEvent) enqueueImpactedIngressClasses(ingClassParams metav1.Object) {
ingClassList := &networking.IngressClassList{}
if err := h.k8sClient.List(context.Background(), ingClassList,
client.MatchingFields{ingress.IndexKeyIngressClassParamsRefName: ingClassParams.GetName()}); err != nil {
h.logger.Error(err, "failed to fetch ingressClasses")
return
}
for index := range ingClassList.Items {
ingClass := &ingClassList.Items[index]
meta, _ := meta.Accessor(ingClass)

h.logger.V(1).Info("enqueue ingressClass for ingressClassParams event",
"ingressClassParams", ingClassParams.GetName(),
"ingressClass", ingClass.GetName())
h.ingClassEventChan <- event.GenericEvent{
Meta: meta,
Object: ingClass,
}
}
}
67 changes: 61 additions & 6 deletions controllers/ingress/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/controllers/ingress/eventhandlers"
"sigs.k8s.io/aws-load-balancer-controller/pkg/annotations"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws"
Expand All @@ -30,6 +33,10 @@ import (
const (
ingressTagPrefix = "ingress.k8s.aws"
controllerName = "ingress"

// the groupVersion of used Ingress & IngressClass resource.
ingressResourcesGroupVersion = "networking.k8s.io/v1beta1"
ingressClassKind = "IngressClass"
)

// NewGroupReconciler constructs new GroupReconciler
Expand Down Expand Up @@ -197,24 +204,30 @@ func (r *groupReconciler) updateIngressStatus(ctx context.Context, lbDNS string,
return nil
}

func (r *groupReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *groupReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, clientSet *kubernetes.Clientset) error {
c, err := controller.New(controllerName, mgr, controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
Reconciler: r,
})
if err != nil {
return err
}
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {

resList, err := clientSet.ServerResourcesForGroupVersion(ingressResourcesGroupVersion)
if err != nil {
return err
}
if err := r.setupWatches(ctx, c); err != nil {
ingressClassResourceAvailable := isResourceKindAvailable(resList, ingressClassKind)
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer(), ingressClassResourceAvailable); err != nil {
return err
}
if err := r.setupWatches(ctx, c, ingressClassResourceAvailable); err != nil {
return err
}
return nil
}

func (r *groupReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
func (r *groupReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer, ingressClassResourceAvailable bool) error {
if err := fieldIndexer.IndexField(ctx, &networking.Ingress{}, ingress.IndexKeyServiceRefName,
func(obj k8sruntime.Object) []string {
return r.referenceIndexer.BuildServiceRefIndexes(context.Background(), obj.(*networking.Ingress))
Expand All @@ -236,10 +249,26 @@ func (r *groupReconciler) setupIndexes(ctx context.Context, fieldIndexer client.
); err != nil {
return err
}
if ingressClassResourceAvailable {
if err := fieldIndexer.IndexField(ctx, &networking.IngressClass{}, ingress.IndexKeyIngressClassParamsRefName,
func(obj k8sruntime.Object) []string {
return r.referenceIndexer.BuildIngressClassParamsRefIndexes(ctx, obj.(*networking.IngressClass))
},
); err != nil {
return err
}
if err := fieldIndexer.IndexField(ctx, &networking.Ingress{}, ingress.IndexKeyIngressClassRefName,
func(obj k8sruntime.Object) []string {
return r.referenceIndexer.BuildIngressClassRefIndexes(ctx, obj.(*networking.Ingress))
},
); err != nil {
return err
}
}
return nil
}

func (r *groupReconciler) setupWatches(_ context.Context, c controller.Controller) error {
func (r *groupReconciler) setupWatches(_ context.Context, c controller.Controller, ingressClassResourceAvailable bool) error {
ingEventChan := make(chan event.GenericEvent)
svcEventChan := make(chan event.GenericEvent)
ingEventHandler := eventhandlers.NewEnqueueRequestsForIngressEvent(r.groupLoader, r.eventRecorder,
Expand All @@ -248,7 +277,6 @@ func (r *groupReconciler) setupWatches(_ context.Context, c controller.Controlle
r.logger.WithName("eventHandlers").WithName("service"))
secretEventHandler := eventhandlers.NewEnqueueRequestsForSecretEvent(ingEventChan, svcEventChan, r.k8sClient, r.eventRecorder,
r.logger.WithName("eventHandlers").WithName("secret"))

if err := c.Watch(&source.Channel{Source: ingEventChan}, ingEventHandler); err != nil {
return err
}
Expand All @@ -264,5 +292,32 @@ func (r *groupReconciler) setupWatches(_ context.Context, c controller.Controlle
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, secretEventHandler); err != nil {
return err
}

if ingressClassResourceAvailable {
ingClassEventChan := make(chan event.GenericEvent)
ingClassParamsEventHandler := eventhandlers.NewEnqueueRequestsForIngressClassParamsEvent(ingClassEventChan, r.k8sClient, r.eventRecorder,
r.logger.WithName("eventHandlers").WithName("ingressClassParams"))
ingClassEventHandler := eventhandlers.NewEnqueueRequestsForIngressClassEvent(ingEventChan, r.k8sClient, r.eventRecorder,
r.logger.WithName("eventHandlers").WithName("ingressClass"))
if err := c.Watch(&source.Channel{Source: ingClassEventChan}, ingClassEventHandler); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &elbv2api.IngressClassParams{}}, ingClassParamsEventHandler); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &networking.IngressClass{}}, ingClassEventHandler); err != nil {
return err
}
}
return nil
}

// isResourceKindAvailable checks whether specific kind is available.
func isResourceKindAvailable(resList *metav1.APIResourceList, kind string) bool {
for _, res := range resList.APIResources {
if res.Kind == kind {
return true
}
}
return false
}
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func main() {
setupLog.Error(err, "unable to obtain clientSet")
os.Exit(1)
}

podInfoRepo := k8s.NewDefaultPodInfoRepo(clientSet.CoreV1().RESTClient(), rtOpts.Namespace, ctrl.Log)
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log)
podENIResolver := networking.NewDefaultPodENIInfoResolver(cloud.EC2(), cloud.VpcID(), ctrl.Log)
Expand All @@ -118,7 +119,7 @@ func main() {
finalizerManager, tgbResManager,
controllerCFG, ctrl.Log.WithName("controllers").WithName("targetGroupBinding"))
ctx := context.Background()
if err = ingGroupReconciler.SetupWithManager(ctx, mgr); err != nil {
if err = ingGroupReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Ingress")
os.Exit(1)
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/ingress/reference_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,35 @@ package ingress

import (
"context"
awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/go-logr/logr"
networking "k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
)

const (
// IndexKey for services referenced by Ingress.
// IndexKeyServiceRefName is index key for services referenced by Ingress.
IndexKeyServiceRefName = "ingress.serviceRef.name"
// IndexKey for secrets referenced by Ingress or Service.
// IndexKeySecretRefName is index key for secrets referenced by Ingress or Service.
IndexKeySecretRefName = "ingress.secretRef.name"
// IndexKeyIngressClassRefName is index key for ingressClass referenced by Ingress.
IndexKeyIngressClassRefName = "ingress.ingressClassRef.name"
// IndexKeyIngressClassParamsRefName is index key for ingressClassParams referenced by IngressClass.
IndexKeyIngressClassParamsRefName = "ingressClass.ingressClassParamsRef.name"
)

// ReferenceIndexer has the ability to index Ingresses with referenced objects.
type ReferenceIndexer interface {
// BuildServiceRefIndexes returns the name of related Service objects.
BuildServiceRefIndexes(ctx context.Context, ing *networking.Ingress) []string
// BuildSecretRefIndexes returns the name of related Secret objects.
BuildSecretRefIndexes(ctx context.Context, ingOrSvc metav1.Object) []string
// BuildIngressClassRefIndexes returns the name of related IngressClass objects.
BuildIngressClassRefIndexes(ctx context.Context, ing *networking.Ingress) []string
// BuildIngressClassParamsRefIndexes returns the name of related IngressClassParams objects.
BuildIngressClassParamsRefIndexes(ctx context.Context, ingClass *networking.IngressClass) []string
}

// NewDefaultReferenceIndexer constructs new defaultReferenceIndexer.
Expand Down Expand Up @@ -80,6 +92,28 @@ func (i *defaultReferenceIndexer) BuildSecretRefIndexes(ctx context.Context, ing
return extractSecretNamesFromAuthConfig(authCfg)
}

func (i *defaultReferenceIndexer) BuildIngressClassRefIndexes(_ context.Context, ing *networking.Ingress) []string {
if ing.Spec.IngressClassName == nil {
return nil
}

ingClassName := awssdk.StringValue(ing.Spec.IngressClassName)
return []string{ingClassName}
}

func (i *defaultReferenceIndexer) BuildIngressClassParamsRefIndexes(_ context.Context, ingClass *networking.IngressClass) []string {
if ingClass.Spec.Controller != ingressClassControllerALB || ingClass.Spec.Parameters == nil {
return nil
}
if ingClass.Spec.Parameters.APIGroup == nil ||
(*ingClass.Spec.Parameters.APIGroup) != elbv2api.GroupVersion.Group ||
ingClass.Spec.Parameters.Kind != ingressClassParamsKind {
return nil
}
ingClassParamsName := ingClass.Spec.Parameters.Name
return []string{ingClassParamsName}
}

func extractServiceNamesFromAction(action Action) []string {
if action.Type != ActionTypeForward || action.ForwardConfig == nil {
return nil
Expand Down
Loading