Skip to content

Commit 0f629e9

Browse files
sjpotterbobymicroby
authored andcommitted
CSC POC ontop of Parser
1 parent 6f961bd commit 0f629e9

File tree

16 files changed

+271
-54
lines changed

16 files changed

+271
-54
lines changed

packages/client/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,7 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH';
3434
export { SetOptions } from './lib/commands/SET';
3535

3636
export { REDIS_FLUSH_MODES } from './lib/commands/FLUSHALL';
37+
38+
import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
39+
export { BasicClientSideCache, BasicPooledClientSideCache };
40+

packages/client/lib/client/commands-queue.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
5656
return this.#pubSub.isActive;
5757
}
5858

59+
#invalidateCallback?: (key: RedisArgument | null) => unknown;
60+
5961
constructor(
6062
respVersion: RespVersions,
6163
maxLength: number | null | undefined,
@@ -109,13 +111,30 @@ export default class RedisCommandsQueue {
109111
onErrorReply: err => this.#onErrorReply(err),
110112
onPush: push => {
111113
if (!this.#onPush(push)) {
112-
114+
switch (push[0].toString()) {
115+
case "invalidate": {
116+
if (this.#invalidateCallback) {
117+
if (push[1] !== null) {
118+
for (const key of push[1]) {
119+
this.#invalidateCallback(key);
120+
}
121+
} else {
122+
this.#invalidateCallback(null);
123+
}
124+
}
125+
break;
126+
}
127+
}
113128
}
114129
},
115130
getTypeMapping: () => this.#getTypeMapping()
116131
});
117132
}
118133

134+
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
135+
this.#invalidateCallback = callback;
136+
}
137+
119138
addCommand<T>(
120139
args: ReadonlyArray<RedisArgument>,
121140
options?: CommandOptions

packages/client/lib/client/index.ts

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
1616
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
1717
import { RedisPoolOptions, RedisClientPool } from './pool';
1818
import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers';
19+
import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache';
1920
import { BasicCommandParser, CommandParser } from './parser';
2021
import SingleEntryCache from '../single-entry-cache';
2122

@@ -81,6 +82,10 @@ export interface RedisClientOptions<
8182
* TODO
8283
*/
8384
commandOptions?: CommandOptions<TYPE_MAPPING>;
85+
/**
86+
* TODO
87+
*/
88+
clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig;
8489
}
8590

8691
type WithCommands<
@@ -313,9 +318,8 @@ export default class RedisClient<
313318
// was in a watch transaction when
314319
// a topology change occured
315320
#dirtyWatch?: string;
316-
#epoch: number;
317321
#watchEpoch?: number;
318-
322+
#clientSideCache?: ClientSideCacheProvider;
319323
#credentialsSubscription: Disposable | null = null;
320324

321325
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
@@ -334,6 +338,11 @@ export default class RedisClient<
334338
return this._self.#queue.isPubSubActive;
335339
}
336340

341+
get socketEpoch() {
342+
return this._self.#socket.socketEpoch;
343+
}
344+
345+
337346
get isWatching() {
338347
return this._self.#watchEpoch !== undefined;
339348
}
@@ -358,10 +367,20 @@ export default class RedisClient<
358367

359368
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
360369
super();
370+
361371
this.#options = this.#initiateOptions(options);
362372
this.#queue = this.#initiateQueue();
363373
this.#socket = this.#initiateSocket();
364-
this.#epoch = 0;
374+
375+
if (options?.clientSideCache) {
376+
if (options.clientSideCache instanceof ClientSideCacheProvider) {
377+
this.#clientSideCache = options.clientSideCache;
378+
} else {
379+
const cscConfig = options.clientSideCache;
380+
this.#clientSideCache = new BasicClientSideCache(cscConfig);
381+
}
382+
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
383+
}
365384
}
366385

367386
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
@@ -522,6 +541,13 @@ export default class RedisClient<
522541
);
523542
}
524543

544+
if (this.#clientSideCache) {
545+
const tracking = this.#clientSideCache.trackingOn();
546+
if (tracking) {
547+
commands.push(tracking);
548+
}
549+
}
550+
525551
return commands;
526552
}
527553

