Skip to content

Commit 68d02b3

Browse files
ncdcfabianvf
andcommitted
UPSTREAM: <carry>: support logical clusters
Signed-off-by: Andy Goldstein <[email protected]> Co-authored-by: Fabian von Feilitzsch <[email protected]>
1 parent 108fafd commit 68d02b3

File tree

4 files changed

+53
-29
lines changed

4 files changed

+53
-29
lines changed

pkg/cache/internal/cache_reader.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
"fmt"
2222
"reflect"
2323

24+
kcpcache "github.com/kcp-dev/apimachinery/pkg/cache"
25+
"github.com/kcp-dev/logicalcluster"
26+
2427
apierrors "k8s.io/apimachinery/pkg/api/errors"
2528
apimeta "k8s.io/apimachinery/pkg/api/meta"
2629
"k8s.io/apimachinery/pkg/fields"
@@ -54,11 +57,11 @@ type CacheReader struct {
5457
}
5558

5659
// Get checks the indexer for the object and writes a copy of it if found.
57-
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object) error {
60+
func (c *CacheReader) Get(ctx context.Context, key client.ObjectKey, out client.Object) error {
5861
if c.scopeName == apimeta.RESTScopeNameRoot {
5962
key.Namespace = ""
6063
}
61-
storeKey := objectKeyToStoreKey(key)
64+
storeKey := objectKeyToStoreKey(ctx, key)
6265

6366
// Lookup the object from the indexer cache
6467
obj, exists, err := c.indexer.GetByKey(storeKey)
@@ -105,13 +108,15 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob
105108
}
106109

107110
// List lists items out of the indexer and writes them to out.
108-
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
111+
func (c *CacheReader) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error {
109112
var objs []interface{}
110113
var err error
111114

112115
listOpts := client.ListOptions{}
113116
listOpts.ApplyOptions(opts)
114117

118+
clusterName, _ := logicalcluster.ClusterFromContext(ctx)
119+
115120
switch {
116121
case listOpts.FieldSelector != nil:
117122
// TODO(directxman12): support more complicated field selectors by
@@ -125,9 +130,17 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
125130
// namespace.
126131
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
127132
case listOpts.Namespace != "":
128-
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
133+
if clusterName.Empty() {
134+
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
135+
} else {
136+
objs, err = c.indexer.ByIndex(kcpcache.ClusterAndNamespaceIndexName, kcpcache.ToClusterAwareKey(clusterName.String(), listOpts.Namespace, ""))
137+
}
129138
default:
130-
objs = c.indexer.List()
139+
if clusterName.Empty() {
140+
objs = c.indexer.List()
141+
} else {
142+
objs, err = c.indexer.ByIndex(kcpcache.ClusterIndexName, kcpcache.ToClusterAwareKey(clusterName.String(), "", ""))
143+
}
131144
}
132145
if err != nil {
133146
return err
@@ -179,7 +192,12 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
179192
// It's akin to MetaNamespaceKeyFunc. It's separate from
180193
// String to allow keeping the key format easily in sync with
181194
// MetaNamespaceKeyFunc.
182-
func objectKeyToStoreKey(k client.ObjectKey) string {
195+
func objectKeyToStoreKey(ctx context.Context, k client.ObjectKey) string {
196+
cluster, ok := logicalcluster.ClusterFromContext(ctx)
197+
if ok {
198+
return kcpcache.ToClusterAwareKey(cluster.String(), k.Namespace, k.Name)
199+
}
200+
183201
if k.Namespace == "" {
184202
return k.Name
185203
}

pkg/handler/enqueue.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ limitations under the License.
1717
package handler
1818

1919
import (
20+
"github.com/kcp-dev/logicalcluster"
2021
"k8s.io/apimachinery/pkg/types"
2122
"k8s.io/client-go/util/workqueue"
23+
"sigs.k8s.io/controller-runtime/pkg/client"
2224
"sigs.k8s.io/controller-runtime/pkg/event"
2325
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2426
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -41,25 +43,16 @@ func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.Rate
4143
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
4244
return
4345
}
44-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
45-
Name: evt.Object.GetName(),
46-
Namespace: evt.Object.GetNamespace(),
47-
}})
46+
q.Add(request(evt.Object))
4847
}
4948

5049
// Update implements EventHandler.
5150
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
5251
switch {
5352
case evt.ObjectNew != nil:
54-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
55-
Name: evt.ObjectNew.GetName(),
56-
Namespace: evt.ObjectNew.GetNamespace(),
57-
}})
53+
q.Add(request(evt.ObjectNew))
5854
case evt.ObjectOld != nil:
59-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
60-
Name: evt.ObjectOld.GetName(),
61-
Namespace: evt.ObjectOld.GetNamespace(),
62-
}})
55+
q.Add(request(evt.ObjectOld))
6356
default:
6457
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
6558
}
@@ -71,10 +64,7 @@ func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.Rate
7164
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
7265
return
7366
}
74-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
75-
Name: evt.Object.GetName(),
76-
Namespace: evt.Object.GetNamespace(),
77-
}})
67+
q.Add(request(evt.Object))
7868
}
7969

8070
// Generic implements EventHandler.
@@ -83,8 +73,16 @@ func (e *EnqueueRequestForObject) Generic(evt event.GenericEvent, q workqueue.Ra
8373
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
8474
return
8575
}
86-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
87-
Name: evt.Object.GetName(),
88-
Namespace: evt.Object.GetNamespace(),
89-
}})
76+
q.Add(request(evt.Object))
77+
}
78+
79+
func request(obj client.Object) reconcile.Request {
80+
return reconcile.Request{
81+
// TODO(kcp) Need to implement a non-kcp-specific way to support this
82+
ClusterName: logicalcluster.From(obj).String(),
83+
NamespacedName: types.NamespacedName{
84+
Namespace: obj.GetNamespace(),
85+
Name: obj.GetName(),
86+
},
87+
}
9088
}

pkg/handler/enqueue_owner.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package handler
1919
import (
2020
"fmt"
2121

22+
"github.com/kcp-dev/logicalcluster"
2223
"k8s.io/apimachinery/pkg/api/meta"
2324
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425
"k8s.io/apimachinery/pkg/runtime"
@@ -134,9 +135,12 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object,
134135
// object in the event.
135136
if ref.Kind == e.groupKind.Kind && refGV.Group == e.groupKind.Group {
136137
// Match found - add a Request for the object referred to in the OwnerReference
137-
request := reconcile.Request{NamespacedName: types.NamespacedName{
138-
Name: ref.Name,
139-
}}
138+
request := reconcile.Request{
139+
ClusterName: logicalcluster.From(object).String(),
140+
NamespacedName: types.NamespacedName{
141+
Name: ref.Name,
142+
},
143+
}
140144

141145
// if owner is not namespaced then we should set the namespace to the empty
142146
mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version)

pkg/reconcile/reconcile.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func (r *Result) IsZero() bool {
4747
type Request struct {
4848
// NamespacedName is the name and namespace of the object to reconcile.
4949
types.NamespacedName
50+
51+
// ClusterName can be used for reconciling requests across multiple clusters,
52+
// to prevent objects with the same name and namespace from conflicting
53+
ClusterName string
5054
}
5155

5256
/*

0 commit comments

Comments
 (0)