Skip to content

add support for loadBalancerClass #2489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/elbv2/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/elbv2/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 9 additions & 24 deletions controllers/service/eventhandlers/service_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/annotations"
svcpkg "sigs.k8s.io/aws-load-balancer-controller/pkg/service"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// NewEnqueueRequestForServiceEvent constructs new enqueueRequestsForServiceEvent.
func NewEnqueueRequestForServiceEvent(eventRecorder record.EventRecorder, annotationParser annotations.Parser, logger logr.Logger) *enqueueRequestsForServiceEvent {
func NewEnqueueRequestForServiceEvent(eventRecorder record.EventRecorder,
serviceUtils svcpkg.ServiceUtils, logger logr.Logger) *enqueueRequestsForServiceEvent {
return &enqueueRequestsForServiceEvent{
eventRecorder: eventRecorder,
annotationParser: annotationParser,
logger: logger,
eventRecorder: eventRecorder,
serviceUtils: serviceUtils,
logger: logger,
}
}

var _ handler.EventHandler = (*enqueueRequestsForServiceEvent)(nil)

type enqueueRequestsForServiceEvent struct {
eventRecorder record.EventRecorder
annotationParser annotations.Parser
logger logr.Logger
eventRecorder record.EventRecorder
serviceUtils svcpkg.ServiceUtils
logger logr.Logger
}

func (h *enqueueRequestsForServiceEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
Expand Down Expand Up @@ -57,24 +57,9 @@ func (h *enqueueRequestsForServiceEvent) Delete(e event.DeleteEvent, queue workq
func (h *enqueueRequestsForServiceEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {
}

func (h *enqueueRequestsForServiceEvent) isServiceSupported(service *corev1.Service) bool {
lbType := ""
_ = h.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixLoadBalancerType, &lbType, service.Annotations)
if lbType == svcpkg.LoadBalancerTypeNLBIP {
return true
}
var lbTargetType string
_ = h.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixTargetType, &lbTargetType, service.Annotations)
if lbType == svcpkg.LoadBalancerTypeExternal && (lbTargetType == svcpkg.LoadBalancerTargetTypeIP ||
lbTargetType == svcpkg.LoadBalancerTargetTypeInstance) {
return true
}
return false
}

func (h *enqueueRequestsForServiceEvent) enqueueManagedService(queue workqueue.RateLimitingInterface, service *corev1.Service) {
// Check if the svc needs to be handled
if !h.isServiceSupported(service) {
if !h.serviceUtils.IsServicePendingFinalization(service) && !h.serviceUtils.IsServiceSupported(service) {
return
}
queue.Add(reconcile.Request{
Expand Down
69 changes: 47 additions & 22 deletions controllers/service/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
annotationParser := annotations.NewSuffixAnnotationParser(serviceAnnotationPrefix)
trackingProvider := tracking.NewDefaultProvider(serviceTagPrefix, config.ClusterName)
elbv2TaggingManager := elbv2.NewDefaultTaggingManager(cloud.ELBV2(), cloud.VpcID(), config.FeatureGates, logger)
serviceUtils := service.NewServiceUtils(annotationParser, serviceFinalizer, config.ServiceConfig.LoadBalancerClass, config.FeatureGates)
modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider,
elbv2TaggingManager, config.ClusterName, config.DefaultTags, config.ExternalManagedTags, config.DefaultSSLPolicy)
elbv2TaggingManager, config.ClusterName, config.DefaultTags, config.ExternalManagedTags, config.DefaultSSLPolicy, serviceUtils)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, config, serviceTagPrefix, logger)
return &serviceReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
annotationParser: annotationParser,
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
annotationParser: annotationParser,
loadBalancerClass: config.ServiceConfig.LoadBalancerClass,
serviceUtils: serviceUtils,

modelBuilder: modelBuilder,
stackMarshaller: stackMarshaller,
Expand All @@ -61,10 +64,12 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
}

type serviceReconciler struct {
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
annotationParser annotations.Parser
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
annotationParser annotations.Parser
loadBalancerClass string
serviceUtils service.ServiceUtils

modelBuilder service.ModelBuilder
stackMarshaller deploy.StackMarshaller
Expand All @@ -87,13 +92,17 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err
if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil {
return client.IgnoreNotFound(err)
}
if !svc.DeletionTimestamp.IsZero() {
return r.cleanupLoadBalancerResources(ctx, svc)
stack, lb, err := r.buildModel(ctx, svc)
if err != nil {
return err
}
return r.reconcileLoadBalancerResources(ctx, svc)
if lb == nil {
return r.cleanupLoadBalancerResources(ctx, svc, stack)
}
return r.reconcileLoadBalancerResources(ctx, svc, stack, lb)
}

func (r *serviceReconciler) buildAndDeployModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, error) {
func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, error) {
stack, lb, err := r.modelBuilder.Build(ctx, svc)
if err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedBuildModel, fmt.Sprintf("Failed build model due to %v", err))
Expand All @@ -105,22 +114,25 @@ func (r *serviceReconciler) buildAndDeployModel(ctx context.Context, svc *corev1
return nil, nil, err
}
r.logger.Info("successfully built model", "model", stackJSON)
return stack, lb, nil
}

if err = r.stackDeployer.Deploy(ctx, stack); err != nil {
func (r *serviceReconciler) deployModel(ctx context.Context, svc *corev1.Service, stack core.Stack) error {
if err := r.stackDeployer.Deploy(ctx, stack); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedDeployModel, fmt.Sprintf("Failed deploy model due to %v", err))
return nil, nil, err
return err
}
r.logger.Info("successfully deployed model", "service", k8s.NamespacedName(svc))

return stack, lb, nil
return nil
}

