Skip to content

Commit e024964

Browse files
Merge pull request #1356 from openshift-cherrypick-robot/cherry-pick-1353-to-release-4.4
[release-4.4] Bug 1783284: feat(jitter): add jitter to controllers to smooth out our spiky resource
2 parents 8cb4ed8 + 8cb2aa0 commit e024964

File tree

8 files changed

+148
-20
lines changed

8 files changed

+148
-20
lines changed

cmd/olm/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
configclientset "github.com/openshift/client-go/config/clientset/versioned"
1414
configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
1516
"github.com/prometheus/client_golang/prometheus/promhttp"
1617
log "github.com/sirupsen/logrus"
1718
v1 "k8s.io/api/core/v1"
@@ -195,7 +196,7 @@ func main() {
195196
ctx,
196197
olm.WithLogger(logger),
197198
olm.WithWatchedNamespaces(namespaces...),
198-
olm.WithResyncPeriod(*wakeupInterval),
199+
olm.WithResyncPeriod(queueinformer.ResyncWithJitter(*wakeupInterval, 0.2)),
199200
olm.WithExternalClient(crClient),
200201
olm.WithOperatorClient(opClient),
201202
olm.WithRestConfig(config),

pkg/controller/operators/catalog/operator.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ type Operator struct {
9898
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
9999

100100
// NewOperator creates a new Catalog Operator.
101-
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resyncPeriod time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string) (*Operator, error) {
101+
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string) (*Operator, error) {
102+
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
102103
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
103104
if err != nil {
104105
return nil, err
@@ -149,7 +150,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
149150
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
150151

151152
// Wire OLM CR sharedIndexInformers
152-
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, resyncPeriod)
153+
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, resyncPeriod())
153154

154155
// Wire CSVs
155156
csvInformer := crInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
@@ -249,7 +250,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
249250
}
250251

251252
// Wire k8s sharedIndexInformers
252-
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod)
253+
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
253254
sharedIndexInformers := []cache.SharedIndexInformer{}
254255

255256
// Wire Roles
@@ -321,7 +322,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
321322
}
322323

323324
// Register CustomResourceDefinition QueueInformer
324-
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), resyncPeriod).Apiextensions().V1beta1().CustomResourceDefinitions()
325+
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), resyncPeriod()).Apiextensions().V1beta1().CustomResourceDefinitions()
325326
op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
326327
crdQueueInformer, err := queueinformer.NewQueueInformer(
327328
ctx,
@@ -337,7 +338,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
337338
}
338339

