@@ -56,16 +56,17 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) }
56
56
// resolving dependencies in a catalog.
57
57
type Operator struct {
58
58
* queueinformer.Operator
59
- client versioned.Interface
60
- lister operatorlister.OperatorLister
61
- namespace string
62
- sources map [resolver.CatalogKey ]resolver.SourceRef
63
- sourcesLock sync.RWMutex
64
- sourcesLastUpdate metav1.Time
65
- resolver resolver.Resolver
66
- subQueue workqueue.RateLimitingInterface
67
- catSrcQueueSet queueinformer.ResourceQueueSet
68
- reconciler reconciler.ReconcilerFactory
59
+ client versioned.Interface
60
+ lister operatorlister.OperatorLister
61
+ namespace string
62
+ sources map [resolver.CatalogKey ]resolver.SourceRef
63
+ sourcesLock sync.RWMutex
64
+ sourcesLastUpdate metav1.Time
65
+ resolver resolver.Resolver
66
+ subQueue workqueue.RateLimitingInterface
67
+ catSrcQueueSet queueinformer.ResourceQueueSet
68
+ namespaceResolveQueue workqueue.RateLimitingInterface
69
+ reconciler reconciler.ReconcilerFactory
69
70
}
70
71
71
72
// NewOperator creates a new Catalog Operator.
@@ -95,6 +96,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
95
96
// resolver needs subscription and csv listers
96
97
lister .OperatorsV1alpha1 ().RegisterSubscriptionLister (namespace , nsInformerFactory .Operators ().V1alpha1 ().Subscriptions ().Lister ())
97
98
lister .OperatorsV1alpha1 ().RegisterClusterServiceVersionLister (namespace , nsInformerFactory .Operators ().V1alpha1 ().ClusterServiceVersions ().Lister ())
99
+ lister .OperatorsV1alpha1 ().RegisterInstallPlanLister (namespace , nsInformerFactory .Operators ().V1alpha1 ().InstallPlans ().Lister ())
98
100
}
99
101
100
102
// Create a new queueinformer-based operator.
@@ -209,6 +211,24 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
209
211
OpClient : op .OpClient ,
210
212
Lister : op .lister ,
211
213
}
214
+
215
+ // Namespace sync for resolving subscriptions
216
+ namespaceInformer := informers .NewSharedInformerFactory (op .OpClient .KubernetesInterface (), wakeupInterval ).Core ().V1 ().Namespaces ()
217
+ resolvingNamespaceQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "resolver" )
218
+ namespaceQueueInformer := queueinformer .NewInformer (
219
+ resolvingNamespaceQueue ,
220
+ namespaceInformer .Informer (),
221
+ op .syncResolvingNamespace ,
222
+ nil ,
223
+ "resolver" ,
224
+ metrics .NewMetricsNil (),
225
+ logger ,
226
+ )
227
+
228
+ op .RegisterQueueInformer (namespaceQueueInformer )
229
+ op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
230
+ op .namespaceResolveQueue = resolvingNamespaceQueue
231
+
212
232
return op , nil
213
233
}
214
234
@@ -319,6 +339,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
319
339
320
340
logger := o .Log .WithFields (logrus.Fields {
321
341
"source" : catsrc .GetName (),
342
+ "id" : queueinformer .NewLoopID (),
322
343
})
323
344
logger .Debug ("syncing catsrc" )
324
345
out := catsrc .DeepCopy ()
@@ -456,9 +477,8 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
456
477
return err
457
478
}
458
479
459
- // Sync any dependent Subscriptions
460
- // TODO: this should go away, we should resync the namespace instead
461
- o .syncDependentSubscriptions (logger , out .GetName (), out .GetNamespace ())
480
+ // Trigger a resolve, will pick up any subscriptions that depend on the catalog
481
+ o .resolveNamespace (out .GetNamespace ())
462
482
463
483
return nil
464
484
}
@@ -487,36 +507,59 @@ func (o *Operator) syncDependentSubscriptions(logger *logrus.Entry, catalogSourc
487
507
}
488
508
}
489
509
490
- func (o * Operator ) syncSubscriptions (obj interface {}) error {
491
- sub , ok := obj .(* v1alpha1. Subscription )
510
+ func (o * Operator ) syncResolvingNamespace (obj interface {}) error {
511
+ ns , ok := obj .(* corev1. Namespace )
492
512
if ! ok {
493
513
o .Log .Debugf ("wrong type: %#v" , obj )
494
- return fmt .Errorf ("casting Subscription failed" )
514
+ return fmt .Errorf ("casting Namespace failed" )
495
515
}
496
- namespace := sub . GetNamespace ()
516
+ namespace := ns . GetName ()
497
517
498
518
logger := o .Log .WithFields (logrus.Fields {
499
- "sub" : sub .GetName (),
500
- "namespace" : sub .GetNamespace (),
501
- "source" : sub .Spec .CatalogSource ,
502
- "pkg" : sub .Spec .Package ,
503
- "channel" : sub .Spec .Channel ,
519
+ "namespace" : namespace ,
520
+ "id" : queueinformer .NewLoopID (),
504
521
})
505
522
506
- // record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
507
- sub , err := o .ensureSubscriptionCSVState (logger , sub )
523
+ // get the set of sources that should be used for resolution and best-effort get their connections working
524
+ logger .Debug ("resolving sources" )
525
+ resolverSources := o .ensureResolverSources (logger , namespace )
526
+
527
+ logger .Debug ("checking if subscriptions need update" )
528
+
529
+ subs , err := o .lister .OperatorsV1alpha1 ().SubscriptionLister ().Subscriptions (namespace ).List (labels .Everything ())
508
530
if err != nil {
531
+ logger .WithError (err ).Debug ("couldn't list subscriptions" )
509
532
return err
510
533
}
511
534
512
- // return early if the subscription is up to date
513
- if o .nothingToUpdate (logger , sub ) {
535
+ shouldUpdate := false
536
+ for _ , sub := range subs {
537
+ logger := logger .WithFields (logrus.Fields {
538
+ "sub" : sub .GetName (),
539
+ "source" : sub .Spec .CatalogSource ,
540
+ "pkg" : sub .Spec .Package ,
541
+ "channel" : sub .Spec .Channel ,
542
+ })
543
+
544
+ // ensure the installplan reference is correct
545
+ sub , err := o .ensureSubscriptionInstallPlanState (logger , sub )
546
+ if err != nil {
547
+ return err
548
+ }
549
+
550
+ // record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
551
+ sub , err = o .ensureSubscriptionCSVState (logger , sub )
552
+ if err != nil {
553
+ return err
554
+ }
555
+ shouldUpdate = shouldUpdate || ! o .nothingToUpdate (logger , sub )
556
+ }
557
+ if ! shouldUpdate {
558
+ logger .Debug ("all subscriptions up to date" )
514
559
return nil
515
560
}
516
561
517
- // get the set of sources that should be used for resolution and best-effort get their connections working
518
- logger .Debugf ("resolving sources for %s" , namespace )
519
- resolverSources := o .ensureResolverSources (logger , namespace )
562
+ logger .Debug ("resolving subscriptions in namespace" )
520
563
521
564
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
522
565
steps , subs , err := o .resolver .ResolveSteps (namespace , resolver .NewNamespaceSourceQuerier (resolverSources ))
@@ -533,20 +576,35 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
533
576
break
534
577
}
535
578
}
536
-
537
579
installplanReference , err := o .createInstallPlan (namespace , subs , installPlanApproval , steps )
538
580
if err != nil {
539
581
logger .WithError (err ).Debug ("error creating installplan" )
540
582
return err
541
583
}
542
584
543
- if err := o .ensureSubscriptionInstallPlanState (namespace , subs , installplanReference ); err != nil {
585
+ if err := o .updateSubscriptionSetInstallPlanState (namespace , subs , installplanReference ); err != nil {
544
586
logger .WithError (err ).Debug ("error ensuring subscription installplan state" )
545
587
return err
546
588
}
547
589
return nil
548
590
}
549
591
592
+ func (o * Operator ) syncSubscriptions (obj interface {}) error {
593
+ sub , ok := obj .(* v1alpha1.Subscription )
594
+ if ! ok {
595
+ o .Log .Debugf ("wrong type: %#v" , obj )
596
+ return fmt .Errorf ("casting Subscription failed" )
597
+ }
598
+
599
+ o .resolveNamespace (sub .GetNamespace ())
600
+
601
+ return nil
602
+ }
603
+
604
+ func (o * Operator ) resolveNamespace (namespace string ) {
605
+ o .namespaceResolveQueue .AddRateLimited (namespace )
606
+ }
607
+
550
608
func (o * Operator ) ensureResolverSources (logger * logrus.Entry , namespace string ) map [resolver.CatalogKey ]registryclient.Interface {
551
609
// TODO: record connection status onto an object
552
610
resolverSources := make (map [resolver.CatalogKey ]registryclient.Interface , 0 )
@@ -606,6 +664,44 @@ func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscript
606
664
return false
607
665
}
608
666
667
+ func (o * Operator ) ensureSubscriptionInstallPlanState (logger * logrus.Entry , sub * v1alpha1.Subscription ) (* v1alpha1.Subscription , error ) {
668
+ if sub .Status .Install != nil {
669
+ return sub , nil
670
+ }
671
+
672
+ logger .Debug ("checking for existing installplan" )
673
+
674
+ // check if there's an installplan that created this subscription (only if it doesn't have a reference yet)
675
+ // this indicates it was newly resolved by another operator, and we should reference that installplan in the status
676
+ ips , err := o .lister .OperatorsV1alpha1 ().InstallPlanLister ().InstallPlans (sub .GetNamespace ()).List (labels .Everything ())
677
+ if err != nil {
678
+ logger .WithError (err ).Debug ("couldn't get installplans" )
679
+ // if we can't list, just continue processing
680
+ return sub , nil
681
+ }
682
+
683
+ out := sub .DeepCopy ()
684
+
685
+ for _ , ip := range ips {
686
+ for _ , step := range ip .Status .Plan {
687
+ // TODO: is this enough? should we check equality of pkg/channel?
688
+ if step != nil && step .Resource .Kind == v1alpha1 .SubscriptionKind && step .Resource .Name == sub .GetName () {
689
+ logger .WithField ("installplan" , ip .GetName ()).Debug ("found subscription in steps of existing installplan" )
690
+ out .Status .Install = o .referenceForInstallPlan (ip )
691
+ out .Status .State = v1alpha1 .SubscriptionStateUpgradePending
692
+ if updated , err := o .client .OperatorsV1alpha1 ().Subscriptions (sub .GetNamespace ()).UpdateStatus (out ); err != nil {
693
+ return nil , err
694
+ } else {
695
+ return updated , nil
696
+ }
697
+ }
698
+ }
699
+ }
700
+ logger .Debug ("did not find subscription in steps of existing installplan" )
701
+
702
+ return sub , nil
703
+ }
704
+
609
705
func (o * Operator ) ensureSubscriptionCSVState (logger * logrus.Entry , sub * v1alpha1.Subscription ) (* v1alpha1.Subscription , error ) {
610
706
if sub .Status .CurrentCSV == "" {
611
707
return sub , nil
@@ -637,7 +733,7 @@ func (o *Operator) ensureSubscriptionCSVState(logger *logrus.Entry, sub *v1alpha
637
733
return sub , nil
638
734
}
639
735
640
- func (o * Operator ) ensureSubscriptionInstallPlanState (namespace string , subs []* v1alpha1.Subscription , installPlanRef * v1alpha1.InstallPlanReference ) error {
736
+ func (o * Operator ) updateSubscriptionSetInstallPlanState (namespace string , subs []* v1alpha1.Subscription , installPlanRef * v1alpha1.InstallPlanReference ) error {
641
737
// TODO: parallel, sync waitgroup
642
738
for _ , sub := range subs {
643
739
sub .Status .Install = installPlanRef
@@ -699,13 +795,17 @@ func (o *Operator) createInstallPlan(namespace string, subs []*v1alpha1.Subscrip
699
795
if err != nil {
700
796
return nil , err
701
797
}
798
+ return o .referenceForInstallPlan (res ), nil
799
+
800
+ }
801
+
802
+ func (o * Operator ) referenceForInstallPlan (ip * v1alpha1.InstallPlan ) * v1alpha1.InstallPlanReference {
702
803
return & v1alpha1.InstallPlanReference {
703
- UID : res .GetUID (),
704
- Name : res .GetName (),
804
+ UID : ip .GetUID (),
805
+ Name : ip .GetName (),
705
806
APIVersion : v1alpha1 .SchemeGroupVersion .String (),
706
807
Kind : v1alpha1 .InstallPlanKind ,
707
- }, nil
708
-
808
+ }
709
809
}
710
810
711
811
func (o * Operator ) requeueSubscription (name , namespace string ) {
@@ -723,6 +823,7 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
723
823
}
724
824
725
825
logger := o .Log .WithFields (logrus.Fields {
826
+ "id" : queueinformer .NewLoopID (),
726
827
"ip" : plan .GetName (),
727
828
"namespace" : plan .GetNamespace (),
728
829
"phase" : plan .Status .Phase ,
@@ -749,7 +850,7 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
749
850
// notify subscription loop of installplan changes
750
851
if ownerutil .IsOwnedByKind (outInstallPlan , v1alpha1 .SubscriptionKind ) {
751
852
oref := ownerutil .GetOwnerByKind (outInstallPlan , v1alpha1 .SubscriptionKind )
752
- logger .Info ( "requeuing installplan owning subscription " )
853
+ logger .WithField ( "owner" , oref ). Debug ( "requeueing installplan owner " )
753
854
o .requeueSubscription (oref .Name , outInstallPlan .GetNamespace ())
754
855
}
755
856
@@ -900,7 +1001,7 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
900
1001
901
1002
// Attempt to create the Subscription
902
1003
sub .SetNamespace (namespace )
903
- created , err : = o .client .OperatorsV1alpha1 ().Subscriptions (sub .GetNamespace ()).Create (& sub )
1004
+ _ , err = o .client .OperatorsV1alpha1 ().Subscriptions (sub .GetNamespace ()).Create (& sub )
904
1005
if k8serrors .IsAlreadyExists (err ) {
905
1006
// If it already existed, mark the step as Present.
906
1007
plan .Status .Plan [i ].Status = v1alpha1 .StepStatusPresent
@@ -909,15 +1010,6 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
909
1010
} else {
910
1011
// If no error occurred, mark the step as Created.
911
1012
plan .Status .Plan [i ].Status = v1alpha1 .StepStatusCreated
912
- created .Status .Install = & v1alpha1.InstallPlanReference {
913
- UID : plan .GetUID (),
914
- Name : plan .GetName (),
915
- APIVersion : v1alpha1 .SchemeGroupVersion .String (),
916
- Kind : v1alpha1 .InstallPlanKind ,
917
- }
918
- if _ , err := o .client .OperatorsV1alpha1 ().Subscriptions (sub .GetNamespace ()).UpdateStatus (created ); err != nil {
919
- o .Log .WithError (err ).Warn ("couldn't set installplan reference on created subscription" )
920
- }
921
1013
}
922
1014
case secretKind :
923
1015
// TODO: this will confuse bundle users that include secrets in their bundles - this only handles pull secrets
0 commit comments