@@ -14,9 +14,11 @@ import (
14
14
rbacv1 "k8s.io/api/rbac/v1"
15
15
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
16
16
k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
+ "k8s.io/apimachinery/pkg/api/meta"
17
18
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
19
"k8s.io/apimachinery/pkg/labels"
19
20
"k8s.io/apimachinery/pkg/runtime"
21
+ "k8s.io/apimachinery/pkg/selection"
20
22
utilclock "k8s.io/apimachinery/pkg/util/clock"
21
23
utilerrors "k8s.io/apimachinery/pkg/util/errors"
22
24
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -68,6 +70,7 @@ type Operator struct {
68
70
copiedCSVLister operatorsv1alpha1listers.ClusterServiceVersionLister
69
71
ogQueueSet * queueinformer.ResourceQueueSet
70
72
csvQueueSet * queueinformer.ResourceQueueSet
73
+ olmConfigQueue workqueue.RateLimitingInterface
71
74
csvCopyQueueSet * queueinformer.ResourceQueueSet
72
75
copiedCSVGCQueueSet * queueinformer.ResourceQueueSet
73
76
objGCQueueSet * queueinformer.ResourceQueueSet
@@ -124,6 +127,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
124
127
client : config .externalClient ,
125
128
ogQueueSet : queueinformer .NewEmptyResourceQueueSet (),
126
129
csvQueueSet : queueinformer .NewEmptyResourceQueueSet (),
130
+ olmConfigQueue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "olmConfig" ),
127
131
csvCopyQueueSet : queueinformer .NewEmptyResourceQueueSet (),
128
132
copiedCSVGCQueueSet : queueinformer .NewEmptyResourceQueueSet (),
129
133
objGCQueueSet : queueinformer .NewEmptyResourceQueueSet (),
@@ -433,6 +437,26 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
433
437
return nil , err
434
438
}
435
439
440
+ // Register QueueInformer for olmConfig
441
+ olmConfigInformer := externalversions .NewSharedInformerFactoryWithOptions (
442
+ op .client ,
443
+ config .resyncPeriod (),
444
+ ).Operators ().V1 ().OLMConfigs ().Informer ()
445
+ olmConfigQueueInformer , err := queueinformer .NewQueueInformer (
446
+ ctx ,
447
+ queueinformer .WithInformer (olmConfigInformer ),
448
+ queueinformer .WithLogger (op .logger ),
449
+ queueinformer .WithQueue (op .olmConfigQueue ),
450
+ queueinformer .WithIndexer (olmConfigInformer .GetIndexer ()),
451
+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncOLMConfig ).ToSyncer ()),
452
+ )
453
+ if err != nil {
454
+ return nil , err
455
+ }
456
+ if err := op .RegisterQueueInformer (olmConfigQueueInformer ); err != nil {
457
+ return nil , err
458
+ }
459
+
436
460
k8sInformerFactory := informers .NewSharedInformerFactory (op .opClient .KubernetesInterface (), config .resyncPeriod ())
437
461
clusterRoleInformer := k8sInformerFactory .Rbac ().V1 ().ClusterRoles ()
438
462
op .lister .RbacV1 ().RegisterClusterRoleLister (clusterRoleInformer .Lister ())
@@ -1194,13 +1218,143 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
1194
1218
return
1195
1219
}
1196
1220
1221
+ func (a * Operator ) allNamespaceOperatorGroups () ([]* v1.OperatorGroup , error ) {
1222
+ operatorGroups , err := a .lister .OperatorsV1 ().OperatorGroupLister ().List (labels .Everything ())
1223
+ if err != nil {
1224
+ return nil , err
1225
+ }
1226
+
1227
+ result := []* v1.OperatorGroup {}
1228
+ for _ , operatorGroup := range operatorGroups {
1229
+ if NewNamespaceSet (operatorGroup .Status .Namespaces ).IsAllNamespaces () {
1230
+ result = append (result , operatorGroup )
1231
+ }
1232
+ }
1233
+ return result , nil
1234
+ }
1235
+
1236
+ func (a * Operator ) syncOLMConfig (obj interface {}) (syncError error ) {
1237
+ a .logger .Info ("Processing olmConfig" )
1238
+ olmConfig , ok := obj .(* v1.OLMConfig )
1239
+ if ! ok {
1240
+ return fmt .Errorf ("casting OLMConfig failed" )
1241
+ }
1242
+
1243
+ // Generate an array of allNamespace OperatorGroups
1244
+ allNSOperatorGroups , err := a .allNamespaceOperatorGroups ()
1245
+ if err != nil {
1246
+ return err
1247
+ }
1248
+
1249
+ nonCopiedCSVRequirement , err := labels .NewRequirement (v1alpha1 .CopiedLabelKey , selection .DoesNotExist , []string {})
1250
+ if err != nil {
1251
+ return err
1252
+ }
1253
+
1254
+ csvIsRequeued := false
1255
+ for _ , og := range allNSOperatorGroups {
1256
+ // Get all copied CSVs owned by this operatorGroup
1257
+ copiedCSVRequirement , err := labels .NewRequirement (v1alpha1 .CopiedLabelKey , selection .Equals , []string {og .GetNamespace ()})
1258
+ if err != nil {
1259
+ return err
1260
+ }
1261
+
1262
+ copiedCSVs , err := a .copiedCSVLister .List (labels .NewSelector ().Add (* copiedCSVRequirement ))
1263
+ if err != nil {
1264
+ return err
1265
+ }
1266
+
1267
+ // Filter to unique copies
1268
+ uniqueCopiedCSVs := map [string ]struct {}{}
1269
+ for _ , copiedCSV := range copiedCSVs {
1270
+ uniqueCopiedCSVs [copiedCSV .GetName ()] = struct {}{}
1271
+ }
1272
+
1273
+ csvs , err := a .lister .OperatorsV1alpha1 ().ClusterServiceVersionLister ().ClusterServiceVersions (og .GetNamespace ()).List (labels .NewSelector ().Add (* nonCopiedCSVRequirement ))
1274
+ if err != nil {
1275
+ return err
1276
+ }
1277
+
1278
+ for _ , csv := range csvs {
1279
+ // If the correct number of copied CSVs were found, continue
1280
+ if _ , ok := uniqueCopiedCSVs [csv .GetName ()]; ok == olmConfig .CopiedCSVsAreEnabled () {
1281
+ continue
1282
+ }
1283
+
1284
+ if err := a .csvQueueSet .Requeue (csv .GetNamespace (), csv .GetName ()); err != nil {
1285
+ a .logger .WithError (err ).Warn ("unable to requeue" )
1286
+ }
1287
+ csvIsRequeued = true
1288
+ }
1289
+ }
1290
+
1291
+ // Update the olmConfig status if it has changed.
1292
+ condition := getCopiedCSVsCondition (! olmConfig .CopiedCSVsAreEnabled (), csvIsRequeued )
1293
+ if ! isStatusConditionPresentAndAreTypeReasonMessageStatusEqual (olmConfig .Status .Conditions , condition ) {
1294
+ meta .SetStatusCondition (& olmConfig .Status .Conditions , condition )
1295
+ if _ , err := a .client .OperatorsV1 ().OLMConfigs ().UpdateStatus (context .TODO (), olmConfig , metav1.UpdateOptions {}); err != nil {
1296
+ return err
1297
+ }
1298
+ }
1299
+
1300
+ return nil
1301
+ }
1302
+
1303
+ func isStatusConditionPresentAndAreTypeReasonMessageStatusEqual (conditions []metav1.Condition , condition metav1.Condition ) bool {
1304
+ foundCondition := meta .FindStatusCondition (conditions , condition .Type )
1305
+ if foundCondition == nil {
1306
+ return false
1307
+ }
1308
+ return foundCondition .Type == condition .Type &&
1309
+ foundCondition .Reason == condition .Reason &&
1310
+ foundCondition .Message == condition .Message &&
1311
+ foundCondition .Status == condition .Status
1312
+ }
1313
+
1314
+ func getCopiedCSVsCondition (isDisabled , csvIsRequeued bool ) metav1.Condition {
1315
+ condition := metav1.Condition {
1316
+ Type : v1 .DisabledCopiedCSVsConditionType ,
1317
+ LastTransitionTime : metav1 .Now (),
1318
+ Status : metav1 .ConditionFalse ,
1319
+ }
1320
+ if ! isDisabled {
1321
+ condition .Reason = "CopiedCSVsEnabled"
1322
+ condition .Message = "Copied CSVs are enabled and present accross the cluster"
1323
+ if csvIsRequeued {
1324
+ condition .Message = "Copied CSVs are enabled and at least one copied CSVs is missing"
1325
+ }
1326
+ return condition
1327
+ }
1328
+
1329
+ if csvIsRequeued {
1330
+ condition .Reason = "CopiedCSVsFound"
1331
+ condition .Message = "Copied CSVs are disabled and at least one copied CSV was found for an operator installed in AllNamespace mode"
1332
+ return condition
1333
+ }
1334
+
1335
+ condition .Status = metav1 .ConditionTrue
1336
+ condition .Reason = "NoCopiedCSVsFound"
1337
+ condition .Message = "Copied CSVs are disabled and none were found for operators installed in AllNamespace mode"
1338
+
1339
+ return condition
1340
+ }
1341
+
1197
1342
func (a * Operator ) syncCopyCSV (obj interface {}) (syncError error ) {
1198
1343
clusterServiceVersion , ok := obj .(* v1alpha1.ClusterServiceVersion )
1199
1344
if ! ok {
1200
1345
a .logger .Debugf ("wrong type: %#v" , obj )
1201
1346
return fmt .Errorf ("casting ClusterServiceVersion failed" )
1202
1347
}
1203
1348
1349
+ olmConfig , err := a .client .OperatorsV1 ().OLMConfigs ().Get (context .TODO (), "cluster" , metav1.GetOptions {})
1350
+ if err != nil && ! k8serrors .IsNotFound (err ) {
1351
+ return err
1352
+ }
1353
+
1354
+ if err == nil {
1355
+ go a .olmConfigQueue .AddAfter (olmConfig , time .Second * 5 )
1356
+ }
1357
+
1204
1358
logger := a .logger .WithFields (logrus.Fields {
1205
1359
"id" : queueinformer .NewLoopID (),
1206
1360
"csv" : clusterServiceVersion .GetName (),
@@ -1222,15 +1376,145 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
1222
1376
"targetNamespaces" : strings .Join (operatorGroup .Status .Namespaces , "," ),
1223
1377
}).Debug ("copying csv to targets" )
1224
1378
1379
+ copiedCSVsAreEnabled , err := a .copiedCSVsAreEnabled ()
1380
+ if err != nil {
1381
+ return err
1382
+ }
1383
+
1225
1384
// Check if we need to do any copying / annotation for the operatorgroup
1226
- if err := a .ensureCSVsInNamespaces (clusterServiceVersion , operatorGroup , NewNamespaceSet (operatorGroup .Status .Namespaces )); err != nil {
1227
- logger .WithError (err ).Info ("couldn't copy CSV to target namespaces" )
1228
- syncError = err
1385
+ namespaceSet := NewNamespaceSet (operatorGroup .Status .Namespaces )
1386
+ if copiedCSVsAreEnabled || ! namespaceSet .IsAllNamespaces () {
1387
+ if err := a .ensureCSVsInNamespaces (clusterServiceVersion , operatorGroup , namespaceSet ); err != nil {
1388
+ logger .WithError (err ).Info ("couldn't copy CSV to target namespaces" )
1389
+ syncError = err
1390
+ }
1391
+
1392
+ // If the CSV was installed in AllNamespace mode, remove any "CSV Copying Disabled" events
1393
+ // in which the related object's name, namespace, and uid match the given CSV's.
1394
+ if namespaceSet .IsAllNamespaces () {
1395
+ if err := a .deleteCSVCopyingDisabledEvent (clusterServiceVersion ); err != nil {
1396
+ return err
1397
+ }
1398
+ }
1399
+ return
1400
+ }
1401
+
1402
+ requirement , err := labels .NewRequirement (v1alpha1 .CopiedLabelKey , selection .Equals , []string {clusterServiceVersion .Namespace })
1403
+ if err != nil {
1404
+ return err
1405
+ }
1406
+
1407
+ copiedCSVs , err := a .copiedCSVLister .List (labels .NewSelector ().Add (* requirement ))
1408
+ if err != nil {
1409
+ return err
1410
+ }
1411
+
1412
+ for _ , copiedCSV := range copiedCSVs {
1413
+ err := a .client .OperatorsV1alpha1 ().ClusterServiceVersions (copiedCSV .Namespace ).Delete (context .TODO (), copiedCSV .Name , metav1.DeleteOptions {})
1414
+ if err != nil && ! k8serrors .IsNotFound (err ) {
1415
+ return err
1416
+ }
1417
+ }
1418
+
1419
+ if err := a .createCSVCopyingDisabledEvent (clusterServiceVersion ); err != nil {
1420
+ return err
1229
1421
}
1230
1422
1231
1423
return
1232
1424
}
1233
1425
1426
+ // copiedCSVsAreEnabled determines if csv copying is enabled for OLM.
1427
+ //
1428
+ // This method will first attempt to get the "cluster" olmConfig resource,
1429
+ // if any error other than "IsNotFound" is encountered, false and the error
1430
+ // will be returned.
1431
+ //
1432
+ // If the "cluster" olmConfig resource is found, the value of
1433
+ // olmConfig.spec.features.disableCopiedCSVs will be returned along with a
1434
+ // nil error.
1435
+ //
1436
+ // If the "cluster" olmConfig resource is not found, true will be returned
1437
+ // without an error.
1438
+ func (a * Operator ) copiedCSVsAreEnabled () (bool , error ) {
1439
+ olmConfig , err := a .client .OperatorsV1 ().OLMConfigs ().Get (context .TODO (), "cluster" , metav1.GetOptions {})
1440
+ if err != nil {
1441
+ // Default to true if olmConfig singleton cannot be found
1442
+ if k8serrors .IsNotFound (err ) {
1443
+ return true , nil
1444
+ }
1445
+ // If there was an error that wasn't an IsNotFound, return the error
1446
+ return false , err
1447
+ }
1448
+
1449
+ // If there was no error, return value based on olmConfig singleton
1450
+ return olmConfig .CopiedCSVsAreEnabled (), nil
1451
+ }
1452
+
1453
+ func (a * Operator ) getCopiedCSVDisabledEventsForCSV (csv * v1alpha1.ClusterServiceVersion ) ([]corev1.Event , error ) {
1454
+ result := []corev1.Event {}
1455
+ if csv == nil {
1456
+ return result , nil
1457
+ }
1458
+
1459
+ events , err := a .opClient .KubernetesInterface ().CoreV1 ().Events (csv .GetNamespace ()).List (context .TODO (), metav1.ListOptions {})
1460
+ if err != nil {
1461
+ return nil , err
1462
+ }
1463
+
1464
+ for _ , event := range events .Items {
1465
+ if event .InvolvedObject .Namespace == csv .GetNamespace () &&
1466
+ event .InvolvedObject .Name == csv .GetName () &&
1467
+ event .InvolvedObject .UID == csv .GetUID () &&
1468
+ event .Reason == v1 .DisabledCopiedCSVsConditionType {
1469
+ result = append (result , event )
1470
+ }
1471
+ }
1472
+
1473
+ return result , nil
1474
+ }
1475
+
1476
+ func (a * Operator ) deleteCSVCopyingDisabledEvent (csv * v1alpha1.ClusterServiceVersion ) error {
1477
+ events , err := a .getCopiedCSVDisabledEventsForCSV (csv )
1478
+ if err != nil {
1479
+ return err
1480
+ }
1481
+
1482
+ // Remove existing events.
1483
+ return a .deleteEvents (events )
1484
+ }
1485
+
1486
+ func (a * Operator ) deleteEvents (events []corev1.Event ) error {
1487
+ for _ , event := range events {
1488
+ err := a .opClient .KubernetesInterface ().EventsV1 ().Events (event .GetNamespace ()).Delete (context .TODO (), event .GetName (), metav1.DeleteOptions {})
1489
+ if err != nil && ! k8serrors .IsNotFound (err ) {
1490
+ return err
1491
+ }
1492
+ }
1493
+ return nil
1494
+ }
1495
+
1496
+ func (a * Operator ) createCSVCopyingDisabledEvent (csv * v1alpha1.ClusterServiceVersion ) error {
1497
+ events , err := a .getCopiedCSVDisabledEventsForCSV (csv )
1498
+ if err != nil {
1499
+ return err
1500
+ }
1501
+
1502
+ if len (events ) == 1 {
1503
+ return nil
1504
+ }
1505
+
1506
+ // Remove existing events.
1507
+ if len (events ) > 1 {
1508
+ if err := a .deleteEvents (events ); err != nil {
1509
+ return err
1510
+ }
1511
+ }
1512
+
1513
+ a .recorder .Eventf (csv , corev1 .EventTypeWarning , v1 .DisabledCopiedCSVsConditionType , "CSV copying disabled for %s/%s" , csv .GetNamespace (), csv .GetName ())
1514
+
1515
+ return nil
1516
+ }
1517
+
1234
1518
func (a * Operator ) syncGcCsv (obj interface {}) (syncError error ) {
1235
1519
clusterServiceVersion , ok := obj .(* v1alpha1.ClusterServiceVersion )
1236
1520
if ! ok {
0 commit comments