Skip to content

[server, ws-man-bridge] Remove messagebus WEB-618 #18238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions components/server/src/container-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import { ContainerModule } from "inversify";

import { MessageBusHelper, MessageBusHelperImpl } from "@gitpod/gitpod-messagebus/lib";
import { MessagebusConfiguration } from "@gitpod/gitpod-messagebus/lib/config";
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
import {
ConfigCatClientFactory,
Expand Down Expand Up @@ -73,7 +71,6 @@ import { WebhookEventGarbageCollector } from "./jobs/webhook-gc";
import { WorkspaceGarbageCollector } from "./jobs/workspace-gc";
import { LinkedInService } from "./linkedin-service";
import { LivenessController } from "./liveness/liveness-controller";
import { LocalMessageBroker, LocalRabbitMQBackedMessageBroker } from "./messaging/local-message-broker";
import { MonitoringEndpointsApp } from "./monitoring-endpoints";
import { OAuthController } from "./oauth-server/oauth-controller";
import { OneTimeSecretServer } from "./one-time-secret-server";
Expand Down Expand Up @@ -117,7 +114,6 @@ import { HeadlessLogController } from "./workspace/headless-log-controller";
import { HeadlessLogService } from "./workspace/headless-log-service";
import { ImageSourceProvider } from "./workspace/image-source-provider";
import { ImageBuildPrefixContextParser } from "./workspace/imagebuild-prefix-context-parser";
import { MessageBusIntegration } from "./workspace/messagebus-integration";
import { OpenPrebuildPrefixContextParser } from "./workspace/open-prebuild-prefix-context-parser";
import { ReferrerPrefixParser } from "./workspace/referrer-prefix-context-parser";
import { SnapshotContextParser } from "./workspace/snapshot-context-parser";
Expand Down Expand Up @@ -169,11 +165,6 @@ export const productionContainerModule = new ContainerModule(
bind(ServerFactory).toAutoFactory(GitpodServerImpl);
bind(UserController).toSelf().inSingletonScope();

bind(MessagebusConfiguration).toSelf().inSingletonScope();
bind(MessageBusHelper).to(MessageBusHelperImpl).inSingletonScope();
bind(MessageBusIntegration).toSelf().inSingletonScope();
bind(LocalMessageBroker).to(LocalRabbitMQBackedMessageBroker).inSingletonScope();

bind(GitpodServerImpl).toSelf();
bind(WebsocketConnectionManager)
.toDynamicValue((ctx) => {
Expand Down
185 changes: 1 addition & 184 deletions components/server/src/messaging/local-message-broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,8 @@
* See License.AGPL.txt in the project root for license information.
*/

import {
Disposable,
DisposableCollection,
HeadlessWorkspaceEvent,
PrebuildWithStatus,
WorkspaceInstance,
} from "@gitpod/gitpod-protocol";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import { HeadlessWorkspaceEvent, PrebuildWithStatus, WorkspaceInstance } from "@gitpod/gitpod-protocol";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { inject, injectable } from "inversify";
import { MessageBusIntegration } from "../workspace/messagebus-integration";
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";

export interface PrebuildUpdateListener {
(ctx: TraceContext, evt: PrebuildWithStatus): void;
Expand All @@ -26,176 +16,3 @@ export interface HeadlessWorkspaceEventListener {
export interface WorkspaceInstanceUpdateListener {
(ctx: TraceContext, instance: WorkspaceInstance): void;
}

export const LocalMessageBroker = Symbol("LocalMessageBroker");
export interface LocalMessageBroker {
start(): Promise<void>;

stop(): Promise<void>;

listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable;

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable;

listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable;
}

/**
* With our current code we basically create O(ws*p) queues for every user (ws = nr of websockets connections,
* p = nr of projects). This already breaks production regularly, as each queue consumes memory (and in
* consequence: CPU) on the messagebus. This is unnecessary in 90% of the cases, as we don't use that updates anyways.
*
* The core issue here is two-fold:
* 1) we create a lot of websocket connections: 1 for each dashboard tab, 2 for each workspace (frontend + supervisor)
* 2) we currently create new queues for each websocket connection: because of how the contracts around messages between server and dashboard evolved this is not trivial to change.
*
* To mitigate this (and also to pave the path for a switch to a less complicated message distribution technology) we
* introduce this LocalMessageBroker. It is meant to be a 100% backwards compatible replacement so we don't need to touch the
* dashboard/server logic w.r.t. to expected messages.
* It creates one queue per topic with rabbitmq, and does the distribution internally (per/to server instance).
*/
@injectable()
export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
static readonly UNDEFINED_KEY = "undefined";

@inject(MessageBusIntegration) protected readonly messageBusIntegration: MessageBusIntegration;

protected prebuildUpdateListeners: Map<string, PrebuildUpdateListener[]> = new Map();
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();
protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();

protected readonly disposables = new DisposableCollection();

async start() {
this.disposables.push(
this.messageBusIntegration.listenForPrebuildUpdates(
undefined,
async (ctx: TraceContext, update: PrebuildWithStatus) => {
const enabled = await this.isRedisPubSubByTypeEnabled("prebuild");
if (enabled) {
log.debug("[messagebus] Prebuild listener is disabled through feature flag");
return;
}
TraceContext.setOWI(ctx, { workspaceId: update.info.buildWorkspaceId });

const listeners = this.prebuildUpdateListeners.get(update.info.projectId) || [];
for (const l of listeners) {
try {
l(ctx, update);
} catch (err) {
TraceContext.setError(ctx, err);
log.error(
{ userId: update.info.userId, workspaceId: update.info.buildWorkspaceId },
"listenForPrebuildUpdates",
err,
{ projectId: update.info.projectId, prebuildId: update.info.id },
);
}
}
},
),
);
this.disposables.push(
this.messageBusIntegration.listenForPrebuildUpdatableQueue(
async (ctx: TraceContext, evt: HeadlessWorkspaceEvent) => {
const enabled = await this.isRedisPubSubByTypeEnabled("prebuild-updatable");
if (enabled) {
log.debug("[messagebus] Prebuild updatable listener is disabled through feature flag");
return;
}
TraceContext.setOWI(ctx, { workspaceId: evt.workspaceID });

const listeners =
this.headlessWorkspaceEventListeners.get(LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY) || [];
for (const l of listeners) {
try {
l(ctx, evt);
} catch (err) {
TraceContext.setError(ctx, err);
log.error({ workspaceId: evt.workspaceID }, "listenForPrebuildUpdatableQueue", err);
}
}
},
),
);
this.disposables.push(
this.messageBusIntegration.listenForWorkspaceInstanceUpdates(
undefined,
async (ctx: TraceContext, instance: WorkspaceInstance, userId: string | undefined) => {
const enabled = await this.isRedisPubSubByTypeEnabled("workspace-instance");
if (enabled) {
log.debug("[messagebus] Workspace instance listner is disabled through feature flag");
return;
}

TraceContext.setOWI(ctx, { userId, instanceId: instance.id });

if (!userId) {
return;
}

const listeners = this.workspaceInstanceUpdateListeners.get(userId) || [];
for (const l of listeners) {
try {
l(ctx, instance);
} catch (err) {
TraceContext.setError(ctx, err);
log.error({ userId, instanceId: instance.id }, "listenForWorkspaceInstanceUpdates", err);
}
}
},
),
);
}

async stop() {
this.disposables.dispose();
}

listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable {
return this.doRegister(projectId, listener, this.prebuildUpdateListeners);
}

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
// we're being cheap here in re-using a map where it just needs to be a plain array.
return this.doRegister(
LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY,
listener,
this.headlessWorkspaceEventListeners,
);
}

listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable {
return this.doRegister(userId, listener, this.workspaceInstanceUpdateListeners);
}

protected doRegister<L>(key: string, listener: L, listenersStore: Map<string, L[]>): Disposable {
let listeners = listenersStore.get(key);
if (listeners === undefined) {
listeners = [];
listenersStore.set(key, listeners);
}
listeners.push(listener);
return Disposable.create(() => {
const ls = listeners!;
const idx = ls.findIndex((l) => l === listener);
if (idx !== -1) {
ls.splice(idx, 1);
}
if (ls.length === 0) {
listenersStore.delete(key);
}
});
}

private async isRedisPubSubByTypeEnabled(
type: "workspace-instance" | "prebuild" | "prebuild-updatable",
): Promise<boolean> {
const enabledTypes = await getExperimentsClientForBackend().getValueAsync(
"enableRedisPubSubByUpdateType",
"none",
{},
);
return enabledTypes.indexOf(type) >= 0;
}
}
54 changes: 5 additions & 49 deletions components/server/src/messaging/redis-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import {
HeadlessWorkspaceEventListener,
LocalMessageBroker,
LocalRabbitMQBackedMessageBroker,
PrebuildUpdateListener,
WorkspaceInstanceUpdateListener,
} from "./local-message-broker";
Expand All @@ -24,7 +22,6 @@ import {
WorkspaceInstanceUpdatesChannel,
} from "@gitpod/gitpod-protocol";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
import {
reportRedisUpdateCompleted,
reportRedisUpdateReceived,
Expand All @@ -33,8 +30,10 @@ import {
import { Redis } from "ioredis";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib";

const UNDEFINED_KEY = "undefined";

@injectable()
export class RedisSubscriber implements LocalMessageBroker {
export class RedisSubscriber {
constructor(
@inject(Redis) private readonly redis: Redis,
@inject(WorkspaceDB) private readonly workspaceDB: WorkspaceDB,
Expand Down Expand Up @@ -73,37 +72,12 @@ export class RedisSubscriber implements LocalMessageBroker {
private async onMessage(channel: string, message: string): Promise<void> {
switch (channel) {
case WorkspaceInstanceUpdatesChannel:
const wsInstanceTypeEnabled = await this.isRedisPubSubByTypeEnabled("workspace-instance");
if (!wsInstanceTypeEnabled) {
log.debug("[redis] Redis workspace instance update is disabled through feature flag", {
channel,
message,
});
return;
}

return this.onInstanceUpdate(JSON.parse(message) as RedisWorkspaceInstanceUpdate);

case PrebuildUpdatesChannel:
const prebuildTypeEnabled = await this.isRedisPubSubByTypeEnabled("prebuild");
if (!prebuildTypeEnabled) {
log.debug("[redis] Redis prebuild update is disabled through feature flag", {
channel,
message,
});
return;
}
return this.onPrebuildUpdate(JSON.parse(message) as RedisPrebuildUpdate);

case HeadlessUpdatesChannel:
const headlessTypeEnabled = await this.isRedisPubSubByTypeEnabled("prebuild-updatable");
if (!headlessTypeEnabled) {
log.debug("[redis] Redis headless update is disabled through feature flag", {
channel,
message,
});
return;
}
return this.onHeadlessUpdate(JSON.parse(message) as RedisHeadlessUpdate);

default:
Expand Down Expand Up @@ -185,8 +159,7 @@ export class RedisSubscriber implements LocalMessageBroker {
return;
}

const listeners =
this.headlessWorkspaceEventListeners.get(LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY) || [];
const listeners = this.headlessWorkspaceEventListeners.get(UNDEFINED_KEY) || [];
if (listeners.length === 0) {
return;
}
Expand All @@ -211,12 +184,7 @@ export class RedisSubscriber implements LocalMessageBroker {

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
// we're being cheap here in re-using a map where it just needs to be a plain array.
return this.doRegister(
LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY,
listener,
this.headlessWorkspaceEventListeners,
"prebuild-updatable",
);
return this.doRegister(UNDEFINED_KEY, listener, this.headlessWorkspaceEventListeners, "prebuild-updatable");
}

listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable {
Expand Down Expand Up @@ -248,16 +216,4 @@ export class RedisSubscriber implements LocalMessageBroker {
}
});
}

private async isRedisPubSubByTypeEnabled(
type: "workspace-instance" | "prebuild" | "prebuild-updatable",
): Promise<boolean> {
const enabledTypes = await getExperimentsClientForBackend().getValueAsync(
"enableRedisPubSubByUpdateType",
"none",
{},
);
log.debug("Enabled types in redis publisher:", enabledTypes);
return enabledTypes.indexOf(type) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
WorkspaceConfig,
} from "@gitpod/gitpod-protocol";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { LocalMessageBroker } from "../messaging/local-message-broker";
import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
import { RedisSubscriber } from "../messaging/redis-subscriber";

Expand All @@ -44,7 +43,6 @@ export type AuthenticatedGithubProvider = (
export class PrebuildStatusMaintainer implements Disposable {
constructor(
@inject(TracedWorkspaceDB) private readonly workspaceDB: DBWithTracing<WorkspaceDB>,
@inject(LocalMessageBroker) private readonly localMessageBroker: LocalMessageBroker,
@inject(RedisSubscriber) private readonly subscriber: RedisSubscriber,
) {}

Expand All @@ -56,9 +54,6 @@ export class PrebuildStatusMaintainer implements Disposable {
this.githubApiProvider = githubApiProvider;

this.disposables.pushAll([
this.localMessageBroker.listenForPrebuildUpdatableEvents((ctx, msg) =>
this.handlePrebuildFinished(ctx, msg),
),
this.subscriber.listenForPrebuildUpdatableEvents((ctx, msg) => this.handlePrebuildFinished(ctx, msg)),
]);
this.disposables.push(repeat(this.periodicUpdatableCheck.bind(this), MAX_UPDATABLE_AGE / 2));
Expand Down
Loading