Skip to content

Commit 7a5e12f

Browse files
sjpotterbobymicroby
authored andcommitted
CSC POC ontop of Parser
1 parent 52e5562 commit 7a5e12f

File tree

16 files changed

+270
-51
lines changed

16 files changed

+270
-51
lines changed

packages/client/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH';
2525
export { SetOptions } from './lib/commands/SET';
2626

2727
export { REDIS_FLUSH_MODES } from './lib/commands/FLUSHALL';
28+
29+
import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
30+
export { BasicClientSideCache, BasicPooledClientSideCache };
31+

packages/client/lib/client/commands-queue.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
5656
return this.#pubSub.isActive;
5757
}
5858

59+
#invalidateCallback?: (key: RedisArgument | null) => unknown;
60+
5961
constructor(
6062
respVersion: RespVersions,
6163
maxLength: number | null | undefined,
@@ -109,13 +111,30 @@ export default class RedisCommandsQueue {
109111
onErrorReply: err => this.#onErrorReply(err),
110112
onPush: push => {
111113
if (!this.#onPush(push)) {
112-
114+
switch (push[0].toString()) {
115+
case "invalidate": {
116+
if (this.#invalidateCallback) {
117+
if (push[1] !== null) {
118+
for (const key of push[1]) {
119+
this.#invalidateCallback(key);
120+
}
121+
} else {
122+
this.#invalidateCallback(null);
123+
}
124+
}
125+
break;
126+
}
127+
}
113128
}
114129
},
115130
getTypeMapping: () => this.#getTypeMapping()
116131
});
117132
}
118133

134+
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
135+
this.#invalidateCallback = callback;
136+
}
137+
119138
addCommand<T>(
120139
args: ReadonlyArray<RedisArgument>,
121140
options?: CommandOptions

packages/client/lib/client/index.ts

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
1616
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
1717
import { RedisPoolOptions, RedisClientPool } from './pool';
1818
import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers';
19+
import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache';
1920
import { BasicCommandParser, CommandParser } from './parser';
2021

2122
export interface RedisClientOptions<
@@ -80,6 +81,10 @@ export interface RedisClientOptions<
8081
* TODO
8182
*/
8283
commandOptions?: CommandOptions<TYPE_MAPPING>;
84+
/**
85+
* TODO
86+
*/
87+
clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig;
8388
}
8489

8590
type WithCommands<
@@ -303,9 +308,8 @@ export default class RedisClient<
303308
// was in a watch transaction when
304309
// a topology change occured
305310
#dirtyWatch?: string;
306-
#epoch: number;
307311
#watchEpoch?: number;
308-
312+
#clientSideCache?: ClientSideCacheProvider;
309313
#credentialsSubscription: Disposable | null = null;
310314

311315
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
@@ -324,6 +328,11 @@ export default class RedisClient<
324328
return this._self.#queue.isPubSubActive;
325329
}
326330

331+
get socketEpoch() {
332+
return this._self.#socket.socketEpoch;
333+
}
334+
335+
327336
get isWatching() {
328337
return this._self.#watchEpoch !== undefined;
329338
}
@@ -348,10 +357,20 @@ export default class RedisClient<
348357

349358
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
350359
super();
360+
351361
this.#options = this.#initiateOptions(options);
352362
this.#queue = this.#initiateQueue();
353363
this.#socket = this.#initiateSocket();
354-
this.#epoch = 0;
364+
365+
if (options?.clientSideCache) {
366+
if (options.clientSideCache instanceof ClientSideCacheProvider) {
367+
this.#clientSideCache = options.clientSideCache;
368+
} else {
369+
const cscConfig = options.clientSideCache;
370+
this.#clientSideCache = new BasicClientSideCache(cscConfig);
371+
}
372+
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
373+
}
355374
}
356375

357376
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
@@ -512,6 +531,13 @@ export default class RedisClient<
512531
);
513532
}
514533

534+
if (this.#clientSideCache) {
535+
const tracking = this.#clientSideCache.trackingOn();
536+
if (tracking) {
537+
commands.push(tracking);
538+
}
539+
}
540+
515541
return commands;
516542
}
517543