@@ -575,6 +601,7 @@ export default class RedisClient<
575601
})
576602
.on('error', err => {
577603
this.emit('error', err);
604+
this.#clientSideCache?.onError();
578605
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
579606
this.#queue.flushWaitingForReply(err);
580607
} else {
@@ -583,7 +610,6 @@ export default class RedisClient<
583610
})
584611
.on('connect', () => this.emit('connect'))
585612
.on('ready', () => {
586-
this.#epoch++;
587613
this.emit('ready');
588614
this.#setPingTimer();
589615
this.#maybeScheduleWrite();
@@ -711,14 +737,21 @@ export default class RedisClient<
711737
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
712738
transformReply: TransformReply | undefined,
713739
) {
714-
const reply = await this.sendCommand(parser.redisArgs, commandOptions);
740+
const csc = this._self.#clientSideCache;
741+
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
715742

716-
if (transformReply) {
717-
const res = transformReply(reply, parser.preserve, commandOptions?.typeMapping);
718-
return res
719-
}
743+
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
720744

721-
return reply;
745+
if (csc && command.CACHEABLE && defaultTypeMapping) {
746+
return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping);
747+
} else {
748+
const reply = await fn();
749+
750+
if (transformReply) {
751+
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
752+
}
753+
return reply;
754+
}
722755
}
723756

724757
/**
@@ -883,7 +916,7 @@ export default class RedisClient<
883916
const reply = await this._self.sendCommand(
884917
pushVariadicArguments(['WATCH'], key)
885918
);
886-
this._self.#watchEpoch ??= this._self.#epoch;
919+
this._self.#watchEpoch ??= this._self.socketEpoch;
887920
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
888921
}
889922

@@ -950,7 +983,7 @@ export default class RedisClient<
950983
}
951984

952985
const chainId = Symbol('Pipeline Chain'),
953-
promise = Promise.all(
986+
promise = Promise.allSettled(
954987
commands.map(({ args }) => this._self.#queue.addCommand(args, {
955988
chainId,
956989
typeMapping: this._commandOptions?.typeMapping
@@ -986,7 +1019,7 @@ export default class RedisClient<
9861019
throw new WatchError(dirtyWatch);
9871020
}
9881021

989-
if (watchEpoch && watchEpoch !== this._self.#epoch) {
1022+
if (watchEpoch && watchEpoch !== this._self.socketEpoch) {
9901023
throw new WatchError('Client reconnected after WATCH');
9911024
}
9921025

@@ -1210,6 +1243,7 @@ export default class RedisClient<
12101243
return new Promise<void>(resolve => {
12111244
clearTimeout(this._self.#pingTimer);
12121245
this._self.#socket.close();
1246+
this._self.#clientSideCache?.onClose();
12131247

12141248
if (this._self.#queue.isEmpty()) {
12151249
this._self.#socket.destroySocket();
@@ -1236,6 +1270,7 @@ export default class RedisClient<
12361270
clearTimeout(this._self.#pingTimer);
12371271
this._self.#queue.flushAll(new DisconnectsClientError());
12381272
this._self.#socket.destroy();
1273+
this._self.#clientSideCache?.onClose();
12391274
this._self.#credentialsSubscription?.dispose();
12401275
this._self.#credentialsSubscription = null;
12411276
}

packages/client/lib/client/linked-list.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export class DoublyLinkedList<T> {
114114
export interface SinglyLinkedNode<T> {
115115
value: T;
116116
next: SinglyLinkedNode<T> | undefined;
117+
removed: boolean;
117118
}
118119

119120
export class SinglyLinkedList<T> {
@@ -140,7 +141,8 @@ export class SinglyLinkedList<T> {
140141

141142
const node = {
142143
value,
143-
next: undefined
144+
next: undefined,
145+
removed: false
144146
};
145147

146148
if (this.#head === undefined) {
@@ -151,6 +153,9 @@ export class SinglyLinkedList<T> {
151153
}
152154

153155
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
156+
if (node.removed) {
157+
throw new Error("node already removed");
158+
}
154159
--this.#length;
155160

156161
if (this.#head === node) {
@@ -165,6 +170,8 @@ export class SinglyLinkedList<T> {
165170
} else {
166171
parent!.next = node.next;
167172
}
173+
174+
node.removed = true;
168175
}
169176

170177
shift() {
@@ -177,6 +184,7 @@ export class SinglyLinkedList<T> {
177184
this.#head = node.next;
178185
}
179186

187+
node.removed = true;
180188
return node.value;
181189
}
182190

packages/client/lib/client/parser.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser {
3333
return this.#keys[0];
3434
}
3535

36+
get cacheKey() {
37+
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
38+
return cacheKey + '_' + this.#redisArgs.join('_');
39+
}
40+
3641
push(...arg: Array<RedisArgument>) {
3742
this.#redisArgs.push(...arg);
3843
};

0 commit comments

Comments
 (0)