Skip to content

Commit 7b823fb

Browse files
authored
abstract retry_utils and add retry for listener certificate attachment (#1631)
1 parent e75a1ca commit 7b823fb

10 files changed

+322
-44
lines changed

pkg/deploy/ec2/security_group_manager.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import (
77
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
88
"github.com/go-logr/logr"
99
"github.com/pkg/errors"
10-
"k8s.io/apimachinery/pkg/util/wait"
1110
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
1211
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/tracking"
1312
ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2"
1413
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
14+
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
1515
"time"
1616
)
1717

@@ -118,19 +118,12 @@ func (m *defaultSecurityGroupManager) Delete(ctx context.Context, sdkSG networki
118118
GroupId: awssdk.String(sdkSG.SecurityGroupID),
119119
}
120120

121-
ctx, cancel := context.WithTimeout(ctx, m.waitSGDeletionTimeout)
122-
defer cancel()
123121
m.logger.Info("deleting securityGroup",
124122
"securityGroupID", sdkSG.SecurityGroupID)
125-
if err := wait.PollImmediateUntil(m.waitSGDeletionPollInterval, func() (done bool, err error) {
126-
if _, err := m.ec2Client.DeleteSecurityGroupWithContext(ctx, req); err != nil {
127-
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "DependencyViolation" {
128-
return false, nil
129-
}
130-
return false, err
131-
}
132-
return true, nil
133-
}, ctx.Done()); err != nil {
123+
if err := runtime.RetryImmediateOnError(m.waitSGDeletionPollInterval, m.waitSGDeletionTimeout, isSecurityGroupDependencyViolationError, func() error {
124+
_, err := m.ec2Client.DeleteSecurityGroupWithContext(ctx, req)
125+
return err
126+
}); err != nil {
134127
return errors.Wrap(err, "failed to delete securityGroup")
135128
}
136129
m.logger.Info("deleted securityGroup",
@@ -174,3 +167,11 @@ func buildIPPermissionInfo(permission ec2model.IPPermission) (networking.IPPermi
174167
}
175168
return networking.IPPermissionInfo{}, errors.New("invalid ipPermission")
176169
}
170+
171+
func isSecurityGroupDependencyViolationError(err error) bool {
172+
var awsErr awserr.Error
173+
if errors.As(err, &awsErr) {
174+
return awsErr.Code() == "DependencyViolation"
175+
}
176+
return false
177+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package ec2
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/aws/awserr"
5+
"github.com/pkg/errors"
6+
"github.com/stretchr/testify/assert"
7+
"testing"
8+
)
9+
10+
func Test_isSecurityGroupDependencyViolationError(t *testing.T) {
11+
type args struct {
12+
err error
13+
}
14+
tests := []struct {
15+
name string
16+
args args
17+
want bool
18+
}{
19+
{
20+
name: "is DependencyViolation error",
21+
args: args{
22+
err: awserr.New("DependencyViolation", "some message", nil),
23+
},
24+
want: true,
25+
},
26+
{
27+
name: "wraps DependencyViolation error",
28+
args: args{
29+
err: errors.Wrap(awserr.New("DependencyViolation", "some message", nil), "wrapped message"),
30+
},
31+
want: true,
32+
},
33+
{
34+
name: "isn't DependencyViolation error",
35+
args: args{
36+
err: errors.New("some other error"),
37+
},
38+
want: false,
39+
},
40+
}
41+
for _, tt := range tests {
42+
t.Run(tt.name, func(t *testing.T) {
43+
got := isSecurityGroupDependencyViolationError(tt.args.err)
44+
assert.Equal(t, tt.want, got)
45+
})
46+
}
47+
}

pkg/deploy/elbv2/listener_manager.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import (
77
"github.com/go-logr/logr"
88
"github.com/google/go-cmp/cmp"
99
"github.com/google/go-cmp/cmp/cmpopts"
10+
"github.com/pkg/errors"
1011
"k8s.io/apimachinery/pkg/util/sets"
1112
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
1213
elbv2equality "sigs.k8s.io/aws-load-balancer-controller/pkg/equality/elbv2"
1314
elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2"
15+
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
16+
"time"
1417
)
1518

1619
// ListenerManager is responsible for create/update/delete Listener resources.
@@ -24,8 +27,10 @@ type ListenerManager interface {
2427

2528
func NewDefaultListenerManager(elbv2Client services.ELBV2, logger logr.Logger) *defaultListenerManager {
2629
return &defaultListenerManager{
27-
elbv2Client: elbv2Client,
28-
logger: logger,
30+
elbv2Client: elbv2Client,
31+
logger: logger,
32+
waitLSExistencePollInterval: defaultWaitLSExistencePollInterval,
33+
waitLSExistenceTimeout: defaultWaitLSExistenceTimeout,
2934
}
3035
}
3136

@@ -35,6 +40,9 @@ var _ ListenerManager = &defaultListenerManager{}
3540
type defaultListenerManager struct {
3641
elbv2Client services.ELBV2
3742
logger logr.Logger
43+
44+
waitLSExistencePollInterval time.Duration
45+
waitLSExistenceTimeout time.Duration
3846
}
3947

4048
func (m *defaultListenerManager) Create(ctx context.Context, resLS *elbv2model.Listener) (elbv2model.ListenerStatus, error) {
@@ -56,8 +64,10 @@ func (m *defaultListenerManager) Create(ctx context.Context, resLS *elbv2model.L
5664
"resourceID", resLS.ID(),
5765
"arn", awssdk.StringValue(sdkLS.ListenerArn))
5866

59-
if err := m.updateSDKListenerWithExtraCertificates(ctx, resLS, sdkLS, true); err != nil {
60-
return elbv2model.ListenerStatus{}, err
67+
if err := runtime.RetryImmediateOnError(m.waitLSExistencePollInterval, m.waitLSExistenceTimeout, isListenerNotFoundError, func() error {
68+
return m.updateSDKListenerWithExtraCertificates(ctx, resLS, sdkLS, true)
69+
}); err != nil {
70+
return elbv2model.ListenerStatus{}, errors.Wrap(err, "failed to update extra certificates on listener")
6171
}
6272
return buildResListenerStatus(sdkLS), nil
6373
}

pkg/deploy/elbv2/listener_rule_manager.go

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,17 @@ package elbv2
33
import (
44
"context"
55
awssdk "github.com/aws/aws-sdk-go/aws"
6-
"github.com/aws/aws-sdk-go/aws/awserr"
76
elbv2sdk "github.com/aws/aws-sdk-go/service/elbv2"
87
"github.com/go-logr/logr"
98
"github.com/google/go-cmp/cmp"
109
"github.com/pkg/errors"
11-
"k8s.io/apimachinery/pkg/util/wait"
1210
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
1311
elbv2equality "sigs.k8s.io/aws-load-balancer-controller/pkg/equality/elbv2"
1412
elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2"
13+
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
1514
"time"
1615
)
1716

18-
const (
19-
defaultWaitLSExistencePollInterval = 2 * time.Second
20-
defaultWaitLSExistenceTimeout = 20 * time.Second
21-
)
22-
2317
// ListenerRuleManager is responsible for create/update/delete ListenerRule resources.
2418
type ListenerRuleManager interface {
2519
Create(ctx context.Context, resLR *elbv2model.ListenerRule) (elbv2model.ListenerRuleStatus, error)
@@ -54,23 +48,18 @@ func (m *defaultListenerRuleManager) Create(ctx context.Context, resLR *elbv2mod
5448
return elbv2model.ListenerRuleStatus{}, err
5549
}
5650

57-
ctx, cancel := context.WithTimeout(ctx, m.waitLSExistenceTimeout)
58-
defer cancel()
5951
m.logger.Info("creating listener rule",
6052
"stackID", resLR.Stack().StackID(),
6153
"resourceID", resLR.ID())
6254
var sdkLR *elbv2sdk.Rule
63-
if err := wait.PollImmediateUntil(m.waitLSExistencePollInterval, func() (done bool, err error) {
55+
if err := runtime.RetryImmediateOnError(m.waitLSExistencePollInterval, m.waitLSExistenceTimeout, isListenerNotFoundError, func() error {
6456
resp, err := m.elbv2Client.CreateRuleWithContext(ctx, req)
6557
if err != nil {
66-
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ListenerNotFound" {
67-
return false, nil
68-
}
69-
return false, err
58+
return err
7059
}
7160
sdkLR = resp.Rules[0]
72-
return true, nil
73-
}, ctx.Done()); err != nil {
61+
return nil
62+
}); err != nil {
7463
return elbv2model.ListenerRuleStatus{}, errors.Wrap(err, "failed to create listener rule")
7564
}
7665
m.logger.Info("created listener rule",

pkg/deploy/elbv2/listener_utils.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,16 @@ package elbv2
33
import (
44
"context"
55
awssdk "github.com/aws/aws-sdk-go/aws"
6+
"github.com/aws/aws-sdk-go/aws/awserr"
67
elbv2sdk "github.com/aws/aws-sdk-go/service/elbv2"
8+
"github.com/pkg/errors"
79
elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2"
10+
"time"
11+
)
12+
13+
const (
	// defaultWaitLSExistencePollInterval is the default delay between
	// attempts while waiting for a listener to exist.
	defaultWaitLSExistencePollInterval = 2 * time.Second
	// defaultWaitLSExistenceTimeout is the default upper bound on the total
	// time spent waiting for a listener to exist.
	defaultWaitLSExistenceTimeout = 20 * time.Second
)
917

1018
func buildSDKActions(modelActions []elbv2model.Action) ([]*elbv2sdk.Action, error) {
@@ -200,3 +208,11 @@ func buildSDKSourceIpConditionConfig(modelCfg elbv2model.SourceIPConditionConfig
200208
Values: awssdk.StringSlice(modelCfg.Values),
201209
}
202210
}
211+
212+
func isListenerNotFoundError(err error) bool {
213+
var awsErr awserr.Error
214+
if errors.As(err, &awsErr) {
215+
return awsErr.Code() == "ListenerNotFound"
216+
}
217+
return false
218+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package elbv2
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/aws/awserr"
5+
"github.com/pkg/errors"
6+
"github.com/stretchr/testify/assert"
7+
"testing"
8+
)
9+
10+
func Test_isListenerNotFoundError(t *testing.T) {
11+
type args struct {
12+
err error
13+
}
14+
tests := []struct {
15+
name string
16+
args args
17+
want bool
18+
}{
19+
{
20+
name: "is ListenerNotFound error",
21+
args: args{
22+
err: awserr.New("ListenerNotFound", "some message", nil),
23+
},
24+
want: true,
25+
},
26+
{
27+
name: "wraps ListenerNotFound error",
28+
args: args{
29+
err: errors.Wrap(awserr.New("ListenerNotFound", "some message", nil), "wrapped message"),
30+
},
31+
want: true,
32+
},
33+
{
34+
name: "isn't ListenerNotFound error",
35+
args: args{
36+
err: errors.New("some other error"),
37+
},
38+
want: false,
39+
},
40+
}
41+
for _, tt := range tests {
42+
t.Run(tt.name, func(t *testing.T) {
43+
got := isListenerNotFoundError(tt.args.err)
44+
assert.Equal(t, tt.want, got)
45+
})
46+
}
47+
}

pkg/deploy/elbv2/target_group_manager.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
elbv2sdk "github.com/aws/aws-sdk-go/service/elbv2"
88
"github.com/go-logr/logr"
99
"github.com/pkg/errors"
10-
"k8s.io/apimachinery/pkg/util/wait"
1110
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
1211
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/tracking"
1312
elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2"
13+
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
1414
"time"
1515
)
1616

@@ -107,19 +107,12 @@ func (m *defaultTargetGroupManager) Delete(ctx context.Context, sdkTG TargetGrou
107107
TargetGroupArn: sdkTG.TargetGroup.TargetGroupArn,
108108
}
109109

110-
ctx, cancel := context.WithTimeout(ctx, m.waitTGDeletionTimeout)
111-
defer cancel()
112110
m.logger.Info("deleting targetGroup",
113111
"arn", awssdk.StringValue(req.TargetGroupArn))
114-
if err := wait.PollImmediateUntil(m.waitTGDeletionPollInterval, func() (done bool, err error) {
115-
if _, err := m.elbv2Client.DeleteTargetGroupWithContext(ctx, req); err != nil {
116-
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ResourceInUse" {
117-
return false, nil
118-
}
119-
return false, err
120-
}
121-
return true, nil
122-
}, ctx.Done()); err != nil {
112+
if err := runtime.RetryImmediateOnError(m.waitTGDeletionPollInterval, m.waitTGDeletionTimeout, isTargetGroupResourceInUseError, func() error {
113+
_, err := m.elbv2Client.DeleteTargetGroupWithContext(ctx, req)
114+
return err
115+
}); err != nil {
123116
return errors.Wrap(err, "failed to delete targetGroup")
124117
}
125118
m.logger.Info("deleted targetGroup",
@@ -248,3 +241,11 @@ func buildResTargetGroupStatus(sdkTG TargetGroupWithTags) elbv2model.TargetGroup
248241
TargetGroupARN: awssdk.StringValue(sdkTG.TargetGroup.TargetGroupArn),
249242
}
250243
}
244+
245+
func isTargetGroupResourceInUseError(err error) bool {
246+
var awsErr awserr.Error
247+
if errors.As(err, &awsErr) {
248+
return awsErr.Code() == "ResourceInUse"
249+
}
250+
return false
251+
}

pkg/deploy/elbv2/target_group_manager_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package elbv2
22

33
import (
44
awssdk "github.com/aws/aws-sdk-go/aws"
5+
"github.com/aws/aws-sdk-go/aws/awserr"
56
elbv2sdk "github.com/aws/aws-sdk-go/service/elbv2"
7+
"github.com/pkg/errors"
68
"github.com/stretchr/testify/assert"
79
"k8s.io/apimachinery/pkg/util/intstr"
810
elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2"
@@ -510,3 +512,42 @@ func Test_buildResTargetGroupStatus(t *testing.T) {
510512
})
511513
}
512514
}
515+
516+
func Test_isTargetGroupResourceInUseError(t *testing.T) {
517+
type args struct {
518+
err error
519+
}
520+
tests := []struct {
521+
name string
522+
args args
523+
want bool
524+
}{
525+
{
526+
name: "is ResourceInUse error",
527+
args: args{
528+
err: awserr.New("ResourceInUse", "some message", nil),
529+
},
530+
want: true,
531+
},
532+
{
533+
name: "wraps ResourceInUse error",
534+
args: args{
535+
err: errors.Wrap(awserr.New("ResourceInUse", "some message", nil), "wrapped message"),
536+
},
537+
want: true,
538+
},
539+
{
540+
name: "isn't ResourceInUse error",
541+
args: args{
542+
err: errors.New("some other error"),
543+
},
544+
want: false,
545+
},
546+
}
547+
for _, tt := range tests {
548+
t.Run(tt.name, func(t *testing.T) {
549+
got := isTargetGroupResourceInUseError(tt.args.err)
550+
assert.Equal(t, tt.want, got)
551+
})
552+
}
553+
}

pkg/runtime/retry_utils.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package runtime
2+
3+
import (
4+
"k8s.io/apimachinery/pkg/util/wait"
5+
"time"
6+
)
7+
8+
// RetryImmediateOnError tries to run fn every interval until it succeeds, a non-retryable error occurs or the timeout is reached.
9+
func RetryImmediateOnError(interval time.Duration, timeout time.Duration, retryable func(error) bool, fn func() error) error {
10+
return wait.PollImmediate(interval, timeout, func() (bool, error) {
11+
err := fn()
12+
if err != nil {
13+
if retryable(err) {
14+
return false, nil
15+
}
16+
return false, err
17+
}
18+
return true, nil
19+
})
20+
}

0 commit comments

Comments
 (0)