Skip to content

Commit d97cfc8

Browse files
committed
fix(cache): update cache with O(1) data structures
The prior implementation used arrays to store cached objects. This meant that updates were O(n). In a controller I developed that managed ~70k PV and ~70k pvc 99.9% of CPU time was spent in cache updates pegging the entire process. This new implementation doesn't even have cache updates show up in the profiles and is using ~25m cpu for the same number of objects.
1 parent f274cbf commit d97cfc8

File tree

2 files changed

+105
-88
lines changed

2 files changed

+105
-88
lines changed

src/cache.ts

Lines changed: 87 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,23 @@ export interface ObjectCache<T> {
2020
list(namespace?: string): ReadonlyArray<T>;
2121
}
2222

23+
// exported for testing
24+
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;
25+
2326
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
24-
private objects: T[] = [];
27+
private objects: CacheMap<T> = new Map();
2528
private resourceVersion: string;
26-
private readonly indexCache: { [key: string]: T[] } = {};
27-
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T> | ErrorCallback> } = {};
29+
private readonly callbackCache: {
30+
[key: string]: Array<ObjectCallback<T> | ErrorCallback>;
31+
} = {};
2832
private request: RequestResult | undefined;
29-
private stopped: boolean = false;
33+
private stopped = false;
3034

3135
public constructor(
3236
private readonly path: string,
3337
private readonly watch: Watch,
3438
private readonly listFn: ListPromise<T>,
35-
autoStart: boolean = true,
39+
autoStart = true,
3640
private readonly labelSelector?: string,
3741
) {
3842
this.callbackCache[ADD] = [];
@@ -93,18 +97,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
9397
}
9498

9599
public get(name: string, namespace?: string): T | undefined {
96-
return this.objects.find(
97-
(obj: T): boolean => {
98-
return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace);
99-
},
100-
);
100+
const nsObjects = this.objects.get(namespace || '');
101+
if (nsObjects) {
102+
return nsObjects.get(name);
103+
}
101104
}
102105

103106
public list(namespace?: string | undefined): ReadonlyArray<T> {
104-
if (!namespace) {
105-
return this.objects;
107+
const namespaceObjects = this.objects.get(namespace || '');
108+
if (!namespaceObjects) {
109+
return [];
106110
}
107-
return this.indexCache[namespace] as ReadonlyArray<T>;
111+
return Array.from(namespaceObjects.values());
108112
}
109113

110114
public latestResourceVersion(): string {
@@ -118,9 +122,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
118122
}
119123
}
120124

121-
private async doneHandler(err: any): Promise<any> {
125+
private async doneHandler(err: unknown): Promise<void> {
122126
this._stop();
123-
if (err && err.statusCode === 410) {
127+
if (err && (err as { statusCode?: number }).statusCode === 410) {
124128
this.resourceVersion = '';
125129
} else if (err) {
126130
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
@@ -136,16 +140,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
136140
const result = await promise;
137141
const list = result.body;
138142
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
139-
Object.keys(this.indexCache).forEach((key) => {
140-
const updateObjects = deleteItems(this.indexCache[key], list.items);
141-
if (updateObjects.length !== 0) {
142-
this.indexCache[key] = updateObjects;
143-
} else {
144-
delete this.indexCache[key];
145-
}
146-
});
147143
this.addOrUpdateItems(list.items);
148-
this.resourceVersion = list.metadata!.resourceVersion!;
144+
this.resourceVersion = list.metadata!.resourceVersion || '';
149145
}
150146
const queryParams = {
151147
resourceVersion: this.resourceVersion,
@@ -172,22 +168,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
172168
this.callbackCache[ADD].slice(),
173169
this.callbackCache[UPDATE].slice(),
174170
);
175-
if (obj.metadata!.namespace) {
176-
this.indexObj(obj);
177-
}
178171
});
179172
}
180173

