Skip to content

Commit 746f35f

Browse files
committed
🐛 Fakeclient: Allow concurrent patching if RV is unset
Currently, the fake client always errors if patches are done concurrently rather than only if the patch contains a ResourceVersion. This is because we have a number of checks including the one related to resourceVersion implemented in a `versionedTracker`. The `versionedTracker` receives the patched object and assumes that the patchedObject only contains a RV if the patch had one. That turns out not to be true, it almost always has one. The reason the object we receive in the `versionedTracker` almost always has a RV is that we use a client-go reactor to apply the patch to an object. The way this works is that the reactor takes the patch and a tracker, fetches the object from the tracker and applies the patch to it. This means that the resulting object always has a resourceVersion unless the patch explicitly set it to `null`. This `null` case apparently is special cased in the Kubernetes apiserver to be acceptable, so we do the same here. Fix the original issue by checking in the fakeclient if the patch modifies the RV and if not, retry conflicts.
1 parent 5af6ffa commit 746f35f

File tree

2 files changed

+78
-3
lines changed

2 files changed

+78
-3
lines changed

pkg/client/fake/client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,11 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt
468468
switch {
469469
case allowsUnconditionalUpdate(gvk):
470470
accessor.SetResourceVersion(oldAccessor.GetResourceVersion())
471+
// This is needed because if the patch explicitly sets the RV to null, the client-go reaction we use
472+
// to apply it and whose output we process here will have it unset. It is not clear why the Kubernetes
473+
// apiserver accepts such a patch, but it does so we just copy that behavior.
474+
// Kubernetes apiserver behavior can be checked like this:
475+
// `kubectl patch configmap foo --patch '{"metadata":{"annotations":{"foo":"bar"},"resourceVersion":null}}' -v=9`
471476
case bytes.
472477
Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeClient).Patch")):
473478
// We apply patches using a client-go reaction that ends up calling the trackers Update. As we can't change
@@ -904,6 +909,16 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client
904909
return err
905910
}
906911

912+
// retryOnConflict unless the patch includes a resourceVersion. We have to
913+
// compare the resource version of newObj with oldObject because newObj is
914+
// oldObj + the patch. It is important to note that the RV in it might be
915+
// different from the RV in obj, as oldObj is fetched from the tracker and
916+
// a concurrent actor could have modified it.
917+
retryOnConflict := true
918+
if newObj.GetResourceVersion() != oldAccessor.GetResourceVersion() {
919+
retryOnConflict = false
920+
}
921+
907922
// Validate that deletionTimestamp has not been changed
908923
if !deletionTimestampEqual(newObj, oldAccessor) {
909924
return fmt.Errorf("rejected patch, metadata.deletionTimestamp immutable")
@@ -912,6 +927,9 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client
912927
reaction := testing.ObjectReaction(c.tracker)
913928
handled, o, err := reaction(action)
914929
if err != nil {
930+
if retryOnConflict && apierrors.IsConflict(err) {
931+
return c.patch(obj, patch, opts...)
932+
}
915933
return err
916934
}
917935
if !handled {

pkg/client/fake/client_test.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"strconv"
24+
"sync"
2425
"time"
2526

2627
"github.com/google/go-cmp/cmp"
@@ -580,7 +581,7 @@ var _ = Describe("Fake client", func() {
580581
Expect(obj.ObjectMeta.ResourceVersion).To(Equal("1000"))
581582
})
582583

583-
It("should allow patch with non-set ResourceVersion for a resource that doesn't allow unconditional updates", func() {
584+
It("should allow patch when the patch sets RV to 'null'", func() {
584585
schemeBuilder := &scheme.Builder{GroupVersion: schema.GroupVersion{Group: "test", Version: "v1"}}
585586
schemeBuilder.Register(&WithPointerMeta{}, &WithPointerMetaList{})
586587

@@ -605,6 +606,7 @@ var _ = Describe("Fake client", func() {
605606
"foo": "bar",
606607
},
607608
}}
609+
608610
Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(original))).To(Succeed())
609611

610612
patched := &WithPointerMeta{}
@@ -2134,6 +2136,61 @@ var _ = Describe("Fake client", func() {
21342136
Expect(apierrors.IsNotFound(err)).To(BeTrue())
21352137
})
21362138

2139+
It("should allow concurrent patches to a configMap", func() {
2140+
scheme := runtime.NewScheme()
2141+
Expect(corev1.AddToScheme(scheme)).To(Succeed())
2142+
2143+
obj := &corev1.ConfigMap{
2144+
ObjectMeta: metav1.ObjectMeta{
2145+
Name: "foo",
2146+
ResourceVersion: "0",
2147+
},
2148+
}
2149+
cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build()
2150+
wg := sync.WaitGroup{}
2151+
wg.Add(5)
2152+
2153+
for i := range 5 {
2154+
go func() {
2155+
defer wg.Done()
2156+
defer GinkgoRecover()
2157+
2158+
newObj := obj.DeepCopy()
2159+
newObj.Data = map[string]string{"foo": strconv.Itoa(i)}
2160+
Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(obj))).To(Succeed())
2161+
}()
2162+
}
2163+
wg.Wait()
2164+
})
2165+
2166+
It("should not allow concurrent patches to a configMap if the patch contains a ResourceVersion", func() {
2167+
scheme := runtime.NewScheme()
2168+
Expect(corev1.AddToScheme(scheme)).To(Succeed())
2169+
2170+
obj := &corev1.ConfigMap{
2171+
ObjectMeta: metav1.ObjectMeta{
2172+
Name: "foo",
2173+
ResourceVersion: "0",
2174+
},
2175+
}
2176+
cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build()
2177+
wg := sync.WaitGroup{}
2178+
wg.Add(5)
2179+
2180+
for i := range 5 {
2181+
go func() {
2182+
defer wg.Done()
2183+
defer GinkgoRecover()
2184+
2185+
newObj := obj.DeepCopy()
2186+
newObj.ResourceVersion = "1" // include an invalid RV to cause a conflcit
2187+
newObj.Data = map[string]string{"foo": strconv.Itoa(i)}
2188+
Expect(apierrors.IsConflict(cl.Patch(context.Background(), newObj, client.MergeFrom(obj)))).To(BeTrue())
2189+
}()
2190+
}
2191+
wg.Wait()
2192+
})
2193+
21372194
It("disallows scale subresources on unsupported built-in types", func() {
21382195
scheme := runtime.NewScheme()
21392196
Expect(corev1.AddToScheme(scheme)).To(Succeed())
@@ -2288,8 +2345,8 @@ func (t *WithPointerMetaList) DeepCopyObject() runtime.Object {
22882345
}
22892346

22902347
type WithPointerMeta struct {
2291-
*metav1.TypeMeta
2292-
*metav1.ObjectMeta
2348+
*metav1.TypeMeta `json:",inline"`
2349+
*metav1.ObjectMeta `json:"metadata,omitempty"`
22932350
}
22942351

22952352
func (t *WithPointerMeta) DeepCopy() *WithPointerMeta {

0 commit comments

Comments
 (0)