Skip to content

Commit aec9e48

Browse files
authored
[server] Use a text feature flag to toggle which message updates come from redis and messagebus (#18209)
1 parent d44af7a commit aec9e48

File tree

2 files changed

+49
-13
lines changed

2 files changed

+49
-13
lines changed

components/server/src/messaging/local-message-broker.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
1515
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
1616
import { inject, injectable } from "inversify";
1717
import { MessageBusIntegration } from "../workspace/messagebus-integration";
18+
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
1819

1920
export interface PrebuildUpdateListener {
2021
(ctx: TraceContext, evt: PrebuildWithStatus): void;
@@ -69,7 +70,12 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
6970
this.disposables.push(
7071
this.messageBusIntegration.listenForPrebuildUpdates(
7172
undefined,
72-
(ctx: TraceContext, update: PrebuildWithStatus) => {
73+
async (ctx: TraceContext, update: PrebuildWithStatus) => {
74+
const enabled = await this.isRedisPubSubByTypeEnabled("prebuild");
75+
if (enabled) {
76+
log.debug("[messagebus] Prebuild listener is disabled through feature flag");
77+
return;
78+
}
7379
TraceContext.setOWI(ctx, { workspaceId: update.info.buildWorkspaceId });
7480

7581
const listeners = this.prebuildUpdateListeners.get(update.info.projectId) || [];
@@ -91,7 +97,12 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
9197
);
9298
this.disposables.push(
9399
this.messageBusIntegration.listenForPrebuildUpdatableQueue(
94-
(ctx: TraceContext, evt: HeadlessWorkspaceEvent) => {
100+
async (ctx: TraceContext, evt: HeadlessWorkspaceEvent) => {
101+
const enabled = await this.isRedisPubSubByTypeEnabled("prebuild-updatable");
102+
if (enabled) {
103+
log.debug("[messagebus] Prebuild updatable listener is disabled through feature flag");
104+
return;
105+
}
95106
TraceContext.setOWI(ctx, { workspaceId: evt.workspaceID });
96107

97108
const listeners =
@@ -110,7 +121,13 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
110121
this.disposables.push(
111122
this.messageBusIntegration.listenForWorkspaceInstanceUpdates(
112123
undefined,
113-
(ctx: TraceContext, instance: WorkspaceInstance, userId: string | undefined) => {
124+
async (ctx: TraceContext, instance: WorkspaceInstance, userId: string | undefined) => {
125+
const enabled = await this.isRedisPubSubByTypeEnabled("workspace-instance");
126+
if (enabled) {
127+
log.debug("[messagebus] Workspace instance listner is disabled through feature flag");
128+
return;
129+
}
130+
114131
TraceContext.setOWI(ctx, { userId, instanceId: instance.id });
115132

116133
if (!userId) {
@@ -170,4 +187,15 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
170187
}
171188
});
172189
}
190+
191+
private async isRedisPubSubByTypeEnabled(
192+
type: "workspace-instance" | "prebuild" | "prebuild-updatable",
193+
): Promise<boolean> {
194+
const enabledTypes = await getExperimentsClientForBackend().getValueAsync(
195+
"enableRedisPubSubByUpdateType",
196+
"none",
197+
{},
198+
);
199+
return enabledTypes.indexOf(type) >= 0;
200+
}
173201
}

components/server/src/messaging/redis-subscriber.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import {
1919
} from "@gitpod/gitpod-protocol";
2020
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
2121
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
22-
import { Attributes } from "@gitpod/gitpod-protocol/lib/experiments/types";
2322
import { reportRedisUpdateCompleted, reportRedisUpdateReceived } from "../prometheus-metrics";
2423
import { Redis } from "ioredis";
2524

@@ -42,12 +41,6 @@ export class RedisSubscriber implements LocalMessageBroker {
4241
this.redis.on("message", async (channel: string, message: string) => {
4342
reportRedisUpdateReceived(channel);
4443

45-
const featureEnabled = await this.isRedisPubSubEnabled({});
46-
if (!featureEnabled) {
47-
log.debug("[redis] Redis listener is disabled through feature flag", { channel, message });
48-
return;
49-
}
50-
5144
let err: Error | undefined;
5245
try {
5346
await this.onMessage(channel, message);
@@ -64,6 +57,15 @@ export class RedisSubscriber implements LocalMessageBroker {
6457
private async onMessage(channel: string, message: string): Promise<void> {
6558
switch (channel) {
6659
case WorkspaceInstanceUpdatesChannel:
60+
const enabled = await this.isRedisPubSubByTypeEnabled("workspace-instance");
61+
if (!enabled) {
62+
log.debug("[redis] Redis workspace instance update is disabled through feature flag", {
63+
channel,
64+
message,
65+
});
66+
return;
67+
}
68+
6769
const parsed = JSON.parse(message) as RedisWorkspaceInstanceUpdate;
6870
return this.onInstanceUpdate(parsed);
6971
default:
@@ -112,8 +114,14 @@ export class RedisSubscriber implements LocalMessageBroker {
112114
});
113115
}
114116

115-
private async isRedisPubSubEnabled(attributes: Attributes): Promise<boolean> {
116-
const enabled = await getExperimentsClientForBackend().getValueAsync("enableRedisPubSub", false, attributes);
117-
return enabled;
117+
private async isRedisPubSubByTypeEnabled(
118+
type: "workspace-instance" | "prebuild" | "prebuild-updatable",
119+
): Promise<boolean> {
120+
const enabledTypes = await getExperimentsClientForBackend().getValueAsync(
121+
"enableRedisPubSubByUpdateType",
122+
"none",
123+
{},
124+
);
125+
return enabledTypes.indexOf(type) >= 0;
118126
}
119127
}

0 commit comments

Comments
 (0)