@@ -186,7 +186,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
186
186
return nil , err
187
187
}
188
188
189
- canFilter , err := labeller .Validate (ctx , logger , metadataClient )
189
+ canFilter , err := labeller .Validate (ctx , logger , metadataClient , crClient )
190
190
if err != nil {
191
191
return nil , err
192
192
}
@@ -208,10 +208,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
208
208
ogQueueSet : queueinformer .NewEmptyResourceQueueSet (),
209
209
catalogSubscriberIndexer : map [string ]cache.Indexer {},
210
210
serviceAccountQuerier : scoped .NewUserDefinedServiceAccountQuerier (logger , crClient ),
211
- clientAttenuator : scoped .NewClientAttenuator (logger , config , opClient ),
211
+ clientAttenuator : scoped .NewClientAttenuator (logger , validatingConfig , opClient ),
212
212
installPlanTimeout : installPlanTimeout ,
213
213
bundleUnpackTimeout : bundleUnpackTimeout ,
214
- clientFactory : clients .NewFactory (config ),
214
+ clientFactory : clients .NewFactory (validatingConfig ),
215
215
}
216
216
op .sources = grpc .NewSourceStore (logger , 10 * time .Second , 10 * time .Minute , op .syncSourceState )
217
217
op .sourceInvalidator = resolver .SourceProviderFromRegistryClientProvider (op .sources , logger )
@@ -381,10 +381,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
381
381
op .lister .RbacV1 ().RegisterRoleLister (metav1 .NamespaceAll , roleInformer .Lister ())
382
382
sharedIndexInformers = append (sharedIndexInformers , roleInformer .Informer ())
383
383
384
- labelObjects := func (gvr schema.GroupVersionResource , informer cache.SharedIndexInformer , sync queueinformer.LegacySyncHandler ) error {
384
+ complete := map [schema.GroupVersionResource ][]bool {}
385
+ completeLock := & sync.Mutex {}
386
+
387
+ labelObjects := func (gvr schema.GroupVersionResource , informer cache.SharedIndexInformer , sync func (done func () bool ) queueinformer.LegacySyncHandler ) error {
385
388
if canFilter {
386
389
return nil
387
390
}
391
+
392
+ // for each GVR, we may have more than one labelling controller active; each of which detects
393
+ // when it is done; we allocate a space in complete[gvr][idx] to hold that outcome and track it
394
+ var idx int
395
+ if _ , exists := complete [gvr ]; exists {
396
+ idx = len (complete [gvr ])
397
+ complete [gvr ] = append (complete [gvr ], false )
398
+ } else {
399
+ idx = 0
400
+ complete [gvr ] = []bool {false }
401
+ }
402
+ logger := op .logger .WithFields (logrus.Fields {"gvr" : gvr .String (), "index" : idx })
403
+ logger .Info ("registering labeller" )
404
+
388
405
queue := workqueue .NewRateLimitingQueueWithConfig (workqueue .DefaultControllerRateLimiter (), workqueue.RateLimitingQueueConfig {
389
406
Name : gvr .String (),
390
407
})
@@ -393,7 +410,24 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
393
410
queueinformer .WithQueue (queue ),
394
411
queueinformer .WithLogger (op .logger ),
395
412
queueinformer .WithInformer (informer ),
396
- queueinformer .WithSyncer (sync .ToSyncer ()),
413
+ queueinformer .WithSyncer (sync (func () bool {
414
+ // this function is called by the processor when it detects that it's work is done - so, for that
415
+ // particular labelling action on that particular GVR, all objects are in the correct state. when
416
+ // that action is done, we need to further know if that was the last action to be completed, as
417
+ // when every action we know about has been completed, we re-start the process to allow the future
418
+ // invocation of this process to filter informers (canFilter = true) and elide all this logic
419
+ completeLock .Lock ()
420
+ logger .Info ("labeller complete" )
421
+ complete [gvr ][idx ] = true
422
+ allDone := true
423
+ for _ , items := range complete {
424
+ for _ , done := range items {
425
+ allDone = allDone && done
426
+ }
427
+ }
428
+ completeLock .Unlock ()
429
+ return allDone
430
+ }).ToSyncer ()),
397
431
)
398
432
if err != nil {
399
433
return err
@@ -409,6 +443,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
409
443
rolesgvk := rbacv1 .SchemeGroupVersion .WithResource ("roles" )
410
444
if err := labelObjects (rolesgvk , roleInformer .Informer (), labeller .ObjectLabeler [* rbacv1.Role , * rbacv1applyconfigurations.RoleApplyConfiguration ](
411
445
ctx , op .logger , labeller .Filter (rolesgvk ),
446
+ roleInformer .Lister ().List ,
412
447
rbacv1applyconfigurations .Role ,
413
448
func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.Role , error ) {
414
449
return op .opClient .KubernetesInterface ().RbacV1 ().Roles (namespace ).Apply (ctx , cfg , opts )
@@ -421,6 +456,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
421
456
func (role * rbacv1.Role ) (string , error ) {
422
457
return resolver .PolicyRuleHashLabelValue (role .Rules )
423
458
},
459
+ roleInformer .Lister ().List ,
424
460
rbacv1applyconfigurations .Role ,
425
461
func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.Role , error ) {
426
462
return op .opClient .KubernetesInterface ().RbacV1 ().Roles (namespace ).Apply (ctx , cfg , opts )
@@ -437,6 +473,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
437
473
rolebindingsgvk := rbacv1 .SchemeGroupVersion .WithResource ("rolebindings" )
438
474
if err := labelObjects (rolebindingsgvk , roleBindingInformer .Informer (), labeller .ObjectLabeler [* rbacv1.RoleBinding , * rbacv1applyconfigurations.RoleBindingApplyConfiguration ](
439
475
ctx , op .logger , labeller .Filter (rolebindingsgvk ),
476
+ roleBindingInformer .Lister ().List ,
440
477
rbacv1applyconfigurations .RoleBinding ,
441
478
func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleBindingApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.RoleBinding , error ) {
442
479
return op .opClient .KubernetesInterface ().RbacV1 ().RoleBindings (namespace ).Apply (ctx , cfg , opts )
@@ -449,6 +486,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
449
486
func (roleBinding * rbacv1.RoleBinding ) (string , error ) {
450
487
return resolver .RoleReferenceAndSubjectHashLabelValue (roleBinding .RoleRef , roleBinding .Subjects )
451
488
},
489
+ roleBindingInformer .Lister ().List ,
452
490
rbacv1applyconfigurations .RoleBinding ,
453
491
func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleBindingApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.RoleBinding , error ) {
454
492
return op .opClient .KubernetesInterface ().RbacV1 ().RoleBindings (namespace ).Apply (ctx , cfg , opts )
@@ -464,7 +502,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
464
502
465
503
serviceaccountsgvk := corev1 .SchemeGroupVersion .WithResource ("serviceaccounts" )
466
504
if err := labelObjects (serviceaccountsgvk , serviceAccountInformer .Informer (), labeller .ObjectLabeler [* corev1.ServiceAccount , * corev1applyconfigurations.ServiceAccountApplyConfiguration ](
467
- ctx , op .logger , labeller .Filter (serviceaccountsgvk ),
505
+ ctx , op .logger , labeller .ServiceAccountFilter (func (namespace , name string ) bool {
506
+ operatorGroups , err := operatorGroupInformer .Lister ().OperatorGroups (namespace ).List (labels .Everything ())
507
+ if err != nil {
508
+ return false
509
+ }
510
+ for _ , operatorGroup := range operatorGroups {
511
+ if operatorGroup .Spec .ServiceAccountName == name {
512
+ return true
513
+ }
514
+ }
515
+ return false
516
+ }),
517
+ serviceAccountInformer .Lister ().List ,
468
518
corev1applyconfigurations .ServiceAccount ,
469
519
func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.ServiceAccountApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.ServiceAccount , error ) {
470
520
return op .opClient .KubernetesInterface ().CoreV1 ().ServiceAccounts (namespace ).Apply (ctx , cfg , opts )
@@ -481,6 +531,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
481
531
servicesgvk := corev1 .SchemeGroupVersion .WithResource ("services" )
482
532
if err := labelObjects (servicesgvk , serviceInformer .Informer (), labeller .ObjectLabeler [* corev1.Service , * corev1applyconfigurations.ServiceApplyConfiguration ](
483
533
ctx , op .logger , labeller .Filter (servicesgvk ),
534
+ serviceInformer .Lister ().List ,
484
535
corev1applyconfigurations .Service ,
485
536
func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.ServiceApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.Service , error ) {
486
537
return op .opClient .KubernetesInterface ().CoreV1 ().Services (namespace ).Apply (ctx , cfg , opts )
@@ -506,6 +557,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
506
557
podsgvk := corev1 .SchemeGroupVersion .WithResource ("pods" )
507
558
if err := labelObjects (podsgvk , csPodInformer .Informer (), labeller .ObjectLabeler [* corev1.Pod , * corev1applyconfigurations.PodApplyConfiguration ](
508
559
ctx , op .logger , labeller .Filter (podsgvk ),
560
+ csPodInformer .Lister ().List ,
509
561
corev1applyconfigurations .Pod ,
510
562
func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.PodApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.Pod , error ) {
511
563
return op .opClient .KubernetesInterface ().CoreV1 ().Pods (namespace ).Apply (ctx , cfg , opts )
@@ -543,6 +595,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
543
595
ctx , op .logger , labeller .JobFilter (func (namespace , name string ) (metav1.Object , error ) {
544
596
return configMapInformer .Lister ().ConfigMaps (namespace ).Get (name )
545
597
}),
598
+ jobInformer .Lister ().List ,
546
599
batchv1applyconfigurations .Job ,
547
600
func (namespace string , ctx context.Context , cfg * batchv1applyconfigurations.JobApplyConfiguration , opts metav1.ApplyOptions ) (* batchv1.Job , error ) {
548
601
return op .opClient .KubernetesInterface ().BatchV1 ().Jobs (namespace ).Apply (ctx , cfg , opts )
@@ -618,6 +671,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
618
671
customresourcedefinitionsgvk := apiextensionsv1 .SchemeGroupVersion .WithResource ("customresourcedefinitions" )
619
672
if err := labelObjects (customresourcedefinitionsgvk , crdInformer , labeller .ObjectPatchLabeler (
620
673
ctx , op .logger , labeller .Filter (customresourcedefinitionsgvk ),
674
+ crdLister .List ,
621
675
op .opClient .ApiextensionsInterface ().ApiextensionsV1 ().CustomResourceDefinitions ().Patch ,
622
676
)); err != nil {
623
677
return nil , err
@@ -1998,13 +2052,15 @@ func transitionInstallPlanState(log logrus.FieldLogger, transitioner installPlan
1998
2052
}
1999
2053
log .Debug ("attempting to install" )
2000
2054
if err := transitioner .ExecutePlan (out ); err != nil {
2001
- if now .Sub (out .Status .StartTime .Time ) >= timeout {
2055
+ if apierrors .IsForbidden (err ) || now .Sub (out .Status .StartTime .Time ) < timeout {
2056
+ // forbidden problems are never terminal since we don't know when a user might provide
2057
+ // the service account they specified with more permissions
2058
+ out .Status .Message = fmt .Sprintf ("retrying execution due to error: %s" , err .Error ())
2059
+ } else {
2002
2060
out .Status .SetCondition (v1alpha1 .ConditionFailed (v1alpha1 .InstallPlanInstalled ,
2003
2061
v1alpha1 .InstallPlanReasonComponentFailed , err .Error (), & now ))
2004
2062
out .Status .Phase = v1alpha1 .InstallPlanPhaseFailed
2005
2063
out .Status .Message = err .Error ()
2006
- } else {
2007
- out .Status .Message = fmt .Sprintf ("retrying execution due to error: %s" , err .Error ())
2008
2064
}
2009
2065
return out , err
2010
2066
} else if ! out .Status .NeedsRequeue () {
0 commit comments