181-
private indexObj(obj: T): void {
182-
let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
183-
if (!namespaceList) {
184-
namespaceList = [];
185-
this.indexCache[obj.metadata!.namespace!] = namespaceList;
186-
}
187-
addOrUpdateObject(namespaceList, obj);
188-
}
189-
190-
private watchHandler(phase: string, obj: T, watchObj?: any): void {
174+
private watchHandler(phase: string, obj: T, watchObj?: KubernetesObject): void {
191175
switch (phase) {
192176
case 'ADDED':
193177
case 'MODIFIED':
@@ -197,73 +181,97 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
197181
this.callbackCache[ADD].slice(),
198182
this.callbackCache[UPDATE].slice(),
199183
);
200-
if (obj.metadata!.namespace) {
201-
this.indexObj(obj);
202-
}
203184
break;
204185
case 'DELETED':
205186
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
206-
if (obj.metadata!.namespace) {
207-
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
208-
if (namespaceList) {
209-
deleteObject(namespaceList, obj);
210-
}
211-
}
212187
break;
213188
case 'BOOKMARK':
214189
// nothing to do, here for documentation, mostly.
215190
break;
216191
}
217192
if (watchObj && watchObj.metadata) {
218-
this.resourceVersion = watchObj.metadata.resourceVersion;
193+
this.resourceVersion = watchObj.metadata!.resourceVersion || '';
194+
}
195+
}
196+
}
197+
198+
// exported for testing
199+
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
200+
const objects: CacheMap<T> = new Map();
201+
// build up the new list
202+
for (const obj of newObjects) {
203+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
204+
if (!namespaceObjects) {
205+
namespaceObjects = new Map();
206+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
219207
}
208+
209+
const name = obj.metadata!.name || '';
210+
namespaceObjects.set(name, obj);
220211
}
212+
return objects;
221213
}
222214

223215
// external for testing
224216
export function deleteItems<T extends KubernetesObject>(
225-
oldObjects: T[],
217+
oldObjects: CacheMap<T>,
226218
newObjects: T[],
227219
deleteCallback?: Array<ObjectCallback<T>>,
228-
): T[] {
229-
return oldObjects.filter((obj: T) => {
230-
if (findKubernetesObject(newObjects, obj) === -1) {
231-
if (deleteCallback) {
232-
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
220+
): CacheMap<T> {
221+
const objects = cacheMapFromList(newObjects);
222+
223+
if (!deleteCallback) {
224+
return objects;
225+
}
226+
227+
for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) {
228+
const newNamespaceObjects = objects.get(namespace);
229+
if (newNamespaceObjects) {
230+
for (const [name, oldObj] of oldNamespaceObjects.entries()) {
231+
if (newNamespaceObjects.has(name)) {
232+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(oldObj));
233+
}
233234
}
234-
return false;
235+
} else {
236+
oldNamespaceObjects.forEach((obj: T) => {
237+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
238+
});
235239
}
236-
return true;
237-
});
240+
}
241+
242+
return objects;
238243
}
239244

240245
// Only public for testing.
241246
export function addOrUpdateObject<T extends KubernetesObject>(
242-
objects: T[],
247+
objects: CacheMap<T>,
243248
obj: T,
244249
addCallback?: Array<ObjectCallback<T>>,
245250
updateCallback?: Array<ObjectCallback<T>>,
246251
): void {
247-
const ix = findKubernetesObject(objects, obj);
248-
if (ix === -1) {
249-
objects.push(obj);
252+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
253+
if (!namespaceObjects) {
254+
namespaceObjects = new Map();
255+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
256+
}
257+
258+
const name = obj.metadata!.name || '';
259+
const found = namespaceObjects.get(name);
260+
if (!found) {
261+
namespaceObjects.set(name, obj);
250262
if (addCallback) {
251263
addCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
252264
}
253265
} else {
254-
if (!isSameVersion(objects[ix], obj)) {
255-
objects[ix] = obj;
266+
if (!isSameVersion(found, obj)) {
267+
namespaceObjects.set(name, obj);
256268
if (updateCallback) {
257269
updateCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
258270
}
259271
}
260272
}
261273
}
262274

263-
function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
264-
return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace;
265-
}
266-
267275
function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
268276
return (
269277
o1.metadata!.resourceVersion !== undefined &&
@@ -272,23 +280,26 @@ function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
272280
);
273281
}
274282