339340
// Namespace sync for resolving subscriptions
340-
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod).Core().V1().Namespaces()
341+
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
341342
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
342343
op.nsResolveQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
343344
namespaceQueueInformer, err := queueinformer.NewQueueInformer(

pkg/controller/operators/olm/config.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package olm
22

33
import (
4+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
45
"time"
56

67
"github.com/pkg/errors"
@@ -21,7 +22,7 @@ import (
2122
type OperatorOption func(*operatorConfig)
2223

2324
type operatorConfig struct {
24-
resyncPeriod time.Duration
25+
resyncPeriod func() time.Duration
2526
operatorNamespace string
2627
watchedNamespaces []string
2728
clock utilclock.Clock
@@ -49,8 +50,8 @@ func newInvalidConfigError(name, msg string) error {
4950
func (o *operatorConfig) validate() (err error) {
5051
// TODO: Add better config validation
5152
switch {
52-
case o.resyncPeriod < 0:
53-
err = newInvalidConfigError("resync period", "must be >= 0")
53+
case o.resyncPeriod == nil:
54+
err = newInvalidConfigError("resync period", "must not be nil")
5455
case o.operatorNamespace == metav1.NamespaceAll:
5556
err = newInvalidConfigError("operator namespace", "must be a single namespace")
5657
case len(o.watchedNamespaces) == 0:
@@ -80,7 +81,7 @@ func (o *operatorConfig) validate() (err error) {
8081

8182
func defaultOperatorConfig() *operatorConfig {
8283
return &operatorConfig{
83-
resyncPeriod: 30 * time.Second,
84+
resyncPeriod: queueinformer.ResyncWithJitter(30 * time.Second, 0.2),
8485
operatorNamespace: "default",
8586
watchedNamespaces: []string{metav1.NamespaceAll},
8687
clock: utilclock.RealClock{},
@@ -91,9 +92,9 @@ func defaultOperatorConfig() *operatorConfig {
9192
}
9293
}
9394

94-
func WithResyncPeriod(period time.Duration) OperatorOption {
95+
func WithResyncPeriod(resyncPeriod func() time.Duration) OperatorOption {
9596
return func(config *operatorConfig) {
96-
config.resyncPeriod = period
97+
config.resyncPeriod = resyncPeriod
9798
}
9899
}
99100

pkg/controller/operators/olm/operator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
140140
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
141141
for _, namespace := range config.watchedNamespaces {
142142
// Wire CSVs
143-
extInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, config.resyncPeriod, externalversions.WithNamespace(namespace))
143+
extInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, config.resyncPeriod(), externalversions.WithNamespace(namespace))
144144
csvInformer := extInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
145145
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
146146
csvQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv", namespace))
@@ -232,7 +232,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
232232
op.RegisterQueueInformer(subQueueInformer)
233233

234234
// Wire Deployments
235-
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod, informers.WithNamespace(namespace))
235+
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
236236
depInformer := k8sInformerFactory.Apps().V1().Deployments()
237237
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
238238
depQueueInformer, err := queueinformer.NewQueueInformer(
@@ -360,7 +360,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
360360
return nil, err
361361
}
362362

363-
k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod)
363+
k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
364364
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
365365
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
366366
clusterRoleQueueInformer, err := queueinformer.NewQueueInformer(
@@ -414,7 +414,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
414414
}
415415

416416
// Register APIService QueueInformer
417-
apiServiceInformer := kagg.NewSharedInformerFactory(op.opClient.ApiregistrationV1Interface(), config.resyncPeriod).Apiregistration().V1().APIServices()
417+
apiServiceInformer := kagg.NewSharedInformerFactory(op.opClient.ApiregistrationV1Interface(), config.resyncPeriod()).Apiregistration().V1().APIServices()
418418
op.lister.APIRegistrationV1().RegisterAPIServiceLister(apiServiceInformer.Lister())
419419
apiServiceQueueInformer, err := queueinformer.NewQueueInformer(
420420
ctx,
@@ -431,7 +431,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
431431
}
432432

433433
// Register CustomResourceDefinition QueueInformer
434-
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), config.resyncPeriod).Apiextensions().V1beta1().CustomResourceDefinitions()
434+
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), config.resyncPeriod()).Apiextensions().V1beta1().CustomResourceDefinitions()
435435
op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
436436
crdQueueInformer, err := queueinformer.NewQueueInformer(
437437
ctx,

pkg/controller/operators/olm/operator_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"crypto/x509"
99
"crypto/x509/pkix"
1010
"fmt"
11+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
1112
"math"
1213
"math/big"
1314
"reflect"
@@ -160,7 +161,7 @@ type fakeOperatorOption func(*fakeOperatorConfig)
160161

161162
func withResyncPeriod(period time.Duration) fakeOperatorOption {
162163
return func(config *fakeOperatorConfig) {
163-
config.resyncPeriod = period
164+
config.resyncPeriod = queueinformer.ResyncWithJitter(period, 0.1)
164165
}
165166
}
166167

@@ -257,7 +258,7 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
257258
// Apply options to default config
258259
config := &fakeOperatorConfig{
259260
operatorConfig: &operatorConfig{
260-
resyncPeriod: 5 * time.Minute,
261+
resyncPeriod: queueinformer.ResyncWithJitter(5 * time.Minute, 0.1),
261262
operatorNamespace: "default",
262263
watchedNamespaces: []string{metav1.NamespaceAll},
263264
clock: &utilclock.RealClock{},

pkg/lib/queueinformer/jitter.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package queueinformer
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
)
7+
8+
// DefaultResyncPeriod is the resync interval substituted when a caller
// supplies a negative period to ResyncWithJitter.
const DefaultResyncPeriod = 15 * time.Minute

// ResyncWithJitter takes a resync interval and returns a generator that
// yields that interval with random jitter applied on every call.
//
// factor is a value between 0 and 1 indicating the amount of jitter: a factor
// of 0.2 and a period of 10m yields durations between 8 and 12 minutes (20%).
//
// The generator falls back to a fixed value when the inputs are unusable:
//   - factor outside [0, 1] (or NaN): resyncPeriod is returned unchanged.
//   - resyncPeriod negative: DefaultResyncPeriod is returned.
//   - resyncPeriod*(1+factor) not representable as a time.Duration:
//     resyncPeriod is returned unchanged.
//
// Jitter is computed at nanosecond resolution, so periods shorter than a
// minute (for example a 30s resync) are jittered rather than truncated to 0.
func ResyncWithJitter(resyncPeriod time.Duration, factor float64) func() time.Duration {
	// time.Duration is an int64 nanosecond count; any float at or above this
	// bound cannot be converted back without overflowing.
	const maxDuration = float64(1<<63 - 1)

	return func() time.Duration {
		// Written as a negated range check so that a NaN factor is rejected too.
		if !(factor >= 0.0 && factor <= 1.0) {
			return resyncPeriod
		}
		if resyncPeriod < 0 {
			return DefaultResyncPeriod
		}

		base := float64(resyncPeriod)
		upper := base * (1 + factor)

		// If the upper bound would wrap around, return resyncPeriod.
		if upper >= maxDuration {
			return resyncPeriod
		}
		lower := base * (1 - factor)

		return time.Duration(lower + rand.Float64()*(upper-lower))
	}
}

pkg/lib/queueinformer/jitter_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package queueinformer
2+
3+
import (
4+
"math/rand"
5+
"reflect"
6+
"testing"
7+
"testing/quick"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestResyncWithJitter(t *testing.T) {
14+
type args struct {
15+
resyncPeriod time.Duration
16+
factor float64
17+
}
18+
tests := []struct {
19+
name string
20+
args args
21+
wantMin time.Duration
22+
wantMax time.Duration
23+
}{
24+
{
25+
name: "TypicalInput/Minutes",
26+
args: args{
27+
resyncPeriod: 15 * time.Minute,
28+
factor: 0.2,
29+
},
30+
wantMin: 12 * time.Minute,
31+
wantMax: 18 * time.Minute,
32+
},
33+
{
34+
name: "TypicalInput/Hours",
35+
args: args{
36+
resyncPeriod: 10 * time.Hour,
37+
factor: 0.1,
38+
},
39+
wantMin: 9 * time.Hour,
40+
wantMax: 11 * time.Hour,
41+
},
42+
{
43+
name: "BadInput/BadFactor",
44+
args: args{
45+
resyncPeriod: 10 * time.Hour,
46+
factor: -0.1,
47+
},
48+
wantMin: 10 * time.Hour,
49+
wantMax: 10 * time.Hour,
50+
},
51+
{
52+
name: "BadInput/BadResync",
53+
args: args{
54+
resyncPeriod: -10 * time.Hour,
55+
factor: 0.1,
56+
},
57+
wantMin: DefaultResyncPeriod,
58+
wantMax: DefaultResyncPeriod,
59+
},
60+
}
61+
for _, tt := range tests {
62+
t.Run(tt.name, func(t *testing.T) {
63+
got := ResyncWithJitter(tt.args.resyncPeriod, tt.args.factor)
64+
require.True(t, got() >= tt.wantMin)
65+
require.True(t, got() <= tt.wantMax)
66+
require.True(t, got() != got() || tt.wantMax == tt.wantMin)
67+
})
68+
}
69+
}
70+
71+
// float01 is a float64 with a custom testing/quick generator, used to feed
// jitter factors drawn from [0, 1) into property tests.
type float01 float64

// Generate implements quick.Generator by drawing a single Float64 from the
// supplied source; the size hint is not used.
func (float01) Generate(r *rand.Rand, _ int) reflect.Value {
	sample := float01(r.Float64())
	return reflect.ValueOf(sample)
}
76+
77+
// dur is a time.Duration with a custom testing/quick generator that only
// produces non-negative values.
type dur time.Duration

// Generate implements quick.Generator. Halving a random uint64 keeps the
// result within the non-negative int64 range; the size hint is not used.
func (dur) Generate(r *rand.Rand, _ int) reflect.Value {
	halved := r.Uint64() / 2
	return reflect.ValueOf(dur(halved))
}
82+
83+
func TestGeneratesWithinRange(t *testing.T) {
84+
f := func(resync dur, factor float01) bool {
85+
resyncPeriod := time.Duration(resync)
86+
min := float64(resyncPeriod.Nanoseconds()) * (1 - float64(factor))
87+
max := float64(resyncPeriod.Nanoseconds()) * (1 + float64(factor))
88+
d := ResyncWithJitter(resyncPeriod, float64(factor))()
89+
return min < float64(d.Nanoseconds()) && float64(d.Nanoseconds()) < max
90+
}
91+
require.NoError(t, quick.Check(f, nil))
92+
}

scripts/build_local.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
set -e
77

88
if [ -z "$NO_MINIKUBE" ]; then
9-
pgrep -f "[m]inikube" >/dev/null || minikube start --kubernetes-version="v1.16.2" --extra-config=apiserver.v=4 || { echo 'Cannot start minikube.'; exit 1; }
9+
pgrep -f "[m]inikube" >/dev/null || minikube start --kubernetes-version="v1.16.4" --extra-config=apiserver.v=4 || { echo 'Cannot start minikube.'; exit 1; }
1010
eval "$(minikube docker-env)" || { echo 'Cannot switch to minikube docker'; exit 1; }
1111
kubectl config use-context minikube
1212
fi

0 commit comments

Comments
 (0)