Skip to content

Fix the regression of IP mode support for fargate pods #2158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/k8s/node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package k8s
import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"regexp"
"strings"
)

const (
toBeDeletedByCATaint = "ToBeDeletedByClusterAutoscaler"
)

var awsInstanceIDRegex = regexp.MustCompile("^i-[^/]*$")

// IsNodeReady returns whether node is ready.
func IsNodeReady(node *corev1.Node) bool {
nodeReadyCond := GetNodeCondition(node, corev1.NodeReady)
Expand Down Expand Up @@ -47,6 +50,10 @@ func ExtractNodeInstanceID(node *corev1.Node) (string, error) {
return "", errors.Errorf("providerID is not specified for node: %s", node.Name)
}

p := strings.Split(providerID, "/")
return p[len(p)-1], nil
providerIDParts := strings.Split(providerID, "/")
instanceID := providerIDParts[len(providerIDParts)-1]
if !awsInstanceIDRegex.MatchString(instanceID) {
return "", errors.Errorf("providerID %s is invalid for EC2 instances, node: %s", providerID, node.Name)
}
return instanceID, nil
}
16 changes: 15 additions & 1 deletion pkg/k8s/node_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestExtractNodeInstanceID(t *testing.T) {
wantErr: errors.New("providerID is not specified for node: my-node-name"),
},
{
name: "node with providerID",
name: "node by EC2 instance",
args: args{
node: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -225,6 +225,20 @@ func TestExtractNodeInstanceID(t *testing.T) {
},
want: "i-abcdefg0",
},
{
name: "node by EKS Fargate",
args: args{
node: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fargate-ip-192-168-138-30.us-west-2.compute.internal",
},
Spec: corev1.NodeSpec{
ProviderID: "aws:///us-west-2b/368270442a-793d42d32c704bb793ca88a6a14ddd6e/fargate-ip-192-168-138-30.us-west-2.compute.internal",
},
},
},
wantErr: errors.New("providerID aws:///us-west-2b/368270442a-793d42d32c704bb793ca88a6a14ddd6e/fargate-ip-192-168-138-30.us-west-2.compute.internal is invalid for EC2 instances, node: fargate-ip-192-168-138-30.us-west-2.compute.internal"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
98 changes: 85 additions & 13 deletions pkg/networking/pod_eni_info_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/sets"
"net"
"sigs.k8s.io/aws-load-balancer-controller/pkg/algorithm"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -20,6 +21,10 @@ import (

const (
defaultPodENIInfoCacheTTL = 10 * time.Minute
// EC2:DescribeNetworkInterface supports up to 200 filters per call.
describeNetworkInterfacesFiltersLimit = 200

labelEKSComputeType = "eks.amazonaws.com/compute-type"
)

// PodENIInfoResolver is responsible for resolve the AWS VPC ENI that supports pod network.
Expand All @@ -29,15 +34,17 @@ type PodENIInfoResolver interface {
}

// NewDefaultPodENIInfoResolver constructs new defaultPodENIInfoResolver.
func NewDefaultPodENIInfoResolver(k8sClient client.Client, ec2Client services.EC2, nodeInfoProvider NodeInfoProvider, logger logr.Logger) *defaultPodENIInfoResolver {
func NewDefaultPodENIInfoResolver(k8sClient client.Client, ec2Client services.EC2, nodeInfoProvider NodeInfoProvider, vpcID string, logger logr.Logger) *defaultPodENIInfoResolver {
return &defaultPodENIInfoResolver{
k8sClient: k8sClient,
ec2Client: ec2Client,
nodeInfoProvider: nodeInfoProvider,
logger: logger,
podENIInfoCache: cache.NewExpiring(),
podENIInfoCacheMutex: sync.RWMutex{},
podENIInfoCacheTTL: defaultPodENIInfoCacheTTL,
k8sClient: k8sClient,
ec2Client: ec2Client,
nodeInfoProvider: nodeInfoProvider,
vpcID: vpcID,
logger: logger,
podENIInfoCache: cache.NewExpiring(),
podENIInfoCacheMutex: sync.RWMutex{},
podENIInfoCacheTTL: defaultPodENIInfoCacheTTL,
describeNetworkInterfacesIPChunkSize: describeNetworkInterfacesFiltersLimit - 1, // we used 1 filter for VPC.
}
}

Expand All @@ -51,6 +58,8 @@ type defaultPodENIInfoResolver struct {
ec2Client services.EC2
// nodeInfoProvider
nodeInfoProvider NodeInfoProvider
// vpcID
vpcID string
// logger
logger logr.Logger

Expand All @@ -62,6 +71,9 @@ type defaultPodENIInfoResolver struct {
// TTL for each cache entries.
// Note: we assume pod's ENI information(e.g. securityGroups) haven't changed per podENICacheTTL.
podENIInfoCacheTTL time.Duration

// chunkSize when describe network interface with IPAddress filter.
describeNetworkInterfacesIPChunkSize int
}

func (r *defaultPodENIInfoResolver) Resolve(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
Expand Down Expand Up @@ -129,7 +141,8 @@ func (r *defaultPodENIInfoResolver) saveENIInfosToCache(pods []k8s.PodInfo, eniI
func (r *defaultPodENIInfoResolver) resolveViaCascadedLookup(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
resolveFuncs := []func(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error){
r.resolveViaPodENIAnnotation,
r.resolveViaVPCIPAddress,
r.resolveViaNodeENIs,
r.resolveViaVPCENIs,
// TODO, add support for kubenet CNI plugin(kops) by resolve via routeTable.
}

Expand All @@ -151,7 +164,8 @@ func (r *defaultPodENIInfoResolver) resolveViaCascadedLookup(ctx context.Context
return eniInfoByPodKey, nil
}

// resolveViaPodENIAnnotation tries to resolve a pod ENI via the branch ENI annotation.
// resolveViaPodENIAnnotation tries to resolve pod ENI by lookup pod's ENIInfo annotation.
// with aws-vpc-cni CNI plugin's SecurityGroups for pods feature, podIP is supported by branchENI, whose information is exposed as pod annotation.
func (r *defaultPodENIInfoResolver) resolveViaPodENIAnnotation(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
podKeysByENIID := make(map[string][]types.NamespacedName)
for _, pod := range pods {
Expand Down Expand Up @@ -191,8 +205,9 @@ func (r *defaultPodENIInfoResolver) resolveViaPodENIAnnotation(ctx context.Conte
return eniInfoByPodKey, nil
}

// resolveViaVPCIPAddress tries to resolve Pod ENI through the Pod IPAddress within VPC.
func (r *defaultPodENIInfoResolver) resolveViaVPCIPAddress(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
// resolveViaNodeENIs tries to resolve Pod ENI by matching podIP against ENIs on EC2 node's ENIs.
// with aws-vpc-cni CNI plugin, podIP can be supported by either IPv4Addresses or IPv4Prefixes on ENI.
func (r *defaultPodENIInfoResolver) resolveViaNodeENIs(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
nodeKeysSet := make(map[types.NamespacedName]sets.Empty)
for _, pod := range pods {
nodeKey := types.NamespacedName{Name: pod.NodeName}
Expand All @@ -204,13 +219,20 @@ func (r *defaultPodENIInfoResolver) resolveViaVPCIPAddress(ctx context.Context,
if err := r.k8sClient.Get(ctx, nodeKey, node); err != nil {
return nil, err
}
// Fargate based nodes are not EC2 instances
if node.Labels[labelEKSComputeType] == "fargate" {
continue
}
nodes = append(nodes, node)
}
if len(nodes) == 0 {
return nil, nil
}

nodeInstanceByNodeKey, err := r.nodeInfoProvider.FetchNodeInstances(ctx, nodes)
if err != nil {
return nil, err
}

eniInfoByPodKey := make(map[types.NamespacedName]ENIInfo, len(pods))
for _, pod := range pods {
nodeKey := types.NamespacedName{Name: pod.NodeName}
Expand All @@ -226,6 +248,56 @@ func (r *defaultPodENIInfoResolver) resolveViaVPCIPAddress(ctx context.Context,
return eniInfoByPodKey, nil
}

// resolveViaVPCENIs tries to resolve pod ENI by matching podIP against ENIs in vpc.
// with EKS fargate pods, podIP is supported by an ENI in vpc.
func (r *defaultPodENIInfoResolver) resolveViaVPCENIs(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
podKeysByIP := make(map[string][]types.NamespacedName, len(pods))
for _, pod := range pods {
podKeysByIP[pod.PodIP] = append(podKeysByIP[pod.PodIP], pod.Key)
}
if len(podKeysByIP) == 0 {
return nil, nil
}

podIPs := sets.StringKeySet(podKeysByIP).List()
podIPChunks := algorithm.ChunkStrings(podIPs, r.describeNetworkInterfacesIPChunkSize)
eniByID := make(map[string]*ec2sdk.NetworkInterface)
for _, podIPChunk := range podIPChunks {
req := &ec2sdk.DescribeNetworkInterfacesInput{
Filters: []*ec2sdk.Filter{
{
Name: awssdk.String("vpc-id"),
Values: awssdk.StringSlice([]string{r.vpcID}),
},
{
Name: awssdk.String("addresses.private-ip-address"),
Values: awssdk.StringSlice(podIPChunk),
},
},
}
enis, err := r.ec2Client.DescribeNetworkInterfacesAsList(ctx, req)
if err != nil {
return nil, err
}
for _, eni := range enis {
eniID := awssdk.StringValue(eni.NetworkInterfaceId)
eniByID[eniID] = eni
}
}

eniInfoByPodKey := make(map[types.NamespacedName]ENIInfo)
for _, eni := range eniByID {
eniInfo := buildENIInfoViaENI(eni)
for _, addr := range eni.PrivateIpAddresses {
eniIP := awssdk.StringValue(addr.PrivateIpAddress)
for _, podKey := range podKeysByIP[eniIP] {
eniInfoByPodKey[podKey] = eniInfo
}
}
}
return eniInfoByPodKey, nil
}

// isPodSupportedByNodeENI checks whether pod is supported by specific nodeENI.
func (r *defaultPodENIInfoResolver) isPodSupportedByNodeENI(pod k8s.PodInfo, nodeENI *ec2sdk.InstanceNetworkInterface) bool {
for _, ipv4Address := range nodeENI.PrivateIpAddresses {
Expand Down
Loading