Skip to content

Commit b290157

Browse files
committed
First pass at creating catalog switching impl
Signed-off-by: John Hunkins <[email protected]>
1 parent e944849 commit b290157

File tree

12 files changed

+1148
-19
lines changed

12 files changed

+1148
-19
lines changed

cmd/catalog/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client"
1818
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog"
19+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalogtempate"
1920
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
2021
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorstatus"
2122
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/server"
@@ -140,9 +141,17 @@ func main() {
140141
log.Panicf("error configuring operator: %s", err.Error())
141142
}
142143

144+
opCatalogTemplate, err := catalogtempate.NewOperator(ctx, *kubeConfigPath, logger, *wakeupInterval, *catalogNamespace)
145+
if err != nil {
146+
log.Panicf("error configuring operator: %s", err.Error())
147+
}
148+
143149
op.Run(ctx)
144150
<-op.Ready()
145151

152+
opCatalogTemplate.Run(ctx)
153+
<-opCatalogTemplate.Ready()
154+
146155
if *writeStatusName != "" {
147156
operatorstatus.MonitorClusterStatus(*writeStatusName, op.AtLevel(), op.Done(), opClient, configClient, crClient)
148157
}

pkg/controller/operators/catalog/operator.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
5555
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
5656
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
57+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/catalogsource"
5758
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/clients"
5859
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
5960
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/event"
@@ -800,24 +801,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
800801
return reflect.DeepEqual(a, b)
801802
}
802803

803-
updateStatusFunc := func(catsrc *v1alpha1.CatalogSource) error {
804-
latest, err := o.client.OperatorsV1alpha1().CatalogSources(catsrc.GetNamespace()).Get(context.TODO(), catsrc.GetName(), metav1.GetOptions{})
805-
if err != nil {
806-
logger.Errorf("error getting catalogsource - %v", err)
807-
return err
808-
}
809-
810-
out := latest.DeepCopy()
811-
out.Status = catsrc.Status
812-
813-
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(context.TODO(), out, metav1.UpdateOptions{}); err != nil {
814-
logger.Errorf("error while setting catalogsource status condition - %v", err)
815-
return err
816-
}
817-
818-
return nil
819-
}
820-
821804
chain := []CatalogSourceSyncFunc{
822805
validateSourceType,
823806
o.syncConfigMap,
@@ -838,7 +821,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
838821
return
839822
}
840823

