@@ -25,10 +25,14 @@ import {
25
25
updateSubscribersRegistered ,
26
26
} from "../prometheus-metrics" ;
27
27
import { Redis } from "ioredis" ;
28
+ import { WorkspaceDB } from "@gitpod/gitpod-db/lib" ;
28
29
29
30
@injectable ( )
30
31
export class RedisSubscriber implements LocalMessageBroker {
31
- constructor ( @inject ( Redis ) private readonly redis : Redis ) { }
32
+ constructor (
33
+ @inject ( Redis ) private readonly redis : Redis ,
34
+ @inject ( WorkspaceDB ) private readonly workspaceDB : WorkspaceDB ,
35
+ ) { }
32
36
33
37
protected workspaceInstanceUpdateListeners : Map < string , WorkspaceInstanceUpdateListener [ ] > = new Map ( ) ;
34
38
@@ -79,6 +83,33 @@ export class RedisSubscriber implements LocalMessageBroker {
79
83
80
84
private async onInstanceUpdate ( update : RedisWorkspaceInstanceUpdate ) : Promise < void > {
81
85
log . debug ( "[redis] Received instance update" , { update } ) ;
86
+
87
+ if ( ! update . ownerID || ! update . instanceID ) {
88
+ return ;
89
+ }
90
+
91
+ const listeners = this . workspaceInstanceUpdateListeners . get ( update . ownerID ) || [ ] ;
92
+ if ( listeners . length === 0 ) {
93
+ return ;
94
+ }
95
+
96
+ const ctx = { } ;
97
+ const instance = await this . workspaceDB . findInstanceById ( update . instanceID ) ;
98
+ if ( ! instance ) {
99
+ return ;
100
+ }
101
+
102
+ for ( const l of listeners ) {
103
+ try {
104
+ l ( ctx , instance ) ;
105
+ } catch ( err ) {
106
+ log . error (
107
+ { userId : update . ownerID , instanceId : instance . id , workspaceId : update . workspaceID } ,
108
+ "Failed to broadcast workspace instance update." ,
109
+ err ,
110
+ ) ;
111
+ }
112
+ }
82
113
}
83
114
84
115
async stop ( ) {
0 commit comments