Skip to content

Commit 91f7e8c

Browse files
committed
[server] Remove messagebus
1 parent 765aa36 commit 91f7e8c

File tree

8 files changed

+9
-471
lines changed

8 files changed

+9
-471
lines changed

components/server/src/container-module.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
import { ContainerModule } from "inversify";
88

9-
import { MessageBusHelper, MessageBusHelperImpl } from "@gitpod/gitpod-messagebus/lib";
10-
import { MessagebusConfiguration } from "@gitpod/gitpod-messagebus/lib/config";
119
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
1210
import {
1311
ConfigCatClientFactory,
@@ -73,7 +71,6 @@ import { WebhookEventGarbageCollector } from "./jobs/webhook-gc";
7371
import { WorkspaceGarbageCollector } from "./jobs/workspace-gc";
7472
import { LinkedInService } from "./linkedin-service";
7573
import { LivenessController } from "./liveness/liveness-controller";
76-
import { LocalMessageBroker, LocalRabbitMQBackedMessageBroker } from "./messaging/local-message-broker";
7774
import { MonitoringEndpointsApp } from "./monitoring-endpoints";
7875
import { OAuthController } from "./oauth-server/oauth-controller";
7976
import { OneTimeSecretServer } from "./one-time-secret-server";
@@ -117,7 +114,6 @@ import { HeadlessLogController } from "./workspace/headless-log-controller";
117114
import { HeadlessLogService } from "./workspace/headless-log-service";
118115
import { ImageSourceProvider } from "./workspace/image-source-provider";
119116
import { ImageBuildPrefixContextParser } from "./workspace/imagebuild-prefix-context-parser";
120-
import { MessageBusIntegration } from "./workspace/messagebus-integration";
121117
import { OpenPrebuildPrefixContextParser } from "./workspace/open-prebuild-prefix-context-parser";
122118
import { ReferrerPrefixParser } from "./workspace/referrer-prefix-context-parser";
123119
import { SnapshotContextParser } from "./workspace/snapshot-context-parser";
@@ -169,11 +165,6 @@ export const productionContainerModule = new ContainerModule(
169165
bind(ServerFactory).toAutoFactory(GitpodServerImpl);
170166
bind(UserController).toSelf().inSingletonScope();
171167

172-
bind(MessagebusConfiguration).toSelf().inSingletonScope();
173-
bind(MessageBusHelper).to(MessageBusHelperImpl).inSingletonScope();
174-
bind(MessageBusIntegration).toSelf().inSingletonScope();
175-
bind(LocalMessageBroker).to(LocalRabbitMQBackedMessageBroker).inSingletonScope();
176-
177168
bind(GitpodServerImpl).toSelf();
178169
bind(WebsocketConnectionManager)
179170
.toDynamicValue((ctx) => {

components/server/src/messaging/local-message-broker.ts

Lines changed: 1 addition & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,8 @@
44
* See License.AGPL.txt in the project root for license information.
55
*/
66

7-
import {
8-
Disposable,
9-
DisposableCollection,
10-
HeadlessWorkspaceEvent,
11-
PrebuildWithStatus,
12-
WorkspaceInstance,
13-
} from "@gitpod/gitpod-protocol";
14-
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
7+
import { HeadlessWorkspaceEvent, PrebuildWithStatus, WorkspaceInstance } from "@gitpod/gitpod-protocol";
158
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
16-
import { inject, injectable } from "inversify";
17-
import { MessageBusIntegration } from "../workspace/messagebus-integration";
18-
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
199

2010
export interface PrebuildUpdateListener {
2111
(ctx: TraceContext, evt: PrebuildWithStatus): void;
@@ -26,176 +16,3 @@ export interface HeadlessWorkspaceEventListener {
2616
export interface WorkspaceInstanceUpdateListener {
2717
(ctx: TraceContext, instance: WorkspaceInstance): void;
2818
}
29-
30-
export const LocalMessageBroker = Symbol("LocalMessageBroker");
31-
export interface LocalMessageBroker {
32-
start(): Promise<void>;
33-
34-
stop(): Promise<void>;
35-
36-
listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable;
37-
38-
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable;
39-
40-
listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable;
41-
}
42-
43-
/**
44-
* With our current code we basically create O(ws*p) queues for every user (ws = nr of websockets connections,
45-
* p = nr of projects). This already breaks production regularly, as each queue consumes memory (and in
46-
* consequence: CPU) on the messagebus. This is unnecessary in 90% of the cases, as we don't use that updates anyways.
47-
*
48-
* The core issue here is two-fold:
49-
* 1) we create a lot of websocket connections: 1 for each dashboard tab, 2 for each workspace (frontend + supervisor)
50-
* 2) we currently create new queues for each websocket connection: because of how the contracts around messages between server and dashboard evolved this is not trivial to change.
51-
*
52-
* To mitigate this (and also to pave the path for a switch to a less complicated message distribution technology) we
53-
* introduce this LocalMessageBroker. It is meant to be a 100% backwards compatible replacement so we don't need to touch the
54-
* dashboard/server logic w.r.t. to expected messages.
55-
* It creates one queue per topic with rabbitmq, and does the distribution internally (per/to server instance).
56-
*/
57-
@injectable()
58-
export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
59-
static readonly UNDEFINED_KEY = "undefined";
60-
61-
@inject(MessageBusIntegration) protected readonly messageBusIntegration: MessageBusIntegration;
62-
63-
protected prebuildUpdateListeners: Map<string, PrebuildUpdateListener[]> = new Map();
64-
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();
65-
protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();
66-
67-
protected readonly disposables = new DisposableCollection();
68-
69-
async start() {
70-
this.disposables.push(
71-
this.messageBusIntegration.listenForPrebuildUpdates(
72-
undefined,
73-
async (ctx: TraceContext, update: PrebuildWithStatus) => {
74-
const enabled = await this.isRedisPubSubByTypeEnabled("prebuild");
75-
if (enabled) {
76-
log.debug("[messagebus] Prebuild listener is disabled through feature flag");
77-
return;
78-
}
79-
TraceContext.setOWI(ctx, { workspaceId: update.info.buildWorkspaceId });
80-
81-
const listeners = this.prebuildUpdateListeners.get(update.info.projectId) || [];
82-
for (const l of listeners) {
83-
try {
84-
l(ctx, update);
85-
} catch (err) {
86-
TraceContext.setError(ctx, err);
87-
log.error(
88-
{ userId: update.info.userId, workspaceId: update.info.buildWorkspaceId },
89-
"listenForPrebuildUpdates",
90-
err,
91-
{ projectId: update.info.projectId, prebuildId: update.info.id },
92-
);
93-
}
94-
}
95-
},
96-
),
97-
);
98-
this.disposables.push(
99-
this.messageBusIntegration.listenForPrebuildUpdatableQueue(
100-
async (ctx: TraceContext, evt: HeadlessWorkspaceEvent) => {
101-
const enabled = await this.isRedisPubSubByTypeEnabled("prebuild-updatable");
102-
if (enabled) {
103-
log.debug("[messagebus] Prebuild updatable listener is disabled through feature flag");
104-
return;
105-
}
106-
TraceContext.setOWI(ctx, { workspaceId: evt.workspaceID });
107-
108-
const listeners =
109-
this.headlessWorkspaceEventListeners.get(LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY) || [];
110-
for (const l of listeners) {
111-
try {
112-
l(ctx, evt);
113-
} catch (err) {
114-
TraceContext.setError(ctx, err);
115-
log.error({ workspaceId: evt.workspaceID }, "listenForPrebuildUpdatableQueue", err);
116-
}
117-
}
118-
},
119-
),
120-
);
121-
this.disposables.push(
122-
this.messageBusIntegration.listenForWorkspaceInstanceUpdates(
123-
undefined,
124-
async (ctx: TraceContext, instance: WorkspaceInstance, userId: string | undefined) => {
125-
const enabled = await this.isRedisPubSubByTypeEnabled("workspace-instance");
126-
if (enabled) {
127-
log.debug("[messagebus] Workspace instance listner is disabled through feature flag");
128-
return;
129-
}
130-
131-
TraceContext.setOWI(ctx, { userId, instanceId: instance.id });
132-
133-
if (!userId) {
134-
return;
135-
}
136-
137-
const listeners = this.workspaceInstanceUpdateListeners.get(userId) || [];
138-
for (const l of listeners) {
139-
try {
140-
l(ctx, instance);
141-
} catch (err) {
142-
TraceContext.setError(ctx, err);
143-
log.error({ userId, instanceId: instance.id }, "listenForWorkspaceInstanceUpdates", err);
144-
}
145-
}
146-
},
147-
),
148-
);
149-
}
150-
151-
async stop() {
152-
this.disposables.dispose();
153-
}
154-
155-
listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable {
156-
return this.doRegister(projectId, listener, this.prebuildUpdateListeners);
157-
}
158-
159-
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
160-
// we're being cheap here in re-using a map where it just needs to be a plain array.
161-
return this.doRegister(
162-
LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY,
163-
listener,
164-
this.headlessWorkspaceEventListeners,
165-
);
166-
}
167-
168-
listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable {
169-
return this.doRegister(userId, listener, this.workspaceInstanceUpdateListeners);
170-
}
171-
172-
protected doRegister<L>(key: string, listener: L, listenersStore: Map<string, L[]>): Disposable {
173-
let listeners = listenersStore.get(key);
174-
if (listeners === undefined) {
175-
listeners = [];
176-
listenersStore.set(key, listeners);
177-
}
178-
listeners.push(listener);
179-
return Disposable.create(() => {
180-
const ls = listeners!;
181-
const idx = ls.findIndex((l) => l === listener);
182-
if (idx !== -1) {
183-
ls.splice(idx, 1);
184-
}
185-
if (ls.length === 0) {
186-
listenersStore.delete(key);
187-
}
188-
});
189-
}
190-
191-
private async isRedisPubSubByTypeEnabled(
192-
type: "workspace-instance" | "prebuild" | "prebuild-updatable",
193-
): Promise<boolean> {
194-
const enabledTypes = await getExperimentsClientForBackend().getValueAsync(
195-
"enableRedisPubSubByUpdateType",
196-
"none",
197-
{},
198-
);
199-
return enabledTypes.indexOf(type) >= 0;
200-
}
201-
}

components/server/src/messaging/redis-subscriber.ts

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
import {
88
HeadlessWorkspaceEventListener,
9-
LocalMessageBroker,
10-
LocalRabbitMQBackedMessageBroker,
119
PrebuildUpdateListener,
1210
WorkspaceInstanceUpdateListener,
1311
} from "./local-message-broker";
@@ -33,8 +31,10 @@ import {
3331
import { Redis } from "ioredis";
3432
import { WorkspaceDB } from "@gitpod/gitpod-db/lib";
3533

34+
const UNDEFINED_KEY = "undefined";
35+
3636
@injectable()
37-
export class RedisSubscriber implements LocalMessageBroker {
37+
export class RedisSubscriber {
3838
constructor(
3939
@inject(Redis) private readonly redis: Redis,
4040
@inject(WorkspaceDB) private readonly workspaceDB: WorkspaceDB,
@@ -185,8 +185,7 @@ export class RedisSubscriber implements LocalMessageBroker {
185185
return;
186186
}
187187

188-
const listeners =
189-
this.headlessWorkspaceEventListeners.get(LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY) || [];
188+
const listeners = this.headlessWorkspaceEventListeners.get(UNDEFINED_KEY) || [];
190189
if (listeners.length === 0) {
191190
return;
192191
}
@@ -211,12 +210,7 @@ export class RedisSubscriber implements LocalMessageBroker {
211210

212211
listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
213212
// we're being cheap here in re-using a map where it just needs to be a plain array.
214-
return this.doRegister(
215-
LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY,
216-
listener,
217-
this.headlessWorkspaceEventListeners,
218-
"prebuild-updatable",
219-
);
213+
return this.doRegister(UNDEFINED_KEY, listener, this.headlessWorkspaceEventListeners, "prebuild-updatable");
220214
}
221215

222216
listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable {

components/server/src/prebuilds/prebuilt-status-maintainer.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import {
1818
WorkspaceConfig,
1919
} from "@gitpod/gitpod-protocol";
2020
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
21-
import { LocalMessageBroker } from "../messaging/local-message-broker";
2221
import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
2322
import { RedisSubscriber } from "../messaging/redis-subscriber";
2423

@@ -44,7 +43,6 @@ export type AuthenticatedGithubProvider = (
4443
export class PrebuildStatusMaintainer implements Disposable {
4544
constructor(
4645
@inject(TracedWorkspaceDB) private readonly workspaceDB: DBWithTracing<WorkspaceDB>,
47-
@inject(LocalMessageBroker) private readonly localMessageBroker: LocalMessageBroker,
4846
@inject(RedisSubscriber) private readonly subscriber: RedisSubscriber,
4947
) {}
5048

@@ -56,9 +54,6 @@ export class PrebuildStatusMaintainer implements Disposable {
5654
this.githubApiProvider = githubApiProvider;
5755

5856
this.disposables.pushAll([
59-
this.localMessageBroker.listenForPrebuildUpdatableEvents((ctx, msg) =>
60-
this.handlePrebuildFinished(ctx, msg),
61-
),
6257
this.subscriber.listenForPrebuildUpdatableEvents((ctx, msg) => this.handlePrebuildFinished(ctx, msg)),
6358
]);
6459
this.disposables.push(repeat(this.periodicUpdatableCheck.bind(this), MAX_UPDATABLE_AGE / 2));

components/server/src/server.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { toIWebSocket } from "@gitpod/gitpod-protocol/lib/messaging/node/connect
1919
import { WsExpressHandler, WsRequestHandler } from "./express/ws-handler";
2020
import { isAllowedWebsocketDomain, bottomErrorHandler, unhandledToError } from "./express-util";
2121
import { createWebSocketConnection } from "vscode-ws-jsonrpc/lib";
22-
import { MessageBusIntegration } from "./workspace/messagebus-integration";
2322
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
2423
import { AddressInfo } from "net";
2524
import { WorkspaceDownloadService } from "./workspace/workspace-download-service";
@@ -41,7 +40,6 @@ import {
4140
import { NewsletterSubscriptionController } from "./user/newsletter-subscription-controller";
4241
import { Config } from "./config";
4342
import { DebugApp } from "@gitpod/gitpod-protocol/lib/util/debug-app";
44-
import { LocalMessageBroker } from "./messaging/local-message-broker";
4543
import { WsConnectionHandler } from "./express/ws-connection-handler";
4644
import { LivenessController } from "./liveness/liveness-controller";
4745
import { IamSessionApp } from "./iam/iam-session-app";
@@ -77,8 +75,6 @@ export class Server {
7775
@inject(Authenticator) private readonly authenticator: Authenticator,
7876
@inject(UserController) private readonly userController: UserController,
7977
@inject(WebsocketConnectionManager) private readonly websocketConnectionHandler: WebsocketConnectionManager,
80-
@inject(MessageBusIntegration) private readonly messagebus: MessageBusIntegration,
81-
@inject(LocalMessageBroker) private readonly localMessageBroker: LocalMessageBroker,
8278
@inject(WorkspaceDownloadService) private readonly workspaceDownloadService: WorkspaceDownloadService,
8379
@inject(LivenessController) private readonly livenessController: LivenessController,
8480
@inject(MonitoringEndpointsApp) private readonly monitoringEndpointsApp: MonitoringEndpointsApp,
@@ -269,13 +265,6 @@ export class Server {
269265
this.installWebsocketConnectionGauge();
270266
this.installWebsocketClientContextGauge();
271267

272-
// Connect to message bus
273-
await this.messagebus.connect();
274-
275-
// Start local message broker
276-
await this.localMessageBroker.start();
277-
this.disposables.push(Disposable.create(() => this.localMessageBroker.stop().catch(log.error)));
278-
279268
await this.redisSubscriber.start();
280269
this.disposables.push(Disposable.create(() => this.redisSubscriber.stop().catch(log.error)));
281270

0 commit comments

Comments
 (0)