Skip to content

Commit 813e5d7

Browse files
authored
Model builder for NLB IP mode (#1361)
1 parent 5342074 commit 813e5d7

File tree

3 files changed

+810
-0
lines changed

3 files changed

+810
-0
lines changed

pkg/annotations/constants.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,26 @@ const (
77
// IngressGroup
88
AnnotationSuffixGroupName = "group.name"
99
AnnotationSuffixGroupOrder = "group.order"
10+
11+
// NLB annotation suffixes
12+
// prefixes service.beta.kubernetes.io, service.kubernetes.io
13+
SvcLBSuffixLoadBalancerType = "aws-load-balancer-type"
14+
SvcLBSuffixInternal = "aws-load-balancer-internal"
15+
SvcLBSuffixProxyProtocol = "aws-load-balancer-proxy-protocol"
16+
SvcLBSuffixAccessLogEnabled = "aws-load-balancer-access-log-enabled"
17+
SvcLBSuffixAccessLogS3BucketName = "aws-load-balancer-access-log-s3-bucket-name"
18+
SvcLBSuffixAccessLogS3BucketPrefix = "aws-load-balancer-access-log-s3-bucket-prefix"
19+
SvcLBSuffixCrossZoneLoadBalancingEnabled = "aws-load-balancer-cross-zone-load-balancing-enabled"
20+
SvcLBSuffixSSLCertificate = "aws-load-balancer-ssl-cert"
21+
SvcLBSuffixSSLPorts = "aws-load-balancer-ssl-ports"
22+
SvcLBSuffixSSLNegotiationPolicy = "aws-load-balancer-ssl-negotiation-policy"
23+
SvcLBSuffixBEProtocol = "aws-load-balancer-backend-protocol"
24+
SvcLBSuffixAdditionalTags = "aws-load-balancer-additional-resource-tags"
25+
SvcLBSuffixHCHealthyThreshold = "aws-load-balancer-healthcheck-healthy-threshold"
26+
SvcLBSuffixHCUnhealthyThreshold = "aws-load-balancer-healthcheck-unhealthy-threshold"
27+
SvcLBSuffixHCTimeout = "aws-load-balancer-healthcheck-timeout"
28+
SvcLBSuffixHCInterval = "aws-load-balancer-healthcheck-interval"
29+
SvcLBSuffixHCProtocol = "aws-load-balancer-healthcheck-protocol"
30+
SvcLBSuffixHCPort = "aws-load-balancer-healthcheck-port"
31+
SvcLBSuffixHCPath = "aws-load-balancer-healthcheck-path"
1032
)

pkg/service/nlb/builder.go

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
package nlb
2+
3+
import (
4+
"context"
5+
"crypto/md5"
6+
"encoding/hex"
7+
"fmt"
8+
"github.com/pkg/errors"
9+
corev1 "k8s.io/api/core/v1"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/types"
12+
"k8s.io/apimachinery/pkg/util/intstr"
13+
"k8s.io/apimachinery/pkg/util/sets"
14+
elbv2api "sigs.k8s.io/aws-alb-ingress-controller/apis/elbv2/v1alpha1"
15+
"sigs.k8s.io/aws-alb-ingress-controller/pkg/annotations"
16+
"sigs.k8s.io/aws-alb-ingress-controller/pkg/k8s"
17+
"sigs.k8s.io/aws-alb-ingress-controller/pkg/model/core"
18+
"sigs.k8s.io/aws-alb-ingress-controller/pkg/model/elbv2"
19+
"strconv"
20+
"strings"
21+
)
22+
23+
const (
24+
LBAttrsAccessLogsS3Enabled = "access_logs.s3.enabled"
25+
LBAttrsAccessLogsS3Bucket = "access_logs.s3.bucket"
26+
LBAttrsAccessLogsS3Prefix = "access_logs.s3.prefix"
27+
LBAttrsLoadBalancingCrossZoneEnabled = "load_balancing.cross_zone.enabled"
28+
TGAttrsProxyProtocolV2Enabled = "proxy_protocol_v2.enabled"
29+
30+
DefaultAccessLogS3Enabled = false
31+
DefaultAccessLogsS3Bucket = ""
32+
DefaultAccessLogsS3Prefix = ""
33+
DefaultLoadBalancingCrossZoneEnabled = false
34+
DefaultProxyProtocolV2Enabled = false
35+
DefaultHealthCheckProtocol = elbv2.ProtocolTCP
36+
DefaultHealthCheckPort = "traffic-port"
37+
DefaultHealthCheckPath = "/"
38+
DefaultHealthCheckInterval = 10
39+
DefaultHealthCheckTimeout = 10
40+
DefaultHealthCheckHealthyThreshold = 3
41+
DefaultHealthCheckUnhealthyThreshold = 3
42+
)
43+
44+
type Builder interface {
45+
Build(ctx context.Context) (core.Stack, error)
46+
}
47+
48+
type nlbBuilder struct {
49+
service *corev1.Service
50+
key types.NamespacedName
51+
annotationParser annotations.Parser
52+
}
53+
54+
func NewServiceBuilder(service *corev1.Service, key types.NamespacedName, annotationParser annotations.Parser) Builder {
55+
return &nlbBuilder{
56+
service: service,
57+
key: key,
58+
annotationParser: annotationParser,
59+
}
60+
}
61+
62+
func (b *nlbBuilder) Build(ctx context.Context) (core.Stack, error) {
63+
stack := core.NewDefaultStack()
64+
b.buildModel(ctx, stack)
65+
return stack, nil
66+
}
67+
68+
func (b *nlbBuilder) buildModel(ctx context.Context, stack core.Stack) error {
69+
if !b.service.DeletionTimestamp.IsZero() {
70+
return nil
71+
}
72+
spec, err := b.loadBalancerSpec(ctx)
73+
if err != nil {
74+
return err
75+
}
76+
nlb := elbv2.NewLoadBalancer(stack, k8s.NamespacedName(b.service).String(), spec)
77+
err = b.buildListeners(ctx, stack, nlb)
78+
if err != nil {
79+
return err
80+
}
81+
return nil
82+
}
83+
84+
func (b *nlbBuilder) loadBalancerSpec(ctx context.Context) (elbv2.LoadBalancerSpec, error) {
85+
ipAddressType := elbv2.IPAddressTypeIPV4
86+
var scheme elbv2.LoadBalancerScheme = elbv2.LoadBalancerSchemeInternetFacing
87+
internal := false
88+
if _, err := b.annotationParser.ParseBoolAnnotation(annotations.SvcLBSuffixInternal, &internal, b.service.Annotations); err != nil {
89+
return elbv2.LoadBalancerSpec{}, err
90+
} else if internal {
91+
scheme = elbv2.LoadBalancerSchemeInternal
92+
}
93+
94+
lbAttributes, err := b.buildLBAttributes(ctx)
95+
if err != nil {
96+
return elbv2.LoadBalancerSpec{}, err
97+
}
98+
tags := map[string]string{}
99+
b.annotationParser.ParseStringMapAnnotation(annotations.SvcLBSuffixAdditionalTags, &tags, b.service.Annotations)
100+
spec := elbv2.LoadBalancerSpec{
101+
Name: b.loadbalancerName(b.service),
102+
Type: elbv2.LoadBalancerTypeNetwork,
103+
Scheme: &scheme,
104+
IPAddressType: &ipAddressType,
105+
SubnetMappings: nil,
106+
LoadBalancerAttributes: lbAttributes,
107+
Tags: tags,
108+
}
109+
return spec, nil
110+
}
111+
112+
func (b *nlbBuilder) buildLBAttributes(ctx context.Context) ([]elbv2.LoadBalancerAttribute, error) {
113+
attrs := []elbv2.LoadBalancerAttribute{}
114+
accessLogEnabled := DefaultAccessLogS3Enabled
115+
bucketName := DefaultAccessLogsS3Bucket
116+
bucketPrefix := DefaultAccessLogsS3Prefix
117+
if _, err := b.annotationParser.ParseBoolAnnotation(annotations.SvcLBSuffixAccessLogEnabled, &accessLogEnabled, b.service.Annotations); err != nil {
118+
return attrs, err
119+
}
120+
if accessLogEnabled {
121+
b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixAccessLogS3BucketName, &bucketName, b.service.Annotations)
122+
b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixAccessLogS3BucketPrefix, &bucketPrefix, b.service.Annotations)
123+
}
124+
crossZoneEnabled := DefaultLoadBalancingCrossZoneEnabled
125+
if _, err := b.annotationParser.ParseBoolAnnotation(annotations.SvcLBSuffixCrossZoneLoadBalancingEnabled, &crossZoneEnabled, b.service.Annotations); err != nil {
126+
return []elbv2.LoadBalancerAttribute{}, err
127+
}
128+
129+
attrs = append(attrs, []elbv2.LoadBalancerAttribute{
130+
{
131+
Key: LBAttrsAccessLogsS3Enabled,
132+
Value: strconv.FormatBool(accessLogEnabled),
133+
},
134+
{
135+
Key: LBAttrsAccessLogsS3Bucket,
136+
Value: bucketName,
137+
},
138+
{
139+
Key: LBAttrsAccessLogsS3Prefix,
140+
Value: bucketPrefix,
141+
},
142+
{
143+
Key: LBAttrsLoadBalancingCrossZoneEnabled,
144+
Value: strconv.FormatBool(crossZoneEnabled),
145+
},
146+
}...)
147+
148+
return attrs, nil
149+
}
150+
151+
func (b *nlbBuilder) buildTargetHealthCheck(ctx context.Context) (*elbv2.TargetGroupHealthCheckConfig, error) {
152+
hc := elbv2.TargetGroupHealthCheckConfig{}
153+
protocol := DefaultHealthCheckProtocol
154+
b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixHCProtocol, (*string)(&protocol), b.service.Annotations)
155+
protocol = strings.ToUpper(protocol)
156+
hc.Protocol = (*elbv2.Protocol)(&protocol)
157+
158+
path := DefaultHealthCheckPath
159+
b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixHCPath, &path, b.service.Annotations)
160+
if protocol != elbv2.ProtocolTCP {
161+
hc.Path = &path
162+
}
163+
164+
healthCheckPort := intstr.FromString(DefaultHealthCheckPort)
165+
portAnnotationStr := DefaultHealthCheckPort
166+
if b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixHCPort, &portAnnotationStr, b.service.Annotations); portAnnotationStr != DefaultHealthCheckPort {
167+
var portVal int64
168+
if _, err := b.annotationParser.ParseInt64Annotation(annotations.SvcLBSuffixHCPort, &portVal, b.service.Annotations); err != nil {
169+
return nil, err
170+
}
171+
healthCheckPort = intstr.FromInt(int(portVal))
172+
}
173+
hc.Port = &healthCheckPort
174+
175+
intervalSeconds := int64(DefaultHealthCheckInterval)
176+
if _, err := b.annotationParser.ParseInt64Annotation(annotations.SvcLBSuffixHCInterval, &intervalSeconds, b.service.Annotations); err != nil {
177+
return nil, err
178+
}
179+
hc.IntervalSeconds = &intervalSeconds
180+
181+
timeoutSeconds := int64(DefaultHealthCheckTimeout)
182+
if _, err := b.annotationParser.ParseInt64Annotation(annotations.SvcLBSuffixHCTimeout, &timeoutSeconds, b.service.Annotations); err != nil {
183+
return nil, err
184+
}
185+
hc.TimeoutSeconds = &timeoutSeconds
186+
187+
healthyThreshold := int64(DefaultHealthCheckHealthyThreshold)
188+
if _, err := b.annotationParser.ParseInt64Annotation(annotations.SvcLBSuffixHCHealthyThreshold, &healthyThreshold, b.service.Annotations); err != nil {
189+
return nil, err
190+
}
191+
hc.HealthyThresholdCount = &healthyThreshold
192+
193+
unhealthyThreshold := int64(DefaultHealthCheckUnhealthyThreshold)
194+
if _, err := b.annotationParser.ParseInt64Annotation(annotations.SvcLBSuffixHCUnhealthyThreshold, &unhealthyThreshold, b.service.Annotations); err != nil {
195+
return nil, err
196+
}
197+
hc.UnhealthyThresholdCount = &unhealthyThreshold
198+
199+
return &hc, nil
200+
}
201+
202+
func (b *nlbBuilder) targetGroupAttrs(ctx context.Context) ([]elbv2.TargetGroupAttribute, error) {
203+
attrs := []elbv2.TargetGroupAttribute{}
204+
proxyV2Enabled := DefaultProxyProtocolV2Enabled
205+
proxyV2Annotation := ""
206+
if b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixProxyProtocol, &proxyV2Annotation, b.service.Annotations) {
207+
if proxyV2Annotation != "*" {
208+
return []elbv2.TargetGroupAttribute{}, errors.Errorf("Invalid value %v for Load Balancer proxy protocol v2 annotation, only value currently supported is *", proxyV2Annotation)
209+
}
210+
proxyV2Enabled = true
211+
}
212+
attrs = append(attrs, elbv2.TargetGroupAttribute{
213+
Key: TGAttrsProxyProtocolV2Enabled,
214+
Value: strconv.FormatBool(proxyV2Enabled),
215+
})
216+
return attrs, nil
217+
}
218+
219+
func (b *nlbBuilder) buildListeners(ctx context.Context, stack core.Stack, lb *elbv2.LoadBalancer) error {
220+
tgAttrs, err := b.targetGroupAttrs(ctx)
221+
if err != nil {
222+
return errors.Wrapf(err, "Unable to build target group attributes")
223+
}
224+
225+
var certificateARNs []string
226+
b.annotationParser.ParseStringSliceAnnotation(annotations.SvcLBSuffixSSLCertificate, &certificateARNs, b.service.Annotations)
227+
228+
var sslPorts []string
229+
b.annotationParser.ParseStringSliceAnnotation(annotations.SvcLBSuffixSSLPorts, &sslPorts, b.service.Annotations)
230+
sslPortsSet := sets.NewString(sslPorts...)
231+
232+
backendProtocol := ""
233+
b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixBEProtocol, &backendProtocol, b.service.Annotations)
234+
235+
for _, port := range b.service.Spec.Ports {
236+
hc, err := b.buildTargetHealthCheck(ctx)
237+
tgProtocol := elbv2.Protocol(port.Protocol)
238+
listenerProtocol := elbv2.Protocol(port.Protocol)
239+
if err != nil {
240+
return err
241+
}
242+
243+
if tgProtocol != elbv2.ProtocolUDP && certificateARNs != nil && (sslPortsSet.Len() == 0 || sslPortsSet.Has(port.Name) || sslPortsSet.Has(strconv.Itoa(int(port.Port)))) {
244+
if backendProtocol == "ssl" {
245+
tgProtocol = elbv2.ProtocolTLS
246+
}
247+
listenerProtocol = elbv2.ProtocolTLS
248+
}
249+
tgName := b.targetGroupName(b.service, b.key, port.TargetPort, string(tgProtocol), hc)
250+
251+
targetGroup := elbv2.NewTargetGroup(stack, tgName, elbv2.TargetGroupSpec{
252+
Name: tgName,
253+
TargetType: elbv2.TargetTypeIP,
254+
Port: int64(port.TargetPort.IntVal),
255+
Protocol: tgProtocol,
256+
HealthCheckConfig: hc,
257+
TargetGroupAttributes: tgAttrs,
258+
})
259+
var targetType elbv2api.TargetType = elbv2api.TargetTypeIP
260+
_ = elbv2.NewTargetGroupBindingResource(stack, tgName, elbv2.TargetGroupBindingResourceSpec{
261+
TargetGroupARN: targetGroup.TargetGroupARN(),
262+
Template: elbv2.TargetGroupBindingTemplate{
263+
ObjectMeta: metav1.ObjectMeta{
264+
Namespace: b.service.Namespace,
265+
GenerateName: tgName,
266+
},
267+
Spec: elbv2api.TargetGroupBindingSpec{
268+
TargetType: &targetType,
269+
ServiceRef: elbv2api.ServiceReference{
270+
Name: b.service.Name,
271+
Port: port.TargetPort,
272+
},
273+
},
274+
},
275+
})
276+
277+
var sslPolicy *string = nil
278+
sslPolicyStr := ""
279+
if b.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixSSLNegotiationPolicy, &sslPolicyStr, b.service.Annotations) {
280+
sslPolicy = &sslPolicyStr
281+
}
282+
283+
certificates := []elbv2.Certificate{}
284+
for _, cert := range certificateARNs {
285+
certificates = append(certificates, elbv2.Certificate{&cert})
286+
}
287+
288+
_ = elbv2.NewListener(stack, strconv.Itoa(int(port.Port)), elbv2.ListenerSpec{
289+
LoadBalancerARN: lb.LoadBalancerARN(),
290+
Port: int64(port.Port),
291+
Protocol: listenerProtocol,
292+
Certificates: certificates,
293+
SSLPolicy: sslPolicy,
294+
DefaultActions: []elbv2.Action{
295+
{
296+
Type: elbv2.ActionTypeForward,
297+
ForwardConfig: &elbv2.ForwardActionConfig{
298+
TargetGroups: []elbv2.TargetGroupTuple{
299+
{
300+
TargetGroupARN: targetGroup.TargetGroupARN(),
301+
},
302+
},
303+
},
304+
},
305+
},
306+
})
307+
}
308+
return nil
309+
}
310+
311+
func (b *nlbBuilder) loadbalancerName(svc *corev1.Service) string {
312+
name := "a" + strings.Replace(string(svc.UID), "-", "", -1)
313+
if len(name) > 32 {
314+
name = name[:32]
315+
}
316+
return name
317+
}
318+
319+
func (b *nlbBuilder) targetGroupName(svc *corev1.Service, id types.NamespacedName, port intstr.IntOrString, proto string, hc *elbv2.TargetGroupHealthCheckConfig) string {
320+
uuidHash := md5.New()
321+
healthCheckProtocol := elbv2.ProtocolTCP
322+
healthCheckInterval := strconv.FormatInt(DefaultHealthCheckInterval, 10)
323+
if hc.Protocol != nil {
324+
healthCheckProtocol = string(*hc.Protocol)
325+
}
326+
if hc.IntervalSeconds != nil {
327+
healthCheckInterval = strconv.FormatInt(*hc.IntervalSeconds, 10)
328+
}
329+
_, _ = uuidHash.Write([]byte(svc.UID))
330+
_, _ = uuidHash.Write([]byte(port.String()))
331+
_, _ = uuidHash.Write([]byte(proto))
332+
_, _ = uuidHash.Write([]byte(healthCheckProtocol))
333+
_, _ = uuidHash.Write([]byte(healthCheckInterval))
334+
uuid := hex.EncodeToString(uuidHash.Sum(nil))
335+
return fmt.Sprintf("k8s-%.8s-%.8s-%.10s", id.Name, id.Namespace, uuid)
336+
}

0 commit comments

Comments
 (0)