Skip to content

Commit ea5041d

Browse files
authored
Merge pull request #781 from bbatha/fix-informer-watch-optimization-restart
fix(cache): watch errors must call done handler
2 parents f274cbf + d12ec26 commit ea5041d

File tree

2 files changed

+98
-7
lines changed

2 files changed

+98
-7
lines changed

src/cache.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
120120

121121
private async doneHandler(err: any): Promise<any> {
122122
this._stop();
123-
if (err && err.statusCode === 410) {
123+
if (
124+
err &&
125+
((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410)
126+
) {
124127
this.resourceVersion = '';
125128
} else if (err) {
126129
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
@@ -187,7 +190,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
187190
addOrUpdateObject(namespaceList, obj);
188191
}
189192

190-
private watchHandler(phase: string, obj: T, watchObj?: any): void {
193+
private async watchHandler(
194+
phase: string,
195+
obj: T,
196+
watchObj?: { type: string; object: KubernetesObject },
197+
): Promise<void> {
191198
switch (phase) {
192199
case 'ADDED':
193200
case 'MODIFIED':
@@ -213,10 +220,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
213220
case 'BOOKMARK':
214221
// nothing to do, here for documentation, mostly.
215222
break;
223+
case 'ERROR':
224+
await this.doneHandler(obj);
225+
return;
216226
}
217-
if (watchObj && watchObj.metadata) {
218-
this.resourceVersion = watchObj.metadata.resourceVersion;
219-
}
227+
this.resourceVersion = obj.metadata!.resourceVersion || '';
220228
}
221229
}
222230

src/cache_test.ts

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,9 +1096,10 @@ describe('ListWatchCache', () => {
10961096
{
10971097
metadata: {
10981098
name: 'name3',
1099+
resourceVersion: '23456',
10991100
} as V1ObjectMeta,
11001101
} as V1Namespace,
1101-
{ metadata: { resourceVersion: '23456' } },
1102+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
11021103
);
11031104

11041105
await informer.stop();
@@ -1153,9 +1154,91 @@ describe('ListWatchCache', () => {
11531154
{
11541155
metadata: {
11551156
name: 'name3',
1157+
resourceVersion: '23456',
11561158
} as V1ObjectMeta,
11571159
} as V1Namespace,
1158-
{ metadata: { resourceVersion: '23456' } },
1160+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
1161+
);
1162+
1163+
await informer.stop();
1164+
1165+
let errorEmitted = false;
1166+
informer.on('error', () => (errorEmitted = true));
1167+
1168+
promise = new Promise((resolve) => {
1169+
mock.when(
1170+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1171+
).thenCall(() => {
1172+
resolve(new FakeRequest());
1173+
});
1174+
});
1175+
1176+
informer.start();
1177+
await promise;
1178+
1179+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1180+
1181+
const object = {
1182+
kind: 'Status',
1183+
apiVersion: 'v1',
1184+
metadata: {},
1185+
status: 'Failure',
1186+
message: 'too old resource version: 12345 (1234)',
1187+
reason: 'Expired',
1188+
code: 410,
1189+
};
1190+
await watchHandler('ERROR', object, { type: 'ERROR', object });
1191+
1192+
mock.verify(
1193+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1194+
).thrice();
1195+
expect(errorEmitted).to.equal(false);
1196+
expect(listCalls).to.be.equal(2);
1197+
});
1198+
1199+
it('should list if the watch errors from the last version', async () => {
1200+
const fakeWatch = mock.mock(Watch);
1201+
const list: V1Pod[] = [];
1202+
const listObj = {
1203+
metadata: {
1204+
resourceVersion: '12345',
1205+
} as V1ListMeta,
1206+
items: list,
1207+
} as V1NamespaceList;
1208+
1209+
let listCalls = 0;
1210+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
1211+
response: http.IncomingMessage;
1212+
body: V1NamespaceList;
1213+
}> {
1214+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
1215+
listCalls++;
1216+
resolve({ response: {} as http.IncomingMessage, body: listObj });
1217+
});
1218+
};
1219+
let promise = new Promise((resolve) => {
1220+
mock.when(
1221+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1222+
).thenCall(() => {
1223+
resolve(new FakeRequest());
1224+
});
1225+
});
1226+
1227+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1228+
1229+
informer.start();
1230+
await promise;
1231+
1232+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1233+
watchHandler(
1234+
'ADDED',
1235+
{
1236+
metadata: {
1237+
name: 'name3',
1238+
resourceVersion: '23456',
1239+
} as V1ObjectMeta,
1240+
} as V1Namespace,
1241+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
11591242
);
11601243

11611244
await informer.stop();

0 commit comments

Comments
 (0)