@@ -89,15 +89,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
89
89
) : RunSubscription < TRunTypes > {
90
90
const abortController = new AbortController ( ) ;
91
91
92
- const version1 = new SSEStreamSubscriptionFactory (
93
- getEnvVar ( "TRIGGER_STREAM_URL" , getEnvVar ( "TRIGGER_API_URL" ) ) ?? "https://api.trigger.dev" ,
94
- {
95
- headers : options ?. headers ,
96
- signal : abortController . signal ,
97
- }
98
- ) ;
99
-
100
- const version2 = new ElectricStreamSubscriptionFactory (
92
+ const streamFactory = new SSEStreamSubscriptionFactory (
101
93
getEnvVar ( "TRIGGER_STREAM_URL" , getEnvVar ( "TRIGGER_API_URL" ) ) ?? "https://api.trigger.dev" ,
102
94
{
103
95
headers : options ?. headers ,
@@ -124,7 +116,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
124
116
const $options : RunSubscriptionOptions = {
125
117
runShapeStream : runStreamInstance . stream ,
126
118
stopRunShapeStream : ( ) => runStreamInstance . stop ( 30 * 1000 ) ,
127
- streamFactory : new VersionedStreamSubscriptionFactory ( version1 , version2 ) ,
119
+ streamFactory : streamFactory ,
128
120
abortController,
129
121
...options ,
130
122
} ;
@@ -138,12 +130,7 @@ export interface StreamSubscription {
138
130
}
139
131
140
132
export interface StreamSubscriptionFactory {
141
- createSubscription (
142
- metadata : Record < string , unknown > ,
143
- runId : string ,
144
- streamKey : string ,
145
- baseUrl ?: string
146
- ) : StreamSubscription ;
133
+ createSubscription ( runId : string , streamKey : string , baseUrl ?: string ) : StreamSubscription ;
147
134
}
148
135
149
136
// Real implementation for production
@@ -194,12 +181,7 @@ export class SSEStreamSubscriptionFactory implements StreamSubscriptionFactory {
194
181
private options : { headers ?: Record < string , string > ; signal ?: AbortSignal }
195
182
) { }
196
183
197
- createSubscription (
198
- metadata : Record < string , unknown > ,
199
- runId : string ,
200
- streamKey : string ,
201
- baseUrl ?: string
202
- ) : StreamSubscription {
184
+ createSubscription ( runId : string , streamKey : string , baseUrl ?: string ) : StreamSubscription {
203
185
if ( ! runId || ! streamKey ) {
204
186
throw new Error ( "runId and streamKey are required" ) ;
205
187
}
@@ -238,63 +220,6 @@ export class ElectricStreamSubscription implements StreamSubscription {
238
220
}
239
221
}
240
222
241
- export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFactory {
242
- constructor (
243
- private baseUrl : string ,
244
- private options : { headers ?: Record < string , string > ; signal ?: AbortSignal }
245
- ) { }
246
-
247
- createSubscription (
248
- metadata : Record < string , unknown > ,
249
- runId : string ,
250
- streamKey : string ,
251
- baseUrl ?: string
252
- ) : StreamSubscription {
253
- if ( ! runId || ! streamKey ) {
254
- throw new Error ( "runId and streamKey are required" ) ;
255
- }
256
-
257
- return new ElectricStreamSubscription (
258
- `${ baseUrl ?? this . baseUrl } /realtime/v2/streams/${ runId } /${ streamKey } ` ,
259
- this . options
260
- ) ;
261
- }
262
- }
263
-
264
- export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFactory {
265
- constructor (
266
- private version1 : StreamSubscriptionFactory ,
267
- private version2 : StreamSubscriptionFactory
268
- ) { }
269
-
270
- createSubscription (
271
- metadata : Record < string , unknown > ,
272
- runId : string ,
273
- streamKey : string ,
274
- baseUrl ?: string
275
- ) : StreamSubscription {
276
- if ( ! runId || ! streamKey ) {
277
- throw new Error ( "runId and streamKey are required" ) ;
278
- }
279
-
280
- const version =
281
- typeof metadata . $$streamsVersion === "string" ? metadata . $$streamsVersion : "v1" ;
282
-
283
- const $baseUrl =
284
- typeof metadata . $$streamsBaseUrl === "string" ? metadata . $$streamsBaseUrl : baseUrl ;
285
-
286
- if ( version === "v1" ) {
287
- return this . version1 . createSubscription ( metadata , runId , streamKey , $baseUrl ) ;
288
- }
289
-
290
- if ( version === "v2" ) {
291
- return this . version2 . createSubscription ( metadata , runId , streamKey , $baseUrl ) ;
292
- }
293
-
294
- throw new Error ( `Unknown stream version: ${ version } ` ) ;
295
- }
296
- }
297
-
298
223
export interface RunShapeProvider {
299
224
onShape ( callback : ( shape : SubscribeRunRawShape ) => Promise < void > ) : Promise < ( ) => void > ;
300
225
}
@@ -385,7 +310,6 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
385
310
activeStreams . add ( streamKey ) ;
386
311
387
312
const subscription = this . options . streamFactory . createSubscription (
388
- run . metadata ,
389
313
run . id ,
390
314
streamKey ,
391
315
this . options . client ?. baseUrl
0 commit comments