func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context, svc *corev1.Service) error {
func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, lb *elbv2model.LoadBalancer) error {
if err := r.finalizerManager.AddFinalizers(ctx, svc, serviceFinalizer); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
return err
}
_, lb, err := r.buildAndDeployModel(ctx, svc)
err := r.deployModel(ctx, svc, stack)
if err != nil {
return err
}
Expand All @@ -137,12 +149,16 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context,
return nil
}

func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service) error {
func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error {
if k8s.HasFinalizer(svc, serviceFinalizer) {
_, _, err := r.buildAndDeployModel(ctx, svc)
err := r.deployModel(ctx, svc, stack)
if err != nil {
return err
}
if err = r.cleanupServiceStatus(ctx, svc); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
}
if err := r.finalizerManager.RemoveFinalizers(ctx, svc, serviceFinalizer); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedRemoveFinalizer, fmt.Sprintf("Failed remove finalizer due to %v", err))
return err
Expand All @@ -168,6 +184,15 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin
return nil
}

func (r *serviceReconciler) cleanupServiceStatus(ctx context.Context, svc *corev1.Service) error {
svcOld := svc.DeepCopy()
svc.Status.LoadBalancer = corev1.LoadBalancerStatus{}
if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil {
return errors.Wrapf(err, "failed to cleanup service status: %v", k8s.NamespacedName(svc))
}
return nil
}

func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
c, err := controller.New(controllerName, mgr, controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
Expand All @@ -183,8 +208,8 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
}

func (r *serviceReconciler) setupWatches(_ context.Context, c controller.Controller) error {
svcEventHandler := eventhandlers.NewEnqueueRequestForServiceEvent(r.eventRecorder, r.annotationParser,
r.logger.WithName("eventHandlers").WithName("service"))
svcEventHandler := eventhandlers.NewEnqueueRequestForServiceEvent(r.eventRecorder,
r.serviceUtils, r.logger.WithName("eventHandlers").WithName("service"))
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, svcEventHandler); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/controller_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type ControllerConfig struct {
IngressConfig IngressConfig
// Configurations for Addons feature
AddonsConfig AddonsConfig
// Configurations for the Service controller
ServiceConfig ServiceConfig

// Default AWS Tags that will be applied to all AWS resources managed by this controller.
DefaultTags map[string]string
Expand Down Expand Up @@ -127,6 +129,7 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
cfg.PodWebhookConfig.BindFlags(fs)
cfg.IngressConfig.BindFlags(fs)
cfg.AddonsConfig.BindFlags(fs)
cfg.ServiceConfig.BindFlags(fs)
}

// Validate the controller configuration
Expand Down
10 changes: 6 additions & 4 deletions pkg/config/feature_gates.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
type Feature string

const (
ListenerRulesTagging Feature = "ListenerRulesTagging"
WeightedTargetGroups Feature = "WeightedTargetGroups"
ListenerRulesTagging Feature = "ListenerRulesTagging"
WeightedTargetGroups Feature = "WeightedTargetGroups"
ServiceTypeLoadBalancerOnly Feature = "ServiceTypeLoadBalancerOnly"
)