@@ -565,6 +591,7 @@ export default class RedisClient<
565591
})
566592
.on('error', err => {
567593
this.emit('error', err);
594+
this.#clientSideCache?.onError();
568595
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
569596
this.#queue.flushWaitingForReply(err);
570597
} else {
@@ -573,7 +600,6 @@ export default class RedisClient<
573600
})
574601
.on('connect', () => this.emit('connect'))
575602
.on('ready', () => {
576-
this.#epoch++;
577603
this.emit('ready');
578604
this.#setPingTimer();
579605
this.#maybeScheduleWrite();
@@ -701,13 +727,21 @@ export default class RedisClient<
701727
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
702728
transformReply: TransformReply | undefined,
703729
) {
704-
const reply = await this.sendCommand(parser.redisArgs, commandOptions);
730+
const csc = this._self.#clientSideCache;
731+
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
705732

706-
if (transformReply) {
707-
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
708-
}
733+
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
709734

710-
return reply;
735+
if (csc && command.CACHEABLE && defaultTypeMapping) {
736+
return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping);
737+
} else {
738+
const reply = await fn();
739+
740+
if (transformReply) {
741+
return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
742+
}
743+
return reply;
744+
}
711745
}
712746

713747
/**
@@ -872,7 +906,7 @@ export default class RedisClient<
872906
const reply = await this._self.sendCommand(
873907
pushVariadicArguments(['WATCH'], key)
874908
);
875-
this._self.#watchEpoch ??= this._self.#epoch;
909+
this._self.#watchEpoch ??= this._self.socketEpoch;
876910
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
877911
}
878912

@@ -939,7 +973,7 @@ export default class RedisClient<
939973
}
940974

941975
const chainId = Symbol('Pipeline Chain'),
942-
promise = Promise.all(
976+
promise = Promise.allSettled(
943977
commands.map(({ args }) => this._self.#queue.addCommand(args, {
944978
chainId,
945979
typeMapping: this._commandOptions?.typeMapping
@@ -975,7 +1009,7 @@ export default class RedisClient<
9751009
throw new WatchError(dirtyWatch);
9761010
}
9771011

978-
if (watchEpoch && watchEpoch !== this._self.#epoch) {
1012+
if (watchEpoch && watchEpoch !== this._self.socketEpoch) {
9791013
throw new WatchError('Client reconnected after WATCH');
9801014
}
9811015

@@ -1199,6 +1233,7 @@ export default class RedisClient<
11991233
return new Promise<void>(resolve => {
12001234
clearTimeout(this._self.#pingTimer);
12011235
this._self.#socket.close();
1236+
this._self.#clientSideCache?.onClose();
12021237

12031238
if (this._self.#queue.isEmpty()) {
12041239
this._self.#socket.destroySocket();
@@ -1225,6 +1260,7 @@ export default class RedisClient<
12251260
clearTimeout(this._self.#pingTimer);
12261261
this._self.#queue.flushAll(new DisconnectsClientError());
12271262
this._self.#socket.destroy();
1263+
this._self.#clientSideCache?.onClose();
12281264
this._self.#credentialsSubscription?.dispose();
12291265
this._self.#credentialsSubscription = null;
12301266
}

packages/client/lib/client/linked-list.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export class DoublyLinkedList<T> {
114114
export interface SinglyLinkedNode<T> {
115115
value: T;
116116
next: SinglyLinkedNode<T> | undefined;
117+
removed: boolean;
117118
}
118119

119120
export class SinglyLinkedList<T> {
@@ -140,7 +141,8 @@ export class SinglyLinkedList<T> {
140141

141142
const node = {
142143
value,
143-
next: undefined
144+
next: undefined,
145+
removed: false
144146
};
145147

146148
if (this.#head === undefined) {
@@ -151,6 +153,9 @@ export class SinglyLinkedList<T> {
151153
}
152154

153155
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
156+
if (node.removed) {
157+
throw new Error("node already removed");
158+
}
154159
--this.#length;
155160

156161
if (this.#head === node) {
@@ -165,6 +170,8 @@ export class SinglyLinkedList<T> {
165170
} else {
166171
parent!.next = node.next;
167172
}
173+
174+
node.removed = true;
168175
}
169176

170177
shift() {
@@ -177,6 +184,7 @@ export class SinglyLinkedList<T> {
177184
this.#head = node.next;
178185
}
179186

187+
node.removed = true;
180188
return node.value;
181189
}
182190

packages/client/lib/client/parser.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser {
3333
return this.#keys[0];
3434
}
3535

36+
get cacheKey() {
37+
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
38+
return cacheKey + '_' + this.#redisArgs.join('_');
39+
}
40+
3641
push(...arg: Array<RedisArgument>) {
3742
this.#redisArgs.push(...arg);
3843
};

0 commit comments

Comments
 (0)