841-
updateErr := updateStatusFunc(out)
824+
updateErr := catalogsource.UpdateStatus(logger, o.client, out)
842825
if syncError == nil && updateErr != nil {
843826
syncError = updateErr
844827
}
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
package catalogtempate
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
12+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
13+
14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
16+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/catalogsource"
17+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
18+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
19+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
20+
"github.com/sirupsen/logrus"
21+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
22+
"k8s.io/apimachinery/pkg/runtime/schema"
23+
"k8s.io/client-go/discovery"
24+
"k8s.io/client-go/discovery/cached/memory"
25+
"k8s.io/client-go/dynamic"
26+
"k8s.io/client-go/dynamic/dynamicinformer"
27+
"k8s.io/client-go/restmapper"
28+
"k8s.io/client-go/tools/cache"
29+
"k8s.io/client-go/tools/clientcmd"
30+
"k8s.io/client-go/util/workqueue"
31+
)
32+
33+
const (
34+
StatusTypeTemplatesHaveResolved = "TemplatesHaveResolved"
35+
StatusTypeResolvedImage = "ResolvedImage"
36+
37+
ReasonUnableToResolve = "UnableToResolve"
38+
ReasonAllTemplatesResolved = "AllTemplatesResolved"
39+
)
40+
41+
type Operator struct {
42+
queueinformer.Operator
43+
logger *logrus.Logger // common logger
44+
namespace string // operator namespace
45+
client versioned.Interface // client used for OLM CRs
46+
dynamicClient dynamic.Interface // client used to dynamically discover resources
47+
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory // factory to create shared informers for dynamic resources
48+
discoveryClient *discovery.DiscoveryClient // queries the API server to discover resources
49+
mapper *restmapper.DeferredDiscoveryRESTMapper // maps between GVK and GVR
50+
lister operatorlister.OperatorLister // union of versioned informer listers
51+
catalogSourceTemplateQueueSet *queueinformer.ResourceQueueSet // work queues for a catalog source update
52+
resyncPeriod func() time.Duration // period of time between resync
53+
dynamicResourceWatchesMap sync.Map // map to keep track of what GVR we've already opened watches for
54+
ctx context.Context // context used for shutting down
55+
56+
// cancel context.CancelFunc
57+
// once sync.Once
58+
}
59+
60+
func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logger, resync time.Duration, operatorNamespace string) (*Operator, error) {
61+
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
62+
63+
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
// Create a new client for OLM types (CRs)
69+
crClient, err := versioned.NewForConfig(config)
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
// Create a new client for dynamic types
75+
dynamicClient, err := dynamic.NewForConfig(config)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
// Create a new queueinformer-based operator.
81+
opClient, err := operatorclient.NewClientFromRestConfig(config)
82+
if err != nil {
83+
return nil, err
84+
}
85+
86+
queueOperator, err := queueinformer.NewOperator(opClient.KubernetesInterface().Discovery(), queueinformer.WithOperatorLogger(logger))
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
// DiscoveryClient queries the API server to discover resources
92+
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))
98+
99+
// Create an OperatorLister
100+
lister := operatorlister.NewLister()
101+
102+
op := &Operator{
103+
Operator: queueOperator,
104+
logger: logger,
105+
namespace: operatorNamespace,
106+
client: crClient,
107+
dynamicClient: dynamicClient,
108+
dynamicInformerFactory: dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, resyncPeriod()),
109+
discoveryClient: discoveryClient,
110+
mapper: mapper,
111+
lister: lister,
112+
catalogSourceTemplateQueueSet: queueinformer.NewEmptyResourceQueueSet(),
113+
resyncPeriod: resyncPeriod,
114+
// dynamicResourceWatchesMap: map[string]struct{}{},
115+
ctx: ctx,
116+
}
117+
118+
// Wire OLM CR sharedIndexInformers
119+
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, op.resyncPeriod())
120+
121+
// Wire CatalogSources
122+
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
123+
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
124+
catalogTemplateSrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "catalogSourceTemplate")
125+
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)
126+
catsrcQueueInformer, err := queueinformer.NewQueueInformer(
127+
op.ctx,
128+
// TODO: commented out sections I don't think are necessary
129+
// queueinformer.WithMetricsProvider(metrics.NewMetricsCatalogSource(op.lister.OperatorsV1alpha1().CatalogSourceLister())),
130+
queueinformer.WithLogger(op.logger),
131+
queueinformer.WithQueue(catalogTemplateSrcQueue),
132+
queueinformer.WithInformer(catsrcInformer.Informer()),
133+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()), // ToSyncerWithDelete(op.handleCatSrcDeletion)), TODO do we need to handle deletion specially?
134+
)
135+
if err != nil {
136+
return nil, err
137+
}
138+
if err := op.RegisterQueueInformer(catsrcQueueInformer); err != nil {
139+
return nil, err
140+
}
141+
142+
return op, nil
143+
}
144+
145+
func (o *Operator) syncCatalogSources(obj interface{}) error {
146+
// this is an opportunity to update the server version (regardless of any other actions for processing a catalog source)
147+
o.updateServerVersion()
148+
149+
inputCatalogSource, ok := obj.(*v1alpha1.CatalogSource)
150+
if !ok {
151+
o.logger.Debugf("wrong type: %#v", obj)
152+
return fmt.Errorf("casting CatalogSource failed")
153+
}
154+
155+
outputCatalogSource := inputCatalogSource.DeepCopy()
156+
157+
logger := o.logger.WithFields(logrus.Fields{
158+
"catSrcName": outputCatalogSource.GetName(),
159+
"id": queueinformer.NewLoopID(),
160+
})
161+
logger.Info("syncing catalog source for annotation templates")
162+
163+
// this is our opportunity to discover GVK templates and setup watchers (if possible)
164+
foundGVKs := catalogsource.InitializeCatalogSourceTemplates(outputCatalogSource)
165+
for _, gvk := range foundGVKs {
166+
o.processGVK(o.ctx, logger, gvk.GroupVersionKind)
167+
}
168+
169+
catalogImageTemplate := catalogsource.GetCatalogTemplateAnnotation(outputCatalogSource)
170+
171+
processedCatalogImageTemplate, unresolvedTemplates := catalogsource.ReplaceTemplates(catalogImageTemplate)
172+
173+
// make sure everything was resolved
174+
if len(unresolvedTemplates) == 0 {
175+
// all templates have been resolved
176+
177+
namespace := outputCatalogSource.GetNamespace()
178+
179+
// make sure that the processed image reference is actually different and update accordingly
180+
if outputCatalogSource.Spec.Image != processedCatalogImageTemplate {
181+
182+
outputCatalogSource.Spec.Image = processedCatalogImageTemplate
183+
184+
catalogsource.UpdateImageReferenceAndStatusCondition(logger, o.client, outputCatalogSource,
185+
metav1.Condition{
186+
Type: StatusTypeTemplatesHaveResolved,
187+
Status: metav1.ConditionTrue,
188+
Reason: ReasonAllTemplatesResolved,
189+
Message: "catalog image reference was successfully resolved",
190+
},
191+
metav1.Condition{
192+
Type: StatusTypeResolvedImage,
193+
Status: metav1.ConditionTrue,
194+
Reason: ReasonAllTemplatesResolved,
195+
Message: processedCatalogImageTemplate,
196+
},
197+
)
198+
199+
logger.Infof("The catalog image for catalog source %q within namespace %q image has been updated to %q", outputCatalogSource.GetName(), namespace, processedCatalogImageTemplate)
200+
} else {
201+
logger.Infof("The catalog image for catalog source %q within namespace %q image does not require an update because the image has not changed", outputCatalogSource.GetName(), namespace)
202+
}
203+
} else {
204+
// at least one template was unresolved, so update status accordingly
205+
206+
// quote the values and use comma separator
207+
quotedTemplates := fmt.Sprintf(`"%s"`, strings.Join(unresolvedTemplates, `", "`))
208+
209+
catalogsource.UpdateStatusCondition(logger, o.client, outputCatalogSource,
210+
metav1.Condition{
211+
Type: StatusTypeTemplatesHaveResolved,
212+
Status: metav1.ConditionFalse,
213+
Reason: ReasonUnableToResolve,
214+
Message: fmt.Sprintf("Cannot construct catalog image reference, variable(s) %s couldn't be resolved", quotedTemplates),
215+
},
216+
metav1.Condition{
217+
Type: StatusTypeResolvedImage,
218+
Status: metav1.ConditionFalse,
219+
Reason: ReasonUnableToResolve,
220+
Message: processedCatalogImageTemplate,
221+
},
222+
)
223+
if _, err := o.client.OperatorsV1alpha1().CatalogSources(outputCatalogSource.GetNamespace()).UpdateStatus(context.TODO(), outputCatalogSource, metav1.UpdateOptions{}); err != nil {
224+
logger.WithError(err).Error("unable to update CatalogSource status condition")
225+
return err
226+
}
227+
}
228+
229+
return nil
230+
}
231+
232+
// processGVK sets up a watcher for the GVK provided (if possible). Errors are logged but not returned
233+
func (o *Operator) processGVK(ctx context.Context, logger *logrus.Entry, gvk schema.GroupVersionKind) {
234+
if gvk.Empty() {
235+
logger.Warn("provided GVK is empty, unable to add watch")
236+
return
237+
}
238+
// setup a watcher for the GVK
239+
240+
mapping, err := o.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
241+
if err != nil {
242+
logger.WithError(err).Warn("unable to obtain preferred rest mapping for GVK %s", gvk.String())
243+
return
244+
}
245+
246+
// see if we already setup a watcher for this resource
247+
if _, ok := o.dynamicResourceWatchesMap.Load(mapping.Resource); !ok {
248+
// we've not come across this resource before so setup a watcher
249+
informer := o.dynamicInformerFactory.ForResource(mapping.Resource)
250+
informer.Informer().AddEventHandlerWithResyncPeriod(o.eventHandlers(ctx, o.processDynamicWatches), o.resyncPeriod())
251+
go informer.Informer().Run(ctx.Done())
252+
o.dynamicResourceWatchesMap.Store(mapping.Resource, struct{}{})
253+
}
254+
}
255+
256+
// eventHandlers is a generic handler that forwards all calls to provided notify function
257+
func (o *Operator) eventHandlers(ctx context.Context, notify func(ctx context.Context, obj interface{})) cache.ResourceEventHandlerFuncs {
258+
return cache.ResourceEventHandlerFuncs{
259+
AddFunc: func(obj interface{}) {
260+
notify(ctx, obj)
261+
},
262+
UpdateFunc: func(oldObj, newObj interface{}) {
263+
notify(ctx, newObj)
264+
},
265+
DeleteFunc: func(obj interface{}) {
266+
notify(ctx, obj)
267+
},
268+
}
269+
}
270+
271+
func (o *Operator) processDynamicWatches(ctx context.Context, obj interface{}) {
272+
// this is an opportunity to update the server version (regardless of any other actions for processing a dynamic watch)
273+
o.updateServerVersion()
274+
275+
if u, ok := obj.(*unstructured.Unstructured); ok {
276+
catalogsource.UpdateGVKValue(u, o.logger)
277+
} else {
278+
o.logger.Warn("object provided to processDynamicWatches was not unstructured.Unstructured type")
279+
}
280+
281+
}
282+
283+
func (o *Operator) updateServerVersion() {
284+
if serverVersion, err := o.discoveryClient.ServerVersion(); err != nil {
285+
o.logger.WithError(err).Warn("unable to obtain server version from discovery client")
286+
} else {
287+
catalogsource.UpdateKubeVersion(serverVersion, o.logger)
288+
}
289+
}

0 commit comments

Comments
 (0)