type FeatureGates interface {
Expand Down Expand Up @@ -39,8 +40,9 @@ type defaultFeatureGates struct {
func NewFeatureGates() FeatureGates {
return &defaultFeatureGates{
featureState: map[Feature]bool{
ListenerRulesTagging: true,
WeightedTargetGroups: true,
ListenerRulesTagging: true,
WeightedTargetGroups: true,
ServiceTypeLoadBalancerOnly: false,
},
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/service_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package config

import "github.com/spf13/pflag"

const (
flagLoadBalancerClass = "load-balancer-class"
defaultLoadBalancerClass = "service.k8s.aws/nlb"
)

// ServiceConfig contains the configurations for the Service controller
type ServiceConfig struct {
// LoadBalancerClass is the name of the load balancer class reconciled by this controller
LoadBalancerClass string
}

// BindFlags binds the command line flags to the fields in the config object
func (cfg *ServiceConfig) BindFlags(fs *pflag.FlagSet) {
fs.StringVar(&cfg.LoadBalancerClass, flagLoadBalancerClass, defaultLoadBalancerClass,
"Name of the load balancer class reconciled by this controller")
}
1 change: 1 addition & 0 deletions pkg/k8s/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
ServiceEventReasonFailedAddFinalizer = "FailedAddFinalizer"
ServiceEventReasonFailedRemoveFinalizer = "FailedRemoveFinalizer"
ServiceEventReasonFailedUpdateStatus = "FailedUpdateStatus"
ServiceEventReasonFailedCleanupStatus = "FailedCleanupStatus"
ServiceEventReasonFailedBuildModel = "FailedBuildModel"
ServiceEventReasonFailedDeployModel = "FailedDeployModel"
ServiceEventReasonSuccessfullyReconciled = "SuccessfullyReconciled"
Expand Down
19 changes: 10 additions & 9 deletions pkg/service/model_build_target_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (t *defaultModelBuildTask) buildTargetGroup(ctx context.Context, port corev
if targetGroup, exists := t.tgByResID[tgResourceID]; exists {
return targetGroup, nil
}
targetType, err := t.buildTargetType(ctx)
targetType, err := t.buildTargetType(ctx, port)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -332,22 +332,23 @@ func (t *defaultModelBuildTask) buildTargetGroupHealthCheckUnhealthyThresholdCou
return unhealthyThresholdCount, nil
}

func (t *defaultModelBuildTask) buildTargetType(_ context.Context) (elbv2model.TargetType, error) {
func (t *defaultModelBuildTask) buildTargetType(_ context.Context, port corev1.ServicePort) (elbv2model.TargetType, error) {
svcType := t.service.Spec.Type
var lbType string
_ = t.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixLoadBalancerType, &lbType, t.service.Annotations)
var lbTargetType string
lbTargetType = string(t.defaultTargetType)
_ = t.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixTargetType, &lbTargetType, t.service.Annotations)
if lbType == LoadBalancerTypeNLBIP || (lbType == LoadBalancerTypeExternal && lbTargetType == LoadBalancerTargetTypeIP) {
if lbType == LoadBalancerTypeNLBIP || lbTargetType == LoadBalancerTargetTypeIP {
return elbv2model.TargetTypeIP, nil
}
if lbType == LoadBalancerTypeExternal && lbTargetType == LoadBalancerTargetTypeInstance {
if svcType == corev1.ServiceTypeClusterIP {
return "", errors.Errorf("unsupported service type \"%v\" for load balancer target type \"%v\"", svcType, lbTargetType)
}
return elbv2model.TargetTypeInstance, nil
if svcType == corev1.ServiceTypeClusterIP {
return "", errors.Errorf("unsupported service type \"%v\" for load balancer target type \"%v\"", svcType, lbTargetType)
}
if port.NodePort == 0 && t.service.Spec.AllocateLoadBalancerNodePorts != nil && !*t.service.Spec.AllocateLoadBalancerNodePorts {
return "", errors.New("unable to support instance target type with an unallocated NodePort")
}
return "", errors.Errorf("unsupported target type \"%v\" for load balancer type \"%v\"", lbTargetType, lbType)
return elbv2model.TargetTypeInstance, nil
}

func (t *defaultModelBuildTask) buildTargetGroupResourceID(svcKey types.NamespacedName, port intstr.IntOrString) string {
Expand Down
Loading