Skip to content

Allow TargetGroup endpoints outside the ELB VPC #1862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func main() {
sgManager := networking.NewDefaultSecurityGroupManager(cloud.EC2(), ctrl.Log)
sgReconciler := networking.NewDefaultSecurityGroupReconciler(sgManager, ctrl.Log)
azInfoProvider := networking.NewDefaultAZInfoProvider(cloud.EC2(), ctrl.Log.WithName("az-info-provider"))
vpcInfoProvider := networking.NewDefaultVPCInfoProvider(cloud.EC2(), ctrl.Log.WithName("vpc-info-provider"))
subnetResolver := networking.NewDefaultSubnetsResolver(azInfoProvider, cloud.EC2(), cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log.WithName("subnets-resolver"))
vpcResolver := networking.NewDefaultVPCResolver(cloud.EC2(), cloud.VpcID(), ctrl.Log.WithName("vpc-resolver"))
tgbResManager := targetgroupbinding.NewDefaultResourceManager(mgr.GetClient(), cloud.ELBV2(), cloud.EC2(),
podInfoRepo, sgManager, sgReconciler, cloud.VpcID(), controllerCFG.ClusterName, mgr.GetEventRecorderFor("targetGroupBinding"), ctrl.Log, controllerCFG.EnableEndpointSlices, controllerCFG.DisableRestrictedSGRules)
podInfoRepo, sgManager, sgReconciler, cloud.VpcID(), controllerCFG.ClusterName, mgr.GetEventRecorderFor("targetGroupBinding"), ctrl.Log, controllerCFG.EnableEndpointSlices, controllerCFG.DisableRestrictedSGRules, vpcInfoProvider)
backendSGProvider := networking.NewBackendSGProvider(controllerCFG.ClusterName, controllerCFG.BackendSecurityGroup,
cloud.VpcID(), cloud.EC2(), mgr.GetClient(), ctrl.Log.WithName("backend-sg-provider"))
ingGroupReconciler := ingress.NewGroupReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("ingress"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/aws/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Cloud interface {
// Region for the kubernetes cluster
Region() string

// VPC ID for the the kubernetes cluster
// VpcID for the LoadBalancer resources.
VpcID() string
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/aws/cloud_config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package aws

import (
"time"

"github.com/spf13/pflag"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle"
)

const (
flagAWSRegion = "aws-region"
flagAWSAPIEndpoints = "aws-api-endpoints"
flagAWSAPIThrottle = "aws-api-throttle"
flagAWSVpcID = "aws-vpc-id"
flagAWSVpcCacheTTL = "aws-vpc-cache-ttl"
flagAWSMaxRetries = "aws-max-retries"
flagAWSAPIEndpoints = "aws-api-endpoints"
defaultVpcID = ""
defaultRegion = ""
defaultAPIMaxRetries = 10
Expand All @@ -23,9 +26,12 @@ type CloudConfig struct {
// Throttle settings for AWS APIs
ThrottleConfig *throttle.ServiceOperationsThrottleConfig

// VPC ID of the Kubernetes cluster
// VpcID for the LoadBalancer resources.
VpcID string

// VPC cache TTL in minutes
VpcCacheTTL time.Duration

// Max retries configuration for AWS APIs
MaxRetries int

Expand All @@ -36,7 +42,7 @@ type CloudConfig struct {
func (cfg *CloudConfig) BindFlags(fs *pflag.FlagSet) {
fs.StringVar(&cfg.Region, flagAWSRegion, defaultRegion, "AWS Region for the kubernetes cluster")
fs.Var(cfg.ThrottleConfig, flagAWSAPIThrottle, "throttle settings for AWS APIs, format: serviceID1:operationRegex1=rate:burst,serviceID2:operationRegex2=rate:burst")
fs.StringVar(&cfg.VpcID, flagAWSVpcID, defaultVpcID, "AWS VPC ID for the Kubernetes cluster")
fs.StringVar(&cfg.VpcID, flagAWSVpcID, defaultVpcID, "AWS VpcID for the LoadBalancer resources")
fs.IntVar(&cfg.MaxRetries, flagAWSMaxRetries, defaultAPIMaxRetries, "Maximum retries for AWS APIs")
fs.StringToStringVar(&cfg.AWSEndpoints, flagAWSAPIEndpoints, nil, "Custom AWS endpoint configuration, format: serviceID1=URL1,serviceID2=URL2")
}
89 changes: 89 additions & 0 deletions pkg/networking/vpc_info_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package networking

import (
"context"
"sync"
"time"

awssdk "github.com/aws/aws-sdk-go/aws"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/util/cache"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
)

const defaultVPCInfoCacheTTL = 10 * time.Minute

// VPCInfoProvider is responsible for providing VPC info.
type VPCInfoProvider interface {
FetchVPCInfo(ctx context.Context, vpcID string) (*ec2sdk.Vpc, error)
}

// NewDefaultVPCInfoProvider constructs new defaultVPCInfoProvider.
func NewDefaultVPCInfoProvider(ec2Client services.EC2, logger logr.Logger) *defaultVPCInfoProvider {
return &defaultVPCInfoProvider{
ec2Client: ec2Client,
vpcInfoCache: cache.NewExpiring(),
vpcInfoCacheMutex: sync.RWMutex{},
vpcInfoCacheTTL: defaultVPCInfoCacheTTL,
logger: logger,
}
}

var _ VPCInfoProvider = &defaultVPCInfoProvider{}

// default implementation for VPCInfoProvider.
type defaultVPCInfoProvider struct {
ec2Client services.EC2
vpcInfoCache *cache.Expiring
vpcInfoCacheMutex sync.RWMutex
vpcInfoCacheTTL time.Duration

logger logr.Logger
}

func (p *defaultVPCInfoProvider) FetchVPCInfo(ctx context.Context, vpcID string) (*ec2sdk.Vpc, error) {
if vpcInfo := p.fetchVPCInfoFromCache(); vpcInfo != nil {
return vpcInfo, nil
}

// Fetch VPC info from the AWS API and cache response before returning.
vpcInfo, err := p.fetchVPCInfoFromAWS(ctx, vpcID)
if err != nil {
return nil, err
}
p.saveVPCInfoToCache(vpcInfo)

return vpcInfo, nil
}

func (p *defaultVPCInfoProvider) fetchVPCInfoFromCache() *ec2sdk.Vpc {
p.vpcInfoCacheMutex.RLock()
defer p.vpcInfoCacheMutex.RUnlock()

if rawCacheItem, exists := p.vpcInfoCache.Get("vpcInfo"); exists {
return rawCacheItem.(*ec2sdk.Vpc)
}

return nil
}

func (p *defaultVPCInfoProvider) saveVPCInfoToCache(vpcInfo *ec2sdk.Vpc) {
p.vpcInfoCacheMutex.Lock()
defer p.vpcInfoCacheMutex.Unlock()

p.vpcInfoCache.Set("vpcInfo", vpcInfo, p.vpcInfoCacheTTL)
}

// fetchVPCInfoFromAWS will fetch VPC info from the AWS API.
func (p *defaultVPCInfoProvider) fetchVPCInfoFromAWS(ctx context.Context, vpcID string) (*ec2sdk.Vpc, error) {
req := &ec2sdk.DescribeVpcsInput{
VpcIds: []*string{awssdk.String(vpcID)},
}
resp, err := p.ec2Client.DescribeVpcsWithContext(ctx, req)
if err != nil {
return nil, err
}

return resp.Vpcs[0], nil
}
50 changes: 50 additions & 0 deletions pkg/networking/vpc_info_provider_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 82 additions & 0 deletions pkg/networking/vpc_info_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package networking

import (
"context"
awssdk "github.com/aws/aws-sdk-go/aws"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
gomock "github.com/golang/mock/gomock"
"reflect"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
"sigs.k8s.io/controller-runtime/pkg/log"
"testing"
)

func Test_defaultVPCInfoProvider_FetchVPCInfo(t *testing.T) {
type describeVpcCall struct {
input *ec2sdk.DescribeVpcsInput
output *ec2sdk.DescribeVpcsOutput
err error
}

type fields struct {
describeVpcCalls []describeVpcCall
}
tests := []struct {
name string
fields fields
want *ec2sdk.Vpc
wantErr bool
}{
{
name: "from AWS",
fields: fields{
describeVpcCalls: []describeVpcCall{
{
input: &ec2sdk.DescribeVpcsInput{
VpcIds: []*string{awssdk.String("vpc-2f09a348")},
},
output: &ec2sdk.DescribeVpcsOutput{
Vpcs: []*ec2sdk.Vpc{{VpcId: awssdk.String("vpc-2f09a348")}},
},
err: nil,
},
},
},
want: &ec2sdk.Vpc{
VpcId: awssdk.String("vpc-2f09a348"),
},
wantErr: false,
},
{
name: "from cache",
fields: fields{},
want: &ec2sdk.Vpc{
VpcId: awssdk.String("vpc-2f09a348"),
},
wantErr: false,
},
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()

ec2Client := services.NewMockEC2(ctrl)
p := NewDefaultVPCInfoProvider(ec2Client, &log.NullLogger{})

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, call := range tt.fields.describeVpcCalls {
ec2Client.EXPECT().DescribeVpcsWithContext(gomock.Any(), call.input).Return(call.output, call.err)
}

got, err := p.FetchVPCInfo(context.Background(), "vpc-2f09a348")
if (err != nil) != tt.wantErr {
t.Errorf("defaultVPCInfoProvider.FetchVPCInfo() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("defaultVPCInfoProvider.FetchVPCInfo() = %v, want %v", got, tt.want)
}
})
}
}
7 changes: 4 additions & 3 deletions pkg/targetgroupbinding/networking_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package targetgroupbinding
import (
"context"
"fmt"
"net"
"strings"
"sync"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
Expand All @@ -13,14 +17,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"net"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/backend"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
"sync"
)

const (
Expand Down
Loading