Skip to content

Commit c3a59fd

Browse files
Merge pull request #2271 from dinhxuanvu/pod-labels
Add filter labels to catalog pod cache
2 parents 1cb43cc + 9f1c29d commit c3a59fd

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

pkg/controller/bundle/bundle_unpacker.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/operator-framework/operator-registry/pkg/api"
1111
"github.com/operator-framework/operator-registry/pkg/configmap"
12+
"github.com/sirupsen/logrus"
1213
batchv1 "k8s.io/api/batch/v1"
1314
corev1 "k8s.io/api/core/v1"
1415
rbacv1 "k8s.io/api/rbac/v1"
@@ -39,6 +40,7 @@ const (
3940
// The time duration should be in the same format as accepted by time.ParseDuration()
4041
// e.g 1m30s
4142
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
43+
BundleUnpackPodLabel = "job-name"
4244
)
4345

4446
type BundleUnpackResult struct {
@@ -230,6 +232,7 @@ type Unpacker interface {
230232
}
231233

232234
type ConfigMapUnpacker struct {
235+
logger *logrus.Logger
233236
opmImage string
234237
utilImage string
235238
client kubernetes.Interface
@@ -277,6 +280,12 @@ func WithUtilImage(utilImage string) ConfigMapUnpackerOption {
277280
}
278281
}
279282

283+
func WithLogger(logger *logrus.Logger) ConfigMapUnpackerOption {
284+
return func(unpacker *ConfigMapUnpacker) {
285+
unpacker.logger = logger
286+
}
287+
}
288+
280289
func WithClient(client kubernetes.Interface) ConfigMapUnpackerOption {
281290
return func(unpacker *ConfigMapUnpacker) {
282291
unpacker.client = client
@@ -510,9 +519,10 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
510519
func (c *ConfigMapUnpacker) pendingContainerStatusMessages(job *batchv1.Job) (string, error) {
511520
containerStatusMessages := []string{}
512521
// List pods for unpack job
513-
podLabel := map[string]string{"job-name": job.GetName()}
514-
pods, listErr := c.podLister.Pods(job.GetNamespace()).List(k8slabels.SelectorFromSet(podLabel))
522+
podLabel := map[string]string{BundleUnpackPodLabel: job.GetName()}
523+
pods, listErr := c.podLister.Pods(job.GetNamespace()).List(k8slabels.SelectorFromValidatedSet(podLabel))
515524
if listErr != nil {
525+
c.logger.Errorf("Failed to list pods for job(%s): %v", job.GetName(), listErr)
516526
return "", fmt.Errorf("Failed to list pods for job(%s): %v", job.GetName(), listErr)
517527
}
518528

pkg/controller/operators/catalog/operator.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/apimachinery/pkg/labels"
2929
"k8s.io/apimachinery/pkg/runtime"
3030
"k8s.io/apimachinery/pkg/runtime/schema"
31+
"k8s.io/apimachinery/pkg/selection"
3132
utilclock "k8s.io/apimachinery/pkg/util/clock"
3233
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3334
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -312,10 +313,32 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
312313
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
313314
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())
314315

315-
// Wire Pods
316-
podInformer := k8sInformerFactory.Core().V1().Pods()
317-
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, podInformer.Lister())
318-
sharedIndexInformers = append(sharedIndexInformers, podInformer.Informer())
316+
// Wire Pods for CatalogSource
317+
catsrcReq, err := labels.NewRequirement(reconciler.CatalogSourceLabelKey, selection.Exists, nil)
318+
if err != nil {
319+
return nil, err
320+
}
321+
322+
csPodLabels := labels.NewSelector()
323+
csPodLabels = csPodLabels.Add(*catsrcReq)
324+
csPodInformer := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), informers.WithTweakListOptions(func(options *metav1.ListOptions) {
325+
options.LabelSelector = csPodLabels.String()
326+
})).Core().V1().Pods()
327+
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
328+
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())
329+
330+
// Wire Pods for BundleUnpack job
331+
buReq, err := labels.NewRequirement(bundle.BundleUnpackPodLabel, selection.Exists, nil)
332+
if err != nil {
333+
return nil, err
334+
}
335+
336+
buPodLabels := labels.NewSelector()
337+
buPodLabels = buPodLabels.Add(*buReq)
338+
buPodInformer := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), informers.WithTweakListOptions(func(options *metav1.ListOptions) {
339+
options.LabelSelector = buPodLabels.String()
340+
})).Core().V1().Pods()
341+
sharedIndexInformers = append(sharedIndexInformers, buPodInformer.Informer())
319342

320343
// Wire ConfigMaps
321344
configMapInformer := k8sInformerFactory.Core().V1().ConfigMaps()
@@ -346,11 +369,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
346369

347370
// Setup the BundleUnpacker
348371
op.bundleUnpacker, err = bundle.NewConfigmapUnpacker(
372+
bundle.WithLogger(op.logger),
349373
bundle.WithClient(op.opClient.KubernetesInterface()),
350374
bundle.WithCatalogSourceLister(catsrcInformer.Lister()),
351375
bundle.WithConfigMapLister(configMapInformer.Lister()),
352376
bundle.WithJobLister(jobInformer.Lister()),
353-
bundle.WithPodLister(podInformer.Lister()),
377+
bundle.WithPodLister(buPodInformer.Lister()),
354378
bundle.WithRoleLister(roleInformer.Lister()),
355379
bundle.WithRoleBindingLister(roleBindingInformer.Lister()),
356380
bundle.WithOPMImage(opmImage),

0 commit comments

Comments
 (0)