275-
function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
276-
return objects.findIndex((elt: T) => {
277-
return isSameObject(elt, obj);
278-
});
279-
}
280-
281283
// Public for testing.
282284
export function deleteObject<T extends KubernetesObject>(
283-
objects: T[],
285+
objects: CacheMap<T>,
284286
obj: T,
285287
deleteCallback?: Array<ObjectCallback<T>>,
286288
): void {
287-
const ix = findKubernetesObject(objects, obj);
288-
if (ix !== -1) {
289-
objects.splice(ix, 1);
289+
const namespace = obj.metadata!.namespace || '';
290+
const name = obj.metadata!.name || '';
291+
292+
const namespaceObjects = objects.get(namespace);
293+
if (!namespaceObjects) {
294+
return;
295+
}
296+
const deleted = namespaceObjects.delete(name);
297+
if (deleted) {
290298
if (deleteCallback) {
291299
deleteCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
292300
}
301+
if (namespaceObjects.size === 0) {
302+
objects.delete(namespace);
303+
}
293304
}
294305
}

src/cache_test.ts

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { Duplex } from 'stream';
99
import { EventEmitter } from 'ws';
1010

1111
import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
12-
import { deleteObject, ListWatch, deleteItems } from './cache';
12+
import { deleteObject, ListWatch, deleteItems, CacheMap, cacheMapFromList } from './cache';
1313
import { KubeConfig } from './config';
1414
import { Cluster, Context, User } from './config_types';
1515
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer';
@@ -689,7 +689,7 @@ describe('ListWatchCache', () => {
689689
});
690690

691691
it('should delete an object correctly', () => {
692-
const list: V1Pod[] = [
692+
const cache: CacheMap<V1Pod> = cacheMapFromList([
693693
{
694694
metadata: {
695695
name: 'name1',
@@ -702,28 +702,34 @@ describe('ListWatchCache', () => {
702702
namespace: 'ns2',
703703
} as V1ObjectMeta,
704704
} as V1Pod,
705-
];
706-
deleteObject(list, {
705+
]);
706+
deleteObject(cache, {
707707
metadata: {
708708
name: 'other',
709709
namespace: 'ns1',
710710
},
711711
} as V1Pod);
712-
expect(list.length).to.equal(2);
713-
deleteObject(list, {
712+
expect(cache.size).to.equal(2);
713+
expect(cache.get('ns1').size).to.equal(1);
714+
expect(cache.get('ns2').size).to.equal(1);
715+
deleteObject(cache, {
714716
metadata: {
715717
name: 'name1',
716718
namespace: 'ns2',
717719
},
718720
} as V1Pod);
719-
expect(list.length).to.equal(2);
720-
deleteObject(list, {
721+
expect(cache.size).to.equal(2);
722+
expect(cache.get('ns1').size).to.equal(1);
723+
expect(cache.get('ns2').size).to.equal(1);
724+
deleteObject(cache, {
721725
metadata: {
722726
name: 'name1',
723727
namespace: 'ns1',
724728
},
725729
} as V1Pod);
726-
expect(list.length).to.equal(1);
730+
expect(cache.size).to.equal(1);
731+
expect(cache.has('ns1')).to.equal(false);
732+
expect(cache.get('ns2').size).to.equal(1);
727733
});
728734

729735
it('should not call handlers which have been unregistered', async () => {
@@ -1247,7 +1253,7 @@ describe('ListWatchCache', () => {
12471253

12481254
describe('delete items', () => {
12491255
it('should remove correctly', () => {
1250-
const listA: V1Pod[] = [
1256+
const cacheA: CacheMap<V1Pod> = cacheMapFromList([
12511257
{
12521258
metadata: {
12531259
name: 'name1',
@@ -1260,7 +1266,7 @@ describe('delete items', () => {
12601266
namespace: 'ns2',
12611267
} as V1ObjectMeta,
12621268
} as V1Pod,
1263-
];
1269+
]);
12641270
const listB: V1Pod[] = [
12651271
{
12661272
metadata: {
@@ -1285,7 +1291,7 @@ describe('delete items', () => {
12851291
];
12861292
const pods: V1Pod[] = [];
12871293

1288-
deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
1294+
deleteItems(cacheA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
12891295
expect(pods).to.deep.equal(expected);
12901296
});
12911297

0 commit comments

Comments
 (0)